In [32]:
import os
import time
import boto3
import json
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import sagemaker
from sagemaker import get_execution_role

from io import StringIO, BytesIO

In [33]:
# Load shared settings (bucket names, IAM roles, ...) from params.json, which
# lives in the parent of the current working directory.
# os.path is used instead of splitting on '/' so the lookup does not depend on
# the platform's path separator, and the file handle gets its own name rather
# than being rebound to the parsed dict.
params_path = os.path.join(os.path.dirname(os.getcwd()), 'params.json')
with open(params_path, 'r') as params_file:
    params = json.load(params_file)

In [34]:
# --- AWS handles shared by the rest of the notebook ---
sess = boto3.Session()
region = sess.region_name
sm = sess.client('sagemaker')
s3_client = boto3.client('s3')

# --- IAM roles, read from params.json ---
sm_role, lambda_role = params['sm_role'], params['lambda_role']

# --- bucket used for pipeline artefacts ---
bucket = params['bucket_pipeline'] # sagemaker_session.default_bucket()
sagemaker_session = sagemaker.Session(boto_session=sess, default_bucket=bucket)

# --- naming ---
model_package_group_name = 'nba-models-2023' # model name in model registry
pipeline_name = 'nba-pipeline-1'  # SageMaker Pipeline name
prefix = 'nba'

# Re-run this cell whenever a fresh timestamp is needed for a new pipeline.
# (time.strftime formats the current local time when no time tuple is given.)
current_time = time.strftime('%m-%d-%H-%M-%S')

In [35]:
from sagemaker.workflow.pipeline_context import PipelineSession
# Session object from sagemaker.workflow.pipeline_context.
# NOTE(review): the cells visible here never pass `pipeline_session` to a
# processor or step — confirm whether it is still needed further down.
pipeline_session = PipelineSession()

In [36]:
# S3 prefix of the raw input files: s3://<bucket>/<prefix>/raw
raw_s3 = f's3://{bucket}/{prefix}/raw'

In [37]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat

# Location of the raw input data (defaults to the raw_s3 prefix).
input_data = ParameterString(
    name='InputData',
    default_value=raw_s3,
)

# Training step: number of epochs, carried as a string parameter.
training_epochs = ParameterString(
    name='TrainingEpochs',
    default_value='10',
)

# Model performance step: accuracy threshold.
accuracy_threshold = ParameterFloat(
    name='AccuracyThreshold',
    default_value=0.6,
)

# Inference step: instance type for the endpoint.
endpoint_instance_type = ParameterString(
    name='EndpointInstanceType',
    default_value='ml.m5.large',
)

In [38]:
# Framework / runtime versions: TensorFlow, scikit-learn container, Python.
tf_version, sk_version, python_version = '2.9.2', '1.0-1', 'py39'

#### **HELPERS**

In [39]:
def write_s3(file_path, myfile, bucket=params['bucket_data'], dedupe_cols=None, sort=None, ascending=True, compression='zip'):
    """Upload a DataFrame (as CSV) or an in-memory buffer to S3.

    Parameters
    ----------
    file_path : str
        Object key. When a DataFrame is written with ``compression='zip'`` the
        key's extension is replaced by ``.zip``; the CSV inside the archive
        keeps the original file name.
    myfile : pandas.DataFrame or buffer
        A DataFrame is serialised with ``to_csv(index=False)``. Anything else
        must expose ``getvalue()`` (e.g. ``BytesIO`` / ``StringIO``).
    bucket : str
        Target bucket; defaults to ``params['bucket_data']`` as read when the
        function is defined.
    dedupe_cols : list[str] or None
        If given, duplicate rows on these columns are dropped (first kept).
        Dropping happens AFTER sorting, so ``sort`` decides which row survives.
    sort : str or list[str] or None
        Column(s) to sort by before de-duplication.
    ascending : bool
        Sort direction used with ``sort``.
    compression : str or None
        Passed to pandas as the compression method.
        NOTE(review): only 'zip' is exercised by the visible code; other
        methods are written to a text buffer with a zip-specific
        ``archive_name`` option — confirm before relying on them.
    """
    file_name = file_path.split('/')[-1]
    if isinstance(myfile, pd.DataFrame):
        if sort is not None:
            myfile = myfile.sort_values(by=sort, ascending=ascending)
        if dedupe_cols is not None:
            myfile = myfile.drop_duplicates(subset=dedupe_cols, keep='first')
        output_buffer = BytesIO() if compression == 'zip' else StringIO()
        if compression == 'zip': file_path = '.'.join(file_path.split('.')[:-1])+'.zip'
        myfile.to_csv(output_buffer, index=False, compression={'method':compression, 'archive_name':file_name})
        myfile = output_buffer
    # No `s3_resource` is defined in the visible notebook cells (only
    # `s3_client`), so build the resource here instead of relying on a global
    # that may not exist.
    boto3.resource('s3').Object(bucket, file_path).put(Body=myfile.getvalue())

#### **PREPROCESS**

In [40]:
%%writefile code/preprocess.py
import os, re
import glob
import boto3
import numpy as np
import pandas as pd
pd.options.mode.chained_assignment = None

import subprocess

from sklearn.compose import make_column_transformer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler, OneHotEncoder, OrdinalEncoder, LabelEncoder

from collections import Counter, deque, defaultdict
import datetime
from datetime import timedelta as td
from datetime import datetime as dt

from zipfile import ZipFile
import sys

# boto3 handles used by the S3 helpers and by the final upload in __main__
s3_resource = boto3.resource('s3')
s3_client = boto3.client('s3')
# bucket that receives the processed train/test CSVs
bucket = 'sagemaker-pipelines-hwm'

# local directories the script reads from / writes to
# NOTE(review): val_path is defined but nothing in this script writes to it.
input_path = '/opt/ml/processing/input'
train_path = '/opt/ml/processing/train'
val_path = '/opt/ml/processing/val'
test_path = '/opt/ml/processing/test'

def read_s3(file_path, bucket='hwm-nba', output=None, columns=None):
    """Download an S3 object and return it as text or as a DataFrame.

    Parameters
    ----------
    file_path : str
        Object key. Keys ending in ``zip`` are treated as zip archives and the
        first member of the archive is read.
    bucket : str
        Source bucket.
    output : str or None
        ``'dataframe'`` parses the text as CSV (first row is the header);
        any other value returns the decoded UTF-8 string.
    columns : list[str] or None
        Optional replacement column names for the DataFrame output.

    Returns
    -------
    str or pandas.DataFrame
    """
    # This script only imports `ZipFile` from zipfile and nothing from io, so
    # the buffers are imported locally to keep the function self-contained.
    from io import BytesIO, StringIO

    data = s3_resource.Object(bucket, file_path).get()['Body'].read()
    if file_path[-3:] == 'zip':
        with ZipFile(BytesIO(data)) as archive:
            data = archive.read(archive.namelist()[0])
    data = data.decode('utf-8')
    if output == 'dataframe':
        kwargs = {'header': 0}
        if columns is not None: kwargs['names'] = columns
        data = pd.read_csv(StringIO(data), **kwargs)
    return data


def basic_features(games):
    """Clean the raw games table and derive the basic per-game columns.

    Relies on two names defined elsewhere in this script: `make_int` and the
    module-level `team_names_dict` (assigned in the `__main__` block before
    this function runs).

    Adds/overwrites: hs, as, ot, game_subtype, game_type, diff, home, away,
    month, date, winner, home_win, season, is_preseason, is_playoffs,
    team_pair. Removes: detail_data, rand. Returns the filtered frame with a
    fresh 0..n-1 index.
    """
    # Row filters applied (AND-ed together) via DataFrame.query further down.
    filters = []
    filters.append("away != 'No Games'")
    filters.append("away != '0'")
    filters.append("hs >= 50")
    filters.append("hs != '-'")
    filters.append("diff != 0")
    filters.append("date >= '1996'")

    # games.columns = [re.sub(' ','_',c.lower()) for c in list(games.columns)]
    # one row per game: keep the last record for each detail_path
    games = games.drop_duplicates(subset=['detail_path'], keep='last')
    
    # coerce scores to int; make_int returns 0 for values it cannot convert
    games.loc[:,'hs'] = games.loc[:,'hs'].apply(lambda x: make_int(x))
    games.loc[:,'as'] = games.loc[:,'as'].apply(lambda x: make_int(x))

    # clean up home and away scores
    # ot becomes 0 when flagged 'N' (or for 'No Games' rows), 1 when the larger
    # score is below 200, otherwise the first digit of the larger score.
    # NOTE(review): presumably scores >= 200 carry the overtime count as a
    # leading digit in the raw data — confirm against the scraper.
    games['ot'] = games.apply(lambda x: 0 if (x['ot']=='N' or x['away']=='No Games') \
                              else (1 if int(max(x['as'],x['hs']))<200 else int(str(max(x['as'],x['hs']))[:1])),axis=1)

    # for ot >= 2 and a score >= 200, keep only the last three digits
    games['as'] = games.apply(lambda x: x['as'] if (x['ot']<2 or x['as']<200) else x['as'] % 1000, axis=1)
    games['hs'] = games.apply(lambda x: x['hs'] if (x['ot']<2 or x['hs']<200) else x['hs'] % 1000, axis=1)

    # game_subtype: game_type without a trailing '-<digit>' and without a leading 'east-'/'west-'
    games['game_subtype'] = games['game_type'].apply(lambda x: re.sub('^(east|west)-', '', re.sub('-[0-9]{1}$','',x)))
    # game_type: anything that is not preseason/regular is treated as playoffs
    games['game_type'] = games['game_type'].apply(lambda x: x if x in ['preseason','regular'] else 'playoffs')

    # absolute score margin, then apply the row filters
    games['diff'] = games.apply(lambda r: np.abs(r['hs']-r['as']),axis=1)
    games = games.query(' & '.join(filters)).reset_index(drop=True)
    
    # clean the team names - reconcile old franchises with new, and aggregate the long tail of guest/novelty teams
    games['home'] = games['home'].apply(lambda x: team_names_dict[x])
    games['away'] = games['away'].apply(lambda x: team_names_dict[x])
        
    # month is sliced from the 'YYYY-MM-DD' string before the date is parsed
    games['month'] = games['date'].apply(lambda x: x[5:7])
    games['date'] = games['date'].apply(lambda x: dt.strptime(x,'%Y-%m-%d').date())
    # ties (hs == as) count as a home win; diff != 0 above filters those rows out
    games['winner'] = games.apply(lambda x: x['home'] if x['hs'] >= x['as'] else x['away'], axis=1)
    games['home_win'] = games.apply(lambda x: 1 if x['hs'] >= x['as'] else 0, axis=1)
    # season label 'YYYY-YYYY+1', where YYYY is the year of (date - 225 days)
    games['season'] = games['date'].apply(lambda x: str((x-td(days=225)).year)+'-'+str((x-td(days=225)).year+1)) # mid-august split
    # exceptions
    # games dated 2020-03-01 .. 2020-11-15 are forced into the 2019-2020 season
    games['season'] = games.apply(lambda x: '2019-2020' if (dt.strftime(x['date'],'%Y-%m-%d')>='2020-03-01' \
                                                            and dt.strftime(x['date'],'%Y-%m-%d')<='2020-11-15') else x['season'], axis=1)

    games['is_preseason'] = games.apply(lambda x: 1 if x['game_type']=='preseason' else 0,axis=1)
    games['is_playoffs'] = games.apply(lambda x: 0 if x['game_type'] in ['preseason','regular'] else 1,axis=1)
    
    # team_pair: the two team codes in alphabetical order, joined by '-'.
    # The middle line sorts each list in place; its return value is discarded.
    games['team_pair'] = games.apply(lambda r: [r['away'], r['home']], axis=1)
    games['team_pair'].apply(lambda r: r.sort())
    games['team_pair'] = games['team_pair'].apply(lambda r: '-'.join(r))
    
    # drop columns that are not used as features
    del games['detail_data']
    del games['rand']
    
    return games


