In [None]:
# Install OpenAI Gym, which provides the CartPole environment used below.
# NOTE(review): the code below unpacks four values from env.step() and passes a
# mode argument to env.render(); confirm the installed gym release still
# supports those call signatures.
!pip install gym

In [None]:
!pip install pytorch

In [None]:
# Install matplotlib, used for the screen previews and the training plot.
!pip install matplotlib

In [None]:
import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T    


In [None]:
# The backend name contains 'inline' when figures are rendered inside a
# notebook; only then is IPython's display helper needed (to clear output).
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display


In [None]:
class DQN(nn.Module):
    """Small fully connected Q-network operating on flattened 3-channel images.

    Args:
        img_height: height in pixels of the input images.
        img_width: width in pixels of the input images.
        num_actions: number of Q-values produced per input (one per action).
            Defaults to 2, the value that was previously hard-coded.
    """

    def __init__(self, img_height, img_width, num_actions=2):
        super().__init__()

        # The factor 3 accounts for the three colour channels of each image.
        self.fc1 = nn.Linear(in_features=img_height * img_width * 3, out_features=24)
        self.fc2 = nn.Linear(in_features=24, out_features=32)
        self.out = nn.Linear(in_features=32, out_features=num_actions)

    def forward(self, t):
        """Return one Q-value per action for every image in the batch ``t``."""
        # Keep the batch dimension and flatten everything else so the tensor
        # can be fed to the fully connected layers.
        t = t.flatten(start_dim=1)
        t = F.relu(self.fc1(t))
        t = F.relu(self.fc2(t))
        # No activation on the output layer: Q-values are unbounded.
        return self.out(t)
# One transition observed by the agent, stored as a named tuple so the fields
# can be accessed by name (state, action, next_state, reward).
Experience = namedtuple('Experience', ['state', 'action', 'next_state', 'reward'])
## Replay memory (the place where the agent's experiences are stored)

