In [None]:
# =============================================================================
# SETUP
# =============================================================================
import sys
from pathlib import Path
import polars as pl
import logging
import numpy as np

# Add project root to Python path
sys.path.insert(0, '/Users/architmanek/Desktop/DataEngineering/football_pipeline')

# Now imports work without changing working directory
from utils.constants import ABS_PATH, BRONZE_DIR_EVENTS

file_path = ABS_PATH / BRONZE_DIR_EVENTS / "events_15946.parquet"
df = pl.read_parquet(file_path)
df = df.with_columns(
                pl.col("timestamp").str.strptime(pl.Datetime, "%H:%M:%S.%f", strict=False)
            )

  pl.col("timestamp").str.strptime(pl.Datetime, "%H:%M:%S.%f", strict=False)


In [9]:
# =============================================================================
# TEST 1: COLUMN FLATTENING
# =============================================================================
def flatten_columns(df):
    return df.rename({col: col.replace('.', '_') for col in df.columns})

In [10]:
# Find columns with periods
columns_with_periods = [col for col in df.columns if '.' in col]
print(f"Columns with periods: {columns_with_periods}")
print(f"Number of columns with periods: {len(columns_with_periods)}")

# Test flatten_columns
df = flatten_columns(df)

columns_with_periods = [col for col in df.columns if '.' in col]
print(f"Columns with periods: {columns_with_periods}")
print(f"Number of columns with periods: {len(columns_with_periods)}")


