In [None]:
import gym

# Smoke test: play one random-policy CartPole episode to confirm that Gym
# is installed and that the reset/step API behaves as expected.
env = gym.make("CartPole-v1")
try:
    # With the 5-tuple step API, reset() returns (observation, info).
    obs, info = env.reset()
    done = False
    while not done:
        action = env.action_space.sample()
        obs, reward, terminated, truncated, info = env.step(action)
        # An episode ends on termination OR truncation (e.g. the time limit);
        # checking only the first flag could keep stepping a finished episode.
        done = terminated or truncated
finally:
    # Release the environment even if a step raises.
    env.close()
print("Gym installation seems to be working!")

In [2]:
import numpy as np
import gym
from gym import spaces

In [4]:
class FleetEVChargingEnv(gym.Env):
    """
    Gym environment for scheduling a fleet of EVs over a finite horizon.

    Each vehicle takes one of three actions per time period:
      - 0: Idle      (SOC decreases by ``discharging_rate_idle``)
      - 1: In Use    (SOC decreases by ``discharging_rate_in_use``)
      - 2: Charging  (SOC increases by ``charging_rate``)

    Observation: float32 array of shape ``(num_vehicles,)`` with each
    vehicle's state of charge (SOC), bounded by ``[soc_min, soc_max]``.

    Reward: ``-(cost + penalty)`` where ``cost`` is the period's electricity
    price times the number of charging vehicles and ``penalty`` is
    ``penalty_deviation * sum(|SOC - desired_soc|)``, measured on the SOC
    *before* the action is applied.

    An episode terminates after ``num_time_periods`` steps.
    """

    def __init__(
        self,
        num_vehicles=3,
        num_time_periods=5,
        charging_rate=0.2,
        discharging_rate_idle=0.02,
        discharging_rate_in_use=0.1,
        electricity_price=None,
        soc_min=0.0,
        soc_max=1.0,
        desired_soc=0.8,
        penalty_deviation=1.0,
        time_steps=2,
        chargers=5,
    ):
        """
        Args:
            num_vehicles: Number of vehicles in the fleet.
            num_time_periods: Episode length in steps.
            charging_rate: SOC gained per step while charging.
            discharging_rate_idle: SOC lost per step while idle.
            discharging_rate_in_use: SOC lost per step while in use.
            electricity_price: Per-period price sequence with at least
                ``num_time_periods`` entries; defaults to a constant 10.0.
            soc_min: Lower SOC bound (also the observation lower bound).
            soc_max: Upper SOC bound (also the observation upper bound).
            desired_soc: Target SOC used by the deviation penalty.
            penalty_deviation: Weight of the deviation penalty.
            time_steps: Stored but not used yet (placeholder for cycles).
            chargers: Maximum number of vehicles that may charge at once.

        Raises:
            ValueError: If ``electricity_price`` is not one-dimensional or
                has fewer than ``num_time_periods`` entries.
        """
        super().__init__()

        self.num_vehicles = num_vehicles
        self.num_time_periods = num_time_periods
        self.time_steps = time_steps  # Number of time steps considered for a cycle - work on it later
        # Minimum number of in-use vehicles required in each period
        # (currently at least one vehicle at every time).
        self.active_vehicles = np.ones(num_time_periods)
        self.charging_rate = charging_rate
        self.discharging_rate_idle = discharging_rate_idle
        self.discharging_rate_in_use = discharging_rate_in_use
        self.chargers = chargers

        self.soc_min = soc_min
        self.soc_max = soc_max
        self.desired_soc = desired_soc
        self.penalty_deviation = penalty_deviation

        # If not provided, default to a constant price array
        if electricity_price is None:
            electricity_price = np.ones(num_time_periods) * 10.0
        self.electricity_price = np.array(electricity_price, dtype=float)
        # Fail at construction time instead of with an IndexError mid-episode.
        if (
            self.electricity_price.ndim != 1
            or self.electricity_price.size < num_time_periods
        ):
            raise ValueError(
                "electricity_price must be a 1-D sequence with at least "
                f"{num_time_periods} entries, got shape {self.electricity_price.shape}"
            )

        # Current time step
        self.current_time = 0

        # Define observation & action spaces
        self.observation_space = spaces.Box(
            low=np.float32(self.soc_min),
            high=np.float32(self.soc_max),
            shape=(self.num_vehicles,),
            dtype=np.float32
        )
        # Each vehicle's action ∈ {0,1,2}: Idle, In Use, Charging
        self.action_space = spaces.MultiDiscrete([3] * self.num_vehicles)
        # TODO: action masking (e.g. a mask_actions() helper driven by
        # active_vehicles / chargers) is not implemented yet.

        # Internal state (array of shape (num_vehicles,) for SOC)
        self.soc = None

    def reset(self, seed=None, options=None):
        """
        Start a new episode.

        Args:
            seed: Optional seed forwarded to ``gym.Env.reset``; it seeds
                ``self.np_random``, which draws the initial SOC.
            options: Accepted for API compatibility; currently unused.

        Returns:
            ``(observation, info)`` with an empty ``info`` dict.
        """
        super().reset(seed=seed)
        self.current_time = 0

        # Draw the initial SOC in [0.5, 0.7] from the environment's own RNG
        # so that reset(seed=...) is reproducible; the global np.random
        # state is not affected by the seed argument.
        initial_soc = self.np_random.uniform(
            low=0.5, high=0.7, size=(self.num_vehicles,)
        )
        # Keep the state inside the declared observation bounds even when
        # soc_min / soc_max are configured tighter than [0.5, 0.7].
        self.soc = np.clip(initial_soc, self.soc_min, self.soc_max)

        # Return observation + info
        return self._get_obs(), {}

    def step(self, action):
        """
        Apply one joint action and advance the clock by one period.

        Args:
            action: Sequence of ``num_vehicles`` integers in {0, 1, 2}.

        Returns:
            ``(observation, reward, terminated, truncated, info)`` where
            ``info["cost"]`` is this period's charging cost and
            ``truncated`` is always ``False``.

        Raises:
            AssertionError: If called before ``reset()`` or after the
                episode ended, if the action is outside the action space,
                if fewer vehicles are in use than required for this period,
                or if more vehicles charge than there are chargers.
        """
        assert self.soc is not None, "Call reset() before step()"
        assert self.current_time < self.num_time_periods, "Episode is over; call reset()"
        assert self.action_space.contains(action), f"Invalid action: {action}"

        # Lists pass contains(), but `list == 1` is a scalar False, which
        # would silently zero the counts below; work on an ndarray instead.
        action = np.asarray(action)
        in_use = action == 1
        charging = action == 2
        num_charging = np.sum(charging)

        # At least the required number of vehicles must be in use ...
        assert np.sum(in_use) >= self.active_vehicles[self.current_time], f"Need at least {self.active_vehicles[self.current_time]} active vehicles"
        # ... and at most `chargers` vehicles may charge simultaneously.
        assert num_charging <= self.chargers, f"Max {self.chargers} chargers only"

        # NOTE(review): SOC is clipped to soc_min, so this check holds for
        # every in-bounds state; confirm whether a stricter test (enough
        # charge left to cover the in-use discharge) was intended.
        for i in range(self.num_vehicles):
            if in_use[i]:
                assert self.soc[i] >= self.soc_min, f"Vehicle {i} SOC below minimum, can't run"

        # Cost of charging
        price_t = self.electricity_price[self.current_time]
        cost = float(price_t * num_charging)

        # Penalize deviation from desired SOC (evaluated before the update)
        deviation = np.sum(np.abs(self.soc - self.desired_soc))
        penalty = self.penalty_deviation * deviation

        # Reward is negative of (cost + penalty)
        reward = float(-(cost + penalty))

        # State update (SOC): charging adds, in-use and idle subtract.
        soc_delta = np.where(
            charging,
            self.charging_rate,
            np.where(in_use, -self.discharging_rate_in_use, -self.discharging_rate_idle),
        )
        # Clamp SOC to [soc_min, soc_max]
        self.soc = np.clip(self.soc + soc_delta, self.soc_min, self.soc_max)

        # Advance time
        self.current_time += 1
        terminated = self.current_time >= self.num_time_periods
        truncated = False
        info = {
            "cost": cost
        }

        return self._get_obs(), reward, terminated, truncated, info

    def _get_obs(self):
        """Returns a float32 copy of the current SOC as the observation."""
        return self.soc.astype(np.float32)

    def render(self, mode='human'):
        """Prints current step and SOC."""
        print(f"Time: {self.current_time}, SOC: {self.soc}")

    def close(self):
        """No resources to release."""
        pass

