In [None]:
# !pip install datasets py7zr pandas

## Setup and Library Imports

In [1]:
import pandas as pd
import numpy as np
from datasets import load_dataset
import json
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

print("Libraries imported successfully!")

Libraries imported successfully!


## Load Dataset from HuggingFace

In [33]:
def load_sportvu_dataset():
    """
    Fetch the "tiny" configuration of the SportVU tracking dataset.

    Requests the train split of dcayton/nba_tracking_data_15_16 through
    HuggingFace `load_dataset`, then prints the record count and the keys of
    the first record. Any exception raised along the way is printed instead
    of being propagated.

    Returns:
    - dataset: the loaded dataset object, or None if loading failed
    """
    print("Loading SportVU dataset from HuggingFace...")
    print("Dataset: dcayton/nba_tracking_data_15_16")

    try:
        dataset = load_dataset(
            "dcayton/nba_tracking_data_15_16",
            "tiny",
            split="train",
        )
        print(f"Dataset loaded successfully: {len(dataset)} records available")

        # Peek at the schema of the first record
        print("\nFirst record keys:")
        print(dataset[0].keys())
    except Exception as e:
        # Best effort: report the problem and signal failure with None
        print(f"Error loading dataset: {e}")
        return None

    return dataset

# Load the dataset once for the whole notebook. load_sportvu_dataset() returns
# None when loading fails, so `dataset` must be checked before it is used.
dataset = load_sportvu_dataset()


Loading SportVU dataset from HuggingFace...
Dataset: dcayton/nba_tracking_data_15_16
Dataset loaded successfully: 2219 records available

First record keys:
dict_keys(['gameid', 'gamedate', 'event_info', 'primary_info', 'secondary_info', 'visitor', 'home', 'moments'])


## Data Parsing Functions

In [3]:
def parse_moments_data(moments):
    """
    Flatten a list of tracking moments into one row per frame.

    Each moment is indexed positionally as
    [quarter, timestamp, game_clock, shot_clock, <unused>, entities], where
    `entities` is a sequence whose first item is treated as the ball and
    whose remaining items are treated as players. Every entity is read as
    [team_id, player_id, x, y, z]; a player without a fifth element gets z = 0.
    Moments with no entities are dropped, and a shot clock of None is stored
    as -1.

    Parameters:
    - moments: list of moment arrays containing tracking data

    Returns:
    - DataFrame with one row per kept moment; player columns are named
      player_<slot>_<field>, where <slot> is the position after the ball
    """
    entity_fields = ('team_id', 'player_id', 'x', 'y')
    rows = []

    for moment in moments:
        # Frame-level fields come first so the column order stays stable
        row = {
            'quarter': moment[0],
            'timestamp': moment[1],
            'game_clock': moment[2],
            'shot_clock': -1 if moment[3] is None else moment[3],
        }

        entities = moment[5]
        if len(entities) == 0:
            continue

        # Slot 0 is read as the ball
        ball = entities[0]
        for position, field in enumerate(entity_fields):
            row[f'ball_{field}'] = ball[position]
        row['ball_z'] = ball[4]

        # Everything after the ball is read as a player
        for slot, player in enumerate(entities[1:]):
            for position, field in enumerate(entity_fields):
                row[f'player_{slot}_{field}'] = player[position]
            row[f'player_{slot}_z'] = player[4] if len(player) > 4 else 0

        rows.append(row)

    return pd.DataFrame(rows)

def identify_shot_events(events):
    """
    Keep only the events that look like shot attempts.

    An event that has an 'eventId' key is kept when that id is 1 (made shot)
    or 2 (missed shot); its description is not consulted. An event without
    'eventId' is kept when its lower-cased 'description' contains one of the
    shot keywords. Events with neither key are dropped.

    Parameters:
    - events: list of event dictionaries

    Returns:
    - list of shot events, in their original order
    """
    shot_keywords = ('shot', 'miss', 'make', 'layup', 'dunk', 'jumper')

    def _looks_like_shot(event):
        if 'eventId' in event:
            return event['eventId'] in (1, 2)
        if 'description' in event:
            text = event['description'].lower()
            return any(word in text for word in shot_keywords)
        return False

    return [event for event in events if _looks_like_shot(event)]

