## Import libraries

In [146]:
# pyspark libraries
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, FloatType, ArrayType
from pyspark.sql.functions import col, udf, when
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.tuning import ParamGridBuilder

# python libraries
import numpy as np

## User Defined Functions

In [29]:
def convert_string_to_float(x):
    """Parse a numeric string into a float.

    The Unicode minus sign (U+2212) is translated to an ASCII '-' before
    parsing, and surrounding whitespace is ignored. Text that is only a dash,
    or is blank, carries no number and becomes NaN. A missing value (None) is
    returned unchanged so that a null cell does not raise inside the UDF.

    :param x: text to parse, or None.
    :return: the parsed float, NaN for a bare dash or blank string, or None
        when x is None.
    :raises ValueError: if the remaining text is not a valid float literal.
    """
    if x is None:
        return None
    x_replace_minus = x.replace(u'\u2212', '-').strip()
    # A lone dash (or nothing at all) is a placeholder, not a number.
    if x_replace_minus in ('', '-'):
        return np.nan
    return float(x_replace_minus)

# Spark UDF wrapper around convert_string_to_float; the result column is FloatType.
udf_convert_string_to_float = udf(
    lambda raw_value: convert_string_to_float(raw_value),
    returnType=FloatType(),
)

In [30]:
def get_percentage_game(x, y):
    """Divide x by y, returning None when the ratio is undefined.

    A bare ``x / y`` inside the UDF raises TypeError when either operand is
    null and ZeroDivisionError when y is zero; either error fails the task.
    Returning None instead yields a null cell for that row.

    :param x: numerator, or None.
    :param y: denominator, or None.
    :return: x / y, or None if an operand is None or y equals zero.
    """
    if x is None or y is None or y == 0:
        return None
    return x / y


# Spark UDF wrapper around get_percentage_game; the result column is FloatType.
udf_get_percentage_game = udf(lambda x, y: get_percentage_game(x, y), FloatType())

In [31]:
# Spark UDF that packs eight column values, in argument order, into one dense vector.
udf_create_features = udf(
    lambda f1, f2, f3, f4, f5, f6, f7, f8: Vectors.dense([f1, f2, f3, f4, f5, f6, f7, f8]),
    returnType=VectorUDT(),
)

In [38]:
def get_date_string(date, month, year):
    """Build a 'year/month/date' string from the three string parts.

    :param date: day-of-month text.
    :param month: month text.
    :param year: year text.
    :return: the parts joined as year, month, date with '/' separators.
    """
    return "/".join((year, month, date))

# Spark UDF wrapper around get_date_string; the result column is StringType.
udf_get_date_string = udf(
    lambda day, month_part, year_part: get_date_string(day, month_part, year_part),
    returnType=StringType(),
)

In [107]:
def win_team_1(score_team_1, score_team_2):
    """Encode a match outcome as a float label, seen from team 1.

    :param score_team_1: goals scored by team 1.
    :param score_team_2: goals scored by team 2.
    :return: 2.0 if team 1 scored more, 1.0 if team 1 scored less,
        0.0 otherwise.
    """
    # Guard clauses, checked in the same order as the label values above.
    if score_team_1 > score_team_2:
        return 2.0
    if score_team_1 < score_team_2:
        return 1.0
    return 0.0
    
# Spark UDF wrapper around win_team_1; the label column is FloatType.
udf_win_team_1 = udf(
    lambda goals_1, goals_2: win_team_1(goals_1, goals_2),
    returnType=FloatType(),
)

In [108]:
# Spark UDF returning the first vector minus the second, as a vector column.
udf_diff_features = udf(
    lambda vector_1, vector_2: vector_1 - vector_2,
    returnType=VectorUDT(),
)

