In [None]:
import requests
import json
import pandas as pd
import numpy as np
import re
import enum
from enum import Enum
from google.colab import drive
from urllib.error import HTTPError
import chardet
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
def get_teams_previous_position_2020_21(teamName):
    """
    Look up a team's finishing position in the 2019-20 league table.

    Parameters:
        teamName (str): Team name exactly as spelled in the table below.

    Returns:
        int: Position from 1 to 20, or 21 when the team is not in the table.
    """
    # Listed in finishing order; a team's rank is its place in this tuple.
    final_table = (
        "Liverpool",
        "Man City",
        "Man United",
        "Chelsea",
        "Leicester City",
        "Tottenham",
        "Wolves",
        "Arsenal",
        "Sheffield United",
        "Burnley",
        "Southampton",
        "Everton",
        "Newcastle",
        "Crystal Palace",
        "Brighton",
        "West Ham",
        "Aston Villa",
        "Bournemouth",
        "Watford",
        "Norwich City",
    )
    ranking = {club: rank for rank, club in enumerate(final_table, start=1)}
    return ranking.get(teamName, 21)

def get_teams_previous_position_2021_2022(teamName):
    """
    Look up a team's finishing position in the 2020-21 league table.

    Parameters:
        teamName (str): Team name exactly as spelled in the table below.

    Returns:
        int: Position from 1 to 20, or 21 when the team is not in the table.
    """
    # Listed in finishing order; a team's rank is its place in this tuple.
    final_table = (
        "Man City",
        "Man United",
        "Liverpool",
        "Chelsea",
        "Leicester City",
        "West Ham",
        "Tottenham",
        "Arsenal",
        "Leeds United",
        "Everton",
        "Aston Villa",
        "Newcastle",
        "Wolves",
        "Crystal Palace",
        "Southampton",
        "Brighton",
        "Burnley",
        "Fulham",
        "West Brom",
        "Sheffield United",
    )
    ranking = {club: rank for rank, club in enumerate(final_table, start=1)}
    return ranking.get(teamName, 21)

def get_teams_previous_position_2022_23(teamName):
    """
    Look up a team's finishing position in the 2022-23 league table.

    Parameters:
        teamName (str): Team name exactly as spelled in the table below.

    Returns:
        int: Position from 1 to 20, or 21 when the team is not in the table
        (the fallback used for sides that were just promoted).
    """
    # Listed in finishing order; a team's rank is its place in this tuple.
    final_table = (
        "Man City",
        "Arsenal",
        "Man United",
        "Newcastle",
        "Liverpool",
        "Brighton",
        "Aston Villa",
        "Tottenham",
        "Brentford",
        "Fulham",
        "Crystal Palace",
        "Chelsea",
        "Wolves",
        "West Ham",
        "Bournemouth",
        "Nottm Forest",
        "Everton",
        "Leicester City",
        "Leeds United",
        "Southampton",
    )
    ranking = {club: rank for rank, club in enumerate(final_table, start=1)}
    return ranking.get(teamName, 21)

def get_teams_previous_position_2023_24(teamName):
    """
    Look up a team's finishing position in the 2023-24 league table.

    Parameters:
        teamName (str): Team name exactly as spelled in the table below.

    Returns:
        int: Position from 1 to 20, or 21 when the team is not in the table.
    """
    # Listed in finishing order; a team's rank is its place in this tuple.
    final_table = (
        "Man City",
        "Arsenal",
        "Liverpool",
        "Aston Villa",
        "Tottenham",
        "Chelsea",
        "Newcastle",
        "Man United",
        "West Ham",
        "Crystal Palace",
        "Brighton",
        "Bournemouth",
        "Fulham",
        "Wolves",
        "Everton",
        "Brentford",
        "Nottm Forest",
        "Luton Town",
        "Burnley",
        "Sheffield United",
    )
    ranking = {club: rank for rank, club in enumerate(final_table, start=1)}
    return ranking.get(teamName, 21)

