In [2]:

config = {
    "api": {
        "token": "dce8c5264b254babaf1ffb275554d858"  # Replace with your Football-Data.org API token
    },
    "etl": {
        "requests_per_min": 10,
        "retries_per_team": 3,
        "sleep_between_retries": 2
    },
    "database": {
        "type": "duckdb",
        "duckdb_path": "../data/football.duckdb"
    }
}

# Example usage
API_TOKEN = config['api']['token']
HEADERS = {"X-Auth-Token": API_TOKEN}
REQUESTS_PER_MIN = config['etl']['requests_per_min']
RETRIES_PER_TEAM = config['etl']['retries_per_team']
SLEEP_BETWEEN_RETRIES = config['etl']['sleep_between_retries']


In [4]:
import time
import logging
import pandas as pd
import yaml
import requests
import os

# -------------------------
# Logging
# -------------------------
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

# -------------------------
# Load config
# -------------------------
with open("../config.yaml", "r") as f:
    config = yaml.safe_load(f)

API_TOKEN = config['api']['token']
HEADERS = {"X-Auth-Token": API_TOKEN}
REQUESTS_PER_MIN = config['etl']['requests_per_min']
RETRIES_PER_TEAM = config['etl']['retries_per_team']
SLEEP_BETWEEN_RETRIES = config['etl']['sleep_between_retries']

# -------------------------
# Database connection
# -------------------------
db_type = config['database']['type'].lower()

if db_type == 'duckdb':
    import duckdb
    os.makedirs(os.path.dirname(config['database']['duckdb_path']), exist_ok=True)
    con = duckdb.connect(database=config['database']['duckdb_path'])
    logging.info(f"Connected to DuckDB at {config['database']['duckdb_path']}")

elif db_type == 'snowflake':
    import snowflake.connector
    con = snowflake.connector.connect(
        user=config['database']['snowflake']['user'],
        password=config['database']['snowflake']['password'],
        account=config['database']['snowflake']['account'],
        database=config['database']['snowflake']['database'],
        warehouse=config['database']['snowflake']['warehouse']
    )
    logging.info("Connected to Snowflake")

else:
    raise ValueError(f"Unsupported database type: {db_type}")

# -------------------------
# Functions
# -------------------------
def fetch_json(url, retries=RETRIES_PER_TEAM, sleep_between=2):
    """Fetch JSON data with retries."""
    for attempt in range(retries):
        try:
            resp = requests.get(url, headers=HEADERS)
            if resp.status_code == 200:
                return resp.json()
            else:
                logging.warning(f"Attempt {attempt+1} failed: {resp.status_code} for {url}")
                time.sleep(sleep_between)
        except requests.RequestException as e:
            logging.error(f"Request exception: {e}")
            time.sleep(sleep_between)
    logging.error(f"Failed to fetch: {url}")
    return None

def fetch_teams():
    """Fetch PL teams and save to DB."""
    url = "https://api.football-data.org/v4/competitions/PL/teams"
    data = fetch_json(url)
    if data and "teams" in data:
        df_teams = pd.json_normalize(data['teams'])
        if db_type == "duckdb":
            con.execute("CREATE OR REPLACE TABLE teams AS SELECT * FROM df_teams")
        elif db_type == "snowflake":
            # Snowflake: adjust as needed for schema / types
            con.cursor().execute("CREATE OR REPLACE TABLE teams AS SELECT * FROM df_teams")
        logging.info("Teams table updated in DB.")
        return df_teams
    else:
        logging.error("Teams data not available.")
        return pd.DataFrame()

def fetch_matches(season=2025):
    """Fetch PL matches for the season and save to DB."""
    url = f"https://api.football-data.org/v4/competitions/PL/matches?season={season}"
    data = fetch_json(url)
    if data and "matches" in data:
        df_matches = pd.json_normalize(data['matches'])
        if db_type == "duckdb":
            con.execute("CREATE OR REPLACE TABLE matches AS SELECT * FROM df_matches")
        elif db_type == "snowflake":
            con.cursor().execute("CREATE OR REPLACE TABLE matches AS SELECT * FROM df_matches")
        logging.info("Matches table updated in DB.")
        return df_matches
    else:
        logging.error("Matches data not available.")
        return pd.DataFrame()

def fetch_team_players(team_id):
    """Fetch players for a single team with retries."""
    for attempt in range(RETRIES_PER_TEAM):
        try:
            url = f"https://api.football-data.org/v4/teams/{team_id}"
            resp = requests.get(url, headers=HEADERS)
            if resp.status_code == 200:
                players = resp.json().get('squad', [])
                for p in players:
                    p['team_id'] = team_id
                return players
            else:
                logging.warning(f"Attempt {attempt+1} failed for team {team_id}: {resp.status_code}")
                time.sleep(SLEEP_BETWEEN_RETRIES)
        except requests.RequestException as e:
            logging.error(f"Request exception for team {team_id}: {e}")
            time.sleep(SLEEP_BETWEEN_RETRIES)
    logging.warning(f"All attempts failed for team {team_id}")
    return []

def fetch_all_players(df_teams):
    """Fetch players for all teams, respecting rate limits."""
    all_players = []
    failed_teams = []
    sleep_between_calls = 60 / REQUESTS_PER_MIN

    for i, team_id in enumerate(df_teams['id'], start=1):
        players = fetch_team_players(team_id)
        if players:
            all_players.extend(players)
        else:
            failed_teams.append(team_id)
        logging.info(f"Processed {i}/{len(df_teams)} teams")
        time.sleep(sleep_between_calls)

    df_players = pd.json_normalize(all_players)
    df_players = df_players.drop_duplicates(subset=['id'])
    if db_type == "duckdb":
        con.execute("CREATE OR REPLACE TABLE players AS SELECT * FROM df_players")
    elif db_type == "snowflake":
        con.cursor().execute("CREATE OR REPLACE TABLE players AS SELECT * FROM df_players")
    logging.info("Players table updated in DB.")

    if failed_teams:
        missing_team_names = df_teams[df_teams['id'].isin(failed_teams)][['id','name']]
        logging.warning("Teams missing players this run:")
        logging.warning("\n%s", missing_team_names)

    return df_players

# -------------------------
# Main ETL
# -------------------------
def main():
    df_teams = fetch_teams()
    if df_teams.empty:
        logging.error("No teams fetched. Aborting ETL.")
        return
    df_matches = fetch_matches()
    df_players = fetch_all_players(df_teams)
    logging.info("ETL run complete.")

if __name__ == "__main__":
    main()


FileNotFoundError: [Errno 2] No such file or directory: '../config.yaml'