# BIG BAD DATA FETCH

In [1]:
# Standard Libraries
import requests
import pandas as pd
import numpy as np
import time
import timeit
import datetime
from dateutil.relativedelta import relativedelta
import pytz
import os
import sqlite3
import json
from pathlib import Path
import gc
import logging
from tqdm.notebook import tqdm
import warnings

# For visualization if needed
import matplotlib.pyplot as plt

# Error handling and diagnostics
import traceback

# Configure logging: INFO and above goes both to the file "racing_data.log"
# and to a stream handler, using a timestamp - logger name - level - message layout.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("racing_data.log"),
        logging.StreamHandler()
    ]
)
# Named logger for this notebook.
# NOTE(review): the functions visible in this file report progress with print(),
# not through this logger - confirm whether it is used elsewhere.
logger = logging.getLogger("RacingDB")

# Ignore UserWarning-category warnings attributed to pandas modules
warnings.filterwarnings('ignore', category=UserWarning, module='pandas')

In [2]:
# API credentials and base URL used by every request in this file

# Load credentials from the text file.
# The file is expected to hold the username on line 1 and the password on line 2;
# surrounding whitespace (including the newline) is stripped from both.
with open("reqd_files/cred.txt", "r") as file:
    USERNAME = file.readline().strip()  # First line: username
    PASSWORD = file.readline().strip()  # Second line: password

# Endpoint paths (e.g. "/v1/courses") are appended directly to this base URL
BASE_URL = "https://api.theracingapi.com"

In [3]:
def fetch_and_process(endpoint, params=None, query_number=None, max_retries=5, timeout=30):
    """
    Make API request with error handling, retries, and rate limiting

    A GET request is sent to ``BASE_URL + endpoint`` using HTTP basic auth.
    HTTP 503 responses and network-level failures (connection errors,
    timeouts) are retried with exponential backoff starting at 1 second.
    Any other response ends the retry loop immediately. After the loop a
    fixed 0.69 second pause is applied as rate limiting.

    Args:
        endpoint (str): API endpoint to call
        params (dict, optional): Request parameters
        query_number (int, optional): Query counter for logging; the request
            duration is printed for query 1 and every 10th query
        max_retries (int): Maximum number of attempts
        timeout (float): Seconds to wait for the server before an attempt is
            treated as failed (prevents a stalled connection from hanging)

    Returns:
        dict: JSON response data or None if request failed
    """
    retry_delay = 1  # Start with a 1 second delay
    response = None  # Stays None if every attempt fails before a response arrives

    for attempt in range(max_retries):
        start_time = timeit.default_timer()
        try:
            response = requests.get(
                f"{BASE_URL}{endpoint}",
                auth=(USERNAME, PASSWORD),
                params=params,
                timeout=timeout,
            )
        except requests.RequestException as exc:
            # Network-level failure: retry it the same way as a 503
            response = None
            failure = f"Request error ({exc})"
        else:
            failure = "Error 503" if response.status_code == 503 else None
        elapsed = (timeit.default_timer() - start_time) * 1000  # Convert to milliseconds

        # Print statement for the first and every 10th query
        if query_number and (query_number == 1 or query_number % 10 == 0):
            print(f"Query {query_number} duration: {elapsed:.2f} ms")

        if failure is None:
            break  # Exit retry loop: the response is not retryable

        if attempt == max_retries - 1:
            # No point sleeping when there is no further attempt to make
            print(f"{failure} on attempt {attempt + 1}. Giving up after {max_retries} attempts.")
            break

        print(f"{failure} on attempt {attempt + 1}. Retrying in {retry_delay} seconds...")
        time.sleep(retry_delay)
        retry_delay *= 2  # Exponential backoff

    time.sleep(0.69)  # Rate limiting - wait between requests to respect API limits

    if response is None:
        print(f"Error: no response received for {endpoint}")
        return None

    # Return data if status code is 200; otherwise, return None
    if response.status_code != 200:
        print(f"Error: {response.status_code} - {response.text}")
        return None

    try:
        return response.json()
    except ValueError as exc:
        # A 200 response whose body is not valid JSON is treated as a failed request
        print(f"Error: invalid JSON in response for {endpoint}: {exc}")
        return None

