## To Do List
- Fix handling of non-running horses (should be fixed)
- Fully automate the data collection
- Fix the issue with data that has not been uploaded yet

In [1]:
from bs4 import BeautifulSoup
import datetime
import json
import numpy as np
import pandas as pd
# import re
import requests
import time

In [2]:
# Headers sent with every HTTP request made in this notebook. The values
# identify the caller as the racingtv web client running in desktop Chrome.
HEADERS = {
    # Content negotiation
    'Accept': 'application/json',
    'Content-Type': 'application/json',
    'Referer': 'https://www.racingtv.com/',
    # Browser client hints
    'sec-ch-ua': '"Not;A=Brand";v="99", "Google Chrome";v="139", "Chromium";v="139"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '"Windows"',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36',
    # Web client identifier
    'x-requested-with': 'racingtv-web/5.3.0',
}

In [3]:
# Picking data from one keyword
def one_filter(data, keyword):
    """
    Looks up a single key in `data`, falling back to 'N/A'.

    Keyword arguments:
        data -- Subscriptable object (usually a dict) to read from
        keyword -- Key to look up in `data`

    Returns:
        The value stored under `keyword`, or the string 'N/A' if the lookup
        fails (missing key, bad index, or `data` is not subscriptable).
    """
    try:
        return data[keyword]
    # Issues occur if return np.nan so return 'N/A'
    # Only lookup failures are swallowed, so Ctrl-C and real bugs still surface.
    except (KeyError, IndexError, TypeError):
        return 'N/A'

# Picking data from three keywords
def three_filter(data, keyword1, keyword2, keyword3):
    """
    Looks up a value nested three levels deep in `data`, falling back to 'N/A'.

    Keyword arguments:
        data -- Nested subscriptable object (usually dicts) to read from
        keyword1 -- Key for the outermost level
        keyword2 -- Key for the middle level
        keyword3 -- Key for the innermost level

    Returns:
        data[keyword1][keyword2][keyword3], or the string 'N/A' if any level
        of the lookup fails.
    """
    try:
        return data[keyword1][keyword2][keyword3]
    # Issues occur if return np.nan so return 'N/A'
    # Only lookup failures are swallowed, so Ctrl-C and real bugs still surface.
    except (KeyError, IndexError, TypeError):
        return 'N/A'

