<img width="350" style="float:right" 
     src="https://cdn.i.haymarketmedia.asia/?n=campaign-asia%2Fcontent%2FcroppedF1logo.png&h=570&w=855&q=100&v=20170226&c=1" />

# Formula 1 Analysis Group D


Beltran Ramirez\
Cassady Cook\
Felix Massenet\
Julius Prestin\
Enrique Recke\
Raphael Widmer\
Victoria Roguet

# Sections

* [1. PySpark environment setup](#1)
  * [1.1 Search for Spark Installation](#1.1)  
  * [1.2 Create SparkSession](#1.2)
* [2. Data source and Spark data abstraction (DataFrame) setup](#2)
* [3. Data sets Meta Analysis](#3)
* [4. Tweets Analysis](#4)
  * [4.1 Check Twitter Files](#4.1)
  * [4.2 Create the DataFrame](#4.2)
  * [4.3 General analysis of the Tweets](#4.3)
* [5. GraphFrames](#5)
  * [5.1 Hashtag Graph](#5.1)
  * [5.2 User Mentions Graph](#5.2)
* [6. Machine Learning](#6)
  * [6.1 Feature Transformation](#6.1)
  * [6.2 Model Training & Selection](#6.2)
  * [6.3 Model Predictions](#6.3)

<a id='1'></a>
## 1. PySpark environment setup

To set up our PySpark environment, we:
1. Search for Spark Installation
2. Create SparkSession

<a id='1.1'></a>
### 1.1 Search for Spark Installation 
This step is only required because we are working in the course environment: `findspark` locates the Spark installation and makes `pyspark` importable.

In [None]:
import findspark
findspark.init()

We change the pandas `display.max_colwidth` option so that long column values (such as tweet texts) are displayed without truncation.

In [None]:
import pandas as pd
pd.set_option('display.max_colwidth', None)

<a id='1.2'></a>
### 1.2 Create SparkSession

By setting the `PYSPARK_SUBMIT_ARGS` environment variable, we can include extra libraries (the GraphFrames package and the Hive HCatalog jar) in our Spark cluster.

In [None]:
import os
# Extra libraries for the Spark session:
#  - the GraphFrames package (used in section 5)
#  - the Hive HCatalog jar, to access the Hive catalogue
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "graphframes:graphframes:0.8.2-spark3.2-s_2.12" --jars /opt/hive3/lib/hive-hcatalog-core-3.1.2.jar pyspark-shell'

The first step is always to create the SparkSession.

In [None]:
from pyspark.sql.session import SparkSession

spark = (SparkSession.builder
    .appName("Twitter Analytics - DataFrames")
    .config("spark.sql.warehouse.dir","hdfs://localhost:9000/warehouse")
    .config("spark.sql.legacy.timeParserPolicy","LEGACY")
    .enableHiveSupport()
    .getOrCreate())

<a id='2'></a>
## 2. Data source and Spark data abstraction (DataFrame) setup


- In order to get an overview of the industry, we imported one merged file with data from 2014 to 2021.

We chose to infer the schema because there are too many columns and datasets to define it by hand.

In [None]:
F1_main = spark.read.option("inferSchema", "true")\
                    .option("header", "true")\
                    .csv("main_f1.csv")\
                    .cache()

<a id='3'></a>
## 3. Data sets Meta Analysis

In [None]:
from IPython.display import display, Markdown
F1_main.printSchema()
display(Markdown("This F1 DataFrame has **%d rows**." % F1_main.count()))

Here we import the libraries needed for the rest of the analysis.

In [None]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
import matplotlib.pyplot as plt
import seaborn as sns

<a id='4'></a>
## 4. Tweets Analysis

<a id='4.1'></a>
### 4.1 Check Twitter Files

We checked that our ingested data was saved correctly in the following HDFS directory:

`http://localhost:50070/explorer.html#/datalake/raw/twitter/F1MDA2/*/*/*/*`

Notice that we use `*` since we have stored the data using a date structure of `year/month/day/files`.
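As in most glob implementations, each `*` in an HDFS path pattern matches a single path component, so `*/*/*/*` selects files exactly four levels below `F1MDA2` (year, month, day, file). A local sketch of this idea with Python's `pathlib`, which only approximates HDFS glob semantics; the file names are hypothetical:

```python
from pathlib import PurePosixPath

# Pattern used when reading the tweets: year/month/day/file below F1MDA2
PATTERN = "/datalake/raw/twitter/F1MDA2/*/*/*/*"

# Hypothetical file paths, for illustration only
paths = [
    "/datalake/raw/twitter/F1MDA2/2022/03/20/part-0001.json",  # year/month/day/file
    "/datalake/raw/twitter/F1MDA2/2022/03/part-0001.json",     # day level missing
]

for p in paths:
    # With an absolute pattern the whole path must match, and each '*' covers one component
    print(p, PurePosixPath(p).match(PATTERN))
```

Only the first path matches; a file stored one level too high or too deep would be silently ignored by the read.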

Tweet Object Model: https://developer.twitter.com/en/docs/twitter-api/v1/data-dictionary/object-model/tweet

<a id='4.2'></a>
### 4.2 Create the DataFrame

Here we create a DataFrame from the stored tweets so we can inspect their schema and contents and answer the business questions.

In [None]:
tweets = (spark.read
               .json("hdfs://localhost:9000/datalake/raw/twitter/F1MDA2/*/*/*/*"))
tweets.printSchema()

In [None]:
tweets.select("entities").printSchema()

<a id='4.3'></a>
### 4.3 General Analysis of the Tweets

**Total number of Tweets**<br/>

In [None]:
tweets.count()

**Total number of distinct users**<br/>

In [None]:
tweets.select("user.id").distinct().count()

**Total number of users with geolocation enabled**<br/>

In [None]:
tweets.where("user.geo_enabled=true").select("user.id").distinct().count()

**Top 10 user locations**<br/>

- Here we look at the location of the user behind each tweet. This is the free-text location from the user's profile (not the tweet's geolocation), and many accounts leave it empty, so we filter out empty locations and keep only those with a location assigned.

In [None]:
df_location = (tweets
          .where(tweets.user.location != "")
          .groupBy("user.location")
          .agg(F.count("*").alias("tweets"))
          .orderBy(F.desc("tweets")))

df_location.toPandas().head(10)

**Top 10 most-mentioned users**<br/>
- We exclude the official `@F1` account (comparing screen names case-insensitively). Since Ferrari won the first race of the 2022 season and our ingestion continued through that race weekend, Ferrari is the most-mentioned account.

In [None]:
df_users_with_more_mentions = (tweets
          .select(F.explode("entities.user_mentions.screen_name").alias("user"))
          .where((F.lower('user') != 'f1'))
          .groupBy("user")
          .agg(F.count("*").alias("mentions"))
          .orderBy(F.desc("mentions"))
          .limit(10))
df_users_with_more_mentions.toPandas().plot.barh(x = 'user', y = 'mentions', title = 'Mentions per User', figsize=(10,6))

**Tweets per day**<br/>

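We count the number of tweets per day. The day of the month is the third space-separated token of `created_at` (for example, `10` in `Wed Oct 10 20:19:24 +0000 2018`, the format documented for the Tweet object).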

In [None]:
tweets.select((F.split('created_at', " ")[2]).alias('Day'))\
                .groupBy('Day')\
                .agg(F.count("*").alias("total"))\
                .orderBy(F.asc("Day"))\
                .toPandas()
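Splitting `created_at` on spaces keeps only the day of the month, so days with the same number in different months would be grouped together, and the string ordering is only meaningful within a single month. A plain-Python sketch, using a hypothetical timestamp in the documented Twitter format, of how parsing the full timestamp keeps the whole date. In Spark, a comparable approach would be `F.to_date(F.to_timestamp('created_at', 'EEE MMM dd HH:mm:ss Z yyyy'))`; we have not tested it here, and we assume it relies on the `LEGACY` time parser policy set on the SparkSession, since Spark 3's default parser does not accept day-of-week patterns when parsing.

```python
from datetime import datetime

# Hypothetical value in the Twitter v1.1 `created_at` format
created_at = "Sun Mar 20 16:45:12 +0000 2022"

# What the Spark query keeps: the third space-separated token (day of the month)
day_of_month = created_at.split(" ")[2]

# Parsing the whole string keeps the month, the year and the UTC offset as well
parsed = datetime.strptime(created_at, "%a %b %d %H:%M:%S %z %Y")

print(day_of_month)   # 20
print(parsed.date())  # 2022-03-20
```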

**Top 10 most popular hashtags**<br/>

- We filter out the **#F1** hashtag (in any casing), since all the tweets we stored use it.

In [None]:
df = (tweets
      .select(F.explode("entities.hashtags.text").alias("hashtag"))
      .where((F.upper('hashtag') != 'F1'))
      .groupBy(F.upper("hashtag"))
      .agg(F.count("*").alias("total"))
      .orderBy(F.desc("total"))
      .limit(10))
      
df.toPandas()
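The query above upper-cases each hashtag before grouping, so `#Ferrari` and `#FERRARI` are counted together, and drops `#F1` in any casing. The same logic in plain Python with `collections.Counter`, on hypothetical hashtag lists (`top_hashtags` is an illustrative helper, not part of the notebook):

```python
from collections import Counter


def top_hashtags(hashtag_lists, exclude="F1", n=10):
    """Count hashtags case-insensitively, skip `exclude`, return the n most common."""
    counts = Counter(
        tag.upper()
        for tags in hashtag_lists  # one list of hashtags per tweet
        for tag in tags
        if tag.upper() != exclude.upper()
    )
    return counts.most_common(n)


# Hypothetical hashtag lists, one per tweet
sample = [["F1", "BahrainGP"], ["f1", "bahraingp", "Ferrari"], ["F1", "Ferrari", "Leclerc"]]
print(top_hashtags(sample, n=2))  # [('BAHRAINGP', 2), ('FERRARI', 2)]
```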

<a id='5'></a>
## 5. GraphFrames

<a id='5.1'></a>
## 5.1 Hashtag Graph

- We create a function that returns a new DataFrame with the hashtags (or another entity's `text` values) of each tweet. <br/> 
- We only consider tweets that have more than 3 hashtags.

In [None]:
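from pyspark.sql.functions import *

def elements_by_tweets(df, entity):
    # Keep tweets with more than 3 elements of the given entity (e.g. hashtags)
    # and return their id, text and the list of element texts
    return (df.where(size(f"entities.{entity}.text") > 3)
              .select("id", "text", col(f"entities.{entity}.text").alias("elements")))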


Let's find all the tweets with more than three **hashtags**.

In [None]:
hashtags_by_tweet = elements_by_tweets(tweets,"hashtags")
hashtags_by_tweet.limit(5).toPandas()

Some tweets contain repeated hashtags, and some mix lower and upper case. Let's fix this by upper-casing, de-duplicating and sorting them.

In [None]:
@udf("array<string>")
def upper_case(a):
    if not a:
        return a
    return list(map(lambda x: x.upper(), a))

In [None]:
hashtags_by_tweets_fixed = hashtags_by_tweet.withColumn("hashtags",array_sort(array_distinct(upper_case("elements"))))
hashtags_by_tweets_fixed.limit(5).toPandas()
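For reference, the per-tweet normalization (upper-case, remove duplicates, sort) behaves like this plain-Python helper; `normalize_hashtags` is a hypothetical name used only for illustration. In Spark 3.1+, `F.transform('elements', lambda x: F.upper(x))` could probably replace the Python UDF and avoid its serialization overhead, but we have not tested that variant here.

```python
def normalize_hashtags(tags):
    """Upper-case, de-duplicate and sort a tweet's hashtags,
    mirroring upper_case + array_distinct + array_sort."""
    if not tags:  # keep None or empty lists unchanged, like the upper_case UDF
        return tags
    return sorted({t.upper() for t in tags})


print(normalize_hashtags(["F1", "Ferrari", "f1", "ferrari", "BahrainGP"]))
# ['BAHRAINGP', 'F1', 'FERRARI']
```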

Let's create all ordered pairs (permutations of length 2) of the hashtags in each tweet.

In [None]:
from itertools import permutations

@udf("array<array<string>>")
def combine(a):
    return list(permutations(a, 2))


In [None]:
hashtag_pair_by_tweet = hashtags_by_tweets_fixed.select("id",combine("hashtags").alias("permutations"))
hashtag_pair_by_tweet.limit(5).toPandas()
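Because `combine` uses `itertools.permutations`, each pair of hashtags appears twice, once in each direction, and a tweet with n hashtags yields n * (n - 1) pairs. The sketch below compares it with `itertools.combinations`, which yields each unordered pair only once (n * (n - 1) / 2 pairs); the hashtags are hypothetical.

```python
from itertools import combinations, permutations

tags = ["BAHRAINGP", "F1", "FERRARI"]

# Ordered pairs, as produced by the `combine` UDF: both (A, B) and (B, A)
ordered = list(permutations(tags, 2))
# Unordered pairs: each pair appears once
unordered = list(combinations(tags, 2))

print(len(ordered), len(unordered))  # 6 3
print(ordered[:2])                   # [('BAHRAINGP', 'F1'), ('BAHRAINGP', 'FERRARI')]
```

Which one fits depends on whether the graph should be directed: with permutations, every co-occurrence becomes two directed edges with the same count.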

Let's explode the permutations into one row per hashtag pair.

In [None]:
hashtags = (hashtag_pair_by_tweet
                     .select(explode("permutations").alias("pair"))
                     .select(col("pair")[0].alias("hashtag1"),
                             col("pair")[1].alias("hashtag2")))
                                                       
hashtags.limit(5).toPandas()

Let's create the vertices and edges DataFrames of the hashtag graph.

In [None]:
h_vertices = (hashtags.select("hashtag1").union(hashtags.select("hashtag2"))
           .distinct()           
           .withColumnRenamed("hashtag1","id"))
        
h_edges = (hashtags
            .withColumnRenamed("hashtag1","src")
            .withColumnRenamed("hashtag2","dst")
            .groupBy("src","dst")
            .agg(
                count("*").alias("occurrences")
            ))
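Conceptually, `h_vertices` is the set of hashtags appearing on either side of a pair, and `h_edges` counts how often each ordered pair occurs. A plain-Python sketch of the same construction on hypothetical pairs (`build_graph` is an illustrative name, not part of the notebook):

```python
from collections import Counter


def build_graph(pairs):
    """Return (vertices, edges) from (src, dst) pairs, like h_vertices and h_edges.

    vertices: sorted list of distinct ids appearing on either side of a pair
    edges: dict mapping (src, dst) to the number of occurrences of that pair
    """
    vertices = sorted({v for pair in pairs for v in pair})
    edges = Counter(pairs)
    return vertices, dict(edges)


pairs = [("F1", "FERRARI"), ("FERRARI", "F1"), ("F1", "FERRARI"), ("F1", "BAHRAINGP")]
vertices, edges = build_graph(pairs)
print(vertices)                  # ['BAHRAINGP', 'F1', 'FERRARI']
print(edges[("F1", "FERRARI")])  # 2
```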


In [None]:
h_vertices.count()

In [None]:
h_vertices.limit(5).toPandas()

In [None]:
h_edges.count()

In [None]:
h_edges.orderBy(desc("occurrences")).limit(5).toPandas()

### Create the GraphFrame

We are going to model our graph in the following way:<br/>
**vertices** : hashtags <br/>
**edges** : hashtag pairs, aggregated by number of occurrences

In [None]:
# These are the extensions that need to be installed for the graphs
#!pip install graphframes
#!pip3 install ipycytoscape
#!jupyter nbextension enable --py ipycytoscape

In [None]:
from graphframes import GraphFrame

hashtag_graph = GraphFrame(h_vertices, h_edges)
hashtag_graph.cache()


### Analytics

#### Which are the 10 most common hashtag pairs?

In [None]:
hashtag_graph.edges.orderBy(desc("occurrences")).limit(10).toPandas()

#### Which are the most relevant hashtags?
We apply the PageRank algorithm.

In [None]:
hashtag_rank = hashtag_graph.pageRank(resetProbability=0.15, maxIter=10)
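`resetProbability=0.15` corresponds to the usual damping factor of 0.85: at each step a random walker jumps to a random vertex with probability 0.15 and otherwise follows an outgoing edge. To make the idea concrete, here is a minimal pure-Python power-iteration sketch on a hypothetical graph. It is not the GraphFrames implementation (which runs on GraphX and may scale the scores and treat vertices without outgoing edges differently), so only the relative ranking should be compared.

```python
import builtins


def pagerank(vertices, edges, reset_probability=0.15, max_iter=10):
    """Power-iteration PageRank over directed (src, dst) edges; returns {vertex: score}.

    Each iteration computes, for every vertex v:
        rank(v) = reset_probability / N
                  + (1 - reset_probability) * sum over edges u -> v of rank(u) / outdeg(u)
    Rank held by vertices without outgoing edges is spread evenly over all vertices,
    so the scores keep summing to 1.
    """
    n = len(vertices)
    out_links = {v: [] for v in vertices}
    for src, dst in edges:
        out_links[src].append(dst)

    rank = {v: 1.0 / n for v in vertices}
    for _ in range(max_iter):
        # builtins.sum: `from pyspark.sql.functions import *` shadows sum in the notebook
        dangling = builtins.sum(rank[v] for v in vertices if not out_links[v])
        new_rank = {v: (reset_probability + (1 - reset_probability) * dangling) / n
                    for v in vertices}
        for src, targets in out_links.items():
            for dst in targets:
                # each edge passes on 1 / outdeg(src) of the source's rank
                new_rank[dst] += (1 - reset_probability) * rank[src] / len(targets)
        rank = new_rank
    return rank


# Hypothetical hashtag graph with edges in both directions, as produced by `permutations`
undirected = [("F1", "FERRARI"), ("F1", "BAHRAINGP"), ("F1", "LECLERC"), ("FERRARI", "LECLERC")]
edges = undirected + [(b, a) for a, b in undirected]
ranks = pagerank(["F1", "FERRARI", "BAHRAINGP", "LECLERC"], edges)

for tag in sorted(ranks, key=ranks.get, reverse=True):
    print(f"{tag} {ranks[tag]:.3f}")
```

The best-connected hashtag (`F1`) ends up with the highest score and the hashtag with a single neighbour with the lowest.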

In [None]:
hashtag_rank.vertices.count()

In [None]:
hashtag_rank.edges.orderBy(desc("weight")).limit(5).toPandas()

#### Visualize the results
Let's visualize the results.<br/>
We use the **ipycytoscape** library, a Jupyter widget built on Cytoscape.js (https://js.cytoscape.org/), which can be installed with:
```sh
pip3 install ipycytoscape
jupyter nbextension enable --py ipycytoscape
```
After installing it, you need to restart your VM.

### Hashtags Visualization

In [None]:
hashtag_rank.vertices.limit(5).toPandas()

In [None]:
hashtag_rank.edges.limit(5).toPandas()

In [None]:
import random

hashtags_json = {}

vs = []
for v in hashtag_rank.vertices.collect():
    data = {}
    data['id']=v.id
    data['label']=v.id
    data['pagerank']=v.pagerank
    data['color']="#"+''.join([random.choice('ABCDEF0123456789') for i in range(6)])
    element = {}
    element['data']=data
    vs.append(element)
hashtags_json['nodes'] = vs

es = []
for e in hashtag_rank.edges.collect():
    data = {}
    data['source']=e.src
    data['target']=e.dst
    data['occurrences']=e.occurrences
    data['weight']=e.weight    
    element = {}
    element['data']=data
    es.append(element)
hashtags_json['edges'] = es
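The two loops above build the `{'nodes': [...], 'edges': [...]}` dictionary that is passed to `add_graph_from_json`, and section 5.2 repeats them for the user mentions graph. A possible refactoring into one reusable function, sketched on plain tuples; `to_cytoscape_json` and its `seed` parameter are hypothetical, and the seed only makes the random colors reproducible:

```python
import random


def to_cytoscape_json(vertices, edges, seed=None):
    """Build the {'nodes': [...], 'edges': [...]} dict passed to add_graph_from_json.

    vertices: iterable of (id, pagerank) pairs
    edges: iterable of (src, dst, occurrences, weight) tuples
    seed: optional seed so the random node colors are reproducible
    """
    rng = random.Random(seed)

    def random_color():
        # random hex color such as '#3FA2C9'
        return "#" + "".join(rng.choice("ABCDEF0123456789") for _ in range(6))

    nodes = [
        {"data": {"id": vid, "label": vid, "pagerank": pr, "color": random_color()}}
        for vid, pr in vertices
    ]
    links = [
        {"data": {"source": src, "target": dst, "occurrences": occ, "weight": w}}
        for src, dst, occ, w in edges
    ]
    return {"nodes": nodes, "edges": links}


graph_json = to_cytoscape_json([("F1", 0.4), ("FERRARI", 0.2)], [("F1", "FERRARI", 3, 1.0)], seed=1)
print(graph_json["edges"])
# [{'data': {'source': 'F1', 'target': 'FERRARI', 'occurrences': 3, 'weight': 1.0}}]
```

In the notebook it could be called as `to_cytoscape_json(((v.id, v.pagerank) for v in hashtag_rank.vertices.collect()), ((e.src, e.dst, e.occurrences, e.weight) for e in hashtag_rank.edges.collect()))`.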

In [None]:
import ipycytoscape

cytoscapeobj = ipycytoscape.CytoscapeWidget()
# Add the data
cytoscapeobj.graph.add_graph_from_json(hashtags_json)
# Style the nodes and edges
cytoscapeobj.set_style([
            {
                'selector': 'node',
                'css': {'label': 'data(label)', 'background-color':'data(color)'}
            },                        
            {
                'selector': 'node[id="F1"]',
                'css': {'background-color': 'orange'}
            },            
            {
                'selector': 'edge',
                'css': {"curve-style":"haystack","haystack-radius":0,"width":2,"opacity":0.5,"line-color":"#a8eae5"}
            }    
            ])

cytoscapeobj

### Graph Export

To export the graph, we can use the DataFrame writer (data sources API) to write the vertices and edges as CSV files.

In [None]:
h_vertices.coalesce(1).write.mode("overwrite").option("header","true").csv("hdfs://localhost:9000/export/hashtag_vertices/")
h_edges.coalesce(1).withColumnRenamed("src","Source")\
                   .withColumnRenamed("dst","Target")\
                   .write.mode("overwrite").option("header","true").csv("hdfs://localhost:9000/export/hashtag_edges/")

We can use these files with tools like <a href='https://gephi.org/'>Gephi</a> to create more elaborate graph visualizations.
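Gephi's spreadsheet import recognizes edge columns named `Source` and `Target`, which is why the export above renames `src` and `dst`; to our knowledge it also recognizes `Id` and `Label` for nodes and `Weight` for edge weights. A small sketch with Python's `csv` module of files in that shape, on hypothetical data (`write_gephi_csv` is an illustrative helper). The `csv` writer quotes values that contain commas, which matters for user display names:

```python
import csv
import io


def write_gephi_csv(vertices, edges):
    """Return (nodes_csv, edges_csv) strings with Gephi-style headers.

    vertices: iterable of vertex ids (written as both Id and Label)
    edges: iterable of (src, dst, occurrences) tuples; occurrences is written as Weight
    """
    nodes_buf, edges_buf = io.StringIO(), io.StringIO()

    node_writer = csv.writer(nodes_buf, lineterminator="\n")
    node_writer.writerow(["Id", "Label"])
    for v in vertices:
        node_writer.writerow([v, v])

    edge_writer = csv.writer(edges_buf, lineterminator="\n")
    edge_writer.writerow(["Source", "Target", "Weight"])
    for src, dst, occ in edges:
        edge_writer.writerow([src, dst, occ])

    return nodes_buf.getvalue(), edges_buf.getvalue()


# Hypothetical users; the comma in the first name is quoted automatically
nodes_csv, edges_csv = write_gephi_csv(["Leclerc, Charles", "Ferrari"],
                                       [("Leclerc, Charles", "Ferrari", 3)])
print(nodes_csv)
print(edges_csv)
```

In the Spark export, renaming `occurrences` to `Weight` with `withColumnRenamed` before writing would presumably let Gephi pick up the counts as edge weights.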

<a id='5.2'></a>
## 5.2 User Mentions Graph

We are going to model our graph in the following way: <br/>
**vertices** : mentioned users <br/>
**edges** : pairs of users mentioned in the same tweet, aggregated by number of occurrences

- We create a function that returns a new DataFrame with the users mentioned in each tweet. <br/> 
- We only consider tweets with more than 4 user mentions.

In [None]:
def user_mentions(df, entity):
    # Keep tweets with more than 4 mentions and return the mentioned users' names
    return (df.where(size(f"entities.{entity}") > 4)
              .select("id", "text", col(f"entities.{entity}.name").alias("users_mentions")))

In [None]:
user = user_mentions(tweets, 'user_mentions')
user.limit(5).toPandas()

In [None]:
from itertools import permutations

@udf("array<array<string>>")
def combine(a):
    return list(permutations(a, 2))

Now we build all ordered pairs of users mentioned in the same tweet.

In [None]:
pair_users = user.select('id', combine('users_mentions').alias('permutations'))
pair_users.limit(5).toPandas()

We create a DataFrame with one row per exploded pair.

In [None]:
users = (pair_users
         .select(explode("permutations").alias('pair'))
        .select(col("pair")[0].alias('user_1'), col('pair')[1].alias('user_2')))

users.limit(5).toPandas()

We will set up the vertices and edges

In [None]:
u_vertices = (users.select('user_1').union(users.select('user_2'))
             .distinct()
             .withColumnRenamed('user_1', 'id'))

u_edges = (users
          .withColumnRenamed('user_1', 'src')
          .withColumnRenamed('user_2', 'dst')
          .groupBy('src', 'dst')
          .agg(
              count('*').alias('occurrences')))

In [None]:
u_vertices.count()

In [None]:
u_edges.count()

In [None]:
users_graph = GraphFrame(u_vertices, u_edges)
users_graph.cache()

Now we look at the 30 most frequent pairs of co-mentioned users.

In [None]:
users_graph.edges.orderBy(desc('occurrences')).limit(30).toPandas()

In [None]:
users_rank = users_graph.pageRank(resetProbability = 0.15, maxIter = 10)

In [None]:
users_rank.vertices.orderBy(desc('pagerank')).limit(5).toPandas()

### User Mentions Visualization

In [None]:
# Use a separate dictionary so the hashtag graph data is not overwritten
users_json = {}

vs = []
for v in users_rank.vertices.collect():
    data = {}
    data['id']=v.id
    data['label']=v.id
    data['pagerank']=v.pagerank
    data['color']="#"+''.join([random.choice('ABCDEF0123456789') for i in range(6)])
    element = {}
    element['data']=data
    vs.append(element)
users_json['nodes'] = vs

es = []
for e in users_rank.edges.collect():
    data = {}
    data['source']=e.src
    data['target']=e.dst
    data['occurrences']=e.occurrences
    data['weight']=e.weight
    element = {}
    element['data']=data
    es.append(element)
users_json['edges'] = es

In [None]:
cytoscapeobj = ipycytoscape.CytoscapeWidget()
# Add the data
cytoscapeobj.graph.add_graph_from_json(users_json)
# Style the nodes and edges
cytoscapeobj.set_style([
            {
                'selector': 'node',
                'css': {'label': 'data(label)', 'background-color':'data(color)'}
            },                        
            {
                'selector': 'node[id="Formula1"]',
                'css': {'background-color': 'orange'}
            },            
            {
                'selector': 'edge',
                'css': {"curve-style":"haystack","haystack-radius":0,"width":2,"opacity":0.5,"line-color":"#a8eae5"}
            }    
            ])

cytoscapeobj

### User Mentions Graph Export

In [None]:
u_vertices.coalesce(1).write.mode("overwrite").option("header","true").csv("hdfs://localhost:9000/export/users_vertices/")
u_edges.coalesce(1).withColumnRenamed("src","Source")\
                   .withColumnRenamed("dst","Target")\
                   .write.mode("overwrite").option("header","true").csv("hdfs://localhost:9000/export/users_edges/")

<a id='6'></a>
## 6. Machine Learning

Using the F1_main data, we now build a model to predict each driver's points at the end of the 2021 season. <br/>
We validate the model against the real 2021 results, since the season has already ended.

In [None]:
# Models to compare
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.classification import LogisticRegression, NaiveBayes, LinearSVC
# Feature transformers and pipelines
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
F1_main.dtypes

In [None]:
F1_main = F1_main.withColumn('finish_position', F1_main.finish_position.cast('int'))

In [None]:
F1_main = F1_main.withColumn('winner', \
                                   when(F1_main.finish_position > 1.0, 0).when(F1_main.finish_position == 1.0, 1))

In [None]:
F1_main.limit(1).toPandas().T

First, we define our target variable (label).

In [None]:
label_columns = 'points'

<a id='6.1'></a>
### 6.1 Feature Transformation

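Now we separate the features into categorical and numerical columns. Note that `finish_position` is only known after a race and largely determines the `points` scored, so using it as a feature makes this an easier task than a genuine pre-race prediction.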

In [None]:
# Columns treated as categorical (indexed and one-hot encoded below)
cat_candidates = ('direction', 'country', 'locality', 'type', 'season', 'round',
                  'qual_position', 'grid', 'race_name')
cat_cols = [field for (field, dataType) in F1_main.dtypes
            if dataType in ('string', 'int', 'double') and field in cat_candidates]

# Columns treated as numerical (scaled below)
num_candidates = ('q_mean', 'q_best', 'q_worst', 'length', 'ageDuringRace',
                  'finish_position', 'filled_splits')
num_cols = [field for (field, dataType) in F1_main.dtypes
            if dataType in ('double', 'int') and field in num_candidates]

print(f"categorical columns: {cat_cols}")
print(f"numerical columns: {num_cols}")

In [None]:
indexers = [StringIndexer(inputCol = c, outputCol="{0}_indexed".format(c)) for c in cat_cols]

We index the categorical columns and then one-hot encode them.

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
outputCol=["{0}_indexed".format(c) for c in cat_cols]
outputCol_enc = ["{0}_encoded".format(indexer.getOutputCol()) for indexer in indexers ]
stringIndexer = StringIndexer(inputCols=cat_cols, outputCols=outputCol, handleInvalid="skip")

oheEncoder = OneHotEncoder(inputCols=outputCol,outputCols=outputCol_enc)
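By default, `StringIndexer` orders categories by descending frequency (ties broken alphabetically) and casts numeric columns such as `season` or `grid` to strings before indexing, and `OneHotEncoder` drops the last category (`dropLast=True`), which is then encoded as all zeros. A plain-Python sketch of that behavior, with dense lists instead of Spark's sparse vectors; the circuit `type` values are hypothetical:

```python
from collections import Counter


def string_index(values):
    """Map categories to indices by descending frequency, ties broken alphabetically
    (StringIndexer's default 'frequencyDesc' ordering)."""
    counts = Counter(str(v) for v in values)  # numeric inputs are indexed as strings
    ordered = sorted(counts, key=lambda c: (-counts[c], c))
    return {c: i for i, c in enumerate(ordered)}


def one_hot(index, size, drop_last=True):
    """Dense one-hot vector; with drop_last the last category becomes all zeros."""
    length = size - 1 if drop_last else size
    return [1.0 if i == index else 0.0 for i in range(length)]


# Hypothetical values of the circuit `type` column
circuit_types = ["street", "race", "race", "street", "race", "road"]
idx = string_index(circuit_types)
print(idx)                               # {'race': 0, 'street': 1, 'road': 2}
print(one_hot(idx["street"], len(idx)))  # [0.0, 1.0]
print(one_hot(idx["road"], len(idx)))    # [0.0, 0.0]
```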

We assemble the encoded categorical columns into a single vector.

In [None]:
in_col_assembler = [e for e in outputCol_enc]

cat_col_assembler = VectorAssembler(inputCols = in_col_assembler, outputCol = 'categorical')

in_col_assembler

We create a pipeline for the categorical columns and fit it to the F1_main dataset.

In [None]:
pipeline_cat = Pipeline(stages = [stringIndexer, oheEncoder, cat_col_assembler])
F1_main = pipeline_cat.fit(F1_main).transform(F1_main)

Next, we scale the numerical columns to the [0, 1] range with `MinMaxScaler`.

In [None]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler

from pyspark.ml import Pipeline

columns_to_scale = ['q_best', 'q_worst', 'q_mean', 'ageDuringRace', 'length', 'filled_splits', 'finish_position']
assemblers = [VectorAssembler(inputCols=[col], outputCol=col + "_vec") for col in columns_to_scale]
scalers = [MinMaxScaler(inputCol=col + "_vec", outputCol=col + "_scaled") for col in columns_to_scale]
pipeline = Pipeline(stages=assemblers + scalers)
scalerModel = pipeline.fit(F1_main)
#scaledData = scalerModel.transform(train)

F1_main = scalerModel.transform(F1_main)
F1_main.columns
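`MinMaxScaler` only accepts vector columns, which is why each numerical column is first wrapped in a one-element vector by its own `VectorAssembler`. Each value e is then rescaled with (e - E_min) / (E_max - E_min) * (max - min) + min, where E_min and E_max are computed on the data passed to `fit` (here the whole `F1_main`, including the 2021 test season). A plain-Python sketch of that formula; `min_max_scale` is an illustrative name and the ages are hypothetical:

```python
import builtins


def min_max_scale(values, new_min=0.0, new_max=1.0):
    """Rescale values linearly into [new_min, new_max] using the MinMaxScaler formula.

    A constant column (E_max == E_min) is mapped to the middle of the range.
    """
    # builtins.min/max: the notebook's `from pyspark.sql.functions import *` shadows them
    e_min, e_max = builtins.min(values), builtins.max(values)
    if e_max == e_min:
        return [0.5 * (new_max + new_min) for _ in values]
    return [(v - e_min) / (e_max - e_min) * (new_max - new_min) + new_min for v in values]


# Hypothetical driver ages during a race
print(min_max_scale([20.0, 25.0, 40.0]))  # [0.0, 0.25, 1.0]
```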

In [None]:
in_num_assembler = ['q_best_scaled',
 'q_worst_scaled',
 'q_mean_scaled',
 'ageDuringRace_scaled',
 'length_scaled',
 'filled_splits_scaled',
 'finish_position_scaled']

in_num_assembler

Now we assemble the scaled numerical columns into a single vector.

In [None]:
assemblerNum = VectorAssembler(inputCols = in_num_assembler, outputCol = "num")

We create a pipeline for the numerical columns and fit it to the F1_main dataset.

In [None]:
pipelineNum = Pipeline(stages = [assemblerNum])
F1_main = pipelineNum.fit(F1_main).transform(F1_main)

Finally, we create a vector assembler that combines the categorical and numerical vectors into a single `features` column.

In [None]:
assembler = VectorAssembler(inputCols = ["categorical", "num"], outputCol = "features")

<a id='6.2'></a>
### 6.2 Model Training & Selection

Now we split the dataset: seasons before 2021 form the training set and the 2021 season forms the test set.

In [None]:
train = F1_main.filter(F1_main.season < 2021)
test = F1_main.filter(F1_main.season == 2021)



Now we try several algorithms and choose the one with the best evaluation score on our data.

In [None]:
l = LinearRegression(labelCol = 'points', featuresCol = 'features')
lr = LogisticRegression(labelCol="points", featuresCol="features")
nb = NaiveBayes(labelCol="points", featuresCol="features")
# LinearSVC only supports binary labels, so it is defined here but not evaluated below
svm = LinearSVC(labelCol="points", featuresCol="features")
rfr = RandomForestRegressor(labelCol= 'points', featuresCol = 'features', numTrees=2000)
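The regressors below are scored with `RegressionEvaluator(metricName='r2')` and the classifiers with `MulticlassClassificationEvaluator(metricName='accuracy')`. These metrics measure different things (R² rewards predictions that are close to the label, accuracy only counts exact matches), so the two groups of scores are not directly comparable. A plain-Python sketch of both metrics on hypothetical points:

```python
import builtins


def r2_score(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    # builtins.sum: the notebook's `from pyspark.sql.functions import *` shadows sum
    mean = builtins.sum(y_true) / len(y_true)
    ss_res = builtins.sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = builtins.sum((t - mean) ** 2 for t in y_true)
    return 1 - ss_res / ss_tot


def accuracy(y_true, y_pred):
    """Fraction of predictions that exactly match the label."""
    return builtins.sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)


# Hypothetical points for four drivers in one race
points = [25.0, 18.0, 0.0, 0.0]
close = [24.0, 17.0, 1.0, 0.0]  # always within one point, but only one exact match

print(r2_score(points, close))  # about 0.994
print(accuracy(points, close))  # 0.25
```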

**Linear regression**

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
l_pipeline = Pipeline(stages = [assembler,l])
l_model = l_pipeline.fit(train)
l_evaluator = RegressionEvaluator(predictionCol = 'prediction', labelCol = 'points', metricName = 'r2')

In [None]:
prediction_l = l_model.transform(test)
r2_l = l_evaluator.evaluate(prediction_l)
r2_l

**Logistic regression**

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
lr_pipeline = Pipeline(stages = [assembler, lr])
lr_model = lr_pipeline.fit(train)
lr_evaluator = MulticlassClassificationEvaluator(labelCol="points",  metricName="accuracy")

In [None]:
prediction_lr = lr_model.transform(test)
accuracy_lr = lr_evaluator.evaluate(prediction_lr)
accuracy_lr

**Naive Bayes**

In [None]:
nb_pipeline = Pipeline(stages = [assembler,nb])
nb_model = nb_pipeline.fit(train)
evaluator_nb = MulticlassClassificationEvaluator(labelCol="points",  metricName="accuracy")

In [None]:
prediction_nb = nb_model.transform(test)
accuracy_nb = evaluator_nb.evaluate(prediction_nb)
accuracy_nb

**Random Forest**

In [None]:
rf_pipeline = Pipeline(stages = [assembler, rfr])
rf_model = rf_pipeline.fit(train)
evaluator_rf = RegressionEvaluator(predictionCol = 'prediction', labelCol = 'points', metricName = 'r2')

In [None]:
prediction_rf = rf_model.transform(test)
r2_rf = evaluator_rf.evaluate(prediction_rf)
r2_rf

We select the linear regression and the random forest regressor as our models.

<a id='6.3'></a>
### 6.3 Model Predictions

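Now that we have run four models, we show the predictions of the linear regression and random forest models, summing each driver's per-race predictions to obtain their predicted points for the 2021 season.

**Linear regression predictions**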

In [None]:
prediction_l.groupby('name').agg(sum('prediction').alias('predicted_points')).sort(desc('predicted_points')).toPandas()

**Random forest predictions**

In [None]:
prediction_rf.groupby('name').agg(sum('prediction').alias('predicted_points')).sort(desc('predicted_points')).toPandas()