<a href="https://colab.research.google.com/github/b-harr/dmcb/blob/develop/notebooks/dmcb_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Scripts

## get_spotrac_contracts.py

`get_spotrac_contracts.py` is a Python script that scrapes NBA player contract data from https://www.spotrac.com/nba/{team}/yearly for all 30 teams. It extracts relevant contract details then processes and saves this contract data to a CSV file (`data/spotrac_contracts.csv`) for use in salary cap management for the league.

In [10]:
import os
import sys
import logging
from dotenv import load_dotenv

# Get the root project directory (2 levels up from the current script)
base_dir = os.getcwd()

# Append the base_dir to sys.path to ensure modules can be imported
sys.path.append(base_dir)

# Import custom modules for the script
import config
from utils.text_formatter import make_player_key, format_text

# Configure logging to capture detailed script execution and errors
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger()

# Log the script start with a timestamp to track execution
logger.info("The script started successfully.")

# Load environment variables from the .env file
load_dotenv()
logger.info("Environment variables loaded successfully.")

import pandas as pd
import requests
import re
from bs4 import BeautifulSoup

# List of NBA teams to scrape salary data for (from Spotrac)
teams = [
    "atlanta-hawks", "brooklyn-nets", "boston-celtics", "charlotte-hornets",
    "cleveland-cavaliers", "chicago-bulls", "dallas-mavericks", "denver-nuggets",
    "detroit-pistons", "golden-state-warriors", "houston-rockets", "indiana-pacers",
    "la-clippers", "los-angeles-lakers", "memphis-grizzlies", "miami-heat",
    "milwaukee-bucks", "minnesota-timberwolves", "new-york-knicks",
    "new-orleans-pelicans", "oklahoma-city-thunder", "orlando-magic",
    "philadelphia-76ers", "phoenix-suns", "portland-trail-blazers",
    "san-antonio-spurs", "sacramento-kings", "toronto-raptors",
    "utah-jazz", "washington-wizards"
]

# Define the directory and filename for saving the CSV file
output_csv = config.spotrac_contracts_path

