# Librerie

In [97]:
import time
import seaborn as sns

import numpy as np
from scipy import stats
from itertools import product

import pandas as pd
from pandas.api.types import is_numeric_dtype

import matplotlib.pyplot as plt
import matplotlib.colors as colors

from sklearn.model_selection import train_test_split, RandomizedSearchCV, GridSearchCV, cross_val_score
from sklearn.metrics import roc_curve, roc_auc_score, RocCurveDisplay, ConfusionMatrixDisplay, classification_report, accuracy_score, confusion_matrix
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.neural_network import MLPClassifier
from sklearn.utils import shuffle
from sklearn.decomposition import PCA

# Models
from sklearn import svm
from sklearn.naive_bayes import CategoricalNB
from sklearn.tree import DecisionTreeClassifier, plot_tree

# Neural Network
import keras
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam, RMSprop

import tensorflow as tf
from tensorflow.keras.utils import plot_model

# link to google drive to obtain and read the datasets
from google.colab import drive
drive.mount('/content/drive/')
path_to_dataset = 'drive/MyDrive/Appunti Università/Magistrale/Machine Learning/Progetto/Consegna' # CAMBIARE QUA IL PATH IN CASO DI ERRORE

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


# Importazione del training set e test set

Leggo e stampo il dataset di training. Esso contiene tutte le partite di tennis in singolo maschile svolte nel 2023

In [98]:
train_df = pd.read_csv(path_to_dataset + 'train_2023.csv', sep=',')
train_df.head()

FileNotFoundError: [Errno 2] No such file or directory: 'train_2023.csv'

Leggo e stampo il dataset di test. Leggo e stampo il dataset di training. Esso contiene tutte le partite di tennis in singolo maschile svolte nel 2024

In [None]:
test_df = pd.read_csv(path_to_dataset + 'test_2024.csv', sep=',')
test_df.head()

# Aggiutamento del dataset

Vengono convertite le colonne del dataset di partenza. In particolare vengono messi in minuscolo tutti i nomi delle colonne e sostituiti gli spazi con il carattere '_'. Inoltre, convertiamo i prefissi *winner* e *loser* con *player_1* e *player_2*. questo ci permette di apportare 2 modifiche sostanziali:

1.   Ora tutte le colonne associate ad un certo giocatore hanno lo stesso prefisso della colonna stessa. cioè le colonne associate a *player_1* iniziano con questo prefisso (Questo tornerà utile in alcune modifiche effettuate più avanti).

2.   Possiamo aggiungere una colonna **target** denominata *winner*. che conterrà 0 se il *player_1* vincerà, 1 altrimenti.



In [None]:
def rename_columns(df):
   columns_new_names = {
       'ATP': 'atp',
       'Location': 'location',
       'Tournament': 'tournament',
       'Date': 'date',
       'Series': 'series',
       'Court': 'court',
       'Surface': 'surface',
       'Round': 'round',
       'Best of': 'best_of',
       'Winner': 'player_1',
       'Loser': 'player_2',
       'WRank': 'player_1_rank',
       'LRank': 'player_2_rank',
       'WPts': 'player_1_points',
       'LPts': 'player_2_points',
       'W1': 'player_1_set_1',
       'L1': 'player_2_set_1',
       'W2': 'player_1_set_2',
       'L2': 'player_2_set_2',
       'W3': 'player_1_set_3',
       'L3': 'player_2_set_3',
       'W4': 'player_1_set_4',
       'L4': 'player_2_set_4',
       'W5': 'player_1_set_5',
       'L5': 'player_2_set_5',
       'Wsets': 'player_1_winner_sets',
       'Lsets': 'player_2_winner_sets',
       'Comment': 'comment',
       'B365W': 'player_1_bet365',
       'B365L': 'player_2_bet365',
       'PSW': 'player_1_pinnacle',
       'PSL': 'player_2_pinnacle',
       'MaxW': 'player_1_max',
       'MaxL': 'player_2_max',
       'AvgW': 'player_1_avg',
       'AvgL': 'player_2_avg'
   }

   return df.rename(columns=columns_new_names)

