# myDataLoading: Loading DIM Tables Data into PostgreSQL DB

In [None]:
import boto3
import pandas as pd
import logging
import os
import psycopg2
import io
import numpy as np

# Initialize the S3 client once at import time; every function in this
# module that talks to S3 uses this shared client.
s3_client = boto3.client('s3')

# Setup logging: the root logger is used and set to emit INFO and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Fetch database connection details from environment variables.
# os.getenv returns None for a variable that is not set, and DB_PORT is kept
# exactly as read (a string, no int conversion).
DB_HOST = os.getenv('DB_HOST')
DB_PORT = os.getenv('DB_PORT')
DB_NAME = os.getenv('DB_NAME')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')

def lambda_handler(event, context):
    """Load each dimension CSV from S3, clean it and insert it into PostgreSQL.

    Args:
        event: Invocation payload; not read by this handler.
        context: Invocation context; not read by this handler.

    Returns:
        dict: ``statusCode`` 200 with a success message when every table was
        processed, or ``statusCode`` 500 with the error text as soon as any
        step raises.
    """
    bucket_name = 'datalakepartition2'
    # (target table, S3 key of its CSV), handled in this order.
    table_sources = (
        ('dimplayer', 'dataframes/dimplayer.csv'),
        ('dimteams', 'dataframes/dimteams.csv'),
        ('dimvenue', 'dataframes/dimvenue.csv'),
        ('dimmatches', 'dataframes/dimmatches.csv'),
    )

    try:
        for table_name, s3_key in table_sources:
            raw_df = load_csv_from_s3(bucket_name, s3_key)
            logger.info(f"Loaded data for '{table_name}' from S3 key '{s3_key}':\n{raw_df.head()}")

            # Clean according to the table's rules, then write to the database.
            cleaned_df = process_dataframe(raw_df, table_name)
            insert_dataframe_to_sql(cleaned_df, table_name)
    except Exception as e:
        logger.error(f"Error processing data: {str(e)}")
        return {
            'statusCode': 500,
            'body': f"Error processing data: {str(e)}"
        }

    return {
        'statusCode': 200,
        'body': 'Data processing and SQL insertion for all tables completed successfully.'
    }

def load_csv_from_s3(bucket_name, s3_key):
    """Download one CSV object from S3 and parse it into a DataFrame.

    Args:
        bucket_name: Name of the S3 bucket.
        s3_key: Key of the CSV object inside the bucket.

    Returns:
        pandas.DataFrame: The parsed CSV (pandas infers the column dtypes).

    Raises:
        Exception: Any download, decode or parse error is logged and re-raised.
    """
    try:
        logger.info(f"Loading file '{s3_key}' from bucket '{bucket_name}'")
        response = s3_client.get_object(Bucket=bucket_name, Key=s3_key)
        # The whole object is read into memory and decoded as UTF-8.
        csv_text = response['Body'].read().decode('utf-8')
        return pd.read_csv(io.StringIO(csv_text))
    except Exception as e:
        logger.error(f"Error loading CSV from S3: {str(e)}")
        raise

def process_dataframe(df, table_name):
    """Clean a raw dimension DataFrame before it is inserted into its table.

    Blank / whitespace-only strings are turned into NaN, the key and numeric
    columns are coerced (unparseable values become the fill value), duplicate
    primary keys keep their first row, and every row that still holds a
    missing value in any column is dropped.

    Args:
        df: Raw DataFrame loaded from the table's CSV.
        table_name: One of 'dimplayer', 'dimteams', 'dimvenue', 'dimmatches'.

    Returns:
        pandas.DataFrame: The cleaned frame.

    Raises:
        KeyError: If ``table_name`` is unknown or the table's primary-key
            column is not present in ``df``.
    """
    # Primary-key column of each supported table.
    key_column_by_table = {
        'dimplayer': 'player_id',
        'dimteams': 'team_id',
        'dimvenue': 'venue_id',
        'dimmatches': 'fixture_id'
    }
    # dimmatches columns to coerce, in processing order: (column, converter, fill value).
    match_conversions = (
        ('fixture_id', validate_and_convert_int, 0),
        ('home_team_id', validate_and_convert_int, 0),
        ('away_team_id', validate_and_convert_int, 0),
        ('home_goals', validate_and_convert_float, 0.0),
        ('away_goals', validate_and_convert_float, 0.0),
        ('home_winner', validate_and_convert_float, 0.0),
        ('away_winner', validate_and_convert_float, 0.0),
        ('venue_id', validate_and_convert_int, 0),
    )

    # Treat empty / whitespace-only strings as missing values.
    df = df.replace(r'^\s*$', np.nan, regex=True)
    logger.info(f"Initial data statistics for '{table_name}':\n{df.describe(include='all')}")

    primary_key = key_column_by_table[table_name]
    if primary_key not in df.columns:
        logger.error(f"Primary key column '{primary_key}' not found in table '{table_name}'")
        raise KeyError(f"Primary key column '{primary_key}' not found in table '{table_name}'")

    if table_name == 'dimmatches':
        for column, converter, default in match_conversions:
            df[column] = converter(df[column], fill_value=default)

        # The timestamp column is interpreted as epoch seconds; bad values become NaT.
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s', errors='coerce')
        deduplicated = df.drop_duplicates(subset='fixture_id', keep='first')
        df = deduplicated.dropna()
    elif table_name in ('dimplayer', 'dimteams', 'dimvenue'):
        df[primary_key] = validate_and_convert_int(df[primary_key], fill_value=0)
        deduplicated = df.drop_duplicates(subset=primary_key, keep='first')
        df = deduplicated.dropna()

    logger.info(f"Cleaned data statistics for '{table_name}':\n{df.describe(include='all')}")
    logger.info(f"Cleaned DataFrame for '{table_name}':\n{df}")
    return df

def validate_and_convert_int(series, fill_value=0):
    """Coerce a pandas Series to integers.

    Args:
        series: Series to convert.
        fill_value: Value substituted for entries that are missing or cannot
            be parsed as a number.

    Returns:
        pandas.Series: The series cast with ``astype(int)``.
    """
    logger.info(f"Validating and converting integer series. Initial data:\n{series.head()}")
    # Unparseable entries become NaN first so they can be filled uniformly.
    numeric = pd.to_numeric(series, errors='coerce')
    filled = numeric.fillna(fill_value)
    return filled.astype(int)

def validate_and_convert_float(series, fill_value=0.0):
    """Coerce a pandas Series to floats.

    Args:
        series: Series to convert.
        fill_value: Value substituted for entries that are missing or cannot
            be parsed as a number.

    Returns:
        pandas.Series: The series cast with ``astype(float)``.
    """
    logger.info(f"Validating and converting float series. Initial data:\n{series.head()}")
    # Unparseable entries become NaN first so they can be filled uniformly.
    numeric = pd.to_numeric(series, errors='coerce')
    filled = numeric.fillna(fill_value)
    return filled.astype(float)

def insert_dataframe_to_sql(df, table_name):
    """Insert every row of ``df`` into the given dimension table.

    Rows whose primary key already exists are skipped by the
    ``ON CONFLICT ... DO NOTHING`` clause of each statement. All rows are
    written in one transaction that is committed at the end.

    The cursor and connection are always closed, including when connecting,
    inserting or committing fails, so a failed run no longer leaks the
    connection. Closing without a commit discards the pending transaction.

    Args:
        df: Cleaned DataFrame; its columns must be in the same order as the
            column list of the table's INSERT statement.
        table_name: One of 'dimplayer', 'dimteams', 'dimvenue', 'dimmatches'.

    Raises:
        Exception: Any error (unknown table, connection failure, SQL error)
            is logged and re-raised.
    """
    # INSERT statement for each supported table.
    insert_queries = {
        'dimplayer': "INSERT INTO dimplayer (player_id, player_name) VALUES (%s, %s) ON CONFLICT (player_id) DO NOTHING;",
        'dimteams': "INSERT INTO dimteams (team_id, team_name) VALUES (%s, %s) ON CONFLICT (team_id) DO NOTHING;",
        'dimvenue': "INSERT INTO dimvenue (venue_id, venue_name, venue_city) VALUES (%s, %s, %s) ON CONFLICT (venue_id) DO NOTHING;",
        'dimmatches': """
                INSERT INTO dimmatches (fixture_id, referee, date, time, timestamp, status_short,
                                        home_team_id, home_team_name, home_winner, home_goals,
                                        away_team_id, away_team_name, away_winner, away_goals, venue_id)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (fixture_id) DO NOTHING;
            """
    }

    conn = None
    cursor = None
    try:
        insert_query = insert_queries[table_name]

        # Establish a connection to the PostgreSQL database
        conn = psycopg2.connect(
            host=DB_HOST,
            port=DB_PORT,
            database=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD
        )
        cursor = conn.cursor()

        # One INSERT per row; plain tuples are all the driver needs.
        for row in df.itertuples(index=False, name=None):
            cursor.execute(insert_query, row)

        conn.commit()
        logger.info(f"All rows inserted into '{table_name}' successfully.")

    except Exception as e:
        logger.error(f"Error inserting data into SQL table '{table_name}': {str(e)}")
        raise

    finally:
        # Release database resources on both the success and the failure path.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()


