In [1]:
import time
import secretflow as sf
import spu
import os

start = time.perf_counter()

network_conf = {
    "parties": {
        "alice": {
            "address": "alice:8000",
        },
        "bob": {
            "address": "bob:8000",
        },
    },
}

party = os.getenv("SELF_PARTY", "alice")
sf.shutdown()
sf.init(
    address="127.0.0.1:6379",
    cluster_config={**network_conf, "self_party": party},
    log_to_driver=True,
)

2024-07-31 08:09:01,987	INFO worker.py:1540 -- Connecting to existing Ray cluster at address: 172.19.0.3:6379...
2024-07-31 08:09:01,996	INFO worker.py:1724 -- Connected to Ray cluster.
2024-07-31 08:09:02.016 INFO api.py:233 [alice] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': 'alice:8000', 'bob': 'bob:8000'}, 'CURRENT_PARTY_NAME': 'alice', 'TLS_CONFIG': {}}
2024-07-31 08:09:02.652 INFO barriers.py:284 [alice] -- [Anonymous_job] Succeeded to create receiver proxy actor.
[36m(ReceiverProxyActor pid=3063)[0m 2024-07-31 08:09:02.648 INFO grpc_proxy.py:359 [alice] -- [Anonymous_job] ReceiverProxy binding port 8000, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryable

2024-07-31 08:09:01,998	INFO worker.py:1540 -- Connecting to existing Ray cluster at address: 172.19.0.2:6379...
2024-07-31 08:09:02,007	INFO worker.py:1724 -- Connected to Ray cluster.
2024-07-31 08:09:02.026 INFO api.py:233 [bob] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': 'alice:8000', 'bob': 'bob:8000'}, 'CURRENT_PARTY_NAME': 'bob', 'TLS_CONFIG': {}}
2024-07-31 08:09:02.670 INFO barriers.py:284 [bob] -- [Anonymous_job] Succeeded to create receiver proxy actor.
[36m(ReceiverProxyActor pid=1719)[0m 2024-07-31 08:09:02.667 INFO grpc_proxy.py:359 [bob] -- [Anonymous_job] ReceiverProxy binding port 8000, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCo

In [2]:
alice, bob=sf.PYU("alice"), sf.PYU("bob")
current_dir = os.getcwd()
input_path={
    "alice":f"{current_dir}/payment.csv",
    "bob":f"{current_dir}/record.csv"
}
output_path={
    "alice":f"{current_dir}/output.csv",
    "bob":f"{current_dir}/output.csv"
}

In [3]:
spu_conf = {
    "nodes": [
        {
            "party": "alice",
            "address": "alice:8001",
            "listen_addr": "alice:8001",
        },
        {
            "party": "bob",
            "address": "bob:8001",
            "listen_addr": "bob:8001",
        },
    ],
    "runtime_config": {
        "protocol": spu.spu_pb2.SEMI2K,
        "field": spu.spu_pb2.FM128,
        "sigmoid_mode": spu.spu_pb2.RuntimeConfig.SIGMOID_REAL,
    },
}
spu_device=sf.SPU(cluster_def=spu_conf)

In [4]:
report = spu_device.psi(
    keys={"alice":["uid"], "bob":["uid"]},
    input_path=input_path,
    output_path=output_path,
    receiver="alice",
    broadcast_result=False,
)

In [5]:
report

[{'party': 'alice', 'original_count': 1000000, 'intersection_count': 800000},
 {'party': 'bob', 'original_count': 1000000, 'intersection_count': -1}]

[{'party': 'alice', 'original_count': 1000000, 'intersection_count': 800000},
 {'party': 'bob', 'original_count': 1000000, 'intersection_count': -1}]

In [None]:
# 求交结果：800000