train_df = rename_columns(train_df)
test_df = rename_columns(test_df)

All'interno di questo progetto è stato deciso di tenere solamente i tornei del **Grande Slam**. Questo perchè sono considerati i torne più prestigiosi per la classifica ATP, quindi i risultati contenuti sono ritenuti più precisi e "realistici".

In [None]:
def filter_grand_slam(df):
    filtered = df[df['series'] == 'Grand Slam']

    filtered.reset_index(drop=True, inplace=True)
    filtered.index = range(1, len(filtered) + 1)

    return filtered

train_df = filter_grand_slam(train_df)
test_df = filter_grand_slam(test_df)

In questo passaggio eliminiamo alcune colonne dal dataset. Questi attributi sono stati eliminati per diversi motivi, tra cui:


*   Alcune colonne contenevano per la grande maggioranza lo stesso valore, tranne poche occorrenze. Quindi non sono state considerate valide per il training.

*   Alcune colonne non contengono dati utili, oppure contengono dati che non sono disponibili a priori diuna partita (quindi il progetto perderebbe di senso)

la lista *cols_to_keep* contiene le colonne da tenere.

In [None]:
def keep_columns(df):
    cols_to_keep = [
        'tournament', 'surface', 'round',
        'player_1', 'player_2', 'player_1_rank', 'player_2_rank',
        'player_1_bet365', 'player_2_bet365',
        'player_1_pinnacle', 'player_2_pinnacle'
    ]

    return df[cols_to_keep]

train_df = keep_columns(train_df)
test_df = keep_columns(test_df)

Alcune righe (istanze) del dataset contengono dei valori **null**. Non per forza l'intera riga.

Per questo è stato creato un metodo per eliminare queste righe, se l'attributo null è in una delle colonne citate nella lista *cols_to_check*.

Se il valore nullo è contenuto nella colonna che contiene il ranking ATP, viene considerato il massimo valore presente. A questo si aggiunge +1 e il valore viene salvato nelle caselle null. Questo perché un giocatore con un rank ATP null è considerato fuori dalla classifica ATP e quindi più "scarso" di quello con il valore più grande.

In [None]:
def manage_nulls(df):
    df = df.apply(handle_nulls, axis=1, args=(df,))

    cols_to_check = [
        'tournament', 'surface', 'round',
        'player_1', 'player_2',
        'player_1_bet365', 'player_2_bet365',
        'player_1_pinnacle', 'player_2_pinnacle'
    ]
    df = df.dropna(subset=cols_to_check)

    return df

def handle_nulls(row, df):
    max_rank = df[['player_1_rank', 'player_2_rank']].max().max()
    if pd.isnull(row['player_1_rank']):
        row['player_1_rank'] = max_rank + 1
    elif pd.isnull(row['player_2_rank']):
        row['player_2_rank'] = max_rank + 1

    return row

train_df = manage_nulls(train_df)
test_df = manage_nulls(test_df)

In [None]:
train_df.info()

In [None]:
test_df.info()

Viene aggiunta la colonna target *winner*. Inizialmente contiene sempre 1 perchè tutti i vincitori sono nella colonna *player_1*

In [None]:
def add_winner_column(df):
    df['winner'] = 0
    return df

train_df = add_winner_column(train_df)
test_df = add_winner_column(test_df)

In questo momento prendiamo metà del dataset. Di questa parte selezionata scambiamo i valori di *player_1* e *player_2* (insieme a tutti gli attributi associati). questo per evitare che il modello "pensi cose strane" per via del fatto che il valore del target sarebbe stato sempre 1 e il vincitore in "player_1".

In questo modo "mescoliamo" leggermente il dataset.