def playoff_features(games): # etl the playoff calculations
    """Add playoff-series state to every game.

    Expects the columns produced by `basic_features` (at least ``date``,
    ``detail_path``, ``game_type``, ``season``, ``team_pair`` and ``winner``).
    ``team_pair`` is read as two 3-character codes: its first three characters
    are "team 1" and its last three are "team 2".

    For playoff games the result gains the series score after and before the
    game, the series leader after and before the game, ``series_winner``
    (winner of the last game of the series) and ``knockout_game``. For all
    games ``game_subtype`` is overwritten with the playoff round (0 for
    non-playoff games). Column names of the input frame are normalised in
    place (lower-case, spaces to underscores).

    Fix over the previous version: the "before game" values are now reset at
    every series boundary (season OR team pair changes). Previously only the
    season was compared, so the first game of each later series in a season
    inherited the final tallies of the preceding series.
    """
    games.columns = [re.sub(' ','_',c.lower()) for c in list(games.columns)]
    playoff_games = games[games['game_type']=='playoffs'].iloc[:,:].sort_values(by=['season', 'team_pair', 'date']).reset_index(drop=True)

    # 1/0 indicators of which side of the pair won each game
    playoff_games['winner_1'] = playoff_games.apply(lambda r: 1 if r['winner']==r['team_pair'][:3] else 0, axis=1)
    playoff_games['winner_2'] = playoff_games.apply(lambda r: 1 if r['winner']==r['team_pair'][-3:] else 0, axis=1)

    # running series score within each (season, team_pair)
    playoff_games[['t1_wins_after_game', 't2_wins_after_game']] = \
        playoff_games.groupby(['season', 'team_pair'])[['winner_1', 'winner_2']].cumsum()
    del playoff_games['winner_1']
    del playoff_games['winner_2']

    playoff_games['leader_after_game'] = playoff_games.apply(lambda r: r['team_pair'][:3] if r['t1_wins_after_game']>r['t2_wins_after_game'] \
                                                             else (r['team_pair'][-3:] if r['t2_wins_after_game']>r['t1_wins_after_game'] \
                                                                   else 'tied series'), axis=1)
    # "before game" state = previous row's "after game" state (rows are sorted by series, then date)
    playoff_games[['season2', 'team_pair2', 't1_wins_before_game', 't2_wins_before_game', 
                   'leader_before_game']] = playoff_games[['season', 'team_pair', 't1_wins_after_game',
                                                           't2_wins_after_game', 'leader_after_game']].shift(periods=1)
    
    # A row opens a new series when the previous row belongs to another season
    # or another team pair (the very first row compares against NaN, which is
    # also "different").
    def _series_start(r):
        return r['season']!=r['season2'] or r['team_pair']!=r['team_pair2']

    playoff_games['t1_wins_before_game'] = playoff_games.apply(lambda r: 0 if np.isnan(r['t1_wins_before_game']) \
                                                               or _series_start(r) else int(r['t1_wins_before_game']), axis=1)
    playoff_games['t2_wins_before_game'] = playoff_games.apply(lambda r: 0 if np.isnan(r['t2_wins_before_game']) \
                                                               or _series_start(r) else int(r['t2_wins_before_game']), axis=1)
    playoff_games['leader_before_game'] = playoff_games.apply(lambda r: 'series starting' \
                                                              if str(r['leader_before_game'])=='nan' or _series_start(r) \
                                                              else r['leader_before_game'], axis=1)
    del playoff_games['season2']
    del playoff_games['team_pair2']
    
    # one row per series (its last game); the winner of that game is the series winner
    playoff_series_winners = playoff_games.drop_duplicates(subset=['season', 'team_pair'],keep='last').reset_index(drop=True)
    playoff_series_winners = playoff_series_winners[['date', 'season', 'game_type', 'team_pair', 'winner']]
    playoff_series_winners = playoff_series_winners.sort_values(by=['winner', 'date']).reset_index(drop=True)
    playoff_series_winners['count'] = 1

    # playoff round = how many series this winner has won so far in the season
    playoff_series_winners['playoff_round'] = playoff_series_winners.groupby(['winner', 'season', 'game_type'])['count'].cumsum()
    del playoff_series_winners['count']

    playoff_series_winners = playoff_series_winners[['winner', 'season', 'game_type', 'team_pair', 'playoff_round']]
    playoff_series_winners.columns = ['series_winner', 'season', 'game_type', 'team_pair', 'playoff_round']

    playoff_games = playoff_games.merge(playoff_series_winners, how='left', on=['season', 'team_pair', 'game_type'], sort=False)
    
    # knockout game: one side already has 3 wins, or 2 wins in a round-1 series of a season labelled before '2002'
    playoff_games['knockout_game'] = playoff_games.apply(lambda r: 1 if max(r['t1_wins_before_game'], r['t2_wins_before_game'])==3 \
                                                         else (1 if max(r['t1_wins_before_game'], r['t2_wins_before_game'])==2 and \
                                                               r['playoff_round']==1 and r['season'] < '2002' else 0), axis=1)
    
    playoff_games = playoff_games[['date', 'detail_path', 't1_wins_after_game', 't2_wins_after_game', 'leader_after_game', 't1_wins_before_game',
                                   't2_wins_before_game', 'leader_before_game', 'series_winner', 'playoff_round', 'knockout_game']]
    
    games = games.merge(playoff_games, how='left', on=['date', 'detail_path'], sort=False)
    
    # non-playoff games have no round after the left merge -> 0
    games['game_subtype'] = games['playoff_round'].apply(lambda x: 0 if np.isnan(x) else int(x))
    del games['playoff_round']

    return games


def _streak_before_each_game(wins):
    """Return, for each game, the winning streak the team carried into it.

    `wins` is a 0/1 sequence in game order. The first entry is always 0 and a
    loss resets the streak. Empty and single-game inputs are handled (the
    result always has the same length as `wins`).
    """
    streak = [0] if len(wins) > 0 else []
    for won in wins[:-1]:
        streak.append(streak[-1]+1 if won==1 else 0)
    return np.asarray(streak, dtype=int)


def wins_n_games(games):
    """Add streak and "wins in the last N games" features for both teams.

    For every team returned by `get_team_names` (plus 'Other') the games
    involving that team are taken in the frame's row order and the following
    are computed as of the start of each game:

    * ``streak_home`` / ``streak_away``: current winning streak over all of
      the team's games, reported on the side the team plays on;
    * ``streak_home_home`` / ``streak_away_away``: winning streak counting
      only the team's home (resp. away) games;
    * ``wins{N}_home`` / ``wins{N}_away`` for N in 3, 5, 10, 20, 50, 100:
      wins in the team's previous N games.

    Games whose ``team_pair`` contains 'Other' are dropped from the feature
    table, and the final inner merge therefore removes them from the result.

    Fixes over the previous version: teams with zero or one matching game no
    longer raise (the streak lists were seeded with ``wins[0]`` and came out
    one element too long for a single game), per-team frames are concatenated
    once instead of inside the loop, and the ``sort=-False`` typo is gone.
    """
    teams_unique = list(set(get_team_names(games, output='df')['team']))+['Other']
    sequences = [3, 5, 10, 20, 50, 100]
    team_frames = []
    for team in teams_unique:
        # .copy() so the feature columns are added to independent frames
        temp = games[games['team_pair'].str.contains(team)][['detail_path','home','away','team_pair','winner']].copy()
        temp_home = games[games['home'].str.contains(team)][['detail_path','home','winner']].copy()
        temp_away = games[games['away'].str.contains(team)][['detail_path','away','winner']].copy()
        
        wins = np.array(np.where(temp['winner']==team, 1, 0))
        wins_home = np.array(np.where(temp_home['winner']==team, 1, 0))
        wins_away = np.array(np.where(temp_away['winner']==team, 1, 0))
        
        # streak over all games; -1 marks "team is not on this side"
        temp['streak'] = _streak_before_each_game(wins)
        temp['streak_home'] = np.where(temp['home']==team, temp['streak'], -1)
        temp['streak_away'] = np.where(temp['away']==team, temp['streak'], -1)

        # streaks restricted to home games / away games
        temp_home['streak_home_home'] = _streak_before_each_game(wins_home)
        temp_away['streak_away_away'] = _streak_before_each_game(wins_away)
        
        temp = temp.merge(temp_home[['detail_path','streak_home_home']], how='left', on='detail_path', sort=False)
        temp = temp.merge(temp_away[['detail_path','streak_away_away']], how='left', on='detail_path', sort=False)
        
        # wins in the previous s games (the current game is excluded)
        for s in sequences:
            temp['wins'+str(s)] = np.asarray([sum(wins[max(0,i-s):i]) for i in range(len(wins))])
            temp['wins'+str(s)+'_home'] = np.where(temp['home']==team, temp['wins'+str(s)], -1)
            temp['wins'+str(s)+'_away'] = np.where(temp['away']==team, temp['wins'+str(s)], -1)
        team_frames.append(temp)

    wins_df = pd.concat(team_frames, axis=0)
    wins_df = wins_df[~wins_df['team_pair'].str.contains('Other')]

    # each game appears once per participating team; split by the side the team played on
    home_streaks = wins_df[wins_df['streak_home']>-1].sort_index()
    away_streaks = wins_df[wins_df['streak_away']>-1].sort_index()

    home_wins = wins_df[wins_df['wins3_home']>-1].sort_index()
    away_wins = wins_df[wins_df['wins3_away']>-1].sort_index()
    
    wins_df = home_wins[['detail_path']]
    wins_df = wins_df.merge(home_streaks[['detail_path', 'streak_home', 'streak_home_home']], how='left', on='detail_path', sort=False)
    wins_df = wins_df.merge(away_streaks[['detail_path', 'streak_away', 'streak_away_away']], how='left', on='detail_path', sort=False)
    wins_df = wins_df.merge(home_wins[['detail_path']+['wins'+str(s)+'_home' for s in sequences]], how='left', on='detail_path', sort=False)
    wins_df = wins_df.merge(away_wins[['detail_path']+['wins'+str(s)+'_away' for s in sequences]], how='left', on='detail_path', sort=False)
    for col in list(wins_df.columns)[1:]:
        wins_df[col] = wins_df[col].fillna(0).astype(int)

    games = games.merge(wins_df, how='inner', on=['detail_path'], sort=False).reset_index(drop=True)
    return games


