In [2]:
import polars as pl
from sqlalchemy import create_engine, Column, DateTime, String
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import inspect
from typing import Optional, Dict, Any
import logging
from datetime import datetime
import os
from pydantic import BaseModel, Field
from string import Template

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# SQLAlchemy setup
Base = declarative_base()

class DataUploadLog(Base):
    __tablename__ = 'data_upload_log'
    id = Column(String, primary_key=True)
    data_type = Column(String)
    year = Column(String)
    date_uploaded = Column(DateTime)

class DBConfig(BaseModel):
    username: str = Field(default="")
    password: str = Field(default="")
    host: str = Field(default="localhost")
    port: str = Field(default="5432")
    database: str = Field(default="sportsdata")

class DataSourceConfig(BaseModel):
    base_url: str = Field(default="https://github.com/nflverse/nflverse-data/releases/download")
    url_patterns: Dict[str, str] = Field(default_factory=dict)

class NFLVerseDataSource:
    def __init__(self, config: DataSourceConfig):
        self.config = config

    def fetch_data(self, data_type: str, year: Optional[int] = None) -> pl.DataFrame:
        if data_type not in self.config.url_patterns:
            raise ValueError(f"Unsupported data type: {data_type}")
        
        url_template = self.config.url_patterns[data_type]
        template = Template(url_template)
        
        substitution = {
            "base_url": self.config.base_url,
            "year": year
        }

        url = template.safe_substitute(substitution)

        try:
            df = pl.read_parquet(url)
            df = df.with_columns([
                pl.lit(datetime.now()).alias("date_uploaded"),
                pl.lit(data_type).alias("data_type"),
                pl.lit(str(year) if year else "N/A").alias("year")
            ])
            return df
        except Exception as e:
            logger.error(f"Error fetching data for {data_type} (year: {year}): {str(e)}")
            return None

class PostgresDataStorage:
    def __init__(self, config: DBConfig):
        self.config = config
        self.engine = create_engine(f"postgresql://{config.username}:{config.password}@{config.host}:{config.port}/{config.database}")
        Base.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)

    def store_data(self, df: pl.DataFrame, schema: str, table_name: str, if_exists: str = "append"):
        """
        Store data into PostgreSQL table.

        :param df: DataFrame to store.
        :param schema: Schema name.
        :param table_name: Table name.
        :param if_exists: Behavior when table exists ('append' or 'replace').
        """
        if df is None or df.is_empty():
            logger.warning(f"No data to store for {table_name}")
            return

        session = self.Session()
        try:
            df.to_pandas().to_sql(table_name, self.engine, schema=schema, if_exists=if_exists, index=False)
            if if_exists == "replace":
                logger.info(f"Data for '{schema}.{table_name}' has been replaced.")
            else:
                logger.info(f"Data successfully stored in {schema}.{table_name}")
            if if_exists == "append":
                self._log_upload(session, table_name)
            session.commit()
        except Exception as e:
            session.rollback()
            logger.error(f"Error storing data to PostgreSQL: {str(e)}")
        finally:
            session.close()

    def _log_upload(self, session, table_name):
        log_entry = DataUploadLog(
            id=f"{table_name}_{datetime.now().strftime('%Y%m%d%H%M%S')}",
            data_type=table_name,
            year=str(datetime.now().year),
            date_uploaded=datetime.now()
        )
        session.add(log_entry)

    def table_exists(self, schema: str, table_name: str) -> bool:
        """
        Check if a table exists in the specified schema.

        :param schema: The schema name.
        :param table_name: The table name.
        :return: True if the table exists, False otherwise.
        """
        inspector = inspect(self.engine)
        exists = inspector.has_table(table_name, schema=schema)
        logger.info(f"Table '{schema}.{table_name}' exists: {exists}")
        return exists

    def is_data_uploaded(self, table_name: str) -> bool:
        """
        Check if data for the given table has already been uploaded.

        :param table_name: The name of the table.
        :return: True if data is already uploaded, False otherwise.
        """
        session = self.Session()
        try:
            exists = session.query(DataUploadLog).filter_by(data_type=table_name).first() is not None
            logger.info(f"Data for table '{table_name}' already uploaded: {exists}")
            return exists
        except Exception as e:
            logger.error(f"Error checking upload log for {table_name}: {str(e)}")
            return False
        finally:
            session.close()