In [None]:
def swap_players(df):
    swap_rows = df.index[::2]

    df_swapped = df.loc[swap_rows]
    df_swapped['player_1'], df_swapped['player_2'] = df_swapped['player_2'], df_swapped['player_1']
    df_swapped['winner'] = 1

    for col in df.columns:
        if col.startswith('player_1_'):
            df_swapped[col], df_swapped[col.replace('player_1_', 'player_2_')] = df_swapped[col.replace('player_1_', 'player_2_')], df_swapped[col]

    df_new = pd.concat([df.loc[~df.index.isin(swap_rows)], df_swapped])
    return df_new

train_df = swap_players(train_df)
test_df = swap_players(test_df)

Manca solo un utlimo passaggio. Potrebbe accadere che un certo giocatore appaia nel **test set** ma non nel **train set**. Questo è un problema in quanto di quel giocatore non abbiamo dati e quindi non possiamo effettuare predizioni su di lui. Quindi quando notiamo questa problematica eliminiamo dal train set l'istanza corrispondente.

In [None]:
def filter_test_df(train_df, test_df):
    player_names = set(train_df['player_1'].tolist() + train_df['player_2'].tolist())
    test_filtered = test_df[(test_df['player_1'].isin(player_names)) & (test_df['player_2'].isin(player_names))]

    return test_filtered

test_df = filter_test_df(train_df, test_df)

Quindi otteniamo i seguenti dataset

In [None]:
train_df

In [None]:
test_df

# PCA

In [None]:
numeric_columns = [col for col in train_df.columns if is_numeric_dtype(train_df[col])]
numeric_columns

Rimuoviamo la colonna target che non è necessaria

In [None]:
numeric_columns.remove('winner')
numeric_columns

In [None]:
scaler = StandardScaler()
scaled_data = scaler.fit_transform(train_df[numeric_columns])
scaled_data

In [None]:
pca = PCA().fit(scaled_data)

plt.plot(range(1, pca.n_components_ + 1), pca.explained_variance_ratio_, marker='o')
plt.xlabel('Componenti della PCA')
plt.ylabel('Varianza spiegata')
plt.title("Risultati della PCA")
plt.show()

In [None]:
pca = PCA(n_components=6).fit(scaled_data)

pcs = pca.components_
fig = plt.figure(figsize=(12, 10))
ax = fig.add_subplot(1, 1, 1)
ax.set_xlim([-1, 1])
ax.set_ylim([-1, 1])

for i, (x, y) in enumerate(zip(pcs[0, :], pcs[1, :])):
    # plot line between origin and point (x, y)
    ax.plot([0, x], [0, y], color='k')
    # display the label of the point
    ax.text(x, y, train_df.columns[i], fontsize='10')

# Adattamento del dataset per i modelli

In questa sezione vengono effettuate alcune revisioni al dataset per permettere ai modelli di poterci lavorare sopra correttamente.

Prendo tutte le colonne che contengono stringhe (o comunque non numeri) e le elenco all'interno della lista *to_categorical*. Queste colonne verranno mappate in pandas come **categorie** (dtype).

In [None]:
to_categorical = ['tournament', 'surface', 'round', 'player_1', 'player_2']
for category in to_categorical:
  train_df[category] = train_df[category].astype('category')
  test_df[category] = test_df[category].astype('category')

Ora stiamo mappando tutte le categorie con dei numeri. Lo facciamo sia per il dataset di training che di test.

---

Per ottenere una mappatura corretta facciamo in modo che una certa categoria venga mappata nello stesso modo in entrambi i dataset. Ad esempio:

train: "Pippo" -> 1  allora test: "Pippo" -> 1 (in una certa colonna)

---

Le colonne "player_1" e "player_2" contengono i nomi dei giocatori e questi devono essere mappati allo stesso modo in entrambe le colonne.

In [None]:
# create a dictionary to store the mappings for each column
mappings = {}

# convert the training dataset
label_encoder = LabelEncoder()
for col in to_categorical:
  if col != 'player_1' and col != 'player_2':
    train_df[col] = label_encoder.fit_transform(train_df[col])
    mappings[col] = dict(zip(label_encoder.classes_, label_encoder.transform(label_encoder.classes_)))