class ReplayMemory():
    """Fixed-capacity store of experiences that overwrites the oldest entries.

    Args:
        capacity: maximum number of experiences kept at any time.
    """

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.push_count = 0   # total number of pushes, including overwritten ones

    def push(self, experience):
        """Store ``experience``, replacing the oldest slot once the memory is full."""
        slot = self.push_count % self.capacity
        if len(self.memory) >= self.capacity:
            # Full: cycle through the slots so the oldest entry is overwritten.
            self.memory[slot] = experience
        else:
            self.memory.append(experience)
        self.push_count += 1

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored experiences chosen at random."""
        return random.sample(self.memory, batch_size)

    def can_provide_sample(self, batch_size):
        """Return True when at least ``batch_size`` experiences are stored."""
        return batch_size <= len(self.memory)
class EpsilonGreedyStrategy():
    """Exponentially decaying exploration rate for epsilon-greedy action selection.

    Args:
        start: exploration rate at step 0.
        end: value the exploration rate decays towards.
        decay: decay constant applied per step inside the exponential.
    """

    def __init__(self, start, end, decay):
        self.start, self.end, self.decay = start, end, decay

    def get_exploration_rate(self, current_step):
        """Return ``end + (start - end) * exp(-current_step * decay)``."""
        decay_factor = math.exp(-1. * current_step * self.decay)
        span = self.start - self.end
        return self.end + span * decay_factor
class Agent():
    """Epsilon-greedy agent that chooses between exploring and exploiting.

    Args:
        strategy: object exposing ``get_exploration_rate(current_step)``.
        num_actions: number of discrete actions the agent can choose from.
        device: torch device on which the returned action tensor is placed.
    """

    def __init__(self, strategy, num_actions, device):
        self.current_step = 0
        self.strategy = strategy
        self.num_actions = num_actions
        self.device = device

    def select_action(self, state, policy_net):
        """Return the chosen action as a one-element tensor on ``self.device``.

        ``policy_net`` is the Q-network; it is only evaluated when exploiting.
        """
        # Use the strategy owned by this agent rather than a module-level
        # global, so the agent works regardless of surrounding variable names.
        rate = self.strategy.get_exploration_rate(self.current_step)
        self.current_step += 1

        if rate > random.random():
            # Explore: pick a uniformly random action.
            action = random.randrange(self.num_actions)
            return torch.tensor([action]).to(self.device)

        # Exploit: pick the action with the highest predicted Q-value. No
        # gradients are needed for action selection.
        with torch.no_grad():
            return policy_net(state).argmax(dim=1).to(self.device)
class CartPoleEnvManager():
    """Wraps the gym CartPole environment and exposes rendered screens as states.

    A state is the difference between two consecutive processed screens; an
    all-zero ("black") screen is returned at the start and at the end of an
    episode.

    Args:
        device: torch device on which the returned tensors are placed.
    """

    def __init__(self, device):
        self.device = device
        self.env = gym.make('CartPole-v0').unwrapped
        self.env.reset()
        self.current_screen = None   # None means no screen has been processed yet this episode
        self.done = False            # True once the last action taken ended the episode

    # Thin wrappers around the gym environment methods of the same name.

    def reset(self):
        """Reset the environment and the per-episode bookkeeping."""
        self.env.reset()
        self.current_screen = None
        # Clear the flag left over from the previous episode so that the
        # manager does not report a finished episode right after a reset.
        self.done = False

    def close(self):
        """Close the underlying environment."""
        self.env.close()

    def render(self, mode='human'):
        """Render the environment in the given mode and return the result."""
        return self.env.render(mode)

    def num_actions_available(self):
        """Return the size of the environment's discrete action space."""
        return self.env.action_space.n

    def take_action(self, action):
        """Execute ``action`` (a one-element tensor) and return the reward as a tensor.

        Also updates ``self.done`` from the environment's step result.
        """
        _, reward, self.done, _ = self.env.step(action.item())
        return torch.tensor([reward], device=self.device)

    def just_starting(self):
        """Return True when no screen has been processed since the last reset."""
        return self.current_screen is None

    def get_state(self):
        """Return the current state tensor.

        At the start or end of an episode this is an all-zero tensor shaped
        like a processed screen; otherwise it is the difference between the
        newly processed screen and the previous one.
        """
        if self.just_starting() or self.done:
            self.current_screen = self.get_processed_screen()
            return torch.zeros_like(self.current_screen)

        previous_screen = self.current_screen
        latest_screen = self.get_processed_screen()
        self.current_screen = latest_screen
        return latest_screen - previous_screen

    def get_screen_height(self):
        """Return the height of a processed screen (layout is batch, channel, height, width)."""
        screen = self.get_processed_screen()
        return screen.shape[2]

    def get_screen_width(self):
        """Return the width of a processed screen (layout is batch, channel, height, width)."""
        screen = self.get_processed_screen()
        return screen.shape[3]

    def get_processed_screen(self):
        """Render the environment, crop the frame and convert it to a batched tensor."""
        # Move the last axis first; the rendered frame is assumed to be
        # (height, width, channel), giving (channel, height, width).
        screen = self.render('rgb_array').transpose((2, 0, 1))
        screen = self.crop_screen(screen)
        return self.transform_screen_data(screen)

    def crop_screen(self, screen):
        """Keep only the rows between 40% and 80% of the frame height.

        ``screen`` is indexed as (channel, height, width).
        """
        screen_height = screen.shape[1]

        # Strip off the top 40% and the bottom 20% of the frame.
        top = int(screen_height * 0.4)
        bottom = int(screen_height * 0.8)
        return screen[:, top:bottom, :]

    def transform_screen_data(self, screen):
        """Rescale to floats, resize to 40x70 and add a batch dimension."""
        # Convert to float32 and divide by 255 to rescale the pixel values.
        screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
        screen = torch.from_numpy(screen)

        # Chain the torchvision transforms: tensor -> PIL image -> resize -> tensor.
        resize = T.Compose([
            T.ToPILImage(),
            T.Resize((40, 70)),   # spatial size becomes 40x70; the three colour channels are kept
            T.ToTensor(),
        ])

        # Add the batch dimension (batch, channel, height, width) and move to the device.
        return resize(screen).unsqueeze(0).to(self.device)


In [None]:
# Preview the raw frame rendered by the environment, before any cropping
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
em = CartPoleEnvManager(device)
em.reset()
screen = em.render('rgb_array')

fig, ax = plt.subplots()
ax.imshow(screen)
ax.set_title('Non-processed screen example')
plt.show()

In [None]:
# Preview the frame after cropping, rescaling and resizing
screen = em.get_processed_screen()

