# Big Data FC

The goal of the **Big Data FC** project is to **predict** how many **points** a **football team** belonging to the main European football leagues will end the season with, according to the **characteristics of its players**.

To reach this goal, data about the **football players** is loaded first and then used to compose the **football teams**.
After that, a second dataset is used to gather the seasonal **rankings** of every football team.

The project as a whole is composed of:

* This **notebook**, containing all steps of:
  * Data loading.
  * Data cleaning and pre-processing.
  * Data visualization.
  * Data analysis.
  * Learning and evaluation.
* A custom [**scraper**](https://github.com/Big-Data-FC/scraper), to gather additional player data.
* A set of [**REST APIs**](https://github.com/Big-Data-FC/api) to query the loaded data and the prediction model.
* The collection of [scraped datasets](https://github.com/Big-Data-FC/datasets).

During the project, multiple approaches and techniques were explored and described in this notebook.

The notebook follows the order in which the work was carried out during development:
1. Notebook **set-up** and **configuration**
2. Data **loading** and **pre-processing**
3. Preliminary data **exploration**
4. **Multiple** **learning** attempts:
   1. Naive
   2. Dimensionality reduction
   3. Learning-produced features (via Clustering)
   4. Prior-based approach (RP coefficient)
5. Final observations and **conclusion**

_By [Daniele Solombrino](https://github.com/dansolombrino) and [Davide Quaranta](https://github.com/fortym2)._

# Notebook configuration, global parameters and utility functions

Execution environment settings.

In [None]:
# Set ON_COLAB to True if the notebook is to be executed on Google Colab,
# or generally in an environment in which PySpark needs to be installed and configured.
ON_COLAB = True

# Set DOWNLOAD_DATA to True to download and extract datasets and previously trained models.
# This allows running the notebook without re-training and re-evaluating every model:
# trained models and evaluation results are read from disk instead.
DOWNLOAD_DATA = True

In [None]:
if ON_COLAB:
  # need to install pyspark
  import os
  %pip install pyspark
  !apt install openjdk-8-jdk-headless -qq
  %pip install ipympl
  os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

if DOWNLOAD_DATA:
  # download datasets, previously trained models and evaluations
  %pip install gdown
  !gdown 1BzAg47HS0sr034D_7-uMvKZXQZ2LKEXZ -O datasets.tar.xz
  !gdown 1CESXCxPbXFokZbqb7WxyWT3e_VTyNgyN -O trained_models.tar.xz
  !gdown 1oXr8HXMusO7erDjs2TiJMwqIJ0wnPN5K -O evaluation_results.tar.xz

  !mkdir data
  !tar -xvf datasets.tar.xz -C data
  !tar -xvf trained_models.tar.xz
  !tar -xvf evaluation_results.tar.xz

The following section contains some global constants and structures needed across the whole notebook.

In [None]:
MAX_K_CLUSTERS = 66
MAX_ITER = 20

k_range = range(2, MAX_K_CLUSTERS, 4)
K_RANGE = [str(k) for k in k_range]
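
For reference, the candidate cluster counts produced by the range above can be listed directly. The snippet below only repeats the two constants from the previous cell and prints a summary; it is a quick check, not part of the pipeline.

```python
MAX_K_CLUSTERS = 66

# Same construction as above: start at 2, step by 4, stop before MAX_K_CLUSTERS
k_range = range(2, MAX_K_CLUSTERS, 4)
K_RANGE = [str(k) for k in k_range]

print(len(K_RANGE), K_RANGE[0], K_RANGE[-1])
```

Note that the upper bound of `range` is exclusive, so the largest value tried is 62 rather than 66.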

`rp_tradeoff` is a concept explored in Attempt 4.

In [None]:
ADD_RP_TRADEOFF = [0.5, 1, 2, 4, 8]

The following cell defines a dictionary with the path of each model that has been trained and persisted, so that the notebook can be executed in a reasonable amount of time.

In [None]:
# list of Learning attempts
attempts = range(1, 5)

# learning trained model directories 
TRAINED_MODELS_DIRS = {
    "Attempt " + str(k) : {
        "Regression": {
            "Linear Regression": f"trained_models/attempt_{k}/regression/linear_regression",
            "Prediction Tree": f"trained_models/attempt_{k}/regression/prediction_tree",
            "Gradient Boosted Tree": f"trained_models/attempt_{k}/regression/gradient_boosted_tree",
            "Random Forest": f"trained_models/attempt_{k}/regression/random_forest",
        },
        "Classification": {
            "SVM": f"trained_models/attempt_{k}/classification/svm",
            "Decision Tree": f"trained_models/attempt_{k}/classification/decision_tree",
            "Logistic Regression": f"trained_models/attempt_{k}/classification/logistic_regression",
            "Random Forest": f"trained_models/attempt_{k}/classification/random_forest",
            "MLP": f"trained_models/attempt_{k}/classification/mlp",
        }
    } for k in attempts
}

TRAINED_MODELS_DIRS["Attempt 3"]["Regression"]["Linear Regression multiple k"] = {
    k : f"trained_models/attempt_3/regression/linear_regression_multiple_k/{k}_clusters" for k in K_RANGE
}

TRAINED_MODELS_DIRS["Attempt 4"]["Regression"]["Linear Regression multiple tradeoffs"] = {
    tradeoff : f"trained_models/attempt_4/regression/linear_regression_multiple_tradeoffs/tradeoff_{tradeoff}" for tradeoff in ADD_RP_TRADEOFF
}

CLUSTERING_DF_PATH = "./trained_models/clustering_df.parquet"
CLUSTERING_EVAL_PATH = "./evaluation_results/attempt_3/clustering/clustering_eval.json"


In [None]:
def model_exists(model_path):
    return os.path.isdir(model_path)

In [None]:
def load_model_from_disk(model_type, model_path):
    return model_type.load(model_path)
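
The two helpers above are presumably meant to be combined in a "load if persisted, otherwise train" pattern. The sketch below illustrates that idea with stand-in callables instead of real Spark models; `load_or_train` is a hypothetical name introduced here and is not defined elsewhere in the notebook.

```python
import os
import tempfile


def model_exists(model_path):
    return os.path.isdir(model_path)


def load_or_train(model_path, load_fn, train_fn):
    """Hypothetical helper: reuse a persisted model when present, else train."""
    if model_exists(model_path):
        return load_fn(model_path)
    return train_fn()


# Stand-ins for `ModelClass.load` and for a real training routine
with tempfile.TemporaryDirectory() as existing_dir:
    missing_dir = os.path.join(existing_dir, "not_there")
    from_disk = load_or_train(existing_dir, lambda p: "loaded", lambda: "trained")
    from_scratch = load_or_train(missing_dir, lambda p: "loaded", lambda: "trained")

print(from_disk, from_scratch)
```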

## Global imports

In [None]:
#  PySpark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

import seaborn as sns
import matplotlib.pyplot as plt

import pandas as pd
import numpy as np

from functools import reduce

import gc
import builtins
import operator
import json
import pprint
import os  # used by model_exists and evaluate_learning_models, also when ON_COLAB is False

## ML imports

In [None]:
from pyspark.ml.evaluation import ClusteringEvaluator, RegressionEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

from pyspark.ml.regression import Regressor
from pyspark.ml.regression import LinearRegression, LinearRegressionModel, LinearRegressionTrainingSummary
from pyspark.ml.regression import RandomForestRegressor, RandomForestRegressionModel
from pyspark.ml.regression import RegressionModel

from pyspark.ml.classification import Classifier
from pyspark.ml.classification import OneVsRest, OneVsRestModel
from pyspark.ml.classification import RandomForestClassificationModel, RandomForestClassificationTrainingSummary
from pyspark.ml.classification import MultilayerPerceptronClassificationSummary
from pyspark.ml.classification import ClassificationModel
from pyspark.ml.classification import MultilayerPerceptronClassificationModel

from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import DenseVector, Vectors, VectorUDT

## PySpark 

In [None]:
conf = (
    SparkConf()
    .set("spark.ui.port", "4050")
    .set("spark.executor.memory", "4G")
    .set("spark.driver.memory", "8G")
    .set("spark.driver.maxResultSize", "8G")
)

sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")

spark = SparkSession.builder.getOrCreate()

## Utilities

The random seed is used for reproducibility.

In [None]:
random_seed = 97

### Plotting utility functions

In [None]:
def scatter_plot(
    x,
    y,
    x_label,
    y_label,
    title="",
    c=None,
    c_map=plt.get_cmap("tab10"),
    figsize=(12,8),
    ):
    
    fig, ax = plt.subplots(1, 1, figsize=figsize)

    _ = plt.scatter(
        x=x,
        y=y,
        c=y if c is None else c,
        edgecolor="none",
        cmap=c_map,
        axes=ax,
    )

    _ = ax.set_xlabel(x_label, labelpad=20, fontsize=16)
    _ = ax.set_ylabel(y_label, fontsize=16)
    _ = ax.set_title(title)

    plt.colorbar()
    plt.show()

In [None]:
def plot_feature_target_relation(
    data, x, y, n_cols=2, figsize=(15, 30), color="#000000"
):

    # Round up so that a trailing, partially filled row still gets its axes
    n_rows = max(1, -(-len(x) // n_cols))
    # squeeze=False always returns a 2D array of axes, whatever the grid shape
    fig, axes = plt.subplots(n_rows, n_cols, figsize=figsize, squeeze=False)

    for x_ind, x_value in enumerate(x):
        ax = sns.regplot(
            data=data,
            x=x_value,
            y=y,
            color=color,
            ax=axes[x_ind // n_cols, x_ind % n_cols],
        )


    fig.tight_layout()

In [None]:
def plot_feature_distribution(
    data,
    features,
    figsize=(4,4), 
    color="#000000",
    n_cols=2
):

    # Round up so that a trailing, partially filled row still gets its axes
    n_rows = max(1, -(-len(features) // n_cols))
    # squeeze=False always returns a 2D array of axes, whatever the grid shape
    fig, axes = plt.subplots(n_rows, n_cols, figsize=figsize, squeeze=False)

    for feature_ind, feature in enumerate(features):
        _ = sns.histplot(
            data[feature],
            kde=True,
            color=color,
            facecolor=color,
            ax=axes[feature_ind // n_cols, feature_ind % n_cols],
        )

    fig.tight_layout(pad=1.5)

In [None]:
def plot_correlation_matrix(
    data, features, title="Pearson Correlation Matrix", figsize=(16,12)
):

    mask = np.zeros_like(data[features].corr(), dtype=bool)
    mask[np.triu_indices_from(mask)] = True

    with sns.axes_style("white"):  # Temporarily set the background to white
        fig, ax = plt.subplots(figsize=figsize)
        plt.title(title, fontsize=24)

        cmap = sns.diverging_palette(220, 10, as_cmap=True)

        _ = sns.heatmap(
            data[features].corr(),
            linewidths=0.25,
            vmax=0.7,
            square=True,
            ax=ax,
            cmap=cmap,
            linecolor="w",
            annot=True,
            annot_kws={"size": 8},
            mask=mask,
            cbar_kws={"shrink": 0.9},
        )

### Learning utility functions

In [None]:
def get_evaluators(estimator, label_col, prediction_col, evaluation_metrics):
    evaluators = dict()

    if isinstance(estimator, Regressor) or isinstance(estimator, RegressionModel):
        evaluators = {
            metric: RegressionEvaluator(
                labelCol=label_col,
                predictionCol=prediction_col,
                metricName=metric,
            )
            for metric in evaluation_metrics
        }
    elif isinstance(estimator, Classifier) or isinstance(estimator, OneVsRestModel) or isinstance(estimator, ClassificationModel):
        evaluators = {
            metric: MulticlassClassificationEvaluator(
                labelCol=label_col,
                predictionCol=prediction_col,
                metricName=metric,
            )
            for metric in evaluation_metrics
        }
    else:
        raise Exception("Unexpected estimator, got " + str(type(estimator)))

    return evaluators


In [None]:
def learn_best_model(
    estimator, 
    param_grid,
    evaluator_cv
):
    cross_validator = CrossValidator(
        estimator=estimator,
        estimatorParamMaps=param_grid,
        evaluator=evaluator_cv,
        numFolds=5,
        collectSubModels=False
    )
    
    cross_validated_model = cross_validator.fit(train_df)

    return cross_validated_model
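
To make the role of `numFolds=5` more concrete, the sketch below shows the general idea of k-fold splitting on plain row indices. It is an illustration only: the shuffle-and-stride assignment and the helper name `k_fold_splits` are assumptions made for this example and do not reproduce how Spark's `CrossValidator` partitions a dataframe internally.

```python
import random


def k_fold_splits(n_rows, num_folds=5, seed=97):
    """Return (training, validation) index lists, one pair per fold."""
    indices = list(range(n_rows))
    random.Random(seed).shuffle(indices)
    # Every num_folds-th shuffled index goes to the same fold
    folds = [indices[i::num_folds] for i in range(num_folds)]
    splits = []
    for i, validation in enumerate(folds):
        training = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        splits.append((training, validation))
    return splits


splits = k_fold_splits(n_rows=10)
for training, validation in splits:
    print(sorted(validation), len(training))
```

Each row is used for validation exactly once and for training in the remaining folds; the parameter combination with the best average validation metric is the one retained.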

In [None]:
def evaluate_learning_models(
    best_model, 
    evaluators,
    save_training_result_path=None
):
    has_summary = True
    
    out_dict = {
        "train_set_evaluation": dict(),
        "test_set_evaluation": dict()
    }

    try:
        has_summary = best_model.hasSummary
    except AttributeError as e:
        # since the only models that have summary
        # have the hasSummary field,
        # it is needed to catch this exception
        has_summary = False
    
    if has_summary:
        print("Summary available, retrieving data...")

        if isinstance(
            best_model, LinearRegressionModel
        ) or isinstance(best_model, RandomForestClassificationModel):
            training_result = best_model.summary
        elif isinstance(best_model, MultilayerPerceptronClassificationModel):
            training_result = best_model.summary()

        if any(
            x in type(best_model).__name__ for x in [
                "LinearRegression", 
                "DecisionTreeRegressor", 
                "GBTRegressor",
                "RandomForestRegressor"
            ]
        ):
            out_dict["train_set_evaluation"]["r2"] = training_result.r2
            out_dict["train_set_evaluation"]["r2adj"] = training_result.r2adj
            out_dict["train_set_evaluation"]["meanSquaredError"] = training_result.meanSquaredError
            out_dict["train_set_evaluation"]["meanAbsoluteError"] = training_result.meanAbsoluteError
            out_dict["train_set_evaluation"]["rootMeanSquaredError"] = training_result.rootMeanSquaredError
            out_dict["train_set_evaluation"]["explainedVariance"] = training_result.explainedVariance
        else:
            out_dict["train_set_evaluation"]["accuracy"] = training_result.accuracy
            out_dict["train_set_evaluation"]["falsePositiveRateByLabel"] = training_result.falsePositiveRateByLabel
            out_dict["train_set_evaluation"]["precisionByLabel"] = training_result.precisionByLabel
            out_dict["train_set_evaluation"]["recallByLabel"] = training_result.recallByLabel
            out_dict["train_set_evaluation"]["truePositiveRateByLabel"] = training_result.truePositiveRateByLabel
            out_dict["train_set_evaluation"]["weightedFalsePositiveRate"] = training_result.weightedFalsePositiveRate
            out_dict["train_set_evaluation"]["weightedPrecision"] = training_result.weightedPrecision
            out_dict["train_set_evaluation"]["weightedRecall"] = training_result.weightedRecall
            out_dict["train_set_evaluation"]["weightedTruePositiveRate"] = training_result.weightedTruePositiveRate
            out_dict["train_set_evaluation"]["fMeasureByLabel"] = training_result.fMeasureByLabel()
            out_dict["train_set_evaluation"]["weightedFMeasure"] = training_result.weightedFMeasure()

        predictions = best_model.transform(test_df)
                    
        for e, evaluator in evaluators.items():
            if save_training_result_path is not None:
                out_dict["test_set_evaluation"][evaluator.getMetricName()] = evaluator.evaluate(predictions)
    
    else: # no summary available
        print("No summary available, using custom evaluation procedure...")
        for stage_name, stage_df in zip(["Train", "Test"], [train_df, test_df]):
            predictions = best_model.transform(stage_df)

            for e, evaluator in evaluators.items():
                if save_training_result_path is not None:
                    out_dict[f"{stage_name.lower()}_set_evaluation"][evaluator.getMetricName()] = evaluator.evaluate(predictions)

    if save_training_result_path is not None:
        # Create the parent directory, if the path has one
        parent_dir = os.path.dirname(save_training_result_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        
        with open(save_training_result_path, "w", encoding="utf8") as f:
            f.write(json.dumps(out_dict))
    
    pprint.pprint(out_dict)

    return out_dict


In [None]:
def print_model_evaluation(model_evaluation_path):
    with open(model_evaluation_path) as json_file:
        model_evaluation = json.load(json_file)
        
        pprint.pprint(model_evaluation)
        return model_evaluation

In [None]:
from pyspark.ml.feature import PCA

PCA_NUM_COMPONENTS = 2

def perform_pca(df, num_components, input_col, output_col):
    pca = PCA(
        k=num_components, 
        inputCol=input_col, 
        outputCol=output_col
    )
    pca_model = pca.fit(df)

    return pca_model.transform(df), pca_model

In [None]:
def plot_pca_explained_variance(pca_model, num_components_to_plot=2, figsize=(8,6)):
    fig, ax = plt.subplots(1, 1, figsize=figsize)
    _ = sns.barplot(
        x=[i for i in range(num_components_to_plot)],
        y=pca_model.explainedVariance.values[0:num_components_to_plot],
        ax=ax,
        palette="summer"
    )

    _ = ax.set_xlabel("Eigenvalues", labelpad=16, fontsize=16)
    _ = ax.set_ylabel("Proportion of Variance", fontsize=16)
    _ = ax.set_xticklabels(
        [f"Principal Component {i}" for i in range(num_components_to_plot)], 
        rotation=0
    )
    _ = ax.set_title("Explained variance of each Principal Component")
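
The values plotted by the function above are proportions rather than raw variances. Assuming the usual PCA definition, with $\lambda_i$ the $i$-th largest eigenvalue of the covariance matrix of the $d$ input features, the proportion of variance explained by the $i$-th principal component is:

$$
\text{explained variance}_i = \frac{\lambda_i}{\sum_{j=1}^{d} \lambda_j}
$$

The proportions of all $d$ components therefore sum to 1, and the first `num_components_to_plot` bars show how much of the total variance the retained components capture.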

## Learning constants and configuration

In [None]:
REGRESSION_EVALUATION_METRICS = ["r2", "mse", "rmse", "mae", "var"]
REGRESSION_EVALUATION_METRIC_CV = "r2"

REGRESSION_LABEL_COL = "points"

regression_evaluator_cv = RegressionEvaluator(
    metricName=REGRESSION_EVALUATION_METRIC_CV
)
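
As a reminder of what most of the regression metrics listed above measure, the sketch below computes `r2`, `mse`, `rmse` and `mae` on a tiny made-up example in plain Python (`var` is left out). The helper `regression_metrics` and the numbers are illustrative assumptions; in the notebook the values come from `RegressionEvaluator`.

```python
import math


def regression_metrics(labels, predictions):
    """Plain-Python versions of r2, mse, rmse and mae for small lists."""
    n = len(labels)
    residuals = [y - p for y, p in zip(labels, predictions)]
    mse = sum(r * r for r in residuals) / n
    mae = sum(abs(r) for r in residuals) / n
    mean_label = sum(labels) / n
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    # r2 = 1 - (residual sum of squares / total sum of squares)
    r2 = 1 - (mse * n) / ss_tot
    return {"r2": r2, "mse": mse, "rmse": math.sqrt(mse), "mae": mae}


# Made-up points totals and predictions, for illustration only
metrics = regression_metrics([40.0, 60.0, 80.0], [40.0, 60.0, 90.0])
print(metrics)
```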

In [None]:
CLASSIFICATION_EVALUATION_METRICS = [
    "f1",
    "accuracy",
    "weightedPrecision",
    "weightedRecall",
    "weightedTruePositiveRate",
    "weightedFalsePositiveRate",
    "weightedFMeasure",
    "truePositiveRateByLabel",
    "falsePositiveRateByLabel",
    "precisionByLabel",
    "recallByLabel",
    "fMeasureByLabel"
]

CLASSIFICATION_EVALUATION_METRIC_CV = "accuracy"

CLASSIFICATION_LABEL_COL = "macro_place"

classification_evaluator_cv = MulticlassClassificationEvaluator(
    metricName=CLASSIFICATION_EVALUATION_METRIC_CV
)

# Loading football players

The first step is to load football players data, which comes from two different sources:

* For seasons between 2014 and 2020 (called "modern"): [FIFA 15-21 complete dataset](https://www.kaggle.com/datasets/stefanoleone992/fifa-21-complete-player-dataset)

* For seasons between 2007 and 2013 (called "legacy"): scraped data from [sofifa.com](https://sofifa.com), a website specialized in storing data taken from EA Sports FIFA games.

As introduced before, scraped datasets are committed in a [GitHub repository](https://github.com/Big-Data-FC/datasets).

From now on, the terms **modern** and **legacy** will be used to refer to the two kinds of datasets.

Initially, modern and legacy data will be split into two different dataframes, since the structure of the data differs between the two groups of seasons.

In [None]:
modern_df = spark.read.csv(
    "data/players_*.csv", sep=",", inferSchema=True, header=True, multiLine=True
)

legacy_df = spark.read.csv(
    "data/scraped_players_*.csv", sep=",", inferSchema=True, header=True, 
    multiLine=True
)

# Pre-processing football players

In order to focus the project on the major European leagues, it is useful to define a list of leagues to filter on, and also a list of seasons to easily discriminate between modern and legacy data.

In [None]:
# These are the European Leagues supported by Big-Data-FC
leagues = [
    "Italian Serie A",
    "Spain Primera Division",
    "German 1. Bundesliga",
    "French Ligue 1",
    "English Premier League",
    "Holland Eredivisie",
]

# These are the seasons supported by Big-Data-FC
seasons_modern = ["20", "19", "18", "17", "16", "15", "14"] 
seasons_legacy = ["13", "12", "11", "10", "09", "08", "07"]

seasons = seasons_legacy + seasons_modern

The next definition is about **macro roles**, a custom abstraction that **aggregates related football roles**.

For example, all the midfield roles such as "central midfielder", "advanced midfielder", "left/right midfielder" can be **grouped together** in the same macro role "midfielder".

Macro roles will be used later on, in a subsequent learning phase, where they are discussed in more detail.

The following cell defines the actual aggregation from FIFA roles abbreviations into macro roles.

In [None]:
macro_roles = ["0.0", "1.0", "2.0", "3.0"]

roles_to_macro_roles_dict = {
    "GK": "0",
    "SW": "1",
    "LB": "1",
    "RB": "1",
    "RWB": "1",
    "LWB": "1",
    "CB": "1",
    "CDM": "2",
    "CM": "2",
    "RM": "2",
    "LM": "2",
    "CAM": "2",
    "RW": "3",
    "LW": "3",
    "ST": "3",
    "LF": "3",
    "RF": "3",
    "CF": "3",
}

NUM_MACRO_ROLES = 4

roles_to_macro_role_UDF = udf(
    lambda roles: float(
        roles_to_macro_roles_dict[roles.split(",")[0]]
    ), 
    StringType()
)
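
The lambda wrapped by the UDF above only looks at the first position listed for a player. The plain-Python sketch below reproduces that logic on a subset of the mapping, so it can be tried without Spark; `first_role_to_macro_role` is a hypothetical name used for this illustration.

```python
# Subset of the mapping above, enough for the demonstration
roles_to_macro_roles = {"GK": "0", "CB": "1", "CM": "2", "CAM": "2", "ST": "3"}


def first_role_to_macro_role(player_positions):
    """Map the first listed position to its macro role, as the UDF's lambda does."""
    first_role = player_positions.split(",")[0]
    return float(roles_to_macro_roles[first_role])


print(first_role_to_macro_role("CAM, CM, ST"))
print(first_role_to_macro_role("GK"))
```

A position that is missing from the dictionary raises a `KeyError`, so the mapping has to cover every abbreviation that can appear first in `player_positions`.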

Since the columns associated with players also contain graphical data, or data that is generally not informative for the project's purpose (such as shirt number, celebration moves, etc.), a list of meaningful columns has been defined, on which the actual working dataframes will be based.

In [None]:
columns = [
    "short_name",
    "club_name",
    "league_name",
    "season",
    "player_positions",
    "macro_role",
    "overall",
    "value",
    "pace",
    "shooting",
    "passing",
    "dribbling",
    "defending",
    "physic",
    "attacking_crossing",
    "attacking_finishing",
    "attacking_heading_accuracy",
    "attacking_short_passing",
    "skill_dribbling",
    "skill_fk_accuracy",
    "skill_long_passing",
    "skill_ball_control",
    "movement_acceleration",
    "movement_sprint_speed",
    "movement_reactions",
    "power_shot_power",
    "power_stamina",
    "power_strength",
    "power_long_shots",
    "mentality_aggression",
    "mentality_penalties",
    "defending_standing_tackle"
]

Datasets do **not** explicitly include the **year** (season) to which each record refers.

Rather, this information is implicitly stored in a URL (also for the non-scraped ones, which still originate from the same source), which has its own field.
For this reason, a function to extract this information from that field is needed.

As an example, if the URL is `/player/41236/zlatan-ibrahimovic/130034/`, the FIFA edition is `13` (from `/13xxxx/`), which corresponds to season `12` once the one-year shift applied by the function below is taken into account.

In [None]:
def get_season(url):
    url_split = url.split("/")

    # FIFA years must be shifted back by one (i.e. 2021 has to become 2020, etc.)
    # This is needed to ensure compatibility with the seasonal score dataset
    return str(
        (int(url_split[-2 if url_split[-1] == "" else -1][0:2]) - 1)
    ).zfill(2)

get_season_UDF = udf(lambda url: get_season(url), StringType())
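
The extraction logic can be checked on plain strings, without going through the UDF. The sketch below restates `get_season` with an intermediate variable for readability and applies it to the URL from the example above, plus a second URL without the trailing slash whose numeric suffix is made up for illustration.

```python
def get_season(url):
    url_split = url.split("/")
    # The edition code is the last path segment, with or without a trailing slash
    edition_code = url_split[-2 if url_split[-1] == "" else -1]
    # FIFA edition NN refers to season NN - 1, zero-padded to two digits
    return str(int(edition_code[0:2]) - 1).zfill(2)


print(get_season("/player/41236/zlatan-ibrahimovic/130034/"))
print(get_season("/player/41236/zlatan-ibrahimovic/080002"))
```

With the one-year shift, FIFA edition `13` maps to season `12`, and edition `08` maps to season `07`.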

The format of the **monetary value** of players differs between the modern and legacy datasets.

Specifically, the legacy one abbreviates the values into the form `€10M` to represent `€10000000`.

The following function is used to convert the abbreviated form into the extended one.

In [None]:
@udf
def expand_value_UDF(value):
    value = value.replace("€", "")
    if value[-1] not in ("K", "M"):
        # no abbreviation at the end
        return float(value) + 0.0000001

    # extract the number and the unit
    num = value[:-1]
    unit = value[-1]

    # decide based on the unit
    if unit == "M":
        return float(num) * 1000000
    if unit == "K":
        return float(num) * 1000

    return "ERROR"
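
The conversion rule can also be tried outside Spark. The sketch below is a simplified plain-Python counterpart of the UDF above: `expand_value` is a hypothetical name, the small constant added to unabbreviated values is omitted, and the sample strings are made up for illustration.

```python
def expand_value(value):
    """Hypothetical plain-Python counterpart of the UDF above."""
    multipliers = {"K": 1_000, "M": 1_000_000}
    number = value.replace("€", "")
    unit = number[-1]
    if unit in multipliers:
        # Strip the unit and scale the remaining number
        return float(number[:-1]) * multipliers[unit]
    # No abbreviation at the end: the value is already extended
    return float(number)


print(expand_value("€10M"))
print(expand_value("€1.5M"))
print(expand_value("€500K"))
print(expand_value("€0"))
```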

Now to the actual **pre-processing**.

Some actions are needed by both legacy and modern data, whilst others are exclusive to either one.

In [None]:
# Extracting season from the player URL, as per previous cell
pre_processed_modern_df = modern_df.withColumn(
    "season", get_season_UDF(col("player_url"))
)
pre_processed_legacy_df = legacy_df.withColumn(
    "season", get_season_UDF(col("player_url"))
)

# Taking only the players playing for teams in supported Leagues, 
# in the supported seasons
pre_processed_modern_df = pre_processed_modern_df.where(
    (pre_processed_modern_df.league_name.isin(leagues))
    &
    (pre_processed_modern_df.season.isin(seasons_modern))
)
pre_processed_legacy_df = pre_processed_legacy_df.where(
    (pre_processed_legacy_df.league_name.isin(leagues))
    &
    (pre_processed_legacy_df.season.isin(seasons_legacy))
)

# Dropping duplicate players
pre_processed_modern_df = pre_processed_modern_df.dropDuplicates(["player_url"])
pre_processed_legacy_df = pre_processed_legacy_df.dropDuplicates(["player_url"])

# Selected columns have been checked for absence of null/missing data.
# Nevertheless, to ensure compatibility and reusability with other datasets, 
# a null-filling sweep is done
pre_processed_modern_df = pre_processed_modern_df.na.fill(0)
pre_processed_legacy_df = pre_processed_legacy_df.na.fill(0)

# Getting the macro role of the player, according to its field position
pre_processed_modern_df = pre_processed_modern_df.withColumn(
    "macro_role", roles_to_macro_role_UDF(col("player_positions"))
)
pre_processed_legacy_df = pre_processed_legacy_df.withColumn(
    "macro_role", roles_to_macro_role_UDF(col("player_positions"))
)

# Renaming the "value_eur" field to "value" to have compatibility with legacy
pre_processed_modern_df = pre_processed_modern_df.withColumnRenamed(
    "value_eur", "value"
)

# Convert the monetary value to have compatibility with modern
pre_processed_legacy_df = pre_processed_legacy_df.withColumn(
    "value", expand_value_UDF(col("value"))
)

# Renaming some legacy columns, so that they have the same name as in the modern
pre_processed_legacy_df = pre_processed_legacy_df.withColumnRenamed(
    "pas", "passing"
)
pre_processed_legacy_df = pre_processed_legacy_df.withColumnRenamed(
    "dri", "dribbling"
)
pre_processed_legacy_df = pre_processed_legacy_df.drop(col("defending"))
pre_processed_legacy_df = pre_processed_legacy_df.withColumnRenamed(
    "def", "defending"
)
pre_processed_legacy_df = pre_processed_legacy_df.withColumnRenamed(
    "phy", "physic"
)

pre_processed_legacy_df = pre_processed_legacy_df.withColumnRenamed(
    "sho", "shooting"
)
pre_processed_legacy_df = pre_processed_legacy_df.withColumnRenamed(
    "pac", "pace"
)
pre_processed_legacy_df = pre_processed_legacy_df.withColumnRenamed(
    "bov", "overall"
)

# Keeping only the needed columns.
pre_processed_modern_df = pre_processed_modern_df.select(columns)
pre_processed_legacy_df = pre_processed_legacy_df.select(columns)

**Checking** whether some **monetary values** have not been successfully converted.

In [None]:
if pre_processed_legacy_df.select("value").where(col("value") == "ERROR").count() > 0:
    print("WARN: some abbreviated monetary values were not correctly expanded.")

After pre-processing, both dataframes have the same set of columns, so they can be **concatenated** together.

In [None]:
pre_processed_df = pre_processed_modern_df.unionByName(
    pre_processed_legacy_df
)

## Cleaning up memory

In [None]:
del legacy_df
del modern_df
del pre_processed_legacy_df
del pre_processed_modern_df
gc.collect()

This is the **end result** of this section.

For graphical reasons, only a selection of the columns will be shown, just to give an idea of the structure.

In [None]:
pre_processed_df.select(*columns[0:10]).show()

The complete list of columns is (again for graphical reasons) printed here in a horizontal format, where each item is a tuple of the form `(field_name, type)`.

In [None]:
print(pre_processed_df.dtypes)

# Building football teams

In [None]:
football_teams_df = pre_processed_df

After having pre-processed the football players, it's time to build the football teams.

For the sake of the learning stage of the project, **teams are differentiated across different seasons**; for example, Real Madrid of 2020 is **different** from Real Madrid of 2018.

A football team is then modeled as **the set of the averages of the features of all of its football players**.

In [None]:
# Columns that are considered as features
PLAYER_FEATURES = [
    "overall",
    "value",
    "pace",
    "shooting",
    "passing",
    "dribbling",
    "defending",
    "physic",
    "attacking_crossing",
    "attacking_finishing",
    "attacking_heading_accuracy",
    "attacking_short_passing",
    "skill_dribbling",
    "skill_fk_accuracy",
    "skill_long_passing",
    "skill_ball_control",
    "movement_acceleration",
    "movement_sprint_speed",
    "movement_reactions",
    "power_shot_power",
    "power_stamina",
    "power_strength",
    "power_long_shots",
    "mentality_aggression",
    "mentality_penalties",
    "defending_standing_tackle"
]

# Wrapping each feature name in avg(...), the column name produced by the aggregation below
PLAYER_FEATURES_AVG = [
    "avg(" + player_feature + ")" for player_feature in PLAYER_FEATURES
]

# Target variable of the learning stage
TARGET_VARIABLE = "points"

Composing the football team, as introduced before.

In [None]:
football_teams_df = football_teams_df.select(
    "season", "club_name", *PLAYER_FEATURES
).groupBy(
    ["season", "club_name"]
).agg(
    { player_feature: "avg" for player_feature in PLAYER_FEATURES }
)
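
The aggregation above can be pictured on a handful of rows in plain Python. The player rows, club names and the two features used below are made up for illustration; the real dataframe averages every column in `PLAYER_FEATURES`.

```python
from collections import defaultdict
from statistics import mean

# Made-up player rows: (season, club_name, overall, pace)
players = [
    ("19", "Club A", 80, 70),
    ("19", "Club A", 90, 60),
    ("19", "Club B", 75, 85),
    ("20", "Club A", 82, 74),
]

# Group the feature values of all players sharing the same (season, club_name)
grouped = defaultdict(list)
for season, club_name, overall, pace in players:
    grouped[(season, club_name)].append((overall, pace))

# One row per (season, club_name), holding the mean of every feature
teams = {
    key: {
        "avg(overall)": mean(row[0] for row in rows),
        "avg(pace)": mean(row[1] for row in rows),
    }
    for key, rows in grouped.items()
}

print(teams[("19", "Club A")])
```

The same club in two different seasons ends up as two separate rows, which matches the modelling choice of treating each season of a team as a distinct sample.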

This is the **end result** of this section.

For graphical reasons, only a selection of the columns will be shown, just to give an idea of the structure.

In [None]:
football_teams_df.select(
    "season", "club_name", *PLAYER_FEATURES_AVG[0:5]
).show()

The complete list of columns with their type is (in a compact horizontal form):

In [None]:
print(football_teams_df.dtypes)

# Loading football teams seasonal scores

After having dealt with the football players and composed them into football clubs, it's time to get **"target" data** for the learning stage: the final ranking, for every team of every year.

The first step is to load the data from disk, which has been taken from the [European Football Dataset](https://www.kaggle.com/datasets/josephvm/european-club-football-dataset).

In [None]:
seasonal_scores_df = (
    spark.read.csv(
        "data/all_tables_fixed_renamed_leagues.csv",
        sep=",",
        inferSchema=True,
        header=True,
        multiLine=True,
    )
    .withColumnRenamed("Year", "season")
    .withColumnRenamed("Team", "club_name")
    .withColumnRenamed("P", "points")
    .withColumnRenamed("Place", "place")
    .withColumnRenamed("League", "league")
)

# Pre-processing football teams seasonal scores

Out of all the available columns, only the ones in `seasonal_scores_columns` will be taken into consideration.

In [None]:
seasonal_scores_columns = [
    "season", "league", "club_name", "points", "place"
]

The seasonal scores dataset uses **abbreviated versions of football team names** (for example: `BAR` for Barcelona, `LEI` for Leicester, etc.), which would cause **incompatibility** with the modern and legacy FIFA datasets, which instead use complete names.

Furthermore, the abbreviations are **not standard**, so there are **conflicts** such as, among others:

* `BAR` both for Barcelona and Bari (different leagues).
* `HUE` both for Huelva and Huesca (same league).

For this reason, a **custom** hand-made **mapping** procedure has been developed in order to resolve such conflicts.

Another source of incompatibility originated from **inconsistencies** within FIFA datasets, from year to year; for example, some teams had slight variations in their names (e.g. Torino and Torino FC).

All said inconsistencies have been **manually solved** and disambiguated at dataset-level.

Furthermore, the final **mapping** between abbreviations and FIFA names resulted in a **custom-made dataset** (also available in the linked dataset GitHub repo), which has the following form:

|abbr|league|club_name|fifa_club_name|
|---|---|---|---|
|AAC|German Bundesliga|Aachen|Alemannia Aachen|
|ADO|Dutch Eredivisie|Ado Den Haag|ADO Den Haag|
|AJA|Dutch Eredivisie|Ajax|Ajax|
|AJC|French Ligue 1|Ajaccio|AC Ajaccio|
|ALB|Spanish La Liga|Albacete|Albacete BP|
|...|...|...|...|

This mapping is then converted to **JSON** with a utility script in the same repository, and loaded here as a dictionary.

In [None]:
with open("data/clubs_map.json") as f:
    club_name_abbr_to_ext = json.load(f)

ABBREVIATED_CLUB_NAME_NOT_FOUD = "ABBREVIATED_CLUB_NAME_NOT_FOUD"
GENERAL_EXCEPTION = "GENERAL_EXCEPTION"

def extend_club_name(club_name_abbr):
    try:
        return club_name_abbr_to_ext[club_name_abbr]
    except KeyError as e:
        return ABBREVIATED_CLUB_NAME_NOT_FOUD
    except Exception as e:
        return GENERAL_EXCEPTION

extend_club_name_UDF = udf(
    lambda club_name_abbr: extend_club_name(str(club_name_abbr)),
    StringType(),
)
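
The lookup-with-sentinel behaviour can be illustrated on a toy mapping. The flat "abbreviation to FIFA club name" structure below is an assumption made for this example (the actual layout of `clubs_map.json` is not shown here); the three entries are taken from the sample rows of the table above.

```python
NOT_FOUND = "ABBREVIATED_CLUB_NAME_NOT_FOUD"  # same sentinel value as above

# Assumed flat structure: abbreviation -> FIFA club name
club_map = {"AAC": "Alemannia Aachen", "ADO": "ADO Den Haag", "AJA": "Ajax"}


def extend_club_name(abbr, mapping):
    # dict.get returns the sentinel instead of raising KeyError
    return mapping.get(abbr, NOT_FOUND)


print(extend_club_name("AJA", club_map))
print(extend_club_name("XXX", club_map))
```

Returning a sentinel rather than raising keeps the Spark job running and makes it possible to count the unresolved abbreviations afterwards with a simple filter.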

In the rankings dataset the seasons are expressed as `YYYY`, whilst FIFA uses the `YY` encoding.

For this reason, to guarantee compatibility, season values in the rankings dataset are abbreviated.

In [None]:
abbreviate_season_UDF = udf(
    lambda season: str(season)[-2:],
    StringType(),
)

The following is the actual **data pre-processing**.

In [None]:
pre_processed_seasonal_scores_df = seasonal_scores_df

# Abbreviating season, as per previous cell
pre_processed_seasonal_scores_df = pre_processed_seasonal_scores_df.withColumn(
    "season", abbreviate_season_UDF(col("season"))
)

# Keeping only supported leagues in supported seasons
pre_processed_seasonal_scores_df = pre_processed_seasonal_scores_df.where(
    (pre_processed_seasonal_scores_df.season.isin(seasons))
    & 
    (pre_processed_seasonal_scores_df.league.isin(leagues))
)

# Selecting only the desired columns
pre_processed_seasonal_scores_df = pre_processed_seasonal_scores_df.select(
    seasonal_scores_columns
)

# Although data has been checked for duplicates and missing values, to ensure
# interoperability with other datasets, the pre-processing steps are still performed
pre_processed_seasonal_scores_df = pre_processed_seasonal_scores_df.dropDuplicates(
    seasonal_scores_columns
)
pre_processed_seasonal_scores_df = pre_processed_seasonal_scores_df.na.fill(0)

# Extending club names, as per previous cell
pre_processed_seasonal_scores_df = pre_processed_seasonal_scores_df.withColumn(
    "club_name", extend_club_name_UDF(col("club_name"))
)
# Checking whether all club names were expanded successfully
if pre_processed_seasonal_scores_df.filter(
    col("club_name") == ABBREVIATED_CLUB_NAME_NOT_FOUD
).count() > 0:
    print("WARN: some clubs have NOT been found")
    print("Please check your data")

# Casting points to double, as required by learning procedures
pre_processed_seasonal_scores_df = pre_processed_seasonal_scores_df.withColumn(
    "points", pre_processed_seasonal_scores_df.points.cast(DoubleType())
)

This is the end result of this section:

In [None]:
pre_processed_seasonal_scores_df.show()

# Joining football teams features with their seasonal scores

The two dataframes (players and seasonal scores) need to be **merged together** to form a **single dataframe**.
This can easily be done by joining on the key (`season`, `club_name`).

In [None]:
df = football_teams_df.join(
    pre_processed_seasonal_scores_df,
    on=["season", "club_name"],
)

In order to **check** whether some clubs were left out by the aforementioned join operation, two differences are computed:

* `pre_processed_seasonal_scores_df - df`
* `football_teams_df - df`

In [None]:
diff = pre_processed_seasonal_scores_df.select("club_name").subtract(df.select("club_name")).distinct()

if diff.count() > 0:
    print("WARN: Some football teams have been left out of the join (pre_processed_seasonal_scores_df)")
    diff.show()

diff = football_teams_df.select("club_name").subtract(df.select("club_name")).distinct()
if diff.count() > 0:
    print("WARN: Some football teams have been left out of the join (football_teams_df)")
    diff.show()

del diff
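
The check above boils down to two set differences. The following plain-Python sketch reproduces the idea with made-up club names, so it is only an illustration of the logic and not of the actual data.

```python
# Hypothetical club names, used only to illustrate the check
scores_clubs = {"Ajax", "Torino", "Albacete BP"}
teams_clubs = {"Ajax", "Torino", "AC Ajaccio"}

# An inner join keeps only the keys present on both sides
joined_clubs = scores_clubs & teams_clubs

# Clubs that appear on one side but did not survive the join
missing_from_scores = scores_clubs - joined_clubs
missing_from_teams = teams_clubs - joined_clubs
```

Note that the notebook's check compares club names only: a club that is missing in a single season but present in others would not be reported by it.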

This is the end result of this section.

For graphical reasons, only a selection of the columns will be shown, just to give an idea of the structure.

In [None]:
df.select(
    "season", "league", "club_name", "avg(overall)", "avg(pace)", "points", "place"
).show()

The full list of columns with their type is (in a compact horizontal form):

In [None]:
print(df.dtypes)

In order to perform **classification**, a **categorical** target is needed in place of the continuous variable `points` (on which regression will be performed). This target, called **`macro_place`**, is derived from the final table position (`place`).

The idea behind `macro_place` is pretty simple: it consists of a **partition** of the possible **table rankings** into zones.
Colloquially, these zones are usually referred to as:
* Champions League Spots.
* Europa League Spots.
* Mid-high table.
* Mid-low table.
* Relegation.

In [None]:
NUM_MACRO_PLACES = 5

def get_macro_place(place, league, complex=False):
    if league in ["Dutch Eredivisie", "German Bundesliga"]:
        if 1 <= place <= 3:
            return 0.0
        if 4 <= place <= 6:
            return 1.0
        if 7 <= place <= 10:
            return 2.0
        if 11 <= place <= 15:
            return 3.0
        if 16 <= place <= 18:
            return 4.0
    else:
        if 1 <= place <= 4:
            return 0.0
        if 5 <= place <= 8:
            return 1.0
        if 9 <= place <= 12:
            return 2.0
        if 13 <= place <= 16:
            return 3.0
        if 17 <= place <= 20:
            return 4.0

    return None
    
get_macro_place_UDF = udf(
    lambda place, league, complex: get_macro_place(float(place), league, complex),
    DoubleType(),
)

In [None]:
df = df.withColumn(
    "macro_place", get_macro_place_UDF(col("place"), col("league"), lit(False))
)
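
The zone boundaries can also be expressed as data rather than as a chain of `if` statements. The sketch below is an alternative formulation, not the notebook's implementation: it assumes integer positions and takes the league size as a (hypothetical) parameter instead of the league name.

```python
from bisect import bisect_left

# Inclusive upper bound of each zone, copied from get_macro_place
ZONE_UPPER_BOUNDS = {
    18: [3, 6, 10, 15, 18],  # 18-team leagues (Eredivisie, Bundesliga)
    20: [4, 8, 12, 16, 20],  # 20-team leagues
}


def macro_place_sketch(place, league_size):
    bounds = ZONE_UPPER_BOUNDS[league_size]
    if not 1 <= place <= bounds[-1]:
        return None  # out-of-range positions, as in the original function
    # Index of the first upper bound that is >= place
    return float(bisect_left(bounds, place))


zones_18 = [macro_place_sketch(p, 18) for p in (1, 3, 4, 10, 11, 18)]
zones_20 = [macro_place_sketch(p, 20) for p in (1, 4, 5, 12, 13, 20)]
```

Keeping the boundaries in a table makes it easier to see that the five zones have different sizes in 18-team and 20-team leagues.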

## Cleaning up memory

In [None]:
del seasonal_scores_df
del pre_processed_seasonal_scores_df
del football_teams_df
gc.collect()

# Visualizations

A good practice whenever a data-driven task is being tackled is to first visualize the data.

In fact, even just by simply looking at data we may find some useful information which may have a direct impact on the subsequent learning phase.

## Points distribution

Starting off with the **data distribution** of the end-of-season points, across all supported seasons, for all supported leagues.

In [None]:
p = sns.displot(df.select("points").toPandas(), kde=True)
p.fig.tight_layout(pad=1.5)
plt.title("Points distribution across all leagues")

Some observations can be made:

1. Data tends to approximately follow a Gaussian/Normal distribution.
2. Said distribution appears to be slightly skewed towards the left.

These observations are consistent with **domain knowledge**:

1. In every league, there are just a few very strong teams (Champions League qualifiers), a limited number of very bad teams (fighting for relegation) and then a multitude of middle-table teams
2. Some leagues do not have much talent in the middle part of the table, resulting in a general "equilibrium" between such teams.

As a result, the scores of such teams will not be high, but rather close to the mean value.
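
The skewness mentioned in the observations above can also be quantified rather than judged by eye. The following is a minimal sketch of the Fisher-Pearson skewness coefficient in plain Python; the samples are made up and `sample_skewness` is a hypothetical helper, not part of the notebook.

```python
from statistics import mean


def sample_skewness(values):
    # Fisher-Pearson coefficient: third central moment over the cubed std dev
    m = mean(values)
    n = len(values)
    m2 = sum((v - m) ** 2 for v in values) / n
    m3 = sum((v - m) ** 3 for v in values) / n
    return m3 / m2 ** 1.5


# Made-up samples, not taken from the dataset
symmetric = [30, 40, 50, 60, 70]
long_left_tail = [10, 50, 55, 60, 62]
long_right_tail = [38, 40, 45, 50, 90]

skew_symmetric = sample_skewness(symmetric)
skew_left = sample_skewness(long_left_tail)
skew_right = sample_skewness(long_right_tail)
```

A value close to zero indicates a symmetric sample, a negative value a longer left tail and a positive value a longer right tail.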

The following **density plot** focuses instead on each league, showing again the points distribution.

In [None]:
p = sns.displot(df.toPandas(), x="points", hue="league", kind="kde", aspect=1.7, palette="tab10")
p.fig.tight_layout(pad=1.5)
plt.title("Points distribution per league")

Some observations can be made:

* The **French** and **Spanish** leagues are the least skewed among all leagues, possibly meaning that they are characterized by a majority of "average" teams.
* The **Italian** and **English** leagues are very similar on low and middle points, but start to behave slightly differently on the middle-high and high parts of the table.
* The **Dutch** league is the most left-skewed, possibly hinting at the lower quality of the league, as suggested by the domain knowledge.
* The **Spanish** league has a boost on high points, hinting at a consistent dominance of one or more teams.

## Overall player quality distribution

Similar visualizations can be done to analyze the overall player quality.

The following plot shows the distribution of the player quality (`avg(overall)`) across all leagues.

In [None]:
p = sns.displot(df.select("avg(overall)").toPandas(), kde=True)
p.fig.tight_layout(pad=1.5)
plt.title("Player quality across all leagues")

Similarly to what was observed for the points, the plot shows that:

* A vast majority of the players are of average quality.
* A minority of the players are below-average.
* A minority of the players are above-average.

The following plot focuses on the single leagues.

In [None]:
p = sns.displot(df.toPandas(), x="avg(overall)", hue="league", kind="kde", aspect=1.7, palette="tab10")
p.fig.tight_layout(pad=1.5)
plt.title("Player quality per league")

From the plot it is possible to observe that:

* The **Dutch** league hosts the **majority** of **below-average** players.
* The **French** league follows the Dutch one in having **below-average** players.
* The **German** league has the **highest** amount of **average** players.
* The **German**, **English**, **Italian** and **Spanish** leagues have a peak of **average** players.
* The same leagues have a considerable amount of **above-average** players.
* The **Spanish** league **dominates** on the **above-average** players.

# Attempt 1: "naive" player features

This attempt has been named "naive" because it simply uses **all the features** that are related to a **player's technical abilities**.

In [None]:
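# remove columns that do not inherently represent players' abilities;
# a new list is built so that PLAYER_FEATURES_AVG is left untouched
# and the cell can safely be re-run
ALL_FEATURES = [
    feature
    for feature in PLAYER_FEATURES_AVG
    if feature not in ("avg(overall)", "avg(value)")
]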

The learning procedures require that the considered features are assembled in a vector.

The following cell performs this operation by adding a new "assembled" column to the main dataframe.

In [None]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=ALL_FEATURES, outputCol="all_vec"
)

df = assembler.transform(df)

Before learning, it is good practice to first gather in-depth information on the structure of the data, as well as obtaining useful visualizations.

In the following sections, additional **data visualizations** will be performed.

## Raw data

In this section a first analysis is performed on _raw_ data, meaning that **neither normalization nor standardization** is performed.

In [None]:
# color to be used in the plots for raw data
COLOR_RAW = "#332FD0"

In [None]:
# use Pandas for easy plotting with Seaborn
pdf = df.toPandas()

### Feature-target correlation

The **feature-target correlation** shows the correlation between each feature and the target variable (`points`).

The idea is to immediately visualize whether there are specific features that are particularly correlated with the target variable.

In [None]:
plot_feature_target_relation(
    pdf, ALL_FEATURES, TARGET_VARIABLE, color=COLOR_RAW
)

Despite the high concentration of datapoints, it is still possible to see that there are **few outliers**, considering that about 1.7k points are plotted.

Let's see how visualizations change when we focus on a **specific season**, thus restricting the data scope.

In [None]:
plot_feature_target_relation(
    pdf[pdf["season"] == "20"], ALL_FEATURES, TARGET_VARIABLE, color=COLOR_RAW
)

It can be observed that:

1. There is a general, roughly linear relation between each feature and the final points.
2. There is a generalized **high variance** across all features.
   1. E.g. taking the very last feature (`avg(defending_standing_tackle)`), teams with a value close to `50` correspond to a wide range of points.
   2. On the contrary, for extreme values (close to `40` or `90`), the target variable has a narrower range.

### Feature distribution

The following plot shows the distribution of the player features in the analyzed dataframe.

In [None]:
plot_feature_distribution(
    pdf, 
    ALL_FEATURES, 
    color = COLOR_RAW,
    figsize=(10,20)
)

From the plot it is possible to observe that:

1. Most features have a symmetrical distribution.
2. Most features are centered around the middle.
3. Some features are more skewed towards the left, hinting that they may be more "rare".
4. Some features don't properly follow a Gaussian distribution (`avg(pace)`, `avg(physic)`).

Furthermore, observation `2` is linked to observation `2.1` of the previous plot:

* A lot of features have an average value.
* Around the average values, there is high variance.

Hence, most likely, the system will be exposed to this issue multiple times, which will affect its performance.

### Pearson Correlation Matrix

The **Pearson Correlation Matrix** is a graphical tool that shows, for each pair of variables, a numerical value indicating their linear correlation.

It is employed in the project to find whether some features are correlated with each other, which can be a problem for Linear Regression, but not so much for tree-based models, since they implicitly perform feature selection when choosing splits.

In [None]:
plot_correlation_matrix(pdf, ALL_FEATURES)

The majority of features are **positively correlated** to each other (red).

Using some domain knowledge, it is possible to further comment on some correlations, like (among others):

* The better a defender is, the more aggressive they are.
* The better a defender is, the more capable they are of performing standing tackles.
* The more physical strength a player has, the faster (pace) they are.

## Standardization

Since we have some features with **skewed distributions**, it is worth trying to **standardize** them, to see whether this helps to center the feature distributions.

To do so, a scaler is defined, which takes the features from `all_vec` and places their scaled versions in `all_vec_std`.

The configuration is a standard **z-score normalization**.

In [None]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(
    inputCol="all_vec", 
    outputCol="all_vec_std", 
    withStd=True, 
    withMean=True
)

df = scaler.fit(df).transform(df)
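
For reference, the z-score of a single feature can be sketched in plain Python as follows. The values are made up and `z_score` is a hypothetical helper; the sample standard deviation is used because Spark's `StandardScaler` is documented as relying on the corrected sample standard deviation.

```python
from statistics import mean, stdev


def z_score(values):
    mu = mean(values)
    sigma = stdev(values)  # sample standard deviation (n - 1 denominator)
    return [(v - mu) / sigma for v in values]


raw = [55.0, 60.0, 65.0, 70.0, 75.0]  # made-up feature values
standardized = z_score(raw)
```

After the transformation the feature has zero mean and unit standard deviation, but the shape of its distribution is unchanged, since every value is shifted and rescaled by the same constants.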

In [None]:
COLOR_STD = "#9254C8"

ALL_FEATURES_STD = [
    player_feature + "_std" for player_feature in ALL_FEATURES
]

In [None]:
pdf = df.toPandas()

Since the scaler groups all features in a single field (the feature vector), but plotting requires separate fields (one per feature), a function is needed to extract each feature and place it in a separate column.

In [None]:
def feature_vec_to_cols(pdf, vec, columns):
    tmp = pdf.reindex(
        columns=list(pdf.columns) + columns
    )

    tmp[columns] = tmp[
        vec
    ].transform(
        {
            columns[i]: operator.itemgetter(i) for i, p in enumerate(columns)
        }
    )
    return tmp

In [None]:
pdf = feature_vec_to_cols(pdf, "all_vec_std", ALL_FEATURES_STD)

It is possible to see the effects of the function by looking at the columns list.

In [None]:
pdf.columns

### Feature-target correlation

As done in the _raw data_ section, a feature-target correlation analysis is performed.

In [None]:
plot_feature_target_relation(
    pdf, ALL_FEATURES_STD, TARGET_VARIABLE, color=COLOR_STD
)

Let's again zoom on a single season.

In [None]:
plot_feature_target_relation(
    pdf[pdf["season"] == "20"], ALL_FEATURES_STD, TARGET_VARIABLE, color=COLOR_STD
)

Standardization does not improve the variance issue.

### Feature distribution

In [None]:
plot_feature_distribution(
    pdf, ALL_FEATURES_STD, color=COLOR_STD, figsize=(10,20)
)

Standardization did not change the shape of the feature distributions, as expected from a linear rescaling.

There is no need to plot the correlation matrix again, since it is not affected by a change of scale.
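
The scale invariance of the Pearson coefficient can be verified numerically. The sketch below uses made-up values for two features and a hypothetical `pearson` helper; it is meant only to illustrate the property, not to reproduce the notebook's plot.

```python
from math import sqrt


def pearson(xs, ys):
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    var_x = sum((x - mx) ** 2 for x in xs)
    var_y = sum((y - my) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)


# Made-up values for two features
a = [50.0, 62.0, 58.0, 71.0, 66.0]
b = [48.0, 55.0, 60.0, 69.0, 63.0]

# Positive affine rescalings (z-score and min-max are both of this kind)
a_rescaled = [(x - min(a)) / (max(a) - min(a)) for x in a]
b_rescaled = [0.5 * y - 3.0 for y in b]

r_raw = pearson(a, b)
r_rescaled = pearson(a_rescaled, b_rescaled)
```

Both coefficients coincide up to floating-point error, because shifting a variable does not change its deviations from the mean and a positive scale factor cancels between numerator and denominator.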

## Log transformation

A further attempt consists of applying a logarithmic transformation, still trying to combat skewness.

In [None]:
COLOR_LOG = "#E15FED"
ALL_FEATURES_LOG = [
    player_feature + "_log" for player_feature in ALL_FEATURES
]


The following function maps a value to its $\log_2$.

In [None]:
to_log_UDF = udf(
    lambda value: float(np.log2(value)), DoubleType()
)
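
The logarithm is only defined for strictly positive values, so a feature or target equal to `0` needs some care (`np.log2(0)` evaluates to `-inf`). The following plain-Python sketch shows one possible guard; `safe_log2` is a hypothetical helper and is not used by the notebook.

```python
import math


def safe_log2(value):
    # log2 is only defined for strictly positive inputs
    if value is None or value <= 0:
        return None
    return math.log2(value)


compressed = [safe_log2(v) for v in (32, 64, 128)]
undefined = safe_log2(0)
```

The example also shows the compressing effect of the transformation: each doubling of the input adds just one unit to the output.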

In [None]:
for feature, feature_log in zip(ALL_FEATURES, ALL_FEATURES_LOG):
    df = df.withColumn(feature_log, to_log_UDF(col(feature)))

df = df.withColumn("points_log", to_log_UDF(col("points")))

In [None]:
pdf = df.toPandas()

### Feature-target correlation

In [None]:
plot_feature_target_relation(
    pdf, ALL_FEATURES_LOG, "points_log", color=COLOR_LOG, figsize=(10,20)
)

The logarithmic transformation did not help with the variance problem either.

Let's again see a zoomed version.

In [None]:
plot_feature_target_relation(
    pdf[pdf["season"] == "20"], ALL_FEATURES_LOG, "points_log", color=COLOR_LOG
)

The zoomed view confirms it: the logarithmic transformation did not help with variance.

### Feature distribution

In [None]:
plot_feature_distribution(pdf, ALL_FEATURES_LOG, color=COLOR_LOG, figsize=(10,20))

The logarithmic transformation slightly moved the skewness of all features towards the right, still not solving it; as a consequence, centered or right-skewed features are now slightly worsened.

## Min-max transformation

In [None]:
COLOR_MIN_MAX = "#6EDCD9"
ALL_FEATURES_MIN_MAX = [
    player_feature + "_min_max" for player_feature in ALL_FEATURES
]

In [None]:
from pyspark.ml.feature import MinMaxScaler

scaler = MinMaxScaler(
    inputCol="all_vec", 
    outputCol="all_vec_min_max"
)

df = scaler.fit(df).transform(df)
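
Min-max scaling linearly maps each feature to a fixed range, `[0, 1]` by default. The following is a minimal plain-Python sketch on made-up values; `min_max` is a hypothetical helper, and the handling of constant features mirrors what the Spark documentation describes for `MinMaxScaler`.

```python
def min_max(values, lower=0.0, upper=1.0):
    lo, hi = min(values), max(values)
    if hi == lo:
        # Constant feature: map everything to the midpoint of the target range
        return [0.5 * (lower + upper) for _ in values]
    return [lower + (v - lo) / (hi - lo) * (upper - lower) for v in values]


raw = [40.0, 55.0, 70.0, 90.0]  # made-up feature values
scaled = min_max(raw)
constant = min_max([60.0, 60.0, 60.0])
```

Like standardization, this is a linear rescaling, so it changes the range of a feature but not the shape of its distribution.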

In [None]:
pdf = df.toPandas()
pdf = feature_vec_to_cols(pdf, vec="all_vec_min_max", columns=ALL_FEATURES_MIN_MAX)

### Feature-target relationship

In [None]:
plot_feature_target_relation(
    pdf, ALL_FEATURES_MIN_MAX, TARGET_VARIABLE, color=COLOR_MIN_MAX
)

Zoomed view:

In [None]:
plot_feature_target_relation(
    pdf[pdf["season"] == "20"], ALL_FEATURES_MIN_MAX, TARGET_VARIABLE, color=COLOR_MIN_MAX
)

### Feature distribution

In [None]:
plot_feature_distribution(pdf, ALL_FEATURES_MIN_MAX, color=COLOR_MIN_MAX, figsize=(10,20))

## Comparing the different transformations

The following GIF compares the previously plotted feature distributions.

![GIF changes illustration](https://s8.gifyu.com/images/ezgif.com-gif-maker4847ef0cb3270edd.gif)

Before learning, some further useful visualizations are given.

**From now on, Min-Max transformed data will be used.**

## PCA

To get an idea of the data space, dimensionality reduction via **PCA** (Principal Component Analysis) has been conducted.

In [None]:
df, pca_model = perform_pca(
    df=df,
    num_components=PCA_NUM_COMPONENTS,
    input_col="all_vec_min_max",
    output_col="all_vec_min_max_pcs"
)

The next plot shows the proportion of variance that the two principal components capture.

In [None]:
plot_pca_explained_variance(
    pca_model=pca_model
)

Cumulatively, the two components capture almost 100% of the variance.
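
The explained variance of each principal component is its eigenvalue of the covariance matrix, divided by the sum of all eigenvalues. The sketch below illustrates this with only two made-up features, so that the eigenvalues have a closed form; `explained_variance_ratio_2d` is a hypothetical helper, whereas the notebook's `perform_pca` operates on the full feature vector.

```python
from math import sqrt


def explained_variance_ratio_2d(xs, ys):
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    # Entries of the 2x2 sample covariance matrix
    sxx = sum((x - mx) ** 2 for x in xs) / (n - 1)
    syy = sum((y - my) ** 2 for y in ys) / (n - 1)
    sxy = sum((x - mx) * (y - my) for x, y in zip(xs, ys)) / (n - 1)
    # Closed-form eigenvalues of a symmetric 2x2 matrix
    half_trace = (sxx + syy) / 2
    radius = sqrt(((sxx - syy) / 2) ** 2 + sxy ** 2)
    eigenvalues = [half_trace + radius, half_trace - radius]
    total = sum(eigenvalues)
    return [ev / total for ev in eigenvalues]


# Made-up, strongly correlated features
xs = [1.0, 2.0, 3.0, 4.0, 5.0]
ys = [2.1, 3.9, 6.2, 8.1, 9.8]
ratios = explained_variance_ratio_2d(xs, ys)
```

When features are strongly correlated, as in this toy example, the first component alone captures nearly all of the variance.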

### Visualization

In this section the feature space resulting from PCA is plotted.

In [None]:
pdf = df.toPandas()

In [None]:
scatter_plot(
    x=pdf.all_vec_min_max_pcs.map(lambda x: x[0]),
    y=pdf.all_vec_min_max_pcs.map(lambda x: x[1]),
    c=pdf.points,
    x_label="Principal Component 0",
    y_label="Principal Component 1",
)  

The PCA result is quite underwhelming: teams with widely different end-of-season placements tend to be mixed up, meaning that data appear to be **not linearly separable**.

The following plots put in relation each component to the target variable (`points`).

In [None]:
scatter_plot(
    x=pdf.all_vec_min_max_pcs.map(lambda x: x[0]),
    y=pdf.points,
    x_label="Principal Component 0",
    y_label="Points",
)
scatter_plot(
    x=pdf.all_vec_min_max_pcs.map(lambda x: x[1]),
    y=pdf.points,
    x_label="Principal Component 1",
    y_label="Points",
)

As expected from the previous plots, there is **high variance**: for example, fixing the PC1 value, there is the whole spectrum of points on the `y` axis.

Principal Component Analysis is a linear method, so it is bound to produce linear representations, whereas non-trivial problems usually present non-linear data.

For this reason, adding some non-linearity may lead to a better lower-dimensional embedding.

## t-SNE

t-SNE is one of the most used tools when assuming **non-linearity** in the data space.

The implementation from the scikit-learn framework will be used, since PySpark does not provide any version of this tool.

scikit-learn works with NumPy structures, so the feature vector in the PySpark DataFrame must be converted to an `ndarray` structure.

In [None]:
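# Collect features and target in a single pass, so that the rows of the two
# arrays are guaranteed to refer to the same teams
rows = df.select("all_vec_min_max", "points").collect()

all_vec_min_max_np = np.array(
    [row["all_vec_min_max"].toArray() for row in rows]
)

points_np = np.array([row["points"] for row in rows]).reshape(-1)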

Once data is ready, t-SNE is called.

In [None]:
from sklearn.manifold import TSNE

tsne_embedding = TSNE(
    n_components=2, learning_rate='auto', init='random', method="barnes_hut"
).fit_transform(all_vec_min_max_np)

And, once the embedding space is populated, the result is plotted.

In [None]:
scatter_plot(
    x=tsne_embedding[:,0],
    y=tsne_embedding[:,1],
    c=points_np,
    x_label="TSNE Embedding Dimension 0",
    y_label="TSNE Embedding Dimension 1",
)

In [None]:
scatter_plot(
    x=tsne_embedding[:,0],
    y=points_np,
    x_label="TSNE Embedding Dimension 0",
    y_label="Points",
)
scatter_plot(
    x=tsne_embedding[:,1],
    y=points_np,
    x_label="TSNE Embedding Dimension 1",
    y_label="Points",
)

Unfortunately, adding non-linearity does not seem to improve the situation.

Not much progress has been made, w.r.t. PCA:

* **High variance** is still present.
* The data space differs, but it is still of low quality.

## Final recap on visualizations

Data seems to be:

* Not linearly separable.
* Suffering from high variance.

## Learning for attempt 1

The problem has been treated in two different forms:

* As **regression** on the **points**.
* As **classification** on the table/ranking areas (macro places).

In both cases, the following techniques are used for all models:

* **K-Fold Cross Validation**, to perform validation tests on the model performances.
* **Grid-search approach**, to perform hyperparameter tuning.

Generally, all models have been **trained and persisted on disk**, in order to retrieve them on subsequent executions of the notebook; hence, the model's existence is first checked before training again.
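
The "check, then train or load" pattern can be sketched independently of Spark. In the example below a JSON file stands in for a persisted model, and `load_or_train` and `fake_training` are hypothetical names; the notebook itself relies on its own `model_exists` helper and on the `load`/`save` methods of the PySpark models.

```python
import json
import os
import tempfile


def load_or_train(path, train_fn):
    # Reuse the persisted result when it exists, otherwise train and persist it
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f), "loaded"
    model = train_fn()
    with open(path, "w") as f:
        json.dump(model, f)
    return model, "trained"


# Stand-in for an expensive training step (hypothetical)
def fake_training():
    return {"weights": [0.1, 0.2]}


with tempfile.TemporaryDirectory() as tmp:
    model_path = os.path.join(tmp, "model.json")
    first = load_or_train(model_path, fake_training)
    second = load_or_train(model_path, fake_training)
```

The first call trains and persists, while the second one finds the file and skips training altogether.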

The columns regarding features, label, prediction have been set in the **parameter grid**. 

The parameter grid is used to perform a hyperparameter grid search, using PySpark's tools. Specifically, for each model the following parameters have been searched over:

* **Linear Regression**:
  * `regParam`: regularization coefficient parameter ($\lambda$). Different values have been tested, to understand whether adding "importance" to the regularization factor helps the model or not.
  * `solver`: the solver for the optimization steps.
  * `fitIntercept`: whether to use the bias or not.
  * `elasticNetParam`: the parameter that controls the tradeoff between the $L_1$ and $L_2$ regularizer terms of the ElasticNet regularization.

* **Prediction Tree**:
  * `maxDepth`: the maximum depth of the tree. The idea is that the more levels there are, the more splits are performed, and the better the performance should be.
  Specifically, the default value and the maximum possible value (i.e. the total number of features) are tried.
  * `maxBins`: Max number of bins used when discretizing continuous features.
  Default, half and double values have been tried.
  * `minInfoGain`: Minimum Information Gain to consider the feature as a discriminative candidate, during the splitting phase.

* **Gradient Boosted Tree**:
  * Same as Prediction Tree, plus:
  * `subsamplingRate`: controls the amount of data used to train each tree.
  Default and half value have been tested.
  * `lossType`: the loss function to use during the optimization.
  All available losses have been used.

* **Random Forest**:
  * Same as Gradient Boosted Tree (except `lossType`, which `RandomForestRegressor` does not expose), plus:
  * `numTrees`: how many trees to place in the random forest. Default, half and double value have been tested.
  * `featureSubsetStrategy`: how to select the features to train the forest trees on.
  All available values have been used.

* **MLP**:
  * `layers`: layer configuration of the Neural Network.
  Due to the amount of data and the number of features, a single hidden layer has been used, to avoid the curse of dimensionality.
  In fact, adding another layer would have increased the number of trainable parameters too much.
  * `solver`: the solver to use in the optimization stage.
  Gradient Descent has been chosen, since the other solvers resulted in execution problems.
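
The cost of a grid search grows multiplicatively with the number of values per parameter. As a rough illustration, the sketch below enumerates the combinations of the Linear Regression grid used in this notebook, assuming that each combination is fitted once per fold.

```python
from itertools import product

# Values copied from the Linear Regression parameter grid used in this notebook
grid = {
    "regParam": [0.0, 0.001, 0.01, 0.1, 0.5, 1],
    "solver": ["auto", "normal"],
    "fitIntercept": [True, False],
    "elasticNetParam": [0.0, 0.5, 1.0],
}

# One dictionary per point of the grid
combinations = [dict(zip(grid, values)) for values in product(*grid.values())]

num_folds = 5
total_fits = len(combinations) * num_folds
```

With 72 combinations and 5 folds, 360 models are fitted for Linear Regression alone, which is one of the reasons why trained models are persisted on disk.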

A roughly 70/20/10 train/validation/test split has been used; since 5-fold cross validation is employed and PySpark's cross validator automatically splits data into folds, it needs to be given a 90/10 split, which results in:

* $90 = train + validation$
  * $90 \cdot 4/5 = 72 \approx 70 = train$
  * $90 \cdot 1/5 = 18 \approx 20 = validation$
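
The exact proportions can be double-checked with a few lines of arithmetic; the variable names below are only illustrative.

```python
test_fraction = 0.1
num_folds = 5

# Share of the data handed to the cross validator
train_val_fraction = 1 - test_fraction
# In each round, one fold out of five is held out for validation
validation_fraction = train_val_fraction / num_folds
train_fraction = train_val_fraction - validation_fraction
```

The resulting shares are 72% for training and 18% for validation, which round to the approximate 70/20 proportions, with the remaining 10% kept for testing.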

In [None]:
train_df, test_df = df.randomSplit([0.9, 0.1], seed=random_seed)

# as per attempt description, all columns will be trained on
FEATURES_COL = "all_vec_min_max" 

### Regression

In [None]:
regression_evaluator_cv.setLabelCol(REGRESSION_LABEL_COL)

#### Linear Regression

In [None]:
PREDICTION_COL = "attempt_1_regression_linear_regression_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Regression"]["Linear Regression"]
):
    print("Linear Regression model NOT found in disk, training...")
    regression_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = LinearRegression()

    linear_regression_param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, [FEATURES_COL])
        .addGrid(estimator.labelCol, [REGRESSION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.regParam, [0.0, 0.001, 0.01, 0.1, 0.5, 1])
        .addGrid(estimator.solver, ["auto", "normal"])
        .addGrid(estimator.fitIntercept, [True, False])
        .addGrid(estimator.elasticNetParam, [0.0, 0.5, 1.0])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=linear_regression_param_grid,
        evaluator_cv=regression_evaluator_cv
    )
else:
    print("Linear Regression model found in disk, loading...")
    model = LinearRegressionModel.load(
        TRAINED_MODELS_DIRS["Attempt 1"]["Regression"]["Linear Regression"]
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Regression"]["Linear Regression"]
):
    print("Evaluating Linear Regression model trained in previous cell...")
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=REGRESSION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=REGRESSION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_1/regression/linear_regression.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_1/regression/linear_regression.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Regression"]["Linear Regression"]
):
    print("Saving Linear Regression model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_1/regression/linear_regression"
    )

#### Prediction Tree

In [None]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import DecisionTreeRegressionModel

PREDICTION_COL = "attempt_1_regression_decision_tree_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Regression"]["Prediction Tree"]
):
    print("Prediction Tree model NOT found in disk, training...")
    
    regression_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = DecisionTreeRegressor()

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, [FEATURES_COL])
        .addGrid(estimator.labelCol, [REGRESSION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.maxDepth, [5, 24])
        .addGrid(estimator.maxBins, [16, 32, 64])
        .addGrid(estimator.minInfoGain, [0, 0.1])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=regression_evaluator_cv
    )
else:
    print("Regression Decision Tree model found in disk, loading...")
    model = DecisionTreeRegressionModel.load(
        TRAINED_MODELS_DIRS["Attempt 1"]["Regression"]["Prediction Tree"]
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Regression"]["Prediction Tree"]
):
    print(
        "Evaluating Prediction Tree model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=REGRESSION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=REGRESSION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_1/regression/prediction_tree.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_1/regression/prediction_tree.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Regression"]["Prediction Tree"]
):
    print("Saving Decision Tree Regressor model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_1/regression/prediction_tree"
    )

#### Gradient Boosted Tree Regression

In [None]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.regression import GBTRegressionModel

PREDICTION_COL = "attempt_1_regression_gradient_boosted_tree_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Regression"]["Gradient Boosted Tree"]
):
    print("Regression Gradient Boosted Tree model NOT found in disk, training...")
    
    regression_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = GBTRegressor()

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, [FEATURES_COL])
        .addGrid(estimator.labelCol, [REGRESSION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.maxDepth, [2, 5, 10])
        .addGrid(estimator.maxBins, [16, 32, 64])
        .addGrid(estimator.minInfoGain, [0, 0.1])
        .addGrid(estimator.subsamplingRate, [0.5, 1])
        .addGrid(estimator.lossType, ["squared", "absolute"])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=regression_evaluator_cv
    )
else:
    print("Regression Gradient Boosted Tree model found in disk, loading...")
    model = GBTRegressionModel.load(
        TRAINED_MODELS_DIRS["Attempt 1"]["Regression"]["Gradient Boosted Tree"]
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Regression"]["Gradient Boosted Tree"]
):
    print(
        "Evaluating Gradient Boosted Tree model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=REGRESSION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=REGRESSION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_1/regression/gradient_boosted_tree.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_1/regression/gradient_boosted_tree.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Regression"]["Gradient Boosted Tree"]
):
    print("Saving Gradient Boosted Tree Regression model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_1/regression/gradient_boosted_tree"
    )

#### Random Forest Regression

In [None]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import RandomForestRegressionModel

PREDICTION_COL = "attempt_1_regression_random_forest_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Regression"]["Random Forest"]
):
    print("Regression Random Forest model NOT found in disk, training...")
    
    regression_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = RandomForestRegressor()

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, [FEATURES_COL])
        .addGrid(estimator.labelCol, [REGRESSION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.maxDepth, [5, 24])
        .addGrid(estimator.maxBins, [32, 64])
        .addGrid(estimator.minInfoGain, [0, 0.1])
        .addGrid(estimator.subsamplingRate, [0.5, 1])
        .addGrid(estimator.numTrees, [20, 40])
        .addGrid(
            estimator.featureSubsetStrategy, 
            ["auto", "onethird", "all", "log2"]
        )
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=regression_evaluator_cv
    )
else:
    print("Regression Random Forest model found in disk, loading...")
    model = RandomForestRegressionModel.load(
        TRAINED_MODELS_DIRS["Attempt 1"]["Regression"]["Random Forest"]
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Regression"]["Random Forest"]
):
    print(
        "Evaluating Random Forest model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=REGRESSION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=REGRESSION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_1/regression/random_forest.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_1/regression/random_forest.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Regression"]["Random Forest"]
):
    print("Saving Random Forest Regression model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_1/regression/random_forest"
    )

### Classification

In [None]:
classification_evaluator_cv.setLabelCol(CLASSIFICATION_LABEL_COL)

In [None]:
train_df, test_df = df.randomSplit([0.9, 0.1], seed=random_seed)

#### Visualizations

Visualizing `macro_place` distribution.

In [None]:
plot_feature_distribution(
    df.toPandas(), 
    [CLASSIFICATION_LABEL_COL],
    color="teal",
    n_cols=1,
    figsize=(8,4)
)

Class **balance** is pretty **good**.

Imbalance would have been a problem, because it can be a source of bias.
In fact, a model that sees one class far more often than the others may become biased towards that class, damaging the classification accuracy on the less frequent classes.
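
As a rough illustration of why imbalance matters, the sketch below compares the accuracy of a trivial "always predict the most frequent class" model on a balanced and on an imbalanced label list. The label names and counts are hypothetical and are not taken from the project data.

```python
from collections import Counter


def class_shares(labels):
    """Return the relative frequency of every class label."""
    counts = Counter(labels)
    total = len(labels)
    return {label: count / total for label, count in counts.items()}


def majority_baseline_accuracy(labels):
    """Accuracy of a model that always predicts the most frequent class."""
    return max(class_shares(labels).values())


# Hypothetical label lists, for illustration only.
balanced = ["top", "mid", "bottom"] * 10
imbalanced = ["top"] * 2 + ["mid"] * 26 + ["bottom"] * 2

print(majority_baseline_accuracy(balanced))
print(majority_baseline_accuracy(imbalanced))
```

On the imbalanced list the trivial model already looks accurate while never predicting two of the three classes, which is the kind of bias described above.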

A note about the first two models, **SVM** and **Logistic Regression**:

* PySpark's `LinearSVC` is a **binary** classifier; `LogisticRegression` also supports a multinomial family, but here it is used as a binary base classifier, like the SVM.
* To use them as **multiclass** classifiers, each one is wrapped in a **OneVsRest** estimator, which reduces the multiclass problem to one binary problem per class.
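
The one-vs-rest idea can be sketched in plain Python. This is only an illustration of the reduction, not PySpark's implementation: the binary "classifier" is a toy centroid-distance scorer standing in for an SVM or logistic regression, and all function names and data points are hypothetical.

```python
def train_binary_scorer(points, is_positive):
    """Toy binary model: score is higher the closer x is to the positive centroid."""
    positives = [p for p, flag in zip(points, is_positive) if flag]
    dim = len(points[0])
    centroid = [sum(p[i] for p in positives) / len(positives) for i in range(dim)]

    def score(x):
        # Negative squared distance to the centroid of the positive class.
        return -sum((a - b) ** 2 for a, b in zip(x, centroid))

    return score


def train_one_vs_rest(points, labels):
    """Train one binary scorer per class: 'this class' versus 'all the others'."""
    return {
        cls: train_binary_scorer(points, [label == cls for label in labels])
        for cls in sorted(set(labels))
    }


def predict_one_vs_rest(scorers, x):
    """Predict the class whose binary scorer is the most confident."""
    return max(scorers, key=lambda cls: scorers[cls](x))


# Hypothetical 2D points with three classes.
points = [(0, 0), (0, 1), (5, 5), (6, 5), (10, 0), (10, 1)]
labels = [0, 0, 1, 1, 2, 2]
scorers = train_one_vs_rest(points, labels)
print(predict_one_vs_rest(scorers, (9, 0)))
```

With three classes, three binary models are trained, and prediction picks the class with the highest score.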

#### SVM Classifier

In [None]:
from pyspark.ml.classification import OneVsRest
from pyspark.ml.classification import OneVsRestModel
from pyspark.ml.classification import LinearSVC

PREDICTION_COL = "attempt_1_classification_svm_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Classification"]["SVM"]
):
    print("SVM Classification model NOT found in disk, training...")
    classification_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = OneVsRest(
        classifier=LinearSVC(
            featuresCol=FEATURES_COL,
            labelCol=CLASSIFICATION_LABEL_COL,
            predictionCol=PREDICTION_COL,
        ),
        featuresCol=FEATURES_COL,
        labelCol=CLASSIFICATION_LABEL_COL,
        predictionCol=PREDICTION_COL
    )

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, [FEATURES_COL])
        .addGrid(estimator.labelCol, [CLASSIFICATION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=classification_evaluator_cv
    )
else:
    print("SVM Classification model found in disk, loading...")
    model = OneVsRestModel.load(
        TRAINED_MODELS_DIRS["Attempt 1"]["Classification"]["SVM"]
    )


In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Classification"]["SVM"]
):
    print(
        "Evaluating SVM Classifier model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=CLASSIFICATION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=CLASSIFICATION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_1/classification/svm.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_1/classification/svm.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Classification"]["SVM"]
):
    print("Saving SVM Classification model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_1/classification/svm"
    )

#### Logistic Regression

In [None]:
from pyspark.ml.classification import OneVsRest
from pyspark.ml.classification import OneVsRestModel
from pyspark.ml.classification import LogisticRegression

PREDICTION_COL = "attempt_1_classification_logistic_regression_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Classification"]["Logistic Regression"]
):
    print("Logistic Regression Classification model NOT found in disk, training...")
    classification_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = OneVsRest(
        classifier=LogisticRegression(
            featuresCol=FEATURES_COL,
            labelCol=CLASSIFICATION_LABEL_COL,
            predictionCol=PREDICTION_COL,
        ),
        featuresCol=FEATURES_COL,
        labelCol=CLASSIFICATION_LABEL_COL,
        predictionCol=PREDICTION_COL
    )

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, [FEATURES_COL])
        .addGrid(estimator.labelCol, [CLASSIFICATION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=classification_evaluator_cv
    )
else:
    print("Logistic Regression Classification model found in disk, loading...")
    model = OneVsRestModel.load(
        TRAINED_MODELS_DIRS["Attempt 1"]["Classification"]["Logistic Regression"]
    )


In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Classification"]["Logistic Regression"]
):
    print(
        "Evaluating Logistic Regression Classifier model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=CLASSIFICATION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=CLASSIFICATION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_1/classification/logistic_regression.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_1/classification/logistic_regression.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Classification"]["Logistic Regression"]
):
    print("Saving Logistic Regression Classification model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_1/classification/logistic_regression"
    )

#### Decision Tree

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import DecisionTreeClassificationModel

PREDICTION_COL = "attempt_1_classification_decision_tree_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Classification"]["Decision Tree"]
):
    print("Classification Decision Tree model NOT found in disk, training...")
    classification_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = DecisionTreeClassifier()

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, [FEATURES_COL])
        .addGrid(estimator.labelCol, [CLASSIFICATION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.maxDepth, [5, 24])
        .addGrid(estimator.maxBins, [16, 32, 64])
        .addGrid(estimator.minInfoGain, [0, 0.1])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=classification_evaluator_cv
    )
else:
    print("Classification Decision Tree model found in disk, loading...")
    model = DecisionTreeClassificationModel.load(
        TRAINED_MODELS_DIRS["Attempt 1"]["Classification"]["Decision Tree"]
    )
    evaluators = get_evaluators(
        model, 
        label_col=CLASSIFICATION_LABEL_COL, 
        prediction_col=PREDICTION_COL, 
        evaluation_metrics=CLASSIFICATION_EVALUATION_METRICS
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Classification"]["Decision Tree"]
):
    print(
        "Evaluating Decision Tree model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=CLASSIFICATION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=CLASSIFICATION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_1/classification/decision_tree.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_1/classification/decision_tree.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Classification"]["Decision Tree"]
):
    print("Saving Decision Tree model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_1/classification/decision_tree"
    )

#### Random Forest

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import RandomForestClassificationModel

PREDICTION_COL = "attempt_1_classification_random_forest_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Classification"]["Random Forest"]
):
    print("Classification Random Forest model NOT found in disk, training...")
    
    classification_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = RandomForestClassifier()

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, [FEATURES_COL])
        .addGrid(estimator.labelCol, [CLASSIFICATION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.maxDepth, [5, 24])
        .addGrid(estimator.maxBins, [32, 64])
        .addGrid(estimator.minInfoGain, [0, 0.1])
        .addGrid(estimator.subsamplingRate, [0.5, 1])
        .addGrid(estimator.numTrees, [20, 40])
        .addGrid(estimator.featureSubsetStrategy, ["auto", "onethird", "all", "log2"])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=classification_evaluator_cv
    )
else:
    print("Classification Random Forest model found in disk, loading...")
    model = RandomForestClassificationModel.load(
        TRAINED_MODELS_DIRS["Attempt 1"]["Classification"]["Random Forest"]
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Classification"]["Random Forest"]
):
    print(
        "Evaluating Random Forest model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=CLASSIFICATION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=CLASSIFICATION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_1/classification/random_forest.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_1/classification/random_forest.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Classification"]["Random Forest"]
):
    print("Saving Random Forest Classification model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_1/classification/random_forest"
    )

#### MLP

In [None]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.classification import MultilayerPerceptronClassificationModel

PREDICTION_COL = "attempt_1_classification_mlp_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Classification"]["MLP"]
):
    print("Classification MLP model NOT found in disk, training...")
    
    classification_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = MultilayerPerceptronClassifier()

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, [FEATURES_COL])
        .addGrid(estimator.labelCol, [CLASSIFICATION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.layers, [[24, NUM_MACRO_PLACES]])
        .addGrid(estimator.solver, ["gd"])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=classification_evaluator_cv
    )
else:
    print("Classification MLP model found in disk, loading...")
    model = MultilayerPerceptronClassificationModel.load(
        TRAINED_MODELS_DIRS["Attempt 1"]["Classification"]["MLP"]
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Classification"]["MLP"]
):
    print(
        "Evaluating MLP model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=CLASSIFICATION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=CLASSIFICATION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_1/classification/mlp.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_1/classification/mlp.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 1"]["Classification"]["MLP"]
):
    print("Saving MLP Classification model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_1/classification/mlp"
    )

# Attempt 2: "less is more"

Considering the knowledge gathered from the previous attempt, it can be useful to perform **dimensionality reduction** and then learn on the lower-dimensional data.

This helps with:

* Mitigating the **curse of dimensionality**.
* Addressing **feature correlation**.

The following dimensionality reduction methods have been used:

* PCA.
* t-SNE.
* Univariate Feature Selection.
* Feature selection by manually picking the player features `overall` and `value`.

An initial consideration is that:

* Log scaling reduces the skewness of the `avg(mentality_penalties)` feature, but it **increases the skewness** of all the other features.
* The other scalings (z-score and min-max) do NOT appear to change the shape of the "raw" data distribution.

For this reason, **min-max scaling** is used from here on. In fact, min-max scaling places all the features on the same scale "for free", which is an important consideration for SVM, used in the upcoming sections.
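
To make the "same scale" point concrete, here is a minimal plain-Python sketch of the textbook min-max and z-score formulas. It is not the PySpark `MinMaxScaler` code path; the sample values are hypothetical, and the handling of constant columns is a choice made for this example.

```python
from statistics import mean, pstdev


def min_max_scale(values):
    """Map values linearly onto [0, 1]; constant columns map to 0.0 in this sketch."""
    lo, hi = min(values), max(values)
    if hi == lo:
        return [0.0 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]


def z_score_scale(values):
    """Center on the mean and divide by the (population) standard deviation."""
    mu, sigma = mean(values), pstdev(values)
    if sigma == 0:
        return [0.0 for _ in values]
    return [(v - mu) / sigma for v in values]


# Hypothetical features living on very different scales.
overall = [60, 70, 80]
value = [1_000_000, 5_000_000, 21_000_000]

print(min_max_scale(overall))
print(min_max_scale(value))
print(z_score_scale(overall))
```

After min-max scaling, both features lie in the [0, 1] interval, so neither dominates a distance-based or margin-based model just because of its unit of measure.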

## Univariate Feature Selection

In [None]:
from pyspark.ml.feature import UnivariateFeatureSelector

Setting Univariate Feature Selection up.

In [None]:
selector = UnivariateFeatureSelector(
    featuresCol="all_vec_min_max",
    labelCol=TARGET_VARIABLE, 
    selectionMode="percentile"
).setFeatureType("continuous").setLabelType(
    "categorical"
).setSelectionThreshold(0.08)

# Number of wanted features
NUM_FEATURES = [2, 6, 12]

# UFS does not directly accept the number of desired features; it works with
# thresholds instead.
# So, thresholds are computed in order to get the desired number of features
# (24 being the total number of features in the input vector)
THRESHOLDS = [num_features/24 for num_features in NUM_FEATURES]

# The result of every feature selection is placed in its own column, in order to
# be able to use them in the learning phase of attempt 2
UFS_FEATURES = [
    f"all_vec_min_max_ufs_{num_features}" for num_features in NUM_FEATURES
]

Performing Univariate Feature Selection.

In [None]:
fit_result = dict()

for num_features, thr, ufs_features in zip(
    NUM_FEATURES, THRESHOLDS, UFS_FEATURES
):
    selector.setSelectionThreshold(thr)
    selector.setOutputCol(ufs_features)
    fit_result[str(num_features)] = selector.fit(df)
    df = fit_result[str(num_features)].transform(df)

These are the names of the features that have been selected, for every threshold.

Their visualizations (feature-target correlation, feature distribution and correlation matrix) will be skipped, since they are exactly the same as in attempt 1.<br>
In fact, UFS does not modify the features; it just selects them.
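
The relation between a desired number of features and a percentile threshold can be sketched as follows. The sketch assumes that a percentile-style selector keeps the top `int(n_features * threshold)` features by score; that rule, the helper names and the scores are assumptions made for illustration, not a description of the PySpark internals.

```python
def thresholds_for(feature_counts, total_features):
    """Fraction of features to keep in order to obtain each desired count."""
    return [k / total_features for k in feature_counts]


def select_top_fraction(scores, fraction):
    """Return the indices of the best-scoring features, leaving the values untouched.

    Assumption: int(len(scores) * fraction) features are kept.
    """
    keep = int(len(scores) * fraction)
    ranked = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
    return sorted(ranked[:keep])


# Thresholds for 6 and 12 features out of 24.
print(thresholds_for([6, 12], 24))

# Hypothetical relevance scores for four features.
scores = [0.9, 0.1, 0.4, 0.7]
print(select_top_fraction(scores, 0.5))
```

The function returns indices only, which mirrors the remark above: selection picks features, it does not transform them.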

In [None]:
selected_features = dict()
for num_features in NUM_FEATURES:
    selected_features[str(num_features)] = list(
        map(
            lambda i: ALL_FEATURES[i], fit_result[str(num_features)].selectedFeatures
        )
    )
    print(f"Univariate Feature Selection selected these {num_features} features:\n{selected_features[str(num_features)]}")

This is the data space resulting from UFS (2 features).

In [None]:
pdf = df.toPandas()

scatter_plot(
    x=pdf["all_vec_min_max_ufs_2"].map(lambda x: x[0]),
    y=pdf["all_vec_min_max_ufs_2"].map(lambda x: x[1]),
    c=pdf["points"],
    x_label="Feature 0",
    y_label="Feature 1",
)

Once again, this data space is **not linearly separable**.

## Overall as feature

Since the features are highly correlated and yield poor learning performance, the `overall` player ability has been tried as a **feature**, under the hypothesis that it captures some hidden characteristics that may improve performance.

The PySpark steps (assembling into a vector, min-max scaling and decomposition into a Pandas DF for plotting) are the same as in the previous cases, so they will not be commented on again.

In [None]:
OVERALL = ["avg(overall)"]
OVERALL_MIN_MAX = ["avg(overall)_min_max"]

COLOR_OVERALL_MIN_MAX = "green"

assembler = VectorAssembler(
    inputCols=OVERALL, outputCol="overall_vec"
)

df = assembler.transform(df)

scaler = MinMaxScaler(
    inputCol="overall_vec", 
    outputCol="overall_vec_min_max"
)

df = scaler.fit(df).transform(df)

### Feature-target relationship

In [None]:
pdf = df.toPandas()

pdf = feature_vec_to_cols(
    pdf=pdf,
    vec="overall_vec_min_max",
    columns=OVERALL_MIN_MAX
)

In [None]:
plot_feature_target_relation(
    pdf,
    OVERALL_MIN_MAX,
    TARGET_VARIABLE,
    figsize=(10,4),
    color="chocolate",
    n_cols=1
)

Nothing new here: `overall` suffers from **high variance** as well.

Our **hypothesis** that `overall` captures something not explicitly expressed by the other features has been **refuted**.

### Feature distribution

In [None]:
plot_feature_distribution(
    pdf,
    OVERALL_MIN_MAX,
    color="chocolate",
    figsize=(10,4),
    n_cols=1
)

The distribution of `overall` is in line with all the other features: it is approximately Normal and almost perfectly symmetric around its mean.

## Value as feature

A similar reasoning has been applied to the `value` feature.

Theoretically:
* The stronger the players in a team, the higher their value, the better the team, the higher the points.
* "Money talks": whilst the `overall` feature may be biased (for example by the way it is computed by FIFA), the economic value of a player should be less so, because football clubs have every incentive not to overpay for a player.

So, the economic value could potentially be a more accurate estimate of player quality.

Let's see whether this is true or not.

In [None]:
VALUE = ["avg(value)"]

COLOR_VALUE_MIN_MAX = "coral"
VALUE_MIN_MAX = ["avg(value)_min_max"]

assembler = VectorAssembler(
    inputCols=VALUE, outputCol="value_vec"
)

df = assembler.transform(df)

scaler = MinMaxScaler(
    inputCol="value_vec", 
    outputCol="value_vec_min_max"
)

df = scaler.fit(df).transform(df)

### Feature-target relationship

In [None]:
pdf = df.where(col("season").isin(seasons_modern)).toPandas()
pdf = feature_vec_to_cols(
    pdf=pdf,
    vec="value_vec_min_max",
    columns=VALUE_MIN_MAX
)

In [None]:
plot_feature_target_relation(
    pdf,
    VALUE_MIN_MAX,
    TARGET_VARIABLE,
    figsize=(10,4),
    color=COLOR_VALUE_MIN_MAX,
    n_cols=1
)

There are no tangible improvements w.r.t. previous attempts.

### Feature distribution

In [None]:
plot_feature_distribution(
    pdf,
    ["avg(value)"],
    color=COLOR_VALUE_MIN_MAX,
    figsize=(10,4),
    n_cols=1
)

As the previous plot shows, the feature distribution is heavily concentrated at low values (it is right-skewed), meaning that there are a lot of players with low economic value.

Min-max normalization did not solve the skewness, so a $\log_2$ transformation has been tried.

In [None]:
df = df.withColumn("value_log", to_log_UDF("avg(value)"))
pdf = df.filter(col("season").isin(seasons_modern)).toPandas()

In [None]:
plot_feature_distribution(
    pdf,
    ["value_log"],
    color=COLOR_VALUE_MIN_MAX,
    figsize=(10,4),
    n_cols=1
)

The logarithmic transformation solved the skewness.<br>
Log-transformed values will be min-max normalized, in order to have the **same scale** as in the previous attempt.
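
The effect of the logarithm on skewness can be checked numerically. The sketch below computes the moment-based skewness coefficient in plain Python on a hypothetical, strongly right-skewed sample (powers of two), before and after a base-2 logarithm. It does not use the project's `to_log_UDF`.

```python
import math


def skewness(values):
    """Moment-based skewness: third central moment over variance^(3/2)."""
    n = len(values)
    mu = sum(values) / n
    m2 = sum((v - mu) ** 2 for v in values) / n
    m3 = sum((v - mu) ** 3 for v in values) / n
    return m3 / m2 ** 1.5


# Hypothetical right-skewed sample: many small values, a few very large ones.
raw = [2.0 ** k for k in range(1, 11)]
logged = [math.log2(v) for v in raw]

print(skewness(raw))
print(skewness(logged))
```

A positive coefficient indicates a long right tail. After the transformation the sample becomes evenly spaced, so its skewness drops to zero.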

In [None]:
assembler = VectorAssembler(inputCols=["value_log"], outputCol="value_log_vec")
df = assembler.transform(df)

scaler = MinMaxScaler(
    inputCol="value_log_vec", 
    outputCol="value_log_vec_min_max"
)

df = scaler.fit(df).transform(df)
pdf = df.filter(col("season").isin(seasons_modern)).toPandas()

In [None]:
pdf = feature_vec_to_cols(
    pdf=pdf,
    vec="value_log_vec_min_max",
    columns=["value_log_min_max"]
)

plot_feature_distribution(
    pdf,
    ["value_log_min_max"],
    color=COLOR_VALUE_MIN_MAX,
    figsize=(10,4),
    n_cols=1
)

In [None]:
pdf = feature_vec_to_cols(
    pdf=pdf,
    vec="overall_vec_min_max",
    columns=OVERALL_MIN_MAX
)

## Overall-value correlation

**Domain knowledge** tells us that, usually, large amounts of money are spent on supposedly very good players, whilst little money is spent on average players.

Statistically speaking, this would translate into a (positive) correlation between the two variables.

Let's see whether that's true or not.

In [None]:
plot_correlation_matrix(
    pdf, 
    ["avg(overall)_min_max", "value_log_min_max"],
    figsize=(8,4)
)

As expected, `overall` and `value` are highly positively correlated.

For this reason, `overall` will not be considered, since the learning results would be very similar to those obtained with `value`.
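
For reference, the quantity behind a correlation matrix of this kind is usually the Pearson coefficient. The sketch below is a minimal plain-Python version of it. The input values are hypothetical, and it is an assumption that `plot_correlation_matrix` uses Pearson correlation.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient between two equally long sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    spread_x = math.sqrt(sum((x - mean_x) ** 2 for x in xs))
    spread_y = math.sqrt(sum((y - mean_y) ** 2 for y in ys))
    return cov / (spread_x * spread_y)


# Hypothetical scaled team averages, for illustration only.
overall_scaled = [0.2, 0.4, 0.6, 0.9]
value_scaled = [0.1, 0.35, 0.7, 0.95]

print(pearson(overall_scaled, value_scaled))
```

A coefficient close to 1 means the two features carry largely the same information, which is the argument for keeping only one of them.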

## Learning for attempt 2

The same learning configuration and logic as in the first attempt have been used; only the novelties will be explained.

`featuresCol` will be set via the hyperparameter search grid, in order to train the various models on the results of the various feature selection techniques.<br>
Specifically, the following feature-sets will be tested:
* `value` (log transformed)
* `Principal components` 
* The results of `UFS`
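
Putting the feature column into the search grid multiplies the number of candidate configurations. The sketch below expands a grid as a Cartesian product in plain Python, using the values from the linear regression grid of this attempt. `expand_grid` is a hypothetical helper, not a PySpark API.

```python
from itertools import product


def expand_grid(grid):
    """Expand {param: [values]} into a list of {param: value} combinations."""
    names = list(grid)
    return [
        dict(zip(names, combo))
        for combo in product(*(grid[name] for name in names))
    ]


# Feature columns tested in attempt 2 (value, principal components, UFS results).
feature_columns = [
    "value_log_vec_min_max",
    "all_vec_min_max_pcs",
    "all_vec_min_max_ufs_2",
    "all_vec_min_max_ufs_6",
    "all_vec_min_max_ufs_12",
]

grid = {
    "featuresCol": feature_columns,
    "regParam": [0.0, 0.001, 0.01, 0.1, 0.5, 1],
    "solver": ["auto", "normal"],
    "fitIntercept": [True, False],
    "elasticNetParam": [0.0, 0.5, 1.0],
}

combinations = expand_grid(grid)
print(len(combinations))
```

Each of the five feature sets is paired with every hyperparameter combination, so cross-validation compares feature sets and hyperparameters in a single search, at the cost of five times as many candidates.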

In [None]:
train_df, test_df = df.randomSplit([0.9, 0.1], seed=random_seed)

FEATURES_COL = [
    "value_log_vec_min_max", "all_vec_min_max_pcs"
] + UFS_FEATURES


### Regression

In [None]:
regression_evaluator_cv.setLabelCol(REGRESSION_LABEL_COL)

#### Linear Regression

In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import LinearRegressionModel

PREDICTION_COL = "attempt_2_regression_linear_regression_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 2"]["Regression"]["Linear Regression"]
):
    print("Linear Regression model NOT found in disk, training...")
    regression_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = LinearRegression()

    linear_regression_param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, FEATURES_COL)
        .addGrid(estimator.labelCol, [REGRESSION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.regParam, [0.0, 0.001, 0.01, 0.1, 0.5, 1])
        .addGrid(estimator.solver, ["auto", "normal"])
        .addGrid(estimator.fitIntercept, [True, False])
        .addGrid(estimator.elasticNetParam, [0.0, 0.5, 1.0])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=linear_regression_param_grid,
        evaluator_cv=regression_evaluator_cv
    )
else:
    print("Linear Regression model found in disk, loading...")
    model = LinearRegressionModel.load(
        TRAINED_MODELS_DIRS["Attempt 2"]["Regression"]["Linear Regression"]
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 2"]["Regression"]["Linear Regression"]
):
    print("Evaluating Linear Regression model trained in previous cell...")
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=REGRESSION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=REGRESSION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_2/regression/linear_regression.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_2/regression/linear_regression.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 2"]["Regression"]["Linear Regression"]
):
    print("Saving Linear Regression model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_2/regression/linear_regression"
    )

#### Prediction Tree

In [None]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import DecisionTreeRegressionModel

PREDICTION_COL = "attempt_2_regression_decision_tree_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 2"]["Regression"]["Prediction Tree"]
):
    print("Prediction Tree model NOT found in disk, training...")
    
    regression_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = DecisionTreeRegressor()

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, FEATURES_COL)
        .addGrid(estimator.labelCol, [REGRESSION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.maxDepth, [5, 24])
        .addGrid(estimator.maxBins, [16, 32, 64])
        .addGrid(estimator.minInfoGain, [0, 0.1])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=regression_evaluator_cv
    )
else:
    print("Regression Decision Tree model found in disk, loading...")
    model = DecisionTreeRegressionModel.load(
        TRAINED_MODELS_DIRS["Attempt 2"]["Regression"]["Prediction Tree"]
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 2"]["Regression"]["Prediction Tree"]
):
    print(
        "Evaluating Prediction Tree model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=REGRESSION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=REGRESSION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_2/regression/prediction_tree.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_2/regression/prediction_tree.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 2"]["Regression"]["Prediction Tree"]
):
    print("Saving Decision Tree Regressor model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_2/regression/prediction_tree"
    )

#### Gradient Boosted Tree Regression

In [None]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.regression import GBTRegressionModel

PREDICTION_COL = "attempt_2_regression_gradient_boosted_tree_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 2"]["Regression"]["Gradient Boosted Tree"]
):
    print("Regression Gradient Boosted Tree model NOT found in disk, training...")
    
    regression_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = GBTRegressor()

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, FEATURES_COL)
        .addGrid(estimator.labelCol, [REGRESSION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.maxDepth, [2, 5, 10])
        .addGrid(estimator.maxBins, [16, 32, 64])
        .addGrid(estimator.minInfoGain, [0, 0.1])
        .addGrid(estimator.subsamplingRate, [0.5, 1])
        .addGrid(estimator.lossType, ["squared", "absolute"])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=regression_evaluator_cv
    )
else:
    print("Regression Gradient Boosted Tree model found in disk, loading...")
    model = GBTRegressionModel.load(
        TRAINED_MODELS_DIRS["Attempt 2"]["Regression"]["Gradient Boosted Tree"]
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 2"]["Regression"]["Gradient Boosted Tree"]
):
    print(
        "Evaluating Gradient Boosted Tree model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=REGRESSION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=REGRESSION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_2/regression/gradient_boosted_tree.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_2/regression/gradient_boosted_tree.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 2"]["Regression"]["Gradient Boosted Tree"]
):
    print("Saving Gradient Boosted Tree Regression model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_2/regression/gradient_boosted_tree"
    )

#### Random Forest Regression

In [None]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import RandomForestRegressionModel

PREDICTION_COL = "attempt_2_regression_random_forest_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 2"]["Regression"]["Random Forest"]
):
    print("Regression Random Forest model NOT found in disk, training...")
    
    regression_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = RandomForestRegressor()

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, [FEATURES_COL])
        .addGrid(estimator.labelCol, [REGRESSION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.maxDepth, [5, 24])
        .addGrid(estimator.maxBins, [32, 64])
        .addGrid(estimator.minInfoGain, [0, 0.1])
        .addGrid(estimator.subsamplingRate, [0.5, 1])
        # lossType is a GBTRegressor parameter; RandomForestRegressor does not expose it.
        .addGrid(estimator.numTrees, [20, 40])
        .addGrid(
            estimator.featureSubsetStrategy, 
            ["auto", "onethird", "all", "log2"]
        )
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=regression_evaluator_cv
    )
else:
    print("Regression Random Forest model found in disk, loading...")
    model = RandomForestRegressionModel.load(
        TRAINED_MODELS_DIRS["Attempt 2"]["Regression"]["Random Forest"]
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 2"]["Regression"]["Random Forest"]
):
    print(
        "Evaluating Random Forest model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=REGRESSION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=REGRESSION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_2/regression/random_forest.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_2/regression/random_forest.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 2"]["Regression"]["Random Forest"]
):
    print("Saving Random Forest Regression model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_2/regression/random_forest"
    )

### Classification

In [None]:
classification_evaluator_cv.setLabelCol(CLASSIFICATION_LABEL_COL)

#### SVM Classifier

In [None]:
from pyspark.ml.classification import OneVsRest
from pyspark.ml.classification import OneVsRestModel
from pyspark.ml.classification import LinearSVC

PREDICTION_COL = "attempt_2_classification_svm_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 2"]["Classification"]["SVM"]
):
    print("SVM Classification model NOT found in disk, training...")
    classification_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = OneVsRest(
        classifier=LinearSVC(
            featuresCol=FEATURES_COL,
            labelCol=CLASSIFICATION_LABEL_COL,
            predictionCol=PREDICTION_COL,
        ),
        featuresCol=FEATURES_COL,
        labelCol=CLASSIFICATION_LABEL_COL,
        predictionCol=PREDICTION_COL
    )

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, [FEATURES_COL])
        .addGrid(estimator.labelCol, [CLASSIFICATION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=classification_evaluator_cv
    )
else:
    print("SVM Classification model found in disk, loading...")
    model = OneVsRestModel.load(
        TRAINED_MODELS_DIRS["Attempt 2"]["Classification"]["SVM"]
    )


In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 2"]["Classification"]["SVM"]
):
    print(
        "Evaluating SVM Classifier model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=CLASSIFICATION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=CLASSIFICATION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_2/classification/svm.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_2/classification/svm.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 2"]["Classification"]["SVM"]
):
    print("Saving SVM Classification model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_2/classification/svm"
    )

#### Logistic Regression

In [None]:
from pyspark.ml.classification import OneVsRest
from pyspark.ml.classification import OneVsRestModel
from pyspark.ml.classification import LogisticRegression

PREDICTION_COL = "attempt_2_classification_logistic_regression_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 2"]["Classification"]["Logistic Regression"]
):
    print("Logistic Regression Classification model NOT found in disk, training...")
    classification_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = OneVsRest(
        classifier=LogisticRegression(
            featuresCol=FEATURES_COL,
            labelCol=CLASSIFICATION_LABEL_COL,
            predictionCol=PREDICTION_COL,
        ),
        featuresCol=FEATURES_COL,
        labelCol=CLASSIFICATION_LABEL_COL,
        predictionCol=PREDICTION_COL
    )

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, [FEATURES_COL])
        .addGrid(estimator.labelCol, [CLASSIFICATION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=classification_evaluator_cv
    )
else:
    print("Logistic Regression Classification model found in disk, loading...")
    model = OneVsRestModel.load(
        TRAINED_MODELS_DIRS["Attempt 2"]["Classification"]["Logistic Regression"]
    )


In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 2"]["Classification"]["Logistic Regression"]
):
    print(
        "Evaluating Logistic Regression Classifier model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=CLASSIFICATION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=CLASSIFICATION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_2/classification/logistic_regression.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_2/classification/logistic_regression.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 2"]["Classification"]["Logistic Regression"]
):
    print("Saving Logistic Regression Classification model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_2/classification/logistic_regression"
    )

#### Decision Tree

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import DecisionTreeClassificationModel

PREDICTION_COL = "attempt_2_classification_decision_tree_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 2"]["Classification"]["Decision Tree"]
):
    print("Classification Decision Tree model NOT found in disk, training...")
    classification_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = DecisionTreeClassifier()

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, [FEATURES_COL])
        .addGrid(estimator.labelCol, [CLASSIFICATION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.maxDepth, [5, 24])
        .addGrid(estimator.maxBins, [16, 32, 64])
        .addGrid(estimator.minInfoGain, [0, 0.1])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=classification_evaluator_cv
    )
else:
    print("Classification Decision Tree model found in disk, loading...")
    model = DecisionTreeClassificationModel.load(
        TRAINED_MODELS_DIRS["Attempt 2"]["Classification"]["Decision Tree"]
    )
    evaluators = get_evaluators(
        model, 
        label_col=CLASSIFICATION_LABEL_COL, 
        prediction_col=PREDICTION_COL, 
        evaluation_metrics=CLASSIFICATION_EVALUATION_METRICS
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 2"]["Classification"]["Decision Tree"]
):
    print(
        "Evaluating Decision Tree model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=CLASSIFICATION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=CLASSIFICATION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_2/classification/decision_tree.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_2/classification/decision_tree.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 2"]["Classification"]["Decision Tree"]
):
    print("Saving Decision Tree model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_2/classification/decision_tree"
    )

#### Random Forest

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import RandomForestClassificationModel

PREDICTION_COL = "attempt_2_classification_random_forest_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 2"]["Classification"]["Random Forest"]
):
    print("Classification Random Forest model NOT found in disk, training...")
    
    classification_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = RandomForestClassifier()

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, [FEATURES_COL])
        .addGrid(estimator.labelCol, [CLASSIFICATION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.maxDepth, [5, 24])
        .addGrid(estimator.maxBins, [32, 64])
        .addGrid(estimator.minInfoGain, [0, 0.1])
        .addGrid(estimator.subsamplingRate, [0.5, 1])
        # lossType is a GBT parameter; RandomForestClassifier does not expose it.
        .addGrid(estimator.numTrees, [20, 40])
        .addGrid(estimator.featureSubsetStrategy, ["auto", "onethird", "all", "log2"])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=classification_evaluator_cv
    )
else:
    print("Classification Random Forest model found in disk, loading...")
    model = RandomForestClassificationModel.load(
        TRAINED_MODELS_DIRS["Attempt 2"]["Classification"]["Random Forest"]
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 2"]["Classification"]["Random Forest"]
):
    print(
        "Evaluating Random Forest model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=CLASSIFICATION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=CLASSIFICATION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_2/classification/random_forest.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_2/classification/random_forest.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 2"]["Classification"]["Random Forest"]
):
    print("Saving Random Forest Classification model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_2/classification/random_forest"
    )

# Attempt 3: clustering

Since the two previous attempts did not yield good results, what if learning is performed on **learning-produced features**?

The idea is to produce the feature-set using **K-Means clustering**.

After clustering is performed, everything revolves around **distance** between a player and the centroid of the cluster the player has been assigned to.<br>
These are the leveraged **assumptions**:
* The cluster **centroid** represents the **average** player, a player with no peculiarity and hence no tactical advantage.
* The **higher** the distance from the centroid $\to$ the bigger the **peculiarity** $\to$ the bigger the **tactical advantage** $\to$ the **better** the end-of-season **results**.
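
The two assumptions above can be illustrated with a minimal, pure-Python sketch. The player names and feature values are hypothetical, and the squared Euclidean distance is used only as an example of a distance measure: the centroid is the component-wise mean, and the player farthest from it is treated as the most peculiar.

```python
# Toy illustration with made-up data: three players described by two
# already-scaled features. Names and values are hypothetical.
players = {
    "average_joe": [0.5, 0.5],
    "specialist": [0.9, 0.1],
    "all_rounder": [0.4, 0.6],
}


def centroid(vectors):
    """Component-wise mean of a list of equal-length vectors."""
    n = len(vectors)
    return [sum(v[i] for v in vectors) / n for i in range(len(vectors[0]))]


def squared_distance(a, b):
    """Squared Euclidean distance between two vectors."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


center = centroid(list(players.values()))

# Under the assumptions above, a larger distance means a more peculiar player.
peculiarity = {
    name: squared_distance(vec, center) for name, vec in players.items()
}
most_peculiar = max(peculiarity, key=peculiarity.get)
```

In this toy setting the "specialist" ends up farthest from the centroid, so it would be the player credited with the largest tactical advantage.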

Since players must then be assembled into teams and since it is important to differentiate teams across years, clustering will be performed **for every season**, rather than considering seasons all together.

Player aggregation into football teams remains as explained in the previous attempts.

Convenience function to perform K-Means clustering, expanded from the one seen in class.

In [None]:
def k_means(
    dataset,
    n_clusters,
    distance_measure="euclidean",
    max_iter=20,
    features_col="features",
    prediction_col="cluster",
    verbose=False,
    season=-1
):

    if verbose:
        print(
            f"""Training K-means clustering using the following parameters: 
            - K (n. of clusters) = {n_clusters}
            - max_iter (max n. of iterations) = {max_iter}
            - distance measure = {distance_measure}
            - season = {season}
            """
        )
    
    kmeans = KMeans(
        featuresCol=features_col,
        predictionCol=prediction_col,
        k=n_clusters,
        initMode="k-means||",
        initSteps=5,
        tol=0.000001,
        maxIter=max_iter,
        distanceMeasure=distance_measure,
    )

    model = kmeans.fit(dataset)
    clusters_df = model.transform(dataset)

    return model, clusters_df

Convenience function to perform K-Means clustering using a **variable number of clusters** (`k`).

Expanded from the code seen in class, it adds support for our specific use case, introduced in the attempt 3 presentation.<br>
Specifically, for every football player, the centroid of its cluster is stored as an additional field.<br>
This allows us to easily batch-compute the distance between each player and its cluster centroid whenever needed, leveraging **PySpark**'s strengths.
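
The idea of storing the centroid next to each player can be sketched without Spark. The snippet below is only an illustration with hypothetical data: a dictionary lookup plays the role of the join on the cluster id, after which the distance becomes a plain row-wise computation.

```python
# Hypothetical toy data: (player, cluster_id, feature vector).
assignments = [
    ("player_a", 0, (0.1, 0.2)),
    ("player_b", 1, (0.9, 0.8)),
    ("player_c", 0, (0.3, 0.2)),
]

# Hypothetical centroids, keyed by cluster id.
centroids = {0: (0.2, 0.2), 1: (0.9, 0.8)}

# "Join" on cluster_id: every player row gains its centroid as an extra field.
with_centroid = [
    (name, cluster_id, features, centroids[cluster_id])
    for name, cluster_id, features in assignments
]

# With the centroid on the same row, the distance needs no further lookups.
distances = {
    name: sum((x - c) ** 2 for x, c in zip(features, centroid))
    for name, _, features, centroid in with_centroid
}
```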

In [None]:
def do_clustering(
    k_range, 
    input_df, 
    max_iter,
    featuresCol,
    clusterCol,
    verbose=False,
    season=-1,
    distance_measure="cosine"
):
    clustering_results = {}

    clusters_df = input_df

    for k in k_range:
        if verbose:
            print(f"Running K-means using K = {k}")

        model, clusters_df = k_means(
            clusters_df, 
            k, 
            max_iter=max_iter, 
            distance_measure=distance_measure, 
            features_col=featuresCol,
            prediction_col=clusterCol + "_k_" + str(k),
            verbose=verbose,
            season=season
        )  
        
        silhouette_k = evaluate_k_means(
            clusters_df, 
            distance_measure=distance_measure,
            prediction_col=clusterCol + "_k_" + str(k),
            featuresCol=featuresCol

        ) 
        wssd_k = model.summary.trainingCost
        
        if verbose:

            print(f"Silhouette ({distance_measure} distance): {silhouette_k}")
            print(f"WSSD ({distance_measure} distance): {wssd_k}")

        # Getting all the cluster centroids
        l = list(enumerate(model.clusterCenters()))

        # Enumerating all clustering centroids, in order to get the following
        # list of pairs (cluster_id, cluster_id_centroid).
        # Each centroid must be stored in a DenseVector, to be able to
        # compute distances, as explained in the attempt 3 introduction.
        l = [(ind, DenseVector(c)) for ind, c in l]

        # Defining the schema of the new DataFrame, which will contain the cluster
        # centroid as a field of every player, as explained in attempt 3 intro.
        schema = StructType([
            StructField("cluster_id" + "_k_" + str(k), IntegerType(), True),
            StructField("centroid" + "_k_" + str(k), VectorUDT(), True),
        ])

        # Creating new DataFrame using aforementioned schema
        centr_df = spark.createDataFrame(data=l, schema=schema)

        # Adding centroids information on the original DataFrame
        clusters_df = clusters_df.join(
            centr_df, on=["cluster_id" + "_k_" + str(k)]
        )

        # Packing evaluation metrics for future retrieval
        clustering_results[str(k)] = {
            "silhouette_k": silhouette_k,
            "wssd_k"      : wssd_k,
        }

    # Returning eval metrics and newly created DataFrame
    return clustering_results, clusters_df


Convenience function to **evaluate K-Means** clustering, expanded from code seen in class.

In [None]:
def evaluate_k_means(
    clusters,
    metric_name="silhouette",
    distance_measure="squaredEuclidean",
    prediction_col="cluster",
    featuresCol = "features"
):

    # only silhouette is supported by PySpark
    evaluator = ClusteringEvaluator(
        metricName=metric_name,
        distanceMeasure=distance_measure,
        predictionCol=prediction_col,
        featuresCol=featuresCol
    )

    return evaluator.evaluate(clusters)

Convenience function to **plot** K-Means clustering **results**, expanded from code shown in class.

In [None]:
def plot_clustering_results(
    clustering_results, k_range, plot_title
):

    # Creating Pandas Dataframe for plotting reasons
    k_col = [str(x) for x in k_range]
    wssd_col = [
        clustering_results[k]["wssd_k"] for k in k_col 
    ]
    silhouette_col = [
        clustering_results[k]["silhouette_k"] for k in k_col 
    ]

    plot_df_temp = pd.DataFrame([k_col, wssd_col, silhouette_col]).transpose()
    plot_df_temp.columns = ["K", "WSSD", "Silhouette"]

    fig, ax = plt.subplots(1, 1, figsize=(8, 6))

    _ = sns.pointplot(
        data=plot_df_temp, x="K", y="WSSD", ax=ax, color="navy"
    )
    _ = ax.set_xlabel("Number of clusters")
    _ = ax.set_ylabel("WSSD (lower is better)")
    _ = ax.set_title(plot_title)


Assembling features into a vector, as required by PySpark.

Clustering will be performed on all 24 player features, as in the naive approach.

In [None]:
import copy

CLUSTERING_FEATURES = copy.deepcopy(PLAYER_FEATURES)
CLUSTERING_FEATURES.remove("value")
CLUSTERING_FEATURES.remove("overall")

assembler = VectorAssembler(
    inputCols=CLUSTERING_FEATURES,
    outputCol="clustering_features_vec"
)

pre_processed_df = assembler.transform(pre_processed_df)

Min-max **scaling** is very important in clustering, since **K-Means** heavily **relies** on computing and comparing the **distance** between datapoints.<br>

Computing distances between datapoints whose dimensions have inhomogeneous scales may yield inaccurate results, because the features with the largest ranges dominate the distance.
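
As a minimal sketch of what min-max scaling does to a single feature, the pure-Python function below rescales a column to the $[0, 1]$ range. The feature names and values are hypothetical, and mapping a constant column to `0.0` is an arbitrary choice made for this example only.

```python
def min_max_scale(column):
    """Rescale a list of numbers to [0, 1] as (x - min) / (max - min)."""
    lo, hi = min(column), max(column)
    span = hi - lo
    # A constant column has no spread; returning 0.0 is an arbitrary choice here.
    return [(x - lo) / span if span else 0.0 for x in column]


# Hypothetical features living on very different scales.
height_cm = [170.0, 180.0, 190.0]
weak_foot = [1.0, 3.0, 5.0]

scaled_height = min_max_scale(height_cm)
scaled_weak_foot = min_max_scale(weak_foot)
```

After scaling, both features span the same interval, so neither dominates a distance computation simply because of its unit of measure.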

In [None]:
scaler = MinMaxScaler(
    inputCol="clustering_features_vec",
    outputCol="clustering_features_vec_min_max"    
)

pre_processed_df = scaler.fit(pre_processed_df).transform(pre_processed_df)

As per the attempt 3 introduction, clustering is performed **separately for each season**, **not** on all the seasons together.

In [None]:
df_dict = {
    s: pre_processed_df.filter(col("season") == s) for s in seasons
}

As per the notebook philosophy, in order to speed up future executions, trained models and their evaluations are persisted on disk.

For this reason, if no trained model is found on disk, then we proceed to training; otherwise, data is loaded from disk.

In [None]:
if not os.path.isdir(CLUSTERING_DF_PATH):
    print("clustering NOT in disk, training...")
    clustering_results_dict = dict()
    clustering_results_df = dict()

    for s in seasons:
        print(f"Season: {s}")
        clustering_results_dict[s], clustering_results_df[s] = do_clustering(
            k_range=k_range, 
            input_df=df_dict[s], 
            max_iter=MAX_ITER,
            featuresCol="clustering_features_vec_min_max",
            clusterCol="cluster_id",
            verbose=False,
            season=s
        )
else:
    print(
        "clustering already done, data will be loaded from disk in the next cells"
    )


In [None]:
if not os.path.isdir(CLUSTERING_DF_PATH):

    pre_processed_df = reduce(
        DataFrame.unionAll, 
        [
            clustering_results_df[s] for s in seasons
        ]
    )

    print("saving clustering data on disk...")
    pre_processed_df.write.parquet(CLUSTERING_DF_PATH)

else:
    print("clustering data found on disk, will be loaded in next cells...")

## Clustering evaluation

In [None]:
# json is needed in both branches, so it is imported before the check
import json

if os.path.isfile(CLUSTERING_EVAL_PATH):

    print("clustering evaluation found on disk, loading...")
    
    with open(CLUSTERING_EVAL_PATH) as json_file:
        clustering_results_dict = json.load(json_file)

else:

    print("clustering evaluation NOT in disk, exporting...")

    with open(CLUSTERING_EVAL_PATH, 'w') as fp:
        json.dump(clustering_results_dict, fp)

### Electing the best number of clusters

First, the **Elbow method** will be used to find the **best number of clusters**: its observations will be made considering the seasons both separately and all together, in order to have a **wider understanding** of what is happening.
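
In this notebook the elbow is read visually from the plots. As a rough illustration of what "elbow" means, the sketch below picks the value of `k` where the WSSD curve bends the most, using the largest discrete second difference. This heuristic and the toy WSSD values are assumptions made for the example, not the procedure used to obtain the notebook's results.

```python
def elbow_by_second_difference(k_values, wssd):
    """Return the k whose WSSD has the largest discrete second difference.

    The first and last k cannot be elbows, since they lack a neighbour.
    """
    best_k, best_bend = None, float("-inf")
    for i in range(1, len(wssd) - 1):
        bend = wssd[i - 1] - 2 * wssd[i] + wssd[i + 1]
        if bend > best_bend:
            best_k, best_bend = k_values[i], bend
    return best_k


# Made-up WSSD values: a steep drop up to k = 4, then a plateau.
toy_k_values = [2, 3, 4, 5, 6]
toy_wssd = [100.0, 60.0, 30.0, 25.0, 22.0]

toy_elbow = elbow_by_second_difference(toy_k_values, toy_wssd)
```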

#### Single seasons

Elbow method plots for every season.

In [None]:
for s in seasons:
    plot_clustering_results(
        clustering_results_dict[s], 
        k_range=K_RANGE, 
        plot_title=f"Elbow method evaluation for season {s}"
    )

#### Seasons all together

Elbow method plots for seasons all together.

Basically, considering seasons all together amounts to computing the average **WSSD** across the seasons, as visible in the code.
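
A minimal sketch of this averaging, on a made-up results dictionary shaped like `clustering_results_dict` (season, then `k`, then metric): for each `k`, the per-season WSSD values are summed and divided by the number of seasons. All names prefixed with `toy_` are hypothetical.

```python
# Made-up results for two seasons and two values of k.
toy_results = {
    2019: {"2": {"wssd_k": 10.0}, "3": {"wssd_k": 6.0}},
    2020: {"2": {"wssd_k": 14.0}, "3": {"wssd_k": 8.0}},
}
toy_seasons = list(toy_results)
toy_k_values = ["2", "3"]

# Average across seasons: the divisor is the number of seasons, not of k values.
toy_avg_wssd = {
    k: sum(toy_results[s][k]["wssd_k"] for s in toy_seasons) / len(toy_seasons)
    for k in toy_k_values
}
```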

In [None]:
clustering_results_dict["avg"] = dict.fromkeys(K_RANGE)

for k in clustering_results_dict["avg"].keys():
    clustering_results_dict["avg"][k] = {
        "wssd_k": 0,
        "silhouette_k": 0
    }

sum_wssd = dict.fromkeys(K_RANGE, 0)
sum_silhouette = dict.fromkeys(K_RANGE, 0)

for s in seasons:
    for k in K_RANGE:
        sum_wssd[k] += clustering_results_dict[s][k]["wssd_k"]
        sum_silhouette[k] += clustering_results_dict[s][k]["silhouette_k"]
    
avg_wssd = dict()
avg_silhouette = dict()

for k in K_RANGE:
    # The sums run over seasons, so the average divides by the number of seasons
    avg_wssd[k] = sum_wssd[k] / len(seasons)
    avg_silhouette[k] = sum_silhouette[k] / len(seasons)

    clustering_results_dict["avg"][k]["wssd_k"] = avg_wssd[k]
    clustering_results_dict["avg"][k]["silhouette_k"] = avg_silhouette[k]

In [None]:
plot_clustering_results(
    clustering_results_dict["avg"], 
    k_range=K_RANGE, 
    plot_title="Elbow method evaluation, average of all seasons",
)

The **Elbow** method shows that $k = 6$ is the **best number of clusters**, both when considering seasons separately and all together.

## Evaluation

Evaluation uses the **Silhouette** coefficient, since it is the only evaluation metric supported by PySpark.

Once again, two cases will be considered: single seasons and seasons all together.

For simplicity, only the case with $k = 6$ clusters will be considered.

### Single seasons

In [None]:
for s in seasons:
    print(
        f"Silhouette coefficient for season {s}: "
        f"{clustering_results_dict[s]['6']['silhouette_k']}"
    )

### Seasons all together

In [None]:
print(
    f"Silhouette coefficient (avg of all seasons): "
    f"{clustering_results_dict['avg']['6']['silhouette_k']}"
)

The Silhouette coefficient aligns with the findings of the previous learning attempts.

In fact, the Silhouette coefficient ranges from $-1$ to $1$, and the value we get, $0.30$, is much closer to $0$ than to $1$, which indicates that the **obtained clusters are not significant**.
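
To give an intuition of how the Silhouette coefficient behaves, here is a small pure-Python sketch on one-dimensional toy points, using the absolute difference as distance. It is a simplified illustration and not PySpark's implementation: for each point, $a$ is the mean distance to its own cluster, $b$ is the lowest mean distance to another cluster, and the score is $(b - a) / \max(a, b)$.

```python
def silhouette(points, labels):
    """Mean silhouette of 1-D points, with absolute difference as distance."""
    scores = []
    for i, (p, lab) in enumerate(zip(points, labels)):
        same = [
            abs(p - q)
            for j, (q, other_lab) in enumerate(zip(points, labels))
            if other_lab == lab and j != i
        ]
        if not same:
            # Convention: a point alone in its cluster scores 0.
            scores.append(0.0)
            continue
        a = sum(same) / len(same)
        b = min(
            sum(abs(p - q) for q, l in zip(points, labels) if l == other)
            / labels.count(other)
            for other in set(labels)
            if other != lab
        )
        denom = max(a, b)
        scores.append((b - a) / denom if denom else 0.0)
    return sum(scores) / len(scores)


# Two well-separated groups versus two interleaved groups (made-up data).
separated = silhouette([0.0, 1.0, 10.0, 11.0], [0, 0, 1, 1])
interleaved = silhouette([0.0, 2.0, 1.0, 3.0], [0, 0, 1, 1])
```

Well-separated groups score close to $1$, while interleaved groups score around or below $0$; a value such as $0.30$ sits much nearer to the second situation than to the first.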

After composing the data, some visualizations will be performed, to give a graphical explanation of this result as well.

## Dataset composition

As hinted, for visualization (and learning) purposes, clustering results must be integrated into the main DataFrame.

### Compute distance between player and centroid of its cluster

Computing distance between player and the centroid of the cluster it has been assigned to (as explained in attempt 3 intro).

In [None]:
compute_distance_from_centroid_UDF = udf(
    lambda player, centroid: float(
        Vectors.squared_distance(
            player, centroid
        )
    ), FloatType()
)

if os.path.isdir(CLUSTERING_DF_PATH):
    print("clustering data found on disk, loading...")
    pre_processed_df = spark.read.parquet(CLUSTERING_DF_PATH)

for k in K_RANGE:
    # str(k) keeps the column names consistent with the following cells
    pre_processed_df = pre_processed_df.withColumn(
        "distance_from_centroid" + "_k_" + str(k),
        compute_distance_from_centroid_UDF(
            col("clustering_features_vec_min_max"),
            col("centroid" + "_k_" + str(k))
        )
    )

### From players to teams

Modelling football teams as aggregations of their players, as explained in the project intro.

In [None]:
teams_df = pre_processed_df.groupBy(
    ["season", "club_name", "macro_role"]
).agg(
    {
        "distance_from_centroid" + "_k_" + str(k): "avg" for k in K_RANGE 
    }
)

# K_RANGE stores the different number of clusters, tested with the Elbow method
for k in K_RANGE:
    teams_df = teams_df.withColumnRenamed(
        "avg(distance_from_centroid" + "_k_" + str(k) + ")",
        "avg_distance_from_centroid" + "_k_" + str(k)
    )

In [None]:
# factory method to get the specific subquery, needed by every number of clusters
def generate_subquery(macro_role, k):
    return f"""(
        case
            when macro_role='{macro_role}' then avg_distance_from_centroid_k_{k} 
        else NULL
        end
    ) as avg_dist_macro_role_{int(macro_role)}_k_{k}
    """

For every player of every club, in every season and in every macro role (goalkeeper, defender, midfielder and attacker), take the distance from the centroid of the cluster the player has been assigned to.

At the end of this query, these are the fields of a record (with their explanation):
* `club_name`: name of the football club.
* `season`: the season the record refers to.
* ...
* ...
* `avg_dist_macro_role_1_k_2`: average distance between <span style="color:blue">defenders</span> (that played for `club_name` in season `season`) and the centroid of the cluster they have been assigned to, when using $k=2$ clusters.
* ...
* `avg_dist_macro_role_1_k_62`: average distance between <span style="color:green">defenders</span> (that played for `club_name` in season `season`) and the centroid of the cluster they have been assigned to, when using $k=62$ clusters.
* ...
* ...
* `avg_dist_macro_role_3_k_2`: average distance between <span style="color:Purple">attackers</span> (that played for `club_name` in season `season`) and the centroid of the cluster they have been assigned to, when using $k=2$ clusters.
* ...
* `avg_dist_macro_role_3_k_62`: average distance between <span style="color:Fuchsia">attackers</span> (that played for `club_name` in season `season`) and the centroid of the cluster they have been assigned to, when using $k=62$ clusters.
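
The reshaping behind these fields is a pivot from one row per (club, season, macro role) to one row per (club, season). The sketch below reproduces that idea in plain Python, on made-up rows and with a hypothetical helper `pivot_by_role`; roles with no data stay `None`, mirroring the `NULL` produced by the `case` expression.

```python
from collections import defaultdict

# Made-up long-format rows: (club_name, season, macro_role, avg distance).
long_rows = [
    ("Napoli", 2020, 1, 0.40),
    ("Napoli", 2020, 3, 0.10),
    ("Roma", 2020, 1, 0.25),
]


def pivot_by_role(rows, k, roles):
    """One dict per (club, season), with one column per macro role."""
    wide = defaultdict(
        lambda: {f"avg_dist_macro_role_{r}_k_{k}": None for r in roles}
    )
    for club, season, role, dist in rows:
        wide[(club, season)][f"avg_dist_macro_role_{role}_k_{k}"] = dist
    return dict(wide)


wide_rows = pivot_by_role(long_rows, k=2, roles=[1, 3])
```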

For example, starting from this data:

| club_name | season | short_name | macro_role | dist_from_cluster_centroid_k_2 | ... | dist_from_cluster_centroid_k_62 |
| --------- | ------ | ---------- | ---------- | ------------------------------ | --- | ------------------------------- |
| Napoli    | 2020   | Malcuit    | 1          | <span style="color:blue">(0.4, 0.5, 0.2)</span> | ... | <span style="color:green">(0.7, 0.6, 0.5)</span> |
| Napoli    | 2020   | Mário Rui  | 1          | <span style="color:blue">(0.6, 0.1, 0.1)</span> | ... | <span style="color:green">(0.04, 0.13, 0.02)</span> |
| Napoli    | 2020   | Rrahmani   | 1          | <span style="color:blue">(0.6, 0.1, 0.1)</span> | ... | <span style="color:green">(0.04, 0.13, 0.02)</span> |
| Napoli    | 2020   | Koulibaly  | 1          | <span style="color:blue">(0.9, 0.9, 0.3)</span> | ... | <span style="color:green">(0.91, 0.49, 0.38)</span> |
| Napoli    | 2020   | Petagna    | 3          | <span style="color:Purple">(0.1, 0.1, 0.1)</span> | ... | <span style="color:Fuchsia">(0.16, 0.51, 0.19)</span> |
| Napoli    | 2020   | Politano   | 3          | <span style="color:Purple">(0.1, 0.1, 0.1)</span> | ... | <span style="color:Fuchsia">(0.16, 0.51, 0.19)</span> |
| Napoli    | 2020   | Lozano     | 3          | <span style="color:Purple">(0.2, 0.3, 0.1)</span> | ... | <span style="color:Fuchsia">(0.24, 0.63, 0.51)</span> |

The result will be:

| club_name | season | avg_dist_macro_role_1_k_2 | ... | avg_dist_macro_role_1_k_62 | ... | avg_dist_macro_role_3_k_2 | ... | avg_dist_macro_role_3_k_62 |
| --------- | ------ | ------------------------- | --- | -------------------------- | --- | ------------------------- | --- | -------------------- |
| Napoli | 2020 | <span style="color:blue">(0.62, 0.4, 0.18)</span> | ... | <span style="color:green">(0.42, 0.34, 0.23)</span> | ... | <span style="color:purple">(0.13, 0.17, 0.1)</span> | ... | <span style="color:fuchsia">(0.19, 0.55, 0.3)</span> |

The average is computed by taking into consideration the **multidimensionality** of the data, as if we were operating with Pandas, NumPy or PyTorch:

<span style="color:green">$(0.42, 0.34, 0.23) = (avg(0.7, 0.04, 0.04, 0.91), avg(0.6, 0.13, 0.13, 0.49), avg(0.5, 0.02, 0.02, 0.38))$</span>
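
The component-wise average in the formula above can be checked with a few lines of plain Python. The variable names are hypothetical; the values are the four <span style="color:green">defender</span> vectors for $k=62$ from the example table.

```python
# The four defender vectors for k = 62, copied from the example table.
defender_vectors_k_62 = [
    (0.7, 0.6, 0.5),
    (0.04, 0.13, 0.02),
    (0.04, 0.13, 0.02),
    (0.91, 0.49, 0.38),
]

# zip(*rows) groups the values by component, so each mean is taken per dimension.
component_means = [
    sum(component) / len(component)
    for component in zip(*defender_vectors_k_62)
]
```

The resulting means are approximately $0.4225$, $0.3375$ and $0.23$, which round to the $(0.42, 0.34, 0.23)$ shown in the result table.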

The following cells produce the data just described.

In [None]:
teams_df.createOrReplaceTempView("t")

temp = dict()

for k in K_RANGE:
    temp[k] = (
        spark.sql(
            f"""
                select season, club_name, {generate_subquery(0.0, k)}, {generate_subquery(1.0, k)}, {generate_subquery(2.0, k)}, {generate_subquery(3.0, k)}, {generate_subquery(4.0, k)}, {generate_subquery(5.0, k)}, {generate_subquery(6.0, k)}, {generate_subquery(7.0, k)}
                from t
            """
        )
        .groupBy("season", "club_name")
        .agg(
            avg(f"avg_dist_macro_role_0_k_{k}").alias(f"avg_dist_macro_role_0_k_{k}"),
            avg(f"avg_dist_macro_role_1_k_{k}").alias(f"avg_dist_macro_role_1_k_{k}"),
            avg(f"avg_dist_macro_role_2_k_{k}").alias(f"avg_dist_macro_role_2_k_{k}"),
            avg(f"avg_dist_macro_role_3_k_{k}").alias(f"avg_dist_macro_role_3_k_{k}"),
            avg(f"avg_dist_macro_role_4_k_{k}").alias(f"avg_dist_macro_role_4_k_{k}"),
            avg(f"avg_dist_macro_role_5_k_{k}").alias(f"avg_dist_macro_role_5_k_{k}"),
            avg(f"avg_dist_macro_role_6_k_{k}").alias(f"avg_dist_macro_role_6_k_{k}"),
            avg(f"avg_dist_macro_role_7_k_{k}").alias(f"avg_dist_macro_role_7_k_{k}"),
        )
    )

In [None]:
teams_df = temp[K_RANGE[0]]

for i in range(1, len(K_RANGE)):
    teams_df = teams_df.join(
        temp[K_RANGE[i]],
        on=["season", "club_name"]
    )

In [None]:
avg_distances_dict = dict()
avg_distances_vec_dict = dict()

for k in K_RANGE:
    avg_distances_dict[k] = [
        f"avg_dist_macro_role_{i}_k_{k}" for i in range(0, NUM_MACRO_ROLES)
    ]
    avg_distances_vec_dict[k] = f"avg_dist_vec_k_{k}"

**Filling `NaN`/`null` values** with a value derived from the minimum in the entire DataFrame.

As explained in the clustering introduction, the minimum value essentially represents "**averageness**".
So, if a team has no data for a `macro_role`, it is given the minimum possible value, i.e. the least peculiar one. Note that the cell below multiplies the global minimum by 1.5 before using it as the fill value.
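A minimal sketch of the filling step on a toy table, in plain Python rather than Spark. The team names, the values and the `fill_missing` helper are hypothetical; the `scale` argument only mirrors the fact that the notebook multiplies the minimum by a constant before filling.

```python
def fill_missing(rows, scale=1.0):
    """Replace None entries with the (optionally scaled) global minimum.

    rows: dict mapping a team name to its list of per-role average distances.
    scale: multiplier applied to the minimum before it is used as fill value.
    """
    present = [v for values in rows.values() for v in values if v is not None]
    fill_value = min(present) * scale
    return {
        team: [fill_value if v is None else v for v in values]
        for team, values in rows.items()
    }

# Hypothetical data: "Team B" has no players for the second macro role.
toy = {"Team A": [0.4, 0.3, 0.2], "Team B": [0.5, None, 0.1]}
filled = fill_missing(toy)
print(filled["Team B"])  # -> [0.5, 0.1, 0.1]
```

Only the missing entry changes; every observed distance is left untouched.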

In [None]:
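# Flatten the per-k lists of column names into a single list.
avg_distance_cols = [
    col_name for cols in avg_distances_dict.values() for col_name in cols
]

# pandas' min() skips NaN by default, so missing entries cannot turn the result into NaN.
global_min = teams_df.toPandas()[avg_distance_cols].min().min()

teams_df = teams_df.fillna(global_min * 1.5)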

In [None]:
for k in K_RANGE:
    assembler = VectorAssembler(
        inputCols=avg_distances_dict[k], outputCol=avg_distances_vec_dict[k]
    )

    teams_df = assembler.transform(teams_df)

## Visualizations

In the next cells, visualization of the clustering result will be provided.

Since the clustered datapoints are 24-dimensional, they are first projected onto two principal components via PCA, and the projection is what gets plotted.
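To illustrate what a two-component projection computes, here is a small library-free sketch. It is not the notebook's `perform_pca` helper: it swaps in power iteration with deflation purely to stay self-contained, and all function names and the toy 3-dimensional points are hypothetical stand-ins for the real 24-dimensional vectors.

```python
import random

def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

def center(rows):
    """Subtract the per-column mean from every row."""
    n, d = len(rows), len(rows[0])
    means = [sum(r[j] for r in rows) / n for j in range(d)]
    return [[r[j] - means[j] for j in range(d)] for r in rows]

def covariance(centered):
    """Sample covariance matrix of already-centered rows."""
    n, d = len(centered), len(centered[0])
    return [
        [sum(r[i] * r[j] for r in centered) / (n - 1) for j in range(d)]
        for i in range(d)
    ]

def dominant_eigenpair(matrix, iterations=1000, seed=0):
    """Power iteration: eigenvalue and unit eigenvector of largest magnitude."""
    rng = random.Random(seed)
    v = [rng.random() + 0.1 for _ in range(len(matrix))]
    for _ in range(iterations):
        w = [dot(row, v) for row in matrix]
        norm = dot(w, w) ** 0.5
        v = [x / norm for x in w]
    # Rayleigh quotient of the converged unit vector.
    eigenvalue = dot(v, [dot(row, v) for row in matrix])
    return eigenvalue, v

def pca_2d(rows):
    """Project rows onto their first two principal components."""
    centered = center(rows)
    cov = covariance(centered)
    lam1, pc1 = dominant_eigenpair(cov)
    d = len(cov)
    # Deflation: remove the first component so the second becomes dominant.
    deflated = [
        [cov[i][j] - lam1 * pc1[i] * pc1[j] for j in range(d)] for i in range(d)
    ]
    _, pc2 = dominant_eigenpair(deflated, seed=1)
    projected = [(dot(r, pc1), dot(r, pc2)) for r in centered]
    return projected, (pc1, pc2)

# Hypothetical low-dimensional points standing in for the real feature vectors.
points = [[float(i), 0.5 * (i % 3), 0.1 * (i % 2)] for i in range(12)]
projected, (pc1, pc2) = pca_2d(points)
print(len(projected), len(projected[0]))  # -> 12 2
```

Each input point ends up as an `(x, y)` pair, which is exactly the shape a scatter plot needs.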

In [None]:
pre_processed_df, pca_model = perform_pca(
    df=pre_processed_df,
    num_components=2,
    input_col="clustering_features_vec_min_max",
    output_col="clustering_features_vec_min_max_pcs"
)

In [None]:
pdf = feature_vec_to_cols(
    pre_processed_df.toPandas(),
    "clustering_features_vec_min_max_pcs",
    ["clustering_features_vec_min_max_pc_0", "clustering_features_vec_min_max_pc_1"]
)

In [None]:
for s in seasons:
    # Filter once per season instead of repeating the mask for every argument.
    season_pdf = pdf[pdf["season"] == s]
    scatter_plot(
        x=season_pdf["clustering_features_vec_min_max_pc_0"],
        y=season_pdf["clustering_features_vec_min_max_pc_1"],
        x_label="Clustering Features Principal Component 0",
        y_label="Clustering Features Principal Component 1",
        title=f"Clustering result for season {s}",
        c=season_pdf["cluster_id_k_6"],
        c_map=plt.cm.get_cmap("tab10"),
        figsize=(12, 8),
    )

Some visualization comments:

* The x and y axes are the two principal components, computed in order to be able to plot the data.
* **Color** of the point **encodes** the **cluster** assigned to the datapoint.
* Huge **difference** in data space between **legacy** and **modern** seasons.
* In **modern** seasons, **goalkeepers** are very well **separated** from the rest of the players, whilst in **legacy** seasons they tend to be **closer** to other players.
* In all seasons, **offensive** players tend to be **more mixed up**, whilst **central** and **defensive** players tend to be better **separated**.

Colors are not preserved across seasons because of how the plotting utilities assign them. During experimentation, however, the plots were made interactive, which allowed inspecting individual datapoints and is what supports the observations above.

## Learning from clustering

In [None]:
df = df.join(
    teams_df,
    on=["club_name", "season"]
)

In [None]:
train_df, test_df = df.randomSplit([0.9, 0.1], seed=random_seed)

FEATURES_COL = list(avg_distances_vec_dict.values())

### Regression

In [None]:
regression_evaluator_cv.setLabelCol(REGRESSION_LABEL_COL)

#### Linear Regression

In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import LinearRegressionModel

PREDICTION_COL = "attempt_3_regression_linear_regression_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Regression"]["Linear Regression"]
):
    print("Linear Regression model NOT found in disk, training...")
    regression_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = LinearRegression()

    linear_regression_param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, FEATURES_COL)
        .addGrid(estimator.labelCol, [REGRESSION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.regParam, [0.0, 0.001, 0.01, 0.1, 0.5, 1])
        .addGrid(estimator.solver, ["auto", "normal"])
        .addGrid(estimator.fitIntercept, [True, False])
        .addGrid(estimator.elasticNetParam, [0.0, 0.5, 1.0])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=linear_regression_param_grid,
        evaluator_cv=regression_evaluator_cv
    )
else:
    print("Linear Regression model found in disk, loading...")
    model = LinearRegressionModel.load(
        TRAINED_MODELS_DIRS["Attempt 3"]["Regression"]["Linear Regression"]
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Regression"]["Linear Regression"]
):
    print("Evaluating Linear Regression model trained in previous cell...")
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=REGRESSION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=REGRESSION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_3/regression/linear_regression.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_3/regression/linear_regression.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Regression"]["Linear Regression"]
):
    print("Saving Linear Regression model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_3/regression/linear_regression"
    )

#### Linear regression with multiple `k`

In [None]:
model_evals_dict = dict()
cross_validated_models_dict = dict()

In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import LinearRegressionModel

for k in K_RANGE:

    PREDICTION_COL = f"attempt_3_regression_linear_regression_multiple_k_{k}_predictions"

    if not model_exists(
        TRAINED_MODELS_DIRS["Attempt 3"]["Regression"]["Linear Regression multiple k"][k]
    ):
        print(f"Linear Regression multiple k with k={k} model NOT found in disk, training...")
        regression_evaluator_cv.setPredictionCol(PREDICTION_COL)
        
        estimator = LinearRegression()

        param_grid = (
            ParamGridBuilder()
            .addGrid(estimator.featuresCol, [f"avg_dist_vec_k_{k}"])
            .addGrid(estimator.labelCol, [REGRESSION_LABEL_COL])
            .addGrid(estimator.predictionCol, [PREDICTION_COL])
            .addGrid(estimator.regParam, [0.0, 0.001, 0.01, 0.1, 0.5, 1])
            .addGrid(estimator.solver, ["auto", "normal"])
            .addGrid(estimator.fitIntercept, [True, False])
            .addGrid(estimator.elasticNetParam, [0.0, 0.5, 1.0])
            .build()
        )

        cross_validated_models_dict[k] = learn_best_model(
            estimator=estimator,
            param_grid=param_grid,
            evaluator_cv=regression_evaluator_cv
        )
    else:
        print(f"Linear Regression multiple k with k={k} model found in disk, loading...")
        model = LinearRegressionModel.load(
            TRAINED_MODELS_DIRS["Attempt 3"]["Regression"]["Linear Regression multiple k"][k]
        )

In [None]:
for k in K_RANGE:
    if not model_exists(
        TRAINED_MODELS_DIRS["Attempt 3"]["Regression"]["Linear Regression multiple k"][k]
    ):
        print(
            f"Evaluating Linear Regression multiple k with k={k} model trained in previous cell...")
        
        PREDICTION_COL = f"attempt_3_regression_linear_regression_multiple_k_{k}_predictions"
        evaluate_learning_models(
            best_model=cross_validated_models_dict[k].bestModel,
            evaluators=get_evaluators(
                cross_validated_models_dict[k].bestModel, 
                label_col=REGRESSION_LABEL_COL, 
                prediction_col=PREDICTION_COL, 
                evaluation_metrics=REGRESSION_EVALUATION_METRICS
            ),
            save_training_result_path=f"./evaluation_results/attempt_3/regression/linear_regression_multiple_k/linear_regression_multiple_k_{k}.json"
        )
    else:
        print("Printing evaluation loaded from disk...")
        model_evals_dict[k] = print_model_evaluation(
            model_evaluation_path=f"./evaluation_results/attempt_3/regression/linear_regression_multiple_k/linear_regression_multiple_k_{k}.json"
        )

In [None]:
evals_dict_pretty = {
    k: v["test_set_evaluation"]["r2"] for k, v in model_evals_dict.items()
}

In [None]:
for k in K_RANGE:    
    if not model_exists(
        TRAINED_MODELS_DIRS["Attempt 3"]["Regression"]["Linear Regression multiple k"][k]
    ):
        print(
            f"Saving Linear Regression multiple k with k={k} model on disk..."
        )
        cross_validated_models_dict[k].bestModel.save(
            f"./trained_models/attempt_3/regression/linear_regression_multiple_k/{k}_clusters"
        )

#### Prediction Tree

In [None]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import DecisionTreeRegressionModel

PREDICTION_COL = "attempt_3_regression_decision_tree_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Regression"]["Prediction Tree"]
):
    print("Prediction Tree model NOT found in disk, training...")
    
    regression_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = DecisionTreeRegressor()

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, FEATURES_COL)
        .addGrid(estimator.labelCol, [REGRESSION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.maxDepth, [5, 24])
        .addGrid(estimator.maxBins, [16, 32, 64])
        .addGrid(estimator.minInfoGain, [0, 0.1])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=regression_evaluator_cv
    )
else:
    print("Regression Decision Tree model found in disk, loading...")
    model = DecisionTreeRegressionModel.load(
        TRAINED_MODELS_DIRS["Attempt 3"]["Regression"]["Prediction Tree"]
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Regression"]["Prediction Tree"]
):
    print(
        "Evaluating Prediction Tree model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=REGRESSION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=REGRESSION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_3/regression/prediction_tree.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_3/regression/prediction_tree.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Regression"]["Prediction Tree"]
):
    print("Saving Decision Tree Regressor model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_3/regression/prediction_tree"
    )

#### Gradient Boosted Tree Regression

In [None]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.regression import GBTRegressionModel

PREDICTION_COL = "attempt_3_regression_gradient_boosted_tree_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Regression"]["Gradient Boosted Tree"]
):
    print("Regression Gradient Boosted Tree model NOT found in disk, training...")
    
    regression_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = GBTRegressor()

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, FEATURES_COL)
        .addGrid(estimator.labelCol, [REGRESSION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.maxDepth, [2, 5, 10])
        .addGrid(estimator.maxBins, [16, 32, 64])
        .addGrid(estimator.minInfoGain, [0, 0.1])
        .addGrid(estimator.subsamplingRate, [0.5, 1])
        .addGrid(estimator.lossType, ["squared", "absolute"])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=regression_evaluator_cv
    )
else:
    print("Regression Gradient Boosted Tree model found in disk, loading...")
    model = GBTRegressionModel.load(
        TRAINED_MODELS_DIRS["Attempt 3"]["Regression"]["Gradient Boosted Tree"]
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Regression"]["Gradient Boosted Tree"]
):
    print(
        "Evaluating Gradient Boosted Tree model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=REGRESSION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=REGRESSION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_3/regression/gradient_boosted_tree.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_3/regression/gradient_boosted_tree.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Regression"]["Gradient Boosted Tree"]
):
    print("Saving Gradient Boosted Tree Regression model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_3/regression/gradient_boosted_tree"
    )

#### Random Forest Regression

In [None]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import RandomForestRegressionModel

PREDICTION_COL = "attempt_3_regression_random_forest_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Regression"]["Random Forest"]
):
    print("Regression Random Forest model NOT found in disk, training...")
    
    regression_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = RandomForestRegressor()

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, FEATURES_COL)
        .addGrid(estimator.labelCol, [REGRESSION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.maxDepth, [5, 24])
        .addGrid(estimator.maxBins, [32, 64])
        .addGrid(estimator.minInfoGain, [0, 0.1])
        .addGrid(estimator.subsamplingRate, [0.5, 1])
        # lossType is a GBT parameter; RandomForestRegressor does not define it.
        .addGrid(estimator.numTrees, [20, 40])
        .addGrid(
            estimator.featureSubsetStrategy, 
            ["auto", "onethird", "all", "log2"]
        )
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=regression_evaluator_cv
    )
else:
    print("Regression Random Forest model found in disk, loading...")
    model = RandomForestRegressionModel.load(
        TRAINED_MODELS_DIRS["Attempt 3"]["Regression"]["Random Forest"]
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Regression"]["Random Forest"]
):
    print(
        "Evaluating Random Forest model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=REGRESSION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=REGRESSION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_3/regression/random_forest.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_3/regression/random_forest.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Regression"]["Random Forest"]
):
    print("Saving Random Forest Regression model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_3/regression/random_forest"
    )

### Classification

In [None]:
classification_evaluator_cv.setLabelCol(CLASSIFICATION_LABEL_COL)

In [None]:
train_df, test_df = df.randomSplit([0.9, 0.1], seed=random_seed)

#### SVM Classifier

In [None]:
from pyspark.ml.classification import OneVsRest
from pyspark.ml.classification import OneVsRestModel
from pyspark.ml.classification import LinearSVC

PREDICTION_COL = "attempt_3_classification_svm_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Classification"]["SVM"]
):
    print("SVM Classification model NOT found in disk, training...")
    classification_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = OneVsRest(
        classifier=LinearSVC(
            featuresCol=FEATURES_COL[0],
            labelCol=CLASSIFICATION_LABEL_COL,
            predictionCol=PREDICTION_COL,
        ),
        featuresCol=FEATURES_COL[0],
        labelCol=CLASSIFICATION_LABEL_COL,
        predictionCol=PREDICTION_COL
    )

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, FEATURES_COL)
        .addGrid(estimator.labelCol, [CLASSIFICATION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=classification_evaluator_cv
    )
else:
    print("SVM Classification model found in disk, loading...")
    model = OneVsRestModel.load(
        TRAINED_MODELS_DIRS["Attempt 3"]["Classification"]["SVM"]
    )


In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Classification"]["SVM"]
):
    print(
        "Evaluating SVM Classifier model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=CLASSIFICATION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=CLASSIFICATION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_3/classification/svm.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_3/classification/svm.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Classification"]["SVM"]
):
    print("Saving SVM Classification model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_3/classification/svm"
    )

#### Logistic Regression

In [None]:
from pyspark.ml.classification import OneVsRest
from pyspark.ml.classification import OneVsRestModel
from pyspark.ml.classification import LogisticRegression

PREDICTION_COL = "attempt_3_classification_logistic_regression_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Classification"]["Logistic Regression"]
):
    print("Logistic Regression Classification model NOT found in disk, training...")
    classification_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = OneVsRest(
        classifier=LogisticRegression(
            # featuresCol expects a single column name, so start from the first one;
            # the grid below then tries every entry of FEATURES_COL.
            featuresCol=FEATURES_COL[0],
            labelCol=CLASSIFICATION_LABEL_COL,
            predictionCol=PREDICTION_COL,
        ),
        featuresCol=FEATURES_COL[0],
        labelCol=CLASSIFICATION_LABEL_COL,
        predictionCol=PREDICTION_COL
    )

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, FEATURES_COL)
        .addGrid(estimator.labelCol, [CLASSIFICATION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=classification_evaluator_cv
    )
else:
    print("Logistic Regression Classification model found in disk, loading...")
    model = OneVsRestModel.load(
        TRAINED_MODELS_DIRS["Attempt 3"]["Classification"]["Logistic Regression"]
    )


In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Classification"]["Logistic Regression"]
):
    print(
        "Evaluating Logistic Regression Classifier model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=CLASSIFICATION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=CLASSIFICATION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_3/classification/logistic_regression.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_3/classification/logistic_regression.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Classification"]["Logistic Regression"]
):
    print("Saving Logistic Regression Classification model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_3/classification/logistic_regression"
    )

#### Decision Tree

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import DecisionTreeClassificationModel

PREDICTION_COL = "attempt_3_classification_decision_tree_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Classification"]["Decision Tree"]
):
    print("Classification Decision Tree model NOT found in disk, training...")
    classification_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = DecisionTreeClassifier()

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, FEATURES_COL)
        .addGrid(estimator.labelCol, [CLASSIFICATION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.maxDepth, [5, 24])
        .addGrid(estimator.maxBins, [16, 32, 64])
        .addGrid(estimator.minInfoGain, [0, 0.1])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=classification_evaluator_cv
    )
else:
    print("Classification Decision Tree model found in disk, loading...")
    model = DecisionTreeClassificationModel.load(
        TRAINED_MODELS_DIRS["Attempt 3"]["Classification"]["Decision Tree"]
    )
    evaluators = get_evaluators(
        model, 
        label_col=CLASSIFICATION_LABEL_COL, 
        prediction_col=PREDICTION_COL, 
        evaluation_metrics=CLASSIFICATION_EVALUATION_METRICS
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Classification"]["Decision Tree"]
):
    print(
        "Evaluating Decision Tree model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=CLASSIFICATION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=CLASSIFICATION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_3/classification/decision_tree.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_3/classification/decision_tree.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Classification"]["Decision Tree"]
):
    print("Saving Decision Tree model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_3/classification/decision_tree"
    )

#### Random Forest

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import RandomForestClassificationModel

PREDICTION_COL = "attempt_3_classification_random_forest_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Classification"]["Random Forest"]
):
    print("Classification Random Forest model NOT found in disk, training...")
    
    classification_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = RandomForestClassifier()

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, FEATURES_COL)
        .addGrid(estimator.labelCol, [CLASSIFICATION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.maxDepth, [5, 24])
        .addGrid(estimator.maxBins, [32, 64])
        .addGrid(estimator.minInfoGain, [0, 0.1])
        .addGrid(estimator.subsamplingRate, [0.5, 1])
        # lossType is a GBT parameter; RandomForestClassifier does not define it.
        .addGrid(estimator.numTrees, [20, 40])
        .addGrid(estimator.featureSubsetStrategy, ["auto", "onethird", "all", "log2"])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=classification_evaluator_cv
    )
else:
    print("Classification Random Forest model found in disk, loading...")
    model = RandomForestClassificationModel.load(
        TRAINED_MODELS_DIRS["Attempt 3"]["Classification"]["Random Forest"]
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Classification"]["Random Forest"]
):
    print(
        "Evaluating Random Forest model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=CLASSIFICATION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=CLASSIFICATION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_3/classification/random_forest.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_3/classification/random_forest.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Classification"]["Random Forest"]
):
    print("Saving Random Forest Classification model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_3/classification/random_forest"
    )

#### MLP

In [None]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.classification import MultilayerPerceptronClassificationModel

PREDICTION_COL = "attempt_3_classification_mlp_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Classification"]["MLP"]
):
    print("Classification MLP model NOT found in disk, training...")
    
    classification_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = MultilayerPerceptronClassifier()

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, FEATURES_COL)
        .addGrid(estimator.labelCol, [CLASSIFICATION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.layers, [[NUM_MACRO_ROLES, NUM_MACRO_PLACES]])
        .addGrid(estimator.solver, ["gd"])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=classification_evaluator_cv
    )
else:
    print("Classification MLP model found in disk, loading...")
    model = MultilayerPerceptronClassificationModel.load(
        TRAINED_MODELS_DIRS["Attempt 3"]["Classification"]["MLP"]
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Classification"]["MLP"]
):
    print(
        "Evaluating MLP model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=CLASSIFICATION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=CLASSIFICATION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_3/classification/mlp.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_3/classification/mlp.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 3"]["Classification"]["MLP"]
):
    print("Saving MLP Classification model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_3/classification/mlp"
    )

## Clustering-Learning cross evaluation

Since clustering represents an additional layer of learning, it's interesting to cross-link clustering and learning evaluations, in order to try to discover whether there is some kind of relation between the two.

Specifically, since clustering is heavily influenced by the number of clusters `k`, we are interested in observing how learning performance changes as `k` varies.

Getting data ready for plotting:
* x-axis: numbers of clusters
* y-axis on the left: WSSD
* y-axis on the right: $r^2$ score, computed on test data 
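For reference, the $r^2$ score shown on the right axis is the coefficient of determination, $1 - SS_{res} / SS_{tot}$. The sketch below computes it by hand on made-up numbers; the `r2_score` helper and the sample values are hypothetical and only illustrate the metric, they are not results from this project.

```python
def r2_score(labels, predictions):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot

# Made-up end-of-season points and model predictions.
labels = [80, 65, 50, 35]
predictions = [75, 60, 55, 40]
print(round(r2_score(labels, predictions), 3))  # -> 0.911
```

A score of 1 means perfect predictions, while always predicting the mean of the labels yields 0.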

In [None]:
x = list(model_evals_dict.keys())
wssd_k = [
    result["wssd_k"] for result in clustering_results_dict["avg"].values()
]
r2 = [
    evaluation["test_set_evaluation"]["r2"]
    for evaluation in model_evals_dict.values()
]

plot_df_temp = pd.DataFrame([x, wssd_k, r2]).transpose()
plot_df_temp.columns = ["x", "wssd_k", "r2"]

ax = plot_df_temp.plot(
    x="x", 
    y="wssd_k", 
    color="#208AAE"
)
ax.set_xlabel("Number of clusters")
ax.set_ylabel("WSSD (lower is better)")
ax.set_ylim(15, 65)
ax.legend(bbox_to_anchor=(1,0.9))

ax2 = ax.twinx()

plot_df_temp.plot(
    x="x", 
    y="r2", 
    ax=ax2, 
    color="#FF88DC",
)
ax2.set_ylabel("r2 (higher is better)")
ax2.set_ylim(0.15, 0.65)
ax2.legend(bbox_to_anchor=(1,1))

# ax.figure.legend()
plt.title("Elbow method vs. r2")
plt.show()

The $r^2$ test score is essentially constant, hovering around $0.5$.

A thorough inspection of the actual $r^2$ values revealed that they agree up to the third decimal digit, so the fluctuations are negligible and the plot appears as a straight line.

We originally hypothesized that some interesting behaviour might emerge, but the data proved us wrong: the **$r^2$ test score** is **not affected** by the value of `k` (the **number of clusters** used in clustering).

# Attempt 4: prior-based approach

The previous three attempts led us to hypothesize that data captured at the start of the season may not be sufficient to predict the end-of-season ranking of football teams.

In fact, throughout the season, multiple **difficult-to-measure factors** influence results, for example:

* **Fan support**.
* **Coach** competence.
* Technical staff competence.
* Training style.
* **Training pitch** quality.
* **Fixture schedule**.
* International breaks.
* Players' **mood**.
* Club management involvement.
* "Winning **mentality**".


What if all of these non-trivially measurable components are actually captured somewhere in the data?<br>
A valid **statistical prior** could be the average seasonal ranking of the football team, since it would include all aforementioned influence factors.

This prior is named the **Reward-Penalty** (RP) coefficient and is computed as follows:

* Given a football team $ft$ and a season $s$: $rp(ft, s) = 21 - int(avg\_rank(ft, s))$, where $int$ drops the fractional part.
* $avg\_rank(ft, s)$ is a function that computes the average ranking of the given football team $ft$ over the seasons before $s$ ($s$ itself excluded).

For example, given the following data:

| team name | season | ranking |
| --------- | ------ | ------- |
| Napoli    | 2021   | 3       |
| Napoli    | 2020   | 5       |
| Napoli    | 2019   | <span style="color:RoyalBlue">7</span>       |
| Napoli    | 2018   | <span style="color:Purple">2</span>       |
| Napoli    | 2017   | <span style="color:green">2</span>       |

$rp(Napoli,\ 2020) = 21 - int(avg($<span style="color:RoyalBlue">7</span>$,$<span style="color:Purple">2</span>$,$<span style="color:green">2</span>$)) = 21 - int(3.67) = 21 - 3 = 18$
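
The worked example above can be reproduced with a few lines of plain Python. This is only a minimal sketch: `rp_coefficient` and the `napoli` dictionary are illustrative names that do not appear in the notebook, and the truncation with `int` follows the example, whereas the Spark code further down keeps the average as a floating-point value. Returning `0` for a team without history is an assumption that mirrors filling the missing average with the worst placement.

```python
MAX_PLACE = 21  # same constant the notebook uses


def rp_coefficient(rankings, season, max_place=MAX_PLACE):
    """Reward-Penalty coefficient from the rankings of seasons before `season`."""
    previous = [place for year, place in rankings.items() if year < season]
    if not previous:
        # Assumption: no history means worst average placement, i.e. 21 - 21.
        return 0
    # Truncate the average, as done in the worked example.
    return max_place - int(sum(previous) / len(previous))


napoli = {2021: 3, 2020: 5, 2019: 7, 2018: 2, 2017: 2}
print(rp_coefficient(napoli, 2020))  # 18
```

Teams that usually finish near the top get a coefficient close to 20, while teams that usually finish near the bottom get one close to 0.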

The goal of the `rp_coefficient` is to counter variance, by means of rewarding better performing teams and penalizing lower-performers.

`rp_coefficient` may be a source of bias in the models, because human bias is injected into the data.<br>
For this reason, `rp_coefficient` is used together with `rp_tradeoff`, a **tradeoff** parameter controlling how much importance is given to `rp_coefficient`.

## Computing `rp_coefficient`

In [None]:
rp_df = df

MAX_PLACE = 21

`t_rp_coeff` is a temporary view created from `rp_df`, containing only `season`, `club_name` and `place`.<br>

It will come in handy for the SQL manipulation below, which computes, for every football team and season, the average ranking over all earlier seasons (see the example in the Attempt 4 introduction for further details about the `rp_coefficient` computation).

In [None]:
rp_df.select(
    "season", "club_name", "place"
).createOrReplaceTempView("t_rp_coeff")

The next SQL query joins `t_rp_coeff` against itself:

* For every football team, the join produces all possible pairs of seasons, $(y_1,\ y_2)$.
* The join condition keeps only the pairs with $y_1 < y_2$, so that the average placement only considers previous years (as explained in the Attempt 4 introduction).
* The average place is computed using SQL aggregation functions.
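
The same self-join can be tried outside Spark. The sketch below uses Python's built-in `sqlite3` module purely for illustration (the notebook itself runs its query through `spark.sql`). The rows are the Napoli rankings from the example in the introduction, the aliases `cur` and `prev` are made-up names, and `coalesce` stands in for the `fillna(MAX_PLACE)` step.

```python
import sqlite3

MAX_PLACE = 21

conn = sqlite3.connect(":memory:")
conn.execute(
    "create table t_rp_coeff (season integer, club_name text, place integer)"
)
conn.executemany(
    "insert into t_rp_coeff values (?, ?, ?)",
    [
        (2021, "Napoli", 3),
        (2020, "Napoli", 5),
        (2019, "Napoli", 7),
        (2018, "Napoli", 2),
        (2017, "Napoli", 2),
    ],
)

# Pair every (season, club) row with the rows of strictly earlier seasons of
# the same club, then average their places. The left join keeps seasons that
# have no history; their average is NULL and is replaced by MAX_PLACE.
rows = conn.execute(
    """
    select cur.season, cur.club_name,
           coalesce(avg(prev.place), ?) as avg_rank
    from t_rp_coeff as cur
    left join t_rp_coeff as prev
        on prev.club_name = cur.club_name and prev.season < cur.season
    group by cur.season, cur.club_name
    order by cur.season desc
    """,
    (MAX_PLACE,),
).fetchall()

avg_rank = {season: rank for season, _, rank in rows}
print(avg_rank)
```

For the 2020 season the average is taken over 2019, 2018 and 2017 only, which matches the worked example.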

In [None]:
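# For each (season, club), average the places of strictly earlier seasons.
# The left join keeps seasons without history: their average is null and is
# replaced by MAX_PLACE, the worst possible average placement.
rp_df = spark.sql(
    """
    select cur.season, cur.club_name, avg(prev.place) as rp_coeff
    from t_rp_coeff as cur
    left join t_rp_coeff as prev
        on prev.club_name = cur.club_name and prev.season < cur.season
    group by cur.season, cur.club_name
    order by cur.season desc
    """
).fillna(MAX_PLACE, subset=["rp_coeff"])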

In [None]:
rp_df = rp_df.join(
    df, on=["club_name", "season"]
)

Once the average ranking is computed, it is converted into `rp_coeff` and applied to the feature, as per the formula presented in the Attempt 4 introduction.

In [None]:
rp_df = rp_df.withColumn("rp_coeff", MAX_PLACE - col("rp_coeff"))

add_normalize_by_rp_UDF = udf(
    lambda points, rp, tradeoff: points + tradeoff * rp, DoubleType()
)

RP_NORMALIZED_COLS = [
    f"avg(overall)_rp_normalized_tradeoff_{tradeoff}".replace(
        ".", "-"
    ) for tradeoff in ADD_RP_TRADEOFF
]

for tradeoff, col_name in zip(ADD_RP_TRADEOFF, RP_NORMALIZED_COLS):
    rp_df = rp_df.withColumn(
        col_name, add_normalize_by_rp_UDF(
            col("avg(overall)"), col("rp_coeff"), lit(tradeoff)
        )
    )

    assembler = VectorAssembler(
        inputCols=[col_name], 
        outputCol=col_name + "_vec"
    )

    rp_df = assembler.transform(rp_df)
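
To make the effect of `rp_tradeoff` concrete: the adjusted feature computed above is simply `feature + tradeoff * rp`. A minimal sketch in plain Python follows. The tradeoff values and the sample `avg_overall` are made up for illustration, since the actual `ADD_RP_TRADEOFF` list is defined elsewhere in the notebook, and `add_rp` is a hypothetical helper name.

```python
def add_rp(feature, rp, tradeoff):
    """Shift a feature by the RP coefficient, weighted by the tradeoff."""
    return feature + tradeoff * rp


avg_overall = 75.0  # hypothetical team-level avg(overall)
rp = 18             # RP coefficient from the Napoli example

# Illustrative tradeoff values only.
adjusted = {t: add_rp(avg_overall, rp, t) for t in (0.0, 0.5, 1.0, 2.0)}
for tradeoff, value in adjusted.items():
    print(tradeoff, value)
```

With a tradeoff of `0.0` the feature is left untouched, while larger values let the prior dominate the original feature.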

## Visualizations

### Feature-target correlation

In [None]:
pdf = rp_df.toPandas()

for rp_norm_col in RP_NORMALIZED_COLS:
    plot_feature_target_relation(
        pdf, ["avg(overall)"], rp_norm_col, color="Violet", n_cols=1, figsize=(8,8)
    )

The result is not quite as expected:

* `rp_tradeoff` = $0.5$ seems to reduce variance slightly.
* `rp_tradeoff` $\ge\ 1$ seems to mimic the behaviour of all previous attempts.

Nevertheless, in the hyperparameter grid we set `featuresCol` to all the columns storing the differently `rp_coefficient`-scaled features, so that cross-validation tells us which scaling works best.
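
Treating the feature column as one more hyperparameter means that the grid is the Cartesian product of the candidate columns and the other settings. The sketch below mimics that expansion with `itertools.product`. The column names and parameter values are placeholders, not the notebook's actual grid.

```python
from itertools import product

# Hypothetical candidate feature columns, one per rp_tradeoff value.
feature_cols = ["tradeoff_0-5_vec", "tradeoff_1_vec", "tradeoff_2_vec"]
reg_params = [0.0, 0.1, 1.0]
fit_intercepts = [True, False]

# Every combination becomes one configuration evaluated by cross-validation.
grid = [
    {"featuresCol": f, "regParam": r, "fitIntercept": i}
    for f, r, i in product(feature_cols, reg_params, fit_intercepts)
]
print(len(grid))  # 18
```

Each extra candidate column multiplies the number of configurations, so the training time grows linearly with the number of tradeoff values.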

## Learning for attempt 4

In [None]:
train_df, test_df = rp_df.randomSplit([0.9, 0.1], seed=random_seed)

FEATURES_COL = [
    rp_normalized_col + "_vec" for rp_normalized_col in RP_NORMALIZED_COLS
]

### Regression

In [None]:
regression_evaluator_cv.setLabelCol(REGRESSION_LABEL_COL)

#### Linear Regression

In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import LinearRegressionModel

PREDICTION_COL = "attempt_4_regression_linear_regression_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Regression"]["Linear Regression"]
):
    print("Linear Regression model NOT found in disk, training...")
    regression_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = LinearRegression()

    linear_regression_param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, FEATURES_COL)
        .addGrid(estimator.labelCol, [REGRESSION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.regParam, [0.0, 0.001, 0.01, 0.1, 0.5, 1])
        .addGrid(estimator.solver, ["auto", "normal"])
        .addGrid(estimator.fitIntercept, [True, False])
        .addGrid(estimator.elasticNetParam, [0.0, 0.5, 1.0])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=linear_regression_param_grid,
        evaluator_cv=regression_evaluator_cv
    )
else:
    print("Linear Regression model found in disk, loading...")
    model = LinearRegressionModel.load(
        TRAINED_MODELS_DIRS["Attempt 4"]["Regression"]["Linear Regression"]
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Regression"]["Linear Regression"]
):
    print("Evaluating Linear Regression model trained in previous cell...")
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=REGRESSION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=REGRESSION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_4/regression/linear_regression.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_4/regression/linear_regression.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Regression"]["Linear Regression"]
):
    print("Saving Linear Regression model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_4/regression/linear_regression"
    )

#### Linear Regression with multiple `rp_tradeoff`

In [None]:
cross_validated_models_dict = dict()

In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import LinearRegressionModel

for tradeoff, col_name in zip(ADD_RP_TRADEOFF, FEATURES_COL):
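    # Every tradeoff shares one prediction column name, because the evaluation
    # cell below reuses PREDICTION_COL after this loop has finished.
    PREDICTION_COL = "attempt_4_regression_linear_regression_multiple_tradeoffs_predictions"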

    if not model_exists(
        TRAINED_MODELS_DIRS["Attempt 4"]["Regression"]["Linear Regression multiple tradeoffs"][tradeoff]
    ):
        print(
            f"Linear Regression multiple tradeoffs with tradeoff={tradeoff} model NOT found in disk, training..."
        )
        regression_evaluator_cv.setPredictionCol(PREDICTION_COL)
        
        estimator = LinearRegression()

        linear_regression_param_grid = (
            ParamGridBuilder()
            .addGrid(estimator.featuresCol, [col_name])
            .addGrid(estimator.labelCol, [REGRESSION_LABEL_COL])
            .addGrid(estimator.predictionCol, [PREDICTION_COL])
            .addGrid(estimator.regParam, [0.0, 0.001, 0.01, 0.1, 0.5, 1])
            .addGrid(estimator.solver, ["auto", "normal"])
            .addGrid(estimator.fitIntercept, [True, False])
            .addGrid(estimator.elasticNetParam, [0.0, 0.5, 1.0])
            .build()
        )

        cross_validated_models_dict[tradeoff] = learn_best_model(
            estimator=estimator,
            param_grid=linear_regression_param_grid,
            evaluator_cv=regression_evaluator_cv
        )
    else:
        print(f"Linear Regression multiple tradeoffs with tradeoff={tradeoff} model found in disk, loading...")
        model = LinearRegressionModel.load(
            TRAINED_MODELS_DIRS["Attempt 4"]["Regression"]["Linear Regression multiple tradeoffs"][tradeoff]
        )

In [None]:
model_evals_dict = dict()

In [None]:
for tradeoff, col_name in zip(ADD_RP_TRADEOFF, FEATURES_COL):
    if not model_exists(
        TRAINED_MODELS_DIRS["Attempt 4"]["Regression"]["Linear Regression multiple tradeoffs"][tradeoff]
    ):
        print(
            f"Evaluating Linear Regression multiple tradeoffs with tradeoff={tradeoff} model trained in previous cell...")
        evaluate_learning_models(
            best_model=cross_validated_models_dict[tradeoff].bestModel,
            evaluators=get_evaluators(
                cross_validated_models_dict[tradeoff].bestModel, 
                label_col=REGRESSION_LABEL_COL, 
                prediction_col=PREDICTION_COL, 
                evaluation_metrics=REGRESSION_EVALUATION_METRICS
            ),
            save_training_result_path=f"./evaluation_results/attempt_4/regression/linear_regression_multiple_tradeoffs/tradeoff_{tradeoff}.json"
        )
    else:
        print("Printing evaluation loaded from disk...")
        model_evals_dict[tradeoff] = print_model_evaluation(
            model_evaluation_path=f"./evaluation_results/attempt_4/regression/linear_regression_multiple_tradeoffs/tradeoff_{tradeoff}.json"
        )

In [None]:
for tradeoff, col_name in zip(ADD_RP_TRADEOFF, FEATURES_COL):
    if not model_exists(
        TRAINED_MODELS_DIRS["Attempt 4"]["Regression"]["Linear Regression multiple tradeoffs"][tradeoff]
    ):
        print(
            f"Saving Linear Regression multiple tradeoffs with tradeoff={tradeoff} model on disk..."
        )
        cross_validated_models_dict[tradeoff].bestModel.save(
            f"./trained_models/attempt_4/regression/linear_regression_multiple_tradeoffs/tradeoff_{tradeoff}"
        )

#### Prediction Tree

In [None]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import DecisionTreeRegressionModel

PREDICTION_COL = "attempt_4_regression_decision_tree_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Regression"]["Prediction Tree"]
):
    print("Prediction Tree model NOT found in disk, training...")
    
    regression_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = DecisionTreeRegressor()

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, FEATURES_COL)
        .addGrid(estimator.labelCol, [REGRESSION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.maxDepth, [5, 24])
        .addGrid(estimator.maxBins, [16, 32, 64])
        .addGrid(estimator.minInfoGain, [0, 0.1])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=regression_evaluator_cv
    )
else:
    print("Regression Decision Tree model found in disk, loading...")
    model = DecisionTreeRegressionModel.load(
        TRAINED_MODELS_DIRS["Attempt 4"]["Regression"]["Prediction Tree"]
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Regression"]["Prediction Tree"]
):
    print(
        "Evaluating Prediction Tree model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=REGRESSION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=REGRESSION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_4/regression/prediction_tree.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_4/regression/prediction_tree.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Regression"]["Prediction Tree"]
):
    print("Saving Decision Tree Regressor model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_4/regression/prediction_tree"
    )

#### Gradient Boosted Tree Regression

In [None]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.regression import GBTRegressionModel

PREDICTION_COL = "attempt_4_regression_gradient_boosted_tree_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Regression"]["Gradient Boosted Tree"]
):
    print("Regression Gradient Boosted Tree model NOT found in disk, training...")
    
    regression_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = GBTRegressor()

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, FEATURES_COL)
        .addGrid(estimator.labelCol, [REGRESSION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.maxDepth, [2, 5, 10])
        .addGrid(estimator.maxBins, [16, 32, 64])
        .addGrid(estimator.minInfoGain, [0, 0.1])
        .addGrid(estimator.subsamplingRate, [0.5, 1])
        .addGrid(estimator.lossType, ["squared", "absolute"])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=regression_evaluator_cv
    )
else:
    print("Regression Gradient Boosted Tree model found in disk, loading...")
    model = GBTRegressionModel.load(
        TRAINED_MODELS_DIRS["Attempt 4"]["Regression"]["Gradient Boosted Tree"]
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Regression"]["Gradient Boosted Tree"]
):
    print(
        "Evaluating Gradient Boosted Tree model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=REGRESSION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=REGRESSION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_4/regression/gradient_boosted_tree.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_4/regression/gradient_boosted_tree.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Regression"]["Gradient Boosted Tree"]
):
    print("Saving Gradient Boosted Tree Regression model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_4/regression/gradient_boosted_tree"
    )

#### Random Forest Regression

In [None]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import RandomForestRegressionModel

PREDICTION_COL = "attempt_4_regression_random_forest_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Regression"]["Random Forest"]
):
    print("Regression Random Forest model NOT found in disk, training...")
    
    regression_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = RandomForestRegressor()

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, FEATURES_COL)
        .addGrid(estimator.labelCol, [REGRESSION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.maxDepth, [5, 24])
        .addGrid(estimator.maxBins, [32, 64])
        .addGrid(estimator.minInfoGain, [0, 0.1])
        .addGrid(estimator.subsamplingRate, [0.5, 1])
        .addGrid(estimator.numTrees, [20, 40])
        .addGrid(
            estimator.featureSubsetStrategy, 
            ["auto", "onethird", "all", "log2"]
        )
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=regression_evaluator_cv
    )
else:
    print("Regression Random Forest model found in disk, loading...")
    model = RandomForestRegressionModel.load(
        TRAINED_MODELS_DIRS["Attempt 4"]["Regression"]["Random Forest"]
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Regression"]["Random Forest"]
):
    print(
        "Evaluating Random Forest model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=REGRESSION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=REGRESSION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_4/regression/random_forest.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_4/regression/random_forest.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Regression"]["Random Forest"]
):
    print("Saving Random Forest Regression model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_4/regression/random_forest"
    )

### Classification

In [None]:
classification_evaluator_cv.setLabelCol(CLASSIFICATION_LABEL_COL)

#### SVM Classifier

In [None]:
from pyspark.ml.classification import OneVsRest
from pyspark.ml.classification import OneVsRestModel
from pyspark.ml.classification import LinearSVC

PREDICTION_COL = "attempt_4_classification_svm_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Classification"]["SVM"]
):
    print("SVM Classification model NOT found in disk, training...")
    classification_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = OneVsRest(
        classifier=LinearSVC(
            featuresCol=FEATURES_COL[0],
            labelCol=CLASSIFICATION_LABEL_COL,
            predictionCol=PREDICTION_COL,
        ),
        # featuresCol takes a single column name; the grid below iterates
        # over all the candidate columns.
        featuresCol=FEATURES_COL[0],
        labelCol=CLASSIFICATION_LABEL_COL,
        predictionCol=PREDICTION_COL
    )

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, FEATURES_COL)
        .addGrid(estimator.labelCol, [CLASSIFICATION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=classification_evaluator_cv
    )
else:
    print("SVM Classification model found in disk, loading...")
    model = OneVsRestModel.load(
        TRAINED_MODELS_DIRS["Attempt 4"]["Classification"]["SVM"]
    )


In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Classification"]["SVM"]
):
    print(
        "Evaluating SVM Classifier model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=CLASSIFICATION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=CLASSIFICATION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_4/classification/svm.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_4/classification/svm.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Classification"]["SVM"]
):
    print("Saving SVM Classification model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_4/classification/svm"
    )

#### Logistic Regression

In [None]:
from pyspark.ml.classification import OneVsRest
from pyspark.ml.classification import OneVsRestModel
from pyspark.ml.classification import LogisticRegression

PREDICTION_COL = "attempt_4_classification_logistic_regression_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Classification"]["Logistic Regression"]
):
    print("Logistic Regression Classification model NOT found in disk, training...")
    classification_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = OneVsRest(
        classifier=LogisticRegression(
            featuresCol=FEATURES_COL[0],
            labelCol=CLASSIFICATION_LABEL_COL,
            predictionCol=PREDICTION_COL,
        ),
        # featuresCol takes a single column name; the grid below iterates
        # over all the candidate columns.
        featuresCol=FEATURES_COL[0],
        labelCol=CLASSIFICATION_LABEL_COL,
        predictionCol=PREDICTION_COL
    )

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, FEATURES_COL)
        .addGrid(estimator.labelCol, [CLASSIFICATION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=classification_evaluator_cv
    )
else:
    print("Logistic Regression Classification model found in disk, loading...")
    model = OneVsRestModel.load(
        TRAINED_MODELS_DIRS["Attempt 4"]["Classification"]["Logistic Regression"]
    )


In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Classification"]["Logistic Regression"]
):
    print(
        "Evaluating Logistic Regression Classifier model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=CLASSIFICATION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=CLASSIFICATION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_4/classification/logistic_regression.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_4/classification/logistic_regression.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Classification"]["Logistic Regression"]
):
    print("Saving Logistic Regression Classification model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_4/classification/logistic_regression"
    )

#### Decision Tree

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import DecisionTreeClassificationModel

PREDICTION_COL = "attempt_4_classification_decision_tree_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Classification"]["Decision Tree"]
):
    print("Classification Decision Tree model NOT found in disk, training...")
    classification_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = DecisionTreeClassifier()

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, FEATURES_COL)
        .addGrid(estimator.labelCol, [CLASSIFICATION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.maxDepth, [5, 24])
        .addGrid(estimator.maxBins, [16, 32, 64])
        .addGrid(estimator.minInfoGain, [0, 0.1])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=classification_evaluator_cv
    )
else:
    print("Classification Decision Tree model found in disk, loading...")
    model = DecisionTreeClassificationModel.load(
        TRAINED_MODELS_DIRS["Attempt 4"]["Classification"]["Decision Tree"]
    )
    evaluators = get_evaluators(
        model, 
        label_col=CLASSIFICATION_LABEL_COL, 
        prediction_col=PREDICTION_COL, 
        evaluation_metrics=CLASSIFICATION_EVALUATION_METRICS
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Classification"]["Decision Tree"]
):
    print(
        "Evaluating Decision Tree model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=CLASSIFICATION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=CLASSIFICATION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_4/classification/decision_tree.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_4/classification/decision_tree.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Classification"]["Decision Tree"]
):
    print("Saving Decision Tree model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_4/classification/decision_tree"
    )

#### Random Forest

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import RandomForestClassificationModel

PREDICTION_COL = "attempt_4_classification_random_forest_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Classification"]["Random Forest"]
):
    print("Classification Random Forest model NOT found in disk, training...")
    
    classification_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = RandomForestClassifier()

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, FEATURES_COL)
        .addGrid(estimator.labelCol, [CLASSIFICATION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.maxDepth, [5, 24])
        .addGrid(estimator.maxBins, [32, 64])
        .addGrid(estimator.minInfoGain, [0, 0.1])
        .addGrid(estimator.subsamplingRate, [0.5, 1])
        .addGrid(estimator.numTrees, [20, 40])
        .addGrid(estimator.featureSubsetStrategy, ["auto", "onethird", "all", "log2"])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=classification_evaluator_cv
    )
else:
    print("Classification Random Forest model found in disk, loading...")
    model = RandomForestClassificationModel.load(
        TRAINED_MODELS_DIRS["Attempt 4"]["Classification"]["Random Forest"]
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Classification"]["Random Forest"]
):
    print(
        "Evaluating Random Forest model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=CLASSIFICATION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=CLASSIFICATION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_4/classification/random_forest.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_4/classification/random_forest.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Classification"]["Random Forest"]
):
    print("Saving Random Forest Classification model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_4/classification/random_forest"
    )

#### MLP

In [None]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.classification import MultilayerPerceptronClassificationModel

PREDICTION_COL = "attempt_4_classification_mlp_predictions"

if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Classification"]["MLP"]
):
    print("Classification MLP model NOT found in disk, training...")
    
    classification_evaluator_cv.setPredictionCol(PREDICTION_COL)
    
    estimator = MultilayerPerceptronClassifier()

    param_grid = (
        ParamGridBuilder()
        .addGrid(estimator.featuresCol, FEATURES_COL)
        .addGrid(estimator.labelCol, [CLASSIFICATION_LABEL_COL])
        .addGrid(estimator.predictionCol, [PREDICTION_COL])
        .addGrid(estimator.layers, [[1, NUM_MACRO_PLACES]])
        .addGrid(estimator.solver, ["gd"])
        .build()
    )

    cross_validated_models = learn_best_model(
        estimator=estimator,
        param_grid=param_grid,
        evaluator_cv=classification_evaluator_cv
    )
else:
    print("Classification MLP model found in disk, loading...")
    model = MultilayerPerceptronClassificationModel.load(
        TRAINED_MODELS_DIRS["Attempt 4"]["Classification"]["MLP"]
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Classification"]["MLP"]
):
    print(
        "Evaluating MLP model trained in previous cell..."
    )
    evaluate_learning_models(
        best_model=cross_validated_models.bestModel,
        evaluators=get_evaluators(
            cross_validated_models.bestModel, 
            label_col=CLASSIFICATION_LABEL_COL, 
            prediction_col=PREDICTION_COL, 
            evaluation_metrics=CLASSIFICATION_EVALUATION_METRICS
        ),
        save_training_result_path="./evaluation_results/attempt_4/classification/mlp.json"
    )
else:
    print("Printing evaluation loaded from disk...")
    print_model_evaluation(
        model_evaluation_path="./evaluation_results/attempt_4/classification/mlp.json"
    )

In [None]:
if not model_exists(
    TRAINED_MODELS_DIRS["Attempt 4"]["Classification"]["MLP"]
):
    print("Saving MLP Classification model on disk...")
    cross_validated_models.bestModel.save(
        "./trained_models/attempt_4/classification/mlp"
    )

## Impact of `rp_tradeoff` on learning

This is a more focused view showcasing how `rp_tradeoff` impacts learning performances.

The assumption is as follows: the larger `rp_tradeoff` is, the better the performance should get, because the system is "helped" more and more.

For this reason, the resulting plot has:

* `rp_tradeoff` on the $x$ axis
* $r^2$ on the $y$ axis
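
As a reminder of what the $y$ axis measures, $r^2$ compares the squared error of the model with the squared error of always predicting the mean: $r^2 = 1 - SS_{res} / SS_{tot}$. The sketch below is a plain-Python version of this standard definition, with made-up numbers. It is not the evaluator used by the notebook, and `r2_score` is an illustrative name.

```python
def r2_score(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean) ** 2 for t in y_true)
    return 1 - ss_res / ss_tot


points = [80, 60, 40, 20]  # hypothetical end-of-season points

print(r2_score(points, points))            # perfect predictions: 1.0
print(r2_score(points, [50, 50, 50, 50]))  # always predicting the mean: 0.0
```

A score around $0.5$ therefore means that the model removes roughly half of the squared error of the mean-only baseline.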

The following code prepares the axes, places them into a Pandas DataFrame and plots them, using Seaborn.

In [None]:
# Keep only the r2 score for every rp_tradeoff value.
evals_dict_r2_only = {
    tradeoff: evaluation["train_set_evaluation"]["r2"]
    for tradeoff, evaluation in model_evals_dict.items()
}

# One row per tradeoff, with explicitly named columns.
plot_df_temp = pd.DataFrame(
    {
        "rp_tradeoff": list(evals_dict_r2_only.keys()),
        "r2": list(evals_dict_r2_only.values()),
    }
)

fig, ax = plt.subplots(1, 1, figsize=(8, 6))

_ = sns.pointplot(
    data=plot_df_temp, x="rp_tradeoff", y="r2", ax=ax, color="HotPink"
)

_ = ax.set_xlabel("rp_tradeoff")
_ = ax.set_ylabel("r2 (higher is better)")
_ = ax.set_title("rp_tradeoff vs. r2")

Unfortunately, our **assumption** has been **refuted**.

In fact, the more importance is given to `rp_coefficient`, the worse the system performs.

# Final comments

We started with the idea of **predicting end-of-season points for European football clubs of the six major leagues** (English Premier League, Spanish Primera Division, Italian Serie A, French Ligue 1, German 1. Bundesliga and Dutch Eredivisie).

First we had to define how to **model** a **football team**: we decided to treat a football team as the **aggregation** of the **features** of its football **players**.

For this reason, we got all the needed **data**:
* **Player features**: dataset on [Kaggle](https://www.kaggle.com/datasets/stefanoleone992/fifa-21-complete-player-dataset) + [**custom-made** scraping](https://github.com/Big-Data-FC/scraper).
* **Teams**' end-of-the-season points: dataset on [Kaggle](https://www.kaggle.com/datasets/josephvm/european-club-football-dataset).
* **Cross-linking** of the two datasets: custom, hand-made dataset.
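
The team model described above (a team as the aggregation of its players' features) can be sketched in a few lines of Pandas. This is an illustrative sketch, not the notebook's actual pipeline: the column names (`club_name`, `overall`, `value_eur`), the sample rows and the choice of aggregation functions are assumptions made for the example.

```python
import pandas as pd

# Hypothetical player table: names and values are illustrative only.
players = pd.DataFrame(
    {
        "club_name": ["Team A", "Team A", "Team B", "Team B"],
        "overall": [80, 70, 90, 60],
        "value_eur": [10_000_000, 5_000_000, 30_000_000, 2_000_000],
    }
)

# One row per team: average rating, total market value, squad size.
teams = players.groupby("club_name").agg(
    overall_mean=("overall", "mean"),
    value_eur_sum=("value_eur", "sum"),
    n_players=("overall", "count"),
)

print(teams)
```

The resulting per-team rows are what would then be joined with the end-of-the-season points through the cross-linking dataset.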

We treated the problem both as **regression** and as **classification**, using a total of **nine** different learning **models**.

After the typical data **visualizations**, we started with a naive attempt, in which we considered all possible features.<br>
This resulted in sub-optimal performance, most likely due to **high variance** in the data and the curse of dimensionality.

To counter these two issues, we opted for **feature selection** and **dimensionality reduction**, using the following methods:
* **P**rincipal **C**omponent **A**nalysis
* t-SNE
* **U**nivariate **F**eature **S**election
* `overall` as a feature
* `value` as a feature
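
As an illustration of the first method in the list, the sketch below projects a feature matrix onto its top principal components using plain NumPy. It does not reproduce the notebook's own implementation: the function name `pca_project`, the synthetic `team_features` matrix and the number of components are assumptions chosen for the example.

```python
import numpy as np


def pca_project(features, n_components):
    """Project rows of `features` onto the top principal components.

    Returns the projected data and the explained-variance ratio
    of each retained component.
    """
    # PCA operates on mean-centered data.
    centered = features - features.mean(axis=0)
    # Rows of vt are the principal directions, ordered by
    # decreasing singular value.
    _, singular_values, vt = np.linalg.svd(centered, full_matrices=False)
    explained = singular_values**2 / np.sum(singular_values**2)
    projected = centered @ vt[:n_components].T
    return projected, explained[:n_components]


# Synthetic stand-in for a (teams x features) matrix.
rng = np.random.default_rng(0)
team_features = rng.normal(size=(20, 6))

reduced, explained_ratio = pca_project(team_features, n_components=2)
print(reduced.shape, explained_ratio)
```

The explained-variance ratios give a quick indication of how much information is lost when the original features are replaced by a handful of components.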

The results did not change much, but they motivated us to look at the **scientific** literature, where we found two **papers** that confirmed our **observations** and reached similar results:
* ["Who wins the championship?"](https://www.researchgate.net/publication/312418756_Who_wins_the_championship_Market_value_and_team_composition_as_predictors_of_success_in_the_top_European_football_leagues)
* ["Causes of Success in the La Liga and How to Predict Them"](https://ieeexplore.ieee.org/document/8796732)

Not satisfied, we decided to dig deeper and moved from "plain" features to learning-produced features, by means of **K-Means clustering**.

After inspecting the results of the **Elbow method** and the **Silhouette coefficients**, we proceeded to perform **learning**, which once again did not produce results significantly different from the previous attempts and the papers.

Furthermore, the papers' observations were confirmed by the clustering results as well.

Finally, taking inspiration from Deep Learning theory, we tried a **prior-based approach**, using rankings of previous years as **rewards** or **penalties** (RP), in an attempt to reduce variance.<br>
After computing the prior and applying it, learning was performed but, unfortunately, it produced pretty much the **same results** as the previous attempts.<br>
In some cases, the RP coefficient even worsened the situation.

In general, the different data visualizations showed data that appeared to be **non-linearly separable**.<br>
Thus, non-linear approaches may be needed, calling for Deep Learning.<br>
However, this path has not been explored because of data constraints: using Deep Neural Networks on fewer than 2k entries requires very advanced models and techniques, which we do not master (yet).