This notebook contains the bulk of the code for this project. It creates several Kubeflow Pipelines components - some custom, some based on Vertex AI's ready-made components - to integrate with Google Cloud. This notebook should only need to be run once; after that, setting up the cloud function fully automates the process, so the pipeline runs on its own whenever a new foundational CSV is uploaded (for example, if you wanted to get new predictions each week of the NFL season).

Please note that this notebook can be repurposed for other projects. Preprocessing and model-building would need to be redefined to fit the new data, but the infrastructure for automating a full modeling pipeline, kicked off by uploading an initial CSV, would remain.

Further, any hard-coded paths (such as bucket names and file names) would need to be changed to align with one's own Google Cloud setup.

Lastly, the purpose of this project was to build out the infrastructure for an automated modeling process kicked off by introducing new data; its purpose is NOT to build the most performant model possible. As such, any model predictions should not be considered a gambling recommendation. No bets should be made based on its output.

INSTALL PACKAGES USED FOR PIPELINE CONSTRUCTION

In [None]:
# INSTALLS KFP; --pre LETS PIP SELECT PRE-RELEASE VERSIONS
# NOTE(REVIEW): KFP IS INSTALLED AGAIN BY THE FOLLOWING INSTALL CELLS WITH DIFFERENT FLAGS AND NO VERSION PIN,
# WHILE THE IMPORT CELL RELIES ON THE kfp.v2 NAMESPACE - CONFIRM WHICH KFP VERSION IS INTENDED AND PIN IT
!pip install kfp --pre

In [None]:
# UPGRADES KFP TO THE NEWEST AVAILABLE RELEASE, INSTALLING INTO THE USER SITE (--user)
!python3 -m pip install --upgrade kfp --user

In [None]:
# PIP FLAG INTERPOLATED INTO THE INSTALL COMMANDS OF THE NEXT CELL
USER_FLAG = "--user"

In [None]:
# INSTALL/UPGRADE THE VERTEX AI SDK, KFP AND THE GOOGLE CLOUD PIPELINE COMPONENTS (PINNED TO 0.1.1)
# {USER_FLAG} IS EXPANDED BY IPYTHON FROM THE VARIABLE DEFINED IN THE PREVIOUS CELL
!pip3 install {USER_FLAG} google-cloud-aiplatform --upgrade
!pip3 install {USER_FLAG} kfp google-cloud-pipeline-components==0.1.1 --upgrade

In [None]:
# AND IMPORT THEM
import kfp
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, ClassificationMetrics, Metrics, component)
from kfp.components import create_component_from_func

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip


In [None]:
# SERVICE ACCOUNT THE PIPELINE JOB IS RUN AS; LEAVE EMPTY TO HAVE THE NEXT CELL LOOK IT UP VIA GCLOUD
SERVICE_ACCOUNT = ""

In [None]:
# REPURPOSED FROM GOOGLE'S QWIKLABS TUTORIALS
import os
import sys

IS_COLAB = "google.colab" in sys.modules
if (
    SERVICE_ACCOUNT == ""
    or SERVICE_ACCOUNT is None
    or SERVICE_ACCOUNT == "[your-service-account]"
):
    # Get your service account from gcloud
    if not IS_COLAB:
        shell_output = !gcloud auth list 2>/dev/null
        SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()

    if IS_COLAB:
        shell_output = ! gcloud projects describe  $PROJECT_ID
        project_number = shell_output[-1].split(":")[1].strip().replace("'", "")
        SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"

print("Service Account:", SERVICE_ACCOUNT)
     

