In [8]:
from acnportal.algorithms import BaseAlgorithm, UpperBoundEstimatorBase, infrastructure_constraints_feasible,format_array_schedule
from acnportal.algorithms import (
     enforce_pilot_limit,
     apply_upper_bound_estimate,
     apply_minimum_charging_rate,
     remove_finished_sessions,
 )
import numpy as np
from typing import Callable, List, Optional, Dict
# from upper_bound_estimator import UpperBoundEstimatorBase
# from base_algorithm import BaseAlgorithm
# from utils import infrastructure_constraints_feasible
# from postprocessing import format_array_schedule
# from preprocessing import (
#     enforce_pilot_limit,
#     apply_upper_bound_estimate,
#     apply_minimum_charging_rate,
#     remove_finished_sessions,
# )
from warnings import warn
import copy
from acnportal.acnsim.interface import SessionInfo, InfrastructureInfo, Interface

In [16]:
class SortedSchedulingAlgo(BaseAlgorithm):
    """ Class for sorting based algorithms like First Come First Served (FCFS) and
    Earliest Deadline First (EDF).

    Implements abstract class BaseAlgorithm.

    For this family of algorithms, active EVs are first sorted by some metric, then
    current is allocated to each EV in order. To allocate current we use a binary
    search approach which allocates each EV the maximum current possible subject to the
    constraints and already allocated allotments.

    The argument sort_fn controlled how the EVs are sorted and thus which sorting based
    algorithm is implemented.

    Args:
        sort_fn (Callable[[List[SessionInfo], Interface], List[SessionInfo]]): Function
            which takes in a list of SessionInfo objects and returns a list of the
            same SessionInfo objects but sorted according to some metric.
    """

    _sort_fn: Callable[[List[SessionInfo], Interface], List[SessionInfo]]
    estimate_max_rate: bool
    max_rate_estimator: Optional[UpperBoundEstimatorBase]
    uninterrupted_charging: bool
    allow_overcharging: bool

    def __init__(
        self,
        sort_fn: Callable[[List[SessionInfo], Interface], List[SessionInfo]],
        estimate_max_rate: bool = False,
        max_rate_estimator: Optional[UpperBoundEstimatorBase] = None,
        uninterrupted_charging: bool = False,
        allow_overcharging: bool = False,
    ) -> None:
        super().__init__()
        self._sort_fn = sort_fn
        # Call algorithm each period since it only returns a rate for the
        # next period.
        self.max_recompute = 1
        self.estimate_max_rate = estimate_max_rate
        self.max_rate_estimator = max_rate_estimator
        self.uninterrupted_charging = uninterrupted_charging
        self.allow_overcharging = allow_overcharging
    @staticmethod
    def max_feasible_rate(
        station_index: int,
        ub: float,
        schedule: np.ndarray,
        infrastructure: InfrastructureInfo,
        eps: float = 0.0001,
        lb: float = 0.0,
    ) -> float:
        """ Return the maximum feasible rate less than ub subject to the environment's
        constraints.

        If schedule contains non-zero elements at the given time, these are
        treated as fixed allocations and this function will include them
        when determining the maximum feasible rate for the given EVSE.

        Args:
            station_index (int): Index for the station in the schedule
                vector.
            ub (float): Upper bound on the charging rate. [A]
            schedule (np.array[Union[float, int]]): Array of charging rates, where each
                row is the charging rates for a specific session.
            infrastructure (InfrastructureInfo): Description of the electrical
                infrastructure.
            eps (float): Accuracy to which the max rate should be calculated.
                (When the binary search is terminated.)
            lb (float): Lower bound on the charging rate [A]

        Returns:
            float: maximum feasible rate less than ub subject to the
                environment's constraints. [A]
        """

        def bisection(
            _index: int, _lb: float, _ub: float, _schedule: np.ndarray
        ) -> float:
            """ Use the bisection method to find the maximum feasible charging
                rate for the EV. """
            mid: float = (_ub + _lb) / 2
            _new_schedule = copy(schedule)
            _new_schedule[_index] = mid
            if (_ub - _lb) <= eps:
                return _lb
            elif infrastructure_constraints_feasible(_new_schedule, infrastructure):
                return bisection(_index, mid, _ub, _new_schedule)
            else:
                return bisection(_index, _lb, mid, _new_schedule)

        if not infrastructure_constraints_feasible(schedule, infrastructure):
            raise ValueError("The initial schedule is not feasible.")

        # Test maximum rate to short-circuit bisection
        new_schedule = copy(schedule)
        new_schedule[station_index] = ub
        if infrastructure_constraints_feasible(new_schedule, infrastructure):
            return ub
        return bisection(station_index, lb, ub, schedule)
    def sorting_algorithm(
        self, active_sessions: List[SessionInfo], infrastructure: InfrastructureInfo
    ) -> np.ndarray:
        """ Schedule EVs by first sorting them by sort_fn, then allocating
            them their maximum feasible rate.

        See class documentation for description of the algorithm.

        Args:
            active_sessions (List[SessionInfo]): see BaseAlgorithm
            infrastructure (InfrastructureInfo): Description of the electrical
                infrastructure.

        Returns:
            np.array[Union[float, int]]: Array of charging rates, where each
                row is the charging rates for a specific session.
        """
        queue: List[SessionInfo] = self._sort_fn(active_sessions, self.interface)
        schedule: np.ndarray = np.zeros(infrastructure.num_stations)

        # Start each EV at its lower bound
        for session in queue:
            station_index: int = infrastructure.get_station_index(session.station_id)
            lb: float = max(0, session.min_rates[0])
            schedule[station_index] = lb

        if not infrastructure_constraints_feasible(schedule, infrastructure):
            raise ValueError(
                "Charging all sessions at their lower bound is not feasible."
            )

        for session in queue:
            station_index = infrastructure.get_station_index(session.station_id)
            ub: float = min(
                session.max_rates[0], self.interface.remaining_amp_periods(session)
            )
            lb: float = max(0, session.min_rates[0])
            if infrastructure.is_continuous[station_index]:
                charging_rate: float = self.max_feasible_rate(
                    station_index, ub, schedule, infrastructure, eps=0.01, lb=lb
                )
            else:
                allowable = [
                    a
                    for a in infrastructure.allowable_pilots[station_index]
                    if lb <= a <= ub
                ]

                if len(allowable) == 0:
                    charging_rate = 0
                else:
                    charging_rate = self.discrete_max_feasible_rate(
                        station_index, allowable, schedule, infrastructure
                    )
            schedule[station_index] = charging_rate
        return schedule
    

In [15]:
def least_laxity_first(evs: List[SessionInfo], iface: Interface) -> List[SessionInfo]:
    """ Sort EVs by laxity in increasing order.

    Laxity is a measure of the charging flexibility of an EV. Here we define laxity as:
        LAX_i(t) = (estimated_departure_i - t) - (remaining_demand_i(t) / max_rate_i)

    Args:
        evs (List[SessionInfo]): List of EVs to be sorted.
        iface (Interface): Interface object.

    Returns:
        List[SessionInfo]: List of EVs sorted by laxity in increasing order.
    """

    def laxity(ev: SessionInfo) -> float:
        """ Calculate laxity of the EV.

        Args:
            ev (EV): An EV object.

        Returns:
            float: The laxity of the EV.
        """
        lax = (ev.estimated_departure - iface.current_time) - (
            iface.remaining_amp_periods(ev) / iface.max_pilot_signal(ev.station_id)
        )
        return lax

    return sorted(evs, key=laxity)