In [0]:
import requests
import pandas as pd
from pyspark.sql import SparkSession

# Initialize Spark Session (Databricks automatically provides this)
spark = SparkSession.builder.appName("FPL_Player_Season_History_Enhanced").getOrCreate()

# Base API URL
BASE_URL = "https://fantasy.premierleague.com/api/"

# Fetch all player metadata (names, teams, positions)
def get_all_players():
    """Fetches all players from the FPL API."""
    response = requests.get(BASE_URL + "bootstrap-static/")
    if response.status_code == 200:
        data = response.json()
        df_players = pd.DataFrame(data["elements"])
        df_teams = pd.DataFrame(data["teams"])  # Fetch team info
        
        # Map team_id to team name
        team_map = df_teams.set_index("id")["name"].to_dict()
        df_players["team_name"] = df_players["team"].map(team_map)
        
        # Map element_type to position names
        position_map = {1: "Goalkeeper", 2: "Defender", 3: "Midfielder", 4: "Forward"}
        df_players["position"] = df_players["element_type"].map(position_map)
        
        return df_players
    else:
        print("⚠️ Failed to fetch player data!")
        return None

# Fetch historical season data for a given player
def get_season_history(player_id):
    """Fetches complete past season stats for a given player_id."""
    try:
        url = f"{BASE_URL}element-summary/{player_id}/"
        r = requests.get(url).json()
        
        # Extract full past seasons' history
        df = pd.json_normalize(r["history_past"])
        df["player_id"] = player_id  # Add player ID for merging
        
        return df
    except:
        print(f"⚠️ Failed to fetch history for player {player_id}")
        return None

# Fetch all players
df_players = get_all_players()

if df_players is not None:
    player_ids = df_players["id"].tolist()  # Extract all player IDs

    # Fetch complete history for all players
    all_histories = []
    for player_id in player_ids:
        print(f"📡 Fetching history for player {player_id}...")
        df_history = get_season_history(player_id)
        if df_history is not None:
            all_histories.append(df_history)

    # Merge all player histories into one DataFrame
    df_past_seasons = pd.concat(all_histories, ignore_index=True)

    # Select ALL available columns
    df_past_seasons = df_past_seasons[[
        "player_id", "season_name", "element_code", "start_cost", "end_cost", 
        "total_points", "minutes", "goals_scored", "assists", "clean_sheets", "goals_conceded",
        "own_goals", "penalties_saved", "penalties_missed", "yellow_cards", "red_cards",
        "saves", "bonus", "bps", "influence", "creativity", "threat", "ict_index"
    ]]

    # Merge with player metadata (names, teams, positions)
    df_final = df_past_seasons.merge(
        df_players[["id", "web_name", "team_name", "position"]], 
        left_on="player_id", right_on="id", how="left"
    )

    # Drop redundant columns
    df_final.drop(columns=["player_id", "id"], inplace=True)

    # Reorder columns
    df_final = df_final[[
        "web_name", "team_name", "position", "season_name", "element_code",
        "start_cost", "end_cost", "total_points", "minutes", "goals_scored",
        "assists", "clean_sheets", "goals_conceded", "own_goals", "penalties_saved",
        "penalties_missed", "yellow_cards", "red_cards", "saves", "bonus",
        "bps", "influence", "creativity", "threat", "ict_index"
    ]]

    # Convert to Spark DataFrame
    df_spark = spark.createDataFrame(df_final)

    # Show a sample
    df_spark.show(5)

    # Save to Databricks Table
    df_spark.write.format("delta").mode("overwrite").saveAsTable("fpl.season_history.player_history")

    print("✅ Full Past Season History Table Successfully Created in Databricks!")
