In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from datetime import datetime

In [2]:
import warnings

# Suppress all warnings
warnings.filterwarnings("ignore")

Task 1

In [5]:
spark = SparkSession.builder \
    .master("yarn") \
    .appName("SystemsToolChains") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

In [6]:
# read data from csv files
from pyspark.sql.functions import lit, monotonically_increasing_id
df_list = list()
## read in all data for male players
for i in range(15, 23):
    file_name = "gs://data_players/data/players_" + str(i) + ".csv"
    df = spark.read.csv(file_name, header=True, inferSchema=True)
    year = 2000 + i
    gender = "Male"
    df = df.withColumn("year", lit(year))
    df = df.withColumn("gender", lit(gender))
    df_list.append(df)
    
## merge all data into one dataframe
df_merged = df_list[0]
for df in df_list[1:]:
    df_merged = df_merged.union(df)
    
## create new column to storage unique id for each piece of data 
df_merged = df_merged.withColumn("record_id", monotonically_increasing_id())

                                                                                

In [7]:
# an example of dataset
df_read = df_merged
df_read.show(1, vertical=True)

24/11/13 03:24:09 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

-RECORD 0-------------------------------------------
 sofifa_id                   | 158023               
 player_url                  | https://sofifa.co... 
 short_name                  | L. Messi             
 long_name                   | Lionel Andrés Mes... 
 player_positions            | CF                   
 overall                     | 93                   
 potential                   | 95                   
 value_eur                   | 1.005E8              
 wage_eur                    | 550000.0             
 age                         | 27                   
 dob                         | 1987-06-24           
 height_cm                   | 169                  
 weight_kg                   | 67                   
 club_team_id                | 241.0                
 club_name                   | FC Barcelona         
 league_name                 | Spain Primera Div... 
 league_level                | 1                    
 club_position               | CF             

In [8]:
from pyspark.sql.functions import col, desc, asc

## Task 2.1
def get_clubs_with_most_players(year, n_club, ending_year):
    df_casted = df_read.withColumn("club_contract_valid_until_int", col("club_contract_valid_until").cast("integer"))
    df_filtered = df_casted.filter((df_casted.year == year) & (df_casted.club_contract_valid_until_int >= ending_year))
    df_grouped = df_filtered.groupBy("club_name").count()
    df_ordered = df_grouped.orderBy(desc("count"))
    df_limited = df_ordered.limit(n_club)
    print(f"the {n_club} clubs with most players in year {year} whose contract ending in or after year {ending_year}")
    df_limited.show()

## Task 2.2 parameter order is in ["highest", "lowest"]
def get_clubs_with_highest_or_lowest_average_age(year, n_club, order):
    original_n = n_club
    df_filtered = df_read.filter(df_read.year == year)
    df_grouped = df_filtered.groupBy("club_name").avg("age")
    if order == "lowest":
        df_ordered = df_grouped.orderBy(asc("avg(age)"))
    elif order == "highest":
        df_ordered = df_grouped.orderBy(desc("avg(age)"))
    if n_club < df_ordered.count():
        while 1:
            if df_ordered.limit(n_club).collect()[-1]["avg(age)"] == df_ordered.limit(n_club+1).collect()[-1]["avg(age)"]:
                n_club += 1
            else:
                break
    df_limited = df_ordered.limit(n_club)
    print(f"the {original_n} clubs with {order} average ages for players in year {year}")
    df_limited.show()

## Task 2.3
def get_most_popular_nationality(year):
    df_filtered = df_read.filter(df_read.year == year)
    df_grouped = df_filtered.groupBy("nationality_name").count()
    df_ordered = df_grouped.orderBy(desc("count"))
    df_limited = df_ordered.limit(1)
    print(f"Most Popular Nationality in Year {year}")
    df_limited.show()

In [9]:
## Test task 2.1 get_clubs_with_most_players(year, n_club, ending_year)
get_clubs_with_most_players(2020, 5, 2024)

the 5 clubs with most players in year 2020 whose contract ending in or after year 2024


[Stage 17:>                                                         (0 + 2) / 2]

