In [219]:
import polars as pl
from polars import col as c
import networkx as nx

from config import settings
import json
import os
from datetime import datetime, UTC
import datetime as dt

from utility.polars_operation import generate_uuid_col
from utility.parser_utility import (
    add_table_to_changes_schema,
    generate_connectivity_table,
    generate_random_uuid,
)
from utility.general_function import pl_to_dict

from twindigrid_changes.schema import ChangesSchema
from twindigrid_sql.schema.enum import (
    MeasurementClass,
    MeasurementPhase,
    MeasurementColumn,
    SubstationType,
    TerminalSide,
)
from twindigrid_sql.entries.source import (
    SCADA,
    CONVENTIONAL_METER,
    GRID_LOAD,
    SCADA,
    ESTIMATED,
)


from twindigrid_sql.entries.equipment_class import (
    TRANSFORMER,
    BRANCH,
    SWITCH,
    INDIRECT_FEEDER,
    BUSBAR_SECTION,
    ENERGY_CONSUMER,
)
from twindigrid_sql.entries.measurement_type import ENERGY, ACTIVE_POWER, REACTIVE_POWER
from twindigrid_sql.entries.unit_symbol import WATTHOUR, WATT

# Useless outside jupiternotebook because in settings.py a line that changes the directory to src for ipynb
os.chdir(os.getcwd().replace("/src", ""))
# os.getcwd()

# Import data from matlab

In [220]:
file_names: dict[str, str] = json.load(open(settings.INPUT_FILE_NAMES))

In [221]:
parameter_distflow: pl.DataFrame = pl.read_csv(file_names["Distflow_parameter"])
nodedata_distflow: pl.DataFrame = pl.read_csv(file_names["Distflow_node_data"])
powerdata_distflow: pl.DataFrame = pl.read_csv(file_names["Distflow_Power_data"])
linedata_distflow: pl.DataFrame = pl.read_csv(file_names["Distflow_Line_data"])
result_distflow: pl.DataFrame = pl.read_csv(file_names["Distflow_result"])
# nodedata_distflow = nodedata_distflow.with_columns(c("Snom").cast(pl.Int8))
# # To have base value (need lenght of line), not from matlab !
# line_data_from_input_file: pl.DataFrame = pl.read_excel(
#     file_names["Line_Data_From_Input_File"]
# )

# Add node number to power data
powerdata_distflow = powerdata_distflow.with_row_index(
    "node_number", offset=1
)  # offset=1 because slack bus is 0 and no power on it
powerdata_distflow = powerdata_distflow.with_columns(c("node_number").cast(pl.Int64))
# Create a topology dataframe with basic topology information

df_topology = nodedata_distflow.select(
    c("index").alias("node_number"),
    c("indexLines_1").alias("index_branch_1"),
    c("indexLines_2").alias("index_branch_2"),
    c("indexLines_3").alias("index_branch_3"),
    c("Vnom"),
)

# Add the power data to the topology dataframe with node as key
df_topology = df_topology.join(
    powerdata_distflow, on="node_number", how="full", coalesce=True
)
df_topology

node_number,index_branch_1,index_branch_2,index_branch_3,Vnom,Pload,Qload
i64,i64,i64,i64,i64,f64,f64
0,1,2,3,400,,
1,4,5,,400,0.0,0.0
2,6,7,8,400,0.0,0.0
3,9,,,400,0.0,0.0
4,,,,400,0.0,0.0
…,…,…,…,…,…,…
53,,,,400,0.0,0.0
54,55,56,,400,0.000035,-0.000012
55,57,,,400,0.0,0.0
56,,,,400,0.0,0.0


# Set missing value for equipment

In [222]:
### Set missing value for equipment
# Fake value for the length of the branch
base_length = 1
# Fake value for the switch state
switch_state = False
switch_type = "locked_switch"
switch_command = "unknown"

## Connectivity node table