# Season label -> function that maps a team name to its rank in a stored
# league table (21 when the team is not listed).
#
# NOTE(review): the tables do not appear to be keyed consistently. Going by
# the comments/docstrings of the lookup functions, "2022-23" and "2023-24"
# return positions from that same season, whereas "2020-21" and "2021-22"
# return positions from the season before (2019-20 and 2020-21 respectively).
# get_current_season_data passes `previous_year` as the key, so the two older
# keys look one season off and no 2021-22 table is registered at all --
# confirm the intended keying before relying on the derived features.
previous_year_position = {"2023-24": get_teams_previous_position_2023_24,
                          "2022-23": get_teams_previous_position_2022_23,
                          "2020-21": get_teams_previous_position_2020_21,
                          "2021-22": get_teams_previous_position_2021_2022}

def get_teams_previous_position(year):
  """
  Return the league-position lookup function registered for a season label.

  Parameters:
    year (str): Season label used as a key of `previous_year_position`
      (for example "2022-23").

  Returns:
    callable: A function taking a team name and returning its position.

  Raises:
    KeyError: If no table is registered for `year`; the message lists the
      season labels that are available.
  """
  try:
    return previous_year_position[year]
  except KeyError:
    supported = ", ".join(sorted(previous_year_position))
    # `from None` keeps the traceback focused on the explanatory message.
    raise KeyError(
        f"no league table registered for season {year!r}; "
        f"supported seasons: {supported}"
    ) from None

def clean_name(name):
  """
  Normalise a player name.

  Digits at the very end of the string are dropped, every underscore becomes
  a space, and surrounding whitespace is trimmed.

  Parameters:
    name (str): The raw name.

  Returns:
    str: The cleaned name.
  """
  # Strip the numeric suffix first so that the underscore which preceded it
  # turns into a trailing space and is removed by the final strip().
  without_suffix = re.sub(r'\d+$', '', name)
  spaced = without_suffix.replace("_", " ")
  return spaced.strip()

def calculate_point_ratio_against_team(name, df):
  """
  Calculate a player's share of their team's points, as a percentage.

  The player is looked up by the cleaned form of `name` (see clean_name). If
  no row carries the cleaned name, the name is tried exactly as given, so
  frames whose 'name' column still holds raw, uncleaned names also work.

  Parameters:
    name (str): The player name, raw or already cleaned.
    df (DataFrame): Rows with at least the columns 'name', 'team' and
      'total_points'.

  Returns:
    float: 100 * (player's summed total_points) / (summed total_points of
    every row of the player's team), or 0 when the team total is zero.

  Raises:
    IndexError: If the player does not appear in `df`.
  """
  cleaned = clean_name(name)
  player_rows = df[df['name'] == cleaned]
  if player_rows.empty:
    # Cleaning may have altered a name that is stored uncleaned in df.
    player_rows = df[df['name'] == name]
  if player_rows.empty:
    raise IndexError(f"player {name!r} not found in the supplied data")

  # The team is taken from the player's first matching row.
  team = player_rows['team'].iloc[0]
  team_points = df.loc[df['team'] == team, 'total_points'].sum()
  if team_points == 0:
    # No meaningful share exists; dividing would give nan or +/-inf.
    return 0

  player_points = player_rows['total_points'].sum()
  share = player_points / team_points
  return 0 if np.isnan(share) else share * 100

def match_status(df):
  """
  Score a fixture from the point of view of the row's own team.

  Parameters:
    df (Series or mapping): A single row providing 'team_h_score',
      'team_a_score' and 'was_home' (truthy when the row's team played at
      home).

  Returns:
    int: 1 for a draw, 3 when the row's team won, 0 when it lost.
  """
  home_goals = df['team_h_score']
  away_goals = df['team_a_score']

  if away_goals == home_goals:
    return 1

  home_side_won = bool(home_goals > away_goals)
  played_at_home = bool(df['was_home'])
  # The row's team won exactly when the winning side is the side it played
  # on: home winner and home row, or away winner and away row.
  return 3 if home_side_won == played_at_home else 0

