In [0]:


from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *

Loading data and computation of outcome column used as the dependent variable

In [2]:
df = spark.read.format("csv").option("header", "true").load("/data/results.csv")

df=df.withColumn('diff', col('home_score')-col('away_score'))
df=df.withColumn('home_out',expr("""IF(diff > 0, 'w', IF(diff < 0, 'l', 'd'))""")).withColumn('away_out',expr("""IF(diff > 0, 'l', IF(diff < 0, 'w', 'd'))"""))
#df.show()
dfhome=df.select("home_team","away_team","city","home_out").withColumnRenamed("home_team", "team").withColumnRenamed("away_team", "rival").withColumnRenamed("home_out", "outcome")
dfaway=df.select("away_team","home_team","city","away_out").withColumnRenamed("away_team", "team").withColumnRenamed("home_team", "rival").withColumnRenamed("away_out", "outcome")

df_data = dfhome.unionByName(dfaway)
df_data.show()
print((df_data.count(), len(df_data.columns)))

In [3]:
from pyspark.ml.classification import LogisticRegression

####################################################################################################################################################

# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)

# Print out the parameters, documentation, and any default values.
print ("LogisticRegression parameters")

#categorical data


In [4]:
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
categoricalColumns = ['team', 'rival', 'city']
stages = []

#encoding categorical data
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
    
label_stringIdx = StringIndexer(inputCol = 'outcome', outputCol = 'label')
stages += [label_stringIdx]

assemblerInputs = [c + "classVec" for c in categoricalColumns]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [5]:

from pyspark.ml import Pipeline
cols = df_data.columns #############

pipeline = Pipeline(stages = stages)

pipelineModel = pipeline.fit(df_data)
df_data = pipelineModel.transform(df_data)
selectedCols = ['label', 'features'] + cols
df_data = df_data.select(selectedCols)
df_data.printSchema()

In [6]:
import pandas as pd
pd.DataFrame(df_data.take(5), columns=df_data.columns).transpose()

In [7]:

train, test = df_data.randomSplit([0.8, 0.2], seed = 1800)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

In [8]:

train.show()
test.show()

In [9]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10)
lrModel = lr.fit(train)

In [10]:
print ("lrModel  was fit using parameters: ")
print (lrModel.extractParamMap)

In [11]:
print("Multinomial coefficients: " + str(lrModel.coefficientMatrix))
print("Multinomial intercepts: " + str(lrModel.interceptVector))


In [12]:
from pyspark.ml.classification import LogisticRegression

# Extract the summary from the returned LogisticRegressionModel instance trained
# in the earlier example
trainingSummary = lrModel.summary

# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory

print("False positive rate by label:")
for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("True positive rate by label:")
for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("Precision by label:")
for i, prec in enumerate(trainingSummary.precisionByLabel):
    print("label %d: %s" % (i, prec))

print("Recall by label:")
for i, rec in enumerate(trainingSummary.recallByLabel):
    print("label %d: %s" % (i, rec))

print("F-measure by label:")
for i, f in enumerate(trainingSummary.fMeasureByLabel()):
    print("label %d: %s" % (i, f))

In [13]:
accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"% (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))

In [14]:
predictions = lrModel.transform(test)
dr_predit=predictions.select('team','rival','city', 'label','rawPrediction', 'prediction', 'probability')
dr_predit.show(10)
#label_stringIdx = StringIndexer(inputCol = 'outcome', outputCol = 'label')
#user_labels = user_indexer_model.labels


In [15]:
from pyspark.ml.feature import IndexToString
print(stages)
print(pipelineModel)
#label_stringIdx = StringIndexer(inputCol = 'outcome', outputCol = 'label')

ind_str = IndexToString(inputCol='prediction',outputCol='pred_label',labels=pipelineModel.stages[6].labels)

dr_predit=dr_predit.withColumnRenamed("label", "enc_label")
predictions_dec = ind_str.transform(dr_predit)
predictions_dec.show()


In [16]:
df_with_prediction = StringIndexerModel.transform(dr_predit).withColumnRenamed(
    "label", "prediction"
)

idx_to_string.transform(df_with_prediction).show()


user_id_to_label = IndexToString(inputCol="prediction", outputCol="predictionlabel", labels=label_stringIdx)
user_id_to_label.transform(dr_predit).show()

In [17]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
print('Test Area Under ROC', evaluator.evaluate(predictions))

Regression performed poorly because it is too weak given the range of different features. The prediction accuracy of decision trees can be improved by Ensemble methods, such as Random Forest and Gradient-Boosted Tree.

Random Forest Classifier

In [20]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
rfModel = rf.fit(train)
predictions = rfModel.transform(test)
df_predit=predictions.select('team','rival','city', 'rawPrediction', 'prediction', 'probability')
predictions_dec = ind_str.transform(df_predit)
predictions_dec.show(10)

In [21]:
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))


Test Area Under ROC: 0.6052702020100458

logistic regression model with elastic net regularization

In [24]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(train)

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficientMatrix))
print("Intercept: " + str(lrModel.interceptVector))

Decision Tree Classifier

In [26]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(df_data)
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(df_data)

# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Train model.  This also runs the indexers.
model = pipeline.fit(train)

# Make predictions.
predictions = model.transform(test)

# Select example rows to display.
df_predit=predictions.select("prediction", "indexedLabel", "features")
predictions_dec = ind_str.transform(df_predit)
predictions_dec.show(10)

In [27]:
# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features")

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")  #Precision
accuracy = evaluator.evaluate(predictions)
print ("Test Error = %g" % (1.0 - accuracy))

treeModel = model.stages[2]
print (treeModel) # summary only