# **Data Collection from StatsBomb**

## **Objective**
This notebook collects football match and event data from StatsBomb's open data (accessed through the `statsbombpy` client), caches it locally, and prepares it for analysis.

## **Steps**
1. Setup environment and import libraries
2. Configure data storage paths
3. Define competitions and seasons to collect
4. Collect match data
5. Collect event data for matches
6. Process and clean collected data
7. Export processed data

## **Output**
- Raw match data in parquet format
- Raw event data in parquet format
- Processed event data with enriched features

## **1. Environment Setup**

In [1]:
import os
import time
import warnings
from pathlib import Path
from typing import List, Tuple, Optional, Union

import numpy as np
import pandas as pd
from statsbombpy import sb

warnings.filterwarnings('ignore')

## **2. Configuration**

In [2]:
# Directory layout for everything this notebook reads and writes.
# All paths are relative to the notebook's working directory.
DATA_PATH = Path("../data")
RAW_DATA_PATH = DATA_PATH / "raw"              # combined raw parquet files are written here
PROCESSED_DATA_PATH = DATA_PATH / "processed"  # final processed events file is written here
MATCHES_PATH = RAW_DATA_PATH / "matches"       # one parquet file per (competition, season)
EVENTS_PATH = RAW_DATA_PATH / "events"         # one parquet file per match

# Create the leaf directories (and any missing parents); re-running the cell is safe
# because exist_ok=True makes an already-existing directory a no-op.
for path in [MATCHES_PATH, EVENTS_PATH, PROCESSED_DATA_PATH]:
    path.mkdir(parents=True, exist_ok=True)

print(f"Data directories created:")
print(f"  Matches: {MATCHES_PATH}")
print(f"  Events: {EVENTS_PATH}")
print(f"  Processed: {PROCESSED_DATA_PATH}")

Data directories created:
  Matches: ..\data\raw\matches
  Events: ..\data\raw\events
  Processed: ..\data\processed


## **3. Competition and Season Selection**

In [3]:
# Each entry is a (competition_id, season_id) pair. The pairs are unpacked in that
# order by collect_match_data() and passed to sb.matches(competition_id=..., season_id=...).
# The trailing comment on each line is the human-readable season label.
COMPETITIONS_SEASONS = [
    # Bundesliga
    (9, 281),    # 2023/2024
    (9, 27),     # 2015/2016
    
    # Champions League
    (16, 4),     # 2018/2019
    (16, 1),     # 2017/2018
    (16, 2),     # 2016/2017
    (16, 27),    # 2015/2016
    (16, 26),    # 2014/2015
    (16, 25),    # 2013/2014
    (16, 24),    # 2012/2013
    (16, 23),    # 2011/2012
    (16, 22),    # 2010/2011
    (16, 21),    # 2009/2010
    (16, 41),    # 2008/2009
    (16, 39),    # 2006/2007
    (16, 37),    # 2004/2005
    (16, 44),    # 2003/2004
    
    # Copa América
    (223, 282),  # 2024
    
    # FIFA World Cup
    (43, 106),   # 2022
    (43, 3),     # 2018
    
    # La Liga
    (11, 90),    # 2020/2021
    (11, 42),    # 2019/2020
    (11, 4),     # 2018/2019
    (11, 1),     # 2017/2018
    (11, 2),     # 2016/2017
    (11, 27),    # 2015/2016
    (11, 26),    # 2014/2015
    (11, 25),    # 2013/2014
    (11, 24),    # 2012/2013
    (11, 23),    # 2011/2012
    (11, 22),    # 2010/2011

    # Indian Super League
    (1238, 108), # 2021/2022

    # Ligue 1
    (7, 235),    # 2022/2023
    (7, 108),    # 2021/2022
    (7, 27),     # 2015/2016

    # MLS
    (44, 107),   # 2023

    # African Cup of Nations
    (1267, 107), # 2023

    # UEFA Euro
    (55, 282),   # 2024
    (55, 43),    # 2020

    # Premier League
    (2, 27),     # 2015/2016

    # Serie A
    (12, 27),    # 2015/2016
]

# Sanity check: the count printed here should match the number of pairs listed above.
print(f"Total competitions/seasons to collect: {len(COMPETITIONS_SEASONS)}")

Total competitions/seasons to collect: 40


## **4. Utility Functions**

