MPC optimizer to balance the SoE of 2 batteries with an MILP approach to model power electronic power losses

In [59]:
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import matplotlib.pyplot as plt
import matplotlib.dates as mdates 
from gurobipy import *

In [60]:
pd.options.plotting.backend = "plotly"
template = "plotly_white"
path="../data/SBAP_FCR_profile_test5.csv"
#path="../data/ecostor_fcr_wholesale_15min_3.csv"
#path= r"C:\Users\tanjavov\Documents\MPC Optimization\data\SBAP_FCR_profile_test4.csv"
#path= r"C:\Users\tanjavov\Documents\MPC Optimization\data\myProfile.csv"
profile = pd.read_csv(path, index_col=0, parse_dates=True)
profile.plot(template=template, labels={"value": "Power [W]"})

FileNotFoundError: [Errno 2] No such file or directory: '../data/SBAP_FCR_profile5.csv'

In [None]:
parameters = {
't_prediction':32,
't_control':16,

"storage_params_1" : {
    "capacity": 3000,  # kWh
    "power": 600,
    
    "soc_bounds": (0.1, 0.9),
    "soc_start": 0.6,

    "effc": 0.9,     # charge efficiency
    "effd": 0.9,     # discharge efficiency
},


"storage_params_2" : {
    "capacity": 3000,  # kWh
    "power": 600,

    "soc_bounds": (0.1, 0.9),
    "soc_start": 0.3,

    "effc": 0.9,     # charge efficiency
    "effd": 0.9,     # discharge efficiency
}

}
parameters['duration']         = profile.index[-1] - profile.index[0]
parameters['dt']               = (profile.index[1] - profile.index[0]).total_seconds() / 60 /60  # h
parameters['total_seconds']    = parameters['duration'].total_seconds()#s
parameters['t_total']    = parameters['total_seconds'] / 60 /60 #h
parameters["kw"] = 0.001
#parameters['I_total'] = int((parameters['t_total'] - parameters['t_prediction'] + parameters['t_control']) // parameters['t_control'])
#parameters['T_MPC'] = parameters['I_total'] * parameters['t_control']
#print (parameters)

In [None]:
#Battery 1
capacity1  = parameters["storage_params_1"]["capacity"]
max_power1 = parameters["storage_params_1"]["power"]

soc1_min, soc1_max = parameters["storage_params_1"]["soc_bounds"]
soc1_range= soc1_max-soc1_min 
soc1_start = parameters["storage_params_1"]["soc_start"]
 
## Battery2
capacity2  = parameters["storage_params_2"]["capacity"]
max_power2 = parameters["storage_params_2"]["power"]


soc2_min, soc2_max = parameters["storage_params_2"]["soc_bounds"]
soc2_range= soc2_max-soc2_min 
soc2_start = parameters["storage_params_2"]["soc_start"]

kw = parameters["kw"]
dt= parameters['dt'] 



