In [20]:
import pandas as pd
import os

# Set the path to the folder containing CSV files
folder_path = 'data_from_hdfs'

# List all files in the folder
csv_files = [file for file in os.listdir(folder_path) if file.endswith('.csv')]

# Initialize an empty DataFrame to store the merged data
df = pd.DataFrame()

# Loop through each CSV file and merge its content into the main DataFrame
for csv_file in csv_files:
    # Construct the full path to the CSV file
    csv_path = os.path.join(folder_path, csv_file)
    
    # Read the CSV file into a DataFrame
    current_df = pd.read_csv(csv_path)
    
    # Merge the current DataFrame into the main DataFrame
    df = pd.concat([df, current_df], ignore_index=True)

In [21]:
df.head()

Unnamed: 0,league,season,player_id,player_name,team_title,position,games,time,goals,assists,npg,xG_percentage,xA_percentage,npxG_percentage,shots,missed_shots,key_passes,yellow_cards,red_cards
0,bundesliga,2014-2015,356,Alexander Meier,Eintracht Frankfurt,F M S,26,2209,19,2,16,53.035558,4.370365,42.594077,80,61,22,0,0
1,bundesliga,2014-2015,227,Robert Lewandowski,Bayern Munich,F M S,31,2493,17,5,16,45.059526,11.807632,43.132842,104,87,32,0,0
2,bundesliga,2014-2015,392,Arjen Robben,Bayern Munich,D F M S,21,1681,17,7,15,38.789584,28.092463,33.117953,88,71,50,0,0
3,bundesliga,2014-2015,158,Bas Dost,Wolfsburg,F S,21,1532,16,4,16,44.753637,10.492727,44.753637,43,27,15,1,0
4,bundesliga,2014-2015,318,Pierre-Emerick Aubameyang,Borussia Dortmund,F M S,33,2724,16,6,14,43.181416,17.887656,38.930928,102,86,46,4,0


In [22]:
# Data Warehouse Preparation :
 
# Extracting necessary columns for each table
league = df[['league']]
season = df[['season']]
player = df[['player_id', 'player_name']]
team = df[['team_title']]
position = df[['position']]
playerstatistics = df[['games', 'time', 'goals', 'assists', 'npg', 'xG_percentage', 'xA_percentage', 'npxG_percentage', 'shots', 'missed_shots', 'key_passes', 'yellow_cards', 'red_cards']]

# Encoding categorical columns and creating unique IDs
league['league_id'] = league['league'].astype('category').cat.codes
season['season_id'] = season['season'].astype('category').cat.codes
team['team_id'] = team['team_title'].astype('category').cat.codes
position['position_id'] = position['position'].astype('category').cat.codes

# Generating factorized ID for playerstatistics
playerstatistics['pstatistics_id'] = pd.factorize(
    playerstatistics[
        ['games', 'time', 'goals', 'assists', 'npg', 'xG_percentage', 'xA_percentage',
         'npxG_percentage', 'shots', 'missed_shots', 'key_passes', 'yellow_cards', 'red_cards']
    ].apply(tuple, axis=1)
)[0]

# Merging IDs from other tables
playerstatistics['league_id'] = league['league_id']
playerstatistics['season_id'] = season['season_id']
playerstatistics['player_id'] = player['player_id']
playerstatistics['team_id'] = team['team_id']
playerstatistics['position_id'] = position['position_id']

# Change the column order using indexing on each DataFrame
league = league[['league_id', 'league']]
season = season[['season_id', 'season']]
team = team[['team_id', 'team_title']]
position = position[['position_id', 'position']]

desired_column_order = ['pstatistics_id', 'league_id', 'season_id', 'player_id', 'team_id', 'position_id',
                        'games', 'time', 'goals', 'assists', 'npg', 'xG_percentage', 'xA_percentage',
                         'npxG_percentage', 'shots', 'missed_shots', 'key_passes', 'yellow_cards', 'red_cards']
playerstatistics = playerstatistics[desired_column_order]

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  league['league_id'] = league['league'].astype('category').cat.codes
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  season['season_id'] = season['season'].astype('category').cat.codes
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  team['team_id'] = team['team_title'].astype('category').cat.codes
A v

In [24]:
import pyodbc

