In [1]:
# EV Charging Station Simulation and Optimization

import numpy as np
import pandas as pd
import rsome as rso
from rsome import ro
from rsome import cpt_solver as solver
#from rsome import msk_solver as solver

# -------------------- Parameters --------------------

def initialize_parameters():
    """
    Initialize the simulation parameters.

    Returns:
        dict: Dictionary containing parameters such as planning horizon, energy requirements, etc.
    """
    T = 20
    parameters = {
        "T": T,  # Planning horizon
        "energy_requirements": [5, 10, 15, 20, 25, 30, 35, 40],
        "max_stay_duration": int(T*0.3),  # Longest stay duration
        "max_charging_speed": 20,  # Maximum charging speed per period
        "arrival_rate_range": (0.05, 0.2),  # Range for random arrival rates
        "test_size": 10000,  # Number of samples to generate
        "random_seed": 2024,  # Seed for reproducibility
        "et": {t: 1 for t in range(1, T+1)},  # Time-based cost coefficients
        "d": T * 0.5,  # Cost scaling factor
        "S": 1000  # Sample size for SAA
    }
    return parameters

# -------------------- Data Generation --------------------

def generate_arrival_rates(params):
    """
    Generate arrival rates and vehicle types based on parameters.

    Args:
        params (dict): Simulation parameters.

    Returns:
        tuple: Vehicle types and their respective arrival rates.
    """
    np.random.seed(params["random_seed"])
    v_types = []
    v_rates = []

    for arrival_period in range(1, params["T"] + 1):
        for stay_duration in range(1, params["max_stay_duration"] + 1):
            departure_period = arrival_period + stay_duration - 1
            if departure_period <= params["T"]:
                for energy in params["energy_requirements"]:
                    if energy <= (departure_period - arrival_period + 1) * params["max_charging_speed"]:
                        v_types.append((arrival_period, departure_period, energy))
                        v_rates.append(np.random.uniform(*params["arrival_rate_range"]))

    return v_types, v_rates

def simulate_arrivals(v_rates, test_size, seed):
    """
    Simulate arrivals for the given rates.

    Args:
        v_rates (list): List of arrival rates.
        test_size (int): Number of samples to generate.
        seed (int): Random seed for reproducibility.

    Returns:
        np.ndarray: Simulated data array.
    """
    np.random.seed(seed)
    V = len(v_rates)
    test_data = np.zeros((test_size, V))

    for v in range(V):
        test_data[:, v] = np.random.poisson(v_rates[v], test_size)

    return test_data

# -------------------- Time-Based Vehicle Mapping --------------------

def map_vehicle_types_to_time(v_types, T):
    """
    Map vehicle types to their corresponding time periods.

    Args:
        v_types (list): List of vehicle types.
        T (int): Planning horizon.

    Returns:
        dict: Mapping of time periods to vehicle types.
    """
    vt_types = {}
    for t in range(1, T + 1):
        vt_types[t] = [v_idx for v_idx, v_type in enumerate(v_types) if v_type[0] <= t and v_type[1] >= t]
    return vt_types

# -------------------- Out-of-Sample Testing --------------------

def out_of_sample_test(x_sol, test_data, vt_types, params):
    """
    Perform out-of-sample testing to evaluate the optimization model.

    Args:
        x_ecp (dict): Optimal charging schedules for each vehicle.
        test_data (np.ndarray): Simulated test data.
        vt_types (dict): Mapping of time periods to vehicle types.
        params (dict): Simulation parameters.

    Returns:
        float: Average cost from the out-of-sample test.
    """
    T = params["T"]
    et = params["et"]
    d = params["d"]

    flow = np.zeros((params["test_size"], T))
    for t in range(1, T + 1):
        flow[:, t - 1] = sum(test_data[:, v - 1] * x_sol[v, t] for v in vt_types[t])

    cost = flow @ np.array(list(et.values())) + np.max(flow, axis=1) * d
    return np.mean(cost)