In [4]:
def generate_filename(competition_id: int, season_id: int, data_type: str = "matches") -> str:
    """Return the parquet file name used to cache one competition/season.

    Args:
        competition_id: Competition identifier embedded in the name.
        season_id: Season identifier embedded in the name.
        data_type: Leading token of the file name (defaults to "matches").

    Returns:
        A name of the form ``<data_type>_competition_<id>_season_<id>.parquet``.
    """
    template = "{}_competition_{}_season_{}.parquet"
    return template.format(data_type, competition_id, season_id)


def check_existing_files(path: Path, prefix: str) -> List[Tuple[int, int]]:
    """List the (competition_id, season_id) pairs that already have a cache file.

    Only files named exactly ``<data_type>_competition_<id>_season_<id>.parquet``
    (the shape produced by ``generate_filename``) are reported. Names that merely
    resemble that pattern, for example with an extra ``_backup`` token, are not
    treated as cache hits, because the caller rebuilds the canonical file name from
    the returned pair and would then try to read a file that may not exist.

    Args:
        path: Directory to scan.
        prefix: Leading text a file name must start with to be considered.

    Returns:
        The pairs found, in sorted file-name order. A warning is printed for every
        ``.parquet`` file that starts with ``prefix`` but does not follow the pattern.
    """
    extension = '.parquet'
    existing_pairs = []

    # sorted() makes the result independent of the arbitrary os.listdir order.
    for filename in sorted(os.listdir(path)):
        if not (filename.endswith(extension) and filename.startswith(prefix)):
            continue

        stem = filename[:-len(extension)]
        # Split from the right so a data_type containing underscores still parses:
        # [data_type, "competition", <competition_id>, "season", <season_id>]
        tokens = stem.rsplit('_', 4)
        is_canonical = (
            len(tokens) == 5
            and tokens[1] == 'competition'
            and tokens[3] == 'season'
            and tokens[2].isdecimal()
            and tokens[4].isdecimal()
        )

        if is_canonical:
            existing_pairs.append((int(tokens[2]), int(tokens[4])))
        else:
            print(f"Warning: Invalid filename format: {filename}")

    return existing_pairs


def fetch_with_retry(fetch_function, *args, max_retries: int = 3, **kwargs) -> Optional[pd.DataFrame]:
    """Call ``fetch_function`` and retry with a growing pause when it raises.

    Args:
        fetch_function: Callable to invoke.
        *args: Positional arguments forwarded to ``fetch_function``.
        max_retries: Maximum number of calls to attempt.
        **kwargs: Keyword arguments forwarded to ``fetch_function``.

    Returns:
        Whatever ``fetch_function`` returns on the first successful call, or
        ``None`` when every attempt raised (or ``max_retries`` is not positive).
    """
    attempt_number = 0

    while attempt_number < max_retries:
        attempt_number += 1
        try:
            return fetch_function(*args, **kwargs)
        except Exception as error:
            # Last attempt: report and give up instead of sleeping again.
            if attempt_number >= max_retries:
                print(f"  Failed after {max_retries} attempts: {error}")
                return None

            # Linear back-off: 2s after the first failure, 4s after the second, ...
            pause_seconds = attempt_number * 2
            print(f"  Attempt {attempt_number} failed: {error}")
            print(f"  Retrying in {pause_seconds} seconds...")
            time.sleep(pause_seconds)

    return None

## **5. Match Data Collection**

In [5]:
def collect_match_data() -> pd.DataFrame:
    """Load or download the match table for every pair in COMPETITIONS_SEASONS.

    Pairs that already have a parquet file under MATCHES_PATH are read from disk;
    the others are fetched with ``sb.matches`` (through ``fetch_with_retry``) and
    written to MATCHES_PATH. All tables are then concatenated and saved as
    ``all_matches_combined.parquet`` under RAW_DATA_PATH.

    Returns:
        The concatenated matches, or an empty DataFrame when nothing was loaded.
    """
    cached_pairs = check_existing_files(MATCHES_PATH, "matches_competition")
    print(f"Found {len(cached_pairs)} existing match files.")

    frames = []

    for competition_id, season_id in COMPETITIONS_SEASONS:
        target_file = MATCHES_PATH / generate_filename(competition_id, season_id)

        if (competition_id, season_id) in cached_pairs:
            print(f"\nLoading existing data: Competition {competition_id}, Season {season_id}")
            frames.append(pd.read_parquet(target_file))
            continue

        print(f"\nFetching new data: Competition {competition_id}, Season {season_id}")
        fetched = fetch_with_retry(
            sb.matches,
            competition_id=competition_id,
            season_id=season_id
        )

        if fetched is not None:
            fetched.to_parquet(target_file, index=False)
            print(f"  Saved to: {target_file.name}")
            frames.append(fetched)

        # Pause between remote requests, whether or not the fetch succeeded.
        time.sleep(1)

    if not frames:
        print("\nNo match data collected")
        return pd.DataFrame()

    all_matches = pd.concat(frames, ignore_index=True)
    print(f"\nTotal matches collected: {len(all_matches)}")

    combined_path = RAW_DATA_PATH / "all_matches_combined.parquet"
    all_matches.to_parquet(combined_path, index=False)
    print(f"Combined file saved to: {combined_path}")

    return all_matches

