In [1]:
import requests
import pandas as pd
import logging
from sqlalchemy import create_engine

In [31]:
stats_list = ["goals_scored", "own_goals", "yellow_cards", "red_cards", "assists",
              "penalties_saved", "penalties_missed", "saves", "bonus", "bps"]

In [36]:
def pull_data_from_api():
        """Fetch data from Fantasy Premier League API."""
        api_url = "https://fantasy.premierleague.com/api/fixtures/?future=0"
        try:
            response = requests.get(api_url, timeout=10)
            response.raise_for_status()
            logging.info("✅ API data fetched successfully")
            return response.json()  # Returned value is automatically available to downstream tasks
        except requests.RequestException as e:
            logging.error(f"❌ Error fetching data from API: {e}")
            raise

In [37]:
def extract_stats(data, identifier):
    """Extract relevant match statistics."""
    if not data:
        raise ValueError("❌ API response is empty.")

    extracted_details = []
    for fixture in data:
        stats = fixture.get("stats", [])
        for stat in stats:
            if stat.get("identifier") == identifier:
                for team_type in ['a', 'h']:
                    for entry in stat.get(team_type, []):
                        extracted_details.append({
                            'game_code': fixture.get('code', None),
                            'finished': fixture.get('finished', None),
                            'game_id': fixture.get('id', None),
                            'stat_value': entry.get('value', None),
                            'player_id': entry.get('element', None),
                            'team_type': team_type
                        })
    return pd.DataFrame(extracted_details)

In [40]:
def join_dataframes(data):
    """Process extracted data into relevant statistics."""
    if data is None or len(data) == 0:
        raise ValueError("❌ No data available for processing.")

    # Initialize an empty list to collect DataFrames
    df_list = []
    
    for stat in stats_list:
        df = extract_stats(data, stat)
        if not df.empty:  # Only process non-empty DataFrames
            df['stat_type'] = stat  # Add stat type as a column (changed from f'{stat}')
            df_list.append(df)

    # Combine all DataFrames at once
    if df_list:
        new_df = pd.concat(df_list, ignore_index=True)
    else:
        new_df = pd.DataFrame()  # Return empty DataFrame if no data

    logging.info("✅ Data processing completed successfully.")
    return new_df  # Return single combined DataFrame

In [41]:
join_dataframes(pull_data_from_api())

Unnamed: 0,game_code,finished,game_id,stat_value,player_id,team_type,stat_type
0,2444470,True,1,1,389,h,goals_scored
1,2444473,True,4,1,317,a,goals_scored
2,2444473,True,4,1,328,a,goals_scored
3,2444471,True,2,1,4,h,goals_scored
4,2444471,True,2,1,17,h,goals_scored
...,...,...,...,...,...,...,...
11969,2444735,True,266,4,123,h,bps
11970,2444735,True,266,4,654,h,bps
11971,2444735,True,266,3,520,h,bps
11972,2444735,True,266,3,576,h,bps


In [None]:
def upload_to_postgres(df):
    """Upload DataFrame to PostgreSQL."""
    # Database connection parameters
    dbname = os.getenv('dbname')
    user = os.getenv('user')
    password = os.getenv('password')
    host = os.getenv('host')
    port = os.getenv('port')

    # Check if DataFrame is empty
    if df is None or df.empty:
        raise ValueError("❌ DataFrame is empty, cannot upload to PostgreSQL.")

    # Create SQLAlchemy engine for PostgreSQL connection   
    try:
        engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{dbname}')
        logging.info("✅ Database connection established")
    except Exception as e:
        logging.error(f"❌ Failed to connect to database: {e}")
        raise

    # Load DataFrame into the 'bronze' schema
    try:
        df.to_sql(
            'players_stats',           # Table name
            engine,                 # SQLAlchemy engine
            schema='bronze',        # Target schema
            if_exists='replace',    # 'replace' to overwrite, 'append' to add data
            index=False             # Exclude DataFrame index
        )
        logging.info("✅ Data loaded into 'bronze.games_info' successfully")
    except Exception as e:
        logging.error(f"❌ Failed to load data into database: {e}")
        raise