# convert the test dataset
for col in to_categorical:
  if col != 'player_1' and col != 'player_2':
    test_df[col] = [mappings[col].get(x, -1) for x in test_df[col]]

# convert the player columns in the training dataset
names = pd.concat([train_df['player_1'], train_df['player_2']]).unique()
label_encoder.fit(names)
train_df['player_1'] = label_encoder.transform(train_df['player_1'])
train_df['player_2'] = label_encoder.transform(train_df['player_2'])

# convert the player columns in the test dataset
test_df['player_1'] = label_encoder.transform(test_df['player_1'])
test_df['player_2'] = label_encoder.transform(test_df['player_2'])

# Tuning dei parametri

Effettuo la cross-validation per cercare i migliori parametri per il modello definitivo (tuning)

In [None]:
target_name = 'winner'
feature_names = [col for col in train_df.columns.tolist() if col != target_name]


# X contains feature to train/test (test is for cross-validation)
# Y contains target
X_train, X_test, y_train, y_test = train_test_split(train_df[feature_names], train_df[target_name], test_size=0.3, random_state=42)

Imposto i vari parametri da controllare per il tuning. Insieme ai valori che possono assumere

In [None]:
def float_range(start, end, step=0.1):
    result = []
    i = start
    while i < end:
        result.append(round(i, 1))
        i += step
    return result

tree_param_grid = {
    # General
    'criterion': ['gini', 'entropy'],
    'random_state': [None, 42],
    'splitter': ['random', 'best'],

    # Max
    'max_features': [None, 'sqrt', 'log2'] + np.arange(1, 10).tolist(),

    # Min
    'min_samples_split': np.arange(2, 10).tolist(),
    'min_samples_leaf': np.arange(1, 10).tolist(),
    'min_weight_fraction_leaf': float_range(0.0, 0.5),
    'min_impurity_decrease': float_range(0.0, 0.5),
}

naive_param_grid = {
    'alpha': [0.01, 0.1, 0.5, 1.0, 2.0, 3.0, 5.0],
    'force_alpha': [True, False],
    'fit_prior': [True, False],
}

Questa sezione permette di fare tuning sui dati e trovare le migliori combinazioni per il *DecisionTreeClassifier*.

Esistono sostanzialmente due modi:
1.   GridSearchCV: Esegue una ricerca esaustiva su una griglia di valori predefiniti per i parametri.
2.   RandomizerSearchCV: Esegue una ricerca casuale su un campione di valori per i parametri

In generale GridSearchCV offre un analisi molto esaustiva (in quanto controlla tutte le possibili combinazioni) ma è molto costoso a livello computazionale. Viceversa il RandomizerSearchCV.

In [None]:
# TOGLIERE I COMMENTI PER ESEGUIRE IL TUNING - è STATO COMMENTATO PER UNA ESECUZIONE RAPIDA.
# 1) TOGLIERE IL COMMENTO E TENERE O LA GRID O LA RANDOMIZER
# 2) TOGLIERE IL COMMENTO SU UNO DEI MODELLI
# 3) SOSTITUIRE A <put_mode_here> IL CLASSIFICATORE
# 4) SOSTITUIRE A <put_param_grid_here> IL PARAM_GRID DA USARE (BLOCCO DI CODICE PRECEDENTE)


# decision_tree_classifier_t = DecisionTreeClassifier()
# categorical_naive_t = CategoricalNB()

'''
search = RandomizedSearchCV(
    estimator=<put_mode_here>,
    param_distributions=<put_param_grid_here>,
    cv=10, n_iter=100, n_jobs=-1, verbose=10,
    random_state=42,
    return_train_score=True
)


search = GridSearchCV(
    estimator=<put_mode_here>,
    param_distributions=<put_param_grid_here>,
    cv=10, n_jobs=-1, verbose=10,
    return_train_score=True
)


search.fit(X_train, y_train)

print('Best random search parameters: ', search.best_params_)
print('\n\n')
print('Best random search score: ', search.best_score_)
print('\n\n')
print(search.best_estimator_)
'''