# Set up a persistent session for making HTTP requests
def get_session():
    """
    Creates and returns a persistent session with appropriate headers
    for making requests to Spotrac.
    """
    session = requests.Session()
    session.headers.update(
        {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"}
    )
    return session

# Safe request function to handle HTTP errors and return the response
def safe_request(session, url):
    """
    Attempts to fetch data from the given URL, handling HTTP errors gracefully.
    Returns the response if successful, otherwise logs an error.
    """
    try:
        response = session.get(url)
        response.raise_for_status()
        return response
    except requests.exceptions.HTTPError as e:
        logging.warning(f"HTTP error for {url}: {e}")
    except Exception as e:
        logging.warning(f"Error fetching {url}: {e}")
    return None

# Extract season headers (e.g., "2024-25", "2023-24") from the team's salary table
def extract_season_headers(session, team):
    """
    Extracts the season headers (e.g., "2024-25") from the salary table
    for a given team on Spotrac. These headers represent the years for
    the player's salary data.
    """
    url = f"https://www.spotrac.com/nba/{team}/yearly"
    response = safe_request(session, url)
    if response:
        soup = BeautifulSoup(response.text, "html.parser")
        table = soup.select_one("table")
        if table:
            header_row = table.find("tr")
            if header_row:
                # Extract headers that represent years
                headers = [th.get_text(strip=True) for th in header_row.find_all("th")]
                season_headers = [header for header in headers if re.match(r"^\d{4}-\d{2}$", header)]
                if season_headers:
                    logging.info(f"Season headers extracted for team: {team}")
                    return season_headers
    logging.warning(f"Failed to extract headers for team: {team}")
    return []

# Extract player data (name, position, salary, etc.) from the salary table of the team
def extract_player_data(session, team, season_headers):
    """
    Extracts player data (name, position, salary, etc.) from the salary table
    of the given team on Spotrac, limiting to the first 5 salary years.
    """
    url = f"https://www.spotrac.com/nba/{team}/yearly"
    team_name = format_text(team)
    response = safe_request(session, url)
    if not response:
        return []

    soup = BeautifulSoup(response.text, "html.parser")
    table = soup.select_one("table")
    if not table:
        logging.warning(f"No table found for team {team}")
        return []

    rows = table.find_all("tr")
    team_data = []

    for row in rows[1:]:
        cols = row.find_all("td")
        if len(cols) < 4:
            continue

        # Extract player name and other details
        player_name_tag = cols[0].find("a")
        if not player_name_tag:
            continue

        player_name = player_name_tag.get_text(strip=True)
        player_link = player_name_tag["href"]
        player_key = make_player_key(player_name)
        position = cols[1].get_text(strip=True)
        age = cols[2].get_text(strip=True)

        salary_data = []
        # Extract salary data (e.g., "$10M", "UFA", etc.)
        for col in cols[3:]:
            cell_text = col.get_text(strip=True)
            if "Two-Way" in cell_text:
                salary_data.append("Two-Way")
            elif "UFA" in cell_text:
                salary_data.append("UFA")
            elif "RFA" in cell_text:
                salary_data.append("RFA")
            else:
                salary_matches = re.findall(r"\$\d{1,3}(?:,\d{3})*(?:\.\d{2})?", cell_text)
                salary_data.extend([s.replace(",", "") for s in salary_matches])

        # Only keep the first 5 salaries (and pad with empty strings if fewer)
        player_data = [player_name, player_link, player_key, team_name, url, position, age] + salary_data[:5]
        player_data += [""] * (5 - len(salary_data))  # Fill missing salary slots with empty strings
        team_data.append(player_data)

        # Log each player being processed
        logging.info(f"Processed player: {player_name}, Position: {position}, Salary: {', '.join(salary_data[:5])}")

    return team_data

# Main function to scrape data for all teams and save to CSV
def scrape_and_save_data():
    """
    Scrapes salary data for all teams, processes the data, and saves it to a CSV file.
    The CSV includes player names, links, positions, ages, and the first 5 years of salary data.
    """
    session = get_session()
    # Try extracting the season headers (representing the salary years)
    season_headers = []
    for team in teams:
        season_headers = extract_season_headers(session, team)
        if season_headers:
            break

    if not season_headers:
        raise ValueError("Failed to extract season headers.")

    # Headers include the player details and first 5 salary years
    headers = ["Player", "Player Link", "Player Key", "Team", "Team Link", "Position", "Age"] + season_headers[:5]  # Limit to first 5 seasons
    pd.DataFrame(columns=headers).to_csv(output_csv, index=False, mode="w", encoding="utf-8")
    logging.info(f"CSV header written to {output_csv}")

    # Collect all player data across teams
    all_data = []
    for idx, team in enumerate(teams):
        progress = (idx + 1) / len(teams) * 100
        logging.info(f"Processing team {idx+1}/{len(teams)} ({progress:.2f}%) - {team}")
        team_data = extract_player_data(session, team, season_headers)
        all_data.extend(team_data)

    # Log after all teams are processed
    logging.info("All teams processed. Sorting player data...")

    # Sort player data by player name (Player Key)
    sorted_data = sorted(all_data, key=lambda x: x[2].lower())
    # Write the sorted data to CSV
    pd.DataFrame(sorted_data, columns=headers).to_csv(output_csv, index=False, mode="w", encoding="utf-8")
    logging.info(f"Data processing completed. Data successfully written to the file: {output_csv}")

if __name__ == "__main__":
    # Run the scraping and data-saving process
    scrape_and_save_data()


## sync_bbref_stats.csv

Description

In [9]:
import os
import sys
import logging
from dotenv import load_dotenv

# Get the root project directory (2 levels up from the current script)
base_dir = os.getcwd()

# Append the base_dir to sys.path to make sure modules can be imported
sys.path.append(base_dir)

# Import custom modules for the script
import config
from utils.google_sheets_manager import GoogleSheetsManager
from utils.csv_handler import CSVHandler
from utils.data_fetcher import fetch_data, parse_html
from utils.text_formatter import make_player_key

# Configure logging to capture detailed script execution and errors
logging.basicConfig(
    level=logging.INFO,  # Log messages with level INFO and above
    format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger()

# Log the script start with a timestamp to track execution
logger.info("The script started successfully.")

# Load environment variables from the .env file
load_dotenv()
logger.info("Environment variables loaded successfully.")

# Retrieve necessary configuration values from the config module
google_sheets_url = config.google_sheets_url
sheet_name = "Stats"  # Name of the sheet where data will be written
numeric_columns = "PTS,TRB,AST,STL,BLK,TOV,PF,G,MP".split(",")  # Columns to be used for numerical calculations
logger.debug(f"Using numeric columns: {numeric_columns}")

import pandas as pd

def main(year=2025):
    """
    Main function to fetch, process, and store NBA player stats for the given year.
    It fetches data from Basketball-Reference, processes it, calculates fantasy points,
    and stores the result both in a CSV file and Google Sheets (only for the default year).
    """
    # Construct the URL for the requested year's player stats page on Basketball-Reference
    url = f"https://www.basketball-reference.com/leagues/NBA_{year}_totals.html"
    # Set the request headers for fetching data
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    }
    logger.info(f"Fetching data from URL: {url}")

    # Fetch the HTML data from the Basketball-Reference website
    try:
        response = fetch_data(url, headers)
        logger.info(f"Data fetched successfully from {url}.")
    except Exception as e:
        logger.error(f"Error fetching data: {e}")
        return

    # Parse the HTML content to extract player stats into a DataFrame
    try:
        df = parse_html(response)
        logger.info("HTML parsed successfully into a DataFrame.")
    except Exception as e:
        logger.error(f"Error parsing HTML: {e}")
        return

    # Exclude 'League Average' rows to focus on individual player stats
    df = df[df["Player"] != "League Average"]
    logger.info(f"Excluded 'League Average' rows. Data size is now {len(df)} rows.")

    # Check for and drop any rows with missing player names
    missing_players = df[df["Player"].isna()]
    if not missing_players.empty:
        logger.warning(f"Dropped {len(missing_players)} rows with missing player data.")
    df = df.dropna(subset=["Player"])

    # Add a 'Player Key' column by applying the make_player_key function to the 'Player' column
    df["Player Key"] = df["Player"].apply(make_player_key)
    logger.info("Added 'Player Key' column to DataFrame.")

    # Sort the DataFrame by 'Player Key' and 'Team' columns for organized output
    df = df.sort_values(by=["Player Key", "Team"])
    logger.info("Sorted DataFrame by 'Player Key' and 'Team'.")

    # Drop duplicates based on 'Player Key', keeping the first occurrence
    df = df.drop_duplicates(subset='Player Key', keep='first')
    logger.info("Removed individual teams when more than one.")

    # Convert specified numeric columns to proper numeric types for calculations
    df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric, errors="coerce")
    logger.info(f"Converted numeric columns: {numeric_columns} to numeric types.")

    # Replace any NaN values with 0 across the DataFrame to prevent errors in calculations
    df.fillna(0, inplace=True)
    logger.info("Filled NaN values with 0.")

    # Perform vectorized calculations to compute fantasy points and related stats
    try:
        # Calculate Fantasy Points (FP) as a sum of positive stats and negative ones
        df["FP"] = (
            df["PTS"] + df["TRB"] + df["AST"] +
            df["STL"] + df["BLK"] - df["TOV"] - df["PF"]
        ).astype(int)  # Fantasy Points: PTS/REB/AST/STL/BLK +1, TO/PF/TF -1
        df["FPPG"] = (df["FP"] / df["G"]).round(1)  # Fantasy Points Per Game
        df["FPPM"] = (df["FP"] / df["MP"]).round(2)  # Fantasy Points Per Minute
        df["MPG"] = (df["MP"] / df["G"]).round(1)  # Minutes Per Game
        df["FPR"] = ((df["FP"] ** 2) / (df["G"] * df["MP"])).round(1)  # Fantasy Point Rating = FPPG * FPPM = (FP ** 2) / (G * MP)

        logger.info("Calculated new fantasy stats (FP, FPPG, FPPM, MPG, FPR).")
    except Exception as e:
        logger.error(f"Error during calculations: {e}")
        return

    # Define the file path where the processed data will be saved
    if year == 2025:  # Only sync to Google Sheets for the default year
        output_csv = config.bbref_stats_path
        logger.info(f"Saving data to CSV file: {output_csv}")
        # Save the processed data to a CSV file
        try:
            CSVHandler.write_csv(output_csv, df.values.tolist(), headers=df.columns.tolist())
            logger.info(f"Data successfully written to the CSV file: {output_csv}")
        except Exception as e:
            logger.error(f"Error saving data to CSV: {e}")
            return

        # Authenticate and update the Google Sheets with the processed data
        try:
            # Get the current timestamp to indicate when the data was last updated
            timestamp = logging.Formatter('%(asctime)s').format(logging.LogRecord("", 0, "", 0, "", [], None))  # Get the current timestamp

            # Initialize Google Sheets manager and clear existing data in the sheet
            sheets_manager = GoogleSheetsManager()
            sheets_manager.clear_data(sheet_name="Stats")
            logger.info(f"Cleared existing data in Google Sheets '{sheet_name}'.")

            # Write the timestamp to Google Sheets
            sheets_manager.write_data([[f"Last updated {timestamp} by {config.service_account_email}"]], sheet_name="Stats", start_cell="A1")
            logger.info("Wrote timestamp to Google Sheets.")

            # Write the processed data to the 'Stats' sheet
            sheets_manager.write_data([df.columns.tolist()] + df.values.tolist(), sheet_name="Stats", start_cell="A2")
            logger.info(f"Data successfully written to the '{sheet_name}' sheet.")
        except Exception as e:
            logger.error(f"Error updating Google Sheets: {e}")

    else:
        # Generate a dynamic alternate file path using the year
        alternate_output_path = f"data/bbref_stats_{year}.csv"  # Path for alternate CSV output
        logger.info(f"Saving data to alternate CSV file: {alternate_output_path}")
        try:
            CSVHandler.write_csv(alternate_output_path, df.values.tolist(), headers=df.columns.tolist())
            logger.info(f"Data successfully written to alternate CSV file: {alternate_output_path}")
        except Exception as e:
            logger.error(f"Error saving data to alternate CSV: {e}")

