In [None]:
from parsers import *
from cleaners import *
from getters import *
from collector import collect_gw, merge_gw
from understat import parse_epl_data
import csv

def parse_data():
    """Parse and store all the data for the season.

    Fetches the bootstrap data, then writes the player summaries, fixtures,
    teams, the player id list and every player's history files under
    ``data/<season>/``. When a current gameweek is found, the expected points
    captured from the bootstrap data are written to ``gws/xP<gw>.csv`` and
    that gameweek is collected and merged. Understat data is parsed last.
    """
    # Imported locally so the function does not depend on a star import
    # happening to bring ``os`` into scope.
    import os

    season = '2024-25'
    base_filename = 'data/' + season + '/'
    print("Getting data")
    data = get_data()
    print("Parsing summary data")
    parse_players(data["elements"], base_filename)
    # Snapshot each player's 'ep_this' value; it is written out per gameweek below.
    xPoints = [{'id': e['id'], 'xP': e['ep_this']} for e in data["elements"]]
    # 0 means "no current gameweek found"; the gameweek steps are skipped then.
    gw_num = 0
    for event in data["events"]:
        if event["is_current"]:
            gw_num = event["id"]
    print("Cleaning summary data")
    clean_players(base_filename + 'players_raw.csv', base_filename)
    print("Getting fixtures data")
    fixtures(base_filename)
    print("Getting teams data")
    parse_team_data(data["teams"], base_filename)
    print("Extracting player ids")
    id_players(base_filename + 'players_raw.csv', base_filename)
    player_ids = get_player_ids(base_filename)
    player_base_filename = base_filename + 'players/'
    gw_base_filename = base_filename + 'gws/'
    print("Extracting player specific data")
    for i, name in player_ids.items():
        player_data = get_individual_player_data(i)
        parse_player_history(player_data["history_past"], player_base_filename, name, i)
        parse_player_gw_history(player_data["history"], player_base_filename, name, i)
    if gw_num > 0:
        print("Writing expected points")
        # Nothing above creates the gws/ directory, so make sure it exists
        # before opening a file inside it.
        os.makedirs(gw_base_filename, exist_ok=True)
        xp_path = os.path.join(gw_base_filename, 'xP' + str(gw_num) + '.csv')
        # newline='' lets the csv module control line endings itself.
        with open(xp_path, 'w+', newline='') as outf:
            w = csv.DictWriter(outf, ['id', 'xP'])
            w.writeheader()
            w.writerows(xPoints)
        print("Collecting gw scores")
        collect_gw(gw_num, player_base_filename, gw_base_filename, base_filename)
        print("Merging gw scores")
        merge_gw(gw_num, gw_base_filename)
    understat_filename = base_filename + 'understat'
    parse_epl_data(understat_filename)

def fixtures(base_filename):
    """Fetch the fixtures data and hand it to parse_fixtures.

    Args:
        base_filename (str): Output location passed through to parse_fixtures
    """
    parse_fixtures(get_fixtures_data(), base_filename)

def main():
    """Script entry point: run the full parse_data pipeline."""
    parse_data()

# Run the pipeline only when this file is executed as a script.
if __name__ == "__main__":
    main()


## Parsers


In [None]:
import csv
import os
from utility import uprint
import pandas as pd

def extract_stat_names(dict_of_stats):
    """ Collects the name of every statistic in a stats dictionary

    Args:
        dict_of_stats (dict): Dictionary containing key-value pairs of stats

    Returns:
        list: The dictionary's keys, in the dictionary's own iteration order
    """
    return [name for name, _ in dict_of_stats.items()]

def parse_top_players(data, base_filename):
    """ Writes the top scoring player of each gameweek to best_players.csv

    Args:
        data (dict): Data whose 'events' list holds, per event, an 'id', a
            'top_element' and a 'top_element_info' dict with a 'points' entry
        base_filename (str): Directory in which best_players.csv is written
    """
    rows = []
    for event in data['events']:
        top_info = event['top_element_info']
        # NOTE(review): events with no top scorer yet appear to carry None
        # here; skip them instead of crashing on the lookup - confirm.
        if top_info is None:
            continue
        rows.append({
            'gw': event['id'],
            'player_id': event['top_element'],
            'points': top_info['points'],
        })
    # The context manager guarantees the file is flushed and closed.
    with open(os.path.join(base_filename, 'best_players.csv'), 'w+', newline='') as f:
        w = csv.DictWriter(f, ['gw', 'player_id', 'points'])
        w.writeheader()
        w.writerows(rows)

