In [None]:
import numpy as np
import gym
import pickle as pickle
from IPython.display import display, Math, Latex

## Sigma activation function

\begin{align}
\sigma(x) = \frac{1}{1+e^{-x}}
\end{align}

In [None]:
def sgm(x):
    y = 1.0/(1.0+np.exp(-x))
    return(y)

## Sigma activation function derivative
\begin{align}
\frac{d}{dx}\sigma(x)=&\frac{d}{dx}\bigg(\frac{1}{1+e^{-x}}\bigg)\\
=&\sigma(x)(1-\sigma(x))
\end{align}

In [None]:
def dsgm(x):
    return(x*(1-x))

## Feed forward neural network

Equations for the first layer (hidden layer) of neural networks,
\begin{align}
z_{1}=& w_1\cdot x+b_1\\
\sigma_{1}(z_1)=&\sigma(w_1\cdot x+b_1)\\
=& \frac{1}{1+e^{-z_1}}
\end{align}

Equations for the second layer (output layer) of neural networks,
\begin{align}
z_{2}=& w_2\cdot \sigma_1(z_1)+b_2\\
\sigma_{2}(z_2)=&\sigma(w_2\cdot \sigma_1(z_1)+b_2)\\
=& \frac{1}{1+e^{-z_2}}
\end{align}

In [None]:
def feed_forward_nets(model, image_matrix):
    """ Computing the hidden layer values and the output layer values"""
    hidden_layer = np.dot(model['layer_1']['weight'], image_matrix)+model['layer_1']['bias']
    hidden_layer = sgm(hidden_layer)
    output_layer = np.dot(hidden_layer, model['layer_2']['weight'])+model['layer_2']['bias']
    output_layer = sgm(output_layer)
    return(hidden_layer, output_layer)

## discount function

\begin{align}
R_t=\sum_{n=0}^H\gamma^nr_{t+n}
\end{align}

where $\gamma$ is the discounting facter (usually less than 1),  $n$ is the number of timesteps, and $r$ is the magnitude of the reward for a given timestamp $t+n$ where $t$ is the episode time step, and $H$ is the final $n^{th}$ step before the reward is awarded (H is not fixed). The discounts function helps by weighing more rewards actions taken towards the end of an episode and weighing less rewards action taken at the beginning of an episode.

In [None]:
def discount_function(gradient_log_p, ep_rewards, gamma):
    """ Steps taken earlier before the end result are less important to the overall result than the steps taken
    recently. This implements that logic by discounting the reward on previous actions based on how long ago they 
    were taken"""
    discounted_rewards = np.zeros_like(ep_rewards)
    running_add = 0
    for t in reversed(range(0, ep_rewards.size)):
        if ep_rewards[t] != 0:
            running_add = 0 # reset the sum, since this was a game boundary 
        running_add = running_add * gamma + ep_rewards[t]
        discounted_rewards[t] = running_add
        
    """ Discount the gradient with the normalized rewards """
    discounted_episode_rewards = discounted_rewards
    # Standardize the rewards to be unit normal (helps control the gradient estimator variance)
    discounted_episode_rewards -= np.mean(discounted_episode_rewards)
    discounted_episode_rewards /= np.std(discounted_episode_rewards)
    return(gradient_log_p * discounted_episode_rewards)


## Back propagation

For computation of back propagation we will use the loss function (cost function)

\begin{align}
L(y,\hat{y}) =-\frac{1}{N}\sum_{n\in N}y_n\log\hat{y}
\end{align}
where $y=\sigma(z_2)$ is the ground truth and $\hat{y}$ is the estimated outcome. The deriavative of the loss function with respect the second layer (output layer) weight $w_2$ yields:
\begin{align}
\frac{\partial L(y,\hat{y})}{\partial w_2}=&\frac{\partial L}{\partial \hat{y}}\frac{\partial\hat{y}}{\partial z_2}\frac{\partial z_2}{w_2}\\
=&(y-\hat{y})*\frac{\partial\hat{y}}{\partial w_2}*\sigma(z_1)\\
=&(y-\sigma(z_2))*\sigma^\prime(z_2)*\sigma(z_1).
\end{align}

The derivative of the loss function with respect the first layer $w_1$ gives,
\begin{align}
\frac{\partial L(y,\hat{y})}{\partial w_1}=&\frac{\partial L}{\partial \hat{y}}\frac{\partial\hat{y}}{\partial z_1}\frac{\partial z_1}{w_1}\\
=&(y-\hat{y})\frac{\partial\hat{y}}{\partial z_1}*x\\
=&(y-\hat{y})*\frac{\partial \hat{y}}{\partial z_2}\frac{\partial z_2}{\partial\sigma(z_1)}\frac{\partial\sigma(z_1)}{\partial z_1}*x\\
=&(y-\hat{y})*\sigma^{\prime}(z_2)*w_2*\sigma^{\prime}(z_1)*x
\end{align}
where $x$ will be the image matrix (array).

