# Optimizing battery storage during power outages with MPC

### *Authors: Kiera Fullick, Tim Diller, Gregor Henze*




# Intro Information
As climate change intensifies, severe weather events increasingly
disrupt power grids, resulting in economic losses, social inequities, and critical service failures. Enhancing grid resilience is essential to mitigate these impacts, and Model Predictive Control (MPC) can be used with future weather predictions to prepare for likely outages through the use of battery storage systems. This code implements MPC with battery storage systems to reduce
unmet loads and optimize costs during outages caused by severe weather, informed by utility data, time-of-use pricing, and battery specifications. It can also be used to explore the effects and benefits of different forecast horizons.

This workbook goes through all required steps, from loading the sample data, via the scenario and parameter selection to the model formulation and plotting routine.

The reader is invited to change the parameters and observe the changes in the ability of the battery system to mitigate the effects of power outages known with various lead times.

# Load Data
The load data is in a CSV file on GDrive, so mounting the Google Drive is necessary to use the files.

In [None]:
from google.colab import drive  # The google colab module to access folders on GDrive
import os  # the python module for all things related to the OS.

# we mount our gDrive drive at the startpoint
drive.mount('/content/drive')

# change that into the path you want to change into (as if you were starting in your current root folder in drive)
my_folder_path = "AREN5090"

# we navigate to the target folder
os.chdir("drive/My Drive/" + my_folder_path)

Mounted at /content/drive


# Import Modules
Import the modules needed to run the code and plot the graphs.

In [None]:
import cvxpy as cp
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import random
import math
from matplotlib.pyplot import step # Needed to plot the battery dynamics as a step function

# Cost Data
This block is responsible for loading the correct load and cost data.

In [None]:
# Load required load data from CSV
winter_csv_file = "January_Hourly_Load_Data.csv"  # Replace with your CSV file path
winter_load = pd.read_csv(winter_csv_file)
summer_csv_file = "July_Hourly_Load_Data.csv"  # Replace with your CSV file path
summer_load = pd.read_csv(summer_csv_file)

# Get the proper columns
winter_load_data = winter_load['Scaled Load (kW)']
summer_load_data = summer_load['Scaled Load (kW)']

# Define cost schedule for a single day (e.g., hourly costs for 24 hours)
cost_schedule_winter = [
    0.0233, 0.0233, 0.0233, 0.0233, 0.0233, 0.0233, 0.0233, 0.0233,  # Midnight to 8 AM
    0.1223, 0.1223, 0.1223, 0.1223, 0.1223, 0.1223, 0.1223, 0.1223,  # 8 AM to 4 PM
    0.1223, 0.1223, 0.1223, 0.1223, 0.1223, 0.1223, 0.1223, 0.1223   # 4 PM to Midnight
]

cost_schedule_summer = [
    0.0233, 0.0233, 0.0233, 0.0233, 0.0233, 0.0233, 0.0233, 0.0233,  # Midnight to 8 AM
    0.3305, 0.3305, 0.3305, 0.3305, 0.3305, 0.3305, 0.3305, 0.3305,  # 8 AM to 4 PM
    0.3305, 0.3305, 0.3305, 0.3305, 0.3305, 0.3305, 0.3305, 0.3305   # 4 PM to Midnight
]

  winter_load = pd.read_csv(winter_csv_file)
  summer_load = pd.read_csv(summer_csv_file)


# Variables
All the variables that can be changed are found in this block.

In [None]:
# Choose the scenario to consider
scenario = "summer"  # Set to "summer" or "winter"

# Scaling Parameters
percentage_scaled = 0.0004 # Based on definition of an outage for NYC

# Day and Timestep Parameters
N_steps = 24  # Total number of timesteps in a day (1-hour intervals)
days_considered = 1 # Total number of days considered

# Choose horizon lengths to consider
horizons = [3,6,12]  # MPC horizons in hours

# Outage Parameters
max_duration=17 # Maximum length in hours of a power outage
min_duration=3 # Minimum length in hours of a power outage
ran_outages = "set" # Set to "random" for random number of outages or "Set" for a defined number of outages
set_outages = 1 # Number of outages if a set number of outages is desired
min_outages = 1 # Minimum number of outages if random
max_outages = 3 # Maximum number of outages if random

