1: DimDate Table

In [None]:
%pip install --upgrade sqlalchemy
%pip install --upgrade pyodbc

In [None]:
import pyodbc
print(pyodbc.drivers())

In [None]:
import re
import pandas as pd
from sqlalchemy import create_engine

In [None]:
# Gegevens voor de verbinding
server = r"localhost"  # Servernaam of IP-adres van je SQL Server
database = "DEP1_DWH"  # Naam van je database

# Maak de verbindingsstring met Windows Authenticatie (Integrated Security)
engine = create_engine("mssql+pyodbc://@{}/{}?driver=ODBC+Driver+17+for+SQL+Server".format(server, database))

In [None]:
# Maak een range van datums
date_list = pd.date_range(start="01-01-2010", end="31-12-2025", freq='D')

months_translation = {
    'January': 'Januari', 'February': 'Februari', 'March': 'Maart', 'April': 'April',
    'May': 'Mei', 'June': 'Juni', 'July': 'Juli', 'August': 'Augustus', 
    'September': 'September', 'October': 'Oktober', 'November': 'November', 'December': 'December'
}

days_translation = {
    'Monday': 'Maandag', 'Tuesday': 'Dinsdag', 'Wednesday': 'Woensdag', 'Thursday': 'Donderdag',
    'Friday': 'Vrijdag', 'Saturday': 'Zaterdag', 'Sunday': 'Zondag'
}

dim_date_df = pd.DataFrame({
    'DateKey': date_list.strftime('%Y%m%d').astype(int),  # YYYYMMDD als key
    'FullDate': date_list.date,  # Volledige datum
    'MonthNameDutch': date_list.strftime('%B').map(months_translation),  # Maandnaam (kan vertaald worden)
    'MonthNameEN': date_list.strftime('%B'),  # Maandnaam in Engels
    'DayNameDutch': date_list.strftime('%A').map(days_translation),  # Dagnaam in Nederlands
    'DayNameEN': date_list.strftime('%A'),  # Dagnaam in Engels
    'QuarterName': 'Q' + date_list.quarter.astype(str),  # Kwartaal als 'Q1', 'Q2', ...
    'QuarterNumber': date_list.quarter  # Kwartaalnummer (1-4)
})

In [None]:
# Schrijf naar SQL Server
dim_date_df.to_sql('DimDate', con=engine, if_exists='append', index=False)

In [None]:
dim_date_df.head()

2: DimTime Table

In [None]:
def generate_dim_time():
    time_data = []

    for hour in range(0, 24):
        for minute in range(0, 60):
            am_pm = 'AM' if hour < 12 else 'PM'
            hour_12 = hour if 1 <= hour <= 12 else (12 if hour == 0 or hour == 24 else hour - 12)
            time_key = f"{hour:02}{minute:02}"
            full_time = f"{hour:02}:{minute:02}:00"
            
            time_data.append({
                "TimeKey": time_key,
                "Hour": hour_12,
                "Minutes": minute,
                "FullTime": full_time,
                "TimeAM_PM": am_pm
            })
    
    return pd.DataFrame(time_data)

# Data genereren
dim_time_df = generate_dim_time()

In [None]:
# Data naar SQL Server schrijven
dim_time_df.to_sql("DimTime", con=engine, if_exists="append", index=False)

In [None]:
dim_time_df.head()

3: DimWeatherStation Table

In [None]:
# Lees de CSV voor weerstations
weather_station_df = pd.read_csv('../data/input/aws_station.csv')

# Verwerk de kolommen
weather_station_df.rename(columns={
    "code": "WeatherStationID",
    "name": "WeatherStationName",
    "altitude": "Altitude",
    "the_geom": "Coordinates"
}, inplace=True)



# Functie om Latitude en Longitude te extraheren uit 'the_geom' kolom
def extract_lat_lon(geom):
    match = re.search(r"POINT \(([\d\.-]+) ([\d\.-]+)\)", geom)
    if match:
        lon, lat = match.groups()
        return float(lat), float(lon)
    return None, None

# Latitude en Longitude kolommen toevoegen
weather_station_df["Latitude"], weather_station_df["Longitude"] = zip(*weather_station_df["Coordinates"].apply(extract_lat_lon))

# Onnodige kolom verwijderen
weather_station_df.drop(columns=["Coordinates"], inplace=True)
weather_station_df = weather_station_df.drop(['FID', 'date_begin', 'date_end'], axis = 1)

In [None]:
# Data naar SQL Server schrijven
weather_station_df.to_sql("DimWeatherStation", con=engine, if_exists="append", index=False)

