# Pipeline Tutorial With Support for Multiple ID Columns

Starting from FATE-v1.9.0, FATE supports data with multiple match ID columns. To use this feature, the data must be uploaded with meta. Please read the [tutorial](./pipeline_tutorial_uploading_data_with_meta.ipynb) "Pipeline Tutorial With Using Data With Recording Meta" before proceeding.

## Install

`Pipeline` is distributed along with [FATE-Client](https://pypi.org/project/fate-client/).

```bash
pip install fate_client
```
To use Pipeline, we first need to specify which `FATE Flow Service` to connect to. Once `fate_client` is installed, a command-line entry point named `pipeline` becomes available:

In [1]:
!pipeline --help

Usage: pipeline [OPTIONS] COMMAND [ARGS]...

Options:
  --help  Show this message and exit.

Commands:
  config  pipeline config tool
  init     - DESCRIPTION: Pipeline Config Command.


Assuming a FATE Flow Service is running at 127.0.0.1:9380 (the default in standalone deployment), run:

In [2]:
!pipeline init --ip 127.0.0.1 --port 9380

Pipeline configuration succeeded.


## Upload Data With Multiple ID Columns

First, prepare data with multiple id columns. Assume we have two sample datasets in the following format:

guest site's data:  

phone,device_id,seq_id,x0  
10000,device_a,seq_1,0  
10002,device_b,seq_3,1  
10004,device_c,seq_5,2  
10006,device_d,seq_7,4  
10008,device_e,seq_9,5  
100010,device_f,seq_11,6  
100012,device_g,seq_13,7  
100014,device_h,seq_15,8  
100016,device_i,seq_17,9  
100018,device_j,seq_19,10  

host site's data:  
device_id,seq_id,phone,x0  
device_d,seq_0,10000,0  
device_e,seq_1,10001,1  
device_f,seq_2,10002,2  
device_g,seq_3,10003,3  
device_h,seq_4,10004,4  
device_i,seq_5,10005,5  
device_j,seq_6,10005,6  
device_k,seq_7,10006,7  
device_l,seq_8,10007,8  
device_k,seq_9,10008,9  

In [81]:
fate_project_base="/data/projects/fate"

guest_data_path = fate_project_base + "/examples/data/guest_multi_id_columns.csv"
host_data_path = fate_project_base + "/examples/data/host_multi_id_columns.csv"

Write the guest example data to a local file:

In [61]:
with open(guest_data_path, "w") as fout:
    fout.write("phone,device_id,seq_id,x0\n10000,device_a,seq_1,0\n10002,device_b,seq_3,1\n10004,device_c,seq_5,2\n")
    fout.write("10006,device_d,seq_7,4\n10008,device_e,seq_9,5\n100010,device_f,seq_11,6\n100012,device_g,seq_13,7\n")
    fout.write("100014,device_h,seq_15,8\n100016,device_i,seq_17,9\n100018,device_j,seq_19,10\n")

Write the host example data to a local file:

In [62]:
with open(host_data_path, "w") as fout:
    fout.write("device_id,seq_id,phone,x0\ndevice_d,seq_0,10000,0\ndevice_e,seq_1,10001,1\ndevice_f,seq_2,10002,2\n")
    fout.write("device_g,seq_3,10003,3\ndevice_h,seq_4,10004,4\ndevice_i,seq_5,10005,5\ndevice_j,seq_6,10005,6\n")
    fout.write("device_k,seq_7,10006,7\ndevice_l,seq_8,10007,8\ndevice_k,seq_9,10008,9\n")

Create a `pipeline` instance for uploading, with the following settings:
```yaml
initiator:
    role: guest
    party: 9999
roles:
    guest: 9999
```

In [63]:
from pipeline.backend.pipeline import PipeLine

In [64]:
pipeline_upload = PipeLine().set_initiator(role="guest", party_id=9999).set_roles(guest=9999)

Define the data meta for each party:

In [65]:
guest_data_meta = {"delimiter": ",", "with_label": False,
                   "input_format": "dense", "data_type": "int",
                   "with_match_id": True,                 # with_match_id should be true
                   "id_list": ["phone","device_id","seq_id"]} # id_list specifies the id columns

In [66]:
host_data_meta = {"delimiter": ",", "with_label": False,
                  "input_format": "dense", "data_type": "int",
                  "with_match_id": True,                 # with_match_id should be true
                  "id_list": ["device_id","seq_id","phone"]} # id_list specifies the id columns
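The meta tells FATE which columns are match ids and which are features. As a rough illustration only, the hypothetical helper `check_meta` below (not part of `fate_client`) checks that every name in `id_list` appears in the CSV header and treats the remaining columns as features. Because the two parties list their id columns in different orders, the helper looks them up by name rather than by position. The two meta dicts are repeated so the sketch runs on its own.

```python
def check_meta(header_line: str, meta: dict) -> dict:
    """Split a CSV header into id columns and feature columns as described by meta.

    Hypothetical helper for illustration only; it is not part of fate_client.
    It ignores labels because both metas in this tutorial set with_label=False.
    """
    if not meta.get("with_match_id"):
        raise ValueError("with_match_id must be True when id_list is used")
    columns = header_line.strip().split(meta["delimiter"])
    # Look id columns up by name, since each party may order them differently.
    missing = [name for name in meta["id_list"] if name not in columns]
    if missing:
        raise ValueError(f"id columns not found in header: {missing}")
    features = [name for name in columns if name not in meta["id_list"]]
    return {"id_columns": list(meta["id_list"]), "features": features}


guest_data_meta = {"delimiter": ",", "with_label": False,
                   "input_format": "dense", "data_type": "int",
                   "with_match_id": True,
                   "id_list": ["phone", "device_id", "seq_id"]}
host_data_meta = {"delimiter": ",", "with_label": False,
                  "input_format": "dense", "data_type": "int",
                  "with_match_id": True,
                  "id_list": ["device_id", "seq_id", "phone"]}

print(check_meta("phone,device_id,seq_id,x0", guest_data_meta))
# {'id_columns': ['phone', 'device_id', 'seq_id'], 'features': ['x0']}
print(check_meta("device_id,seq_id,phone,x0", host_data_meta))
# {'id_columns': ['device_id', 'seq_id', 'phone'], 'features': ['x0']}
```

In both files the only column left over after removing the id columns is the feature `x0`.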

In [67]:
multi_id_guest = {"name": "multi_id_guest", "namespace": "experiment"}
multi_id_host = {"name": "multi_id_host", "namespace": "experiment"}

In [68]:
pipeline_upload.add_upload_data(file=guest_data_path,
                                table_name=multi_id_guest["name"],
                                namespace=multi_id_guest["namespace"],
                                head=1, partition=4,                  # head=1: the first line of the file is a header
                                extend_sid=True,                      # automatically append a generated sample id column
                                with_meta=True, meta=guest_data_meta) # with_meta=True uploads the data together with its meta

pipeline_upload.add_upload_data(file=host_data_path,
                                table_name=multi_id_host["name"],
                                namespace=multi_id_host["namespace"],
                                head=1, partition=4,
                                extend_sid=True,
                                with_meta=True, meta=host_data_meta)
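With `extend_sid=True`, each uploaded row receives an extra generated sample id (the `extend_sid` column in the intersection output at the end of this tutorial), which presumably keeps rows distinct even when a match id repeats, as `device_k` does in the host data. The sketch below shows one plausible scheme: a shared `uuid1` hex prefix followed by the row index. This is an assumption inferred from the shape of the `extend_sid` values in that output, not FATE's documented implementation, and `make_sample_ids` is a hypothetical name.

```python
import uuid


def make_sample_ids(n_rows: int) -> list:
    """Generate one unique sample id per row: a shared uuid1 hex prefix plus the row index.

    Hypothetical helper; FATE's actual sample id scheme may differ.
    """
    prefix = uuid.uuid1().hex  # 32 hex characters, shared by every row in this batch
    # Distinct row indices give distinct suffixes, so all ids are unique.
    return [f"{prefix}{i}" for i in range(n_rows)]


sids = make_sample_ids(10)
print(len(sids), len(set(sids)))  # 10 10
```

Under this assumption, guest row `device_d` (the fourth data row, index 3) would get an id ending in `3`, which matches the first `extend_sid` shown in the final output.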

We can then upload both datasets. Setting `drop=1` replaces any existing table with the same name and namespace:

In [70]:
pipeline_upload.upload(drop=1)

 UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%

2022-08-29 14:43:21.101 | INFO     | pipeline.utils.invoker.job_submitter:monitor_job_status:83 - Job id is 202208291443209718580
2022-08-29 14:43:21.108 | INFO     | pipeline.utils.invoker.job_submitter:monitor_job_status:98 - Job is still waiting, time elapse: 0:00:00
2022-08-29 14:43:22.117 | INFO     | pipeline.utils.invoker.job_submitter:monitor_job_status:98 - Job is still waiting, time elapse: 0:00:01
2022-08-29 14:43:23.131 | INFO     | pipeline.utils.invoker.job_submitter:monitor_job_status:125 -
2022-08-29 14:43:23.132 | INFO     | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component upload_0, time elapse: 0:00:02
2022-08-29 14:43:24.147 | INFO     | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component upload_0, time elapse: 0:00:03
2022-08-29 14:43:25.165 | INFO     | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component upload_0, time elapse: 0:00:04

 UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%

2022-08-29 14:43:27.349 | INFO     | pipeline.utils.invoker.job_submitter:monitor_job_status:83 - Job id is 202208291443272038530
2022-08-29 14:43:27.356 | INFO     | pipeline.utils.invoker.job_submitter:monitor_job_status:98 - Job is still waiting, time elapse: 0:00:00
2022-08-29 14:43:28.364 | INFO     | pipeline.utils.invoker.job_submitter:monitor_job_status:98 - Job is still waiting, time elapse: 0:00:01
2022-08-29 14:43:30.390 | INFO     | pipeline.utils.invoker.job_submitter:monitor_job_status:125 -
2022-08-29 14:43:30.391 | INFO     | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component upload_0, time elapse: 0:00:03
2022-08-29 14:43:31.406 | INFO     | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component upload_0, time elapse: 0:00:04
2022-08-29 14:43:32.421 | INFO     | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component upload_0, time elapse: 0:00:05

## Run an Intersection Task Using a Specified ID Column

In [71]:
from pipeline.component import Reader, DataTransform, Intersection
from pipeline.interface import Data

Create a `pipeline` instance with two parties:

```yaml
initiator:
    role: guest
    party: 9999
roles:
    guest: 9999
    host: 10000
```

In [72]:
pipeline = PipeLine() \
        .set_initiator(role='guest', party_id=9999) \
        .set_roles(guest=9999, host=10000)

Define a `Reader` component to load the uploaded tables:

In [73]:
reader_0 = Reader(name="reader_0")
# set guest parameter
reader_0.get_party_instance(role='guest', party_id=9999).component_param(
    table=multi_id_guest)
# set host parameter
reader_0.get_party_instance(role='host', party_id=10000).component_param(
    table=multi_id_host)

Configure `match_id_name` in the `DataTransform` component to choose which of the id columns listed in `id_list` is used for matching:

In [74]:
data_transform_0 = DataTransform(name="data_transform_0",
                                 match_id_name="device_id") # specify "device_id" to be the match id column
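To see why the choice of `match_id_name` matters, the sketch below runs a plain Python set intersection over the sample data for each candidate id column. This is only an illustration: FATE's `Intersection` component performs the match with a privacy-preserving protocol rather than by comparing raw ids in the clear, and the helpers `id_values` and `plain_intersection` are hypothetical names.

```python
import csv
import io

# Sample data from this tutorial, inlined so the sketch runs on its own.
GUEST_CSV = """phone,device_id,seq_id,x0
10000,device_a,seq_1,0
10002,device_b,seq_3,1
10004,device_c,seq_5,2
10006,device_d,seq_7,4
10008,device_e,seq_9,5
100010,device_f,seq_11,6
100012,device_g,seq_13,7
100014,device_h,seq_15,8
100016,device_i,seq_17,9
100018,device_j,seq_19,10
"""
HOST_CSV = """device_id,seq_id,phone,x0
device_d,seq_0,10000,0
device_e,seq_1,10001,1
device_f,seq_2,10002,2
device_g,seq_3,10003,3
device_h,seq_4,10004,4
device_i,seq_5,10005,5
device_j,seq_6,10005,6
device_k,seq_7,10006,7
device_l,seq_8,10007,8
device_k,seq_9,10008,9
"""


def id_values(csv_text: str, column: str) -> set:
    """Collect the distinct values of one id column, looked up by header name."""
    return {row[column] for row in csv.DictReader(io.StringIO(csv_text))}


def plain_intersection(column: str) -> set:
    """Plaintext set intersection on one id column (NOT a private set intersection)."""
    return id_values(GUEST_CSV, column) & id_values(HOST_CSV, column)


for column in ("device_id", "phone", "seq_id"):
    print(column, len(plain_intersection(column)))
# device_id 7
# phone 5
# seq_id 5
```

Matching on `device_id` leaves seven ids in common, which agrees with the `intersect_num` reported later in this tutorial; matching on `phone` or `seq_id` would leave five each.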

Add an `Intersection` component:

In [75]:
intersect_0 = Intersection(name="intersect_0")

Add the components to the pipeline in order of execution:  

* `data_transform_0` consumes the output data of `reader_0`  
* `intersect_0` consumes the output data of `data_transform_0`

Then compile the pipeline to make it ready for submission.

In [76]:
pipeline.add_component(reader_0)
pipeline.add_component(data_transform_0, data=Data(data=reader_0.output.data))
pipeline.add_component(intersect_0, data=Data(data=data_transform_0.output.data))
pipeline.compile();
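The `data=Data(...)` arguments in the cell above define a dependency graph between components. As a rough analogy only (this is not how FATE Flow schedules jobs internally), the execution order implied by that wiring can be recovered with a topological sort from Python's standard `graphlib` module (Python 3.9+):

```python
from graphlib import TopologicalSorter

# Each component maps to the set of components whose output data it consumes,
# mirroring the Data(...) wiring used when adding components to the pipeline.
dependencies = {
    "reader_0": set(),
    "data_transform_0": {"reader_0"},
    "intersect_0": {"data_transform_0"},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['reader_0', 'data_transform_0', 'intersect_0']
```

For this linear chain only one order is valid, which is the same order in which the components are added.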

Now, submit (fit) the pipeline:

In [77]:
pipeline.fit()

2022-08-29 14:44:14.834 | INFO     | pipeline.utils.invoker.job_submitter:monitor_job_status:83 - Job id is 202208291444146346770
2022-08-29 14:44:14.842 | INFO     | pipeline.utils.invoker.job_submitter:monitor_job_status:98 - Job is still waiting, time elapse: 0:00:00
2022-08-29 14:44:15.855 | INFO     | pipeline.utils.invoker.job_submitter:monitor_job_status:98 - Job is still waiting, time elapse: 0:00:01
2022-08-29 14:44:16.870 | INFO     | pipeline.utils.invoker.job_submitter:monitor_job_status:125 -
2022-08-29 14:44:16.871 | INFO     | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component reader_0, time elapse: 0:00:02

Get the summary of the intersection task:

In [79]:
print(pipeline.get_component("intersect_0").get_summary())

{'cardinality_only': False, 'intersect_num': 7, 'intersect_rate': 0.7}
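Assuming `intersect_rate` is the matched count divided by the number of rows a party fed into the intersection (both parties uploaded 10 rows here, so either party gives the same value), the summary works out as:

$$
\text{intersect\_rate} = \frac{\text{intersect\_num}}{\text{number of input rows}} = \frac{7}{10} = 0.7
$$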


Get the output data of the intersection task:

In [80]:
print(pipeline.get_component("intersect_0").get_output_data())

                          extend_sid device_id  x0
0  e19da596276511edb731acde480011223  device_d   4
1  e19da596276511edb731acde480011224  device_e   5
2  e19da596276511edb731acde480011225  device_f   6
3  e19da596276511edb731acde480011226  device_g   7
4  e19da596276511edb731acde480011227  device_h   8
5  e19da596276511edb731acde480011228  device_i   9
6  e19da596276511edb731acde480011229  device_j  10
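The output keeps the generated `extend_sid`, the chosen match id column `device_id`, and the feature `x0`; the other id columns (`phone`, `seq_id`) do not appear. As a plaintext cross-check only, the hypothetical helper `matched_guest_rows` below (not a FATE API) filters the guest sample by the host's `device_id` values and reproduces the same `device_id`/`x0` pairs.

```python
import csv
import io

# Guest sample data from this tutorial, inlined so the sketch runs on its own.
GUEST_CSV = """phone,device_id,seq_id,x0
10000,device_a,seq_1,0
10002,device_b,seq_3,1
10004,device_c,seq_5,2
10006,device_d,seq_7,4
10008,device_e,seq_9,5
100010,device_f,seq_11,6
100012,device_g,seq_13,7
100014,device_h,seq_15,8
100016,device_i,seq_17,9
100018,device_j,seq_19,10
"""
# Distinct device_id values in the host sample (device_k appears twice there).
HOST_DEVICE_IDS = {"device_d", "device_e", "device_f", "device_g", "device_h",
                   "device_i", "device_j", "device_k", "device_l"}


def matched_guest_rows(match_id: str, host_ids: set) -> list:
    """Return (match id, x0) for guest rows whose match id also appears on the host side.

    Hypothetical plaintext helper; FATE computes the intersection privately.
    """
    rows = csv.DictReader(io.StringIO(GUEST_CSV))
    return [(row[match_id], int(row["x0"])) for row in rows if row[match_id] in host_ids]


for device_id, x0 in matched_guest_rows("device_id", HOST_DEVICE_IDS):
    print(device_id, x0)
```

This prints `device_d 4` through `device_j 10`, the same seven pairs as in the table above.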


For more examples of using Pipeline to submit jobs, please refer to the [pipeline demos](https://github.com/FederatedAI/FATE/tree/master/examples/pipeline/demo).