In [1]:
#GET PRICE DATA TO PANDAS

import requests
import xml.etree.ElementTree as ET
import pandas as pd
from datetime import datetime, timedelta
from dotenv import load_dotenv
import os
import json

# Load environment variables from .env file
load_dotenv()
security_token = os.getenv('_ENTSOE_SECURITY_TOKEN')
url = "https://web-api.tp.entsoe.eu/api"

# Get bidding zones EIC code from file
df_csv = pd.read_csv('country_code_mapper.csv')
bidding_zones = df_csv['EIC'].dropna().str.strip().tolist()

# Parse dates from the JSON file
with open('config_dates.json', 'r') as f:
    config = json.load(f)
start_date = datetime.strptime(config["start_date"], "%Y-%m-%d")
end_date = datetime.strptime(config["end_date"], "%Y-%m-%d")
period_duration = timedelta(days=31)  # Example: one-month intervals

time_periods = []
current_start = start_date
while current_start < end_date:
    current_end = min(current_start + period_duration, end_date)
    time_periods.append((current_start.strftime("%Y%m%d%H%M"), current_end.strftime("%Y%m%d%H%M")))
    current_start = current_end

# Namespace for XML parsing
namespaces = {
    'ns': 'urn:iec62325.351:tc57wg16:451-3:publicationdocument:7:3'
}

# Data collection
data = []
# Loop through bidding_zones and time periods
for bidding_zone in bidding_zones:
    for period_start, period_end in time_periods:
        # API request parameters
        params = {
            "documentType": "A44",
            "out_Domain": bidding_zone,
            "in_Domain": bidding_zone,
            "periodStart": period_start,
            "periodEnd": period_end,
            "contract_MarketAgreement.type": 'A01',
            "securityToken": security_token,
        }

        # Send API request
        response = requests.get(url, params=params)
        if response.status_code != 200:
            print(f"Error for {bidding_zone}, {period_start}-{period_end}: {response.status_code}")
            continue

        # Parse XML response
        root = ET.fromstring(response.text)

        # Iterate over TimeSeries elements
        for time_series in root.findall('.//ns:TimeSeries', namespaces=namespaces):
            currency_unit = time_series.find('.//ns:currency_Unit.name', namespaces=namespaces).text
            price_measure_unit = time_series.find('.//ns:price_Measure_Unit.name', namespaces=namespaces).text
            resolution = time_series.find('.//ns:Period//ns:resolution', namespaces=namespaces).text
            EIC_code_out = time_series.find('.//ns:out_Domain.mRID', namespaces=namespaces).text
            start_time_str = time_series.find('.//ns:Period//ns:timeInterval//ns:start', namespaces=namespaces).text
            end_time_str = time_series.find('.//ns:Period//ns:timeInterval//ns:end', namespaces=namespaces).text
            
            # Convert to datetime objects
            start_time = datetime.strptime(start_time_str, "%Y-%m-%dT%H:%MZ")
            end_time = datetime.strptime(end_time_str, "%Y-%m-%dT%H:%MZ")

            # Extract Points
            for point in time_series.findall('.//ns:Period//ns:Point', namespaces=namespaces):
                position = int(point.find('ns:position', namespaces=namespaces).text)
                price = float(point.find('ns:price.amount', namespaces=namespaces).text)

                # Append to data list
                data.append({
                    'eic_code': EIC_code_out,
                    'price': price,
                    'currency_unit': currency_unit,
                    'price_measure_unit': price_measure_unit,
                    'resolution': resolution,
                    'position': position,
                    'start_time': start_time,
                    'end_time': end_time,
                })

# Convert collected data to DataFrame
df = pd.DataFrame(data)

# Calculate the duration_from_start
def calculate_duration(row):
    if row['resolution'] == 'PT60M':
        return timedelta(hours=row['position'])
    elif row['resolution'] == 'PT15M':
        return timedelta(minutes=row['position'] * 15)
    else:
        return pd.NaT  #return nan if there is a new resolution