In [109]:
# Column names of the ranking table, in file order.
ranking_column_names = [
    "rankGroup_local",
    "rankGroup_global",
    "teamGroup_team",
    "ratingGroup_rating",
    "highestGroup_rank_max",
    "highestGroup_rating_max",
    "averageGroup_rank_avg",
    "averageGroup_rating_avg",
    "lowestGroup_rank_min",
    "lowestGroup_rating_min",
    "change3mGroup_rank_three_month_change",
    "change3mGroup_rating_three_month_change",
    "change6mGroup_rank_six_month_change",
    "change6mGroup_rating_six_month_change",
    "change1yGroup_rank_one_year_change",
    "change1yGroup_rating_one_year_change",
    "change2yGroup_rank_two_year_change",
    "change2yGroup_rating_two_year_change",
    "change5yGroup_rank_five_year_change",
    "change5yGroup_rating_five_year_change",
    "change10yGroup_rank_ten_year_change",
    "change10yGroup_rating_ten_year_change",
    "matchesGroup_total",
    "matchesGroup_home",
    "matchesGroup_away",
    "matchesGroup_neutral",
    "matchesGroup_wins",
    "matchesGroup_losses",
    "matchesGroup_draws",
    "goalsGroup_for",
    "goalsGroup_against",
]

# Every column is declared as a nullable StringType.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ranking_column_names
])

# Columns to run through the string-to-float conversion: everything except the
# team code. A new list is built on purpose: schema.names is the StructType's
# own list, so calling .remove() on it would edit the schema in place and leave
# its names out of step with its fields.
names_to_convert = [name for name in schema.names if name != "teamGroup_team"]


# Read the tab-separated ranking file (no header row) using the explicit schema.
start_rows = spark.read.csv(
    "../data/AFC/2014_World_Cup_AFC_qualifying_start.tsv",
    sep="\t",
    schema=schema,
    header=False,
)

# Convert each listed column to float under its original name; the team code is kept as-is.
parsed_columns = [udf_convert_string_to_float(col(name)).alias(name) for name in names_to_convert]
AFC_qualifying_start = start_rows.select(parsed_columns + ["teamGroup_team"])

In [110]:
# (output column, source column) pairs; every source is divided by matchesGroup_total.
per_match_columns = [
    ("matches_home", "matchesGroup_home"),
    ("matches_away", "matchesGroup_away"),
    ("matches_neutral", "matchesGroup_neutral"),
    ("matches_wins", "matchesGroup_wins"),
    ("matches_losses", "matchesGroup_losses"),
    ("matches_draws", "matchesGroup_draws"),
    ("matches_for", "goalsGroup_for"),
    ("matches_against", "goalsGroup_against"),
]

# Keep the team code (renamed to "team") followed by the eight ratio columns.
AFC_qualifying_start = AFC_qualifying_start.select(
    [col("teamGroup_team").alias("team")]
    + [
        udf_get_percentage_game(col(source), col("matchesGroup_total")).alias(target)
        for target, source in per_match_columns
    ]
)

AFC_qualifying_start.show(5)


+----+------------+------------+---------------+------------+--------------+-------------+-----------+---------------+
|team|matches_home|matches_away|matches_neutral|matches_wins|matches_losses|matches_draws|matches_for|matches_against|
+----+------------+------------+---------------+------------+--------------+-------------+-----------+---------------+
|  JP|  0.37785017|    0.252443|     0.36970684|  0.45114008|    0.32899022|    0.2198697|  1.6905538|       1.223127|
|  KR|  0.31050768|  0.23966943|      0.4498229|   0.5478158|    0.20070839|    0.2514758|   1.853601|      0.9020071|
|  AU|   0.4437086|   0.3620309|      0.1942605|   0.5121413|    0.27593818|   0.21192053|   2.039735|      1.1037527|
|  IR|        0.34|       0.278|          0.382|       0.546|         0.218|        0.236|       1.87|          0.826|
|  CN|  0.33032492|  0.33754513|     0.33212996|  0.51805055|    0.27436823|   0.20758122|  1.9801444|      1.0361011|
+----+------------+------------+---------------+------------+--------------+-------------+-----------+---------------+

In [111]:
# Inputs of the feature vector, in the order they are packed.
feature_inputs = [
    "matches_home", "matches_away", "matches_neutral",
    "matches_wins", "matches_losses", "matches_draws",
    "matches_for", "matches_against",
]

features_column = udf_create_features(*[col(column_name) for column_name in feature_inputs])

# Replace the eight scalar columns by a single "features" vector per team.
AFC_qualifying_start = AFC_qualifying_start.withColumn("features", features_column).select("team", "features")

# AFC_qualifying_start.show(5, truncate=True)