In [None]:
# DEFINE VARIABLES FOR GOOGLE CLOUD INTEGRATION - CHANGE TO MATCH OWN BUCKET NAMES
REGION = 'us-central1' # NEEDS TO BE SAME REGION AS BUCKETS
# IPYTHON SHELL CAPTURE RETURNS A LIST OF OUTPUT LINES; THE FIRST LINE IS TAKEN AS THE PROJECT ID
PROJECT_ID = !(gcloud config get-value core/project)
PROJECT_ID = PROJECT_ID[0]
BUCKET = 'gs://nfl_spreads_model_output_gamma' # FOR PIPELINE OUTPUT - MUST CREATE BUCKET TO RUN
INPUT_BUCKET = 'gs://nfl_spreads_central' # BUCKET HOSTING SPREADS DATA - MUST CREATE AND UPLOAD CSV TO RUN
# ROOT FOLDER PASSED TO THE PIPELINE JOB AS ITS pipeline_root
PIPELINE_ROOT = f"{BUCKET}/pipeline_root/"
# BARE EXPRESSION SO THE CELL DISPLAYS THE RESOLVED PATH
PIPELINE_ROOT

In [None]:
# CREATING CUSTOM KUBEFLOW COMPONENT
# CORE GOAL IS TO REFORMAT ORIGINAL DATASET TO BE MACHINE READABLE FOR MODEL, SPLIT TO TRAIN/SERVE GROUPS
# BOTH DATASETS ARE PASSED AS ARTIFACTS TO THE TRAINING COMPONENT