In [None]:


# Add a unique key column (starting from 1)
weather_station_df.insert(0, 'WeatherStationKey', range(1, len(weather_station_df) + 1))



In [None]:
weather_station_df.head()

4: FactWeather Table

In [None]:
# Load the CSV file
file_path = "../data/input/aws_1day.csv"

weather_data_df = pd.read_csv(file_path)

weather_data_df.head(), weather_data_df.columns

In [None]:
weather_data_df = weather_data_df.drop(['FID', 'the_geom', 'qc_flags'], axis = 1)
weather_data_df.head()

In [None]:
weather_data_df = weather_data_df.merge(weather_station_df, how='inner', left_on="code", right_on='WeatherStationID')
weather_data_df.head()

In [None]:
weather_data_df = weather_data_df.drop(['WeatherStationName', 'Latitude', 'Longitude', 'Altitude', 'WeatherStationID'], axis = 1)
weather_data_df.head()

In [None]:
# We converteren timestamp naar DateKey.
weather_data_df['DateKey'] = weather_data_df['timestamp'].str[0:4] + weather_data_df['timestamp'].str[5:7] + weather_data_df['timestamp'].str[8:10]
# DimTime toevoegen: eerst veld toevoegen, dan de waarden mergen.
weather_data_df['Time'] = weather_data_df['timestamp'].str[-8:]
weather_data_df = weather_data_df.merge(dim_time_df, how='inner', left_on="Time", right_on='FullTime')

# weather_data_df = weather_data_df.drop(['timestamp', 'Hour', 'Minutes', 'FullTime', 'TimeAM_PM'], axis = 1)

weather_data_df.head()

In [None]:
# we hernoemen de kolommen volgens de namen die in de DWH zitten. 
weather_data_df = weather_data_df.rename(columns={"precip_quantity": "PrecipQuantity","temp_avg": "TempAvg","temp_max": "TempMax","temp_min": "TempMin",
                                                  "temp_grass_pt100_avg": "TempGrassPt100Avg","temp_soil_avg": "TempSoilAvg","temp_soil_avg_5cm": "TempSoilAvg5cm",
                                                  "temp_soil_avg_10cm": "TempSoilAvg10cm","temp_soil_avg_20cm": "TempSoilAvg20cm",
                                                  "temp_soil_avg_50cm": "TempSoilAvg50cm","wind_speed_10m": "WindSpeed10m",
                                                  "wind_speed_avg_30m": "WindSpeedAvg30m","wind_gusts_speed": "WindGustsSpeed",
                                                  "humidity_rel_shelter_avg": "HumidityRelShelterAvg","pressure": "Pressure","sun_duration": "SunDuration",
                                                  "short_wave_from_sky_avg": "ShortWaveFromSkyAvg","sun_int_avg": "SunIntAvg"})

weather_data_df = weather_data_df.reindex(columns=["DateKey", "TimeKey", "WeatherStationKey", "PrecipQuantity", "TempAvg", "TempMax", "TempMin",
                                                    "TempGrassPt100Avg", "TempSoilAvg", "TempSoilAvg5cm", "TempSoilAvg10cm", 
                                                    "TempSoilAvg20cm", "TempSoilAvg50cm", "WindSpeed10m", "WindSpeedAvg30m", 
                                                    "WindGustsSpeed", "HumidityRelShelterAvg", "Pressure", "SunDuration", "ShortWaveFromSkyAvg", 
                                                    "SunIntAvg"])

weather_data_df.head()

In [None]:
# Data naar SQL Server schrijven
weather_data_df.to_sql("FactWeather", con=engine, if_exists="append", index=False)

5: FactBelpex Table

In [None]:
from datetime import datetime

# Read the CSV with proper headers and separator
belpex_df = pd.read_csv('../data/input/BelpexFilter.csv', encoding='Windows-1252', sep=';', names=['Date', 'BelpexPrice'], skiprows=1)

# Clean and convert 'BelpexPrice'
belpex_df['BelpexPrice'] = belpex_df['BelpexPrice'].str.replace('€', '', regex=False).str.replace(',', '.', regex=False).str.strip()
belpex_df['BelpexPrice'] = pd.to_numeric(belpex_df['BelpexPrice'], errors='coerce')

# Parse 'Date' into DateKey and FullTime using datetime
belpex_df['Date'] = belpex_df['Date'].str.strip()  # Remove extra spaces
belpex_df['DateKey'] = belpex_df['Date'].apply(lambda x: datetime.strptime(x.split()[0], '%d/%m/%Y').strftime('%Y%m%d'))
belpex_df['FullTime'] = belpex_df['Date'].apply(lambda x: x.split()[1])  # Extract time part