+-------------------+-----+
|          club_name|count|
+-------------------+-----+
|Patriotas Boyacá FC|   12|
|   Deportes Iquique|   12|
|          Al Ain FC|   11|
|     Atlético Huila|   11|
|  Alianza Petrolera|   11|
+-------------------+-----+



                                                                                

In [10]:
## Test task 2.2 get_clubs_with_highest_or_lowest_average_age(year, n_club, order)
get_clubs_with_highest_or_lowest_average_age(2020, 3, "highest")

                                                                                

the 3 clubs with highest average ages for players in year 2020
+--------------------+--------+
|           club_name|avg(age)|
+--------------------+--------+
|           Fortaleza|    32.6|
|            Cruzeiro|    31.6|
|            Botafogo|    31.4|
|Associação Chapec...|    31.4|
|Club Athletico Pa...|    31.4|
+--------------------+--------+



In [11]:
## Test task 2.3 get_most_popular_nationality(year)
for year in range(2015, 2023):
    get_most_popular_nationality(year)

Most Popular Nationality in Year 2015
+----------------+-----+
|nationality_name|count|
+----------------+-----+
|         England| 1627|
+----------------+-----+

Most Popular Nationality in Year 2016
+----------------+-----+
|nationality_name|count|
+----------------+-----+
|         England| 1519|
+----------------+-----+

Most Popular Nationality in Year 2017
+----------------+-----+
|nationality_name|count|
+----------------+-----+
|         England| 1627|
+----------------+-----+

Most Popular Nationality in Year 2018
+----------------+-----+
|nationality_name|count|
+----------------+-----+
|         England| 1633|
+----------------+-----+

Most Popular Nationality in Year 2019
+----------------+-----+
|nationality_name|count|
+----------------+-----+
|         England| 1625|
+----------------+-----+

Most Popular Nationality in Year 2020
+----------------+-----+
|nationality_name|count|
+----------------+-----+
|         England| 1670|
+----------------+-----+

Most Popular Nat

Task 3

In [12]:
# data preprocession
from pyspark.ml import Pipeline,Transformer
from pyspark.ml.feature import Imputer,StandardScaler,StringIndexer,OneHotEncoder, VectorAssembler
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# define all useful features
feature_cols_int = ["weak_foot", "skill_moves", "international_reputation", "pace", "shooting", "passing", "dribbling", "defending", "physic", "attacking_crossing", "attacking_finishing", "attacking_heading_accuracy", "attacking_short_passing", "attacking_volleys", "skill_dribbling", "skill_curve", "skill_fk_accuracy", "skill_long_passing", "skill_ball_control", "movement_acceleration", "movement_sprint_speed", "movement_agility", "movement_reactions", "movement_balance", "power_shot_power", "power_jumping", "power_stamina", "power_strength", "power_long_shots", "mentality_aggression", "mentality_interceptions", "mentality_positioning", "mentality_vision", "mentality_penalties", "defending_marking_awareness", "defending_standing_tackle", "defending_sliding_tackle", "goalkeeping_diving", "goalkeeping_handling", "goalkeeping_kicking", "goalkeeping_positioning", "goalkeeping_reflexes", "goalkeeping_speed"]
feature_cols_to_onehot = ["work_rate", "mentality_composure", "ls", "st", "rs", "lw", "lf", "cf", "rf", "rw", "lam", "cam", "ram", "lm", "lcm", "cm", "rcm", "rm", "lwb", "ldm", "cdm", "rdm", "rwb", "lb", "lcb", "cb", "rcb", "rb", "gk"]
outcome_col = ["overall"]
df = df_read.select(feature_cols_int + feature_cols_to_onehot + outcome_col)

In [13]:
# calculate the ratio of null value for each feature
df_count = df.count()
null_counts_df = df.select([(count(when(isnan(col(c)) | col(c).isNull(), c))/df_count*100).alias(c) \
                        for c in df.columns])
null_counts_df.show(vertical=True)

                                                                                

