In [1]:
import statsapi
# from google.cloud import bigquery
from datetime import date, timedelta, datetime
import requests
import time
import sys
import os
import csv
import glob
from pprint import pprint

In [2]:
# Base output folders for the CSV exports (both values end with a slash)
PLAYER_STATS_FOLDER = "stats/player/"
TEAM_STATS_FOLDER = "stats/team/"

In [3]:
# Reference dates: `today` stays a date object, `yesterday` is an ISO string
today = date.today()
yesterday = (today - timedelta(days=1)).isoformat()

# Date window (ISO strings) used for the schedule pull; the season is the
# year part of the window start
start_date, end_date = '2024-05-20', '2024-06-06'
currentSeason = start_date[:4]

In [4]:
# Notebook-level accumulators: the list of game records and the set of
# gamePks that have been stored
games, seen_games = [], set()

In [5]:
# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = os.path.expanduser("~/Documents/jupyter/MLB/credentials/mlb-analysis-463501-869e729270f3.json")

# client = bigquery.Client()
# print("Project:", client.project)

In [6]:
#Function for team season stats
def team_stat_data(team_id, group, season, stats='season'):
    """
    Fetch a single stat split for a team from the MLB Stats API.

    Args:
        team_id (int): The team ID (interpolated into the request URL).
        group (str): Stat group, e.g., 'hitting', 'pitching', or 'fielding'.
        season (int | str): The season year.
        stats (str): Value sent as the `stats` query parameter.

    Returns:
        dict: The first split of the first stats entry, or an empty dict when
        the response carries no stats or no splits (so callers can safely
        test `'stat' in result`).

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.Timeout: If the API does not answer within the timeout.
    """
    url = f"https://statsapi.mlb.com/api/v1/teams/{team_id}/stats"
    params = {
        "season": season,
        "group": group,
        "stats": stats,
        "updateDate": today
    }
    # Without a timeout a stalled connection would block the notebook forever.
    r = requests.get(url, params=params, timeout=30)
    r.raise_for_status()

    # Walk the payload defensively: a missing or empty level yields {}
    # instead of an IndexError/KeyError.
    stat_entries = r.json().get("stats") or []
    splits = stat_entries[0].get("splits") if stat_entries else None
    return splits[0] if splits else {}

In [7]:
def player_stat_data(player_id, group=None, season=None, stats='season', gamePk=None):
    """
    Fetch player stats from the MLB Stats API.

    Args:
        player_id (int): The player ID.
        group (str): Stat group, e.g., 'hitting', 'pitching', or 'fielding'.
            Only sent when truthy.
        season (int | str): The season year. Only sent when truthy.
        stats (str): Stat type. Either 'season' or 'gameLog'.
        gamePk (int | str): Optional. If provided with 'gameLog', the result
            is narrowed to the split for that game.

    Returns:
        dict: A single stat split. With stats='gameLog' and a gamePk this is
        the split whose game matches gamePk; otherwise it is the first split
        returned. None when the response has no stats/splits or the requested
        game is not among them.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.Timeout: If the API does not answer within the timeout.
    """
    url = f"https://statsapi.mlb.com/api/v1/people/{player_id}/stats"

    # One flag drives both the request parameter and the filtering below, so
    # the two can never disagree.
    wants_single_game = stats == "gameLog" and gamePk is not None

    params = {
        "stats": stats
    }
    if season:
        params["season"] = season
    if group:
        params["group"] = group
    if wants_single_game:
        params["gamePk"] = gamePk

    # Without a timeout a stalled connection would block the notebook forever.
    response = requests.get(url, params=params, timeout=30)
    response.raise_for_status()

    # Tolerate a missing "stats"/"splits" level instead of raising KeyError.
    stat_entries = response.json().get("stats") or []
    splits = stat_entries[0].get("splits") if stat_entries else None
    if not splits:
        return None

    if wants_single_game:
        # Compare as strings so a gamePk read back from a CSV (str) still
        # matches the integer stored in the split.
        wanted = str(gamePk)
        for split in splits:
            if str(split.get("game", {}).get("gamePk")) == wanted:
                return split
        return None  # GamePk not found

    # For 'season' or all game logs
    return splits[0]


In [8]:
# Turn a "Last, First" boxscore name into "First Last"
def normalize_name(name_raw):
    """
    Reorder a comma-separated name.

    Only the first comma is treated as the separator: the text before it is
    the last name and the text after it is the first name. Both parts are
    stripped before being joined with a single space. A name without a comma
    is returned exactly as given.
    """
    last, comma, first = name_raw.partition(',')
    if not comma:
        return name_raw
    return f"{first.strip()} {last.strip()}".strip()