The derivatives for the cost function with respect the bias terms from the first and second layers are respectively:
\begin{align}
\frac{\partial L(y,\hat{y})}{\partial b_2}=&\frac{\partial L}{\partial \hat{y}}\frac{\partial\hat{y}}{\partial z_2}\frac{\partial z_2}{b_2}\\
=&(y-\hat{y})*\frac{\partial\hat{y}}{\partial b_2}\\
=&(y-\sigma(z_2))*\sigma^\prime(z_2).\\
\frac{\partial L(y,\hat{y})}{\partial b_1}=&\frac{\partial L}{\partial \hat{y}}\frac{\partial\hat{y}}{\partial z_1}\frac{\partial z_1}{w_1}\\
=&(y-\hat{y})*\frac{\partial \hat{y}}{\partial z_2}\frac{\partial z_2}{\partial\sigma(z_1)}\frac{\partial\sigma(z_1)}{\partial z_1}*x\\
=&(y-\hat{y})*\sigma^{\prime}(z_2)*w_2*\sigma^{\prime}(z_1)
\end{align}

The weights and bias parameters will be updated using RMSProp. The update RMSProp for the weights parameters will be,
\begin{align}
E^w[g^2]_t=&\beta *E^w[g^2]_{t-1}+(1-\beta)(g_t)^2,\quad\quad\beta =0.9\\
=&0.9*E^w[g^2]_{t-1}+0.1g_t^2\\
w_{t+1}=&w_t-\frac{\eta}{E^w[g^2]_t+\epsilon}g_t
\end{align}
And the update RMSProp for the bias parameters will be,
\begin{align}
E^b[g^2]_t=& \beta *E^b[g^2]_{t-1}+(1-\beta)g_t^2,\quad\quad\beta =0.9\\
=& 0.9 *E^b[g^2]_{t-1}+0.1*g_t^2\\
b_{t+1}=&b_t-\frac{\eta}{E^b[g^2]_t+\epsilon}g_t
\end{align}

In [None]:
def back_propagation(weight_gradients, hidden_layers,output, image_matrices, decay_rate, learning_rate, model,g_dict,e_dict):
    ep_number=0   
    batch_size = 10
    epsilon = 1e-5
    delta_L = weight_gradients
    dW_2 = np.dot(hidden_layers.T, (delta_L* dsgm(output))).ravel()
    delta_l2 = np.outer(delta_L,model['layer_2']['weight'])
    delta_l2 = delta_l2*dsgm(hidden_layers)
    dW_1 =  np.dot((np.outer(delta_L * dsgm(output),model['layer_2']['weight'].T)* dsgm(hidden_layers)).T, image_matrices)
    db_1 = np.sum(np.outer(delta_L * dsgm(output),model['layer_2']['weight'].T)* dsgm(hidden_layers),axis=0)
    db_2 = np.sum(delta_L* dsgm(output),axis=0)
    
    """Updating the weights"""
    g_dict['layer_1']['weight']+= dW_1
    g_dict['layer_2']['weight']+= dW_2
    g_dict['layer_1']['bias']+= db_1
    g_dict['layer_2']['bias']+= db_2
    
    """Applying RMSprop"""
    if ep_number % batch_size==0:
        for weight in model:
            g_t = g_dict[weight]['weight']
            g_b = g_dict[weight]['bias']
            e_dict[weight]['weight'] = decay_rate*e_dict[weight]['weight']+(1-decay_rate)*g_t**2
            e_dict[weight]['bias'] = decay_rate*e_dict[weight]['bias']+(1-decay_rate)*g_b**2
            model[weight]['weight']+=(learning_rate*g_t)/(np.sqrt(e_dict[weight]['weight'] + epsilon))
            model[weight]['bias']+=(learning_rate*g_b)/(np.sqrt(e_dict[weight]['bias'] + epsilon))
            g_dict[weight]['weight']=np.zeros_like(model[weight]['weight'])
            g_dict[weight]['bias']=np.zeros_like(model[weight]['bias'])
            
    return(model)    

## Image preprocessing function:
The image pre-processing function helps with croping the image and leaving only the components that are important for learning the game. After that the image will be down sampled, then the color will be reduced to only black and white and the background will be removed. The image will then be flattened, by converting it from an 80X80 array (matrix) into a 6400X1 vector, this will make feed forward pass of the neural network convenient. The function will then store the difference between succesive image frames, since the difference between images will capture the motion and direction of the image.

In [None]:
def pre_process_images(input_observation, prev_processed_observation, input_dimensions):
    """ Here we want to convert the image of 210x160x3 array into a 6400 float vector """
    processed_observation = input_observation[35:195] # crop
    processed_observation = down_sample(processed_observation) # Downsample function below below
    processed_observation = remove_color(processed_observation) # Remove color function implemented below
    processed_observation = remove_background(processed_observation) # Remove background function implemented below
    processed_observation[processed_observation != 0] = 1 # everything else (paddles, ball) just set to 1
    """Now we convert the 80X80 array (matrix) image into 1600X1 matrix"""
    processed_observation = processed_observation.astype(np.float).ravel()

    """Here we process the difference in succesive frames. Subtract the previous image frame from the 
    current frame """
    if prev_processed_observation is not None:
        input_observation = processed_observation - prev_processed_observation
    else:
        input_observation = np.zeros(input_dimensions)
    """ Save the previous image frame """
    prev_processed_observations = processed_observation
    return(input_observation, prev_processed_observation)