In [4]:
def collect_race_data(data_response, race_response):
    """
    Collects and organises data from a horse race using API responses.

    Only horses whose metric entry has a non-null 'finish_position' are kept.
    Runner details (trainer, jockey, owner, age) are read from the runner at
    the same list index as the horse's metric entry.

    Keyword arguments:
        data_response -- HTTP response containing horse metric data
        race_response -- HTTP response containing race and runner data

    Returns:
        dict -- Structured horse racing data with metrics and race info
        str -- 'No horse data' if the metric json has no "horses" entry
        None -- If either request failed or the race json has no "race" entry
    """

    # Checks first API call was successful and extracts the horses' metric data.
    if data_response.status_code != 200:
        print('Response code: {:d}'.format(data_response.status_code))
        print('Problem url is: {:s}'.format(data_response.url))
        return
    try:
        metric_data = data_response.json()['horses']
    except (KeyError, TypeError, ValueError):
        print('Could not find "horses" in the json')
        return 'No horse data'

    # Checks second API call was successful and extracts the race and runner data.
    if race_response.status_code != 200:
        print('Error code {:d}'.format(race_response.status_code))
        print('Problem url is: {:s}'.format(race_response.url))
        return
    try:
        race_data = race_response.json()['race']
    except (KeyError, TypeError, ValueError):
        print('Could not find "race" in the json')
        # The url comes from the response itself; no `race_url` exists in this scope.
        print('Problem url is: {:s}'.format(race_response.url))
        print('Aborting data collection')
        return

    # Keep each finisher together with its index in the metric data, so that a
    # non-finisher listed before a finisher does not shift the data.
    finished = [
        (index, entry) for index, entry in enumerate(metric_data)
        if isinstance(entry, dict) and entry.get('finish_position') is not None
    ]
    num_finished = len(finished)
    finished_metrics = [entry for _, entry in finished]

    # NOTE(review): runners are paired with metric entries by list position,
    # which assumes both API responses list the horses in the same order -- confirm.
    runners = race_data.get('runners') or []
    finished_runners = [
        runners[index] if index < len(runners) else {}
        for index, _ in finished
    ]

    def metric_column(metric, field):
        # One value per finisher from metrics -> metric -> field ('N/A' if absent).
        return [three_filter(entry, 'metrics', metric, field) for entry in finished_metrics]

    def runner_column(field):
        # One value per finisher from the matching runner ('N/A' if absent).
        return [one_filter(runner, field) for runner in finished_runners]

    # Consolidating all the horse data into a dict
    horse_info = {
        'Horse' : [one_filter(entry, 'horse_name') for entry in finished_metrics],
        'Position' : [one_filter(entry, 'finish_position') for entry in finished_metrics],
        'Finishing Speed (%)' : metric_column('fsp', 'value'),
        'Finishing Speed Rank' : metric_column('fsp', 'rank'),
        'Finishing Speed Average' : metric_column('fsp', 'average'),
        'Finishing Speed Percentage' : metric_column('fsp', 'percentage'),
        'Top Speed Value (mph)' : metric_column('top_speed', 'value'),
        'Top Speed Rank' : metric_column('top_speed', 'rank'),
        'Top Speed Average' : metric_column('top_speed', 'average'),
        'Top Speed Percentage' : metric_column('top_speed', 'percentage'),

        # Track data is replicated for each horse
        'Race Id' : [race_data['id']] * num_finished,
        'Race Type' : [race_data['race_type']] * num_finished,
        'Location' : [race_data['meeting']['track']['name']] * num_finished,

        'Trainer' : runner_column('trainer_name'),
        'Jockey' : runner_column('jockey_name'),
        'Owner' : runner_column('owner_name'),
        'Age' : runner_column('age'),
        }

    return horse_info

In [5]:
def race_times(date, delay = 1, DEBUG = False):
    """
    Collects the scheduled start times of completed races on one date.

    Only meetings whose track name is listed under 'Track Names' in
    track_names.json are kept.

    Keyword arguments:
        date -- Date string inserted into the results url
        delay -- Seconds to wait before making the request
        DEBUG -- If True, skipped tracks are logged to race_times_debug.txt

    Returns:
        dict -- Maps track name to a list of start times formatted 'HHMM'
        None -- If the request failed or the json has no "meetings" entry
    """
    # Only looking at UK and Ireland tracks
    with open('track_names.json', 'r') as fp:
        approved_tracks = set(json.load(fp)['Track Names'])

    time.sleep(delay)
    url = 'https://api.racingtv.com/racing/results/list/{:s}?'.format(date)
    response = requests.get(url, headers = HEADERS)

    # A failed request used to fall through to the loop below and raise a
    # NameError because race_meetings was never assigned.
    if response.status_code != 200:
        print('Response code: {:d}'.format(response.status_code))
        print('Problem url is: {:s}'.format(url))
        print('Aborting data collection')
        return

    try:
        race_meetings = response.json()['meetings']
    except (KeyError, TypeError, ValueError):
        print('Could not find "meetings" in the json')
        print('Problem url is: {:s}'.format(url))
        print('Aborting data collection')
        return

    time_dict = {}
    for meeting in race_meetings:
        track_name = meeting['track']['name']
        if track_name in approved_tracks:
            # 'HHMM' is the form the per-race urls expect.
            time_dict[track_name] = [
                pd.to_datetime(race['start_time_scheduled']).strftime('%H%M')
                for race in meeting['completed_races']
            ]
        elif DEBUG:
            # Write to text file
            with open('race_times_debug.txt', 'a') as fp:
                fp.write('{:s} is not an approved track\n'.format(track_name))

    return time_dict

