In [1]:
#Imports
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from xgboost import XGBClassifier
from sklearn.model_selection import GridSearchCV
from xgboost import plot_importance
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import roc_auc_score, roc_curve
from sklearn.neighbors import KNeighborsClassifier

In [2]:
# Load the raw match table and report how many rows it contains.
df = pd.read_csv("football_match_data.csv")
print("Number of matches in dataset:", len(df))

# Keep only decisive matches (draws are removed) and derive the binary target:
# match_result = 1 for a home win, 0 for an away win.
mask = df['home_goals'].ne(df['away_goals'])
df = df.loc[mask].copy()
df['match_result'] = df['home_goals'].gt(df['away_goals']).astype(int)
# Discard every row that still has a missing value in any column.
df.dropna(inplace=True)
print(df.head())

Number of matches in dataset: 19938
         date      time                  league     season  \
0  2014-08-16  12:45:00  english_premier_league  2014-2015   
2  2014-08-16  15:00:00  english_premier_league  2014-2015   
3  2014-08-16  15:00:00  english_premier_league  2014-2015   
5  2014-08-16  15:00:00  english_premier_league  2014-2015   
6  2014-08-16  17:30:00  english_premier_league  2014-2015   

             home_team       away_team  home_goals  away_goals   home_xG  \
0    Manchester United         Swansea         1.0         2.0  1.166350   
2  Queens Park Rangers            Hull         0.0         1.0  1.900670   
3                Stoke     Aston Villa         0.0         1.0  0.423368   
5             West Ham       Tottenham         0.0         1.0  1.853100   
6              Arsenal  Crystal Palace         2.0         1.0  1.554110   

    away_xG  match_result  
0  0.278076             0  
2  1.117570             0  
3  0.909774             0  
5  1.017060           

In [3]:
def add_time_of_day(df):
    """Parse the kick-off time and bucket it into a ``time_of_day`` label.

    The frame is modified in place: ``df['time']`` is replaced by
    ``datetime.time`` objects and a new ``time_of_day`` column is added with
    the value ``'day'`` (before 17:00), ``'evening'`` (17:00 up to but not
    including 19:59) or ``'night'`` (19:59 and later).

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a ``time`` column whose values render as ``HH:MM:SS``.

    Returns
    -------
    pandas.DataFrame
        The same object that was passed in, with the two columns updated.
    """
    # Parse the two thresholds once instead of once per row.
    evening_start = pd.to_datetime('17:00', format='%H:%M').time()
    night_start = pd.to_datetime('19:59', format='%H:%M').time()

    def bucket(kickoff):
        if kickoff < evening_start:
            return 'day'
        if kickoff < night_start:
            return 'evening'
        return 'night'

    # Going through str makes the call safe to repeat: after a first run the
    # column already holds datetime.time objects, which pd.to_datetime rejects,
    # while their string form is again 'HH:MM:SS'.
    df['time'] = pd.to_datetime(df['time'].astype(str), format='%H:%M:%S').dt.time
    df['time_of_day'] = df['time'].apply(bucket)
    return df

In [4]:
def encode_leagues(df):
    """Add one boolean indicator column per known league.

    For each of the five league identifiers below, a column of the same name
    is written into ``df`` (in place) that is True where ``df['league']``
    equals that identifier. The mutated frame is returned.
    """
    league_column = df['league']
    for name in (
        'english_premier_league',
        'french_ligue_1',
        'german_bundesliga',
        'italian_serie_a',
        'spanish_la_liga',
    ):
        df[name] = league_column.eq(name)
    return df

In [5]:
def add_rolling_xG(df):
    """Return a date-sorted copy of ``df`` with lagged rolling xG features.

    ``home_xG_rolling_20`` is, per home team, the mean ``home_xG`` of up to the
    20 preceding rows in which that team was the home side; the current row is
    excluded by the ``shift()``. ``away_xG_rolling_20`` is built the same way
    from ``away_team`` / ``away_xG``. The first row of each group has no
    history and is therefore NaN.
    """
    def lagged_mean(values):
        # shift() drops the current match so only earlier matches contribute.
        return values.shift().rolling(20, min_periods=1).mean()

    ordered = df.sort_values(by='date')
    for side in ('home', 'away'):
        ordered[f'{side}_xG_rolling_20'] = (
            ordered.groupby(f'{side}_team')[f'{side}_xG'].transform(lagged_mean)
        )
    return ordered