In [4]:
def test_api_connection():
    """
    Check that the API is reachable by downloading the course list.

    On success the courses are written to 'csv_exports/course_names.csv'
    (the directory is created when missing).

    Returns:
        bool: True when course data was received, False otherwise
    """
    print("Testing API connection...")

    payload = fetch_and_process("/v1/courses", query_number=1)

    # Guard clause: nothing usable came back from the API
    if not payload:
        print("Error fetching course data. Please check API credentials.")
        return False

    course_table = pd.DataFrame(payload['courses'])

    # Keep a CSV copy of the course list for reference
    os.makedirs('csv_exports', exist_ok=True)
    course_table.to_csv('csv_exports/course_names.csv', index=False)
    print(f"Course data fetched successfully. Found {len(course_table)} courses.")
    return True

# Run the connectivity test once at load time; the __main__ section of this
# file checks this flag before starting the database update.
api_test_result = test_api_connection()

Testing API connection...
Query 1 duration: 1544.85 ms
Course data fetched successfully. Found 979 courses.


# Database Setup and Schema

In [5]:
def setup_database(db_path='racing_data.db'):
    """
    Initialize SQLite database with schema matching the API documentation

    Creates the ``races`` and ``runners`` tables and their indexes when they
    do not exist yet, so calling this repeatedly on the same file is safe.

    Args:
        db_path (str): Path to database

    Returns:
        str: The same ``db_path`` that was passed in

    Raises:
        Exception: Any error raised while connecting or creating the schema
            is printed and then re-raised unchanged.
    """
    print(f"Setting up database at {db_path}...")

    # Bound before the try block so the finally clause never touches an
    # unassigned name when sqlite3.connect() itself fails.
    conn = None
    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()

        # Create races table with fields directly from API schema
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS races (
            race_id TEXT PRIMARY KEY,
            date TEXT,
            region TEXT,
            course TEXT,
            course_id TEXT,
            off TEXT,
            off_dt TEXT,
            race_name TEXT,
            type TEXT,
            class TEXT,
            pattern TEXT,
            rating_band TEXT,
            age_band TEXT,
            sex_rest TEXT,
            dist TEXT,
            dist_y TEXT,
            dist_m TEXT,
            dist_f TEXT,
            going TEXT,
            surface TEXT,
            jumps TEXT,
            winning_time_detail TEXT,
            comments TEXT,
            non_runners TEXT,
            tote_win TEXT,
            tote_pl TEXT,
            tote_ex TEXT,
            tote_csf TEXT,
            tote_tricast TEXT,
            tote_trifecta TEXT,
            
            -- Additional fields for calculations and storage
            race_grade TEXT,
            field_size INTEGER,
            going_detailed TEXT,
            rail_movements TEXT,
            stalls TEXT,
            weather TEXT,
            big_race BOOLEAN,
            is_abandoned BOOLEAN
        )
        ''')

        # Create runners table with fields from API schema, renaming 'or' to 'or_rating'
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS runners (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            race_id TEXT,
            horse_id TEXT,
            horse TEXT,
            sp TEXT,
            sp_dec TEXT,
            number TEXT,
            position TEXT,
            draw TEXT,
            btn TEXT,
            ovr_btn TEXT,
            age TEXT,
            sex TEXT,
            weight TEXT,
            weight_lbs TEXT,
            headgear TEXT,
            time TEXT,
            or_rating TEXT,         -- Kept as or_rating to avoid SQL reserved keyword
            rpr TEXT,
            tsr TEXT,
            prize TEXT,
            jockey TEXT,
            jockey_claim_lbs TEXT,
            jockey_id TEXT,
            trainer TEXT,
            trainer_id TEXT,
            owner TEXT,
            owner_id TEXT,
            sire TEXT,
            sire_id TEXT,
            dam TEXT,
            dam_id TEXT,
            damsire TEXT,
            damsire_id TEXT,
            comment TEXT,
            silk_url TEXT,
            
            FOREIGN KEY (race_id) REFERENCES races (race_id)
        )
        ''')

        # Create indexes for faster queries. idx_runner_race covers the
        # foreign-key column, which is what race <-> runner joins filter on.
        index_statements = (
            "CREATE INDEX IF NOT EXISTS idx_race_date ON races(date)",
            "CREATE INDEX IF NOT EXISTS idx_race_course ON races(course_id)",
            "CREATE INDEX IF NOT EXISTS idx_runner_race ON runners(race_id)",
            "CREATE INDEX IF NOT EXISTS idx_runner_horse ON runners(horse_id)",
            "CREATE INDEX IF NOT EXISTS idx_runner_jockey ON runners(jockey_id)",
            "CREATE INDEX IF NOT EXISTS idx_runner_trainer ON runners(trainer_id)",
        )
        for statement in index_statements:
            cursor.execute(statement)

        conn.commit()
        print("Database tables successfully created.")

    except Exception as e:
        print(f"Error setting up database: {e}")
        raise
    finally:
        if conn is not None:
            conn.close()

    print("Database setup complete.")
    return db_path