In [6]:
def collect_all_race_times(start_date, end_date, delay = 1, write_out = False, 
                           file_name = 'race_times{:s}'.format(str(pd.Timestamp.today().date())), 
                           DEBUG = False):
    """
    Collects race times for every day from start_date to end_date inclusive.

    Keyword arguments:
        start_date -- First date of the range (anything pd.date_range accepts)
        end_date -- Last date of the range (anything pd.date_range accepts)
        delay -- Seconds to wait before each request
        write_out -- If True, the result is dumped as json to file_name
        file_name -- Output path; note the default is computed once, when the
                     function is defined, not on each call
        DEBUG -- If True, progress is logged to race_times_debug.txt

    Returns:
        dict -- Maps each 'YYYY-MM-DD' date to the result of race_times()
    """
    time_data = {}
    for date in pd.date_range(start = start_date, end = end_date, freq = 'D'):
        date_str = str(date.date())
        time_data[date_str] = race_times(date_str, delay, DEBUG)
        if DEBUG:
            with open('race_times_debug.txt', 'a') as fp:
                # Newline-terminated so entries do not run into each other.
                fp.write('Collected race times for {:s}\n'.format(date_str))

    if write_out:
        with open(file_name, 'w') as fp:
            json.dump(time_data, fp)

    return time_data

In [7]:
def track_day_data(date, times, trackname, delay = 1, DEBUG = False):
    """
    Collects race data for every race at one track on one date.

    Keyword arguments:
        date -- Date string inserted into the request urls
        times -- Race times formatted 'HHMM'
        trackname -- Track name; spaces are replaced by '-' for the urls
        delay -- Seconds to wait before each request
        DEBUG -- If True, abandoned races and races without data are reported

    Returns:
        dict -- Maps 'HH:MM' to the dict returned by collect_race_data().
                Races that were abandoned, had no horse data, or could not be
                fetched are left out.
    """
    # Changing name of edge cases
    if trackname.lower() == 'lingfield':
        trackname = trackname + '-Park'
    # Replace space with - to make urls work
    trackname = trackname.replace(' ', '-')
    track_slug = trackname.lower()

    print(trackname)

    data = {}
    for race_time in times:
        race_path = '{:s}/{:s}/{:s}'.format(date, track_slug, race_time)
        race_label = race_time[:2] + ':' + race_time[2:]
        data_url = 'https://api.racingtv.com/racing/iq-results/{:s}/comparison?'.format(race_path)
        race_url = 'https://api.racingtv.com/racing/results/{:s}?'.format(race_path)

        time.sleep(delay)
        race_response = requests.get(race_url, headers = HEADERS)
        # A failed or malformed response must not abort the whole track.
        try:
            race_state = race_response.json()['race']['status']['state']
        except (KeyError, TypeError, ValueError):
            print('Could not read the race status from {:s}'.format(race_url))
            continue

        if race_state == 'abandoned':
            if DEBUG:
                print('{:s} was abandoned'.format(race_path))
            continue

        time.sleep(delay)
        data_response = requests.get(data_url, headers = HEADERS)
        collected_race_data = collect_race_data(data_response, race_response)

        if collected_race_data == 'No horse data':
            if DEBUG:
                print('{:s} has no horse data'.format(race_path))
            continue
        # collect_race_data returns None (after printing the problem) when a
        # request failed; storing it would put a null race into the dataset.
        if collected_race_data is None:
            continue

        data[race_label] = collected_race_data
        print('Collected race data for {:s} at {:s}'.format(trackname, race_label))

    return data