def parse_players(list_of_players, base_filename):
    """ Writes the full summary data of every player to players_raw.csv

    The columns are the sorted keys of the first player; every value is
    written as its str() form.

    Args:
        list_of_players (list): Non-empty list of per-player dictionaries
        base_filename (str): Path prefix (ending in a separator) to which
            'players_raw.csv' is appended

    Raises:
        IndexError: If list_of_players is empty
    """
    stat_names = sorted(list_of_players[0].keys())
    filename = base_filename + 'players_raw.csv'
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    # The context manager guarantees the file is flushed and closed.
    with open(filename, 'w+', encoding='utf8', newline='') as f:
        w = csv.DictWriter(f, stat_names)
        w.writeheader()
        for player in list_of_players:
            # str() already yields text; the file's encoding handles the rest.
            w.writerow({k: str(v) for k, v in player.items()})

def parse_player_history(list_of_histories, base_filename, player_name, Id):
    """ Writes a player's past-history records to <player_name>_<Id>/history.csv

    Nothing is written when there are no records. The columns are the sorted
    keys of the first record.

    Args:
        list_of_histories (list): History dictionaries for one player
        base_filename (str): Path prefix (ending in a separator) under which
            the player's directory is created
        player_name (str): Name used in the directory name
        Id (int): Player id used in the directory name
    """
    if not list_of_histories:
        return
    stat_names = sorted(list_of_histories[0].keys())
    filename = base_filename + player_name + '_' + str(Id) + '/history.csv'
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    # The context manager guarantees the file is flushed and closed.
    with open(filename, 'w+', encoding='utf8', newline='') as f:
        w = csv.DictWriter(f, stat_names)
        w.writeheader()
        w.writerows(list_of_histories)

def parse_player_gw_history(list_of_gw, base_filename, player_name, Id):
    """ Writes a player's gameweek records to <player_name>_<Id>/gw.csv

    Nothing is written when there are no records. The columns are the sorted
    keys of the first record.

    Args:
        list_of_gw (list): Gameweek dictionaries for one player
        base_filename (str): Path prefix (ending in a separator) under which
            the player's directory is created
        player_name (str): Name used in the directory name
        Id (int): Player id used in the directory name
    """
    if not list_of_gw:
        return
    stat_names = sorted(list_of_gw[0].keys())
    filename = base_filename + player_name + '_' + str(Id) + '/gw.csv'
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    # The context manager guarantees the file is flushed and closed.
    with open(filename, 'w+', encoding='utf8', newline='') as f:
        w = csv.DictWriter(f, stat_names)
        w.writeheader()
        w.writerows(list_of_gw)

def parse_gw_entry_history(data, outfile_base):
    """ Writes one picks_<event>.csv per gameweek entry

    Args:
        data (list): Gameweek dictionaries, each with a 'picks' list and an
            'entry_history' dict holding the 'event' number
        outfile_base (str): Directory in which the CSV files are written
    """
    for gameweek in data:
        gameweek_picks = gameweek['picks']
        event_number = gameweek['entry_history']['event']
        target = os.path.join(outfile_base, "picks_" + str(event_number) + ".csv")
        pd.DataFrame.from_records(gameweek_picks).to_csv(target, index=False)

def parse_entry_history(data, outfile_base):
    """ Writes an entry's chips, past seasons and gameweek history as CSV files

    Args:
        data (dict): Entry history holding 'chips', 'past' and 'current' records
        outfile_base (str): Directory in which chips.csv, history.csv and
            gws.csv are written
    """
    # (key in data, output file); written in this order.
    # A profile.csv export built from data["entry"] is currently disabled.
    outputs = (
        ("chips", 'chips.csv'),
        ("past", 'history.csv'),
        ("current", 'gws.csv'),
    )
    for key, csv_name in outputs:
        frame = pd.DataFrame.from_records(data[key])
        frame.to_csv(os.path.join(outfile_base, csv_name), index=False)

