## Training the model 

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import math
import gym
import random
import numpy as np
import matplotlib.pyplot as plt

env = gym.make('CartPole-v1')
observation_space = env.observation_space.shape[0]
action_space = env.action_space.n

EPISODES = 50
LEARNING_RATE = 0.0001
MEM_SIZE = 10000
BATCH_SIZE = 64
GAMMA = 0.95
EXPLORATION_MAX = 1.0
EXPLORATION_DECAY = 0.999
EXPLORATION_MIN = 0.001
EPSILON_PERT = 0.015

FC1_DIMS = 1024
FC2_DIMS = 512
DEVICE = torch.device("cpu")



In [2]:
class Network(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.input_shape = env.observation_space.shape
        self.action_space = action_space

        self.fc1 = nn.Linear(*self.input_shape, FC1_DIMS)
        self.fc2 = nn.Linear(FC1_DIMS, FC2_DIMS)
        self.fc3 = nn.Linear(FC2_DIMS, self.action_space)

        self.optimizer = optim.Adam(self.parameters(), lr=LEARNING_RATE)
        self.loss = nn.MSELoss()
        self.to(DEVICE)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

class ReplayBuffer:
    def __init__(self):
        self.mem_count = 0

        self.states = np.zeros((MEM_SIZE, *env.observation_space.shape),dtype=np.float32)
        self.actions = np.zeros(MEM_SIZE, dtype=np.int64)
        self.rewards = np.zeros(MEM_SIZE, dtype=np.float32)
        self.states_ = np.zeros((MEM_SIZE, *env.observation_space.shape),dtype=np.float32)
        self.dones = np.zeros(MEM_SIZE, dtype=np.bool)

    def add(self, state, action, reward, state_, done):
        mem_index = self.mem_count % MEM_SIZE

        self.states[mem_index]  = state.clone().detach().numpy()
        self.actions[mem_index] = action
        self.rewards[mem_index] = reward
        self.states_[mem_index] = state_.clone().detach().numpy()
        self.dones[mem_index] =  1 - done

        self.mem_count += 1

    def sample(self):
        MEM_MAX = min(self.mem_count, MEM_SIZE)
        batch_indices = np.random.choice(MEM_MAX, BATCH_SIZE, replace=True)

        states  = self.states[batch_indices]
        actions = self.actions[batch_indices]
        rewards = self.rewards[batch_indices]
        states_ = self.states_[batch_indices]
        dones   = self.dones[batch_indices]

        return states, actions, rewards, states_, dones

class DQN_Solver:
    def __init__(self):
        self.memory = ReplayBuffer()
        self.exploration_rate = EXPLORATION_MAX
        self.network = Network()

    def choose_action(self, observation):
        if random.random() < self.exploration_rate:
            return env.action_space.sample()

        state = observation
        state = state.to(DEVICE)
        state = state.unsqueeze(0)

        q_values = self.network(state.float())
        return torch.argmax(q_values).item(), q_values

    def learn(self):
        if self.memory.mem_count < BATCH_SIZE:
            return

        states, actions, rewards, states_, dones = self.memory.sample()
        states = torch.tensor(states , dtype=torch.float32).to(DEVICE)
        actions = torch.tensor(actions, dtype=torch.long).to(DEVICE)
        rewards = torch.tensor(rewards, dtype=torch.float32).to(DEVICE)
        states_ = torch.tensor(states_, dtype=torch.float32).to(DEVICE)
        dones = torch.tensor(dones, dtype=torch.bool).to(DEVICE)
        batch_indices = np.arange(BATCH_SIZE, dtype=np.int64)
        states.requires_grad = True
        
        q_values = self.network(states)
        next_q_values = self.network(states_)

        predicted_value_of_now = q_values[batch_indices, actions]
        predicted_value_of_future = torch.max(next_q_values, dim=1)[0]

        q_target = rewards + GAMMA * predicted_value_of_future * dones

        loss = self.network.loss(q_target, predicted_value_of_now)        
        self.network.optimizer.zero_grad()
        loss.backward()
        self.network.optimizer.step()

        self.exploration_rate *= EXPLORATION_DECAY
        self.exploration_rate = max(EXPLORATION_MIN, self.exploration_rate)

    def returning_epsilon(self):
        return self.exploration_rate

## Training the network without defense

In [3]:
agent = DQN_Solver()
agent.network.train()

best_reward = 0
average_reward = 0
episode_number = []
average_reward_number = []

for i in range(1, EPISODES):
    state = env.reset()
    score = 0

    while True:
        env.render()
        action = agent.choose_action(state)
        try:
            q_values = action[1]
            action = action[0]
        except:
            action = action
        state_, reward, done, info = env.step(action)
        agent.memory.add(state, action, reward, state_, done)
        gradient_sign = agent.learn()
        state = state_
        score += reward

        if done:
            if score > best_reward:
                best_reward = score
            average_reward += score
            print("Episode {} Average Reward {} Best Reward {} Last Reward {} Epsilon {}".format(i, average_reward/i, best_reward, score, agent.returning_epsilon()))
            break

        episode_number.append(i)
        average_reward_number.append(average_reward/i)


Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  self.dones = np.zeros(MEM_SIZE, dtype=np.bool)


Episode 1 Average Reward 30.0 Best Reward 30.0 Last Reward 30.0 Epsilon 1.0
Episode 2 Average Reward 27.0 Best Reward 30.0 Last Reward 24.0 Epsilon 1.0
Episode 3 Average Reward 29.666666666666668 Best Reward 35.0 Last Reward 35.0 Epsilon 0.9743224148844496
Episode 4 Average Reward 33.25 Best Reward 44.0 Last Reward 44.0 Epsilon 0.9323611649219127
Episode 5 Average Reward 29.6 Best Reward 44.0 Last Reward 15.0 Epsilon 0.9184732224159486
Episode 6 Average Reward 30.0 Best Reward 44.0 Last Reward 32.0 Epsilon 0.8895331192339416
Episode 7 Average Reward 31.0 Best Reward 44.0 Last Reward 37.0 Epsilon 0.857205969570888
Episode 8 Average Reward 35.875 Best Reward 70.0 Last Reward 70.0 Epsilon 0.7992255563671304
Episode 9 Average Reward 34.0 Best Reward 70.0 Last Reward 19.0 Epsilon 0.784176167005256
Episode 10 Average Reward 32.5 Best Reward 70.0 Last Reward 19.0 Epsilon 0.7694101571203781
Episode 11 Average Reward 32.0 Best Reward 70.0 Last Reward 27.0 Epsilon 0.7489039087598284
Episode 12 A

## Testing the network with FGSM

In [5]:
# Taking the gradient of the reward with respect to the temporal difference error
agent.network.eval()

gradient_sign = None

best_reward = 0
average_reward = 0
episode_number = []
average_reward_number = []

for i in range(1,100):
    state = env.reset()
    score = 0
    did_pertubate = 0
    while True:
        found_q_values = False
        env.render()
        action = agent.choose_action(state)
        try:
            q_values = action[1]
            action = action[0]
            found_q_values = True
        except Exception as e:
            action = action
        state_, reward, done, info = env.step(action)
        
        if(found_q_values == True):
            next_q_values = agent.network(state_.float())
            loss = agent.network.loss(q_values, next_q_values)
            
            gradient_sign = torch.autograd.grad(loss,state,retain_graph=True)[0].sign()
            if gradient_sign != None and score % 3 == 0:
                state_ = state_ + gradient_sign * EPSILON_PERT
                did_pertubate += 1
                
        state = state_
        score += reward

        if done:
            print(did_pertubate)
            
            if score > best_reward:
                best_reward = score
            average_reward += score
            print("Episode {} Average Reward {} Best Reward {} Last Reward {} Epsilon {}".format(i, average_reward/i, best_reward, score, agent.returning_epsilon()))
            break

        episode_number.append(i)
        average_reward_number.append(average_reward/i)



30
Episode 1 Average Reward 94.0 Best Reward 94.0 Last Reward 94.0 Epsilon 0.05875274256781066
41
Episode 2 Average Reward 111.0 Best Reward 128.0 Last Reward 128.0 Epsilon 0.05875274256781066
40
Episode 3 Average Reward 116.0 Best Reward 128.0 Last Reward 126.0 Epsilon 0.05875274256781066
56
Episode 4 Average Reward 131.75 Best Reward 179.0 Last Reward 179.0 Epsilon 0.05875274256781066
51
Episode 5 Average Reward 140.0 Best Reward 179.0 Last Reward 173.0 Epsilon 0.05875274256781066
57
Episode 6 Average Reward 145.66666666666666 Best Reward 179.0 Last Reward 174.0 Epsilon 0.05875274256781066
39
Episode 7 Average Reward 142.71428571428572 Best Reward 179.0 Last Reward 125.0 Epsilon 0.05875274256781066
38
Episode 8 Average Reward 140.375 Best Reward 179.0 Last Reward 124.0 Epsilon 0.05875274256781066
50
Episode 9 Average Reward 143.55555555555554 Best Reward 179.0 Last Reward 169.0 Epsilon 0.05875274256781066
72
Episode 10 Average Reward 152.6 Best Reward 234.0 Last Reward 234.0 Epsilon 

KeyboardInterrupt: 

## Training the network with adversarial training

In [None]:
agent_robust = DQN_Solver()
agent_robust.network.train()

gradient_sign = None

best_reward = 0
average_reward = 0
episode_number = []
average_reward_number = []

for i in range(1, EPISODES):
    state = env.reset()
    score = 0
    while True:
        found_q_values = False
        env.render()
        action = agent_robust.choose_action(state)
        try:
            q_values = action[1]
            action = action[0]
            found_q_values = True
        except Exception as e:
            action = action
        state_, reward, done, info = env.step(action)
        
        if(found_q_values == True):
            next_q_values = agent_robust.network(state_.float())
            loss = agent_robust.network.loss(q_values, next_q_values)
            
            gradient_sign = torch.autograd.grad(loss,state,retain_graph=True)[0].sign()
            if(gradient_sign != None):
                state_ = state_ + gradient_sign * EPSILON_PERT
            else:
                print("Not perturbated")
                
        agent_robust.memory.add(state, action, reward, state_, done)
        agent_robust.learn()
        state = state_
        score += reward

        if done:
            if score > best_reward:
                best_reward = score
            average_reward += score
            print("Episode {} Average Reward {} Best Reward {} Last Reward {} Epsilon {}".format(i, average_reward/i, best_reward, score, agent.returning_epsilon()))
            break

        episode_number.append(i)
        average_reward_number.append(average_reward/i)

## Testing the network with FGSM

In [None]:
# Taking the gradient of the reward with respect to the temporal difference error
agent_robust.network.eval()

gradient_sign = None

best_reward = 0
average_reward = 0
episode_number = []
average_reward_number = []

for i in range(1,100):
    state = env.reset()
    score = 0
    while True:
        found_q_values = False
        env.render()
        action = agent_robust.choose_action(state)
        try:
            q_values = action[1]
            action = action[0]
#             found_q_values = True
        except Exception as e:
            action = action
        state_, reward, done, info = env.step(action)
        
        if(found_q_values == True):
            next_q_values = agent_robust.network(state_.float())
            loss = agent_robust.network.loss(q_values, next_q_values)
            
            gradient_sign = torch.autograd.grad(loss,state,retain_graph=True)[0].sign()
            if(gradient_sign != None):
                state_ = state_ + gradient_sign * EPSILON_PERT
            else:
                print("Not perturbated")
                
            
        state = state_
        score += reward
        
        if done:
            if score > best_reward:
                best_reward = score
            average_reward += score
            print("Episode {} Average Reward {} Best Reward {} Last Reward {} Epsilon {}".format(i, average_reward/i, best_reward, score, agent.returning_epsilon()))
            break

        episode_number.append(i)
        average_reward_number.append(average_reward/i)