In [5]:
import torch
import copy
import numpy as np

In [6]:
# Define the TransportEnv environment
class TransportEnv:
    def __init__(self, grid_size=4):
        self.grid_size = grid_size
        self.state_space = 7  # State space size
        self.actions = ['north', 'south', 'west', 'east']
        self.reset()

    def reset(self):
        self.flag_collected = 0
        
        self.pickup_x, self.pickup_y = np.random.randint(0, self.grid_size, size=2)
        self.delivery_x, self.delivery_y = np.random.randint(0, self.grid_size, size=2)
        while (self.pickup_x, self.pickup_y) == (self.delivery_x, self.delivery_y):
            self.pickup_x, self.pickup_y = np.random.randint(0, self.grid_size, size=2)
        
        self.agent_x, self.agent_y = np.random.randint(0, self.grid_size, size=2)
        while ((self.agent_x, self.agent_y) == (self.delivery_x, self.delivery_y)) or ((self.agent_x, self.agent_y) == (self.pickup_x, self.pickup_y)):
            self.agent_x, self.agent_y = np.random.randint(0, self.grid_size, size=2)

        return np.array([self.agent_x, self.agent_y, self.flag_collected, self.pickup_x, self.pickup_y, self.delivery_x, self.delivery_y])

    def step(self, action):
        done = False
        if action == 'north' and self.agent_x > 0:
            self.agent_x -= 1
        elif action == 'south' and self.agent_x < self.grid_size - 1:
            self.agent_x += 1
        elif action == 'west' and self.agent_y > 0:
            self.agent_y -= 1
        elif action == 'east' and self.agent_y < self.grid_size - 1:
            self.agent_y += 1

        if self.flag_collected == 0:
            if (self.agent_x, self.agent_y) == (self.pickup_x, self.pickup_y):
                self.flag_collected = 1
                reward = 10
            else:
                reward = -0.1
        else:
            if (self.agent_x, self.agent_y) == (self.delivery_x, self.delivery_y):
                reward = 10
                done = True
            else:
                reward = -0.1

        return np.array([self.agent_x, self.agent_y, self.flag_collected, self.pickup_x, self.pickup_y, self.delivery_x, self.delivery_y]), reward, done

In [7]:
# Define the TransportAgent agent
class TransportAgent:
    def __init__(self, statespace_size=7, gamma=0.99, learning_rate=0.997, start_epsilon=1.0,
                 epsilon_decay_factor=0.997, min_epsilon=0.1, replay_buffer_size=1000,
                 batch_size=200, network_copy_frequency=500):
        self.statespace_size = statespace_size
        self.gamma = gamma
        self.learning_rate = learning_rate
        self.model2 = self.prepare_torch()
        self.memory = []  # Using a list instead of a deque
        self.epsilon = start_epsilon
        self.epsilon_min = min_epsilon
        self.epsilon_decay_factor = epsilon_decay_factor
        self.replay_buffer_size = replay_buffer_size
        self.batch_size = batch_size
        self.network_copy_frequency = network_copy_frequency
        self.steps_since_copy = 0  # Counter for network copy

    def prepare_torch(self):
        l1 = self.statespace_size
        l2 = 150
        l3 = 100
        l4 = 4
        self.model = torch.nn.Sequential(
            torch.nn.Linear(l1, l2),
            torch.nn.ReLU(),
            torch.nn.Linear(l2, l3),
            torch.nn.ReLU(),
            torch.nn.Linear(l3, l4)
        )
        model2 = copy.deepcopy(self.model)
        model2.load_state_dict(self.model.state_dict())
        loss_fn = torch.nn.MSELoss()
        optimizer = torch.optim.Adam(self.model.parameters(), lr=self.learning_rate)
        return model2

    def update_target(self):
        if self.steps_since_copy >= self.network_copy_frequency:
            self.model2.load_state_dict(self.model.state_dict())
            self.steps_since_copy = 0

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))
        if len(self.memory) > self.replay_buffer_size:
            self.memory.pop(0)  # Remove the oldest experience

    def get_qvals(self, state):
        state1 = torch.from_numpy(state).float()
        qvals_torch = self.model(state1)
        qvals = qvals_torch.data.numpy()
        return qvals

    def get_maxQ(self, s):
        return torch.max(self.model2(torch.from_numpy(s).float())).float()

    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return np.random.choice(range(4))  # Random action
        q_values = self.get_qvals(state)
        return np.argmax(q_values[0])  # Greedy action

    def process_minibatch(self, minibatch):
        states = []
        actions = []
        targets = []
        for state, action, reward, next_state, done in minibatch:
            q_values = self.get_qvals(state)
            if done:
                q_values[0][action] = reward
            else:
                q_values[0][action] = reward + self.gamma * self.get_maxQ(next_state)
            states.append(state)
            actions.append(action)
            targets.append(q_values[0])
        return np.array(states), np.array(actions), np.array(targets)

    def train_one_step(self, states, actions, targets):
        state1_batch = torch.cat([torch.from_numpy(s).float() for s in states])
        action_batch = torch.Tensor(actions)
        Q1 = self.model(state1_batch)
        X = Q1.gather(dim=1, index=action_batch.long().unsqueeze(dim=1)).squeeze()
        Y = torch.tensor(targets)
        loss = torch.nn.MSELoss()(X, Y)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()

In [8]:
if __name__ == "__main__":
    env = TransportEnv()  # Create the environment
    agent = TransportAgent(learning_rate=0.997, start_epsilon=1.0, epsilon_decay_factor=0.997,
                           min_epsilon=0.1, replay_buffer_size=1000, batch_size=200, network_copy_frequency=500)

    episodes = 1000
    for episode in range(episodes):
        state = env.reset()  # Reset the environment and get the initial state
        done = False
        total_reward = 0
        while not done:
            # print("state", state)
            action = agent.act(state)
            next_state, reward, done = env.step(env.actions[action])
            total_reward += reward
            agent.remember(state, action, reward, next_state, done)
            # print("agent.memory[-1]",agent.memory[-1])
            state = next_state

            # Training
            if len(agent.memory) > agent.batch_size:
                minibatch_indices = np.random.choice(len(agent.memory), agent.batch_size, replace=False)
                minibatch = [agent.memory[i] for i in minibatch_indices]
                states_batch, actions_batch, targets_batch = agent.process_minibatch(minibatch)
                agent.train_one_step(states_batch, actions_batch, targets_batch)

            agent.epsilon = max(agent.epsilon * agent.epsilon_decay_factor, agent.epsilon_min)
            agent.steps_since_copy += 1
            agent.update_target()
        # print("state", state)
        print(f"Episode: {episode + 1}, Total Reward: {total_reward}, Epsilon: {agent.epsilon}")

Episode: 1, Total Reward: 17.10000000000001, Epsilon: 0.9110661429076764
Episode: 2, Total Reward: 16.300000000000004, Epsilon: 0.8103284404851119
Episode: 3, Total Reward: 12.500000000000016, Epsilon: 0.6429663904714829


TypeError: 'numpy.float32' object does not support item assignment