# Battery Parameters
battery_capacity_hours = 6  # Hours of the battery should meet peak load
initial_battery_energy = 0  # Initial Battery State kWh
VOLL = 11  # Value of Load Lost ($/kW)
efficiency = .9 # Battery efficiency
average_charging_rate = 5 # kW per hour
average_battery_capacity = 13.5 # kWh



# Choose Scenario
This block is used to choose a given scenario either winter or summer and modifies the load accordingly.

In [None]:
# Choose either summer or winter month to analyze
if scenario == "summer":
  load = summer_load_data
  cost_schedule = cost_schedule_summer
  print("Summer Scenario Selected")
else:
  load = winter_load_data
  cost_schedule = cost_schedule_winter
  print("Winter Scenario Selected")

# Ensure load data is limited to the considered days
load = percentage_scaled * load[:N_steps * days_considered]
total_load = load.sum()
print(f"Total Load: {total_load} kW")

#Calculate and scale the average by same amount the load is scaled by
average_load_scaled = ((winter_load_data + summer_load_data) / 2)[:N_steps* days_considered]*percentage_scaled

Summer Scenario Selected
Total Load: 65455.7214132 kW


# Battery Calculations
The following code determines the charging rate of all the batteries and the numbers of batteries necessary based off of the battery parameters used.

In [None]:
# Use peak load to determine capacity
battery_capacity = battery_capacity_hours * average_load_scaled.max() # kWh
# Find Charging Rate
number_batteries = math.ceil(battery_capacity/average_battery_capacity)
charge_rate = average_charging_rate * number_batteries # kW per hour
print(f"Charging Rate: {charge_rate} kW")
print(f"Battery Capacity: {battery_capacity} kWh")
print(f"Number of Batteries: {number_batteries}")

Charging Rate: 6145 kW
Battery Capacity: 16584.4023912 kWh
Number of Batteries: 1229


# Generate Outages
The following code is used to generate outages with random lengths, initial start time, and total occurence number. The constraints can be modified to update the outages as needed. The number of days considered is defined in this section and establishes the period over which the optimization is performed.

In [None]:
# Get the correct number of timesteps based on the defined steps and days considered.
timesteps = np.arange(N_steps * days_considered)

# Function to generate a random number of outages with random durations and start times
def generate_random_outages(num_outages, timesteps, max_duration, min_duration):
    outages = []
    for _ in range(num_outages):
        while True:
            # Randomly select outage start time and duration
            outage_start = random.choice(timesteps)
            outage_duration = random.randint(min_duration, max_duration)
            outage_end = min(outage_start + outage_duration, N_steps * days_considered)

            # Check if this new outage overlaps with any existing ones
            overlap = False
            for existing_outage_start, existing_outage_end in outages:
                if (outage_start < existing_outage_end) and (outage_end > existing_outage_start):
                    overlap = True
                    break

            # If no overlap, add this outage to the list
            if not overlap:
                outages.append((outage_start, outage_end))
                break
    return outages

# Generate a random number of outages (e.g., between 1 and 3)
if ran_outages == "random":
  num_outages = random.randint(min_outages,max_outages)
else:
  num_outages = set_outages

outages = generate_random_outages(num_outages, timesteps, max_duration, min_duration)

# Display the generated outages for debugging
print(f"Generated outages: {outages}")

Generated outages: [(np.int64(6), np.int64(19))]


# Modify Cost Data
The following code snippet repeats the cost schedule for one more day than the days considered parameter to ensure the horizons can work properly.

In [None]:
# Repeat cost data for the number of days considered
def generate_cost_data(cost_schedule, num_days, time_interval_minutes=60):
    # Validate that cost_schedule has correct intervals for a day
    if len(cost_schedule) != 24 * 60 // time_interval_minutes:
        raise ValueError("The cost_schedule must have a length equal to the number of intervals in a day.")

    # Repeat the cost schedule for the specified number of days
    cost_data = np.tile(cost_schedule, num_days)
    return cost_data

