In [None]:
!pip3 install highway_env

In [None]:
import gymnasium
import highway_env
import numpy as np
import pprint

import torch
import torch.nn as nn
import torch.nn.functional as F
import copy



In [None]:


# Create the highway environment; rendering produces RGB arrays.
env = gymnasium.make("highway-v0", render_mode='rgb_array')

# Scenario overrides, written straight into the environment's config dict.
env.unwrapped.config.update({
    "lanes_count": 3,
    "duration": 10,
    "vehicles_density": 2,
    "vehicles_count": 10,
})

# Size of the networks' output layer.
# NOTE(review): presumably equals the number of discrete actions of the
# environment's default action type - confirm against the env's action space.
ACTION_SIZE = 5

# Kinematic observation: 5 vehicles, each described by 5 features.
observation = dict(
    type="Kinematics",
    vehicles_count=5,
    features=["x", "y", "vx", "vy", "cos_h"],
    features_range=dict(
        x=[-100, 100],
        y=[-100, 100],
        vx=[-20, 20],
        vy=[-20, 20],
    ),
    absolute=True,
    order="sorted",
)
env.unwrapped.config["observation"] = observation



In [None]:

class Model(nn.Module):
    """Three-layer fully connected network with a softmax output.

    Args:
        input_size: Number of values in one flattened observation.
        output_size: Number of output units (one per action).
        hidden_size: Width of the two hidden layers.
    """

    def __init__(self, input_size, output_size, hidden_size=50):
        super().__init__()
        self.input_size = input_size
        self.output_size = output_size
        self.layer_1 = nn.Linear(input_size, hidden_size)
        self.norm_1 = nn.BatchNorm1d(hidden_size)
        self.layer_2 = nn.Linear(hidden_size, hidden_size)
        self.norm_2 = nn.BatchNorm1d(hidden_size)
        self.layer_3 = nn.Linear(hidden_size, output_size)
        self.norm_3 = nn.BatchNorm1d(output_size)

    def forward(self, obs, batch_size=1):
        """Map observation(s) to a row-wise softmax distribution.

        Args:
            obs: Array-like or tensor that can be reshaped to
                ``(-1, input_size)``, or ``None``.
            batch_size: Number of rows the caller is passing. Batch
                normalisation is applied only when this is not 1; it also sets
                the number of rows returned when ``obs`` is ``None``.

        Returns:
            Tensor of shape ``(rows, output_size)`` whose rows sum to 1. When
            ``obs`` is ``None`` a uniform distribution is returned.
        """
        if obs is None:
            uniform_row = [1 / self.output_size] * self.output_size
            return torch.tensor([uniform_row] * batch_size)

        # The batch-norm layers are skipped for single observations, so the
        # single-sample and batched paths are different functions of the input.
        batch_norm_on = batch_size != 1

        # as_tensor avoids the copy-construct warning for tensor inputs, and the
        # explicit float32 keeps float64 numpy observations compatible with the
        # float32 layer weights. reshape also accepts non-contiguous input.
        x = torch.as_tensor(obs, dtype=torch.float32)
        x = x.reshape(-1, self.input_size)

        x = self.layer_1(x)
        if batch_norm_on:
            x = self.norm_1(x)
        x = torch.sigmoid(x)  # F.sigmoid is deprecated
        x = self.layer_2(x)
        if batch_norm_on:
            x = self.norm_2(x)
        x = torch.sigmoid(x)
        x = self.layer_3(x)
        if batch_norm_on:
            x = self.norm_3(x)
        x = F.softmax(x, dim=1)

        return x
    


In [None]:
import random


