In [None]:
import pandas as pd
import os
import yaml

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from data_loader import *
from utils import *



In [None]:
# Load the notebook settings (e.g. the database connection strings) from the YAML config file.
config_path = 'config.yaml'
with open(config_path, 'r') as config_file:
    config = yaml.safe_load(config_file)


In [None]:
# MySQL connection URL, read from the MYSQL_STRING key of the config; passed as `db_url` to write_df_to_db below.
db_url = config['MYSQL_STRING']

In [None]:
# Maps each supported db_type to (SQLAlchemy dialect name, default "dialect+driver" scheme).
_DB_SCHEMES = {
    "postgres": ("postgresql", "postgresql+psycopg2"),
    "mysql": ("mysql", "mysql+pymysql"),
}


def _normalize_db_url(db_url, db_type):
    """Validate db_type/db_url and return a URL whose scheme matches db_type."""
    try:
        dialect, default_scheme = _DB_SCHEMES[db_type.lower()]
    except KeyError:
        raise ValueError("db_type must be 'postgres' or 'mysql'") from None

    # partition() splits on the first "://" only, so a later "://" (e.g. inside a
    # password) is preserved, and a URL without any scheme is detected explicitly.
    scheme, separator, remainder = db_url.partition("://")
    if not separator:
        raise ValueError("db_url must be a full SQLAlchemy URL of the form 'dialect[+driver]://...'")

    # Only the scheme decides whether the URL already targets the right dialect;
    # a substring test on the whole URL would also match passwords, hosts or db names.
    if scheme.split("+", 1)[0] == dialect:
        return db_url
    return f"{default_scheme}://{remainder}"


def _to_sql_friendly(df):
    """Return ``df`` with period, timedelta and categorical columns converted.

    Period columns become timestamps, timedelta columns become float seconds and
    categorical columns become strings. The input frame is left untouched: a copy
    is made, and only if at least one column needs converting.
    """
    result = df
    for col, dtype in df.dtypes.items():
        # isinstance checks replace pd.api.types.is_period_dtype / is_categorical_dtype,
        # which are deprecated in recent pandas releases.
        if isinstance(dtype, pd.PeriodDtype):
            replacement = df[col].dt.to_timestamp()
        elif pd.api.types.is_timedelta64_dtype(dtype):
            replacement = df[col].dt.total_seconds()
        elif isinstance(dtype, pd.CategoricalDtype):
            replacement = df[col].astype(str)
        else:
            continue
        if result is df:
            result = df.copy()
        result[col] = replacement
    return result


def write_df_to_db(df, table_name, db_type="postgres", db_url=None, if_exists="append", chunksize=5000):
    """
    Write a pandas DataFrame to either PostgreSQL or MySQL based on db_type.

    The caller's DataFrame is not modified; unsupported column types are converted
    on a copy. The write runs in a single transaction that is rolled back on error.

    Args:
        df (pd.DataFrame): DataFrame to write
        table_name (str): Table name
        db_type (str): "postgres" or "mysql"
        db_url (str): Full SQLAlchemy DB URL. If its scheme does not match db_type,
            the scheme is replaced by the default driver for that db_type
            (postgresql+psycopg2 or mysql+pymysql).
        if_exists (str): 'append', 'replace', or 'fail'
        chunksize (int): Number of rows per batch

    Raises:
        ValueError: if db_url is missing or has no "://" scheme separator, or if
            db_type is not 'postgres' or 'mysql'.
        Exception: any error raised while writing is printed and re-raised after
            the transaction has been rolled back.
    """
    if db_url is None:
        raise ValueError("db_url must be provided")

    # Validate the cheap arguments first so bad input fails before any data is touched.
    db_url = _normalize_db_url(db_url, db_type)
    df = _to_sql_friendly(df)

    # Imported here, after validation, so the function stays self-contained and
    # argument errors are reported even before the database stack is loaded.
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker

    # Create engine and session
    engine = create_engine(db_url)
    try:
        Session = sessionmaker(bind=engine)

        # Write using session manager
        with Session() as session:
            try:
                df.to_sql(
                    table_name,
                    con=session.connection(),
                    if_exists=if_exists,
                    index=False,
                    chunksize=chunksize
                )
                session.commit()
            except Exception as e:
                session.rollback()
                print(f"Error writing to table '{table_name}': {e}")
                raise
    finally:
        # Each call builds its own engine; release its connection pool when done.
        engine.dispose()

In [None]:
# League reference data, displayed ordered by league id.
# `home_dir` and `get_leagues` are not defined in this notebook; presumably they come from the star imports — TODO confirm.
leagues_path = home_dir + "/data/Leagues/leagues.parquet"
leagues_dat = get_leagues(leagues_path)
leagues_dat.sort_values('league_id')[['league_id','league_name','country_name']]

In [None]:
# Write the league rows to the MySQL `league` table; `.iloc[:, 1:]` leaves out the first column of leagues_dat.
# NOTE(review): write_df_to_db defaults to if_exists="append", so re-running this cell inserts the same rows again.
write_df_to_db(leagues_dat.iloc[:,1:], "league", db_type="mysql", db_url=db_url)

In [None]:
# Load the team/league parquet file (presumably one row per team-league pairing — TODO confirm) and preview it.
teams_dat = pd.read_parquet(home_dir + "/data/Teams/team_league.parquet")
teams_dat.head()

