In [None]:
from nes_py.wrappers import JoypadSpace
import gym_super_mario_bros
from gym_super_mario_bros.actions import RIGHT_ONLY
import numpy as np
import pandas as pd
import time
import torch 
from torch import nn
from torchvision import transforms as T
from matplotlib import pyplot as plt
import random
import torch.optim as optim
import torch.nn.functional as F

In [None]:
# Create the 'SuperMarioBros-1-1-v0' environment and wrap it in JoypadSpace so
# that only the actions listed in RIGHT_ONLY can be selected.
env = JoypadSpace(gym_super_mario_bros.make('SuperMarioBros-1-1-v0'), RIGHT_ONLY)

In [None]:
# Look up the observation and action spaces of the wrapped environment and
# report them so the reader can see what the agent receives and may emit.
N_states, N_actions = env.observation_space, env.action_space
print('The state space has the following shape :', N_states)
print('The action space has the following shape :', N_actions)

In [None]:
# Tunable settings shared by the helper functions and the training cell.
retention = 5             # depth of the frame stack (last axis of the retainer, Conv2d in_channels)
buffer_capacity = 10_000  # maximum number of experiences kept in the replay buffer
batch_size = 16           # number of experiences drawn from the buffer per update
to_skip = 3               # each chosen action is stepped to_skip + 1 times in a row

In [None]:
def image_processor(raw_image):
    """Turn one raw frame into an 84x84 single-channel tensor.

    Parameters
    ----------
    raw_image : numpy.ndarray
        Frame with the channel axis last, i.e. (height, width, channels).
        Assumed to be the RGB array returned by ``env.reset()`` /
        ``env.step()`` -- TODO confirm against the environment.

    Returns
    -------
    torch.Tensor
        Grayscaled frame resized to 84x84, laid out channel-last as
        (84, 84, 1).
    """
    # torchvision transforms expect (channels, height, width). The copy
    # gives the tensor its own buffer instead of a view of the input array.
    channels_first = torch.from_numpy(np.copy(np.transpose(raw_image, (2, 0, 1))))

    grayscale = T.Grayscale(num_output_channels=1)(channels_first)
    resized = T.Resize((84, 84))(grayscale)

    # Use the tensor's own permute rather than np.transpose: NumPy can only
    # transpose a torch.Tensor through its generic fallback conversion.
    return resized.permute(1, 2, 0)

In [None]:
def update_retainer(current_retainer, new_image):
    """Return a new frame stack with ``new_image`` pushed onto the front.

    Every stored frame moves one slot towards the back of the last axis
    (the oldest frame is dropped) and the processed ``new_image`` is written
    into slot 0.

    The input stack is NOT modified. A fresh copy is returned so that a
    caller can keep both the previous and the updated stack (for example as
    the state / next-state pair of one transition) without the two aliasing
    the same memory.

    Parameters
    ----------
    current_retainer : torch.Tensor or numpy.ndarray
        Frame stack shaped (height, width, depth).
    new_image : numpy.ndarray
        Raw frame; it is passed through ``image_processor`` first.

    Returns
    -------
    Same type as ``current_retainer``
        The updated stack, independent of the input.
    """
    processed_new_image = image_processor(new_image)
    height = current_retainer.shape[0]
    width = current_retainer.shape[1]

    if torch.is_tensor(current_retainer):
        updated = current_retainer.clone()
    else:
        updated = np.copy(current_retainer)

    # Shift along the stack's own depth; reading from the untouched input
    # means source and destination never overlap.
    updated[:, :, 1:] = current_retainer[:, :, :-1]
    # Crop the processed frame to the stack's spatial size, as slot 0.
    updated[:height, :width, 0] = processed_new_image[:height, :width, 0]
    return updated

In [None]:
def update_buffer(current_buffer, new_experience):
    """Append ``new_experience`` to ``current_buffer`` in place.

    The same buffer object that was passed in is returned, so the caller may
    either rebind the result or ignore it.
    """
    buffer = current_buffer
    buffer.append(new_experience)
    return buffer

In [None]:
def make_experience_list(processed_current_state, current_action, current_reward, processed_next_state, stage_end_status):
    """Pack one transition into a list.

    The returned list always has five entries, in this order:
    state, action, reward, next state, stage-end flag.
    """
    return [
        processed_current_state,
        current_action,
        current_reward,
        processed_next_state,
        stage_end_status,
    ]

