In [1]:
# Carter Naegle & Cabot Steward

# READ ME

# We decided to put all the connection info into a .env file (loaded with the dotenv library)
# so this notebook can be shared on GitHub more easily without committing credentials.

# I added logging instead of the print statements we've used throughout class, because log
# files are much easier to navigate (or CTRL + F through) when you have an issue or need to
# find information. We did this with the logging package. Plus, logging is faster.

from dotenv import load_dotenv
import os
import psycopg2
import pymssql
import pandas as pd
from datetime import datetime
import csv
import logging
import numpy as np

# Which schema the loaders target: 'test' or 'prod'.
environment = 'prod'

# Read the POSTGRES_* / MSSQL_* settings used by get_connections() from the local .env file.
load_dotenv()

# Every loading cell below writes its own log file into this folder.
log_folder = 'logging'
os.makedirs(name=log_folder, exist_ok=True)

def get_connections():
    """Open the Postgres (target) and SQL Server (source) connections.

    The Postgres session's search_path is set to 'prod' when the module-level
    ``environment`` is 'prod', and to 'test' otherwise.

    Returns:
        dict with keys 'post_conn', 'post_cursor', 'ssms_conn', 'ssms_cursor',
        or None if anything fails (the error is printed, not raised).
    """
    post_conn = None
    try:
        post_conn = psycopg2.connect(
            database=os.getenv('POSTGRES_DB'),
            user=os.getenv('POSTGRES_USER'),
            password=os.getenv('POSTGRES_PASSWORD'),
            host=os.getenv('POSTGRES_HOST'),
            port=os.getenv('POSTGRES_PORT')
        )
        post_cursor = post_conn.cursor()
        schema = 'prod' if environment == 'prod' else 'test'

        # schema is one of two fixed literals, so interpolating it is safe.
        post_cursor.execute(f"SET search_path TO {schema};")
        # psycopg2 opens a transaction implicitly, and a SET issued inside a
        # transaction is undone if that transaction is rolled back. Commit now so a
        # loader's error-handling rollback can't silently drop the search_path.
        post_conn.commit()

        ssms_conn = pymssql.connect(
            server=os.getenv('MSSQL_SERVER'),
            user=os.getenv('MSSQL_USER'),
            password=os.getenv('MSSQL_PASSWORD'),
            database=os.getenv('MSSQL_DB')
        )
        ssms_cursor = ssms_conn.cursor()

        return {
            'post_conn': post_conn,
            'post_cursor': post_cursor,
            'ssms_conn': ssms_conn,
            'ssms_cursor': ssms_cursor
        }
    except Exception as e:
        print(f"Error connecting to the database: {e}")
        # Don't leak the Postgres connection when a later step fails.
        if post_conn is not None:
            post_conn.close()
        return None


In [2]:
connections = get_connections()

# Fail loudly: otherwise a failed connect falls through to a NameError on a fresh
# kernel or, on a re-run, silently reuses a stale connection from an earlier cell.
if not connections:
    raise RuntimeError("Could not open database connections; see the error printed above.")

post_conn = connections['post_conn']
post_cursor = connections['post_cursor']
ssms_conn = connections['ssms_conn']
ssms_cursor = connections['ssms_cursor']

