# Did Kobe Bryant sink the shot


In [1]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session and load the raw CSV file. The first row is read
# as the header and the column types are inferred from the data.

spark = SparkSession.builder.appName('imbalanced_binary_classification').getOrCreate()
df = spark.read.csv('data.csv', header=True, inferSchema=True)

### MLlib estimators read the "features" and "label" columns by default.

In [2]:
# Rename shot_made_flag to "label", the label column name the models below are trained on.
df = df.withColumnRenamed('shot_made_flag', 'label')
# new_df is the working frame that the cleaning cells below keep reassigning;
# df itself is only used again to extract the rows with a null label.
new_df = df
new_df.printSchema()

root
 |-- action_type: string (nullable = true)
 |-- combined_shot_type: string (nullable = true)
 |-- game_event_id: integer (nullable = true)
 |-- game_id: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- loc_x: integer (nullable = true)
 |-- loc_y: integer (nullable = true)
 |-- lon: double (nullable = true)
 |-- minutes_remaining: integer (nullable = true)
 |-- period: integer (nullable = true)
 |-- playoffs: integer (nullable = true)
 |-- season: string (nullable = true)
 |-- seconds_remaining: integer (nullable = true)
 |-- shot_distance: integer (nullable = true)
 |-- label: integer (nullable = true)
 |-- shot_type: string (nullable = true)
 |-- shot_zone_area: string (nullable = true)
 |-- shot_zone_basic: string (nullable = true)
 |-- shot_zone_range: string (nullable = true)
 |-- team_id: integer (nullable = true)
 |-- team_name: string (nullable = true)
 |-- game_date: timestamp (nullable = true)
 |-- matchup: string (nullable = true)
 |-- opponent: string (

In [3]:
# Columns we do not want to train with.
drop_col = [
    'game_id', 'game_event_id', 'team_id', 'team_name', 'game_date',
    'combined_shot_type', 'lat', 'lon', 'shot_zone_basic', 'shot_id',
]
# Keep the remaining columns in their original order.
kept_columns = [name for name in new_df.columns if name not in drop_col]
new_df = new_df.select(kept_columns)

In [4]:
new_df.limit(1).toPandas()

Unnamed: 0,action_type,loc_x,loc_y,minutes_remaining,period,playoffs,season,seconds_remaining,shot_distance,label,shot_type,shot_zone_area,shot_zone_range,matchup,opponent
0,Jump Shot,167,72,10,1,0,2000-01,27,18,,2PT Field Goal,Right Side(R),16-24 ft.,LAL @ POR,POR


### Check out the possible label values

In [5]:
new_df.groupby('label').count().toPandas()

Unnamed: 0,label,count
0,,5000
1,1.0,11465
2,0.0,14232


### Rows with a null label are the ones we need to predict for the competition

In [6]:
val_df = df.filter('label is null')

In [7]:
# Only five rows are shown, so limit in Spark before converting instead of
# pulling the whole frame to the driver (same pattern as the other preview cells).
val_df.limit(5).toPandas()

Unnamed: 0,action_type,combined_shot_type,game_event_id,game_id,lat,loc_x,loc_y,lon,minutes_remaining,period,...,shot_type,shot_zone_area,shot_zone_basic,shot_zone_range,team_id,team_name,game_date,matchup,opponent,shot_id
0,Jump Shot,Jump Shot,10,20000012,33.9723,167,72,-118.1028,10,1,...,2PT Field Goal,Right Side(R),Mid-Range,16-24 ft.,1610612747,Los Angeles Lakers,2000-10-31,LAL @ POR,POR,1
1,Jump Shot,Jump Shot,254,20000012,34.0163,1,28,-118.2688,8,3,...,2PT Field Goal,Center(C),Restricted Area,Less Than 8 ft.,1610612747,Los Angeles Lakers,2000-10-31,LAL @ POR,POR,8
2,Driving Layup Shot,Layup,100,20000019,34.0443,0,0,-118.2698,0,1,...,2PT Field Goal,Center(C),Restricted Area,Less Than 8 ft.,1610612747,Los Angeles Lakers,2000-11-01,LAL vs. UTA,UTA,17
3,Driving Layup Shot,Layup,249,20000019,34.0443,0,0,-118.2698,10,3,...,2PT Field Goal,Center(C),Restricted Area,Less Than 8 ft.,1610612747,Los Angeles Lakers,2000-11-01,LAL vs. UTA,UTA,20
4,Jump Shot,Jump Shot,4,20000047,33.9683,163,76,-118.1068,11,1,...,2PT Field Goal,Right Side(R),Mid-Range,16-24 ft.,1610612747,Los Angeles Lakers,2000-11-04,LAL @ VAN,VAN,33


In [None]:
# Class counts over the labelled rows (a null label matches neither filter).
zeros = new_df.filter("label=0").count()
ones = new_df.filter("label=1").count()
total = zeros + ones

print('all: {}'.format(total))
print('ones: {}'.format(ones))
print('zeros: {}'.format(zeros))

# Share of the zero class among the labelled rows, rounded to two decimals.
ratio = round(zeros/total,2)
print('ratio is: {}'.format(ratio))

# The data is flagged as imbalanced when the rounded share of zeros is above 0.7.
imbalanced = ratio > .7
if imbalanced:
    print('This is an imbalanced dataset: {}'.format(ratio))
else:
    print('This is not an imbalanced dataset: {}'.format(ratio))

In [None]:
val_df.count()

In [None]:
# Drop every row that has a null in any column. This also removes the rows whose
# label is null; those were already captured separately in val_df.
new_df = new_df.na.drop()

In [None]:
# let's have a look at the distribution of our target variable:
# to make it look better, we first convert our spark df to a Pandas df

import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
# Bring the cleaned data to pandas for plotting and report how many rows it has.
df_pd = new_df.toPandas()
print(len(df_pd))

# Bars are ordered from the most to the least frequent label.
label_order = df_pd['label'].value_counts().index
plt.figure(figsize=(12, 10))
sns.countplot(data=df_pd, x='label', order=label_order)

In [None]:
# let's see how everything look in Pandas
import pandas as pd
pd.DataFrame(new_df.take(10), columns= new_df.columns)

## Data Wrangling

In [None]:
# List every column with its Spark data type.
for column_name, column_type in new_df.dtypes:
    print('{} : {}'.format(column_name, column_type))

In [None]:
# we use the below function to find more information about the missing values

def info_missing_table(df_pd):
    """Summarise the missing values of a pandas DataFrame.

    Prints how many columns the frame has and how many of them contain
    missing values, then returns a table indexed by column name with the
    columns 'Missing Values' (null count) and '% of Total Values'
    (null percentage, rounded to one decimal). Only columns whose percentage
    is non-zero are kept, sorted from the highest percentage to the lowest.
    """
    null_counts = df_pd.isnull().sum()
    null_percent = 100 * null_counts / len(df_pd)

    summary = pd.DataFrame({
        'Missing Values': null_counts,
        '% of Total Values': null_percent,
    })
    # Keep only the columns that actually have missing values.
    summary = summary[summary['% of Total Values'] != 0]
    summary = summary.sort_values('% of Total Values', ascending=False).round(1)

    print("Your selected dataframe has " + str(df_pd.shape[1]) + " columns.\n"
          "There are " + str(summary.shape[0]) +
          " columns that have missing values.")
    return summary

In [None]:
missings = info_missing_table(df_pd)
missings

In [None]:
# so this function deals with the a spark dataframe directly to find more about the missing values

def count_missings(spark_df):
    """Return (column name, null count) tuples for the columns that contain nulls.

    One filtered count is run per column of ``spark_df``; columns without any
    null are left out. The result follows the column order of ``spark_df.dtypes``.
    """
    columns_with_nulls = []
    for cname, _ctype in spark_df.dtypes:
        n_null = spark_df.where(spark_df[cname].isNull()).count()
        if n_null != 0:
            columns_with_nulls.append((cname, n_null))
    return columns_with_nulls


In [None]:
miss_counts = count_missings(new_df)
miss_counts

In [None]:
# here we seperate missing columns in our new_df based on categorical and numerical types

# Names of the columns that were reported as containing nulls.
list_cols_miss = [name for name, _ in miss_counts]
df_miss = new_df.select(*list_cols_miss)

# categorical columns: those whose Spark type is string
catcolums_miss = [name for name, dtype in df_miss.dtypes if dtype.startswith('string')]
print("cateogrical columns_miss:", catcolums_miss)

### numerical columns: those whose Spark type starts with int or double
numcolumns_miss = [name for name, dtype in df_miss.dtypes if dtype.startswith(('int', 'double'))]
# print("numerical columns_miss:", numcolumns_miss)

In [None]:
# now that we have seperated the columns based on categorical and numerical types, we will fill the missing categiracl 
# values with the most frequent category

from pyspark.sql.functions import rank,sum,col
# Rows without any null are used to find each categorical column's most frequent value.
df_Nomiss=new_df.na.drop()
for x in catcolums_miss:
    # first() fetches only the top row of the sorted counts instead of
    # collecting every group to the driver.
    top_row = df_Nomiss.groupBy(x).count().sort(col("count").desc()).first()
    if top_row is None:
        # There are no complete rows, so there is no mode to impute with.
        print(x, 'has no complete rows to take a mode from; left unchanged')
        continue
    mode = top_row[0]
    print(x, mode) #print name of columns and it's most categories
    new_df = new_df.na.fill({x:mode})

In [None]:
# and we fill the missing numerical values with the average of each column

# Spark's round is imported under an alias: importing it as `round` would replace
# Python's built-in round(), which the class-ratio cell calls on a plain float, so
# re-running that cell after this one would fail.
from pyspark.sql.functions import mean, round as spark_round

for i in numcolumns_miss:
    # Column mean rounded to a whole number, computed by Spark.
    meanvalue = new_df.select(spark_round(mean(i))).collect()[0][0]
    print(i, meanvalue)
    new_df=new_df.na.fill({i:meanvalue})

In [None]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
import re
re_at = re.compile('@')
def do_matchup(label):
    """Return the home-game flag for a matchup string.

    Despite its name, ``label`` is the matchup text (for example 'LAL @ POR'),
    not the shot label.

    Returns 0 when the text contains '@', 1 when it does not, and None when
    the input is None, so a missing matchup stays null instead of raising
    TypeError inside the regex search.
    """
    if label is None:
        return None
    return 0 if re_at.search(label) else 1

# Expose do_matchup to Spark as a UDF that produces an integer column.
do_matchup_udf = udf(do_matchup, IntegerType())

# Add home_game (derived from matchup) to both the training frame and the competition frame.
home_game_col = do_matchup_udf(col('matchup'))
new_df = new_df.withColumn('home_game', home_game_col)
val_df = val_df.withColumn('home_game', home_game_col)


## Dealing with Imbalanced classes if necessary:
### If the dataset is imbalanced, automatically add a new column called "weights" in which each row is weighted by the share of the opposite class (label 1 rows get the share of zeros, label 0 rows get the share of ones).

In [None]:
# adding the new column weights and fill it with ratios

from pyspark.sql.functions import when

def weight_balance(labels):
    """Build the per-row weight expression for the label column ``labels``.

    Rows whose label equals 1 get ``ratio`` (the share of zeros computed
    earlier in the notebook); every other row gets ``1 - ratio``.
    """
    weight_for_ones = ratio
    weight_for_rest = 1 - ratio
    return when(labels == 1, weight_for_ones).otherwise(weight_for_rest)

# The weights column is only added when the data was flagged as imbalanced.
if imbalanced:
    label_col = col('label')
    new_df = new_df.withColumn('weights', weight_balance(label_col))

In [None]:
new_df = new_df.drop('matchup')
val_df = val_df.drop('matchup')

In [None]:
new_df.limit(10).toPandas()

In [None]:
# now let's count if we have still any missing values in our dataset:

miss_counts2 = count_missings(new_df)
miss_counts2

In [None]:
# and have another look at the data after filling the missing values and adding the weight column
pd.DataFrame(new_df.take(10), columns= new_df.columns)

## feature engineering:

In [None]:
# now let's see how many categorical and numerical features we have:

cat_cols = [name for name, dtype in new_df.dtypes if dtype.startswith('string')]
print(str(len(cat_cols)) + '  categorical features')

# Columns that must never be used as model inputs:
#   'label'   - the target itself; including it would be cheating and give a perfect model
#   'weights' - only present for imbalanced data; it is computed from the label, and the
#               competition frame (val_df) never gets this column, so the fitted pipeline
#               could not transform val_df if it were a feature
non_feature_cols = ('label', 'weights')

num_cols = []
for name, dtype in new_df.dtypes:
    if name == 'label':
        print('found label')
    if name in non_feature_cols:
        continue
    if dtype.startswith(('int', 'double')):
        num_cols.append(name)

print(str(len(num_cols)) + '  numerical features')

In [None]:
num_cols

In [None]:
cat_cols

In [None]:
# We use OneHotEncoderEstimator from Spark ML to convert each categorical feature into a
# one-hot vector. Next, VectorAssembler combines the resulting one-hot vectors and the
# numerical features into a single "features" vector column. Every step is appended to
# the `stages` list.
# NOTE(review): OneHotEncoderEstimator is not available in every Spark release — confirm
# that the Spark version in use provides it.

from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler

stages = []
for categoricalCol in cat_cols:
    print(categoricalCol)
    # Index the string values first (output column "<name>Index"), then one-hot encode the
    # index into "<name>classVec".
    # NOTE(review): handleInvalid="keep" presumably keeps values unseen at fit time from
    # failing the transform — confirm against the StringIndexer documentation.
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index').setHandleInvalid("keep")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

# Assembler inputs: all one-hot vectors (in cat_cols order) followed by num_cols.
assemblerInputs = [c + "classVec" for c in cat_cols] + num_cols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [None]:
new_df.columns

In [None]:
from pandas.plotting import scatter_matrix
# Scatter matrix of the loc_x and loc_y columns.
numeric_data = new_df.select('loc_x','loc_y').toPandas()

axs = scatter_matrix(numeric_data, figsize=(8, 8));
n = len(numeric_data.columns)

# Left-hand column of panels: horizontal, right-aligned y labels and no y ticks.
for left_ax in axs[:, 0]:
    left_ax.yaxis.label.set_rotation(0)
    left_ax.yaxis.label.set_ha('right')
    left_ax.set_yticks(())

# Bottom row of panels: vertical x labels and no x ticks.
for bottom_ax in axs[n-1, :]:
    bottom_ax.xaxis.label.set_rotation(90)
    bottom_ax.set_xticks(())

In [None]:
# Fit every stage on the cleaned data, then apply the fitted pipeline to that same data.
# `cols` records the column names before the transform so a later cell can select them
# alongside "features".
# NOTE: this cell reassigns new_df; running it a second time would apply the pipeline to
# an already-transformed frame.

from pyspark.ml import Pipeline
cols = new_df.columns
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(new_df)
new_df = pipelineModel.transform(new_df)

In [None]:
# Preview five transformed rows; limiting in Spark avoids converting the whole
# transformed frame (including the vector columns) to pandas.
new_df.limit(5).toPandas()

In [None]:
selectedCols = ['features']+cols
new_df = new_df.select(selectedCols)
pd.DataFrame(new_df.take(5), columns=new_df.columns)

In [None]:
selectedCols

In [None]:
# Split the data into training (90%) and test (10%) sets; the seed is fixed so the
# split is repeatable.

train, test = new_df.randomSplit([0.90, 0.10], seed = 53)
print(train.count())
print(test.count())

## Training models

In [None]:
# first we check how LogisticRegression perform 
from pyspark.ml.classification import LogisticRegression

# Base settings for the logistic regression.
lr_params = dict(featuresCol='features', labelCol='label', maxIter=15)

# When the data was flagged as imbalanced a "weights" column was added; pass it to the
# estimator so the class weights actually influence training. Balanced data trains
# exactly as before.
if imbalanced and 'weights' in train.columns:
    lr_params['weightCol'] = 'weights'

LR = LogisticRegression(**lr_params)
LR_model = LR.fit(train)


In [None]:
#plotting the ROC Curve

# Training summary of the fitted logistic regression.
trainingSummary = LR_model.summary

roc = trainingSummary.roc.toPandas()
# FPR is plotted on the x axis and TPR on the y axis, so the axis labels must match.
plt.plot(roc['FPR'],roc['TPR'])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()

print('Training set ROC: ' + str(trainingSummary.areaUnderROC))

In [None]:

import numpy as np
beta = np.sort(LR_model.coefficients)
plt.plot(beta)
plt.ylabel('Beta Coefficients')
plt.show()

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

predictions_LR = LR_model.transform(test)
evaluator = BinaryClassificationEvaluator()
print("Test_SET Area Under ROC: " + str(evaluator.evaluate(predictions_LR, {evaluator.metricName: "areaUnderROC"})))

In [None]:
pr = trainingSummary.pr.toPandas()
plt.plot(pr['recall'],pr['precision'])
plt.ylabel('Precision')
plt.xlabel('Recall')
plt.show()

In [None]:
# next we checkout gradient boosting trees

from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(maxIter=15)
GBT_Model = gbt.fit(train)
predictions = GBT_Model.transform(test)

In [None]:
evaluator = BinaryClassificationEvaluator()
print("Test_SET Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

### Run the data thru a Cross Validator to find the best gbt model params

In [None]:
# The hyper-parameter search is switched off by the constant guard; change it to True to run it.
# When enabled it evaluates every combination of the grid below with 5-fold cross-validation
# on the training set and overwrites `predictions` with the resulting model's output on the
# test set.
if False:
    from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

    # 3 depths x 2 bin counts x 2 iteration counts = 12 parameter combinations.
    paramGrid = (ParamGridBuilder()
                 .addGrid(gbt.maxDepth, [2, 4, 6])
                 .addGrid(gbt.maxBins, [20, 30])
                 .addGrid(gbt.maxIter, [10, 15])
                 .build())

    cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

    # Run cross validations.
    cvModel = cv.fit(train)
    predictions = cvModel.transform(test)
    evaluator.evaluate(predictions)

### Peek at test set predictions

In [None]:
predictions.select('shot_type','shot_distance','label','prediction').limit(100).toPandas()

### Peek at the validation set data

In [None]:
val_df.limit(10).toPandas()

### Send the validation set (set used for the competition) thru the data transformation pipeline

In [None]:
transform_val_df = pipelineModel.transform(val_df)

In [None]:
transform_val_df.limit(10).toPandas()

In [None]:
transform_val_df = transform_val_df.drop('label')

### Create Predictions based on the GBT Model

In [None]:
val_predictions = GBT_Model.transform(transform_val_df)

### Take a peek at some predictions

In [None]:
val_predictions.select('action_type','shot_zone_area','shot_distance','opponent','home_game','prediction').limit(100).toPandas()
#val_predictions.select('features').toPandas()

### Save the best model if using Cross-Validation, or save the only model if using straight Gradient Boost

In [None]:
# Persist the trained model. If the cross-validation cell was run, cvModel exists and both
# its best model and the cross-validator model are saved; otherwise referencing cvModel
# raises NameError and the single GBT model is saved instead.
# NOTE(review): 'kobe.gt' does not follow the 'kobe.gbt.model' naming — confirm it is intentional.
try:
    cvModel.bestModel.save('kobe.gt')
    cvModel.save('kobe.gbt.model')
except NameError:
    GBT_Model.save('kobe.gbt.model')

### Save the predictions in the form (shot_id, prediction) in a single .csv file

In [None]:
# coalesce(1) gathers all rows into one partition so that a single part file is written.
# NOTE(review): Spark normally creates a directory with this name and, with the default
# save mode, refuses to write to a path that already exists — confirm before re-running.
val_predictions.select('shot_id','prediction').coalesce(1).write.csv('kobe.predictions.csv',header=True)

In [None]:
val_predictions.select('shot_id','prediction').limit(10).toPandas()