# DWH fact_energy_trade

## Energy trade table creation

Lambda layer: group2_dwh_fact_energy_trade

In [None]:
import os
import json
import requests
import pandas as pd
import psycopg2
from psycopg2.extras import execute_values
from datetime import date, datetime, timedelta

In [None]:
# ------------------------------------------------------------------------------
# Configuration: Load environment variables for database access
# ------------------------------------------------------------------------------

# Connection settings for the source database (Datalake). Every value is read
# from an environment variable and is None when that variable is unset.
DB_CONFIG_DATALAKE = {
    setting: os.getenv(env_name)
    for setting, env_name in (
        ("host", "var_host"),
        ("port", "var_port"),
        ("database", "var_database"),
        ("user", "var_user"),
        ("password", "var_password"),
    )
}

# Connection settings for the target database (DWH).
# NOTE(review): port and user are read from the same variables as the Datalake
# config (var_port / var_user) -- confirm this sharing is intended.
DB_CONFIG_DWH = {
    setting: os.getenv(env_name)
    for setting, env_name in (
        ("host", "var_host2"),
        ("port", "var_port"),
        ("database", "var_database2"),
        ("user", "var_user"),
        ("password", "var_password2"),
    )
}

In [None]:
def _ensure_fact_table(conn_tgt):
    """Create fact_energy_trade in the DWH if it does not exist yet, then commit."""
    with conn_tgt.cursor() as cur_tgt:
        cur_tgt.execute("""
        CREATE TABLE IF NOT EXISTS fact_energy_trade (
            trade_id BIGINT PRIMARY KEY,
            time_id INT,
            origin_country_id INT,
            neighbor_country_id INT,
            direction TEXT,
            energy_value_gw FLOAT,
            exchange_rate FLOAT,
            value_eur FLOAT,
            value_chf FLOAT,
            UNIQUE(time_id, neighbor_country_id)
        );
        """)
    conn_tgt.commit()


def _load_trade_day(trade_day, conn_src, conn_tgt):
    """
    Extract, transform and load the hourly energy trades of one calendar day.

    Args:
        trade_day (datetime.date): Day to process; the window is
            [trade_day 00:00, next day 00:00).
        conn_src: Open psycopg2 connection to the Datalake.
        conn_tgt: Open psycopg2 connection to the DWH.

    Returns:
        int: Number of rows inserted into fact_energy_trade (0 when the
        Datalake holds no usable rows for that day).
    """
    start_time = datetime.combine(trade_day, datetime.min.time())
    end_time = start_time + timedelta(days=1)
    # Bound as query parameters instead of being formatted into the SQL text.
    window = {"start": start_time, "end": end_time}

    # ------------------------------------------------------------------
    # Step 1: Load and aggregate hourly CBET energy data
    # ------------------------------------------------------------------
    sql_energy = """
    SELECT
        date_trunc('hour', timestamp) AS hour,
        country,
        SUM(value) AS energy_sum_gw
    FROM tbl_energy_cbet_data
    WHERE timestamp >= %(start)s AND timestamp < %(end)s
    GROUP BY 1, 2
    ORDER BY 1, 2;
    """
    df_energy = pd.read_sql_query(sql_energy, conn_src, params=window)
    if df_energy.empty:
        # An empty result has untyped (object) columns, which would break
        # the datetime merges below; there is nothing to load anyway.
        return 0

    # Drop the pre-aggregated 'sum' pseudo-country; copy so that the column
    # assignments below operate on an independent frame, not on a slice.
    is_total_row = df_energy['country'].str.strip().str.lower() == 'sum'
    df_energy = df_energy[~is_total_row].copy()
    if df_energy.empty:
        return 0

    # ------------------------------------------------------------------
    # Step 2: Load daily CHF to EUR exchange rates
    # ------------------------------------------------------------------
    sql_currency = """
    SELECT
        DATE(timestamp) AS rate_date,
        exchange_rate
    FROM tbl_currency_data
    WHERE source_currency='CHF' AND target_currency='EUR';
    """
    df_curr = pd.read_sql_query(sql_currency, conn_src)
    # Keep a single rate per day (the first one returned).
    df_curr = df_curr.drop_duplicates(subset=['rate_date'])

    # ------------------------------------------------------------------
    # Step 3: Prepare energy dataframe with calculated fields
    # ------------------------------------------------------------------
    # The sign of the hourly sum decides the direction; the magnitude is
    # stored separately as a non-negative value.
    df_energy['direction'] = df_energy['energy_sum_gw'].apply(
        lambda x: 'import' if x > 0 else ('export' if x < 0 else 'none')
    )
    df_energy['energy_value_gw'] = df_energy['energy_sum_gw'].abs()
    df_energy['rate_date'] = pd.to_datetime(df_energy['hour']).dt.date

    # Left merge: hours without a rate for their day keep a missing exchange_rate.
    df_energy = df_energy.merge(df_curr, on='rate_date', how='left')

    # Set fixed origin_country_id (Switzerland = 6)
    df_energy['origin_country_id'] = 6

    # ------------------------------------------------------------------
    # Step 4: Map foreign keys using dimension tables
    # ------------------------------------------------------------------

    # Map country names to country IDs
    sql_country = "SELECT country_id, country_name_en FROM dim_countries;"
    df_country = pd.read_sql_query(sql_country, conn_tgt)
    df_energy = df_energy.merge(
        df_country, left_on='country', right_on='country_name_en', how='left'
    )
    df_energy.rename(columns={'country_id': 'neighbor_country_id'}, inplace=True)

    # Map timestamps to time IDs
    sql_time = "SELECT time_id, timestamp_utc FROM dim_time;"
    df_time = pd.read_sql_query(sql_time, conn_tgt)
    df_time['timestamp_utc'] = pd.to_datetime(df_time['timestamp_utc'])
    df_energy = df_energy.merge(
        df_time, left_on='hour', right_on='timestamp_utc', how='left'
    )

    # ------------------------------------------------------------------
    # Step 5: Load hourly energy prices and calculate trade values
    # ------------------------------------------------------------------
    sql_price = """
    SELECT
        date_trunc('hour', timestamp) AS hour,
        price
    FROM tbl_energy_price_data
    WHERE timestamp >= %(start)s AND timestamp < %(end)s;
    """
    df_price = pd.read_sql_query(sql_price, conn_src, params=window)
    df_price = df_price.drop_duplicates(subset=['hour'])
    df_price['hour'] = pd.to_datetime(df_price['hour']).dt.floor('H')
    df_price.rename(columns={'price': 'price_eur_mwh'}, inplace=True)

    df_energy = df_energy.merge(df_price[['hour', 'price_eur_mwh']], on='hour', how='left')

    # Calculate monetary values: price is per MWh, energy is in GW, hence * 1000.
    df_energy['value_eur'] = df_energy['price_eur_mwh'] * 1000 * df_energy['energy_value_gw']
    df_energy['value_chf'] = df_energy['value_eur'] / df_energy['exchange_rate']
    df_energy['value_eur'] = df_energy['value_eur'].round(2)
    df_energy['value_chf'] = df_energy['value_chf'].round(2)

    insert_sql = """
    INSERT INTO fact_energy_trade
        (trade_id, time_id, origin_country_id, neighbor_country_id,
         direction, energy_value_gw, exchange_rate, value_eur, value_chf)
    VALUES %s
    """

    with conn_tgt.cursor() as cur_tgt:
        # --------------------------------------------------------------
        # Step 6: Generate trade_id and sort
        # --------------------------------------------------------------
        # NOTE(review): MAX(trade_id) + 1 is not safe if two loaders run
        # concurrently -- confirm this job is the only writer.
        cur_tgt.execute("SELECT COALESCE(MAX(trade_id), 0) FROM fact_energy_trade;")
        max_id = cur_tgt.fetchone()[0]
        df_energy = df_energy.sort_values(['hour', 'neighbor_country_id'])
        df_energy['trade_id'] = range(max_id + 1, max_id + 1 + len(df_energy))

        # --------------------------------------------------------------
        # Step 7: Prepare and insert records into fact table
        # --------------------------------------------------------------
        records = df_energy[['trade_id', 'time_id', 'origin_country_id',
                             'neighbor_country_id', 'direction',
                             'energy_value_gw', 'exchange_rate',
                             'value_eur', 'value_chf']].values.tolist()

        # Unwrap any remaining NumPy scalars into native Python types,
        # which psycopg2 knows how to adapt.
        clean_records = [
            tuple(v.item() if hasattr(v, 'item') else v for v in row)
            for row in records
        ]

        execute_values(cur_tgt, insert_sql, clean_records)

    conn_tgt.commit()
    return len(clean_records)