def opponents(games):
    """Add head-to-head winning streaks for each pairing.

    A ``team_pair_sorted`` column (the two 3-character codes of ``team_pair``
    in alphabetical order) is added to the input frame. For every pairing the
    games are walked in row order and the streak each side held over the other
    going into a game is recorded as ``streak_opponents_home`` and
    ``streak_opponents_away``.
    """
    sorted_labels = []
    for pair in games['team_pair']:
        left, right = pair[:3], pair[-3:]
        sorted_labels.append(left+'-'+right if left<=right else right+'-'+left)
    games['team_pair_sorted'] = np.asarray(sorted_labels)
    unique_pairs = np.unique(games['team_pair_sorted'])

    for j, teams in enumerate(unique_pairs):
        pair_games = games[games['team_pair_sorted']==teams][['detail_path','team_pair_sorted','winner']]
        first_team_won = np.array(np.where(pair_games['winner']==teams[:3], 1, 0))

        # streak1: run of wins by the alphabetically first team; streak2: by the other team.
        # Both describe the state BEFORE the game, so the last result is not consumed.
        streak1, streak2 = [0], [0]
        for won in first_team_won[:-1]:
            if won == 1:
                streak1.append(streak1[-1]+1)
                streak2.append(0)
            else:
                streak1.append(0)
                streak2.append(streak2[-1]+1)
        pair_games['streak1'] = np.asarray(streak1)
        pair_games['streak2'] = np.asarray(streak2)

        if j == 0:
            opponents_df = pair_games
        else:
            opponents_df = pd.concat([opponents_df, pair_games], axis=0)

    streak_cols = opponents_df[['detail_path','team_pair_sorted','streak1','streak2']]
    games = games.merge(streak_cols, how='left', on=['detail_path', 'team_pair_sorted'], sort=False)

    # pick the streak that belongs to the team on each side of the fixture
    first_code = games['team_pair_sorted'].str[:3]
    games['streak_opponents_home'] = np.where(games['home'] == first_code, games['streak1'], games['streak2'])
    games['streak_opponents_away'] = np.where(games['away'] == first_code, games['streak1'], games['streak2'])

    games = games.drop(columns=['streak1', 'streak2'])

    return games


def last_n_days(games):
    """Add schedule-density features for the home and away team of each game.

    For each team returned by `get_team_names` this computes, per game:
    ``days_last_played`` (days since the team's previous row in the frame, 0
    for its first game) and ``cum_games_{N}`` for N in 2, 3, 5, 10, 20 (number
    of that team's games in the N calendar days before the game day). The
    columns are merged back onto `games` with ``_home`` / ``_away`` suffixes.
    """
    teams_unique = list(set(get_team_names(games, output='df')['team'])) # +['Other']
    sequences = [2, 3, 5, 10, 20]

    # calendar of consecutive days starting at min_date; range() stops before max_date itself
    min_date = min(games['date'])
    max_date = max(games['date'])
    dates = [min_date + datetime.timedelta(days=d) for d in range((max_date - min_date).days)]

    for j, team in enumerate(teams_unique):
        # all games whose team_pair string contains this team code
        temp = games[games['team_pair'].str.contains(team)][['date','detail_path','home','away']]
        d = np.array(temp['date'])
        # gap to the previous row; assumes rows are in chronological order — TODO confirm
        temp['days_last_played'] = [0]+[(d[i]-d[i-1]).days for i in range(1,len(d))]

        # build a per-day 0/1 "played" series: game days (1) plus the remaining calendar days (0)
        y_dates = np.array(temp['date'])
        y_df = pd.DataFrame(y_dates,columns=['date'])
        y_df['game'] = 1
        n_dates = list(set(dates) - set(y_dates))
        n_df = pd.DataFrame(n_dates,columns=['date'])
        n_df['game'] = 0
        df = pd.concat([y_df, n_df], axis=0).sort_values(by='date').reset_index(drop=True)

        # games played in the s days preceding each day (the day itself is excluded)
        for s in sequences:
            game_seq = np.array(df['game'])
            cum_games = np.asarray([0]+[sum(game_seq[max(0,i-s):i]) for i in range(1,len(game_seq))])
            df['cum_games_'+str(s)] = cum_games

        # attach the per-day counts to this team's home games and accumulate across teams
        temp_home = temp[temp['home']==team]
        temp_home = temp_home.merge(df, how='inner', on=['date'], sort=False)
        temp_home_all = temp_home if j==0 else pd.concat([temp_home_all, temp_home], axis=0)

        # same for this team's away games
        temp_away = temp[temp['away']==team]
        temp_away = temp_away.merge(df, how='inner', on=['date'], sort=False)
        temp_away_all = temp_away if j==0 else pd.concat([temp_away_all, temp_away], axis=0)

    # keep the feature columns only and suffix them with the side they describe
    temp_home_all = temp_home_all[['date','detail_path']+['cum_games_'+str(s) for s in sequences]+['days_last_played']]
    temp_home_all.columns = ['date','detail_path']+[c+'_home' for c in list(temp_home_all.columns)[2:]]

    temp_away_all = temp_away_all[['date','detail_path']+['cum_games_'+str(s) for s in sequences]+['days_last_played']]
    temp_away_all.columns = ['date','detail_path']+[c+'_away' for c in list(temp_away_all.columns)[2:]]        

    games = games.merge(temp_home_all, how='left', on=['date','detail_path'], sort=False)
    games = games.merge(temp_away_all, how='left', on=['date','detail_path'], sort=False)

    return games


def generate_standings(table, date, conf=None):
    """Build the regular-season standings table as of (strictly before) `date`.

    Parameters
    ----------
    table : pandas.DataFrame
        Games with at least date, season, game_type, home, away, winner, hs, as.
        If its ``date`` column holds strings it is converted to ``datetime.date``
        IN PLACE on the caller's frame.
    date : str or datetime.date
        Cut-off; strings are parsed as ``%Y-%m-%d``.
    conf : str or None
        If given, keep only teams whose `get_conference` value equals it.

    Returns
    -------
    pandas.DataFrame
        One row per team with a 1-based ``pos`` column, ordered by win/loss
        difference (descending) and games played (ascending).
    """
    if type(date) == str:
        date = datetime.datetime.strptime(date,'%Y-%m-%d').date()
    # looks at the element with index label 0 to decide whether dates are still strings
    if type(table['date'][0]) == str:
        table['date'] = table['date'].apply(lambda x: datetime.datetime.strptime(x,'%Y-%m-%d').date())

    # latest season that has any game before the cut-off; max() raises if there is none
    season = max(table[table['date']<date]['season'])
    games_season = table[(table['season']==season) & (table['date']<date)]
    games_season.loc[:,'count'] = 1
    # only games_regular feeds the table below; the other two splits are not used in this function
    games_preseason = games_season[games_season['game_type']=='preseason'].reset_index(drop=True)
    games_regular = games_season[games_season['game_type']=='regular'].reset_index(drop=True)
    games_playoffs = games_season[games_season['game_type']=='playoffs'].reset_index(drop=True)

    # per-team sums: home wins (+ points scored), home losses, away wins (+ points scored), away losses
    hw = games_regular[games_regular['home']==games_regular['winner']][['home','count','hs']].groupby('home').sum()
    hl = games_regular[games_regular['home']!=games_regular['winner']][['home','count']].groupby('home').sum()
    aw = games_regular[games_regular['away']==games_regular['winner']][['away','count','as']].groupby('away').sum()
    al = games_regular[games_regular['away']!=games_regular['winner']][['away','count']].groupby('away').sum()

    final_table = pd.concat([hw, hl, aw, al], axis=1).reset_index().fillna(0)

    # positional rename: the order must match the concat order above (hw, hl, aw, al)
    final_table.columns = ['team', 'wins_home', 'points_home', 'losses_home', 'wins_away', 'points_away', 'losses_away']

    final_table['wins'] = final_table['wins_home']+final_table['wins_away']
    final_table['losses'] = final_table['losses_home']+final_table['losses_away']
    final_table['played'] = final_table['wins']+final_table['losses']
    final_table['diff'] = final_table['wins'] - final_table['losses']
    final_table['conf'] = final_table['team'].map(get_conference)

    final_table = final_table[['team', 'conf', 'played', 'wins', 'losses', 'wins_home', 'losses_home',
                               'points_home', 'wins_away', 'losses_away', 'points_away', 'diff']]
    final_table = final_table.sort_values(by=['diff', 'played'],ascending=[False, True]).reset_index(drop=True)

    if conf is not None:
        final_table = final_table[final_table['conf']==conf].reset_index(drop=True)

    # turn the 0-based row index into a 1-based 'pos' column
    final_table = final_table.reset_index()
    final_table.columns = ['pos']+list(final_table.columns)[1:]
    final_table['pos'] += 1

    return final_table


def build_standings(games):
    """Generate standings for every distinct game date and save them as CSV.

    `generate_standings` is called once per date in ``games['date']``. Dates
    for which it fails are skipped (best effort), e.g. the earliest date, which
    has no earlier games. The combined table is sorted by date and position,
    written to ``../data/processed/standings.csv`` and returned.

    Fixes over the previous version: results are collected in a list, so a
    failure on the first iterated date no longer makes every later date fail
    silently, and only ``Exception`` is caught, so Ctrl-C still interrupts
    the loop.

    Raises
    ------
    ValueError
        If no date produced a standings table.
    """
    snapshots = []
    for i, d in enumerate(set(games['date'])):
        if i%500==0: print(i)
        try:
            table = generate_standings(games, d)
            table['date'] = d
        except Exception:
            # best effort: skip dates that cannot produce a table
            continue
        snapshots.append(table)

    if not snapshots:
        raise ValueError('no standings could be generated for any date')

    standings_all = pd.concat(snapshots, axis=0)
    standings_all = standings_all.sort_values(by=['date', 'pos'], ascending=[True, True])
    standings_all.to_csv('../data/processed/standings.csv', index=False)
    return standings_all


def standings(games, standing_path=input_path+'/standings.zip'):
    """Attach each team's standing on the game date to the games table.

    The standings table is read from the zip at `standing_path`. Its figures
    are merged in twice — once keyed on the home team, once on the away team —
    with ``home_standing_`` / ``away_standing_`` prefixes, missing values are
    filled with 0, and two difference columns are added.
    """
    stat_cols = ['wins','losses','wins_home','losses_home','points_home',
                 'wins_away','losses_away','points_away','diff']

    standing = read_csv_from_zip(standing_path) # build_standings(games)
    try:
        # dates arrive as 'YYYY-MM-DD' strings; left untouched if parsing fails
        standing['date'] = standing['date'].apply(lambda x: dt.strptime(x,'%Y-%m-%d').date())
    except:
        pass

    # one prefixed copy of the standings per side
    side_tables = {}
    for side in ('home', 'away'):
        side_table = standing[['team','date']+stat_cols]
        side_table.columns = ['team','date']+[side+'_standing_'+c for c in stat_cols]
        side_tables[side] = side_table

    for side in ('home', 'away'):
        games = games.merge(side_tables[side], how='left', left_on=[side,'date'], right_on=['team','date'], sort=False)

    # the two merges leave the join keys behind as team_x / team_y
    games = games.drop(columns=['team_x', 'team_y'])

    # the last 18 columns are the 2 x 9 standing columns just merged in
    for c in list(games.columns)[-18:]:
        games[c] = games[c].fillna(0)

    games['standing_diff'] = games['home_standing_diff'] - games['away_standing_diff']
    games['standing_points_diff'] = games['home_standing_points_home'] - games['away_standing_points_away']

    return games