num_days = days_considered+1  # Number of days to repeat the schedule

# Generate cost data
cost_data = generate_cost_data(cost_schedule, num_days)

# Cost Calculations
The following code will calculate the total cost during the outage if there is no battery used and also defines a variable that keeps track of the total unmet load during the outages.

In [None]:
# Calculate the total cost of the system during an outage with no battery
total_cost_no_battery = 0
total_unmet_load_no_battery = 0  # To store the total unmet load during outages

for t in range(N_steps * days_considered):
    # Check if this timestep is within an outage
    outage_in_effect = any(outage_start <= t < outage_end for outage_start, outage_end in outages)

    if outage_in_effect:
        # During an outage, add the cost of unmet load multiplied by VOLL
        total_cost_no_battery += load[t] * VOLL
    else:
        # Otherwise, add the cost of grid power based on TOU price
        total_cost_no_battery += load[t] * cost_data[t]

# Print the results
print(f"Total Cost of the System During Outage with No Battery: ${total_cost_no_battery:.2f}")

Total Cost of the System During Outage with No Battery: $421258.41


# Plotting Costs
This block is a function called from within and code for plotting used to plot the price of grid power and when it changes.

In [None]:
# Helper function for plotting costs
def plot_cost_blocks(fig, cost_data, days_considered, N_steps):
    """Helper function to plot cost blocks above the plot using Plotly."""
    cost_data_subset = cost_data[:days_considered * N_steps]

    block_start = 0
    for i in range(1, len(cost_data_subset)):
        if cost_data_subset[i] != cost_data_subset[i - 1]:
            fig.add_shape(
                type="rect", xref="x", yref="paper", x0=block_start, y0=0.95, x1=i,
                y1=1,
                fillcolor="gray" if cost_data_subset[i-1] == min(cost_data_subset) else "blue",
                opacity=0.3,
                line_width=0,
                layer="below",
            )
            fig.add_annotation(
                x=(block_start + i) / 2, y=1,  # Adjust the y-position as needed
                text=f"<b>${cost_data_subset[i-1]:.2f}/kWh</b>",  # Make text bold using HTML tags
                showarrow=False,
                font=dict(size=11, color="black", family="Arial"),
                xref="x", yref="paper"  # Use "paper" for relative positioning
            )
            block_start = i

      # Add the last cost block
    fig.add_shape(
        type="rect", xref="x", yref="paper", x0=block_start, y0=0.95,
        x1=len(cost_data_subset),  # Extend to the end
        y1=1,
        fillcolor="gray" if cost_data_subset[-1] == min(cost_data_subset) else "blue",
        opacity=0.3,
        line_width=0,
        layer="below",
    )
    fig.add_annotation(
        x=(block_start + len(cost_data_subset)) / 2, y=1,
        text=f"<b>${cost_data_subset[-1]:.2f}/kWh</b>", showarrow=False, # Make text bold using HTML tags
        font=dict(size=11, color="black", family="Arial"),
        xref="x", yref="paper"
    )

# Plot Results
The following code is a separate function used for plotting the graphs. The labels on the x axis will change from hours to days depending on the number of days considered initially. If only one day is considered, the x axis will be displayed in hours and if more than one day is considered the axis will be in days instead. The battery charge rate is positive if charging and negative if discharging.

