# Databolt Flow
d6tflow is a Python library for data scientists and data engineers that makes building complex data science workflows easy, fast and intuitive.

https://github.com/d6t/d6tflow

## Benefits of using d6tflow

[4 Reasons Why Your Machine Learning Code is Probably Bad](https://medium.com/@citynorman/4-reasons-why-your-machine-learning-code-is-probably-bad-c291752e4953)

# Example Usage For a Machine Learning Workflow

Below is an example of a typical machine learning workflow: you retrieve data, preprocess it, train a model and evaluate the model output.

In this example you will:
* Build a machine learning workflow made up of individual tasks
* Check task dependencies and their execution status
* Execute the model training task including dependencies
* Save intermediate task output as Parquet, as pickle, or in memory
* Load task output to pandas dataframe and model object for model evaluation
* Intelligently rerun workflow after changing a preprocessing parameter
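
The last point is the easiest to misread, so here is a toy sketch of the idea in plain Python. It is not d6tflow's implementation: the `cache` dictionary, `run_log` list and `run_task` helper are hypothetical names invented for illustration. The assumption it demonstrates is that a task's output is stored under its name and parameters, so a task only executes when no stored output matches.

```python
# Toy illustration only: NOT how d6tflow is implemented.
cache = {}     # (task name, sorted parameters) -> stored output
run_log = []   # names of tasks that actually executed


def run_task(name, params, func, upstream=()):
    """Run func only if no output is stored for this name and these parameters."""
    key = (name, tuple(sorted(params.items())))
    if key not in cache:
        inputs = [dep() for dep in upstream]  # resolve dependencies first
        cache[key] = func(*inputs)
        run_log.append(name)
    return cache[key]


def get_data():
    return run_task("get_data", {}, lambda: [1.0, 2.0, 3.0])


def preprocess(do_preprocess=True):
    def work(data):
        # stand-in for scaling: divide by the maximum value
        return [x / max(data) for x in data] if do_preprocess else list(data)
    return run_task("preprocess", {"do_preprocess": do_preprocess},
                    work, upstream=[get_data])


def train(do_preprocess=True, model="ols"):
    def work(data):
        return (model, sum(data))  # stand-in for a fitted model
    return run_task("train", {"do_preprocess": do_preprocess, "model": model},
                    work, upstream=[lambda: preprocess(do_preprocess)])


train(do_preprocess=True, model="ols")
first_run = list(run_log)

run_log.clear()
train(do_preprocess=False, model="svm")
second_run = list(run_log)

print(first_run)   # ['get_data', 'preprocess', 'train']
print(second_run)  # ['preprocess', 'train']
```

In the second call the data-loading step is skipped because it takes no parameters and its output already exists, while the two tasks that depend on `do_preprocess` run again.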


### Step 1: Define workflow

In [1]:
import d6tflow
import luigi
from luigi.util import inherits
import sklearn, sklearn.datasets, sklearn.svm, sklearn.linear_model, sklearn.preprocessing
import pandas as pd

# define workflow
class TaskGetData(d6tflow.tasks.TaskPqPandas):  # save dataframe as parquet

    def run(self):
        ds = sklearn.datasets.load_breast_cancer()
        df_train = pd.DataFrame(ds.data, columns=ds.feature_names)
        df_train['y'] = ds.target
        self.save(df_train) # quickly save dataframe


@d6tflow.requires(TaskGetData) # define dependency
class TaskPreprocess(d6tflow.tasks.TaskPqPandas):
    do_preprocess = luigi.BoolParameter(default=True) # parameter for preprocessing yes/no

    def run(self):
        df_train = self.input().load() # quickly load required data
        if self.do_preprocess:
            df_train.iloc[:,:-1] = sklearn.preprocessing.scale(df_train.iloc[:,:-1])
        self.save(df_train)

@d6tflow.requires(TaskPreprocess) # automatically pass parameters upstream
class TaskTrain(d6tflow.tasks.TaskPickle): # save output as pickle
    model = luigi.Parameter(default='ols') # parameter for model selection: 'ols' (logistic regression) or 'svm'

    def run(self):
        df_train = self.input().load()
        if self.model=='ols':
            model = sklearn.linear_model.LogisticRegression()
        elif self.model=='svm':
            model = sklearn.svm.SVC()
        else:
            raise ValueError('invalid model selection')
        model.fit(df_train.drop(columns='y'), df_train['y'])
        self.save(model)

Welcome to d6tflow!


### Step 2: Define experiments

In [2]:
# goal: compare performance of two models
params_model1 = {'do_preprocess':True, 'model':'ols'}
params_model2 = {'do_preprocess':False, 'model':'svm'}
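
When there are more than a couple of experiments, writing each parameter dictionary by hand gets tedious. A minimal sketch, assuming you want every combination of the two parameters used above: `param_grid` and `experiments` are hypothetical names, and the approach uses only the standard library rather than any d6tflow feature.

```python
import itertools

# Hypothetical grid: every value to try for each task parameter
param_grid = {
    'do_preprocess': [True, False],
    'model': ['ols', 'svm'],
}

# One dictionary per combination, in the same shape as params_model1 above
experiments = [
    dict(zip(param_grid, values))
    for values in itertools.product(*param_grid.values())
]

for params in experiments:
    print(params)
```

Each dictionary can then be unpacked into a task in the same way as the hand-written ones, for example `TaskTrain(**params)`.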


### Step 3: Run workflow

In [3]:
# run workflow for model 1
d6tflow.run(TaskTrain(**params_model1)) 



===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 3 ran successfully:
    - 1 TaskGetData()
    - 1 TaskPreprocess(do_preprocess=True)
    - 1 TaskTrain(do_preprocess=True, model=ols)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====



LuigiRunResult(status=<LuigiStatusCode.SUCCESS: (':)', 'there were no failed tasks or missing dependencies')>,worker=<luigi.worker.Worker object at 0x0000015DF1351D08>,scheduling_succeeded=True)

In [4]:
# preview which tasks will rerun after changing parameters
d6tflow.preview(TaskTrain(**params_model2))



 ===== Luigi Execution Preview ===== 


└─--[TaskTrain-{'do_preprocess': 'False', 'model': 'svm'} (PENDING)]
   └─--[TaskPreprocess- (PENDING)]
      └─--[TaskGetData- (COMPLETE)]

 ===== Luigi Execution Preview ===== 



In [5]:
# run workflow for model 2
d6tflow.run(TaskTrain(**params_model2))



===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 1 complete ones were encountered:
    - 1 TaskGetData()
* 2 ran successfully:
    - 1 TaskPreprocess(do_preprocess=False)
    - 1 TaskTrain(do_preprocess=False, model=svm)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====



LuigiRunResult(status=<LuigiStatusCode.SUCCESS: (':)', 'there were no failed tasks or missing dependencies')>,worker=<luigi.worker.Worker object at 0x0000015DF67C74C8>,scheduling_succeeded=True)

In [6]:
# compare results from new model
# Load task output to pandas dataframe and model object for model evaluation

model1 = TaskTrain(**params_model1).output().load()
df_train = TaskPreprocess(**params_model1).output().load()
print(model1.score(df_train.drop(columns='y'), df_train['y']))
# 0.987

model2 = TaskTrain(**params_model2).output().load()
df_train = TaskPreprocess(**params_model2).output().load()
print(model2.score(df_train.drop(columns='y'), df_train['y']))
# 0.922


0.9876977152899824
0.9226713532513181


# Next steps: Transition code to d6tflow

See https://d6tflow.readthedocs.io/en/latest/transition.html