# Example Codes

This notebook contains example code of each module. Before committing this notebook, make sure all the output is cleared.

### A Star Search Algorithm

In [None]:
from AStar import AStar

In [None]:
# A 2D matrix representing the map. 1 is obstacle. 0 is free block.
grid = [
    [0,1,1,0,0,0],
    [0,1,0,0,1,0],
    [0,1,1,0,1,0],
    [0,0,0,0,1,0],
    [1,1,1,1,1,0],
    [1,1,1,1,1,0]
]

# goal is the lower right corner
goal = (5, 5)

In [None]:
# Forbidden diagonal move.
planner = AStar(grid, goal, False)

# Find a path starting from the upper left corner.
path = planner.plan(0, 0)

print(path)
len(path)

In [None]:
# Allow diagonal move.
planner = AStar(grid, goal, True)

# Find a path starting from the upper left corner.
path = planner.plan(0, 0)

print(path)

### Train Network with Q-Learning

In [None]:
import random
import torch
from torch import optim
from torch import nn
import numpy as np
from PathPlanningEnv import PathPlanningEnv
from FCNN import FCNN
from Q_Network import Q_Network
import matplotlib.pyplot as plt

#### Environment and Network Setup

In [None]:
settings = {
    'height' : 5,
    'width' : 10,
    'obs_count' : 5,
    'random_seed' : 42
}

In [None]:
# Create environments.
env = PathPlanningEnv(**settings)
env.display()
print(env.distances)

In [None]:
# Create value network.
network = FCNN(
    input_dim = 3 * settings['height'] * settings['width'] + 4
)

# Or 
# network = Q_Network(
#     BatchSize = 1, MapHeight = settings['height'], MapWidth = settings['width'],
#     Covn1OutChan = 32, Conv1Kernel = 3, Covn2OutChan = 32, Conv2Kernel = 3, HiddenSize = 64
# )

#### Network Training

In [None]:
epsilon = 0.9
epsilon_low = 0.1
learning_rate = 0.01
gamma = 0.99
optimizer = optim.SGD(network.parameters(), lr=learning_rate)
step_limit = 100
num_of_play = 2000
loss_func = nn.MSELoss()

In [None]:
actions = [
    torch.Tensor().new_tensor([1,0,0,0], dtype=torch.float32, requires_grad=False),
    torch.Tensor().new_tensor([0,1,0,0], dtype=torch.float32, requires_grad=False),
    torch.Tensor().new_tensor([0,0,1,0], dtype=torch.float32, requires_grad=False),
    torch.Tensor().new_tensor([0,0,0,1], dtype=torch.float32, requires_grad=False)
]

rewards = []
network.train()

for i in range(1, num_of_play+1):
    optimizer.zero_grad()
    done = False
    counter = 0
    env.reset()
    if i % 100 == 0:
        print("play round: {}, ave reward (last {}): {}".format(i, 100, sum(rewards[-100:])/100))
        if epsilon > epsilon_low: epsilon -= 0.05

    while counter <= step_limit and not done:
        preds = []
        
        # copy one to avoid being modified at env.step()
        state = env.grid.clone().detach().view(1, *env.grid.shape)
        
        # find Q(s_{t},a) for all actions
        for action in actions:
            pred = network(state, action)
            preds.append(pred)

        p = np.random.uniform(0,1)
        if p < epsilon:
            choice = np.random.randint(0,4)
        else:
            list_pred = [x.item() for x in preds]
            max_pred = np.amax(list_pred)
            max_positions = np.argwhere(list_pred == max_pred).flatten().tolist()
            choice = random.choice(max_positions)
            
        # take the action, s_{t},a -> s_{t+1} and get the immediate reward
        obs, imm_reward, done, _ = env.step(choice, early_stop=False, q_learning=True)
        
        # find Q(s_{t+1},a) for all actions
        future_reward = 0
        if not done:
            next_preds = []
            state = env.grid.clone().detach().view(1, *env.grid.shape)
            for action in actions:
                next_pred = network(state, action)
                next_preds.append(next_pred.item())
            future_reward = max(next_preds)
        elif imm_reward == 1:
            future_reward = 1
        
        tot_reward = imm_reward + gamma * future_reward
        rewards.append(tot_reward)
        counter += 1

        # Q(s,a|t) = r + gamma*max[Q(s,a|t+1)]
        loss = loss_func(preds[choice], torch.Tensor([tot_reward]))
        loss.backward()
        optimizer.step()

#### Reward Plot

In [None]:
def MovingAveragePlot(InputList, window_size):
    window = np.ones(int(window_size))/float(window_size)
    ave_values = np.convolve(InputList, window, 'same')
    plt.plot(range(len(ave_values)), ave_values)

In [None]:
MovingAveragePlot(rewards, 100)

#### Test Path Finding Performance

In [None]:
def PlayOnce(env, network):
    env.reset()
    network.eval()

    counter = 0
    print("Time {}".format(counter))
    counter += 1
    env.display()

    done = False
    while counter <= 100 and not done:
        preds = []
        state = env.grid.clone().detach().view(1, *env.grid.shape)
        print("actions: ", end = " ")
        for action in actions:
            pred = network(state, action)
            preds.append(pred)
            print("{:.8f}".format(pred.item()), end = " ")
        print(" ")

        choice = np.argmax(np.array(preds))
        obs, reward, done, _ = env.step(choice)

        print("Time {}".format(counter))
        counter += 1
        env.display()

In [None]:
# test training case
PlayOnce(env, network)

In [None]:
# test other cases
env = PathPlanningEnv(
    grid = env.grid[2,:,:],
    init_row = 4,
    init_col = 9,
    goal_row = env.goal_row,
    goal_col = env.goal_col
)
PlayOnce(env, network)