In [None]:
# Helper function for plotting with x-axis in days
def plot_results(load, battery_energy, grid_power, curtailed_power, net_battery_power, title, outages, days_considered, cost_data):
    # Create figure with secondary y-axis
    fig = make_subplots(specs=[[{"secondary_y": True}]])

    # Add traces
    fig.add_trace(
        go.Scatter(x=timesteps, y=load, name="Load (kW)", mode='lines',line=dict(shape='hv')),
        secondary_y=False,
    )

    fig.add_trace(
        go.Scatter(x=timesteps, y=grid_power, name="Grid Power (kW)", mode='lines',line=dict(shape='hv', color = 'green')),
        secondary_y=False,
    )

    fig.add_trace(
        go.Scatter(x=timesteps, y=curtailed_power, name="Unmet Load (kW)", mode='lines', line=dict(shape='hv',dash='dash', color='red')),
        secondary_y=False,
    )

    #fig.add_trace(
        # go.Scatter(x=timesteps, y=battery_discharge, name="Battery Discharge (kW)", mode='lines', line=dict(shape='hv',dash='dot', color='purple')),
        # secondary_y=False,
    # )

    # fig.add_trace(
        # go.Scatter(x=timesteps, y=battery_charge, name="Battery Charge (kW)", mode='lines', line=dict(shape='hv',dash='dot',color='green')),
        # secondary_y=False,
    # )

    fig.add_trace(
           go.Scatter(x=timesteps, y=net_battery_power, name="Net Battery Charge (kW)", mode='lines+markers', line=dict(shape='hv', color='purple')),  # Net battery power on primary y-axis
           secondary_y=False,
       )

    # fig.add_trace(
         # go.Scatter(x=timesteps, y=battery_energy[:len(timesteps)], name="Battery State of Charge (kWh)", mode='lines', line=dict(shape='hv',color='black')),
         # secondary_y=True,
    # )

    # Add cost blocks
    plot_cost_blocks(fig, cost_data, days_considered, N_steps)

    # Plot outages as shaded areas
    for outage_start, outage_end in outages:
        fig.add_shape(
            type="rect", xref="x", yref="paper", x0=outage_start, y0=0, x1=outage_end,
            y1=0.95, fillcolor="red", opacity=0.3, line_width=0, layer="below",
        )

    # Update xaxis properties
    if days_considered > 1:
        x_ticks = np.arange(0, N_steps * days_considered, N_steps)
        x_tick_labels = [f"Day {i + 1}" for i in range(days_considered)]
        fig.update_xaxes(title_text="Time (Days)", tickvals=x_ticks, ticktext=x_tick_labels,
                         minor_ticks="outside", minor_dtick=1) # Added minor ticks
    else:
        x_ticks = np.arange(0, N_steps * days_considered, 1)
        x_tick_labels = [f"{i}" for i in range(N_steps * days_considered)]
        fig.update_xaxes(title_text="Time (Hours)", tickvals=x_ticks, ticktext=x_tick_labels)

    # Update yaxis properties
    fig.update_yaxes(title_text="Power (kW)", secondary_y=False)
    fig.update_yaxes(title_text="Energy (kWh)", secondary_y=True)

    # Update layout properties
    fig.update_layout(
           title_text=title,
           title_x=0.5,
           legend=dict(x=0.5, y=-0.2, orientation="h", xanchor="center"),
           yaxis=dict(rangemode='tozero'),  # Align primary y-axis to zero
           yaxis2=dict(rangemode='tozero', anchor='y', overlaying='y', side='right')  # Align secondary y-axis, anchor to y, and position on right
       )

    # Show plot
    fig.show()

# Optimization Loop
The following block defines the optimization problem by minimizing the total cost of power during the outage as subject to power balance, battery, and grid constraints.

