In [1]:
import os
import json
import time
import pandas as pd
import requests
from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type
import mysql.connector
from mysql.connector import Error
import logging

# Logging steup
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
)
logger = logging.getLogger("premier_league")


In [2]:
RAPIDAPI_KEY = "API KEY"
DB_HOST =  "127.0.0.1"
DB_PORT =  3306
DB_USER = "Football_API"
DB_PASS = "strong"
DB_NAME = "football_db"

if not RAPIDAPI_KEY:
    raise RuntimeError("Missing RAPIDAPI_KEY in environment (.env)")

if not DB_PASS:
    logger.warning("DB_PASS not set - please set in .env to avoid interactive prompts")

In [3]:
BASE_URL = "https://premier-league-standings1.p.rapidapi.com/"
HEADERS = {
    "x-rapidapi-key": RAPIDAPI_KEY,
    "x-rapidapi-host": "premier-league-standings1.p.rapidapi.com"
}

#safely fetching live payload or retries on transient errors
@retry(wait=wait_exponential(multiplier=1, min=1, max=60),
       stop=stop_after_attempt(5),
       retry=retry_if_exception_type((requests.exceptions.RequestException,)))
def fetch_live_payload(timeout: int = 10) -> dict:
    """Fetch live payload from RapidAPI with retries and raise_for_status."""
    resp = requests.get(BASE_URL, headers=HEADERS, timeout=timeout)
    resp.raise_for_status()
    return resp.json()

payload = fetch_live_payload()

In [4]:
# normalize raw JSON into a tabular schema

def parse_payload(payload: dict) -> pd.DataFrame:
    """
    Parse the API payload into a normalized DataFrame with fixed schema.
    Fields: uni_id, home_team, home_score, away_team, away_score, ongoing, score_str, added_time
    """
    rows = []
    columns_names = ['position','abbreviation','team_name','gamesPlayed','wins','draw',
                     'losses','goalsFor','goalsAgainst','goalDifference','points']

    for i in payload:
        position = i['stats']['rank']
        abbreviation = i['team']['abbreviation']
        team_name = i['team']['name']
        gamesPlayed = i['stats']['gamesPlayed']
        wins = i['stats']['wins']
        draw = i['stats']['ties']
        losses = i['stats']['losses']
        goalsFor = i['stats']['goalsFor']
        goalsAgainst = i['stats']['goalsAgainst']
        goalDifference = i['stats']['goalDifference']
        points = i['stats']['points']
        
        active_rows = (position,abbreviation,team_name,gamesPlayed,wins,draw,losses,goalsFor,
                       goalsAgainst,goalDifference,points)

        rows.append(active_rows)
    df = pd.DataFrame(rows, columns=columns_names)
    
#drop blank or duplicate team entries
    if "team_name" in df.columns:
        df = df.dropna(subset=["team_name"]).drop_duplicates(subset=["team_name"], keep="last")
    return df

In [5]:
payload = fetch_live_payload()
df = parse_payload(payload)
df.head(10)

Unnamed: 0,position,abbreviation,team_name,gamesPlayed,wins,draw,losses,goalsFor,goalsAgainst,goalDifference,points
0,1,LIV,Liverpool,3,3,0,0,8,4,4,9
1,2,CHE,Chelsea,3,2,1,0,7,1,6,7
2,3,ARS,Arsenal,3,2,0,1,6,1,5,6
3,4,TOT,Tottenham Hotspur,3,2,0,1,5,1,4,6
4,5,EVE,Everton,3,2,0,1,5,3,2,6
5,6,SUN,Sunderland,3,2,0,1,5,3,2,6
6,7,BOU,AFC Bournemouth,3,2,0,1,4,4,0,6
7,8,CRY,Crystal Palace,3,1,2,0,4,1,3,5
8,9,MAN,Manchester United,3,1,1,1,4,4,0,4
9,10,NFO,Nottingham Forest,3,1,1,1,4,5,-1,4


In [6]:
#connection to database
from contextlib import contextmanager

@contextmanager
def get_db_connection():
    conn = None
    try:
        conn = mysql.connector.connect(
            host=DB_HOST,
            port=DB_PORT,
            user=DB_USER,
            password=DB_PASS,
            database=DB_NAME,
            connection_timeout=30,
            autocommit=False,
        )
        yield conn
    finally:
        if conn is not None and conn.is_connected():
            conn.close()


In [7]:
# Inserting/Updating data into sql database
def upsert_standings(df: pd.DataFrame, table_name: str = "pl_standings", batch_size: int = 200):
    """
    Performs a batched, parameterized bulk upsert into MySQL using executemany.
    Uses ON DUPLICATE KEY UPDATE with VALUES() for simplicity.
    Assumes the 'uni_id' column is PRIMARY KEY or UNIQUE.
    """
    if df.empty:
        logger.info("No rows to upsert.")
        return 0

    cols = [
       'position','abbreviation','team_name','gamesPlayed','wins','draw','losses',
        'goalsFor','goalsAgainst','goalDifference','points'
    ]
    placeholders = "(" + ",".join(["%s"] * len(cols)) + ")"
    insert_sql = f"""
    INSERT INTO {table_name} ({', '.join(cols)})
    VALUES {placeholders}
    ON DUPLICATE KEY UPDATE
      position=VALUES(position),
      abbreviation=VALUES(abbreviation),
      team_name=VALUES(team_name),
      gamesPlayed=VALUES(gamesPlayed),
      wins=VALUES(wins),
      draw=VALUES(draw),
      losses=VALUES(losses),
      goalsFor=VALUES(goalsFor),
      goalsAgainst=VALUES(goalsAgainst),
      goalDifference=VALUES(goalDifference),
      points=VALUES(points)
    ;
    """

#Convert DataFrame to list of tuples as executemany() does not accept a DataFrame directly.
    records = [tuple(x) for x in df[cols].itertuples(index=False, name=None)]
    total = 0
    try:
        with get_db_connection() as conn:
            cur = conn.cursor()
            # batch inserts
            for i in range(0, len(records), batch_size):
                batch = records[i : i + batch_size]
                cur.executemany(insert_sql, batch)
                conn.commit()
                total += len(batch)
                logger.info("Upserted batch rows: %d", len(batch))
            cur.close()
    except Error as err:
        logger.exception("DB error during upsert: %s", err)
        raise
    return total


In [8]:
#fetch,transform and load data into sql table
def run_once():
    logger.info("ETL run started")
    payload = fetch_live_payload()
    df = parse_payload(payload)
    logger.info("Parsed rows: %d", len(df))
    upserted = upsert_standings(df)
    logger.info("ETL run completed. Upserted rows: %d", upserted)
    return {"rows_parsed": len(df), "rows_upserted": upserted}

# Execute
if __name__ == "__main__":
    # In Jupyter you can just call run_once()
    result = run_once()
    result

2025-09-12 13:44:01,280 | INFO | ETL run started
2025-09-12 13:44:02,641 | INFO | Parsed rows: 20
2025-09-12 13:44:02,696 | INFO | Upserted batch rows: 20
2025-09-12 13:44:02,698 | INFO | ETL run completed. Upserted rows: 20


In [9]:
#checks that data landed in DB
with get_db_connection() as conn:
    cur = conn.cursor()
    cur.execute("SHOW TABLES LIKE 'pl_standings';")
    res = cur.fetchall()
    logger.info("Table exists: %s", res)

2025-09-12 13:44:02,735 | INFO | Table exists: [('pl_standings',)]