# Establish a connection
connection = pyodbc.connect('DRIVER=SQL SERVER;SERVER=LAPTOP-1US3GU3J\SQLEXPRESS,1433;DATABASE=scd')

# Create a cursor to execute SQL queries
cursor = connection.cursor()

In [25]:
# Function to check if a table exists
def table_exists(cursor, table_name):
    cursor.execute("SELECT COUNT(*) FROM information_schema.tables WHERE table_name = ?", (table_name,))
    return cursor.fetchone()[0] > 0

# Function to check if an index exists
def index_exists(cursor, index_name):
    cursor.execute("SELECT COUNT(*) FROM sys.indexes WHERE name = ?", (index_name,))
    return cursor.fetchone()[0] > 0

# Create the league Dimension if it doesn't exist
if not table_exists(cursor, 'DimLeague'):
    cursor.execute('''
        CREATE TABLE [DimLeague] (
          [league_id] INT,
          [league] VARCHAR(255),
          PRIMARY KEY ([league_Id])
        );
    ''')

# Create the season Dimension if it doesn't exist
if not table_exists(cursor, 'DimSeason'):
    cursor.execute('''
        CREATE TABLE [DimSeason] (
          [season_id] INT,
          [season] VARCHAR(255),
          PRIMARY KEY ([season_id])
        );
    ''')

# Create the player Dimension if it doesn't exist
if not table_exists(cursor, 'DimPlayer'):
    cursor.execute('''
        CREATE TABLE [DimPlayer] (
          [player_id] INT,
          [player_name] VARCHAR(255),
          PRIMARY KEY ([player_id])
        );
    ''')

# Create the team Dimension if it doesn't exist
if not table_exists(cursor, 'DimTeam'):
    cursor.execute('''
        CREATE TABLE [DimTeam] (
          [team_id] INT,
          [team_title] VARCHAR(255),
          PRIMARY KEY ([team_id])
        );
    ''')

# Create the position Dimension if it doesn't exist
if not table_exists(cursor, 'DimPosition'):
    cursor.execute('''
        CREATE TABLE [DimPosition] (
          [position_id] INT,
          [position] VARCHAR(255),
          PRIMARY KEY ([position_id])
        );
    ''')

# Create the FactPlayerStatistics table with foreign key constraints if it doesn't exist
if not table_exists(cursor, 'FactPlayerStatistics'):
    cursor.execute('''
        CREATE TABLE [FactPlayerStatistics] (
          [pstatistics_id] INT PRIMARY KEY,
          [league_id] INT,
          [season_id] INT,
          [player_id] INT,
          [team_id] INT,
          [position_id] INT,
          [games] INT,
          [time] INT,
          [goals] INT,
          [assists] INT,
          [npg] INT,
          [xG_percentage] FLOAT,
          [xA_percentage] FLOAT,
          [npxG_percentage] FLOAT,
          [shots] INT,
          [missed_shots] INT,
          [key_passes] INT,
          [yellow_cards] INT,
          [red_cards] INT,
          FOREIGN KEY ([league_id]) REFERENCES [DimLeague]([league_id]),
          FOREIGN KEY ([season_id]) REFERENCES [DimSeason]([season_id]),
          FOREIGN KEY ([player_id]) REFERENCES [DimPlayer]([player_id]),
          FOREIGN KEY ([team_id]) REFERENCES [DimTeam]([team_id]),
          FOREIGN KEY ([position_id]) REFERENCES [DimPosition]([position_id])
        );
    ''')

# Commit the changes to the database
connection.commit()

# Create indexes for FactPlayerStatistics table if they don't exist
index_names = ['idx_goals', 'idx_time', 'idx_assists', 'idx_npg', 'idx_shots', 'idx_missed_shots', 'idx_key_passes', 'idx_yellow_cards', 'idx_red_cards']
for index_name in index_names:
    if not index_exists(cursor, index_name):
        cursor.execute(f'CREATE INDEX {index_name} ON FactPlayerStatistics({index_name[4:]});')

# Create indexes for DimPlayer table if they don't exist
if not index_exists(cursor, 'idx_player_name'):
    cursor.execute('CREATE INDEX idx_player_name ON DimPlayer(player_name);')