In [None]:
# Optimization loop for each horizon
for horizon in horizons:
    battery_energy = [initial_battery_energy]  # Battery energy state for all timesteps
    grid_power = []  # Power from the grid for all timesteps
    unmet_power = []
    battery_discharge = []
    battery_charge = []

    for t in range(N_steps * days_considered):

        horizon_len = horizon

        P_g = cp.Variable(horizon)  # Grid power
        P_u = cp.Variable(horizon)  # Unmet power (curtailed)
        P_b_c = cp.Variable(horizon)  # Battery power charge
        P_b_d = cp.Variable(horizon)  # Battery power discharge
        E_b = cp.Variable(horizon + 1)  # Battery energy

        # Introduce binary variables for charging and discharging
        b_c = cp.Variable(horizon, boolean=True)  # Binary variable for charging
        b_d = cp.Variable(horizon, boolean=True)  # Binary variable for discharging

        # Constraints for the current horizon
        constraints = [E_b[0] == battery_energy[-1]]  # Start with current battery state
        for i in range(horizon_len):
            constraints += [
                load[t] + P_b_c[i] - P_b_d[i] - P_u[i] == P_g[i],  # Power balance
                E_b[i + 1] == E_b[i] + ((P_b_c[i]*efficiency) - (P_b_d[i]/efficiency)),  # Battery energy update
                0 <= E_b[i + 1],  # Battery energy cannot be negative
                E_b[i + 1] <= battery_capacity,  # Battery energy cannot exceed capacity
                P_b_c[i] <= charge_rate, #* b_c[i],  # Charging rate constraint with binary variable
                P_b_d[i] <= charge_rate, #* b_d[i],  # Discharging rate constraint with binary variable
                P_u[i] >= 0,  # Unmet power should not be negative
                #b_c[i] + b_d[i] <= 1,  # Ensure charging and discharging cannot happen at the same time
                P_g[i] >= 0,  # Grid power should not be negative
                P_b_c[i] >= 0,  # Charging power should not be negative
                P_b_d[i] >= 0,  # Discharging power should not be negative
            ]

            # Handle outages: no grid power during an outage
            outage_in_effect = any(outage_start <= t+i < outage_end for outage_start, outage_end in outages)
            if outage_in_effect:
                constraints += [
                     P_g[i] == 0,  # No grid power
                     ]

        # Cost function to minimize
        cost_horizon = cp.sum(cp.multiply(cost_data[t:horizon+t], P_g) + (VOLL * P_u))

        # Solve the optimization problem
        problem = cp.Problem(cp.Minimize(cost_horizon), constraints)
        problem.solve()

        # Check if the problem was solved successfully
        if problem.status == cp.OPTIMAL:
          battery_energy.append(E_b.value[1])  # First value in the next timestep
          grid_power.append(P_g.value[0])
          battery_discharge.append(P_b_d.value[0])  # Track battery discharge
          battery_charge.append(P_b_c.value[0])  # Track battery discharge
          unmet_power.append(P_u.value[0]) # Store unmet power
        else:
          print(f"Optimization failed at timestep {t}, Status: {problem.status}")
          grid_power.append(0)
          battery_energy.append(battery_energy[-1])  # Keep previous energy level
          unmet_power.append(0)  # No unmet power if the optimization fails
          grid_power.append(0)
          battery_discharge.append(0)  # Track battery discharge
          battery_charge.append(0)  # Track battery discharge

    # Calculate the total cost of power drawn from the grid
    total_grid_cost = sum(np.array(grid_power) * cost_data[:len(grid_power)])
    total_unmet_cost = sum(np.array(unmet_power) * VOLL)

    # Calculate the total energy cost of the system
    total_system_cost = total_grid_cost + total_unmet_cost
    print(f"Total Cost of the System with Battery: ${total_system_cost:.2f}")

    # Print the result
    print(f"Total Cost of Power Drawn from the Grid: ${total_grid_cost:.2f}")


    # Calculate unmet load without battery during outage hours
    total_unmet_load_no_battery = 0
    for outage_start, outage_end in outages:
        total_unmet_load_no_battery += load[outage_start:outage_end].sum()

    print(f"Total Unmet Load without Battery during Outages: {total_unmet_load_no_battery:.2f} kW")
    print(f"Total Unmet Load with Battery: {sum(unmet_power)} kW")

    percent_unmet_saved = ((sum(unmet_power)-total_unmet_load_no_battery) / total_unmet_load_no_battery) * 100
    print(f"Percent Unmet Load Change: {percent_unmet_saved:.2f}%")

    # Calculate the percentage difference
    percent_difference = ((total_system_cost-total_cost_no_battery) / total_cost_no_battery) * 100

    # Battery State of Charge
    net_battery_charge = np.array(battery_charge) - np.array(battery_discharge)

    # Print the results
    print(f"Percentage Difference in Cost: {percent_difference:.2f}%")

    # Plot the results for the current horizon
    plot_results(load, battery_energy, grid_power, unmet_power, net_battery_charge, f"MPC with Horizon {horizon} Hours", outages, days_considered, cost_data)


Total Cost of the System with Battery: $372790.58
Total Cost of Power Drawn from the Grid: $5099.45
Total Unmet Load without Battery during Outages: 37844.18 kW
Total Unmet Load with Battery: 33426.46562931882 kW
Percent Unmet Load Change: -11.67%
Percentage Difference in Cost: -11.51%