### Database Helper Functions

In [6]:
def get_existing_dates(db_path):
    """
    Collect the distinct values of races.date as a DatetimeIndex.

    Any failure while querying or parsing is printed and results in an
    empty index; the connection is closed in every case.

    NOTE: this file defines `get_existing_dates` a second time further down;
    once the whole file has run, that later definition is the one in effect.

    Args:
        db_path (str): Path to database

    Returns:
        pandas.DatetimeIndex: Dates already in database
    """
    connection = sqlite3.connect(db_path)
    try:
        date_frame = pd.read_sql("SELECT DISTINCT date FROM races", connection)
        if date_frame.empty:
            found_dates = pd.DatetimeIndex([])
        else:
            found_dates = pd.DatetimeIndex(pd.to_datetime(date_frame['date']))
    except Exception as e:
        print(f"Error retrieving dates: {e}")
        found_dates = pd.DatetimeIndex([])
    finally:
        connection.close()
    return found_dates

def generate_missing_dates(start_date, end_date, existing_dates):
    """
    Return the daily dates between two bounds that are not in existing_dates.

    Args:
        start_date (datetime): First date of the range (inclusive)
        end_date (datetime): Last date of the range (inclusive)
        existing_dates (DatetimeIndex): Dates already in database

    Returns:
        DatetimeIndex: Dates from the range that still need processing
    """
    # Build the full daily calendar, then subtract what is already stored
    return pd.date_range(start=start_date, end=end_date).difference(existing_dates)

def optimize_database(db_path):
    """
    Run VACUUM and ANALYZE commands to optimize the database

    The connection is closed even when one of the commands fails; the error
    itself is not swallowed and propagates to the caller.

    Args:
        db_path (str): Path to database
    """
    print("Optimizing database...")
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("VACUUM")
        conn.execute("ANALYZE")
    finally:
        # Always release the file handle, including on failure
        conn.close()
    print("Database optimization complete.")

## Data Saving Functions

In [7]:
# Race-level API fields that are stored under the same column name
_RACE_API_FIELDS = (
    "race_id", "date", "region", "course", "course_id", "off", "off_dt",
    "race_name", "type", "class", "pattern", "rating_band", "age_band",
    "sex_rest", "dist", "dist_y", "dist_m", "dist_f", "going", "surface",
    "jumps", "winning_time_detail", "comments", "non_runners", "tote_win",
    "tote_pl", "tote_ex", "tote_csf", "tote_tricast", "tote_trifecta",
)