In [8]:
def collect_all_day_data(day_races, date, delay = 1, DEBUG = False):
    """
    Collects race data for every track that raced on one date.

    Keyword arguments:
        day_races -- Maps track name to a list of race times; may be None or
                     empty when no race times are available for the date
        date -- Date string passed on to track_day_data()
        delay -- Seconds to wait before each request
        DEBUG -- Passed on to track_day_data()

    Returns:
        dict -- Maps track name to that track's race data; tracks with no
                collected data are left out
    """
    # race_times() returns None when a day's listing could not be fetched, so
    # a missing schedule is treated like an empty one.
    if not day_races:
        return {}

    # For checking progress
    count = len(day_races)

    data = {}
    for progress, (trackname, times) in enumerate(day_races.items(), start = 1):
        track_dict = track_day_data(date, times, trackname, delay, DEBUG)
        # If track_dict is not empty, update data
        if track_dict:
            data[trackname] = track_dict
        print('Progress: {:d}/{:d}'.format(progress, count))
    return data

In [9]:
def track_names(delay = 1, write_out = False):
    """
    Collects the names of the UK and Irish tracks from the racingtv API.

    Keyword arguments:
        delay -- Seconds to wait between the two requests
        write_out -- If True, the names are saved to track_names.json

    Returns:
        list -- UK track names followed by Irish track names
        None -- If either request does not return status code 200
    """
    endpoints = (
        'https://api.racingtv.com//racing/tracks/uk?',
        'https://api.racingtv.com//racing/tracks/ire?',
    )

    replies = []
    for position, endpoint in enumerate(endpoints):
        # Pause between the requests, but not before the first one.
        if position > 0:
            time.sleep(delay)
        reply = requests.get(endpoint, headers = HEADERS)
        if reply.status_code != 200:
            print('Problem with the url: {:s}'.format(endpoint))
            return
        replies.append(reply)

    track_list = [
        track['name']
        for reply in replies
        for track in reply.json()['tracks']
    ]

    # Writing track names to a json file
    if write_out == True:
        with open('track_names.json', 'w') as out_file:
            json.dump({'Track Names' : track_list}, out_file)

    return track_list

In [10]:
def collection_of_race_data(start_date, end_date, delay = 1,
                            write_out = True,
                            return_data = True,
                            DEBUG = False):
    """
    Collects race data for every day from start_date to end_date inclusive.

    Race times are read from race_times.json; if that file cannot be read
    they are collected with collect_all_race_times().

    Keyword arguments:
        start_date -- First date of the range (anything pd.date_range accepts)
        end_date -- Last date of the range (anything pd.date_range accepts)
        delay -- Seconds to wait before each request
        write_out -- If True, the data is saved to
                     historical_horse_data_<start>_<end>.json
        return_data -- If True the data is returned, otherwise a status message
        DEBUG -- Passed on to the collection functions

    Returns:
        dict -- Maps 'YYYY-MM-DD' to that day's race data (return_data True)
        str -- Message saying where the data was saved (return_data False)
    """
    try:
        with open('race_times.json', 'r') as fp:
            all_race_times = json.load(fp)
    except (OSError, ValueError):
        print('Collecting track times')
        # Written to the same file that is read above, so the next call can
        # actually reuse the collected times.
        all_race_times = collect_all_race_times(start_date, end_date, delay, write_out,
                                                file_name = 'race_times.json',
                                                DEBUG = DEBUG)

    data_dict = {}
    for date in pd.date_range(start_date, end_date, freq = 'D'):
        date_str = str(date.date())
        print('Collecting data for {:s}'.format(date_str))
        # The date may be absent from a cached file, or None after a failed request.
        day_races = all_race_times.get(date_str)
        if not day_races:
            print('No race times available for {:s}'.format(date_str))
            continue
        all_day_data = collect_all_day_data(day_races, date_str, delay, DEBUG = DEBUG)
        # If all_day_data is not empty, update data_dict
        if all_day_data:
            data_dict[date_str] = all_day_data

    if write_out:
        # Normalised through pd.Timestamp so both date strings and Timestamps
        # give a 'YYYY-MM-DD' label in the file name.
        out_path = 'historical_horse_data_{:s}_{:s}.json'.format(
            str(pd.Timestamp(start_date).date()), str(pd.Timestamp(end_date).date()))
        with open(out_path, 'w') as fp:
            json.dump(data_dict, fp)
        message = 'Data saved to: {:s}'.format(out_path)
    else:
        message = 'Data was not saved (write_out is False)'

    if return_data:
        print(message)
        return data_dict
    return message

