# DDQN + A2C + GA

In [None]:
import numpy as np
import random
from collections import deque
import matplotlib.pyplot as plt


## Double DQN

In [None]:
import numpy as np
import random
from collections import deque

import torch
import torch.nn as nn
import torch.optim as optim


class DDQNAgentTorch:
    # Shared epsilon for all instances
    common_epsilon = 1.0
    common_epsilon_min = 0.0001
    common_epsilon_decay = 0.995

    def __init__(self,
                 state_size,
                 action_size,
                 gamma=0.99,
                 learning_rate=1e-3,
                 memory_size=2500,
                 device=None):
        self.state_size = state_size
        self.action_size = action_size
        self.gamma = gamma                    # discount factor
        self.learning_rate = learning_rate
        self.memory = deque(maxlen=memory_size)  # control memory size
        self.loss = []

        # Device: GPU if available, else CPU
        if device is None:
            self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        else:
            self.device = torch.device(device)

        # Online and target networks
        self.model = self._build_model().to(self.device)
        self.target_model = self._build_model().to(self.device)
        self.update_target_model()  # copy initial weights

        # Optimizer and loss
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
        self.criterion = nn.MSELoss()


    def _build_model(self):
        """
        state -> 2 layers -> Q(s, a) for all actions
        """
        model = nn.Sequential(
            nn.Linear(self.state_size, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, self.action_size)
        )
        return model

    def remember(self, state, action, reward, next_state, done):
        """
        Store a transition in memory.
        state, next_state should be 1D np arrays (state_size,)
        action: int, reward: float, done: bool
        """
        self.memory.append((state, action, reward, next_state, done))

    def action(self, state):
        """
        ε-greedy policy using the online model.
        state is expected to be shape (state_size,) or (1, state_size).
        Output: int action.
        """
        # Explore
        if np.random.rand() <= DDQNAgentTorch.common_epsilon:
            return random.randrange(self.action_size)

        # Exploit
        if isinstance(state, np.ndarray):
            state = torch.from_numpy(state).float()
        state = state.to(self.device)

        if state.dim() == 1:
            state = state.unsqueeze(0)  # (1, state_size)

        with torch.no_grad():
            q_values = self.model(state)  # (1, action_size)
        action = torch.argmax(q_values[0]).item()
        return action


    def test_action(self, state):
        """
        Greedy action (no exploration), for evaluation.
        """
        if isinstance(state, np.ndarray):
            state = torch.from_numpy(state).float()
        state = state.to(self.device)

        if state.dim() == 1:
            state = state.unsqueeze(0)

        with torch.no_grad():
            q_values = self.model(state)
        action = torch.argmax(q_values[0]).item()
        return action


    def experience_replay(self, batch_size):
        """
        Perform update using a minibatch from memory.
        """
        minibatch = random.sample(self.memory, batch_size)

        # Stack states and next_states for vectorized prediction
        states = np.vstack([exp[0] for exp in minibatch])       # (B, state_size)
        next_states = np.vstack([exp[3] for exp in minibatch])  # (B, state_size)

        states = torch.from_numpy(states).float().to(self.device)       # (B, state_size)
        next_states = torch.from_numpy(next_states).float().to(self.device)

        # Q(s, ·) from online network
        q_values = self.model(states)                   # (B, action_size)

        # Q(s', ·) from online network (for argmax)
        with torch.no_grad():
            q_next_online = self.model(next_states)     # (B, action_size)
            # Q(s', ·) from target network (for value)
            q_next_target = self.target_model(next_states)  # (B, action_size)

        # Build target Q-values
        target_q = q_values.clone().detach()  # (B, action_size)

        for idx, (_, action, reward, _, done) in enumerate(minibatch):

            if done:
                target = reward
            else:
                # Double DQN: choose action using online net, value from target net
                best_next_action = torch.argmax(q_next_online[idx]).item()
                target = reward + self.gamma * q_next_target[idx, best_next_action].item()

            target_q[idx, action] = target

        # Optimize the online network
        self.optimizer.zero_grad()
        loss = self.criterion(q_values, target_q)
        loss.backward()
        self.optimizer.step()

        self.loss.append(loss.item())

    @classmethod
    def decay_epsilon(cls):
        if cls.common_epsilon > cls.common_epsilon_min:
            cls.common_epsilon *= cls.common_epsilon_decay

    def update_target_model(self):
        """
        Copy weights from online model to target model.
        """
        self.target_model.load_state_dict(self.model.state_dict())