In [None]:
class mpc_controller:
    def __init__(self,profile, parameters):
        self.profile = profile
        self.parameters=parameters
        self.T_prediction= parameters['t_prediction']
        self.T_control= parameters['t_control']
        self.I_total = int((parameters['t_total'] - self.T_prediction + self.T_control) // self.T_control)
        self.T_MPC = self.I_total * self.T_control
        self.dt=parameters['dt']


    def mpc_horizon(self,cur_df,i,parameters,df_control):
        
       
        opt_model = Model(f"MPC_optimitzer_{i}")
        n=len(cur_df)
        time=range(0,n)
        zero = [0.0] * len(time)
        frac = [0.01] * len(time)

        #bigM method
        b= opt_model.addVars(time,vtype=GRB.BINARY, name="b")

        eps= 0.01
        M = 1e15 + eps
        
        ## =====  Variables  ===== ##
        # BESS1: power charge/discharge, energy content
        power_tu1        = opt_model.addVars(time, vtype=GRB.CONTINUOUS, name ='power_tu1')#, lb=0, ub=max_power1) 
        power_charge1    = opt_model.addVars(time, vtype=GRB.CONTINUOUS, name ='power_charge1')#, lb=0, ub=max_power1)
        power_discharge1 = opt_model.addVars(time, vtype=GRB.CONTINUOUS, name ='power_discharge1')#, lb=0, ub=max_power1)
        #energy_bess1     = opt_model.addVars(time, vtype=GRB.CONTINUOUS, name ='energy_bess1')#,lb=soc1_min * capacity1, ub= soc1_max * capacity1)
        power_loss1      = opt_model.addVars(time, vtype=GRB.CONTINUOUS, name ='power_loss1')#, lb=0, ub=max_power1)
        soc1             = opt_model.addVars(time, vtype=GRB.CONTINUOUS, name ='soc1')#, lb=0, ub=1)
        
         # BESS2: power charge/discharge, energy content
        power_tu2        = opt_model.addVars(time, vtype=GRB.CONTINUOUS, name ='power_tu2')#, lb=0, ub=max_power2) 
        power_charge2    = opt_model.addVars(time, vtype=GRB.CONTINUOUS, name ='power_charge2')#, lb=0, ub=max_power2)
        power_discharge2 = opt_model.addVars(time, vtype=GRB.CONTINUOUS, name ='power_discharge2')#, lb=0, ub=max_power2)
        #energy_bess2     = opt_model.addVars(time, vtype=GRB.CONTINUOUS, name ='energy_bess2')#, lb=soc2_min * capacity2, ub= soc2_max * capacity2)
        power_loss2      = opt_model.addVars(time, vtype=GRB.CONTINUOUS, name ='power_loss2')#, lb=0, ub=max_power2)
        soc2             = opt_model.addVars(time, vtype=GRB.CONTINUOUS, name ='soc2')#, lb=0, ub=1)

        delta_SOC        = opt_model.addVars(time, vtype=GRB.CONTINUOUS, name ='delta_SOC')#, lb=-1, ub= 1)
        abs_delta        = opt_model.addVars(time, vtype=GRB.CONTINUOUS, name='abs_delta')#, lb=0.0, ub=1)#, initialize=soc2_start-soc1_start)
        
        ## =====  Objective  ===== ##
        obj_soc = sum(abs_delta[t] for t in time)
        opt_model.setObjective(obj_soc, GRB.MINIMIZE) 
        
        ## ===== Constraints ===== ## 
        ## SOC1 balance
        #if i==0:
        #    opt_model.addConstr(soc1_start * capacity1 == energy_bess1[0], 'SOC1Balance1')
        #    opt_model.addConstrs((energy_bess1[t-1]+ parameters['dt'] * (power_charge1[t-1] - power_discharge1[t-1])==energy_bess1[t] for t in range(1,n)), 'SOC1Balance2')
        #else:
        #    opt_model.addConstr(df_control['energy_bess1'].iloc[-1]  == energy_bess1[0], 'SOC1Balance1')
        #    opt_model.addConstrs((energy_bess1[t-1]+ parameters['dt'] * (power_charge1[t-1] - power_discharge1[t-1])==energy_bess1[t] for t in range(1,n)), 'SOC1Balance2')

        ## SOC2 balance
        #if i==0:
        #    opt_model.addConstr(soc2_start * capacity2 == energy_bess2[0], 'SOC2Balance1')
        #    opt_model.addConstrs((energy_bess2[t-1] + parameters['dt'] * (power_charge2[t-1] - power_discharge2[t-1])==energy_bess2[t] for t in range(1,n)), 'SOC2Balance2')
        #else:
        #    opt_model.addConstr(df_control['energy_bess2'].iloc[-1]== energy_bess2[0], 'SOC2Balance1')
        #    opt_model.addConstrs((energy_bess2[t-1] + parameters['dt'] * (power_charge2[t-1] - power_discharge2[t-1])==energy_bess2[t] for t in range(1,n)), 'SOC2Balance2')

        
        # SOC1 balance
        if i==0:
            opt_model.addConstr(soc1_start == soc1[0], 'SOC1Balance1')
            #opt_model.addConstrs((soc1[t-1] + (dt*(power_charge1[t-1]-power_discharge1[t-1]))/capacity1 ==soc1[t] for t in range(1,n)), 'SOC1Balance2')
            opt_model.addConstrs((soc1[t-1] + (dt * power_tu1[t-1]/capacity1) - (dt * power_loss1[t-1]/capacity1) ==soc1[t] for t in range(1,n) if cur_df.power[t] >= zero[t]), 'SOC1Balance2')
            opt_model.addConstrs((soc1[t-1] - (dt * power_tu1[t-1]/capacity1) + (dt * power_loss1[t-1]/capacity1) ==soc1[t] for t in range(1,n) if cur_df.power[t] < zero[t]), 'SOC1Balance2')
        else:
            opt_model.addConstr(float(df_control['soc1'].iloc[-1])  == soc1[0], 'SOC1Balance3')
            #opt_model.addConstrs((soc1[t-1] + (dt * (power_charge1[t-1]-power_discharge1[t-1]))/capacity1 ==soc1[t] for t in range(1,n)), 'SOC1Balance4')
            opt_model.addConstrs((soc1[t-1] + (dt * power_tu1[t-1]/capacity1) - (dt * power_loss1[t-1]/capacity1) ==soc1[t] for t in range(1,n) if cur_df.power[t] >= zero[t]), 'SOC1Balance4')
            opt_model.addConstrs((soc1[t-1] - (dt * power_tu1[t-1]/capacity1) + (dt * power_loss1[t-1]/capacity1) ==soc1[t] for t in range(1,n) if cur_df.power[t] < zero[t]), 'SOC1Balance4')

        # SOC2 balance
        if i==0:
            opt_model.addConstr(soc2_start == soc2[0], 'SOC2Balance1')
            #opt_model.addConstrs((soc2[t-1]+ (dt * (power_charge2[t-1]-power_discharge2[t-1]))/capacity2 ==soc2[t] for t in range(1,n)), 'SOC2Balance2')
            opt_model.addConstrs((soc2[t-1] + (dt * power_tu2[t-1]/capacity2) - (dt * power_loss2[t-1]/capacity2) ==soc2[t] for t in range(1,n) if cur_df.power[t] >= zero[t]), 'SOC2Balance2')
            opt_model.addConstrs((soc2[t-1] - (dt * power_tu2[t-1]/capacity2) + (dt * power_loss2[t-1]/capacity2) ==soc2[t] for t in range(1,n) if cur_df.power[t] < zero[t]), 'SOC2Balance2')
        else:
            opt_model.addConstr(float(df_control['soc2'].iloc[-1])  == soc2[0], 'SOC2Balance3')
            #opt_model.addConstrs((soc2[t-1]+ (dt * (power_charge2[t-1]-power_discharge2[t-1]))/capacity2 ==soc2[t] for t in range(1,n)), 'SOC2Balance4')
            opt_model.addConstrs((soc2[t-1] + (dt * power_tu2[t-1]/capacity2) - (dt * power_loss2[t-1]/capacity2) ==soc2[t] for t in range(1,n) if cur_df.power[t] >= zero[t]), 'SOC2Balance4')
            opt_model.addConstrs((soc2[t-1] - (dt * power_tu2[t-1]/capacity2) + (dt * power_loss2[t-1]/capacity2) ==soc2[t] for t in range(1,n) if cur_df.power[t] < zero[t]), 'SOC2Balance4')

               
      
        
        #Absolute Delta SOE 
        opt_model.addConstrs((soc1[t] - soc2[t] == delta_SOC[t] for t in time), 'deltaSOC')
        opt_model.addConstrs((abs_delta[t] == abs_(delta_SOC[t]) for t in time), 'absDelta')

        #PE Power Loss with bigM Constraints

        #opt_model.addConstrs((power_tu1[t] >= (frac[t]*max_power1) + eps - M * (1 - b[t]) for t in time), name="bigM_constr1")
        #opt_model.addConstrs((power_tu2[t] >= (frac[t]*max_power2) + eps - M * (1 - b[t]) for t in time), name="bigM_constr2")
    #
        #opt_model.addConstrs((power_tu1[t] <= (frac[t]*max_power1) + M * b[t] for t in time), name="bigM_constr3")
        #opt_model.addConstrs((power_tu2[t] <= (frac[t]*max_power2) + M * b[t] for t in time), name="bigM_constr4")
    #
        #opt_model.addConstrs(((b[t] == 1) >> (power_loss1[t] == (0.021*(power_tu1[t]))+0.005*max_power1) for t in time), name="indicator_constr1")
        #opt_model.addConstrs(((b[t] == 0) >> (power_loss1[t] == 0) for t in time), name="indicator_constr2")
    #
        #opt_model.addConstrs(((b[t] == 1) >> (power_loss2[t] == (0.021*(power_tu2[t]))+0.005*max_power2) for t in time), name="indicator_constr3")
        #opt_model.addConstrs(((b[t] == 0) >> (power_loss2[t] == 0) for t in time), name="indicator_constr4")

        opt_model.addConstrs(((power_loss1[t] == (0.021*(power_tu1[t])+0.005*max_power1)) for t in time if cur_df.power[t] >= zero[t]), name="indicator_constr3")
        opt_model.addConstrs(((power_loss2[t] == (0.021*(power_tu2[t])+0.005*max_power2)) for t in time if cur_df.power[t] >= zero[t]), name="indicator_constr4")

        opt_model.addConstrs(((power_loss1[t] == (0.021*(power_tu1[t]+power_loss1[t]))+0.005*max_power1) for t in time if cur_df.power[t] < zero[t]), name="indicator_constr3")
        opt_model.addConstrs(((power_loss2[t] == (0.021*(power_tu2[t]+power_loss1[t]))+0.005*max_power2) for t in time if cur_df.power[t] < zero[t]), name="indicator_constr4")
        
        
        # Power balance
        opt_model.addConstrs((power_tu1[t] + power_tu2[t]  ==(abs(cur_df.power[t])*kw) for t in time ), 'power_balance0')
        
        #opt_model.addConstrs((power_discharge1[t]  == 0 for t in time if cur_df.power[t] >= zero[t]), 'power_balance1')
        #opt_model.addConstrs((power_discharge2[t] == 0 for t in time if cur_df.power[t] >= zero[t]), 'power_balance2')
        #opt_model.addConstrs((power_charge1[t]  == 0 for t in time if cur_df.power[t] < zero[t]), 'power_balance3')
        #opt_model.addConstrs((power_charge2[t] == 0 for t in time if cur_df.power[t] < zero[t]), 'power_balance4')
        #
        #opt_model.addConstrs((power_charge1[t]  == 0.89 * power_tu1[t] for t in time if cur_df.power[t] >= zero[t]), 'power_charge1')
        #opt_model.addConstrs((power_charge2[t]  == 0.89 * power_tu2[t] for t in time if cur_df.power[t] >= zero[t]), 'power_charge2')
#
        #opt_model.addConstrs((power_discharge1[t] == power_tu1[t]/0.89 for t in time if cur_df.power[t] < zero[t]), 'power_discharge1')
        #opt_model.addConstrs((power_discharge2[t] == power_tu2[t]/0.89 for t in time if cur_df.power[t] < zero[t]), 'power_discharge2')

        #opt_model.addConstrs((power_charge1[t] + power_loss1[t] == power_tu1[t] for t in time if cur_df.power[t] >= zero[t]), 'power_charge1')
        #opt_model.addConstrs((power_charge2[t] + power_loss2[t] == power_tu2[t] for t in time if cur_df.power[t] >= zero[t]), 'power_charge2')
#
        #opt_model.addConstrs((power_discharge1[t] - power_loss1[t] == power_tu1[t] for t in time if cur_df.power[t] < zero[t]), 'power_discharge1')
        #opt_model.addConstrs((power_discharge2[t] - power_loss2[t] == power_tu2[t] for t in time if cur_df.power[t] < zero[t]), 'power_discharge2')
        
        
        opt_model.update()

        opt_model.optimize()
        opt_model.write('mpc_test.lp')
        
        if opt_model.status == GRB.Status.OPTIMAL:
         opt_solution = [v.x for v in opt_model.getVars()]
        else:
         print("Optimization problem did not converge to an optimal solution.")
         opt_solution = None
        
        #define empty df for time series vars and dic non time series data 
        dic_opt = {}
        df_opt = pd.DataFrame()
        
        # get time-series and Non TV from optimized model
        # print('#######################################', model.getVars)
        for v in opt_model.getVars():
            #print ('#######################################',v.varName)
            if '[' in v.varName:
                item = v.varName.split('[')
                cur_var = item[0]
                cur_index = item[1][:-1]
                df_opt.loc[cur_index,cur_var] = v.x
            else:
                dic_opt[v.varName] = v.x
                
        # calculate other variables from optimized data
        #df_opt['bat_power'] = df_opt['power_charge'] - df_opt['power_discharge']
        #df_opt = df_opt.set_index(self.profile.index[1:])
          
    
        return self.parameters, df_opt, dic_opt
 
    def mpc_total(self):      
        df_pred_all = pd.DataFrame()
        df_opt_horizon=pd.DataFrame()
        df_opt_NTV = pd.DataFrame()
        df_opt_final = pd.DataFrame()
        df_control_final = pd.DataFrame()
        df_control = pd.DataFrame()
        
        for i in range(0, self.I_total):
            
            cur_df = self.fake_pred(i)

            print("Prediction Horizon_optimitzer_",i)
            print("df_Prediction: ",cur_df)
            
            for column_name in cur_df:
               df_pred_all[column_name+str(i)] = cur_df[column_name]

            self.parameters, df_opt, dic_opt = self.mpc_horizon(cur_df,i,self.parameters,df_control)
            
            df_opt = df_opt.set_index(self.profile.index[int(i * self.T_control/self.dt) : int(i*self.T_control/self.dt + self.T_prediction/ self.dt)])
            df_control = df_opt[0:int((self.T_control / self.dt))]
            df_opt_horizon = pd.concat([df_opt_horizon, df_opt], axis=1)
        
            if i==0:
                column_names = list(df_opt.columns)
                df_opt_final = pd.DataFrame(columns=column_names)    # Create an empty DataFrame with column names
            
            df_control_final = pd.concat([df_control_final, df_control.tail(1)], ignore_index=True)
            df_opt_final = pd.concat([df_opt_final, df_control], ignore_index=True)
            
            
            
            
            for item in dic_opt:
                df_opt_NTV[item] = dic_opt[item] #define as a df

            print("############################################################################################") 
            print("############################################################################################")
            print("############################################################################################") 
            print("cur_df:",cur_df) 
            print("############################################################################################") 
            print("df_opt:",df_opt)
            print("############################################################################################")
            #print("df_opt_cum_horizon:",df_opt_horizon)
            print("df_control: ", df_control)
            #print("parameters: ", self.parameters)
            print("############################################################################################")   
            print("df_opt_final:",df_opt_final)
            print("############################################################################################") 
            print("############################################################################################") 
            print("############################################################################################") 

        return df_opt_final,df_opt_horizon, df_opt_NTV, df_pred_all,df_control_final

    
    def fake_pred(self,i):

        cur_df = self.profile.iloc[int(i * self.T_control/ self.dt) : int(i*self.T_control/ self.dt + self.T_prediction/ self.dt)]
        return cur_df

In [None]:
mpc_optimizer = mpc_controller(profile, parameters)
df_opt_final,df_opt_horizon, df_opt_NTV, df_pred_all,df_control_final = mpc_optimizer.mpc_total()

In [None]:
fig = go.Figure()
fig.add_trace(go.Scatter(x=profile.index, y=df_opt_final["soc1"], name="Soc1"))
fig.add_trace(go.Scatter(x=profile.index, y=df_opt_final["soc2"], name="Soc2"))
fig.add_trace(go.Scatter(x=profile.index, y=df_opt_final["delta_SOC"], name="delta_SOC"))
fig.update_yaxes(title="SOC")
fig.update_xaxes(title="No. of days")
#fig.update_traces(line_shape="hv")
fig.update_layout(
  width=900,  # Specify the width in pixels
  height=500  # Specify the height in pixels
)
fig.show()

In [None]:
fig = go.Figure()
fig.add_trace(go.Scatter(y=df_opt_final["power_loss1"], name="Power1"))
fig.add_trace(go.Scatter(y=df_opt_final["power_loss2"], name="Power2"))
fig.update_yaxes(title="Power_loss [kW]")
fig.update_xaxes(title="No. of days")
#fig.update_traces(line_shape="hv")
fig.update_layout(
  width=900,  # Specify the width in pixels
  height=500  # Specify the height in pixels
)
fig.show()

In [None]:
# Data
SoE1 = df_opt_final["soc1"]
SoE2 = df_opt_final["soc2"]
del_SOE= abs(df_opt_final["delta_SOC"])

# Create a new figure
fig, ax = plt.subplots(figsize=(15, 8))

# Plot the data
ax.plot(profile.index[:len(SoE1)], SoE1, label="String_1_SOC", color="orange", linewidth=2)
ax.plot(profile.index[:len(SoE2)], SoE2, label="String_2_SOC", color="green", linewidth=2,linestyle='--')
#ax.plot(profile.index[:len(SoE2)], del_SOE, label="delta_SOC", color="brown", linewidth=2, linestyle='--')

#ax2 = ax.twinx()
#ax2.plot(profile.index[:len(del_SOE)], del_SOE, label="del_SOC", color="brown", linewidth=2, linestyle='--')

# Set labels and title
ax.set_xlabel("Time (days)", fontsize=34)
ax.set_ylabel("State of Charge", fontsize=34)
#ax.set_title("State of Charge Over Time")

# Set labels and title for the second y-axis
#ax2.set_ylabel("delta_SOC", fontsize=24)
ax.tick_params(axis='y', labelsize=26)
ax.tick_params(axis='x', labelsize=26)
#ax2.tick_params(axis='y', labelsize=16)
# Add a legend
#ax.legend(fontsize=18)
#plt.xticks(fontsize=18)
#plt.yticks(fontsize=18)

# Add legends
ax.legend(loc="lower right", fontsize=26)
#ax2.legend(loc="upper right", fontsize=18)

# Add grid lines
ax.grid(True, linestyle='--')
ax.xaxis.set_major_formatter(mdates.DateFormatter('%d'))
plt.yticks(np.arange(0,1.0, 0.1))


# Adjust layout
plt.tight_layout()

# Save the plot as a high-quality image (e.g., PNG or PDF)
plt.savefig("SoE_Plot_MPC.png", dpi=300, bbox_inches="tight")
# Or, if you prefer to display the plot interactively, use plt.show()
# plt.show()

In [None]:
# Data
power_tu1 = df_opt_final["power_charge1"] - df_opt_final["power_discharge1"]
power_tu2 = df_opt_final["power_charge2"] - df_opt_final["power_discharge2"]

# Create a new figure
fig, ax = plt.subplots(figsize=(15, 5))

# Plot the data
ax.plot(profile.index[:len(power_tu1)], power_tu1, label="String_1_Power", color="orange", linewidth=2)
ax.plot(profile.index[:len(power_tu2)], power_tu2, label="String_2_Power", color="green", linewidth=1, linestyle='--')

# Set labels and title
ax.set_xlabel("Time (days)",fontsize=24)
ax.set_ylabel("String Power (kW)",fontsize=24)
#ax.set_title("State of Charge Over Time")

# Add grid lines
ax.grid(True, linestyle='--')
ax.xaxis.set_major_formatter(mdates.DateFormatter('%d-%m'))

# Add a legend
ax.legend(fontsize=18)
plt.xticks(fontsize=16)
plt.yticks(fontsize=16)

# Adjust layout
plt.tight_layout()

ax.xaxis.set_major_formatter(mdates.DateFormatter('%d-%m'))

# Save the plot as a high-quality image (e.g., PNG or PDF)
plt.savefig("Power_split_MPC.png", dpi=300, bbox_inches="tight")
# Or, if you prefer to display the plot interactively, use plt.show()
# plt.show()


In [None]:
total_power_loss1 = df_opt_final["power_loss1"].sum()
total_power_loss2 = df_opt_final["power_loss2"].sum()
total_power1= abs(df_opt_final["power_tu1"]).sum()
total_power2= abs(df_opt_final["power_tu2"]).sum()

print(total_power_loss1+total_power_loss2)
print(total_power1+total_power2)
print(1-((total_power_loss1+total_power_loss2)/(total_power1+total_power2)))


In [None]:
total_soc1 = df_opt_final["soc1"].sum()
total_soc2 = df_opt_final["soc2"].sum()
soc1_mean= total_soc1/len(df_opt_final["soc1"])
soc2_mean= total_soc2/len(df_opt_final["soc2"])
print("soc1_mean: ", soc1_mean)
print("soc2_mean: ", soc2_mean)
print("System Avergae SOC: ",(soc1_mean+soc2_mean)/2)

In [None]:
df_opt_final.to_csv('output_MPC.csv', index=False)