def lambda_handler(event, context):
    """
    AWS Lambda function to extract, transform, and load hourly energy trade data
    from a Datalake to a DWH (fact_energy_trade). It processes data for one day (2 days ago),
    enriches it with currency rates, energy prices, and dimension lookups, and loads
    it into the fact table.

    Both database connections are opened once per invocation and are always
    closed again, including when a step fails.

    Args:
        event (dict): Lambda event payload (not read).
        context (LambdaContext): Lambda context object (not read).

    Returns:
        dict: ``{'status': 'success', 'rows_inserted': <int>}`` on success, where
        the count is the total over all processed days; otherwise
        ``{'statusCode': 500, 'body': <JSON-encoded error message>}``.
    """
    conn_src = None
    conn_tgt = None
    try:
        # ------------------------------------------------------------------
        # Connect to source (Datalake) and target (DWH) databases
        # ------------------------------------------------------------------
        conn_src = psycopg2.connect(**DB_CONFIG_DATALAKE)
        conn_tgt = psycopg2.connect(**DB_CONFIG_DWH)

        # ------------------------------------------------------------------
        # Ensure target table exists
        # ------------------------------------------------------------------
        _ensure_fact_table(conn_tgt)

        rows_inserted = 0
        for days_ago in range(2, 3):  # Loop allows extension to more days if needed
            trade_day = date.today() - timedelta(days=days_ago)
            rows_inserted += _load_trade_day(trade_day, conn_src, conn_tgt)

        return {'status': 'success', 'rows_inserted': rows_inserted}

    except Exception as e:
        # Top-level boundary: report the failure to the caller instead of raising.
        return {
            'statusCode': 500,
            'body': json.dumps(f'Error: {str(e)}')
        }

    finally:
        # ------------------------------------------------------------------
        # Cleanup: Close DB connections (also on failure; closing without a
        # commit discards any uncommitted work)
        # ------------------------------------------------------------------
        for conn in (conn_tgt, conn_src):
            if conn is not None:
                conn.close()