In [None]:
# Remove repeated match rows BEFORE building features. The rolling xG windows
# are computed over consecutive rows of the same team, so a fixture that is
# present twice would be counted twice in every later window (and the second
# copy would see its own xG as "history").
df = df.drop_duplicates(
    subset=['date', 'home_team', 'away_team', 'home_goals', 'away_goals', 'home_xG', 'away_xG'],
    keep='first',
).copy()  # explicit copy: add_time_of_day / encode_leagues write columns in place

# Feature engineering: kick-off time bucket, league indicators, lagged rolling xG.
df = add_time_of_day(df)
df = encode_leagues(df)
df = add_rolling_xG(df)

In [None]:
# Count rows that share date + home team + away team with at least one other row:
# True = row belongs to a repeated fixture, False = row is unique.
(
    df[['date', 'home_team', 'away_team', 'season']]
    .sort_values(by=['date', 'home_team', 'away_team'])
    .duplicated(subset=['date', 'home_team', 'away_team'], keep=False)
    .value_counts()
)


In [None]:
# Collapse rows that repeat the same fixture with identical goals and xG,
# keeping the first occurrence of each.
df = df.drop_duplicates(
    keep='first',
    subset=[
        'date', 'home_team', 'away_team',
        'home_goals', 'away_goals',
        'home_xG', 'away_xG',
    ],
)

In [None]:
# One-hot encode both team columns; the resulting columns are named
# 'home_<team>' and 'away_<team>' and are appended to the frame.
home_dummies = df['home_team'].pipe(pd.get_dummies, prefix='home')
away_dummies = df['away_team'].pipe(pd.get_dummies, prefix='away')
df = pd.concat((df, home_dummies, away_dummies), axis='columns')


In [None]:
# Remove the raw identifiers and the per-match outcome statistics (goals, xG);
# only the engineered features and match_result remain afterwards.
df.drop(
    columns=[
        'date', 'time', 'league', 'season',
        'home_team', 'away_team',
        'home_goals', 'away_goals',
        'home_xG', 'away_xG',
    ],
    inplace=True,
)

In [None]:
# One-hot encode the kick-off bucket ('time_day', 'time_evening', 'time_night')
# and drop the original string column.
df = pd.concat([df, pd.get_dummies(df['time_of_day'], prefix='time')], axis=1)
df = df.drop(columns=['time_of_day'])

# Convert every boolean indicator (league flags, team dummies, time dummies) to
# 0/1 integers with an explicit dtype cast. The previous
# df.replace({True: 1, False: 0}) depended on replace() silently downcasting
# the result, which newer pandas versions deprecate, and it ran before the
# time dummies existed, so those stayed boolean.
bool_cols = df.select_dtypes(include='bool').columns
df = df.astype({col: int for col in bool_cols})

# Rows without any earlier match for the team have no rolling xG and are removed.
df = df.dropna(subset=['home_xG_rolling_20', 'away_xG_rolling_20'])
print(df.head())

In [None]:
team_name = "Liverpool"

# Rolling home xG for the rows in which this team is the home side.
team_home_matches = df.loc[df[f"home_{team_name}"] == 1, ["home_xG_rolling_20"]]

# Rolling away xG for the rows in which this team is the away side.
team_away_matches = df.loc[df[f"away_{team_name}"] == 1, ["away_xG_rolling_20"]]

# Show the first few values of each as a sanity check.
print("First 10 home rolling xG values:")
print(team_home_matches.head(10))

print("\nFirst 10 away rolling xG values:")
print(team_away_matches.head(10))


In [None]:
# Plot home and away rolling xG over match index
# Both series are re-indexed from 0, so the x axis is "n-th home match" for one
# line and "n-th away match" for the other; the two lines are not aligned on a
# shared calendar axis.
plt.figure(figsize=(10,5))
plt.plot(team_home_matches.reset_index(drop=True), label="Home Rolling xG", marker="o")
plt.plot(team_away_matches.reset_index(drop=True), label="Away Rolling xG", marker="x")
plt.xlabel("Match Index")
plt.ylabel("Rolling 20-Match xG")
plt.title(f"Rolling xG for {team_name} Over Time")
plt.legend()
plt.show()


