In [1]:
import time
from timeit import default_timer as timer

import pandas as pd
from collections import OrderedDict
from dataclasses import dataclass
import typing
from flytekit import Resources, task, workflow, dynamic
from flytekit.types.file import FlyteFile
from flytekit.types.schema import FlyteSchema
import numpy as np
import sklearn.linear_model as lm
from sklearn.model_selection import train_test_split
from sklearn import config_context

In [2]:
import warnings

warnings.filterwarnings("ignore")

### Common part: global variables and functions which don't require @task coverage

In [13]:
def split(X, y, test_size=0.1, stratify=None, random_state=None):
    t0 = timer()
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, stratify=stratify, random_state=random_state
    )
    split_time = timer() - t0

    return (X_train, y_train, X_test, y_test), split_time

In [3]:
DATASET_PATH = "https://rapidsai-data.s3.us-east-2.amazonaws.com/datasets/ipums_education2income_1970-2010.csv.gz"

COLS = [
    "YEAR",
    "DATANUM",
    "SERIAL",
    "CBSERIAL",
    "HHWT",
    "CPI99",
    "GQ",
    "PERNUM",
    "SEX",
    "AGE",
    "INCTOT",
    "EDUC",
    "EDUCD",
    "EDUC_HEAD",
    "EDUC_POP",
    "EDUC_MOM",
    "EDUCD_MOM2",
    "EDUCD_POP2",
    "INCTOT_MOM",
    "INCTOT_POP",
    "INCTOT_MOM2",
    "INCTOT_POP2",
    "INCTOT_HEAD",
    "SEX_HEAD",
]

COLUMNS_TYPES = [
    "int",
    "int",
    "int",
    "float",
    "int",
    "float",
    "int",
    "float",
    "int",
    "int",
    "int",
    "int",
    "int",
    "int",
    "int",
    "float",
    "float",
    "float",
    "float",
    "float",
    "float",
    "float",
    "float",
    "float",
    "float",
    "float",
    "float",
    "float",
    "float",
    "float",
    "float",
    "float",
    "float",
    "float",
    "float",
    "float",
    "float",
    "float",
    "float",
    "float",
    "float",
    "float",
    "float",
    "float",
    "float",
]

# X = OrderedDict((zip(COLS, list(map(eval, COLUMNS_TYPES)))))
# Y = OrderedDict({"EDUC": X.pop("EDUC")})
# Y

In [4]:
# ML specific
N_RUNS = 50
TEST_SIZE = 0.1
RANDOM_STATE = 777

ML_KEYS = ["t_train_test_split", "t_train", "t_inference", "t_ml"]
ML_SCORE_KEYS = ["mse_mean", "cod_mean", "mse_dev"]

## Workflow consisting from 2 tasks

In [None]:
# utils


def mse(y_test, y_pred):
    return ((y_test - y_pred) ** 2).mean()


def cod(y_test, y_pred):
    y_bar = y_test.mean()
    total = ((y_test - y_bar) ** 2).sum()
    residuals = ((y_test - y_pred) ** 2).sum()
    return 1 - (residuals / total)

In [8]:
@task
def feature_eng_task(
    data: FlyteFile[typing.TypeVar("csv")], cols: typing.List[str]
) -> (pd.DataFrame):

    df = pd.read_csv(data)[cols]

    df = df[df["INCTOT"] != 9999999]
    df = df[df["EDUC"] != -1]
    df = df[df["EDUCD"] != -1]

    df["INCTOT"] = df["INCTOT"] * df["CPI99"]

    for column in cols:
        df[column] = df[column].fillna(-1)
        df[column] = df[column].astype("float64")

    return df

{"asctime": "2021-08-11 14:06:52,456", "name": "flytekit", "levelname": "DEBUG", "message": "Task returns unnamed native tuple <class 'pandas.core.frame.DataFrame'>"}