## Reti neurali

In [None]:
scaler = StandardScaler()
X_train[feature_names] = scaler.fit_transform(X_train[feature_names])
X_test[feature_names] = scaler.transform(X_test[feature_names])

In [None]:
y_train = keras.utils.to_categorical(y_train)
y_test = keras.utils.to_categorical(y_test)

In [None]:
param_grid = {
  'batch_size': [10, 15, 20],
  'epochs': [10, 15, 20],
  'secondLayer': [22, 44],
  'thirdLayer': [12, 22],
  'learning_rate': [0.01, 0.01, 0.1],
  'oneMoreLayer': [0, 1]
}

In [None]:
def firstValue(pred):
  return np.array([tmp[0] for tmp in pred])

In [None]:
def correctValue(pred):
  for i in range(pred.shape[0]):
    value = pred[i]
    if (value[0] > value[1]):
      pred[i] = [1,0]
    else:
      pred[i] = [0,1]
  return pred

In [None]:
def create_model(firstLayer=11, secondLayer=11, thirdLayer=11, learning_rate=0.1, oneMoreLayer=0):
    model = Sequential()
    model.add(Dense(firstLayer, input_dim=X_train.shape[1], activation='relu'))
    model.add(Dense(secondLayer, activation='relu'))
    if(oneMoreLayer == 1):
      model.add(Dense(thirdLayer, activation='relu'))
    model.add(Dense(2, activation='softmax'))
    model.compile(loss="binary_crossentropy", optimizer=Adam(learning_rate=learning_rate), metrics=['accuracy'])
    return model

Togliere i commenti per eseguire il tuning dei parametri

In [None]:
# TOGLIERE I COMMENTI ALL'INTERO BLOCCO PER FARE IL TUNING DELLA RETE NEURALE

'''
best_accuracy = 0
best_params = {}
best_model = None

for params in product(*param_grid.values()):

  model = create_model(params[2], params[3], params[4], params[5])
  model.fit(X_train, y_train, batch_size=params[0], epochs=params[1])

  y_pred = model.predict(X_test)
  y_pred = correctValue(y_pred)
  accuracy = accuracy_score(y_test, y_pred)

  print("Hyperparameters: ", params)
  print("Accuracy: ", accuracy)
  print("------------------------------------------------")

  if accuracy > best_accuracy:
      best_accuracy = accuracy
      best_params = params
      best_model = model
      best_batch_size = params[0]
      best_epochs = params[1]

print("Best hyperparameters: ", best_params)
print("Best accuracy: ", best_accuracy)
'''

# Predizioni

Dopo il tuning vengono creati i vari modelli e mostrati i risultati delle predizioni.

## Albero di decisione

In [None]:
decision_tree_classifier = DecisionTreeClassifier(
    criterion='gini',
    splitter='best',
    max_features='sqrt',
    min_samples_split=9,
    min_weight_fraction_leaf=0.1,
    random_state=None
)
decision_tree_classifier.fit(train_df[feature_names], train_df[target_name])

# Print decision tree
fig, ax = plt.subplots(figsize=(150, 100))
plot_tree(decision_tree_classifier, filled=True, ax=ax)
plt.plot()

In [None]:
# Evaluate the model using test data
y_pred_dt = decision_tree_classifier.predict(test_df[feature_names])

print('Test set - Report - Decision tree\n')
print(classification_report(test_df[target_name], y_pred_dt))

In [None]:
# Toggle confusion matrix
conf_matrix = confusion_matrix(test_df[target_name], y_pred_dt, labels=None, sample_weight=None)
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", annot_kws={"size": 16})
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.title('Confusion Matrix')
plt.show()

## (Categorical) Naive Bayes

è stato scelto questa "tipologia" di naive bayes dato il numerevole quantitativo di dati categorici nel dataset

In [None]:
categorical_naive = CategoricalNB(alpha=0.01)
categorical_naive.fit(train_df[feature_names], train_df[target_name])