class NFLDataManager:
    def __init__(self, data_source: NFLVerseDataSource, data_storage: PostgresDataStorage):
        self.data_source = data_source
        self.data_storage = data_storage
        self.current_year = datetime.now().year

    def fetch_and_store_data(self, data_type: str, year: Optional[int], schema: str, table_name: str):
        """
        Fetch data from the source and store it into PostgreSQL.

        :param data_type: Type of data to fetch.
        :param year: Year of the data.
        :param schema: Schema name in PostgreSQL.
        :param table_name: Table name in PostgreSQL.
        """
        try:
            if year is not None:
                if year < self.current_year:
                    # For past seasons, skip if data already exists
                    if self.data_storage.is_data_uploaded(table_name):
                        logger.info(f"Data for table '{table_name}' (year: {year}) already exists. Skipping.")
                        return
                elif year == self.current_year:
                    # For current season, replace existing data
                    if self.data_storage.table_exists(schema, table_name):
                        logger.info(f"Table '{schema}.{table_name}' exists for current year. Replacing existing data.")
                        if_exists = "replace"
                    else:
                        if_exists = "append"
                else:
                    logger.warning(f"Year {year} is in the future. Skipping data ingestion for '{table_name}'.")
                    return
            else:
                # Handle non-year-specific data (e.g., players)
                if self.data_storage.is_data_uploaded(table_name):
                    logger.info(f"Data for table '{table_name}' has already been uploaded. Skipping.")
                    return
                if_exists = "append"

            if year == self.current_year and self.data_storage.table_exists(schema, table_name):
                if_exists = "replace"
            else:
                if_exists = "append"

            df = self.data_source.fetch_data(data_type, year)
            if df is not None:
                self.data_storage.store_data(df, schema, table_name, if_exists=if_exists)
            else:
                logger.warning(f"No data fetched for {data_type} (year: {year})")
        except Exception as e:
            logger.error(f"Error processing {data_type} data: {str(e)}")

# Example usage:
if __name__ == "__main__":
    db_config = DBConfig(
        username=os.getenv("DB_USERNAME", ""),
        password=os.getenv("DB_PASSWORD", ""),
        host=os.getenv("DB_HOST", "localhost"),
        port=os.getenv("DB_PORT", "5432"),
        database=os.getenv("DB_NAME", "sportsdata")
    )

    data_source_config = DataSourceConfig(
        base_url="https://github.com/nflverse/nflverse-data/releases/download",
        url_patterns={
            "weekly_rosters": "${base_url}/weekly_rosters/roster_weekly_${year}.parquet",
            "depth_charts": "${base_url}/depth_charts/depth_charts_${year}.parquet",
            "pbp": "${base_url}/pbp/play_by_play_${year}.parquet",
            "players": "${base_url}/players/players.parquet",
            "injuries": "${base_url}/injuries/injuries_${year}.parquet",
            "player_stats": "${base_url}/player_stats/player_stats_${year}.parquet",
            "rosters": "${base_url}/rosters/roster_${year}.parquet"
        }
    )

    data_source = NFLVerseDataSource(data_source_config)
    data_storage = PostgresDataStorage(db_config)
    nfl_data_manager = NFLDataManager(data_source, data_storage)

    current_year = datetime.now().year
    previous_year = current_year - 1

    # Try current year first, if not available, use previous year
    for year in [current_year, previous_year]:
        nfl_data_manager.fetch_and_store_data("weekly_rosters", year, "raw", f"nflverse__weekly_rosters_{year}")
        nfl_data_manager.fetch_and_store_data("depth_charts", year, "raw", f"nflverse__depth_charts_{year}")
        nfl_data_manager.fetch_and_store_data("pbp", year, "raw", f"nflverse__pbp_{year}")
        nfl_data_manager.fetch_and_store_data("injuries", year, "raw", f"nflverse__injuries_{year}")
        nfl_data_manager.fetch_and_store_data("player_stats", year, "raw", f"nflverse__player_stats_{year}")
        nfl_data_manager.fetch_and_store_data("rosters", year, "raw", f"nflverse__rosters_{year}")


    # Fetch players data (not year-specific)
    nfl_data_manager.fetch_and_store_data("players", None, "raw", "nflverse__players")

2024-09-27 17:27:14,746 - INFO - Table 'raw.nflverse__weekly_rosters_2024' exists: True
2024-09-27 17:27:14,746 - INFO - Table 'raw.nflverse__weekly_rosters_2024' exists for current year. Replacing existing data.
2024-09-27 17:27:14,747 - INFO - Table 'raw.nflverse__weekly_rosters_2024' exists: True
2024-09-27 17:27:17,102 - INFO - Data for 'raw.nflverse__weekly_rosters_2024' has been replaced.
2024-09-27 17:27:17,113 - INFO - Table 'raw.nflverse__depth_charts_2024' exists: True
2024-09-27 17:27:17,113 - INFO - Table 'raw.nflverse__depth_charts_2024' exists for current year. Replacing existing data.
2024-09-27 17:27:17,114 - INFO - Table 'raw.nflverse__depth_charts_2024' exists: True
2024-09-27 17:27:18,618 - INFO - Data for 'raw.nflverse__depth_charts_2024' has been replaced.
2024-09-27 17:27:18,624 - INFO - Table 'raw.nflverse__pbp_2024' exists: True
2024-09-27 17:27:18,624 - INFO - Table 'raw.nflverse__pbp_2024' exists for current year. Replacing existing data.
2024-09-27 17:27:18,6

In [None]:
1673