# Beebop technical test

During this technical test, the objective is to implement an algorithm that finds a power schedule to charge an Electric Vehicle (EV). The objective is to minimize the customer's energy cost, while respecting their comfort constraints (battery sufficiently charged to cover their driving needs). Note that the vehicle is allowed to discharge into the grid (sell energy).

## Problem Statement

In [1]:
import numpy as np
import numpy.typing as npt
import plotly.graph_objects as go
from plotly.subplots import make_subplots

The car is empty at midnight.  
The owner leaves for work at 7am, and comes back at 5pm.  
While commuting, the car discharges a constant 3 kW.

In [2]:
INIT_SOE = 0  # kWh
BATTERY_CAPACITY = 100  # kWh
MAX_POWER = 10  # kW
COMMUTE_POWER = -3  # kW
TIMESTEP_SIZE = 1  # Hour
HORIZON = 24
MIN_LEAVE_TIME = 5
MAX_LEAVE_TIME = 9
MIN_RETURN_TIME = 15
MAX_RETURN_TIME = 19

In [3]:
class Data:
    def __init__(self, seed: int) -> None:
        np.random.seed(seed)
        self.leave_time = np.random.randint(MIN_LEAVE_TIME, MAX_LEAVE_TIME)
        self.return_time = np.random.randint(MIN_RETURN_TIME, MAX_RETURN_TIME)
        self.energy_prices = np.maximum(
            np.array(
                [  # Euros/kWh
                    0.08609,
                    0.08156,
                    0.07896,
                    0.07905,
                    0.08360,
                    0.08772,
                    0.11136,
                    0.11263,
                    0.10299,
                    0.08109,
                    0.07000,
                    0.06008,
                    0.05687,
                    0.05521,
                    0.05859,
                    0.06772,
                    0.07639,
                    0.08904,
                    0.10748,
                    0.13838,
                    0.17048,
                    0.12993,
                    0.11220,
                    0.09532,
                ]
            )
            + np.random.normal(0, 0.1, HORIZON),
            0,
        )
        self.min_soe_before_commute = (
            -(self.return_time - self.leave_time) * COMMUTE_POWER
        )

## Starter Code

### Transition Function and constraints

In [23]:
def get_soe(power_schedule: npt.NDArray[np.float64], data: Data):
    """
    We consider a simple linear transition function:
    soe += power * time # Where each timestep is 1 hour
    """
    soes = np.zeros(HORIZON + 1)
    for t in range(data.leave_time):
        soes[t + 1] = soes[t] + power_schedule[t] * TIMESTEP_SIZE
    for t in range(data.leave_time, data.return_time):
        soes[t + 1] = soes[t] + COMMUTE_POWER * TIMESTEP_SIZE
    for t in range(data.return_time, HORIZON):
        soes[t + 1] = soes[t] + power_schedule[t] * TIMESTEP_SIZE
    return soes

In [55]:
def check_legal(
    soes: npt.NDArray[np.float64], power_schedule: npt.NDArray[np.float64]
) -> bool:
    is_legal = True
    if (power_schedule > MAX_POWER).any():
        print(f"Max charge constraint not respected {power_schedule=}")
        is_legal = False
    if (power_schedule < -MAX_POWER).any():
        print(f"Max discharge constraint not respected {power_schedule=}")
        is_legal = False
    if (soes < -0.001).any():
        print(f"Battery capacity constraint not respected {soes=}")
        is_legal = False
    if (soes >= BATTERY_CAPACITY).any():
        print(f"SoE must always be positive {soes=}")
        is_legal = False
    return is_legal

### Visualization helper