-RECORD 0-----------------------------------------
 weak_foot                   | 0.0                
 skill_moves                 | 0.0                
 international_reputation    | 0.0                
 pace                        | 11.114239261256062 
 shooting                    | 11.114239261256062 
 passing                     | 11.114239261256062 
 dribbling                   | 11.114239261256062 
 defending                   | 11.114239261256062 
 physic                      | 11.114239261256062 
 attacking_crossing          | 0.0                
 attacking_finishing         | 0.0                
 attacking_heading_accuracy  | 0.0                
 attacking_short_passing     | 0.0                
 attacking_volleys           | 0.0                
 skill_dribbling             | 0.0                
 skill_curve                 | 0.0                
 skill_fk_accuracy           | 0.0                
 skill_long_passing          | 0.0                
 skill_ball_control          | 

In [14]:
# drop "goalkeeping_speed" because tremendous missing values, then drop the rest data if there is missing value in each row
df = df.drop("goalkeeping_speed")
feature_cols_int.remove("goalkeeping_speed")
df = df.dropna()

In [15]:
# encode all features and generate final df
for c in (feature_cols_int + outcome_col):
    df = df.withColumn(c+"_encoded", col(c).cast(DoubleType()))
feature_cols_int_to_double = [x+"_encoded" for x in (feature_cols_int + outcome_col)]
feature_cols_index = [x+"_index" for x in feature_cols_to_onehot]
feature_cols_onehot = [x+"_encoded" for x in feature_cols_to_onehot]
indexer = StringIndexer(inputCols=feature_cols_to_onehot, outputCols=feature_cols_index, handleInvalid="keep")
df_index_encoded = indexer.fit(df).transform(df)
encoder = OneHotEncoder(inputCols=feature_cols_index, outputCols=feature_cols_onehot, handleInvalid="keep")
df_onehot_encoded = encoder.fit(df_index_encoded).transform(df_index_encoded)
assembler = VectorAssembler(inputCols=feature_cols_int_to_double + feature_cols_onehot, outputCol="features", handleInvalid="keep")
df_assembled = assembler.transform(df_onehot_encoded)
df_final = df_assembled.select(["features", "overall_encoded"])

                                                                                

In [16]:
# example cases of df_final
df_final.show(5, vertical=True)



