In [None]:
# ==============================================================================
# === Part 1: The Environment (EVChargingEnv)
# ==============================================================================
import gymnasium as gym
from gymnasium import spaces
import numpy as np

# --- Configuration Parameters ---
# Values follow the project report.

# Vehicle and charger
EV_BATTERY_CAPACITY_KWH = 40.0  # kWh; typical of a common EV such as the Nissan Leaf
CHARGER_POWER_KW = 7.4          # kW

# Electricity prices per kWh, modelled on the CEB Time-of-Day tariffs
PEAK_PRICE_PER_KWH = 0.30
OFF_PEAK_PRICE_PER_KWH = 0.10
# Hours of the day (24h clock) billed at the peak rate: 18 through 22 inclusive.
PEAK_HOURS = set(range(18, 23))

# Simulation window: plug in at 6 PM, must be ready by 7 AM the next morning.
START_HOUR = 18
END_HOUR = 7
# Number of one-hour slots between START_HOUR and END_HOUR, wrapping past midnight.
TOTAL_HOURS = (END_HOUR - START_HOUR) % 24

# Terminal reward terms
SUCCESS_REWARD = 100
FAILURE_PENALTY = -500

class EVChargingEnv(gym.Env):
    """A custom Gymnasium environment for simulating overnight EV charging.

    Each step represents one hour. The episode starts at ``START_HOUR`` with an
    empty battery and terminates when the clock reaches ``END_HOUR``.

    Actions:
        0 -- do not charge during this hour.
        1 -- charge during this hour at up to ``CHARGER_POWER_KW``.

    Observation:
        ``[battery_fraction, elapsed_fraction]`` as float32, where
        ``battery_fraction`` is the stored energy divided by the battery
        capacity and ``elapsed_fraction`` is the number of hours elapsed since
        ``START_HOUR`` divided by ``TOTAL_HOURS``.

    Reward:
        ``-cost`` of the energy delivered this hour, plus a +1 shaping bonus
        when the agent charged and the battery is still not full afterwards.
        On the terminal step ``SUCCESS_REWARD`` is added if the battery holds
        at least 99% of its capacity, otherwise ``FAILURE_PENALTY``.
    """

    def __init__(self):
        super().__init__()

        # Action space: 0 = don't charge, 1 = charge.
        self.action_space = spaces.Discrete(2)

        # Observation space: [battery level, elapsed time], both normalized to
        # [0, 1]. The bounds are created as float32 so they match the declared
        # dtype and no precision-lowering cast is needed.
        self.observation_space = spaces.Box(
            low=np.zeros(2, dtype=np.float32),
            high=np.ones(2, dtype=np.float32),
            dtype=np.float32,
        )

        # Initial state (re-applied by reset()): plugged in at START_HOUR with
        # an empty battery.
        self.current_hour = START_HOUR
        self.battery_kwh = 0.0

    def _get_state(self):
        """Return the current observation, normalized for the agent."""
        normalized_battery = self.battery_kwh / EV_BATTERY_CAPACITY_KWH
        # Hours elapsed since START_HOUR (wrapping past midnight) as a fraction
        # of the episode length.
        hours_elapsed = (self.current_hour - START_HOUR) % 24
        normalized_hour = hours_elapsed / TOTAL_HOURS
        return np.array([normalized_battery, normalized_hour], dtype=np.float32)

    def reset(self, seed=None, options=None):
        """Reset the environment for a new episode.

        Args:
            seed: Forwarded to ``gym.Env.reset``.
            options: Accepted for API compatibility; unused.

        Returns:
            ``(observation, info)`` where ``info`` is an empty dict.
        """
        super().reset(seed=seed)
        self.current_hour = START_HOUR
        self.battery_kwh = 0.0  # Start with an empty battery
        return self._get_state(), {}

    def step(self, action):
        """Advance the simulation by one hour.

        Args:
            action: 0 (don't charge) or 1 (charge).

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
            ``truncated`` is always ``False``. ``info`` reports this hour's
            ``"cost"`` and ``"energy_kwh"`` (energy actually added).
        """
        assert self.action_space.contains(action), f"{action} is an invalid action."

        cost = 0.0
        energy_added_kwh = 0.0
        if action == 1:  # The agent chooses to charge
            # The battery cannot exceed its capacity, so the energy delivered
            # in this hour may be less than a full charger-hour (or zero).
            new_level = min(
                self.battery_kwh + CHARGER_POWER_KW, EV_BATTERY_CAPACITY_KWH
            )
            energy_added_kwh = new_level - self.battery_kwh
            self.battery_kwh = new_level

            # Price depends on the hour in which the charging takes place.
            if self.current_hour in PEAK_HOURS:
                current_price = PEAK_PRICE_PER_KWH
            else:
                current_price = OFF_PEAK_PRICE_PER_KWH
            # Bill only the energy that was actually delivered.
            cost = energy_added_kwh * current_price

        # Advance the time by one hour
        self.current_hour = (self.current_hour + 1) % 24

        # Reward shaping: a small bonus for charging while the battery is
        # still below capacity, to encourage exploration of the charge action.
        still_filling = action == 1 and self.battery_kwh < EV_BATTERY_CAPACITY_KWH
        charge_increase_bonus = 1.0 if still_filling else 0.0
        reward = -cost + charge_increase_bonus

        # The episode ends when the clock reaches END_HOUR.
        terminated = self.current_hour == END_HOUR

        # At the final time step, apply the large bonus or penalty.
        if terminated:
            if self.battery_kwh >= EV_BATTERY_CAPACITY_KWH * 0.99:
                reward += SUCCESS_REWARD
            else:
                reward += FAILURE_PENALTY

        info = {"cost": cost, "energy_kwh": energy_added_kwh}
        return self._get_state(), reward, terminated, False, info