def get_was_home_values(team_df, GW):
  """
  Map each gameweek to whether the team has a home row in it.

  Parameters:
    team_df (DataFrame): Team rows with 'GW' and 'was_home' columns.
    GW (iterable): The gameweeks to look up.

  Returns:
    dict: gameweek -> result of `.any()` over the 'was_home' values of the
    rows in that gameweek (False when the gameweek has no rows).
  """
  return {
      week: team_df.loc[team_df['GW'] == week, 'was_home'].any()
      for week in GW
  }


def calculate_team_cumulative_goals_conceded(season, team, df):
  """
  Calculates the cumulative goals conceded by a team throughout a season.

  Every row is read with its own venue: a home row concedes 'team_a_score',
  an away row concedes 'team_h_score'. Rows are then collapsed to distinct
  fixtures and summed per gameweek, so a gameweek with two fixtures counts
  both of them.

  Parameters:
    season: The season value to select in df['season'].
    team (str): The team to select in df['team'].
    df (DataFrame): Match rows with the columns 'season', 'team', 'GW',
      'was_home', 'team_h_score' and 'team_a_score'. 'opponent_team' is used
      to tell fixtures apart when it is present.

  Returns:
    DataFrame: A copy of the team's rows for that season, sorted by 'GW',
    with the extra column 'team_cumulative_goals_conceded' (the running total
    up to and including each row's gameweek). When there are no matching
    rows the result is empty.
  """
  team_df = df[(df['season'] == season) & (df['team'] == team)].copy()
  team_df = team_df.sort_values(by='GW')

  # Plain arrays are used so that repeated index labels cannot interfere.
  at_home = team_df['was_home'].astype(bool).to_numpy()
  conceded = np.where(
      at_home,
      team_df['team_a_score'].to_numpy(),
      team_df['team_h_score'].to_numpy(),
  )

  # A fixture is identified by gameweek, venue and scoreline (plus opponent
  # when available). Two fixtures of one gameweek that agree on all of these
  # cannot be told apart and are counted once.
  fixture_keys = ['GW', 'was_home', 'team_h_score', 'team_a_score']
  if 'opponent_team' in team_df.columns:
    fixture_keys.append('opponent_team')
  fixtures = (
      team_df[fixture_keys]
      .assign(conceded=conceded)
      .drop_duplicates(subset=fixture_keys)
  )

  # groupby sorts by gameweek, so cumsum accumulates in gameweek order.
  running_total = fixtures.groupby('GW')['conceded'].sum().cumsum()
  team_df['team_cumulative_goals_conceded'] = (
      team_df['GW'].map(running_total).to_numpy()
  )

  return team_df

def calculate_team_cumulative_goals_scored(season, team, df):
  """
  Calculates the cumulative goals scored by a team throughout a season.

  Every row is read with its own venue: a home row scores 'team_h_score', an
  away row scores 'team_a_score'. Rows are then collapsed to distinct
  fixtures and summed per gameweek, so a gameweek with two fixtures counts
  both of them.

  Parameters:
    season: The season value to select in df['season'].
    team (str): The team to select in df['team'].
    df (DataFrame): Match rows with the columns 'season', 'team', 'GW',
      'was_home', 'team_h_score' and 'team_a_score'. 'opponent_team' is used
      to tell fixtures apart when it is present.

  Returns:
    DataFrame: A copy of the team's rows for that season, sorted by 'GW',
    with the extra column 'team_cumulative_goals_scored' (the running total
    up to and including each row's gameweek). When there are no matching
    rows the result is empty.
  """
  team_df = df[(df['season'] == season) & (df['team'] == team)].copy()
  team_df = team_df.sort_values(by='GW')

  # Plain arrays are used so that repeated index labels cannot interfere.
  at_home = team_df['was_home'].astype(bool).to_numpy()
  scored = np.where(
      at_home,
      team_df['team_h_score'].to_numpy(),
      team_df['team_a_score'].to_numpy(),
  )

  # A fixture is identified by gameweek, venue and scoreline (plus opponent
  # when available). Two fixtures of one gameweek that agree on all of these
  # cannot be told apart and are counted once.
  fixture_keys = ['GW', 'was_home', 'team_h_score', 'team_a_score']
  if 'opponent_team' in team_df.columns:
    fixture_keys.append('opponent_team')
  fixtures = (
      team_df[fixture_keys]
      .assign(scored=scored)
      .drop_duplicates(subset=fixture_keys)
  )

  # groupby sorts by gameweek, so cumsum accumulates in gameweek order.
  running_total = fixtures.groupby('GW')['scored'].sum().cumsum()
  team_df['team_cumulative_goals_scored'] = (
      team_df['GW'].map(running_total).to_numpy()
  )

  return team_df