# Apply the function to create the duration_from_start column
df['duration_from_start'] = df.apply(calculate_duration, axis=1)
# Calculate the datetime for each price point by adding the duration to start_time
df['datetime'] = df['start_time'] + df['duration_from_start']
# Drop unnecessary columns
df = df.drop(columns=['resolution', 'position', 'start_time', 'end_time', 'duration_from_start'])
df.to_csv('price.csv')
df

Unnamed: 0,eic_code,price,currency_unit,price_measure_unit,datetime
0,10Y1001A1001A39I,38.37,EUR,MWH,2024-01-01 00:00:00
1,10Y1001A1001A39I,28.46,EUR,MWH,2024-01-01 01:00:00
2,10Y1001A1001A39I,26.66,EUR,MWH,2024-01-01 02:00:00
3,10Y1001A1001A39I,24.48,EUR,MWH,2024-01-01 03:00:00
4,10Y1001A1001A39I,24.01,EUR,MWH,2024-01-01 04:00:00
...,...,...,...,...,...
7477,10YFI-1--------U,110.66,EUR,MWH,2024-01-15 19:00:00
7478,10YFI-1--------U,103.15,EUR,MWH,2024-01-15 20:00:00
7479,10YFI-1--------U,92.95,EUR,MWH,2024-01-15 21:00:00
7480,10YFI-1--------U,89.49,EUR,MWH,2024-01-15 22:00:00


In [3]:
import pyarrow as pa
import pyarrow.parquet as pq
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema, NestedField
from pyiceberg.types import StringType, LongType, FloatType, TimestampType

df = pd.read_csv('price.csv')
def save_to_iceberg(df= df, iceberg_directory="/test_warehouse/price", table_name="price"):
    # Convert pandas dataframe to pyarrow Table
    arrow_table = pa.Table.from_pandas(df)
    print(arrow_table)

    # Initialize Iceberg catalog
    catalog = load_catalog('hadoop', {'uri': 'thrift://localhost:9083'})
    #catalog = load_catalog("rest", uri="http://iceberg_rest:8181")

    # Try loading the Iceberg table, if it doesn't exist, create it
    try:
        iceberg_table = catalog.load_table(table_name)
        print(f"Table {table_name} exists. Appending new data.")
    except Exception as e:
        print(f"Table {table_name} does not exist. Creating new table. Error: {e}")
        
        # Define a basic Iceberg schema based on the DataFrame
        iceberg_schema = Schema([
            NestedField(1, 'eic_code', StringType()),
            NestedField(2, 'price', FloatType()),
            NestedField(3, 'currency_unit', StringType()),
            NestedField(4, 'price_measure_unit', StringType()),
            NestedField(5, 'datetime', TimestampType()),
        ])
        
        # Create the Iceberg table
        catalog.create_table(
            identifier=table_name,
            schema=iceberg_schema,
            location=iceberg_directory
        )
        iceberg_table = catalog.load_table(table_name)

    # Save data to Iceberg table by appending the arrow table
    iceberg_table.append(arrow_table)
    print(f"Appended data to Iceberg table: {table_name}")

save_to_iceberg()

pyarrow.Table
Unnamed: 0: int64
eic_code: string
price: double
currency_unit: string
price_measure_unit: string
datetime: string
----
Unnamed: 0: [[0,1,2,3,4,...,7477,7478,7479,7480,7481]]
eic_code: [["10Y1001A1001A39I","10Y1001A1001A39I","10Y1001A1001A39I","10Y1001A1001A39I","10Y1001A1001A39I",...,"10YFI-1--------U","10YFI-1--------U","10YFI-1--------U","10YFI-1--------U","10YFI-1--------U"]]
price: [[38.37,28.46,26.66,24.48,24.01,...,110.66,103.15,92.95,89.49,86.43]]
currency_unit: [["EUR","EUR","EUR","EUR","EUR",...,"EUR","EUR","EUR","EUR","EUR"]]
price_measure_unit: [["MWH","MWH","MWH","MWH","MWH",...,"MWH","MWH","MWH","MWH","MWH"]]
datetime: [["2024-01-01 00:00:00","2024-01-01 01:00:00","2024-01-01 02:00:00","2024-01-01 03:00:00","2024-01-01 04:00:00",...,"2024-01-15 19:00:00","2024-01-15 20:00:00","2024-01-15 21:00:00","2024-01-15 22:00:00","2024-01-15 23:00:00"]]


