In [None]:
!pip install pandapower numpy pandas matplotlib seaborn scikit-learn tensorflow keras statsmodels cvxpy control pymoo scipy

In [None]:
import pandapower as pp
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from tensorflow import keras
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.arima.model import ARIMA
import cvxpy as cp
from scipy.integrate import odeint
from sklearn.preprocessing import MinMaxScaler
import pandapower.networks as pn
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from scipy.optimize import minimize, differential_evolution
from scipy.stats import weibull_min
from scipy import signal
import control
import pymoo
from pymoo.core.problem import Problem
from pymoo.algorithms.moo.nsga2 import NSGA2
from pymoo.optimize import minimize as pymoo_minimize
from pymoo.termination import get_termination

np.random.seed(42)

In [None]:
def create_ieee_networks():
    networks = {
        "IEEE 33": pn.case33bw(),
        "IEEE 118": pn.case118(),
        "IEEE 145": pn.case145(),
        "IEEE 300": pn.case300()
    }

    for name, net in networks.items():
        print(f"\n{name} bus system:")
        print(f"Number of buses: {len(net.bus)}")
        print(f"Number of lines: {len(net.line)}")
        print(f"Number of generators: {len(net.gen)}")
        print(f"Number of loads: {len(net.load)}")

        pp.runpp(net)

        plt.figure(figsize=(12, 8))
        pp.plotting.plotly.simple_plotly(net)
        plt.title(f"{name} Bus System Topology")
        plt.show()

        plt.figure(figsize=(12, 6))
        plt.plot(net.res_bus.vm_pu.values)
        plt.title(f"{name} Bus System Voltage Profile")
        plt.xlabel("Bus Index")
        plt.ylabel("Voltage (p.u.)")
        plt.grid(True)
        plt.show()

        plt.figure(figsize=(12, 6))
        plt.bar(range(len(net.res_line)), net.res_line.loading_percent.values)
        plt.title(f"{name} Bus System Line Loading")
        plt.xlabel("Line Index")
        plt.ylabel("Loading (%)")
        plt.grid(True)
        plt.show()

    return networks

ieee_networks = create_ieee_networks()

In [None]:
def generate_solar_data(days=365, resolution_min=5):
    intervals = int(24 * 60 / resolution_min)
    time = np.linspace(0, 24, intervals)
    data = []

    for day in range(days):
        # creating the base curve for clear sky radiation
        base = np.sin(np.pi * time / 12) * np.exp(-0.01 * (time - 12)**2)
        base = np.where(base < 0, 0, base)

        season_factor = 1 + 0.2 * np.sin(2 * np.pi * day / 365)

        # create randomness for things like cloud cover
        noise = np.random.normal(0, 0.1, intervals)

        day_data = base * season_factor + noise
        day_data = np.clip(day_data, 0, 1)

        data.extend(day_data)

    return np.array(data)

def generate_wind_data(days=365, resolution_min=5):
    intervals = int(24 * 60 / resolution_min)
    total_intervals = days * intervals

    # making the wind speeds' dataset using the weibull distribution
    shape, scale = 2, 6
    wind_speeds = weibull_min.rvs(shape, loc=0, scale=scale, size=total_intervals)

    cut_in, rated, cut_out = 3.5, 12, 25  # wind turbine (i am using wind turbine details from a datasheet - check the report for details)
    power_output = np.where(wind_speeds < cut_in, 0,
                   np.where(wind_speeds < rated, (wind_speeds - cut_in) / (rated - cut_in),
                   np.where(wind_speeds < cut_out, 1, 0)))

    time = np.tile(np.linspace(0, 24, intervals), days)
    day_of_year = np.repeat(np.arange(days), intervals)

    diurnal_factor = 1 + 0.2 * np.sin(2 * np.pi * (time - 3) / 24)  # peak at 3pm
    seasonal_factor = 1 + 0.3 * np.sin(2 * np.pi * (day_of_year - 45) / 365)  # peak in mid-february (i'll add the references for this later)

    power_output *= diurnal_factor * seasonal_factor

    return np.clip(power_output, 0, 1)

