![ZenML](_assets/Logo/zenml.svg)

https://github.com/zenml-io/nba-ml-pipeline

In [1]:
import warnings
# Install a global "ignore" filter so that warnings do not clutter the cell
# outputs below; drop this line while debugging.
warnings.filterwarnings('ignore')

# Why ZenML

![Sam](_assets/sam.png)

In [2]:
# Initialize a ZenML repository in the current working directory
!zenml init
# Make `local_stack` the active stack
!zenml stack set local_stack

[32mInitializing ZenML repository at /home/hamza/temp_stuff/zenml_demo/nba-ml-pipeline.[0m
[1;35mRegistered stack component with name 'local_orchestrator'.[0m
[1;35mRegistered stack component with name 'local_metadata_store'.[0m
[1;35mRegistered stack component with name 'local_artifact_store'.[0m
[1;35mRegistered stack with name 'local_stack'.[0m
[32mZenML repository initialized at /home/hamza/temp_stuff/zenml_demo/nba-ml-pipeline.[0m
[32mActive stack: local_stack[0m


# Chapter 1 - Exploring NBA Data
## Did Steph Curry Change the Game?

https://www.youtube.com/watch?v=GEMVGHoenXM

![Steph Curry Drains the Game Winner vs Oklahoma City](https://i.makeagif.com/media/3-20-2016/7N5RWB.gif)

In [None]:
# We'll use this date ('YYYY-MM-DD' string) in our pipelines: as the split
# date of the exploratory drift pipeline and as the start of the reference
# time frame in the training pipeline.
CURRYS_THREE_POINTER = '2016-02-27'

![PipelineStructure](_assets/DriftDetectionPipeline.png "PipelineStructure")

## Creating our first step

In [None]:
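# Reference only: shows how an importer step is written. The next cell imports
# `game_data_importer` from `steps.importer`, which rebinds this name.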
from zenml.steps import step
from steps.importer import ImporterConfig
import pandas as pd

@step
def game_data_importer(config: ImporterConfig) -> pd.DataFrame:
    """Downloads season data from NBA API and returns a pd.DataFrame.

    Args:
        config: Step configuration; `config.seasons` is iterated and every
            entry is passed as `season=` to `LeagueGameLog`.

    Returns:
        One DataFrame made by concatenating the first data frame returned
        for each season.
    """
    # `time` is needed for the throttling sleep below but is not imported
    # anywhere in this notebook, so bring it into scope locally.
    import time

    # NOTE(review): `leaguegamelog` is not imported in this notebook either —
    # presumably it comes from the `nba_api` package; confirm and import it
    # before executing this reference cell.
    dataframes = []
    for season in config.seasons:
        print(f"Fetching data for season: {season}")
        dataframes.append(leaguegamelog.LeagueGameLog(season=season, timeout=180).get_data_frames()[0])
        # sleep so as not to bomb api server :-)
        time.sleep(2)
    return pd.concat(dataframes)

In [None]:
### Import our steps
from steps.importer import game_data_importer
from steps.splitter import date_based_splitter, SplitConfig

## Creating an exploratory pipeline

In [None]:
from zenml.pipelines import pipeline

@pipeline
def data_analysis_pipeline(
    importer,        # step that loads the NBA game data
    drift_splitter,  # step that cuts the data in two at the relevant date
    drift_detector,  # step that compares the two data distributions
):
    """Wire the exploratory steps together: import, split, detect drift."""
    games = importer()
    reference_part, comparison_part = drift_splitter(games)
    # The detector call is unpacked into two outputs; neither is consumed
    # by another step inside this pipeline.
    _report, _unused = drift_detector(reference_part, comparison_part)

https://blog.zenml.io/zenml-loves-evidently/  

![Evidently](_assets/zenml+evidently.png "Evidently")

In [None]:
# Install the Evidently integration through the ZenML CLI
!zenml integration install evidently

In [None]:
from zenml.integrations.evidently.steps import (
    EvidentlyProfileConfig,
    EvidentlyProfileStep,
)

# Drift-detection step instance handed to the exploratory pipeline below.
# It is configured with only the "datadrift" profile section and no explicit
# column mapping.
evidently_drift_detector = EvidentlyProfileStep(
    EvidentlyProfileConfig(
        column_mapping=None,
        profile_sections=["datadrift"],
    )
)

### Run the pipeline

In [None]:
# Initialize the pipeline: bind a concrete step instance to each slot of
# `data_analysis_pipeline`.
eda_pipeline = data_analysis_pipeline(
    importer=game_data_importer(),
    # Split at CURRYS_THREE_POINTER; `columns=['FG3M']` presumably restricts
    # the datasets to that column — confirm in steps/splitter.py.
    drift_splitter=date_based_splitter(SplitConfig(date_split=CURRYS_THREE_POINTER, columns=['FG3M'])),
    drift_detector=evidently_drift_detector,
)

# Execute the pipeline
eda_pipeline.run()

## Post-execution: Fetching pipelines and reviewing results

In [None]:
from zenml.integrations.evidently.visualizers import EvidentlyVisualizer
from zenml.repository import Repository
import json

# Open the ZenML repository and look up the pipeline by the name of its
# `@pipeline`-decorated function.
repo = Repository()
p = repo.get_pipeline(pipeline_name='data_analysis_pipeline')

In [None]:
# Show the runs recorded for this pipeline
p.runs

In [None]:
# Take the last entry of `p.runs` — presumably the most recent run
# (TODO confirm the ordering of `runs`).
last_run = p.runs[-1]
last_run

In [None]:
# Fetch the step by name; "drift_detector" is the parameter name used for
# this step in the `data_analysis_pipeline` signature.
drift_detection_step = last_run.get_step(
    name="drift_detector"
)
drift_detection_step

In [None]:
# Render the output of the drift detection step with the Evidently visualizer
EvidentlyVisualizer().visualize(drift_detection_step)

# Chapter 2 - Training Pipeline 

![Training Pipeline](_assets/TrainingPipeline.png "Planned Architecture")

In [None]:
# Install the MLflow integration through the ZenML CLI
!zenml integration install mlflow

![MLflow](_assets/zenml+evidently+mlflow.png "MLflow")

In [None]:
# Install the Kubeflow integration through the ZenML CLI
!zenml integration install kubeflow

![All](_assets/evidently+mlflow+discord+kubeflow.png "All")

In [None]:
from datetime import date, timedelta
from zenml.pipelines import pipeline
from zenml.integrations.mlflow.mlflow_utils import (
    enable_mlflow,
    local_mlflow_backend,
)

@enable_mlflow
@pipeline
def training_pipeline(
    importer,
    feature_engineerer,
    encoder,
    ml_splitter,
    trainer,
    tester,
    drift_splitter,
    drift_detector,
    drift_alert,
):
    """Connect preprocessing, model training and a drift-detection branch.

    Both the training branch and the drift branch start from the output of
    `importer`.
    """
    # --- Preprocessing ---
    games = importer()
    engineered = feature_engineerer(games)
    # The encoder call is unpacked into three outputs; only the first one is
    # passed on to another step.
    encoded, _season_encoder, _team_encoder = encoder(engineered)
    (
        x_train, y_train,
        x_test, y_test,
        x_eval, y_eval,
    ) = ml_splitter(encoded)

    # --- Training and testing ---
    fitted_model = trainer(x_train, y_train, x_eval, y_eval)
    _test_results = tester(fitted_model, x_test, y_test)

    # --- Drift detection on the imported data ---
    reference_part, comparison_part = drift_splitter(games)
    report, _unused = drift_detector(reference_part, comparison_part)
    drift_alert(report)

In [None]:
import pandas as pd
import numpy as np
from sklearn.base import RegressorMixin
from sklearn.ensemble import RandomForestRegressor
import mlflow

from zenml.steps import step
from zenml.steps.base_step_config import BaseStepConfig


class RandomForestTrainerConfig(BaseStepConfig):
    """Config class for the sklearn trainer.

    Used as the type of the `config` parameter of `random_forest_trainer`.
    """
    # Passed to `RandomForestRegressor(max_depth=...)` by the trainer step.
    max_depth: int = 10000
    # Presumably the name of the target column; the trainer step in this
    # cell does not read it.
    target_col: str = 'FG3M'


@step(enable_cache=False)
def random_forest_trainer(train_df_x: pd.DataFrame, train_df_y: pd.DataFrame,
                          eval_df_x: pd.DataFrame, eval_df_y: pd.DataFrame,
                          config: RandomForestTrainerConfig) -> RegressorMixin:
    """Fit a random forest regressor and print its score on the eval split.

    Args:
        train_df_x: Features used for fitting.
        train_df_y: Targets used for fitting; transposed and squeezed first.
        eval_df_x: Features used for scoring.
        eval_df_y: Targets used for scoring; transposed and squeezed first.
        config: Supplies `max_depth` for the forest.

    Returns:
        The fitted `RandomForestRegressor`.
    """
    # Turn on MLflow's sklearn autologging before the estimator is built.
    mlflow.sklearn.autolog()

    train_targets = np.squeeze(train_df_y.values.T)
    eval_targets = np.squeeze(eval_df_y.values.T)

    forest = RandomForestRegressor(max_depth=config.max_depth)
    forest.fit(train_df_x, train_targets)

    eval_score = forest.score(eval_df_x, eval_targets)
    print(f"Eval score is: {eval_score}")
    return forest

In [None]:
from steps.analyzer import analyze_drift
from steps.encoder import data_encoder
from steps.evaluator import tester
from steps.feature_engineer import feature_engineer
from steps.importer import game_data_importer
from steps.splitter import sklearn_splitter, SklearnSplitterConfig, reference_data_splitter, TrainingSplitConfig
from steps.discord_bot import discord_alert

# Date string ('YYYY-MM-DD') for seven days before today
ONE_WEEK_AGO = (date.today() - timedelta(days=7)).strftime("%Y-%m-%d")


# Initialize the pipeline
train_pipeline = training_pipeline(
    # Data Wrangling
    importer=game_data_importer(),
    feature_engineerer=feature_engineer(),
    encoder=data_encoder(),
    ml_splitter=sklearn_splitter(SklearnSplitterConfig(ratios={'train': 0.6, 'test': 0.2, 'validation': 0.2})),
    
    # Model training
    trainer=random_forest_trainer(),
    tester=tester(),
    
    # Drift detection: the reference time frame runs from CURRYS_THREE_POINTER
    # to ONE_WEEK_AGO, and ONE_WEEK_AGO is also the split date for new data.
    drift_splitter=reference_data_splitter(
        TrainingSplitConfig(
            new_data_split_date=ONE_WEEK_AGO,
            start_reference_time_frame=CURRYS_THREE_POINTER,
            end_reference_time_frame=ONE_WEEK_AGO,
            columns=["FG3M"])),
    
    # Same Evidently configuration as the exploratory pipeline ("datadrift" only)
    drift_detector=EvidentlyProfileStep(
        EvidentlyProfileConfig(
            column_mapping=None,
            profile_sections=["datadrift"])),
    
    # Alert discord
    drift_alert=discord_alert(),
)

# Execute the pipeline
train_pipeline.run()

In [None]:
# Start the MLflow UI on port 4999 against the URI returned by
# `local_mlflow_backend()`.
# NOTE(review): `mlflow ui` starts a server, so this cell presumably keeps
# running until it is interrupted — confirm before using "Run All".
!mlflow ui --backend-store-uri {local_mlflow_backend()} --port 4999

In [None]:
from zenml.integrations.evidently.visualizers import EvidentlyVisualizer
from zenml.repository import Repository


# NOTE(review): `last_week`, `ONE_WEEK_AGO` and `CURRY_FROM_DOWNTOWN` are not
# read anywhere in this cell. `ONE_WEEK_AGO` repeats the computation from the
# pipeline-initialization cell, and `CURRY_FROM_DOWNTOWN` holds the same value
# as `CURRYS_THREE_POINTER`.
last_week = date.today() - timedelta(days=7)
ONE_WEEK_AGO = last_week.strftime("%Y-%m-%d")
CURRY_FROM_DOWNTOWN = '2016-02-27'


# Look up the training pipeline and take the last entry of its runs
repo = Repository()
p = repo.get_pipeline(pipeline_name='training_pipeline')
last_run = p.runs[-1]
# Read and print the output of the `drift_alert` step
drift_analysis_step = last_run.get_step(
    name="drift_alert"
)
print(f'Data drift detected: {drift_analysis_step.output.read()}')

# Fetch the drift detection step and hand it to the Evidently visualizer
drift_detection_step = last_run.get_step(
    name="drift_detector"
)
evidently_outputs = drift_detection_step

EvidentlyVisualizer().visualize(evidently_outputs)

In [None]:
# Show the registered ZenML stacks
!zenml stack list

## ZenML Stacks

### From Local

![LocalStack](_assets/localstack.png "LocalStack")

### To Kubeflow

![KubeflowStack](_assets/localstack-with-kubeflow-orchestrator.png "KubeflowStack")

In [3]:
# Register a container registry component pointing at localhost:5000
!zenml container-registry register local_registry --type=default --uri=localhost:5000
# Register an orchestrator component of type `kubeflow`
!zenml orchestrator register kubeflow_orchestrator --type=kubeflow
# Combine the local metadata store and artifact store with the two new
# components into a stack named `local_kubeflow_stack`
!zenml stack register local_kubeflow_stack \
    -m local_metadata_store \
    -a local_artifact_store \
    -o kubeflow_orchestrator \
    -c local_registry

# Activate the newly created stack
!zenml stack set local_kubeflow_stack

[1;35mRegistered stack component with name 'local_registry'.[0m
[32mSuccessfully registered container registry `local_registry`.[0m
[1;35mRegistered stack component with name 'kubeflow_orchestrator'.[0m
[32mSuccessfully registered orchestrator `kubeflow_orchestrator`.[0m
[1;35mRegistered stack with name 'local_kubeflow_stack'.[0m
[32mStack `local_kubeflow_stack` successfully registered![0m
[32mActive stack: local_kubeflow_stack[0m


In [None]:
# Provision the resources of the active stack; per the output below this
# creates a local K3D cluster and deploys Kubeflow Pipelines into it.
!zenml stack up

[32mProvisioning resources for stack 'local_kubeflow_stack'.[0m
[1;35mProvisioning resources for stack 'local_kubeflow_stack'.[0m
[1;35mProvisioning local Kubeflow Pipelines deployment...[0m
[1;35mCreating local K3D cluster 'zenml-kubeflow-edfcfa45'.[0m
[33mWARN[0m[0000] No node filter specified                     
[36mINFO[0m[0000] Prep: Network                                
[36mINFO[0m[0000] Created network 'k3d-zenml-kubeflow-edfcfa45' 
[36mINFO[0m[0000] Created volume 'k3d-zenml-kubeflow-edfcfa45-images' 
[36mINFO[0m[0000] Creating node 'k3d-zenml-kubeflow-registry.localhost' 
[36mINFO[0m[0000] Successfully created registry 'k3d-zenml-kubeflow-registry.localhost' 
[36mINFO[0m[0000] Starting new tools node...                   
[36mINFO[0m[0000] Starting Node 'k3d-zenml-kubeflow-edfcfa45-tools' 
[36mINFO[0m[0001] Creating node 'k3d-zenml-kubeflow-edfcfa45-server-0' 
[36mINFO[0m[0001] Creating LoadBalancer 'k3d-zenml-kubeflow-edfcfa45-serverlb' 
[36mIN

[1;35mOne or more pods not ready yet, waiting for 30 seconds...[0m
[1;35mCurrent pod status:[0m
NAME                                              READY   STATUS              RESTARTS   AGE
cache-server-55897df854-8vrlz                     0/1     ContainerCreating   0          63s
cache-deployer-deployment-d95f8b79f-qlfs8         0/1     ContainerCreating   0          63s
metadata-envoy-deployment-5b587ff9d4-bqdcm        0/1     ContainerCreating   0          63s
metadata-grpc-deployment-6b5685488-jn7sx          0/1     ContainerCreating   0          63s
metadata-writer-5c84d65485-vdzgq                  0/1     ContainerCreating   0          63s
ml-pipeline-persistenceagent-69bdb89cfc-tq4w6     0/1     ContainerCreating   0          62s
ml-pipeline-scheduledworkflow-f45d59698-szdzs     0/1     ContainerCreating   0          62s
ml-pipeline-visualizationserver-75d8c8cd9-fkrqh   0/1     ContainerCreating   0          61s
ml-pipeline-69c679bf86-wff88                      0/1     Conta

In [None]:
# Make sure the Kubeflow stack is the active one
!zenml stack set local_kubeflow_stack
# Let's train within Kubeflow Pipelines
!python run_pipeline.py train

# Chapter 3 - The Prediction Pipeline

In [None]:
# Let's return to our local stack (the Kubeflow stack was activated for the
# training run in Chapter 2)
!zenml stack set local_stack

![Training And Inference Pipeline](_assets/Training%20and%20Inference%20Pipeline.png "Planned Architecture Full")

In [None]:
from zenml.pipelines import pipeline


@pipeline(enable_cache=False)
def inference_pipeline(
    importer,
    preprocessor,
    extract_next_week,
    model_picker,
    predictor,
    post_processor,
):
    """Chain the inference steps, from schedule import to post-processing.

    Caching is switched off for this pipeline via the decorator argument.
    """
    schedule = importer()
    cleaned_schedule = preprocessor(schedule)
    next_week_games = extract_next_week(cleaned_schedule)
    # The picker call is unpacked into a model and a run id; the run id is
    # not passed to any other step.
    chosen_model, _run_id = model_picker()
    raw_predictions = predictor(chosen_model, next_week_games)
    _readable = post_processor(raw_predictions)

In [None]:
from steps.encoder import encode_columns_and_clean
from steps.importer import import_season_schedule, SeasonScheduleConfig
from steps.model_picker import model_picker
from steps.predictor import predictor
from steps.splitter import get_coming_week_data, TimeWindowConfig
from steps.post_processor import data_post_processor

# Initialize the pipeline: bind a concrete step instance to each slot of
# `inference_pipeline`.
inference_pipe = inference_pipeline(
    # Schedule importer configured for the '2021-22' season
    importer=import_season_schedule(
        SeasonScheduleConfig(current_season='2021-22')),
    preprocessor=encode_columns_and_clean(),
    # `time_window=7` is presumably in days, i.e. the coming week — confirm
    # in steps/splitter.py.
    extract_next_week=get_coming_week_data(TimeWindowConfig(time_window=7)),
    model_picker=model_picker(),
    predictor=predictor(),
    post_processor=data_post_processor()
)

# Execute the pipeline
inference_pipe.run()

In [None]:
# Let's have a look at some of our predictions
from zenml.repository import Repository

# Fetch the post-processing step by name rather than by its position in
# `steps`, so the lookup does not depend on step ordering and matches how
# steps are retrieved with `get_step(name=...)` elsewhere in this notebook.
# "post_processor" is the parameter name of that step in `inference_pipeline`.
r = Repository()
inference_run = r.get_pipeline(pipeline_name='inference_pipeline').runs[-1]
df = inference_run.get_step(name="post_processor").output.read()
# Show the first 20 rows of the post-processed predictions
df.head(20)