# Referenced tables come before the tables whose FOREIGN KEYs point at them.
# IF NOT EXISTS makes the cell safe to re-run.
tables = [
  """CREATE TABLE IF NOT EXISTS weather_station (
    stadium_weather_station_id SERIAL PRIMARY KEY,
    stadium_weather_station_code VARCHAR(20),
    station_name VARCHAR(70),
    latitude NUMERIC(9,5),
    longitude NUMERIC(9,5),
    elevation NUMERIC(7,2)
  );""",
  """CREATE TABLE IF NOT EXISTS nfl_stadium (
    stadium_id SERIAL PRIMARY KEY,
    stadium_name VARCHAR(50) NOT NULL,
    stadium_city VARCHAR(50),
    stadium_state VARCHAR(20),
    stadium_open_year DATE,
    stadium_close_year DATE,
    stadium_type VARCHAR(15),
    stadium_surface VARCHAR(50),
    stadium_capacity INT,
    stadium_weather_type VARCHAR(10),
    stadium_weather_station_id INT,
    CONSTRAINT FK_nfl_stadium_weather_station_id
      FOREIGN KEY (stadium_weather_station_id)
      REFERENCES weather_station(stadium_weather_station_id)
  );""",
  """CREATE TABLE IF NOT EXISTS customer (
    customer_id SERIAL PRIMARY KEY,
    customer_first_name VARCHAR(50) NOT NULL,
    customer_last_name VARCHAR(50) NOT NULL,
    customer_age SMALLINT NOT NULL,
    customer_type VARCHAR(25) NOT NULL,
    customer_since DATE NOT NULL,
    customer_income INT,
    household_size SMALLINT,
    mode_color VARCHAR(10),
    source_customer_id INT
  );""",
  """CREATE TABLE IF NOT EXISTS teams (
    team_id SERIAL PRIMARY KEY,
    team_name VARCHAR(50) NOT NULL,
    team_name_short VARCHAR(25) NOT NULL,
    team_abv VARCHAR(4) NOT NULL,
    team_abv_pfr VARCHAR(4) NOT NULL,
    team_conference VARCHAR(4) NOT NULL,
    team_division VARCHAR(12),
    team_conference_pre2002 VARCHAR(4) NOT NULL,
    team_division_pre2002 VARCHAR(15)
  );""",
  """CREATE TABLE IF NOT EXISTS game (
    game_outcome_id SERIAL PRIMARY KEY,
    schedule_date DATE NOT NULL,
    schedule_season INT NOT NULL,
    schedule_week VARCHAR(12) NOT NULL,
    schedule_playoff BOOLEAN NOT NULL,
    team_id_home INT NOT NULL,
    score_home SMALLINT NOT NULL,
    team_id_away INT NOT NULL,
    score_away SMALLINT NOT NULL,
    winner_ou VARCHAR(6) NOT NULL,
    winner_line VARCHAR(8) NOT NULL,
    team_favored INT,
    favored_spread NUMERIC(3,1) NOT NULL,
    over_under_line NUMERIC(3,1) NOT NULL,
    stadium_id INT NOT NULL,
    stadium_neutral BOOLEAN NOT NULL,
    weather_temperature SMALLINT,
    weather_wind_mph SMALLINT,
    weather_humidity SMALLINT,
    weather_detail VARCHAR(40),
    CONSTRAINT FK_game_stadium_id
      FOREIGN KEY (stadium_id)
      REFERENCES nfl_stadium(stadium_id),
    CONSTRAINT FK_game_team_id_home
      FOREIGN KEY (team_id_home)
      REFERENCES teams(team_id),
    CONSTRAINT FK_game_team_id_away
      FOREIGN KEY (team_id_away)
      REFERENCES teams(team_id),
    CONSTRAINT FK_game_team_favored
      FOREIGN KEY (team_favored)
      REFERENCES teams(team_id)
  );""",
  """CREATE TABLE IF NOT EXISTS placed_bet (
    bet_id SERIAL PRIMARY KEY,
    customer_id INT NOT NULL,
    game_outcome_id INT NOT NULL,
    bet_amount SMALLINT NOT NULL,
    bet_result VARCHAR(4),
    commision_paid NUMERIC(8,2) NOT NULL,
    bet_on VARCHAR(40),
    bet_type VARCHAR(10),
    CONSTRAINT FK_placed_bet_customer_id
      FOREIGN KEY (customer_id)
      REFERENCES customer(customer_id),
    CONSTRAINT FK_placed_bet_game_outcome_id
      FOREIGN KEY (game_outcome_id)
      REFERENCES game(game_outcome_id)
  );"""
]

# Create all tables in one transaction; on failure roll back (so the connection is
# usable again) and re-raise so the error is visible in the notebook.
try:
    for ddl in tables:
        post_cursor.execute(ddl)
    post_conn.commit()
except Exception:
    post_conn.rollback()
    raise

In [3]:
# Populate Customer Table
connections = get_connections()

# Fail loudly instead of falling through to a NameError or reusing a stale connection.
if not connections:
    raise RuntimeError("Could not open database connections; see the error printed above.")

post_conn = connections['post_conn']
post_cursor = connections['post_cursor']
ssms_conn = connections['ssms_conn']
ssms_cursor = connections['ssms_cursor']

# logging.basicConfig() does nothing if the root logger already has handlers, so drop
# any left from an earlier cell/run before pointing logging at this cell's file.
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

log_file = os.path.join(log_folder, f"customer_upload_{datetime.now().strftime('%Y%m%d')}.log")
logging.basicConfig(
    filename=log_file,
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filemode='w'
)

query = """SELECT * FROM customer_table """
temp_csv = 'customer_temp.csv'