Total Cost of the System with Battery: $300088.83
Total Cost of Power Drawn from the Grid: $5290.07
Total Unmet Load without Battery during Outages: 37844.18 kW
Total Unmet Load with Battery: 26799.887380534707 kW
Percent Unmet Load Change: -29.18%
Percentage Difference in Cost: -28.76%


Total Cost of the System with Battery: $257502.17
Total Cost of Power Drawn from the Grid: $5401.73
Total Unmet Load without Battery during Outages: 37844.18 kW
Total Unmet Load with Battery: 22918.222312113325 kW
Percent Unmet Load Change: -39.44%
Percentage Difference in Cost: -38.87%


# Battery Dynamics Plot
To create plots of just the battery dynamics, the following code was used. The previous section of code needs to be run first in order to collect the data.

In [None]:
def plot_battery_details(battery_energy, net_battery_charge, cost_data, outages, days_considered):
    # Create figure with secondary y-axis
    fig = make_subplots(specs=[[{"secondary_y": True}]])

    # Add traces
    fig.add_trace(
        go.Scatter(x=timesteps, y=battery_energy[:len(timesteps)], name="Battery State of Charge (kWh)", mode='lines', line=dict(shape = 'hv',color='black')),
        secondary_y=False,  # Battery energy on primary y-axis
    )

    fig.add_trace(
           go.Scatter(x=timesteps, y=net_battery_charge, name="Net Battery Charge (kW)", mode='lines+markers', line=dict(shape='hv', color='purple')),  # Net battery power on primary y-axis
           secondary_y=True,
       )

    # fig.add_trace(
        # go.Scatter(x=timesteps, y=battery_charge, name="Battery Charge (kW)", mode='lines', line=dict(color='green',shape='hv')),  # Battery charge on secondary y-axis
        # secondary_y=True,
    # )

    # fig.add_trace(
        # go.Scatter(x=timesteps, y=battery_discharge, name="Battery Discharge (kW)", mode='lines', line=dict(color='purple',shape='hv')),  # Battery discharge on secondary y-axis
        # secondary_y=True,
    # )

    # Add cost blocks using the existing plot_cost_blocks2 function
    plot_cost_blocks(fig, cost_data, days_considered, N_steps)

    # Plot outages as shaded areas
    for outage_start, outage_end in outages:
        fig.add_shape(
            type="rect", xref="x", yref="paper", x0=outage_start, y0=0, x1=outage_end,
            y1=0.95, fillcolor="red", opacity=0.3, line_width=0, layer="below",
        )

    # Update xaxis properties
    if days_considered > 1:
        x_ticks = np.arange(0, N_steps * days_considered, N_steps)
        x_tick_labels = [f"Day {i + 1}" for i in range(days_considered)]
        fig.update_xaxes(title_text="Time (Days)", tickvals=x_ticks, ticktext=x_tick_labels,
                         minor_ticks="outside", minor_dtick=1) # Added minor ticks
    else:
        x_ticks = np.arange(0, N_steps * days_considered, 1)
        x_tick_labels = [f"{i}" for i in range(N_steps * days_considered)]
        fig.update_xaxes(title_text="Time (Hours)", tickvals=x_ticks, ticktext=x_tick_labels)

    # Update yaxis properties
    fig.update_yaxes(title_text="Energy (kWh)", secondary_y=False)  # Primary y-axis for energy
    fig.update_yaxes(title_text="Power (kW)", secondary_y=True)  # Secondary y-axis for power

    # Update layout properties
    fig.update_layout(
           title_text="Battery Dynamics",
           title_x=0.5,
           legend=dict(x=0.5, y=-0.2, orientation="h", xanchor="center"),
           yaxis=dict(rangemode='tozero'),  # Align primary y-axis to zero
           yaxis2=dict(rangemode='tozero')  # Align secondary y-axis to zero
       )

    # Show plot
    fig.show()

# Call this function after your optimization loop, passing the required data
plot_battery_details(battery_energy, net_battery_charge, cost_data, outages, days_considered)