# Runner-level API fields, in storage order
_RUNNER_API_FIELDS = (
    "horse_id", "horse", "sp", "sp_dec", "number", "position", "draw", "btn",
    "ovr_btn", "age", "sex", "weight", "weight_lbs", "headgear", "time", "or",
    "rpr", "tsr", "prize", "jockey", "jockey_claim_lbs", "jockey_id",
    "trainer", "trainer_id", "owner", "owner_id", "sire", "sire_id", "dam",
    "dam_id", "damsire", "damsire_id", "comment", "silk_url",
)

# The only API field stored under a different column name (or -> or_rating)
_RUNNER_COLUMN_RENAMES = {"or": "or_rating"}


def _derive_race_grade(pattern, race_class):
    """Map the API pattern/class values onto the coarse race_grade label."""
    if pattern:
        pattern_text = str(pattern).lower()
        for level in ("1", "2", "3"):
            if f"group {level}" in pattern_text or f"grade {level}" in pattern_text:
                return f"Pattern_{level}"
        if "listed" in pattern_text:
            return "Listed"
        # A pattern value that matches nothing stays "Unknown"; class is not consulted
        return "Unknown"
    if race_class:
        class_num = str(race_class).replace("Class", "").strip()
        return f"Class_{class_num}"
    return "Unknown"


def _insert_non_null(conn, insert_clause, row):
    """Insert the non-None items of row using insert_clause; return True if a statement ran."""
    present = {column: value for column, value in row.items() if value is not None}
    if not present:
        return False
    # Column names come from the fixed field tuples above, never from API values
    columns = ", ".join(present)
    placeholders = ", ".join(["?"] * len(present))
    conn.execute(
        f"{insert_clause} ({columns}) VALUES ({placeholders})",
        tuple(present.values())
    )
    return True


def _ensure_races_table(db_path):
    """Create the schema through setup_database() when the races table is missing."""
    try:
        conn = sqlite3.connect(db_path)
        try:
            table_row = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='races'"
            ).fetchone()
        finally:
            conn.close()
    except sqlite3.Error as e:
        print(f"Error checking database structure: {e}")
        setup_database(db_path)
        return

    if not table_row:
        print("Races table not found, initializing database schema...")
        setup_database(db_path)


def _save_entry(conn, entry):
    """Write one race and its runners; return (race_is_new, runners_written)."""
    race_id = entry.get("race_id")
    runners = entry.get("runners") or []

    race_row = {field: entry.get(field) for field in _RACE_API_FIELDS}
    race_row["race_grade"] = _derive_race_grade(race_row["pattern"], race_row["class"])
    race_row["field_size"] = len(runners)
    # Default to regular going; no detailed going is available from this data
    race_row["going_detailed"] = race_row["going"]
    # rail_movements, stalls, weather, big_race and is_abandoned have no API
    # source, so they are simply left out and stay NULL.

    already_stored = race_id is not None and conn.execute(
        "SELECT 1 FROM races WHERE race_id = ? LIMIT 1", (race_id,)
    ).fetchone() is not None
    _insert_non_null(conn, "INSERT OR IGNORE INTO races", race_row)

    if runners and already_stored:
        # Re-saving a known race: replace its runners instead of appending
        # a second copy of every row.
        conn.execute("DELETE FROM runners WHERE race_id = ?", (race_id,))

    runners_written = 0
    for runner in runners:
        runner_row = {"race_id": race_id}
        for field in _RUNNER_API_FIELDS:
            runner_row[_RUNNER_COLUMN_RENAMES.get(field, field)] = runner.get(field)
        if _insert_non_null(conn, "INSERT INTO runners", runner_row):
            runners_written += 1

    return (not already_stored), runners_written