def make_int(c):
    """Convert `c` to int, returning 0 when it cannot be converted.

    Only conversion failures are treated as "not a number": ValueError
    (e.g. '-' or NaN), TypeError (e.g. None) and OverflowError (infinity).
    Anything else, such as KeyboardInterrupt, propagates.
    """
    try:
        return int(c)
    except (TypeError, ValueError, OverflowError):
        return 0


def day_diff(d1,d2,t):
    """Return the whole number of days in ``d1 - d2``, or NaN if that fails.

    `t` is accepted for backward compatibility and is not used. Ordinary
    errors (wrong types, missing values, ...) yield ``np.nan``; KeyboardInterrupt
    and SystemExit are no longer swallowed.
    """
    try:
        return int((d1-d2).days)
    except Exception:
        return np.nan


def delcols(df,c):
    """Delete column/key `c` from `df` in place; do nothing if that fails.

    Ordinary errors (e.g. KeyError for a missing column) are ignored on
    purpose; KeyboardInterrupt and SystemExit are no longer swallowed.
    """
    try:
        del df[c]
    except Exception:
        pass


def datefill(d,end=None,numdays=0):
    """List the dates missing from `d`, newest first.

    The scan walks backwards one day at a time from `end` (default: the
    latest date in `d`) to the earliest date in `d`, plus `numdays` extra
    days beyond it. With ``numdays > 0`` only the first `numdays` missing
    dates are returned.
    """
    last = max(d) if end is None else end
    span_days = (last - min(d)).days

    missing = []
    for offset in range(span_days + 1 + numdays):
        candidate = last - datetime.timedelta(days=offset)
        if candidate not in d:
            missing.append(candidate)

    return missing[:numdays] if numdays > 0 else missing


def get_counts_list(col, name, ascending=False, lim=None):
    """Count the values in `col` and return them as a two-column frame.

    The result has columns ``[name, 'count']``, sorted by count (descending
    unless `ascending` is set) and optionally truncated to the first `lim`
    rows.
    """
    tally = dict(Counter(col))
    counts = pd.DataFrame.from_dict([tally]).T
    counts = counts.sort_values(by=0, ascending=ascending).reset_index()
    data = counts if lim is None else counts[:lim]
    data.columns = [name, 'count']
    return data


def get_dupes(col):
    """Return the values of `col` that occur more than once, with their counts.

    The frame has the value in column ``'index'`` and the count in column
    ``0``, most frequent first. If nothing is duplicated, 'No Dupes' is
    printed and None is returned.
    """
    tally = pd.DataFrame.from_dict([dict(Counter(col))]).T
    ranked = tally.sort_values(by=0,ascending=False).reset_index()
    dupes = ranked[ranked[0]>1]
    if len(dupes) == 0:
        print('No Dupes')
        return None
    return dupes


# team lookup to address the long-tail and reconcile franchises
def get_team_names(games, output='dict', default='Other'):
    """Build the lookup from raw team codes to canonical team codes.

    The 40 most frequent ``home`` values (ignoring rows whose ``away`` is
    'No Games') are treated as valid teams; a handful of old or alternative
    codes are mapped onto their current franchise code.

    With ``output='dict'`` a defaultdict is returned that maps raw code to
    canonical code and yields `default` for any unknown code. Any other
    `output` value returns a frame with columns ``team_raw`` and ``team``.
    """
    team_replacements = {'NOH': 'NOP', 'NOK': 'NOP', 'SAN': 'SAS', 'GOS': 'GSW', 'UTH': 'UTA', 'PHL': 'PHI'}

    home_codes = games[games['away'] != 'No Games']['home']
    active_teams = get_counts_list(home_codes, 'team_raw', lim=40) # these are the valid teams
    active_teams['team'] = active_teams['team_raw'].apply(lambda x: team_replacements.get(x, x))

    if output != 'dict':
        return active_teams[['team_raw', 'team']]

    team_dict = defaultdict(lambda: default)
    for raw_code, team_code in zip(active_teams['team_raw'], active_teams['team']):
        team_dict[raw_code] = team_code
    return team_dict


def etl_process(data, tasks):
    """Run `data` through each callable in `tasks`, in order.

    Every task receives the previous task's return value; the last return
    value is the result. An empty `tasks` returns `data` unchanged.
    """
    result = data
    for step in tasks:
        result = step(result)
    return result


def get_conference(team=None):
    """Return the conference of `team`, or the full mapping when no team is given.

    Any code that is not listed under 'East' is reported as 'West', including
    codes that appear in neither list.
    """
    east = ['BOS', 'MIL', 'PHI', 'CLE', 'BKN', 'MIA', 'NYK', 'ATL', 'WAS', 'CHI', 'TOR', 'IND', 'ORL', 'DET', 'CHA']
    west = ['DEN', 'MEM', 'SAC', 'PHX', 'DAL', 'LAC', 'NOP', 'MIN', 'GSW', 'OKC', 'UTA', 'POR', 'LAL', 'SAS', 'HOU']
    confs = {'East': east, 'West': west}

    if team is None:
        return confs
    if team in east:
        return 'East'
    return 'West'


def read_csv_from_zip(zip_path):
    """Read ``<name>.csv`` from the archive ``.../<name>.zip`` into a DataFrame.

    The member name is derived from the archive's file name (text before the
    first '.', plus '.csv'). Returns None when the archive has no member with
    that exact name.

    The archive and the member stream are now closed once the CSV has been
    parsed; previously both were left open.
    """
    file_name = zip_path.split('/')[-1].split('.')[0]+'.csv'
    with ZipFile(zip_path) as zip_file:
        if file_name in zip_file.namelist():
            with zip_file.open(file_name) as member:
                return pd.read_csv(member)
    return None

        
def write_s3(file_path, myfile, bucket=bucket, dedupe_cols=None, sort=None, compression='zip'):
    """Upload a DataFrame (as CSV) or an in-memory buffer to S3.

    Parameters
    ----------
    file_path : str
        Object key. When a DataFrame is written with ``compression='zip'`` the
        key's extension is replaced by ``.zip``; the CSV inside the archive
        keeps the original file name.
    myfile : pandas.DataFrame or buffer
        A DataFrame is serialised with ``to_csv(index=False)``. Anything else
        must expose ``getvalue()``.
    bucket : str
        Target bucket; defaults to the module-level `bucket` as it was when
        the function was defined.
    dedupe_cols : list[str] or None
        If given, duplicate rows on these columns are dropped (first kept)
        BEFORE sorting.
    sort : str or list[str] or None
        Column(s) to sort by, ascending.
    compression : str or None
        Passed to pandas as the compression method.
        NOTE(review): only 'zip' is exercised by the visible code — confirm
        other methods before relying on them.
    """
    # This script imports nothing from io, so the buffers are imported here;
    # without this the DataFrame branch raised NameError.
    from io import BytesIO, StringIO

    file_name = file_path.split('/')[-1]
    if isinstance(myfile, pd.DataFrame):
        if dedupe_cols is not None:
            myfile = myfile.drop_duplicates(subset=dedupe_cols, keep='first')
        if sort is not None:
            myfile = myfile.sort_values(by=sort, ascending=True)
        output_buffer = BytesIO() if compression == 'zip' else StringIO()
        if compression == 'zip': file_path = '.'.join(file_path.split('.')[:-1])+'.zip'
        myfile.to_csv(output_buffer, index=False, compression={'method':compression, 'archive_name':file_name})
        myfile = output_buffer
    s3_resource.Object(bucket, file_path).put(Body=myfile.getvalue())
        

def encode_features(col):
    """Label-encode `col`: fit a LabelEncoder on it and return the integer codes."""
    encoder = LabelEncoder().fit(col)
    return encoder.transform(col)


def standardize_features(col):
    """Centre `col` on its mean and scale it by its standard deviation.

    A constant column has a standard deviation of 0; dividing by it produced
    NaN for every element. In that case the centred values (all zeros) are
    returned instead, which is how scikit-learn's StandardScaler treats
    zero-variance features.
    """
    centered = col - np.mean(col)
    spread = np.std(col)
    if spread == 0:
        return centered
    return centered/spread


def normalize_features(col):
    """Min-max scale `col` to the range [0, 1].

    A constant column has ``max == min``; dividing by that zero range produced
    NaN for every element. In that case zeros are returned instead, which is
    how scikit-learn's MinMaxScaler treats constant features. The extremes
    are also computed once rather than three times.
    """
    lowest, highest = min(col), max(col)
    shifted = col - lowest
    value_range = highest - lowest
    if value_range == 0:
        # divide by 1.0 so the result is float, like the regular branch
        return shifted/1.0
    return shifted/value_range


def install(package):
    """Install or upgrade `package` with pip, using the running interpreter.

    Raises subprocess.CalledProcessError if pip exits with a non-zero status.
    """
    pip_command = [sys.executable, '-m', 'pip', 'install', '--upgrade']
    pip_command.append(package)
    subprocess.check_call(pip_command)

        
# Script entry point: read the raw games, build the features, rescale every
# column, then write a shuffled 90/10 train/test split locally and to S3.
if __name__ == '__main__':
    
    # NOTE(review): input_files is not used anywhere below.
    input_files = os.listdir(input_path)
    
    games = read_csv_from_zip(input_path + '/games.zip')
    # normalise column names: lower-case, spaces -> underscores
    games.columns = [re.sub(' ','_',c.lower()) for c in list(games.columns)]
    # module-level lookup that basic_features reads as a global
    team_names_dict = get_team_names(games) 
    
    # keep games up to the latest date that has an away score other than '-'
    max_played = np.max(np.unique(games[games['as'] != '-']['date']))
    games = games[games['date'] <= max_played].reset_index(drop=True)
    
    # feature pipeline; each step takes and returns the games frame
    games_final = etl_process(games, [basic_features, playoff_features, wins_n_games, opponents, last_n_days, standings])
    
    # making train and test, x and y
    # NOTE(review): split is computed but not used below.
    split = int(len(games)*0.8)
    
    target = 'home_win'
    # identifiers and outcome-derived columns that are dropped before training
    skip_cols = ['date', 'detail_path', 'team_pair', 'team_pair_sorted', 'winner', 'winner', 'hs', 'diff',
                 'as', 'ot', 't1_wins_after_game', 't2_wins_after_game', 'leader_after_game', 'series_winner']
    
    # dtype of every column that is kept
    col_map = {}
    for c in games_final.columns:
        if c not in skip_cols: col_map[c] = str(games_final[c].dtype)
    
    # label-encode the object-typed columns (after casting them to str)
    for c in col_map.keys():
        if col_map[c] == 'object':
            print('column:',c)
            games_final[c] = games_final[c].astype(str)
            games_final[c] = encode_features(games_final[c])
        
    games_final = games_final.fillna(0)
    games_final = games_final[[c for c in games_final.columns if c not in skip_cols]]
    # bucket standing_diff into 0 (negative), 1 (0..9) or 2 (10 and above)
    games_final['standing_diff'] = np.where(games_final['standing_diff']<0, 0, np.where(games_final['standing_diff']<10, 1, 2))
    
    # standardise, then min-max scale, every remaining column.
    # NOTE(review): this includes the target column `home_win`, which is not in skip_cols.
    for i, c in enumerate(games_final.columns):
        games_final[c] = standardize_features(games_final[c])
        games_final[c] = normalize_features(games_final[c])
    
    test_frac = 0.1
    train_frac = 1 - test_frac
    
    # seed numpy's global RNG before shuffling the rows
    np.random.seed(56)
    games_final_shuffled = games_final.sample(frac=1).reset_index(drop=True)
    
    # first 90% of the shuffled rows -> train, remainder -> test
    train = games_final_shuffled.iloc[:int(len(games_final)*train_frac),:]
    test = games_final_shuffled.iloc[int(len(games_final)*train_frac):,:]
    
    train.to_csv(train_path+'/train.csv', index=False)
    test.to_csv(test_path+'/test.csv', index=False)
    
    # also copy both files to the hard-coded bucket under nba/processed/
    s3_client.upload_file(train_path+'/train.csv', bucket, 'nba/processed/train.csv')
    s3_client.upload_file(test_path+'/test.csv', bucket, 'nba/processed/test.csv')