In [None]:
# Separate the feature matrix from the binary target (1 = home win, 0 = away win).
X = df.drop(columns=['match_result'])
y = df['match_result']
# Hold out 20% of the rows for testing; the fixed random_state keeps the split repeatable.
# NOTE(review): the split is random rather than chronological, so matches played
# later can end up in the training set while earlier ones are in the test set —
# confirm this is intended for this experiment.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:
# Candidate hyper-parameters: 3 depths x 2 ensemble sizes x 3 learning rates.
param_grid = dict(
    max_depth=[3, 5, 7],
    n_estimators=[100, 200],
    learning_rate=[0.01, 0.1, 0.2],
)

# Exhaustive search with 3-fold cross-validation, scored on accuracy and
# parallelised over all available cores.
grid = GridSearchCV(
    XGBClassifier(eval_metric='logloss', random_state=42),
    param_grid,
    scoring='accuracy',
    cv=3,
    n_jobs=-1,
    verbose=1,
)

grid.fit(X_train.astype(float), y_train)

print("Best parameters:", grid.best_params_)
print("Best accuracy:", grid.best_score_)

# Keep the estimator fitted with the winning parameter combination.
xgb_model = grid.best_estimator_



In [None]:
# Score the tuned model on the held-out test split.
preds = xgb_model.predict(X_test.astype(float))
print("Accuracy: {:.4f}".format(accuracy_score(y_test, preds)))
print(classification_report(y_test, preds))

In [None]:
# Pair every training column with the model's importance score, largest first.
importances = pd.DataFrame(
    {'Feature': X_train.columns, 'Importance': xgb_model.feature_importances_}
)
importances = importances.sort_values(by='Importance', ascending=False)

# Horizontal bar chart of the 20 highest-scoring features.
plt.figure(figsize=(10, 8))
sns.barplot(x='Importance', y='Feature', data=importances.head(20))
plt.title('Top 20 Features (XGBoost)')
plt.show()

In [None]:
# plot_importance creates its own figure when no axes are passed, which left
# the 10x20 figure empty and drew the chart at the default size. Passing the
# current axes makes the chart use the figure created here.
plt.figure(figsize=(10, 20))
plot_importance(xgb_model, max_num_features=30, ax=plt.gca())
plt.show()

In [None]:
# Predict on the same float-cast features the model was fitted on (every other
# fit/predict call in this notebook uses .astype(float)).
y_pred = xgb_model.predict(X_test.astype(float))
# Rows are the true class, columns the predicted class (0 = away win, 1 = home win).
cm = confusion_matrix(y_test, y_pred)
ConfusionMatrixDisplay(confusion_matrix=cm).plot()
plt.show()


In [None]:
# Probability of the positive class (home win), predicted on the same
# float-cast features the model was fitted on.
probs = xgb_model.predict_proba(X_test.astype(float))[:,1]
auc_score = roc_auc_score(y_test, probs)
print(f"ROC AUC Score: {auc_score:.4f}")