In [9]:
@task
def ml_task(
    df: pd.DataFrame,
    random_state: int,
    n_runs: int,
    test_size: float,
    ml_keys: typing.List[str],
    ml_score_keys: typing.List[str],
) -> (typing.Dict[str, float], typing.Dict[str, float]):

    # Fetch the input and output data from train dataset
    y = np.ascontiguousarray(df["EDUC"], dtype=np.float64)
    X = np.ascontiguousarray(df.drop(columns=["EDUC", "CPI99"]), dtype=np.float64)

    clf = lm.Ridge()

    mse_values, cod_values = [], []
    ml_times = {key: 0.0 for key in ml_keys}
    ml_scores = {key: 0.0 for key in ml_score_keys}

    print("ML runs: ", n_runs)
    for i in range(n_runs):
        (X_train, y_train, X_test, y_test), split_time = split(
            X, y, test_size=test_size, random_state=random_state
        )
        ml_times["t_train_test_split"] += split_time
        random_state += 777

        t0 = timer()
        with config_context(assume_finite=True):
            model = clf.fit(X_train, y_train)
        ml_times["t_train"] += timer() - t0

        t0 = timer()
        y_pred = model.predict(X_test)
        ml_times["t_inference"] += timer() - t0

        mse_values.append(mse(y_test, y_pred))
        cod_values.append(cod(y_test, y_pred))

    ml_times["t_ml"] += ml_times["t_train"] + ml_times["t_inference"]

    ml_scores["mse_mean"] = sum(mse_values) / len(mse_values)
    ml_scores["cod_mean"] = sum(cod_values) / len(cod_values)
    ml_scores["mse_dev"] = pow(
        sum([(mse_value - ml_scores["mse_mean"]) ** 2 for mse_value in mse_values])
        / (len(mse_values) - 1),
        0.5,
    )
    ml_scores["cod_dev"] = pow(
        sum([(cod_value - ml_scores["cod_mean"]) ** 2 for cod_value in cod_values])
        / (len(cod_values) - 1),
        0.5,
    )

    return ml_scores, ml_times

In [10]:
@workflow
def census_bench_wf(
    dataset: FlyteFile["csv"] = DATASET_PATH,
    cols: typing.List[str] = COLS,
    random_state: int = RANDOM_STATE,
    n_runs: int = N_RUNS,
    test_size: float = TEST_SIZE,
    ml_keys: typing.List[str] = ML_KEYS,
    ml_score_keys: typing.List[str] = ML_SCORE_KEYS,
) -> (typing.Dict[str, float], typing.Dict[str, float]):
    df = feature_eng_task(data=dataset, cols=cols)
    ml_scores, ml_times = ml_task(
        df=df,
        random_state=random_state,
        n_runs=n_runs,
        test_size=test_size,
        ml_keys=ml_keys,
        ml_score_keys=ml_score_keys,
    )
    return ml_scores, ml_times

In [18]:
if __name__ == "__main__":
    start = time.time()
    print(census_bench_wf())
    print("--- %s seconds ---" % (time.time() - start))

