In [1]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:99% !important; }</style>"))

### Load Spark and Data

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('eurovisiontree').getOrCreate()

In [3]:
data = spark.read.csv('results_2017_2018.csv',inferSchema=True,header=True)

In [4]:
data.printSchema()

root
 |-- round: string (nullable = true)
 |-- country: string (nullable = true)
 |-- playing_order: integer (nullable = true)
 |-- rank_source: integer (nullable = true)
 |-- points_televoting: integer (nullable = true)
 |-- points_jury: integer (nullable = true)
 |-- tweets: integer (nullable = true)
 |-- positive: integer (nullable = true)
 |-- neutral: integer (nullable = true)
 |-- negative: integer (nullable = true)



In [5]:
data.show(5)

+----------+-------+-------------+-----------+-----------------+-----------+------+--------+-------+--------+
|     round|country|playing_order|rank_source|points_televoting|points_jury|tweets|positive|neutral|negative|
+----------+-------+-------------+-----------+-----------------+-----------+------+--------+-------+--------+
|2017_final|    POR|           11|          1|              376|        382|  3377|     833|    138|     786|
|2017_final|    BUL|           25|          2|              337|        278|  1364|     488|    106|     313|
|2017_final|    MDA|            7|          3|              264|        110|  2024|     875|     95|     438|
|2017_final|    BEL|           23|          4|              255|        108|  1642|     409|    118|     494|
|2017_final|    SWE|           24|          5|              126|        218|  1560|     448|    132|     519|
+----------+-------+-------------+-----------+-----------------+-----------+------+--------+-------+--------+
only showi

### Create additional features

In [6]:
from pyspark.sql.window import Window
from pyspark.sql.functions import *

In [7]:
# compute total points
data_tmp1 = data.withColumn('points_total', data['points_televoting'] + data['points_jury'])

In [8]:
# compute rank columns

# set windows for each rank
total_rank_window = Window.partitionBy('round').orderBy(data_tmp1['points_total'].desc())
jury_rank_window = Window.partitionBy('round').orderBy(data_tmp1['points_jury'].desc())
televoting_rank_window = Window.partitionBy('round').orderBy(data_tmp1['points_televoting'].desc())

# create rank columns
data_tmp2 = data_tmp1\
.withColumn('rank_total', rank().over(total_rank_window))\
.withColumn('rank_jury', rank().over(jury_rank_window))\
.withColumn('rank_televoting', rank().over(televoting_rank_window))

In [9]:
# compute percentage of x_tweets per round
round_window = Window.partitionBy('round').rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

data_tmp3 = data_tmp2\
.withColumn('tweets_perc', data_tmp2['tweets']/sum('tweets').over(round_window))\
.withColumn('positive_perc', data_tmp2['positive']/sum('positive').over(round_window))\
.withColumn('negative_perc', data_tmp2['negative']/sum('negative').over(round_window))\
.withColumn('neutral_perc', data_tmp2['neutral']/sum('neutral').over(round_window))

In [10]:
# compute log of x_tweets
data_tmp4 = data_tmp3\
.withColumn('tweets_log', log(data_tmp3['tweets']))\
.withColumn('positive_log', log(data_tmp3['positive']))\
.withColumn('negative_log', log(data_tmp3['negative']))\
.withColumn('neutral_log', log(data_tmp3['neutral']))

In [11]:
# normalize playing order
data_tmp5 = data_tmp4\
.withColumn('playing_order_norm', data_tmp4['playing_order']/count('playing_order').over(round_window))

In [12]:
# create labels 'isTopN'

from pyspark.sql.types import IntegerType

isTop5 = udf(lambda r: 1 if r<=5 else 0)
isTop10 = udf(lambda r: 1 if r<=10 else 0)
    
data_tmp6 = data_tmp5\
.withColumn('isTop5_jury', isTop5(data_tmp5['rank_jury']).cast(IntegerType()))\
.withColumn('isTop10_jury', isTop10(data_tmp5['rank_jury']).cast(IntegerType()))\
.withColumn('isTop5_televoting', isTop5(data_tmp5['rank_televoting']).cast(IntegerType()))\
.withColumn('isTop10_televoting', isTop10(data_tmp5['rank_televoting']).cast(IntegerType()))\
.withColumn('isTop5_total', isTop5(data_tmp5['rank_total']).cast(IntegerType()))\
.withColumn('isTop10_total', isTop10(data_tmp5['rank_total']).cast(IntegerType()))

In [13]:
data_features = data_tmp6

In [14]:
data_features.printSchema()