-RECORD 0-------------------------------
 features        | (7993,[0,1,2,3,4,... 
 overall_encoded | 94.0                 
-RECORD 1-------------------------------
 features        | (7993,[0,1,2,3,4,... 
 overall_encoded | 93.0                 
-RECORD 2-------------------------------
 features        | (7993,[0,1,2,3,4,... 
 overall_encoded | 92.0                 
-RECORD 3-------------------------------
 features        | (7993,[0,1,2,3,4,... 
 overall_encoded | 92.0                 
-RECORD 4-------------------------------
 features        | (7993,[0,1,2,3,4,... 
 overall_encoded | 90.0                 
only showing top 5 rows





In [17]:
# there are only 50 different labels, so that the problem can be regarded as classification problem
N_labels = df_final.select("overall_encoded").distinct().count()
print(f"there are {N_labels} distinct labels")



there are 50 distinct labels


                                                                                

In [18]:
# Only randomly choose 2000 rows as the dataset for further training and testing to save time
df_omit = df_final.sample(withReplacement=False, fraction=1.0).limit(10000)
train_data, test_data = df_omit.randomSplit([0.8, 0.2])

In [19]:
# Logistic Regression on spark
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
lr = LogisticRegression(featuresCol="features", labelCol="overall_encoded", regParam=0.01, maxIter=50)
lr_model = lr.fit(train_data)
predictions = lr_model.transform(test_data)
evaluator = MulticlassClassificationEvaluator(labelCol="overall_encoded", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"the logistic regression with regParam=0.01 and maxIter=50 achieves accuracy:{100*accuracy}%")

24/11/13 03:25:56 WARN DAGScheduler: Broadcasting large task binary with size 1293.6 KiB
24/11/13 03:26:05 WARN DAGScheduler: Broadcasting large task binary with size 1293.6 KiB
24/11/13 03:26:11 WARN DAGScheduler: Broadcasting large task binary with size 1248.7 KiB
24/11/13 03:26:16 WARN DAGScheduler: Broadcasting large task binary with size 1249.5 KiB
24/11/13 03:26:18 WARN DAGScheduler: Broadcasting large task binary with size 1249.5 KiB
24/11/13 03:26:19 WARN DAGScheduler: Broadcasting large task binary with size 1249.5 KiB
24/11/13 03:26:20 WARN DAGScheduler: Broadcasting large task binary with size 1249.5 KiB
24/11/13 03:26:21 WARN DAGScheduler: Broadcasting large task binary with size 1249.5 KiB
24/11/13 03:26:22 WARN DAGScheduler: Broadcasting large task binary with size 1249.5 KiB
24/11/13 03:26:23 WARN DAGScheduler: Broadcasting large task binary with size 1249.5 KiB
24/11/13 03:26:24 WARN DAGScheduler: Broadcasting large task binary with size 1249.5 KiB
24/11/13 03:26:25 WAR

24/11/13 03:27:12 WARN DAGScheduler: Broadcasting large task binary with size 1249.5 KiB
24/11/13 03:27:13 WARN DAGScheduler: Broadcasting large task binary with size 1249.5 KiB
24/11/13 03:27:14 WARN DAGScheduler: Broadcasting large task binary with size 1249.5 KiB
24/11/13 03:27:16 WARN DAGScheduler: Broadcasting large task binary with size 1249.5 KiB
24/11/13 03:27:17 WARN DAGScheduler: Broadcasting large task binary with size 1249.5 KiB
24/11/13 03:27:18 WARN DAGScheduler: Broadcasting large task binary with size 1249.5 KiB
24/11/13 03:27:18 WARN DAGScheduler: Broadcasting large task binary with size 1249.5 KiB
24/11/13 03:27:22 WARN DAGScheduler: Broadcasting large task binary with size 1293.6 KiB
24/11/13 03:27:36 WARN DAGScheduler: Broadcasting large task binary with size 1293.6 KiB
24/11/13 03:27:50 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
[Stage 200:>                                                        (0 + 1) / 1]

the logistic regression with regParam=0.01 and maxIter=50 achieves accuracy:12.890625%


                                                                                

In [20]:
# Decision Classifier on Spark
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
dt = DecisionTreeClassifier(featuresCol="features", labelCol="overall_encoded", maxDepth=30, impurity="entropy")
dt_model = dt.fit(train_data)
predictions = dt_model.transform(test_data)
evaluator = MulticlassClassificationEvaluator(labelCol="overall_encoded", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"when impurity=entropy, maxDepth=30, there is highest accuracy for Decision Tree: {100*accuracy}%")

24/11/13 03:28:02 WARN DAGScheduler: Broadcasting large task binary with size 1293.6 KiB
24/11/13 03:28:09 WARN DAGScheduler: Broadcasting large task binary with size 1293.6 KiB
24/11/13 03:28:15 WARN DAGScheduler: Broadcasting large task binary with size 1246.2 KiB
24/11/13 03:28:17 WARN DAGScheduler: Broadcasting large task binary with size 1293.6 KiB
24/11/13 03:28:23 WARN DAGScheduler: Broadcasting large task binary with size 1247.2 KiB
24/11/13 03:28:23 WARN DAGScheduler: Broadcasting large task binary with size 1247.3 KiB
24/11/13 03:28:24 WARN DAGScheduler: Broadcasting large task binary with size 1250.9 KiB
24/11/13 03:28:27 WARN DAGScheduler: Broadcasting large task binary with size 1554.9 KiB
24/11/13 03:28:36 WARN DAGScheduler: Broadcasting large task binary with size 1557.8 KiB
24/11/13 03:28:44 WARN DAGScheduler: Broadcasting large task binary with size 1562.7 KiB
24/11/13 03:28:52 WARN DAGScheduler: Broadcasting large task binary with size 1572.6 KiB
24/11/13 03:28:59 WAR

when impurity=entropy, maxDepth=30, there is highest accuracy for Decision Tree: 23.4375%


                                                                                

In [21]:
# transform data from spark to tensor
import torch 
from torch import nn
from torch.utils.data import Dataset, DataLoader
train_data_pd = train_data.toPandas()
test_data_pd = test_data.toPandas()
X_train = torch.Tensor(train_data_pd["features"])
y_train = torch.Tensor(train_data_pd["overall_encoded"])
X_test = torch.Tensor(test_data_pd["features"])
y_test = torch.Tensor(test_data_pd["overall_encoded"])

24/11/13 03:31:31 WARN DAGScheduler: Broadcasting large task binary with size 1293.6 KiB
24/11/13 03:31:39 WARN DAGScheduler: Broadcasting large task binary with size 1230.6 KiB
24/11/13 03:31:42 WARN DAGScheduler: Broadcasting large task binary with size 1293.6 KiB
24/11/13 03:31:49 WARN DAGScheduler: Broadcasting large task binary with size 1230.6 KiB
                                                                                

In [22]:
# pytorch neural network 1 without hidden layer
import torch 
from torch import nn
from torch.utils.data import Dataset, DataLoader
N_labels = df_final.select("overall_encoded").distinct().count()
class MyDataset(Dataset):
    def __init__(self,x,y):
        self.x = x
        self.y = y
    def __len__(self):
        return self.x.shape[0]
    def __getitem__(self, idx):
        return (self.x[idx],self.y[idx])

train_dataset = MyDataset(X_train, y_train)
test_dataset = MyDataset(X_test, y_test)

class myModel1(nn.Module):
    def __init__(self, input_dims, output_dims):
        super().__init__()
        self.seq = nn.Sequential(
            nn.Linear(input_dims, output_dims),
        )
    def forward(self, X):
            return self.seq(X)

                                                                                

In [23]:
# model1 structure
model = myModel1(train_dataset.x.shape[1], 1)
print(model)

myModel1(
  (seq): Sequential(
    (0): Linear(in_features=7993, out_features=1, bias=True)
  )
)


In [26]:
lr=0.1
batch_size=500
N_epochs = 50
train_dataloader = DataLoader(train_dataset, batch_size = batch_size, shuffle =True)
model = myModel1(train_dataset.x.shape[1], 1)
optimizer = torch.optim.Adam(model.parameters(), lr = lr)
loss_fn = nn.MSELoss()
for epoch in range(N_epochs):
    for batch_id, (x_batch, y_batch) in enumerate(train_dataloader):
        predictions = model(x_batch)
        loss = loss_fn(predictions, y_batch.reshape(-1, 1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
pred = model(X_test)
accuracy = 100 * torch.sum(torch.round(pred) == y_test.reshape(-1, 1)) / len(y_test)

print(f"when lr=0.1, batch_size=500, there is highest accuracy for Model1: {accuracy}%")

when lr=0.1, batch_size=500, there is highest accuracy for Model1: 22.94921875%


In [27]:
# pytorch neural network 2 with 3 hidden layers, each layer with 128 neurons
class myModel2(nn.Module):
    def __init__(self, input_dims, output_dims):
        super().__init__()
        self.seq = nn.Sequential(
            nn.Linear(input_dims, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, output_dims)
            
        )
    def forward(self, X):
            return self.seq(X)

In [28]:
# model2 structure
model = myModel2(train_dataset.x.shape[1], 1)
print(model)

myModel2(
  (seq): Sequential(
    (0): Linear(in_features=7993, out_features=128, bias=True)
    (1): ReLU()
    (2): Linear(in_features=128, out_features=128, bias=True)
    (3): ReLU()
    (4): Linear(in_features=128, out_features=128, bias=True)
    (5): ReLU()
    (6): Linear(in_features=128, out_features=1, bias=True)
  )
)


In [29]:
lr=0.005
batch_size=500
N_epochs = 50
train_dataloader = DataLoader(train_dataset, batch_size = batch_size, shuffle =True)
model = myModel2(train_dataset.x.shape[1], 1)
optimizer = torch.optim.Adam(model.parameters(), lr = lr)
loss_fn = nn.MSELoss()
for epoch in range(N_epochs):
    for batch_id, (x_batch, y_batch) in enumerate(train_dataloader):
        predictions = model(x_batch)
        loss = loss_fn(predictions, y_batch.reshape(-1, 1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
pred = model(X_test)
accuracy = 100 * torch.sum(torch.round(pred) == y_test.reshape(-1, 1)) / len(y_test)


print(f"when lr=0.005, batch_size=500, there is highest accuracy: {accuracy}%")

when lr=0.005, batch_size=500, there is highest accuracy: 24.0234375%