def create_goals_assissts_points_form_features(df):
  """
  Add rolling five-row form columns for points, assists and goals.

  The rows are sorted by name, season and GW; within each (season, name)
  group every form value is the mean of the current row and up to four rows
  before it.

  Parameters:
    df (DataFrame): Player rows with the columns 'name', 'season', 'GW',
      'total_points', 'assists' and 'goals_scored'.

  Returns:
    DataFrame: A sorted copy of the input with the added columns
    'total_points_5_match_form', 'assists_5_match_form' and
    'goals_5_match_form'. The input frame itself is not modified.
  """
  def five_row_mean(values):
    return values.rolling(window=5, min_periods=1).mean()

  # (new column, source column), in the order the columns are appended.
  form_sources = (
      ('total_points_5_match_form', 'total_points'),
      ('assists_5_match_form', 'assists'),
      ('goals_5_match_form', 'goals_scored'),
  )

  ordered = df.sort_values(by=['name', 'season', 'GW'])
  for form_column, source_column in form_sources:
    per_player = ordered.groupby(['season', 'name'])[source_column]
    ordered[form_column] = per_player.transform(five_row_mean)

  return ordered

def create_goalkeepers_form_features(df):
  """
  Creates a new DataFrame with goalkeeper form features for the last 5 matches.

  The rolling mean is evaluated in (season, name, GW) order regardless of how
  the input rows are ordered, matching the player-form features. The returned
  rows keep the order and index they had in the input.

  Parameters:
    df (DataFrame): Player rows with the columns 'position', 'season',
      'name', 'GW' and 'goals_conceded'. Only rows whose 'position' equals
      'GK' are used.

  Returns:
    DataFrame: A new DataFrame with the following columns:
      - season: The season of the matches.
      - name: The name of the goalkeeper.
      - GW: The gameweek number.
      - gk_form_goals_conceded_5_match_form: The mean of 'goals_conceded'
        over the goalkeeper's current row and up to four preceding rows of
        the same season.
  """
  form_column = 'gk_form_goals_conceded_5_match_form'
  goalkeepers_data = df[df['position'] == 'GK'].copy()

  # Row positions listed in chronological order per goalkeeper. Positions
  # rather than index labels are used because labels may repeat.
  sort_keys = goalkeepers_data[['season', 'name', 'GW']].reset_index(drop=True)
  chronological = sort_keys.sort_values(by=['season', 'name', 'GW']).index.to_numpy()

  ordered = goalkeepers_data.iloc[chronological]
  rolling_mean = ordered.groupby(['season', 'name'])['goals_conceded'].transform(
      lambda x: x.rolling(window=5, min_periods=1).mean()
  )

  # Scatter the values back to the rows' original positions.
  form = np.empty(len(goalkeepers_data), dtype=float)
  form[chronological] = rolling_mean.to_numpy()
  goalkeepers_data[form_column] = form

  gk_extract = goalkeepers_data[['season', 'name', 'GW', form_column]]

  return gk_extract