def save_to_database(data_to_save, db_path, endpoint_type='results'):
    """
    Process API data and save to SQLite database

    Every entry (one race plus its runners) is written atomically inside a
    savepoint: if anything about an entry fails, that entry is undone as a
    whole, the error is printed, and processing continues with the next
    entry. Saving a race whose race_id is already stored leaves the race row
    untouched (INSERT OR IGNORE) and replaces its runner rows. All entries
    are committed together at the end; if the surrounding transaction fails,
    everything is rolled back and 0 is returned.

    Args:
        data_to_save (list): List of race data dictionaries
        db_path (str): Path to database
        endpoint_type (str): Type of endpoint data (default: 'results').
            Accepted for interface compatibility; the body does not use it.

    Returns:
        int: Number of runners successfully saved
    """
    if not data_to_save:
        print("No data to save")
        return 0

    # Verify database structure
    _ensure_races_table(db_path)

    conn = sqlite3.connect(db_path)
    entries_saved = 0
    races_saved = 0

    try:
        # Start transaction
        conn.execute("BEGIN TRANSACTION")

        for entry in data_to_save:
            # Resolved up front so the error message cannot itself fail
            race_label = entry.get("race_id") if isinstance(entry, dict) else None

            conn.execute("SAVEPOINT save_entry")
            try:
                race_is_new, runners_written = _save_entry(conn, entry)
            except Exception as e:
                # Undo the partially written entry, then carry on with the next one
                conn.execute("ROLLBACK TO SAVEPOINT save_entry")
                conn.execute("RELEASE SAVEPOINT save_entry")
                print(f"Error saving entry {race_label}: {e}")
                continue

            conn.execute("RELEASE SAVEPOINT save_entry")
            races_saved += int(race_is_new)
            entries_saved += runners_written

        # Commit transaction
        conn.commit()
        print(f"Saved {races_saved} races and {entries_saved} runners")

    except Exception as e:
        # Rollback on error; nothing was persisted, so report nothing as saved
        conn.rollback()
        entries_saved = 0
        print(f"Transaction failed: {e}")

    finally:
        conn.close()

    return entries_saved

## Data Collection Functions

In [8]:
_RESULTS_ENDPOINT = "/v1/results"
_RESULTS_KEY = "results"
_RESULTS_PAGE_SIZE = 50


def _fetch_results_for_date(formatted_date, query_number):
    """Fetch every results page for one date; return (entries or None on a failed request, next query number)."""
    params = {
        "start_date": formatted_date,
        "end_date": formatted_date,
        "limit": _RESULTS_PAGE_SIZE,
        "skip": 0,
    }
    entries = []
    while True:
        data = fetch_and_process(_RESULTS_ENDPOINT, params=params, query_number=query_number)
        query_number += 1

        if data is None:
            # The request itself failed; a partial day must not be stored
            return None, query_number

        page = data.get(_RESULTS_KEY) or []
        entries.extend(page)

        # A short (or empty) page means there is nothing further to fetch
        if len(page) < _RESULTS_PAGE_SIZE:
            return entries, query_number
        params["skip"] += _RESULTS_PAGE_SIZE