# Create indexes for DimTeam table if they don't exist
if not index_exists(cursor, 'idx_team_title'):
    cursor.execute('CREATE INDEX idx_team_title ON DimTeam(team_title);')

# Commit the changes to the database
connection.commit()

In [28]:
# Function to insert or update records in a table with SCD logic
def insert_update_scd(cursor, table_name, id_column, id_value, data_columns, data_values):
    cursor.execute(f'SELECT * FROM {table_name} WHERE {id_column} = ?', (id_value,))
    existing_row = cursor.fetchone()

    if existing_row:
        # If the row exists, update the record
        update_query = f'''
            UPDATE {table_name}
            SET {', '.join([f'{column} = ?' for column in data_columns])}
            WHERE {id_column} = ?;
        '''
        cursor.execute(update_query, data_values + [id_value])

        # Remove the older row
        cursor.execute(f'DELETE FROM {table_name} WHERE {id_column} = ?;', (id_value,))
    else:
        # Insert the record if it doesn't exist
        insert_query = f'''
            INSERT INTO {table_name} ({', '.join(data_columns)})
            VALUES ({', '.join(['?' for _ in data_columns])});
        '''
        cursor.execute(insert_query, data_values)

# Insert data into DimLeague table
for row in league.itertuples():
    insert_update_scd(cursor, 'DimLeague', 'league_id', row.league_id, ['league_id', 'league'], [row.league_id, row.league])

# Insert data into DimPlayer table
for row in player.itertuples():
    insert_update_scd(cursor, 'DimPlayer', 'player_id', row.player_id, ['player_id', 'player_name'], [row.player_id, row.player_name])

# Insert data into DimTeam table
for row in team.itertuples():
    insert_update_scd(cursor, 'DimTeam', 'team_id', row.team_id, ['team_id', 'team_title'], [row.team_id, row.team_title])

# Insert data into DimPosition table
for row in position.itertuples():
    insert_update_scd(cursor, 'DimPosition', 'position_id', row.position_id, ['position_id', 'position'], [row.position_id, row.position])

# Insert data into DimSeason table
for row in season.itertuples():
    insert_update_scd(cursor, 'DimSeason', 'season_id', row.season_id, ['season_id', 'season'], [row.season_id, row.season])  

# Commit the changes to the database
connection.commit()

In [None]:
# Now you can proceed with inserting into FactPlayerStatistics
for row in playerstatistics.itertuples():
    data_columns = [
        'pstatistics_id', 'league_id', 'season_id', 'player_id', 'team_id', 'position_id', 'games', 'time',
        'goals', 'assists', 'npg', 'xG_percentage', 'xA_percentage', 'npxG_percentage', 'shots',
        'missed_shots', 'key_passes', 'yellow_cards', 'red_cards'
    ]
    data_values = [
        row.pstatistics_id, row.league_id, row.season_id, row.player_id, row.team_id, row.position_id,
        row.games, row.time, row.goals, row.assists, row.npg, row.xG_percentage, row.xA_percentage,
        row.npxG_percentage, row.shots, row.missed_shots, row.key_passes, row.yellow_cards, row.red_cards
    ]
    insert_update_scd(cursor, 'FactPlayerStatistics', 'pstatistics_id', row.pstatistics_id, data_columns, data_values)

# Commit the changes to the database
connection.commit()

In [9]:
# Define the SQL query for Player Performance Data Mart View
player_performance_view_query = '''
CREATE VIEW PlayerPerformanceDataMart AS
SELECT
    fps.pstatistics_id,
    l.league,
    s.season,
    p.player_name,
    t.team_title,
    pos.position,
    fps.games,
    fps.time,
    fps.goals,
    fps.assists,
    fps.npg,
    fps.xG_percentage,
    fps.xA_percentage,
    fps.npxG_percentage,
    fps.shots,
    fps.missed_shots,
    fps.key_passes,
    fps.yellow_cards,
    fps.red_cards
FROM
    FactPlayerStatistics fps
    JOIN DimLeague l ON fps.league_id = l.league_id
    JOIN DimSeason s ON fps.season_id = s.season_id
    JOIN DimPlayer p ON fps.player_id = p.player_id
    JOIN DimTeam t ON fps.team_id = t.team_id
    JOIN DimPosition pos ON fps.position_id = pos.position_id;
'''