In [None]:
# Evaluate the model using test data
y_pred_nb = categorical_naive.predict(test_df[feature_names])

print('Test set - Report - Naive Bayes\n')
print(classification_report(test_df[target_name], y_pred_nb))

In [None]:
# Toggle confusion matrix
conf_matrix = confusion_matrix(test_df[target_name], y_pred_nb, labels=None, sample_weight=None)
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", annot_kws={"size": 16})
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.title('Confusion Matrix')
plt.show()

## Rete neurale

In [None]:
standardScaler = StandardScaler()
X_train_nn = pd.DataFrame(standardScaler.fit_transform(train_df[feature_names]))
X_test_nn = pd.DataFrame(standardScaler.fit_transform(test_df[feature_names]))

In [None]:
y_train_nn = keras.utils.to_categorical(train_df[target_name])
y_test_nn = keras.utils.to_categorical(test_df[target_name])

In [None]:
y_test_first = firstValue(y_test_nn)

In [None]:
nn = Sequential()
nn.add(Dense(44, input_dim=X_train_nn.shape[1], activation='relu'))
nn.add(Dense(22, activation='relu'))
nn.add(Dense(2, activation='softmax'))
nn.compile(loss="binary_crossentropy", optimizer=Adam(learning_rate=0.01), metrics='accuracy')

keras.utils.plot_model(nn, show_shapes=True)

In [None]:
nn.fit(X_train_nn, y_train_nn, batch_size=15, epochs=20, verbose=1)

In [None]:
y_pred_nn = nn.predict(X_test_nn)
y_pred_nn = correctValue(y_pred_nn)
y_pred_first = firstValue(y_pred_nn)
accuracy = accuracy_score(y_test_nn, y_pred_nn)
print("Accuracy: ", accuracy)

In [None]:
# Toggle confusion matrix
conf_matrix = confusion_matrix(y_test_first, y_pred_first, labels=None, sample_weight=None)
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", annot_kws={"size": 16})
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.title('Confusion Matrix')
plt.show()

In [None]:
weights = nn.get_weights()

# Visualizza i pesi dei neuroni
for layer_weights in weights:
    print(layer_weights)

# Confronto dei risultati

## Curva ROC

In [None]:
# Calculate proba
y_pred_prob_dt = decision_tree_classifier.predict_proba(test_df[feature_names])[:, 1]
y_pred_prob_nb = categorical_naive.predict_proba(test_df[feature_names])[:, 1]
y_pred_prob_nn = y_pred_nn

# Check for positive classes
positive_class_dt = decision_tree_classifier.classes_[np.argmax(np.bincount(train_df[target_name]))]
fpr_dt, tpr_dt, thresholds_dt = roc_curve(test_df[target_name], y_pred_prob_dt, pos_label=positive_class_dt)

positive_class_nb = categorical_naive.classes_[np.argmax(np.bincount(train_df[target_name]))]
fpr_nb, tpr_nb, thresholds_nb = roc_curve(test_df[target_name], y_pred_prob_nb, pos_label=positive_class_dt)

fpr_nn, tpr_nn, thresholds_nn = roc_curve(y_test_first, y_pred_first)

# Compute AUC
roc_auc_dt = roc_auc_score(test_df[target_name], y_pred_prob_dt)
roc_auc_nb = roc_auc_score(test_df[target_name], y_pred_prob_nb)
roc_auc_nn = roc_auc_score(y_test_nn, y_pred_prob_nn)

# Plot ROC curve
plt.plot(fpr_dt, tpr_dt, label='ROC curve (area = %0.2f) - Decision Tree' % roc_auc_dt)
plt.plot(fpr_nb, tpr_nb, label='ROC curve (area = %0.2f) - Naive Bayes' % roc_auc_nb)
plt.plot(fpr_nn, tpr_nn, label='ROC curve (area = %0.2f) - Neural Network' % roc_auc_nn)

# Print ROC curve
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend(loc="lower right")
plt.show()