if __name__ == "__main__":
    main()
    #main(year=2024)
    #main(year=2023)


INFO:utils.csv_handler:Wrote headers to /content/data/bbref_stats.csv: ['Player', 'Age', 'Team', 'Pos', 'G', 'GS', 'MP', 'FG', 'FGA', 'FG%', '3P', '3PA', '3P%', '2P', '2PA', '2P%', 'eFG%', 'FT', 'FTA', 'FT%', 'ORB', 'DRB', 'TRB', 'AST', 'STL', 'BLK', 'TOV', 'PF', 'PTS', 'Trp-Dbl', 'Awards', 'Player Key', 'FP', 'FPPG', 'FPPM', 'MPG', 'FPR']
INFO:utils.csv_handler:Wrote 485 rows to /content/data/bbref_stats.csv.
INFO:utils.google_sheets_manager:Initializing GoogleSheetsManager...
INFO:utils.google_sheets_manager:Successfully connected to Google Sheets. Service Account Email: gchelp@dmcb-442123.iam.gserviceaccount.com
INFO:utils.google_sheets_manager:Accessed worksheet: Stats
INFO:utils.google_sheets_manager:Cleared data from worksheet 'Stats'.
INFO:utils.google_sheets_manager:Accessed worksheet: Stats
INFO:utils.google_sheets_manager:Written data to worksheet 'Stats' starting at 'A1'.
INFO:utils.google_sheets_manager:Accessed worksheet: Stats
INFO:utils.google_sheets_manager:Written data

