## Pipeline Upload Data Tutorial 

### install

`Pipeline` is distributed along with [fate_client](https://pypi.org/project/fate-client/).

```bash
pip install fate_client
```

To use Pipeline, we need to first specify which `FATE Flow Service` to connect to. Once `fate_client` installed, one can find an cmd enterpoint name `pipeline`:

In [19]:
import os

In [20]:
!pipeline --help

Usage: pipeline [OPTIONS] COMMAND [ARGS]...

Options:
  --help  Show this message and exit.

Commands:
  config  pipeline config tool
  init     - DESCRIPTION: Pipeline Config Command.


Assume we have a `FATE Flow Service` which name is `fateflow` and the port is `9380`, then exec

In [21]:
!pipeline init --ip fateflow --port 9380

Pipeline configuration succeeded.


### upload data

 Before start a modeling task, the data to be used should be uploaded. 
 Typically, a party is usually a cluster which include multiple nodes. Thus, when we upload these data, the data will be allocated to those nodes.

In [22]:
from pipeline.backend.pipeline import PipeLine

Make a `pipeline` instance:

    - initiator: 
        * role: guest
        * party: 9999
    - roles:
        * guest: 9999

note that only local party id is needed.
    

In [23]:
pipeline_upload = PipeLine().set_initiator(role='guest', party_id=8888).set_roles(guest=8888)

Define partitions for data storage

In [24]:
partition = 4

Define table name and namespace, which will be used in FATE job configuration

In [25]:
dense_data_guest = {"name": "breast_homo_guest", "namespace": f"experiment"}
dense_data_host = {"name": "breast_homo_host", "namespace": f"experiment"}
tag_data = {"name": "breast_homo_host", "namespace": f"experiment"}

Now, we add data to be uploaded

In [26]:
data_base = "/data/projects/fate/"
pipeline_upload.add_upload_data(file=os.path.join(data_base, "examples/data/breast_homo_guest.csv"),
                                table_name=dense_data_guest["name"],             # table name
                                namespace=dense_data_guest["namespace"],         # namespace
                                head=1, partition=partition)               # data info

pipeline_upload.add_upload_data(file=os.path.join(data_base, "examples/data/breast_homo_host.csv"),
                                table_name=dense_data_host["name"],
                                namespace=dense_data_host["namespace"],
                                head=1, partition=partition)

pipeline_upload.add_upload_data(file=os.path.join(data_base, "examples/data/breast_homo_host.csv"),
                                table_name=tag_data["name"],
                                namespace=tag_data["namespace"],
                                head=1, partition=partition)

We can then upload data

In [27]:
pipeline_upload.upload(drop=1)

 UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%

[32m2022-11-28 15:17:34.702[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m83[0m - [1mJob id is 202211281517345519810
[0m
[32m2022-11-28 15:17:34.709[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:00[0m





[32m2022-11-28 15:17:35.720[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:01[0m
[0mm2022-11-28 15:17:36.735[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m125[0m - [1m
[32m2022-11-28 15:17:36.736[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:02[0m
[32m2022-11-28 15:17:37.751[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:03[0m
[32m2022-11-28 15:17:38.770[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:04[0m
[32m2022-1

 UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%

[32m2022-11-28 15:17:42.989[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m83[0m - [1mJob id is 202211281517428479710
[0m
[32m2022-11-28 15:17:42.998[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:00[0m





[32m2022-11-28 15:17:44.008[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:01[0m
[0mm2022-11-28 15:17:46.044[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m125[0m - [1m
[32m2022-11-28 15:17:46.046[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:03[0m
[32m2022-11-28 15:17:47.063[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:04[0m
[32m2022-11-28 15:17:48.079[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:05[0m
[32m2022-1

 UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%

[32m2022-11-28 15:17:52.268[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m83[0m - [1mJob id is 202211281517521503180
[0m
[32m2022-11-28 15:17:52.276[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:00[0m





[32m2022-11-28 15:17:53.285[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:01[0m
[0mm2022-11-28 15:17:54.301[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m125[0m - [1m
[32m2022-11-28 15:17:54.302[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:02[0m
[32m2022-11-28 15:17:55.318[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:03[0m
[32m2022-11-28 15:17:56.401[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:04[0m
[32m2022-1

For more demo on using pipeline to submit jobs, please refer to [pipeline demos](https://github.com/FederatedAI/FATE/tree/master/examples/pipeline/demo)