# -------------------- Optimization Models --------------------
def define_ecp_model(v_types, v_rates, vt_types, params):
    """
    Define and solve the robust optimization model for the EV charging station.

    Args:
        v_types (list): List of vehicle types.
        v_rates (list): Arrival rates for vehicle types.
        vt_types (dict): Mapping of time periods to vehicle types.
        params (dict): Simulation parameters.

    Returns:
        dict: Optimal charging schedules for each vehicle.
    """
    ecp = ro.Model("ECP")
    T = params["T"]
    max_charging_speed = params["max_charging_speed"]
    et = params["et"]
    d = params["d"]

    x = {}
    xi = {}
    zeta = {}
    flow = {}
    kappa = ecp.dvar()
    gama = ecp.dvar()
    mu = ecp.dvar()

    ecp.st(mu >= 0)

    # Define decision variables and constraints for each vehicle type
    for v in range(len(v_rates)):
        temp = 0
        for t in range(v_types[v][0], v_types[v][1] + 1):
            x[v, t] = ecp.dvar()
            xi[v, t] = ecp.dvar()
            ecp.st(
                x[v, t] >= 0,
                x[v, t] <= max_charging_speed,
                xi[v, t] >= rso.pexp(x[v, t], mu)
            )
            temp += x[v, t]
        ecp.st(temp == v_types[v][2])

    # Define constraints for each time period
    temp = 0
    for t in range(1, T + 1):
        zeta[t] = ecp.dvar()
        flow[t] = ecp.dvar()
        ecp.st(sum(x[v, t] * v_rates[v] for v in vt_types[t]) == flow[t])
        ecp.st(flow[t] <= gama)
        ecp.st(
            zeta[t] >= rso.pexp(-kappa + sum((xi[v, t] - x[v, t] - mu) * v_rates[v] for v in vt_types[t]), mu)
        )
        temp += zeta[t]

    ecp.st(temp <= mu)
    ecp.min(d * (kappa + gama) + sum(et[t] * flow[t] for t in range(1, T + 1)))

    ecp.solve(solver, log=True)

    # Extract optimal schedule
    x_ecp = {}
    for v in range(len(v_rates)):
        for t in range(v_types[v][0], v_types[v][1] + 1):
            x_ecp[v, t] = x[v, t].get()

    return x_ecp

def define_deterministic_model(v_types, v_rates, vt_types, params):
    """
    Define and solve the deterministic (LP) optimization model.

    Args:
        v_types (list): List of vehicle types.
        v_rates (list): Arrival rates for vehicle types.
        vt_types (dict): Mapping of time periods to vehicle types.
        params (dict): Simulation parameters.

    Returns:
        dict: Optimal charging schedules for each vehicle.
    """
    lp = ro.Model("LP")
    T = params["T"]
    max_charging_speed = params["max_charging_speed"]
    et = params["et"]
    d = params["d"]

    x = {}
    flow = {}
    gama = lp.dvar()

    for v in range(len(v_rates)):
        temp = 0
        for t in range(v_types[v][0], v_types[v][1] + 1):
            x[v, t] = lp.dvar()
            lp.st(x[v, t] >= 0, x[v, t] <= max_charging_speed)
            temp += x[v, t]
        lp.st(temp == v_types[v][2])

    for t in range(1, T + 1):
        flow[t] = lp.dvar()
        lp.st(sum(x[v, t] * v_rates[v] for v in vt_types[t]) == flow[t])
        lp.st(flow[t] <= gama)

    lp.min(d * gama + sum(et[t] * flow[t] for t in range(1, T + 1)))
    lp.solve(solver, log=True)

    # Extract optimal schedule
    x_lp = {}
    for v in range(len(v_rates)):
        for t in range(v_types[v][0], v_types[v][1] + 1):
            x_lp[v, t] = x[v, t].get()

    return x_lp