# myDataLoading2: Storing Stats Data into an easily loadable DataFrame

In [None]:
import boto3
import pandas as pd
import io
import logging

# Initialize the S3 client once at import time; every function in this
# module that talks to S3 uses this shared client.
s3_client = boto3.client('s3')

# Setup logging: the root logger is used and set to emit INFO and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    """Build the team and player fact CSVs from the raw statistics files in S3.

    The raw statistics files are merged per type, reshaped into the fact
    layouts, de-duplicated on their key columns and written under the
    ``dataframes/`` prefix of the same bucket.

    Args:
        event: Invocation payload; not read by this handler.
        context: Invocation context; not read by this handler.

    Returns:
        dict: ``statusCode`` 200 with a success message, or ``statusCode`` 500
        with the error text when any step raises.
    """
    bucket_name = 'datalakepartition2'
    destination_folder = 'dataframes/'
    fixture_stats_suffix = 'statistics_fixtures_all.csv'
    player_stats_suffix = 'statistics_players_all.csv'

    try:
        raw_team_stats = load_and_merge_files(bucket_name, fixture_stats_suffix)
        raw_player_stats = load_and_merge_files(bucket_name, player_stats_suffix)

        # Column names are logged to make schema drift in the inputs visible.
        logger.info(f"Columns in stats_fixture_all_df: {raw_team_stats.columns.tolist()}")
        logger.info(f"Columns in stats_players_all_df: {raw_player_stats.columns.tolist()}")

        team_facts = process_factteamsstats(raw_team_stats)
        player_facts = process_factplayerstats(raw_player_stats)

        team_facts = remove_duplicates(team_facts, subset=['fixture_id', 'team_id'])
        player_facts = remove_duplicates(player_facts, subset=['fixture_id', 'team_id', 'player_id'])

        outputs = (
            ('factteamsstats.csv', team_facts),
            ('factplayerstats.csv', player_facts),
        )
        for file_name, frame in outputs:
            save_df_to_s3(bucket_name, frame, f'{destination_folder}{file_name}')
    except Exception as e:
        logger.error(f"Error processing data: {str(e)}")
        return {
            'statusCode': 500,
            'body': f"Error processing data: {str(e)}"
        }

    return {
        'statusCode': 200,
        'body': 'Data processing and CSV generation completed successfully.'
    }

def load_and_merge_files(bucket_name, suffix):
    """Load every CSV in the bucket whose key ends with ``suffix`` and stack them.

    The bucket listing is paginated: a single ``list_objects_v2`` call returns
    at most 1000 keys, so without pagination matching files in larger buckets
    would be silently ignored.

    Args:
        bucket_name: Name of the S3 bucket to scan.
        suffix: Key suffix that selects the files to load.

    Returns:
        pandas.DataFrame: Row-wise concatenation of all matching files with a
        fresh index, or an empty DataFrame when no key matches.
    """
    paginator = s3_client.get_paginator('list_objects_v2')

    files = []
    for page in paginator.paginate(Bucket=bucket_name):
        # A page of an empty bucket has no 'Contents' entry.
        files.extend(item['Key'] for item in page.get('Contents', []) if item['Key'].endswith(suffix))

    logger.info(f"Files found with suffix '{suffix}': {files}")

    if not files:
        logger.warning(f"No files found with suffix '{suffix}' in bucket '{bucket_name}'.")
        return pd.DataFrame()  # Return an empty DataFrame if no files found

    # Load and concatenate all matching files
    dataframes = [load_csv_from_s3(bucket_name, file) for file in files]
    return pd.concat(dataframes, ignore_index=True)

def load_csv_from_s3(bucket_name, file_key):
    """Download one CSV object from S3 and parse it with every column as text.

    Args:
        bucket_name: Name of the S3 bucket.
        file_key: Key of the CSV object inside the bucket.

    Returns:
        pandas.DataFrame: The parsed CSV, read with ``dtype=str``.
    """
    logger.info(f"Loading file '{file_key}' from bucket '{bucket_name}'")
    response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
    # The whole object is read into memory and decoded as UTF-8.
    csv_text = response['Body'].read().decode('utf-8')
    frame = pd.read_csv(io.StringIO(csv_text), dtype=str)

    # Record the header that was actually found in this file.
    logger.info(f"Loaded DataFrame columns from file '{file_key}': {frame.columns.tolist()}")
    return frame

def save_df_to_s3(bucket_name, df, file_key):
    """Serialize a DataFrame to CSV (without the index) and upload it to S3.

    Nothing is uploaded for an empty DataFrame; a warning is logged instead.

    Args:
        bucket_name: Destination S3 bucket.
        df: DataFrame to upload.
        file_key: Destination object key.
    """
    if not df.empty:
        out_buffer = io.StringIO()
        df.to_csv(out_buffer, index=False)
        s3_client.put_object(Bucket=bucket_name, Key=file_key, Body=out_buffer.getvalue())
        logger.info(f"DataFrame saved to '{file_key}' in bucket '{bucket_name}'")
    else:
        logger.warning(f"DataFrame is empty. Skipping upload for '{file_key}'.")

def remove_duplicates(df, subset=None):
    """Return ``df`` without duplicated rows, keeping each first occurrence.

    Args:
        df: DataFrame to de-duplicate; it is not modified.
        subset: Column label(s) that define a duplicate. ``None`` compares
            all columns.

    Returns:
        pandas.DataFrame: A new frame with the surviving rows and their
        original index labels.
    """
    deduplicated = df.drop_duplicates(subset=subset, keep='first')
    return deduplicated

def process_factteamsstats(df):
    """Reshape the merged fixture statistics into the team fact layout.

    Only the mapped source columns are kept and renamed. Key columns are cast
    to int, count columns are coerced to int with 0 for missing or unparseable
    values, and the two percentage columns are converted from text such as
    ``'55%'`` to a fraction (0.55).

    Percentages are parsed with ``pd.to_numeric(..., errors='coerce')`` so a
    missing or malformed value yields NaN instead of aborting the run. This
    matches the coercion already applied to the count columns and avoids
    casting ``pd.NA`` placeholders to float.

    Args:
        df: Merged raw statistics. An empty frame is accepted.

    Returns:
        pandas.DataFrame: The fact frame, or an empty DataFrame when ``df``
        is empty.

    Raises:
        ValueError: If a required source column is missing, or a
            ``fixtureID`` / ``team_Id`` value cannot be converted to int.
    """
    if df.empty:
        return pd.DataFrame()

    columns_mapping = {
        'fixtureID': 'fixture_id',
        'team_Id': 'team_id',
        'Shots on Goal': 'shots_on_goal',
        'Shots off Goal': 'shots_off_goal',
        'Blocked Shots': 'blocked_shots',
        'Shots insidebox': 'shots_insidebox',
        'Shots outsidebox': 'shots_outsidebox',
        'Fouls': 'fouls',
        'Corner Kicks': 'corner_kicks',
        'Offsides': 'offsides',
        'Ball Possession': 'ball_possession',
        'Yellow Cards': 'yellow_cards',
        'Red Cards': 'red_cards',
        'Goalkeeper Saves': 'goalkeeper_saves',
        'Total passes': 'total_passes',
        'Passes accurate': 'passes_accurate',
        'Passes %': 'passes_percentage'
    }
    key_columns = ('fixture_id', 'team_id')
    count_columns = (
        'shots_on_goal', 'shots_off_goal', 'blocked_shots', 'shots_insidebox',
        'shots_outsidebox', 'fouls', 'corner_kicks', 'offsides', 'yellow_cards',
        'red_cards', 'goalkeeper_saves', 'total_passes', 'passes_accurate',
    )
    percentage_columns = ('ball_possession', 'passes_percentage')

    # Check if all required columns are present
    missing_columns = [col for col in columns_mapping if col not in df.columns]
    if missing_columns:
        logger.error(f"Missing columns in stats_fixture_all_df: {missing_columns}")
        raise ValueError(f"Missing columns in stats_fixture_all_df: {missing_columns}")

    # Keep only the mapped columns, in mapping order, under their new names.
    df = df[list(columns_mapping)].rename(columns=columns_mapping)

    # Keys must be valid integers; an invalid key raises rather than being guessed.
    for col in key_columns:
        df[col] = df[col].astype(int)

    for col in count_columns:
        df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype(int)

    for col in percentage_columns:
        # Casting to str first keeps this working for non-text columns;
        # regex=False makes '%' a literal on every pandas version.
        without_sign = df[col].astype(str).str.replace('%', '', regex=False)
        df[col] = pd.to_numeric(without_sign, errors='coerce') / 100.0

    return df

def process_factplayerstats(df):
    """Reshape the merged player statistics into the player fact layout.

    Only the mapped source columns are kept and renamed. The three key
    columns are coerced to int and the remaining statistics to numbers;
    missing or unparseable values become 0 in both cases.

    A conversion failure is logged and re-raised. Previously it was swallowed
    and a partially converted frame was returned as if nothing had happened.

    Args:
        df: Merged raw player statistics. An empty frame is accepted.

    Returns:
        pandas.DataFrame: The fact frame, or an empty DataFrame when ``df``
        is empty.

    Raises:
        ValueError: If a required source column is missing.
        Exception: Any error raised while converting column types.
    """
    if df.empty:
        return pd.DataFrame()

    # Source column -> fact column
    columns_mapping = {
        'fixtureID': 'fixture_id',
        'team_Id': 'team_id',
        'player_id': 'player_id',
        'minutes': 'player_statistics_games_minutes',
        'number': 'player_statistics_games_number',
        'rating': 'games_rating',
        'offsides': 'games_offside',
        'goals_total': 'goals_total',
        'goals_conceded': 'goals_conceded',
        'goals_assists': 'goals_assists',
        'passes_total': 'passes_total',
        'passes_accuracy': 'passes_accuracy',
        'tackles_total': 'tackles_total',
        'duels_total': 'duels_total',
        'fouls_committed': 'fouls_committed'
    }
    key_columns = ('fixture_id', 'team_id', 'player_id')
    numeric_cols = (
        'player_statistics_games_minutes', 'player_statistics_games_number',
        'games_rating', 'games_offside', 'goals_total', 'goals_conceded',
        'goals_assists', 'passes_total', 'passes_accuracy', 'tackles_total',
        'duels_total', 'fouls_committed',
    )

    # Check if all required columns are present
    missing_columns = [col for col in columns_mapping if col not in df.columns]
    if missing_columns:
        logger.error(f"Missing columns in stats_players_all_df: {missing_columns}")
        raise ValueError(f"Missing columns in stats_players_all_df: {missing_columns}")

    # Select and rename columns according to the mapping
    df = df[list(columns_mapping)].rename(columns=columns_mapping)

    try:
        # errors='coerce' already maps 'nan' text and other junk to NaN,
        # which fillna(0) then replaces.
        for col in key_columns:
            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype(int)
        for col in numeric_cols:
            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
    except Exception as e:
        logger.error(f"Error converting factplayerstats columns: {str(e)}")
        raise

    return df


# myDataLoading3: Loading Stats Data into DB

In [None]:
import boto3
import pandas as pd
import psycopg2
import psycopg2.extras
import io
import logging
import os

# Initialize the S3 client once at import time; every function in this
# module that talks to S3 uses this shared client.
s3_client = boto3.client('s3')

# Setup logging: the root logger is used and set to emit INFO and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Database connection details from environment variables.
# os.getenv returns None for a variable that is not set.
DB_HOST = os.getenv('DB_HOST')
DB_NAME = os.getenv('DB_NAME')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_PORT = int(os.getenv('DB_PORT', 5432))  # Default to 5432 if DB_PORT is not set

# NOTE(review): a commented-out block with hard-coded test-database settings,
# including a plaintext password, used to sit here. Credentials do not belong
# in source files; supply them through the DB_* environment variables above
# and rotate any password that was ever committed.

def lambda_handler(event, context):
    """Load the team and player fact CSVs from S3 into PostgreSQL.

    Both tables are written inside one transaction: it is committed only when
    both loads succeed and rolled back otherwise.

    The cursor is tracked separately from the connection, so cleanup can no
    longer fail with a NameError when the connection was opened but creating
    the cursor raised.

    Args:
        event: Invocation payload; not read by this handler.
        context: Invocation context; not read by this handler.

    Returns:
        dict: ``statusCode`` 200 with a success message, or ``statusCode`` 500
        with the error text when any step raises.
    """
    # Define S3 bucket and file paths
    bucket_name = 'datalakepartition2'
    file_key_teams = 'dataframes/factteamsstats.csv'
    file_key_players = 'dataframes/factplayerstats.csv'

    connection = None
    cursor = None
    try:
        # Load the CSV files from S3
        teams_df = load_csv_from_s3(bucket_name, file_key_teams)
        players_df = load_csv_from_s3(bucket_name, file_key_players)

        # Log DataFrame columns for debugging
        logger.info(f"Columns in teams_df: {teams_df.columns.tolist()}")
        logger.info(f"Columns in players_df: {players_df.columns.tolist()}")

        # The CSVs are read as text; restore numeric types for both frames.
        teams_df = convert_teams_df_types(teams_df)
        players_df = convert_players_df_types(players_df)

        # Connect to the PostgreSQL database
        connection = connect_to_db()
        cursor = connection.cursor()

        # Load data into SQL database
        load_data_to_sql(teams_df, cursor, 'fact_teams_stats')
        load_data_to_sql(players_df, cursor, 'fact_player_stats')

        # Commit the transaction
        connection.commit()

        logger.info("Data loaded into the SQL database successfully.")
        return {
            'statusCode': 200,
            'body': 'Data loaded into the SQL database successfully.'
        }

    except Exception as e:
        logger.error(f"Error loading data into the SQL database: {str(e)}")
        if connection is not None:
            connection.rollback()  # Rollback in case of error
        return {
            'statusCode': 500,
            'body': f"Error loading data into the SQL database: {str(e)}"
        }

    finally:
        # Close only what was actually created.
        if cursor is not None:
            cursor.close()
        if connection is not None:
            connection.close()

def load_csv_from_s3(bucket_name, file_key):
    """Download one CSV object from S3 and parse it with every column as text.

    Args:
        bucket_name: Name of the S3 bucket.
        file_key: Key of the CSV object inside the bucket.

    Returns:
        pandas.DataFrame: The parsed CSV, read with ``dtype=str`` so no type
        inference happens at load time.
    """
    logger.info(f"Loading file '{file_key}' from bucket '{bucket_name}'")
    response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
    # The whole object is read into memory and decoded as UTF-8.
    csv_text = response['Body'].read().decode('utf-8')
    return pd.read_csv(io.StringIO(csv_text), dtype=str)

def connect_to_db():
    """Open a PostgreSQL connection using the module-level DB_* settings.

    Returns:
        The open psycopg2 connection.

    Raises:
        Exception: Any connection error is logged and re-raised.
    """
    connection_settings = {
        'host': DB_HOST,
        'port': DB_PORT,
        'database': DB_NAME,
        'user': DB_USER,
        'password': DB_PASSWORD,
    }
    try:
        db_connection = psycopg2.connect(**connection_settings)
        logger.info("Successfully connected to the database")
        return db_connection
    except Exception as e:
        logger.error(f"Error connecting to the database: {str(e)}")
        raise

def load_data_to_sql(df, cursor, table_name):
    """Insert all rows of ``df`` into ``table_name`` with a batched INSERT.

    The DataFrame's column names are used as the target column list. Rows are
    taken column-wise with ``itertuples`` so every value keeps the type of its
    own column. Going through ``df.to_numpy()`` would build one common-dtype
    array and turn integer ids into floats whenever the frame also has float
    columns.

    Nothing is committed here; the caller owns the transaction.

    Args:
        df: Data to insert. An empty frame is skipped with a warning.
        cursor: Open database cursor used to execute the statement.
        table_name: Target table.

    Raises:
        Exception: Any error raised by the batch insert is logged and
            re-raised.
    """
    if df.empty:
        logger.warning(f"The DataFrame for table '{table_name}' is empty. Skipping load.")
        return

    # Prepare SQL query for inserting data.
    # NOTE(review): the table and column names are interpolated as-is, so
    # they must come from trusted code/data; only the values are parameters.
    columns = df.columns.tolist()
    columns_str = ', '.join(columns)
    values_str = ', '.join(['%s'] * len(columns))

    insert_query = f"INSERT INTO {table_name} ({columns_str}) VALUES ({values_str})"

    # Log the SQL query for debugging
    logger.info(f"Insert query for table '{table_name}': {insert_query}")

    # One plain tuple per row, each value with its column's own type.
    data_tuples = list(df.itertuples(index=False, name=None))

    # Execute the SQL query using batch insert for efficiency
    try:
        psycopg2.extras.execute_batch(cursor, insert_query, data_tuples)
        logger.info(f"Loaded data into table '{table_name}' successfully")
    except Exception as e:
        logger.error(f"Error executing SQL insert for table '{table_name}': {str(e)}")
        raise

def convert_teams_df_types(df):
    """Convert the text columns of the team fact frame to numeric types.

    Key columns are cast to int, count columns to int with missing values
    defaulting to 0, and the two percentage columns to float.

    The previous per-value ``int(float(x) if x else 0)`` treated NaN as
    truthy, so a missing count raised instead of becoming 0. The counts are
    now parsed with ``pd.to_numeric`` and missing values are filled before
    the integer cast. Text that is not a number still raises.

    Args:
        df: Team fact frame as loaded from CSV; it is modified in place.

    Returns:
        pandas.DataFrame: The same ``df`` object with converted columns.

    Raises:
        Exception: Any conversion error (missing column, unparseable value)
            is logged and re-raised.
    """
    key_columns = ('fixture_id', 'team_id')
    count_columns = (
        'shots_on_goal', 'shots_off_goal', 'blocked_shots', 'shots_insidebox',
        'shots_outsidebox', 'fouls', 'corner_kicks', 'offsides', 'yellow_cards',
        'red_cards', 'goalkeeper_saves', 'total_passes', 'passes_accurate',
    )
    float_columns = ('ball_possession', 'passes_percentage')

    try:
        for col in key_columns:
            df[col] = df[col].astype(int)
        for col in count_columns:
            # Missing -> 0; fractional text such as '3.0' is truncated to int.
            df[col] = pd.to_numeric(df[col]).fillna(0).astype(int)
        for col in float_columns:
            df[col] = df[col].astype(float)
    except Exception as e:
        logger.error(f"Error converting data types for teams DataFrame: {str(e)}")
        raise

    return df

def convert_players_df_types(df):
    """Convert the text columns of the player fact frame to numeric types.

    Key columns are cast to int, count columns to int with missing values
    defaulting to 0, and minutes, rating and pass accuracy to float.

    The previous per-value ``int(float(x) if x else 0)`` treated NaN as
    truthy, so a missing count raised instead of becoming 0. The counts are
    now parsed with ``pd.to_numeric`` and missing values are filled before
    the integer cast. Text that is not a number still raises.

    Args:
        df: Player fact frame as loaded from CSV; it is modified in place.

    Returns:
        pandas.DataFrame: The same ``df`` object with converted columns.

    Raises:
        Exception: Any conversion error (missing column, unparseable value)
            is logged and re-raised.
    """
    key_columns = ('fixture_id', 'team_id', 'player_id')
    float_columns = ('player_statistics_games_minutes', 'games_rating', 'passes_accuracy')
    count_columns = (
        'player_statistics_games_number', 'games_offside', 'goals_total',
        'goals_conceded', 'goals_assists', 'passes_total', 'tackles_total',
        'duels_total', 'fouls_committed',
    )

    try:
        for col in key_columns:
            df[col] = df[col].astype(int)
        for col in float_columns:
            df[col] = df[col].astype(float)
        for col in count_columns:
            # Missing -> 0; fractional text such as '3.0' is truncated to int.
            df[col] = pd.to_numeric(df[col]).fillna(0).astype(int)
    except Exception as e:
        logger.error(f"Error converting data types for players DataFrame: {str(e)}")
        raise

    return df


# myDataLoading4: Storing DIM TABLES Data into DF 

In [1]:
import boto3
import pandas as pd
import io
import logging

# Initialize the S3 client once at import time; every function in this
# module that talks to S3 uses this shared client.
s3_client = boto3.client('s3')

# Setup logging: the root logger is used and set to emit INFO and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    """Build the four dimension CSVs from the raw fixture and statistics files.

    Raw files are merged per type, projected onto the dimension layouts,
    stripped of fully duplicated rows and written under the ``dataframes/``
    prefix of the same bucket.

    Args:
        event: Invocation payload; not read by this handler.
        context: Invocation context; not read by this handler.

    Returns:
        dict: ``statusCode`` 200 with a success message, or ``statusCode`` 500
        with the error text when any step raises.
    """
    bucket_name = 'datalakepartition2'
    destination_folder = 'dataframes/'
    fixtures_suffix = 'fixtures.csv'
    fixture_stats_suffix = 'statistics_fixtures_all.csv'
    player_stats_suffix = 'statistics_players_all.csv'

    try:
        raw_fixtures = load_and_merge_files(bucket_name, fixtures_suffix)
        raw_team_stats = load_and_merge_files(bucket_name, fixture_stats_suffix)
        raw_player_stats = load_and_merge_files(bucket_name, player_stats_suffix)

        # Matches and venues both come from the fixtures data.
        matches = process_dimmatches(raw_fixtures)
        venues = process_dimvenue(raw_fixtures)
        teams = process_dimteams(raw_team_stats)
        players = process_dimplayer(raw_player_stats)

        matches = remove_duplicates(matches)
        venues = remove_duplicates(venues)
        teams = remove_duplicates(teams)
        players = remove_duplicates(players)

        outputs = (
            ('dimmatches.csv', matches),
            ('dimvenue.csv', venues),
            ('dimteams.csv', teams),
            ('dimplayer.csv', players),
        )
        for file_name, frame in outputs:
            save_df_to_s3(bucket_name, frame, f'{destination_folder}{file_name}')
    except Exception as e:
        logger.error(f"Error processing data: {str(e)}")
        return {
            'statusCode': 500,
            'body': f"Error processing data: {str(e)}"
        }

    return {
        'statusCode': 200,
        'body': 'Data processing and upload completed successfully.'
    }

def load_and_merge_files(bucket_name, suffix):
    """Load every CSV in the bucket whose key ends with ``suffix`` and stack them.

    The bucket listing is paginated: a single ``list_objects_v2`` call returns
    at most 1000 keys, so without pagination matching files in larger buckets
    would be silently ignored.

    Args:
        bucket_name: Name of the S3 bucket to scan.
        suffix: Key suffix that selects the files to load.

    Returns:
        pandas.DataFrame: Row-wise concatenation of all matching files with a
        fresh index, or an empty DataFrame when no key matches.
    """
    paginator = s3_client.get_paginator('list_objects_v2')

    files = []
    for page in paginator.paginate(Bucket=bucket_name):
        # A page of an empty bucket has no 'Contents' entry.
        files.extend(item['Key'] for item in page.get('Contents', []) if item['Key'].endswith(suffix))

    logger.info(f"Files found with suffix '{suffix}': {files}")

    if not files:
        logger.warning(f"No files found with suffix '{suffix}' in bucket '{bucket_name}'.")
        return pd.DataFrame()  # Return an empty DataFrame if no files found

    # Load and concatenate all matching files
    dataframes = [load_csv_from_s3(bucket_name, file) for file in files]
    return pd.concat(dataframes, ignore_index=True)

def load_csv_from_s3(bucket_name, file_key):
    """Download one CSV object from S3 and parse it into a DataFrame.

    Args:
        bucket_name: Name of the S3 bucket.
        file_key: Key of the CSV object inside the bucket.

    Returns:
        pandas.DataFrame: The parsed CSV (pandas infers the column dtypes).
    """
    logger.info(f"Loading file '{file_key}' from bucket '{bucket_name}'")
    response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
    # The whole object is read into memory and decoded as UTF-8.
    csv_text = response['Body'].read().decode('utf-8')
    return pd.read_csv(io.StringIO(csv_text))

def save_df_to_s3(bucket_name, df, file_key):
    """Serialize a DataFrame to CSV (without the index) and upload it to S3.

    Nothing is uploaded for an empty DataFrame; a warning is logged instead.

    Args:
        bucket_name: Destination S3 bucket.
        df: DataFrame to upload.
        file_key: Destination object key.
    """
    if not df.empty:
        out_buffer = io.StringIO()
        df.to_csv(out_buffer, index=False)
        s3_client.put_object(Bucket=bucket_name, Key=file_key, Body=out_buffer.getvalue())
        logger.info(f"DataFrame saved to '{file_key}' in bucket '{bucket_name}'")
    else:
        logger.warning(f"DataFrame is empty. Skipping upload for '{file_key}'.")

def remove_duplicates(df):
    """Return ``df`` without fully duplicated rows, keeping each first occurrence.

    Args:
        df: DataFrame to de-duplicate; it is not modified.

    Returns:
        pandas.DataFrame: A new frame with the surviving rows and their
        original index labels.
    """
    deduplicated = df.drop_duplicates(keep='first')
    return deduplicated

def process_dimmatches(df):
    """Project the fixtures DataFrame onto the match dimension columns.

    Args:
        df: Merged fixtures data. An empty frame is accepted.

    Returns:
        pandas.DataFrame: The 15 match columns (including ``venue_id``) in a
        fixed order, or an empty DataFrame when ``df`` is empty.

    Raises:
        KeyError: If a required column is missing from a non-empty ``df``.
    """
    match_columns = [
        'fixture_id', 'referee', 'date', 'time', 'timestamp', 'status_short',
        'home_team_id', 'home_team_name', 'home_winner', 'home_goals',
        'away_team_id', 'away_team_name', 'away_winner', 'away_goals',
        'venue_id',
    ]
    return pd.DataFrame() if df.empty else df[match_columns]

def process_dimvenue(df):
    """Project the fixtures DataFrame onto the venue dimension columns.

    Args:
        df: Merged fixtures data. An empty frame is accepted.

    Returns:
        pandas.DataFrame: ``venue_id``, ``venue_name`` and ``venue_city``, or
        an empty DataFrame when ``df`` is empty.

    Raises:
        KeyError: If a required column is missing from a non-empty ``df``.
    """
    venue_columns = ['venue_id', 'venue_name', 'venue_city']
    return pd.DataFrame() if df.empty else df[venue_columns]

def process_dimteams(df):
    """Project the fixture statistics onto the team dimension columns.

    Args:
        df: Merged fixture statistics. An empty frame is accepted.

    Returns:
        pandas.DataFrame: ``team_id`` (renamed from ``team_Id``) and
        ``team_name``, or an empty DataFrame when ``df`` is empty.

    Raises:
        KeyError: If a required column is missing from a non-empty ``df``.
    """
    if df.empty:
        return pd.DataFrame()
    team_columns = df[['team_Id', 'team_name']]
    return team_columns.rename(columns={'team_Id': 'team_id'})

def process_dimplayer(df):
    """Project the player statistics onto the player dimension columns.

    Args:
        df: Merged player statistics. An empty frame is accepted.

    Returns:
        pandas.DataFrame: ``player_id`` and ``player_name``, or an empty
        DataFrame when ``df`` is empty.

    Raises:
        KeyError: If a required column is missing from a non-empty ``df``.
    """
    player_columns = ['player_id', 'player_name']
    return pd.DataFrame() if df.empty else df[player_columns]


# myDataLoading5: Loading Forecast16d and historic weather data into DB

In [None]:
import boto3
import pandas as pd
import io
import logging
import psycopg2
from psycopg2 import sql
from psycopg2.extras import execute_batch
import os

# Initialize the S3 client once at import time; every function in this
# module that talks to S3 uses this shared client.
s3_client = boto3.client('s3')

# Database connection details from environment variables.
# os.getenv returns None for a variable that is not set; DB_PORT falls back
# to 5432.
DB_HOST = os.getenv('DB_HOST')
DB_NAME = os.getenv('DB_NAME')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_PORT = int(os.getenv('DB_PORT', 5432))

# NOTE(review): a commented-out block with hard-coded test-database settings,
# including a plaintext password, used to sit here. Credentials do not belong
# in source files; supply them through the DB_* environment variables above
# and rotate any password that was ever committed.

# Setup logging: the root logger is used and set to emit INFO and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    """Load the forecast and historic weather CSVs from S3 into PostgreSQL.

    For each source type the matching files are merged, mapped to the target
    table's columns, stripped of duplicate rows and of rows with any missing
    value, and then saved to the target table.

    Args:
        event: Invocation payload; not read by this handler.
        context: Invocation context; not read by this handler.

    Returns:
        dict: ``statusCode`` 200 with a success message, or ``statusCode`` 500
        with the error text when any step raises.
    """
    bucket_name = 'datalakepartition2'
    # Source file stem -> destination table
    file_mappings = {
        'predictions16days': 'factweatherFC16d',
        'historic_weather': 'factweather'
    }

    try:
        for source_file, dest_table in file_mappings.items():
            source_df = load_and_merge_files(bucket_name, f'{source_file}.csv')
            if source_df.empty:
                logger.warning(f"No data processed for file type '{source_file}'")
                continue

            mapped_df = process_dataframe(source_file, source_df)
            # Remove duplicates and rows with empty values before saving.
            cleaned_df = mapped_df.drop_duplicates().dropna()
            save_df_to_postgres(cleaned_df, dest_table)
    except Exception as e:
        logger.error(f"Error processing data: {str(e)}")
        return {
            'statusCode': 500,
            'body': f"Error processing data: {str(e)}"
        }

    return {
        'statusCode': 200,
        'body': 'Data processing and upload completed successfully.'
    }

def load_and_merge_files(bucket_name, suffix):
    """Load every CSV in the bucket whose key ends with ``suffix`` and stack them.

    The bucket listing is read page by page through the ``list_objects_v2``
    paginator.

    Args:
        bucket_name: Name of the S3 bucket to scan.
        suffix: Key suffix that selects the files to load.

    Returns:
        pandas.DataFrame: Row-wise concatenation of all matching files with a
        fresh index, or an empty DataFrame when nothing matches.
    """
    pages = s3_client.get_paginator('list_objects_v2').paginate(Bucket=bucket_name)

    files = []
    for page in pages:
        # A page of an empty bucket has no 'Contents' entry.
        for item in page.get('Contents', []):
            key = item['Key']
            if key.endswith(suffix):
                files.append(key)

    logger.info(f"Files found with suffix '{suffix}': {files}")

    if not files:
        logger.warning(f"No files found with suffix '{suffix}' in bucket '{bucket_name}'.")
        return pd.DataFrame()  # Return an empty DataFrame if no files found

    dataframes = []
    for file in files:
        dataframes.append(load_csv_from_s3(bucket_name, file))
    if not dataframes:
        logger.warning(f"No data loaded for suffix '{suffix}'. Returning an empty DataFrame.")
        return pd.DataFrame()

    return pd.concat(dataframes, ignore_index=True)

def load_csv_from_s3(bucket_name, file_key):
    """Download one CSV object from S3 and parse it into a DataFrame.

    Args:
        bucket_name: Name of the S3 bucket.
        file_key: Key of the CSV object inside the bucket.

    Returns:
        pandas.DataFrame: The parsed CSV (pandas infers the column dtypes).
    """
    logger.info(f"Loading file '{file_key}' from bucket '{bucket_name}'")
    response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
    # The whole object is read into memory and decoded as UTF-8.
    csv_text = response['Body'].read().decode('utf-8')
    return pd.read_csv(io.StringIO(csv_text))

def process_dataframe(source_file, df):
    """Project a raw weather DataFrame onto the destination column layout.

    Every known ``source_file`` has an ordered list of
    (source column, destination column) pairs. Pairs whose destination is
    ``None`` are dropped; all others are copied under the destination name.

    Args:
        source_file: Mapping key, either 'predictions16days' or 'historic_weather'.
        df: Source DataFrame containing the mapped source columns.

    Returns:
        A new DataFrame with the destination columns in mapping order, or an
        empty DataFrame when ``source_file`` has no mapping.
    """
    column_mappings = {
        'predictions16days': [
            ('city_id', None),
            ('city_name', 'venue_city'),
            ('coord_lon', 'coord_lon'),
            ('coord_lat', 'coord_lat'),
            ('country', 'country'),
            ('population', 'population'),
            ('timezone', 'timezone'),
            ('date', 'date'),
            ('time', 'time'),
            ('temp_day', 'temperature'),
            ('pressure', 'pressure'),
            ('humidity', 'humidity'),
            ('weather_description', 'weather_description'),
            ('wind_speed', 'wind_speed'),
            ('wind_deg', None),
            ('rain', 'rain_volume'),
        ],
        'historic_weather': [
            ('city_id', 'venue_city'),
            ('datetime', None),
            ('date', 'date'),
            ('time', 'time'),
            ('temp', 'temperature'),
            ('pressure', 'pressure'),
            ('humidity', 'humidity'),
            ('wind_speed', 'wind_speed'),
            ('clouds', None),
            ('weather_description', 'weather_description'),
        ],
    }

    selected = column_mappings.get(source_file)
    if selected is None:
        logger.error(f"No column mappings found for source file '{source_file}'")
        return pd.DataFrame()

    result = pd.DataFrame()
    for entry in selected:
        # Entries carry two fields; an optional third one is tolerated and ignored.
        if len(entry) not in (2, 3):
            logger.error(f"Invalid column mapping: {entry}")
            continue

        origin, target = entry[:2]
        if not target:
            # No destination name means the source column is dropped.
            continue
        result[target] = df[origin]

    return result

def save_df_to_postgres(df, table_name):
    """Insert all rows of ``df`` into ``table_name`` in the PostgreSQL database.

    A new connection is opened from the module-level DB_* settings, the rows
    are inserted with a batched parameterised INSERT, and the transaction is
    committed. The cursor and connection are closed in every case.

    NOTE(review): relies on ``sql`` and ``execute_batch`` being imported at
    module level (presumably from psycopg2); those imports are not visible in
    this section - confirm.

    Args:
        df: DataFrame whose column names match the target table's columns.
        table_name: Name of the destination table.

    Raises:
        Any exception raised while connecting or inserting is logged and
        re-raised.
    """
    # Pre-bind both handles so the cleanup below never touches an unbound
    # name when connect() or cursor() itself fails.
    connection = None
    cursor = None
    try:
        connection = psycopg2.connect(
            host=DB_HOST,
            port=DB_PORT,
            database=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD
        )
        cursor = connection.cursor()

        # Prepare the insert statement; identifiers are composed through
        # the sql helpers rather than string formatting.
        columns = df.columns.tolist()
        insert_query = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
            sql.Identifier(table_name),
            sql.SQL(', ').join(map(sql.Identifier, columns)),
            sql.SQL(', ').join(sql.Placeholder() * len(columns))
        )

        # Convert DataFrame to list of tuples for batch insertion
        data = df.to_records(index=False).tolist()

        # Execute batch insert
        execute_batch(cursor, insert_query, data)

        # Commit the transaction
        connection.commit()
        logger.info(f"Data saved to table '{table_name}' in PostgreSQL database")

    except Exception as e:
        logger.error(f"Error saving data to PostgreSQL: {str(e)}")
        raise

    finally:
        if cursor is not None:
            cursor.close()
        if connection is not None:
            connection.close()


# myDataLoading6: Storing Forecast4days into DF

In [None]:
import boto3
import pandas as pd
import io
import logging

# Module-level S3 client shared by the helpers below.
s3_client = boto3.client('s3')

# Configure the root logger with an INFO threshold.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    """Lambda entry point: build the 'factweatherFC4d' CSV from the 4-day forecast files.

    All bucket objects ending in 'predictions4days.csv' are merged, mapped to
    the destination columns, de-duplicated, stripped of rows with missing
    values and written to 'dataframes/factweatherFC4d.csv' in the same bucket.

    Args:
        event: Lambda event payload (not used).
        context: Lambda context object (not used).

    Returns:
        A dict with 'statusCode' 200 on completion (also when no source data
        was found) or 500 with the error text when an exception occurred.
    """
    bucket_name = 'datalakepartition2'
    source_file = 'predictions4days'
    dest_df = 'factweatherFC4d'

    try:
        source_df = load_and_merge_files(bucket_name, f'{source_file}.csv')

        if source_df.empty:
            logger.warning(f"No data processed for file type '{source_file}'")
        else:
            mapped = process_dataframe(source_file, source_df)

            # Drop exact duplicate rows first, then any row with a missing value.
            cleaned = mapped.drop_duplicates().dropna()
            logger.info(f"Processed DataFrame for '{dest_df}':\n{cleaned.head()}")

            target_key = f'dataframes/{dest_df}.csv'
            save_df_to_s3(bucket_name, cleaned, target_key)

        return {
            'statusCode': 200,
            'body': 'Data processing completed successfully.'
        }

    except Exception as e:
        message = f"Error processing data: {str(e)}"
        logger.error(message)
        return {
            'statusCode': 500,
            'body': message
        }

def load_and_merge_files(bucket_name, suffix):
    """Merge all CSV objects in ``bucket_name`` whose key ends with ``suffix``.

    Args:
        bucket_name: Name of the S3 bucket to list. No prefix is applied, so
            every key in the bucket is inspected.
        suffix: Key ending that selects the files to load.

    Returns:
        The concatenation of all matching CSVs (index renumbered from 0), or
        an empty DataFrame when nothing matches.

    Raises:
        Errors from the S3 client or ``load_csv_from_s3`` are not caught here.
    """
    # Walk every page of the listing instead of a single list call.
    paginator = s3_client.get_paginator('list_objects_v2')

    selected_keys = []
    for page in paginator.paginate(Bucket=bucket_name):
        # A page without 'Contents' is treated as having no objects.
        for item in page.get('Contents', []):
            if item['Key'].endswith(suffix):
                selected_keys.append(item['Key'])

    logger.info(f"Files found with suffix '{suffix}': {selected_keys}")

    if not selected_keys:
        logger.warning(f"No files found with suffix '{suffix}' in bucket '{bucket_name}'.")
        return pd.DataFrame()  # Return an empty DataFrame if no files found

    # selected_keys is non-empty here, so the list of frames is too; the
    # former second emptiness check could never trigger and was removed.
    frames = [load_csv_from_s3(bucket_name, key) for key in selected_keys]
    return pd.concat(frames, ignore_index=True)

def load_csv_from_s3(bucket_name, file_key):
    """Download a CSV object from S3 and return its contents as a DataFrame.

    Args:
        bucket_name: Bucket containing the object.
        file_key: Key of the CSV object.

    Returns:
        DataFrame parsed from the object's UTF-8 decoded body.
    """
    logger.info(f"Loading file '{file_key}' from bucket '{bucket_name}'")
    s3_object = s3_client.get_object(Bucket=bucket_name, Key=file_key)
    raw_bytes = s3_object['Body'].read()
    # pandas parses from an in-memory text buffer.
    return pd.read_csv(io.StringIO(raw_bytes.decode('utf-8')))

def process_dataframe(source_file, df, start_date='2024-05-01', end_date='2024-06-07'):
    """Map a raw forecast DataFrame to the destination layout and filter by date.

    Source columns are copied under their destination names (columns mapped to
    ``None`` are dropped), the 'date' column is converted to datetime, and only
    rows with ``start_date <= date <= end_date`` are kept.

    Args:
        source_file: Mapping key; only 'predictions4days' is defined here.
        df: Source DataFrame containing the mapped source columns.
        start_date: Inclusive lower bound of the date window. Defaults to the
            previously hard-coded '2024-05-01'.
        end_date: Inclusive upper bound of the date window. Defaults to the
            previously hard-coded '2024-06-07'.

    Returns:
        The filtered DataFrame with destination column names, or an empty
        DataFrame when ``source_file`` has no mapping.

    Raises:
        KeyError: If ``df`` lacks one of the mapped source columns.
    """
    column_mappings = {
        'predictions4days': [
            ('city_id', None),
            ('city_name', 'venue_city'),
            ('coord_lon', 'coord_lon'),
            ('coord_lat', 'coord_lat'),
            ('country', 'country'),
            ('population', 'population'),
            ('timezone', 'timezone'),
            ('date', 'date'),
            ('time', 'time'),
            ('temp_day', 'temperature'),
            ('pressure', 'pressure'),
            ('humidity', 'humidity'),
            ('weather_description', 'weather_description'),
            ('wind_speed', 'wind_speed'),
            ('wind_deg', None),
            ('rain', 'rain_volume')
        ]
    }

    if source_file not in column_mappings:
        logger.error(f"No column mappings found for source file '{source_file}'")
        return pd.DataFrame()

    transformed_df = pd.DataFrame()
    for mapping in column_mappings[source_file]:
        # Entries carry two fields; an optional third one is tolerated and ignored.
        if len(mapping) not in (2, 3):
            logger.error(f"Invalid column mapping: {mapping}")
            continue

        source_col, dest_col = mapping[:2]
        if dest_col:
            transformed_df[dest_col] = df[source_col]

    # Ensure 'date' is datetime so the window comparison is chronological.
    transformed_df['date'] = pd.to_datetime(transformed_df['date'])
    in_window = (transformed_df['date'] >= start_date) & (transformed_df['date'] <= end_date)

    return transformed_df[in_window]

def save_df_to_s3(bucket_name, df, file_key):
    """Serialise ``df`` as CSV (without the index) and upload it to S3.

    Args:
        bucket_name: Destination bucket.
        df: DataFrame to upload; an empty frame is skipped with a warning.
        file_key: Destination object key.
    """
    if not df.empty:
        out_buffer = io.StringIO()
        df.to_csv(out_buffer, index=False)
        payload = out_buffer.getvalue()
        s3_client.put_object(Bucket=bucket_name, Key=file_key, Body=payload)
        logger.info(f"DataFrame saved to '{file_key}' in bucket '{bucket_name}'")
    else:
        # Nothing to write: leave any existing object untouched.
        logger.warning(f"DataFrame is empty. Skipping upload for '{file_key}'.")

# This code should be set up in an AWS Lambda function with the necessary IAM permissions


# myDataLoading7: Loading Forecast16days and historic weather into DB

In [None]:
import boto3
import pandas as pd
import psycopg2
import psycopg2.extras
import io
import logging
import os

# Module-level S3 client used by load_csv_from_s3.
s3_client = boto3.client('s3')

# Configure the root logger with an INFO threshold.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Database connection details are read from environment variables.
# os.getenv returns None for any variable that is not set.
DB_HOST = os.getenv('DB_HOST')
DB_NAME = os.getenv('DB_NAME')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_PORT = int(os.getenv('DB_PORT', 5432))  # Default to 5432 if DB_PORT is not set

# NOTE: a commented-out block with hard-coded test-database connection
# details (including a plaintext password) used to live here. It was removed
# because credentials must not be kept in source; set the DB_* environment
# variables to target the test database instead, and rotate that password
# since it has been exposed in this file's history.

def lambda_handler(event, context):
    """Lambda entry point: load the factweather and factweatherFC16d CSVs into SQL.

    Both CSV files are read from the 'dataframes/' folder of the data-lake
    bucket, type-converted, de-duplicated on (venue_city, date, time) and
    inserted into the tables 'factweather' and 'factweatherFC16d' within a
    single transaction.

    Args:
        event: Lambda event payload (not used).
        context: Lambda context object (not used).

    Returns:
        A dict with 'statusCode' 200 on success, or 500 plus the error text
        when any step failed (the transaction is rolled back in that case).
    """
    # Define S3 bucket and file paths for factweather and factweatherFC16d
    bucket_name = 'datalakepartition2'
    file_key_factweather = 'dataframes/factweather.csv'
    file_key_factweatherFC16d = 'dataframes/factweatherFC16d.csv'

    # Pre-bind both handles so the cleanup in `finally` never touches an
    # unbound name, e.g. when connection.cursor() itself raises.
    connection = None
    cursor = None
    try:
        # Load the CSV files from S3
        factweather_df = load_csv_from_s3(bucket_name, file_key_factweather)
        factweatherFC16d_df = load_csv_from_s3(bucket_name, file_key_factweatherFC16d)

        # Log DataFrame columns for debugging
        logger.info(f"Columns in factweather_df: {factweather_df.columns.tolist()}")
        logger.info(f"Columns in factweatherFC16d_df: {factweatherFC16d_df.columns.tolist()}")

        # Convert data types for both DataFrames
        factweather_df = convert_factweather_df_types(factweather_df)
        factweatherFC16d_df = convert_factweatherFC16d_df_types(factweatherFC16d_df)

        # Remove duplicates based on the primary key
        factweather_df = remove_duplicates(factweather_df, ['venue_city', 'date', 'time'])
        factweatherFC16d_df = remove_duplicates(factweatherFC16d_df, ['venue_city', 'date', 'time'])

        # Connect to the PostgreSQL database
        connection = connect_to_db()
        cursor = connection.cursor()

        # Load data into SQL database
        load_data_to_sql(factweather_df, cursor, 'factweather')
        load_data_to_sql(factweatherFC16d_df, cursor, 'factweatherFC16d')

        # Commit both inserts together
        connection.commit()

        logger.info("Data loaded into the SQL database successfully.")
        return {
            'statusCode': 200,
            'body': 'Data loaded into the SQL database successfully.'
        }

    except Exception as e:
        logger.error(f"Error loading data into the SQL database: {str(e)}")
        if connection is not None:
            connection.rollback()  # Rollback in case of error
        return {
            'statusCode': 500,
            'body': f"Error loading data into the SQL database: {str(e)}"
        }

    finally:
        if cursor is not None:
            cursor.close()
        if connection is not None:
            connection.close()

def load_csv_from_s3(bucket_name, file_key):
    """Read the CSV object ``file_key`` from ``bucket_name`` into a DataFrame.

    Args:
        bucket_name: Bucket containing the object.
        file_key: Key of the CSV object.

    Returns:
        DataFrame parsed from the object's UTF-8 decoded body.
    """
    logger.info(f"Loading file '{file_key}' from bucket '{bucket_name}'")
    s3_response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
    decoded = s3_response['Body'].read().decode('utf-8')
    # Parse from an in-memory text buffer.
    return pd.read_csv(io.StringIO(decoded))

def connect_to_db():
    """Open a PostgreSQL connection using the module-level DB_* settings.

    Returns:
        The newly opened psycopg2 connection.

    Raises:
        Any exception raised while connecting is logged and re-raised.
    """
    try:
        settings = {
            'host': DB_HOST,
            'port': DB_PORT,
            'database': DB_NAME,
            'user': DB_USER,
            'password': DB_PASSWORD,
        }
        db_connection = psycopg2.connect(**settings)
        logger.info("Successfully connected to the database")
    except Exception as e:
        logger.error(f"Error connecting to the database: {str(e)}")
        raise
    return db_connection

def load_data_to_sql(df, cursor, table_name):
    """Insert every row of ``df`` into ``table_name`` through ``cursor``.

    The INSERT statement is built from the DataFrame's column names and
    executed as a batch. No commit is issued here; the caller owns the
    transaction.

    Args:
        df: Rows to insert; an empty frame is skipped with a warning.
        cursor: Open database cursor used to run the statement.
        table_name: Destination table name (interpolated into the SQL text).

    Raises:
        Any exception from the batch insert is logged and re-raised.
    """
    if df.empty:
        logger.warning(f"The DataFrame for table '{table_name}' is empty. Skipping load.")
        return

    # Build "INSERT INTO <table> (<cols>) VALUES (%s, ...)" with one placeholder per column.
    column_names = df.columns.tolist()
    placeholders = ', '.join('%s' for _ in column_names)
    insert_query = "INSERT INTO {} ({}) VALUES ({})".format(
        table_name, ', '.join(column_names), placeholders
    )

    # Log the SQL query for debugging
    logger.info(f"Insert query for table '{table_name}': {insert_query}")

    # One tuple of parameters per DataFrame row.
    rows = list(map(tuple, df.to_numpy()))

    try:
        psycopg2.extras.execute_batch(cursor, insert_query, rows)
        logger.info(f"Loaded data into table '{table_name}' successfully")
    except Exception as e:
        logger.error(f"Error executing SQL insert for table '{table_name}': {str(e)}")
        raise

def convert_factweather_df_types(df):
    """Cast the factweather columns to their target types, in place.

    Args:
        df: DataFrame with the factweather columns; it is modified in place.

    Returns:
        The same DataFrame object, with 'date' as datetime and 'time' reduced
        to its time-of-day part.

    Raises:
        Any conversion error is logged and re-raised.
    """
    # Plain dtype casts, applied in this order before the date/time handling.
    # There is no 'rain_volume' entry: factweather does not have that column.
    simple_casts = (
        ('venue_city', str),
        ('temperature', float),
        ('pressure', int),
        ('humidity', int),
        ('wind_speed', float),
        ('weather_description', str),
    )

    try:
        for column, target_type in simple_casts:
            df[column] = df[column].astype(target_type)

        df['date'] = pd.to_datetime(df['date'])
        parsed_time = pd.to_datetime(df['time'])
        df['time'] = parsed_time.dt.time  # Extract only time part
    except Exception as e:
        logger.error(f"Error converting data types for factweather DataFrame: {str(e)}")
        raise

    return df

def convert_factweatherFC16d_df_types(df):
    """Cast the factweatherFC16d columns to their target types, in place.

    Args:
        df: DataFrame with the factweatherFC16d columns; it is modified in place.

    Returns:
        The same DataFrame object, with 'date' as datetime and 'time' reduced
        to its time-of-day part.

    Raises:
        Any conversion error is logged and re-raised.
    """
    # The casts are split around the date/time handling so that columns are
    # converted in the same order as they are listed for the table.
    casts_before_datetime = (
        ('venue_city', str),
        ('coord_lon', float),
        ('coord_lat', float),
        ('country', str),
        ('population', int),
        ('timezone', int),
    )
    casts_after_datetime = (
        ('temperature', float),
        ('pressure', int),
        ('humidity', int),
        ('weather_description', str),
        ('wind_speed', float),
        ('rain_volume', float),  # This column exists in factweatherFC16d
    )

    try:
        for column, target_type in casts_before_datetime:
            df[column] = df[column].astype(target_type)

        df['date'] = pd.to_datetime(df['date'])
        parsed_time = pd.to_datetime(df['time'])
        df['time'] = parsed_time.dt.time  # Extract only time part

        for column, target_type in casts_after_datetime:
            df[column] = df[column].astype(target_type)
    except Exception as e:
        logger.error(f"Error converting data types for factweatherFC16d DataFrame: {str(e)}")
        raise

    return df

def remove_duplicates(df, primary_key_columns):
    """Drop rows that repeat an earlier row's values in ``primary_key_columns``.

    Args:
        df: DataFrame to de-duplicate; it is not modified.
        primary_key_columns: Column names that together identify a row.

    Returns:
        A new DataFrame keeping the first row for each key combination.

    Raises:
        Any exception raised while de-duplicating is logged and re-raised.
    """
    try:
        deduplicated = df.drop_duplicates(subset=primary_key_columns)
        logger.info(f"Removed duplicates based on columns: {primary_key_columns}")
        return deduplicated
    except Exception as e:
        logger.error(f"Error removing duplicates: {str(e)}")
        raise


# myDataLoading8: Loading Forecast4days into DB

In [None]:
import boto3
import pandas as pd
import psycopg2
import psycopg2.extras
import io
import logging
import os

# Module-level S3 client shared by the S3 helpers below.
s3_client = boto3.client('s3')

# Configure the root logger with an INFO threshold.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Database connection details are read from environment variables.
# os.getenv returns None for any variable that is not set.
DB_HOST = os.getenv('DB_HOST')
DB_NAME = os.getenv('DB_NAME')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_PORT = int(os.getenv('DB_PORT', 5432))  # Default to 5432 if DB_PORT is not set

# NOTE: a commented-out block with hard-coded test-database connection
# details (including a plaintext password) used to live here. It was removed
# because credentials must not be kept in source; set the DB_* environment
# variables to target the test database instead, and rotate that password
# since it has been exposed in this file's history.


def lambda_handler(event, context):
    """Lambda entry point: build factweatherFC4d and load it into S3 and SQL.

    All bucket objects ending in 'predictions4days.csv' are merged, mapped to
    the destination columns, de-duplicated on (venue_city, date, time) and
    stripped of rows with missing values. The result is written to
    'dataframes/factweatherFC4d.csv' and then loaded into the
    'factweatherFC4d' table in one transaction.

    Args:
        event: Lambda event payload (not used).
        context: Lambda context object (not used).

    Returns:
        A dict with 'statusCode' 200 after a completed run, 204 when no
        source files were found, or 500 plus the error text on failure (the
        database transaction is rolled back in that case).
    """
    # Define S3 bucket and file suffix for factweatherFC4d
    bucket_name = 'datalakepartition2'
    source_file = 'predictions4days'
    dest_df = 'factweatherFC4d'
    destination_folder = 'dataframes/'

    # Pre-bind both handles so the cleanup in `finally` never touches an
    # unbound name, e.g. when connection.cursor() itself raises.
    connection = None
    cursor = None
    try:
        suffix = f'{source_file}.csv'
        source_df = load_and_merge_files(bucket_name, suffix)
        if not source_df.empty:
            processed_df = process_dataframe(source_file, source_df)

            # Remove duplicates based on primary key columns, then rows with missing values
            processed_df = processed_df.drop_duplicates(subset=['venue_city', 'date', 'time']).dropna()

            # Log the cleaned DataFrame
            logger.info(f"Processed DataFrame for '{dest_df}':\n{processed_df.head()}")

            # Save DataFrame to S3 (happens before, and independently of, the SQL load)
            save_df_to_s3(bucket_name, processed_df, f'{destination_folder}{dest_df}.csv')

            # Connect to the PostgreSQL database
            connection = connect_to_db()
            cursor = connection.cursor()

            # Load data into the factweatherFC4d table in SQL database
            load_data_to_sql(processed_df, cursor, 'factweatherFC4d')

            # Commit the transaction
            connection.commit()

            logger.info("Data loaded into the SQL database successfully.")
            return {
                'statusCode': 200,
                'body': 'Data processing and loading to SQL completed successfully.'
            }

        else:
            logger.warning(f"No data processed for file type '{source_file}'")

            return {
                'statusCode': 204,
                'body': 'No data processed because no files were found.'
            }

    except Exception as e:
        logger.error(f"Error processing data: {str(e)}")
        if connection is not None:
            connection.rollback()  # Rollback in case of error
        return {
            'statusCode': 500,
            'body': f"Error processing data: {str(e)}"
        }

    finally:
        if cursor is not None:
            cursor.close()
        if connection is not None:
            connection.close()

def load_and_merge_files(bucket_name, suffix):
    """Concatenate all CSV objects in the bucket whose key ends with ``suffix``.

    Args:
        bucket_name: Name of the S3 bucket to list. The listing has no prefix
            filter; keys are matched client-side.
        suffix: Key ending that selects the files to load.

    Returns:
        A DataFrame with the rows of every matching CSV and a renumbered
        index, or an empty DataFrame when no key matches.

    Raises:
        Errors from the S3 client or ``load_csv_from_s3`` propagate unchanged.
    """
    # Iterate over all listing pages rather than a single list call.
    paginator = s3_client.get_paginator('list_objects_v2')

    keys_to_load = []
    for page in paginator.paginate(Bucket=bucket_name):
        # Pages lacking 'Contents' simply add no keys.
        page_keys = (item['Key'] for item in page.get('Contents', []))
        keys_to_load.extend(key for key in page_keys if key.endswith(suffix))

    logger.info(f"Files found with suffix '{suffix}': {keys_to_load}")

    if not keys_to_load:
        logger.warning(f"No files found with suffix '{suffix}' in bucket '{bucket_name}'.")
        return pd.DataFrame()  # Return an empty DataFrame if no files found

    # With at least one key there is at least one frame, so the former
    # "no data loaded" check was unreachable and has been dropped.
    loaded_frames = [load_csv_from_s3(bucket_name, key) for key in keys_to_load]
    return pd.concat(loaded_frames, ignore_index=True)

def load_csv_from_s3(bucket_name, file_key):
    """Retrieve a CSV object from S3 and parse it with pandas.

    Args:
        bucket_name: Bucket containing the object.
        file_key: Key of the CSV object.

    Returns:
        DataFrame parsed from the object's UTF-8 decoded body.
    """
    logger.info(f"Loading file '{file_key}' from bucket '{bucket_name}'")
    fetched = s3_client.get_object(Bucket=bucket_name, Key=file_key)
    stream = fetched['Body']
    # Decode the complete payload once, then parse it from memory.
    csv_text = stream.read().decode('utf-8')
    text_buffer = io.StringIO(csv_text)
    return pd.read_csv(text_buffer)

def process_dataframe(source_file, df, start_date='2024-05-01', end_date='2024-06-07'):
    """Rename forecast columns to the destination schema and keep a date window.

    Mapped source columns are copied under their destination names (those
    mapped to ``None`` are left out), 'date' is converted to datetime, and
    rows outside ``[start_date, end_date]`` are discarded.

    Args:
        source_file: Mapping key; only 'predictions4days' is defined here.
        df: Source DataFrame containing the mapped source columns.
        start_date: Inclusive lower bound of the date window. Defaults to the
            previously hard-coded '2024-05-01'.
        end_date: Inclusive upper bound of the date window. Defaults to the
            previously hard-coded '2024-06-07'.

    Returns:
        The filtered DataFrame with destination column names, or an empty
        DataFrame when ``source_file`` has no mapping.

    Raises:
        KeyError: If ``df`` lacks one of the mapped source columns.
    """
    column_mappings = {
        'predictions4days': [
            ('city_id', None),
            ('city_name', 'venue_city'),
            ('coord_lon', 'coord_lon'),
            ('coord_lat', 'coord_lat'),
            ('country', 'country'),
            ('population', 'population'),
            ('timezone', 'timezone'),
            ('date', 'date'),
            ('time', 'time'),
            ('temp_day', 'temperature'),
            ('pressure', 'pressure'),
            ('humidity', 'humidity'),
            ('weather_description', 'weather_description'),
            ('wind_speed', 'wind_speed'),
            ('wind_deg', None),
            ('rain', 'rain_volume')
        ]
    }

    if source_file not in column_mappings:
        logger.error(f"No column mappings found for source file '{source_file}'")
        return pd.DataFrame()

    transformed_df = pd.DataFrame()
    for mapping in column_mappings[source_file]:
        # Entries carry two fields; an optional third one is tolerated and ignored.
        if len(mapping) not in (2, 3):
            logger.error(f"Invalid column mapping: {mapping}")
            continue

        source_col, dest_col = mapping[:2]
        if dest_col:
            transformed_df[dest_col] = df[source_col]

    # Convert 'date' to datetime so the bounds are compared chronologically.
    transformed_df['date'] = pd.to_datetime(transformed_df['date'])
    not_too_early = transformed_df['date'] >= start_date
    not_too_late = transformed_df['date'] <= end_date

    return transformed_df[not_too_early & not_too_late]

def save_df_to_s3(bucket_name, df, file_key):
    """Upload ``df`` to S3 as a CSV file without the index column.

    Args:
        bucket_name: Destination bucket.
        df: DataFrame to upload; an empty frame is skipped with a warning.
        file_key: Destination object key.
    """
    if not df.empty:
        text_stream = io.StringIO()
        df.to_csv(text_stream, index=False)
        s3_client.put_object(
            Bucket=bucket_name,
            Key=file_key,
            Body=text_stream.getvalue(),
        )
        logger.info(f"DataFrame saved to '{file_key}' in bucket '{bucket_name}'")
    else:
        # Nothing to write: leave any existing object untouched.
        logger.warning(f"DataFrame is empty. Skipping upload for '{file_key}'.")

def connect_to_db():
    """Create a PostgreSQL connection from the module-level DB_* settings.

    Returns:
        The newly opened psycopg2 connection.

    Raises:
        Any exception raised while connecting is logged and re-raised.
    """
    try:
        connect_kwargs = dict(
            host=DB_HOST,
            port=DB_PORT,
            database=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD,
        )
        opened = psycopg2.connect(**connect_kwargs)
        logger.info("Successfully connected to the database")
    except Exception as e:
        logger.error(f"Error connecting to the database: {str(e)}")
        raise
    return opened

def load_data_to_sql(df, cursor, table_name):
    """Replace-by-key load of ``df`` into ``table_name`` through ``cursor``.

    Existing rows whose (venue_city, date, time) match a key present in
    ``df`` are deleted first, then all rows of ``df`` are inserted. Both
    steps run as batches on the given cursor; no commit is issued here, so
    the caller owns the transaction.

    Args:
        df: Rows to load; must contain 'venue_city', 'date' and 'time'.
            An empty frame is skipped with a warning.
        cursor: Open database cursor used to run the statements.
        table_name: Destination table name (interpolated into the SQL text).

    Raises:
        Any exception from the delete or insert batch is logged and re-raised.
    """
    if df.empty:
        logger.warning(f"The DataFrame for table '{table_name}' is empty. Skipping load.")
        return

    # Prepare SQL query for inserting data
    columns = df.columns.tolist()
    columns_str = ', '.join(columns)
    values_str = ', '.join(['%s'] * len(columns))

    # Extract unique primary keys from the DataFrame
    primary_key_columns = ['venue_city', 'date', 'time']
    unique_keys = df[primary_key_columns].drop_duplicates()
    key_tuples = [tuple(key) for key in unique_keys.to_numpy()]

    # The statement is identical for every key, so build it once and send
    # the deletes as one batch instead of one execute (and log line) per key.
    delete_query = f"DELETE FROM {table_name} WHERE venue_city = %s AND date = %s AND time = %s"
    try:
        psycopg2.extras.execute_batch(cursor, delete_query, key_tuples)
        logger.info(f"Deleted existing rows for {len(key_tuples)} key(s) in table '{table_name}'")
    except Exception as e:
        logger.error(f"Error deleting existing row: {str(e)}")
        raise

    insert_query = f"INSERT INTO {table_name} ({columns_str}) VALUES ({values_str})"

    # Log the SQL query for debugging
    logger.info(f"Insert query for table '{table_name}': {insert_query}")

    # Convert DataFrame rows to list of tuples
    data_tuples = [tuple(x) for x in df.to_numpy()]

    # Execute the SQL query using batch insert for efficiency
    try:
        psycopg2.extras.execute_batch(cursor, insert_query, data_tuples)
        logger.info(f"Loaded data into table '{table_name}' successfully")
    except Exception as e:
        logger.error(f"Error executing SQL insert for table '{table_name}': {str(e)}")
        raise

# This code should be set up in an AWS Lambda function with the necessary IAM permissions