def parse_entry_leagues(data, outfile_base):
    """ Writes an entry's classic, cup and head-to-head leagues as CSV files

    The cup file is skipped (with a printed notice) when the cup data is
    missing from the input.

    Args:
        data (dict): Entry data with a 'leagues' dict holding 'classic',
            'h2h' and optionally 'cup' -> 'matches'
        outfile_base (str): Directory in which the CSV files are written
    """
    leagues = data["leagues"]

    classic_path = os.path.join(outfile_base, 'classic_leagues.csv')
    pd.DataFrame.from_records(leagues["classic"]).to_csv(classic_path, index=False)

    try:
        cup_frame = pd.DataFrame.from_records(leagues["cup"]["matches"])
        cup_frame.to_csv(os.path.join(outfile_base, 'cup_leagues.csv'), index=False)
    except KeyError:
        print("No cups yet")

    h2h_path = os.path.join(outfile_base, 'h2h_leagues.csv')
    pd.DataFrame.from_records(leagues["h2h"]).to_csv(h2h_path, index=False)

def parse_transfer_history(data, outfile_base):
    """ Writes the transfer records to transfers.csv

    Args:
        data (list): Transfer records
        outfile_base (str): Directory in which transfers.csv is written
    """
    target = os.path.join(outfile_base, 'transfers.csv')
    pd.DataFrame.from_records(data).to_csv(target, index=False)

def parse_fixtures(data, outfile_base):
    """ Writes the fixture records to fixtures.csv

    Args:
        data (list): Fixture records
        outfile_base (str): Directory in which fixtures.csv is written
    """
    target = os.path.join(outfile_base, 'fixtures.csv')
    pd.DataFrame.from_records(data).to_csv(target, index=False)

def parse_team_data(data, outfile_base):
    """ Writes the team records to teams.csv

    Args:
        data (list): Team records
        outfile_base (str): Directory in which teams.csv is written
    """
    target = os.path.join(outfile_base, 'teams.csv')
    pd.DataFrame.from_records(data).to_csv(target, index=False)


## Cleaners


In [None]:
import csv
import math
import os

def clean_players(filename, base_filename):
    """ Creates a file with only important data columns for each player

    Numeric 'element_type' codes '1'-'4' are replaced by 'GK', 'DEF', 'MID'
    and 'FWD'. Any other code is reported on stdout and written unchanged.

    Args:
        filename (str): Name of the file that contains the full data for each player
        base_filename (str): Path prefix (ending in a separator) to which
            'cleaned_players.csv' is appended
    """
    headers = ['first_name', 'second_name', 'goals_scored', 'assists', 'total_points', 'minutes', 'goals_conceded', 'creativity', 'influence', 'threat', 'bonus', 'bps', 'ict_index', 'clean_sheets', 'red_cards', 'yellow_cards', 'selected_by_percent', 'now_cost', 'element_type']
    position_names = {'1': 'GK', '2': 'DEF', '3': 'MID', '4': 'FWD'}
    outname = base_filename + 'cleaned_players.csv'
    # The input is only read, so plain 'r' is enough; both files are closed
    # by the context managers even if a row fails.
    with open(filename, 'r', encoding='utf-8', newline='') as fin:
        os.makedirs(os.path.dirname(outname), exist_ok=True)
        with open(outname, 'w+', encoding='utf-8', newline='') as fout:
            reader = csv.DictReader(fin)
            # extrasaction='ignore' drops every column not listed in headers.
            writer = csv.DictWriter(fout, headers, extrasaction='ignore')
            writer.writeheader()
            for line in reader:
                position = position_names.get(line['element_type'])
                if position is None:
                    print("Oh boy")
                else:
                    line['element_type'] = position
                writer.writerow(line)

