# Databolt Flow
For data scientists and data engineers, d6tflow is a Python library that makes building complex data science workflows easy, fast and intuitive.

https://github.com/d6t/d6tflow

## Benefits of using d6tflow

[4 Reasons Why Your Machine Learning Code is Probably Bad](https://medium.com/@citynorman/4-reasons-why-your-machine-learning-code-is-probably-bad-c291752e4953)

# Example Usage For a Machine Learning Workflow

Below is an example of a typical machine learning workflow: you retrieve data, preprocess it, train a model and evaluate the model output.

In this example you will:
* Build a machine learning workflow made up of individual tasks
* Check task dependencies and their execution status
* Execute the model training task including dependencies
* Save intermediary task output to Parquet, pickle and in-memory
* Load task output to pandas dataframe and model object for model evaluation
* Intelligently rerun workflow after changing a preprocessing parameter
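Conceptually, the steps above form a small dependency graph: each task lists the tasks it requires, and a scheduler runs upstream tasks first. The sketch below shows only that ordering idea in plain Python, using the standard library's `graphlib.TopologicalSorter` (Python 3.9+) instead of d6tflow. The `dependencies` dictionary and `execution_order` helper are hypothetical stand-ins for the task classes in this example.

```python
from graphlib import TopologicalSorter

# hypothetical stand-in for this example's task graph: task -> set of tasks it requires
dependencies = {
    "TaskGetData": set(),
    "TaskPreprocess": {"TaskGetData"},
    "TaskTrain": {"TaskPreprocess"},
}

def execution_order(graph):
    """Return task names so that every task comes after the tasks it requires."""
    return list(TopologicalSorter(graph).static_order())

print(execution_order(dependencies))
# ['TaskGetData', 'TaskPreprocess', 'TaskTrain']
```

In d6tflow you do not write this graph by hand; it is derived from the `@d6tflow.requires` decorators on each task.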


```python
import d6tflow
import luigi
import sklearn, sklearn.datasets, sklearn.svm, sklearn.linear_model, sklearn.preprocessing
import pandas as pd

# define workflow
class TaskGetData(d6tflow.tasks.TaskPqPandas):  # save dataframe as parquet

    def run(self):
        ds = sklearn.datasets.load_breast_cancer()
        df_train = pd.DataFrame(ds.data, columns=ds.feature_names)
        df_train['y'] = ds.target
        self.save(df_train) # quickly save dataframe


@d6tflow.requires(TaskGetData) # define dependency
class TaskPreprocess(d6tflow.tasks.TaskPqPandas):
    do_preprocess = luigi.BoolParameter(default=True) # parameter for preprocessing yes/no

    def run(self):
        df_train = self.input().load() # quickly load required data
        if self.do_preprocess:
            # standardize every feature column (all columns except the target 'y')
            df_train.iloc[:,:-1] = sklearn.preprocessing.scale(df_train.iloc[:,:-1])
        self.save(df_train)

@d6tflow.requires(TaskPreprocess) # automatically pass parameters upstream
class TaskTrain(d6tflow.tasks.TaskPickle): # save output as pickle
    model = luigi.Parameter(default='ols') # parameter for model selection

    def run(self):
        df_train = self.input().load()
        if self.model=='ols':
            # note: despite the 'ols' label, this fits a logistic regression classifier
            model = sklearn.linear_model.LogisticRegression()
        elif self.model=='svm':
            model = sklearn.svm.SVC()
        else:
            raise ValueError('invalid model selection')
        model.fit(df_train.drop(columns='y'), df_train['y'])
        self.save(model)
```
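`@d6tflow.requires(TaskPreprocess)` is what lets `TaskTrain` accept `do_preprocess` and hand it to `TaskPreprocess`. The idea can be mimicked in plain Python with dataclass inheritance. The `Preprocess`/`Train` classes and the `requires()` "clone" below are hypothetical illustrations of parameter passing, not d6tflow's actual mechanism.

```python
from dataclasses import dataclass, fields

# hypothetical plain-Python mimic of parameter inheritance; not d6tflow's implementation
@dataclass(frozen=True)
class Preprocess:
    do_preprocess: bool = True

@dataclass(frozen=True)
class Train(Preprocess):  # inherits do_preprocess, similar in spirit to @d6tflow.requires
    model: str = "ols"

    def requires(self):
        # build the upstream task from the parameters it shares with this task
        shared = {f.name: getattr(self, f.name) for f in fields(Preprocess)}
        return Preprocess(**shared)

train = Train(do_preprocess=False, model="svm")
print(train.requires())  # Preprocess(do_preprocess=False)
```

The design point is that the downstream task is the single entry point: you only ever instantiate `Train`, and the upstream parameters follow from it.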


```python
# goal: compare performance of two models
params_model1 = {'do_preprocess':True, 'model':'ols'}
params_model2 = {'do_preprocess':False, 'model':'svm'}
```


```python
# run workflow for model 1
d6tflow.run(TaskTrain(**params_model1))
```


```
INFO: Informed scheduler that task   TaskTrain_True_ols_23ea6043b6   has status   PENDING
INFO: Informed scheduler that task   TaskPreprocess_True_e00389f8b2   has status   PENDING
INFO: Informed scheduler that task   TaskGetData__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
INFO: [pid 10732] Worker Worker(salt=791898276, workers=1, host=DESKTOP-3CSM8J6, username=deepmind, pid=10732) running   TaskGetData()
INFO: [pid 10732] Worker Worker(salt=791898276, workers=1, host=DESKTOP-3CSM8J6, username=deepmind, pid=10732) done      TaskGetData()
INFO: Informed scheduler that task   TaskGetData__99914b932b   has status   DONE
INFO: [pid 10732] Worker Worker(salt=791898276, workers=1, host=DESKTOP-3CSM8J6, username=deepmind, pid=10732) running   TaskPreprocess(do_preprocess=True)
INFO: [pid 10732] Worker Worker(salt=791898276, workers=1, host=DESKTOP-3CSM8J6, username=deepmind, pid=10732) done      TaskPreprocess(do_preprocess=True)
INFO: 

True
```
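The scheduler log identifies each task by an ID such as `TaskGetData__99914b932b`. The sketch below reproduces that format under the assumption that it follows luigi's scheme: the task family, the first few parameter values, and the first 10 hex characters of an MD5 hash of the parameters serialized as sorted, compact JSON. Because the hash depends on parameter values, `do_preprocess=True` and `do_preprocess=False` give different IDs, so the scheduler treats them as different tasks. `task_id` is a hypothetical helper, not a d6tflow or luigi function.

```python
import hashlib
import json
import re

def task_id(family, params):
    """Build a luigi-style task ID (assumed format: family_paramsummary_hash10)."""
    # assumption: parameter values are serialized as strings; str() matches for bool/str
    str_params = {k: str(v) for k, v in params.items()}
    param_json = json.dumps(str_params, separators=(',', ':'), sort_keys=True)
    digest = hashlib.md5(param_json.encode('utf-8')).hexdigest()[:10]
    # summary: first 3 parameter values (sorted by name), each truncated to 16 chars
    summary = '_'.join(str_params[k][:16] for k in sorted(str_params)[:3])
    summary = re.sub(r'[^A-Za-z0-9_]', '_', summary)
    return f'{family}_{summary}_{digest}'

print(task_id('TaskGetData', {}))  # TaskGetData__99914b932b
print(task_id('TaskTrain', {'do_preprocess': True, 'model': 'ols'}))
```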

```python
# preview which tasks would run after changing parameters
d6tflow.preview(TaskTrain(**params_model2))
```



```
└─--[TaskTrain-{'do_preprocess': 'False', 'model': 'svm'} (PENDING)]
   └─--[TaskPreprocess-{'do_preprocess': 'False'} (PENDING)]
      └─--[TaskGetData-{} (COMPLETE)]
```
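The preview shows that `TaskGetData` is already COMPLETE, so only the two tasks whose parameters changed still need to run. A minimal sketch of how such a tree can be computed, assuming a task counts as complete when its output already exists (modelled here as a set of finished task keys). The `preview` function and the string task keys are hypothetical, not d6tflow's implementation.

```python
def preview(task, requires, done, depth=0):
    """Return one status line per task, walking upstream dependencies depth-first."""
    status = 'COMPLETE' if task in done else 'PENDING'
    lines = ['   ' * depth + f'└─--[{task} ({status})]']
    for upstream in requires.get(task, []):
        lines.extend(preview(upstream, requires, done, depth + 1))
    return lines

# hypothetical task keys: class name plus its parameter values
requires = {
    'TaskTrain(do_preprocess=False, model=svm)': ['TaskPreprocess(do_preprocess=False)'],
    'TaskPreprocess(do_preprocess=False)': ['TaskGetData()'],
}
# tasks finished by the model-1 run; only the parameter-free TaskGetData is shared
done = {'TaskGetData()', 'TaskPreprocess(do_preprocess=True)',
        'TaskTrain(do_preprocess=True, model=ols)'}

print('\n'.join(preview('TaskTrain(do_preprocess=False, model=svm)', requires, done)))
```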


```python
# run workflow for model 2
d6tflow.run(TaskTrain(**params_model2))
```


```
INFO: Informed scheduler that task   TaskTrain_False_svm_f71b180747   has status   PENDING
INFO: Informed scheduler that task   TaskPreprocess_False_57897150ee   has status   PENDING
INFO: Informed scheduler that task   TaskGetData__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
INFO: [pid 10732] Worker Worker(salt=395725029, workers=1, host=DESKTOP-3CSM8J6, username=deepmind, pid=10732) running   TaskPreprocess(do_preprocess=False)
INFO: [pid 10732] Worker Worker(salt=395725029, workers=1, host=DESKTOP-3CSM8J6, username=deepmind, pid=10732) done      TaskPreprocess(do_preprocess=False)
INFO: Informed scheduler that task   TaskPreprocess_False_57897150ee   has status   DONE
INFO: [pid 10732] Worker Worker(salt=395725029, workers=1, host=DESKTOP-3CSM8J6, username=deepmind, pid=10732) running   TaskTrain(do_preprocess=False, model=svm)
INFO: [pid 10732] Worker Worker(salt=395725029, workers=1, host=DESKTOP-3CSM8J6, username=deepmind, pid=

True
```
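In the second run the log reports `TaskGetData` as already DONE and executes only `TaskPreprocess` and `TaskTrain`. A minimal sketch of that skip-if-complete behaviour, assuming "complete" means "output already stored". The `run` function, the tuple task keys and the in-memory `outputs` dict are hypothetical stand-ins for d6tflow's tasks and file targets.

```python
def run(task, requires, actions, outputs, executed):
    """Run upstream tasks first, skipping any task whose output already exists."""
    if task in outputs:  # complete: reuse the stored output instead of rerunning
        return outputs[task]
    inputs = [run(up, requires, actions, outputs, executed) for up in requires.get(task, [])]
    outputs[task] = actions[task](*inputs)
    executed.append(task)
    return outputs[task]

# hypothetical tasks keyed by (name, *params); each action receives its upstream outputs
requires = {
    ('Preprocess', False): [('GetData',)],
    ('Train', False, 'svm'): [('Preprocess', False)],
}
actions = {
    ('GetData',): lambda: [1.0, 2.0, 3.0],
    ('Preprocess', False): lambda data: data,  # do_preprocess=False: pass data through
    ('Train', False, 'svm'): lambda data: f'model fitted on {len(data)} rows',
}
outputs = {('GetData',): [1.0, 2.0, 3.0]}  # output left over from the model-1 run
executed = []
run(('Train', False, 'svm'), requires, actions, outputs, executed)
print(executed)  # [('Preprocess', False), ('Train', False, 'svm')]
```

Running the same target a second time executes nothing, because every task in its chain now has stored output.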

```python
# compare results from the two models:
# load task output into a pandas dataframe and a model object for evaluation
# note: score() is computed on the training data itself, i.e. in-sample accuracy

model1 = TaskTrain(**params_model1).output().load()
df_train = TaskPreprocess(**params_model1).output().load()
print(model1.score(df_train.drop(columns='y'), df_train['y']))
# 0.987

model2 = TaskTrain(**params_model2).output().load()
df_train = TaskPreprocess(**params_model2).output().load()
print(model2.score(df_train.drop(columns='y'), df_train['y']))
# 0.922
```


```
0.9876977152899824
0.9226713532513181
```
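For classifiers, scikit-learn's `score()` returns mean accuracy: the fraction of rows whose predicted label matches `y`. The breast cancer dataset has 569 rows, so the printed scores are consistent with 562 and 525 correctly classified rows respectively. Since both models are scored on the data they were trained on, these are in-sample figures. The stdlib sketch below shows the metric; `accuracy` is a hypothetical helper, not the scikit-learn implementation.

```python
def accuracy(y_true, y_pred):
    """Fraction of positions where the predicted label equals the true label."""
    if len(y_true) != len(y_pred):
        raise ValueError('y_true and y_pred must have the same length')
    correct = sum(t == p for t, p in zip(y_true, y_pred))
    return correct / len(y_true)

print(accuracy([1, 0, 1, 1], [1, 1, 1, 1]))  # 0.75
# the reported scores expressed as correct / total over 569 rows
print(round(562 / 569, 4))  # 0.9877
print(round(525 / 569, 4))  # 0.9227
```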


# Next steps: Transition code to d6tflow

See https://d6tflow.readthedocs.io/en/latest/transition.html