# A Machine Learning Productionalization Workflow

- Code and project layout
- ML "Step" coding style
- ML "Workflow" coding style
- Unit testing style
- Python coding style and linting

# Concepts: Workflow

The **workflow** is the entire end-to-end machine learning project. It includes:

- loading the data from data warehouses
- feature creation
- model training
- value prediction
- exposure of predicted values

# Concepts: Step and Stage

A **step** is the smallest building block of an ML workflow. A step might do one of any number of things:

- data loading
- data transformation
- model training
- model inference 
- model serving

A **stage** is a collection of **step**s that all run on the same machine.

# Code organization

Folder structure

- ProjectName/
    - packagename/
        - stage1/
            - steps/
                - (e.g.) load_data.py
            - tests/
            - main.py
        - stage2/
            - steps/
            - tests/
            - main.py
        - mock_data/
        - workflow_test.py
        - config.py
        - constants.py
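
The layout above can be scaffolded with a few lines of `pathlib`. This is a minimal sketch, not part of the original project: the `scaffold` helper and its arguments are hypothetical, and the `__init__.py` files are an assumption (they make each folder an importable package).

```python
import tempfile
from pathlib import Path


def scaffold(root, package, stages):
    """Create the folder layout described above under `root` (hypothetical helper)."""
    pkg = Path(root) / package
    for stage in stages:
        for sub in ("steps", "tests"):
            folder = pkg / stage / sub
            folder.mkdir(parents=True, exist_ok=True)
            (folder / "__init__.py").touch()  # assumption: folders are packages
        (pkg / stage / "__init__.py").touch()
        (pkg / stage / "main.py").touch()
    (pkg / "mock_data").mkdir(parents=True, exist_ok=True)
    for name in ("__init__.py", "workflow_test.py", "config.py", "constants.py"):
        (pkg / name).touch()
    return pkg


# Try it out in a throwaway directory and list the Python files created.
with tempfile.TemporaryDirectory() as tmp:
    pkg = scaffold(tmp, "packagename", ["stage1", "stage2"])
    created = sorted(p.relative_to(tmp).as_posix() for p in pkg.rglob("*.py"))

for path in created:
    print(path)
```

Because every call uses `exist_ok=True` or `touch()`, running the helper twice on the same root leaves existing folders and files in place.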

# Step Code

A step is a Python module containing a `class` that implements the step. Step classes should provide the following methods:

- `__init__` should initialize the step, providing all hyperparameters necessary as arguments to the initializer
- `execute(...)` should actually run the step. It takes any 'upstream' data as input and may return output data for 'downstream' consumption
- (optional) `persist(path, data)` should persist the results of running the step (`data`) at the given path

Additionally, Step code must provide the following docstrings:

- The step module docstring contains an example of how to use the step
- The step class docstring contains documentation of
    - the `__init__` parameters (configuration of the step)
    - expected data in `execute()`
    - format of data returned by `execute()` 
    - persistence strategy, if any
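
One way to make this interface explicit is an abstract base class. The sketch below is an assumption, not something the workflow requires: `Step` and `AddConstant` are hypothetical names, and `persist` takes the data to store as a second argument, with pickling chosen only as a simple default.

```python
import pickle
from abc import ABC, abstractmethod


class Step(ABC):
    """Hypothetical base class capturing the step interface."""

    @abstractmethod
    def execute(self, *args, **kwargs):
        """Run the step on upstream data and return downstream data."""

    def persist(self, path, data):
        """Optional: pickle the result of `execute` at `path`."""
        with open(path, "wb") as f:
            pickle.dump(data, f)


class AddConstant(Step):
    """Toy step: adds a constant to every value.

    Init parameters:
        amount: number added to each value

    Execution Args:
        values: list of numbers

    Execution Returns:
        list of numbers, same length as the input
    """

    def __init__(self, amount=1):
        self.amount = amount

    def execute(self, values):
        return [v + self.amount for v in values]


step = AddConstant(amount=2)
result = step.execute([1, 2, 3])
print(result)  # [3, 4, 5]
```

With this design, a subclass that forgets to implement `execute` fails at instantiation time rather than in the middle of a workflow run.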


A possible step is shown below:

In [2]:
cd data/recommender

/Users/rick446/src/arborian-classes/data/recommender


In [3]:
%%file ./recommender/stage1/steps/train_test_split.py
"""
Split events into training and test sets

Example usage:
>>> import pandas as pd
>>> from recommender.stage1.steps.train_test_split import TrainTestSplit
>>> df = pd.read_csv('recommender/mock_data/events.csv')
>>> step = TrainTestSplit(train_percentile=90)
>>> (df_train, df_test) = step.execute(df)
"""
import pickle
import numpy as np


class TrainTestSplit():
    """Workflow step that splits an event stream into two event streams based on a particular
    column in the stream.

    Init parameters:
        column_name: string representing the column to partition based on
        train_percentile: percentage of the values in the given column
            to be partitioned into the training set

    Execution Args:
        df: Pandas DataFrame containing an event stream with (at least) a column
            with the specified column name

    Execution Returns:
        Tuple of (training, test) dataframes

    Persist Strategy:
        Pickle the tuple of dataframes
    """

    def __init__(
            self,
            column_name='visitorid',
            train_percentile=90,
    ):
        self.column_name = column_name
        self.train_percentile = train_percentile

    def execute(self, df):
        col = df[self.column_name]
        split_value = np.percentile(col, self.train_percentile)
        train = df[col < split_value]
        test = df[col >= split_value]
        return train, test

    def persist(self, path, data):
        with open(path, 'wb') as f:
            pickle.dump(data, f)  # pickle.dump takes the object first, then the file

    def execute_persist(self, path, df):
        result = self.execute(df)
        self.persist(path, result)
        return result


Overwriting ./recommender/stage1/steps/train_test_split.py
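
To see what the split in `execute` does, it helps to work through the percentile by hand. The sketch below re-implements linear-interpolation percentiles in pure Python (the default behaviour of `np.percentile`); the `linear_percentile` helper is hypothetical and exists only for illustration.

```python
def linear_percentile(values, q):
    """q-th percentile using linear interpolation between closest ranks."""
    ordered = sorted(values)
    pos = (len(ordered) - 1) * q / 100.0  # fractional index into the sorted data
    lower = int(pos)
    upper = min(lower + 1, len(ordered) - 1)
    frac = pos - lower
    return ordered[lower] + (ordered[upper] - ordered[lower]) * frac


values = list(range(100))                  # stand-in for the 'visitorid' column
split_value = linear_percentile(values, 90)

train = [v for v in values if v < split_value]
test = [v for v in values if v >= split_value]

print(round(split_value, 6))  # 89.1
print(len(train), test[0])    # 90 90
```

Note that the split is on the *value* of the column, not on row position: every row with the same `visitorid` lands on the same side, and when values repeat heavily the training share can differ from `train_percentile`.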


# Testing the step

To test this step, there should be a unittest file defined in the corresponding tests folder:

In [4]:
ls ./recommender/stage1

__init__.py  __pycache__/  main.py  steps/  tests/


In [5]:
%%file ./recommender/stage1/tests/test_train_test_split.py
from unittest import TestCase

import numpy as np
import pandas as pd

from recommender.stage1.steps.train_test_split import TrainTestSplit

class TestTrainTestSplit(TestCase):

    def setUp(self):
        values = np.r_[:100]
        self.df = pd.DataFrame({
            'visitorid': values,
            'anotherid': values[::-1],
        })

    def test_execute_defaults(self):
        step = TrainTestSplit()
        df_train, df_test = step.execute(self.df)
        self.assertEqual(len(df_train), 90)
        self.assertEqual(df_test.visitorid.min(), 90)

    def test_execute_50(self):
        step = TrainTestSplit(train_percentile=50)
        df_train, df_test = step.execute(self.df)
        self.assertEqual(len(df_train), 50)
        self.assertEqual(df_test.visitorid.min(), 50)

    def test_execute_othercol(self):
        step = TrainTestSplit(column_name='anotherid')
        df_train, df_test = step.execute(self.df)
        self.assertEqual(len(df_train), 90)
        self.assertEqual(df_test.visitorid.min(), 0)
        self.assertEqual(df_test.anotherid.min(), 90)


Overwriting ./recommender/stage1/tests/test_train_test_split.py


In [6]:
!python -m unittest recommender/stage1/tests/test_train_test_split.py

...
----------------------------------------------------------------------
Ran 3 tests in 0.014s

OK
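
The tests above exercise `execute` only. The persistence path could be covered in the same style with a round-trip test. This is a sketch under assumptions: `EchoStep` is a hypothetical stand-in with the same `persist`/`execute_persist` shape as the step above, used so the example runs without pandas.

```python
import os
import pickle
import tempfile
import unittest


class EchoStep:
    """Hypothetical stand-in step: returns its input unchanged."""

    def execute(self, data):
        return data

    def persist(self, path, data):
        with open(path, "wb") as f:
            pickle.dump(data, f)

    def execute_persist(self, path, data):
        result = self.execute(data)
        self.persist(path, result)
        return result


class TestPersist(unittest.TestCase):

    def test_round_trip(self):
        step = EchoStep()
        with tempfile.TemporaryDirectory() as tmp:
            path = os.path.join(tmp, "result.pkl")
            result = step.execute_persist(path, ([1, 2], [3]))
            with open(path, "rb") as f:
                restored = pickle.load(f)
        # What was written to disk must match what execute() returned.
        self.assertEqual(restored, result)


# Run the test case programmatically (equivalent to `python -m unittest`).
suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestPersist)
outcome = unittest.TextTestRunner(verbosity=0).run(suite)
```

Writing into a `TemporaryDirectory` keeps the test from leaving pickle files behind in the project tree.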


# Workflow code

A workflow stage is a single Python module named `main.py` in the stage directory containing a `class` that implements that stage of execution. Stage classes should provide the following methods:

- `__init__` should initialize the stage and all the steps within the stage. Parameters to initialize the individual steps should be passed as a nested Python dictionary with the keys corresponding to the step names.
- `execute(...)` should actually run the stage. 
- (optional) `persist(path, data)` should persist the results of running the stage (`data`) at the given path

Additionally, Workflow code must provide the following docstrings:

- The stage module docstring contains an example of how to use the stage
- The stage class docstring contains documentation of
    - the `__init__` parameters (configuration of the stage)
    - any expected data in `execute()`
    - format of data returned by `execute()` 
    - persistence strategy, if any
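
The nested-dictionary convention can be illustrated with a toy stage. Everything in this sketch is hypothetical (`Scale`, `Clip`, `ToyStage`, and the `STEP_CLASSES` mapping); it only shows one way the step names could key into the config.

```python
class Scale:
    """Toy step: multiplies every value by `factor`."""

    def __init__(self, factor=1):
        self.factor = factor

    def execute(self, values):
        return [v * self.factor for v in values]


class Clip:
    """Toy step: caps every value at `max_value`."""

    def __init__(self, max_value=10):
        self.max_value = max_value

    def execute(self, values):
        return [min(v, self.max_value) for v in values]


class ToyStage:
    """Toy stage that runs Scale, then Clip."""

    # Step name -> step class, in execution order.
    STEP_CLASSES = {"scale": Scale, "clip": Clip}

    def __init__(self, config):
        # Each step receives the sub-dictionary stored under its own name;
        # a missing entry means "use the step's defaults".
        self.steps = {
            name: cls(**config.get(name, {}))
            for name, cls in self.STEP_CLASSES.items()
        }

    def execute(self, values):
        for step in self.steps.values():
            values = step.execute(values)
        return values


stage = ToyStage({"scale": {"factor": 3}})
result = stage.execute([1, 2, 5])
print(result)  # [3, 6, 10]
```

Building the steps in `__init__` means a bad parameter name raises a `TypeError` when the stage is constructed, before any data is loaded.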


A possible workflow stage is shown below:


In [7]:
%%file recommender/stage1/main.py
"""
Load data, extract features, and prepare for fitting.

Example usage:
>>> from recommender.stage1.main import Stage1
>>> st1 = Stage1(config)
>>> st1.execute_persist(path)
"""
import pickle

from recommender.stage1.steps.data_loader import DataLoader
from recommender.stage1.steps.score_events import ScoreEvents
from recommender.stage1.steps.group_scores import ScoreGrouper
from recommender.stage1.steps.filter_scores import ScoreFilter
from recommender.stage1.steps.column_encoder import ColumnEncoder


class Stage1():
    """This stage loads the events and prepares the scores for model fitting.

    It consists of the following steps:
    - load the data
    - score events
    - group scores by visitorid/itemid
    - find relevant visitors
    - encode items for relevant visitors with a LabelEncoder

    Example config:
    cfg = {
        'data_loader': {
            'event_filename': 'mock_data/events.csv',
        },
        'score_events': {
            'input_column_name': 'event',
            'score_column_name': 'score',
            'score_map': {
                    'view': 1,
                    'addtocart': 5,
                    'transaction': 10,
            },
        },
        'encode_itemids': {
            'column_name': 'itemid',
        },
        'group_scores': {
            'key': 'visitorid itemid'.split(),
        },
        'score_filter': {
            'min_score': 10,
        },
    }

    Execution Args:
        None

    Execution Returns:
        Python dictionary with the following keys:
            - item_encoder: LabelEncoder() with encoded relevant itemids
            - scores: Pandas Series indexed by (visitorid, encoded itemid) with
                the score for each relevant visitorid/item

    Persist strategy:
        Pickle the result
    """

    def __init__(self, config):
        self.config = config

    def _load_data(self):
        step = DataLoader(**self.config.get('data_loader', {}))
        return step.execute()

    def _score_events(self, df_events):
        step = ScoreEvents(**self.config.get('score_events', {}))
        return step.execute(df_events)

    def _group_scores(self, df_scored_events):
        step = ScoreGrouper(**self.config.get('group_scores', {}))
        return step.execute(df_scored_events)

    def _filter_scores(self, s_grouped_scores):
        step = ScoreFilter(**self.config.get('score_filter', {}))
        return step.execute(s_grouped_scores)

    def _encode_itemids(self, s_filtered_scores):
        step = ColumnEncoder(**self.config.get('encode_itemids', {}))
        return step.execute(s_filtered_scores)

    def execute(self):
        df_events = self._load_data()
        df_scored_events = self._score_events(df_events)
        s_grouped_scores = self._group_scores(df_scored_events)
        df_filtered_scores = self._filter_scores(s_grouped_scores.reset_index())
        d_encoder = self._encode_itemids(df_filtered_scores)
        return {
            'item_encoder': d_encoder['encoder'],
            'scores': d_encoder['df'],
        }

    def persist(self, path, data):
        with open(path, 'wb') as f:
            pickle.dump(data, f)  # pickle.dump takes the object first, then the file

    def execute_persist(self, path):
        data = self.execute()
        self.persist(path, data)
        return data


Overwriting recommender/stage1/main.py
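
Each helper in the stage looks up its parameters with `self.config.get(name, {})`, so a misspelled or mismatched key does not raise an error: the step simply runs with its defaults. A small check can catch this early. The sketch below is an assumption, not part of the stage: `unknown_config_keys` is a hypothetical helper, and the expected key set is taken from the example config in the docstring.

```python
# Step names used in the example config for Stage1.
EXPECTED_KEYS = {
    "data_loader",
    "score_events",
    "group_scores",
    "score_filter",
    "encode_itemids",
}


def unknown_config_keys(config, expected_keys):
    """Return config keys that no step will ever read (hypothetical helper)."""
    return sorted(set(config) - set(expected_keys))


config = {
    "data_loader": {"event_filename": "mock_data/events.csv"},
    "score_filtr": {"min_score": 10},  # typo: would be silently ignored
}

print(unknown_config_keys(config, EXPECTED_KEYS))  # ['score_filtr']
```

A stage could run such a check at the top of `__init__` and raise a `ValueError` when the returned list is not empty.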


# Testing the stage

Each of the workflow stages should be tested independently in a test script in the package directory:

In [8]:
%%file recommender/workflow_test.py
from unittest import TestCase


import recommender.constants as C
from recommender.stage1.main import Stage1


class TestDevWorkflow(TestCase):

    def setUp(self):
        self.config_stage1 = {
            'data_loader': {
                'event_filename': 'recommender/mock_data/events.csv',
            },
            'score_events': {
                'input_column_name': C.EVENT_COLUMN_NAME,
                'score_column_name': C.SCORE_COLUMN_NAME,
                'score_map': {
                        'view': 1,
                        'addtocart': 5,
                        'transaction': 10,
                },
            },
            'encode_itemids': {
                'column_name': 'itemid',
            },
            'group_scores': {
                'key': 'visitorid itemid'.split(),
            },
            'score_filter': {
                'min_score': 10,
            },
        }

    def test_stage_1(self):
        stage = Stage1(self.config_stage1)
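        result = stage.execute()
        # The stage returns a dict with exactly these two keys
        self.assertEqual(set(result), {'item_encoder', 'scores'})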


Overwriting recommender/workflow_test.py


In [10]:
!python -m unittest recommender/workflow_test.py

.
----------------------------------------------------------------------
Ran 1 test in 0.044s

OK