#### DDQN Train loop over episodes

In [None]:
env = EvChargingEnv(device="cuda" if torch.cuda.is_available() else "cpu")

state_size = env.observation_space["state"].shape[0]
action_size = env.action_space.n

DDQN_agent = DDQNAgentTorch(state_size, action_size)
DDQN_return = []

# As an example
num_episodes = 100
batch_size = 64

for episode in range(num_episodes):
    obs, info = env.reset()
    state = obs["state"].detach().cpu().numpy()  # (state_size,)

    done = False
    truncated = False
    episode_return = 0.0

    while not (done or truncated):
        # choose action (int)
        action = DDQN_agent.action(state)

        # env expecrequires tensor action on env._device
        action_tensor = torch.tensor(action, device=env._device)
        next_obs, reward, done, truncated, info = env.step(action_tensor)

        next_state = next_obs["state"].detach().cpu().numpy()

        reward = float(reward.item() if isinstance(reward, torch.Tensor) else reward)
        episode_return += reward

        DDQN_agent.remember(state, action, reward, next_state, done or truncated)

        if len(DDQN_agent.memory) > batch_size:
            DDQN_agent.experience_replay(batch_size=batch_size)

        state = next_state

    DDQNAgentTorch.decay_epsilon()
    DDQN_agent.update_target_model()   # At the end of each episode

    DDQN_return.append(episode_return)
    print(f"Episode {episode}, return = {episode_return:.3f}")


## Actor-Critic (A2C)

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical


class ActorNet(nn.Module):
    def __init__(self, state_size, action_size):
        super().__init__()
        self.fc1 = nn.Linear(state_size, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, action_size)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        logits = self.fc3(x)
        probs = torch.softmax(logits, dim=-1)
        return probs


class CriticNet(nn.Module):
    def __init__(self, state_size):
        super().__init__()
        self.fc1 = nn.Linear(state_size, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        value = self.fc3(x)   # (B, 1)
        return value


class ActorCriticAgent:
    def __init__(self,
                 state_size,
                 action_size,
                 gamma=0.99,
                 actor_lr=1e-3,
                 critic_lr=1e-3):
        self.state_size = state_size
        self.action_size = action_size
        self.gamma = gamma
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Build actor and critic neural networks
        self.actor = ActorNet(state_size, action_size).to(self.device)
        self.critic = CriticNet(state_size).to(self.device)

        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=critic_lr)

        self.actor_losses = []
        self.critic_losses = []

    # Action selection by actor NN
    def select_action(self, state):
        """
        state: np.array or tensor, shape (state_size,) or (1, state_size)
        returns: action (int), log_prob (torch scalar)
        """
        if isinstance(state, np.ndarray):
            state = torch.from_numpy(state).float()
        state = state.to(self.device)

        if state.dim() == 1:
            state = state.unsqueeze(0)  # (1, state_size)

        probs = self.actor(state).squeeze(0)   # (action_size,)

        dist = Categorical(probs)
        action = dist.sample()
        log_prob = dist.log_prob(action)  # scalar

        return action.item(), log_prob

    def greedy_action(self, state):
        """Deterministic (argmax) action for evaluation."""
        if isinstance(state, np.ndarray):
            state = torch.from_numpy(state).float()
        state = state.to(self.device)

        if state.dim() == 1:
            state = state.unsqueeze(0)

        with torch.no_grad():
            probs = self.actor(state).squeeze(0)
        action = torch.argmax(probs).item()
        return action

    # A2C update (update actor and critic NNs each time step)
    def train_step(self, state, action_log_prob, reward, next_state, done):
        """
        A2C update (one-step TD):
          td_target = r + gamma * V(next_state) * (1 - done)
          td_error  = td_target - V(state)

        actor_loss  = -log π(a|s) * td_error.detach()
        critic_loss = td_error**2
        """
        # Convert to tensors
        if isinstance(state, np.ndarray):
            state = torch.from_numpy(state).float()
        if isinstance(next_state, np.ndarray):
            next_state = torch.from_numpy(next_state).float()

        state = state.to(self.device)
        next_state = next_state.to(self.device)

        if state.dim() == 1:
            state = state.unsqueeze(0)
        if next_state.dim() == 1:
            next_state = next_state.unsqueeze(0)

        reward = torch.tensor([reward], dtype=torch.float32, device=self.device)
        done = torch.tensor([float(done)], dtype=torch.float32, device=self.device)

        # Critic values
        value = self.critic(state).squeeze(1)        # (1,)
        next_value = self.critic(next_state).squeeze(1)  # (1,)

        td_target = reward + self.gamma * next_value * (1.0 - done)
        td_error = td_target - value                 # (1,)

        # Critic loss (MSE)
        critic_loss = (td_error.pow(2)).mean()

        #update critic network's parameters
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Actor loss (advantage = td_error.detach())
        advantage = td_error.detach()
        actor_loss = -(action_log_prob * advantage).mean()

        #update actor network's parameters
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        self.actor_losses.append(actor_loss.item())
        self.critic_losses.append(critic_loss.item())

