#### Import Torch Packages

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim

#### Import Gym Packages

In [None]:
import gym
from gym.wrappers import FrameStack

#### All Other Packages

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm import trange
import random
from abc import ABC, abstractmethod

In [None]:
class Gym_Env:
    """Bundle a Gym environment with the step/episode limits for a run.

    Attributes:
        env: The environment object returned by ``gym.make(env_name)``.
        max_steps: Value passed as ``max_steps`` (default 1000).
        max_episodes: Value passed as ``max_episodes`` (default 10000).
    """

    def __init__(self, env_name, max_steps=1000, max_episodes=10000):
        # The limits are only recorded here; nothing in this class enforces
        # them, so callers are expected to read them back.
        self.max_steps, self.max_episodes = max_steps, max_episodes
        self.env = gym.make(env_name)
        

In [None]:
class Transition():
    """A single step of experience collected from the environment.

    Attributes:
        state: Observation before the action was taken.
        action: Action that was taken.
        reward: Reward received for the action.
        next_state: Observation after the action was taken.
        game_complete: The ``done`` flag passed to the constructor.
        transition: Tuple ``(state, action, next_state, reward)``; kept in
            sync with ``next_state`` by :meth:`change_state`.
    """

    def __init__(self, state, action, next_state, reward, done):
        self.state = state
        self.action = action
        self.reward = reward
        self.next_state = next_state
        self.game_complete = done
        self.transition = self._as_tuple()

    def _as_tuple(self):
        """Build the ``(state, action, next_state, reward)`` tuple."""
        return (self.state, self.action, self.next_state, self.reward)

    def change_state(self, state):
        """Replace the stored next state.

        The original body (``return self.next_state = state``) was not valid
        Python. The assignment is now a statement of its own, and the cached
        ``transition`` tuple is rebuilt so it does not keep the old value.

        Args:
            state: The new next-state observation.

        Returns:
            The value now stored in ``next_state``.
        """
        self.next_state = state
        self.transition = self._as_tuple()
        return self.next_state

In [None]:
class Replay_Buffer():
    """Fixed-capacity FIFO store of transitions with uniform random sampling.

    Attributes:
        rb: List holding the stored items, oldest first.
        capacity: Maximum number of items kept in ``rb``.
        mini_batch_size: Number of items requested by each ``sample_rb`` call.
        current_batch: The most recently sampled mini-batch (empty list until
            ``sample_rb`` has been called).
    """

    def __init__(self, capacity, mini_batch_size):
        self.rb = []
        self.capacity = capacity
        self.mini_batch_size = mini_batch_size
        # Nothing has been sampled yet. The original assigned the undefined
        # bare name ``sample_rb`` here, which raised NameError.
        self.current_batch = []

    def sample_rb(self):
        """Draw a mini-batch uniformly at random, without replacement.

        While the buffer holds fewer than ``mini_batch_size`` items, the
        batch is clamped to the number of items available (possibly zero)
        instead of letting ``random.sample`` raise ``ValueError``.

        Returns:
            The sampled list, which is also stored in ``current_batch``.
        """
        # ``random.sample`` names its size argument ``k``; it has no
        # ``batch_size`` keyword.
        batch_len = min(self.mini_batch_size, len(self.rb))
        self.current_batch = random.sample(self.rb, batch_len)
        return self.current_batch

    def add_to_rb(self, new_transition):
        """Append an item, evicting the oldest one when the buffer is full.

        Args:
            new_transition: The item to store.
        """
        if len(self.rb) >= self.capacity:
            # Drop the oldest entry (front of the list) to make room.
            del self.rb[0]
        self.rb.append(new_transition)

In [None]:
class Deep_Q_Network(nn.Module):
    """Convolutional stack: three Conv2d -> BatchNorm2d -> ReLU stages.

    The first convolution takes 3 input channels; every convolution uses a
    5x5 kernel with stride 2. The module's output is the feature map of the
    last stage (32 channels).

    NOTE(review): there is no flatten / linear head here, so the output is
    not one value per action — confirm whether a head is still to be added.
    """

    def __init__(self):
        # Zero-argument super() avoids naming the class. The original named
        # ``Deep_Q_Network_Agent``, which is not defined in this file.
        super().__init__()

        self.network = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=5, stride=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=5, stride=2),
            nn.BatchNorm2d(32),  # was ``n.BatchNorm2d`` (NameError)
            nn.ReLU(),
            nn.Conv2d(32, 32, kernel_size=5, stride=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
        )

    def forward(self, x):
        """Run ``x`` through the convolutional stack and return the result."""
        return self.network(x)
    