fig, ax = plt.subplots()
# Drop the batch dimension and move the channels last, (C, H, W) -> (H, W, C),
# which is the layout imshow expects.
ax.imshow(screen.squeeze(0).permute(1, 2, 0).cpu(), interpolation='none')
ax.set_title('Processed screen example')
plt.show()

In [None]:
# Starting state of an episode (it is represented as a black screen)
screen = em.get_state()

fig, ax = plt.subplots()
ax.imshow(screen.squeeze(0).permute(1, 2, 0).cpu(), interpolation='none')
ax.set_title('Starting state example')
plt.show()

In [None]:
# State somewhere between the start and the end of an episode:
# take action 1 five times, then look at the resulting difference frame.
for _ in range(5):
    em.take_action(torch.tensor([1]))
screen = em.get_state()

fig, ax = plt.subplots()
ax.imshow(screen.squeeze(0).permute(1, 2, 0).cpu(), interpolation='none')
ax.set_title('Non starting state example')
plt.show()

In [None]:
# State at the end of an episode (a black screen); the done flag is set by
# hand here to force the end-of-episode branch of get_state().
em.done = True
screen = em.get_state()

fig, ax = plt.subplots()
ax.imshow(screen.squeeze(0).permute(1, 2, 0).cpu(), interpolation='none')
ax.set_title('Ending state example')
plt.show()
em.close()

In [None]:
def plot(values, moving_avg_period):
    """Redraw figure 2 with the episode durations and their moving average.

    Args:
        values: sequence of episode durations, one entry per finished episode.
        moving_avg_period: window length passed to ``get_moving_average``.
    """
    moving_avg = get_moving_average(moving_avg_period, values)

    plt.figure(2)
    plt.clf()
    plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    # Raw durations first, then the moving average on top.
    for series in (values, moving_avg):
        plt.plot(series)
    plt.pause(0.001)
    if is_ipython:
        # wait=True delays clearing until new output is available.
        display.clear_output(wait=True)


In [None]:
def get_moving_average(period, values):
    """Return the ``period``-step moving average of ``values`` as a NumPy array.

    The result has the same length as ``values``. Positions that do not yet
    have a full window are filled with zeros; if there are fewer than
    ``period`` values the whole result is zeros.
    """
    series = torch.tensor(values, dtype=torch.float)
    if len(series) < period:
        return torch.zeros(len(series)).numpy()

    # Every sliding window of length `period`, averaged along the window axis.
    windows = series.unfold(dimension=0, size=period, step=1)
    averages = windows.mean(dim=1).flatten(start_dim=0)
    # Left-pad with zeros so the output lines up with the input positions.
    padded = torch.cat((torch.zeros(period - 1), averages))
    return padded.numpy()


In [None]:
# Hyperparameters
batch_size = 64                                   # experiences sampled from replay memory per optimisation step
gamma = 1                                         # discount rate used in the Bellman target
eps_start, eps_end, eps_decay = 1, 0.01, 0.01     # exploration rate: initial value, final value, decay constant
target_update = 10                                # number of episodes between copies of the policy weights into the target network
memory_size = 100_000                             # capacity of the replay memory
lr = 0.01                                         # learning rate of the optimizer
num_episodes = 1000                               # number of training episodes

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Environment wrapper, exploration schedule, agent and replay memory
em = CartPoleEnvManager(device)
strategy = EpsilonGreedyStrategy(start=eps_start, end=eps_end, decay=eps_decay)
agent = Agent(strategy=strategy, num_actions=em.num_actions_available(), device=device)
memory = ReplayMemory(capacity=memory_size)

# Both networks take the processed screen size as their input size
policy_net = DQN(img_height=em.get_screen_height(), img_width=em.get_screen_width()).to(device)
target_net = DQN(img_height=em.get_screen_height(), img_width=em.get_screen_width()).to(device)

# The target network starts as an exact copy of the policy network and is
# only used for inference, so it is switched to evaluation mode.
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

# Only the policy network's parameters are optimised
optimizer = optim.Adam(policy_net.parameters(), lr=lr)


In [None]:
def extract_tensors(experiences):
    """Turn a list of Experience tuples into four concatenated tensors.

    Returns:
        A tuple ``(states, actions, rewards, next_states)``. Note that this
        order differs from the field order of ``Experience``.
    """
    # Transpose the batch: one Experience whose fields are tuples holding the
    # corresponding entries of every sampled experience.
    batch = Experience(*zip(*experiences))
    ordered_fields = (batch.state, batch.action, batch.reward, batch.next_state)
    return tuple(torch.cat(field) for field in ordered_fields)