In [None]:
# ---------------------------------------------------------------------------
# Example usage: roll out one episode with a seeded random policy and log
# every step into `schedule_log`.
if __name__ == "__main__":
    SEED = 0
    NUM_VEHICLES = 36

    env = FleetEVChargingEnv(
        num_vehicles=NUM_VEHICLES,
        num_time_periods=5,
        charging_rate=0.2,
        discharging_rate_idle=0.02,
        discharging_rate_in_use=0.1,
        electricity_price=[10, 12, 8, 9, 11],
        soc_min=0.0,
        soc_max=1.0,
        desired_soc=0.8,
        penalty_deviation=2.0,
        # Random sampling does not respect the charger cap, so provision one
        # charger per vehicle to keep this demo from violating it.
        chargers=NUM_VEHICLES,
    )

    # Seed every source of randomness used by the rollout so that
    # Restart & Run All reproduces the same episode.
    np.random.seed(SEED)          # in case the environment draws from NumPy's global RNG
    env.action_space.seed(SEED)   # action_space.sample() has its own RNG
    obs, info = env.reset(seed=SEED)

    done = False
    total_reward = 0.0
    schedule_log = []  # list to store step-by-step decisions

    try:
        while not done:
            # Randomly sample an action from {0,1,2} for each vehicle
            action = env.action_space.sample()

            # Execute the step
            next_obs, reward, terminated, truncated, info = env.step(action)
            # The episode is over on termination or truncation
            done = terminated or truncated

            total_reward += reward

            # Log step info
            schedule_log.append({
                "time": env.current_time,       # time after we advanced
                "action": action.copy(),
                "SOC": next_obs.copy(),
                "reward": reward,
                "cost": info["cost"]
            })

            obs = next_obs

            env.render()
    finally:
        # Always release the environment, even if a step raises.
        env.close()

    print("Episode finished! Total reward:", total_reward)