In [None]:
def buffer_maintainer(updated_buffer_enter, capacity=None):
    """Trim the replay buffer in place so it holds at most ``capacity`` items.

    The oldest entries (those at the front of the list) are discarded first.

    Parameters
    ----------
    updated_buffer_enter : list
        Replay buffer; modified in place.
    capacity : int, optional
        Maximum number of entries to keep. When omitted, the module-level
        ``buffer_capacity`` is used.

    Returns
    -------
    list
        The same list object that was passed in.
    """
    if capacity is None:
        capacity = buffer_capacity

    overflow = len(updated_buffer_enter) - capacity
    if overflow > 0:
        # A single slice deletion removes the whole surplus at once instead
        # of shifting the remaining list once per removed element.
        del updated_buffer_enter[:overflow]
    return updated_buffer_enter

In [None]:
def reset_retainer(retainer_new_episode):
    """Return a brand-new, all-zero frame stack.

    The argument is accepted for call-site symmetry but its contents are not
    used; the result is always a zero tensor shaped (84, 84, retention).
    """
    frame_height, frame_width = 84, 84
    return torch.zeros(frame_height, frame_width, retention)

In [None]:
def input_to_nn(processed_retainer):
    """Convert a channel-last frame stack into a batch of one for the CNN.

    Parameters
    ----------
    processed_retainer : torch.Tensor or numpy.ndarray
        Frame stack shaped (height, width, depth).

    Returns
    -------
    torch.Tensor
        New float32 tensor shaped (1, depth, height, width). It never shares
        memory with ``processed_retainer``.
    """
    frames = torch.as_tensor(processed_retainer)
    # permute is the tensor-native way to move the stack axis to the front;
    # the output shape follows the input instead of a hard-coded 84x84 size.
    channels_first = frames.permute(2, 0, 1).unsqueeze(0)
    # copy=True guarantees an independent buffer even when the input is
    # already float32.
    return channels_first.to(dtype=torch.float32, copy=True)

In [None]:
# Width of the network's output layer (one Q-value per action).
# NOTE(review): presumably equal to the number of actions in RIGHT_ONLY -- confirm.
possible_actions = 5
learning_rate = 5e-3

# NOTE(review): the device is only detected and reported in this cell; the
# network below is not moved to it here.
device = 'cpu'
if torch.cuda.is_available():
    device = 'cuda'
print('Using {} device'.format(device))

# Convolutional Q-network: three conv layers followed by two linear layers.
# For an 84x84 input the spatial size goes 84 -> 20 -> 9 -> 7, so the
# flattened feature vector has 64 * 7 * 7 = 3136 entries.
CNN_q = nn.Sequential(*[
    nn.Conv2d(retention, 32, kernel_size=8, stride=4),
    nn.ReLU(),
    nn.Conv2d(32, 64, kernel_size=4, stride=2),
    nn.ReLU(),
    nn.Conv2d(64, 64, kernel_size=3, stride=1),
    nn.ReLU(),
    nn.Flatten(),
    nn.Linear(in_features=3136, out_features=512),
    nn.ReLU(),
    nn.Linear(in_features=512, out_features=possible_actions),
])
optimizer_q = optim.Adam(params=CNN_q.parameters(), lr=learning_rate)

In [None]:
# ---------------------------------------------------------------------------
# Training run: epsilon-greedy play with experience replay.
#
# One "episode" here lasts until the flag is reached or Mario has lost
# max_done_per_episode lives; per-life totals are averaged per episode.
# ---------------------------------------------------------------------------
total_episodes = 25
max_done_per_episode = 100
epsilon = 0.9
epsilon_decay = 0.99
epsilon_min = 0.3          # exploration rate never drops below this value
gamma = 0.97
retainer = torch.zeros(84, 84, retention)
final_reward_array = []
final_score_array = []
final_x_array = []
replay_buffer = []