In [6]:
def plot(
    power_schedule: npt.NDArray[np.float64],
    soes: npt.NDArray[np.float64],
    cost: npt.NDArray[np.float64],
    is_legal: bool,
    data: Data,
) -> None:
    cost_fig = make_subplots(
        rows=1,
        cols=1,
        subplot_titles=("Power Schedule", "Battery Energy Schedule"),
        specs=[[{"secondary_y": True}]],
    )
    title = f"Total cost is {cost:,.2f}€"
    if not is_legal:
        title += " (constraints not respected)"
    cost_fig.update_layout(
        title=title,
        xaxis1={"title": "Time [h]"},
        yaxis1={"title": "Power [kW]"},
        yaxis2={"title": "Price [Euro/kWh]"},
        xaxis2={"title": "Time [h]"},
    )
    cost_fig.add_trace(
        go.Bar(y=power_schedule, offset=0, width=1, name="Power"), row=1, col=1
    )
    cost_fig.add_trace(
        go.Scatter(
            y=data.energy_prices,
            name="Day-Ahead Price [€/kWh]",
        ),
        secondary_y=True,
        row=1,
        col=1,
    )
    cost_fig.add_trace(
        go.Scatter(
            x=[data.leave_time, data.leave_time],
            y=[-MAX_POWER, MAX_POWER],
            mode="lines",
            showlegend=False,
            line={"color": "rgba(0,0,0,0)"},  # invisible line for fill
        ),
        row=1,
        col=1,
    )
    cost_fig.add_trace(
        go.Scatter(
            x=[data.return_time, data.return_time],
            y=[-MAX_POWER, MAX_POWER],
            mode="lines",
            fill="tonexty",
            showlegend=False,
            legendgroup="Unavailability Region",
            line={"color": "rgba(0,0,0,0)"},  # invisible line for fill
            fillcolor="rgba(255, 105, 105, 0.4)",
        ),
        row=1,
        col=1,
    )
    cost_fig.show()

    soe_fig = go.Figure(
        layout=go.Layout(
            xaxis=go.layout.XAxis(title="Time [h]"),
            yaxis=go.layout.YAxis(title="State of Energy [kWh]"),
        )
    )
    soe_fig.add_trace(
        go.Scatter(
            x=[data.leave_time, data.leave_time],
            y=[0, BATTERY_CAPACITY],
            mode="lines",
            showlegend=False,
            line={"color": "rgba(0,0,0,0)"},  # invisible line for fill
        ),
    )
    soe_fig.add_trace(go.Bar(y=soes, offset=0, width=1, name="SoE"))
    soe_fig.add_trace(
        go.Scatter(
            x=[data.return_time, data.return_time],
            y=[0, BATTERY_CAPACITY],
            mode="lines",
            fill="tonexty",
            name="Unavailability Region",
            legendgroup="Unavailability Region",
            line={"color": "rgba(0,0,0,0)"},  # invisible line for fill
            fillcolor="rgba(255, 105, 105, 0.4)",
        ),
    )
    soe_fig.add_trace(
        go.Scatter(
            x=[data.leave_time, data.leave_time + 1],
            y=[data.min_soe_before_commute, data.min_soe_before_commute],
            name="Min SoE before commute",
            mode="lines",
        )
    )
    soe_fig.show()

### Evaluation function

In [7]:
def solution(
    power_schedule: npt.NDArray[np.float64], data: Data, visualize: bool
) -> None:
    if power_schedule.shape != (HORIZON,):
        raise ValueError(f"Invalid power schedule shape {power_schedule.shape}")
    if (power_schedule[data.leave_time : data.return_time] != 0).any():
        print("Warning: Power values were overwritten when the EV is unavailable")
        power_schedule[data.leave_time : data.return_time] = 0

    soes = get_soe(power_schedule, data)
    is_legal = check_legal(soes, power_schedule)
    cost = np.sum(power_schedule * TIMESTEP_SIZE * data.energy_prices)
    if visualize:
        plot(power_schedule, soes, cost, is_legal, data)
    elif not is_legal:
        print("Constraints not respected")
    else:
        print(f"Total cost is {cost:,.2f}€")

## Example Solutions

In [8]:
data = Data(seed=0)
print(f"{data.leave_time=}")
print(f"{data.return_time=}")
print(f"{data.min_soe_before_commute=}")
print(f"{data.energy_prices=}")

data.leave_time=5
data.return_time=18
data.min_soe_before_commute=39
data.energy_prices=array([0.16024917, 0.23685137, 0.        , 0.21240454, 0.        ,
       0.28471244, 0.23797185, 0.06204235, 0.35751008, 0.18917119,
       0.11843122, 0.11799405, 0.03871174, 0.19623046, 0.02114283,
       0.09523983, 0.        , 0.1267327 , 0.11082389, 0.20643672,
       0.01413033, 0.07326024, 0.08798505, 0.24675913])


In [9]:
def illegal_optimisation(data: Data, visualize: bool) -> None:
    power_schedule = np.full(24, 7)
    power_schedule[data.leave_time : data.return_time] = 0
    solution(power_schedule, data, visualize)


