In [1]:
from scipy.optimize import linprog
import numpy as np
import pandas as pd
import plotly.express as px
import datetime


import plotly.graph_objects as go
from plotly.subplots import make_subplots

from datetime import timedelta
import datetime
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning) 

# Battery specs :



- SOC: state of charge in [0,1]
- NEC: nominal energy capacity (Wh)
- EC: energy capacity (Wh)




In [2]:
NEC = 10**8 

To have a linear constraint, we charge/ discharge the battery at a constant rate 0f 0.5 :


In [3]:
E_1h = NEC/2 

# Schedule optimization functions :

In [4]:
def optimize(df) :
    """
    Returns a copy of the initial price data frame, with three additional columns :
    - schedule: Change in EC during the hour
    - capacity: Capacity at the end of the hour
    - SOC: SOC at the end of the hour 

    The schedule is optimized on the entire time period spanned by df_day. 
    
    The variable is the increment in EC for every hour t of the data frame. denoted by x with shape (n_hours) where n_hours 
    is the number of hours in the dataframe.
    
    optimization function :
    sum{t from 1 to n_hours} P[t] * x[t] = P @ x

    constraints : for t from 1 to n_hours :
    - (I) EC[t] = sum{i from 1 to t} x[i] <= NEC. battery capacity never exceeds nominal capacity
    - (II) EC[t] = sum{i from 1 to t} x[i] >= 0. battery can't be discharged further when already discharged

    bounds : for t from 1 to n_hours :
    - (III) E_1h <= x[t] <= E_1h. At any hour t, we can't (dis)charge for more than (-)E_1h

    """
    
    n_hours = len(df)
    
    if  n_hours % 24 != 0 :
        raise Exception("The dataframe should contain only full days (24 hours)")


    P = df.price_euros_wh.to_numpy()
    
    ## (I) lhs <= rhs
    lhs_ineq1 = np.tril(np.ones((n_hours,n_hours)))
    rhs_ineq1 = np.ones(n_hours) * NEC 

    ## (II) lhs <= rhs
    lhs_ineq2 = -np.tril(np.ones((n_hours,n_hours)))
    rhs_ineq2 = np.zeros(n_hours)

    
    ## (III)
    bnd = np.array([(-E_1h, E_1h) for i in range(n_hours)])

    # format stuff as the linprog function wants
    lhs_ineq = np.vstack((lhs_ineq1,lhs_ineq2))
    rhs_ineq = np.hstack((rhs_ineq1,rhs_ineq2))
    

    opt = linprog(c=P, A_ub=lhs_ineq, b_ub=rhs_ineq,bounds=bnd)


    df_optim = df.copy()
    df_optim["schedule"] = opt.x
    df_optim["capacity"] = np.hstack((np.array([0]),np.cumsum(opt.x)[:-1]))
    df_optim["SOC"] = 100*df_optim["capacity"]/NEC

    return df_optim