def down_sample(image):
    return image[::2, ::2, :]

def remove_color(image):
    return image[:, :, 0]

def remove_background(image):
    image[image == 144] = 0
    image[image == 109] = 0
    return image

## Probability function

The feed forward output layer (second layer) will output a probability number from the sigmoid activation function, the number will be between 0 and 1, this probability function will be input into a choose_action, the choose_action function will will either return a value 2 or 3, depending on the probability value. The reuturned value from the choose action will be used to move the AI agent up either up or down.

In [None]:
def choose_action(probability):
    random_value = np.random.uniform()
    if random_value < probability:
        return 2      # the AI agent will move up in the openai gym
    else:
        return 3      # The AI agent will move down in the openai gym

In [None]:
#resume = False # resume from previous checkpoint?
D = 80 * 80
H = 200 
# model= {'layer_1':{},'layer_2':{}}
# model['layer_1']['weight']=np.random.randn(H,D) / np.sqrt(D)
# model['layer_1']['bias']=np.random.randn(H) / np.sqrt(H)
# model['layer_2']['weight']=np.random.randn(H) / np.sqrt(H)
# model['layer_2']['bias']=np.random.randn(1) / np.sqrt(H)
model = pickle.load(open('model.pkl','rb'))

## Main function
First, a collection of parameters will be set before the model (Main function) can be run. These parameters are required for the forward propagation and updating of the weights during backward propagation.

In [None]:
def Main():
    env = gym.make("Pong-v0")
    image = env.reset() # This gets us the image
    epoch=50

    """Parameters"""
    episode_number = 0
    batch_size = 10
    gamma = 0.99 # This is the discount factor for rewards
    decay_rate = 0.99
    hidden_layer_neurons = 200
    input_dimensions = 80 * 80
    learning_rate = 1e-4

    ep_number = 0
    reward_sum = 0
    running_reward = None
    prev_process_images = None

    """Dictionary to store RMSprop weights"""
    e_dict = {'layer_1':{},'layer_2':{}}
    g_dict = {'layer_1':{},'layer_2':{}}
    for layer_name in model:
        e_dict[layer_name]['weight'] = np.zeros_like(model[layer_name]['weight'])
        e_dict[layer_name]['bias'] = np.zeros_like(model[layer_name]['bias'])
        g_dict[layer_name]['weight'] = np.zeros_like(model[layer_name]['weight'])
        g_dict[layer_name]['bias'] = np.zeros_like(model[layer_name]['bias'])
        

    ep_hidden_layers, ep_images, ep_weight_gradients, ep_rewards = [], [], [], []


    while epoch:
        env.render()
        process_images, prev_process_images = pre_process_images(image, prev_process_images, input_dimensions)
        hidden_layers, probability = feed_forward_nets(model,process_images)
    
        ep_images.append(process_images)
        ep_hidden_layers.append(hidden_layers)

        action = choose_action(probability)

        """Action for up or down"""
        image, reward, done, info = env.step(action)

        reward_sum += reward
        ep_rewards.append(reward)

        made_label = 1 if action == 2 else 0
        loss_function = made_label - probability
        ep_weight_gradients.append(loss_function)


        if done: # when an eposode is done
            ep_number += 1

            """Combine the episode images, hidden layers values, weights/gradients and also the rewards for 
            the actions in each episodes"""
            ep_hidden_layers = np.vstack(ep_hidden_layers)
            ep_images = np.vstack(ep_images)
            ep_weight_gradients = np.vstack(ep_weight_gradients)
            ep_rewards = np.vstack(ep_rewards)

            """Applying discount function"""
            ep_weight_gradients_discount = discount_function(ep_weight_gradients, ep_rewards, gamma)

            gradient = back_propagation(ep_weight_gradients_discount,ep_hidden_layers,probability,ep_images,decay_rate, learning_rate,model
                                ,g_dict,e_dict)
            
            """Reset the episode stored information by cleaning the lists above"""
            ep_hidden_layers, ep_images, ep_weight_gradients, ep_rewards = [], [], [], [] 
            image = env.reset() # Also reset the env gym
            running_reward = reward_sum if running_reward is None else running_reward * 0.99 + reward_sum * 0.01
            print('Resetting the env. The episode reward total is {}. The running mean is:{}'.format(reward_sum, running_reward))
            reward_sum = 0
            prev_process_images = None
            print(epoch)
            epoch-=1

In [None]:
model

In [None]:
Main()#200,80*80)

In [None]:
pickle.dump(model, open('model.pkl','wb'))