In [11]:
def unpack_json_to_df(json_file):
    """
    Flattens a nested race data json file into one row per horse.

    The file is expected to be nested as date -> track -> time -> columns,
    where each column is a list with one value per horse.

    Keyword arguments:
        json_file -- Path of the json file to load

    Returns:
        DataFrame -- One row per horse, indexed by 'Date'. Race entries that
                     are not dicts (e.g. null) are skipped. An empty frame
                     with the same columns is returned if there are no rows.
    """
    with open(json_file, 'r') as fp:
        race_data = json.load(fp)

    # temporary fix: (date, track, horse) combinations that are left out
    excluded_rows = {
        ('2025-05-24', 'Windsor', 'Dubai Time'),
        ('2025-08-04', 'Windsor', 'Piscean Star'),
    }

    # Per-horse columns copied from each race, in output order.
    horse_columns = [
        'Horse', 'Position', 'Finishing Speed (%)', 'Top Speed Value (mph)',
        'Race Id', 'Race Type', 'Trainer', 'Jockey', 'Owner', 'Age',
    ]

    simple_list = []
    for date, tracks in race_data.items():
        for track, races in tracks.items():
            # Named race_time so the time module is not shadowed.
            for race_time, useful_data in races.items():
                # A race whose collection failed may have been stored as null.
                if not isinstance(useful_data, dict):
                    continue
                for i, horse_name in enumerate(useful_data['Horse']):
                    if (date, track, horse_name) in excluded_rows:
                        continue
                    row = {'Date' : date, 'Track' : track, 'Time' : race_time}
                    for column in horse_columns:
                        row[column] = useful_data[column][i]
                    simple_list.append(row)

    # json_normalize([]) has no 'Date' column, so set_index would raise.
    if not simple_list:
        return pd.DataFrame(columns = ['Date', 'Track', 'Time'] + horse_columns).set_index('Date')

    return pd.json_normalize(simple_list).set_index('Date')

In [12]:
def finish_condition(df, race_id):
    """
    Summarises the faster half of the finishing speeds in one race.

    Keyword arguments:
        df -- DataFrame with 'Race Id' and 'Finishing Speed (%)' columns
        race_id -- Value of 'Race Id' identifying the race

    Returns:
        list -- [mean, std] of the finishing speeds in the race that are at or
                above the race's median finishing speed
    """
    # Missing speeds are stored as the string 'N/A'; coerce them to NaN so the
    # median and the comparison work on numbers only.
    speeds = pd.to_numeric(df['Finishing Speed (%)'], errors = 'coerce')
    race_speeds = speeds[df['Race Id'] == race_id]

    # Select horses in the race which have a finishing speed at or above the median
    median_speed = race_speeds.quantile(q = 0.5)
    filtered_speed = race_speeds[race_speeds >= median_speed]

    # Find the mean and standard deviation of these finishing speeds
    mean = filtered_speed.mean()
    std = filtered_speed.std()

    return [mean, std]

In [13]:
def update_race_data(delay = 1):
    """
    Extends the stored race times and race data up to two days ago.

    Reads historical_horse_data.json and race_times.json, collects everything
    from the day after the last stored date, and rewrites both files.

    Keyword arguments:
        delay -- Seconds to wait before each request

    Returns:
        str -- 'Date is already up to date.' if there is nothing to collect
        None -- After both files have been updated
    """
    with open('historical_horse_data.json', 'r') as horse_file:
        stored_races = json.load(horse_file)
    with open('race_times.json', 'r') as times_file:
        stored_times = json.load(times_file)

    # Collection starts the day after the last key in the stored data.
    first_missing = pd.Timestamp(list(stored_races)[-1]) + pd.Timedelta(days = 1)
    # It stops two days before today (data from yesterday isn't always
    # available on the website).
    last_available = pd.Timestamp.today() - pd.Timedelta(days = 2)

    if not first_missing <= last_available:
        return 'Date is already up to date.'

    # Race times first: extend the stored times and save them.
    fresh_times = collect_all_race_times(first_missing, last_available, delay, write_out = False)
    stored_times.update(fresh_times)
    with open('race_times.json', 'w') as times_file:
        json.dump(stored_times, times_file)

    # Then the race data for the same range.
    fresh_races = collection_of_race_data(first_missing, last_available, delay, write_out = False)
    stored_races.update(fresh_races)
    with open('historical_horse_data.json', 'w') as horse_file:
        json.dump(stored_races, horse_file)

