In [55]:
# Request game data from text and record url, store as json files, and process stats into csv file

from enum import IntEnum
from datetime import datetime, timedelta
from typing import List
import re # Needed for regular expression search
import os
import requests
import time
import json
import math
import random
import pandas as pd


# --- Configuration ---
JSON_FOLDER = 'pitch_raw'
CSV_FOLDER = 'pitch_processed'
JSON_SUFFIX = 'raw'
CSV_SUFFIX = 'processed'

RECORD_URL = "https://api-gw.sports.naver.com/schedule/games/{}/record"
RELAY_URL = "https://api-gw.sports.naver.com/schedule/games/{}/relay?inning={}"

HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36',
    'Accept-Language': 'en-US,en;q=0.9',
}

class TextType(IntEnum):
    INNING_START = 0 # including start of half inning
    PITCH = 1
    SUBSTITUTION = 2
    TIMEOUT = 7 # Mound visit, VAR, pitcher leave mound, 우취
    PA_START = 8
    PA_RESULT_SELF = 13 # BB, HbP, Double play self out
    PA_RESULT_RUNNER = 14 # BB, Steal, Double play runner out, runner base run
    PA_RESULT_RBI_SELF = 23 # Hit, BB, 
    PA_RESULT_RBI_RUNNER = 24 # Hit, BB
    INNING_END = 99
# ---------------------
    
def extract_team_codes(game_id):
    """
    Extracts the 2-character away and home team codes from the game_id string.
    Example: '20250930LTHH02025' -> Away: 'LT', Home: 'HH'
    """
    if len(game_id) < 12:
        return 'N/A', 'N/A' # Handle unexpected format
        
    away_code = game_id[8:10]
    home_code = game_id[10:12]
    
    return away_code, home_code
        
def _make_safe_request(url, max_retries=5):
    """
    Performs a request with anti-blocking measures: random delay and exponential backoff.
    Returns the Response object on success, or None on failure.
    """
    # Start with a conservative wait time
    wait_time = 2  
    
    for attempt in range(max_retries):
        try:
            # 1. Randomized Delay (The critical anti-blocking measure)
            delay = random.uniform(2.5, 9.5)
            # print(f"Delaying for {delay:.2f} seconds...") # Optional: Debug print
            time.sleep(delay)

            # 2. Make the request with standard headers
            response = requests.get(url, headers=HEADERS, timeout=30)
            
            # 3. Handle success
            if response.status_code == 200:
                return response
            
            # 4. Handle "Too Many Requests" (429) using exponential backoff
            elif response.status_code == 429:
                print(f"[{url}] Received 429 on attempt {attempt + 1}. Waiting {wait_time}s and retrying.")
                time.sleep(wait_time)
                wait_time *= 2  # Double the wait time (5, 10, 20, ...)
            
            # 5. Handle other HTTP errors (404, 500, etc.)
            else:
                response.raise_for_status() # Raises an exception for 4xx/5xx status codes
                
        except requests.exceptions.RequestException as e:
            print(f"[{url}] Request failed on attempt {attempt + 1}: {e}")
            if attempt < max_retries - 1:
                # Wait before retrying on general network errors
                time.sleep(wait_time)
            
        except Exception as e:
             # Handle unexpected exceptions
             print(f"[{url}] Unexpected error: {e}")
             break

    print(f"[{url}] Failed to retrieve data after {max_retries} attempts.")
    return None