In [None]:
# Add the team-league map to the db (MySQL table `team_league_map`).
# NOTE(review): rows are appended (write_df_to_db's default), so re-running this cell duplicates them.
write_df_to_db(teams_dat, "team_league_map", db_type="mysql", db_url=db_url)

In [None]:
fixtures_dir = home_dir + "/data/Fixtures"

# Read every file in the fixtures directory once and concatenate in a single call:
# growing a frame with pd.concat inside the loop re-copies all earlier rows on each
# iteration. Sorting the names makes the row order reproducible across runs
# (os.listdir returns entries in arbitrary order).
fixture_frames = [
    pd.read_parquet(os.path.join(fixtures_dir, file_name))
    for file_name in sorted(os.listdir(fixtures_dir))
]

# ignore_index=True yields a fresh 0..n-1 index directly, replacing the previous
# reset_index() + drop('index') pair. An empty directory still produces an empty frame.
complete_data = (
    pd.concat(fixture_frames, axis=0, ignore_index=True)
    if fixture_frames
    else pd.DataFrame()
)

In [None]:
# Data checks: type clean-up, derived date/season columns, outcome targets and team/league joins.
# NOTE(review): this cell modifies complete_data in place. After the rename below the
# 'passes_accuracy' column no longer exists, so running the cell a second time without
# re-running the loading cell would fail on the first line.
complete_data['passes_accuracy'] = complete_data['passes_accuracy'].astype("float64")
complete_data.rename(columns= {'passes_accuracy':'passes_accurate'},inplace =True)
# Parse fixture_date to datetimes; fixture_date_dt keeps only the calendar date (Python date objects).
complete_data['fixture_date'] = pd.to_datetime(complete_data.fixture_date)
complete_data['fixture_date_dt'] = complete_data['fixture_date'].dt.date
# create_datetime_columns is not defined in this notebook (presumably from the star imports);
# it presumably adds date-part columns derived from fixture_date — TODO confirm.
complete_data = create_datetime_columns(complete_data,'fixture_date')
complete_data['games_rating'] = pd.to_numeric(complete_data['games_rating'])

# get_season is applied to each fixture_date value; it is defined outside this notebook.
complete_data['season'] = complete_data['fixture_date'].apply(get_season)


# Targets
# Integer code per distinct outcome value (missing outcomes get -1).
complete_data['outcome_num'] = pd.Categorical(complete_data.outcome).codes

# One 0/1 indicator column per outcome, compared case-insensitively.
# NOTE(review): `np` is not imported in this notebook's import cell; presumably it is
# provided by one of the star imports — confirm, or import numpy explicitly.
complete_data['win'] = np.where(complete_data.outcome.str.lower() == 'win', 1,0)
complete_data['draw'] = np.where(complete_data.outcome.str.lower() == 'draw', 1,0)
complete_data['loss'] = np.where(complete_data.outcome.str.lower() == 'loss', 1,0)

# # primary position map:
# player_map = get_major_position(complete_data)

# # Join back to complete_data
# complete_data = pd.merge(complete_data,player_map,on = 'player_id',how = 'left')

# Joins:
# Left joins keep every fixture row. drop_duplicates() only removes fully identical rows of
# teams_dat, so a team_name that still appears more than once repeats its matching fixture rows.
complete_data = complete_data.merge(teams_dat.drop_duplicates(),how = 'left', left_on= 'team',right_on = 'team_name').drop(columns = ['team_name'])
# Joining 'league' against 'league_id' adds both the league_id and league_name columns.
complete_data = complete_data.merge(leagues_dat[['league_id','league_name']],how = 'left', left_on = 'league', right_on = 'league_id')


In [None]:
# Distinct fixture ids already present in the MySQL `overperformxg.complete_data` table.
# NOTE(review): stored_fixtures is not referenced by any later cell visible here, so the write of
# complete_data further down is not filtered by it — confirm whether already-stored fixtures
# should be excluded before appending.
stored_fixtures = pd.read_sql("select distinct fixture_id from overperformxg.complete_data",config['MYSQL_STRING'])
stored_fixtures

In [None]:
# Preview the first rows of the prepared fixture data before it is written to the database.
complete_data.head()

In [None]:
# Write the prepared fixture data to the MySQL `complete_data` table.
# NOTE(review): the whole frame is appended (write_df_to_db's default if_exists="append"),
# so re-running this cell inserts rows that are already stored.
write_df_to_db(complete_data,"complete_data",db_type="mysql", db_url=db_url)

In [None]:
# calculate_fixture_stats is defined outside this notebook; presumably it aggregates the
# fixture data using the given list of columns (here league_name) — TODO confirm.
fixture_dat = calculate_fixture_stats(complete_data,['league_name'])

In [None]:
# Preview the first rows of the fixture statistics.
fixture_dat.head()

In [None]:
# Load the full `public.complete_data` table through the DB_STRING connection from the config.
# NOTE(review): this rebinds `complete_data`, replacing the frame built in the cells above, and it
# reads through DB_STRING rather than the MYSQL_STRING used for the writes — confirm this is the
# intended source.
complete_data = pd.read_sql("select * from public.complete_data", con = config['DB_STRING'])

In [None]:
# Preview the first rows of the frame reloaded from the database.
complete_data.head()