In [None]:
# The QValues class computes the Q-values used in the loss. A next state that is all zeros (a black screen) marks the
# end of an episode, so its value is fixed at zero and it is never passed to target_net. Only the non-final next states
# are passed to target_net, and the maximum Q-value over the actions is taken for each of them.

class QValues():
    """Static helpers that compute the Q-values used in the DQN loss."""

    # Kept for backward compatibility with code that reads QValues.device.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    @staticmethod
    def get_current(policy_net, states, actions):
        """Return the predicted Q-value of the action taken in each state.

        The result has one column: for every row of ``policy_net(states)`` the
        entry selected by the matching element of ``actions``.
        """
        return policy_net(states).gather(dim=1, index=actions.unsqueeze(-1))

    @staticmethod
    def get_next(target_net, next_states):
        """Return the maximum target-network Q-value for every next state.

        Final states (represented as all-zero tensors) keep a value of zero
        and are never passed through ``target_net``.
        """
        # A state is final only when every element is zero. Testing the
        # maximum against zero would also flag states whose entries are all
        # non-positive but not all zero.
        final_state_locations = next_states.flatten(start_dim=1).eq(0).all(dim=1)
        non_final_state_locations = ~final_state_locations
        non_final_states = next_states[non_final_state_locations]

        # Allocate on the same device as the input so the masked assignment
        # below never mixes devices.
        batch_size = next_states.shape[0]
        values = torch.zeros(batch_size, device=next_states.device)

        # Skip the forward pass entirely when every state in the batch is final.
        if non_final_states.shape[0] > 0:
            values[non_final_state_locations] = \
                target_net(non_final_states).max(dim=1)[0].detach()
        return values



In [None]:
#              Training the loop    

episode_durations = []   # number of steps each finished episode lasted
for episode in range(num_episodes):
    em.reset()                   # start every episode from a freshly reset environment
    state = em.get_state()       # initial state of the episode
    for timestep in count():
        action = agent.select_action(state, policy_net)
        reward = em.take_action(action)
        next_state = em.get_state()
        # Store the transition so it can be replayed during optimisation.
        memory.push(Experience(state, action, next_state, reward))
        state = next_state

        # Optimise only once the memory holds at least one full batch.
        if memory.can_provide_sample(batch_size):
            experiences = memory.sample(batch_size)
            states, actions, rewards, next_states = extract_tensors(experiences)

            current_q_values = QValues.get_current(policy_net, states, actions)
            next_q_values = QValues.get_next(target_net, next_states)
            target_q_values = (next_q_values * gamma) + rewards

            # unsqueeze(1) gives the targets the same column shape as the current Q-values.
            loss = F.mse_loss(current_q_values, target_q_values.unsqueeze(1))
            optimizer.zero_grad()   # clear the gradients accumulated by the previous backward pass
            loss.backward()         # compute the gradients of the loss
            optimizer.step()        # update the policy network's weights and biases

        if em.done:   # the last action ended the episode
            # count() starts at 0, so the number of steps taken is timestep + 1.
            episode_durations.append(timestep + 1)
            plot(episode_durations, 100)   # plot with a 100-episode moving average
            break

    # Every target_update episodes, copy the policy weights into the target network.
    if episode % target_update == 0:
        target_net.load_state_dict(policy_net.state_dict())
em.close()



In [None]:
#    plotting

def plot(values, moving_avg_period):
    """Redraw figure 2 and print the latest moving average of the durations.

    This definition replaces the earlier ``plot`` defined in this notebook
    once the cell is run.

    Args:
        values: sequence of episode durations, one entry per finished episode.
        moving_avg_period: window length passed to ``get_moving_average``.
    """
    plt.figure(2)
    plt.clf()
    plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(values)

    averages = get_moving_average(moving_avg_period, values)
    plt.plot(averages)
    plt.pause(0.001)

    latest_average = averages[-1]
    print("Episode", len(values), "\n",
          moving_avg_period, "episode moving avg:", latest_average)
    if is_ipython:
        # wait=True delays clearing until new output is available.
        display.clear_output(wait=True)


In [None]:
# Smoke-test the plotting helper on 400 random values with a 100-step window
plot(values=np.random.rand(400), moving_avg_period=100)