In [1]:
#@title
import requests
from bs4 import BeautifulSoup
import pandas as pd
import numpy as np
import re
import sys, getopt
import csv
import time
import traceback
from functools import wraps
from requests.exceptions import HTTPError
import json

In [24]:
# NOTE Most complete data seems to appear after 2018 (eg. Distance for shooting in Championship)

# La Liga: 12, Segunda Div: 17, EPL: 9, Championship: 10

COMPETITION_MAPPING = {
    'ENG': {
        9: 'Premier-League',
        10: 'Championship'
    },
    'SPA': {
        12: 'La-Liga',
        17: 'Segunda-Division'
    },
    'GER': {
        20: 'Bundesliga',
        33: '2-Bundesliga'
    },
    'ITA': {
        11: 'Serie-A',
        18: 'Serie-B'
    },
    'FRA': {
        13: 'Ligue-1',
        60: 'Ligue-2'
    }
}

MAX_RETRIES = 3

In [25]:
def catch_too_many_requests(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        for i in range(MAX_RETRIES):
            try:
                return func(*args, **kwargs)
            except HTTPError as http_err:
                if hasattr(http_err, 'response') and http_err.response.status_code == 429:
                    print("Rate limit exceeded.")
                else:
                    print(f"HTTP error occurred: {http_err}" + ", retrying" if i < MAX_RETRIES else "")
    return wrapper

In [40]:
class SeasonData:

    def __init__(self, year, country, comp_id):
        self.country = country
        self.year = year
        self.comp_id = comp_id
        self.main_url = f"https://fbref.com/en/comps/{comp_id}/{year}-{year+1}/"
        self.suf_url = f"/{year}-{year+1}-{COMPETITION_MAPPING[country][comp_id]}-Stats"

        self._all_teams = None

    @property
    @catch_too_many_requests
    def all_teams(self):
        global ALL_TEAMS_DICT
        if self._all_teams:
            pass
        elif ALL_TEAMS_DICT[self.country][self.year].get(self.comp_id, None):
            self.all_teams = ALL_TEAMS_DICT[self.country][self.year][self.comp_id]
            print(f"Teams found for {self.year}: {self._all_teams}")
        else:
            self.all_teams = self.get_all_teams()

        return self._all_teams

    @all_teams.setter
    def all_teams(self, all_teams):
        global ALL_TEAMS_DICT
        ALL_TEAMS_DICT[self.country][self.year][self.comp_id] = all_teams
        self._all_teams = all_teams

    def get_all_teams(self):
        all_teams = []
        res = requests.get(self.main_url)
        res.raise_for_status()
        comm = re.compile("<!--|-->")
        soup = BeautifulSoup(comm.sub("",res.text),'lxml') 
        all_tables = soup.findAll("tbody")
        results_table = all_tables[0]
        rows_teams = results_table.find_all('tr')
        for row in rows_teams:
            team = self.find_team(row)
            if not team:
                return
            all_teams.append(team)
        return all_teams
            
    def find_team(self, row): # promoted or relegated
        team_td = row.find('td', {'data-stat': 'team'})
        if team_td is not None:
            if team_td:
                team_name = team_td.find('a').text.strip()
                if team_name:
                    return team_name
                
    @catch_too_many_requests
    def get_category_table(self, category):
        url = self.main_url + category + self.suf_url
        res = requests.get(url)
        res.raise_for_status()
        ## The next two lines get around the issue with comments breaking the parsing.
        comm = re.compile("<!--|-->")
        soup = BeautifulSoup(comm.sub("",res.text),'lxml')    
        all_tables = soup.findAll("tbody")
        team_table = all_tables[0]
        return team_table

class CombinedSeasonData:

    categories = ['stats', 'keepers', 'keepersadv', 'shooting', 'passing', 'passing_types', 'gca', 'defense', 'possession', 'misc']
    file_path = './data/teams/teams.txt'

    def __init__(self, year, country):
        self.country = country
        self.year = year
        self.comp_ids = list(COMPETITION_MAPPING[country].keys())
        self.next_season_data = SeasonData(year+1, country, self.comp_ids[0])
        self.first_div_data = SeasonData(year, country, self.comp_ids[0])
        self.second_div_data = SeasonData(year, country, self.comp_ids[1])

        self.df_team = None

        self._promoted_teams = None
        self._relegated_teams = None

    @property
    def promoted_teams(self):
        if not self._promoted_teams:
            self.promoted_teams = list(set(self.next_season_data.all_teams) & set(self.second_div_data.all_teams))

        return self._promoted_teams

    @promoted_teams.setter
    def promoted_teams(self, teams):
        self._promoted_teams = teams

    @property
    def relegated_teams(self):
        if not self._relegated_teams:
            self.relegated_teams = list(set(self.first_div_data.all_teams) - set(self.next_season_data.all_teams))

        return self._relegated_teams

    @relegated_teams.setter
    def relegated_teams(self, teams):
        self._relegated_teams = teams

    def get_frame_team(self, team_table, url):
        '''This function reads the HTML table rows, row by row, and extracts the individual data. Each row is one team. Any columns within the row that has the 'data-stat' attribute will be extracted. It returns a dictionary whose keys are the columns and values are a list of the column data'''
        pre_df_squad = dict()
        rows_squad = team_table.find_all('tr')
        for row in rows_squad:
            if(row.find('th',{"scope":"row"}) != None) and row.find('th',{"data-stat":"team"}):
                name = row.find('th',{"data-stat":"team"}).text.strip().encode().decode("utf-8")
                if (url == self.second_div_data.main_url and name not in self.promoted_teams) or \
                    (url == self.first_div_data.main_url and name in self.relegated_teams):
                    continue
                # print(f"Team found: {name}")
                if 'squad' in pre_df_squad:
                    pre_df_squad['squad'].append(name)
                else:
                    pre_df_squad['squad'] = [name]
                for f in [td.get('data-stat') for td in row.find_all('td')]:
                    cell = row.find("td",{"data-stat": f})
                    a = cell.text.strip().encode()
                    text=a.decode("utf-8")
                    if(text == ''):
                        text = None
                    if f in pre_df_squad:
                        pre_df_squad[f].append(text)
                    else:
                        pre_df_squad[f] = [text]
        df_squad = pd.DataFrame.from_dict(pre_df_squad)
    #     print(df_squad.columns) # print columns for each table
        return df_squad

    def frame_for_category_team(self, category):
        '''Sends 2 requests to retrieve data for the specified category from the 1st division and the 2nd division. Afterwards, the 2 tables are concatenated vertically. Each row is each team that appears in the following season'''
        df_divs_array = []
        for div_data in [self.first_div_data, self.second_div_data]:
            team_table = div_data.get_category_table(category)
            df_team = self.get_frame_team(team_table, div_data.main_url)
            df_divs_array.append(df_team)
        
        return pd.concat(df_divs_array)

    #Function to get team-wise data accross all categories as mentioned above
    def get_team_data(self):
        '''This is the 'main' function that will retrieve all data for each category for each team within the current season that appears in the following season. The categories are concatenated horizonatally such that len(rows) = len(teams)'''
        df_array = []
        for category in self.categories:
            df_array.append(self.frame_for_category_team(category))
            time.sleep(5)
        df = pd.concat(df_array, axis=1)
        self.df_team = df.loc[:,~df.columns.duplicated()].copy()
        self.df_team['season_start_year'] = self.year
        self.df_team['season_end_year'] = self.year + 1
    
    def save_to_csv(self):
        try:
            self.df_team.to_csv(f"./data/teams/raw/{self.country}{self.year}_teams_for_{self.year+1}.csv",index=False)
            print(f"Saved successfully ({self.year})")
        except Exception as e:
            print(e)

In [41]:
#Go to the 'Standard stats' page of the league
#For Premier League 2020/21, the link is this: https://fbref.com/en/comps/9/stats/Premier-League-Stats
#Remove the 'stats', and pass the first and third part of the link as parameters like below

# NOTE: may need to add delays between requests to prevent 429 (gettting blocked by fbref)

ALL_TEAMS_DICT = {} # when checking the current season teams, the previous iteration checking for next season teams would have found it already

from requests.exceptions import HTTPError

for country in COMPETITION_MAPPING.keys():
    ALL_TEAMS_DICT[country] = {}
    for year in range(2018, 2023):
        ALL_TEAMS_DICT[country][year] = {}
        ALL_TEAMS_DICT[country][year+1] = {}

        season_data = CombinedSeasonData(year, country)

        season_data.get_team_data() # sets self.df_team

        season_data.save_to_csv()

Saved successfully (2018)
Saved successfully (2019)
Saved successfully (2020)
Saved successfully (2021)
Saved successfully (2022)
Saved successfully (2018)
Saved successfully (2019)
Saved successfully (2020)
Saved successfully (2021)
Saved successfully (2022)
Saved successfully (2018)
Saved successfully (2019)
Saved successfully (2020)
Saved successfully (2021)
Saved successfully (2022)
Saved successfully (2018)
Saved successfully (2019)
Saved successfully (2020)
Saved successfully (2021)
Saved successfully (2022)
Saved successfully (2018)
Saved successfully (2019)
Saved successfully (2020)
Saved successfully (2021)
Saved successfully (2022)