In [None]:
#Get team season stats
teams = statsapi.get("teams", {"sportId": 1})['teams']

# One long-format row per (team, group, stat)
team_season_stats = []
for team in teams:
    teamId = team['id']
    teamName = team['name']
    for group in ["hitting", "pitching", "fielding"]:
        season_stats = team_stat_data(teamId, group, currentSeason, stats='season')
        if 'stat' not in season_stats:
            # Nothing usable came back for this team/group
            continue
        for key, val in season_stats['stat'].items():
            team_season_stats.append({
                "domain": "team",
                "range": "season",
                "group": group,
                "stat": key,
                "val": val,
                "season": currentSeason,
                "teamId": teamId,
                "updateDate": today
            })
        print(f"Stored team {group} season stats for {teamName}")

if team_season_stats:
    # Create the target folder first so a fresh checkout does not fail with
    # FileNotFoundError when the CSV is opened.
    team_season_dir = os.path.join(TEAM_STATS_FOLDER, "season")
    os.makedirs(team_season_dir, exist_ok=True)
    team_season_path = os.path.join(team_season_dir, f"{currentSeason}_season.csv")
    with open(team_season_path, mode="w", newline="", encoding="utf-8") as file:
        writer = csv.DictWriter(file, fieldnames=team_season_stats[0].keys())
        writer.writeheader()
        writer.writerows(team_season_stats)
else:
    # Indexing [0] on an empty list would raise IndexError; report instead.
    print(f"No team season stats collected for {currentSeason}; nothing written")
        

Stored team hitting season stats for Athletics
Stored team pitching season stats for Athletics
Stored team fielding season stats for Athletics
Stored team hitting season stats for Pittsburgh Pirates
Stored team pitching season stats for Pittsburgh Pirates
Stored team fielding season stats for Pittsburgh Pirates
Stored team hitting season stats for San Diego Padres
Stored team pitching season stats for San Diego Padres
Stored team fielding season stats for San Diego Padres
Stored team hitting season stats for Seattle Mariners
Stored team pitching season stats for Seattle Mariners
Stored team fielding season stats for Seattle Mariners
Stored team hitting season stats for San Francisco Giants
Stored team pitching season stats for San Francisco Giants
Stored team fielding season stats for San Francisco Giants
Stored team hitting season stats for St. Louis Cardinals
Stored team pitching season stats for St. Louis Cardinals
Stored team fielding season stats for St. Louis Cardinals
Stored tea

In [None]:
# Get schedule for start/end date (regular season)
schedule = statsapi.schedule(
    start_date,
    end_date,
    sportId=1
)
games = []
print(len(schedule))

# gamePks stored during THIS run of the cell. A cell-local set is used for
# de-duplication (rather than the notebook-level seen_games) so that
# re-running the cell does not skip every game.
stored_game_pks = set()

#Pull available game data from schedule
for game in schedule:
    gamePk = game['game_id']
    if gamePk in stored_game_pks:
        # Same gamePk listed more than once in the schedule: keep the first
        # entry and avoid a second boxscore request.
        continue
    stored_game_pks.add(gamePk)

    # Reset per game so a boxscore without these labels cannot inherit the
    # previous game's values (or raise NameError on the very first game).
    weather = None
    attendance = None

    #Get weather and attendance from the boxscore "info" entries
    boxscore = statsapi.get("game_boxscore", {'gamePk': gamePk})
    for item in boxscore.get('info', []):
        label = item.get('label')
        if label == "Weather":
            weather = item.get('value')
        elif label == "Att":
            attendance = item.get('value')

    games.append({
        "gamePk": gamePk,
        "gametime": game['game_datetime'],
        "season": game['game_date'][:4],
        "awayId": game['away_id'],
        "awayName": game['away_name'],
        "homeId": game['home_id'],
        "homeName": game['home_name'],
        "weather": weather,
        "attendance": attendance or None,
        "venue": game['venue_name']
    })
    seen_games.add(gamePk)

if games:
    # Make sure the output folder exists before opening the CSV.
    os.makedirs("games", exist_ok=True)
    with open(f"games/{start_date}to{end_date}_games.csv", mode="w", newline="", encoding="utf-8") as file:
        writer = csv.DictWriter(file, fieldnames=games[0].keys())
        writer.writeheader()
        writer.writerows(games)
else:
    # games[0] would raise IndexError on an empty schedule; report instead.
    print(f"No games found between {start_date} and {end_date}; nothing written")

In [None]:
team_game_stats = []
player_game_stats = []

# (key under boxscore teamStats, value stored in the "group" column)
TEAM_BOXSCORE_GROUPS = (
    ("batting", "hitting"),
    ("pitching", "pitching"),
    ("fielding", "fielding"),
)