class ReplayMemory(object):
    """Fixed-capacity FIFO store of transitions.

    Each transition is the tuple of positional arguments given to ``push``;
    ``sample`` reads the first three entries as (state, action, reward).
    """

    def __init__(self, capacity=1000):
        self.capacity = capacity
        self.memory = []

    def push(self, *args):
        """Save a transition, evicting the oldest one once over capacity."""
        self.memory.append(args)
        if len(self.memory) > self.capacity:
            self.memory.pop(0)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            ``(states, actions, rewards)`` stacked along a new leading
            dimension. States and rewards are float32 and actions are int64
            regardless of the numeric types that were pushed, so float64 numpy
            observations or numpy-scalar rewards cannot produce a batch whose
            dtype mismatches float32 network weights.

        Raises:
            ValueError: From ``random.sample`` if ``batch_size`` exceeds the
                number of stored transitions.
        """
        batch = random.sample(self.memory, batch_size)
        states = torch.stack(
            [torch.as_tensor(item[0], dtype=torch.float32) for item in batch]
        )
        actions = torch.stack(
            [torch.as_tensor(item[1], dtype=torch.int64) for item in batch]
        )
        rewards = torch.stack(
            [torch.as_tensor(item[2], dtype=torch.float32) for item in batch]
        )
        return states, actions, rewards

    def __len__(self):
        return len(self.memory)

In [None]:
def select_action(obs, policy_net):
    """Return the greedy action index for ``obs`` under ``policy_net``.

    Args:
        obs: Observation passed to ``policy_net``, or ``None``.
        policy_net: Callable returning a ``(1, n_actions)`` tensor.

    Returns:
        ``0`` when ``obs`` is ``None``; otherwise the index of the largest
        entry in the network output, as a Python int.
    """
    if obs is None:
        return 0
    # Pure inference: no autograd graph is needed to pick an action.
    with torch.no_grad():
        policy = policy_net(obs)
    return torch.argmax(policy, dim=1).item()

def optimize_model(policy_net, value_net, memory, BATCH_SIZE, GAMMA, optimizer):
    """Run one gradient step on ``policy_net`` using a batch from ``memory``.

    Does nothing until ``memory`` holds at least ``BATCH_SIZE`` transitions.

    Args:
        policy_net: Network being trained; called with ``batch_size=BATCH_SIZE``.
        value_net: Network that provides the bootstrap term of the target.
        memory: Object supporting ``len()`` and ``sample(n)`` that returns
            ``(states, actions, rewards)`` tensors.
        BATCH_SIZE: Number of transitions per update.
        GAMMA: Factor applied to the bootstrap term.
        optimizer: Optimizer over ``policy_net``'s parameters.

    Returns:
        None.
    """
    if len(memory) < BATCH_SIZE:
        return

    state_batch, action_batch, reward_batch = memory.sample(BATCH_SIZE)

    # Output of policy_net for each sampled state, restricted to the column of
    # the action that was actually taken.
    state_action_values = policy_net(state_batch, batch_size=BATCH_SIZE).gather(
        1, action_batch.unsqueeze(1)
    )

    # The target is a constant for this step: without no_grad, backward() would
    # accumulate gradients in value_net that no optimizer zeroes or uses.
    # NOTE(review): the bootstrap term is evaluated on the *same* states as the
    # prediction because the sampled batch carries no next states or terminal
    # flags - confirm whether a next-state target was intended.
    with torch.no_grad():
        next_state_values = torch.max(
            value_net(state_batch, batch_size=BATCH_SIZE), dim=1
        ).values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Huber loss between prediction and target, both shaped (BATCH_SIZE, 1).
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))
    print(f'Loss: {loss.item():.2f}')

    optimizer.zero_grad()
    loss.backward()
    # In-place gradient clipping.
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()
    


In [None]:
import time
# Length of one flattened observation: observed vehicles x features per vehicle.
OBS_SIZE = env.unwrapped.config["observation"]["vehicles_count"] * len(env.unwrapped.config["observation"]["features"])
env.unwrapped.config["duration"] = 20

EPOCHS = 10000
WARMUP_EPOCHS = 5  # epochs spent pulling both networks towards a uniform output
batch_size = 16
GAMMA = 0.99  # factor applied to the bootstrap term in optimize_model
TAU = 0.005   # blend rate of the soft update of value_net towards policy_net
LR = 1e-2

policy_net = Model(OBS_SIZE, ACTION_SIZE)
value_net = Model(OBS_SIZE, ACTION_SIZE)
optimizer = torch.optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)

# Separate optimizer over BOTH networks, used only during the warm-up epochs.
all_parameters = list(policy_net.parameters()) + list(value_net.parameters())
optimizer_random = torch.optim.Adam(all_parameters, lr=0.1)

env.reset()

memory = ReplayMemory()
for epoch in range(EPOCHS):
    print(f'epoch: {epoch}')
    env.reset()
    # NOTE(review): the observation returned by reset() is discarded, so the
    # first action of every episode is select_action's fallback (0).
    obs = None
    done = truncated = False

    total_reward = 0
    while not (done or truncated):
        action = select_action(obs, policy_net)
        next_obs, reward, done, truncated, info = env.step(action)
        print(info)

        # Store the transition BEFORE leaving the loop so the final step of an
        # episode (and its reward) also reaches the replay memory.
        if obs is not None:
            memory.push(obs, action, reward)
        total_reward += reward

        if done or truncated:
            break
        obs = next_obs

        if epoch < WARMUP_EPOCHS:
            # Warm-up: push policy_net towards a uniform output and tie the two
            # networks' outputs together.
            target = torch.Tensor([[1 / ACTION_SIZE] * ACTION_SIZE])
            policy = policy_net(obs)
            values = value_net(obs)
            criterion = torch.nn.CrossEntropyLoss()
            loss_policy = criterion(policy, target)

            loss_values = criterion(policy, values)
            loss = loss_policy + loss_values
            optimizer_random.zero_grad()
            loss.backward()
            # Step the optimizer whose gradients were just zeroed and filled;
            # stepping `optimizer` here would leave value_net untrained.
            optimizer_random.step()

        else:
            print(f'policy: {policy_net(obs)}')
            print(f'value: {value_net(obs)}')
            optimize_model(policy_net, value_net, memory, batch_size, GAMMA, optimizer)

            # Soft update: value_net <- TAU * policy_net + (1 - TAU) * value_net.
            value_net_state_dict = value_net.state_dict()
            policy_net_state_dict = policy_net.state_dict()
            for key in policy_net_state_dict:
                value_net_state_dict[key] = policy_net_state_dict[key]*TAU + value_net_state_dict[key]*(1-TAU)
            value_net.load_state_dict(value_net_state_dict)
            env.render()




In [None]:

GENERATIONS = 1000
SEEDS_PER_GEN = 10
MODELS = 10
OBS_SIZE = env.unwrapped.config["observation"]["vehicles_count"] * len(env.unwrapped.config["observation"]["features"])


def _run_episode(model, seed, render):
    """Play one episode seeded with ``seed`` using greedy actions from ``model``.

    The episode also ends as soon as the step info reports an
    ``on_road_reward`` of 0. Returns the sum of the step rewards.
    """
    env.reset(seed=seed)
    obs = None  # the model returns a uniform distribution for the first step
    score = 0
    finished = False
    while not finished:
        probabilities = model(obs).detach().view(-1).numpy()
        action = np.argmax(probabilities)
        obs, reward, done, truncated, info = env.step(action)
        off_road = info["rewards"]["on_road_reward"] == 0
        if render:
            env.render()
        score += reward
        finished = done or truncated or off_road
    return score


# Initial population of randomly initialised networks.
models = [Model(OBS_SIZE, ACTION_SIZE) for _ in range(MODELS)]

for generation in range(GENERATIONS):
    # The episode duration setting grows with the generation index.
    env.unwrapped.config["duration"] = 25 + 0.5 * generation
    print(f'Generation: {generation}')

    # Accumulated score per model over all seeds of this generation.
    scores = [0] * MODELS

    for _ in range(SEEDS_PER_GEN):
        # Every model is evaluated with the same seed within one round.
        seed = np.random.randint(1_000_000)
        for model_num, candidate in enumerate(models):
            print(f'{model_num} ', end='')
            episode_score = _run_episode(candidate, seed, render=(model_num % 1 == 0))
            print(episode_score)
            scores[model_num] += episode_score
        print()

    best_score = max(scores)
    best_model_num = scores.index(best_score)
    best_model = models[best_model_num]
    print(f'Best model_num: {best_model_num} | score: {best_score}\n')

    # Next generation: copies of the best model, each perturbed with Gaussian
    # noise whose scale decays with the generation index.
    change = 0.1 * (0.999 ** generation)
    models = []
    for _ in range(MODELS):
        mutant = copy.deepcopy(best_model)
        for param in mutant.parameters():
            param.data.add_(change * torch.randn_like(param))
        models.append(mutant)
    torch.save(best_model.state_dict(), f'model_gen{generation}')