@kfp.v2.dsl.component(
    # PACKAGES NECESSARY FOR COMPONENT TO RUN
    # (ONLY WHAT THE BODY IMPORTS, PLUS THE FILESYSTEM HELPERS FOR CLOUD STORAGE PATHS)
    packages_to_install = [
        'pandas',
        'numpy',
        'gcsfs',
        'fsspec']
)
def preprocess_raw_data(
    input_file: Input[Dataset], # INPUT FILE WILL BE FED BY IMPORTER NODE IN PIPELINE DEFINITION
    dataset: Output[Dataset], # TRAINING DATASET OUTPUT
    preds_dataset: Output[Dataset] # SERVING DATASET OUTPUT
):
    """Clean the raw spreads CSV, engineer features and write train/serve CSVs.

    Steps: drop the 1978 season, playoff weeks, rows without a favorite and
    pushes; derive the ``beat_spread`` label, month, and per-team win counts
    (current season before the game's week, and full prior season); one-hot
    encode the home/away teams; drop outcome and weather columns; hold out the
    last three weeks of the latest season as the serving set.

    Args:
        input_file: Artifact whose ``path`` points at the raw CSV.
        dataset: Output artifact; training rows are written to ``<path>.csv``.
        preds_dataset: Output artifact; serving rows are written to
            ``<path>.csv``.

    Both CSVs keep the ``beat_spread`` label; every other column is cast to
    float.
    """
    import re
    import warnings

    import numpy as np
    import pandas as pd

    warnings.filterwarnings("ignore")

    # READ INPUT FILE IN AS PANDAS DATAFRAME
    # ADDITIONAL DOCUMENTATION ON PASSING KUBEFLOW ARTIFACTS B/W COMPONENTS HERE:
    # https://www.kubeflow.org/docs/components/pipelines/v2/data-types/artifacts/
    df = pd.read_csv(input_file.path)

    # FIRST YEAR W/SPREAD, ONLY 1 GAME IN RECORDS; TOSSING
    df = df[df['schedule_season'] != 1978]

    # ONLY WANT REGULAR SEASON; NOT DEALING WITH PICK EMS INITIALLY
    df = df[~df['schedule_week'].isin(['Wildcard', 'Division', 'Conference', 'Superbowl'])]
    df = df[~df['team_favorite_id'].isna()]

    # MARGIN = FAVORITE'S SCORE - DOG'S SCORE
    df['margin'] = np.where(df['team_home'] == df['team_favorite_id'],
                            df['score_home'] - df['score_away'],
                            df['score_away'] - df['score_home'])

    # NY GAMBLING (AT LEAST MY APP) CAN'T BET PUSH - GIVES BACK ORIGINAL BET IF IT HAPPENS
    # SO NOT GOING TO MODEL FOR PUSH OUTCOMES
    # NOTE(REVIEW): THIS AND THE LABEL BELOW COMPARE MARGIN DIRECTLY TO spread_favorite, WHICH ASSUMES THE
    # SPREAD IS STORED AS A POSITIVE NUMBER OF POINTS; IF THE CSV STORES IT AS A NEGATIVE LINE THE SIGN
    # NEEDS FLIPPING - CONFIRM AGAINST THE DATA
    df = df[df['margin'] != df['spread_favorite']]
    # KEEPS THE OLD INDEX AS AN 'index' COLUMN (DROPPED BELOW) AND GIVES A CLEAN 0..N-1 INDEX
    df = df.reset_index()

    # DID THE FAVORITE BEAT THE SPREAD?
    df['beat_spread'] = np.where(df['margin'] > df['spread_favorite'], 1, 0)

    # WANT A MONTH OF YEAR VAR; SOME MINOR STRING FORMATTING
    # (FIRST TWO CHARACTERS OF THE DATE WITH ANY '/' REMOVED)
    df['month'] = df['schedule_date'].apply(lambda x: re.sub('/',  '', x[:2]))

    # AND WHETHER HOME TEAM WON OR NOT
    df['home_win'] = np.where(df['score_home'] > df['score_away'], 1, 0)

    df['month'] = df['month'].astype(int)
    df['schedule_week'] = df['schedule_week'].astype(int)
    df['over_under_line'] = df['over_under_line'].astype(float)

    # CURRENT WIN TOTALS/LAST YEAR'S WIN TOTALS FOR EACH TEAM
    # A GAME'S "WINNER" IS THE HOME TEAM WHEN home_win == 1, OTHERWISE THE AWAY TEAM
    seasons = df['schedule_season']
    weeks = df['schedule_week']
    winners = pd.Series(
        np.where(df['home_win'] == 1, df['team_home'], df['team_away']),
        index = df.index)

    def _count_wins(team, season, before_week = None):
        """Count games `team` won in `season`, optionally only in weeks before `before_week`."""
        won = (seasons == season) & (winners == team)
        if before_week is not None:
            won = won & (weeks < before_week)
        return int(won.sum())

    # AWAY COLUMNS FIRST, THEN HOME, SO THE OUTPUT COLUMN ORDER STAYS STABLE
    for side in ('away', 'home'):
        teams = df[f'team_{side}']
        # WINS SO FAR THIS SEASON (EARLIER WEEKS ONLY)
        df[f'{side}_season_wins'] = [
            _count_wins(team, season, before_week = week)
            for team, season, week in zip(teams, seasons, weeks)]
        # LAST YEAR'S WIN TOTAL
        df[f'{side}_season_wins_yr_prior'] = [
            _count_wins(team, season - 1)
            for team, season in zip(teams, seasons)]

    # ONE-HOT ENCODE CATEGORICALS
    df = pd.get_dummies(df, prefix = ['home', 'away'], columns = ['team_home', 'team_away'])

    # DROP COLUMNS I DON'T EXPECT TO EVER BE IN IT
    df_model = df.drop(columns = ['index', 'schedule_date', 'schedule_playoff', 'team_favorite_id',
                                  'score_home', 'score_away', 'stadium', 'weather_detail', 'margin', 'home_win'])

    #DROP COLUMNS I THINK WILL BE HARD TO KNOW AT BETTING TIME (I.E. WEATHER) W/OUT ADDITIONAL DATA
    df_model = df_model.drop(columns = ['weather_temperature', 'weather_wind_mph', 'weather_humidity'])

    # TRAIN, SERVE SPLIT
    # SERVING FOR LAST 3 WEEKS IN DATASET (IN FUTURE MAY DO JUST ONE WEEK - FOR NOW PREFER LARGER SAMPLE FOR TESTING)
    # REST IS TRAINING
    latest_season = df_model['schedule_season'].max()
    in_latest_season = df_model['schedule_season'] == latest_season
    latest_week_of_latest_season = df_model.loc[in_latest_season, 'schedule_week'].max()
    latest_week_of_int = latest_week_of_latest_season - 2

    is_serving = in_latest_season & (df_model['schedule_week'] >= latest_week_of_int)
    # EXPLICIT COPIES SO THE CASTS BELOW NEVER WRITE THROUGH A SLICE OF df_model
    serve = df_model[is_serving].copy()
    train = df_model[~is_serving].copy()

    # MAKE SURE ALL FEATURES ARE NUMERIC FOR EASIER CSV TRANSFERS
    # CAST BY DTYPE (BOOLS BECOME 1.0/0.0) INSTEAD OF TESTING `False in values`: THAT TEST ALSO MATCHES
    # THE INTEGER 0 AND WOULD COLLAPSE COUNT COLUMNS SUCH AS THE SEASON WIN TOTALS TO 0/1
    feature_cols = [col for col in train.columns if col != 'beat_spread']
    float_casts = {col: 'float' for col in feature_cols}
    train = train.astype(float_casts)
    serve = serve.astype(float_casts)

    train.to_csv(f"{dataset.path}.csv", index = False) # PASS ON TRAINING DATA ARTIFACT
    serve.to_csv(f"{preds_dataset.path}.csv", index = False) # AND SERVING DATA

