# Flagrant Fouls Data Collection

Extract flagrant foul data from the NBA API for seasons 2020-21 through 2024-25.

- Loads existing CSV to avoid duplicate game IDs
- Fetches new games from API
- Saves to CSV immediately after each successful API call
- Stops on the first read timeout (wait about 1 hour before re-running to continue)

In [None]:
import pandas as pd
import numpy as np
import time
from datetime import datetime, timedelta
from pathlib import Path
from nba_api.stats.endpoints.leaguegamefinder import LeagueGameFinder
from nba_api.stats.endpoints.playbyplayv3 import PlayByPlayV3
import warnings
warnings.filterwarnings('ignore')

## 1. Load Existing Data

In [None]:
csv_file = Path('nba_flagrant_fouls.csv')

# Load existing data if it exists.
# game_id is read as text: with default type inference pandas parses it as an
# integer and drops the leading zeros, so the IDs would never compare equal to
# the string game IDs they are checked against later when skipping saved games.
if csv_file.exists():
    existing_df = pd.read_csv(csv_file, dtype={'game_id': str})
    existing_game_ids = set(existing_df['game_id'].unique())
    print(f"Loaded existing CSV with {len(existing_df)} games")
    print(f"Unique game IDs to skip: {len(existing_game_ids)}")
else:
    # Nothing saved yet: start with an empty frame and an empty skip set
    existing_df = pd.DataFrame()
    existing_game_ids = set()
    print("No existing CSV found. Starting fresh.")

## 2. Define Data Extraction Function

In [None]:
def extract_game_data(game_id):
    """
    Extract flagrant fouls and game outcome from a single game.

    Args:
        game_id: NBA game ID

    Returns:
        tuple: ``(game_data, error)``. On success ``game_data`` is a dict with
        the keys ``game_id``, ``home_team``, ``away_team``, ``home_flagrants``,
        ``away_flagrants``, ``home_score`` and ``away_score``, and ``error`` is
        None. On failure ``game_data`` is None and ``error`` is the exception
        that was raised, so the caller can inspect it (e.g. to detect a
        timeout).
    """
    try:
        response = PlayByPlayV3(game_id=game_id)
        pbp = response.play_by_play.get_data_frame()

        # Extract flagrants by team
        flagrants = pbp[pbp['subType'].isin(['Flagrant Type 1', 'Flagrant Type 2'])]

        home_flagrants = len(flagrants[flagrants['location'] == 'h'])
        away_flagrants = len(flagrants[flagrants['location'] == 'v'])

        # Get final score (from last row)
        final_row = pbp.iloc[-1]
        home_score = final_row['scoreHome']
        away_score = final_row['scoreAway']

        # Get team IDs (from the first row of each side with a non-zero teamId)
        team_rows = pbp[pbp['teamId'] != 0]
        home_team = team_rows[team_rows['location'] == 'h']['teamId'].iloc[0]
        away_team = team_rows[team_rows['location'] == 'v']['teamId'].iloc[0]

        game_data = {
            'game_id': game_id,
            'home_team': home_team,
            'away_team': away_team,
            'home_flagrants': home_flagrants,
            'away_flagrants': away_flagrants,
            'home_score': home_score,
            'away_score': away_score
        }
        # Always return a 2-tuple so the result can be unpacked the same way
        # on success and on failure.
        return game_data, None

    except Exception as e:
        # Any failure (request error, empty or unexpected play-by-play) is
        # handed back to the caller instead of being raised.
        return None, e

## 3. Fetch Games from NBA API

In [None]:
# Seasons covered by this extraction run
seasons = ['2020-21', '2021-22', '2022-23', '2023-24', '2024-25']

all_game_ids = []

