## Save time and effort with pipeline automation

#### In this example, we look at d6tflow, a tool for building easy, fast, and intuitive workflows. It can also serve as a lightweight way to productionize ML models faster.

In [1]:
# import libraries

import d6tflow
import pandas as pd
from collections import defaultdict
import numpy as np

# helper function
from helpers import deploy_least_rmse_model

# Model training and evaluation libraries
from surprise import Reader
from surprise import NMF, SVDpp, SVD
from surprise import Dataset
from surprise.accuracy import rmse
from surprise.model_selection import train_test_split

Loading postgres module without psycopg2 nor pg8000 installed. Will crash at runtime if postgres functionality is used.
Loading S3 module without the python package boto3. Will crash at runtime if S3 functionality is used.


Welcome to d6tflow! For Q&A see https://github.com/d6t/d6tflow


### Define Workflows

##### In the example below, we define a workflow that reads the data, pre-processes it, trains several ML models, and deploys the best one for a movie recommendation system. Each stage of the pipeline is implemented as a small, separate task.
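To make the task-per-stage idea concrete, here is a minimal plain-Python sketch of how a dependency-aware runner can work: each task declares what it requires, dependencies run first, and outputs that are already available are reused instead of recomputed (similar in spirit to the COMPLETE status Luigi reports in the preview further down). The `Task` class and `run_task` function are hypothetical illustrations, not d6tflow's API.

```python
from typing import Callable, Dict, List, Optional


class Task:
    """Hypothetical pipeline task: a name, a function, and upstream dependencies."""

    def __init__(self, name: str, func: Callable[..., object],
                 requires: Optional[List["Task"]] = None):
        self.name = name
        self.func = func
        self.requires = requires or []


def run_task(task: Task, cache: Dict[str, object], log: List[str]) -> object:
    """Run `task` after its dependencies, reusing cached outputs of completed tasks."""
    if task.name in cache:  # already complete: skip recomputation
        return cache[task.name]
    inputs = [run_task(dep, cache, log) for dep in task.requires]  # dependencies first
    log.append(task.name)  # record the order in which tasks actually ran
    cache[task.name] = task.func(*inputs)
    return cache[task.name]


# A toy version of a GetData -> PreProcess -> ModelTrain chain
get_data = Task("GetData", lambda: [3, 1, 2])
pre_process = Task("PreProcess", lambda rows: sorted(rows), requires=[get_data])
model_train = Task("ModelTrain", lambda rows: sum(rows) / len(rows), requires=[pre_process])

cache: Dict[str, object] = {}
first_log: List[str] = []
print(run_task(model_train, cache, first_log), first_log)
# prints: 2.0 ['GetData', 'PreProcess', 'ModelTrain']

second_log: List[str] = []
run_task(model_train, cache, second_log)
print(second_log)  # prints: [] because every task is already cached
```

Real workflow tools persist outputs to disk or memory targets rather than a plain dictionary, but the control flow (resolve dependencies, skip what is complete) is the same idea.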

##### First, we create a task that gets data from a source, which can be anything: a Kafka stream, a CSV file, a database, and so on. In our example, the data is a batch of logs collected from a Kafka stream, containing users' ratings of movies. To simplify data collection, we have already consumed the Kafka stream and stored the logs in a CSV file, which the next task reads.

In [2]:
# Get the data
class GetData(d6tflow.tasks.TaskCSVPandas):  # save dataframe as CSV
    def run(self):
        ratings_df = pd.read_csv("./data/kafka_ratings.txt", header=None)  # raw logs have no header row
        self.save(ratings_df)  # persist dataframe for downstream tasks

##### Next, we define a task to pre-process the collected data.

In [3]:
# Pre-Process Data
@d6tflow.requires(GetData)  # define dependency
class PreProcess(d6tflow.tasks.TaskCSVPandas):  # save dataframe as CSV
    def run(self):
        ratings_df = self.input().load()
        ratings_df.columns = ["Timestamp", "User", "Log"]
        # the third "/"-separated field of each log is "<movie name>=<rating>"
        ratings_df["MovieName"] = ratings_df["Log"].apply(lambda x: x.split("/")[2].split("=")[0])
        ratings_df["Rating"] = ratings_df["Log"].apply(lambda x: x.split("/")[2].split("=")[1])
        ratings_df = ratings_df[["Timestamp", "User", "MovieName", "Rating"]]
        # sort first so that keep="last" retains each user's most recent rating of a movie
        ratings_df = ratings_df.sort_values(by=["Timestamp"])
        ratings_df = ratings_df.drop_duplicates(subset=["User", "MovieName"], keep="last")
        self.save(ratings_df)  # persist dataframe for downstream tasks
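The two `split` calls assume the third `/`-separated field of each log entry has the form `<movie name>=<rating>`. Here is a stdlib-only sketch of the same parsing and de-duplication on a few made-up rows; the sample timestamps, user IDs, movie names, and the `GET /rate/...` shape are hypothetical. Processing rows in timestamp order before de-duplicating ensures that the rating kept for each (user, movie) pair is the most recent one.

```python
from typing import Dict, List, Tuple

# Hypothetical (timestamp, user, log) rows shaped to match the split logic above
rows = [
    ("2022-01-01T10:00:00", "101", "GET /rate/the+matrix+1999=4"),
    ("2022-01-01T09:00:00", "101", "GET /rate/the+matrix+1999=2"),
    ("2022-01-01T11:00:00", "202", "GET /rate/up+2009=5"),
]


def parse_rating(log: str) -> Tuple[str, int]:
    """Split the '<movie>=<rating>' field into the movie name and an integer rating."""
    movie, rating = log.split("/")[2].split("=")
    return movie, int(rating)


def latest_ratings(rows: List[Tuple[str, str, str]]) -> List[Tuple[str, str, str, int]]:
    """Keep only the most recent rating per (user, movie), sorted by timestamp."""
    latest: Dict[Tuple[str, str], Tuple[str, str, str, int]] = {}
    for timestamp, user, log in sorted(rows):  # ISO timestamps sort chronologically as strings
        movie, rating = parse_rating(log)
        latest[(user, movie)] = (timestamp, user, movie, rating)  # later rows overwrite earlier ones
    return sorted(latest.values())


for record in latest_ratings(rows):
    print(record)
# ('2022-01-01T10:00:00', '101', 'the+matrix+1999', 4)
# ('2022-01-01T11:00:00', '202', 'up+2009', 5)
```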

##### The ML approach we use for movie recommendation is [collaborative filtering](https://developers.google.com/machine-learning/recommendation/collaborative/basics), which learns from a matrix of user-item ratings, so each user and item (in this case, a movie) needs an ID. Hence, in the next task, we map each movie name to a numerical ID and add it to the dataset.

In [4]:
# Map movie names to a numerical id
@d6tflow.requires(PreProcess) # define dependency
class MapMovieNameToID(d6tflow.tasks.TaskCSVPandas):  # save dataframe as CSV
    @staticmethod
    def def_movie_value():
        # default value for a movie name that has no ID assigned
        return "Not Present"

    def run(self):
        ratings_df = self.input().load()
        movie_id_dict = defaultdict(MapMovieNameToID.def_movie_value)
        # assign consecutive integer IDs in order of first appearance
        for index, movie_name in enumerate(ratings_df['MovieName'].unique()):
            movie_id_dict[movie_name] = index
        ratings_df['Item'] = ratings_df['MovieName'].apply(lambda x: movie_id_dict[x])
        ratings_df_processed = ratings_df[['User', 'Item', 'Rating']]
        self.save(ratings_df_processed)  # persist dataframe for downstream tasks
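The mapping assigns consecutive integers to movies in the order they first appear in the time-sorted data. A minimal stdlib sketch of the same idea with hypothetical movie names: `dict.fromkeys` preserves insertion order and drops repeats, so it plays the role of pandas' `unique()` here.

```python
from typing import Dict, List

# Hypothetical movie names in the order they appear in the ratings data
movie_names = ["up+2009", "the+matrix+1999", "up+2009", "heat+1995"]

# dict.fromkeys keeps first-appearance order (like pandas' unique()) and drops repeats
movie_id: Dict[str, int] = {name: idx for idx, name in enumerate(dict.fromkeys(movie_names))}

# Replace each movie name with its numerical ID
item_ids: List[int] = [movie_id[name] for name in movie_names]

print(movie_id)  # {'up+2009': 0, 'the+matrix+1999': 1, 'heat+1995': 2}
print(item_ids)  # [0, 1, 0, 2]
```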

##### Once the data is pre-processed, we define the next task: model training. Here, we train several [matrix factorization-based algorithms](https://surprise.readthedocs.io/en/stable/matrix_factorization.html): [SVD (Singular Value Decomposition)](https://surprise.readthedocs.io/en/stable/matrix_factorization.html), [SVD++ (Singular Value Decomposition Plus Plus)](https://surprise.readthedocs.io/en/stable/matrix_factorization.html#surprise.prediction_algorithms.matrix_factorization.SVDpp), and [NMF (Non-Negative Matrix Factorization)](https://surprise.readthedocs.io/en/stable/matrix_factorization.html#surprise.prediction_algorithms.matrix_factorization.NMF). The task saves each trained model along with its predictions on a held-out test set, from which we later compute the [RMSE (Root Mean Squared Error)](https://en.wikipedia.org/wiki/Root-mean-square_deviation) to evaluate model quality.
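The Surprise documentation describes its `SVD` as a biased matrix-factorization model with prediction $\hat{r}_{ui} = \mu + b_u + b_i + q_i^\top p_u$, fitted by stochastic gradient descent. The sketch below implements that prediction and update rule in plain Python on a tiny hypothetical rating set, to show conceptually what `model.fit` does. It is not Surprise's implementation, and the hyperparameter values (`n_factors`, `n_epochs`, `lr`, `reg`) are illustrative assumptions rather than Surprise's defaults. (SVD++ additionally models implicit feedback, and NMF constrains the factors to be non-negative; neither is shown here.)

```python
import random
from math import sqrt
from typing import Callable, Dict, List, Tuple

Rating = Tuple[int, int, float]  # (user id, item id, rating)


def train_biased_mf(ratings: List[Rating], n_factors: int = 2, n_epochs: int = 500,
                    lr: float = 0.02, reg: float = 0.02, seed: int = 0) -> Callable[[int, int], float]:
    """Fit r_hat = mu + b_u + b_i + q_i . p_u with plain SGD and return a predict function."""
    rng = random.Random(seed)
    mu = sum(r for _, _, r in ratings) / len(ratings)  # global mean rating
    b_u: Dict[int, float] = {}
    b_i: Dict[int, float] = {}
    p: Dict[int, List[float]] = {}
    q: Dict[int, List[float]] = {}
    for u, i, _ in ratings:  # biases start at 0, latent factors at small random values
        b_u.setdefault(u, 0.0)
        b_i.setdefault(i, 0.0)
        p.setdefault(u, [rng.gauss(0, 0.1) for _ in range(n_factors)])
        q.setdefault(i, [rng.gauss(0, 0.1) for _ in range(n_factors)])

    def predict(u: int, i: int) -> float:
        return mu + b_u[u] + b_i[i] + sum(pf * qf for pf, qf in zip(p[u], q[i]))

    for _ in range(n_epochs):
        for u, i, r in ratings:
            err = r - predict(u, i)  # e_ui = r_ui - r_hat_ui
            b_u[u] += lr * (err - reg * b_u[u])
            b_i[i] += lr * (err - reg * b_i[i])
            p_old = p[u]  # update p_u and q_i from the same pre-step values
            p[u] = [pf + lr * (err * qf - reg * pf) for pf, qf in zip(p_old, q[i])]
            q[i] = [qf + lr * (err * pf - reg * qf) for pf, qf in zip(p_old, q[i])]
    return predict


def rmse_of(predict: Callable[[int, int], float], ratings: List[Rating]) -> float:
    """Root mean squared error of `predict` over (user, item, rating) triples."""
    return sqrt(sum((r - predict(u, i)) ** 2 for u, i, r in ratings) / len(ratings))


# Tiny hypothetical rating set: (user id, item id, rating on a 1-5 scale)
toy = [(0, 0, 5.0), (0, 1, 3.0), (1, 0, 4.0), (1, 1, 2.0), (2, 0, 1.0), (2, 2, 5.0)]
global_mean = sum(r for _, _, r in toy) / len(toy)
baseline_rmse = rmse_of(lambda u, i: global_mean, toy)  # predict the mean for everyone
trained_rmse = rmse_of(train_biased_mf(toy), toy)
print(f"baseline {baseline_rmse:.3f}, trained {trained_rmse:.3f}")
```

The RMSE here is measured on the training ratings only, to show that the model fits the data; the notebook instead evaluates on a held-out test set.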

In [5]:
# Train a recommendation model
@d6tflow.requires(MapMovieNameToID)  # define dependency
class ModelTrain(d6tflow.tasks.TaskCache):  # keep model in memory (TaskPickle would persist it to disk)
    model = d6tflow.Parameter(default='svd') # parameter for model selection

    def run(self):
        ratings_df_processed = self.input().load()
        reader = Reader(rating_scale=(1, 5))
        ratings_dataset = Dataset.load_from_df(ratings_df_processed[["User", "Item", "Rating"]], reader)
        # shuffle=False keeps the time-sorted row order, so the split is not random
        ratings_trainset, ratings_testset = train_test_split(ratings_dataset, test_size=0.3, shuffle=False)

        if self.model == 'nmf':
            model = NMF()
        elif self.model == 'svdpp':
            model = SVDpp()
        elif self.model == 'svd':
            model = SVD()
        else:
            raise ValueError('invalid model selection')

        model.fit(ratings_trainset)
        self.save(model)
        # store test-set predictions as metadata; RMSE is computed from them later
        self.saveMeta({'rmse': model.test(ratings_testset)})
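Because `PreProcess` sorts the ratings by timestamp, `MapMovieNameToID` keeps that row order, and `shuffle=False` disables shuffling, the 70/30 split follows the order of the rows rather than being random. The idea of a time-ordered hold-out can be sketched with plain list slicing; `chronological_split` is a hypothetical helper, and this is not how Surprise slices the data internally.

```python
from math import ceil
from typing import List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def chronological_split(rows: Sequence[T], test_size: float = 0.3) -> Tuple[List[T], List[T]]:
    """Hypothetical helper: oldest rows for training, newest `test_size` fraction for testing.

    Assumes `rows` is already sorted from oldest to newest.
    """
    n_test = ceil(len(rows) * test_size)
    cut = len(rows) - n_test
    return list(rows[:cut]), list(rows[cut:])


# Made-up (timestamp, user, item, rating) rows, already sorted by timestamp
ratings = [(t, f"user{t % 3}", t % 4, 1 + t % 5) for t in range(10)]
train, test = chronological_split(ratings, test_size=0.3)
print(len(train), len(test))  # 7 3
print(max(r[0] for r in train) < min(r[0] for r in test))  # True: no test rating predates training
```

A time-ordered hold-out mimics deployment, where the model only ever sees past ratings and is judged on future ones.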

##### Preview the flow of model training

In [6]:
d6tflow.preview(ModelTrain())


 ===== Luigi Execution Preview ===== 


└─--[ModelTrain-{'model': 'svd'} (PENDING)]
   └─--[MapMovieNameToID- (COMPLETE)]
      └─--[PreProcess- (COMPLETE)]
         └─--[GetData- (COMPLETE)]

 ===== Luigi Execution Preview ===== 



##### Run the training flow for each of the three models

In [7]:
svd_param = {"model": "svd"}
svdpp_param = {"model": "svdpp"}
nmf_param = {"model": "nmf"}

flow = d6tflow.WorkflowMulti(ModelTrain, {"svd": svd_param, "svdpp": svdpp_param, "nmf": nmf_param})
flow.run()


===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 complete ones were encountered:
    - 1 MapMovieNameToID()
* 1 ran successfully:
    - 1 ModelTrain(model=svd)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====


===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 complete ones were encountered:
    - 1 MapMovieNameToID()
* 1 ran successfully:
    - 1 ModelTrain(model=svdpp)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====


===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 complete ones were encountered:
    - 1 MapMovieNameToID()
* 1 ran successfully:
    - 1 ModelTrain(model=nmf)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====



{'svd': LuigiRunResult(status=<LuigiStatusCode.SUCCESS: (':)', 'there were no failed tasks or missing dependencies')>,worker=<luigi.worker.Worker object at 0x117fdac10>,scheduling_succeeded=True),
 'svdpp': LuigiRunResult(status=<LuigiStatusCode.SUCCESS: (':)', 'there were no failed tasks or missing dependencies')>,worker=<luigi.worker.Worker object at 0x12200a520>,scheduling_succeeded=True),
 'nmf': LuigiRunResult(status=<LuigiStatusCode.SUCCESS: (':)', 'there were no failed tasks or missing dependencies')>,worker=<luigi.worker.Worker object at 0x117fcce20>,scheduling_succeeded=True)}
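Conceptually, `WorkflowMulti` runs the same task once for each named parameter set and returns results keyed by those names, as the `svd`/`svdpp`/`nmf` dictionary above shows. A stdlib sketch of that pattern follows; `run_multi` and `train_stub` are hypothetical stand-ins, not d6tflow functions.

```python
from typing import Callable, Dict, Mapping, TypeVar

R = TypeVar("R")


def run_multi(task: Callable[..., R], param_sets: Mapping[str, Mapping[str, object]]) -> Dict[str, R]:
    """Hypothetical helper: run `task` once per named parameter set, keyed by flow name."""
    return {name: task(**params) for name, params in param_sets.items()}


def train_stub(model: str = "svd") -> str:
    """Stand-in for ModelTrain: rejects unknown model names the same way the task does."""
    if model not in {"svd", "svdpp", "nmf"}:
        raise ValueError("invalid model selection")
    return f"trained {model}"


results = run_multi(train_stub, {"svd": {"model": "svd"},
                                 "svdpp": {"model": "svdpp"},
                                 "nmf": {"model": "nmf"}})
print(results)  # {'svd': 'trained svd', 'svdpp': 'trained svdpp', 'nmf': 'trained nmf'}
```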

##### Get the RMSE score for each model

In [8]:
rmse_scores = flow.outputLoadMeta()
model_score = {}
for model in rmse_scores.keys():
    model_score[model] = rmse(rmse_scores[model]["rmse"])
    print(f"The RMSE score for {model} is:", model_score[model])

RMSE: 0.8192
The RMSE score for svd is: 0.8191501558851487
RMSE: 0.8176
The RMSE score for svdpp is: 0.8176416440078533
RMSE: 0.8982
The RMSE score for nmf is: 0.8982009355401239
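`model.test` returns a list of Surprise `Prediction` objects, each holding the true rating `r_ui` and the estimate `est`, and `rmse` reduces them to $\mathrm{RMSE} = \sqrt{\frac{1}{n}\sum (r_{ui} - \hat{r}_{ui})^2}$. A stdlib sketch of that computation, using a look-alike namedtuple and made-up predictions rather than the notebook's results:

```python
from collections import namedtuple
from math import sqrt
from typing import Iterable

# Look-alike of Surprise's Prediction namedtuple (fields: uid, iid, r_ui, est, details)
Prediction = namedtuple("Prediction", ["uid", "iid", "r_ui", "est", "details"])


def rmse_from_predictions(predictions: Iterable[Prediction]) -> float:
    """Root mean squared error between true ratings (r_ui) and estimates (est)."""
    errors = [(p.r_ui - p.est) ** 2 for p in predictions]
    if not errors:
        raise ValueError("prediction list is empty")
    return sqrt(sum(errors) / len(errors))


# Made-up predictions for illustration
preds = [
    Prediction("u1", 0, 4.0, 3.5, {}),
    Prediction("u2", 1, 2.0, 3.0, {}),
    Prediction("u3", 2, 5.0, 5.0, {}),
]
print(round(rmse_from_predictions(preds), 4))  # 0.6455
```

Lower is better: an RMSE around 0.82 on a 1 to 5 scale means the typical prediction error is a bit under one star.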


##### Load the trained model from each "ModelTrain" task and deploy the best one (the one with the lowest RMSE)

In [9]:
models = flow.outputLoad(task=ModelTrain)
deploy_least_rmse_model(models, model_score)

The model with least RMSE is deployed which is svdpp
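The body of `deploy_least_rmse_model` lives in the local `helpers` module and is not shown here. Its selection step can be sketched with `min` over the score dictionary; `pick_least_rmse` is a hypothetical name, and the deployment step itself is left out. With the scores printed above, it selects `svdpp`, matching the helper's output.

```python
from typing import Mapping, Tuple, TypeVar

M = TypeVar("M")


def pick_least_rmse(models: Mapping[str, M], scores: Mapping[str, float]) -> Tuple[str, M]:
    """Hypothetical helper: return (name, model) for the model with the lowest RMSE."""
    best_name = min(scores, key=scores.get)
    return best_name, models[best_name]


# RMSE scores reported by the run above; strings stand in for the fitted models
model_score = {"svd": 0.8191501558851487, "svdpp": 0.8176416440078533, "nmf": 0.8982009355401239}
models = {name: f"<{name} model>" for name in model_score}
print(pick_least_rmse(models, model_score))  # ('svdpp', '<svdpp model>')
```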