# Execute the query to create Player Performance Data Mart View
cursor.execute(player_performance_view_query)

# Commit the changes to the database
connection.commit()

ProgrammingError: ('42S01', "[42S01] [Microsoft][ODBC SQL Server Driver][SQL Server]There is already an object named 'PlayerPerformanceDataMart' in the database. (2714) (SQLExecDirectW)")

In [17]:
# Define the SQL query for Team Statistics Data Mart View without grouping by fps.pstatistics_id
team_statistics_view_query = '''
CREATE VIEW TeamStatisticsDataMart AS
SELECT
    l.league,
    s.season,
    t.team_title,
    SUM(fps.goals) AS total_goals,
    SUM(fps.assists) AS total_assists,
    AVG(fps.xG_percentage) AS avg_xG_percentage,
    AVG(fps.xA_percentage) AS avg_xA_percentage,
    SUM(fps.shots) AS total_shots,
    SUM(fps.missed_shots) AS total_missed_shots,
    SUM(fps.yellow_cards) AS total_yellow_cards,
    SUM(fps.red_cards) AS total_red_cards
FROM
    FactPlayerStatistics fps
    JOIN DimLeague l ON fps.league_id = l.league_id
    JOIN DimSeason s ON fps.season_id = s.season_id
    JOIN DimTeam t ON fps.team_id = t.team_id
GROUP BY
    l.league,
    s.season,
    t.team_title;
'''

# Execute the query to create Team Statistics Data Mart View
cursor.execute(team_statistics_view_query)

# Commit the changes to the database
connection.commit()

In [11]:
# # Create server-level logins with passwords
# cursor.execute("CREATE LOGIN DataEngineerLogin WITH PASSWORD = 'DataEngineer2004';")
# cursor.execute("CREATE LOGIN DataAnalystLogin WITH PASSWORD = 'DataAnalyst2004';")

# Create database users
cursor.execute("CREATE USER DataEngineerUser FOR LOGIN DataEngineerLogin;")
cursor.execute("CREATE USER DataAnalystUser FOR LOGIN DataAnalystLogin;")

# Create database roles for Data Engineer and Data Analyst
cursor.execute("CREATE ROLE DataEngineerRole;")
cursor.execute("CREATE ROLE DataAnalystRole;")

# Add users to their respective roles
cursor.execute("ALTER ROLE DataEngineerRole ADD MEMBER DataEngineerUser;")
cursor.execute("ALTER ROLE DataAnalystRole ADD MEMBER DataAnalystUser;")

# Grant permissions to roles
# Adjust permissions based on your specific requirements

# DimLeague
cursor.execute("GRANT SELECT, INSERT, UPDATE, DELETE ON [DimLeague] TO DataEngineerRole;")
cursor.execute("GRANT SELECT ON [DimLeague] TO DataAnalystRole;")

# DimSeason
cursor.execute("GRANT SELECT, INSERT, UPDATE, DELETE ON [DimSeason] TO DataEngineerRole;")
cursor.execute("GRANT SELECT ON [DimSeason] TO DataAnalystRole;")

# DimPlayer
cursor.execute("GRANT SELECT, INSERT, UPDATE, DELETE ON [DimPlayer] TO DataEngineerRole;")
cursor.execute("GRANT SELECT ON [DimPlayer] TO DataAnalystRole;")

# DimTeam
cursor.execute("GRANT SELECT, INSERT, UPDATE, DELETE ON [DimTeam] TO DataEngineerRole;")
cursor.execute("GRANT SELECT ON [DimTeam] TO DataAnalystRole;")

# DimPosition
cursor.execute("GRANT SELECT, INSERT, UPDATE, DELETE ON [DimPosition] TO DataEngineerRole;")
cursor.execute("GRANT SELECT ON [DimPosition] TO DataAnalystRole;")

# FactPlayerStatistics
cursor.execute("GRANT SELECT, INSERT, UPDATE, DELETE ON [FactPlayerStatistics] TO DataEngineerRole;")
cursor.execute("GRANT SELECT ON [FactPlayerStatistics] TO DataAnalystRole;")

# Commit the changes to the database
connection.commit()

In [20]:
cursor.close()
connection.close()