In [25]:
import power_system_simulation as pss
import pandas as pd
from power_grid_model.utils import json_deserialize_from_file

In [26]:
dataset = json_deserialize_from_file("input_network_data_assign3.json")

In [27]:
reactive_power_data = pd.read_parquet("reactive_power_profile_assign3.parquet")

In [28]:
active_power_data = pd.read_parquet("active_power_profile_assign3.parquet")

In [29]:
# declare new PowerSimModel object
test1 = pss.PowerSim(grid_data=dataset)

# PowerSim saves the model as PowerSimModel
# access to assignment 2 functions is now given as
batch_test = test1.PowerSimModel.batch_powerflow(
    active_power_profile=active_power_data, reactive_power_profile=reactive_power_data
)
aggregate_test = test1.PowerSimModel.aggregate_voltage_table(
    active_power_profile=active_power_data, reactive_power_profile=reactive_power_data
)

In [30]:
try:
    test1.optimal_tap_poisition()
except:
    print("PowerProfileNotFound: No active power profile provided.")

PowerProfileNotFound: No active power profile provided.


In [31]:
test2 = pss.PowerSim(
    grid_data=dataset, active_power_profile=active_power_data, reactive_power_profile=reactive_power_data
)

## EV Penetration Testing

In [32]:
import math
import random
import graph_processing as gp

grid_data = json_deserialize_from_file("input_network_data_2.json")
# meta_data = json_deserialize_from_file("meta_data_2.json")
ev_active_power_profile = pd.read_parquet("ev_active_power_profile_2.parquet")
active_power_profile = pd.read_parquet("active_power_profile_2.parquet")
reactive_power_profile = pd.read_parquet("reactive_power_profile_2.parquet")

test1 = pss.PowerSim(grid_data=dataset)

num_houses = 150
penetration_level = 20
num_feeders = 7

In [33]:
# Calculate number of EVs per feeder
total_evs = math.floor(penetration_level * num_houses / 100)
evs_per_feeder = math.floor(total_evs / num_feeders)
print(f"EVs per feeder: {evs_per_feeder}")

# Initialize the GraphProcessor to find downstream vertices
edge_vertex_id_pairs = list(zip(grid_data["line"]["from_node"], grid_data["line"]["to_node"])) + list(
    zip(grid_data["transformer"]["from_node"], grid_data["transformer"]["to_node"])
)
edge_enabled = [
    (f_status == 1 and t_status == 1)
    for f_status, t_status in zip(grid_data["line"]["from_status"], grid_data["line"]["to_status"])
]
edge_enabled += [(grid_data["transformer"]["from_status"][0] == 1 and grid_data["transformer"]["to_status"][0] == 1)]
source_vertex_id = grid_data["source"]["node"][0]
edge_ids = list(grid_data["line"]["id"]) + list(grid_data["transformer"]["id"])
vertex_ids = grid_data["node"]["id"]

test4 = gp.GraphProcessor(
    vertex_ids=vertex_ids,
    edge_ids=edge_ids,
    edge_vertex_id_pairs=edge_vertex_id_pairs,
    edge_enabled=edge_enabled,
    source_vertex_id=source_vertex_id,
)

transformer_to_node = grid_data["transformer"]["to_node"][0]

assigned_profiles = set()
for line in grid_data["line"]:
    if line["from_node"] == transformer_to_node:
        print(f"Found line with from_node {transformer_to_node}: to_node = {line['to_node']}")
        downstream_vertices = test4.find_downstream_vertices(line["id"])
        sym_load_nodes = set(grid_data["sym_load"]["node"])
        common_nodes = sym_load_nodes.intersection(downstream_vertices)

        common_nodes_list = sorted(common_nodes)

        # Check if the number of EVs per feeder is greater than the available houses
        if len(common_nodes_list) < evs_per_feeder:
            print(
                f"Warning: Fewer houses available ({len(common_nodes_list)}) than EVs per feeder ({evs_per_feeder}). Adjusting to use all available houses."
            )
            evs_per_feeder = len(common_nodes_list)

        selected_houses = random.sample(common_nodes_list, evs_per_feeder)
        print(f"Selected houses for EVs in feeder {line['id']}: {selected_houses}")

        available_ev_profiles = list(set(ev_active_power_profile.columns) - assigned_profiles)

        selected_ev_profiles = random.sample(available_ev_profiles, evs_per_feeder)
        assigned_profiles.update(selected_ev_profiles)

        for house, ev_profile in zip(selected_houses, selected_ev_profiles):
            # Debugging: Print house and ev_profile
            print(f"Assigning EV profile {ev_profile} to house {house}")
            if house not in active_power_profile.columns:
                print(f"House {house} not found in active_power_profile columns")
            if ev_profile not in ev_active_power_profile.columns:
                print(f"EV profile {ev_profile} not found in ev_active_power_profile columns")
            # Ensure the house and ev_profile are present in the respective DataFrames
            if house in active_power_profile.columns and ev_profile in ev_active_power_profile.columns:
                active_power_profile[house] += ev_active_power_profile[ev_profile]

# Run time-series power flow after assigning EV profiles
voltage_table = test1.PowerSimModel.aggregate_voltage_table(active_power_profile, reactive_power_profile)
loading_table = test1.PowerSimModel.aggregate_loading_table(active_power_profile, reactive_power_profile)

print(voltage_table)
print(loading_table)

EVs per feeder: 4
Found line with from_node 1: to_node = 2
Selected houses for EVs in feeder 16: [3, 5]
Assigning EV profile 1 to house 3
House 3 not found in active_power_profile columns
Assigning EV profile 0 to house 5
House 5 not found in active_power_profile columns
Found line with from_node 1: to_node = 6
Selected houses for EVs in feeder 20: [7, 9]
Assigning EV profile 3 to house 7
House 7 not found in active_power_profile columns
Assigning EV profile 2 to house 9
House 9 not found in active_power_profile columns
                     Max_Voltage  Max_Voltage_Node  Min_Voltage  \
Timestamp                                                         
2025-01-01 00:00:00     1.072931                 1     1.049819   
2025-01-01 00:15:00     1.075911                 1     1.050022   
2025-01-01 00:30:00     1.069725                 1     1.049603   
2025-01-01 00:45:00     1.073244                 1     1.049842   
2025-01-01 01:00:00     1.072924                 1     1.049819   
...  

In [35]:
from power_system_simulation import VoltageDeviation
from power_system_simulation import TotalEnergyLoss

optimal_tap = test1.optimal_tap_position(
    active_power_profile=active_power_profile,
    reactive_power_profile=reactive_power_profile,
    opt_criteria=TotalEnergyLoss,
)

print(optimal_tap)

optimal_tap = test1.optimal_tap_position(
    active_power_profile=active_power_profile,
    reactive_power_profile=reactive_power_profile,
    opt_criteria=VoltageDeviation,
)

print(optimal_tap)

5
1