#### A2C Train loop over episodes

In [None]:
env = EvChargingEnv(device="cuda" if torch.cuda.is_available() else "cpu")

state_size = env.observation_space["state"].shape[0]
action_size = env.action_space.n

A2C_agent = ActorCriticAgent(state_size, action_size)
A2C_return = []

num_episodes = 100

for ep in range(num_episodes):
    obs, info = env.reset()
    state = obs["state"].detach().cpu().numpy()

    done = False
    truncated = False
    episode_return = 0.0

    while not (done or truncated):
        action_idx, log_prob = A2C_agent.select_action(state)

        # env expects a torch.Tensor action on env._device
        action_tensor = torch.tensor(action_idx, device=env._device)
        next_obs, reward, done, truncated, info = env.step(action_tensor)

        next_state = next_obs["state"].detach().cpu().numpy()

        episode_return += float(reward.item() if isinstance(reward, torch.Tensor) else reward)
        A2C_return.append(episode_return)

        done_flag = done or truncated

        A2C_agent.train_step(state, log_prob, reward, next_state, done_flag)

        state = next_state

    print(f"Episode {ep}, return = {episode_return:.3f}")


## Genetic Algorithm Baseline (Heuristic model)


In [None]:
def heuristic_action_from_genome(genome, env, obs):
    """
    genome: np.array of shape (6,) -> weights [w0,w1,w2,w3,w4,w5]
    env: EvChargingEnv instance
    obs: observation dict from env ({"state": torch.Tensor})

    Output: int action = selected station index
    """
    state_tensor = obs["state"]

    # Convert to numpy on CPU
    if isinstance(state_tensor, torch.Tensor):
        state_np = state_tensor.detach().cpu().numpy()
    else:
        state_np = np.array(state_tensor, dtype=np.float32)


    # observation:[ steps_until_next_arrival, car_to_route_state(5), station_0_state, station_1_state, ... ]
    steps_until_next = state_np[0]
    car_vec = state_np[1:1 + NB_STATE_PER_CAR]  # length 5

    travel_time_norm = car_vec[0]
    soc = car_vec[1]
    desired_soc = car_vec[2]
    capacity_norm = car_vec[3]
    urgency = car_vec[4]

    # If there is no car to route (filler = -1), action doesn't matter → choose 0
    if travel_time_norm == FILLER_VALUE:
        return 0

    scores = []

    # genome = [w0, w1, w2, w3, w4, w5]
    w0, w1, w2, w3, w4, w5 = genome

    # Normalize features in get_state())
    for station in env.stations:
        station_state_t = station.get_state()                # torch.Tensor
        station_state = station_state_t.detach().cpu().numpy()

        # First 8 entries = station_only_state:[charge_speed_norm, charge_speed_sharpness_norm, nb_free_chargers_norm, nb_cars_traveling_norm,
        #  nb_cars_charging_norm, nb_cars_waiting_norm, is_max_nb_cars_traveling_reached, is_max_nb_cars_waiting_reached]
        charge_speed_norm      = station_state[0]
        nb_free_chargers_norm  = station_state[2]
        nb_cars_waiting_norm   = station_state[5]

        # Define features for scoring
        x1 = charge_speed_norm
        x2 = nb_free_chargers_norm
        x3 = nb_cars_waiting_norm
        x4 = urgency
        x5 = 1.0 - soc

        # Linear scoring heuristic
        score = w0 + w1 * x1 + w2 * x2 - w3 * x3 + w4 * x4 + w5 * x5
        scores.append(score)

    best_station = int(np.argmax(scores))
    return best_station