# ==============================================================================
# === Part 2: The Agent (ActorCritic)
# ==============================================================================
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

class ActorCritic(tf.keras.Model):
    """Two-headed actor-critic network on top of a shared two-layer trunk.

    The trunk is two 128-unit ReLU dense layers. The actor head applies a
    softmax over ``action_dim`` outputs; the critic head is a single linear
    unit producing the state-value estimate.
    """

    def __init__(self, state_dim, action_dim):
        """Create the layers.

        Args:
            state_dim: Size of the observation vector (stored only).
            action_dim: Number of discrete actions; width of the actor head.
        """
        super().__init__()
        self.state_dim = state_dim
        self.action_dim = action_dim

        hidden_units = 128

        # Shared feature extractor feeding both heads.
        self.shared_layer_1 = layers.Dense(units=hidden_units, activation="relu")
        self.shared_layer_2 = layers.Dense(units=hidden_units, activation="relu")

        # Actor head: probability distribution over actions.
        self.actor_head = layers.Dense(units=action_dim, activation="softmax")

        # Critic head: scalar state-value estimate.
        self.critic_head = layers.Dense(units=1)

    def call(self, inputs: tf.Tensor) -> tuple[tf.Tensor, tf.Tensor]:
        """Run the forward pass and return ``(action_probs, state_value)``."""
        trunk_features = self.shared_layer_2(self.shared_layer_1(inputs))
        policy = self.actor_head(trunk_features)
        value_estimate = self.critic_head(trunk_features)
        return policy, value_estimate

# ==============================================================================
# === Part 3: The Training Loop
# ==============================================================================
import collections
import tqdm # A library to show a progress bar
import matplotlib.pyplot as plt

# --- Final Hyperparameters ---
learning_rate = 0.001
gamma = 0.98        # Discount factor
gae_lambda = 0.95   # Lambda for Generalized Advantage Estimation
entropy_coef = 0.01 # Weight of the entropy bonus in the actor loss
max_episodes = 1000 # Reduced for faster training

# --- Initialization ---
env = EVChargingEnv()
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = ActorCritic(state_dim, action_dim)
# Optimizer with gradient clipping for stability
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate, clipnorm=1.0)
huber_loss = tf.keras.losses.Huber(reduction=tf.keras.losses.Reduction.SUM)

# --- Final Training Loop ---
# One gradient update per episode: roll out a full episode under the tape,
# compute GAE advantages and returns, then minimise actor + critic loss.
episode_rewards = []

for episode in tqdm.trange(max_episodes):
    with tf.GradientTape() as tape:
        # --- 1. Collect Experience for one full episode ---
        history_actions = []
        history_action_probs = []
        history_critic_values = []
        history_rewards = []

        state, _ = env.reset()
        while True:
            state_tensor = tf.expand_dims(tf.convert_to_tensor(state), 0)
            action_probs, critic_value = agent(state_tensor)
            # Sample from the policy so the agent keeps exploring.
            action = np.random.choice(action_dim, p=np.squeeze(action_probs))

            history_actions.append(action)
            history_action_probs.append(action_probs)
            history_critic_values.append(critic_value[0, 0])

            state, reward, terminated, _, _ = env.step(action)
            history_rewards.append(reward)

            if terminated:
                break

        episode_rewards.append(sum(history_rewards))

        # --- 2. Calculate GAE and Returns for the episode ---
        # Advantages and returns are training targets, so they are computed
        # from a plain NumPy copy of the critic outputs: no gradient flows
        # through them.
        values = tf.stack(history_critic_values).numpy()
        advantages = np.zeros_like(values)

        next_value = 0.0      # The episode terminated: no value beyond the last step
        next_advantage = 0.0
        for t in reversed(range(len(history_rewards))):
            td_error = history_rewards[t] + gamma * next_value - values[t]
            next_advantage = td_error + gamma * gae_lambda * next_advantage
            advantages[t] = next_advantage
            next_value = values[t]

        # Critic targets use the raw advantages; only the actor sees the
        # normalized ones.
        returns = advantages + values
        advantages = (advantages - np.mean(advantages)) / (np.std(advantages) + 1e-8)

        # --- 3. Calculate Actor and Critic Losses ---
        actor_losses = []
        critic_losses = []

        steps = zip(
            history_actions,
            history_action_probs,
            history_critic_values,
            advantages,
            returns,
        )
        for action, prob_dist, value, adv, ret in steps:
            # Entropy bonus for exploration
            entropy = -tf.math.reduce_sum(prob_dist * tf.math.log(prob_dist + 1e-10))

            # Actor loss: policy gradient on the chosen action's log-probability
            log_prob = tf.math.log(prob_dist[0, action] + 1e-10)
            actor_losses.append(-log_prob * adv - entropy_coef * entropy)

            # Critic loss: Huber distance between the value estimate and its target
            critic_losses.append(
                huber_loss(tf.expand_dims(ret, 0), tf.expand_dims(value, 0))
            )

        total_loss = tf.math.reduce_sum(actor_losses) + tf.math.reduce_sum(critic_losses)

    # --- 4. Apply Gradients to update the agent's networks ---
    grads = tape.gradient(total_loss, agent.trainable_variables)
    optimizer.apply_gradients(zip(grads, agent.trainable_variables))

    # Log progress periodically
    if episode % 100 == 0 or episode == max_episodes - 1:
        avg_reward = np.mean(episode_rewards[-100:])
        print(f"\nEpisode {episode}, Average Reward: {avg_reward:.2f}")