def create_conceded_form_features(df):
  """
  Creates a DataFrame with conceded form features for each team in each season.

  The form column is a rolling mean over gameweeks: the per-row cumulative
  totals are first reduced to one value per (season, team, GW), the window of
  five is applied to those, and the result is written back to every row of
  that gameweek.

  Parameters:
    df (DataFrame): The input DataFrame containing match data, with the
      columns required by calculate_team_cumulative_goals_conceded.

  Returns:
    DataFrame: One row per input row of every (season, team) that has data,
    with the following columns:
      - season: The season of the matches.
      - team (str): The name of the team.
      - GW: The gameweek number.
      - team_cumulative_goals_conceded: The cumulative number of goals
        conceded by the team.
      - team_cumulative_goals_conceded_5_match_form (float): The mean of the
        cumulative total over the team's current and up to four preceding
        gameweeks of the season.
  """
  keys = ['season', 'team', 'GW']
  cumulative_column = 'team_cumulative_goals_conceded'
  form_column = 'team_cumulative_goals_conceded_5_match_form'

  frames = []
  teams = df['team'].unique()
  seasons = df['season'].unique()

  for season in seasons:
    print(f'Calculating for {season}')
    for team in teams:
      updated = calculate_team_cumulative_goals_conceded(season, team, df)
      # A team that did not play in this season contributes no rows.
      if not updated.empty:
        frames.append(updated)

  conceded_df = pd.concat(frames)
  # Copy so the new column is not assigned onto a slice of conceded_df.
  conceded_df_combined = conceded_df[keys + [cumulative_column]].copy()

  # One row per team and gameweek, in gameweek order, so the window of five
  # spans five gameweeks instead of five player rows.
  per_gameweek = conceded_df_combined.drop_duplicates(subset=keys).sort_values(by=keys)
  form = per_gameweek.groupby(['season', 'team'])[cumulative_column].transform(
      lambda x: x.rolling(window=5, min_periods=1).mean()
  )
  lookup = pd.Series(form.to_numpy(), index=pd.MultiIndex.from_frame(per_gameweek[keys]))

  # Broadcast each gameweek's value back to all of its rows, by key.
  row_keys = pd.MultiIndex.from_frame(conceded_df_combined[keys])
  conceded_df_combined[form_column] = lookup.reindex(row_keys).to_numpy()

  return conceded_df_combined

def create_scored_form_features(df):
  """
  Creates a DataFrame with scored form features for each team in each season.

  The form column is a rolling mean over gameweeks: the per-row cumulative
  totals are first reduced to one value per (season, team, GW), the window of
  five is applied to those, and the result is written back to every row of
  that gameweek.

  Parameters:
    df (DataFrame): The input DataFrame containing match data, with the
      columns required by calculate_team_cumulative_goals_scored.

  Returns:
    DataFrame: One row per input row of every (season, team) that has data,
    with the following columns:
      - season: The season of the matches.
      - team (str): The name of the team.
      - GW: The gameweek number.
      - team_cumulative_goals_scored: The cumulative number of goals scored
        by the team.
      - team_cumulative_goals_scored_5_match_form (float): The mean of the
        cumulative total over the team's current and up to four preceding
        gameweeks of the season.
  """
  keys = ['season', 'team', 'GW']
  cumulative_column = 'team_cumulative_goals_scored'
  form_column = 'team_cumulative_goals_scored_5_match_form'

  frames = []
  teams = df['team'].unique()
  seasons = df['season'].unique()

  for season in seasons:
    print(f'Calculating for {season}')
    for team in teams:
      updated = calculate_team_cumulative_goals_scored(season, team, df)
      # A team that did not play in this season contributes no rows.
      if not updated.empty:
        frames.append(updated)

  scored_df = pd.concat(frames)
  # Copy so the new column is not assigned onto a slice of scored_df.
  scored_df_combined = scored_df[keys + [cumulative_column]].copy()

  # One row per team and gameweek, in gameweek order, so the window of five
  # spans five gameweeks instead of five player rows.
  per_gameweek = scored_df_combined.drop_duplicates(subset=keys).sort_values(by=keys)
  form = per_gameweek.groupby(['season', 'team'])[cumulative_column].transform(
      lambda x: x.rolling(window=5, min_periods=1).mean()
  )
  lookup = pd.Series(form.to_numpy(), index=pd.MultiIndex.from_frame(per_gameweek[keys]))

  # Broadcast each gameweek's value back to all of its rows, by key.
  row_keys = pd.MultiIndex.from_frame(scored_df_combined[keys])
  scored_df_combined[form_column] = lookup.reindex(row_keys).to_numpy()

  return scored_df_combined