def define_saa_model(v_types, v_rates, vt_types, params, samples):
    """
    Define and solve the sample average approximation (SAA) model.

    Args:
        v_types (list): List of vehicle types.
        v_rates (list): Arrival rates for vehicle types.
        vt_types (dict): Mapping of time periods to vehicle types.
        params (dict): Simulation parameters.
        samples (np.ndarray): Simulated arrival samples for SAA.

    Returns:
        dict: Optimal charging schedules for each vehicle.
    """
    saa = ro.Model("SAA")
    T = params["T"]
    max_charging_speed = params["max_charging_speed"]
    et = params["et"]
    d = params["d"]
    S = params["S"]

    x = {}
    flow = {}
    gama = saa.dvar(S)

    for v in range(len(v_rates)):
        temp = 0
        for t in range(v_types[v][0], v_types[v][1] + 1):
            x[v, t] = saa.dvar()
            saa.st(x[v, t] >= 0, x[v, t] <= max_charging_speed)
            temp += x[v, t]
        saa.st(temp == v_types[v][2])

    for t in range(1, T + 1):
        flow[t] = saa.dvar(S)
        saa.st(sum(x[v, t] * samples[:, v] for v in vt_types[t]) == flow[t])
        saa.st(flow[t] <= gama)

    saa.min(sum(d * gama + sum(et[t] * flow[t] for t in range(1, T + 1))) * (1/S) )
    saa.solve(solver, log=True)# params={'LpMethod':2})
    #saa.solve(solver, log=True, params={"optimizer": "intpnt"})

    # Extract optimal schedule
    x_saa = {}
    for v in range(len(v_rates)):
        for t in range(v_types[v][0], v_types[v][1] + 1):
            x_saa[v, t] = x[v, t].get()

    return x_saa

# -------------------- Main Execution --------------------

def main():
    """
    Main execution function for the simulation and optimization setup.
    """
    # Initialize parameters
    params = initialize_parameters()

    # Generate arrival rates and vehicle types
    v_types, v_rates = generate_arrival_rates(params)

    # Simulate arrivals
    test_data = simulate_arrivals(v_rates, params["test_size"], params["random_seed"])
    samples = simulate_arrivals(v_rates, params["S"], params["random_seed"]+1) # samples used in SAA, different seed from the test

    # Map vehicle types to time periods
    vt_types = map_vehicle_types_to_time(v_types, params["T"])

    # Solve exponential cone program model
    x_ecp = define_ecp_model(v_types, v_rates, vt_types, params)

    # Solve deterministic model
    x_lp = define_deterministic_model(v_types, v_rates, vt_types, params)

    # Solve SAA model
    x_saa = define_saa_model(v_types, v_rates, vt_types, params, samples)

    # Perform out-of-sample testing
    average_cost_ecp = out_of_sample_test(x_ecp, test_data, vt_types, params)
    average_cost_lp = out_of_sample_test(x_lp, test_data, vt_types, params)
    average_cost_saa = out_of_sample_test(x_saa, test_data, vt_types, params)

    print(f"Average out-of-sample cost (ECP): {average_cost_ecp:.2f}")
    print(f"Average out-of-sample cost (Deterministic): {average_cost_lp:.2f}")
    print(f"Average out-of-sample cost (SAA) with {params['S']} samples: {average_cost_saa:.2f}")

if __name__ == "__main__":
    main()


Being solved by COPT...
Model fingerprint: 133908a6

Using Cardinal Optimizer v7.2.3 on macOS (aarch64)
Hardware has 8 cores and 8 threads. Using instruction set ARMV8 (30)
Minimizing a CONIC problem

The original problem has:
    9022 rows, 13704 columns and 27444 non-zero elements
    2740 exponential cones
The presolved problem has:
    8766 rows, 13547 columns and 26776 non-zero elements
    2740 exponential cones

Starting barrier solver using 8 threads

Problem info:
Range of matrix coefficients:    [4e-03,1e+00]
Range of rhs coefficients:       [2e+00,4e+01]
Range of bound coefficients:     [5e+00,4e+01]
Range of cost coefficients:      [1e-01,1e-01]

Factor info:
Number of dense columns:         0
Number of matrix entries:        6.006e+04
Number of factor entries:        7.713e+04
Number of factor flops:          4.981e+05

Iter       Primal.Obj         Dual.Obj      Compl  Primal.Inf  Dual.Inf    Time
   0  +0.00000000e+00  -5.00000000e+00   1.36e+04    4.00e+01  1.41e+00   0