def optimize_day_by_day(df) :
    """
    Returns a copy of the initial price data frame, with three additional columns :
    - schedule: Change in EC during the hour
    - capacity: Capacity at the end of the hour
    - SOC: SOC at the end of the hour 

    The schedule is optimized **for each day of the dataframe, considered separately.** 
    
    
    The variable is the increment in EC for every hour t of the data frame. denoted by x with shape (n_hours) where n_hours 
    is the number of hours in the dataframe.
    
    FOR EACH DAY : 

            initial_capacity is the capacity at the beginning of the day. Can be > 0 if battery didn't finish the last day empty
            
            optimization function :

            sum{t from 1 to 24} P[t] * x[t] = P @ x

            constraints : for t from 1 to 24 :
            - (I) EC[t] = sum{i from 1 to t} x[i] + initial_capacity <= NEC. battery capacity never exceeds nominal capacity
            - (II) EC[t] = sum{i from 1 to t} x[i] + initial_capacity >= 0. battery can't be discharged further when already discharged

            bounds : for t from 1 to 24 :
            - (III) E_1h <= x[t] <= E_1h. At any hour t, we can't (dis)charge for more than (-)E_1h

    """
    
    P = df.price_euros_wh.to_numpy() 
    
    n_hours = len(df)
    if  n_hours % 24 != 0 :
        raise Exception("The dataframe should contain only full days (24 hours)")
    
    schedule = np.zeros(n_hours)
    
    ## first day starts with empty battery :
    initial_capacity = 0
    
    ## optimization done for each day :
    for day in range(0,n_hours//24) :
        
        
        P_day = P[day*24:(day+1)*24] 

        ## (I) lhs <= rhs
        lhs_ineq1 = np.tril(np.ones((24,24)))
        rhs_ineq1 = np.ones(24) * (NEC - initial_capacity)

        ## (II) lhs <= rhs
        lhs_ineq2 = -np.tril(np.ones((24,24)))
        rhs_ineq2 = np.zeros(24) + initial_capacity
        
        ## (III)
        bnd = np.array([(-E_1h, E_1h) for i in range(24)])

        rhs_ineq = np.hstack((rhs_ineq1,rhs_ineq2))
        lhs_ineq = np.vstack((lhs_ineq1,lhs_ineq2))

        
        # format stuff as the linprog function wants
        opt = linprog(c=P_day, A_ub=lhs_ineq, b_ub=rhs_ineq,bounds=bnd)

        # initial capacity for the next day is the remaining capacity in the battery
        initial_capacity = initial_capacity + sum(opt.x)
        

        schedule[day*24:(day+1)*24]  = opt.x

    capacity = np.hstack((np.array([0]),np.cumsum(schedule)[:-1]))

    df_optim = df.copy()
    df_optim["schedule"] = schedule
    df_optim["capacity"] = np.hstack((np.array([0]),np.cumsum(schedule)[:-1]))
    df_optim["SOC"] = 100*df_optim["capacity"]/NEC


    return df_optim



# Plot functions :

In [5]:
def display_profit(df_optim) :
    """
    Displays daily profits 
    """

    days =pd.to_datetime(df_optim["timestamp"].apply(lambda x: datetime.datetime(x.year, x.month, x.day)),utc=True).unique()
    
    daily_profit = []


    for i in range(len(days)) :
        daily_profit.append(-df_optim.price_euros_wh.iloc[i*24:(i+1)*24] @ df_optim.schedule.iloc[i*24:(i+1)*24])

    fig = make_subplots(specs=[[{"secondary_y": True}]])


    fig.add_trace(go.Scatter(x=df_optim.timestamp, y=df_optim.price_euros_wh*10**6,name='Price (EUR/MWh)',line={"shape":"hv"},showlegend=True),
    secondary_y=False)

    fig.add_trace(go.Bar(x=days,y=daily_profit,name="Daily profit (EUR)",offset=2,showlegend=True,opacity=0.5),
    secondary_y=True)

    fig.update_layout(
    title_text="Daily profit<br>Total: {} EUR<br>Mean: {} EUR".format(int(sum(daily_profit)), int(np.mean(daily_profit)))
    )

    fig.update_xaxes(title_text="Hour")

    fig.update_yaxes(title_text="Price (EUR/MWh)", secondary_y=False)
    fig.update_yaxes(title_text="Daily profit (EUR)", secondary_y=True)

    fig.update_layout(bargap=0.)
    fig.write_html("profit.html")
    fig.show()
    


    
    



def display_schedule(df_optim,start,end) :
    """
    Displays charge schedule between start datetime and end datetime 
    """
		
    mask = (df_optim.timestamp<end) & (df_optim.timestamp>=start)
    df_to_show = df_optim[mask]


    fig = make_subplots(specs=[[{"secondary_y": True}]])


    fig.add_trace(go.Scatter(x=df_to_show.timestamp, y=df_to_show.price_euros_wh*10**6,name='Price (EUR/MWh)',line={"shape":"hv"},showlegend=True),
        secondary_y=False)

    fig.add_trace(
        go.Scatter(x=df_to_show.timestamp,y=df_to_show.SOC,name="SOC (%)",showlegend=True),secondary_y=True)



    color = None 
    count = 0

    shapes = []


    for h in df_to_show.index[np.sign(df_to_show.schedule).diff() != 0] : 
        
       
        if (df_to_show.schedule[h] == 0 and not color) or (df_to_show.schedule[h] > 0  and color == "green") or (df_to_show.schedule[h] < 0  and color == "red") :
            continue

        elif df_to_show.schedule[h] == 0 and color :
            shapes.append(dict(type="rect",x0=df_to_show.timestamp[start], y0=1, x1=df_to_show.timestamp[h], y1=100,  yref="y2",fillcolor=color, opacity=0.25, line_width=0))
            color = None

        elif df_to_show.schedule[h] > 0 and color == "red" : 
            shapes.append(dict(type="rect",x0=df_to_show.timestamp[start], y0=1, x1=df_to_show.timestamp[h], y1=100, yref="y2",fillcolor=color, opacity=0.25, line_width=0))
            start = h 
            color = "green"

        elif df_to_show.schedule[h] > 0 and not color :
            start = h
            color = "green"
            
        elif df_to_show.schedule[h] < 0 and color == "green" : 
            shapes.append(dict(type="rect",x0=df_to_show.timestamp[start], y0=1, x1=df_to_show.timestamp[h], y1=100, yref="y2", fillcolor=color, opacity=0.25, line_width=0))
            start = h 
            color = "red"
            
        elif df_to_show.schedule[h] < 0 and not color :
            start = h
            color = "red"






    # Add range slider
    fig.update_layout(
        xaxis=dict(
            rangeselector=dict(
                buttons=list([
                    dict(step="all"),
                    dict(count=1,
                        label="1m",
                        step="month",
                        stepmode="backward"),
                    dict(count=2*7,
                        label="2w",
                        step="day",
                        stepmode="backward"),
                    dict(count=1*7,
                        label="1w",
                        step="day",
                        stepmode="backward"),
                    dict(count=2,
                        label="2d",
                        step="day",
                        stepmode="backward"),
                    dict(count=1,
                        label="1d",
                        step="day",
                        stepmode="backward"),
                    
                ])
            ),
            rangeslider=dict(
                visible=True
            ),
            type="date"
        )
    )

    fig.update_layout(
        shapes=shapes)


    fig.update_layout(
    title_text="Charge schedule.    Please use the buttons below to set the data range.<br>"
    )

    fig.update_xaxes(title_text="Date")

    fig.update_yaxes(title_text="Price (EUR/MWh)", secondary_y=False)
    fig.update_yaxes(title_text="SOC (%)", secondary_y=True)
        
    fig.show()  
    fig.write_html("schedule.html")





    

# Price dataset :

In [6]:
df = pd.read_csv("data/european_wholesale_electricity_price_data_hourly.csv")
df.rename(columns={"Datetime (UTC)":"timestamp","Price (EUR/MWhe)":"price_euros_wh"},inplace=True, errors='raise')
df["timestamp"] = pd.to_datetime(df["timestamp"], format="%Y-%m-%dT%H:%M:%S.%f%Z")

df.price_euros_wh /= 10 ** 6

start = "2022-01-01 00:00:00"
end = "2022-02-01 00:00:00"
country = "Austria"

df = df[df["Country"] == country]
mask = (df.timestamp<end) & (df.timestamp>=start)
df = df[mask]



# Optimize and show schedule :

In [7]:
df_optim = optimize(df)
# df_optim = optimize_day_by_day(df)

# Show profits :

In [8]:
start_show = start
end_show = end
display_schedule(df_optim,start_show,end_show)

In [9]:
display_profit(df_optim)