Columns with periods: ['type.id', 'type.name', 'possession_team.id', 'possession_team.name', 'play_pattern.id', 'play_pattern.name', 'team.id', 'team.name', 'tactics.formation', 'tactics.lineup', 'player.id', 'player.name', 'position.id', 'position.name', 'pass.recipient.id', 'pass.recipient.name', 'pass.length', 'pass.angle', 'pass.height.id', 'pass.height.name', 'pass.end_location', 'pass.body_part.id', 'pass.body_part.name', 'pass.type.id', 'pass.type.name', 'carry.end_location', 'pass.switch', 'pass.outcome.id', 'pass.outcome.name', 'ball_receipt.outcome.id', 'ball_receipt.outcome.name', 'duel.type.id', 'duel.type.name', 'pass.aerial_won', 'interception.outcome.id', 'interception.outcome.name', 'ball_recovery.recovery_failure', 'pass.assisted_shot_id', 'pass.shot_assist', 'shot.statsbomb_xg', 'shot.end_location', 'shot.key_pass_id', 'shot.outcome.id', 'shot.outcome.name', 'shot.first_time', 'shot.technique.id', 'shot.technique.name', 'shot.body_part.id', 'shot.body_part.name', 'sho

In [11]:
# =============================================================================
# TEST 2: ENRICH LOCATIONS
# =============================================================================
def enrich_locations(df):
    df = df.with_columns([
        pl.col("location").cast(pl.Array(pl.Float64, 2)).alias("location"),
        pl.col("pass_end_location").cast(pl.Array(pl.Float64, 2)).alias("pass_end_location"),
    ])
    df = df.with_columns([
        (pl.col("location").arr.get(0) / 120).alias("x"),
        (pl.col("location").arr.get(1) / 80).alias("y"),
        pl.when(pl.col("type_name") == "Pass")
          .then(pl.col("pass_end_location").arr.get(0) / 120)
          .otherwise(None)
          .alias("end_x"),
        pl.when(pl.col("type_name") == "Pass")
          .then(pl.col("pass_end_location").arr.get(1) / 80)
          .otherwise(None)
          .alias("end_y"),
    ])
    return df

In [12]:
# Test enrich_locations (which tests normalize functions internally)
# print("Before enriching:")
# print(f"Has x column: {'x' in df.columns}")
# print(f"Has y column: {'y' in df.columns}")

df_enriched = enrich_locations(df)
# df['location'].dtype

# print("\nAfter enriching:")
# print(f"Events with x,y coordinates: {df_enriched[['x', 'y']].notnull().all(axis=1).sum()}")
# print(f"Pass events with end_x,end_y: {df_enriched[['end_x', 'end_y']].notnull().all(axis=1).sum()}")

# # Sample some results
# sample_coords = df_enriched[df_enriched.x.notnull()][['location', 'x', 'y', 'end_x', 'end_y']].head(3)
# print(f"\nSample normalized coordinates:")
# print(sample_coords)

print(df_enriched[["location", "x", "y"]].head())


shape: (5, 3)
┌───────────────┬──────────┬─────────┐
│ location      ┆ x        ┆ y       │
│ ---           ┆ ---      ┆ ---     │
│ array[f64, 2] ┆ f64      ┆ f64     │
╞═══════════════╪══════════╪═════════╡
│ null          ┆ null     ┆ null    │
│ null          ┆ null     ┆ null    │
│ null          ┆ null     ┆ null    │
│ null          ┆ null     ┆ null    │
│ [61.0, 40.1]  ┆ 0.508333 ┆ 0.50125 │
└───────────────┴──────────┴─────────┘


In [13]:
def add_possession_stats(df):
    """Add possession stats

    Args:
        df (pd.DataFrame): DataFrame with pass data

    Returns:
        pd.DataFrame: DataFrame with possession stats including:
        - possession_event_count: Number of events in each possession
        - possession_pass_count: Number of passes in each possession
        - possession_player_count: Number of unique players in each possession
        - possession_duration: Duration of each possession
        - possession_xg: Total xG in the possession
    """
    
    # Calculate event count per possession
    event_count = df.group_by("possession").agg(
        pl.count().alias("possession_event_count")
    )

    # Number of passes in each possession
    pass_count = (
        df.filter(pl.col("type_name") == "Pass")
        .group_by("possession")
        .agg(pl.count().alias("possession_pass_count"))
    )

    # Number of unique players in each possession
    player_count = df.group_by("possession").agg(
        pl.col("player_id").n_unique().alias("possession_player_count")
    )
    
    # Possession duration (in seconds)
    duration = df.group_by("possession").agg(
        (pl.col("timestamp").max() - pl.col("timestamp").min()).dt.total_seconds().alias("possession_duration")
    )

    # Total xG per possession
    total_xg = (
        df.filter(pl.col("type_name") == "Shot")
        .group_by("possession")
        .agg(pl.col("shot_statsbomb_xg").sum().alias("total_xG"))
    )

    # Merge it back into the main DataFrame as a new column
    df = df.join(event_count, on="possession", how="left")
    df = df.join(pass_count, on="possession", how="left")
    df = df.join(player_count, on="possession", how="left")
    df = df.join(duration, on="possession", how="left")
    df = df.join(total_xg, on="possession", how="left")

    return df

In [15]:
df = add_possession_stats(df_enriched)
print(df.head())
# print(f"\nDataFrame after adding possession stats:\n{df.head()}")

shape: (5, 128)
┌─────────────┬───────┬────────┬─────────────┬───┬────────────┬────────────┬────────────┬──────────┐
│ id          ┆ index ┆ period ┆ timestamp   ┆ … ┆ possession ┆ possession ┆ possession ┆ total_xG │
│ ---         ┆ ---   ┆ ---    ┆ ---         ┆   ┆ _pass_coun ┆ _player_co ┆ _duration  ┆ ---      │
│ str         ┆ i64   ┆ i64    ┆ datetime[μs ┆   ┆ t          ┆ unt        ┆ ---        ┆ f64      │
│             ┆       ┆        ┆ ]           ┆   ┆ ---        ┆ ---        ┆ i64        ┆          │
│             ┆       ┆        ┆             ┆   ┆ u32        ┆ u32        ┆            ┆          │
╞═════════════╪═══════╪════════╪═════════════╪═══╪════════════╪════════════╪════════════╪══════════╡
│ 9f6e2ecf-66 ┆ 1     ┆ 1      ┆ null        ┆ … ┆ null       ┆ 1          ┆ null       ┆ null     │
│ 85-45df-a62 ┆       ┆        ┆             ┆   ┆            ┆            ┆            ┆          │
│ e-c2db30…   ┆       ┆        ┆             ┆   ┆            ┆            

(Deprecated in version 0.20.5)
  pl.count().alias("possession_event_count")
(Deprecated in version 0.20.5)
  .agg(pl.count().alias("possession_pass_count"))
