# Multiple flows

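1. Flow 1 (`MultiFlowDemo1`) runs: it reads the `bar` table and writes the `baz` table.
2. Flow 2 (`MultiFlowDemo2`) runs: it takes Flow 1's input (`bar`) and Flow 1's output (`baz`) as its own inputs.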

In [13]:
import logging

# Set the root logger's level to WARNING before the remaining imports run.
logger = logging.getLogger()
logger.setLevel(logging.WARNING)

import pandas as pd
from doltpy.core import Dolt
from doltpy.core.write import import_df


def _commit_bar(repo, frame, mode, message):
    """Import ``frame`` into the ``bar`` table, stage and commit it.

    The frame's index is materialised as an ``index`` column and used as the
    primary key. Returns the first key of ``repo.log(number="1")`` --
    presumably the hash of the commit that was just created (TODO confirm
    against the doltpy docs).
    """
    import_df(repo, "bar", frame.reset_index(), ["index"], mode)
    repo.add("bar")
    repo.commit(message)
    latest = repo.log(number="1")
    return list(latest.keys())[0]


# Initialise a Dolt repository in the current working directory.
dolt = Dolt.init(".")

# Two versions of the same table: v2 is v1 plus three extra rows.
df_v1 = pd.DataFrame({"A": [1] * 3, "B": [1] * 3})
df_v2 = pd.DataFrame({"A": [1] * 3 + [2] * 3, "B": [1] * 3 + [2] * 3})

v1 = _commit_bar(dolt, df_v1, "create", "Initialize bar")
v2 = _commit_bar(dolt, df_v2, "update", "Add rows to bar")

In [15]:
# Run the first flow script (demo_one.py) inside the poetry environment.
!poetry run python3 demo_one.py run

[35m[1mMetaflow 2.2.5.post25+gitaad3052[0m[35m[22m executing [0m[31m[1mMultiFlowDemo1[0m[35m[22m[0m[35m[22m for [0m[31m[1muser:max-hoffman[0m[35m[22m[K[0m[35m[22m[0m
[35m[22mValidating your flow...[K[0m[35m[22m[0m
[32m[1m    The graph looks good![K[0m[32m[1m[0m
[35m[22mRunning pylint...[K[0m[35m[22m[0m
[32m[1m    Pylint is happy![K[0m[32m[1m[0m
[35m2021-01-17 21:50:15.338 [0m[1mWorkflow starting (run-id 1610949015330315):[0m
[35m2021-01-17 21:50:15.344 [0m[32m[1610949015330315/start/1 (pid 32857)] [0m[1mTask is starting.[0m
[35m2021-01-17 21:50:16.412 [0m[32m[1610949015330315/start/1 (pid 32857)] [0m[22m01-17 21:50:16 doltpy.core.dolt INFO     Creating engine for Dolt SQL Server instance running on 127.0.0.1:3306[0m
[35m2021-01-17 21:50:16.440 [0m[32m[1610949015330315/start/1 (pid 32857)] [0m[22m01-17 21:50:16 doltpy.core.dolt INFO     Creating engine for Dolt SQL Server instance running on 127.0.0.1:3306[0m
[3

[35m2021-01-17 21:50:21.545 [0m[32m[1610949015330315/end/3 (pid 32932)] [0m[22m01-17 21:50:21 doltpy.core.system_helpers INFO     Before exiting cleaning up child processes[0m
[35m2021-01-17 21:50:21.552 [0m[32m[1610949015330315/end/3 (pid 32932)] [0m[22m01-17 21:50:21 doltpy.core.system_helpers INFO     No processes to clean up, exiting[0m
[35m2021-01-17 21:50:21.713 [0m[32m[1610949015330315/end/3 (pid 32932)] [0m[1mTask finished successfully.[0m
[35m2021-01-17 21:50:21.713 [0m[1mDone![0m
01-17 21:50:21 doltpy.core.system_helpers INFO     Before exiting cleaning up child processes
01-17 21:50:21 doltpy.core.system_helpers INFO     No processes to clean up, exiting


In [19]:
# Run the second flow, pointing --flow-dep at the first flow's run as
# <flow name>/<run id>. The run id is hard-coded from the output of the
# previous cell, so it must be updated whenever demo_one.py is re-run.
!poetry run python3 demo_two.py run --flow-dep MultiFlowDemo1/1610949015330315

[35m[1mMetaflow 2.2.5.post25+gitaad3052[0m[35m[22m executing [0m[31m[1mMultiFlowDemo2[0m[35m[22m[0m[35m[22m for [0m[31m[1muser:max-hoffman[0m[35m[22m[K[0m[35m[22m[0m
[35m[22mValidating your flow...[K[0m[35m[22m[0m
[32m[1m    The graph looks good![K[0m[32m[1m[0m
[35m[22mRunning pylint...[K[0m[35m[22m[0m
[32m[1m    Pylint is happy![K[0m[32m[1m[0m
[35m2021-01-17 21:53:45.895 [0m[1mWorkflow starting (run-id 1610949225887511):[0m
[35m2021-01-17 21:53:45.902 [0m[32m[1610949225887511/start/1 (pid 32993)] [0m[1mTask is starting.[0m
[35m2021-01-17 21:53:46.974 [0m[32m[1610949225887511/start/1 (pid 32993)] [0m[22m01-17 21:53:46 doltpy.core.dolt INFO     Creating engine for Dolt SQL Server instance running on 127.0.0.1:3306[0m
[35m2021-01-17 21:53:47.063 [0m[32m[1610949225887511/start/1 (pid 32993)] [0m[22m01-17 21:53:47 doltpy.core.dolt INFO     flow_name,run_id,step_name,task_id,kind,database,table_name,commit,timestamp[

[35m2021-01-17 21:53:49.193 [0m[32m[1610949225887511/middle/2 (pid 33045)] [0m[22m01-17 21:53:49 doltpy.core.dolt INFO     Creating engine for Dolt SQL Server instance running on 127.0.0.1:3306[0m
[35m2021-01-17 21:53:49.218 [0m[32m[1610949225887511/middle/2 (pid 33045)] [0m[22m01-17 21:53:49 doltpy.core.dolt INFO     Creating engine for Dolt SQL Server instance running on 127.0.0.1:3306[0m
[35m2021-01-17 21:53:49.275 [0m[32m[1610949225887511/middle/2 (pid 33045)] [0m[22m01-17 21:53:49 doltpy.core.dolt INFO     * master                                        	kl3vpdq0qpbmjbk2n9illdueef9u9ro6[0m
[35m2021-01-17 21:53:49.374 [0m[32m[1610949225887511/middle/2 (pid 33045)] [0m[22m[0m
[35m2021-01-17 21:53:49.374 [0m[32m[1610949225887511/middle/2 (pid 33045)] [0m[22m01-17 21:53:49 doltpy.core.write.write INFO     No import mode specified, table exists, using "update"[0m
[35m2021-01-17 21:53:49.457 [0m[32m[1610949225887511/middle/2 (pid 33045)] [0m[22m01-17 2

In [20]:
# Show every row of the `metadata` table via the dolt CLI.
!dolt sql -q "SELECT * from metadata;"

+----------------+------------------+-----------+---------+-------+----------+------------+----------------------------------+---------------+
| flow_name      | run_id           | step_name | task_id | kind  | database | table_name | commit                           | timestamp     |
+----------------+------------------+-----------+---------+-------+----------+------------+----------------------------------+---------------+
| MultiFlowDemo1 | 1610949015330315 | middle    | 2       | write | .        | baz        | iftmv7jctmtiqtpqlhjf4amql439flaf | 1.610949e+09  |
| MultiFlowDemo1 | 1610949015330315 | start     | 1       | read  | .        | bar        | usku3to1i48gkql3osi4g32gkcflgb6r | 1.610949e+09  |
| MultiFlowDemo2 | 1610949225887511 | middle    | 2       | write | .        | baz        | 6bsk3fsslrice4ts2d86s8te1htlthjq | 1.6109492e+09 |
| MultiFlowDemo2 | 1610949225887511 | start     | 1       | read  | .        | bar        | usku3to1i48gkql3osi4g32gkcflgb6r | 1.6109492

In [21]:
# Print the source of the second flow for reference.
!cat demo_two.py

import logging

logger = logging.getLogger()

import pickle
import time

from metaflow import FlowSpec, step, DoltDT, Parameter
from metaflow.datatools.dolt import DoltRun
import pandas as pd
from sklearn import tree

class MultiFlowDemo2(FlowSpec):
    # Identifies the upstream run to read from. `start` splits the value on "/"
    # into a flow name and a run id, e.g. MultiFlowDemo1/1610949015330315.
    flow_dep = Parameter('flow-dep', help="Specify the tag for the input version", required=True)
    @step
    def start(self):
        """Load the tables recorded for the upstream run named by ``flow-dep``.

        ``flow-dep`` must have the form ``<flow name>/<run id>``. The first
        entry of that run's ``reads`` and the first entry of its ``writes``
        are each read back at their recorded commit and stored on the flow as
        ``inp1`` and ``inp2`` for the ``middle`` step.

        Raises:
            ValueError: if ``flow-dep`` is not exactly two non-empty parts
                separated by a single "/".
        """
        # Validate up front so a malformed value produces a clear message
        # instead of a bare tuple-unpacking error.
        parts = self.flow_dep.split("/")
        if len(parts) != 2 or not all(parts):
            raise ValueError(
                "flow-dep must have the form '<flow name>/<run id>', got {!r}".format(
                    self.flow_dep
                )
            )
        flow, run = parts

        d = DoltRun(flow_name=flow, run_id=run)
        # NOTE(review): assumes the upstream run recorded at least one read and
        # one write; an empty list raises IndexError here.
        f_input = d.reads[0]
        f_output = d.writes[0]
        with DoltDT(run=self) as dolt:
            self.inp1 = dolt.read_table(f_input.table_name, commit=f_input.commit)
            self.inp2 = dolt.read_table(f_output.table_name, commit=f_output.commit)

        self.next(self.middle)

    @step
    def middle(self):
        with DoltDT(run=self) as dolt:

            df = self.inp1 + self.inp2

            dolt.write_table(table_name='baz', df=df, pks=['index'])

        sel