## Extract Tracking for Shots

In [7]:
def extract_shot_tracking(dataset, num_events=None):
    """
    Extract tracking data for all players during shot attempts

    An event counts as a shot when event_info['type'] is 1 (made) or
    2 (missed). Each kept event is parsed with parse_moments_data and the
    event metadata is broadcast onto every frame of the resulting DataFrame.

    Events that raise while being processed are skipped (best effort), but
    they are counted and the first few errors are printed afterwards, so a
    schema mismatch no longer silently yields zero shots.

    Parameters:
    - dataset: loaded HuggingFace dataset (any object with `.select`), or a
      plain sequence of event dictionaries
    - num_events: number of events to analyze (None = all)

    Returns:
    - list of dictionaries containing shot info ('info') and tracking data
      ('tracking')
    """
    shot_result_by_type = {1: 'made', 2: 'missed'}
    max_errors_shown = 5

    shot_tracking = []
    failed_events = []  # (event index, exception) for every skipped event

    # Limit number of events if specified
    if num_events is None:
        events_to_analyze = dataset
    elif hasattr(dataset, 'select'):
        events_to_analyze = dataset.select(range(min(num_events, len(dataset))))
    else:
        # Plain sequences have no .select(); slicing gives the same prefix
        events_to_analyze = dataset[:max(num_events, 0)]

    print(f"\nAnalyzing {len(events_to_analyze)} events...")

    for event_idx, event in enumerate(tqdm(events_to_analyze, desc="Processing events")):

        try:
            event_meta = event.get('event_info') or {}
            event_type = event_meta.get('type', -1)

            # Event types for shots: 1 = Made Shot, 2 = Missed Shot
            if event_type not in (1, 2):
                continue

            event_info = {
                'game_id': event.get('gameid', f'game_{event_idx}'),
                'game_date': event.get('gamedate', 'Unknown'),
                'event_id': event_meta.get('id', -1),
                'event_type': event_type,
                'event_num': event_idx,
            }

            # Get player information
            primary_info = event.get('primary_info') or {}
            if primary_info:
                event_info['player_id'] = primary_info.get('player_id', 'Unknown')
                event_info['team_id'] = primary_info.get('team_id', 'Unknown')
                event_info['team'] = primary_info.get('team', 'Unknown')

            # Get event description. A missing description is stored as the
            # string "nan", so take the first side that has real text.
            description = 'Shot attempt'
            for desc_key in ('desc_home', 'desc_away'):
                candidate = event_meta.get(desc_key)
                if (isinstance(candidate, str) and candidate.strip()
                        and candidate.strip().lower() != 'nan'):
                    description = candidate
                    break
            event_info['description'] = description

            # Determine shot result
            event_info['shot_result'] = shot_result_by_type[event_type]

            # Extract tracking data (moments)
            moments = event.get('moments', [])

            if not moments:
                continue

            # Parse tracking data
            tracking_df = parse_moments_data(moments)

            if tracking_df.empty:
                continue

            # Add event information to tracking DataFrame
            for key, value in event_info.items():
                tracking_df[key] = value

            shot_tracking.append({
                'info': event_info,
                'tracking': tracking_df
            })

        except Exception as exc:
            # Keep going, but remember the failure so it can be reported
            failed_events.append((event_idx, exc))
            continue

    print(f"\n{'='*60}")
    print(f"Total shots extracted: {len(shot_tracking)}")
    if failed_events:
        print(f"Skipped {len(failed_events)} event(s) due to errors; first few:")
        for failed_idx, exc in failed_events[:max_errors_shown]:
            print(f"  event {failed_idx}: {type(exc).__name__}: {exc}")
    print(f"{'='*60}")
    return shot_tracking

# Run the extraction on a small sample first (only the first 3 events are scanned)
if not dataset:
    shot_tracking_data = []
else:
    shot_tracking_data = extract_shot_tracking(dataset, num_events=3)

    if not shot_tracking_data:
        # Nothing matched: show the start of the first record to help debug the schema
        print("\nWARNING: No shots were extracted!")
        print("Let's inspect the dataset structure...")
        print("\nFirst record structure:")
        first_record_json = json.dumps(dataset[0], indent=2, default=str)
        print(first_record_json[:1000])