#Get team and player stats for games in games
for g in games:
    gamePk = g['gamePk']
    gameDate = g['gametime'][:10]
    boxscore = statsapi.get("game_boxscore", {'gamePk': gamePk})
    # Same label for every log line of this game, so build it once
    matchup = f"{boxscore['teams']['away']['team']['abbreviation']} vs. {boxscore['teams']['home']['team']['abbreviation']}"

    for side in ["home", "away"]:
        #Add team stats for games in schedule from boxscore
        team_stats = boxscore["teams"][side]["teamStats"]
        for boxscore_key, group in TEAM_BOXSCORE_GROUPS:
            for stat, val in team_stats.get(boxscore_key, {}).items():
                team_game_stats.append({
                    "domain": "team",
                    "group": group,
                    "range": "game",
                    "stat": stat,
                    "val": val,
                    "gamePk": gamePk,
                    "gameDate": gameDate,
                    "side": side
                })
        print(f"Loaded {side} team data from game: {matchup}")

        #Player stats from boxscore
        for player_key, player_data in boxscore["teams"][side]["players"].items():
            player_id = player_data["person"]["id"]  # integer player ID
            player_name = player_data["person"]["fullName"]
            position = player_data.get("position", {}).get("abbreviation")
            for group in ["hitting", "fielding", "pitching"]:
                group_stats = player_data.get("stats", {}).get(group, {})
                if not group_stats:
                    # Player recorded nothing in this group for this game
                    continue
                for stat, val in group_stats.items():
                    player_game_stats.append({
                        "domain": "player",
                        "group": group,
                        "range": "game",
                        "stat": stat,
                        "val": val,
                        "gamePk": gamePk,
                        "gameDate": gameDate,
                        "playerId": player_id,
                        "side": side,
                        "positionsPlayed": position
                    })
                print(f"Loaded player data for {player_name} in game: {matchup}")

if games:
    game_date = games[0]['gametime'].split('T')[0]

# NOTE(review): the file names use gameDate, i.e. the date of the LAST game
# processed above, which is what the notebook has always produced. The
# otherwise unused game_date (first game) suggests that may not have been the
# intent - confirm before changing the naming.
for folder, label, rows in (
    (TEAM_STATS_FOLDER, "team", team_game_stats),
    (PLAYER_STATS_FOLDER, "player", player_game_stats),
):
    if not rows:
        # rows[0] (and gameDate when there were no games) would fail here
        print(f"No {label} game stats collected; nothing written")
        continue
    # os.path.join avoids the "folder//game" double slash and makedirs
    # guarantees the folder exists before the CSV is opened.
    out_dir = os.path.join(folder, "game")
    os.makedirs(out_dir, exist_ok=True)
    out_path = os.path.join(out_dir, f"{gameDate}_{label}_game_stats.csv")
    with open(out_path, mode="w", newline="", encoding="utf-8") as file:
        writer = csv.DictWriter(file, fieldnames=rows[0].keys())
        writer.writeheader()
        writer.writerows(rows)

In [None]:
# #Get special stats for pitchers from schedule
# for game in schedule:
#     gamePk = game['game_id']
#     boxscore = statsapi.get("game_boxscore", {'gamePk': gamePk})
#     #pprint(boxscore['info'])
#     for item in boxscore['info']:
#         player_stats_temp = []
#         if item['label'] == "Pitches-Strikes":
#             statType = 'pitching_player_game'
#             raw = item['value']
#             entries = [entry.strip() for entry in raw.strip('.').split(';')]
#             for entry in entries:
#                 parts = entry.rsplit(" ", 1)
#                 if len(parts) == 2:
#                     name_part, stat_part = parts
#                     player_stats_temp.append((name_part, stat_part))
#             for nameraw, record in player_stats_temp:
#                 normalized_name = normalize_name(nameraw)
#                 result = statsapi.lookup_player(normalized_name)
#                 if result:
#                     player_id = result[0]["id"]
#                     player_game_stats.append({
#                         "statType": statType,
#                         "stat":item['label'],
#                         "val":record,
#                         "gamePk":gamePk,
#                         "playerId":player_id
#                     })
#                     print(f"{item['label']} stored for {result[0]['fullName']}")
#                 else:
#                     print(f"{normalized_name}: NOT FOUND")