Time: 1, SOC: [0.8729846  0.73171351 0.58511049 0.57301318 0.5689711  0.4757485
 0.87904269 0.52982415 0.87812258 0.70058684 0.80082287 0.72482996
 0.49819689 0.52431909 0.43799547 0.87787619 0.53989864 0.52611242
 0.48496558 0.89670737 0.66887155 0.66558647 0.56156539 0.86149289
 0.75045019 0.58024365 0.50593244 0.89451677 0.55081173 0.88737837
 0.5272066  0.67545337 0.53856999 0.8258702  0.77615156 0.77535422]
Time: 2, SOC: [0.7729846  0.93171351 0.78511049 0.55301318 0.5489711  0.3757485
 0.77904269 0.42982415 1.         0.90058684 0.70082287 0.62482996
 0.47819689 0.42431909 0.41799547 0.85787619 0.51989864 0.50611242
 0.68496558 0.87670737 0.86887155 0.56558647 0.76156539 0.84149289
 0.95045019 0.78024365 0.70593244 0.87451677 0.75081173 0.86737837
 0.5072066  0.57545337 0.43856999 1.         0.75615156 0.75535422]
Time: 3, SOC: [0.6729846  0.91171351 0.98511049 0.53301318 0.4489711  0.5757485
 0.97904269 0.62982415 0.9        0.80058684 0.68082287 0.82482996
 0.37819689 0.3243190

In [None]:
# Display the per-step records (time, action, SOC, reward, cost) that the
# example rollout appended to `schedule_log`.
schedule_log

[{'time': 1,
  'action': array([2, 2, 1, 1, 1, 1, 2, 0, 2, 2, 2, 2, 0, 1, 1, 2, 0, 0, 1, 2, 0, 0,
         1, 2, 2, 0, 1, 2, 0, 2, 0, 0, 1, 2, 2, 2]),
  'SOC': array([0.8729846 , 0.73171353, 0.5851105 , 0.5730132 , 0.5689711 ,
         0.4757485 , 0.8790427 , 0.52982414, 0.87812257, 0.70058686,
         0.80082285, 0.72483   , 0.4981969 , 0.5243191 , 0.43799546,
         0.87787616, 0.53989863, 0.52611244, 0.4849656 , 0.89670736,
         0.6688715 , 0.6655865 , 0.5615654 , 0.8614929 , 0.7504502 ,
         0.58024365, 0.50593245, 0.89451677, 0.5508117 , 0.8873784 ,
         0.5272066 , 0.67545336, 0.53857   , 0.8258702 , 0.77615154,
         0.7753542 ], dtype=float32),
  'reward': -173.29540495635436,
  'cost': 160.0},
 {'time': 2,
  'action': array([1, 2, 2, 0, 0, 1, 1, 1, 2, 2, 1, 1, 0, 1, 0, 0, 0, 0, 2, 0, 2, 1,
         2, 0, 2, 2, 2, 0, 2, 0, 0, 1, 1, 2, 0, 0]),
  'SOC': array([0.7729846 , 0.9317135 , 0.7851105 , 0.5530132 , 0.5489711 ,
         0.37574852, 0.7790427 , 0.42982414