# Setup Colab

## Step 1: Upload the `.env` file to Colab

In [8]:
from google.colab import files
uploaded = files.upload()


Saving dmcb-442123-966817b53d6f.json to dmcb-442123-966817b53d6f.json


## Optional: Remove file in Colab
The `-f` flag ensures that no error is raised if the file doesn't exist

In [None]:
!rm -f dmcb-442123-966817b53d6f.json

## Step 2: Install `python-dotenv`

In [3]:
!pip install python-dotenv

Collecting python-dotenv
  Downloading python_dotenv-1.0.1-py3-none-any.whl.metadata (23 kB)
Downloading python_dotenv-1.0.1-py3-none-any.whl (19 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.0.1


## Step 3: Load the `.env` file

In [5]:
from dotenv import load_dotenv
import os

# Load the .env file (assuming the file is in the current directory)
load_dotenv('.env')

# Access a specific environment variable
my_api_key = os.getenv('GOOGLE_SHEETS_CREDENTIALS')  # Replace with the actual variable name

print(my_api_key)  # Test that it works


secrets/dmcb-442123-966817b53d6f.json


### Upload your secrets directory to Google Drive

#### 1. Mount Google Drive to Colab


In [None]:
from google.colab import drive
drive.mount('/content/drive')

MessageError: Error: credential propagation was unsuccessful

#### 2. Access the uploaded folder in Colab



In [None]:
import os

cwd = os.getcwd()
secrets_dir = '/content/secrets'

print(f"The current working directory is: {cwd}")
print(f"The secrets directory is: {cwd}")

# Check if the secrets directory exists
if os.path.exists(cwd):
    print(f"The directory {cwd} exists.")
    print(os.listdir(cwd))
else:
    print(f"The directory {cwd} does not exist.")

The current working directory is: /content
The secrets directory is: /content
The directory /content exists.
['.config', '.ipynb_checkpoints', 'utils', 'secrets', '.env', '__pycache__', 'scripts', 'config.py', 'sample_data']