print("\n--- Training Complete ---")

# ==============================================================================
# === Part 4: The Evaluation Script
# ==============================================================================
import pandas as pd

# Use the 'agent' object that has now been trained
trained_agent = agent

# Run one full episode with the final, trained policy
state, _ = env.reset()
terminated = False
total_reward = 0
total_cost = 0.0  # Electricity cost, tracked separately from the shaped reward
history = []

print("\n--- Running Trained Agent for Evaluation ---")
while not terminated:
    state_tensor = tf.expand_dims(tf.convert_to_tensor(state), 0)
    action_probs, _ = trained_agent(state_tensor)

    # Choose the BEST action (highest probability), not a random one
    action = int(np.argmax(np.squeeze(action_probs)))

    # Snapshot the pre-step state: it is both the plotted data point and the
    # basis for pricing this hour's energy.
    hour_before = env.current_hour
    battery_before = env.battery_kwh
    history.append({'hour': hour_before, 'battery_kwh': battery_before, 'action': action})

    state, reward, terminated, _, _ = env.step(action)
    total_reward += reward

    # Price the energy actually added during this hour at that hour's tariff.
    hour_price = PEAK_PRICE_PER_KWH if hour_before in PEAK_HOURS else OFF_PEAK_PRICE_PER_KWH
    total_cost += (env.battery_kwh - battery_before) * hour_price

history.append({'hour': env.current_hour, 'battery_kwh': env.battery_kwh, 'action': -1})

# The reward mixes the energy cost with shaping bonuses and the terminal
# bonus/penalty, so the cost cannot be recovered from it; report the cost
# accumulated above instead.
final_cost = total_cost
# Same 99%-of-capacity threshold the environment uses for its success bonus.
fully_charged = env.battery_kwh >= EV_BATTERY_CAPACITY_KWH * 0.99

print(f"Final Battery Level: {env.battery_kwh / EV_BATTERY_CAPACITY_KWH * 100:.2f}%")
print(f"Total Cost to Full Charge: ${final_cost:.2f}")
if not fully_charged:
    print("Warning: the battery was not fully charged by the end of the episode.")

# ==============================================================================
# === Part 5: The Visualization Script
# ==============================================================================
df = pd.DataFrame(history)
battery_pct = df['battery_kwh'] / EV_BATTERY_CAPACITY_KWH * 100

# Create the plot
fig, ax = plt.subplots(figsize=(15, 8))

# Plot battery level over time
ax.plot(df.index, battery_pct, marker='o', linestyle='-', label='Battery %')

# Set labels, title, and grid
ax.set_xlabel("Time (Hour of Day)")
ax.set_ylabel("Battery Level (%)")
ax.set_title("Optimal Charging Schedule Learned by A2C Agent")
ax.grid(True, which='both', linestyle='--', linewidth=0.5)
ax.set_xticks(df.index)
ax.set_xticklabels(df['hour'])
ax.set_ylim(0, 105)

# Color the background to show peak vs. off-peak periods. Each row but the
# last marks the start of a one-hour interval. Only the first span of each
# kind gets a legend label, so the legend stays correct whatever the start
# hour or peak window is.
labelled = set()
for i, hour in enumerate(df['hour'].iloc[:-1]):
    if hour in PEAK_HOURS:
        span_color, span_label = 'red', 'Peak Price Period'
    else:
        span_color, span_label = 'green', 'Off-Peak Price Period'
    ax.axvspan(i, i + 1, facecolor=span_color, alpha=0.2,
               label="" if span_label in labelled else span_label)
    labelled.add(span_label)

# Add markers to show when the agent chose to charge
charge_mask = df['action'] == 1
ax.scatter(df.index[charge_mask], battery_pct[charge_mask],
           color='blue', s=150, zorder=5, label='Agent Chose to Charge')

ax.legend()
plt.show()