try:
    df = pd.read_sql(query, ssms_conn)
    # Split on the first whitespace: first token -> first name, the rest -> last name.
    df[['customer_first_name', 'customer_last_name']] = df['customer_name'].str.split(n=1, expand=True)
    # customer_since arrives as a year; store it as January 1 of that year.
    df['customer_since'] = df['customer_since'].apply(lambda x: datetime(int(x), 1, 1).strftime('%Y-%m-%d'))

    df = df.drop(columns='customer_name').rename(columns={'customer_id': 'source_customer_id'})

    # Skip customers loaded by a previous run so re-running this cell doesn't duplicate them.
    post_cursor.execute('SELECT source_customer_id FROM customer WHERE source_customer_id IS NOT NULL;')
    existing_ids = {existing_id for (existing_id,) in post_cursor.fetchall()}
    already_loaded = df['source_customer_id'].isin(existing_ids)
    if already_loaded.any():
        logging.warning(f"Skipping {int(already_loaded.sum())} customers that were already loaded.")

    # Must match the COPY column list below: COPY maps CSV columns by position.
    db_column_order = [
        'customer_first_name',
        'customer_last_name',
        'customer_age',
        'customer_type',
        'customer_since',
        'customer_income',
        'household_size',
        'mode_color',
        'source_customer_id'
    ]
    df = df.loc[~already_loaded, db_column_order]

    df.to_csv(temp_csv, index=False)

    with open(temp_csv, 'r') as file:
        post_cursor.copy_expert(
            """
            COPY customer (
                customer_first_name,
                customer_last_name,
                customer_age,
                customer_type,
                customer_since,
                customer_income,
                household_size,
                mode_color,
                source_customer_id
            )
            FROM STDIN WITH CSV HEADER
            """,
            file
        )
    post_conn.commit()
    logging.info("All customer data has been successfully inserted.")
    print("All Data Ingested Successfully!")

except Exception as e:
    logging.error(f"Error occurred during customer data ingestion: {e}")
    print(f"Error occurred during customer data ingestion: {e}")
    post_conn.rollback()

finally:
    if os.path.exists(temp_csv):
        os.remove(temp_csv)
        logging.info(f"Temporary file '{temp_csv}' has been deleted.")

  df = pd.read_sql(query, ssms_conn)


All Data Ingested Successfully!


In [4]:
# Inserting Teams
connections = get_connections()

# Fail loudly instead of falling through to a NameError or reusing a stale connection.
if not connections:
    raise RuntimeError("Could not open database connections; see the error printed above.")

post_conn = connections['post_conn']
post_cursor = connections['post_cursor']
ssms_conn = connections['ssms_conn']
ssms_cursor = connections['ssms_cursor']

# basicConfig() is a no-op while old handlers exist, so remove them first.
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

log_file = os.path.join(log_folder, f"teams_{datetime.now().strftime('%Y%m%d')}.log")
logging.basicConfig(
    filename=log_file,
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filemode='w'
)

csv_teams = 'nfl_teams.csv'
temp_csv = 'teams_temp.csv'

try:
    teams_df = pd.read_csv(csv_teams)

    # Normalize the 'LVR' abbreviation to 'LV'.
    teams_df['team_id'] = teams_df['team_id'].replace({'LVR': 'LV'})

    # COPY maps CSV columns by position, so the first CSV column is what lands in
    # teams.team_name. Skip names already present so a re-run doesn't duplicate teams.
    name_column = teams_df.columns[0]
    post_cursor.execute('SELECT team_name FROM teams;')
    existing_names = {name for (name,) in post_cursor.fetchall()}
    already_loaded = teams_df[name_column].isin(existing_names)
    if already_loaded.any():
        logging.warning(f"Skipping {int(already_loaded.sum())} teams that were already loaded.")
    teams_df = teams_df[~already_loaded]

    teams_df.to_csv(temp_csv, index=False)

    with open(temp_csv, 'r') as file:
        post_cursor.copy_expert(
            """
            COPY teams (
                team_name,
                team_name_short,
                team_abv,
                team_abv_pfr,
                team_conference,
                team_division,
                team_conference_pre2002,
                team_division_pre2002
            )
            FROM STDIN WITH CSV HEADER
            """,
            file
        )
    post_conn.commit()
    logging.info("All team data has been successfully inserted.")
except Exception as e:
    logging.error(f"Error occurred during insertion: {e}")
    print(f"Error occurred during insertion: {e}")
    post_conn.rollback()
finally:
    if os.path.exists(temp_csv):
        os.remove(temp_csv)
        logging.info(f"Temporary file '{temp_csv}' has been deleted.")


In [5]:
# weather stations
connections = get_connections()

# Fail loudly instead of falling through to a NameError or reusing a stale connection.
if not connections:
    raise RuntimeError("Could not open database connections; see the error printed above.")

post_conn = connections['post_conn']
post_cursor = connections['post_cursor']
ssms_conn = connections['ssms_conn']
ssms_cursor = connections['ssms_cursor']

# basicConfig() is a no-op while old handlers exist, so remove them first.
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

log_file = os.path.join(log_folder, f"weather_stations_{datetime.now().strftime('%Y%m%d')}.log")
logging.basicConfig(
    filename=log_file,
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filemode='w'
)

csv_stadiums = 'nfl_stadiums.csv'
temp_csv_file = 'temp_weather_station_upload.csv'

# One row per distinct station code; rows without a STATION are dropped.
weather_df = pd.read_csv(csv_stadiums, encoding='latin1')
weather_df = weather_df.dropna(subset=['STATION']).drop_duplicates(subset=['STATION'])