In [223]:
# Generate the node dict with uuid for each node
connectivity_node: dict[float, str] = pl_to_dict(
    df_topology.select(
        c("node_number"),
        c("node_number").pipe(generate_uuid_col, added_string="node_").alias("uuid"),
    )
)
connectivity_node

{0: 'df941fce-ceda-5874-ab63-5c8af9bec38b',
 1: 'ba84d70a-80d7-590e-b112-f9c4b5fabf56',
 2: '078656ed-79f8-53a1-a67a-bb8f53476cec',
 3: 'c2247320-9fc2-538a-ba64-3ac70e49994e',
 4: 'af72457f-f983-5eeb-a635-0609f4cae0c1',
 5: 'dbd2411e-1e87-5956-86d9-d69ee7d848d5',
 6: '2db84a3d-aa74-5b4f-866a-3313180cb863',
 7: 'cce2c582-1c62-5b80-aac3-b840ce3f5f14',
 8: 'd177af44-109b-50d6-8b8a-97874a03462b',
 9: '8a7f105e-71f3-5101-8b4f-1a9007ec858d',
 10: '41ac73c2-162a-5ce9-964f-68a510db4d95',
 11: '2974cbe7-7e8b-54b5-9978-65f4214528ca',
 12: 'bc038a56-4e78-504b-a9a5-3e5666086e8e',
 13: '2e63d367-00e4-59d5-a861-9117f6825754',
 14: 'd8148587-7cb2-5800-b216-0eeb7afcf7e4',
 15: 'c682ba2f-b6aa-5ee6-8924-b395e42ea750',
 16: 'ba439656-84c7-5804-bf01-e9ac87641692',
 17: '82ae2e23-b674-53d7-ada4-fb1657822f5e',
 18: '7573ef10-2d63-5ec5-8190-b70bd3c452e5',
 19: '807905d4-b3d4-532e-a544-9cdac0f377d5',
 20: '71c4b22f-584c-581d-9508-099e859582d2',
 21: '7e6993e0-fae2-5ce4-85ed-6e2d27445e4c',
 22: '069c0cf6-35f1-

In [224]:
## Add the cn_fk to the topology dataframe
df_topology = df_topology.with_columns(
    c("node_number").replace_strict(connectivity_node, default=None).alias("cn_fk")
)

## Branch

In [225]:
# branch :pl.DataFrame =

default_install_date: datetime = datetime(*settings.DEFAULT_INSTALL_DATE, tzinfo=UTC)
heartbeat = datetime.now(UTC)
changes_schema = ChangesSchema()


# Current and other line parameter in pu

# Filter to take only branch, connection_type == 2
branch = linedata_distflow.filter(c("connection_type") == 2).select(
    c("line_number").alias("dso_code"),
    c("i_pu").alias("current_limit"),
    c("r_pu"),
    c("x_pu"),
    c("b_pu"),
    # Need column name non null value for validation of the schema
    pl.lit(base_length).alias("length"),  # km
    pl.lit(BRANCH).alias("concrete_class"),
    pl.lit(default_install_date).alias("start"),
    pl.lit(heartbeat).alias("start_heartbeat"),
    c("line_number").pipe(generate_uuid_col, added_string=BRANCH).alias("uuid"),
    # Generate uuid for each terminal of branch with node uuid
    c("node_from").replace_strict(connectivity_node, default=None).alias("t1"),
    c("node_to").replace_strict(connectivity_node, default=None).alias("t2"),
    # Need column name for validation of the schema
    pl.lit(None).alias("t1_container_fk"),
    pl.lit(None).alias("t2_container_fk"),
)
new_tables_pl: dict[str, pl.DataFrame] = {
    "Resource": branch,
    "Equipment": branch,
    "Branch": branch,
}
changes_schema = add_table_to_changes_schema(
    schema=changes_schema, new_tables_pl=new_tables_pl, raw_table_name="branch"
)
changes_schema = generate_connectivity_table(
    changes_schema=changes_schema, eq_table=branch, raw_data_table="branch"
)

## Measurement

    load: pl.DataFrame = topology_df.filter(pl.col("KEYWORD") == "LOAD")[["FROM", "t1_container_fk"]]

    measurement: pl.DataFrame = pl.read_csv(file_names["conventional_meter"], separator=";")\
        .filter(c("METERING").is_first_distinct())\
        .with_columns(c("NODE").cast(pl.Utf8))\
        .join(load, left_on="NODE", right_on="FROM", how="left")\
        .with_columns(
            c("METERING").pipe(generate_random_uuid).alias("uuid"),
            c("ANNUAL E kWh").cast(pl.Float64).alias("double_value"),
            c("METERING").alias("dso_code"),
            pl.lit(MeasurementClass.SPAN.value).alias("concrete_type"),
            c("t1_container_fk").alias("resource_fk"),
            pl.lit(3).alias("unit_multiplier"),
            pl.lit(WATTHOUR).alias("unit_symbol"),
            pl.lit(CONVENTIONAL_METER).alias("source_fk"),
            pl.lit(op(AVG)).alias("op_type"),
            pl.lit(MeasurementPhase.ABC.value).alias("phase"),
            pl.lit(MeasurementColumn.DOUBLE.value).alias("column_type"),
            pl.lit(60*60*24*365).alias("default_period"),
            pl.lit(ENERGY).alias("measurement_type"),
            pl.lit(heartbeat).alias("start_heartbeat"),
        ).drop_nulls("resource_fk")

    measurement_span = measurement.with_columns(
        c("uuid").alias("measurement_fk"),
        c("uuid").pipe(generate_random_uuid).alias("uuid"),
        pl.lit(datetime(2022, 1, 1)).dt.replace_time_zone(time_zone="Europe/Zurich")
        .dt.convert_time_zone(time_zone="UTC").alias("start"),
        pl.lit(datetime(2023, 1, 1)).dt.replace_time_zone(time_zone="Europe/Zurich")
        .dt.convert_time_zone(time_zone="UTC").alias("end")
    )

    new_tables_pl: dict[str, pl.DataFrame] = {
                "Measurement": measurement, "MeasurementSpan": measurement_span
            }
    changes_schema = add_table_to_changes_schema(
            schema=changes_schema, new_tables_pl=new_tables_pl, raw_table_name="meter_id")

In [226]:
## Add the uuid of the node to the power data
measurement = df_topology.select(
    pl.lit(None).pipe(generate_random_uuid).alias("uuid"),
    c("cn_fk").alias("resource_fk"),
    pl.lit(heartbeat).alias("start_heartbeat"),
    pl.lit(MeasurementClass.SPAN.value).alias("concrete_type"),
    pl.lit(MeasurementPhase.ABC.value).alias("phase"),
    pl.lit(MeasurementColumn.STR.value).alias("column_type"),
    pl.lit(CONVENTIONAL_METER).alias("source_fk"),
    # pl.lit(60*60*24*365).alias("default_period"),
    pl.lit("[ACTIVE_POWER, REACTIVE_POWER]").alias("measurement_type"),
    # Not ok, but don't kwow how to do it with two value for one "client/container"
    ("[" + c("Pload").cast(pl.String) + "," + c("Qload").cast(pl.String) + "]").alias(
        "string_value"
    ),
    pl.lit("pu").alias("unit_symbol"),
    pl.lit(1).alias("unit_multiplier"),
)

measurement_span = measurement.with_columns(
    c("uuid").alias("measurement_fk"),
    c("uuid").pipe(generate_random_uuid).alias("uuid"),
    pl.lit(datetime(2022, 1, 1))
    .dt.replace_time_zone(time_zone="Europe/Zurich")
    .dt.convert_time_zone(time_zone="UTC")
    .alias("start"),
    pl.lit(datetime(2023, 1, 1))
    .dt.replace_time_zone(time_zone="Europe/Zurich")
    .dt.convert_time_zone(time_zone="UTC")
    .alias("end"),
)

new_tables_pl: dict[str, pl.DataFrame] = {
    "Measurement": measurement,
    "MeasurementSpan": measurement_span,
}
changes_schema = add_table_to_changes_schema(
    schema=changes_schema, new_tables_pl=new_tables_pl, raw_table_name="meter_id"
)



In [227]:
# changes_schema.measurement
measurement

uuid,resource_fk,start_heartbeat,concrete_type,phase,column_type,source_fk,measurement_type,string_value,unit_symbol,unit_multiplier
str,str,"datetime[μs, UTC]",str,str,str,str,str,str,str,i32
"""d3fcd843-1a69-47cd-850d-b8ff93…","""df941fce-ceda-5874-ab63-5c8af9…",2024-12-20 12:17:06.465830 UTC,"""measurement_span""","""ABC""","""str_value""","""conventional_meter""","""[ACTIVE_POWER, REACTIVE_POWER]""",,"""pu""",1
"""d3fcd843-1a69-47cd-850d-b8ff93…","""ba84d70a-80d7-590e-b112-f9c4b5…",2024-12-20 12:17:06.465830 UTC,"""measurement_span""","""ABC""","""str_value""","""conventional_meter""","""[ACTIVE_POWER, REACTIVE_POWER]""","""[0.0,0.0]""","""pu""",1
"""d3fcd843-1a69-47cd-850d-b8ff93…","""078656ed-79f8-53a1-a67a-bb8f53…",2024-12-20 12:17:06.465830 UTC,"""measurement_span""","""ABC""","""str_value""","""conventional_meter""","""[ACTIVE_POWER, REACTIVE_POWER]""","""[0.0,0.0]""","""pu""",1
"""d3fcd843-1a69-47cd-850d-b8ff93…","""c2247320-9fc2-538a-ba64-3ac70e…",2024-12-20 12:17:06.465830 UTC,"""measurement_span""","""ABC""","""str_value""","""conventional_meter""","""[ACTIVE_POWER, REACTIVE_POWER]""","""[0.0,0.0]""","""pu""",1
"""d3fcd843-1a69-47cd-850d-b8ff93…","""af72457f-f983-5eeb-a635-0609f4…",2024-12-20 12:17:06.465830 UTC,"""measurement_span""","""ABC""","""str_value""","""conventional_meter""","""[ACTIVE_POWER, REACTIVE_POWER]""","""[0.0,0.0]""","""pu""",1
…,…,…,…,…,…,…,…,…,…,…
"""d3fcd843-1a69-47cd-850d-b8ff93…","""4505ed8e-f087-5ee2-8c67-775daa…",2024-12-20 12:17:06.465830 UTC,"""measurement_span""","""ABC""","""str_value""","""conventional_meter""","""[ACTIVE_POWER, REACTIVE_POWER]""","""[0.0,0.0]""","""pu""",1
"""d3fcd843-1a69-47cd-850d-b8ff93…","""23bc00b6-0e27-5e6d-a02e-dda5e9…",2024-12-20 12:17:06.465830 UTC,"""measurement_span""","""ABC""","""str_value""","""conventional_meter""","""[ACTIVE_POWER, REACTIVE_POWER]""","""[0.0000354390681003584,-0.0000…","""pu""",1
"""d3fcd843-1a69-47cd-850d-b8ff93…","""b1d51456-8036-5737-accc-1103d2…",2024-12-20 12:17:06.465830 UTC,"""measurement_span""","""ABC""","""str_value""","""conventional_meter""","""[ACTIVE_POWER, REACTIVE_POWER]""","""[0.0,0.0]""","""pu""",1
"""d3fcd843-1a69-47cd-850d-b8ff93…","""9bed56b6-83af-51ce-b629-9a787c…",2024-12-20 12:17:06.465830 UTC,"""measurement_span""","""ABC""","""str_value""","""conventional_meter""","""[ACTIVE_POWER, REACTIVE_POWER]""","""[0.0,0.0]""","""pu""",1


## Switch

In [228]:
# Filter to take only switch, connection_type == 3
switch = linedata_distflow.filter(c("connection_type") == 3).select(
    c("line_number").alias("dso_code"),
    pl.lit(SWITCH).alias("concrete_class"),
    pl.lit(default_install_date).alias("start"),
    pl.lit(heartbeat).alias("start_heartbeat"),
    pl.lit(switch_state).alias("normal_open"),
    pl.lit(switch_type).alias("type"),
    pl.lit(switch_command).alias("command"),
    # Generate uuid for each terminal of branch with node uuid
    c("node_from").replace_strict(connectivity_node, default=None).alias("t1"),
    c("node_to").replace_strict(connectivity_node, default=None).alias("t2"),
    # Need column name for validation of the schema
    pl.lit(None).alias("t1_container_fk"),
    pl.lit(None).alias("t2_container_fk"),
    c("line_number").pipe(generate_uuid_col, added_string=SWITCH).alias("uuid"),
)
new_tables_pl: dict[str, pl.DataFrame] = {
    "Resource": switch,
    "Equipment": switch,
    "Switch": switch,
}
changes_schema = add_table_to_changes_schema(
    schema=changes_schema, new_tables_pl=new_tables_pl, raw_table_name="switch"
)
changes_schema = generate_connectivity_table(
    changes_schema=changes_schema, eq_table=switch, raw_data_table="switch"
)

In [229]:
# Begin time of the data from matlab (from main_FC.ipynb before)
str(datetime(2020, 4, 4, 23, 00, 0, 0, UTC) - dt.timedelta(hours=192))

'2020-03-27 23:00:00+00:00'

## Parser

In [230]:
# Parse connectivity node
df_topology

node_number,index_branch_1,index_branch_2,index_branch_3,Vnom,Pload,Qload,cn_fk
i64,i64,i64,i64,i64,f64,f64,str
0,1,2,3,400,,,"""df941fce-ceda-5874-ab63-5c8af9…"
1,4,5,,400,0.0,0.0,"""ba84d70a-80d7-590e-b112-f9c4b5…"
2,6,7,8,400,0.0,0.0,"""078656ed-79f8-53a1-a67a-bb8f53…"
3,9,,,400,0.0,0.0,"""c2247320-9fc2-538a-ba64-3ac70e…"
4,,,,400,0.0,0.0,"""af72457f-f983-5eeb-a635-0609f4…"
…,…,…,…,…,…,…,…
53,,,,400,0.0,0.0,"""4505ed8e-f087-5ee2-8c67-775daa…"
54,55,56,,400,0.000035,-0.000012,"""23bc00b6-0e27-5e6d-a02e-dda5e9…"
55,57,,,400,0.0,0.0,"""b1d51456-8036-5737-accc-1103d2…"
56,,,,400,0.0,0.0,"""9bed56b6-83af-51ce-b629-9a787c…"


In [231]:
def parse_connectivity_node(
    topology_df: pl.DataFrame, changes_schema: ChangesSchema, **kwargs
) -> ChangesSchema:

    cn_voltage_mapping: dict[str, float] = pl_to_dict(
        topology_df.filter(c("KEYWORD") != "TR2")
        .unpivot(
            index=["UN"], on=["t1", "t2"], value_name="cn_fk", variable_name="side"
        )
        .drop_nulls("cn_fk")
        .group_by("cn_fk")
        .agg(c("UN").drop_nulls().first())
        .drop_nulls("UN")[["cn_fk", "UN"]]
    )
    node = topology_df.filter(c("KEYWORD") == "NODE").with_columns(
        (1e3 * c("uuid").replace_strict(cn_voltage_mapping, default=c("UN")))
        .cast(pl.Int32)
        .alias("base_voltage_fk"),  # kV to V
    )

    changes_schema = add_table_to_changes_schema(
        schema=changes_schema,
        new_tables_pl={"ConnectivityNode": node},
        raw_table_name="ConnectivityNode",
    )
    return changes_schema

## Import data to changes schema

In [None]:
def sum_downstream_power(col: pl.Expr, df: pl.DataFrame):
    return col.map_elements(
        lambda x: df.filter(c("upstream") == x)["p_line"].sum(), return_dtype=pl.Float64
    )


def calculate_line_power(df: pl.DataFrame):
    return (c("downstream").pipe(sum_downstream_power, df=df) + c("P")) * (1 + c("F"))


def sum_power(df: pl.DataFrame, lv: int):

    return df.with_columns(
        pl.when(c("lv") == lv)
        .then(calculate_line_power(df=df))
        .otherwise(c("p_line"))
        .alias("p_line")
    )


# UP Use for each powerflow
# Down Use only one time
def get_node_level(G: nx.DiGraph) -> dict:
    level_mapping: dict = {}
    for node in reversed(list(nx.topological_sort(G))):
        if not len(list(G.successors(node))):
            level_mapping[node] = 0
        else:
            level_mapping[node] = max(level_mapping[n] for n in G.successors(node)) + 1
    return level_mapping


line_data: pl.DataFrame = pl.DataFrame(
    {
        "downstream": [1, 2, 3, 4, 5, 6, 7, 8],
        "upstream": [None, 1, 2, 1, 4, 4, 4, 6],
        "P": [0, 1, 2, 1, 4, 3, 6, 5],
        "F": [0.0, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1],
        "p_line": [0] * 8,
    }
)

grid = nx.DiGraph()

_ = line_data.drop_nulls(subset="upstream").with_columns(
    pl.struct(c("upstream"), c("downstream")).map_elements(
        lambda x: grid.add_edge(x["upstream"], x["downstream"]), return_dtype=pl.Struct
    )
)
level_mapping: dict = get_node_level(G=grid)
line_data = line_data.with_columns(
    c("downstream").replace_strict(level_mapping, default=None).alias("lv")
)

for i in range(line_data["lv"].max() + 1):
    line_data = sum_power(df=line_data, lv=i)

print(line_data.sort("lv"))

shape: (8, 6)
┌────────────┬──────────┬─────┬─────┬────────┬─────┐
│ downstream ┆ upstream ┆ P   ┆ F   ┆ p_line ┆ lv  │
│ ---        ┆ ---      ┆ --- ┆ --- ┆ ---    ┆ --- │
│ i64        ┆ i64      ┆ i64 ┆ f64 ┆ f64    ┆ i64 │
╞════════════╪══════════╪═════╪═════╪════════╪═════╡
│ 3          ┆ 2        ┆ 2   ┆ 0.1 ┆ 2.2    ┆ 0   │
│ 5          ┆ 4        ┆ 4   ┆ 0.1 ┆ 4.4    ┆ 0   │
│ 7          ┆ 4        ┆ 6   ┆ 0.1 ┆ 6.6    ┆ 0   │
│ 8          ┆ 6        ┆ 5   ┆ 0.1 ┆ 5.5    ┆ 0   │
│ 2          ┆ 1        ┆ 1   ┆ 0.1 ┆ 3.52   ┆ 1   │
│ 6          ┆ 4        ┆ 3   ┆ 0.1 ┆ 9.35   ┆ 1   │
│ 4          ┆ 1        ┆ 1   ┆ 0.1 ┆ 23.485 ┆ 2   │
│ 1          ┆ null     ┆ 0   ┆ 0.0 ┆ 27.005 ┆ 3   │
└────────────┴──────────┴─────┴─────┴────────┴─────┘