def query_data_to_db(date_range, db_path, query_number=1, save_frequency=250):
    """
    Query API data for a range of dates and save directly to database

    Each date is fetched completely (all pages) before any of its entries
    are queued for saving. A date whose request fails or raises is recorded
    in the returned error list and none of its entries are stored, so the
    date can be fetched again later instead of being left half-filled.

    Args:
        date_range (DatetimeIndex): Dates to query
        db_path (str): Path to database
        query_number (int, optional): Starting query number for logging
        save_frequency (int): Number of queued entries (races) that triggers
            a write to the database

    Returns:
        tuple: (total_entries_saved, error_dates) - the number of runners
            written as reported by save_to_database, and the dates
            ("YYYY-MM-DD" strings) that could not be fetched or saved
    """
    pending = []  # Entries fetched but not yet written
    error_dates = []
    total_entries_saved = 0

    for single_date in date_range:
        formatted_date = single_date.strftime("%Y-%m-%d")
        print(f"Querying results data for date: {formatted_date}")

        try:
            day_entries, query_number = _fetch_results_for_date(formatted_date, query_number)

            if day_entries is None:
                print(f"Request failed for date: {formatted_date}")
                error_dates.append(formatted_date)
                continue

            if not day_entries:
                print(f"No results found for date: {formatted_date}")
                continue

            pending.extend(day_entries)

            # Save progress if we've accumulated enough new entries
            if len(pending) >= save_frequency:
                saved_count = save_to_database(pending, db_path, 'results')
                total_entries_saved += saved_count
                print(f"Progress saved: {saved_count} new runners written to database")
                pending = []  # Clear memory after saving

                # Force garbage collection
                gc.collect()

        except Exception as e:
            print(f"Error occurred on {formatted_date}: {e}")
            error_dates.append(formatted_date)
            traceback.print_exc()  # Print full traceback for debugging
            continue

        print(f"Results data found for date: {formatted_date} - {len(day_entries)} entries")

    # Save any remaining data
    if pending:
        saved_count = save_to_database(pending, db_path, 'results')
        total_entries_saved += saved_count
        print(f"Final save: {saved_count} remaining runners written to database")

    return total_entries_saved, error_dates

## Main Execution Function

In [9]:
def update_racing_database(db_path='racing_data.db', default_earliest_date=None):
    """
    Main function to update the racing database with new data

    The schema is created if needed, the dates already stored are read, and
    every missing date up to and including yesterday is queried. With data
    already present, the missing dates are processed in three groups: before
    the earliest stored date (back to default_earliest_date), gaps between
    the earliest and latest stored dates, and after the latest stored date.
    The database is optimized afterwards and a summary is printed.

    Args:
        db_path (str): Path to database
        default_earliest_date (datetime or str, optional): Earliest date to
            start from; defaults to 2020-01-01. Any time-of-day component is
            dropped so generated dates line up with stored dates.

    Returns:
        dict: Summary with 'total_entries' (runners added) and 'errors'
            (dates that failed)
    """
    if default_earliest_date is None:
        default_earliest_date = "2020-01-01"
    # Accept strings as well as datetimes, and snap to midnight
    default_earliest_date = pd.to_datetime(default_earliest_date).normalize()

    one_day = pd.Timedelta(days=1)
    yesterday = pd.Timestamp.today().normalize() - one_day  # Last date worth querying

    print("Starting database update process...")
    print(f"Default earliest date: {default_earliest_date.strftime('%Y-%m-%d')}")

    # Ensure database is properly set up before any operations
    setup_database(db_path)

    # Get existing dates from database; unparseable/NULL dates carry no information
    existing_dates = get_existing_dates(db_path).dropna()

    if len(existing_dates) > 0:
        earliest_date = existing_dates.min()
        latest_date = existing_dates.max()

        print(f"Database contains {len(existing_dates)} dates")
        print(f"Earliest date in DB: {earliest_date.strftime('%Y-%m-%d')}")
        print(f"Latest date in DB: {latest_date.strftime('%Y-%m-%d')}")

        # (announcement, dates) pairs, processed in this order
        segments = [
            (
                "missing dates before the DB start",
                generate_missing_dates(default_earliest_date, earliest_date - one_day, existing_dates),
            ),
            (
                "missing dates in the middle of the DB",
                generate_missing_dates(earliest_date, latest_date, existing_dates),
            ),
            (
                "missing dates after the DB end",
                generate_missing_dates(latest_date + one_day, yesterday, existing_dates),
            ),
        ]
    else:
        # DB is empty, start from scratch
        print("Database is empty. Starting fresh data collection.")
        all_dates = pd.date_range(default_earliest_date, yesterday)
        print(
            f"Collecting data for {len(all_dates)} dates from "
            f"{default_earliest_date.strftime('%Y-%m-%d')} to {yesterday.strftime('%Y-%m-%d')}"
        )
        segments = [(None, all_dates)]

    # Process missing dates
    total_entries = 0
    all_errors = []

    for announcement, dates in segments:
        if dates.empty:
            continue
        if announcement:
            print(f"\nQuerying {len(dates)} {announcement}")
        segment_entries, segment_errors = query_data_to_db(dates, db_path)
        total_entries += segment_entries
        all_errors.extend(segment_errors)

    results = {
        'total_entries': total_entries,
        'errors': all_errors
    }

    # Optimize database after updates
    optimize_database(db_path)

    # Provide summary
    print("\n=== Database Update Summary ===")
    print(f"Results: {results['total_entries']} entries added, {len(results['errors'])} dates with errors")

    return results