def id_players(players_filename, base_filename):
    """ Creates a file that contains the name to id mappings for each player

    Args:
        players_filename (str): Name of the file that contains the full data for each player
        base_filename (str): Path prefix (ending in a separator) to which
            'player_idlist.csv' is appended
    """
    headers = ['first_name', 'second_name', 'id']
    outname = base_filename + 'player_idlist.csv'
    # The input is only read, so plain 'r' is enough; both files are closed
    # by the context managers even if a row fails.
    with open(players_filename, 'r', encoding='utf-8', newline='') as fin:
        os.makedirs(os.path.dirname(outname), exist_ok=True)
        with open(outname, 'w+', encoding='utf-8', newline='') as fout:
            # extrasaction='ignore' drops every column not listed in headers.
            writer = csv.DictWriter(fout, headers, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(csv.DictReader(fin))

def get_player_ids(base_filename):
    """ Gets the list of all player ids and player names

    Args:
        base_filename (str): Path prefix (ending in a separator) to which
            'player_idlist.csv' is appended

    Returns:
        dict: Maps each integer player id to '<first_name>_<second_name>'
    """
    filename = base_filename + 'player_idlist.csv'
    # Read-only access is enough, and the context manager closes the file.
    with open(filename, 'r', encoding='utf-8', newline='') as fin:
        return {
            int(line['id']): line['first_name'] + '_' + line['second_name']
            for line in csv.DictReader(fin)
        }


## Getters


In [None]:
import requests
import json
import time

def get_data():
    """ Retrieve the fpl player data from the hard-coded url

    Returns:
        The decoded JSON body of the bootstrap-static endpoint

    Raises:
        Exception: If the response status code is not 200
    """
    url = "https://fantasy.premierleague.com/api/bootstrap-static/"
    response = requests.get(url)
    if response.status_code == 200:
        return json.loads(response.text)
    raise Exception("Response was code " + str(response.status_code))

def get_individual_player_data(player_id):
    """ Retrieve the player-specific detailed data

    The request is retried every 5 seconds for as long as it fails with a
    requests-level error.

    Args:
        player_id (int): ID of the player whose data is to be retrieved

    Returns:
        The decoded JSON body of the element-summary endpoint

    Raises:
        Exception: If the response status code is not 200
    """
    base_url = "https://fantasy.premierleague.com/api/element-summary/"
    full_url = base_url + str(player_id) + "/"
    response = None
    while response is None:
        try:
            response = requests.get(full_url)
        except requests.exceptions.RequestException:
            # Only request failures are retried, so Ctrl-C and programming
            # errors are no longer swallowed by the loop.
            time.sleep(5)
    if response.status_code != 200:
        raise Exception("Response was code " + str(response.status_code))
    return json.loads(response.text)

def get_entry_data(entry_id):
    """ Retrieve the summary/history data for a specific entry/team

    The request is retried every 5 seconds for as long as it fails with a
    requests-level error.

    Args:
        entry_id (int) : ID of the team whose data is to be retrieved

    Returns:
        The decoded JSON body of the entry's history endpoint

    Raises:
        Exception: If the response status code is not 200
    """
    base_url = "https://fantasy.premierleague.com/api/entry/"
    full_url = base_url + str(entry_id) + "/history/"
    response = None
    while response is None:
        try:
            response = requests.get(full_url)
        except requests.exceptions.RequestException:
            # Only request failures are retried, so Ctrl-C and programming
            # errors are no longer swallowed by the loop.
            time.sleep(5)
    if response.status_code != 200:
        raise Exception("Response was code " + str(response.status_code))
    return json.loads(response.text)

def get_entry_personal_data(entry_id):
    """ Retrieve the data served by the entry endpoint for a specific entry/team

    The request is retried every 5 seconds for as long as it fails with a
    requests-level error.

    Args:
        entry_id (int) : ID of the team whose data is to be retrieved

    Returns:
        The decoded JSON body of the entry endpoint

    Raises:
        Exception: If the response status code is not 200
    """
    base_url = "https://fantasy.premierleague.com/api/entry/"
    full_url = base_url + str(entry_id) + "/"
    response = None
    while response is None:
        try:
            response = requests.get(full_url)
        except requests.exceptions.RequestException:
            # Only request failures are retried, so Ctrl-C and programming
            # errors are no longer swallowed by the loop.
            time.sleep(5)
    if response.status_code != 200:
        raise Exception("Response was code " + str(response.status_code))
    return json.loads(response.text)

def get_entry_gws_data(entry_id,num_gws,start_gw=1):
    """ Retrieve the gw-by-gw data for a specific entry/team

    Each request is retried every 5 seconds for as long as it fails with a
    requests-level error.

    Args:
        entry_id (int) : ID of the team whose data is to be retrieved
        num_gws (int) : Last gameweek to retrieve (inclusive)
        start_gw (int) : First gameweek to retrieve (inclusive)

    Returns:
        list: One decoded JSON body per gameweek, in gameweek order

    Raises:
        Exception: If any response status code is not 200
    """
    base_url = "https://fantasy.premierleague.com/api/entry/"
    gw_data = []
    for i in range(start_gw, num_gws+1):
        full_url = base_url + str(entry_id) + "/event/" + str(i) + "/picks/"
        response = None
        while response is None:
            try:
                response = requests.get(full_url)
            except requests.exceptions.RequestException:
                # Only request failures are retried, so Ctrl-C and
                # programming errors are no longer swallowed by the loop.
                time.sleep(5)
        if response.status_code != 200:
            raise Exception("Response was code " + str(response.status_code))
        gw_data.append(json.loads(response.text))
    return gw_data

def get_entry_transfers_data(entry_id):
    """ Retrieve the transfer data for a specific entry/team

    The request is retried every 5 seconds for as long as it fails with a
    requests-level error.

    Args:
        entry_id (int) : ID of the team whose data is to be retrieved

    Returns:
        The decoded JSON body of the entry's transfers endpoint

    Raises:
        Exception: If the response status code is not 200
    """
    base_url = "https://fantasy.premierleague.com/api/entry/"
    full_url = base_url + str(entry_id) + "/transfers/"
    response = None
    while response is None:
        try:
            response = requests.get(full_url)
        except requests.exceptions.RequestException:
            # Only request failures are retried, so Ctrl-C and programming
            # errors are no longer swallowed by the loop.
            time.sleep(5)
    if response.status_code != 200:
        raise Exception("Response was code " + str(response.status_code))
    return json.loads(response.text)

def get_fixtures_data():
    """ Retrieve the fixtures data for the season

    The request is retried every 5 seconds for as long as it fails with a
    requests-level error.

    Returns:
        The decoded JSON body of the fixtures endpoint

    Raises:
        Exception: If the response status code is not 200
    """
    url = "https://fantasy.premierleague.com/api/fixtures/"
    response = None
    while response is None:
        try:
            response = requests.get(url)
        except requests.exceptions.RequestException:
            # Only request failures are retried, so Ctrl-C and programming
            # errors are no longer swallowed by the loop.
            time.sleep(5)
    if response.status_code != 200:
        raise Exception("Response was code " + str(response.status_code))
    return json.loads(response.text)

def main():
    """Fetch the bootstrap data and dump it unmodified to raw.json in the working directory."""
    data = get_data()
    with open('raw.json', 'w') as outf:
        json.dump(data, outf)

# Run the dump only when this file is executed as a script.
if __name__ == '__main__':
    main()


## Collector


import os
import sys
import csv

def get_teams(directory):
    """ Reads the team id to team name mapping from <directory>/teams.csv

    Args:
        directory (str): Directory that contains teams.csv

    Returns:
        dict: Maps each integer team id to its name
    """
    teams = {}
    with open(directory + "/teams.csv", 'r', encoding="utf-8") as fin:
        for row in csv.DictReader(fin):
            teams[int(row['id'])] = row['name']
    return teams

def get_fixtures(directory):
    """ Reads the home and away team of every fixture from <directory>/fixtures.csv

    Args:
        directory (str): Directory that contains fixtures.csv

    Returns:
        tuple: (fixtures_home, fixtures_away), two dicts that map each integer
            fixture id to the integer 'team_h' / 'team_a' id
    """
    fixtures_home = {}
    fixtures_away = {}
    with open(directory + "/fixtures.csv", 'r', encoding="utf-8") as fin:
        for row in csv.DictReader(fin):
            fixture_id = int(row['id'])
            fixtures_home[fixture_id] = int(row['team_h'])
            fixtures_away[fixture_id] = int(row['team_a'])
    return fixtures_home, fixtures_away

def get_positions(directory):
    """ Reads every player's name and position from <directory>/players_raw.csv

    Args:
        directory (str): Directory that contains players_raw.csv

    Returns:
        tuple: (names, positions), two dicts keyed by integer player id;
            names holds '<first_name> <second_name>', positions holds one of
            'GK', 'DEF', 'MID', 'FWD'

    Raises:
        KeyError: If a row's 'element_type' is not '1', '2', '3' or '4'
    """
    positions = {}
    names = {}
    pos_dict = {'1': "GK", '2': "DEF", '3': "MID", '4': "FWD"}
    with open(directory + "/players_raw.csv", 'r', encoding="utf-8") as fin:
        for row in csv.DictReader(fin):
            player_id = int(row['id'])
            positions[player_id] = pos_dict[row['element_type']]
            names[player_id] = row['first_name'] + ' ' + row['second_name']
    return names, positions

def get_expected_points(gw, directory):
    """ Reads the expected points of a gameweek from <directory>/xP<gw>.csv

    This is best effort: if the file is missing or malformed, whatever was
    read up to that point (possibly nothing) is returned.

    Args:
        gw (int): Gameweek number used in the file name
        directory (str): Directory that contains the xP file

    Returns:
        dict: Maps each integer player id to the 'xP' value as read (a string)
    """
    xPoints = {}
    try:
        with open(os.path.join(directory, 'xP' + str(gw) + '.csv'), 'r') as fin:
            for row in csv.DictReader(fin):
                xPoints[int(row['id'])] = row['xP']
    except (OSError, KeyError, ValueError, TypeError, csv.Error):
        # Missing or malformed file: keep what was read so far.
        pass
    return xPoints

def merge_gw(gw, gw_directory):
    """ Appends the rows of gw<gw>.csv to merged_gw.csv with an extra 'GW' column

    The header is written for gameweek 1 and whenever merged_gw.csv does not
    exist yet or is empty, so the merged file always starts with a header.

    Args:
        gw (int): Gameweek number to merge
        gw_directory (str): Directory that holds gw<gw>.csv and merged_gw.csv
    """
    merged_gw_filename = "merged_gw.csv"
    gw_filename = "gw" + str(gw) + ".csv"
    gw_path = os.path.join(gw_directory, gw_filename)
    with open(gw_path, 'r', encoding="utf-8") as fin:
        reader = csv.DictReader(fin)
        # fieldnames is None for an empty input file; treat that as no columns.
        fieldnames = list(reader.fieldnames or []) + ["GW"]
        rows = []
        for row in reader:
            row["GW"] = gw
            rows.append(row)
    out_path = os.path.join(gw_directory, merged_gw_filename)
    # Decide before opening in append mode, which would create the file.
    needs_header = (
        gw == 1
        or not os.path.exists(out_path)
        or os.path.getsize(out_path) == 0
    )
    with open(out_path, 'a', encoding="utf-8") as fout:
        writer = csv.DictWriter(fout, fieldnames=fieldnames, lineterminator='\n')
        print(gw)
        if needs_header:
            writer.writeheader()
        writer.writerows(rows)

def collect_gw(gw, directory_name, output_dir, root_directory_name="data/2024-25"):
    """ Collects every player's row for one gameweek into <output_dir>/gw<gw>.csv

    Walks the player directories for files named gw.csv, keeps the rows whose
    'round' equals gw, and adds 'name', 'position', 'team' and 'xP' columns
    to each.

    Args:
        gw (int): Gameweek number to collect
        directory_name (str): Directory of per-player folders, walked relative
            to the current directory ("./" is prepended). Each folder name
            must end in '_<player id>'
        output_dir (str): Directory that receives gw<gw>.csv and is searched
            for the xP<gw>.csv file
        root_directory_name (str): Directory that contains fixtures.csv,
            teams.csv and players_raw.csv
    """
    rows = []
    fieldnames = []
    fixtures_home, fixtures_away = get_fixtures(root_directory_name)
    teams = get_teams(root_directory_name)
    names, positions = get_positions(root_directory_name)
    xPoints = get_expected_points(gw, output_dir)
    for root, dirs, files in os.walk(u"./" + directory_name):
        for fname in files:
            if fname != 'gw.csv':
                continue
            with open(os.path.join(root, fname), 'r', encoding="utf-8") as fin:
                reader = csv.DictReader(fin)
                # An empty file has no header; keep the last usable one.
                if reader.fieldnames:
                    fieldnames = reader.fieldnames
                for row in reader:
                    if int(row['round']) != gw:
                        continue
                    # The player id is the part after the last underscore
                    # of the player's folder name.
                    player_id = int(os.path.basename(root).split('_')[-1])
                    fixture = int(row['fixture'])
                    if row['was_home'] in (True, "True"):
                        row['team'] = teams[fixtures_home[fixture]]
                    else:
                        row['team'] = teams[fixtures_away[fixture]]
                    row['name'] = names[player_id]
                    row['position'] = positions[player_id]
                    # Players without an expected-points entry get 0.0.
                    row['xP'] = xPoints.get(player_id, 0.0)
                    rows.append(row)

    fieldnames = ['name', 'position', 'team', 'xP'] + fieldnames
    out_path = os.path.join(output_dir, "gw" + str(gw) + ".csv")
    with open(out_path, 'w', encoding="utf-8") as outf:
        writer = csv.DictWriter(outf, fieldnames=fieldnames, lineterminator='\n')
        writer.writeheader()
        writer.writerows(rows)

def collect_all_gws(directory_name, output_dir, root_dir, num_gws=16):
    """ Runs collect_gw for gameweeks 1 through num_gws (inclusive)

    Args:
        directory_name (str): Directory of per-player folders
        output_dir (str): Directory that receives the gw<N>.csv files
        root_dir (str): Directory that contains fixtures.csv, teams.csv and
            players_raw.csv
        num_gws (int): Last gameweek to collect; defaults to the previously
            hard-coded 16
    """
    for i in range(1, num_gws + 1):
        collect_gw(i, directory_name, output_dir, root_dir)

def merge_all_gws(num_gws, gw_directory):
    """ Runs merge_gw for gameweeks 1 up to, but not including, num_gws

    NOTE(review): the upper bound is exclusive, so gameweek num_gws itself is
    not merged. Confirm whether callers rely on that before changing it.

    Args:
        num_gws (int): Exclusive upper bound of the gameweeks to merge
        gw_directory (str): Directory that holds the gw<N>.csv files
    """
    for i in range(1, num_gws):
        merge_gw(i, gw_directory)

def main():
    """ Merges gameweeks from the command line: <num_gws> <gw_directory>

    The commented-out calls are alternative entry points for manual runs.
    """
    #collect_all_gws(sys.argv[1], sys.argv[2], sys.argv[3])
    merge_all_gws(int(sys.argv[1]), sys.argv[2])
    #collect_gw(26, sys.argv[1], sys.argv[2])

# Run only when this file is executed as a script.
if __name__ == '__main__':
    main()


## Understat


In [None]:
import requests
import json
from bs4 import BeautifulSoup
import re
import codecs
import pandas as pd
import os
import csv

def get_data(url):
    """ Fetches a page and returns its non-empty <script> elements

    Args:
        url (str): Address of the page to download

    Returns:
        list: The script elements of the page that have at least one child

    Raises:
        Exception: If the response status code is not 200
    """
    response = requests.get(url)
    if response.status_code != 200:
        raise Exception("Response was code " + str(response.status_code))
    parsed_html = BeautifulSoup(response.text, 'html.parser')
    # Return the filtered list; previously it was built and then discarded
    # in favour of the unfiltered result.
    return [
        script
        for script in parsed_html.findAll('script')
        if len(script.contents) > 0
    ]

def get_epl_data():
    """ Extracts the team and player data embedded in the Understat EPL page

    Returns:
        tuple: (teamData, playerData) decoded from the page's 'teamsData' and
            'playersData' script variables; each stays an empty dict when its
            variable is not found
    """
    def decode_payload(js_source):
        # Pull the string handed to JSON.parse(...), undo its escaping and
        # parse the result as JSON.
        raw = re.findall(r'JSON\.parse\(\'(.*)\'\)', js_source)
        text = codecs.escape_decode(raw[0], "hex")[0].decode('utf-8')
        return json.loads(text)

    teamData, playerData = {}, {}
    for script in get_data("https://understat.com/league/EPL/2024"):
        for c in script.contents:
            parts = c.split('=')
            var_name = parts[0].strip()
            if var_name == 'var teamsData':
                teamData = decode_payload(parts[1])
            elif var_name == 'var playersData':
                playerData = decode_payload(parts[1])
    return teamData, playerData

def get_player_data(id):
    """ Extracts the match, shot and group data embedded in a player's Understat page

    Args:
        id: Understat player id, appended to the player page address

    Returns:
        tuple: (matchesData, shotsData, groupsData) decoded from the page's
            script variables; each stays an empty dict when its variable is
            not found
    """
    def decode_payload(js_source):
        # Pull the string handed to JSON.parse(...), undo its escaping and
        # parse the result as JSON.
        raw = re.findall(r'JSON\.parse\(\'(.*)\'\)', js_source)
        text = codecs.escape_decode(raw[0], "hex")[0].decode('utf-8')
        return json.loads(text)

    # Script variable declaration -> decoded value (empty until seen).
    decoded = {
        'var matchesData': {},
        'var shotsData': {},
        'var groupsData': {},
    }
    for script in get_data("https://understat.com/player/" + str(id)):
        for c in script.contents:
            parts = c.split('=')
            var_name = parts[0].strip()
            if var_name in decoded:
                decoded[var_name] = decode_payload(parts[1])
    return decoded['var matchesData'], decoded['var shotsData'], decoded['var groupsData']

def parse_epl_data(outfile_base):
    """ Writes the Understat team, player and per-player match data as CSV files

    Produces one understat_<team>.csv per team, one understat_player.csv, and
    one <player_name>_<id>.csv per player (spaces in names become underscores).

    Args:
        outfile_base (str): Directory that receives the files; it is created
            when it does not exist
    """
    teamData, playerData = get_epl_data()
    # to_csv does not create missing directories, so do it up front.
    os.makedirs(outfile_base, exist_ok=True)
    for team_data in teamData.values():
        team_frame = pd.DataFrame.from_records(team_data["history"])
        team = team_data["title"].replace(' ', '_')
        team_frame.to_csv(os.path.join(outfile_base, 'understat_' + team + '.csv'), index=False)
    player_frame = pd.DataFrame.from_records(playerData)
    player_frame.to_csv(os.path.join(outfile_base, 'understat_player.csv'), index=False)
    for d in playerData:
        # Only the match data is written; shots and groups are not used here.
        matches, _, _ = get_player_data(int(d['id']))
        indi_player_frame = pd.DataFrame.from_records(matches)
        player_name = d['player_name'].replace(' ', '_')
        indi_player_frame.to_csv(os.path.join(outfile_base, player_name + '_' + d['id'] + '.csv'), index=False)

class PlayerID:
    """ One player's Understat identity paired with their FPL identity

    Both ids are stored as strings; the names are stored as given.
    """

    def __init__(self, us_id, fpl_id, us_name, fpl_name):
        self.us_id, self.fpl_id = str(us_id), str(fpl_id)
        self.us_name, self.fpl_name = us_name, fpl_name


def match_ids(understat_dir, data_dir):
    """ Matches Understat players to FPL players by exact name and writes id_dict.csv

    A player is matched when the Understat 'player_name' equals the FPL
    '<first_name> <second_name>'. Unmatched Understat players get FPL id -1
    and an empty FPL name; unmatched FPL players get Understat id -1 and an
    empty Understat name.

    Args:
        understat_dir (str): Directory that contains understat_player.csv
        data_dir (str): Directory that contains player_idlist.csv and
            receives id_dict.csv
    """
    # Both inputs are written as UTF-8, so read them as UTF-8 regardless of
    # the platform's default encoding.
    understat_path = os.path.join(understat_dir, 'understat_player.csv')
    with open(understat_path, encoding='utf-8', newline='') as understat_file:
        ustat_players = {
            row['player_name']: row['id']
            for row in csv.DictReader(understat_file)
        }

    fpl_path = os.path.join(data_dir, 'player_idlist.csv')
    with open(fpl_path, encoding='utf-8', newline='') as fpl_file:
        fpl_players = {
            row['first_name'] + ' ' + row['second_name']: row['id']
            for row in csv.DictReader(fpl_file)
        }

    players = []
    for name, us_id in ustat_players.items():
        if name in fpl_players:
            players.append(PlayerID(us_id, fpl_players[name], name, name))
        else:
            players.append(PlayerID(us_id, -1, name, ""))

    for name, fpl_id in fpl_players.items():
        if name not in ustat_players:
            players.append(PlayerID(-1, fpl_id, "", name))

    with open(os.path.join(data_dir, 'id_dict.csv'), 'w+', encoding='utf-8') as outf:
        # The header is kept exactly as before, spaces included.
        outf.write('Understat_ID, FPL_ID, Understat_Name, FPL_Name\n')
        # csv.writer quotes names that contain commas or quotes, which plain
        # string concatenation would turn into extra columns.
        writer = csv.writer(outf, lineterminator='\n')
        for p in players:
            writer.writerow([p.us_id, p.fpl_id, p.us_name, p.fpl_name])

def main():
    """Build the id_dict.csv for the 2024-25 season via match_ids."""
    # The commented-out lines below are earlier manual entry points.
    #parse_epl_data('data/2021-22/understat')
    #md, sd, gd = get_player_data(318)
    #match_frame = pd.DataFrame.from_records(md)
    #match_frame.to_csv('auba.csv', index=False)
    match_ids('data/2024-25/understat', 'data/2024-25')

# Run only when this file is executed as a script.
if __name__ == '__main__':
    main()