illegal_optimisation(data, visualize=True)

[ 0.  7. 14. 21. 28. 35. 32. 29. 26. 23. 20. 17. 14. 11.  8.  5.  2. -1.
 -4.  3. 10. 17. 24. 31. 38.]
Battery capacity constraint not respected soes=array([ 0.,  7., 14., 21., 28., 35., 32., 29., 26., 23., 20., 17., 14.,
       11.,  8.,  5.,  2., -1., -4.,  3., 10., 17., 24., 31., 38.])


In [10]:
def dummy_optimisation(data: Data, visualize: bool) -> None:
    power_schedule = np.array(
        [9, 9, 9, 9, 9, 9, 9, 9, 9, 0, 0, 0, 0, 0, 0, 3, 3, 3, 3, 3, -2, -2, -2, -2]
    )
    power_schedule[data.leave_time : data.return_time] = 0
    solution(power_schedule, data, visualize)


dummy_optimisation(data, visualize=True)

[ 0.  9. 18. 27. 36. 45. 42. 39. 36. 33. 30. 27. 24. 21. 18. 15. 12.  9.
  6.  9. 12. 10.  8.  6.  4.]


## Your solution

In [57]:
import cvxpy as cp

def optimisation(data: Data, visualize: bool) -> None:
    leaving = data.leave_time
    returning = data.return_time
    # optimization
    charge = cp.Variable((24,), name="charging_power")
    soe = cp.Variable((24,), name="battery_capacity", nonneg=True)

    price = cp.Parameter((24,), name="price", value=data.energy_prices)

    constraints = []
    # 0 capacity at 12
    constraints.append(soe[0] == INIT_SOE)

    # battery constraints
    constraints.append(charge[:] <= MAX_POWER)
    constraints.append(charge[:] >= -MAX_POWER)
    constraints.append(soe[:] <= BATTERY_CAPACITY)
    constraints.append(soe[:] >= 0)

    # charge before leaving
    constraints.append(soe[1:leaving+1] == soe[:leaving] + charge[:leaving] * TIMESTEP_SIZE)
    # charge after returning
    constraints.append(soe[returning+1:] == soe[returning:-1] + charge[returning:-1] * TIMESTEP_SIZE)
    # no charging while EV is gone: 
    constraints.append(charge[leaving:returning] == 0)
    # capacity minus 3 per hour while gone
    constraints.append(soe[leaving+1:returning+1] == soe[leaving:returning] + COMMUTE_POWER  * TIMESTEP_SIZE)
    constraints.append(charge[-1]==0)

    objective = cp.Minimize(cp.sum(cp.multiply(price, charge)))
    problem = cp.Problem(objective=objective, constraints=constraints)
    problem.solve(verbose=False)

    # print(charge.value)
    power_schedule = np.array([int(0) if x<=0.001 and x>=-0.001 else x for x in charge.value])

    solution(power_schedule, data, visualize)


data = Data(seed=42)
optimisation(data, visualize=True)

In [58]:
for seed in range(100):

    optimisation(Data(seed), visualize=False)


Total cost is 1.80€
Total cost is -4.60€
Total cost is -0.93€
Total cost is -8.15€
Total cost is -2.71€
Total cost is -4.32€
Total cost is -5.81€
Total cost is -10.31€
Total cost is -7.87€
Total cost is -6.90€
Total cost is -7.53€
Total cost is -2.22€
Total cost is -5.31€
Total cost is -6.43€
Total cost is -7.29€
Total cost is -7.51€
Total cost is -2.63€
Total cost is -4.66€
Total cost is 0.39€
Total cost is -5.43€
Total cost is -8.35€
Total cost is -3.02€
Total cost is -8.65€
Total cost is -6.93€
Total cost is -8.12€
Total cost is -3.11€
Total cost is -2.71€
Total cost is -5.47€
Total cost is -4.77€
Total cost is -0.74€
Total cost is -2.93€
Total cost is -3.16€
Total cost is -4.43€
Total cost is -0.42€
Total cost is -5.29€
Total cost is -3.25€
Total cost is -2.71€
Total cost is -3.84€
Total cost is -1.81€
Total cost is -5.58€
Total cost is -3.55€
Total cost is -0.35€
Total cost is -3.38€
Total cost is -2.93€
Total cost is -2.49€
Total cost is -6.23€
Total cost is -0.85€
Total cost is 