root
 |-- round: string (nullable = true)
 |-- country: string (nullable = true)
 |-- playing_order: integer (nullable = true)
 |-- rank_source: integer (nullable = true)
 |-- points_televoting: integer (nullable = true)
 |-- points_jury: integer (nullable = true)
 |-- tweets: integer (nullable = true)
 |-- positive: integer (nullable = true)
 |-- neutral: integer (nullable = true)
 |-- negative: integer (nullable = true)
 |-- points_total: integer (nullable = true)
 |-- rank_total: integer (nullable = true)
 |-- rank_jury: integer (nullable = true)
 |-- rank_televoting: integer (nullable = true)
 |-- tweets_perc: double (nullable = true)
 |-- positive_perc: double (nullable = true)
 |-- negative_perc: double (nullable = true)
 |-- neutral_perc: double (nullable = true)
 |-- tweets_log: double (nullable = true)
 |-- positive_log: double (nullable = true)
 |-- negative_log: double (nullable = true)
 |-- neutral_log: double (nullable = true)
 |-- playing_order_norm: double (nullable = tr

In [29]:
data_features.limit(5).toPandas()

Unnamed: 0,round,country,playing_order,rank_source,points_televoting,points_jury,tweets,positive,neutral,negative,...,positive_log,negative_log,neutral_log,playing_order_norm,isTop5_jury,isTop10_jury,isTop5_televoting,isTop10_televoting,isTop5_total,isTop10_total
0,2017_semi2,BUL,15,1,204,199,2014,741,73,427,...,6.608001,6.056784,4.290459,0.833333,1,1,1,1,1,1
1,2017_semi2,HUN,7,2,165,66,1479,328,97,399,...,5.793014,5.988961,4.574711,0.388889,0,1,1,1,1,1
2,2017_semi2,ROU,5,6,148,26,379,95,24,98,...,4.553877,4.584967,3.178054,0.277778,0,0,1,1,0,1
3,2017_semi2,ISR,18,3,132,75,2201,469,70,866,...,6.150603,6.763885,4.248495,1.0,0,1,1,1,1,1
4,2017_semi2,CRO,11,8,104,37,2225,705,122,733,...,6.558198,6.597146,4.804021,0.611111,0,0,1,1,0,1


### Clean Data

In [34]:
# list number of NANs or NULLs in each column
from pyspark.sql.functions import count, when, isnan, col
data_features.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data_features.columns]).show()

+-----+-------+-------------+-----------+-----------------+-----------+------+--------+-------+--------+------------+----------+---------+---------------+-----------+-------------+-------------+------------+----------+------------+------------+-----------+------------------+-----------+------------+-----------------+------------------+------------+-------------+
|round|country|playing_order|rank_source|points_televoting|points_jury|tweets|positive|neutral|negative|points_total|rank_total|rank_jury|rank_televoting|tweets_perc|positive_perc|negative_perc|neutral_perc|tweets_log|positive_log|negative_log|neutral_log|playing_order_norm|isTop5_jury|isTop10_jury|isTop5_televoting|isTop10_televoting|isTop5_total|isTop10_total|
+-----+-------+-------------+-----------+-----------------+-----------+------+--------+-------+--------+------------+----------+---------+---------------+-----------+-------------+-------------+------------+----------+------------+------------+-----------+--------------

In [35]:
# nothing to drop
cleaned_data = data_features

### Formatting Data for Spark

In [36]:
# set label to predict
isTopN = 'isTop10_total'

# set the features to analyze in the model
#features = ['negative', 'neutral', 'positive', 'tweets']
#features = ['negative_log', 'neutral_log', 'positive_log', 'tweets_log']
features = ['negative_perc', 'neutral_perc', 'positive_perc', 'tweets_perc', 'playing_order_norm']
#features = ['negative', 'neutral', 'positive', 'tweets', 'playing_order']

In [37]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=features, outputCol="features")

prepared_data = assembler.transform(cleaned_data).select('features',isTopN)

train_data,test_data = prepared_data.randomSplit([0.7,0.3])

In [53]:
# fake split with 2018_final as test 
train_data = assembler.transform(
    cleaned_data\
    .filter(cleaned_data['round'] != '2018_final')
)\
.select('round','country','features',isTopN)

test_data = assembler.transform(
    cleaned_data\
    .filter(cleaned_data['round'] == '2018_final')
)\
.select('round','country','features',isTopN)

In [54]:
print('%d rows in train_data' % train_data.count())
print('%d rows in test_data' % test_data.count())

99 rows in train_data
26 rows in test_data


### Decision Tree Classifier

In [55]:
from pyspark.ml.classification import DecisionTreeClassifier

# Create Classifier instances
dtc = DecisionTreeClassifier(labelCol=isTopN,featuresCol='features')

# train classifiers
dtc_model = dtc.fit(train_data)

# evaluate on test data
dtc_predictions = dtc_model.transform(test_data)

# Feature importance
print('Feature importance')
print(list(zip(features, list(dtc_model.featureImportances))))

# AUC
from pyspark.ml.evaluation import BinaryClassificationEvaluator
binary_eval = BinaryClassificationEvaluator(labelCol=isTopN)
print('DTC AUC: %f' % binary_eval.evaluate(dtc_predictions))