Analyzing 3 events...


Processing events: 100%|██████████| 3/3 [00:00<00:00, 14.69it/s]


Total shots extracted: 0

Let's inspect the dataset structure...

First record structure:
{
  "gameid": "0021500333",
  "gamedate": "2015-12-11",
  "event_info": {
    "id": "1",
    "type": 10,
    "possession_team_id": null,
    "desc_home": "Jump Ball Mahinmi vs. Whiteside: Tip to Allen",
    "desc_away": "nan"
  },
  "primary_info": {
    "team": "home",
    "player_id": 101133.0,
    "team_id": 1610612754.0
  },
  "secondary_info": {
    "team": "away",
    "player_id": 202355.0,
    "team_id": 1610612748.0
  },
  "visitor": {
    "name": "Miami Heat",
    "teamid": 1610612748,
    "abbreviation": "MIA",
    "players": [
      {
        "lastname": "Andersen",
        "firstname": "Chris",
        "playerid": 2365,
        "jersey": "11",
        "position": "F-C"
      },
      {
        "lastname": "Stoudemire",
        "firstname": "Amar'e",
        "playerid": 2405,
        "jersey": "5",
        "position": "F-C"
      },
      {
        "lastname": "Bosh",
        "firstnam




## Data Organization and Saving

In [None]:
def organize_data_by_player(shot_tracking_data):
    """
    Organize tracking data by player ID

    For every shot, each per-slot id column (player_<n>_player_id) is scanned
    and, for every distinct non-null id found there, the frames in which that
    slot holds that id are collected together with the ball position and the
    slot's x/y coordinates.

    Only columns named exactly player_<n>_player_id are treated as player
    slots; 'ball_player_id' and the event-level 'player_id' column are not.

    Parameters:
    - shot_tracking_data: list of dicts with a 'tracking' DataFrame, as
      returned by extract_shot_tracking

    Returns:
    - dictionary with player_id as key and list of tracking DataFrames
      (one DataFrame per shot/slot in which the player appears)
    """
    frame_cols = ['quarter', 'game_clock', 'shot_clock', 'timestamp',
                  'ball_x', 'ball_y', 'ball_z']
    event_cols = ['game_id', 'shot_result', 'player', 'team']

    player_tracking = {}

    for shot in shot_tracking_data:
        tracking_df = shot['tracking']

        for col in tracking_df.columns:
            parts = str(col).split('_')
            is_slot_id_col = (
                len(parts) == 4
                and parts[0] == 'player'
                and parts[1].isdigit()
                and parts[2:] == ['player', 'id']
            )
            if not is_slot_id_col:
                continue

            player_idx = parts[1]
            wanted_cols = (
                frame_cols
                + [f'player_{player_idx}_x', f'player_{player_idx}_y']
                + event_cols
            )
            # Filter existing columns
            existing_cols = [name for name in wanted_cols if name in tracking_df.columns]

            for player_id in tracking_df[col].dropna().unique():
                # Keep only the frames where this slot really holds this player
                in_slot = tracking_df[col] == player_id
                player_frames = tracking_df.loc[in_slot, existing_cols].copy()
                player_tracking.setdefault(player_id, []).append(player_frames)

    print(f"\nOrganized tracking data for {len(player_tracking)} unique players")
    return player_tracking

def save_tracking_data(shot_tracking_data, output_path='sportvu_tracking_data.pkl'):
    """
    Serialize the extracted shot data to disk with pickle.

    Parameters:
    - shot_tracking_data: object to persist (the list built by
      extract_shot_tracking)
    - output_path: destination file; it is opened in binary write mode, so
      an existing file is overwritten

    Note: pickle files should only ever be loaded back from trusted sources.
    """
    import pickle

    with open(output_path, 'wb') as sink:
        pickle.dump(shot_tracking_data, sink)

    print(f"\nData saved to: {output_path}")

def create_complete_dataframe(shot_tracking_data):
    """
    Stack the per-shot tracking frames into one DataFrame.

    Parameters:
    - shot_tracking_data: list of dicts, each holding a 'tracking' DataFrame

    Returns:
    - a single DataFrame with a fresh 0..n-1 index, or an empty DataFrame
      when there is nothing to combine
    """
    if not shot_tracking_data:
        print("No tracking data to create DataFrame")
        return pd.DataFrame()

    per_shot_frames = [shot['tracking'] for shot in shot_tracking_data]
    df_complete = pd.concat(per_shot_frames, ignore_index=True)

    print(f"\nComplete DataFrame created: {len(df_complete):,} total frames")
    return df_complete

# Organize the extracted shots and persist them (pickle always, CSV when non-empty)
if not shot_tracking_data:
    print("\nNo data to organize or save")
    df_complete = pd.DataFrame()
else:
    player_data = organize_data_by_player(shot_tracking_data)
    df_complete = create_complete_dataframe(shot_tracking_data)

    # Full structure (info + tracking per shot) goes to the pickle file
    save_tracking_data(shot_tracking_data)

    # The flat frame-level table is additionally exported as CSV
    if not df_complete.empty:
        df_complete.to_csv('sportvu_tracking_complete.csv', index=False)
        print("Data also saved to CSV format")

## Analysis and Visualization

In [None]:
def analyze_specific_shot(shot_tracking_data, shot_index=0):
    """
    Analyze and display information about a specific shot

    The shot's 'info' dictionary is read defensively: it stores the shooter
    under 'player_id' (there is no 'player' key), and the player/team fields
    are only present when the event carried primary_info. Missing fields are
    printed as 'Unknown'.

    Parameters:
    - shot_tracking_data: list of dicts with 'info' and 'tracking' entries
    - shot_index: position of the shot to show; negative values count from
      the end, as with normal list indexing

    Returns:
    - the shot's tracking data, or None when shot_index is out of range
    """
    total_shots = len(shot_tracking_data)
    if not -total_shots <= shot_index < total_shots:
        print("Invalid shot index")
        return None

    shot = shot_tracking_data[shot_index]
    info = shot['info']
    tracking = shot['tracking']

    # Prefer an explicit 'player' label if one exists, else fall back to the id
    player_label = info.get('player', info.get('player_id', 'Unknown'))

    print(f"\n{'='*60}")
    print(f"SHOT INFORMATION")
    print(f"{'='*60}")
    print(f"Player: {player_label}")
    print(f"Team: {info.get('team', 'Unknown')}")
    print(f"Result: {info.get('shot_result', 'unknown')}")
    print(f"Description: {info.get('description', 'Shot attempt')}")
    print(f"Game ID: {info.get('game_id', 'Unknown')}")
    print(f"Available frames: {len(tracking)}")
    print(f"{'='*60}")

    return tracking


def visualize_shot_positions(tracking_df, frame_num=0):
    """
    Visualize player and ball positions at a specific frame

    Players are coloured by team: the first team id encountered is drawn in
    blue, the second in red, and any further or unknown team id in gray.
    Each team gets exactly one legend entry.

    Parameters:
    - tracking_df: frame-level tracking DataFrame (ball_x/ball_y plus
      player_<n>_x / player_<n>_y / player_<n>_team_id columns)
    - frame_num: positional row index of the frame to draw; negative values
      count from the end

    Returns:
    - None. Prints "Frame not available" when the frame does not exist.
    """
    n_frames = len(tracking_df)
    if tracking_df.empty or not -n_frames <= frame_num < n_frames:
        print("Frame not available")
        return

    frame = tracking_df.iloc[frame_num]

    fig, ax = plt.subplots(figsize=(14, 8))

    # Draw court boundaries (94 x 50 feet)
    ax.set_xlim(0, 94)
    ax.set_ylim(0, 50)

    # Draw court lines
    ax.axhline(y=25, color='gray', linestyle='--', alpha=0.3, linewidth=2)
    ax.axvline(x=47, color='gray', linestyle='--', alpha=0.3, linewidth=2)

    # Draw three-point line (simplified as a full circle around each hoop)
    for hoop_x in (5.25, 94 - 5.25):
        ax.add_patch(plt.Circle((hoop_x, 25), 23.75, color='gray',
                                fill=False, linestyle='--', alpha=0.3))

    # Ball position
    if 'ball_x' in frame and 'ball_y' in frame:
        ax.scatter(frame['ball_x'], frame['ball_y'],
                   c='orange', s=300, marker='o',
                   edgecolors='black', linewidths=2,
                   label='Ball', zorder=3)

    # Player positions
    palette = ('blue', 'red')
    team_colors = {}  # team id -> colour, filled in order of first appearance

    for col in tracking_df.columns:
        parts = str(col).split('_')
        if len(parts) != 3 or parts[0] != 'player' or parts[2] != 'x':
            continue

        player_idx = parts[1]
        y_col = f'player_{player_idx}_y'
        team_col = f'player_{player_idx}_team_id'

        if y_col not in frame:
            continue
        if pd.isna(frame[col]) or pd.isna(frame[y_col]):
            continue

        team_id = frame.get(team_col)
        team_key = 'Unknown' if team_id is None or pd.isna(team_id) else team_id

        # Only the first marker of each team carries a legend label
        label = None
        if team_key not in team_colors:
            known_teams = [key for key in team_colors if key != 'Unknown']
            if team_key != 'Unknown' and len(known_teams) < len(palette):
                team_colors[team_key] = palette[len(known_teams)]
            else:
                team_colors[team_key] = 'gray'
            label = f'Team {team_key}'

        ax.scatter(frame[col], frame[y_col],
                   c=team_colors[team_key], s=200, alpha=0.7,
                   edgecolors='black', linewidths=1.5,
                   label=label, zorder=2)

    ax.set_title(f'Player Positions at Shot Moment\nGame Clock: {frame["game_clock"]:.1f}s | Shot Clock: {frame.get("shot_clock", "N/A")}',
                 fontsize=14, fontweight='bold')
    ax.set_xlabel('Court Length (feet)', fontsize=12)
    ax.set_ylabel('Court Width (feet)', fontsize=12)
    ax.legend(loc='upper right')
    ax.grid(True, alpha=0.3)
    fig.tight_layout()
    plt.show()

def tracking_statistics(df_complete):
    """
    Print summary statistics for the combined tracking DataFrame.

    Always reports the number of frames and of distinct game ids. When the
    'event_num' column exists, the number of distinct (game_id, event_num)
    pairs is reported as the shot count. When 'shot_result' exists, the
    number of frames (not shots) belonging to made and missed shots is
    reported.

    Parameters:
    - df_complete: frame-level DataFrame with at least a 'game_id' column

    Returns:
    - None; everything is printed
    """
    if df_complete.empty:
        print("No data available for statistics")
        return

    available_cols = df_complete.columns

    print(f"\n{'='*60}")
    print(f"GENERAL TRACKING STATISTICS")
    print(f"{'='*60}")
    print(f"Total frames: {len(df_complete):,}")
    print(f"Unique games: {df_complete['game_id'].nunique()}")

    # One shot == one (game, event) pair
    if 'event_num' in available_cols:
        shots_grouped = df_complete.groupby(['game_id', 'event_num'])
        unique_shots = shots_grouped.ngroups
        print(f"Total shots: {unique_shots}")

    # Frame counts per shot outcome
    if 'shot_result' in available_cols:
        outcomes = df_complete['shot_result']
        made_shots = outcomes.eq('made').sum()
        missed_shots = outcomes.eq('missed').sum()
        print(f"Made shots: {made_shots:,} frames")
        print(f"Missed shots: {missed_shots:,} frames")

    print(f"{'='*60}")

# Run the example analysis on the first extracted shot, if there is one
if not shot_tracking_data:
    print("\nNo shot data available for analysis")
else:
    # Print the shot's metadata and fetch its tracking frames
    tracking_first_shot = analyze_specific_shot(shot_tracking_data, 0)

    # Plot the first frame when tracking frames are available
    has_frames = tracking_first_shot is not None and len(tracking_first_shot) > 0
    if has_frames:
        visualize_shot_positions(tracking_first_shot, frame_num=0)

    # Dataset-wide summary
    if not df_complete.empty:
        tracking_statistics(df_complete)

## Advanced Shot Analysis

In [None]:
def extract_shot_moment_frames(tracking_df, window_seconds=2, fps=25):
    """
    Extract frames during the critical moment of the shot
    (last N seconds before release)

    Frames are ordered by descending game_clock. Because the game clock
    counts down, that is chronological order, so the final rows are the most
    recent ones. The last `fps * window_seconds` rows are returned.

    Parameters:
    - tracking_df: frame-level tracking DataFrame with a 'game_clock' column
    - window_seconds: length of the window to keep, in seconds
    - fps: assumed tracking frame rate (frames per second)

    Returns:
    - DataFrame with the last frames of the event, in chronological order
    """
    # A stable sort keeps the original order of frames that share a clock value
    ordered = tracking_df.sort_values('game_clock', ascending=False, kind='mergesort')

    n_frames = max(int(fps * window_seconds), 0)

    # tail() keeps the lowest clock values, i.e. the end of the event
    shot_frames = ordered.tail(n_frames).copy()

    return shot_frames

def calculate_player_velocities(tracking_df, fps=25):
    """
    Calculate velocity for each player frame by frame

    Frames are ordered by descending game_clock (chronological, since the
    clock counts down) using a stable sort, so frames that share a clock
    value (e.g. while the clock is stopped) keep their original order
    instead of being shuffled into spurious jumps. Speed is the distance
    between consecutive frames multiplied by the frame rate.

    Parameters:
    - tracking_df: frame-level tracking DataFrame with 'game_clock' and
      player_<n>_x / player_<n>_y columns
    - fps: assumed tracking frame rate; distance per frame * fps gives
      feet/second when coordinates are in feet

    Returns:
    - sorted copy of tracking_df with one player_<n>_velocity column per
      player slot (NaN on the first frame); the input is not modified
    """
    ordered = tracking_df.sort_values(
        'game_clock', ascending=False, kind='mergesort'
    ).copy()

    for col in list(ordered.columns):
        parts = str(col).split('_')
        if len(parts) != 3 or parts[0] != 'player' or parts[2] != 'x':
            continue

        player_idx = parts[1]
        y_col = f'player_{player_idx}_y'
        if y_col not in ordered.columns:
            continue

        # Displacement between consecutive frames
        dx = ordered[col].diff()
        dy = ordered[y_col].diff()
        ordered[f'player_{player_idx}_velocity'] = np.sqrt(dx**2 + dy**2) * fps

    return ordered

def find_shooter_and_defenders(tracking_df, shot_info):
    """
    Rank the players by their distance to the ball in the first frame.

    Parameters:
    - tracking_df: frame-level tracking DataFrame; only its first row is used
    - shot_info: shot metadata dictionary (accepted but not used here)

    Returns:
    - list of (player slot, details) tuples sorted by ascending distance,
      where details holds 'distance', 'x', 'y', 'team_id' and 'player_id';
      an empty list when tracking_df is empty. Players whose x or y is
      missing in that frame are left out.
    """
    if tracking_df.empty:
        return []

    # The first row is taken as the reference frame
    reference = tracking_df.iloc[0]
    ball_x = reference.get('ball_x', 0)
    ball_y = reference.get('ball_y', 0)

    by_slot = {}
    x_columns = [name for name in tracking_df.columns
                 if 'player_' in name and '_x' in name]

    for name in x_columns:
        slot = name.split('_')[1]
        x_col = f'player_{slot}_x'
        y_col = f'player_{slot}_y'

        if not (x_col in reference and y_col in reference):
            continue

        pos_x = reference[x_col]
        pos_y = reference[y_col]
        if pd.isna(pos_x) or pd.isna(pos_y):
            continue

        by_slot[slot] = {
            'distance': np.sqrt((pos_x - ball_x)**2 + (pos_y - ball_y)**2),
            'x': pos_x,
            'y': pos_y,
            'team_id': reference.get(f'player_{slot}_team_id', 'Unknown'),
            'player_id': reference.get(f'player_{slot}_player_id', 'Unknown'),
        }

    # Closest to the ball first
    ranked = list(by_slot.items())
    ranked.sort(key=lambda pair: pair[1]['distance'])
    return ranked

def shot_difficulty_analysis(tracking_df):
    """
    Analyze shot difficulty based on defender proximity

    The first frame of tracking_df is used. The player closest to the ball
    is taken to be the shooter. When team ids are available, only players
    on a different team than the shooter (or with an unknown team) count as
    defenders, so teammates standing nearby no longer inflate the numbers.
    Without team ids, every other player is treated as a defender.

    Parameters:
    - tracking_df: frame-level tracking DataFrame (ball_x/ball_y plus
      player_<n>_x / player_<n>_y and optionally player_<n>_team_id)

    Returns:
    - None when tracking_df is empty, otherwise a dictionary with
      'closest_defender_distance' (float, or None when no defender was
      found), 'num_defenders_within_5ft' and 'num_defenders_within_10ft'
    """
    if tracking_df.empty:
        return None

    shot_frame = tracking_df.iloc[0]

    ball_x = shot_frame.get('ball_x', 0)
    ball_y = shot_frame.get('ball_y', 0)

    def _team_known(team_id):
        return team_id is not None and not pd.isna(team_id)

    # (distance to ball, team id) for every player with a usable position
    players = []

    for col in tracking_df.columns:
        parts = str(col).split('_')
        if len(parts) != 3 or parts[0] != 'player' or parts[2] != 'x':
            continue

        player_idx = parts[1]
        y_col = f'player_{player_idx}_y'
        if y_col not in shot_frame:
            continue

        player_x = shot_frame[col]
        player_y = shot_frame[y_col]
        if pd.isna(player_x) or pd.isna(player_y):
            continue

        dist = float(np.sqrt((player_x - ball_x)**2 + (player_y - ball_y)**2))
        if np.isnan(dist):
            # Ball position missing: no meaningful distance for this frame
            continue

        players.append((dist, shot_frame.get(f'player_{player_idx}_team_id')))

    players.sort(key=lambda entry: entry[0])

    # The closest player is assumed to be the shooter and is excluded
    others = players[1:]
    if players and _team_known(players[0][1]):
        shooter_team = players[0][1]
        others = [
            entry for entry in others
            if not _team_known(entry[1]) or entry[1] != shooter_team
        ]

    defender_distances = [dist for dist, _ in others]
    closest_defender = defender_distances[0] if defender_distances else None

    return {
        'closest_defender_distance': closest_defender,
        'num_defenders_within_5ft': sum(1 for d in defender_distances if d < 5),
        'num_defenders_within_10ft': sum(1 for d in defender_distances if d < 10),
    }


# Example usage of advanced functions
if shot_tracking_data and len(shot_tracking_data) > 0:
    print("\n" + "="*60)
    print("ADVANCED ANALYSIS OF FIRST SHOT")
    print("="*60)

    first_shot = shot_tracking_data[0]
    tracking = first_shot['tracking']

    # Calculate velocities
    tracking_with_velocity = calculate_player_velocities(tracking)

    # Find shooter and defenders
    distances = find_shooter_and_defenders(tracking, first_shot['info'])

    print("\nPlayers ordered by distance from ball:")
    for idx, (player_idx, info) in enumerate(distances[:5]):
        print(f"{idx+1}. Player {player_idx} (ID: {info['player_id']}): {info['distance']:.2f} feet from ball")

    # Shot difficulty analysis
    difficulty = shot_difficulty_analysis(tracking)
    if difficulty:
        # The distance is None when no defender distance could be computed,
        # and None cannot be formatted with ':.2f'
        closest = difficulty['closest_defender_distance']
        closest_text = f"{closest:.2f} feet" if closest is not None else "N/A"

        print("\nShot Difficulty Metrics:")
        print(f"Closest defender: {closest_text}")
        print(f"Defenders within 5 feet: {difficulty['num_defenders_within_5ft']}")
        print(f"Defenders within 10 feet: {difficulty['num_defenders_within_10ft']}")

print("\n" + "="*60)
print("CODE EXECUTION COMPLETED!")
print("="*60)