def _get_or_fetch_json(file_tag, url_template, *url_args):
    """
    Checks for a local JSON file based on the file_tag. 
    If not found or corrupted, fetches the data from the constructed URL.
    
    Args:
        file_tag (str): A unique identifier for the file (e.g., 'gameId_record' or 'gameId_inning').
        url_template (str): The format string for the API URL (e.g., RECORD_URL, RELAY_URL).
        *url_args: Arguments to format the URL (e.g., game_id, inning_counter).

    Returns:
        dict: The loaded or fetched JSON data, or None if failed.
    """
    os.makedirs(JSON_FOLDER, exist_ok=True)
    raw_file_path = os.path.join(JSON_FOLDER, f'{file_tag}_{JSON_SUFFIX}.json')
    data = None
    
    # --- 1. CHECK FOR EXISTING FILE ---
    if os.path.exists(raw_file_path):
        try:
            with open(raw_file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
                print(f"Loaded existing JSON from {raw_file_path}")
                return data # Return immediately if successful
        except json.JSONDecodeError as e:
            print(f"Error loading existing JSON file {raw_file_path}: {e}. Proceeding to refetch.")
        except Exception as e:
            print(f"An unexpected error occurred while reading {raw_file_path}: {e}. Proceeding to refetch.")

    # --- 2. FETCH NEW DATA ---
    url = url_template.format(*url_args)

    response = _make_safe_request(url)

    if response and response.status_code == 200:
        try:
            data = response.json()
            
            # Save the Raw JSON
            with open(raw_file_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=4)
            print(f"  -> Successfully fetched and saved raw JSON to {raw_file_path}")
            return data
                
        except json.JSONDecodeError:
            print(f"Error decoding JSON from URL: {url}")
        
    else:
        # The request failed or returned a non-200 status
        print(f"Failed to retrieve data from URL: {url}")
        
    return None # Return None on any fetch or decode failure
    
def _extract_game_data(data):
    """Safely extracts max_inning and PA list from the JSON data."""
    text_relay_data = data.get('result', {}).get('textRelayData', {})

    if text_relay_data is None:
        return 0, None
    
    # max_inning needs to be extracted as an integer
    try:
        max_inning = int(text_relay_data.get('inn', 0))
    except (TypeError, ValueError):
        max_inning = 0
        
    pa_list = text_relay_data.get('textRelays', [])

    # REVERSE THE LIST FOR CHRONOLOGICAL ORDER
    pa_list.reverse() 
    
    # The full data object is needed later to build the pitcher lookup (from inn 1 data)
    return max_inning, pa_list
    
def get_record_json_file(game_id):
    """
    Fetches/loads the game's record data and builds the batter/pitcher lookup tables.
    """
    file_tag = f'{game_id}_record'
    data = _get_or_fetch_json(file_tag, RECORD_URL, game_id)

    batter_record_lookup, pitcher_record_lookup = None, None
    
    if data:
        record_data = data.get('result', {}).get('recordData', {})
        if record_data is not None:
            batter_record_lookup, pitcher_record_lookup = _build_record_lookup(record_data)
    
    return batter_record_lookup, pitcher_record_lookup

def get_json_files(game_id):
    """
    Fetches game data by inning, checks for existing files, and merges PA data.
    Returns: master_json_list (list of all PAs) and pitcher_lookup.
    """
    inning_counter = 1
    max_inning = 1
    master_json_list = []
    pitcher_lookup = None

    while inning_counter <= max_inning:
        print(f"Processing Inning {inning_counter}...")
        file_tag = f'{game_id}_{inning_counter}'
        data = _get_or_fetch_json(file_tag, RELAY_URL, game_id, inning_counter)

        if data is None:
            # If fetch/load failed, stop the loop.
            print(f"Failed to retrieve data for Inning {inning_counter}. Stopping.")
            break
        
        if data:
            current_max_inning, pa_list = _extract_game_data(data)

            if current_max_inning < 4:
                print("Game canceled")
                break
            
            if inning_counter == 1:
                # Update max_inning based on the first inning's data
                max_inning = current_max_inning
                pitcher_lookup = _build_pitcher_lookup(data)
                print(f"Max inning set to: {max_inning}")

            if max_inning == 0:
                print(f"Unable to get a valid max_inning (is 0). Stopping.")
                break
                
            if not pa_list:
                print(f"Inning {inning_counter} contains no Plate Appearances. Stopping.")
                break
            
            # Collect the 'textRelays' list for later merging
            master_json_list.extend(pa_list)
            inning_counter += 1

    print(f"\nFinished data collection. Total PA lists collected: {len(master_json_list)}")
    return master_json_list, pitcher_lookup

def _build_record_lookup(data):
    """
    Returns:
        dict: dictionary mapping player 'pcode' to their record data
    """
    batter_record_lookup = {}
    pitcher_record_lookup = {}
    
    batter_record = data.get('battersBoxscore', {})
    pitcher_record = data.get('pitchersBoxscore', {})
    away_batter_record = batter_record.get('away', [])
    home_batter_record = batter_record.get('home', [])
    away_pitcher_record = pitcher_record.get('away', [])
    home_pitcher_record = pitcher_record.get('home', [])
    
    for p in away_batter_record:
        pcode = p.get('playerCode')
        if pcode:
            batter_record_lookup[pcode] = p
        
    for p in home_batter_record:
        pcode = p.get('playerCode')
        if pcode:
            batter_record_lookup[pcode] = p
        
    for p in away_pitcher_record:
        pcode = p.get('playerCode')
        if pcode:
            pitcher_record_lookup[pcode] = p
        
    for p in home_pitcher_record:
        pcode = p.get('playerCode')
        if pcode:
            pitcher_record_lookup[pcode] = p
        
        return batter_record_lookup, pitcher_record_lookup

def _build_pitcher_lookup(full_inning_1_data):
    """
    Extracts pitcher data from the full Inning 1 game object and creates a lookup dictionary.

    Args:
        full_inning_1_data (dict): The full JSON object from the first inning fetch.

    Returns:
        dict: A dictionary mapping pitcher 'pcode' to their static data (name, stance, etc.).
    """
    pitcher_lookup = {}
    
    try:
        data = full_inning_1_data['result']['textRelayData']
        home_pitcher_list = data['homeLineup']['pitcher']
        away_pitcher_list = data['awayLineup']['pitcher']
        all_pitchers_list = away_pitcher_list + home_pitcher_list
        
        for p in all_pitchers_list:
            pcode = p.get('pcode')
            if pcode:
                # Pre-calculate stance for efficiency
                hit_type = p.get('hitType', 'L')
                p['stance_derived'] = 'R' if hit_type.startswith('우') else 'L'
                pitcher_lookup[pcode] = p
        
        return pitcher_lookup
        
    except (KeyError, TypeError) as e:
        print(f"Error building pitcher lookup from lineup data: {e}")
        return {}

# --- KINEMATICS AND ZONE CLASSIFICATION FUNCTIONS ---

def calculate_plate_height(pitch_data):
    """Calculates the flight time and the corrected vertical position (z_plate)."""
    try:
        y0 = pitch_data['y0']
        vy0 = pitch_data['vy0']
        ay = pitch_data['ay']
        z0 = pitch_data['z0']
        vz0 = pitch_data['vz0']
        az = pitch_data['az']
    except KeyError as e:
        # print(f"Missing required key in pitch data: {e}") # Suppress during bulk processing
        return None

    # Step 1: Calculate Flight Time (t)
    a = 0.5 * ay
    b = vy0
    c = y0
    discriminant = (b**2) - (4 * a * c)
    
    if discriminant < 0 or a == 0:
        return {"error": "Invalid kinematics data."}
    
    # Use the minus branch for the time to the plate (t > 0)
    time_of_flight = (-b - math.sqrt(discriminant)) / (2 * a)
    
    # Step 2: Calculate Vertical Position (z_plate)
    z_plate = z0 + (vz0 * time_of_flight) + (0.5 * az * (time_of_flight**2))

    return {
        "time_of_flight": time_of_flight,
        "z_plate": z_plate
    }

def classify_5x5_zone(crossPlateX, calculated_z, topSz, bottomSz):
    """
    Classifies a pitch into one of 25 zones (11 to 55) based on a 5x5 grid 
    (3x3 zone + 1 block shadow), and returns a flag if it is outside even the shadow.
    """
    
    # 1. Define Standard Baseball Constants
    PLATE_WIDTH_FT = 1.4167  # 17 inches
    HALF_PLATE = PLATE_WIDTH_FT / 2 # 0.70835
    
    # 2. Define Zone Block Sizes
    X_BLOCK = PLATE_WIDTH_FT / 3
    Z_BLOCK = (topSz - bottomSz) / 3
    
    # 3. Define X Boundaries (6 boundaries create 5 zones)
    x_boundaries = [
        -HALF_PLATE - X_BLOCK,   # X1: Far Left Shadow boundary
        -HALF_PLATE,             # X2: Left edge of plate
        -HALF_PLATE + X_BLOCK,   # X3: Left-center boundary
        HALF_PLATE - X_BLOCK,    # X4: Right-center boundary
        HALF_PLATE,              # X5: Right edge of plate
        HALF_PLATE + X_BLOCK     # X6: Far Right Shadow boundary
    ]
    
    # 4. Define Z Boundaries (6 boundaries create 5 zones)
    z_boundaries = [
        bottomSz - Z_BLOCK,      # Z1: Far Low Shadow boundary
        bottomSz,                # Z2: Bottom of Zone
        bottomSz + Z_BLOCK,      # Z3: Low-mid boundary
        topSz - Z_BLOCK,         # Z4: Mid-high boundary
        topSz,                   # Z5: Top of Zone
        topSz + Z_BLOCK          # Z6: Far High Shadow boundary
    ]
    
    # Determine the X-Index -1, (0 to 4), 5
    x_index = -1
    is_out_bound = False
    for i, boundary in enumerate(x_boundaries):
        if crossPlateX < boundary:
            x_index = i
            break
    else:
        x_index = 5

    # Determine the Z-Index -1, (0 to 4), 5
    z_index = -1
    for i, boundary in enumerate(z_boundaries):
        if calculated_z < boundary:
            z_index = i
            break
    else:
        z_index = 5


    # --- Determine Outside Boundary Flag ---
    # The flag is True if the pitch is outside the 5x5 grid (index 0 or 5)
    is_outside_boundary = (x_index == 0) or (x_index == 5) or \
                              (z_index == 0) or (z_index == 5)
    
    # --- Calculate Final Zone ID (clamping index to 1-5) ---
    # Clamp the index between 1 and 5 (index 0 maps to zone 1, index 5 maps to zone 5)
    # This prevents the final Zone ID from being 00 or 66, but still flags the issue.
    # Note: Zone ID 1x is the low row, 5x is the high row.
    final_z_index = max(1, min(z_index, 5))
    final_x_index = max(1, min(x_index, 5))

    # Zone ID (e.g., Row 1 x 10 + Col 1 = 11, Row 5 x 10 + Col 5 = 55)
    zone_id = final_z_index * 10 + final_x_index
    
    return {
        "zone_5x5_id": str(zone_id),
        "is_outside_boundary": is_outside_boundary,
        "raw_x_index": x_index,
        "raw_z_index": z_index
    }

def _count_pa_home_in(text):
    """
    count home in during this pa, not from errors or steals, but from RBI
    """
    home_in = '홈인'
    home_run = '홈런'
    error = '실책'
    steal = '도루'

    if ((home_in in text) | (home_run in text)) & (error not in text) & (steal not in text):
        return 1

    return 0

def process_plate_appearance(pa_data, pitcher_lookup, away_code, home_code, batter_record_lookup, pitcher_record_lookup):
    """
    Processes all events
    within a single Plate Appearance (PA), correctly linking context to pitch data.
    Returns a list of dictionaries, one for each processed pitch (type 1).
    """
    processed_pitches_in_pa = []

    # Create a MAP for Trajectory Data (ptsOptions)
    # Key: pitchId (e.g., "251007_151509")
    # Value: The full trajectory dict
    trajectory_map = {
        pitch.get('pitchId'): pitch
        for pitch in pa_data.get('ptsOptions', [])
    }
    
    # Contextual variables to be updated by Type 8 events
    current_batter_name = 'N/A'
    current_batter_lineup_pos = 'N/A'
    current_batter_id = 'N/A' 
    is_batter_home = pa_data.get('homeOrAway') == "1"
    inn = pa_data.get('inn', 0)

    # Determine static PA team codes
    batter_team_code = home_code if is_batter_home else away_code
    pitcher_team_code = away_code if is_batter_home else home_code

    # Check for pitcher change SUBSTITUTION
    #pitcher_data = pitcher_lookup.get(pitcher_id, {})
    #pitcher_name = pitcher_data.get('name', 'N/A')
    #pitcher_stance = pitcher_data.get('stance_derived', 'L') # Using pre-derived stance

    text_options = pa_data.get('textOptions', [])
    pa_result_long = 'N/A'
    pa_result_base1 = 'N/A'
    pa_result_base2 = 'N/A'
    pa_result_base3 = 'N/A'
    pa_result_runs = 0
    
    # Iterate backward through Text Events (to get results)
    is_first_iteration = True
    for detail in reversed(text_options):
        if is_first_iteration:
            currentGameState = detail.get('currentGameState', {})
            pa_result_base1 = currentGameState.get('base1')
            pa_result_base2 = currentGameState.get('base2')
            pa_result_base3 = currentGameState.get('base3')
            is_first_iteration = False
        
        event_type = detail.get('type')
        match event_type:
            # even if inning ends not because of batter, need to record pa_result_base?
            case TextType.PA_RESULT_RUNNER:
                #pa_result = detail.get('text')
                pa_result = ''
            case TextType.PA_RESULT_RBI_RUNNER:
                text = detail.get('text').split(": ",1)[1]
                pa_result_runs += _count_pa_home_in(text)
            case TextType.PA_RESULT_SELF | TextType.PA_RESULT_RBI_SELF:
                pa_result_long = detail.get('text').split(": ",1)[1]
                pa_result_runs += _count_pa_home_in(pa_result_long)
            case TextType.PITCH:
                break

    before_strike = 'N/A'
    before_ball = 'N/A'
    before_out = 'N/A'
    # Iterate forward through Text Events (textOptions)
    for detail in text_options:
        event_type = detail.get('type')
        match event_type:
            case TextType.PA_START:
                batter_record = detail.get('batterRecord', {})
                if not batter_record:
                    break
                
                current_batter_name = batter_record.get('name', 'N/A')
                current_batter_lineup_pos = batter_record.get('batOrder', 'N/A')
                current_batter_id = batter_record.get('pcode', 'N/A')
            case TextType.PITCH:
                pitch_id = detail.get('ptsPitchId')
                pitch = trajectory_map.get(pitch_id)
    
                # --- A. MERGE DATA & CALCULATIONS ---
                
                pitch_summary = {}                
                pitch_summary['plate_z_ft'] = None
                zone_5x5_id = 0
                is_outside_boundary = False
                
                # This pitch event exists in textOptions but has Trajectory Data (ptsOptions)
                if pitch is None:
                    print(f"No pitchOpt: id {pitch_id}, inn {inn}")
                    pitch_summary['pitchId'] = pitch_id
                    pitch_summary['inn'] = inn
                else:
                    pitch_summary.update(pitch)
                    # Kinematic Calculations
                    calculation_result = calculate_plate_height(pitch)

                    if calculation_result:
                        calculated_z = calculation_result.get('z_plate')
                        zone_results = classify_5x5_zone(
                            pitch.get('crossPlateX', 0.0), calculated_z, 
                            pitch.get('topSz', 3.3), pitch.get('bottomSz', 1.6)
                        )
                        pitch_summary['plate_z_ft'] = round(calculated_z, 4)
                        zone_5x5_id = zone_results['zone_5x5_id']
                        is_outside_boundary = zone_results['is_outside_boundary']
    
                # --- B. ADD CONTEXTUAL & PITCH DATA ---
                
                pitcher_id = currentGameState.get('pitcher')
                pitcher_data = pitcher_lookup.get(pitcher_id, {})
    
                # 1. Game State
                pitch_summary['is_batter_home'] = is_batter_home
                pitch_summary['home_score'] = currentGameState.get('homeScore')
                pitch_summary['away_score'] = currentGameState.get('awayScore')
                pitch_summary['strike'] = before_strike
                pitch_summary['ball'] = before_ball
                pitch_summary['out'] = before_out
                pitch_summary['base1'] = currentGameState.get('base1')
                pitch_summary['base2'] = currentGameState.get('base2')
                pitch_summary['base3'] = currentGameState.get('base3')
    
                # 3. Pitcher Info (From currentGameState)
                pitch_summary['pitcher_id'] = pitcher_id
                hit_type = pitcher_data.get('hitType', 'L') # Check for the '우' (Right) character in 'hitType' X투X타
                pitch_summary['pitcher_stance'] = 'R' if hit_type.startswith('우') else 'L'
                pitch_summary['pitcher_team_code'] = pitcher_team_code
                pitch_summary['pitcher_name'] = pitcher_data.get('name', 'N/A')
            
                # 2. Batter Info (From Type 8 event)
                pitch_summary['batter_id'] = current_batter_id
                pitch_summary['batter_team_code'] = batter_team_code
                pitch_summary['batter_lineup_pos'] = current_batter_lineup_pos
                pitch_summary['batter_name'] = current_batter_name
    
                # 4. Pitch Details
                pitch_summary['is_throwing_stretch'] = (pitch_summary['base1'] != '0') or \
                                                     (pitch_summary['base2'] != '0') or \
                                                     (pitch_summary['base3'] != '0')
                
                pitch_summary['pitch_type'] = detail.get('stuff', 'N/A')
                pitch_summary['pitch_speed_kph'] = detail.get('speed', 'N/A')
                pitch_summary['pitch_result'] = detail.get('pitchResult', 'N/A')
                pitch_summary['is_outside_boundary'] = is_outside_boundary
                pitch_summary['zone_5x5_id'] = zone_5x5_id

                pa_result_short = batter_record_lookup[current_batter_id].get(f'inn{inn}', 'N/A')

                pitch_summary['pa_result_long'] = pa_result_long
                pitch_summary['pa_result_short'] = pa_result_short
                pitch_summary['pa_result_base1'] = pa_result_base1
                pitch_summary['pa_result_base2'] = pa_result_base2
                pitch_summary['pa_result_base3'] = pa_result_base3
                pitch_summary['pa_result_runs'] = pa_result_runs
                
                processed_pitches_in_pa.append(pitch_summary)
            case TextType.PA_RESULT_SELF | TextType.PA_RESULT_RBI_SELF:
                break
            case TextType.INNING_START | TextType.SUBSTITUTION | TextType.TIMEOUT | \
                    TextType.PA_RESULT_RUNNER | TextType.PA_RESULT_RBI_RUNNER | \
                    TextType.INNING_END:
                pass
            case _:
                print(f"Unknown Text Type: {event_type} {detail.get('text')}")
        
        currentGameState = detail.get('currentGameState', {})
        before_strike = currentGameState.get('strike')
        before_ball = currentGameState.get('ball')
        before_out = currentGameState.get('out')
    
    return processed_pitches_in_pa

def main_processing_script(game_id):
    away_code, home_code = extract_team_codes(game_id)
    batter_record_lookup, pitcher_record_lookup = get_record_json_file(game_id)
    master_pa_list, pitcher_lookup = get_json_files(game_id)

    if not master_pa_list:
        print("No plate appearances found. Exiting.")
        return pd.DataFrame()

    all_processed_pitches = []

    # 3. Process all Plate Appearances
    for i, pa_data in enumerate(master_pa_list):
        # print(f"Processing PA {i+1}/{len(master_pa_list)}...")
        pitches_in_pa = process_plate_appearance(pa_data, pitcher_lookup, away_code, home_code, batter_record_lookup, pitcher_record_lookup)
        all_processed_pitches.extend(pitches_in_pa)

    print(f"Total pitches processed: {len(all_processed_pitches)}")

    # 4. Convert to Pandas DataFrame and Clean
    df = pd.DataFrame(all_processed_pitches)

    df.rename(columns={
        'pitchId': 'pitch_id',
        'crossPlateX': 'plate_x_ft',
        'ballcount': 'ball_count',
        'crossPlateY': 'plate_y_ft',
        'topSz': 'strikezone_top',
        'bottomSz': 'strikezone_btm',
        'stance': 'batter_stance',
        'inn': 'inning'
    }, inplace=True)
    
    # 5. Output the Results
    if not df.empty:
        output_filename = f'./{CSV_FOLDER}/{game_id}_{CSV_SUFFIX}.csv'
        df.to_csv(output_filename, index=False, encoding='utf-8')
    
        print(f"\n--- Final Processed DataFrame Saved to {output_filename} ---")
        print(f"No. of Columns: {len(all_processed_pitches[0])}")
        # for key, value in all_processed_pitches[0].items():
        #     print(f"{key}: {value}")
    else:
        print(f"\n--- DF is empty. Skipping output. ---")
    
    return df

def get_lotte_game_ids(start_date_str: str = '2025-03-22', end_date_str: str = '2025-09-30', team_code: str = 'LT') -> List[str]:
    """
    Scrapes Naver Sports KBO schedule pages month-by-month to find unique game IDs 
    for a specified team within a given date range.

    Args:
        team_code (str, optional): The KBO team code to filter for. Defaults to 'LT' (Lotte).

    Returns:
        List[str]: A sorted list of unique KBO game IDs.
    """
    all_game_ids: List[str] = list()
    
    start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
    end_date = datetime.strptime(end_date_str, "%Y-%m-%d")

    current_date = start_date.replace(day=1)
    
    # 1. Iterate through the required months
    while current_date <= end_date:
        date_str = current_date.strftime("%Y-%m-%d")
        
        schedule_url = f"https://api-gw.sports.naver.com/schedule/calendar?upperCategoryId=kbaseball&categoryIds=kbo&date={date_str}&teamCode=LT"
        file_tag = f'{date_str}_month_lotte_matches'
        data = _get_or_fetch_json(file_tag, schedule_url)

        if data:
            try:
                dates_list = data.get('result').get('dates', [])
                print(len(dates_list))
                for date in dates_list:
                    game_id_list = date.get('gameIds', [])
                    all_game_ids.extend(game_id_list)
                            
            except requests.exceptions.RequestException as e:
                print(f"Error fetching data for {url}: {e}")
                continue
                
            except Exception as e:
                print(f"An unexpected error occurred while parsing {url}: {e}")
                continue
        else:
            print("Response is bad.")
        
        # Move to the next month for iteration
        if current_date.month == 12:
            current_date = current_date.replace(year=current_date.year + 1, month=1, day=1)
        else:
            current_date = current_date.replace(month=current_date.month + 1, day=1)

        # Stop if we've passed the end date
        if current_date.strftime("%Y%m") > end_date.strftime("%Y%m") and current_date.day == 1:
            break

    print(f"\nFound {len(all_game_ids)} unique game IDs for {team_code}.")
    print(all_game_ids)
    return sorted(all_game_ids)

# --- Execution Example ---
if __name__ == '__main__':
    START_DATE = '2025-03-22'
    END_DATE = '2025-09-30'

    all_lotte_ids = get_lotte_game_ids(START_DATE, END_DATE)
    for game_id in all_lotte_ids:
        main_processing_script(game_id)

# --- Execution Block ---
# if __name__ == '__main__':
#     #game_id = '20250930LTHH02025'
#     #game_id = '20250929LTSK02025'
#     game_id = '20250926SSLT02025'
    
#     # Execute the main function
#     final_df = main_processing_script(game_id)

Loaded existing JSON from pitch_raw\2025-03-01_month_lotte_matches_raw.json
31
Loaded existing JSON from pitch_raw\2025-04-01_month_lotte_matches_raw.json
30
Loaded existing JSON from pitch_raw\2025-05-01_month_lotte_matches_raw.json
31
Loaded existing JSON from pitch_raw\2025-06-01_month_lotte_matches_raw.json
30
Loaded existing JSON from pitch_raw\2025-07-01_month_lotte_matches_raw.json
31
Loaded existing JSON from pitch_raw\2025-08-01_month_lotte_matches_raw.json
31
Loaded existing JSON from pitch_raw\2025-09-01_month_lotte_matches_raw.json
30

Found 167 unique game IDs for LT.
['20250308HTLT02025', '20250309HTLT02025', '20250310LGLT02025', '20250311LGLT02025', '20250313HHLT02025', '20250314HHLT02025', '20250315KTLT02025', '20250316KTLT02025', '20250317LTWO02025', '20250318LTWO02025', '20250322LTLG02025', '20250323LTLG02025', '20250325LTSK02025', '20250326LTSK02025', '20250327LTSK02025', '20250328KTLT02025', '20250329KTLT02025', '20250330KTLT02025', '20250401LTHH02025', '20250402LTH

In [104]:
import pandas as pd
import numpy as np

KBO_P_RESULT_MAP = {
    'H': 'Hit',
    'S': 'Swing',
    'T': 'Strike',
    'F': 'Foul',
    'B': 'Ball',
    'V': 'Swing Bunt', #? 헛스윙번트
}

KBO_PA_RESULT_MAP = {
    # --- Walks & HBP (BB / HBP) ---
    '4구': 'BB',    # Walk (Base on Balls)
    '고4': 'BB',     # Intentional Walk (고의사구) - analytically still a BB
    '사구': 'BB',   # Hit By Pitch (HBP) - analytically still a BB

    # --- Strikeout (K) ---
    '삼진': 'K',     # Strikeout
    '스낫': 'NK',      # Strike not out (Dropped Third Strike) - counts as K, but not Out

    # --- Special/Other ---
    '야선': 'FC',    # Fielder's Choice (야수선택)
}
    
# --- Helper Functions for Validation ---

def standardize_pa_result(kbo_shorthand):
    """Converts KBO shorthand to a standard analytical result type."""
    if pd.isna(kbo_shorthand):
        return None
        
    kbo_shorthand = str(kbo_shorthand).strip().upper()

    # 1. Check for perfect matches
    if kbo_shorthand in KBO_PA_RESULT_MAP:
        return KBO_PA_RESULT_MAP[kbo_shorthand]

    # 2. Check for partial matches or specific patterns
    
    # Pattern for Field-Specific Outs
    # These often end in a common type: 땅(GO), 비(FO), 직(LO)
    if kbo_shorthand.endswith('땅'):
        return 'GO'
    if kbo_shorthand.endswith('병'): # 병살
        return 'GO'
    if kbo_shorthand.endswith('비'): # 비행
        return 'FO'
    if kbo_shorthand.endswith('파'): # 파울플라이
        return 'FO'
    if kbo_shorthand.endswith('직'):
        return 'LO'
    if kbo_shorthand.endswith('희'): # 희생
        return 'SAC'
    if kbo_shorthand.endswith('희번'): # 희생 번트
        return 'SAC'
    if kbo_shorthand.endswith('실'): # 실책
        return 'E'
        
    if kbo_shorthand.endswith('안'):
        return '1B'
    if kbo_shorthand.endswith('2'):
        return '2B'
    if kbo_shorthand.endswith('3'):
        return '3B'
    if kbo_shorthand.endswith('홈'):
        return 'HR'
    
    # Fallback for unknown codes (helps identify missing codes in the map)
    print(f"Warning: Unknown KBO shorthand '{kbo_shorthand}' encountered.")
    return 'UNKNOWN'

# missing 이닝, 자책
def calculate_pitcher_stats(df):
    # 1. Group by Pitcher and PA using the standardized column
    # Use the standardized column for filtering duplicates and defining the PA result
    pa_results = df.dropna(subset=['standard_result']).drop_duplicates(
        subset=['pitcher_id', 'batter_id', 'inning', 'batter_team_code'], 
        keep='last' 
    )
    
    # Prepare runs data (using 'pa_result_runs')
    pa_results['Runs_Allowed'] = pd.to_numeric(pa_results['pa_result_runs'], errors='coerce').fillna(0)
    
    # 2. Total Pitch Count (Remains the same)
    pitch_counts = df.groupby('pitcher_id')['pitch_id'].count().reset_index(name='Count')

    # 3. Plate Appearance Results (K, BB, H, AB) using the standardized codes
    pitcher_pa_stats = pa_results.groupby('pitcher_id').agg(
        # HITS (안타)
        H=('standard_result', lambda x: (x.isin(['1B', '2B', '3B', 'HR'])).sum()),
        # HOME RUNS ALLOWED (피홈런)
        HR=('standard_result', lambda x: (x == 'HR').sum()),
        # WALKS (볼넷)
        BB=('standard_result', lambda x: (x == 'BB').sum()),
        # STRIKEOUTS (삼진)
        K=('standard_result', lambda x: (x.isin(['K', 'NK'])).sum()),
        # HIT BY PITCH (사구)
        HBP=('standard_result', lambda x: (x == 'HBP').sum()),
        # SACRIFICE (희생타)
        SF_SAC=('standard_result', lambda x: (x.isin(['SF', 'SAC'])).sum()),
        # OUTS RECORDED
        Out=('standard_result', 
                       lambda x: (~x.isin(['1B', '2B', '3B', 'HR', 'BB', 'HBP', 'E', 'NK'])).sum()),
        # TOTAL PLATE APPEARANCES (타석)
        PA=('standard_result', 'count'),
        # TOTAL RUNS ALLOWED (실점)
        R=('Runs_Allowed', 'sum')
    ).reset_index()

    # 4. Calculate At Bats (타수)
    pitcher_pa_stats['AB'] = pitcher_pa_stats['PA'] - pitcher_pa_stats['BB'] - \
                           pitcher_pa_stats['HBP'] - pitcher_pa_stats['SF_SAC']
                           
    # 5. Merge and Finalize (remaining steps are the same)
    pitcher_report = pitch_counts.merge(pitcher_pa_stats, on='pitcher_id', how='left')

    pitcher_name_map = df.drop_duplicates(subset=['pitcher_id']).set_index('pitcher_id')['pitcher_name'] 
    pitcher_report['Name'] = pitcher_report['pitcher_id'].map(pitcher_name_map)

    pitcher_team_map = df.drop_duplicates(subset=['pitcher_id']).set_index('pitcher_id')['pitcher_team_code'] 
    pitcher_report['Team'] = pitcher_report['pitcher_id'].map(pitcher_team_map)
    
    # 6. Calculate IP (Innings Pitched) in the fractional X.Y format
    pitcher_report['IP_Outs'] = pitcher_report['Out'] // 3
    pitcher_report['IP_Remain'] = pitcher_report['Out'] % 3
    # Format as string X.Y (e.g., 5.1, 5.2, 6.0)
    pitcher_report['IP'] = pitcher_report['IP_Outs'].astype(str) + ',' + pitcher_report['IP_Remain'].astype(str)

    # 7. Final Output
    return pitcher_report[['Team', 'Name', 'IP', 'Count', 'PA', 'AB', 'H', 'HR', 'R', 'BB', 'K']].sort_values(by='Team', ascending=False)

def calculate_batter_stats(df):
    """
    Calculates key batter metrics (PA, R, H, RBI, HR, BB, K) 
    using standardized PA results and runs scored data.
    """
    
    # 1. Ensure standardization has run (if not already done)
    if 'standard_result' not in df.columns:
        df['standard_result'] = df['pa_result_short'].apply(standardize_pa_result)

    # 2. Filter for final PA results (one row per Plate Appearance)
    pa_results = df.dropna(subset=['standard_result']).drop_duplicates(
        subset=['pitcher_id', 'batter_id', 'inning', 'batter_team_code'], 
        keep='last'
    )
    
    # This column must contain the total number of runs that scored *due to this PA*
    pa_results['runs_on_play'] = pd.to_numeric(pa_results['pa_result_runs'], errors='coerce').fillna(0)


    # 3. Aggregate Batter Statistics
    batter_report = pa_results.groupby('batter_id').agg(
        # TOTAL PLATE APPEARANCES (타석)
        PA=('standard_result', 'count'),
        # HITS (안타)
        H=('standard_result', lambda x: (x.isin(['1B', '2B', '3B', 'HR'])).sum()),
        # HOME RUNS (홈런) - subset of Hits
        HR=('standard_result', lambda x: (x == 'HR').sum()),
        # WALKS (볼넷) + HBP - Assuming 'BB' standard result is for both 4구, 고4, and 사구
        BB=('standard_result', lambda x: (x == 'BB').sum()),
        # STRIKEOUTS (삼진)
        K=('standard_result', lambda x: (x == 'K').sum()),
        # SACRIFICE (희생타) - Needed for AB calculation
        SF_SAC=('standard_result', lambda x: (x.isin(['SF', 'SAC'])).sum()),
        # RUNS BATTED IN (타점) - Total runs scored *on this PA* (requires pa_result_runs)
        # We assume runs_on_play includes the batter if they hit a HR, but RBI calculation
        # is typically runs_on_play minus the run scored by the batter if they score on non-HR.
        # For simple validation, we use total runs scored on the play.
        # Note: True RBI logic is complex (e.g., doesn't count if batter gets K/GO/FO on error).
        # We will use a simple proxy for now:
        RBI_PROXY=('runs_on_play', 'sum')
    ).reset_index()

    # 4. Calculate At Bats (타수)
    # AB = PA - BB - HBP - SF - SAC. Since 'BB' includes HBP in your map, we simplify.
    # If HBP needs to be tracked separately for official AB calculation, you must adjust KBO_PA_RESULT_MAP
    # to differentiate between 'BB' (walk) and 'HBP'. For now, we assume your map handles it correctly.
    batter_report['AB'] = batter_report['PA'] - batter_report['BB'] - batter_report['SF_SAC']

    # 5. Get Runs Scored (R)
    # This requires identifying when a runner (including the batter) crossed home plate.
    # The simplest proxy: runs scored is R = sum of all runs scored on PAs by the *player's team* # when the *player* was the batter, where the batter also scores if they hit a HR.
    # The true R stat is complex, so we will skip it for this first pass of validation.
    # If you need R, you'll need a separate column that tracks whether the batter themselves scored (usually if they hit a HR and the bases weren't loaded).
    # For now, we'll keep the focus on *offensive production*: H, HR, RBI.
    
    # 6. Map Player Info (using the corrected mapping logic)
    batter_name_map = df.drop_duplicates(subset=['batter_id']).set_index('batter_id')['batter_name']
    batter_report['Name'] = batter_report['batter_id'].map(batter_name_map)
    
    batter_team_map = df.drop_duplicates(subset=['batter_id']).set_index('batter_id')['batter_team_code']
    batter_report['Team'] = batter_report['batter_id'].map(batter_team_map)

    # 7. Final Output Cleanup and Rename
    batter_report = batter_report.rename(columns={'RBI_PROXY': 'RBI', 'K': 'SO'})

    return batter_report[['Team', 'Name', 'PA', 'AB', 'H', 'HR', 'RBI', 'BB', 'SO']].sort_values(by='Team', ascending=False)

game_id = '20250629KTLT02025'
df = pd.read_csv(f"pitch_processed/{game_id}_processed.csv")
df['standard_result'] = df['pa_result_short'].apply(standardize_pa_result)

pitcher_validation_df = calculate_pitcher_stats(df)
batter_validation_df = calculate_batter_stats(df)

print (f"df length: {len(df)}")
print(f"pitch total: {pitcher_validation_df['Count'].sum()}")

print("\n" + "="*50)
print("Pitcher Validation Report")
print("="*50)
print(pitcher_validation_df)

print("\n" + "="*50)
print("Batter Validation Report")
print("="*50)
print(batter_validation_df)

df length: 331
pitch total: 331

Pitcher Validation Report
  Team  Name   IP  Count  PA  AB  H  HR  R  BB  K
0   LT   최준용  1,2     20   6   6  0   0  0   0  2
1   LT   정현수  0,2      6   2   2  0   0  0   0  0
3   LT   박세웅  5,0     90  24  22  6   2  3   2  3
6   LT   정철원  0,0     11   3   0  0   0  0   3  0
9   LT   김상수  1,1     27   6   5  1   0  2   1  1
2   KT   김재원  1,0     16   5   4  1   0  1   1  1
4   KT   김민수  0,1      9   3   3  2   0  2   0  1
5   KT    주권  1,0     22   6   5  2   0  1   1  0
7   KT  쿠에바스  4,0     93  23  19  6   0  4   4  2
8   KT   임준형  1,1     37   7   6  2   0  1   1  2

Batter Validation Report
   Team  Name  PA  AB  H  HR  RBI  BB  SO
2    LT   나승엽   3   3  1   0    0   0   1
4    LT   김동혁   5   2  1   0    2   3   1
5    LT  레이예스   5   4  1   0    2   1   0
6    LT   한승현   1   1  0   0    1   0   0
7    LT   박찬형   4   4  2   0    0   0   2
8    LT   유강남   2   2  0   0    0   0   1
9    LT   박승욱   4   3  0   0    0   1   0
20   LT   전준우   5   5  3   0 

In [62]:
# Plot pitch

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import matplotlib.cm as cm # For color maps

def plot_pitch_trajectories(df, game_id):
    """
    Generates a scatter plot of pitch locations (X vs Z) for all pitchers.
    
    Args:
        df (pd.DataFrame): The pitch-by-pitch DataFrame.
    """
    plt.rcParams['font.family'] = ['Noto Serif KR']  # Set the font family

    x_col = 'plate_x_ft'
    z_col = 'plate_z_ft'
    pitch_type = 'pitch_type'
    
    # 1. Get all unique pitchers, their teams, and sort by team code
    pitcher_info = df[['pitcher_id', 'pitcher_team_code', 'pitcher_name']].drop_duplicates(subset=['pitcher_id'])
    pitcher_info = pitcher_info.sort_values(by=['pitcher_team_code', 'pitcher_id'])
    
    sample_pitchers = pitcher_info['pitcher_id'].tolist()
    num_pitchers = len(sample_pitchers)
    
    if num_pitchers == 0: 
        print("Error: No unique pitchers found in the data.")
        return
        
    # 2. Set up the plotting grid (dynamic sizing)
    # Use max 3 columns for readability, adjusting rows accordingly
    cols = min(num_pitchers, 3) 
    rows = int(np.ceil(num_pitchers / cols))
    
    # Increase figsize dynamically
    fig, axes = plt.subplots(nrows=rows, ncols=cols, figsize=(5 * cols, 5 * rows), sharex=True, sharey=True)
    axes = axes.flatten() if num_pitchers > 1 else [axes]
    
    # Handle cases where the number of subplots exceeds the number of pitchers
    for i in range(num_pitchers, len(axes)):
        fig.delaxes(axes[i])
        
    # Get average strike zone dimensions
    sz_top_avg = df['strikezone_top'].mean()
    sz_bot_avg = df['strikezone_btm'].mean()
    sz_width = 1.416 # Standard 17 inches in feet (17/12)
    
    # Prepare colors for pitch types
    unique_pitch_types = df[pitch_type].unique()
    unique_pitch_types = [pt for pt in unique_pitch_types if pd.notna(pt)]
    
    # Use a colormap to get distinct colors
    colors = cm.get_cmap('tab10', len(unique_pitch_types))
    pitch_type_color_map = {pt: colors(i) for i, pt in enumerate(unique_pitch_types)}
    
    # 3. Iterate and plot each pitcher
    for i, pitcher_id in enumerate(sample_pitchers):
        ax = axes[i]
        
        # Filter data for the current pitcher
        pitcher_data = df[df['pitcher_id'] == pitcher_id].copy()
        
        info = pitcher_info[pitcher_info['pitcher_id'] == pitcher_id].iloc[0]
        pitcher_name = info['pitcher_name']
        pitcher_team = info['pitcher_team_code']
        
        # Scatter Plot - now colored by pitch type
        for pt in unique_pitch_types:
            subset = pitcher_data[pitcher_data[pitch_type] == pt]
            ax.scatter(subset[x_col], subset[z_col], alpha=0.4, s=15, 
                       color=pitch_type_color_map.get(pt, 'gray'), label=pt) # Use 'gray' for unknown
            
        # --- Draw Strike Zone (The primary validation step) ---
        # Draw the standard rectangle
        rect = plt.Rectangle(
            (-sz_width / 2, sz_bot_avg), # Bottom-left corner
            sz_width,                    # Width
            sz_top_avg - sz_bot_avg,     # Height
            edgecolor='black', 
            facecolor='none', 
            lw=2, 
            zorder=5,
            label='Strike Zone (Avg)'
        )
        ax.add_patch(rect)
        
        # --- Draw 5x5 Grid Lines (Validation for zone_5x5_id) ---
        #x_third = sz_width / 3
        z_third = (sz_top_avg - sz_bot_avg) / 3

        # Vertical lines (separating 9 zones in X)
        ax.axvline(-sz_width / 6, color='gray', linestyle=':', lw=1, zorder=4)
        ax.axvline(sz_width / 6, color='gray', linestyle=':', lw=1, zorder=4)
        
        # Horizontal lines (separating 9 zones in Z)
        ax.axhline(sz_bot_avg + z_third, color='gray', linestyle=':', lw=1, zorder=4)
        ax.axhline(sz_top_avg - z_third, color='gray', linestyle=':', lw=1, zorder=4)
        
        # --- Set Plot Aesthetics ---
        ax.set_title(f"{pitcher_name} ({pitcher_team})", fontsize=10)
        ax.set_xlabel(f'{x_col} (ft)')
        ax.set_ylabel(f'{z_col} (ft)')
        ax.legend(loc='lower left', fontsize=8)
        
        # Set uniform limits for a clear view
        ax.set_xlim(-2.5, 2.5) 
        ax.set_ylim(0.0, 5.0) 

    fig.suptitle('Pitch Trajectories (X vs Z at Plate)', fontsize=16)
    plt.tight_layout(rect=[0, 0.03, 1, 0.97]) # Adjust layout for suptitle
    plt.savefig(f'{game_id}_pitcher_trajectories.png')
    plt.close()
    
    print("Plot 'pitcher_trajectories.png' generated successfully.")

game_id = '20250930LTHH02025'
#game_id = '20250926SSLT02025'
df = pd.read_csv(f"pitch_processed/{game_id}_processed.csv")
plot_pitch_trajectories(df, game_id)

  colors = cm.get_cmap('tab10', len(unique_pitch_types))


Plot 'pitcher_trajectories.png' generated successfully.


In [113]:
# Check batters' O_Swing_% (non-strike swings) and Z_Swing_% (strike swings)

import pandas as pd
import numpy as np

KBO_P_RESULT_MAP = {
    'H': 'Hit',
    'S': 'Swing',
    'T': 'Strike',
    'F': 'Foul',
    'B': 'Ball',
    'V': 'Swing Bunt', #? 헛스윙번트
}

# --- KBO Pitch Result Map Interpretation ---
# Assuming 'T' is a 'Called Strike' (a TAKE) and 'B' is a 'Called Ball' (a TAKE).
# 'H', 'S', 'F', 'V' all represent some form of contact or swing (a SWING).
KBO_SWING_CODES = {'H', 'S', 'F', 'V'}
KBO_P_RESULT_KNOWN = {'H', 'S', 'F', 'V', 'T', 'B'}

def _is_pitch_a_strike(row, horizontal_buffer=0.0):
    """
    Determines if a pitch is a physical strike based on the ABS-defined zone 
    (personalized to the batter's height).
    
    Args:
        row (pd.Series): A row from the pitch-by-pitch DataFrame.
        horizontal_buffer (float): Optional buffer (in ft) for the plate width.
    
    Returns:
        bool: True if the pitch is physically within the strike zone, False otherwise.
    """
    plate_x = row['plate_x_ft']
    plate_z = row['plate_z_ft']
    sz_top = row['strikezone_top']
    sz_btm = row['strikezone_btm']
    
    # KBO standard plate width is 17 inches (1.416 ft). Half is 0.708 ft.
    half_plate = 0.708
    
    # 1. Vertical check (The core of the ABS zone)
    is_vertical_strike = (plate_z <= sz_top) and (plate_z >= sz_btm)
    
    # 2. Horizontal check (Standard 17-inch width + optional buffer)
    is_horizontal_strike = (plate_x >= (-half_plate - horizontal_buffer)) and \
                           (plate_x <= (half_plate + horizontal_buffer))
                           
    return is_vertical_strike and is_horizontal_strike


def _infer_swing_decision(pitch_result: str) -> bool:
    """
    Infers if a batter swung at a pitch based on the pitch result code 
    using the defined KBO_SWING_CODES set.
    """
    if pitch_result not in KBO_P_RESULT_KNOWN:
        print(f"UNKNOWN P_RESULT: {pitch_result}")
    
    return pitch_result in KBO_SWING_CODES


def calculate_batter_discipline(df: pd.DataFrame) -> pd.DataFrame:
    """
    Calculates O-Swing % (Chase Rate) and Z-Swing % (In-Zone Swing Rate) per batter 
    based on the objective ABS strike zone for a single game.
    
    Args:
        df (pd.DataFrame): Pitch-by-pitch data for one game.

    Returns:
        pd.DataFrame: Batter plate discipline statistics.
    """
    # Drop rows missing crucial location or zone data
    df_clean = df.dropna(subset=['plate_x_ft', 'plate_z_ft', 'strikezone_top', 'strikezone_btm', 'pitch_result']).copy()
    
    if df_clean.empty:
        print("Warning: Insufficient data after cleaning for discipline calculation.")
        return pd.DataFrame()
        
    # 1. Define Zone for Every Pitch
    df_clean['is_ABS_Strike'] = df_clean.apply(_is_pitch_a_strike, axis=1)
    
    # 2. Infer Swing for Every Pitch
    df_clean['is_swing'] = df_clean['pitch_result'].apply(_infer_swing_decision)
    
    # 3. Categorize Pitches
    df_clean['is_Z_Pitch'] = df_clean['is_ABS_Strike']   # Inside Zone (Z)
    df_clean['is_O_Pitch'] = ~df_clean['is_ABS_Strike']  # Outside Zone (O)
    
    # 4. Group and Aggregate
    discipline_stats = df_clean.groupby(['batter_id', 'batter_team_code', 'batter_name']).agg(
        # Swings
        O_Swings=('is_swing', lambda x: x[df_clean.loc[x.index, 'is_O_Pitch']].sum()),
        Z_Swings=('is_swing', lambda x: x[df_clean.loc[x.index, 'is_Z_Pitch']].sum()),
        
        # Total Pitches
        Total_O_Pitches=('is_O_Pitch', 'sum'),
        Total_Z_Pitches=('is_Z_Pitch', 'sum'),
        
        Total_Pitches=('pitch_id', 'count')
    ).reset_index()
    
    # 5. Calculate Percentages
    
    # O-Swing % (Chase Rate) = O_Swings / Total_O_Pitches
    discipline_stats['O_Swing_%'] = (
        (discipline_stats['O_Swings'] / discipline_stats['Total_O_Pitches']) * 100
    ).round(2).fillna(0)
    
    # Z-Swing % = Z_Swings / Total_Z_Pitches
    discipline_stats['Z_Swing_%'] = (
        (discipline_stats['Z_Swings'] / discipline_stats['Total_Z_Pitches']) * 100
    ).round(2).fillna(0)
    
    # 6. Final Output
    return discipline_stats[['batter_id', 'batter_team_code', 'batter_name', 'Total_Pitches', 
                             'O_Swings', 'Total_O_Pitches', 'O_Swing_%', 
                             'Z_Swings', 'Total_Z_Pitches', 'Z_Swing_%']].sort_values(by='batter_team_code')

game_id = '20250322LTLG02025'
df = pd.read_csv(f"pitch_processed/{game_id}_processed.csv")
calculate_batter_discipline(df)

Unnamed: 0,batter_id,batter_team_code,batter_name,Total_Pitches,O_Swings,Total_O_Pitches,O_Swing_%,Z_Swings,Total_Z_Pitches,Z_Swing_%
12,65207,LG,신민재,2,1,2,50.0,0,0,0.0
21,76290,LG,김현수,10,3,5,60.0,3,5,60.0
19,69102,LG,문보경,21,4,11,36.36,5,10,50.0
18,69100,LG,구본혁,15,2,11,18.18,2,4,50.0
16,68119,LG,문성주,3,0,1,0.0,1,2,50.0
15,68110,LG,송찬의,12,1,4,25.0,5,8,62.5
14,66108,LG,홍창기,22,1,10,10.0,8,12,66.67
13,65905,LG,최승민,4,2,2,100.0,1,2,50.0
23,79109,LG,오지환,18,5,10,50.0,5,8,62.5
10,62415,LG,박해민,22,4,14,28.57,4,8,50.0


In [72]:
import pandas as pd
import altair as alt
import glob
import os

# --- Configuration and Helper Functions ---

# Codes that imply a swing or contact occurred (used for Plot_Result categorization)
KBO_SWING_CODES = {'H', 'S', 'F', 'V'} 

WIDTH = 860
HEIGHT = 760

BALL_IN_PLAY = 'Ball In Play'
FOUL = 'Foul'
MISS = 'Swing/Bunt Miss'
STRIKE = 'Take (Called Strike)'
BALL = 'Take (Ball)'

RESULT_DOMAIN = [BALL_IN_PLAY, FOUL, MISS, STRIKE, BALL]
RESULT_RANGE = ['steelblue', 'mediumpurple', 'palevioletred', 'orange', 'mediumseagreen']

PITCH_DOMAIN = ['직구', '투심', '슬라이더', '커브', '포크', '체인지업', '커터']
PITCH_RANGE = ['circle', 'diamond', 'triangle-left', 'triangle-down', 'cross', 'square', 'triangle-right']

LOTTE_CODE = 'LT'

def map_pitch_to_plot_category(pitch_result: str) -> str:
    """Maps pitch result codes to descriptive categories for plotting."""
    if pd.isna(pitch_result):
        return 'Unknown'
    pitch_result = str(pitch_result).upper()
    if pitch_result in ['H']:
        return BALL_IN_PLAY
    elif pitch_result in ['F']:
        return FOUL
    elif pitch_result in ['S', 'V']:
        return MISS
    elif pitch_result in ['T']:
        return STRIKE
    elif pitch_result in ['B']:
        return BALL
    else:
        return 'Other'

# --- MODIFIED FUNCTION: LOTTE OFFENSE (vs. Single Opponent) ---

def generate_lotte_offense_chart(df: pd.DataFrame, opponent_code: str, output_filename: str) -> None:
    
    # 1. Hard Filter Data: Lotte Batters vs. Specific Opponent Pitchers
    df_plot = df[(df['batter_team_code'] == LOTTE_CODE) & (df['pitcher_team_code'] == opponent_code)].copy()
    
    if df_plot.empty:
        print(f"No Lotte Batter data found vs {opponent_code}. Skipping chart generation for {output_filename}")
        return

    df_plot.dropna(subset=['plate_x_ft', 'plate_z_ft', 'pitch_type', 'pitch_result', 'batter_name', 'pitcher_name', 'strikezone_top', 'strikezone_btm'], inplace=True)
    df_plot['plot_result'] = df_plot['pitch_result'].apply(map_pitch_to_plot_category)
    
    sz_top_avg = df_plot['strikezone_top'].mean()
    sz_btm_avg = df_plot['strikezone_btm'].mean()
    half_plate = 0.708

    # 2. Setup Lotte Batters Dropdown
    lotte_batters = df_plot['batter_name'].unique().tolist()
    batter_selection = alt.selection_point(
        fields=['batter_name'], 
        empty='all', 
        name='BatterSelector',
        bind=alt.binding_select(options=[None] + lotte_batters, name=f'{LOTTE_CODE} Batter:')
    )
    
    # 3. Setup Opponent Pitchers Dropdown (SIMPLE FILTER)
    opponent_pitchers = df_plot['pitcher_name'].unique().tolist()
    pitcher_selection_opp = alt.selection_point(
        fields=['pitcher_name'], 
        empty='all', 
        name='OpponentPitcherSelector',
        bind=alt.binding_select(options=[None] + opponent_pitchers, name=f'{opponent_code} Pitcher:')
    )

    # 4. Chart Generation
    base = alt.Chart(df_plot).add_params(batter_selection, pitcher_selection_opp)
    
    # Apply both filters
    base = base.transform_filter(batter_selection).transform_filter(pitcher_selection_opp)
    
    # C. Strike Zone Reference (Layer 1)
    strike_zone = alt.Chart(pd.DataFrame({
        'x': [-half_plate], 'x2': [half_plate], 'y': [sz_btm_avg], 'y2': [sz_top_avg]
    })).mark_rect(stroke='black', strokeWidth=2, fillOpacity=0.0, color='gray' 
    ).encode(x='x', x2='x2', y='y', y2='y2')

    # D. Pitch Location Scatter Plot (Layer 2)
    pitch_chart = base.mark_point(filled=True, size=70).encode(
        x=alt.X('plate_x_ft', 
                title='Plate X (ft, Positive=RHH Outer)', 
                axis=alt.Axis(values=[-2.5, -2, -1.5, -1, -0.5, 0, 0.5, 1, 1.5, 2, 2.5]),
                scale=alt.Scale(domain=[-2.5, 2.5])),
        y=alt.Y('plate_z_ft', 
                title='Plate Z (ft, Above Ground)', 
                axis=alt.Axis(values=[0, 0.5, 1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5, 5]),
                scale=alt.Scale(domain=[0, 5.0])),
        color=alt.Color('plot_result', title='Action/Result').scale(domain=RESULT_DOMAIN, range=RESULT_RANGE),
        shape=alt.Shape('pitch_type', title='Pitch Type').scale(domain=PITCH_DOMAIN, range=PITCH_RANGE),
        tooltip=['batter_name', 'pitcher_name', 'pitch_type', 'pitch_speed_kph', 'plot_result', 'final_pitch_result', 'strike', 'ball', 'out', alt.Tooltip('plate_x_ft', format='.2f'), alt.Tooltip('plate_z_ft', format='.2f')]
    ).properties(
        title=f'{LOTTE_CODE} Batters Plate Discipline vs. {opponent_code} Pitchers', 
        width=WIDTH, 
        height=HEIGHT
    ).interactive()

    # Combine and save
    combined_chart = (strike_zone + pitch_chart)
    combined_chart = combined_chart.resolve_scale(
        x='shared',
        y='shared'
    ).interactive()
    combined_chart = combined_chart.properties(
        autosize=alt.AutoSizeParams(
            type='fit', 
            contains='padding' 
        )
    )
    combined_chart.save(output_filename)
    print(f"Interactive Altair chart saved to '{output_filename}'.")


# --- MODIFIED FUNCTION: LOTTE DEFENSE (vs. Single Opponent) ---

def generate_lotte_defense_chart(df: pd.DataFrame, opponent_code: str, output_filename: str) -> None:
    
    # 1. Hard Filter Data: Lotte Pitchers vs. Specific Opponent Batters
    df_plot = df[(df['pitcher_team_code'] == LOTTE_CODE) & (df['batter_team_code'] == opponent_code)].copy()
    
    if df_plot.empty:
        print(f"No Lotte Pitcher data found vs {opponent_code}. Skipping chart generation for {output_filename}")
        return

    df_plot.dropna(subset=['plate_x_ft', 'plate_z_ft', 'pitch_type', 'pitch_result', 'batter_name', 'pitcher_name', 'strikezone_top', 'strikezone_btm'], inplace=True)
    df_plot['plot_result'] = df_plot['pitch_result'].apply(map_pitch_to_plot_category)
    
    sz_top_avg = df_plot['strikezone_top'].mean()
    sz_btm_avg = df_plot['strikezone_btm'].mean()
    half_plate = 0.708

    # 2. Setup Lotte Pitchers Dropdown
    lotte_pitchers = df_plot['pitcher_name'].unique().tolist()
    pitcher_selection = alt.selection_point(
        fields=['pitcher_name'], 
        empty='all', 
        name='PitcherSelector',
        bind=alt.binding_select(options=[None] + lotte_pitchers, name=f'{LOTTE_CODE} Pitcher:')
    )

    # 3. Setup Opponent Batters Dropdown (SIMPLE FILTER)
    opponent_batters = df_plot['batter_name'].unique().tolist()
    batter_selection_opp = alt.selection_point(
        fields=['batter_name'], 
        empty='all', 
        name='OpponentBatterSelector',
        bind=alt.binding_select(options=[None] + opponent_batters, name=f'{opponent_code} Batter:')
    )

    # 4. Chart Generation
    base = alt.Chart(df_plot).add_params(pitcher_selection, batter_selection_opp)
    
    # Apply both filters
    base = base.transform_filter(pitcher_selection).transform_filter(batter_selection_opp)
    
    # C. Strike Zone Reference (Layer 1)
    strike_zone = alt.Chart(pd.DataFrame({
        'x': [-half_plate], 'x2': [half_plate], 'y': [sz_btm_avg], 'y2': [sz_top_avg]
    })).mark_rect(stroke='black', strokeWidth=2, fillOpacity=0.0, color='gray' 
    ).encode(x='x', x2='x2', y='y', y2='y2')

    # D. Pitch Location Scatter Plot (Layer 2)
    pitch_chart = base.mark_point(filled=True, size=70).encode(
        x=alt.X('plate_x_ft', 
                title='Plate X (ft, Positive=RHH Outer)', 
                axis=alt.Axis(values=[-2.5, -2, -1.5, -1, -0.5, 0, 0.5, 1, 1.5, 2, 2.5]),
                scale=alt.Scale(domain=[-2.5, 2.5])),
        y=alt.Y('plate_z_ft', 
                title='Plate Z (ft, Above Ground)', 
                axis=alt.Axis(values=[0, 0.5, 1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5, 5]),
                scale=alt.Scale(domain=[0, 5.0])),
        color=alt.Color('plot_result', title='Action/Result').scale(domain=RESULT_DOMAIN, range=RESULT_RANGE),
        shape=alt.Shape('pitch_type', title='Pitch Type').scale(domain=PITCH_DOMAIN, range=PITCH_RANGE),
        tooltip=['batter_name', 'pitcher_name', 'pitch_type', 'pitch_speed_kph', 'plot_result', 'final_pitch_result', 'strike', 'ball', 'out', alt.Tooltip('plate_x_ft', format='.2f'), alt.Tooltip('plate_z_ft', format='.2f')]
    ).properties(
        title=f'{LOTTE_CODE} Pitchers Plate Discipline vs. {opponent_code} Batters', 
        width=WIDTH, 
        height=HEIGHT
    ).interactive()

    # Combine and save
    combined_chart = (strike_zone + pitch_chart)
    combined_chart = combined_chart.resolve_scale(
        x='shared',
        y='shared'
    ).interactive()
    combined_chart = combined_chart.properties(
        autosize=alt.AutoSizeParams(
            type='fit', 
            contains='padding' 
        )
    )
    combined_chart.save(output_filename)
    print(f"Interactive Altair chart saved to '{output_filename}'.")


# --- Execution Block ---
PITCH_DATA_FOLDER = 'final_processed'
OUTPUT_BASE_PATH = 'web/public/assets'

try:
    # 1. Find all CSV files in the folder
    all_files = glob.glob(os.path.join(PITCH_DATA_FOLDER, "*.csv"))

    if not all_files:
        print(f"Error: No CSV files found in {PITCH_DATA_FOLDER}. Please check the folder path and ensure files have the '.csv' extension.")
    else:
        # 2. Read and concatenate all files
        df_list = [pd.read_csv(filename, index_col=None, header=0) for filename in all_files]
        pitch_data = pd.concat(df_list, axis=0, ignore_index=True)
        
        # 3. Identify all unique opposing team codes
        # Get all team codes that are NOT Lotte, from either pitcher or batter side
        opponent_teams = set(pitch_data[pitch_data['pitcher_team_code'] != LOTTE_CODE]['pitcher_team_code'].unique())
        opponent_teams.update(pitch_data[pitch_data['batter_team_code'] != LOTTE_CODE]['batter_team_code'].unique())
        
        if not opponent_teams:
            print("Error: Could not identify any opposing teams.")
            
        
        # 4. Loop and generate charts for each opponent
        generated_files = []
        for opponent_code in sorted(list(opponent_teams)):
            
            # Lotte Offense (Batters) Chart
            output_file_batters = os.path.join(OUTPUT_BASE_PATH, f'{LOTTE_CODE}_batters_vs_{opponent_code}_discipline.json')
            generate_lotte_offense_chart(pitch_data, opponent_code, output_file_batters)
            generated_files.append(output_file_batters)

            # Lotte Defense (Pitchers) Chart
            output_file_pitchers = os.path.join(OUTPUT_BASE_PATH, f'{LOTTE_CODE}_pitchers_vs_{opponent_code}_discipline.json')
            generate_lotte_defense_chart(pitch_data, opponent_code, output_file_pitchers)
            generated_files.append(output_file_pitchers)
        
        print(f"\nSuccessfully generated {len(generated_files)} charts using {len(all_files)} data files.")

except FileNotFoundError:
    print(f"Error: The directory '{PITCH_DATA_FOLDER}' was not found. Please ensure it exists.")
except Exception as e:
    print(f"An unexpected error occurred during chart generation: {e}")

Interactive Altair chart saved to 'web/public/assets\LT_batters_vs_HH_discipline.json'.
Interactive Altair chart saved to 'web/public/assets\LT_pitchers_vs_HH_discipline.json'.
Interactive Altair chart saved to 'web/public/assets\LT_batters_vs_HT_discipline.json'.
Interactive Altair chart saved to 'web/public/assets\LT_pitchers_vs_HT_discipline.json'.
Interactive Altair chart saved to 'web/public/assets\LT_batters_vs_KT_discipline.json'.
Interactive Altair chart saved to 'web/public/assets\LT_pitchers_vs_KT_discipline.json'.
Interactive Altair chart saved to 'web/public/assets\LT_batters_vs_LG_discipline.json'.
Interactive Altair chart saved to 'web/public/assets\LT_pitchers_vs_LG_discipline.json'.
Interactive Altair chart saved to 'web/public/assets\LT_batters_vs_NC_discipline.json'.
Interactive Altair chart saved to 'web/public/assets\LT_pitchers_vs_NC_discipline.json'.
Interactive Altair chart saved to 'web/public/assets\LT_batters_vs_OB_discipline.json'.
Interactive Altair chart sa

In [67]:
# more processing to label final pitch

import pandas as pd
import numpy as np
import os
import glob

def process_pitch_data(file_path: str, output_dir: str) -> None:
    """
    Applies the final pitch outcome calculation logic to a single pitch data file.
    
    Args:
        file_path: The path to the input CSV file.
        output_dir: The directory where the processed file should be saved.
    """
    try:
        # Load data
        df = pd.read_csv(file_path)
    except Exception as e:
        print(f"Skipping file {file_path}: Error loading data: {e}")
        return

    # --- 1. Identify the Unique Plate Appearance (PA) ---
    df['pa_id'] = (
        df['batter_team_code'] + '_' + 
        df['inning'].astype(str) + '_' + 
        df['batter_id'].astype(str)
    )

    # --- 2. Identify the Final Pitch in Each PA ---
    # Find the maximum pitch_id (timestamp) for correct final pitch identification
    df['final_pitch_id'] = df.groupby('pa_id')['pitch_id'].transform('max')
    df['is_final_pitch'] = df['pitch_id'] == df['final_pitch_id']

    # --- 3. Create the Conditional Outcome Column with Edge Case Handling ---

    # Check 1: Runner Out/PA Ended case (N/A in pa_result_long)
    is_runner_out_case = (
        df['is_final_pitch'] == True
    ) & (
        df['pa_result_long'].astype(str).str.upper().str.strip() == 'N/A'
    )

    # Check 2: Standard PA result case (Not N/A)
    is_standard_result_case = (
        df['is_final_pitch'] == True
    ) & (
        df['pa_result_long'].astype(str).str.upper().str.strip() != 'N/A'
    )

    # Define the conditions and choices for numpy.select
    conditions = [
        is_runner_out_case,         # Final pitch AND PA result is N/A
        is_standard_result_case     # Final pitch AND PA result is NOT N/A
    ]

    choices = [
        'Runner Out/PA Ended',      # Choice 1: Clear marker for non-batter result
        df['pa_result_short']       # Choice 2: Use the short PA result
    ]

    # Default choice: In-at-bat pitch result
    default_choice = df['pitch_result']

    df['final_pitch_result'] = np.select(
        conditions, 
        choices, 
        default=default_choice
    )

    # --- 4. Save the Updated Data ---
    base_name = os.path.basename(file_path).replace("_processed.csv", "_with_final_result.csv")
    output_path = os.path.join(output_dir, base_name)
    df.to_csv(output_path, index=False)
    print(f"Successfully processed and saved: {output_path}")


# --- Batch Processing Execution ---

# Define the directories
INPUT_DIR = "pitch_processed"
OUTPUT_DIR = "final_processed"

# 1. Create the output directory if it doesn't exist
if not os.path.exists(OUTPUT_DIR):
    os.makedirs(OUTPUT_DIR)
    print(f"Created output directory: {OUTPUT_DIR}")

# 2. Find all CSV files in the input directory
# glob is safe for pattern matching across operating systems
input_files = glob.glob(os.path.join(INPUT_DIR, '*.csv'))

if not input_files:
    print(f"No CSV files found in the directory: {INPUT_DIR}")
else:
    # 3. Process each file
    print(f"Found {len(input_files)} files. Starting batch processing...")
    for file in input_files:
        process_pitch_data(file, OUTPUT_DIR)
    
    print("\nBatch processing complete.")

Found 153 files. Starting batch processing...
Successfully processed and saved: final_processed\20250308HTLT02025_with_final_result.csv
Successfully processed and saved: final_processed\20250309HTLT02025_with_final_result.csv
Successfully processed and saved: final_processed\20250310LGLT02025_with_final_result.csv
Successfully processed and saved: final_processed\20250311LGLT02025_with_final_result.csv
Successfully processed and saved: final_processed\20250313HHLT02025_with_final_result.csv
Successfully processed and saved: final_processed\20250314HHLT02025_with_final_result.csv
Successfully processed and saved: final_processed\20250316KTLT02025_with_final_result.csv
Successfully processed and saved: final_processed\20250317LTWO02025_with_final_result.csv
Successfully processed and saved: final_processed\20250318LTWO02025_with_final_result.csv
Successfully processed and saved: final_processed\20250322LTLG02025_with_final_result.csv
Successfully processed and saved: final_processed\2025