+----+--------------------+
|team|            features|
+----+--------------------+
|  JP|[0.37785017490386...|
|  KR|[0.31050768494606...|
|  AU|[0.44370859861373...|
|  IR|[0.34000000357627...|
|  CN|[0.33032491803169...|
+----+--------------------+
only showing top 5 rows



In [112]:
# Quick check of how Vectors.dense renders a dense vector built from integers.
Vectors.dense(1, 2, 3, 5)

DenseVector([1.0, 2.0, 3.0, 5.0])

In [113]:
# (column name, Spark type) for the match-results table, in file order.
# Only the two score columns are integers; everything else is read as text.
result_fields = [
    ("year", StringType()),
    ("month", StringType()),
    ("date", StringType()),
    ("team_1", StringType()),
    ("team_2", StringType()),
    ("score_team_1", IntegerType()),
    ("score_team_2", IntegerType()),
    ("tournament", StringType()),
    ("country_played", StringType()),
    ("rating_moved", StringType()),
    ("rating_team_1", StringType()),
    ("rating_team_2", StringType()),
    ("rank_moved_team_1", StringType()),
    ("rank_moved_team_2", StringType()),
    ("rank_team_1", StringType()),
    ("rank_team_2", StringType()),
]

# All fields are nullable.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in result_fields
])

# Read the tab-separated results file (no header row) using the explicit schema.
results_raw = spark.read.csv(
    "../data/AFC/2014_World_Cup_AFC_qualifying_results.tsv",
    sep="\t",
    schema=schema,
    header=False,
)

# Combine the three date parts into one string column, then drop the parts and
# let the combined column take over the name "date".
results_dated = results_raw.withColumn(
    "new_date", udf_get_date_string(col("date"), col("month"), col("year"))
)
results_without_parts = results_dated.drop("date").drop("month").drop("year")
AFC_qualifying_results = results_without_parts.withColumnRenamed("new_date", "date")

# Columns that are kept as they are (not run through the string-to-float conversion).
names_to_remove = ["date",  "team_1", "team_2", "score_team_1", "score_team_2", "tournament", "country_played"]

# Every other column gets converted. A new list is built rather than calling
# .remove() on AFC_qualifying_results.schema.names: that attribute is the schema
# object's own list, so removing from it edits the schema in place (and the
# DataFrame may hand back the same schema object on later accesses).
names_to_convert = [name for name in AFC_qualifying_results.schema.names if name not in names_to_remove]


# Convert the listed columns to float under their original names and keep the rest untouched.
converted_columns = [udf_convert_string_to_float(col(name)).alias(name) for name in names_to_convert]
results_parsed = AFC_qualifying_results.select(converted_columns + names_to_remove)

# Only the two teams and their scores are carried forward from here.
results_scores = results_parsed.select("team_1", "team_2", "score_team_1", "score_team_2")

# Derive the label from the two scores, then drop the scores themselves.
results_labelled = results_scores.withColumn(
    "label", udf_win_team_1(col("score_team_1"), col("score_team_2"))
)
AFC_qualifying_results = results_labelled.select("team_1", "team_2", "label")

In [115]:
# Join conditions: each side of a match is looked up by team code in the start table.
team_1_match = AFC_qualifying_results.team_1 == AFC_qualifying_start.team
team_2_match = AFC_qualifying_results.team_2 == AFC_qualifying_start.team

# Attach team 1's feature vector as "features_1".
with_features_1 = AFC_qualifying_results.join(AFC_qualifying_start, team_1_match)\
    .withColumnRenamed("features", "features_1")\
    .drop("team")

# Attach team 2's feature vector as "features_2".
with_features_2 = with_features_1.join(AFC_qualifying_start, team_2_match)\
    .withColumnRenamed("features", "features_2")\
    .drop("team")

# The model input is the difference of the two vectors (team 1 minus team 2).
data = with_features_2\
    .withColumn("features", udf_diff_features(col("features_1"), col("features_2")))\
    .select("label", "features")

In [116]:
# Preview the first five (label, features) rows.
data.show(n=5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  2.0|[-0.0924882665276...|
|  2.0|[0.02913619577884...|
|  2.0|[0.08815789222717...|
|  0.0|[-0.0671749860048...|
|  2.0|[-0.1738087832927...|
+-----+--------------------+
only showing top 5 rows



## Decision Tree Classifier

`DecisionTreeClassifier(self, featuresCol="features", labelCol="label", predictionCol="prediction", probabilityCol="probability", rawPredictionCol="rawPrediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini", seed=None)`

Note: the cells below currently build and tune a `LogisticRegression` estimator, not a `DecisionTreeClassifier`.

### Define estimator

In [125]:
# Multinomial logistic regression reading the "features" vector and predicting "label".
# LogisticRegression lives in pyspark.ml.classification and must be imported in the
# notebook's import cell, otherwise this cell raises NameError on a fresh kernel.
logistic_regression = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    family="multinomial",
)

### Define evaluator

In [None]:
# Accuracy evaluator comparing the "prediction" column against "label".
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)

### Define parameter grid

In [131]:
# Search space: 3 maxIter values x 4 regParam values x 4 elasticNetParam values.
grid_builder = ParamGridBuilder()
grid_builder.addGrid(logistic_regression.maxIter, [10, 15, 20])
grid_builder.addGrid(logistic_regression.regParam, [0.0, 0.1, 0.5, 1.0])
grid_builder.addGrid(logistic_regression.elasticNetParam, [0.0, 0.1, 0.5, 1.0])
grid = grid_builder.build()

### Random dataset split

In [133]:
# 80/20 random split. A fixed seed makes the sampling repeatable between runs,
# so the counts and accuracies printed further down can be reproduced
# (assuming the input rows reach this point in the same partitions - TODO confirm).
train, test = data.randomSplit([0.8, 0.2], seed=42)

### Simple logistic regression application

In [127]:
# Size of each split.
train_rows = train.count()
test_rows = test.count()
print("Train count: {0}".format(train_rows))
print("Test count: {0}".format(test_rows))

# Baseline fit: 20 iterations, no regularisation. The setters modify
# logistic_regression itself before it is fitted on the training split.
logistic_regression.setMaxIter(20)
logistic_regression.setRegParam(0.0)
model = logistic_regression.fit(train)

train_prediction = model.transform(train)
test_prediction = model.transform(test)

# Score both splits with the shared evaluator.
train_accuracy = evaluator.evaluate(train_prediction)
test_accuracy = evaluator.evaluate(test_prediction)
print("Accuracy on the train dataset: {0}".format(train_accuracy))
print("Accuracy on the test dataset: {0}".format(test_accuracy))

Train count: 440
Test count: 111
Accuracy on the train dataset: 0.620454545455
Accuracy on the test dataset: 0.594594594595


### Define Cross Validator

In [141]:
# 4-fold cross-validation of the estimator over the parameter grid, scored by the evaluator.
cv = CrossValidator(
    estimator=logistic_regression,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    numFolds=4,
)

In [142]:
# Run the cross-validated grid search on the training split.
cv_model = cv.fit(dataset=train)

In [143]:
# Apply the cross-validated model to both splits.
train_prediction, test_prediction = (
    cv_model.transform(train),
    cv_model.transform(test),
)

In [144]:
# Accuracy of the cross-validated model on each split.
train_accuracy = evaluator.evaluate(train_prediction)
test_accuracy = evaluator.evaluate(test_prediction)
print("Accuracy on the train dataset: {0}".format(train_accuracy))
print("Accuracy on the test dataset: {0}".format(test_accuracy))

Accuracy on the train dataset: 0.611479028698
Accuracy on the test dataset: 0.540816326531


### Define Train Validation Split

In [147]:
# Single train/validation split (75% used for training) over the same grid and evaluator.
tv = TrainValidationSplit(
    estimator=logistic_regression,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    trainRatio=0.75,
)
tv_model = tv.fit(dataset=train)

In [148]:
# Apply the train/validation-split model to both splits.
train_prediction, test_prediction = (
    tv_model.transform(train),
    tv_model.transform(test),
)

In [149]:
# Accuracy of the train/validation-split model on each split.
train_accuracy = evaluator.evaluate(train_prediction)
test_accuracy = evaluator.evaluate(test_prediction)
print("Accuracy on the train dataset: {0}".format(train_accuracy))
print("Accuracy on the test dataset: {0}".format(test_accuracy))

Accuracy on the train dataset: 0.62472406181
Accuracy on the test dataset: 0.561224489796