# Merge with DimTime to get TimeKey
belpex_df = belpex_df.merge(dim_time_df[['FullTime', 'TimeKey']], how='inner', on='FullTime')

# Drop unnecessary columns
belpex_df = belpex_df.drop(['FullTime', 'Date'], axis=1)

# Write to SQL Server
belpex_df.to_sql('FactBelpex', con=engine, if_exists='append', index=False)

##### 6: FactNetworkCosts

In [None]:
from datetime import datetime


csv_file_path = "../data/input/Distributiekosten.csv"
df_distributiekosten = pd.read_csv(csv_file_path)

# Datum omzetten naar DateKey (YYYYMMDD)
df_distributiekosten["DateKey"] = df_distributiekosten["Van"].apply(lambda x: datetime.strptime(x, "%d/%m/%Y").strftime("%Y%m%d"))

df_distributiekosten.head()


In [None]:
# Kolommen hernoemen
column_mapping = {
    "Capaciteitstarief_Digitale_meter": "CapacityTariff_DigitalMeter",
    "Afnametarief_Digitale_meter_Normaal": "ConsumptionTariff_DigitalMeter_Normal",
    "Afnametarief_Digitale_meter_Exclusief_nacht": "ConsumptionTariff_DigitalMeter_ExclusiveNight",
    "Capaciteitstarief_Klassieke_meter": "CapacityTariff_ClassicMeter",
    "Afnametarief_Klassieke_meter_Normaal": "ConsumptionTariff_ClassicMeter_Normal",
    "Afnametarief_Klassieke_meter_Exclusief_nacht": "ConsumptionTariff_ClassicMeter_ExclusiveNight",
    "Prosumententarief": "ProsumerTariff",
    "Tarief_databeheer_Jaar_en_maandgelezen_meters": "DataManagementTariff_YearlyMonthlyReadMeters",
    "Tarief_databeheer_Kwartiergelezen_meters": "DataManagementTariff_QuarterlyReadMeters",
    "Intercommunale": "NetworkOperator"
}
df_distributiekosten = df_distributiekosten.rename(columns=column_mapping)

df_distributiekosten.head()

In [None]:
columns = [
    "DateKey", "NetworkOperator", "CapacityTariff_DigitalMeter",
    "ConsumptionTariff_DigitalMeter_Normal", "ConsumptionTariff_DigitalMeter_ExclusiveNight",
    "CapacityTariff_ClassicMeter", "ConsumptionTariff_ClassicMeter_Normal",
    "ConsumptionTariff_ClassicMeter_ExclusiveNight", "ProsumerTariff",
    "DataManagementTariff_YearlyMonthlyReadMeters", "DataManagementTariff_QuarterlyReadMeters"
]
df_distributiekosten = df_distributiekosten[columns]

df_distributiekosten.to_sql('FactNetworkCosts', con=engine, if_exists='append', index=False)



6: Verbruikersdata



In [None]:
# dowmload de dataset: https://opendata.fluvius.be/explore/dataset/1_50-verbruiksprofielen-dm-elek-kwartierwaarden-voor-een-volledig-jaar/information/
# Lees de CSV met alle verbruikersdata in een pandas dataframe.
csv_file_path = "../data/input/P6269_1_50_DMK_Sample_Elek.csv"
df_verbruikersdata = pd.read_csv(csv_file_path, sep=';', encoding='utf-8')

In [None]:
# Hernoem kolom om te matchen met tabel in sql server
df_verbruikersdata.rename(columns={
    'PV-Installatie_Indicator': 'PV_Installatie_Indicator',
}, inplace=True)

In [None]:

df_verbruikersdata.to_sql('DimUser_Staging', con=engine, if_exists='append', index=False, chunksize=5000)

# Laatste stap, voer sql script uit: ../DWH/Staging_To_DimUser.sql

7: Overige Tabellen

- Voor de overige tabellen, volg dezelfde logica:
    - Lees de CSV’s.
    - Voeg de benodigde foreign keys toe.
    - Schrijf de data weg naar de juiste tabellen via bulk-insert of andere batch methoden.

Algemeen:
Voor alle bulk-insert taken moet je zorgen voor een efficiënte schrijfmethode naar SQL Server, bijvoorbeeld:

- to_sql() in combinatie met een SQLAlchemy engine.
- Bulk-insert via pyodbc of tools zoals bcp.
- Gebruik maken van BULK INSERT in SQL Server voor het snel inladen van grote datasets.