Overwriting code/preprocess.py


In [41]:
#from sagemaker.processing import FrameworkProcessor
#from sagemaker.sklearn import SKLearn
#from sagemaker.sklearn.processing import SKLearnProcessor
#from sagemaker.processing import ProcessingInput, ProcessingOutput
#from sagemaker.workflow.steps import ProcessingStep

#sklearn_processor = FrameworkProcessor(
#    estimator_cls=SKLearn,
#    framework_version='0.23-1',
#    instance_type='ml.m5.xlarge',
#    instance_count=1,
#    base_job_name=f'nba-preprocess',
#    sagemaker_session=pipeline_session,
#    role=role,
#)

# s3_input_meta = sagemaker.dataset_definition.inputs.S3Input(
#     s3_uri='s3://sagemaker-pipelines-hwm/nba-pipeline-1/code/sourcedir.tar.gz'
# )

# processor_run_args = sklearn_processor.run(
#     inputs=[
#         ProcessingInput(input_name='input', source=input_data, destination='/opt/ml/processing/input'), #input_name='booba', s3_input=s3_input_meta, 
#     ],
#     outputs=[
#         ProcessingOutput(output_name='train', source='/opt/ml/processing/train'),
#         ProcessingOutput(output_name='val', source='/opt/ml/processing/val'),
#         ProcessingOutput(output_name='test', source='/opt/ml/processing/test'),
#     ],
#     source_dir='preprocess',
#     code='preprocess.py'
#     # arguments=['--input-data', input_data],
# )

# step_process = ProcessingStep(
#     name='nba-preprocess',
#     step_args=processor_run_args,
# )

In [42]:
from sagemaker.processing import FrameworkProcessor
from sagemaker.sklearn import SKLearn
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Upload the preprocessing script so the processing step can reference it by S3 URI.
s3_processing_path = 'nba/code/processing.py'
s3_client.upload_file('code/preprocess.py', bucket, s3_processing_path)

# scikit-learn processing container that runs the script
sklearn_processor = SKLearnProcessor(
    base_job_name='nba-preprocess',
    framework_version=sk_version,
    instance_type='ml.m5.large',
    instance_count=1,
    role=sm_role,
)

# Pipeline step: raw data in, train/test splits out.
step_process = ProcessingStep(
    name='nba-preprocess',
    processor=sklearn_processor,
    code=f's3://{bucket}/{s3_processing_path}',
    inputs=[
        ProcessingInput(
            source=input_data,
            destination='/opt/ml/processing/input',
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name='train',
            source='/opt/ml/processing/train',
            destination='s3://sagemaker-pipelines-hwm/nba/preprocess/train',
        ),
        # ProcessingOutput(output_name='val', source='/opt/ml/processing/val'),
        ProcessingOutput(
            output_name='test',
            source='/opt/ml/processing/test',
            destination='s3://sagemaker-pipelines-hwm/nba/preprocess/test',
        ),
    ],
)

# step_process.arguments

#### **TRAIN**

In [71]:
%%writefile code/train.py    
import argparse
import boto3
import numpy as np
import pandas as pd
import os
import json
import pathlib
import subprocess
import sys
import tarfile
import glob
import shutil

import datetime
from datetime import timedelta as td
from datetime import datetime as dt

from io import StringIO, BytesIO

# boto3 handles and the bucket name available to the training script
s3_resource = boto3.resource('s3')
s3_client = boto3.client('s3')
bucket = 'sagemaker-pipelines-hwm'

def parse_args():
    """Parse the training script's command-line options.

    Recognised options (with their defaults):
        --epochs        int, 1
        --train         str, '/opt/ml/processing/train/'
        --test          str, '/opt/ml/processing/test/'
        --sm-model-dir  str, '/opt/ml/processing/model/'

    Returns:
        The ``(namespace, unknown_args)`` tuple produced by
        ``ArgumentParser.parse_known_args``; unrecognised arguments are
        collected rather than rejected.
    """
    option_specs = (
        ('--epochs', int, 1),
        ('--train', str, '/opt/ml/processing/train/'),
        ('--test', str, '/opt/ml/processing/test/'),
        ('--sm-model-dir', str, '/opt/ml/processing/model/'),
    )

    cli = argparse.ArgumentParser()
    for flag, caster, fallback in option_specs:
        cli.add_argument(flag, type=caster, default=fallback)

    return cli.parse_known_args()


# class PrintDot(tf.keras.callbacks.Callback):
#     def on_epoch_end(self, epoch, logs):
#         # if epoch == 0:
#         print(str(epoch)+' ', end='')


def install(package):
    """Install ``package`` with pip, using the interpreter that runs this script.

    Raises ``subprocess.CalledProcessError`` when pip exits with a non-zero status.
    """
    pip_command = [sys.executable, '-m', 'pip', 'install', package]
    subprocess.check_call(pip_command)

        
if __name__ == '__main__':

    # This script runs inside a processing container that does not ship
    # TensorFlow, so install it before importing it.
    install('tensorflow==2.9.2')
    import tensorflow as tf

    args, _ = parse_args()

    print('args are:', args)

    target = 'home_win'

    # Each CSV holds the features plus the target column.
    x_train = pd.read_csv(os.path.join(args.train, 'train.csv'))
    x_test = pd.read_csv(os.path.join(args.test, 'test.csv'))

    y_train = x_train.pop(target)
    y_test = x_test.pop(target)

    x_train = np.array(x_train)
    x_test = np.array(x_test)
    y_train = np.array(y_train)
    y_test = np.array(y_test)

    # set random seed
    tf.random.set_seed(16)

    # create the model
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(256, activation='relu'),
        tf.keras.layers.Dense(32, activation='relu'),
        tf.keras.layers.Dense(4, activation='relu'),
        tf.keras.layers.Dense(1, activation='sigmoid') # output shape is 1
    ])

    # compile the model
    model.compile(
        loss=tf.keras.losses.BinaryCrossentropy(),
        optimizer=tf.keras.optimizers.Adam(1e-4),
        metrics=['accuracy']
    )

    # fit the model; the test split doubles as validation data so we can see
    # how the model performs on it during training
    history = model.fit(
        x_train,
        y_train,
        epochs=args.epochs,
        validation_data=(x_test, y_test)
    )

    # accuracy on the test split after the final epoch
    val_accuracy = history.history['val_accuracy'][-1]

    # Export the model under <model dir>/model/1 and pack it into model.tar.gz.
    name = 'model'
    tar_name = 'model.tar.gz'
    local_path = args.sm_model_dir
    export_dir = os.path.join(local_path, name)
    model.save(os.path.join(export_dir, '1'))

    # The context manager closes the archive even if adding a member fails.
    with tarfile.open(os.path.join(local_path, tar_name), 'w:gz') as tar:
        for file_name in glob.glob(os.path.join(export_dir, '*')):
            print('Adding %s...' % file_name)
            tar.add(file_name, os.path.basename(file_name))

    # Drop the unpacked export so only the archive is left in the model dir.
    # The path is derived from --sm-model-dir rather than hard-coded, and the
    # cleanup stays best-effort without a bare `except`.
    shutil.rmtree(export_dir, ignore_errors=True)

    # store model accuracy in a dict (JSON)
    my_result = {'accuracy': val_accuracy}
    result_json = json.dumps(my_result)

    # option 1) write JSON straight to S3
    s3_resource.Object(bucket, 'nba/eval/accuracy.json').put(Body=result_json)

    # option 2) write JSON to the local evaluation directory for use in next steps
    output_dir = '/opt/ml/processing/evaluation'
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f'{output_dir}/evaluation.json'
    with open(evaluation_path, 'w') as f:
        f.write(result_json)

Overwriting code/train.py


In [72]:
import time

from sagemaker.tensorflow.estimator import TensorFlow
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.properties import PropertyFile

# S3 prefix under which this run's trained model artefacts are stored
model_path = f's3://{bucket}/{prefix}/models/{current_time}/'

# stage the training script in S3 so a pipeline step can reference it by URI
s3_training_path = 'nba/code/train.py'
s3_client.upload_file('code/train.py', bucket, s3_training_path)

hyperparameters = dict(epochs=training_epochs)
tensorflow_version = tf_version

# TensorFlow estimator describing the training container; it is also handed
# to the model-registration step further down.
tf2_estimator = TensorFlow(
    entry_point='train.py',
    source_dir='code',
    role=sm_role,
    framework_version=tensorflow_version,
    py_version=python_version,
    instance_count=1,
    instance_type='ml.m5.large',
    base_job_name='nba-train',
    hyperparameters=hyperparameters,
    output_path=model_path,
)

# # Use the tf2_estimator in a Sagemaker pipelines ProcessingStep.
# # NOTE how the input to the training job directly references the output of the previous step.
# step_train = TrainingStep(
#     name='nba-train',
#     estimator=tf2_estimator,
#     inputs={
#         'train': TrainingInput(
#             s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
#                 'train'
#             ].S3Output.S3Uri,
#             content_type='text/csv',
#         ),
#         #'val': TrainingInput(
#         #    s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
#         #        'val'
#         #    ].S3Output.S3Uri,
#         #    content_type='text/csv',
#         #),
#         'test': TrainingInput(
#             s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
#                 'test'
#             ].S3Output.S3Uri,
#             content_type='text/csv',
#         ),
#     }
# )

## Here we avoid using the built-in TrainingStep, instead customising a ProcessingStep to do the same task.
## This allows more customisation of input and output locations for organizing S3 as we wish.
step_train = ProcessingStep(
    name='nba-train',
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs[
                'train'
            ].S3Output.S3Uri,
            destination='/opt/ml/processing/train',
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs[
                'test'
            ].S3Output.S3Uri,
            destination='/opt/ml/processing/test',
        ),
    ],
    outputs=[
        # Reuse `model_path` (built from bucket/prefix/current_time above) instead of
        # re-formatting a hard-coded bucket name, so the two cannot drift apart.
        ProcessingOutput(output_name='model', source='/opt/ml/processing/model', destination=model_path.rstrip('/')),
    ],
    # Forward the TrainingEpochs pipeline parameter to train.py. Without this the
    # script never receives --epochs and always trains with its default of 1.
    job_arguments=['--epochs', training_epochs],
    code='s3://{}/{}'.format(bucket, s3_training_path)
)

#### **EVALUATE**

In [73]:
%%writefile code/evaluate.py
import os
import json
import subprocess
import sys
import numpy as np
import pandas as pd
import pathlib
import tarfile
import boto3

from io import StringIO, BytesIO