In [None]:
class Agent:
    """Pair a prediction model with a target model and pick greedy actions.

    Attributes:
        agent: The prediction model; called to score a state.
        target: The target model; stored but not used by this class.
    """

    def __init__(self, pred_model, target_model):
        self.agent, self.target = pred_model, target_model

    def get_action_value(self, state):
        """Return the index of the largest value the prediction model emits.

        The model is evaluated with gradient tracking disabled. The index is
        the result of ``torch.argmax`` over the whole output (no ``dim``
        argument), so it refers to the flattened output.

        Args:
            state: Input forwarded unchanged to the prediction model.

        Returns:
            The tensor produced by ``torch.argmax``.
        """
        with torch.no_grad():
            scores = self.agent(state)
        return torch.argmax(scores)

In [None]:
class Data_Preprocess(ABC):
    """Abstract interface for state preprocessing.

    Concrete subclasses must implement :meth:`preprocess_state`.

    NOTE(review): the original declared ``preprocess_state`` twice, so the
    second definition silently replaced the first. The duplicate is removed
    here. A second, differently named hook was presumably intended — add it
    once its name is known.
    """

    @abstractmethod
    def preprocess_state(self):
        """Preprocess a state; must be overridden by subclasses."""

##### Global Variables

In [6]:
# Run on the first CUDA GPU when one is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# Run configuration. Every value is an unset placeholder (None) and has to be
# assigned a real value before the code that reads it is executed.
MAX_EPISODES = None        # passed to Gym_Env as max_episodes
MAX_STEPS = None           # passed to Gym_Env as max_steps
REPLAY_BUFFER_SIZE = None  # passed to Replay_Buffer as capacity
MINI_BATCH_SIZE = None     # passed to Replay_Buffer as mini_batch_size
EPSILON = None             # threshold compared against a uniform [0, 1] draw

In [None]:
def build_optimizer(model, optimizer_name='adam', learning_rate=0.01, weight_decay=0.01, momentum=0.9):
    """Create an optimizer over ``model.parameters()``.

    Args:
        model: Object exposing ``parameters()``.
        optimizer_name: ``"sgd"`` or ``"adam"``.
        learning_rate: Learning rate handed to the optimizer.
        weight_decay: Weight decay; only used for ``"adam"``.
        momentum: Momentum; only used for ``"sgd"``.

    Returns:
        An ``optim.SGD`` or ``optim.Adam`` instance.

    Raises:
        ValueError: If ``optimizer_name`` is not a supported name. The
            original wrapped everything in a bare ``except`` that never
            fired for an unknown name (``None`` was returned silently), and
            its handler called ``sys.exit`` without ``sys`` being imported.
            Errors raised by the optimizer constructors now propagate
            unchanged.
    """
    if optimizer_name == "sgd":
        return optim.SGD(model.parameters(),
                         lr=learning_rate,
                         momentum=momentum)

    if optimizer_name == "adam":
        return optim.Adam(model.parameters(),
                          lr=learning_rate,
                          weight_decay=weight_decay)

    raise ValueError(
        f"Error: Invalid optimizer specified: {optimizer_name!r} "
        "(expected 'sgd' or 'adam')."
    )

In [None]:
def build_scheduler(optimizer, sched_name='reduce_lr', patience=5, verbose=True):
    """Create a learning-rate scheduler for ``optimizer``.

    Args:
        optimizer: Optimizer whose learning rate is to be scheduled.
        sched_name: ``"reduce_lr"`` builds a ``ReduceLROnPlateau`` in
            ``'min'`` mode. ``"TODO"`` is the original placeholder for a
            scheduler that has not been added yet and yields ``None``.
        patience: Forwarded to ``ReduceLROnPlateau``.
        verbose: Forwarded to ``ReduceLROnPlateau``.
            NOTE(review): recent PyTorch releases deprecate this argument —
            confirm against the installed version.

    Returns:
        The scheduler, or ``None`` for the ``"TODO"`` placeholder.

    Raises:
        ValueError: If ``sched_name`` is not recognised. The original bare
            ``except`` never fired for an unknown name (``None`` was returned
            silently), and its handler called ``sys.exit`` without ``sys``
            being imported. Errors raised by the scheduler constructor now
            propagate unchanged.
    """
    if sched_name == "reduce_lr":
        return optim.lr_scheduler.ReduceLROnPlateau(optimizer,
                                                    mode='min',
                                                    patience=patience,
                                                    verbose=verbose)

    if sched_name == 'TODO':
        # TODO: add other scheduler
        return None

    raise ValueError(
        f"Error: Invalid scheduler specified: {sched_name!r} "
        "(expected 'reduce_lr')."
    )