print(f"Fetching game IDs from {len(seasons)} seasons...")
for season in seasons:
    try:
        # Take the first frame returned by the game finder and collect its
        # distinct game IDs
        games_df = LeagueGameFinder(season_nullable=season).get_data_frames()[0]
        game_ids = games_df['GAME_ID'].unique()
    except Exception as e:
        # A failed season is reported and skipped; the others still run
        print(f"  {season}: ERROR - {type(e).__name__}")
    else:
        all_game_ids.extend(game_ids)
        print(f"  {season}: {len(game_ids)} games")

print(f"\nTotal unique games: {len(set(all_game_ids))}")

# Keep only the games that are not already stored in the CSV
new_game_ids = [gid for gid in all_game_ids if gid not in existing_game_ids]
already_saved = len(all_game_ids) - len(new_game_ids)
print(f"New games to extract: {len(new_game_ids)}")
print(f"Games to skip (already in CSV): {already_saved}")

## 4. Extract Flagrant Foul Data

In [None]:
print(f"\nStarting data extraction for {len(new_game_ids)} new games...")
print(f"(1 second throttle between API calls)\n")

successful_count = 0
skipped_count = 0
failed_count = 0

try:
    for idx, game_id in enumerate(new_game_ids):
        # Periodic progress line every 100 games (not before the first one)
        if idx > 0 and idx % 100 == 0:
            print(f"Progress: {idx}/{len(new_game_ids)} | Saved: {successful_count} | Errors: {failed_count}")

        game_data, error = extract_game_data(game_id)

        if not game_data:
            # Decide whether the failure looks like a read timeout
            timed_out = (
                'ReadTimeout' in str(type(error).__name__)
                or 'timeout' in str(error).lower()
            )
            if timed_out:
                print(f"\n{'='*70}")
                print(f"READ TIMEOUT DETECTED - IP likely rate limited")
                print(f"{'='*70}")

                # Suggested retry time: one hour after the timeout
                retry_time = datetime.now() + timedelta(hours=1)
                print(f"\nPlease wait until: {retry_time.strftime('%Y-%m-%d %H:%M:%S')} (local time)")
                print(f"This is approximately 1 hour from now.")
                print(f"\nProgress so far:")
                print(f"  Games extracted: {successful_count}")
                print(f"  Games processed: {idx + 1}/{len(new_game_ids)}")
                print(f"\nRun this notebook again in 1 hour to continue extraction.")
                break

            # Any other failure is counted and the loop moves on
            failed_count += 1
        else:
            # Persist this game right away: create the file with a header the
            # first time, append without a header afterwards
            is_new_file = not csv_file.exists()
            pd.DataFrame([game_data]).to_csv(
                csv_file,
                mode='w' if is_new_file else 'a',
                header=is_new_file,
                index=False,
            )
            successful_count += 1

        # Throttle to avoid rate limiting
        time.sleep(1.0)

except KeyboardInterrupt:
    print(f"\n\nInterrupted by user")

print(f"\n{'='*70}")
print(f"EXTRACTION COMPLETE")
print(f"{'='*70}")
print(f"Successfully saved: {successful_count} games")
print(f"Errors encountered: {failed_count} games")
print(f"Total processed: {successful_count + failed_count}/{len(new_game_ids)}")
print(f"Data saved to: {csv_file}")

## 5. Verify Final Data

In [None]:
# Load and display summary.
# game_id is read as text: default type inference would parse it as an integer,
# dropping the leading zeros and making the .str accessor below fail.
final_df = pd.read_csv(csv_file, dtype={'game_id': str})

print(f"\nFinal CSV Summary:")
print(f"Total games: {len(final_df)}")
print(f"Unique games: {final_df['game_id'].nunique()}")
print(f"\nGames per season (inferred from game_id):")
# NBA game IDs are laid out as a 3-digit game-type prefix, a 2-digit season
# start year, then a 5-digit game number, so characters 3-4 identify the
# season (e.g. '20' for 2020-21). A wider slice would include game-number digits.
final_df['season'] = final_df['game_id'].str[3:5]
print(final_df.groupby('season').size().sort_index())
print(f"\nData shape: {final_df.shape}")
print(f"Columns: {final_df.columns.tolist()}")