s3_resource = boto3.resource('s3')
s3_client = boto3.client('s3')
bucket = 'sagemaker-pipelines-hwm'

def install(package):
    """Install ``package`` with pip, using the interpreter that runs this script.

    Raises ``subprocess.CalledProcessError`` when pip exits with a non-zero status.
    """
    pip_command = [sys.executable, '-m', 'pip', 'install', package]
    subprocess.check_call(pip_command)

if __name__ == '__main__':

    # TensorFlow is installed at run time; other packages can be added the same way.
    install('tensorflow==2.9.2')

    target = 'home_win'

    # Unpack the archived model into ./model before TensorFlow is imported.
    model_path = '/opt/ml/processing/model/model.tar.gz'
    with tarfile.open(model_path, 'r:gz') as archive:
        archive.extractall('./model')
    import tensorflow as tf

    model = tf.keras.models.load_model('./model/1')

    # Load the test split and separate the target column from the features.
    test_path = '/opt/ml/processing/test/'
    test_frame = pd.read_csv(os.path.join(test_path, 'test.csv'))
    labels = test_frame.pop(target)
    x_test = np.array(test_frame)
    y_test = np.array(labels)

    scores = model.evaluate(x_test, y_test, verbose=2)
    print('\nTest Loss & Accuracy:', scores)

    # scores[0] is the loss, scores[1] the accuracy metric
    report_dict = {'accuracy': scores[1]}
    report_json = json.dumps(report_dict)

    # Write the report locally so the step can expose it as an output ...
    output_dir = '/opt/ml/processing/evaluation'
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f'{output_dir}/accuracy_eval.json'
    with open(evaluation_path, 'w') as report_file:
        report_file.write(report_json)

    # ... and upload a copy to a fixed S3 key.
    s3_resource.Object(bucket, 'nba/eval/accuracy_eval.json').put(Body=report_json)

Overwriting code/evaluate.py


In [74]:
from sagemaker.workflow.properties import PropertyFile

# Processor for the evaluation job. Uses the shared `sk_version` like the other
# SKLearnProcessor instances in this notebook instead of repeating the literal.
evaluate_model_processor = SKLearnProcessor(
    framework_version=sk_version,
    instance_type='ml.m5.large',
    instance_count=1,
    base_job_name='nba-evaluate',
    role=sm_role
)

# Exposes accuracy_eval.json (written by code/evaluate.py into the 'evaluation'
# output) so later steps can read values from it with JsonGet.
evaluation_report = PropertyFile(
    name='EvaluationReport',
    output_name='evaluation',
    path='accuracy_eval.json'
)

# Pipeline step: evaluates the model produced by step_train on the test split
# produced by step_process.
step_evaluate = ProcessingStep(
    name='nba-evaluate',
    processor=evaluate_model_processor,
    inputs=[
        ProcessingInput(
            source=step_train.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri,
            destination='/opt/ml/processing/model',
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs[
                'test'
            ].S3Output.S3Uri,
            destination='/opt/ml/processing/test',
        ),
    ],
    outputs=[
        ProcessingOutput(output_name='evaluation', source='/opt/ml/processing/evaluation'),
    ],
    code='code/evaluate.py',
    property_files=[evaluation_report],
)

#### **SEND PERFORMANCE EMAIL - GOOD**

In [75]:
# Source of the "good accuracy" notification Lambda, kept as a template string.
# The params[...] placeholders are substituted below with Python literals, because
# the deployed Lambda has no access to this notebook's `params` dict.
# NOTE(review): the generated file therefore contains the e-mail app password in
# plain text; keep send_email_lambda.py out of version control.
email_code = """
import json
import boto3
import pathlib

from io import StringIO, BytesIO

import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.header import Header
from email.utils import formataddr

s3_client = client = boto3.client('s3')
s3_resource = boto3.resource('s3')
bucket = 'sagemaker-pipelines-hwm'

# used when the invoking step does not pass an 'evaluation_s3_uri' input
DEFAULT_EVALUATION_S3_URI = 's3://sagemaker-pipelines-hwm/nba/eval/accuracy.json'

def lambda_handler(event, context):

    # honour the URI passed by the pipeline step, falling back to the fixed location
    evaluation_s3_uri = (event or {}).get('evaluation_s3_uri', DEFAULT_EVALUATION_S3_URI)
    path_parts = evaluation_s3_uri.replace('s3://', '').split('/')
    bucket = path_parts.pop(0)
    key = '/'.join(path_parts)

    content = s3_client.get_object(Bucket=bucket, Key=key)
    text = content['Body'].read().decode()
    evaluation_json = json.loads(text)
    accuracy = evaluation_json['accuracy']

    # email dispatch code
    msg = MIMEMultipart('alternative')
    msg['From'] = formataddr((str(Header('MyWebsite', 'utf-8')), params['my_email']))
    msg['To'] = params['my_email']
    msg['Subject'] = 'Great Accuracy! {}'.format(accuracy)
    html = 'be proud of this great accuracy wooop!'
    msg.attach(MIMEText(html, 'html'))
    # the context manager closes the SMTP session even if login/send fails
    with smtplib.SMTP_SSL('smtp.gmail.com', 465) as s:
        s.ehlo()
        s.login(params['my_email'], params['my_app_password'])
        s.sendmail(params['my_email'], params['my_email'], msg.as_string())

    # marker object recording that the "good" branch ran (values are fixed placeholders)
    report_dict = {
        'regression_metrics': {
            'mse': {'value': 0.5, 'standard_deviation': 'NaN'},
        },
    }

    output_buffer = StringIO()
    output_buffer.write(json.dumps(report_dict))
    s3_resource.Object(bucket, 'nba/eval/accuracy_good.json').put(Body=output_buffer.getvalue())

    return {'statusCode': 200, 'body': json.dumps('E-Mail Sent Successfully')}
""".replace(
    # repr() yields a correctly quoted and escaped string literal, so values that
    # contain quotes or backslashes cannot break the generated source
    "params['my_email']", repr(params['my_email'])
).replace(
    "params['my_app_password']", repr(params['my_app_password'])
)

with open('send_email_lambda.py', 'w') as f:
    f.write(email_code)

In [48]:
%%writefile code/send_email_lambda.py
import json
import boto3
import pathlib

from io import StringIO, BytesIO

import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.header import Header
from email.utils import formataddr
    
s3_client = client = boto3.client('s3')
s3_resource = boto3.resource('s3')
bucket = 'sagemaker-pipelines-hwm'

def lambda_handler(event, context):
    """Read the accuracy report from S3 and e-mail it as a success notification.

    The sender address and app password are read from the MY_EMAIL and
    MY_APP_PASSWORD environment variables. The previous version referenced a
    `params` dict that does not exist in this standalone module, so every
    invocation failed with NameError.

    Args:
        event: invocation payload; an optional 'evaluation_s3_uri' entry
            overrides the default report location.
        context: Lambda context object (unused).

    Returns:
        dict with 'statusCode' 200 and a JSON-encoded confirmation 'body'.

    Raises:
        KeyError: if MY_EMAIL or MY_APP_PASSWORD is not set.
    """
    import os  # local import: this module's header does not import os

    my_email = os.environ['MY_EMAIL']
    my_app_password = os.environ['MY_APP_PASSWORD']

    default_uri = 's3://sagemaker-pipelines-hwm/nba/eval/accuracy.json'
    evaluation_s3_uri = (event or {}).get('evaluation_s3_uri', default_uri)
    path_parts = evaluation_s3_uri.replace('s3://', '').split('/')
    bucket = path_parts.pop(0)
    key = '/'.join(path_parts)

    content = s3_client.get_object(Bucket=bucket, Key=key)
    text = content['Body'].read().decode()
    evaluation_json = json.loads(text)
    accuracy = evaluation_json['accuracy']

    # email dispatch code
    msg = MIMEMultipart('alternative')
    msg['From'] = formataddr((str(Header('MyWebsite', 'utf-8')), my_email))
    msg['To'] = my_email
    msg['Subject'] = 'Great Accuracy! {}'.format(accuracy)
    html = 'be proud of this great accuracy wooop!'
    msg.attach(MIMEText(html, 'html'))
    # the context manager closes the SMTP session even if login/send fails
    with smtplib.SMTP_SSL('smtp.gmail.com', 465) as s:
        s.ehlo()
        s.login(my_email, my_app_password)
        s.sendmail(my_email, my_email, msg.as_string())

    # marker object recording that the "good" branch ran (values are fixed placeholders)
    report_dict = {
        'regression_metrics': {
            'mse': {'value': 0.5, 'standard_deviation': 'NaN'},
        },
    }

    output_buffer = StringIO()
    output_buffer.write(json.dumps(report_dict))
    s3_resource.Object(bucket, 'nba/eval/accuracy_good.json').put(Body=output_buffer.getvalue())

    return {'statusCode': 200, 'body': json.dumps('E-Mail Sent Successfully')}

Overwriting code/send_email_lambda.py


In [76]:
from sagemaker.workflow.lambda_step import LambdaStep
from sagemaker.lambda_helper import Lambda

# S3 location of the accuracy report uploaded by code/train.py. Built from
# `bucket`/`prefix` rather than a second hard-coded copy of the bucket name.
evaluation_s3_uri = f's3://{bucket}/{prefix}/eval/accuracy.json'
evaluation_s3_uri_raw = step_evaluate.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']

send_email_lambda_function_good_name = 'sagemaker-lambda-email' #-' + current_time

# Lambda built from the send_email_lambda.py file generated in the previous cell.
send_email_lambda_function_good = Lambda(
    function_name=send_email_lambda_function_good_name,
    execution_role_arn=lambda_role,
    script='send_email_lambda.py',
    handler='send_email_lambda.lambda_handler'
)

# Pipeline step that invokes the Lambda; `inputs` becomes the handler's event payload.
send_email_good = LambdaStep(
    name='send-email-good',
    lambda_func=send_email_lambda_function_good,
    inputs={
        'evaluation_s3_uri': evaluation_s3_uri
    }
)

#### **SEND PERFORMANCE EMAIL - BAD**

In [77]:
# Source of the "low accuracy" notification Lambda, kept as a string and written
# to send_email_lambda_bad.py. It contains no params[...] placeholders, so no
# substitution is applied.
# NOTE(review): this handler only logs the subject line; it does not actually
# send an e-mail even though its response body says so.
email_code_bad = """
import json
import boto3
import pathlib

from io import StringIO, BytesIO

import smtplib
from email.mime.text import MIMEText

s3_client = client = boto3.client('s3')
s3_resource = boto3.resource('s3')
bucket = 'sagemaker-pipelines-hwm'

# used when the invoking step does not pass an 'evaluation_s3_uri' input
DEFAULT_EVALUATION_S3_URI = 's3://sagemaker-pipelines-hwm/nba/eval/accuracy.json'

def lambda_handler(event, context):

    # honour the URI passed by the pipeline step, falling back to the fixed location
    evaluation_s3_uri = (event or {}).get('evaluation_s3_uri', DEFAULT_EVALUATION_S3_URI)
    path_parts = evaluation_s3_uri.replace('s3://', '').split('/')
    bucket = path_parts.pop(0)
    key = '/'.join(path_parts)

    content = s3_client.get_object(Bucket=bucket, Key=key)
    text = content['Body'].read().decode()
    evaluation_json = json.loads(text)
    accuracy = evaluation_json['accuracy']

    # the report holds an accuracy, so the message must not call it an MSE
    subject_line = 'Please check low accuracy ({}) detected on model evaluation'.format(accuracy)
    print(f'Sending E-Mail to Data Science Team with subject line: {subject_line}')

    # marker object recording that the "bad" branch ran (values are fixed placeholders)
    report_dict = {
        'regression_metrics': {
            'mse': {'value': 0.5, 'standard_deviation': 'NaN'},
        },
    }

    output_buffer = StringIO()
    output_buffer.write(json.dumps(report_dict))
    s3_resource.Object(bucket, 'nba/eval/accuracy_bad.json').put(Body=output_buffer.getvalue())

    return {'statusCode': 200, 'body': json.dumps('E-Mail Sent Successfully')}
"""