def _optional_float(value):
    """Return value as a float, or None when it is missing.

    float(NaN) would be written to the CSV as the text 'nan', which COPY loads into a
    NUMERIC column as NaN; None is written as an empty field, which COPY loads as NULL.
    """
    return None if pd.isna(value) else float(value)


# Fetch already-loaded station codes once instead of one COUNT(*) query per row.
post_cursor.execute('SELECT stadium_weather_station_code FROM weather_station;')
existing_codes = {code for (code,) in post_cursor.fetchall()}

bulk_data = []
error_rows = []

for index, row in weather_df.iterrows():
    try:
        if row['STATION'] in existing_codes:
            logging.warning(f"Weather station {row['STATION']} already exists, skipping.")
            continue

        bulk_data.append([
            row['STATION'],
            None if pd.isna(row['NAME']) else row['NAME'],
            _optional_float(row['LATITUDE']),
            _optional_float(row['LONGITUDE']),
            _optional_float(row['ELEVATION'])
        ])

    except Exception as e:
        logging.error(f"Error processing row {index}: {e} {row.to_dict()}")
        error_rows.append(row.to_dict())

try:
    with open(temp_csv_file, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['stadium_weather_station_code', 'station_name', 'latitude', 'longitude', 'elevation'])
        writer.writerows(bulk_data)

    with open(temp_csv_file, 'r') as file:
        post_cursor.copy_expert(
            """COPY weather_station (
                stadium_weather_station_code, station_name, latitude, longitude, elevation
            ) FROM STDIN WITH CSV HEADER""",
            file
        )
    post_conn.commit()
    logging.info(f"Successfully inserted bulk data from {temp_csv_file}")
except Exception as e:
    post_conn.rollback()
    logging.error(f"Error during bulk insert: {e}")
    print(f"Error during bulk insert: {e}")
finally:
    if os.path.exists(temp_csv_file):
        os.remove(temp_csv_file)
        logging.info(f"Temporary file '{temp_csv_file}' deleted.")


In [6]:
# stadium
connections = get_connections()

# Fail loudly instead of falling through to a NameError or reusing a stale connection.
if not connections:
    raise RuntimeError("Could not open database connections; see the error printed above.")

post_conn = connections['post_conn']
post_cursor = connections['post_cursor']
ssms_conn = connections['ssms_conn']
ssms_cursor = connections['ssms_cursor']

# basicConfig() is a no-op while old handlers exist, so remove them first.
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

log_file = os.path.join(log_folder, f"stadium_{datetime.now().strftime('%Y%m%d')}.log")
logging.basicConfig(
    filename=log_file,
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filemode='w'
)

csv_stadiums = 'nfl_stadiums.csv'
bulk_data = []
error_rows = []

stadium_df = pd.read_csv(csv_stadiums, encoding='latin1')
# 'City, ST' -> city / state. n=1 keeps the result at exactly two columns even if a
# location contains more than one ', ' (a third column would make this assignment raise).
stadium_df[['city', 'state']] = stadium_df['stadium_location'].str.split(', ', n=1, expand=True)


def _year_to_date(year):
    """Return January 1 of `year` as a date, or None when the year is missing."""
    if pd.isna(year):
        return None
    # `datetime` is the class (from datetime import datetime); datetime.date(y, m, d)
    # is not a constructor, so build a datetime and take its date part.
    return datetime(int(year), 1, 1).date()


def _parse_capacity(capacity):
    """Parse a capacity such as '65,000' (or an already-numeric 65000.0) into an int; None if missing."""
    if pd.isna(capacity):
        return None
    return int(float(str(capacity).replace(',', '')))


# Stadium names already in the table, plus every name queued below, so each is loaded once.
post_cursor.execute('SELECT stadium_name FROM nfl_stadium;')
known_stadiums = {name for (name,) in post_cursor.fetchall()}

for index, row in stadium_df.iterrows():
    try:
        if row['stadium_name'] in known_stadiums:
            logging.warning(f"This record already exists: {row.to_dict()}")
            continue

        stadium_weather_station_id = None
        if pd.notna(row['STATION']):
            post_cursor.execute(
                'SELECT stadium_weather_station_id FROM weather_station WHERE stadium_weather_station_code = %s;',
                (row['STATION'],)
            )
            weather_result = post_cursor.fetchone()
            if weather_result:
                stadium_weather_station_id = weather_result[0]

        stadium_surface = None
        if pd.notna(row['stadium_surface']):
            # Keep only the first comma-separated surface.
            stadium_surface = row['stadium_surface'].split(',')[0].strip()

        # Missing values become None so csv.writer emits an empty field (NULL to COPY)
        # rather than the text 'nan'. Order matches the CSV header / COPY column list.
        bulk_data.append([
            stadium_weather_station_id,
            row['stadium_name'],
            None if pd.isna(row['city']) else row['city'],
            None if pd.isna(row['state']) else row['state'],
            _year_to_date(row['stadium_open']),
            _year_to_date(row['stadium_close']),
            None if pd.isna(row['stadium_type']) else row['stadium_type'],
            stadium_surface,
            _parse_capacity(row['stadium_capacity']),
            None if pd.isna(row['stadium_weather_type']) else row['stadium_weather_type']
        ])
        known_stadiums.add(row['stadium_name'])
        logging.info(f"Stadium {row['stadium_name']} is ready to be written to csv")

    except Exception as e:
        logging.error(f"Error processing row {index}: {e} {row.to_dict()}")
        error_rows.append(row.to_dict())