def evaluate_genome(genome, env, n_eval_episodes=3, max_steps_per_episode=None):
    total_return = 0.0

    for ep in range(n_eval_episodes):
        obs, info = env.reset()

        episode_return = 0.0
        done = False
        truncated = False

        while not (done or truncated):

            # Select action from heuristic
            action_idx = heuristic_action_from_genome(genome, env, obs)
            action_tensor = torch.tensor(action_idx, device=env._device) # Env needs a torch.Tensor action

            obs, reward, done, truncated, info = env.step(action_tensor)

            reward = float(reward.item()) if isinstance(reward, torch.Tensor) else reward  # reward: torch scalar -> float
            episode_return += reward

        total_return += episode_return

    mean_return = total_return / n_eval_episodes

    return mean_return



In [None]:
class GeneticAlgorithmEV:
    def __init__(
        self,
        env,
        genome_dim=6,
        population_size=20,
        n_generations=10,
        elite_frac=0.2,    # The top 20% of genomes (by fitness) survive to the next generation
        mutation_std=0.1,  # Standard deviation of Gaussian noise added to genome values during mutation.
        mutation_prob=0.3, # Probability that a newly created child (via crossover) gets mutated -> 30% get Gaussian noise, others unchanged
        n_eval_episodes=3,
        seed=42,
    ):
        self.genome_dim = genome_dim
        self.population_size = population_size
        self.n_generations = n_generations
        self.elite_frac = elite_frac
        self.mutation_std = mutation_std
        self.mutation_prob = mutation_prob
        self.n_eval_episodes = n_eval_episodes

        # Fixed random seed for reproducibility
        np.random.seed(seed)

        self.best_genome = None
        self.best_fitness = -np.inf
        self.best_fitness_history = []

    def _init_population(self):
        # Uniform init in [-1, 1]
        return np.random.uniform(-1.0, 1.0, size=(self.population_size, self.genome_dim))

    def _evaluate_population(self, population):
        fitnesses = []
        for genome in population:
            fit = evaluate_genome(genome, env, n_eval_episodes=self.n_eval_episodes)
            fitnesses.append(fit)
        return np.array(fitnesses)

    def run(self):
        population = self._init_population()
        n_elite = max(1, int(self.elite_frac * self.population_size))

        for gen in range(self.n_generations):
            fitnesses = self._evaluate_population(population)

            # Track best
            best_idx = np.argmax(fitnesses)
            best_fit = fitnesses[best_idx]
            best_gen = population[best_idx]

            if best_fit > self.best_fitness:
                self.best_fitness = best_fit
                self.best_genome = best_gen.copy()

            self.best_fitness_history.append(self.best_fitness)

            print(f"[GA] Gen {gen} | gen_best={best_fit:.3f} | overall_best={self.best_fitness:.3f}")

            # Select elites
            elite_indices = np.argsort(fitnesses)[-n_elite:]
            elites = population[elite_indices]

            # Build new population
            new_pop = [elites[0]]   # keep the best individual always

            while len(new_pop) < self.population_size:
                # Select two parents randomly from elite set
                p1, p2 = elites[np.random.randint(0, n_elite)], elites[np.random.randint(0, n_elite)]

                # Uniform crossover
                mask = np.random.rand(self.genome_dim) < 0.5
                child = np.where(mask, p1, p2)

                # Mutation
                if np.random.rand() < self.mutation_prob:
                    child = child + np.random.normal(0, self.mutation_std, size=self.genome_dim)

                new_pop.append(child)

            population = np.array(new_pop)

        return self.best_genome, self.best_fitness, self.best_fitness_history


    def _init_population(self):
        # Initialize weights in [-1, 1]
        pop = self.rng.uniform(-1.0, 1.0, size=(self.population_size, self.genome_dim))
        return pop

    def _evaluate_population(self, population):
        fitnesses = []
        for i in range(len(population)):
            genome = population[i]
            fit = evaluate_genome(
                genome,
                n_eval_episodes=self.n_eval_episodes,
                env_kwargs=self.env_kwargs,
            )
            fitnesses.append(fit)
        return np.array(fitnesses)

    def run(self):
        population = self._init_population()
        n_elite = max(1, int(self.elite_frac * self.population_size))

        for gen in range(self.n_generations):
            # 1) Evaluate
            fitnesses = self._evaluate_population(population)

            # 2) Track best
            gen_best_idx = int(np.argmax(fitnesses))
            gen_best_fit = fitnesses[gen_best_idx]
            gen_best_genome = population[gen_best_idx].copy()

            if gen_best_fit > self.best_fitness:
                self.best_fitness = gen_best_fit
                self.best_genome = gen_best_genome.copy()

            self.best_fitness_history.append(self.best_fitness)
            print(
                f"[GA] Generation {gen}/{self.n_generations-1} "
                f"best_gen_fit = {gen_best_fit:.3f}, "
                f"overall_best = {self.best_fitness:.3f}"
            )

            # 3) Selection: take elites
            elite_indices = np.argsort(fitnesses)[-n_elite:]
            elites = population[elite_indices]

            # 4) Create new population
            new_pop = [elites[0]]  # keep the single best (elitism)

            while len(new_pop) < self.population_size:
                # parents: random elites
                p1, p2 = elites[self.rng.randint(0, n_elite, size=2)]

                # uniform crossover
                mask = self.rng.rand(self.genome_dim) < 0.5
                child = np.where(mask, p1, p2)

                # mutation
                if self.rng.rand() < self.mutation_prob:
                    noise = self.rng.normal(0.0, self.mutation_std, size=self.genome_dim)
                    child = child + noise

                new_pop.append(child)

            population = np.array(new_pop)

        return self.best_genome, self.best_fitness, self.best_fitness_history