with open('send_email_lambda_bad.py', 'w') as f:
    f.write(email_code_bad)

In [78]:
from sagemaker.workflow.lambda_step import LambdaStep
from sagemaker.lambda_helper import Lambda

# S3 location of the accuracy report uploaded by code/train.py. Built from
# `bucket`/`prefix` rather than a second hard-coded copy of the bucket name.
evaluation_s3_uri = f's3://{bucket}/{prefix}/eval/accuracy.json'
evaluation_s3_uri_raw = step_evaluate.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']

# NOTE(review): the timestamp suffix gives every notebook run a new Lambda
# function name, unlike the fixed name used for the "good" Lambda. Confirm intended.
send_email_lambda_function_bad_name = 'sagemaker-send-email-to-ds-team-lambda-' + current_time

# Lambda built from the send_email_lambda_bad.py file generated in the previous cell.
send_email_lambda_function_bad = Lambda(
    function_name=send_email_lambda_function_bad_name,
    execution_role_arn=lambda_role,
    script='send_email_lambda_bad.py',
    handler='send_email_lambda_bad.lambda_handler',
)

# Pipeline step that invokes the Lambda; `inputs` becomes the handler's event payload.
send_email_bad = LambdaStep(
    name='send-email-bad',
    lambda_func=send_email_lambda_function_bad,
    inputs={
        'evaluation_s3_uri': evaluation_s3_uri
    }
)

#### **REGISTER MODEL**

In [79]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel

# Create ModelMetrics object pointing at the accuracy report in S3.
# A ModelMetrics object contains metrics captured from a model.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=evaluation_s3_uri,
        content_type='application/json',
    )
)

# Create a RegisterModel step, which registers the model with Sagemaker Model Registry.
step_register_model = RegisterModel(
    name='register-model',
    estimator=tf2_estimator,
    # Derived from `model_path`, the prefix this run's model archive is stored under,
    # so this URI cannot drift from it if `current_time` is reassigned before this
    # cell is re-run, and the bucket name is not duplicated here.
    model_data=model_path.rstrip('/') + '/model.tar.gz',
    content_types=['text/csv'],
    response_types=['text/csv'],
    inference_instances=['ml.m5.large', 'ml.m5.xlarge'],
    transform_instances=['ml.m5.xlarge'],
    model_package_group_name=model_package_group_name,
    model_metrics=model_metrics,
)

#### **CREATE MODEL**

In [46]:
# from sagemaker.workflow.step_collections import CreateModelStep
# from sagemaker.tensorflow.model import TensorFlowModel

# model = TensorFlowModel(
#     role=role,
#     model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
#     framework_version=tensorflow_version,
#     sagemaker_session=sagemaker_session,
# )

# step_create_model = CreateModelStep(
#     name='create-model',
#     model=model,
#     display_name='does-this-work',
#     inputs=sagemaker.inputs.CreateModelInput(instance_type='ml.m5.large'),
# )

In [47]:
# import boto3
# import sagemaker
    
# def lambda_handler(event, context):
    
#     # role = sagemaker.get_execution_role()
#     role = params['sm_role']
#     container_def = {
#         'Image': '763104351884.dkr.ecr.us-west-2.amazonaws.com/tensorflow-inference:2.9.2-cpu',
#         'Environment': {},
#         'ModelDataUrl': # 's3://....../model.tar.gz' (check this and make dynamic)
#     }
    
#     boto3.client('sagemaker').create_model(ModelName='model-name-lambda-awesomeness4', ExecutionRoleArn=role, PrimaryContainer=container_def)

In [22]:
%%writefile code/create_model.py
import argparse
import boto3
import json
import time

if __name__ == '__main__':

    parser = argparse.ArgumentParser()
    parser.add_argument('--endpoint_instance_type', type=str)
    parser.add_argument('--endpoint_config_name', type=str)
    parser.add_argument('--endpoint_name', type=str)
    parser.add_argument('--model_name', type=str)
    # The execution role must be passed in: this script runs standalone and has
    # no access to the notebook's `params` dict (the old code raised NameError).
    parser.add_argument('--role_arn', type=str, default=None)
    # TODO: the default is the original placeholder; pass the real model archive URI.
    parser.add_argument('--model_data_url', type=str, default='s3:.....model.tar.gz')

    args = parser.parse_args()
    if not args.role_arn:
        parser.error('--role_arn is required (IAM role ARN used as the model execution role)')

    sm = boto3.client('sagemaker', region_name='us-west-2')

    container_def = {
        'Image': '763104351884.dkr.ecr.us-west-2.amazonaws.com/tensorflow-inference:2.9.2-cpu',
        'Environment': {},
        'ModelDataUrl': args.model_data_url
    }

    sm.create_model(ModelName=args.model_name, ExecutionRoleArn=args.role_arn, PrimaryContainer=container_def)

    endpoint_instance_type = args.endpoint_instance_type
    endpoint_config_name = args.endpoint_config_name
    endpoint_name = args.endpoint_name
    model_name = args.model_name

    create_endpoint_config_response = sm.create_endpoint_config(
        EndpointConfigName = endpoint_config_name,
        ProductionVariants = [
            {
                'InstanceType': endpoint_instance_type,
                'InitialVariantWeight': 1,
                'InitialInstanceCount': 1,
                'ModelName': model_name,
                'VariantName': 'AllTraffic',
            }
        ],
    )
    print(f'create_endpoint_config_response: {create_endpoint_config_response}')

    # NameContains is a substring filter, so collect every page of matches and
    # then look for an exact name match before deciding between update and create.
    list_request = {
        'SortBy': 'CreationTime',
        'SortOrder': 'Descending',
        'NameContains': endpoint_name,
    }
    matching_endpoints = []
    while True:
        list_endpoints_response = sm.list_endpoints(**list_request)
        print(f'list_endpoints_response: {list_endpoints_response}')
        matching_endpoints.extend(list_endpoints_response.get('Endpoints', []))
        next_token = list_endpoints_response.get('NextToken')
        if not next_token:
            break
        list_request['NextToken'] = next_token

    endpoint_exists = any(
        endpoint.get('EndpointName') == endpoint_name for endpoint in matching_endpoints
    )

    if endpoint_exists:
        print('Updating Endpoint with new Endpoint Configuration')
        update_endpoint_response = sm.update_endpoint(
            EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
        )
        print(f'update_endpoint_response: {update_endpoint_response}')
    else:
        print('Creating Endpoint')
        create_endpoint_response = sm.create_endpoint(
            EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
        )
        print(f'create_endpoint_response: {create_endpoint_response}')

Overwriting code/create_model.py


In [23]:
create_sagemaker_model_processor = SKLearnProcessor(
    framework_version=sk_version,
    instance_type='ml.m5.large',
    instance_count=1,
    base_job_name='nba-create-sagemaker-model',
    role=sm_role
)

model_name = 'model-name-lambda-awesomeness-100'
endpoint_config_name = 'nba-pipeline-model-config'
endpoint_name = 'nba-pipeline-model-endpoint'

# Use the create_sagemaker_model_processor in a Sagemaker pipelines ProcessingStep
# that runs code/create_model.py with the names defined above.
step_create_model = ProcessingStep(
    name='nba-create-sagemaker-model',
    processor=create_sagemaker_model_processor,
    # The input must be passed through the `inputs=[...]` keyword. It was previously
    # a bare positional argument after keyword arguments, which is a SyntaxError.
    inputs=[
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs[
                'train'
            ].S3Output.S3Uri,
            destination='/opt/ml/processing/train',
        ),
    ],
    outputs=[
        ProcessingOutput(output_name='create_output', source='/opt/ml/processing/train'),
    ],
    job_arguments=[
        '--model_name', model_name,
        '--endpoint_name', endpoint_name,
        '--endpoint_config_name', endpoint_config_name,
        '--endpoint_instance_type', endpoint_instance_type
    ],
    code='code/create_model.py'
)

#### **DEPLOY**

In [24]:
%%writefile code/deploy_model_lambda.py
import json
import boto3
import time

sm_client = boto3.client('sagemaker')

def main(event, context):
    """Create an endpoint configuration and deploy it to a SageMaker endpoint.

    A new endpoint configuration named '<endpoint_config_name>-<timestamp>' is
    created for the given model. If an endpoint called exactly `endpoint_name`
    already exists it is updated to the new configuration; otherwise a new
    endpoint is created.

    Args:
        event: dict with the keys 'endpoint_instance_type', 'model_name',
            'endpoint_config_name' and 'endpoint_name'.
        context: Lambda context object (unused).

    Raises:
        KeyError: if one of the expected keys is missing from `event`.
    """
    print(f'Received Event: {event}')

    current_time = time.strftime('%m-%d-%H-%M-%S', time.localtime())
    endpoint_instance_type = event['endpoint_instance_type']
    model_name = event['model_name']
    endpoint_config_name = '{}-{}'.format(event['endpoint_config_name'], current_time)
    endpoint_name = event['endpoint_name']

    # Create Endpoint Configuration
    create_endpoint_config_response = sm_client.create_endpoint_config(
        EndpointConfigName=endpoint_config_name,
        ProductionVariants=[
            {
                'InstanceType': endpoint_instance_type,
                'InitialVariantWeight': 1,
                'InitialInstanceCount': 1,
                'ModelName': model_name,
                'VariantName': 'AllTraffic',
            }
        ],
    )
    print(f'create_endpoint_config_response: {create_endpoint_config_response}')

    # Check if the endpoint exists. NameContains is only a substring filter
    # (e.g. 'foo' also matches 'foo-old'), so gather every page of candidates
    # and require an exact name match before choosing update over create.
    list_request = {
        'SortBy': 'CreationTime',
        'SortOrder': 'Descending',
        'NameContains': endpoint_name,
    }
    matching_endpoints = []
    while True:
        list_endpoints_response = sm_client.list_endpoints(**list_request)
        print(f'list_endpoints_response: {list_endpoints_response}')
        matching_endpoints.extend(list_endpoints_response.get('Endpoints', []))
        next_token = list_endpoints_response.get('NextToken')
        if not next_token:
            break
        list_request['NextToken'] = next_token

    endpoint_exists = any(
        endpoint.get('EndpointName') == endpoint_name for endpoint in matching_endpoints
    )

    if endpoint_exists:
        print('Updating Endpoint with new Endpoint Configuration')
        update_endpoint_response = sm_client.update_endpoint(
            EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
        )
        print(f'update_endpoint_response: {update_endpoint_response}')
    else:
        print('Creating Endpoint')
        create_endpoint_response = sm_client.create_endpoint(
            EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
        )
        print(f'create_endpoint_response: {create_endpoint_response}')