In [None]:
# SECOND CUSTOM KUBEFLOW COMPONENT - THIS ONE TO BUILD MODEL, BATCH PREDICT ON SERVING DATA
# PLEASE NOTE, MY FOCUS IS PIPELINE INFRASTRUCTURE, NOT MODEL PERFORMANCE; NO MODEL OPTIMIZATION WORK HAS BEEN DONE
# NO ONE SHOULD USE THIS APPLICATION FOR ACTUAL GAMBLING PURPOSES

@kfp.v2.dsl.component(
    # NOTE(REVIEW): TENSORFLOW IS UNPINNED HERE WHILE THE DEPLOY COMPONENT SERVES WITH A tf2-cpu.2-12 IMAGE;
    # CONFIRM THE SAVED MODEL FORMAT STAYS COMPATIBLE OR PIN A MATCHING VERSION
    packages_to_install = [
        'tensorflow',
        'pandas']
)
def build_model(
    ds: Input[Dataset], # TRAINING DATASET PASSED FROM PREPROCESS COMPONENT
    input_file: Input[Dataset], # SERVING DATASET PASSED FROM PREPROCESS COMPONENT
    epcs : int, # EPOCHS TO TRAIN SIMPLE NN MODEL
    output_model: Output[Model], # OUTPUT A MODEL TO SERVE TO AN ENDPOINT
    prediction_dataset: Output[Dataset] # AND OUTPUT A CSV WITH BATCH PREDICTIONS
    # NOTE - BOTH DEPLOYING A MODEL AND MAKING BATCH PREDICTIONS IS FAIRLY ATYPICAL
    # I DID SO TO RETAIN THE FUNCTIONALITY TO DO EITHER IF REPURPOSING THIS CODE FOR OTHER PROJECTS
):
    """Train a small dense network on the training CSV and batch-predict the serving CSV.

    Args:
        ds: Training artifact; data is read from ``<path>.csv`` and must
            contain a ``beat_spread`` label column.
        input_file: Serving artifact; data is read from ``<path>.csv`` with
            the same columns as the training data.
        epcs: Maximum number of training epochs.
        output_model: Output artifact; the trained model is saved at its path.
        prediction_dataset: Output artifact; the serving rows plus
            ``prediction`` (rounded probability) and ``predict_prob`` columns
            are written to ``<path>.csv``.
    """
    import numpy as np
    import pandas as pd
    import tensorflow as tf
    from tensorflow.keras.layers import Dense

    batch_size = 32
    train_csv = f"{ds.path}.csv"
    serve_csv = f"{input_file.path}.csv"

    def pack_features_vector(features, labels):
        """Pack the features into a single array."""
        # Using `tf.stack` we can stack a list of rank-R tensors into one rank-(R+1) tensor.
        features = tf.stack(list(features.values()), axis=1)
        return features, labels

    # ONLY THE HEADER IS NEEDED TO NAME THE COLUMNS FOR TENSORFLOW
    train_columns = pd.read_csv(train_csv, nrows = 0).columns.tolist()

    # The `tf.data.experimental.make_csv_dataset()` method reads CSV files into a dataset
    train_ds = tf.data.experimental.make_csv_dataset(
        train_csv,
        batch_size,
        column_names=train_columns,
        label_name='beat_spread',
        num_epochs=1)
    train_ds = train_ds.map(pack_features_vector)

    model = tf.keras.Sequential([
        Dense(128, activation = 'relu', name = 'hidden_layer_1'),
        Dense(64, activation = 'relu', name = 'hidden_layer_2'),
        Dense(1, activation='sigmoid', name = 'pred_layer')
    ])

    model.compile(optimizer='adam',
                  loss=tf.keras.losses.BinaryCrossentropy(from_logits=False),
                  metrics=['accuracy'])

    # SHORTEN TRAIN TIME BY STOPPING ONCE THE LOSS STOPS IMPROVING
    # NO VALIDATION DATA IS PASSED TO fit(), SO THE DEFAULT 'val_loss' METRIC NEVER EXISTS AND THE
    # CALLBACK WOULD NEVER TRIGGER; MONITOR THE TRAINING LOSS INSTEAD
    earlystopping_callback = tf.keras.callbacks.EarlyStopping(monitor='loss', patience=2)

    # PROTECT AGAINST VM RESTARTS
    checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath='./local-training/checkpoints',
        save_weights_only=True,
        monitor='loss',
        mode='min')

    model.fit(train_ds,
              callbacks=[earlystopping_callback, checkpoint_callback],
              epochs=epcs)

    model.save(output_model.path) # MODEL OUTPUT FOR DEPLOYMENT

    pred_df = pd.read_csv(serve_csv)

    pred_ds = tf.data.experimental.make_csv_dataset(
        serve_csv,
        batch_size,
        column_names=pred_df.columns.tolist(),
        label_name='beat_spread',
        num_epochs=1,  #DON'T WANT TO REPEAT INDEFINITELY
        shuffle = False)  #IMPORTANT TO RETAIN ORDER FOR APPENDING PREDICTIONS TO DATAFRAME
    pred_ds = pred_ds.map(pack_features_vector)

    # ONE PROBABILITY PER SERVING ROW, COLLECTED BATCH BY BATCH IN FILE ORDER
    batch_probabilities = [
        model.predict(x = batch_features)[:, 0]
        for batch_features, _ in pred_ds]
    if batch_probabilities:
        probabilities = np.concatenate(batch_probabilities)
    else:
        probabilities = np.array([], dtype = 'float32')

    # COLUMN ASSIGNMENT RAISES IF THE NUMBER OF PREDICTIONS DOES NOT MATCH THE NUMBER OF ROWS
    pred_df['prediction'] = np.rint(probabilities).astype(int) # BINARY PRED (ROUNDED PROBABILITY)
    pred_df['predict_prob'] = probabilities # AND FULL PROBABILITY IN CASE IT IS OF INTEREST

    pred_df.to_csv(f"{prediction_dataset.path}.csv", index = False) # OUTPUT PREDS CSV AS AN ARTIFACT