In [None]:
def run_episode(environment, the_agent):
    """Play one epsilon-greedy episode and record every transition.

    The original loop read ``step_count`` and ``state`` without defining
    them, never stepped the environment and never updated its exit
    condition. This version resets the environment, steps it until it
    reports completion or ``environment.max_steps`` is reached, and stores
    each step as a ``Transition`` in a fresh ``Replay_Buffer``.

    Args:
        environment: Object exposing ``env`` (a Gym environment) and
            ``max_steps``.
        the_agent: Object exposing ``get_action_value(state)``.

    Returns:
        The ``Replay_Buffer`` filled during this episode.

    NOTE(review): ``REPLAY_BUFFER_SIZE``, ``MINI_BATCH_SIZE`` and ``EPSILON``
    are read from module scope and must hold real values before this is
    called. The raw observation is handed to the agent unchanged — confirm
    whether preprocessing / tensor conversion is expected first.
    """
    replay_buffer = Replay_Buffer(capacity=REPLAY_BUFFER_SIZE, mini_batch_size=MINI_BATCH_SIZE)

    # Newer Gym versions return (observation, info) from reset(); older ones
    # return the observation alone.
    reset_result = environment.env.reset()
    state = reset_result[0] if isinstance(reset_result, tuple) else reset_result

    step_count = 0
    episode_active = True
    while episode_active and (step_count < environment.max_steps):
        random_action_prob = random.uniform(0.0, 1.0)
        if random_action_prob < EPSILON:
            action = environment.env.action_space.sample()
        else:
            action = the_agent.get_action_value(state)
            # The agent returns a zero-dim tensor; unwrap it to a plain
            # Python number before handing it to the environment.
            if hasattr(action, "item"):
                action = action.item()

        # Newer Gym versions return five values (terminated and truncated
        # are separate); older ones return four with a single done flag.
        step_result = environment.env.step(action)
        if len(step_result) == 5:
            next_state, reward, terminated, truncated, _ = step_result
            done = terminated or truncated
        else:
            next_state, reward, done, _ = step_result

        replay_buffer.add_to_rb(Transition(state, action, next_state, reward, done))

        state = next_state
        step_count += 1
        episode_active = not done

    return replay_buffer
        

In [None]:
def train():
    """Run every episode while linearly decaying the exploration rate.

    Epsilon starts at ``p_init`` and decays linearly to ``p_end`` over
    ``environment.max_episodes`` episodes. It is written to the module-level
    ``EPSILON`` because that is the value ``run_episode`` reads.

    Fixes relative to the original:
      * models are built from ``Deep_Q_Network`` (``Deep_Q_Network_Agent``
        is not defined in this file);
      * ``Gym_Env`` is no longer passed ``pred_model`` / ``target_model``,
        which its constructor does not accept;
      * the schedule uses the loop index and local values instead of the
        undefined ``e`` and ``self``.

    NOTE(review): ``MAX_STEPS`` and ``MAX_EPISODES`` must hold real values
    before this is called. Nothing here optimises ``pred_model`` or
    synchronises ``target_model`` yet.
    """
    global EPSILON

    p_init = 0.7
    p_end = 0.1
    pred_model = Deep_Q_Network()
    target_model = Deep_Q_Network()
    environment = Gym_Env(env_name='CartPole-v1', max_steps=MAX_STEPS, max_episodes=MAX_EPISODES)
    the_agent = Agent(pred_model=pred_model, target_model=target_model)

    total_episodes = environment.max_episodes
    # The first episode explores at the initial rate.
    EPSILON = p_init

    for i in trange(total_episodes):
        run_episode(environment, the_agent)
        episode = i + 1
        # Fraction of training still ahead: 1 at the start, 0 at the end.
        epsilon_decay_rate = max((total_episodes - episode) / total_episodes, 0)
        EPSILON = (p_init - p_end) * epsilon_decay_rate + p_end