In [6]:
# Build the combined matches table (cached files are reused, missing ones are fetched).
# Side effect: collect_match_data() also rewrites all_matches_combined.parquet.
all_matches = collect_match_data()

Found 40 existing match files.

Loading existing data: Competition 9, Season 281

Loading existing data: Competition 9, Season 27

Loading existing data: Competition 16, Season 4

Loading existing data: Competition 16, Season 1

Loading existing data: Competition 16, Season 2

Loading existing data: Competition 16, Season 27

Loading existing data: Competition 16, Season 26

Loading existing data: Competition 16, Season 25

Loading existing data: Competition 16, Season 24

Loading existing data: Competition 16, Season 23

Loading existing data: Competition 16, Season 22

Loading existing data: Competition 16, Season 21

Loading existing data: Competition 16, Season 41

Loading existing data: Competition 16, Season 39

Loading existing data: Competition 16, Season 37

Loading existing data: Competition 16, Season 44

Loading existing data: Competition 223, Season 282

Loading existing data: Competition 43, Season 106

Loading existing data: Competition 43, Season 3

Loading existing dat

## **6. Event Data Collection**

In [7]:
def process_event_coordinates(events_df: pd.DataFrame) -> pd.DataFrame:
    """Split list-like coordinate columns into separate ``*_x`` / ``*_y`` columns.

    Handles ``location``, ``pass_end_location`` and ``carry_end_location``. For each
    of these that is present, two float columns named ``<column>_x`` and
    ``<column>_y`` are added. Columns that are absent are skipped.

    A value contributes coordinates when it is a list, tuple or NumPy array with at
    least two elements (only the first two are used); anything else yields NaN for
    both coordinates. Works on empty frames.

    Args:
        events_df: Events table. It is modified in place.

    Returns:
        The same ``events_df`` object, with the coordinate columns added.
    """
    def _first_two(value):
        # Arrays are accepted as well as lists/tuples so that values which have been
        # round-tripped through parquet are still recognised.
        if isinstance(value, (list, tuple, np.ndarray)) and len(value) >= 2:
            return value[0], value[1]
        return np.nan, np.nan

    for source_column in ('location', 'pass_end_location', 'carry_end_location'):
        if source_column not in events_df.columns:
            continue

        # One plain Python pass over the column is far cheaper than building a
        # pd.Series per row; reshape keeps the (n, 2) shape even when n == 0.
        pairs = [_first_two(value) for value in events_df[source_column]]
        xy = np.array(pairs, dtype=float).reshape(-1, 2)

        events_df[f'{source_column}_x'] = xy[:, 0]
        events_df[f'{source_column}_y'] = xy[:, 1]

    return events_df