In [None]:
# DEPLOYMENT TO GOOGLE CLOUD LEANS ON THE VERTEX AI SDK
# THIS COMPONENT (AND OTHER STRONG GC PIPELINE EXAMPLES) COMES FROM:
# https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/kfp2_pipeline.ipynb

@kfp.v2.dsl.component(
    packages_to_install=["google-cloud-aiplatform==1.25.0"],
)
def deploy_model(
    model: Input[Model],
    project_id: str,
    vertex_endpoint: Output[Artifact],
    vertex_model: Output[Model],
):
    """Upload a trained model to Vertex AI and deploy it to an endpoint.

    Args:
        model: Model artifact whose ``uri`` holds the saved model files.
        project_id: Google Cloud project used to initialise the Vertex AI SDK.
        vertex_endpoint: Output artifact; its ``uri`` is set to the resource
            name of the endpoint returned by the deploy call.
        vertex_model: Output artifact; its ``uri`` is set to the resource
            name of the uploaded Vertex AI model.
    """
    from google.cloud import aiplatform as vertex_ai

    vertex_ai.init(project=project_id)

    # REGISTER THE SAVED MODEL WITH VERTEX AI, SERVED BY THE PREBUILT TF CPU CONTAINER
    upload_settings = {
        "display_name": "nfl-spreads-model",
        "artifact_uri": model.uri,
        "serving_container_image_uri": "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-12:latest",
    }
    uploaded_model = vertex_ai.Model.upload(**upload_settings)

    # DEPLOY THE UPLOADED MODEL; NO ENDPOINT IS PASSED TO THE DEPLOY CALL
    serving_endpoint = uploaded_model.deploy(machine_type="n1-standard-4")

    # EXPOSE BOTH RESOURCE NAMES THROUGH THE OUTPUT ARTIFACTS
    vertex_model.uri = uploaded_model.resource_name
    vertex_endpoint.uri = serving_endpoint.resource_name
     