Voeg contract types toe:

In [None]:
# Create a list of energy contracts with their types
energy_contracts = [
    # BOLT
    {'Provider': 'BOLT', 'ContractName': 'ELEKTRICITEIT', 'ContractType': 'Variabel'},
    {'Provider': 'BOLT', 'ContractName': 'VAST', 'ContractType': 'Vast'},
    
    # DATS24
    {'Provider': 'DATS24', 'ContractName': 'ELEKTRICITEIT', 'ContractType': 'Variabel'},
    
    # ENECO
    {'Provider': 'ENECO', 'ContractName': 'ZON WIND FLEX', 'ContractType': 'Variabel'},
    
    # ENERGIE-BE
    {'Provider': 'ENERGIE-BE', 'ContractName': 'VARIABEL', 'ContractType': 'Variabel'},
    {'Provider': 'ENERGIE-BE', 'ContractName': 'VAST', 'ContractType': 'Vast'},
    
    # LUMINUS
    {'Provider': 'LUMINUS', 'ContractName': 'DYNAMIC', 'ContractType': 'Dynamisch'},
    
    # OCTA+
    {'Provider': 'OCTA+', 'ContractName': 'DYNAMIC', 'ContractType': 'Dynamisch'},
    {'Provider': 'OCTA+', 'ContractName': 'ECO CLEAR', 'ContractType': 'Variabel'},
    {'Provider': 'OCTA+', 'ContractName': 'FIXED', 'ContractType': 'Vast'},
    {'Provider': 'OCTA+', 'ContractName': 'SMART VARIABEL', 'ContractType': 'Variabel'},
    
    # TOTALENERGIES
    {'Provider': 'TOTALENERGIES', 'ContractName': 'PIXEL', 'ContractType': 'Variabel'},
    {'Provider': 'TOTALENERGIES', 'ContractName': 'PIXEL NEXT VAST', 'ContractType': 'Vast'},
    {'Provider': 'TOTALENERGIES', 'ContractName': 'PIXEL EDRIVE', 'ContractType': 'Variabel'},
    {'Provider': 'TOTALENERGIES', 'ContractName': 'PIXIE', 'ContractType': 'Variabel'}
]

# Create a DataFrame
dim_energy_contract_df = pd.DataFrame(energy_contracts)

In [None]:
# Display the DataFrame for verification
print(dim_energy_contract_df)

In [None]:
# Write to SQL Server
dim_energy_contract_df.to_sql('DimEnergyContract', con=engine, if_exists='append', index=False)

print(f"Successfully inserted {len(dim_energy_contract_df)} energy contracts into DimEnergyContract table.")

Voeg tariefkaarten toe:

In [None]:
# Read the CSV file with semicolon delimiter
csv_file_path = '../data/input/energy_costs.csv'
df = pd.read_csv(csv_file_path, sep=';')

# Get provider and contract name from EnergyCostKey
df['Provider'] = df['EnergyCostKey'].apply(lambda x: x.split('_')[1])
df['ContractName'] = df['EnergyCostKey'].apply(lambda x: x.split('_')[2])

# Convert DateKey from 'YYYY-MM-DD' to integer format 'YYYYMMDD'
df['DateKey'] = df['DateKey'].apply(lambda x: int(x.replace('-', '')))

# Replace empty strings with None for proper SQL NULL values
df = df.replace('', None)

# Look up ContractKey from DimEnergyContract table
# First, get the existing contracts from the database
contracts_query = "SELECT ContractKey, Provider, ContractName FROM DimEnergyContract"
contracts_df = pd.read_sql(contracts_query, engine)

# Create a dictionary mapping (Provider, ContractName) to ContractKey
contract_keys = {}
for _, row in contracts_df.iterrows():
    contract_keys[(row['Provider'], row['ContractName'])] = row['ContractKey']

# Map the ContractKey to each row
df['ContractKey'] = df.apply(lambda row: contract_keys.get((row['Provider'], row['ContractName'])), axis=1)

# Drop the original EnergyCostKey and the temporary Provider and ContractName columns
df = df.drop(['EnergyCostKey', 'Provider', 'ContractName'], axis=1)


In [None]:
# Display the first few rows to verify the transformation
print(df.head())

In [None]:
 # Insert the data into the SQL Server database
df.to_sql('FactEnergyCost', con=engine, if_exists='append', index=False)

print(f"Successfully inserted {len(df)} rows into FactEnergyCost table.")