In [17]:
# Load Fixture_Statistics

import requests
import sqlite3
import time
import os
from typing import Dict, Any, Optional, List
from dotenv import load_dotenv

# Load environment variables (python-dotenv reads them from a .env file when one is found)
load_dotenv()

# Configuration
# Path of the SQLite database file, relative to the current working directory.
DATABASE_PATH = "./data/football_data.db"
# Base URL; endpoint paths such as "/standings" are appended to it when calling the API.
BASE_URL = "https://v3.football.api-sports.io"
# API key read from the environment variable named "x-rapidapi-key"; None when it is unset.
# NOTE(review): FootballDataLoader sends this value in an "x-apisports-key" header, so the
# environment variable name and the header name differ — confirm this is intentional.
API_KEY = os.getenv("x-rapidapi-key")


class FootballDataLoader:
    """Load league standings and per-fixture statistics from the API into SQLite.

    The loader owns a single SQLite connection for its whole lifetime; call
    ``close_connection()`` when finished. It reads the list of fixtures from
    an existing ``fixtures`` table (not created here) and writes to the
    ``standings`` and ``fixture_statistics`` tables.
    """

    # Seconds to wait on the API before a request counts as failed. Without a
    # timeout a stalled connection would block the whole run indefinitely.
    REQUEST_TIMEOUT_SECONDS = 30
    # Total number of attempts made per API request.
    MAX_ATTEMPTS = 3
    # Pause between two consecutive attempts of the same request.
    RETRY_DELAY_SECONDS = 5
    # Number of processed fixtures between two commits.
    COMMIT_BATCH_SIZE = 100

    def __init__(self, database_path: str, base_url: str = BASE_URL):
        """Open the database connection and prepare the API request headers.

        Args:
            database_path: Path of the SQLite database file.
            base_url: Base URL that endpoint paths are appended to.
        """
        self.database_path = database_path
        self.base_url = base_url
        self.api_key = API_KEY
        if not self.api_key:
            # Keep going (callers may only need the database helpers), but make
            # the misconfiguration visible instead of failing silently later.
            print("Warning: API key is not set; API requests are likely to be rejected.")
        self.headers = {"x-apisports-key": self.api_key}
        self.conn = sqlite3.connect(self.database_path, timeout=60)
        self.cursor = self.conn.cursor()
        self.cursor.execute("PRAGMA journal_mode=WAL;")  # Enable Write-Ahead Logging

    @staticmethod
    def _parse_percentage(value: Any) -> Optional[float]:
        """Convert a percentage such as ``"54%"`` (or a bare number) to a float.

        Returns None for missing/empty values so the column is stored as NULL.
        """
        if value is None:
            return None
        if isinstance(value, str):
            cleaned = value.replace("%", "").strip()
            return float(cleaned) if cleaned else None
        return float(value)

    def initialize_tables(self):
        """Create the ``standings`` and ``fixture_statistics`` tables if missing."""
        # Create the standings table
        self.cursor.execute('''
        CREATE TABLE IF NOT EXISTS standings (
            league_id INTEGER,
            season INTEGER,
            team_id INTEGER,
            league_position INTEGER,
            points INTEGER,
            PRIMARY KEY (league_id, season, team_id)
        )
        ''')

        # Create the fixture statistics table
        self.cursor.execute('''
        CREATE TABLE IF NOT EXISTS fixture_statistics (
            fixture_id INTEGER, 
            team_id INTEGER, 
            team_name TEXT, 
            shots_on_goal INTEGER, 
            shots_off_goal INTEGER, 
            total_shots INTEGER, 
            blocked_shots INTEGER, 
            shots_insidebox INTEGER, 
            shots_outsidebox INTEGER, 
            fouls INTEGER, 
            corner_kicks INTEGER, 
            offsides INTEGER, 
            ball_possession REAL, 
            yellow_cards INTEGER, 
            red_cards INTEGER, 
            goalkeeper_saves INTEGER, 
            total_passes INTEGER, 
            passes_accurate INTEGER, 
            passes_percentage REAL, 
            expected_goals REAL, 
            goals_prevented REAL, 
            last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 
            PRIMARY KEY (fixture_id, team_id))
        ''')
        self.conn.commit()

    def fetch_data(self, endpoint: str, params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Fetch JSON from the API, retrying on request failures.

        Args:
            endpoint: Path appended to ``base_url`` (e.g. ``"/standings"``).
            params: Query-string parameters for the request.

        Returns:
            The decoded JSON body, or None when every attempt failed.
        """
        for attempt in range(1, self.MAX_ATTEMPTS + 1):
            try:
                response = requests.get(
                    self.base_url + endpoint,
                    headers=self.headers,
                    params=params,
                    timeout=self.REQUEST_TIMEOUT_SECONDS,
                )
                response.raise_for_status()
                return response.json()
            except requests.exceptions.RequestException as e:
                if attempt == self.MAX_ATTEMPTS:
                    # No point sleeping when there is no further attempt.
                    print(f"API request failed: {e}. Giving up after {self.MAX_ATTEMPTS} attempts.")
                    break
                print(f"API request failed: {e}. Retrying in {self.RETRY_DELAY_SECONDS} seconds...")
                time.sleep(self.RETRY_DELAY_SECONDS)
        return None

    def get_leagues_from_fixtures(self, season: str) -> List[int]:
        """Return the distinct ``league_id`` values in ``fixtures`` for a season."""
        # Bound parameter instead of string interpolation: no quoting issues
        # and no way for the season value to alter the statement.
        query = """
        SELECT DISTINCT league_id
        FROM fixtures
        WHERE league_season = ?
        """
        leagues = self.cursor.execute(query, (season,)).fetchall()
        return [league[0] for league in leagues]

    def update_standings(self, season: str):
        """Fetch league standings for every league of the season and store them.

        Errors for one league are reported and do not stop the remaining leagues.
        """
        leagues = self.get_leagues_from_fixtures(season)
        print(f"Found {len(leagues)} leagues for season {season}. Fetching standings...")

        for league_id in leagues:
            print(f"Fetching standings for league {league_id} in season {season}...")
            try:
                standings_data = self.fetch_data("/standings", {"league": league_id, "season": season})
                # Covers a failed request, a missing key and an empty list alike.
                if not standings_data or not standings_data.get('response'):
                    print(f"No standings data returned for league {league_id}. Skipping...")
                    continue

                groups = standings_data['response'][0]['league']['standings']
                if not groups:
                    print(f"No standings data returned for league {league_id}. Skipping...")
                    continue

                # NOTE(review): only the first standings group is stored, as before;
                # confirm whether leagues with several groups need the others too.
                # Build every row before writing so that a malformed entry cannot
                # leave a partially written league behind.
                rows = [
                    (league_id, season, team['team']['id'], team['rank'], team['points'])
                    for team in groups[0]
                ]
                self.cursor.executemany('''
                    INSERT OR REPLACE INTO standings (league_id, season, team_id, league_position, points)
                    VALUES (?, ?, ?, ?, ?)
                ''', rows)
                self.conn.commit()
                print(f"Standings for league {league_id} stored successfully.")
            except Exception as e:
                print(f"Error updating standings for league {league_id}: {e}")

    def get_remaining_fixtures(self, refresh_interval_days: int = 7) -> List[int]:
        """Return IDs of finished fixtures whose statistics are missing or stale.

        A fixture qualifies when it has no ``fixture_statistics`` row, or a row
        whose ``last_updated`` is older than ``refresh_interval_days`` days.
        """
        # DISTINCT is required: fixture_statistics holds one row per team, so
        # the join would otherwise return a stale fixture once per team and the
        # caller would fetch it from the API repeatedly.
        # NOTE: a fixture for which nothing was ever stored keeps matching the
        # IS NULL branch and is therefore returned again on every run.
        query = '''
        SELECT DISTINCT f.fixture_id
        FROM fixtures f
        LEFT JOIN fixture_statistics fs ON f.fixture_id = fs.fixture_id
        WHERE f.date < DATE('now')
          AND f.status = 'Match Finished'
          AND (fs.last_updated IS NULL OR fs.last_updated < DATE('now', ?))
        ORDER BY f.fixture_id
        '''
        modifier = f"-{refresh_interval_days} days"
        remaining_fixtures = self.cursor.execute(query, (modifier,)).fetchall()
        return [f[0] for f in remaining_fixtures]

    def process_fixture(self, fixture_id: int):
        """Fetch and store the statistics of one fixture (one row per team).

        Nothing is committed here; the caller decides when to commit. Any error
        is reported and swallowed so that one bad fixture does not stop a batch.
        """
        try:
            print(f"Processing fixture ID: {fixture_id}")
            stats_response = self.fetch_data("/fixtures/statistics", {"fixture": fixture_id})
            if not stats_response or not stats_response.get('response'):
                print(f"No statistics data for fixture ID: {fixture_id}")
                return

            # Parse every team first and write afterwards, so a parsing error
            # cannot leave only one of the teams stored.
            rows = []
            for team_stats in stats_response["response"]:
                stats = {
                    stat["type"]: stat["value"]
                    for stat in (team_stats.get("statistics") or [])
                }
                rows.append((
                    fixture_id,
                    team_stats["team"]["id"],
                    team_stats["team"]["name"],
                    stats.get("Shots on Goal"),
                    stats.get("Shots off Goal"),
                    stats.get("Total Shots"),
                    stats.get("Blocked Shots"),
                    stats.get("Shots insidebox"),
                    stats.get("Shots outsidebox"),
                    stats.get("Fouls"),
                    stats.get("Corner Kicks"),
                    stats.get("Offsides"),
                    self._parse_percentage(stats.get("Ball Possession")),
                    stats.get("Yellow Cards"),
                    stats.get("Red Cards"),
                    stats.get("Goalkeeper Saves"),
                    stats.get("Total passes"),
                    stats.get("Passes accurate"),
                    self._parse_percentage(stats.get("Passes %")),
                    stats.get("expected_goals"),
                    stats.get("goals_prevented"),
                ))

            # Insert statistics data into the database
            self.cursor.executemany('''
                INSERT OR REPLACE INTO fixture_statistics (
                    fixture_id, team_id, team_name, shots_on_goal, shots_off_goal, total_shots, blocked_shots, 
                    shots_insidebox, shots_outsidebox, fouls, corner_kicks, offsides, ball_possession, yellow_cards, 
                    red_cards, goalkeeper_saves, total_passes, passes_accurate, passes_percentage, expected_goals, 
                    goals_prevented, last_updated
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
            ''', rows)
        except Exception as e:
            print(f"Error processing fixture ID {fixture_id}: {e}")

    def update_statistics(self, refresh_interval_days: int = 7):
        """Update statistics for all fixtures requiring updates.

        Commits every ``COMMIT_BATCH_SIZE`` fixtures and once more when the loop
        ends, including when it is interrupted, so finished work is kept.
        """
        fixtures = self.get_remaining_fixtures(refresh_interval_days)
        total_fixtures = len(fixtures)
        try:
            for index, fixture_id in enumerate(fixtures, start=1):
                self.process_fixture(fixture_id)
                if index % self.COMMIT_BATCH_SIZE == 0:
                    self.conn.commit()

                print(f"\rProgress: {index}/{total_fixtures} fixtures processed", end="")
        finally:
            # Runs on normal completion and on interruption (e.g. Ctrl-C), so
            # the fixtures processed since the last batch commit are not lost.
            self.conn.commit()
        print("\nFixture statistics update completed.")

    def close_connection(self):
        """Commit any pending work and close the database connection."""
        try:
            self.conn.commit()
        except Exception as e:
            print(f"Error during commit: {e}")
        finally:
            self.conn.close()
            print("Database connection closed.")


if __name__ == "__main__":
    loader = FootballDataLoader(DATABASE_PATH)
    try:
        loader.initialize_tables()
        loader.update_standings(season='2024')  # Update standings for the 2024 season
        loader.update_statistics(refresh_interval_days=7)  # Update fixture statistics
    finally:
        # Always release the connection, even when a step raises or the long
        # statistics run is interrupted from the keyboard.
        loader.close_connection()


Found 4 leagues for season 2024. Fetching standings...
Fetching standings for league 39 in season 2024...
Standings for league 39 stored successfully.
Fetching standings for league 40 in season 2024...
Standings for league 40 stored successfully.
Fetching standings for league 41 in season 2024...
Standings for league 41 stored successfully.
Fetching standings for league 42 in season 2024...
Standings for league 42 stored successfully.
Processing fixture ID: 974
Progress: 1/12499 fixtures processedProcessing fixture ID: 17923
Progress: 2/12499 fixtures processedProcessing fixture ID: 47060
Progress: 3/12499 fixtures processedProcessing fixture ID: 47084
Progress: 4/12499 fixtures processedProcessing fixture ID: 47085
Progress: 5/12499 fixtures processedProcessing fixture ID: 47088
Progress: 6/12499 fixtures processedProcessing fixture ID: 47101
Progress: 7/12499 fixtures processedProcessing fixture ID: 47117
Progress: 8/12499 fixtures processedProcessing fixture ID: 47118
Progress: 9/12

KeyboardInterrupt: 

In [2]:
# RECENT FORM TABLE



import sqlite3
import pandas as pd

# Database configuration
DATABASE_PATH = "./data/football_data.db"

# Step 1: Create Recent Form Table
def initialize_recent_form_table():
    """Create the ``recent_form`` table (one row per team) if it does not exist.

    Opens its own connection to ``DATABASE_PATH`` and always closes it, even
    when the statement fails.
    """
    conn = sqlite3.connect(DATABASE_PATH)
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS recent_form (
                team_id INTEGER,
                rolling_points REAL,
                rolling_wins REAL,
                rolling_draws REAL,
                rolling_losses REAL,
                last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                PRIMARY KEY (team_id)
            )
        ''')
        conn.commit()
    finally:
        conn.close()
    print("Recent form table created.")

# Step 2: Update Recent Form Table
def update_recent_form(n=5):
    """Recompute each team's form over its last ``n`` finished matches.

    Every finished fixture contributes two rows (one from the home side, one
    from the away side). Per team, points/wins/draws/losses are summed over a
    rolling window of ``n`` matches ordered by date; the most recent window is
    written to ``recent_form``, replacing any existing row for that team.

    Args:
        n: Window size in matches. Teams with fewer than ``n`` finished
            matches are summed over what they have (``min_periods=1``).
    """
    # Query historical match data: one row per (fixture, team)
    query = """
    SELECT
        f.fixture_id,
        f.date,
        f.home_team_id AS team_id,
        CASE
            WHEN f.home_goals > f.away_goals THEN 3
            WHEN f.home_goals = f.away_goals THEN 1
            ELSE 0
        END AS points,
        CASE
            WHEN f.home_goals > f.away_goals THEN 1
            ELSE 0
        END AS win,
        CASE
            WHEN f.home_goals = f.away_goals THEN 1
            ELSE 0
        END AS draw,
        CASE
            WHEN f.home_goals < f.away_goals THEN 1
            ELSE 0
        END AS loss
    FROM fixtures f
    WHERE f.status = 'Match Finished'

    UNION ALL

    SELECT
        f.fixture_id,
        f.date,
        f.away_team_id AS team_id,
        CASE
            WHEN f.away_goals > f.home_goals THEN 3
            WHEN f.away_goals = f.home_goals THEN 1
            ELSE 0
        END AS points,
        CASE
            WHEN f.away_goals > f.home_goals THEN 1
            ELSE 0
        END AS win,
        CASE
            WHEN f.away_goals = f.home_goals THEN 1
            ELSE 0
        END AS draw,
        CASE
            WHEN f.away_goals < f.home_goals THEN 1
            ELSE 0
        END AS loss
    FROM fixtures f
    WHERE f.status = 'Match Finished'
    """

    conn = sqlite3.connect(DATABASE_PATH)
    try:
        # Load data into a DataFrame
        df = pd.read_sql_query(query, conn)

        # Convert date to datetime; a stable sort keeps the order of rows that
        # share a timestamp deterministic between runs.
        df['date'] = pd.to_datetime(df['date'])
        df = df.sort_values(by='date', kind='mergesort')

        # Compute rolling metrics
        metrics = ['points', 'win', 'draw', 'loss']
        rolling_metrics = [f"rolling_{metric}" for metric in metrics]
        for metric, col_name in zip(metrics, rolling_metrics):
            df[col_name] = (
                df.groupby('team_id')[metric]
                .rolling(window=n, min_periods=1)
                .sum()
                # Drop the team_id level so the result aligns with df's index
                .reset_index(0, drop=True)
            )

        # Get the latest rolling metrics for each team
        recent_form = df.groupby('team_id').tail(1)[['team_id'] + rolling_metrics]

        # Convert to plain Python numbers: sqlite3 cannot bind NumPy integer
        # scalars, and team_id must be stored as an integer, not a float.
        rows = [
            (int(team_id), float(points), float(wins), float(draws), float(losses))
            for team_id, points, wins, draws, losses
            in recent_form.itertuples(index=False, name=None)
        ]

        # Insert or update the `recent_form` table in one batch
        conn.executemany('''
            INSERT OR REPLACE INTO recent_form (
                team_id, rolling_points, rolling_wins, rolling_draws, rolling_losses, last_updated
            ) VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
        ''', rows)
        conn.commit()
    finally:
        # Release the connection even when the query or the computation fails.
        conn.close()
    print("Recent form table updated.")

# Step 3: Join Recent Form with Fixtures
def join_recent_form():
    """Return not-yet-started fixtures with both teams' recent form attached.

    Returns:
        A DataFrame with one row per fixture whose status is 'Not Started',
        carrying ``home_rolling_*`` and ``away_rolling_*`` columns. The joins
        are LEFT JOINs, so a team without a ``recent_form`` row yields NULLs
        rather than dropping the fixture.
    """
    query = """
    SELECT
        f.fixture_id,
        f.date,
        f.home_team_id,
        f.away_team_id,
        rf_home.rolling_points AS home_rolling_points,
        rf_home.rolling_wins AS home_rolling_wins,
        rf_home.rolling_draws AS home_rolling_draws,
        rf_home.rolling_losses AS home_rolling_losses,
        rf_away.rolling_points AS away_rolling_points,
        rf_away.rolling_wins AS away_rolling_wins,
        rf_away.rolling_draws AS away_rolling_draws,
        rf_away.rolling_losses AS away_rolling_losses
    FROM fixtures f
    LEFT JOIN recent_form rf_home ON f.home_team_id = rf_home.team_id
    LEFT JOIN recent_form rf_away ON f.away_team_id = rf_away.team_id
    WHERE f.status = 'Not Started'
    """
    conn = sqlite3.connect(DATABASE_PATH)
    try:
        upcoming_fixtures = pd.read_sql_query(query, conn)
    finally:
        # Close the connection even when the query fails (e.g. missing table).
        conn.close()
    return upcoming_fixtures

# Run the pipeline: make sure the recent_form table exists, recompute each
# team's form over a 5-match window, then join it onto 'Not Started' fixtures.
initialize_recent_form_table()
update_recent_form(n=5)
upcoming_fixtures_with_form = join_recent_form()

# Display the result (first rows only)
print(upcoming_fixtures_with_form.head())


Recent form table created.
Recent form table updated.
   fixture_id                       date  home_team_id  away_team_id  \
0     1208133  2024-11-23T15:00:00+00:00            35            51   
1     1208134  2024-11-23T15:00:00+00:00            42            65   
2     1208135  2024-11-23T15:00:00+00:00            66            52   
3     1208136  2024-11-23T15:00:00+00:00            45            55   
4     1208137  2024-11-23T15:00:00+00:00            36            39   

   home_rolling_points  home_rolling_wins  home_rolling_draws  \
0                  7.0                2.0                 1.0   
1                  5.0                1.0                 2.0   
2                  5.0                1.0                 2.0   
3                  6.0                1.0                 3.0   
4                  7.0                2.0                 1.0   

   home_rolling_losses  away_rolling_points  away_rolling_wins  \
0                  2.0                 10.0             