def generate_load_demand(days=365, resolution_min=5):
    intervals = int(24 * 60 / resolution_min)
    total_intervals = days * intervals

    time = np.tile(np.linspace(0, 24, intervals), days)
    base_load = 0.5 + 0.3 * np.sin(2 * np.pi * (time - 5) / 24)  # daily peak at 5pm

    day_of_week = np.tile(np.repeat(np.arange(7), intervals), (days + 6) // 7)[:total_intervals]
    weekend_factor = np.where(day_of_week >= 5, 0.8, 1)

    day_of_year = np.repeat(np.arange(days), intervals)
    seasonal_factor = 1 + 0.2 * np.sin(2 * np.pi * (day_of_year - 200) / 365)  # peak in mid-july

    load = base_load * weekend_factor * seasonal_factor

    noise = np.random.normal(0, 0.05, total_intervals)
    load += noise

    scaler = MinMaxScaler()
    load = scaler.fit_transform(load.reshape(-1, 1)).flatten()

    return load

days = 365
resolution_min = 5

solar_data = generate_solar_data(days, resolution_min)
wind_data = generate_wind_data(days, resolution_min)
load_data = generate_load_demand(days, resolution_min)

start_date = pd.Timestamp('2023-01-01')
date_range = pd.date_range(start=start_date, periods=len(solar_data), freq=f'{resolution_min}T')

df = pd.DataFrame({
    'datetime': date_range,
    'solar_output': solar_data,
    'wind_output': wind_data,
    'load_demand': load_data
})

df.set_index('datetime', inplace=True)

plt.figure(figsize=(15, 10))

plt.subplot(3, 1, 1)
plt.plot(df.index, df['solar_output'])
plt.title('Solar Power Output')
plt.ylabel('Normalized Output')

plt.subplot(3, 1, 2)
plt.plot(df.index, df['wind_output'])
plt.title('Wind Power Output')
plt.ylabel('Normalized Output')

plt.subplot(3, 1, 3)
plt.plot(df.index, df['load_demand'])
plt.title('Load Demand')
plt.ylabel('Normalized Demand')

plt.tight_layout()
plt.show()

print(df.describe())

df.to_csv('synthetic_energy_data.csv')

In [None]:
class BatteryModel:
    def __init__(self, capacity, charge_rate, discharge_rate, efficiency):
        self.capacity = capacity
        self.charge_rate = charge_rate
        self.discharge_rate = discharge_rate
        self.efficiency = efficiency
        self.soc = 0.5

    def charge(self, power, dt):
        energy = power * dt * self.efficiency
        self.soc = min(1.0, self.soc + energy / self.capacity)
        return min(power, (1.0 - self.soc) * self.capacity / dt / self.efficiency)

    def discharge(self, power, dt):
        energy = power * dt / self.efficiency
        self.soc = max(0.0, self.soc - energy / self.capacity)
        return min(power, self.soc * self.capacity * self.efficiency / dt)

class SupercapacitorModel:
    def __init__(self, capacitance, max_voltage, esr):
        self.capacitance = capacitance
        self.max_voltage = max_voltage
        self.esr = esr
        self.voltage = 0.5 * max_voltage

    def charge(self, power, dt):
        dv = np.sqrt(self.voltage**2 + 2*power*dt/self.capacitance) - self.voltage
        self.voltage = min(self.max_voltage, self.voltage + dv)
        return power

    def discharge(self, power, dt):
        dv = self.voltage - np.sqrt(max(0, self.voltage**2 - 2*power*dt/self.capacitance))
        self.voltage = max(0, self.voltage - dv)
        return min(power, self.capacitance * self.voltage * dv / dt)

class HydrogenStorageModel:
    def __init__(self, max_storage, electrolyzer_efficiency, fuel_cell_efficiency):
        self.max_storage = max_storage
        self.electrolyzer_efficiency = electrolyzer_efficiency
        self.fuel_cell_efficiency = fuel_cell_efficiency
        self.stored_hydrogen = 0.5 * max_storage

    def produce_hydrogen(self, power, dt):
        produced = power * dt * self.electrolyzer_efficiency
        stored = min(self.max_storage - self.stored_hydrogen, produced)
        self.stored_hydrogen += stored
        return stored / dt / self.electrolyzer_efficiency

    def consume_hydrogen(self, power, dt):
        required = power * dt / self.fuel_cell_efficiency
        consumed = min(self.stored_hydrogen, required)
        self.stored_hydrogen -= consumed
        return consumed * self.fuel_cell_efficiency / dt

class HESSModel:
    def __init__(self, battery, supercapacitor, hydrogen_storage):
        self.battery = battery
        self.supercapacitor = supercapacitor
        self.hydrogen_storage = hydrogen_storage

    def charge(self, power, dt):
        sc_power = min(power, self.supercapacitor.capacitance * (self.supercapacitor.max_voltage**2 - self.supercapacitor.voltage**2) / 2 / dt)
        sc_charged = self.supercapacitor.charge(sc_power, dt)

        battery_power = min(power - sc_charged, self.battery.charge_rate)
        battery_charged = self.battery.charge(battery_power, dt)

        h2_power = power - sc_charged - battery_charged
        h2_produced = self.hydrogen_storage.produce_hydrogen(h2_power, dt)

        return sc_charged + battery_charged + h2_produced

    def discharge(self, power, dt):
        sc_power = min(power, self.supercapacitor.capacitance * self.supercapacitor.voltage**2 / 2 / dt)
        sc_discharged = self.supercapacitor.discharge(sc_power, dt)

        battery_power = min(power - sc_discharged, self.battery.discharge_rate)
        battery_discharged = self.battery.discharge(battery_power, dt)

        h2_power = power - sc_discharged - battery_discharged
        h2_consumed = self.hydrogen_storage.consume_hydrogen(h2_power, dt)

        return sc_discharged + battery_discharged + h2_consumed

def rule_based_control(hess, net_load, dt):
    if net_load > 0:
        return hess.discharge(net_load, dt)
    else:
        return -hess.charge(-net_load, dt)

def mpc_control(hess, net_load_forecast, dt, horizon):
    def objective(u):
        cost = 0
        state = [hess.battery.soc, hess.supercapacitor.voltage, hess.hydrogen_storage.stored_hydrogen]
        for i in range(horizon):
            if u[i] > 0:
                power = hess.discharge(u[i], dt)
            else:
                power = -hess.charge(-u[i], dt)
            cost += (power - net_load_forecast[i])**2
            state = [hess.battery.soc, hess.supercapacitor.voltage, hess.hydrogen_storage.stored_hydrogen]
        return cost

    bounds = [(-hess.battery.discharge_rate - hess.supercapacitor.capacitance * hess.supercapacitor.max_voltage**2 / 2 / dt - hess.hydrogen_storage.max_storage * hess.hydrogen_storage.fuel_cell_efficiency / dt,
               hess.battery.charge_rate + hess.supercapacitor.capacitance * hess.supercapacitor.max_voltage**2 / 2 / dt + hess.hydrogen_storage.max_storage * hess.hydrogen_storage.electrolyzer_efficiency / dt)] * horizon

    result = minimize(objective, [0]*horizon, method='SLSQP', bounds=bounds)
    return result.x[0]

class MLControl:
    def __init__(self, input_size, hidden_size, output_size):
        self.model = keras.Sequential([
            keras.layers.Dense(hidden_size, activation='relu', input_shape=(input_size,)),
            keras.layers.Dense(hidden_size, activation='relu'),
            keras.layers.Dense(output_size)
        ])
        self.model.compile(optimizer='adam', loss='mse')

    def train(self, X, y, epochs=100, batch_size=32):
        self.model.fit(X, y, epochs=epochs, batch_size=batch_size, verbose=0)

    def predict(self, X):
        return self.model.predict(X)

def generate_scenarios(base_data, num_scenarios):
    scenarios = []
    for _ in range(num_scenarios):
        scenario = base_data.copy()
        scenario['solar_output'] *= np.random.uniform(0.8, 1.2, len(scenario))
        scenario['wind_output'] *= np.random.uniform(0.8, 1.2, len(scenario))
        scenario['load_demand'] *= np.random.uniform(0.9, 1.1, len(scenario))
        scenarios.append(scenario)
    return scenarios

def evaluate_performance(hess, control_strategy, scenario):
    total_mismatch = 0
    total_renewable_generated = 0
    total_renewable_utilized = 0
    total_load = 0

    for t in range(len(scenario)):
        net_load = scenario['load_demand'].iloc[t] - scenario['solar_output'].iloc[t] - scenario['wind_output'].iloc[t]
        control_action = control_strategy(hess, net_load, 1)  # i mentioned that i'll do it in 1-hour time steps

        if control_action > 0:
            power_supplied = hess.discharge(control_action, 1)
        else:
            power_supplied = -hess.charge(-control_action, 1)

        mismatch = abs(net_load - power_supplied)
        total_mismatch += mismatch

        total_renewable_generated += scenario['solar_output'].iloc[t] + scenario['wind_output'].iloc[t]
        total_renewable_utilized += min(scenario['load_demand'].iloc[t], scenario['solar_output'].iloc[t] + scenario['wind_output'].iloc[t] + max(0, -power_supplied))
        total_load += scenario['load_demand'].iloc[t]

    mismatch_ratio = total_mismatch / total_load
    renewable_utilization_factor = total_renewable_utilized / total_renewable_generated
    load_satisfaction_ratio = (total_load - total_mismatch) / total_load

    return {
        'mismatch_ratio': mismatch_ratio,
        'renewable_utilization_factor': renewable_utilization_factor,
        'load_satisfaction_ratio': load_satisfaction_ratio
    }

def time_series_analysis(data):
    result = seasonal_decompose(data, model='additive', period=24*7)

    plt.figure(figsize=(12, 10))
    plt.subplot(411)
    plt.plot(result.observed)
    plt.title('Observed')
    plt.subplot(412)
    plt.plot(result.trend)
    plt.title('Trend')
    plt.subplot(413)
    plt.plot(result.seasonal)
    plt.title('Seasonal')
    plt.subplot(414)
    plt.plot(result.resid)
    plt.title('Residual')
    plt.tight_layout()
    plt.show()

    model = ARIMA(data, order=(1, 1, 1))
    results = model.fit()
    print(results.summary())

    forecast = results.forecast(steps=24)
    plt.figure(figsize=(10, 6))
    plt.plot(data.index[-100:], data.values[-100:], label='Observed')
    plt.plot(forecast.index, forecast.values, color='r', label='Forecast')
    plt.title('ARIMA Forecast')
    plt.legend()
    plt.show()

def statistical_analysis(data):
    print(data.describe())

    plt.figure(figsize=(12, 4))
    for column in data.columns:
        plt.subplot(1, 3, data.columns.get_loc(column) + 1)
        data[column].hist(bins=50)
        plt.title(f'{column} Distribution')
    plt.tight_layout()
    plt.show()

    correlation = data.corr()
    plt.figure(figsize=(10, 8))
    sns.heatmap(correlation, annot=True, cmap='coolwarm')
    plt.title('Correlation Heatmap')
    plt.show()

def cluster_analysis(data):
    scaler = StandardScaler()
    scaled_data = scaler.fit_transform(data)

    kmeans = KMeans(n_clusters=3, random_state=42)
    clusters = kmeans.fit_predict(scaled_data)

    pca = PCA(n_components=2)
    pca_result = pca.fit_transform(scaled_data)

    plt.figure(figsize=(10, 8))
    scatter = plt.scatter(pca_result[:, 0], pca_result[:, 1], c=clusters, cmap='viridis')
    plt.colorbar(scatter)
    plt.title('K-means Clustering (PCA Visualization)')
    plt.show()

def optimize_hess_configuration(scenarios):
    def objective(x):
        battery_capacity, supercap_capacity, h2_capacity = x
        hess = HESSModel(
            BatteryModel(battery_capacity, battery_capacity*0.5, battery_capacity*0.5, 0.95),
            SupercapacitorModel(supercap_capacity, 1000, 0.1),
            HydrogenStorageModel(h2_capacity, 0.7, 0.6)
        )

        total_cost = battery_capacity * 100 + supercap_capacity * 1000 + h2_capacity * 50
        total_performance = 0

        for scenario in scenarios:
            performance = evaluate_performance(hess, rule_based_control, scenario)
            total_performance += performance['renewable_utilization_factor']

        return -total_performance / len(scenarios) + total_cost * 1e-6

    bounds = [(1, 1000), (0.1, 100), (1, 1000)]

    def print_iteration_number(xk, convergence):
      print(f"Iteration: {print_iteration_number.iteration}")
      print_iteration_number.iteration += 1

    print_iteration_number.iteration = 1

    result = differential_evolution(objective, bounds, maxiter=100, popsize=25, callback=print_iteration_number)

    return result.x

def main():
    df = pd.read_csv('synthetic_energy_data.csv', index_col='datetime', parse_dates=True)

    time_series_analysis(df['load_demand'])
    statistical_analysis(df)
    cluster_analysis(df[['solar_output', 'wind_output', 'load_demand']])

    scenarios = generate_scenarios(df, 10)
    optimal_config = optimize_hess_configuration(scenarios)

    print(f"Optimal HESS Configuration:")
    print(f"Battery Capacity: {optimal_config[0]:.2f} kWh")
    print(f"Supercapacitor Capacity: {optimal_config[1]:.2f} F")
    print(f"Hydrogen Storage Capacity: {optimal_config[2]:.2f} kg")

    hess = HESSModel(
        BatteryModel(optimal_config[0], optimal_config[0]*0.5, optimal_config[0]*0.5, 0.95),
        SupercapacitorModel(optimal_config[1], 1000, 0.1),
        HydrogenStorageModel(optimal_config[2], 0.7, 0.6)
    )

    rule_based_performance = evaluate_performance(hess, rule_based_control, df)
    print("\nRule-based Control Performance:")
    print(f"Mismatch Ratio: {rule_based_performance['mismatch_ratio']:.4f}")
    print(f"Renewable Utilization Factor: {rule_based_performance['renewable_utilization_factor']:.4f}")
    print(f"Load Satisfaction Ratio: {rule_based_performance['load_satisfaction_ratio']:.4f}")

    mpc_performance = evaluate_performance(hess, lambda h, n, d: mpc_control(h, [n]*24, d, 24), df)
    print("\nMPC Performance:")
    print(f"Mismatch Ratio: {mpc_performance['mismatch_ratio']:.4f}")
    print(f"Renewable Utilization Factor: {mpc_performance['renewable_utilization_factor']:.4f}")
    print(f"Load Satisfaction Ratio: {mpc_performance['load_satisfaction_ratio']:.4f}")

    ml_control = MLControl(4, 64, 1)
    X = np.column_stack((df['solar_output'], df['wind_output'], df['load_demand'], df.index.hour))
    y = df['load_demand'] - df['solar_output'] - df['wind_output']
    ml_control.train(X, y)

    ml_performance = evaluate_performance(hess, lambda h, n, d: ml_control.predict(np.array([[df['solar_output'].iloc[-1], df['wind_output'].iloc[-1], df['load_demand'].iloc[-1], df.index[-1].hour]]))[0][0], df)
    print("\nML Control Performance:")
    print(f"Mismatch Ratio: {ml_performance['mismatch_ratio']:.4f}")
    print(f"Renewable Utilization Factor: {ml_performance['renewable_utilization_factor']:.4f}")
    print(f"Load Satisfaction Ratio: {ml_performance['load_satisfaction_ratio']:.4f}")

    plt.figure(figsize=(15, 10))
    plt.subplot(3, 1, 1)
    plt.plot(df.index, df['solar_output'] + df['wind_output'], label='Renewable Generation')
    plt.plot(df.index, df['load_demand'], label='Load Demand')
    plt.title('Renewable Generation vs Load Demand')
    plt.legend()

    plt.subplot(3, 1, 2)
    plt.plot(df.index, hess.battery.soc, label='Battery SOC')
    plt.plot(df.index, hess.supercapacitor.voltage / hess.supercapacitor.max_voltage, label='Supercapacitor SOC')
    plt.plot(df.index, hess.hydrogen_storage.stored_hydrogen / hess.hydrogen_storage.max_storage, label='Hydrogen Storage Level')
    plt.title('HESS State of Charge')
    plt.legend()

    plt.subplot(3, 1, 3)
    plt.plot(df.index, df['load_demand'] - df['solar_output'] - df['wind_output'], label='Net Load')
    plt.title('Net Load')
    plt.legend()

    plt.tight_layout()
    plt.show()

    return hess, optimal_config

def power_flow_analysis(net):
    pp.runpp(net)

    plt.figure(figsize=(12, 8))
    plt.subplot(2, 1, 1)
    plt.bar(net.bus.index, net.res_bus.vm_pu)
    plt.title('Bus Voltage Magnitudes')
    plt.xlabel('Bus Index')
    plt.ylabel('Voltage (p.u.)')

    plt.subplot(2, 1, 2)
    plt.bar(net.line.index, net.res_line.loading_percent)
    plt.title('Line Loading')
    plt.xlabel('Line Index')
    plt.ylabel('Loading (%)')

    plt.tight_layout()
    plt.show()

    overloaded_lines = net.line[net.res_line.loading_percent > 80]
    print("Overloaded Lines:")
    print(overloaded_lines)

    low_voltage_buses = net.bus[net.res_bus.vm_pu < 0.95]
    print("Low Voltage Buses:")
    print(low_voltage_buses)

def economic_analysis(hess, scenarios):
    capex = (hess.battery.capacity * 100 +
             hess.supercapacitor.capacitance * 1000 +
             hess.hydrogen_storage.max_storage * 50)

    annual_opex = capex * 0.02  # from what i've read, usually 2% of CAPEX is set as the annual OPEX

    total_energy_throughput = 0
    total_revenue = 0

    for scenario in scenarios:
        for t in range(len(scenario)):
            net_load = scenario['load_demand'].iloc[t] - scenario['solar_output'].iloc[t] - scenario['wind_output'].iloc[t]
            control_action = rule_based_control(hess, net_load, 1)

            if control_action > 0:
                power_supplied = hess.discharge(control_action, 1)
                total_energy_throughput += power_supplied
                total_revenue += power_supplied * 0.1  # $0.1/kWh selling price
            else:
                power_charged = hess.charge(-control_action, 1)
                total_energy_throughput += power_charged
                total_revenue -= power_charged * 0.05  # $0.05/kWh buying price

    lcoe = (capex + annual_opex * 20) / (total_energy_throughput * 20)  # i set the total lifetime as 20 years
    annual_revenue = total_revenue / len(scenarios) * 365
    payback_period = capex / (annual_revenue - annual_opex)

    print(f"Capital Expenditure: ${capex:,.2f}")
    print(f"Annual Operational Expenditure: ${annual_opex:,.2f}")
    print(f"Levelized Cost of Energy: ${lcoe:.4f}/kWh")
    print(f"Annual Revenue: ${annual_revenue:,.2f}")
    print(f"Payback Period: {payback_period:.2f} years")

def sensitivity_analysis(base_params, param_ranges, scenarios):
    results = []

    for param, range_values in param_ranges.items():
        for value in range_values:
            params = base_params.copy()
            params[param] = value

            hess = HESSModel(
                BatteryModel(params['battery_capacity'], params['battery_capacity']*0.5, params['battery_capacity']*0.5, params['battery_efficiency']),
                SupercapacitorModel(params['supercap_capacity'], params['supercap_voltage'], params['supercap_esr']),
                HydrogenStorageModel(params['h2_capacity'], params['electrolyzer_efficiency'], params['fuel_cell_efficiency'])
            )

            performance = np.mean([evaluate_performance(hess, rule_based_control, scenario)['renewable_utilization_factor'] for scenario in scenarios])
            results.append({'param': param, 'value': value, 'performance': performance})

    results_df = pd.DataFrame(results)

    plt.figure(figsize=(12, 8))
    for param in param_ranges.keys():
        param_results = results_df[results_df['param'] == param]
        plt.plot(param_results['value'], param_results['performance'], label=param)

    plt.xlabel('Parameter Value')
    plt.ylabel('Renewable Utilization Factor')
    plt.title('Sensitivity Analysis')
    plt.legend()
    plt.grid(True)
    plt.show()

if __name__ == "__main__":
    hess, optimal_config = main()

    for name, net in ieee_networks.items():
        print(f"\nPower Flow Analysis for {name}")
        power_flow_analysis(net)

    # economic analysis
    scenarios = generate_scenarios(df, 10)
    economic_analysis(hess, scenarios)

    # sensitivity analysis
    base_params = {
        'battery_capacity': optimal_config[0],
        'battery_efficiency': 0.95,
        'supercap_capacity': optimal_config[1],
        'supercap_voltage': 1000,
        'supercap_esr': 0.1,
        'h2_capacity': optimal_config[2],
        'electrolyzer_efficiency': 0.7,
        'fuel_cell_efficiency': 0.6
    }

    param_ranges = {
        'battery_capacity': np.linspace(0.5 * optimal_config[0], 1.5 * optimal_config[0], 10),
        'battery_efficiency': np.linspace(0.8, 0.99, 10),
        'supercap_capacity': np.linspace(0.5 * optimal_config[1], 1.5 * optimal_config[1], 10),
        'h2_capacity': np.linspace(0.5 * optimal_config[2], 1.5 * optimal_config[2], 10),
        'electrolyzer_efficiency': np.linspace(0.5, 0.9, 10),
        'fuel_cell_efficiency': np.linspace(0.4, 0.8, 10)
    }

    sensitivity_analysis(base_params, param_ranges, scenarios)

    # uncertainty analysis
    num_monte_carlo = 1000
    performance_results = []

    for _ in range(num_monte_carlo):
        scenario = generate_scenarios(df, 1)[0]
        performance = evaluate_performance(hess, rule_based_control, scenario)
        performance_results.append(performance)

    performance_df = pd.DataFrame(performance_results)

    plt.figure(figsize=(12, 8))
    for column in performance_df.columns:
        sns.kdeplot(performance_df[column], label=column)

    plt.xlabel('Performance Metric Value')
    plt.ylabel('Density')
    plt.title('Uncertainty Analysis: Performance Metric Distributions')
    plt.legend()
    plt.grid(True)
    plt.show()

    print("\nUncertainty Analysis Results:")
    print(performance_df.describe())

    # control system stability analysis
    def system_dynamics(X, t, u):
        battery_soc, supercap_voltage, h2_level = X
        battery_power, supercap_power, h2_power = u

        d_battery_soc = battery_power / hess.battery.capacity
        d_supercap_voltage = supercap_power / (hess.supercapacitor.capacitance * hess.supercapacitor.max_voltage)
        d_h2_level = h2_power / hess.hydrogen_storage.max_storage

        return [d_battery_soc, d_supercap_voltage, d_h2_level]

    def lyapunov_function(X):
        battery_soc, supercap_voltage, h2_level = X
        return (battery_soc - 0.5)**2 + (supercap_voltage / hess.supercapacitor.max_voltage - 0.5)**2 + (h2_level / hess.hydrogen_storage.max_storage - 0.5)**2

    t = np.linspace(0, 24, 1000)
    X0 = [0.5, 0.5 * hess.supercapacitor.max_voltage, 0.5 * hess.hydrogen_storage.max_storage]
    u = [0, 0, 0]

    solution = odeint(system_dynamics, X0, t, args=(u,))

    lyapunov_values = [lyapunov_function(state) for state in solution]

    plt.figure(figsize=(12, 8))
    plt.subplot(2, 1, 1)
    plt.plot(t, solution[:, 0], label='Battery SOC')
    plt.plot(t, solution[:, 1] / hess.supercapacitor.max_voltage, label='Supercapacitor SOC')
    plt.plot(t, solution[:, 2] / hess.hydrogen_storage.max_storage, label='Hydrogen Storage Level')
    plt.title('HESS State Evolution')
    plt.xlabel('Time (hours)')
    plt.ylabel('State of Charge')
    plt.legend()
    plt.grid(True)

    plt.subplot(2, 1, 2)
    plt.plot(t, lyapunov_values)
    plt.title('Lyapunov Function')
    plt.xlabel('Time (hours)')
    plt.ylabel('Lyapunov Function Value')
    plt.grid(True)

    plt.tight_layout()
    plt.show()

    if all(v <= lyapunov_values[0] for v in lyapunov_values):
        print("The control system is stable according to Lyapunov stability analysis.")
    else:
        print("The control system may be unstable or further analysis is required.")

In [None]:
def frequency_domain_analysis(hess, scenario):
    net_load = scenario['load_demand'] - scenario['solar_output'] - scenario['wind_output']
    control_actions = np.array([rule_based_control(hess, nl, 1) for nl in net_load])

    freq, power_spectral_density = scipy.signal.welch(control_actions, fs=1/3600, nperseg=1024)

    plt.figure(figsize=(12, 6))
    plt.semilogx(freq, power_spectral_density)
    plt.title('Power Spectral Density of Control Actions')
    plt.xlabel('Frequency (Hz)')
    plt.ylabel('Power/Frequency')
    plt.grid(True)
    plt.show()

    s = control.tf('s')
    H = 1 / (1 + s * 3600)
    control.bode(H, dB=True, Hz=True, plot=True)
    plt.suptitle('Bode Plot of HESS Transfer Function')
    plt.show()

def harmonic_analysis(scenario):
    load = scenario['load_demand'].values
    fft_result = np.fft.fft(load)
    freqs = np.fft.fftfreq(len(load), d=1/24)

    plt.figure(figsize=(12, 6))
    plt.plot(freqs[:len(freqs)//2], np.abs(fft_result)[:len(freqs)//2])
    plt.title('Harmonic Content of Load Demand')
    plt.xlabel('Frequency (cycles per day)')
    plt.ylabel('Magnitude')
    plt.grid(True)
    plt.show()

def reliability_analysis(hess, scenarios):
    failures = 0
    total_time = len(scenarios[0]) * len(scenarios)

    for scenario in scenarios:
        net_load = scenario['load_demand'] - scenario['solar_output'] - scenario['wind_output']
        for nl in net_load:
            control_action = rule_based_control(hess, nl, 1)
            if control_action < abs(nl):
                failures += 1

    reliability = 1 - (failures / total_time)
    print(f"HESS Reliability: {reliability:.4f}")

    return reliability

def multi_objective_optimization(scenarios):
    def objective(x):
        battery_capacity, supercap_capacity, h2_capacity = x
        hess = HESSModel(
            BatteryModel(battery_capacity, battery_capacity*0.5, battery_capacity*0.5, 0.95),
            SupercapacitorModel(supercap_capacity, 1000, 0.1),
            HydrogenStorageModel(h2_capacity, 0.7, 0.6)
        )

        total_cost = battery_capacity * 100 + supercap_capacity * 1000 + h2_capacity * 50
        total_performance = 0
        reliability = reliability_analysis(hess, scenarios)

        for scenario in scenarios:
            performance = evaluate_performance(hess, rule_based_control, scenario)
            total_performance += performance['renewable_utilization_factor']

        avg_performance = total_performance / len(scenarios)

        return [-avg_performance, total_cost, -reliability]

    bounds = [(1, 1000), (0.1, 100), (1, 1000)]
    result = pymoo.optimize.minimize(
        problem=pymoo.core.problem.FunctionalProblem(3, objective, xl=[b[0] for b in bounds], xu=[b[1] for b in bounds]),
        algorithm=pymoo.algorithms.moo.nsga2.NSGA2(),
        termination=pymoo.termination.get_termination("n_gen", 100),
        seed=1
    )

    return result.X, result.F

def life_cycle_analysis(hess, scenarios, years=20):
    total_energy_throughput = 0
    for scenario in scenarios:
        net_load = scenario['load_demand'] - scenario['solar_output'] - scenario['wind_output']
        for nl in net_load:
            control_action = rule_based_control(hess, nl, 1)
            total_energy_throughput += abs(control_action)

    avg_annual_throughput = total_energy_throughput / len(scenarios) * 365
    total_life_throughput = avg_annual_throughput * years

    battery_cycles = total_life_throughput / (2 * hess.battery.capacity)
    supercap_cycles = total_life_throughput / (0.5 * hess.supercapacitor.capacitance * hess.supercapacitor.max_voltage**2)
    h2_cycles = total_life_throughput / hess.hydrogen_storage.max_storage

    print(f"Battery Cycles over {years} years: {battery_cycles:.0f}")
    print(f"Supercapacitor Cycles over {years} years: {supercap_cycles:.0f}")
    print(f"Hydrogen Storage Cycles over {years} years: {h2_cycles:.0f}")

def dynamic_pricing_scenario(base_scenario, price_volatility=0.2):
    prices = np.random.normal(0.1, price_volatility, len(base_scenario))
    prices = np.clip(prices, 0.05, 0.2)  # Ensure prices are between $0.05 and $0.20 per kWh
    return pd.DataFrame({'price': prices}, index=base_scenario.index)

def economic_dispatch(hess, net_load, price, dt):
    def objective(u):
        if u > 0:
            power = hess.discharge(u, dt)
            return -power * price  # revenue
        else:
            power = hess.charge(-u, dt)
            return power * price  # cost

    bounds = [(-hess.battery.charge_rate - hess.supercapacitor.capacitance * hess.supercapacitor.max_voltage**2 / 2 / dt - hess.hydrogen_storage.max_storage * hess.hydrogen_storage.electrolyzer_efficiency / dt,
               hess.battery.discharge_rate + hess.supercapacitor.capacitance * hess.supercapacitor.max_voltage**2 / 2 / dt + hess.hydrogen_storage.max_storage * hess.hydrogen_storage.fuel_cell_efficiency / dt)]

    result = minimize(objective, [0], method='SLSQP', bounds=bounds)
    return result.x[0]

if __name__ == "__main__":
    main()

    frequency_domain_analysis(hess, df)
    harmonic_analysis(df)
    reliability = reliability_analysis(hess, scenarios)

    pareto_front, objectives = multi_objective_optimization(scenarios)
    plt.figure(figsize=(10, 8))
    plt.scatter(objectives[:, 0], objectives[:, 1], c=objectives[:, 2], cmap='viridis')
    plt.colorbar(label='Reliability')
    plt.xlabel('Performance')
    plt.ylabel('Cost')
    plt.title('Pareto Front of HESS Optimization')
    plt.show()

    life_cycle_analysis(hess, scenarios)

    # dynamic pricing scenario
    price_scenario = dynamic_pricing_scenario(df)
    df_with_price = pd.concat([df, price_scenario], axis=1)

    economic_dispatch_performance = evaluate_performance(hess,
        lambda h, nl, dt: economic_dispatch(h, nl, df_with_price['price'].iloc[-1], dt),
        df_with_price)

    print("\nEconomic Dispatch Performance:")
    print(f"Mismatch Ratio: {economic_dispatch_performance['mismatch_ratio']:.4f}")
    print(f"Renewable Utilization Factor: {economic_dispatch_performance['renewable_utilization_factor']:.4f}")
    print(f"Load Satisfaction Ratio: {economic_dispatch_performance['load_satisfaction_ratio']:.4f}")

    plt.figure(figsize=(15, 10))
    plt.subplot(4, 1, 1)
    plt.plot(df_with_price.index, df_with_price['solar_output'] + df_with_price['wind_output'], label='Renewable Generation')
    plt.plot(df_with_price.index, df_with_price['load_demand'], label='Load Demand')
    plt.title('Renewable Generation vs Load Demand')
    plt.legend()

    plt.subplot(4, 1, 2)
    plt.plot(df_with_price.index, df_with_price['price'])
    plt.title('Electricity Price')
    plt.ylabel('Price ($/kWh)')

    plt.subplot(4, 1, 3)
    net_load = df_with_price['load_demand'] - df_with_price['solar_output'] - df_with_price['wind_output']
    dispatch = [economic_dispatch(hess, nl, p, 1) for nl, p in zip(net_load, df_with_price['price'])]
    plt.plot(df_with_price.index, dispatch)
    plt.title('HESS Dispatch')
    plt.ylabel('Power (kW)')

    plt.subplot(4, 1, 4)
    plt.plot(df_with_price.index, np.cumsum([d * p for d, p in zip(dispatch, df_with_price['price'])]))
    plt.title('Cumulative Revenue')
    plt.ylabel('Revenue ($)')

    plt.tight_layout()
    plt.show()

print("Extended HESS optimization study completed.")