Overwriting code/deploy_model_lambda.py


In [25]:
from sagemaker.workflow.lambda_step import LambdaStep
from sagemaker.lambda_helper import Lambda

endpoint_config_name = 'nba-pipeline-model-config'
endpoint_name = 'nba-pipeline-model-endpoint'

deploy_model_lambda_function_name = 'sagemaker-deploy-model'

# Lambda built from code/deploy_model_lambda.py; its entry point is `main`.
deploy_model_lambda_function = Lambda(
    function_name=deploy_model_lambda_function_name,
    execution_role_arn=lambda_role,
    script='code/deploy_model_lambda.py',
    handler='deploy_model_lambda.main',
)

# Pipeline step that invokes the Lambda; `inputs` becomes the handler's event payload.
step_deploy_model = LambdaStep(
    name='nba-deploy-model',
    lambda_func=deploy_model_lambda_function,
    inputs={
        # Pass `model_name` directly. It is the same value the create-model step puts
        # right after '--model_name' in its job arguments, without relying on that
        # argument's position in the container argument list.
        'model_name': model_name,
        'endpoint_config_name': endpoint_config_name,
        'endpoint_name': endpoint_name,
        'endpoint_instance_type': endpoint_instance_type,
    },
)

#### **CONDITIONAL EXECUTION BASED ON MODEL ACCURACY**

In [80]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo, ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

# Accuracy gate: compare the 'accuracy' value read from the evaluation step's
# report with the AccuracyThreshold pipeline parameter.
cond_gte = ConditionGreaterThanOrEqualTo(
    right=accuracy_threshold,
    left=JsonGet(
        json_path='accuracy',
        property_file=evaluation_report,
        step_name=step_evaluate.name,
    ),
)

# Branch the pipeline on that gate: when it holds, send the "good" e-mail and
# register the model; otherwise only send the "bad" e-mail.
step_cond = ConditionStep(
    name='accuracy-above-threshold-condition',
    else_steps=[send_email_bad],
    if_steps=[send_email_good, step_register_model],
    conditions=[cond_gte],
)

In [None]:
def write_s3(file_path, myfile, bucket='hwm-nba', dedupe_cols=None, sort=None, compression='zip', *, ascending=True):
    """Upload a DataFrame (as CSV) or an in-memory buffer to S3.

    Args:
        file_path: destination object key. For zipped DataFrames its extension
            is replaced by (or, if absent, extended with) '.zip'.
        myfile: a pandas DataFrame, or any object exposing ``getvalue()``
            (e.g. BytesIO/StringIO) whose content is uploaded unchanged.
        bucket: destination bucket name.
        dedupe_cols: optional column subset; duplicate rows on these columns
            are dropped, keeping the first occurrence.
        sort: optional column name(s) to sort by before writing.
        compression: 'zip' writes a zip archive whose single member is named
            after the last component of ``file_path``; any other value is
            handed to ``DataFrame.to_csv`` together with a text buffer.
        ascending: keyword-only sort direction used when ``sort`` is given
            (the earlier definition of this helper accepts the same option).
    """
    import os  # local import keeps the helper self-contained

    file_name = file_path.split('/')[-1]
    if isinstance(myfile, pd.DataFrame):
        if dedupe_cols is not None:
            myfile = myfile.drop_duplicates(subset=dedupe_cols, keep='first')
        if sort is not None:
            myfile = myfile.sort_values(by=sort, ascending=ascending)
        zipped = compression == 'zip'
        output_buffer = BytesIO() if zipped else StringIO()
        if zipped:
            # splitext only touches the final path component, so keys without an
            # extension or with dots in directory names keep their full stem
            # (the old '.'.join(split('.')[:-1]) turned 'a/b/data' into '.zip').
            file_path = os.path.splitext(file_path)[0] + '.zip'
        myfile.to_csv(output_buffer, index=False, compression={'method': compression, 'archive_name': file_name})
        myfile = output_buffer
    s3_resource.Object(bucket, file_path).put(Body=myfile.getvalue())

#### **ACTIVATE PIPELINE**

In [81]:
from sagemaker.workflow.pipeline import Pipeline
pipeline_name = 'nba-pipeline-1'
# Refreshes the timestamp for anything defined from here on; strings that earlier
# cells already formatted with `current_time` keep their previous value.
current_time = time.strftime('%m-%d-%H-%M-%S', time.localtime())

# Assemble the pipeline: the runtime parameters it exposes and its top-level steps.
# The e-mail and register steps are reached through step_cond.
pipeline = Pipeline(
    steps=[step_process, step_train, step_evaluate, step_cond],
    parameters=[input_data, training_epochs, accuracy_threshold, endpoint_instance_type],
    name=pipeline_name,
)

In [82]:
# Parse the pipeline's JSON definition into a dict and display it for inspection.
definition = json.loads(pipeline.definition())
definition

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
Popping out 'CertifyForMarketplace' from the pipeline definition since it will be overridden in pipeline execution time.


{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-pipelines-hwm/nba/raw'},
  {'Name': 'TrainingEpochs', 'Type': 'String', 'DefaultValue': '10'},
  {'Name': 'AccuracyThreshold', 'Type': 'Float', 'DefaultValue': 0.6},
  {'Name': 'EndpointInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.large'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'nba-preprocess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.large',
      'InstanceCount': 1,
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-scikit-learn:1.0-1-cpu-py3',
     'ContainerEntrypoint': ['python3',
      '/opt/ml/processing/input/code/processing.py']},
    'RoleArn': 'arn:aws:iam::668209712

In [83]:
# Submit the pipeline definition to SageMaker under the `sm_role` execution role.
pipeline.upsert(role_arn=sm_role)

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
Popping out 'CertifyForMarketplace' from the pipeline definition since it will be overridden in pipeline execution time.


{'PipelineArn': 'arn:aws:sagemaker:us-west-2:668209712187:pipeline/nba-pipeline-1',
 'ResponseMetadata': {'RequestId': 'f5ea4ed0-a82a-423c-be5a-c348a811e298',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'f5ea4ed0-a82a-423c-be5a-c348a811e298',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '82',
   'date': 'Mon, 13 Mar 2023 11:25:47 GMT'},
  'RetryAttempts': 0}}

In [84]:
# Start an execution; no overrides are passed, so the declared parameter defaults apply.
pipeline.start() # can add parameters here

_PipelineExecution(arn='arn:aws:sagemaker:us-west-2:668209712187:pipeline/nba-pipeline-1/execution/8luyen4fginm', sagemaker_session=<sagemaker.session.Session object at 0x282f3ad30>)

In [70]:
def delete_sagemaker_pipeline(sm_client, pipeline_name):
    """Delete one SageMaker pipeline by name, reporting the outcome instead of raising.

    Best-effort helper intended for cleanup loops: a failure is printed and
    signalled through the return value so the caller can keep going with the
    remaining pipelines.

    Args:
        sm_client: Object exposing ``delete_pipeline(PipelineName=...)``,
            e.g. a boto3 SageMaker client.
        pipeline_name: Name of the pipeline to delete.

    Returns:
        bool: True if the delete call completed, False if it raised.
    """
    try:
        sm_client.delete_pipeline(PipelineName=pipeline_name)
    except Exception as e:
        # Include the pipeline name so a failure is attributable when this
        # helper is called repeatedly in a loop.
        print("{} pipeline NOT deleted: {} \n".format(pipeline_name, e))
        return False
    print("{} pipeline deleted".format(pipeline_name))
    return True

# DESTRUCTIVE: deletes every pipeline returned for this client's account/region,
# not only the pipeline built in this notebook.
# list_pipelines is paginated, so walk every page instead of only the first response.
pipeline_names = [
    ps['PipelineName']
    for page in sm.get_paginator('list_pipelines').paginate()
    for ps in page['PipelineSummaries']
]
for p in pipeline_names:
    # delete_sagemaker_pipeline already catches and prints failures itself, so
    # no bare `except:` is needed here (it would also swallow kernel interrupts).
    delete_sagemaker_pipeline(sm, p)

nba-pipeline-2 pipeline deleted
nba-pipeline-1 pipeline deleted


In [94]:
# Delete the leftover 'PipelineModelPackageGroup' model package group.
# NOTE(review): this cell's execution count (In [94]) is later than that of the
# delete_model_package cell further down (In [93]), i.e. it was run after the
# group's package had been removed. Presumably the group must be empty before it
# can be deleted — confirm, and consider moving this cell below that one so the
# notebook works top to bottom.
response = sm.delete_model_package_group(
    ModelPackageGroupName='PipelineModelPackageGroup'
)

In [88]:
# Inspect which model package groups currently exist; the raw response is
# displayed as the cell output.
sm.list_model_package_groups()

{'ModelPackageGroupSummaryList': [{'ModelPackageGroupName': 'nba-models-2023',
   'ModelPackageGroupArn': 'arn:aws:sagemaker:us-west-2:668209712187:model-package-group/nba-models-2023',
   'CreationTime': datetime.datetime(2023, 3, 9, 17, 20, 52, 518000, tzinfo=tzlocal()),
   'ModelPackageGroupStatus': 'Completed'},
  {'ModelPackageGroupName': 'PipelineModelPackageGroup',
   'ModelPackageGroupArn': 'arn:aws:sagemaker:us-west-2:668209712187:model-package-group/pipelinemodelpackagegroup',
   'CreationTime': datetime.datetime(2023, 3, 2, 14, 36, 25, 649000, tzinfo=tzlocal()),
   'ModelPackageGroupStatus': 'Completed'}],
 'ResponseMetadata': {'RequestId': 'c054ef68-99b5-4d24-9f70-2c7a0474d956',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'c054ef68-99b5-4d24-9f70-2c7a0474d956',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '486',
   'date': 'Mon, 13 Mar 2023 11:53:03 GMT'},
  'RetryAttempts': 0}}

In [92]:
# List the model package versions registered in 'PipelineModelPackageGroup'
# (the response includes each version's ModelPackageArn).
sm.list_model_packages(ModelPackageGroupName='PipelineModelPackageGroup')

{'ModelPackageSummaryList': [{'ModelPackageGroupName': 'PipelineModelPackageGroup',
   'ModelPackageVersion': 1,
   'ModelPackageArn': 'arn:aws:sagemaker:us-west-2:668209712187:model-package/pipelinemodelpackagegroup/1',
   'CreationTime': datetime.datetime(2023, 3, 2, 14, 36, 25, 960000, tzinfo=tzlocal()),
   'ModelPackageStatus': 'Completed',
   'ModelApprovalStatus': 'Approved'}],
 'ResponseMetadata': {'RequestId': '853bbf8f-f3ec-4df3-8488-0ced3aac5e3b',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '853bbf8f-f3ec-4df3-8488-0ced3aac5e3b',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '307',
   'date': 'Mon, 13 Mar 2023 11:54:47 GMT'},
  'RetryAttempts': 0}}

In [93]:
# Delete version 1 of 'PipelineModelPackageGroup', addressed by its full ARN
# (the ModelPackageArn shown in the list_model_packages output above).
# NOTE(review): the ARN hard-codes the account id and region — confirm it still
# matches the target environment before re-running.
response = sm.delete_model_package(
    ModelPackageName='arn:aws:sagemaker:us-west-2:668209712187:model-package/pipelinemodelpackagegroup/1'
)