def collect_event_data(match_ids: List[int]) -> pd.DataFrame:
    """Load or download the events of every match in ``match_ids``.

    Matches that already have an ``events_match_<id>.parquet`` file under
    EVENTS_PATH are read from disk. The others are fetched with ``sb.events``
    (through ``fetch_with_retry``), passed through ``process_event_coordinates``
    and written to EVENTS_PATH. All tables are concatenated and saved as
    ``all_events_combined.parquet`` under RAW_DATA_PATH.

    Args:
        match_ids: Match identifiers to collect, processed in the given order.

    Returns:
        The concatenated events, or an empty DataFrame when nothing was loaded.
    """
    prefix, suffix = 'events_match_', '.parquet'

    # A set keeps the per-match membership test below constant-time.
    existing_events = set()
    for filename in os.listdir(EVENTS_PATH):
        if not (filename.startswith(prefix) and filename.endswith(suffix)):
            continue
        try:
            existing_events.add(int(filename[len(prefix):-len(suffix)]))
        except ValueError:
            # Report stray files instead of ignoring them silently, matching the
            # warning printed by check_existing_files.
            print(f"Warning: Invalid filename format: {filename}")

    print(f"Found {len(existing_events)} existing event files.")

    all_events_dfs = []
    total_matches = len(match_ids)

    for idx, match_id in enumerate(match_ids):
        events_path = EVENTS_PATH / f"events_match_{match_id}.parquet"

        if match_id in existing_events:
            all_events_dfs.append(pd.read_parquet(events_path))
        else:
            print(f"\nFetching events for match {match_id}")
            events_df = fetch_with_retry(sb.events, match_id=match_id)

            if events_df is not None:
                events_df = process_event_coordinates(events_df)
                events_df.to_parquet(events_path, index=False)
                all_events_dfs.append(events_df)

            # Pause between remote requests, whether or not the fetch succeeded.
            time.sleep(1)

        if (idx + 1) % 50 == 0:
            print(f"\nProgress: {idx + 1}/{total_matches} matches processed")

    if not all_events_dfs:
        print("\nNo event data collected")
        return pd.DataFrame()

    print("\nCombining all events...")
    all_events = pd.concat(all_events_dfs, ignore_index=True)
    print(f"Total events collected: {len(all_events)}")

    combined_path = RAW_DATA_PATH / "all_events_combined.parquet"
    all_events.to_parquet(combined_path, index=False)
    print(f"Combined file saved to: {combined_path}")

    return all_events

In [8]:
# Events are only collected for matches whose recorded home and away scores differ.
if all_matches.empty:
    print("No matches available for event collection")
    all_events = pd.DataFrame()
else:
    has_winner = all_matches['home_score'].ne(all_matches['away_score'])
    non_draw_matches = all_matches.loc[has_winner]
    match_ids = non_draw_matches['match_id'].tolist()
    print(f"\nTotal matches with winner: {len(match_ids)}")

    all_events = collect_event_data(match_ids)


Total matches with winner: 2035
Found 2035 existing event files.

Progress: 50/2035 matches processed

Progress: 100/2035 matches processed

Progress: 150/2035 matches processed

Progress: 200/2035 matches processed

Progress: 250/2035 matches processed

Progress: 300/2035 matches processed

Progress: 350/2035 matches processed

Progress: 400/2035 matches processed

Progress: 450/2035 matches processed

Progress: 500/2035 matches processed

Progress: 550/2035 matches processed

Progress: 600/2035 matches processed

Progress: 650/2035 matches processed

Progress: 700/2035 matches processed

Progress: 750/2035 matches processed

Progress: 800/2035 matches processed

Progress: 850/2035 matches processed

Progress: 900/2035 matches processed

Progress: 950/2035 matches processed

Progress: 1000/2035 matches processed

Progress: 1050/2035 matches processed

Progress: 1100/2035 matches processed

Progress: 1150/2035 matches processed

Progress: 1200/2035 matches processed

Progress: 1250/20

## **7. Data Processing**