# ROC curve of the model against the diagonal of a random classifier.
fpr, tpr, _ = roc_curve(y_test, probs)
plt.plot(fpr, tpr, label=f'AUC={auc_score:.3f}')
plt.plot([0, 1], [0, 1], linestyle='--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.legend()
plt.show()


ANGREP (Attack)

In [None]:
# Work on copies so the clean training data stays untouched.
X_poisoned = X_train.copy()
y_poisoned = y_train.copy()

# Boolean mask (despite the name) selecting training rows where Barcelona is the home team.
target_indices = X_train['home_Barcelona'].eq(1)

# Label-flipping attack: every Barcelona home win (label 1) is relabelled as an
# away win (label 0). The features are left unchanged.
flip_mask = target_indices & y_train.eq(1)
y_poisoned.loc[flip_mask] = 0


In [None]:
# Retrain with the grid-search winner's hyper-parameters, this time on the
# poisoned labels, so any difference to xgb_model is caused by the labels only.
best_params = grid.best_params_
xgb_poisoned = XGBClassifier(eval_metric='logloss', random_state=42, **best_params)
xgb_poisoned.fit(X_poisoned.astype(float), y_poisoned)



In [None]:
# Overall test accuracy of the poisoned model, measured against the true (clean) test labels.
poisoned_preds = xgb_poisoned.predict(X_test.astype(float))
print("Poisoned model accuracy:", accuracy_score(y_true=y_test, y_pred=poisoned_preds))


In [None]:
# Restrict the test split to Barcelona home matches and look up their true labels by index.
barca_test = X_test.loc[X_test['home_Barcelona'] == 1]
barca_truth = y_test.loc[barca_test.index]
barca_pred = xgb_poisoned.predict(barca_test.astype(float))

print("Barcelona at home-specific accuracy (poisoned):", accuracy_score(y_true=barca_truth, y_pred=barca_pred))


In [None]:
# Baseline for comparison: the clean model (xgb_model) scored on the same
# Barcelona home subset of the test split.
clean_barca_pred = xgb_model.predict(barca_test.astype(float))
clean_barca_acc = accuracy_score(y_true=barca_truth, y_pred=clean_barca_pred)

print("Barcelona-specific accuracy (clean):", clean_barca_acc)


In [None]:

SEED = 42

# === Optimized parameters from GridSearchCV ===
# NOTE(review): these values are hard-coded rather than read from
# grid.best_params_; confirm they still match the latest search result.
best_params = {
    'learning_rate': 0.1,
    'max_depth': 5,
    'n_estimators': 200
}

# === Poisoning levels: fraction of Barcelona home wins whose label is flipped to 0
poison_levels = [0.0, 0.01, 0.05, 0.1, 0.25, 0.35, 0.5, 0.6, 0.75, 1.0]
results = []

# === Step 0: Train clean model once (with best params)
clean_model = XGBClassifier(eval_metric='logloss', random_state=SEED, **best_params)
clean_model.fit(X_train.astype(float), y_train)
clean_preds = clean_model.predict(X_test.astype(float))
clean_acc = accuracy_score(y_test, clean_preds)
print(f" Clean model accuracy: {clean_acc:.4f}")

# === Loop-invariant selections (identical for every poisoning level)
# Training rows eligible for flipping: Barcelona home wins.
target_mask = (X_train['home_Barcelona'] == 1) & (y_train == 1)
target_indices = y_train[target_mask].index
# Test rows used for the targeted metric: Barcelona home matches with their true labels.
barca_test = X_test[X_test['home_Barcelona'] == 1]
barca_truth = y_test[barca_test.index]

# === Loop over each poisoning level
for level in poison_levels:
    X_poisoned = X_train.copy()
    y_poisoned = y_train.copy()

    # int() truncates, so very small levels may flip zero labels.
    n_to_flip = int(len(target_indices) * level)
    if n_to_flip > 0:
        # A fresh generator with the same seed for every level keeps each
        # level reproducible independently of the others.
        sampled_indices = np.random.default_rng(SEED).choice(target_indices, size=n_to_flip, replace=False)
        y_poisoned.loc[sampled_indices] = 0

    if level == 0.0:
        # Nothing is flipped at level 0, so the clean model is reused.
        model = clean_model
    else:
        model = XGBClassifier(eval_metric='logloss', random_state=SEED, **best_params)
        model.fit(X_poisoned.astype(float), y_poisoned)

    poisoned_preds = model.predict(X_test.astype(float))
    overall_acc = accuracy_score(y_test, poisoned_preds)

    barca_pred = model.predict(barca_test.astype(float))
    barca_acc = accuracy_score(barca_truth, barca_pred)

    # Targeted errors on Barcelona home matches, measured against the true labels.
    fn = ((barca_truth == 1) & (barca_pred == 0)).sum()
    fp = ((barca_truth == 0) & (barca_pred == 1)).sum()

    results.append({
        # round() instead of int(): products such as 0.29 * 100 are not exact in floating point.
        'Poisoning %': round(level * 100),
        'Barcelona accuracy': round(barca_acc, 4),
        'Overall accuracy': round(overall_acc, 4),
        'Clean accuracy': round(clean_acc, 4),
        'False Negatives (Win → Loss)': fn,
        'False Positives (Loss → Win)': fp,
    })

# === Final result table
results_df = pd.DataFrame(results)
print("\n Results Table:")
print(results_df)


FORSVAR (Defense)

In [None]:
# Heuristic check on the labels currently held in X_poisoned / y_poisoned:
# flag rows whose home rolling-20 xG exceeds 2.0 while the label says the home side lost.
# NOTE(review): X_poisoned / y_poisoned are whatever the previously executed cell
# left behind — presumably the last poisoning level of the sweep; confirm.
suspicious_matches = X_poisoned['home_xG_rolling_20'].gt(2.0) & y_poisoned.eq(0)
print(f"Suspicious matches (high xG but home loss): {suspicious_matches.sum()}")

# Recover the home team name from the one-hot columns: every 'home_*' column
# except the rolling-xG feature is a team indicator.
non_team_cols = ['home_xG_rolling_20', 'away_xG_rolling_20', 'time_day', 'time_evening', 'time_night']
home_team_cols = [
    col for col in X_poisoned.columns
    if col.startswith('home_') and col not in non_team_cols
]
home_teams = X_poisoned[home_team_cols].idxmax(axis=1).str.replace('home_', '')

# Count how often each home team appears among the flagged rows.
suspicious_indices = suspicious_matches.index[suspicious_matches]
suspicious_teams = home_teams.loc[suspicious_indices]
suspicious_team_counts = suspicious_teams.value_counts()

print("\n=== Teams with most high xG home losses ===")
print(suspicious_team_counts.head(20))

# Label balance over all Barcelona home rows of the (poisoned) training set.
barca_home_matches = home_teams.index[home_teams == 'Barcelona']
barca_home_labels = y_poisoned.loc[barca_home_matches]

num_wins = barca_home_labels.eq(1).sum()
num_losses = barca_home_labels.eq(0).sum()

print("\n=== Barcelona Home Matches ===")
print(f"Total: {len(barca_home_labels)}")
print(f"Wins (label=1): {num_wins}")
print(f"Losses (label=0): {num_losses}")


In [None]:

poison_levels = [0.01, 0.05, 0.1, 0.25, 0.35, 0.5, 0.6, 0.75, 1.0]
results = []

non_team_cols = [
    'home_xG_rolling_20', 'away_xG_rolling_20',
    'time_day', 'time_evening', 'time_night'
]
home_team_cols = [col for col in X_train.columns if col.startswith('home_') and col not in non_team_cols]

# === Loop-invariant pieces: only the labels change between poisoning levels,
# so the eligible rows and the decoded home-team names are computed once.
target_mask = (X_train['home_Barcelona'] == 1) & (y_train == 1)
target_indices = y_train[target_mask].index
home_teams = X_train[home_team_cols].idxmax(axis=1).str.replace('home_', '')
barca_home_matches = home_teams[home_teams == 'Barcelona'].index

for level in poison_levels:
    print(f"\n=== Poisoning Level: {int(level*100)}% ===")

    # === Step 1: Poison data ===
    X_poisoned = X_train.copy()
    y_poisoned = y_train.copy()

    n_to_flip = int(len(target_indices) * level)
    if n_to_flip > 0:
        # Seeded exactly like the attack sweep (fresh default_rng(SEED) per level)
        # so the same rows are flipped here and the results are reproducible;
        # the unseeded np.random.choice gave different rows on every run.
        sampled_indices = np.random.default_rng(SEED).choice(target_indices, size=n_to_flip, replace=False)
        y_poisoned.loc[sampled_indices] = 0

    # === Step 2: High xG but label = 0 ===
    suspicious_mask = (X_poisoned['home_xG_rolling_20'] > 2.0) & (y_poisoned == 0)
    suspicious_indices = suspicious_mask[suspicious_mask].index

    suspicious_teams = home_teams.loc[suspicious_indices]
    team_counts = suspicious_teams.value_counts()

    # Print top 10 suspicious teams
    print("Top 10 suspicious teams (high xG but loss):")
    print(team_counts.head(10))

    # Label balance over all Barcelona home rows after poisoning.
    barca_labels = y_poisoned.loc[barca_home_matches]
    barca_wins = (barca_labels == 1).sum()
    barca_losses = (barca_labels == 0).sum()

    results.append({
        'Poisoning %': int(level * 100),
        'Suspicious matches (xG>2 & loss)': len(suspicious_indices),
        'Barcelona losses (home)': barca_losses,
        'Barcelona wins (home)': barca_wins,
        'Barcelona in suspicious top 10': 'Barcelona' in team_counts.head(10).index
    })

# Summary as a DataFrame
results_df = pd.DataFrame(results)
print("\n=== Validation check results across poisoning levels ===")
print(results_df)


In [None]:


# === Feature selection: match statistics and context only (no team identity columns)
stat_features = [
    'home_xG_rolling_20', 'away_xG_rolling_20',
    'time_day', 'time_evening', 'time_night'
]

# === Weakened XGBoost parameters
weakened_params = {
    'n_estimators': 50,
    'max_depth': 2,
    'learning_rate': 0.3
}

# === Poisoning levels to test
poison_levels = [0.01, 0.05, 0.1, 0.25, 0.35, 0.5, 0.6, 0.75, 1.0]
results = []

# === Loop-invariant pieces: only the labels change between poisoning levels,
# so the eligible rows and the decoded home-team names are computed once.
target_mask = (X_train['home_Barcelona'] == 1) & (y_train == 1)
target_indices = y_train[target_mask].index
home_team_cols = [col for col in X_train.columns if col.startswith('home_') and col not in stat_features]
home_teams = X_train[home_team_cols].idxmax(axis=1).str.replace('home_', '')

for level in poison_levels:
    print(f"\n=== Poisoning Level: {int(level*100)}% ===")

    # 1. Poison data
    X_poisoned = X_train.copy()
    y_poisoned = y_train.copy()

    n_to_flip = int(len(target_indices) * level)
    if n_to_flip > 0:
        # Seeded exactly like the attack sweep (fresh default_rng(SEED) per level)
        # so the same rows are flipped here and the results are reproducible;
        # the unseeded np.random.choice gave different rows on every run.
        sampled_indices = np.random.default_rng(SEED).choice(target_indices, size=n_to_flip, replace=False)
        y_poisoned.loc[sampled_indices] = 0

    # === Train weakened model ===
    # Both models are fitted and then evaluated on the same (poisoned) training
    # rows: a "disagreement" is a training row whose label the model does not reproduce.
    xgb_weak = XGBClassifier(eval_metric='logloss', random_state=42, **weakened_params)
    xgb_weak.fit(X_poisoned[stat_features].astype(float), y_poisoned)
    y_pred_weak = xgb_weak.predict(X_poisoned[stat_features].astype(float))
    disagreement_weak = (y_pred_weak != y_poisoned)

    # === Train best model ===
    xgb_best = XGBClassifier(eval_metric='logloss', random_state=42, **best_params)
    xgb_best.fit(X_poisoned[stat_features].astype(float), y_poisoned)
    y_pred_best = xgb_best.predict(X_poisoned[stat_features].astype(float))
    disagreement_best = (y_pred_best != y_poisoned)

    # === Disagreements per model
    home_disagree_weak = home_teams.loc[disagreement_weak]
    home_disagree_best = home_teams.loc[disagreement_best]

    top_weak = home_disagree_weak.value_counts().head(10)
    top_best = home_disagree_best.value_counts().head(10)

    # Counts are taken from the top-10 table, so they are 0 when Barcelona is outside it.
    barca_weak = top_weak.get('Barcelona', 0)
    barca_best = top_best.get('Barcelona', 0)

    # 1-based rank inside the top 10, or None when Barcelona is not listed.
    barca_rank_weak = top_weak.index.get_loc('Barcelona') + 1 if 'Barcelona' in top_weak.index else None
    barca_rank_best = top_best.index.get_loc('Barcelona') + 1 if 'Barcelona' in top_best.index else None

    # Difference to the count in second position.
    # NOTE(review): this is 0 when Barcelona itself is second and negative when it
    # ranks lower — confirm that is the intended reading of "gap vs 2nd".
    gap_weak = (barca_weak - top_weak.iloc[1]) if 'Barcelona' in top_weak.index and len(top_weak) > 1 else None
    gap_best = (barca_best - top_best.iloc[1]) if 'Barcelona' in top_best.index and len(top_best) > 1 else None

    print("\n[Weak] Top 10 disagreement teams:")
    print(top_weak)
    print("\n[Best] Top 10 disagreement teams:")
    print(top_best)

    results.append({
        'Poisoning %': int(level * 100),
        'Disagreements Weak': disagreement_weak.sum(),
        'Disagreements Best': disagreement_best.sum(),
        'Barcelona Disagree (Weak)': barca_weak,
        'Barcelona Disagree (Best)': barca_best,
        'Barcelona Rank (Weak)': barca_rank_weak,
        'Barcelona Rank (Best)': barca_rank_best,
        'Gap vs 2nd (Weak)': gap_weak,
        'Gap vs 2nd (Best)': gap_best
    })

# === Summarize
results_df = pd.DataFrame(results)
print("\n\n=== Disagreement Comparison: Weakened vs Best Model ===")
print(results_df)


In [None]:
# Poisoning levels to test
poison_levels = [0.01, 0.05, 0.1, 0.25, 0.35, 0.5, 0.6, 0.75, 1.0]

# Only use statistical match features
stat_features = [
    'home_xG_rolling_20', 'away_xG_rolling_20',
    'time_day', 'time_evening', 'time_night'
]

# Weak model parameters (used as a defense baseline)
weakened_params = {
    'learning_rate': 0.3,
    'max_depth': 2,
    'n_estimators': 50,
}

results = []

# Loop-invariant pieces: only the labels change between poisoning levels,
# so the eligible rows and the decoded home-team names are computed once.
target_mask = (X_train['home_Barcelona'] == 1) & (y_train == 1)
target_indices = y_train[target_mask].index
non_team_cols = stat_features
home_team_cols = [col for col in X_train.columns if col.startswith('home_') and col not in non_team_cols]
home_teams = X_train[home_team_cols].idxmax(axis=1).str.replace('home_', '')

for level in poison_levels:
    print(f"\n=== Poisoning Level: {int(level * 100)}% ===")

    # Step 1: Create poisoned dataset
    X_poisoned = X_train.copy()
    y_poisoned = y_train.copy()

    n_to_flip = int(len(target_indices) * level)
    if n_to_flip > 0:
        # Seeded exactly like the attack sweep (fresh default_rng(SEED) per level)
        # so the same rows are flipped here and the results are reproducible;
        # the unseeded np.random.choice gave different rows on every run.
        sampled_indices = np.random.default_rng(SEED).choice(target_indices, size=n_to_flip, replace=False)
        y_poisoned.loc[sampled_indices] = 0

    # Step 2: Train weak XGBoost model on the statistical features only
    xgb_weak = XGBClassifier(eval_metric='logloss', random_state=42, **weakened_params)
    xgb_weak.fit(X_poisoned[stat_features].astype(float), y_poisoned)

    # A row is suspicious when the model gives a home win at least 0.85
    # probability although the (possibly poisoned) label is 0.
    proba_weak = xgb_weak.predict_proba(X_poisoned[stat_features].astype(float))[:, 1]
    suspicious_mask_weak = (proba_weak >= 0.85) & (y_poisoned == 0)
    suspicious_teams_weak = home_teams.loc[suspicious_mask_weak]

    top_teams_weak = suspicious_teams_weak.value_counts().head(10)
    barca_weak_top = 'Barcelona' in top_teams_weak.index
    # Count is taken from the top-10 table, so it is 0 when Barcelona is outside it.
    barca_weak_count = top_teams_weak.get('Barcelona', 0)

    # Step 3: Train strong XGBoost model (best_params) and apply the same rule
    xgb_strong = XGBClassifier(eval_metric='logloss', random_state=42, **best_params)
    xgb_strong.fit(X_poisoned[stat_features].astype(float), y_poisoned)

    proba_strong = xgb_strong.predict_proba(X_poisoned[stat_features].astype(float))[:, 1]
    suspicious_mask_strong = (proba_strong >= 0.85) & (y_poisoned == 0)
    suspicious_teams_strong = home_teams.loc[suspicious_mask_strong]

    top_teams_strong = suspicious_teams_strong.value_counts().head(10)
    barca_strong_top = 'Barcelona' in top_teams_strong.index
    barca_strong_count = top_teams_strong.get('Barcelona', 0)

    # Print top suspicious teams
    print("\nTop 10 suspicious teams (Weak XGB):")
    print(top_teams_weak)

    print("\nTop 10 suspicious teams (Strong XGB):")
    print(top_teams_strong)

    # Store results
    results.append({
        'Poisoning %': int(level * 100),

        'WeakXGB Suspicious': suspicious_mask_weak.sum(),
        'WeakXGB Barca Count': barca_weak_count,
        'WeakXGB Barca Top 10': barca_weak_top,

        'StrongXGB Suspicious': suspicious_mask_strong.sum(),
        'StrongXGB Barca Count': barca_strong_count,
        'StrongXGB Barca Top 10': barca_strong_top,
    })

# Final results table
results_df = pd.DataFrame(results)
print("\n\n=== Results: Weak vs Strong XGBoost (Confident mismatches) ===")
print(results_df)