TypeError: load_catalog() takes from 0 to 1 positional arguments but 2 were given

In [5]:
#GET LOAD DATA (consumption)

import requests
import xml.etree.ElementTree as ET
import pandas as pd
from datetime import datetime, timedelta
from dotenv import load_dotenv
import os
import json

# Load environment variables from .env file
load_dotenv()
security_token = os.getenv('_ENTSOE_SECURITY_TOKEN')
url = "https://web-api.tp.entsoe.eu/api"


# Get bidding zones EIC code from file
df_csv = pd.read_csv('country_code_mapper.csv')
bidding_zones = df_csv['EIC'].dropna().str.strip().tolist()

# Parse dates from the JSON file
with open('config_dates.json', 'r') as f:
    config = json.load(f)
start_date = datetime.strptime(config["start_date"], "%Y-%m-%d")
end_date = datetime.strptime(config["end_date"], "%Y-%m-%d")
period_duration = timedelta(days=31)  # Example: one-month intervals

time_periods = []
current_start = start_date
while current_start < end_date:
    current_end = min(current_start + period_duration, end_date)
    time_periods.append((current_start.strftime("%Y%m%d%H%M"), current_end.strftime("%Y%m%d%H%M")))
    current_start = current_end

# Namespace for XML parsing
namespaces = {
    "ns": "urn:iec62325.351:tc57wg16:451-6:generationloaddocument:3:0"
}

# Initialize empty DataFrame to collect all data
data = []

# Loop through each bidding zone and time period
for bidding_zone in bidding_zones:
    for period_start, period_end in time_periods:
        params = {
            "securityToken": security_token,
            "documentType": "A65",
            "processType": "A16",
            "outBiddingZone_Domain": bidding_zone,
            "periodStart": period_start,
            "periodEnd": period_end
        }

        # Send API request
        response = requests.get(url, params=params)
        if response.status_code != 200:
            print(f"Error for {bidding_zone}, {period_start}-{period_end}: {response.status_code}")
            continue

        # Parse the XML response
        root = ET.fromstring(response.text)

        resolution = root.find(".//ns:resolution", namespaces).text
        start_time = pd.to_datetime(root.find(".//ns:timeInterval/ns:start", namespaces).text)
        end_time = pd.to_datetime(root.find(".//ns:timeInterval/ns:end", namespaces).text)
        bidding_zone = root.find(".//ns:outBiddingZone_Domain.mRID", namespaces).text
        quantity_measure_unit = root.find(".//ns:quantity_Measure_Unit.name", namespaces).text
        points = root.findall(".//ns:Point", namespaces)

        for point in points:
            position = int(point.find("ns:position", namespaces).text)
            quantity = float(point.find("ns:quantity", namespaces).text)
            
            data.append({
                "bidding_zone": bidding_zone,
                "quantity": quantity,
                "unit": quantity_measure_unit,
                "resolution": resolution,
                "position": position,
                "start_time": start_time,
                "end_time": end_time,
            })

# Create DataFrame with all data
df = pd.DataFrame(data)

# Calculate the duration_from_start
def calculate_duration(row):
    if row['resolution'] == 'PT60M':
        return timedelta(hours=row['position'])
    elif row['resolution'] == 'PT15M':
        return timedelta(minutes=row['position'] * 15)
    else:
        return pd.NaT  # return NaT if there is a new resolution

# Apply the function to create the duration_from_start column
df['duration_from_start'] = df.apply(calculate_duration, axis=1)