In [9]:
def select_relevant_columns(events_df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``events_df`` restricted to the columns used downstream.

    Columns from the wanted list that are missing in ``events_df`` are skipped
    rather than raising, and the result keeps the wanted-list order.
    """
    wanted_columns = (
        'match_id', 'period', 'index', 'minute', 'type', 'team',
        'location_x', 'location_y', 'timestamp', 'team_id', 'player', 'player_id',
        'pass_outcome', 'pass_recipient', 'pass_recipient_id', 'shot_outcome',
    )

    available = events_df.columns
    kept_columns = []
    for name in wanted_columns:
        if name in available:
            kept_columns.append(name)

    subset = events_df[kept_columns]
    return subset.copy()


def add_match_context(events_df: pd.DataFrame, matches_df: pd.DataFrame) -> pd.DataFrame:
    """Keep period 1 and 2 events and tag each with the side of its team.

    Args:
        events_df: Events with at least ``period``, ``match_id`` and ``team``.
        matches_df: Matches with ``match_id``, ``home_team`` and ``away_team``.

    Returns:
        A filtered copy of ``events_df`` (the input is not modified) with a
        ``home_or_away`` column holding "HOME", "AWAY", or "UNKNOWN" when the
        (match_id, team) pair does not appear in ``matches_df``.
    """
    events_df = events_df[events_df["period"].isin([1, 2])].copy()

    # (match_id, team name) -> side. Iterating the three columns in lockstep avoids
    # the per-row Series construction that DataFrame.iterrows performs.
    home_away_map = {}
    for match_id, home_team, away_team in zip(
        matches_df["match_id"], matches_df["home_team"], matches_df["away_team"]
    ):
        home_away_map[(match_id, home_team)] = "HOME"
        home_away_map[(match_id, away_team)] = "AWAY"

    # Plain dictionary lookups instead of a row-wise DataFrame.apply: same result,
    # much faster on large tables, and well-defined when the frame is empty.
    events_df["home_or_away"] = [
        home_away_map.get(key, "UNKNOWN")
        for key in zip(events_df["match_id"], events_df["team"])
    ]

    return events_df


def add_score_tracking(events_df: pd.DataFrame) -> pd.DataFrame:
    """Add running score, game state and final result to every event.

    Expects the columns ``match_id``, ``period``, ``index``, ``team``, ``type``,
    ``shot_outcome`` and ``home_or_away``. A goal is an event whose
    ``shot_outcome`` is "Goal" or whose ``type`` is "Own Goal For"; it is credited
    to the event's own team. Scores are computed only from the events present in
    ``events_df``.

    Columns added (in this order):
        home_abbrev_name, away_abbrev_name: team of the first HOME / AWAY event of
            the match (NaN when the match has no such event).
        home_goals, away_goals: goals scored so far in the match, including the
            current event.
        score_momentum: running score as "<home> x <away>".
        game_state: "W" / "L" / "D" from the event team's point of view at that
            moment; NaN when the team is neither the home nor the away team.
        scoresheet: "<home team> <home> x <away> <away team>" with the final score.
        score_final: final score as "<home> x <away>".
        final_result: "W" / "L" / "D" for the event's team at the end of the
            match; missing (NaN) when the match has a winner but the team is
            neither the home nor the away team.

    Returns:
        A new DataFrame sorted by match, period and index with a fresh 0..n-1 index.
        The input frame is not modified.
    """
    # sort_values and reset_index both return new objects, so the caller's frame is
    # left untouched.
    events_df = (
        events_df
        .sort_values(["match_id", "period", "index"])
        .reset_index(drop=True)
    )
    match_key = events_df["match_id"]

    def _team_name_for(side: str) -> pd.Series:
        # First team seen on the given side in each match, broadcast to every row.
        side_rows = events_df.loc[events_df["home_or_away"] == side]
        first_team = side_rows.groupby("match_id")["team"].first()
        return match_key.map(first_team)

    events_df["home_team_name"] = _team_name_for("HOME")
    events_df["away_team_name"] = _team_name_for("AWAY")

    is_home = events_df["team"] == events_df["home_team_name"]
    is_away = events_df["team"] == events_df["away_team_name"]
    known_team = is_home | is_away

    # --- running score ------------------------------------------------------
    scored = (events_df["shot_outcome"] == "Goal") | (events_df["type"] == "Own Goal For")
    events_df["home_goals"] = (is_home & scored).astype(int).groupby(match_key).cumsum()
    events_df["away_goals"] = (is_away & scored).astype(int).groupby(match_key).cumsum()

    events_df["score_momentum"] = (
        events_df["home_goals"].astype(str) + " x " + events_df["away_goals"].astype(str)
    )

    def _outcome_labels(home_minus_away: pd.Series) -> pd.Series:
        # Express the goal difference from the event team's side (home takes
        # precedence, as in a home-first if/elif chain) and label it W / L / D.
        team_lead = home_minus_away.where(is_home, -home_minus_away).to_numpy()
        labels = np.select([team_lead > 0, team_lead < 0], ["W", "L"], default="D")
        return pd.Series(labels, index=events_df.index, dtype=object)

    # --- game state at the moment of each event --------------------------------
    running_lead = events_df["home_goals"] - events_df["away_goals"]
    events_df["game_state"] = _outcome_labels(running_lead).where(known_team)

    # --- final score ---------------------------------------------------------
    # Rows are sorted, so the last running value within a match is its final score.
    goals_by_match = events_df.groupby("match_id")
    final_home = goals_by_match["home_goals"].transform("last")
    final_away = goals_by_match["away_goals"].transform("last")
    final_score_text = final_home.astype(str) + " x " + final_away.astype(str)

    events_df["scoresheet"] = (
        events_df["home_team_name"] + " " + final_score_text + " " + events_df["away_team_name"]
    )
    events_df["score_final"] = final_score_text

    # --- final result ----------------------------------------------------------
    final_lead = final_home - final_away
    # A drawn match is "D" for every row, even when the row's team is unknown.
    result_is_defined = known_team | (final_lead == 0)
    events_df["final_result"] = _outcome_labels(final_lead).where(result_is_defined)

    return events_df.rename(columns={
        'home_team_name': 'home_abbrev_name',
        'away_team_name': 'away_abbrev_name'
    })

In [10]:
# Run the processing pipeline: column selection -> home/away tagging -> score tracking,
# then drop every match whose tracked final result is a draw.
if all_events.empty:
    print("No events to process")
    processed_events = pd.DataFrame()
else:
    print("Processing event data...")

    processed_events = select_relevant_columns(all_events)
    print(f"Selected {len(processed_events.columns)} relevant columns")

    processed_events = add_match_context(processed_events, all_matches)
    print("Added home/away context")

    processed_events = add_score_tracking(processed_events)
    print("Added score tracking")

    is_draw_row = processed_events['final_result'].eq('D')
    draw_matches = processed_events.loc[is_draw_row, 'match_id'].unique()
    in_drawn_match = processed_events['match_id'].isin(draw_matches)
    processed_events = processed_events.loc[~in_drawn_match].copy()
    print(f"Removed {len(draw_matches)} drawn matches")

    print(f"\nFinal dataset: {len(processed_events)} events from {processed_events['match_id'].nunique()} matches")

Processing event data...
Selected 16 relevant columns
Added home/away context
Added score tracking
Removed 10 drawn matches

Final dataset: 7209091 events from 2025 matches


In [11]:
# Print headline counts for the processed events table (skipped when it is empty).
if not processed_events.empty:
    print("=== DATA SUMMARY ===")
    print(f"\nTotal records: {len(processed_events):,}")
    print(f"Total matches: {processed_events['match_id'].nunique()}")
    print(f"Total teams: {processed_events['team'].nunique()}")
    print(f"Total players: {processed_events['player_id'].nunique()}")
    
    print("\nTop 10 event types:")
    print(processed_events['type'].value_counts().head(10))
    
    print("\nResult distribution:")
    # Collapse to one row per (match, team, result) before counting, so each team is
    # counted once per match instead of once per event.
    result_dist = processed_events.groupby(['match_id', 'team', 'final_result'])['final_result'].first().value_counts()
    print(result_dist)

=== DATA SUMMARY ===

Total records: 7,209,091
Total matches: 2025
Total teams: 216
Total players: 5926

Top 10 event types:
type
Pass             2016542
Ball Receipt*    1898758
Carry            1558817
Pressure          651480
Ball Recovery     205322
Duel              151274
Clearance          88580
Block              76091
Dribble            69318
Goal Keeper        61360
Name: count, dtype: int64

Result distribution:
final_result
W    2025
L    2025
Name: count, dtype: int64


## **8. Export Processed Data**

In [12]:
# Write the processed events to parquet, keeping only the export columns that exist.
if processed_events.empty:
    print("\nNo data to save")
else:
    final_columns = [
        'match_id', 'period', 'index', 'timestamp', 'type',
        'team', 'team_id', 'player', 'player_id', 'pass_outcome', 'pass_recipient',
        'pass_recipient_id', 'location_x', 'location_y', 'home_or_away',
        'home_abbrev_name', 'away_abbrev_name', 'home_goals', 'away_goals',
        'score_momentum', 'game_state', 'scoresheet', 'score_final', 'final_result'
    ]

    available_columns = processed_events.columns
    existing_final_columns = [name for name in final_columns if name in available_columns]
    processed_events = processed_events.loc[:, existing_final_columns]

    output_path = PROCESSED_DATA_PATH / "events_processed.parquet"
    processed_events.to_parquet(output_path, index=False)

    # Size on disk in mebibytes (bytes / 1024^2).
    size_in_bytes = output_path.stat().st_size
    file_size_mb = size_in_bytes / (1024 * 1024)
    print(f"\nProcessed data saved to: {output_path}")
    print(f"File size: {file_size_mb:.2f} MB")


Processed data saved to: ..\data\processed\events_processed.parquet
File size: 103.52 MB