for i in np.arange(0, total_episodes):

    current_episode = i + 1
    initial_frame = env.reset()
    env.render()
    # Start every episode from an empty frame stack holding only the first frame.
    retainer = update_retainer(reset_retainer(retainer), initial_frame)
    reward_array = []
    score_array = []
    x_array = []
    flag_status = False
    done_status = False
    times_done = 0
    total_reward = 0
    total_score = 0
    total_x = 0
    # Exploration restarts each episode; later episodes start less random.
    epsilon = 0.7 if i >= 5 else 0.9

    while (not flag_status) and (times_done < max_done_per_episode):

        epsilon = max(epsilon_min, epsilon * epsilon_decay)

        # --- choose an action (epsilon-greedy) ---
        if random.uniform(0, 1) < epsilon:
            action = int(env.action_space.sample())
        else:
            # Action selection needs no gradients.
            with torch.no_grad():
                action = int(torch.argmax(CNN_q(input_to_nn(retainer))))

        # --- repeat the action for to_skip + 1 emulator steps ---
        aggregate_reward = 0
        for j in np.arange(0, to_skip + 1):
            state, reward, done, info = env.step(action)
            env.render()
            aggregate_reward = aggregate_reward + reward
            if done or info['flag_get']:
                break

        next_frame = state
        done_status = done
        flag_status = info['flag_get']
        total_reward = total_reward + aggregate_reward
        total_score = info['score']
        total_x = info['x_pos']

        # --- store the transition ---
        # Update a clone so `retainer` (the state) and `next_retainer` (the
        # next state) are guaranteed to be two distinct tensors.
        next_retainer = update_retainer(retainer.clone(), next_frame)
        # Both reaching the flag and `done` end the trajectory, so neither
        # may bootstrap from the next state.
        terminal = bool(done_status) or bool(flag_status)
        current_experience = make_experience_list(retainer, action, aggregate_reward, next_retainer, terminal)
        replay_buffer = update_buffer(replay_buffer, current_experience)
        replay_buffer = buffer_maintainer(replay_buffer)

        # --- sample a mini-batch ---
        if len(replay_buffer) <= batch_size:
            mini_batch = replay_buffer
        else:
            mini_batch = random.sample(replay_buffer, batch_size)

        # --- one gradient step on the TD error ---
        batch_states = torch.cat([input_to_nn(experience[0]) for experience in mini_batch])
        batch_next_states = torch.cat([input_to_nn(experience[3]) for experience in mini_batch])
        batch_actions = torch.tensor([int(experience[1]) for experience in mini_batch], dtype=torch.int64).unsqueeze(1)
        batch_rewards = torch.tensor([float(experience[2]) for experience in mini_batch], dtype=torch.float32)
        batch_terminal = torch.tensor([float(bool(experience[4])) for experience in mini_batch], dtype=torch.float32)

        # Q(s, a) must be computed WITH gradients so that the loss can
        # back-propagate into CNN_q's parameters.
        predicted_q = CNN_q(batch_states).gather(1, batch_actions).squeeze(1)

        # The bootstrap target is treated as a constant.
        with torch.no_grad():
            best_next_q = CNN_q(batch_next_states).max(dim=1).values
            target_q = batch_rewards + gamma * best_next_q * (1.0 - batch_terminal)

        loss = F.mse_loss(predicted_q, target_q)
        optimizer_q.zero_grad()   # discard gradients from the previous step
        loss.backward()
        optimizer_q.step()

        retainer = next_retainer

        # --- Mario died (done without reaching the flag): log the life, restart ---
        if done_status and not flag_status:
            reward_array.append(total_reward)
            score_array.append(total_score)
            x_array.append(total_x)
            initial_frame = env.reset()
            env.render()
            # Clear the stack so frames of the previous life do not leak
            # into the first states of the new one.
            retainer = update_retainer(reset_retainer(retainer), initial_frame)
            total_reward = 0
            total_score = 0
            total_x = 0
            done_status = False
            flag_status = False
            times_done = times_done + 1

        # --- flag reached: log the successful life (the while loop then ends) ---
        if flag_status:
            reward_array.append(total_reward)
            score_array.append(total_score)
            x_array.append(total_x)

    reward_mean = np.mean(np.array(reward_array))
    score_mean = np.mean(np.array(score_array))
    x_mean = np.mean(np.array(x_array))

    final_reward_array.append(reward_mean)
    final_score_array.append(score_mean)
    final_x_array.append(x_mean)

    print('The episode completed is :',current_episode)
    print('The average reward in this episode is :',reward_mean)
    print('The average score in this episode is :',score_mean)
    print('The average x position Mario reaches, without dying, is :',x_mean)


env.close()

In [None]:
# Average reward per episode. The x-axis is derived from the number of
# recorded episodes so the plot stays valid if total_episodes is changed.
plt.plot(np.arange(1, len(final_reward_array) + 1), np.array(final_reward_array))
plt.xlabel('No. of Episodes')
plt.ylabel('The average reward')
plt.show()

In [None]:
# Average in-game score per episode. The x-axis is derived from the number
# of recorded episodes so the plot stays valid if total_episodes is changed.
plt.plot(np.arange(1, len(final_score_array) + 1), np.array(final_score_array))
plt.xlabel('No. of Episodes')
plt.ylabel('The average score')
plt.show()

In [None]:
# Average final x position per episode. The x-axis is derived from the
# number of recorded episodes so the plot stays valid if total_episodes is
# changed.
plt.plot(np.arange(1, len(final_x_array) + 1), np.array(final_x_array))
plt.xlabel('No. of Episodes')
plt.ylabel('The average x position of Mario')
plt.show()