# Calculate the datetime for each price point by adding the duration to start_time
df['datetime'] = df['start_time'] + df['duration_from_start']

# Drop unnecessary columns
df = df.drop(columns=['resolution', 'position', 'start_time', 'end_time', 'duration_from_start'])
df

Unnamed: 0,bidding_zone,quantity,unit,datetime
0,10Y1001A1001A39I,990.0,MAW,2024-01-01 01:00:00+00:00
1,10Y1001A1001A39I,973.0,MAW,2024-01-01 02:00:00+00:00
2,10Y1001A1001A39I,967.0,MAW,2024-01-01 03:00:00+00:00
3,10Y1001A1001A39I,966.0,MAW,2024-01-01 04:00:00+00:00
4,10Y1001A1001A39I,973.0,MAW,2024-01-01 05:00:00+00:00
...,...,...,...,...
7723,10YFI-1--------U,11802.0,MAW,2024-01-14 23:00:00+00:00
7724,10YFI-1--------U,11738.0,MAW,2024-01-14 23:15:00+00:00
7725,10YFI-1--------U,11630.0,MAW,2024-01-14 23:30:00+00:00
7726,10YFI-1--------U,11622.0,MAW,2024-01-14 23:45:00+00:00


In [6]:
# GET GENERATION DATA

import requests
import xml.etree.ElementTree as ET
import pandas as pd
from datetime import datetime, timedelta
from dotenv import load_dotenv
import os
import json

# Load environment variables from .env file
load_dotenv()
security_token = os.getenv('_ENTSOE_SECURITY_TOKEN')
url = "https://web-api.tp.entsoe.eu/api"


# Get bidding zones EIC code from file
df_csv = pd.read_csv('country_code_mapper.csv')
bidding_zones = df_csv['EIC'].dropna().str.strip().tolist()

# Parse dates from the JSON file
with open('config_dates.json', 'r') as f:
    config = json.load(f)
start_date = datetime.strptime(config["start_date"], "%Y-%m-%d")
end_date = datetime.strptime(config["end_date"], "%Y-%m-%d")
period_duration = timedelta(days=31)  # Example: one-month intervals

time_periods = []
current_start = start_date
while current_start < end_date:
    current_end = min(current_start + period_duration, end_date)
    time_periods.append((current_start.strftime("%Y%m%d%H%M"), current_end.strftime("%Y%m%d%H%M")))
    current_start = current_end

# Namespace for XML parsing
namespaces = {
    "ns": "urn:iec62325.351:tc57wg16:451-6:generationloaddocument:3:0"
    }

# Initialize empty DataFrame to collect all data
data = []

# Loop through each bidding zone and time period
for bidding_zone in bidding_zones:
    #print(bidding_zone)
    for period_start, period_end in time_periods:
        #print(period_start)
        params = {
            "securityToken": security_token,
            "documentType": "A75",
            "processType": "A16",
            "in_Domain": bidding_zone,
            "periodStart": period_start,
            "periodEnd": period_end
        }

        # Send API request
        response = requests.get(url, params=params)
        if response.status_code != 200:
            print(f"Error for {bidding_zone}, {period_start}-{period_end}: {response.status_code}")
            continue

        # Parse the XML response
        root = ET.fromstring(response.text)

        for timeseries in root.findall(".//ns:TimeSeries", namespaces):
            bidding_zone_element = timeseries.find(".//ns:inBiddingZone_Domain.mRID", namespaces)
            if bidding_zone_element is None:
                # If not found, try outBiddingZone_Domain.mRID
                bidding_zone_element = timeseries.find(".//ns:outBiddingZone_Domain.mRID", namespaces)
            if bidding_zone_element is not None:
                bidding_zone = bidding_zone_element.text
            else:
                # Handle case where neither is found
                raise ValueError("Neither inBiddingZone_Domain.mRID nor outBiddingZone_Domain.mRID found in TimeSeries!")
            resolution = timeseries.find(".//ns:resolution", namespaces).text
            start_time = pd.to_datetime(timeseries.find(".//ns:timeInterval/ns:start", namespaces).text)
            end_time = pd.to_datetime(timeseries.find(".//ns:timeInterval/ns:end", namespaces).text)
            quantity_measure_unit = timeseries.find(".//ns:quantity_Measure_Unit.name", namespaces).text
            produced_type = timeseries.find(".//ns:MktPSRType/ns:psrType", namespaces).text

            # Extract Points for this TimeSeries
            points = timeseries.findall(".//ns:Point", namespaces)

            for point in points:
                position = int(point.find("ns:position", namespaces).text)
                quantity = float(point.find("ns:quantity", namespaces).text)

                # Append data for each point
                data.append({
                    "bidding_zone": bidding_zone,
                    "quantity": quantity,
                    "unit": quantity_measure_unit,
                    "resolution": resolution,
                    "position": position,
                    "start_time": start_time,
                    "end_time": end_time,
                    "produced_energy_type_code": produced_type
                })