In [14]:
def collect_todays_urls():
    """
    Collects the urls of todays races in UK and Ireland on racingpost

    Returns:
        list -- A list containing three elements:
              - race_tracks (list): Names of the race tracks
              - times (list): Race times formatted 'HH:MM'
              - links (list): Race urls on racingpost
        None -- If the racecards page could not be fetched or parsed
    """

    url = 'https://www.racingpost.com/racecards/'
    response = requests.get(url, headers = HEADERS)

    # Checks if the request was successful.
    if response.status_code != 200:
        print('Response code {:d}'.format(response.status_code))
        return
    try:
        soup = BeautifulSoup(response.text, 'html.parser')
    except Exception:
        print('Failed to collect todays racingpost html.')
        return

    # Importing the names of the UK and Ireland race tracks.
    # A set gives constant-time membership checks inside the loop.
    with open('track_names.json', 'r') as fp:
        approved_tracks = set(json.load(fp)['Track Names'])

    links = []
    race_tracks = []
    times = []
    seen_ids = set()

    # Filtering the html to where todays race urls are located
    for a in soup.find_all('a', {'class' : 'RC-meetingItem__link', 'href' : True}):
        race_id = a['data-race-id']
        # Check if we have already seen race id to prevent duplicates.
        if race_id in seen_ids:
            continue
        seen_ids.add(race_id)

        # Only want race tracks in the UK and Ireland.
        # Some racecourses end in ' (AW)', which we want to remove.
        racecourse = a['data-racecourse'].removesuffix(' (AW)')
        if racecourse not in approved_tracks:
            continue

        links.append('https://www.racingpost.com' + a['href'])
        race_tracks.append(racecourse)
        # Named race_date so the time module is not shadowed.
        race_date = a['data-race-date']
        times.append(pd.Timestamp(race_date).strftime('%H:%M'))

    return [race_tracks, times, links]

In [15]:
def filter_html(wrapper, tag1, tag2, element):
    """
    Extracts the stripped text of the first matching tag inside `wrapper`.

    Keyword arguments:
        wrapper -- Parsed html element to search in
        tag1 -- Name of the tag to find
        tag2 -- Attribute name to match on
        element -- Attribute value to match on

    Returns:
        str -- Text of the matching tag, or 'N/A' if there is no match
    """
    try:
        return wrapper.find(tag1, {tag2 : element}).get_text().strip()
    # find() returns None when nothing matches, so .get_text() raises AttributeError.
    except (AttributeError, TypeError):
        return 'N/A'