temp_csv_file = 'temp_nfl_stadium_upload.csv'

try:
    with open(temp_csv_file, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow([
            'stadium_weather_station_id', 'stadium_name', 'stadium_city',
            'stadium_state', 'stadium_open_year', 'stadium_close_year',
            'stadium_type', 'stadium_surface', 'stadium_capacity', 'stadium_weather_type'
        ])
        writer.writerows(bulk_data)
    logging.info(f"Successfully wrote all data to csv: {temp_csv_file}")

    with open(temp_csv_file, 'r') as file:
        post_cursor.copy_expert(
            """COPY nfl_stadium (
                stadium_weather_station_id, stadium_name, stadium_city,
                stadium_state, stadium_open_year, stadium_close_year,
                stadium_type, stadium_surface, stadium_capacity, stadium_weather_type
            ) FROM STDIN WITH CSV HEADER""",
            file
        )
    post_conn.commit()
    logging.info(f"Successfully inserted bulk data from {temp_csv_file}")
except Exception as e:
    post_conn.rollback()
    logging.error(f"Error during bulk insert: {e}")
    print(f"Error during bulk insert: {e}")
finally:
    # Clean up on failure too, not only after a successful COPY.
    if os.path.exists(temp_csv_file):
        os.remove(temp_csv_file)
        logging.info(f"Temporary file '{temp_csv_file}' deleted.")
    


In [7]:
# game

connections = get_connections()

# Fail loudly instead of falling through to a NameError or reusing a stale connection.
if not connections:
    raise RuntimeError("Could not open database connections; see the error printed above.")

post_conn = connections['post_conn']
post_cursor = connections['post_cursor']
ssms_conn = connections['ssms_conn']
ssms_cursor = connections['ssms_cursor']

# basicConfig() is a no-op while old handlers exist, so remove them first.
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

log_file = os.path.join(log_folder, f"game_{datetime.now().strftime('%Y%m%d')}.log")
logging.basicConfig(
    filename=log_file,
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filemode='w'
)

csv_file = 'spread_scores-2.csv'
game = pd.read_csv(csv_file)
# .copy() so the column assignments below modify an independent frame instead of a
# filtered slice of the raw one (avoids pandas' SettingWithCopyWarning).
game = game[game['schedule_season'] >= 2015].copy()

error_csv = 'failed_inserts.csv'
error_rows = []

# Replace NaN with None in the optional columns so they are sent to Postgres as NULL.
nullable = ['weather_temperature', 'weather_wind_mph', 'weather_humidity', 'weather_detail', 'team_favorite_id']
game[nullable] = game[nullable].replace({np.nan: None})

# Unparseable dates become NaT rather than raising.
game['schedule_date'] = pd.to_datetime(game['schedule_date'], format='%m/%d/%Y', errors='coerce')

def get_team_id(values):
    """Return teams.team_id for the team whose full name is `values`, or None if no team matches."""
    post_cursor.execute('SELECT team_id FROM teams WHERE team_name = %s;', (values,))
    match = post_cursor.fetchone()
    if match is None:
        return None
    return match[0]

def get_team_favorite_id(values):
    """Return teams.team_id for the team whose abbreviation (team_abv) is `values`.

    'PICK' (presumably a line with no favorite) returns None without querying;
    None is also returned when no team has that abbreviation.
    """
    if values != 'PICK':
        post_cursor.execute('SELECT team_id FROM teams WHERE team_abv = %s;', (values,))
        match = post_cursor.fetchone()
        if match is not None:
            return match[0]
    return None

def get_stadium_id(values):
    """Return nfl_stadium.stadium_id for a stadium name, inserting a stub row if it is missing.

    A stadium not yet in nfl_stadium is inserted with only stadium_name set. The insert is
    not committed here; it belongs to the caller's open transaction.
    """
    post_cursor.execute(
        'SELECT stadium_id FROM nfl_stadium WHERE stadium_name = %s;',
        (values,)
    )
    stadium_id = post_cursor.fetchone()

    if stadium_id is None:
        insert_stadium = (
            """insert into nfl_stadium (stadium_name) values (%s) returning stadium_id"""
        )
        post_cursor.execute(insert_stadium, (values,))
        stadium_id = post_cursor.fetchone()

    return stadium_id[0]

def calculate_betting_results(row, favorite_is_home=None):
    """Grade a game against its over/under line and point spread.

    Args:
        row: mapping with score_home, score_away, over_under_line, spread_favorite,
            team_home (full team name) and team_favorite_id (team abbreviation, 'PICK' or None).
        favorite_is_home: whether the favored team is the home team. When None it is
            resolved with get_team_favorite_id / get_team_id, because team_favorite_id is an
            abbreviation and team_home a full name, so the two can't be compared directly.
            Rows with no resolvable favorite (e.g. 'PICK') are graded from the away team's side.

    Returns:
        (winner_ou, winner_line): winner_ou is 'over', 'under' or 'push'; winner_line is
        'favored', 'underdog' or 'push' from the favored team's point of view.
    """
    total_points = row['score_home'] + row['score_away']
    if total_points > row['over_under_line']:
        winner_ou = 'over'
    elif total_points < row['over_under_line']:
        winner_ou = 'under'
    else:
        winner_ou = 'push'

    if favorite_is_home is None:
        favorite_id = get_team_favorite_id(row['team_favorite_id'])
        favorite_is_home = favorite_id is not None and favorite_id == get_team_id(row['team_home'])

    # Favored team's margin of victory (negative when it lost).
    if favorite_is_home:
        margin = row['score_home'] - row['score_away']
    else:
        margin = row['score_away'] - row['score_home']

    # abs(): the favorite covers when it wins by more than the spread, whether the
    # source stores the spread as negative (-3.5) or positive (3.5).
    spread = abs(row['spread_favorite'])
    if margin > spread:
        winner_line = 'favored'
    elif margin < spread:
        winner_line = 'underdog'
    else:
        winner_line = 'push'

    return winner_ou, winner_line

def parse_int(values, allow_negative=False):
    """Convert `values` to an int, or return None when that isn't possible.

    Args:
        values: anything int() accepts (int, float, numeric string). Floats are
            truncated toward zero by int().
        allow_negative: if False (default), negative results are rejected and None is returned.

    Returns:
        The int, or None for missing/unconvertible input or a disallowed negative.
    """
    try:
        result = int(values)
    except (TypeError, ValueError, OverflowError):
        # None -> TypeError; NaN or non-numeric text -> ValueError; +/-inf -> OverflowError.
        return None
    if not allow_negative and result < 0:
        return None
    return result

# A game already exists if a row has the same home team (looked up by name) and date.
duplicate_game_query = """
    SELECT count(*)
    FROM game
    WHERE team_id_home IN (
            SELECT team_id
            FROM teams
            WHERE team_name = %s)
      AND schedule_date = %s"""

game_insert_query = """INSERT INTO game (schedule_date,
                                        schedule_season,
                                        schedule_week,
                                        schedule_playoff,
                                        team_id_home,
                                        score_home,
                                        team_id_away,
                                        score_away,
                                        winner_ou,
                                        winner_line,
                                        team_favored,
                                        favored_spread,
                                        over_under_line,
                                        stadium_id,
                                        stadium_neutral,
                                        weather_temperature,
                                        weather_wind_mph,
                                        weather_humidity,
                                        weather_detail)
                       VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""

# Each game row is committed on its own; a failing row is rolled back, logged, and
# collected for error_csv without affecting rows already committed.
for index, row in game.iterrows():
    try:
        # Inside the try so a failing lookup is reported like any other bad row instead
        # of aborting the loop before the failure report below is written.
        post_cursor.execute(duplicate_game_query, (row['team_home'], row['schedule_date']))
        if post_cursor.fetchone()[0] > 0:
            logging.warning(f"game already exists for {row['team_home']} {row['schedule_date']}.")
            continue

        team_id_home = get_team_id(row['team_home'])
        team_id_away = get_team_id(row['team_away'])
        stadium_id = get_stadium_id(row['stadium'])
        team_id_favorite = get_team_favorite_id(row['team_favorite_id'])
        weather_temperature = parse_int(row['weather_temperature'], allow_negative=True)
        weather_wind = parse_int(row['weather_wind_mph'])
        weather_humidity = parse_int(row['weather_humidity'])
        weather_detail = row['weather_detail'] if pd.notna(row['weather_detail']) else None

        winner_ou, winner_line = calculate_betting_results(row)

        post_cursor.execute(game_insert_query, (
            row['schedule_date'],
            row['schedule_season'],
            row['schedule_week'],
            row['schedule_playoff'],
            team_id_home,
            row['score_home'],
            team_id_away,
            row['score_away'],
            winner_ou,
            winner_line,
            team_id_favorite,
            row['spread_favorite'],
            row['over_under_line'],
            stadium_id,
            row['stadium_neutral'],
            weather_temperature,
            weather_wind,
            weather_humidity,
            weather_detail
        ))
        post_conn.commit()

        logging.info(f"Successfully uploaded record: {row.to_dict()}")

    except Exception as e:
        logging.error(f"Error processing row {index}: {e} {row.to_dict()}")
        error_rows.append(row.to_dict())
        post_conn.rollback()

if error_rows:
    with open(error_csv, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=game.columns)
        writer.writeheader()
        writer.writerows(error_rows)

logging.info("Game upload complete.")

if error_rows:
    logging.warning(f"Some rows failed. Check {error_csv} for details.")
    print('Some rows failed, check logging')
else:
    print('All rows imported with no failures.')

All rows imported with no failures.


In [8]:
# placed bet
connections = get_connections()

# Fail loudly instead of falling through to a NameError or reusing a stale connection.
if not connections:
    raise RuntimeError("Could not open database connections; see the error printed above.")

post_conn = connections['post_conn']
post_cursor = connections['post_cursor']
ssms_conn = connections['ssms_conn']
ssms_cursor = connections['ssms_cursor']

# basicConfig() is a no-op while old handlers exist, so remove them first.
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

log_file = os.path.join(log_folder, f"placed_bet_{datetime.now().strftime('%Y%m%d')}.log")
logging.basicConfig(
    filename=log_file,
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filemode='w'
)

# TEMP staging table: private to this connection and dropped when the session ends.
# NOTE(review): the `ingested` column is never read or written in this cell.
temp_table = """CREATE TEMP TABLE IF NOT EXISTS placed_bet_staging (
    bet_amount SMALLINT NOT NULL,
    commission_paid NUMERIC(8,2),
    bet_on VARCHAR(40),
    bet_type VARCHAR(10),
    customer_name VARCHAR(50),
    game_id VARCHAR(30),
    source_customer_id INT,
    ingested BOOLEAN DEFAULT FALSE
);"""

post_cursor.execute(temp_table)
post_conn.commit()

# SQL Server extract. Commission is tiered: 10% of the first 1000, 8% of the next 3000,
# 6% above 4000. bet_type is the lower-cased bet_on when it is 'under'/'over'/'push',
# otherwise 'line'. The last column (ct.customer_id) lands in staging.source_customer_id.
query = """
with cte as (
SELECT
    bet_amount,
    CASE
        WHEN bet_amount <= 1000 THEN bet_amount * 0.1
        WHEN bet_amount <= 4000 THEN (1000 * 0.1) + ((bet_amount - 1000) * 0.08)
        ELSE (1000 * 0.1) + (3000 * 0.08) + ((bet_amount - 4000) * 0.06)
    END AS commission_paid,
    bet_on,
    CASE
        WHEN bet_on IN ('under', 'over', 'push') THEN LOWER(bet_on)
        ELSE 'line'
    END AS bet_type,
    ct.customer_name,
    game_id,
    ct.customer_id
FROM betlog bl
INNER JOIN customer_table ct ON bl.customer_id = ct.customer_id
where bet_amount is not NULL
    and bet_on is not NULL)
select *
from cte
where commission_paid is not NULL
    and customer_name is not NULL
    and game_id is not NULL
    and customer_id is not NULL
;
"""

csv_file = 'placed_bet_staging.csv'
staging_loaded = False

try:
    df = pd.read_sql(query, ssms_conn)
    df = df.dropna(how='all')
    df.to_csv(csv_file, index=False)

    with open(csv_file, 'r') as file:
        post_cursor.copy_expert(
            """
            COPY placed_bet_staging (
                bet_amount, commission_paid, bet_on, bet_type, customer_name, game_id, source_customer_id
            )
            FROM STDIN WITH CSV HEADER
            """,
            file
        )
    post_conn.commit()
    staging_loaded = True

    print("Ingestion Successful")
    logging.info("Data successfully copied to PostgreSQL staging table.")

except Exception as e:
    # Clear the aborted transaction so the connection stays usable.
    post_conn.rollback()
    print(f"Error during data processing: {e}")
    logging.error(f"Error during data processing: {e}")

finally:
    # Remove the temp file even when the COPY fails.
    if os.path.exists(csv_file):
        os.remove(csv_file)
        logging.info(f"Temporary file '{csv_file}' deleted.")

# Move staged bets into placed_bet.
#  * game_id is parsed as '<season><week>-<home abv>-<away abv>': season = first 4 chars,
#    week = last 2 chars of the first '-' part; 'JAC' is mapped to 'JAX'.
#  * Only bets matching both teams, a game and a loaded customer are inserted (INNER JOINs),
#    because placed_bet.game_outcome_id is NOT NULL and a single unmatched bet would
#    otherwise abort the whole INSERT. The shortfall is logged below.
#  * Staging holds the SQL Server customer id, so customers are matched on
#    customer.source_customer_id; customer.customer_id is the Postgres SERIAL.
#  * Over/under bets are graded against game.winner_ou (the over/under line).
#  * NOTE(review): TeamOccurrences only sees teams with a home game in the year before
#    NOW(), so the result depends on when this runs — confirm that's intended.
#  * NOTE(review): the season filter is > 2015 while the game cell loads seasons >= 2015,
#    so 2015 bets are skipped — confirm that's intended.
#  * NOTE(review): a result landing exactly on the line/total grades as 'lose', and there
#    is no duplicate guard, so re-running this cell appends the same bets again.
stage_to_prod = """
WITH
TeamOccurrences AS (
    -- row_num = 1 is, per team name, the team_id with the most home games in the window.
    SELECT
        t.team_id,
        t.team_name,
        t.team_abv,
        COUNT(*) AS occurrences,
        ROW_NUMBER() OVER (PARTITION BY t.team_name ORDER BY COUNT(*) DESC) AS row_num
    FROM teams t
    INNER JOIN game g
        ON t.team_id = g.team_id_home
    WHERE g.schedule_date >= NOW() - INTERVAL '1 year'
    GROUP BY t.team_id, t.team_name
),
game_data AS (
    SELECT
        pbs.bet_amount,
        pbs.commission_paid,
        pbs.bet_on,
        pbs.bet_type,
        g.game_outcome_id,
        g.winner_line,
        g.winner_ou,
        c.customer_id
    FROM placed_bet_staging pbs
    INNER JOIN TeamOccurrences th  -- home team
        ON th.team_abv = CASE WHEN split_part(pbs.game_id, '-', 2) = 'JAC' THEN 'JAX'
                              ELSE split_part(pbs.game_id, '-', 2) END
        AND th.row_num = 1
    INNER JOIN TeamOccurrences ta  -- away team
        ON ta.team_abv = CASE WHEN split_part(pbs.game_id, '-', 3) = 'JAC' THEN 'JAX'
                              ELSE split_part(pbs.game_id, '-', 3) END
        AND ta.row_num = 1
    INNER JOIN game g
        ON g.schedule_season = LEFT(pbs.game_id, 4)::INT
        AND CASE
                WHEN g.schedule_week = 'Wildcard' THEN 19
                WHEN g.schedule_week = 'Division' THEN 20
                WHEN g.schedule_week = 'Conference' THEN 21
                WHEN g.schedule_week = 'Superbowl' THEN 22
                ELSE g.schedule_week::INT
            END = RIGHT(split_part(pbs.game_id, '-', 1), 2)::INT
        AND g.team_id_home = th.team_id
        AND g.team_id_away = ta.team_id
    INNER JOIN customer c
        ON c.source_customer_id = pbs.source_customer_id
    WHERE LEFT(split_part(pbs.game_id, '-', 1), 4)::INT > 2015
),
bet_outcomes AS (
    SELECT
        gd.customer_id,
        gd.game_outcome_id,
        gd.bet_amount,
        CASE
            WHEN gd.bet_type = 'line' AND gd.bet_on = gd.winner_line THEN 'win'
            WHEN gd.bet_type = 'line' AND gd.bet_on != gd.winner_line THEN 'lose'
            WHEN gd.bet_type IN ('over', 'under') AND gd.bet_type = gd.winner_ou THEN 'win'
            WHEN gd.bet_type IN ('over', 'under') THEN 'lose'
            ELSE NULL
        END AS bet_result,
        gd.commission_paid,
        gd.bet_on,
        gd.bet_type
    FROM game_data gd
)
INSERT INTO placed_bet (
    customer_id,
    game_outcome_id,
    bet_amount,
    bet_result,
    commision_paid,
    bet_on,
    bet_type)
SELECT
    customer_id,
    game_outcome_id,
    bet_amount,
    bet_result,
    commission_paid AS commision_paid,
    bet_on,
    bet_type
FROM bet_outcomes;
"""

if staging_loaded:
    try:
        post_cursor.execute('SELECT count(*) FROM placed_bet_staging;')
        staged_count = post_cursor.fetchone()[0]

        post_cursor.execute(stage_to_prod)
        inserted_count = post_cursor.rowcount
        post_conn.commit()

        print("successfully moved out of staging")
        logging.info(f'Inserting into prod successfully: {inserted_count} of {staged_count} staged bets inserted')
        if inserted_count != staged_count:
            logging.warning(
                f"{staged_count} bets were staged but {inserted_count} were inserted; "
                "unmatched game/team/customer rows or the season filter account for the difference."
            )
    except Exception as e:
        post_conn.rollback()
        print(f"Error executing query: {e}")
        logging.error(f"Error during replication processing: {e}")
else:
    logging.error("Staging load failed; skipping the move into placed_bet.")

  df = pd.read_sql(query, ssms_conn)


Ingestion Successfull
successfully moved out of staging