#         elif item['label'] == "Batters faced":
#             statType = "pitching_player_game"
#             raw = item['value']
#             entries = [entry.strip() for entry in raw.strip('.').split(';')]
#             for entry in entries:
#                 parts = entry.rsplit(" ", 1)
#                 if len(parts) == 2:
#                     name_part, stat_part = parts
#                     player_stats_temp.append((name_part, stat_part))
#             for nameraw, stat in player_stats_temp:
#                 normalized_name = normalize_name(nameraw)
#                 result = statsapi.lookup_player(normalized_name)
#                 if result:
#                     player_id = result[0]["id"]
#                     player_game_stats.append({
#                         "statType": statType,
#                         "stat":item['label'],
#                         "val":stat,
#                         "gamePk":gamePk,
#                         "playerId":player_id
#                     })
#                     print(f"{item['label']} stored for {result[0]['fullName']}")
#                 else:
#                     print(f"{normalized_name}: NOT FOUND")
#         elif item['label'] == "Groundouts-flyouts":
#             player_stats_temp = []
#             statType = "pitching_player_game"
#             raw = item['value']
#             entries = [entry.strip() for entry in raw.strip('.').split(';')]
#             for entry in entries:
#                 parts = entry.rsplit(" ", 1)
#                 if len(parts) == 2:
#                     nameraw, stat = parts
#                     player_stats_temp.append((nameraw, stat))
#                 for nameraw, stat in player_stats_temp:
#                     normalized_name = normalize_name(nameraw)
#                     result = statsapi.lookup_player(normalized_name)
#                     if result:
#                         player_id = result[0]["id"]
#                         player_game_stats.append({
#                             "statType": statType,
#                             "stat":item['label'],
#                             "val":stat,
#                             "gamePk":gamePk,
#                             "playerId":player_id
#                         })
#                         print(f"{item['label']} stored for {result[0]['fullName']}")
#                     else:
#                         print(f"{normalized_name}: NOT FOUND")


In [None]:
#Get player season stats
def player_season_stats(player_id, group, season=2025):
    """
    Fetch a player's season stats from the MLB Stats API.

    Args:
        player_id (int): The player ID (interpolated into the request URL).
        group (str): Stat group, e.g., 'hitting', 'pitching', or 'fielding'.
            Sent as the `group` query parameter when truthy.
        season (int | str): The season year.

    Returns:
        list | None: `[stat_dict, {'season': season}]` built from the first
        split of the first stats entry, or None when the response has no
        stats or no splits.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.Timeout: If the API does not answer within the timeout.
    """
    url = f"https://statsapi.mlb.com/api/v1/people/{player_id}/stats"
    params = {
        "stats": "season",
        "season": season
    }
    # The group argument used to be accepted but never sent, so every call
    # returned the same data regardless of the requested group.
    if group:
        params["group"] = group

    # Without a timeout a stalled connection would block the notebook forever.
    response = requests.get(url, params=params, timeout=30)
    response.raise_for_status()

    # Extract the stats if available
    stat_entries = response.json().get("stats") or []
    splits = stat_entries[0].get("splits") if stat_entries else None
    if not splits:
        return None
    return [splits[0]["stat"], {'season': season}]

In [None]:
#Get player season stats
season_stats = []
seen_players = set()

for game_stats in player_game_stats:
    playerId = game_stats['playerId']
    if playerId in seen_players:
        # player_game_stats has many rows per player; fetch each player once
        continue
    seen_players.add(playerId)

    for group in ["hitting", "pitching", "fielding"]:
        # Keyword arguments are required here: the second positional
        # parameter of player_stat_data is `group`, so passing the season
        # positionally sent it as the group and left the season unset.
        data = player_stat_data(playerId, group=group, season=currentSeason)
        if not data:
            # No season line for this player in this group
            continue
        for stat, val in data.get('stat', {}).items():
            season_stats.append({
                "domain": "player",
                # Recorded so same-named stats from different groups stay
                # distinguishable
                "group": group,
                "season": currentSeason,
                "stat": stat,
                "val": val,
                # The id of the player being processed in this loop
                "playerId": playerId
            })

    print(f"Got {currentSeason} season stats for {playerId}")

if season_stats:
    # os.path.join avoids the "folder//season" double slash and makedirs
    # guarantees the folder exists before the CSV is opened.
    player_season_dir = os.path.join(PLAYER_STATS_FOLDER, "season")
    os.makedirs(player_season_dir, exist_ok=True)
    player_season_path = os.path.join(player_season_dir, f"{currentSeason}_player_season_stats.csv")
    with open(player_season_path, mode="w", newline="", encoding="utf-8") as file:
        writer = csv.DictWriter(file, fieldnames=season_stats[0].keys())
        writer.writeheader()
        writer.writerows(season_stats)
else:
    # season_stats[0] would raise IndexError on an empty list; report instead.
    print(f"No player season stats collected for {currentSeason}; nothing written")
    