{"asctime": "2021-08-10 11:01:51,102", "name": "flytekit", "levelname": "INFO", "message": "Invoking __main__.feature_eng_task with inputs: {'data': /tmp/flyte/20210810_104954/mock_remote/fd6269515537e60ac9321275db9c9722/census.csv, 'cols': ['YEAR', 'DATANUM', 'SERIAL', 'CBSERIAL', 'HHWT', 'CPI99', 'GQ', 'PERNUM', 'SEX', 'AGE', 'INCTOT', 'EDUC', 'EDUCD', 'EDUC_HEAD', 'EDUC_POP', 'EDUC_MOM', 'EDUCD_MOM2', 'EDUCD_POP2', 'INCTOT_MOM', 'INCTOT_POP', 'INCTOT_MOM2', 'INCTOT_POP2', 'INCTOT_HEAD', 'SEX_HEAD']}"}
INFO:flytekit:Invoking __main__.feature_eng_task with inputs: {'data': /tmp/flyte/20210810_104954/mock_remote/fd6269515537e60ac9321275db9c9722/census.csv, 'cols': ['YEAR', 'DATANUM', 'SERIAL', 'CBSERIAL', 'HHWT', 'CPI99', 'GQ', 'PERNUM', 'SEX', 'AGE', 'INCTOT', 'EDUC', 'EDUCD', 'EDUC_HEAD', 'EDUC_POP', 'EDUC_MOM', 'EDUCD_MOM2', 'EDUCD_POP2', 'INCTOT_MOM', 'INCTOT_POP', 'INCTOT_MOM2', 'INCTOT_POP2', 'INCTOT_HEAD', 'SEX_HEAD']}
{"asctime": "2021-08-10 11:02:46,183", "name": "flytekit", "

INFO:flytekit:Invoking __main__.ml_task with inputs: {'df':             YEAR  DATANUM     SERIAL   CBSERIAL   HHWT  CPI99   GQ  PERNUM  \
0         1970.0      2.0        1.0       -1.0  100.0  4.540  1.0     1.0   
1         1970.0      2.0        1.0       -1.0  100.0  4.540  1.0     2.0   
2         1970.0      2.0        2.0       -1.0  100.0  4.540  1.0     1.0   
3         1970.0      2.0        2.0       -1.0  100.0  4.540  1.0     2.0   
4         1970.0      2.0        4.0       -1.0  100.0  4.540  1.0     1.0   
...          ...      ...        ...        ...    ...    ...  ...     ...   
21721915  2010.0      1.0  1397787.0  1413806.0   18.0  0.764  1.0     1.0   
21721916  2010.0      1.0  1397788.0  1414099.0   57.0  0.764  1.0     1.0   
21721917  2010.0      1.0  1397788.0  1414099.0   57.0  0.764  1.0     2.0   
21721920  2010.0      1.0  1397789.0  1414268.0  101.0  0.764  1.0     1.0   
21721921  2010.0      1.0  1397789.0  1414268.0  101.0  0.764  1.0     2.0   

   

ML runs:  50


{"asctime": "2021-08-10 11:07:39,080", "name": "flytekit", "levelname": "INFO", "message": "Task executed successfully in user level, outputs: ({'mse_mean': 0.03256456908804994, 'cod_mean': 0.9953675334603814, 'mse_dev': 4.179940420229173e-05, 'cod_dev': 5.869227912341005e-06}, {'t_train_test_split': 141.7956369665917, 't_train': 137.82118896697648, 't_inference': 2.0071465999353677, 't_ml': 139.82833556691185})"}
INFO:flytekit:Task executed successfully in user level, outputs: ({'mse_mean': 0.03256456908804994, 'cod_mean': 0.9953675334603814, 'mse_dev': 4.179940420229173e-05, 'cod_dev': 5.869227912341005e-06}, {'t_train_test_split': 141.7956369665917, 't_train': 137.82118896697648, 't_inference': 2.0071465999353677, 't_ml': 139.82833556691185})


{'mse_mean': 0.03256456908804994, 'cod_mean': 0.9953675334603814, 'mse_dev': 4.179940420229173e-05, 'cod_dev': 5.869227912341005e-06}
{'t_train_test_split': 141.7956369665917, 't_train': 137.82118896697648, 't_inference': 2.0071465999353677, 't_ml': 139.82833556691185}
--- 349.6951377391815 seconds ---


## Workflow consisting from more detalized tasks

In [5]:
FEATURES = OrderedDict((zip(COLS, list(map(eval, COLUMNS_TYPES)))))
TARGET = OrderedDict({"EDUC": FEATURES.pop("EDUC")})

In [14]:
# utils


@task
def mse(y_test: pd.DataFrame, y_pred: pd.DataFrame) -> float:
    return ((y_test - y_pred) ** 2).mean()

{"asctime": "2021-08-13 10:37:41,409", "name": "flytekit", "levelname": "DEBUG", "message": "Task returns unnamed native tuple <class 'float'>"}
DEBUG:flytekit:Task returns unnamed native tuple <class 'float'>


In [15]:
@task
def cod(y_test: pd.DataFrame, y_pred: pd.DataFrame) -> float:
    y_bar = y_test.mean()
    total = ((y_test - y_bar) ** 2).sum()
    residuals = ((y_test - y_pred) ** 2).sum()
    return 1 - (residuals / total)

{"asctime": "2021-08-13 10:37:43,791", "name": "flytekit", "levelname": "DEBUG", "message": "Task returns unnamed native tuple <class 'float'>"}
DEBUG:flytekit:Task returns unnamed native tuple <class 'float'>


In [8]:
@task
def feature_eng_task(
    data: FlyteFile[typing.TypeVar("csv")], cols: typing.List[str]
) -> (pd.DataFrame):

    df = pd.read_csv(data)[cols]

    df = df[df["INCTOT"] != 9999999]
    df = df[df["EDUC"] != -1]
    df = df[df["EDUCD"] != -1]

    df["INCTOT"] = df["INCTOT"] * df["CPI99"]

    for column in cols:
        df[column] = df[column].fillna(-1)
        df[column] = df[column].astype("float64")

    return df

{"asctime": "2021-08-13 10:35:00,092", "name": "flytekit", "levelname": "DEBUG", "message": "Task returns unnamed native tuple <class 'pandas.core.frame.DataFrame'>"}


In [16]:
@dynamic
def ml_task(
    df: pd.DataFrame,
    random_state: int,
    n_runs: int,
    test_size: float,
    ml_keys: typing.List[str],
    ml_score_keys: typing.List[str],
) -> (typing.Dict[str, float], typing.Dict[str, float]):

    # Fetch the input and output data from train dataset
    #     y = np.ascontiguousarray(df["EDUC"], dtype=np.float64)
    #     X = np.ascontiguousarray(df.drop(columns=["EDUC", "CPI99"]), dtype=np.float64)
    y = df["EDUC"]
    X = df.drop(columns=["EDUC", "CPI99"])

    clf = lm.Ridge()

    mse_values, cod_values = [], []
    ml_times = {key: 0.0 for key in ml_keys}
    ml_scores = {key: 0.0 for key in ml_score_keys}

    print("ML runs: ", n_runs)
    for i in range(n_runs):
        (X_train, y_train, X_test, y_test), split_time = split(X=X, y=y)
        y_test = pd.DataFrame({"EDUC": y_test})
        ml_times["t_train_test_split"] += split_time
        random_state += 777

        t0 = timer()
        with config_context(assume_finite=True):
            model = clf.fit(X_train, y_train)
        ml_times["t_train"] += timer() - t0

        t0 = timer()
        y_pred = pd.DataFrame({"EDUC": model.predict(X_test)})
        ml_times["t_inference"] += timer() - t0

        mse_values.append(mse(y_test=y_test, y_pred=y_pred))
        cod_values.append(cod(y_test=y_test, y_pred=y_pred))

    ml_times["t_ml"] += ml_times["t_train"] + ml_times["t_inference"]

    ml_scores["mse_mean"] = sum(mse_values) / len(mse_values)
    ml_scores["cod_mean"] = sum(cod_values) / len(cod_values)
    ml_scores["mse_dev"] = pow(
        sum([(mse_value - ml_scores["mse_mean"]) ** 2 for mse_value in mse_values])
        / (len(mse_values) - 1),
        0.5,
    )
    ml_scores["cod_dev"] = pow(
        sum([(cod_value - ml_scores["cod_mean"]) ** 2 for cod_value in cod_values])
        / (len(cod_values) - 1),
        0.5,
    )

    return ml_scores, ml_times

In [17]:
@workflow
def census_bench_wf(
    dataset: FlyteFile["csv"] = DATASET_PATH,
    cols: typing.List[str] = COLS,
    random_state: int = RANDOM_STATE,
    n_runs: int = N_RUNS,
    test_size: float = TEST_SIZE,
    ml_keys: typing.List[str] = ML_KEYS,
    ml_score_keys: typing.List[str] = ML_SCORE_KEYS,
) -> (typing.Dict[str, float], typing.Dict[str, float]):
    df = feature_eng_task(data=dataset, cols=cols)
    ml_scores, ml_times = ml_task(
        df=df,
        random_state=random_state,
        n_runs=n_runs,
        test_size=test_size,
        ml_keys=ml_keys,
        ml_score_keys=ml_score_keys,
    )
    return ml_scores, ml_times

In [18]:
#  how workflow output looks like if ml function is decorated as @dynamic
if __name__ == "__main__":
    start = time.time()
    print(census_bench_wf())
    print("--- %s seconds ---" % (time.time() - start))

{"asctime": "2021-08-13 10:38:03,954", "name": "flytekit", "levelname": "INFO", "message": "Invoking __main__.feature_eng_task with inputs: {'data': /tmp/flyte/20210813_103434/mock_remote/23598bc8da2ed8cd458bca0b445bb3be/census.csv, 'cols': ['YEAR', 'DATANUM', 'SERIAL', 'CBSERIAL', 'HHWT', 'CPI99', 'GQ', 'PERNUM', 'SEX', 'AGE', 'INCTOT', 'EDUC', 'EDUCD', 'EDUC_HEAD', 'EDUC_POP', 'EDUC_MOM', 'EDUCD_MOM2', 'EDUCD_POP2', 'INCTOT_MOM', 'INCTOT_POP', 'INCTOT_MOM2', 'INCTOT_POP2', 'INCTOT_HEAD', 'SEX_HEAD']}"}
INFO:flytekit:Invoking __main__.feature_eng_task with inputs: {'data': /tmp/flyte/20210813_103434/mock_remote/23598bc8da2ed8cd458bca0b445bb3be/census.csv, 'cols': ['YEAR', 'DATANUM', 'SERIAL', 'CBSERIAL', 'HHWT', 'CPI99', 'GQ', 'PERNUM', 'SEX', 'AGE', 'INCTOT', 'EDUC', 'EDUCD', 'EDUC_HEAD', 'EDUC_POP', 'EDUC_MOM', 'EDUCD_MOM2', 'EDUCD_POP2', 'INCTOT_MOM', 'INCTOT_POP', 'INCTOT_MOM2', 'INCTOT_POP2', 'INCTOT_HEAD', 'SEX_HEAD']}
{"asctime": "2021-08-13 10:38:56,868", "name": "flytekit", "

INFO:flytekit:Invoking __main__.ml_task with inputs: {'df':             YEAR  DATANUM     SERIAL   CBSERIAL   HHWT  CPI99   GQ  PERNUM  \
0         1970.0      2.0        1.0       -1.0  100.0  4.540  1.0     1.0   
1         1970.0      2.0        1.0       -1.0  100.0  4.540  1.0     2.0   
2         1970.0      2.0        2.0       -1.0  100.0  4.540  1.0     1.0   
3         1970.0      2.0        2.0       -1.0  100.0  4.540  1.0     2.0   
4         1970.0      2.0        4.0       -1.0  100.0  4.540  1.0     1.0   
...          ...      ...        ...        ...    ...    ...  ...     ...   
21721915  2010.0      1.0  1397787.0  1413806.0   18.0  0.764  1.0     1.0   
21721916  2010.0      1.0  1397788.0  1414099.0   57.0  0.764  1.0     1.0   
21721917  2010.0      1.0  1397788.0  1414099.0   57.0  0.764  1.0     2.0   
21721920  2010.0      1.0  1397789.0  1414268.0  101.0  0.764  1.0     1.0   
21721921  2010.0      1.0  1397789.0  1414268.0  101.0  0.764  1.0     2.0   

   

ML runs:  50






{"asctime": "2021-08-13 10:48:32,120", "name": "flytekit", "levelname": "INFO", "message": "Task executed successfully in user level, outputs: ({'mse_mean': 15.163305349402624, 'cod_mean': EDUC    0.841946\ndtype: float64, 'mse_dev': 0.062010322840364805, 'cod_dev': EDUC    0.000746\ndtype: float64}, {'t_train_test_split': 389.54016741202213, 't_train': 107.19489938160405, 't_inference': 3.370379501953721, 't_ml': 110.56527888355777})"}
INFO:flytekit:Task executed successfully in user level, outputs: ({'mse_mean': 15.163305349402624, 'cod_mean': EDUC    0.841946
dtype: float64, 'mse_dev': 0.062010322840364805, 'cod_dev': EDUC    0.000746
dtype: float64}, {'t_train_test_split': 389.54016741202213, 't_train': 107.19489938160405, 't_inference': 3.370379501953721, 't_ml': 110.56527888355777})


DefaultNamedTupleOutput(o0={'mse_mean': 15.163305349402624, 'cod_mean': EDUC    0.841946
dtype: float64, 'mse_dev': 0.062010322840364805, 'cod_dev': EDUC    0.000746
dtype: float64}, o1={'t_train_test_split': 389.54016741202213, 't_train': 107.19489938160405, 't_inference': 3.370379501953721, 't_ml': 110.56527888355777})
--- 629.7847700119019 seconds ---