def get_current_season_data(year, previous_year, save_file_name,
                            output_dir="/content/drive/MyDrive/FPL-Project"):
  """
  Download one season of gameweek data, enrich it and save it as a CSV file.

  For gameweeks 1 to 38 the gameweek file of `year` is read; collection stops
  at the first gameweek whose download raises HTTPError. Each gameweek gets
  the columns 'last_season_position', 'percent_value', 'match_status',
  'season' and 'GW', is left-joined with the players' `previous_year`
  statistics (columns suffixed '_ex', matched on the player name) and with
  the opponent's name and position, and has its missing values filled with 0.

  Parameters:
    year (str): Season folder to read gameweeks and teams from.
    previous_year (str): Season folder to read player statistics from; also
      the key passed to get_teams_previous_position.
    save_file_name (str): Output file name without the '.csv' extension.
    output_dir (str): Directory the CSV is written to.

  Returns:
    None. The combined data is written to '<output_dir>/<save_file_name>.csv';
    nothing is written when no gameweek could be downloaded.
  """
  data_root = "https://raw.githubusercontent.com/vaastav/Fantasy-Premier-League/master/data"
  list_dfs = []
  print(f"Collecting data for year:{year}")

  # get previous_seasons_data
  player_prev_stats = pd.read_csv(
      f"{data_root}/{previous_year}/cleaned_players.csv",
      encoding="latin-1",
  )
  player_prev_stats["name"] = (
      player_prev_stats["first_name"] + " " + player_prev_stats["second_name"]
  )
  player_prev_stats = player_prev_stats.drop(["first_name", "second_name"], axis=1)
  # the suffix keeps these columns apart from the gameweek columns after merging
  player_prev_stats.columns = player_prev_stats.columns + "_ex"

  # get opponent_team; copy so columns can be added without writing to a slice
  teams = pd.read_csv(
      f"{data_root}/{year}/teams.csv",
      encoding="latin-1",
  )[["id", "name"]].copy()
  teams.columns = ["opponent_team", "opponent"]

  # the same lookup serves opponents and the players' own teams
  position_lookup = get_teams_previous_position(previous_year)

  # opponents position last season
  teams["opponent_last_season_position"] = teams["opponent"].apply(position_lookup)

  for gameweek in range(1, 39):
    print(f"Collecting data for gameweek:{gameweek}")
    try:
      df = pd.read_csv(
          f"{data_root}/{year}/gws/gw{gameweek}.csv",
          encoding="latin-1",
      )
    except HTTPError:
      # a missing file ends the collection; later gameweeks are not tried
      print(f"Gameweek {gameweek} not found")
      break

    # teams position last season
    df["last_season_position"] = df["team"].apply(position_lookup)

    # calculate percentage value to team
    df["percent_value"] = df["name"].apply(
        lambda name: calculate_point_ratio_against_team(name, df)
    )

    # check if the result was a win or not
    df['match_status'] = df.apply(match_status, axis=1)

    # merge previous_season_data
    df = pd.merge(
        df, player_prev_stats, left_on="name", right_on="name_ex", how="left"
    )
    df["season"] = year
    df = df.drop("name_ex", axis=1)
    df["GW"] = gameweek

    # merge opponent team
    df = pd.merge(df, teams, on="opponent_team", how="left")
    df = df.fillna(0)
    list_dfs.append(df)

  if not list_dfs:
    print("No data found for the specified year.")
    return
  df = pd.concat(list_dfs)
  df.to_csv(f"{output_dir}/{save_file_name}.csv", index=False)
  print("All Done, Saved the file!")

def detect_encoding(url, timeout=30):
    """
    Download a resource and guess its text encoding with chardet.

    Parameters:
        url (str): Address of the resource to download.
        timeout (float): Seconds to wait for the server before giving up.

    Returns:
        The 'encoding' entry of chardet's detection result for the response
        body (presumably None when chardet cannot decide -- confirm against
        the chardet documentation).

    Raises:
        requests.exceptions.HTTPError: If the server answers with an error
            status, so the encoding of an error page is never reported.
        requests.exceptions.Timeout: If the server does not respond in time.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    result = chardet.detect(response.content)
    return result['encoding']