# 1 数据读取

#### 将DataFrame数据分别按行和按列分割并保存为csv文件，以模拟各参与方各自持有数据的情况；再将这些csv文件分别读取为HDataFrame和VDataFrame类型的变量

## 1.1 初始化

In [1]:
import secretflow as sf

# Report which SecretFlow release this notebook is running against.
print('The version of SecretFlow: {}'.format(sf.__version__))

# Tear down any runtime that is already running so that init() below
# starts from a clean state.
sf.shutdown()

# Start a local runtime with three simulated parties, then create one PYU
# device handle per party.
sf.init(['alice', 'bob', 'carol'], address='local')
alice = sf.PYU('alice')
bob = sf.PYU('bob')
carol = sf.PYU('carol')

The version of SecretFlow: 1.8.0b0


  self.pid = _posixsubprocess.fork_exec(
2024-07-25 20:40:05,850	INFO worker.py:1724 -- Started a local Ray instance.


## 1.2 查看DataFrame数据

In [2]:
import pandas as pd
from sklearn.datasets import load_iris

# Load the iris dataset as pandas objects and place the target column next to
# the feature columns in a single DataFrame.
iris = load_iris(as_frame=True)
data = pd.concat((iris.data, iris.target), axis='columns')

# Trailing bare expression: the notebook renders the frame as the cell output.
data

Unnamed: 0,sepal length (cm),sepal width (cm),petal length (cm),petal width (cm),target
0,5.1,3.5,1.4,0.2,0
1,4.9,3.0,1.4,0.2,0
2,4.7,3.2,1.3,0.2,0
3,4.6,3.1,1.5,0.2,0
4,5.0,3.6,1.4,0.2,0
...,...,...,...,...,...
145,6.7,3.0,5.2,2.3,2
146,6.3,2.5,5.0,1.9,2
147,6.5,3.0,5.2,2.0,2
148,6.2,3.4,5.4,2.3,2


## 1.3 数据横向分割（每个参与方持有一部分样本的全部列）

In [3]:
import os
import tempfile

# Horizontal partitioning: every party receives a contiguous range of rows
# with all columns (alice: rows 0-39, bob: rows 40-99, carol: rows 100+).
h_alice = data.iloc[:40, :]
h_bob = data.iloc[40:100, :]
h_carol = data.iloc[100:, :]

# Fresh temporary directory that holds every CSV written by this notebook.
temp_dir = tempfile.mkdtemp()

# Write each party's partition to its own CSV file, without the index column.
h_alice_path = os.path.join(temp_dir, 'h_alice.csv')
h_alice.to_csv(h_alice_path, index=False)

h_bob_path = os.path.join(temp_dir, 'h_bob.csv')
h_bob.to_csv(h_bob_path, index=False)

h_carol_path = os.path.join(temp_dir, 'h_carol.csv')
h_carol.to_csv(h_carol_path, index=False)

## 1.4 数据纵向分割（每个参与方持有全部样本的一部分列：特征或标签）

In [4]:
# Vertical partitioning: every party receives all rows but only some columns
# (alice: columns 0-1, bob: columns 2-3, carol: column 4 onwards).
v_alice = data.iloc[:, :2]
v_bob = data.iloc[:, 2:4]
v_carol = data.iloc[:, 4:]

# Write each party's partition to its own CSV file in the shared temporary
# directory, without the index column.
v_alice_path = os.path.join(temp_dir, 'v_alice.csv')
v_alice.to_csv(v_alice_path, index=False)

v_bob_path = os.path.join(temp_dir, 'v_bob.csv')
v_bob.to_csv(v_bob_path, index=False)

v_carol_path = os.path.join(temp_dir, 'v_carol.csv')
v_carol.to_csv(v_carol_path, index=False)

## 1.5 读取横向分割的csv数据

In [5]:
from secretflow.data.horizontal import read_csv as h_read_csv
from secretflow.security.aggregation import SecureAggregator
from secretflow.security.compare import SPUComparator

# The aggregator and comparator are respectively used to aggregate
# or compare data in subsequent data analysis operations.
# The aggregator is placed on alice's device and covers all three parties.
aggr = SecureAggregator(device=alice, participants=[alice, bob, carol])

# The comparator runs on an SPU device built from a testing cluster
# definition for the same three parties.
spu = sf.SPU(sf.utils.testing.cluster_def(parties=['alice', 'bob', 'carol']))
comp = SPUComparator(spu)

# Read each party's row partition from its own CSV file into one horizontally
# partitioned DataFrame.
hdf = h_read_csv(
    {alice: h_alice_path, bob: h_bob_path, carol: h_carol_path},
    aggregator=aggr,
    comparator=comp,
)

INFO:root:Create proxy actor <class 'secretflow.device.proxy.Actor_Masker'> with party alice.
INFO:root:Create proxy actor <class 'secretflow.device.proxy.Actor_Masker'> with party bob.
INFO:root:Create proxy actor <class 'secretflow.device.proxy.Actor_Masker'> with party carol.


OutOfMemoryError: Task was killed due to the node running low on memory.
Memory on the node (IP: 10.0.0.4, ID: 43a200dfce78102563f0c50cab8e75402270b9d4c6f17cc1b4d22415) where the task (task ID: ffffffffffffffff655dff22c4aa4ebca08f2ca301000000, name=Actor_Masker.__init__, pid=1983885, memory used=0.06GB) was running was 14.39GB / 15.11GB (0.952129), which exceeds the memory usage threshold of 0.95. Ray killed this worker (ID: 35b3d06e1bee83d9754a27323c7d3701de3aea71049389d19010cf6b) because it was the most recently scheduled task; to see more information about memory usage on this node, use `ray logs raylet.out -ip 10.0.0.4`. To see the logs of the worker, use `ray logs worker-35b3d06e1bee83d9754a27323c7d3701de3aea71049389d19010cf6b*out -ip 10.0.0.4. Top 10 memory users:
PID	MEM(GB)	COMMAND
1946357	0.85	/home/beng003/.vscode-server/cli/servers/Stable-f1e16e1e6214d7c44d078b1f0607b2388f29d729/server/node...
1946893	0.68	/home/beng003/.vscode-server/cli/servers/Stable-f1e16e1e6214d7c44d078b1f0607b2388f29d729/server/node...
1952541	0.25	/home/beng003/anaconda/envs/sf/bin/python -m ipykernel_launcher --f=/home/beng003/.local/share/jupyt...
1981763	0.22	/home/beng003/anaconda/envs/sf/bin/python -m ipykernel_launcher --f=/home/beng003/.local/share/jupyt...
1962420	0.20	
1962419	0.20	ray::HEUEvaluator
1774465	0.19	/home/beng003/anaconda/envs/sf/bin/python -m ipykernel_launcher -f /home/beng003/.local/share/jupyte...
1958029	0.14	ray::ActorPartitionAgent
1957917	0.14	ray::ActorPartitionAgent
1958137	0.14	ray::ActorPartitionAgent
Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. Set max_restarts and max_task_retries to enable retry when the task crashes due to OOM. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.

## 1.6 读取纵向分割的csv数据

In [8]:
from secretflow.data.vertical import read_csv as v_read_csv

# Read each party's column partition from its own CSV file into one vertically
# partitioned DataFrame.
vdf = v_read_csv(
    {
        alice: v_alice_path,
        bob: v_bob_path,
        carol: v_carol_path,
    }
)

INFO:root:Create proxy actor <class 'secretflow.device.proxy.ActorPartitionAgent'> with party alice.
INFO:root:Create proxy actor <class 'secretflow.device.proxy.ActorPartitionAgent'> with party bob.
INFO:root:Create proxy actor <class 'secretflow.device.proxy.ActorPartitionAgent'> with party carol.


In [2]:
# Shut down the running SecretFlow runtime now that the walkthrough is done.
sf.shutdown()