# Results-focused lookup of stored dates. This definition comes after an
# earlier function of the same name in this file and therefore replaces it.
def get_existing_dates(db_path):
    """
    Collect the distinct race dates that have at least one runner with a
    recorded position.

    Any failure while querying or parsing is printed and results in an
    empty index; the connection is closed in every case.

    Args:
        db_path (str): Path to database

    Returns:
        pandas.DatetimeIndex: Dates already in database
    """
    connection = sqlite3.connect(db_path)
    try:
        # Only dates whose races have runners with a non-NULL position count
        results_dates_sql = """
        SELECT DISTINCT r.date FROM races r
        JOIN runners ru ON r.race_id = ru.race_id
        WHERE ru.position IS NOT NULL
        """

        date_frame = pd.read_sql(results_dates_sql, connection)
        if date_frame.empty:
            found_dates = pd.DatetimeIndex([])
        else:
            found_dates = pd.DatetimeIndex(pd.to_datetime(date_frame['date']))
    except Exception as e:
        print(f"Error retrieving dates: {e}")
        found_dates = pd.DatetimeIndex([])
    finally:
        connection.close()
    return found_dates

### Execute Database Update

In [10]:
# Script entry point: update the results database when the API check passed
if __name__ == "__main__":
    if not api_test_result:
        print("\nAPI connection failed. Please check your credentials and try again.")
    else:
        print("\nAPI connection successful. Proceeding with database update.")

        # Make sure the schema exists before anything else touches the file
        db_path = 'racing_data.db'
        setup_database(db_path)

        # Earliest date to collect from (change as needed)
        start_date = pd.to_datetime("2010-01-01")

        # Fill in every missing date using the results endpoint only
        results = update_racing_database(db_path=db_path, default_earliest_date=start_date)

        print("\nDatabase Update Complete")

        # Report failed dates and keep them on disk for a later retry
        if results['errors']:
            print(f"\nErrors encountered on {len(results['errors'])} dates:")
            # Only the first 10 error dates are listed individually
            print("\n".join(f"  - {date}" for date in results['errors'][:10]))
            if len(results['errors']) > 10:
                print(f"  ... and {len(results['errors']) - 10} more")

            with open('error_dates_results.json', 'w') as f:
                json.dump({'error_dates': list(results['errors'])}, f)


API connection successful. Proceeding with database update.
Setting up database at racing_data.db...
Database tables successfully created.
Database setup complete.
Starting database update process...
Default earliest date: 2010-01-01
Setting up database at racing_data.db...
Database tables successfully created.
Database setup complete.
Database contains 5532 dates
Earliest date in DB: 2010-01-01
Latest date in DB: 2025-03-01

Querying 7 missing dates in the middle of the DB
Querying results data for date: 2010-02-02
Query 1 duration: 1698.14 ms
Results data found for date: 2010-02-02 - 14 entries
Querying results data for date: 2018-12-24
No results found for date: 2018-12-24
Querying results data for date: 2018-12-25
No results found for date: 2018-12-25
Querying results data for date: 2020-04-02
No results found for date: 2020-04-02
Querying results data for date: 2020-04-03
No results found for date: 2020-04-03
Querying results data for date: 2020-04-06
No results found for date: 2