### GA Evaluation

In [None]:
def evaluate_best_genome(best_genome, env, n_eval_episodes=20):
    returns = []

    for ep in range(n_eval_episodes):
        obs, info = env.reset()

        episode_return = 0.0
        done = False
        truncated = False

        while not (done or truncated):
            action_idx = heuristic_action_from_genome(best_genome, env, obs)
            action_tensor = torch.tensor(action_idx, device=env._device)

            obs, reward, done, truncated, info = env.step(action_tensor)

            if isinstance(reward, torch.Tensor):
                r = float(reward.item())
            else:
                r = float(reward)
            episode_return += r

            # Optional safety stop if you want
            if env.step_count >= env.max_steps:
                break

        returns.append(episode_return)

    returns = np.array(returns)
    print(f"Final GA heuristic evaluation over {n_eval_episodes} episodes:")
    print(f"  Mean return = {returns.mean():.3f}")
    print(f"  Std return  = {returns.std():.3f}")
    return returns


#### GA Train loop over episodes + Evaluation phase

In [None]:
if __name__ == "__main__":
    # Create ONE environment instance
    env = EvChargingEnv(
        device="cpu",
        seed=123,
        nb_stations=5,
        max_steps=1000,
        # add other parameters if you changed defaults
    )

    # Run GA to optimize heuristic weights
    ga = GeneticAlgorithmEV(
        env=env,
        genome_dim=6,
        population_size=20,
        n_generations=10,
        elite_frac=0.3,
        mutation_std=0.1,
        mutation_prob=0.3,
        n_eval_episodes=2,   # small for speed; you can increase later
        seed=42,
    )

    best_genome, best_fit, fitness_history = ga.run()

    print("Best genome (weights):", best_genome)
    print("Best fitness (mean return during GA eval):", best_fit)

    # Plot GA best fitness over generations
    plt.figure()
    plt.plot(fitness_history, marker='o')
    plt.xlabel("Generation")
    plt.ylabel("Best mean return")
    plt.title("GA best fitness per generation")
    plt.grid(True)
    plt.show()

    # Final evaluation of best heuristic policy
    final_returns = evaluate_best_genome(
        best_genome,
        env,
        n_eval_episodes=20,
    )