# Accuracy
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_evaluator = MulticlassClassificationEvaluator(labelCol=isTopN, predictionCol="prediction", metricName="accuracy")
print('DTC ACC %f' % acc_evaluator.evaluate(dtc_predictions))

Feature importance
[('negative_perc', 0.29978915158226155), ('neutral_perc', 0.12141528213931896), ('positive_perc', 0.3770670154930325), ('tweets_perc', 0.09809414412072545), ('playing_order_norm', 0.10363440666466152)]
DTC AUC: 0.634375
DTC ACC 0.653846


- Features raw
  - DTC Default: AUC 0.63 - ACC 0.57

- Features raw + playing order
  - DTC Default: AUC 0.59 - ACC 0.65

- Features percentages
  - DTC Default: AUC 0.53 - ACC 0.50
  - DTC maxDepth=10, maxBins=64: AUC 0.56 - ACC 0.50
 
- Features percentages + playing order norm
  - DTC Default: AUC 0.65 - ACC 0.57

In [71]:
dtc_predictions.orderBy(desc('prediction')).show()

+----------+-------+--------------------+-------------+-------------+--------------------+----------+
|     round|country|            features|isTop10_total|rawPrediction|         probability|prediction|
+----------+-------+--------------------+-------------+-------------+--------------------+----------+
|2018_final|    AUS|[0.04969097651421...|            0|    [0.0,2.0]|           [0.0,1.0]|       1.0|
|2018_final|    ITA|[0.01878862793572...|            1|    [2.0,5.0]|[0.28571428571428...|       1.0|
|2018_final|    FIN|[0.02694684796044...|            0|    [2.0,5.0]|[0.28571428571428...|       1.0|
|2018_final|    MDA|[0.03584672435105...|            1|    [2.0,5.0]|[0.28571428571428...|       1.0|
|2018_final|    ISR|[0.11297898640296...|            1|    [0.0,2.0]|           [0.0,1.0]|       1.0|
|2018_final|    CYP|[0.02793572311495...|            1|    [2.0,5.0]|[0.28571428571428...|       1.0|
|2018_final|    IRL|[0.02126081582200...|            0|    [2.0,5.0]|[0.2857142857

### Random Forest Classifier

In [72]:
from pyspark.ml.classification import RandomForestClassifier

# Create Classifier instances
rfc = RandomForestClassifier(labelCol=isTopN,featuresCol='features', numTrees=100)

# train classifiers
rfc_model = rfc.fit(train_data)

# Feature importance
print('Feature importance')
print(list(zip(features, list(dtc_model.featureImportances))))

# evaluate on test data
rfc_predictions = rfc_model.transform(test_data)

# AUC
from pyspark.ml.evaluation import BinaryClassificationEvaluator
binary_eval = BinaryClassificationEvaluator(labelCol=isTopN)
print('RFC AUC: %f' % binary_eval.evaluate(rfc_predictions))

# Accuracy
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_evaluator = MulticlassClassificationEvaluator(labelCol=isTopN, predictionCol="prediction", metricName="accuracy")
print('RFC accuracy %f' % acc_evaluator.evaluate(rfc_predictions))

Feature importance
[('negative_perc', 0.29978915158226155), ('neutral_perc', 0.12141528213931896), ('positive_perc', 0.3770670154930325), ('tweets_perc', 0.09809414412072545), ('playing_order_norm', 0.10363440666466152)]
RFC AUC: 0.618750
RFC accuracy 0.692308


- Features raw
  - DTC Default: AUC 0.61 - ACC 0.60
  - DTC 100 trees: AUC 0.60 - ACC 0.62

- Features percentages
  - DTC Default: AUC xxx - ACC xxx

- Features percentages + playing order norm
  - DTC 100 trees: AUC 0.82 - ACC 0.69 

In [73]:
rfc_predictions.orderBy(desc('prediction')).show()

+----------+-------+--------------------+-------------+--------------------+--------------------+----------+
|     round|country|            features|isTop10_total|       rawPrediction|         probability|prediction|
+----------+-------+--------------------+-------------+--------------------+--------------------+----------+
|2018_final|    GBR|[0.08034610630407...|            0|[40.2684371184371...|[0.40268437118437...|       1.0|
|2018_final|    ISR|[0.11297898640296...|            1|[18.8761904761904...|[0.18876190476190...|       1.0|
|2018_final|    ITA|[0.01878862793572...|            1|[43.0704535843966...|[0.43070453584396...|       1.0|
|2018_final|    MDA|[0.03584672435105...|            1|[43.7347261539323...|[0.43734726153932...|       1.0|
|2018_final|    CYP|[0.02793572311495...|            1|[23.4833797387020...|[0.23483379738702...|       1.0|
|2018_final|    IRL|[0.02126081582200...|            0|[39.4124545854239...|[0.39412454585423...|       1.0|
|2018_final|    AUT