In [1]:
import json
from config import settings
import os
import polars as pl
from polars import col as c
import polars.selectors as cs

from data_federation.input_model import SmallflexInputSchema

from data_federation.parser.market_price import parse_market_price
from utility.general_function import dictionary_key_filtering
os.chdir(os.getcwd().replace("/src", ""))



In [2]:
input_file_names: dict[str, str] = json.load(open(settings.INPUT_FILE_NAMES))
market_price_metadata: dict = json.load(open(input_file_names["market_price_metadata"]))
small_flex_input_schema: SmallflexInputSchema = SmallflexInputSchema()

In [3]:

kwargs: dict = {"small_flex_input_schema": small_flex_input_schema, "input_file_names": input_file_names}
kwargs["small_flex_input_schema"] = parse_market_price(**kwargs)
kwargs["small_flex_input_schema"]

SmallflexInputSchema(market_price=shape: (1_663_990, 9)
┌─────────────────────────┬──────────┬───────────┬─────────┬───┬─────────┬──────┬──────────┬──────┐
│ timestamp               ┆ market   ┆ direction ┆ country ┆ … ┆ unit    ┆ max  ┆ mean     ┆ min  │
│ ---                     ┆ ---      ┆ ---       ┆ ---     ┆   ┆ ---     ┆ ---  ┆ ---      ┆ ---  │
│ datetime[μs, UTC]       ┆ str      ┆ str       ┆ str     ┆   ┆ str     ┆ f64  ┆ f64      ┆ f64  │
╞═════════════════════════╪══════════╪═══════════╪═════════╪═══╪═════════╪══════╪══════════╪══════╡
│ 2019-12-25 08:00:00 UTC ┆ mFRR-cap ┆ neg       ┆ AT      ┆ … ┆ EUR/MWh ┆ 1.84 ┆ 0.289282 ┆ 0.0  │
│ 2019-12-12 12:00:00 UTC ┆ aFRR-cap ┆ pos       ┆ AT      ┆ … ┆ EUR/MWh ┆ 1.79 ┆ 1.315441 ┆ 0.0  │
│ 2019-12-30 00:00:00 UTC ┆ mFRR-cap ┆ neg       ┆ AT      ┆ … ┆ EUR/MWh ┆ 5.24 ┆ 4.430974 ┆ 0.0  │
│ 2019-12-14 12:00:00 UTC ┆ aFRR-cap ┆ pos       ┆ AT      ┆ … ┆ EUR/MWh ┆ 1.44 ┆ 0.965686 ┆ 0.0  │
│ 2019-12-20 00:00:00 UTC ┆ aFRR-cap ┆ neg  

In [4]:

market_price: pl.DataFrame = pl.DataFrame()
name = "reg_afrr_cap"
name= "reg_afrr_ene"
for entry in list(os.scandir(market_price_metadata[name]["folder"])):
    if entry.name.startswith("RESULT_OVERVIEW"):
        data: pl.DataFrame = pl.read_excel(entry.path)
        if data.is_empty():
            continue
        col_mapping = dictionary_key_filtering(market_price_metadata[name]["col_mapping"], data.columns)
        data = data[list(col_mapping.keys())].rename(col_mapping).with_columns(
            c("min", "mean", "max").cast(pl.Float64),
            pl.col("metadata").str.split("_").list.slice(offset=0, length=2)
            .list.to_struct(fields=["direction", "hour"])
        ).unnest("metadata")\
        .with_columns(
            pl.concat_str(["timestamp", "hour", pl.lit("00")], separator=" ").str.to_datetime("%Y-%m-%d %H %M", time_zone="UTC").alias("timestamp"),
            c("direction").str.to_lowercase(),
        ).with_columns(
            pl.lit(value).alias(name) for name, value in market_price_metadata[name]["data"].items()
        ).filter(pl.any_horizontal(~c("max", "mean", "min").is_null()))
        market_price = pl.concat([market_price, data], how="diagonal_relaxed")

InvalidOperationError: conversion from `str` to `datetime[μs, UTC]` failed in column 'timestamp' for 5760 out of 5760 values: ["2023-11-01 001 00", "2023-11-01 002 00", … "2023-11-30 096 00"]

You might want to try:
- setting `strict=False` to set values that cannot be converted to `null`
- using `str.strptime`, `str.to_date`, or `str.to_datetime` and providing a format string

In [5]:
data[list(col_mapping.keys())].rename(col_mapping)

timestamp,metadata,min,mean,max
date,str,f64,f64,f64
2023-11-01,"""NEG_001""",48.34,-752.3,-15000.0
2023-11-01,"""NEG_002""",42.34,-779.6,-15000.0
2023-11-01,"""NEG_003""",40.34,-791.87,-15000.0
2023-11-01,"""NEG_004""",39.34,-798.44,-15000.0
2023-11-01,"""NEG_005""",31.34,-804.68,-15000.0
…,…,…,…,…
2023-11-30,"""POS_092""",168.37,963.61,15000.0
2023-11-30,"""POS_093""",183.34,1010.1,15000.0
2023-11-30,"""POS_094""",187.91,979.19,15000.0
2023-11-30,"""POS_095""",189.3,955.92,15000.0


In [5]:
data[list(col_mapping.keys())]

PRODUCT
str
"""NEG_001"""
"""NEG_002"""
"""NEG_003"""
"""NEG_004"""
"""NEG_005"""
…
"""POS_092"""
"""POS_093"""
"""POS_094"""
"""POS_095"""


In [5]:
name = "reg_afrr_ene"
for entry in list(os.scandir(market_price_metadata[name]["folder"])):
    if entry.name.startswith("RESULT_OVERVIEW"):
        data: pl.DataFrame = pl.read_excel(entry.path)

In [7]:
data

            "GERMANY_MIN_ENERGY_PRICE_[(EUR/MW)/h]": "min", "GERMANY_AVERAGE_ENERGY_PRICE_[(EUR/MW)/h]": "mean", 
            "GERMANY_MARGINAL_ENERGY_PRICE_[(EUR/MW)/h]": "max" 

DELIVERY_DATE,TYPE_OF_RESERVES,PRODUCT,GERMANY_MIN_ENERGY_PRICE_[EUR/MWh],GERMANY_AVERAGE_ENERGY_PRICE_[EUR/MWh],GERMANY_MARGINAL_ENERGY_PRICE_[EUR/MWh],GERMANY_SUM_OF_OFFERED_CAPACITY_[MW],NOTE
date,str,str,f64,f64,f64,i64,str
2021-01-01,"""aFRR""","""POS_00_04""",65.76,32854.4,91241.37,2174,""""""
2021-01-01,"""aFRR""","""POS_04_08""",75.76,2703.12,13984.21,2508,""""""
2021-01-01,"""aFRR""","""POS_08_12""",76.56,2255.89,9869.0,2400,""""""
2021-01-01,"""aFRR""","""POS_12_16""",77.13,396.18,4899.0,2345,""""""
2021-01-01,"""aFRR""","""POS_16_20""",64.34,6801.55,35285.21,2307,""""""
…,…,…,…,…,…,…,…
2021-01-31,"""aFRR""","""NEG_04_08""",17.0,-843.68,-9999.0,2096,""""""
2021-01-31,"""aFRR""","""NEG_08_12""",26.0,-98.36,-600.0,2685,""""""
2021-01-31,"""aFRR""","""NEG_12_16""",26.0,-57.41,-300.0,2536,""""""
2021-01-31,"""aFRR""","""NEG_16_20""",25.34,-64.05,-764.58,2456,""""""


In [6]:
col_name = "GERMANY_AVERAGE_CAPACITY_PRICE"




data[list(COL_MAPPING.keys())].rename(COL_MAPPING).with_columns(
    c("min", "mean", "max").cast(pl.Float64),
    pl.col("metadata").str.split("_").list.slice(offset=0, length=2)
    .list.to_struct(fields=["direction", "hour"])
).unnest("metadata")\
.with_columns(
    pl.concat_str(["timestamp", "hour", pl.lit("00")], separator=" ").str.to_datetime("%Y-%m-%d %H %M", time_zone="UTC").alias("timestamp"),
    c("direction").str.to_lowercase(),
    c("market") + "-cap"
)

ColumnNotFoundError: "DATE_FROM" not found

NameError: name 'COL_MAPPING' is not defined

In [22]:
data.select([
    pl.col("DATE_FROM"),
    pl.col(act_col_name).alias("mean").cast(pl.Float64, strict=False),
    pl.col("PRODUCT").str.split("_").list.slice(offset=0, length=2)
    .list.to_struct(fields=["market", "time"]).alias("meta_data")
]).unnest("meta_data").with_columns(
    
)


DATE_FROM,[EUR/MW],market,time
date,f64,str,str
2021-07-01,21.18,"""POS""","""00"""
2021-07-01,32.41,"""POS""","""04"""
2021-07-01,46.43,"""POS""","""08"""
2021-07-01,30.17,"""POS""","""12"""
2021-07-01,45.37,"""POS""","""16"""
…,…,…,…
2021-07-31,35.52,"""NEG""","""04"""
2021-07-31,88.74,"""NEG""","""08"""
2021-07-31,229.09,"""NEG""","""12"""
2021-07-31,100.81,"""NEG""","""16"""


In [None]:

.with_columns([
    pl.col("PRODUCT").str.split_exact("_", 2).struct.rename_fields(["first_part", "second_part", "third_part"]).alias("fields")
]).unnest("fields").select([
    (pl.concat_str(["DATE_FROM", "second_part"], separator=" ") + ":00") .str.to_datetime("%m-%d-%y %H:%M").alias("datetime"),
    pl.col("first_part").str.to_lowercase().alias("market"),
    pl.col("[EUR/MW]")
])

In [6]:
final_data: pl.DataFrame = pl.DataFrame()
name = "apg_energy"
for entry in os.scandir(market_price_metadata[name]["folder"]):
    file_path: str = entry.path
    
data = pl.read_csv(file_path, separator=",", has_header=True, null_values=["NA"])
data

Time from [CET/CEST],Time to [CET/CEST],Type,Direction,Rank,Quantity [MW],Energy Price [€/MWh]
str,str,str,str,i64,f64,f64
"""2021-01-01 00:00:00""","""2021-01-01 04:00:00""","""SRR""","""NEG""",1,1.0,7.4
"""2021-01-01 00:00:00""","""2021-01-01 04:00:00""","""SRR""","""NEG""",2,5.0,-0.01
"""2021-01-01 00:00:00""","""2021-01-01 04:00:00""","""SRR""","""NEG""",3,5.0,-0.01
"""2021-01-01 00:00:00""","""2021-01-01 04:00:00""","""SRR""","""NEG""",4,1.0,-0.9
"""2021-01-01 00:00:00""","""2021-01-01 04:00:00""","""SRR""","""NEG""",5,2.0,-2.7
…,…,…,…,…,…,…
"""2021-12-31 20:00:00""","""2022-01-01 00:00:00""","""TRR""","""POS""",32,1.0,9990.0
"""2021-12-31 20:00:00""","""2022-01-01 00:00:00""","""TRR""","""POS""",33,1.0,9990.0
"""2021-12-31 20:00:00""","""2022-01-01 00:00:00""","""TRR""","""POS""",34,2.0,9990.0
"""2021-12-31 20:00:00""","""2022-01-01 00:00:00""","""TRR""","""POS""",35,2.0,9990.0


In [7]:
pl.read_csv(file_path, separator=",", has_header=True, null_values=["NA"])\
        .with_columns([
            pl.col("Time from [CET/CEST]").str.to_datetime("%Y-%m-%d %H:%M:%S").alias("datetime"), 
            pl.concat_str(["Type", "Direction"], separator="_").alias("market"), 
            (pl.col("Quantity [MW]") * pl.col(col_name)).alias("volume_price")
        ]).group_by(["datetime", "market"]).agg(
            pl.struct(
            c(col_name).max().alias("max"),
            ((c(col_name) *c("Quantity [MW]")).sum()/c("Quantity [MW]").sum()).alias("mean"), 
            c(col_name).min().alias("min")).alias("price")
        ).with_columns(
            pl.lit("[EUR/MWh]").alias("unit")
        )

NameError: name 'col_name' is not defined

In [7]:
data.sort("datetime")

ColumnNotFoundError: "datetime" not found

In [8]:

all_data = pl.DataFrame()


    file_path = os.path.join(local_file_path, file_name)
    df_temp = pl.read_csv(file_path, separator=",", has_header=True, null_values=["NA"])
    df_temp = df_temp.with_columns([
        pl.col("Time from [CET/CEST]").alias("datetime"), 
        pl.concat_str(["Type", "Direction"], separator="_").alias("market"), 
        (pl.col("Quantity [MW]") * pl.col(col_name)).alias("volume_price")
        ]).group_by(by=["datetime", "market"]).sum().with_columns(
            (pl.col("volume_price") / pl.col("Quantity [MW]")).alias(final_col)
            ).select(["datetime", "market", final_col]).with_columns(
                pl.col("datetime").str.to_datetime("%Y-%m-%d %H:%M:%S")
                )
    df_temp0 = df_temp.filter(~pl.col("market").is_in(["SRR_NEG", "TRR_NEG"]))
    if energy:
        df_temp1 = df_temp.filter(pl.col("market").is_in(["SRR_NEG", "TRR_NEG"])).with_columns(- pl.col(final_col))  # pylint: disable= invalid-unary-operand-type
    else:
        df_temp1 = df_temp.filter(pl.col("market").is_in(["SRR_NEG", "TRR_NEG"]))
    all_data = pl.concat([all_data, df_temp0, df_temp1], how="diagonal")
all_data = all_data.unique().sort("datetime")
if where is not None:
    save_pyarrow_data(all_data, where)
return all_data

IndentationError: unexpected indent (1432515968.py, line 4)

In [7]:
data

Time from [CET/CEST],Time to [CET/CEST],Type,Direction,Rank,Quantity [MW],Capacity Price [€/MWh]
str,str,str,str,i64,f64,f64
"""2020-01-01 00:00:00""","""2020-01-01 04:00:00""","""SRR""","""NEG""",1,1.0,4.0
"""2020-01-01 00:00:00""","""2020-01-01 04:00:00""","""SRR""","""NEG""",2,5.0,4.5
"""2020-01-01 00:00:00""","""2020-01-01 04:00:00""","""SRR""","""NEG""",3,8.0,5.0
"""2020-01-01 00:00:00""","""2020-01-01 04:00:00""","""SRR""","""NEG""",4,5.0,5.0
"""2020-01-01 00:00:00""","""2020-01-01 04:00:00""","""SRR""","""NEG""",5,1.0,5.0
…,…,…,…,…,…,…
"""2020-12-31 20:00:00""","""2021-01-01 00:00:00""","""TRR""","""POS""",43,5.0,1.69
"""2020-12-31 20:00:00""","""2021-01-01 00:00:00""","""TRR""","""POS""",44,5.0,1.71
"""2020-12-31 20:00:00""","""2021-01-01 00:00:00""","""TRR""","""POS""",45,5.0,1.73
"""2020-12-31 20:00:00""","""2021-01-01 00:00:00""","""TRR""","""POS""",46,5.0,1.81