In [None]:
# PULL COMPONENTS TOGETHER (E.G. DEFINING DEPENDENCIES) TO DEFINE PIPELINE
# FLOW: IMPORT RAW CSV -> PREPROCESS -> TRAIN + BATCH PREDICT -> DEPLOY MODEL

@dsl.pipeline(
    name = 'nfl-spreads-model',
    description = 'attempt to deploy end-to-end spreads model pipeline w/kubeflow components'
)
def spreads_pipeline():
    importer_task = dsl.importer(
        artifact_uri = f"{INPUT_BUCKET}/spreadspoke_scores.csv", # DEFINE FILE FOR IMPORTER NODE TO PULL
        artifact_class = Dataset,
        reimport = True
    )

    # COMPONENT INPUTS ARE PASSED BY KEYWORD: THE KFP V2 SDK REJECTS POSITIONAL ARGUMENTS,
    # AND NAMING THEM KEEPS THE TWO DATASET INPUTS OF build_model FROM BEING SWAPPED BY ACCIDENT

    # FEEDS INTO PREPROCESSING
    preprocess_task = preprocess_raw_data(input_file = importer_task.output)

    # INTO MODEL BUILDING (TRAINING SET, SERVING SET, NUMBER OF EPOCHS)
    training_task = build_model(
        ds = preprocess_task.outputs['dataset'],
        input_file = preprocess_task.outputs['preds_dataset'],
        epcs = 2,
    )

    # INTO MODEL DEPLOYING
    _ = deploy_model(
        project_id=PROJECT_ID,
        model=training_task.outputs['output_model'],
    )

In [None]:
# COMPILE THE PIPELINE DEFINITION INTO A LOCAL JSON FILE HOLDING THE PIPELINE BUILD INFO
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(pipeline_func=spreads_pipeline, package_path="nfl_spreads_model.json")

In [None]:
# UNIQUE DISPLAY NAME THAT INCLUDES THE TIME OF THE RUN
# BUILT FROM THE CLOCK SO RE-RUNNING THE CELL NEVER REUSES A STALE, HAND-EDITED NAME
from datetime import datetime

DISPLAY_NAME = f"nfl_spreads_gamma_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

job = aiplatform.PipelineJob(
    display_name = DISPLAY_NAME,
    template_path = "nfl_spreads_model.json", # JSON WRITTEN BY THE COMPILE CELL
    pipeline_root = PIPELINE_ROOT,
    enable_caching = False,
    # PASS THE CONFIGURED PROJECT/REGION EXPLICITLY INSTEAD OF RELYING ON SDK DEFAULTS
    project = PROJECT_ID,
    location = REGION,
)

# RUN PIPELINE TO GET PREDICTION CSV ARTIFACT AND DEPLOYED MODEL
job.run(service_account = SERVICE_ACCOUNT)

In [None]:
# MOVING LOCAL JSON TO CLOUD STORAGE FOR CLOUD FUNCTION PURPOSES
!gsutil cp nfl_spreads_model.json gs://nfl_spreads_model_output_gamma/nfl_spreads_model.json