# Create DataFrame with all data
df = pd.DataFrame(data)

# Define the dictionary
energy_type_dict = {
    "B01": "Biomass",
    "B02": "Fossil Brown coal/Lignite",
    "B03": "Fossil Coal-derived gas",
    "B04": "Fossil Gas",
    "B05": "Fossil Hard coal",
    "B06": "Fossil Oil",
    "B07": "Fossil Oil shale",
    "B08": "Fossil Peat",
    "B09": "Geothermal",
    "B10": "Hydro Pumped Storage",
    "B11": "Hydro Run-of-river and poundage",
    "B12": "Hydro Water Reservoir",
    "B13": "Marine",
    "B14": "Nuclear",
    "B15": "Other renewable",
    "B16": "Solar",
    "B17": "Waste",
    "B18": "Wind Offshore",
    "B19": "Wind Onshore",
    "B20": "Other",
    "B25": "Energy storage",
}

# Calculate the duration_from_start
def calculate_duration(row):
    if row['resolution'] == 'PT60M':
        return timedelta(hours=row['position'])
    elif row['resolution'] == 'PT15M':
        return timedelta(minutes=row['position'] * 15)
    else:
        return pd.NaT  # return NaT if there is a new resolution

# Apply the function to create the duration_from_start column
df['duration_from_start'] = df.apply(calculate_duration, axis=1)
df['produced_energy_type'] = df['produced_energy_type_code'].map(energy_type_dict)

# Calculate the datetime for each price point by adding the duration to start_time
df['datetime'] = df['start_time'] + df['duration_from_start']

# Drop unnecessary columns
df = df.drop(columns=['resolution', 'position', 'start_time', 'end_time', 'duration_from_start', 'produced_energy_type_code'])
df

Unnamed: 0,bidding_zone,quantity,unit,produced_energy_type,datetime
0,10Y1001A1001A39I,49.0,MAW,Biomass,2024-01-01 01:00:00+00:00
1,10Y1001A1001A39I,50.0,MAW,Biomass,2024-01-01 02:00:00+00:00
2,10Y1001A1001A39I,48.0,MAW,Biomass,2024-01-01 03:00:00+00:00
3,10Y1001A1001A39I,49.0,MAW,Biomass,2024-01-01 04:00:00+00:00
4,10Y1001A1001A39I,49.0,MAW,Biomass,2024-01-01 05:00:00+00:00
...,...,...,...,...,...
75595,10YFI-1--------U,2134.0,MAW,Wind Onshore,2024-01-14 23:00:00+00:00
75596,10YFI-1--------U,2103.0,MAW,Wind Onshore,2024-01-14 23:15:00+00:00
75597,10YFI-1--------U,2069.0,MAW,Wind Onshore,2024-01-14 23:30:00+00:00
75598,10YFI-1--------U,2042.0,MAW,Wind Onshore,2024-01-14 23:45:00+00:00