In [16]:
def one_race_today(url, delay = 1):
    """
    Collects and organises data from the urls html.

    Keyword arguments:
        url -- Racingpost url of the desired horse race
        delay -- Delay between making requests to the server (only needed if looping over this function)

    Returns:
        dict -- Structured horse racing data with useful information about each horse
        None -- If the page could not be fetched or parsed
    """

    time.sleep(delay)
    response = requests.get(url, headers = HEADERS)

    # Checks if request was successful
    if response.status_code != 200:
        # Report the status of this request; no `data_response` exists in this scope.
        print('Response code: {:d}'.format(response.status_code))
        print('Problem url is: {:s}'.format(url))
        return
    try:
        soup = BeautifulSoup(response.text, 'html.parser')
    except Exception:
        print('Failed to collect html from {:s}'.format(url))
        return

    # Filtering html to where the horse info is located.
    filtered = soup.find_all('div', {'class' : 'RC-runnerCardWrapper'})

    # Initialise empty lists to store relevant horse data.
    horses, runner_nums, tips, last_ran, ages = ([] for _ in range(5))

    for wrapper in filtered:
        # Want to exclude non running horses: skip any card whose runner
        # number is missing or is not an integer.
        runner_number = filter_html(wrapper, 'span', 'data-test-selector', 'RC-cardPage-runnerNumber-no')
        try:
            int(runner_number)
        except ValueError:
            continue

        # Collecting horse data and storing in lists.
        horses.append(filter_html(wrapper, 'a', 'data-test-selector', 'RC-cardPage-runnerName'))
        runner_nums.append(runner_number)
        tips.append(filter_html(wrapper, 'div', 'class', 'RC-runnerStats__tips'))
        last_ran.append(filter_html(wrapper, 'div', 'data-test-selector', 'RC-cardPage-runnerStats-lastRun'))
        ages.append(filter_html(wrapper, 'span', 'data-test-selector', 'RC-cardPage-runnerAge'))

    # Consolidating all the horse data into a dict.
    race_info = {
        'Horse' : horses,
        'Runner Num' : runner_nums,
        'Tips' : tips,
        'Last Ran' : last_ran,
        'Ages' : ages
    }

    return race_info

In [17]:
def all_races_today(delay = 1):
    """
    Collects the racecard data of every UK and Ireland race today.

    Keyword arguments:
        delay -- Seconds to wait before each request

    Returns:
        dict -- Maps '<track> <HH:MM>' to the dict returned by
                one_race_today(). Races that could not be collected are left
                out; the dict is empty if todays urls could not be collected.
    """
    collection = collect_todays_urls()
    # collect_todays_urls returns None when the racecards page is unavailable.
    if collection is None:
        print('Could not collect todays race urls.')
        return {}
    race_tracks, times, urls = collection

    race_info = {}
    for track, off_time, url in zip(race_tracks, times, urls):
        race_data = one_race_today(url, delay)
        # one_race_today returns None (after printing the problem) on failure.
        if race_data is None:
            continue
        race_info[track + ' ' + off_time] = race_data
        print('Collected race data for {:s} at {:s}'.format(track, off_time))

    return race_info

In [18]:
def todays_data_df(delay = 1):
    """
    Collects todays racecard data into a DataFrame with one row per horse.

    Keyword arguments:
        delay -- Seconds to wait before each request

    Returns:
        DataFrame -- Indexed by 'Horse' with columns 'Off Time', 'Track',
                     'Runner Num', 'Tips', 'Last Ran' and 'Age'; 'N/A'
                     placeholders are replaced by NaN
    """
    races_today = all_races_today(delay)

    columns = ['Off Time', 'Track', 'Horse', 'Runner Num', 'Tips', 'Last Ran', 'Age']
    simple_list = []

    for race, info in races_today.items():
        # Skip races whose data could not be collected.
        if not info:
            continue
        # Keys look like '<track> <HH:MM>'. Split on the last space only, so
        # track names that contain spaces stay intact.
        track, off_time = race.rsplit(' ', 1)
        for horse, runner_num, tip, last_run, age in zip(
                info['Horse'], info['Runner Num'], info['Tips'], info['Last Ran'], info['Ages']):
            simple_list.append({
                'Off Time' : off_time,
                'Track' : track,
                'Horse' : horse,
                'Runner Num': runner_num,
                'Tips' : tip,
                'Last Ran' : last_run,
                'Age' : age,
            })

    # json_normalize([]) has no 'Horse' column, so set_index would raise.
    if not simple_list:
        return pd.DataFrame(columns = columns).set_index('Horse')

    return pd.json_normalize(simple_list).set_index('Horse').replace('N/A', np.nan)