# DDPG, PPO, SAC under Partial Observability

## Kinematic and Lidar Model Setup

In [1]:
import cv2
import numpy as np


def SparseDepth(x0, x1, y0, y1):
    rec = []
    dx = abs(x1 - x0)
    dy = abs(y1 - y0)
    x, y = x0, y0
    sx = -1 if x0 > x1 else 1
    sy = -1 if y0 > y1 else 1
    if dx > dy:
        err = dx / 2.0
        while x != x1:
            rec.append((x, y))
            err -= dy
            if err < 0:
                y += sy
                err += dx
            x += sx
    else:
        err = dy / 2.0
        while y != y1:
            rec.append((x, y))
            err -= dx
            if err < 0:
                x += sx
                err += dy
            y += sy
    return rec


def distance_to_obstacle(robot_position, robot_sight_param, sensor_data):
    depth_awareness = []
    inter = (robot_sight_param[2] - robot_sight_param[1]) / (robot_sight_param[0] - 1)
    for i in range(robot_sight_param[0]):
        theta = robot_position[2] + robot_sight_param[1] + i * inter
        depth_awareness.append(
            [robot_position[0] + sensor_data[i] * np.cos(np.deg2rad(theta)), robot_position[1] + sensor_data[i] * np.sin(np.deg2rad(theta))])
    return depth_awareness


def gaussian(x, mu, sig):
    return 1./(np.sqrt(2.*np.pi)*sig)*np.exp(-np.power((x - mu)/sig, 2.)/2)


class Vision:
    def __init__(self,
                 img_map,
                 sensor_size=21,
                 start_angle=-90.0,
                 end_angle=90.0,
                 max_dist=500.0,
                 ):
        self.sensor_size = sensor_size
        self.start_angle = start_angle
        self.end_angle = end_angle
        self.farthest = max_dist
        self.img_map = img_map

    def line_of_sight(self, robot_position, theta):
        end = np.array(
            (robot_position[0] + self.farthest * np.cos(np.deg2rad(theta)), robot_position[1] + self.farthest * np.sin(np.deg2rad(theta))))
        x0, y0 = int(robot_position[0]), int(robot_position[1])
        x1, y1 = int(end[0]), int(end[1])
        plist = SparseDepth(x0, x1, y0, y1)
        zone = self.farthest
        for p in plist:
            if p[1] >= self.img_map.shape[0] or p[0] >= self.img_map.shape[1] or p[1] < 0 or p[0] < 0:
                continue
            if self.img_map[p[1], p[0]] < 0.5:
                aux = np.power(float(p[0]) - robot_position[0], 2) + np.power(float(p[1]) - robot_position[1], 2)
                aux = np.sqrt(aux)
                if aux < zone:
                    zone = aux
        return zone

    def measure_depth(self, current_pos):
        sense_data = []
        inter = (self.end_angle - self.start_angle) / (self.sensor_size - 1)
        for i in range(self.sensor_size):
            theta = current_pos[2] + self.start_angle + i * inter
            sense_data.append(self.line_of_sight(np.array((current_pos[0], current_pos[1])), theta))
        plist = distance_to_obstacle(current_pos, [self.sensor_size, self.start_angle, self.end_angle], sense_data)
        return sense_data, plist

class RoboticAssistant:
    def __init__(self, v_range=60, w_range=90, d=5, wu=9, wv=4, car_w=9, car_f=7, car_r=10, dt=0.1):
        self.x = 0
        self.y = 0
        self.theta = 0
        self.v = 0
        self.w = 0
        self.record = []
        self.v_range = v_range  # Control Constrain
        self.w_range = w_range
        self.d = d  # Wheel Distance
        self.wu = wu  # Wheel size
        self.wv = wv
        self.car_w = car_w  # Car size
        self.car_f = car_f
        self.car_r = car_r
        self.corps()
        self.dt = dt  # Simulation delta time

    def update(self):
        if self.v > self.v_range:
            self.v = self.v_range
        elif self.v < -self.v_range:
            self.v = -self.v_range
        if self.w > self.w_range:
            self.w = self.w_range
        elif self.w < -self.w_range:
            self.w = -self.w_range
        self.x += self.v * np.cos(np.deg2rad(self.theta)) * self.dt
        self.y += self.v * np.sin(np.deg2rad(self.theta)) * self.dt
        self.theta += self.w * self.dt
        self.theta = self.theta % 360
        self.record.append((self.x, self.y, self.theta))
        self.corps()

    def redo(self):
        self.x -= self.v * np.cos(np.deg2rad(self.theta)) * self.dt
        self.y -= self.v * np.sin(np.deg2rad(self.theta)) * self.dt
        self.theta -= self.w * self.dt
        self.theta = self.theta % 360
        self.record.pop()

    def control(self, v, w):
        self.v = v
        self.w = w
        if self.v > self.v_range:
            self.v = self.v_range
        elif self.v < -self.v_range:
            self.v = -self.v_range
        if self.w > self.w_range:
            self.w = self.w_range
        elif self.w < -self.w_range:
            self.w = -self.w_range

    def corps(self):
        pts1 = change_direction_vis(self.car_f, self.car_w / 2, -self.theta) + np.array((self.x, self.y))
        pts2 = change_direction_vis(self.car_f, -self.car_w / 2, -self.theta) + np.array((self.x, self.y))
        pts3 = change_direction_vis(-self.car_r, self.car_w / 2, -self.theta) + np.array((self.x, self.y))
        pts4 = change_direction_vis(-self.car_r, -self.car_w / 2, -self.theta) + np.array((self.x, self.y))
        self.dimensions = (pts1.astype(int), pts2.astype(int), pts3.astype(int), pts4.astype(int))

    def render(self, img=np.ones((600, 600, 3))):
        rec_max = 1000
        start = 0 if len(self.record) < rec_max else len(self.record) - rec_max

        color = (0 / 255, 97 / 255, 255 / 255)
        for i in range(start, len(self.record) - 1):
            cv2.line(img, (int(self.record[i][0]), int(self.record[i][1])),
                     (int(self.record[i + 1][0]), int(self.record[i + 1][1])), color, 1)

        pts1, pts2, pts3, pts4 = self.dimensions
        color = (0, 0, 0)
        size = 1
        cv2.line(img, tuple(pts1.astype(np.int).tolist()), tuple(pts2.astype(np.int).tolist()), color, size)
        cv2.line(img, tuple(pts1.astype(np.int).tolist()), tuple(pts3.astype(np.int).tolist()), color, size)
        cv2.line(img, tuple(pts3.astype(np.int).tolist()), tuple(pts4.astype(np.int).tolist()), color, size)
        cv2.line(img, tuple(pts2.astype(np.int).tolist()), tuple(pts4.astype(np.int).tolist()), color, size)

        t1 = change_direction_vis(6, 0, -self.theta) + np.array((self.x, self.y))
        t2 = change_direction_vis(0, 4, -self.theta) + np.array((self.x, self.y))
        t3 = change_direction_vis(0, -4, -self.theta) + np.array((self.x, self.y))
        cv2.line(img, (int(self.x), int(self.y)), (int(t1[0]), int(t1[1])), (0, 0, 1), 2)
        cv2.line(img, (int(t2[0]), int(t2[1])), (int(t3[0]), int(t3[1])), (1, 0, 0), 2)

        w1 = change_direction_vis(0, self.d, -self.theta) + np.array((self.x, self.y))
        w2 = change_direction_vis(0, -self.d, -self.theta) + np.array((self.x, self.y))
        img = view_unit(img, int(w1[0]), int(w1[1]), self.wu, self.wv, -self.theta)
        img = view_unit(img, int(w2[0]), int(w2[1]), self.wu, self.wv, -self.theta)
        img = cv2.line(img, tuple(w1.astype(np.int).tolist()), tuple(w2.astype(np.int).tolist()), (0, 0, 0), 1)
        return img


def change_direction_vis(x, y, phi_):
    phi = np.deg2rad(phi_)
    return np.array((x * np.cos(phi) + y * np.sin(phi), -x * np.sin(phi) + y * np.cos(phi)))


def view_unit(img, x, y, u, v, phi, color=(0, 0, 0), size=1):
    edge1 = change_direction_vis(-u / 2, -v / 2, phi) + np.array((x, y))
    edge2 = change_direction_vis(u / 2, -v / 2, phi) + np.array((x, y))
    edge3 = change_direction_vis(-u / 2, v / 2, phi) + np.array((x, y))
    edge4 = change_direction_vis(u / 2, v / 2, phi) + np.array((x, y))
    cv2.line(img, tuple(edge1.astype(np.int).tolist()), tuple(edge2.astype(np.int).tolist()), color, size)
    cv2.line(img, tuple(edge1.astype(np.int).tolist()), tuple(edge3.astype(np.int).tolist()), color, size)
    cv2.line(img, tuple(edge3.astype(np.int).tolist()), tuple(edge4.astype(np.int).tolist()), color, size)
    cv2.line(img, tuple(edge2.astype(np.int).tolist()), tuple(edge4.astype(np.int).tolist()), color, size)
    return img

## Indoor Reinforcement Learning Environment Setup

In [3]:
import os

class IndoorDeepRL:
    def __init__(self, map_path="map.png"):
        # Read Map
        self.terra = cv2.flip(cv2.imread(map_path), 0)
        self.terra[self.terra > 128] = 255
        self.terra[self.terra <= 128] = 0
        self.m = np.asarray(self.terra)
        self.m = cv2.cvtColor(self.m, cv2.COLOR_RGB2GRAY)
        self.m = self.m.astype(float) / 255.
        self.terra = self.terra.astype(float) / 255.
        self.lmodel = Vision(self.m)

    def createInstance(self):
        self.robot = RoboticAssistant(d=5, wu=9, wv=4, car_w=9, car_f=7, car_r=10, dt=0.1)
        self.robot.x, self.robot.y = self.random_start_travesable()
        self.robot.theta = 360 * np.random.random()
        self.pos = (self.robot.x, self.robot.y, self.robot.theta)

        self.target = self.random_start_travesable()
        self.target_euclidian = np.sqrt((self.robot.x - self.target[0]) ** 2 + (self.robot.y - self.target[1]) ** 2)
        target_angle = np.arctan2(self.target[1] - self.robot.y, self.target[0] - self.robot.x) - np.deg2rad(self.robot.theta)
        target_distance = [self.target_euclidian * np.cos(target_angle), self.target_euclidian * np.sin(target_angle)]

        self.sdata, self.plist = self.lmodel.measure_depth(self.pos)
        state = self.existance(self.sdata, target_distance)
        return state

    def step(self, action):
        self.robot.control((action[0] + 1) / 2 * self.robot.v_range, action[1] * self.robot.w_range)
        self.robot.update()
        # Check for Collision
        e1,e2,e3,e4 = self.robot.dimensions
        ee1 = SparseDepth(e1[0], e2[0], e1[1], e2[1])
        ee2 = SparseDepth(e1[0], e3[0], e1[1], e3[1])
        ee3 = SparseDepth(e3[0], e4[0], e3[1], e4[1])
        ee4 = SparseDepth(e4[0], e2[0], e4[1], e2[1])
        check = ee1+ee2+ee3+ee4
        collision = False
        for points in check:
            if self.m[int(points[1]),int(points[0])]<0.5:
                collision = True
                self.robot.redo()
                self.robot.v = -0.5 * self.robot.v
                break
        self.pos = (self.robot.x, self.robot.y, self.robot.theta)
        self.sdata, self.plist = self.lmodel.measure_depth(self.pos)

        # Distance Reward
        curr_target_dist = np.sqrt((self.robot.x - self.target[0]) ** 2 + (self.robot.y - self.target[1]) ** 2)
        reward_dist = self.target_euclidian - curr_target_dist
        # Orientation Reward
        orien = np.rad2deg(np.arctan2(self.target[1] - self.robot.y, self.target[0] - self.robot.x))
        orientation_error = (orien - self.robot.theta) % 360
        if orientation_error > 180:
            orientation_error = 360 - orientation_error
        reward_orien = np.deg2rad(orientation_error)
        # Action Panelty
        reward_act = 0.05 if action[0] < -0.5 else 0
        # Total
        w1 = 1
        w2 = 1
        w3 = 0.6
        reward = w1*reward_dist - w2*reward_orien - w3*reward_act
        
        # Terminal State 
        terminated = False
        if collision:
            reward = -15
            terminated = True
        if curr_target_dist < 20:
            reward = 20
            terminated = True

        # Relative Coordinate of Target
        self.target_euclidian = curr_target_dist
        target_angle = np.arctan2(self.target[1] - self.robot.y, self.target[0] - self.robot.x) - np.deg2rad(self.robot.theta)
        target_distance = [self.target_euclidian * np.cos(target_angle), self.target_euclidian * np.sin(target_angle)]
        state_next = self.existance(self.sdata, target_distance)
        return state_next, reward, terminated
    
    def render(self, gui=True):
        experiment_space = self.terra.copy()
        for pts in self.plist:
            cv2.line(
                experiment_space,
                (int(1*self.pos[0]), int(1*self.pos[1])), 
                (int(1*pts[0]), int(1*pts[1])),
                (0.0,1.0,0.0), 1)

        cv2.circle(experiment_space, (int(1*self.target[0]), int(1*self.target[1])), 10, (1.0,0.5,0.7), 3)
        experiment_space = self.robot.render(experiment_space)
        experiment_space = cv2.flip(experiment_space,0)
        if gui:
            cv2.imshow("Mapless Navigation",experiment_space)
            k = cv2.waitKey(1)
        return experiment_space.copy()
    
    def random_start_travesable(self):
        im_h, im_w = self.m.shape[0], self.m.shape[1]
        tx = np.random.randint(0,im_w)
        ty = np.random.randint(0,im_h)

        kernel = np.ones((10,10),np.uint8)  
        m_dilate = 1-cv2.dilate(1-self.m, kernel, iterations=3)
        while(m_dilate[ty, tx] < 0.5):
            tx = np.random.randint(0,im_w)
            ty = np.random.randint(0,im_h)
        return tx, ty
    
    def existance(self, sensor, target):
        state_s = [s/200 for s in sensor]
        state_t = [t/500 for t in target]
        return state_s + state_t

#%% md

## Trainning and Path configurations

#%%

training = True
render = False
load_model = False
terrain = "map.png"
gif_path = "out/"
model_path = "save/"
if not os.path.exists(model_path):
    os.makedirs(model_path)

#%% md

## Episode Visualization 

#%%

from PIL import Image

def visualize(agent, total_eps=2, message=False, render=False, map_path="map.png", gif_path="out/", gif_name="test.gif"):
    if not os.path.exists(gif_path):
        os.makedirs(gif_path)
    images = []

    env = IndoorDeepRL(map_path=terrain)
    for eps in range(total_eps):
        step = 0
        max_success_rate = 0
        success_count = 0

        state = env.createInstance()
        r_eps = []
        acc_reward = 0.
        while(True):
            action = agent.choose_action(state, eval=True)
            state_next, reward, terminated = env.step(action)
            im = env.render(gui=render)
            im_pil = Image.fromarray(cv2.cvtColor(np.uint8(im*255),cv2.COLOR_BGR2RGB))
            images.append(im_pil)
            r_eps.append(reward)
            acc_reward += reward
            
            if message:
                print('\rEps: {:2d}| Step: {:4d} | action:{:+.2f}| R:{:+.2f}| Reps:{:.2f}  '\
                        .format(eps, step, action[0], reward, acc_reward), end='')
            
            state = state_next.copy()
            step += 1
            if terminated or step>200:
                if message:
                    print()
                break

    print("Create GIF ...")
    if gif_path is not None:
        images[0].save(gif_path+gif_name, save_all=True, append_images=images[1:], optimize=True, duration=40, loop=0)


## Deep RL Algorithms

In [4]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

class DDPG():
    def __init__(
            self,
            model,
            learning_rate=[1e-4, 2e-4],
            reward_decay=0.98,
            memory_size=5000,
            batch_size=64,
            tau=0.005,
            epsilon_params=[1.0, 0.5, 0.00001],
            criterion=nn.MSELoss()
    ):
        self.memory_size = memory_size
        self.batch_size = batch_size
        self.tau = tau
        self.memory = {"s": [], "a": [], "r": [], "sn": [], "end": []}
        self.memory_counter = 0
        self.lr = learning_rate
        self.gamma = reward_decay
        self.criterion = criterion
        self.epsilon_params = epsilon_params
        self.epsilon = self.epsilon_params[0]
        self._build_net(model[0], model[1])

    def _build_net(self, anet, cnet):
        self.actor = anet()
        self.actor_optim = optim.Adam(self.actor.parameters(), lr=self.lr[0])
        self.critic = cnet()
        self.critic_optim = optim.Adam(self.critic.parameters(), lr=self.lr[1])
        self.critic_target = cnet()
        self.critic_target.eval()

    def save_load_model(self, op, path):
        anet_path = path + "ddpg_anet.pt"
        cnet_path = path + "ddpg_cnet.pt"
        if op == "save":
            torch.save(self.critic.state_dict(), cnet_path)
            torch.save(self.actor.state_dict(), anet_path)
        elif op == "load":
            self.critic.load_state_dict(torch.load(cnet_path))
            self.critic_target.load_state_dict(torch.load(cnet_path))
            self.actor.load_state_dict(torch.load(anet_path))

    def choose_action(self, s, eval=False):
        s_ts = torch.FloatTensor(np.expand_dims(s, 0))
        action = self.actor(s_ts)
        action = action.cpu().detach().numpy()[0]
        if not eval:
            action += np.random.normal(0, self.epsilon, action.shape)
        else:
            action += np.random.normal(0, self.epsilon_params[1], action.shape)

        action = np.clip(action, -1, 1)
        return action

    def store_transition(self, s, a, r, sn, end):
        if self.memory_counter <= self.memory_size:
            self.memory["s"].append(s)
            self.memory["a"].append(a)
            self.memory["r"].append(r)
            self.memory["sn"].append(sn)
            self.memory["end"].append(end)
        else:
            index = self.memory_counter % self.memory_size
            self.memory["s"][index] = s
            self.memory["a"][index] = a
            self.memory["r"][index] = r
            self.memory["sn"][index] = sn
            self.memory["end"][index] = end

        self.memory_counter += 1

    def soft_update(self):
        # Store sample to replay buffer
        with torch.no_grad():
            for targetParam, evalParam in zip(self.critic_target.parameters(), self.critic.parameters()):
                targetParam.copy_((1 - self.tau) * targetParam.data + self.tau * evalParam.data)

    def learn(self):
        # Sample
        if self.memory_counter > self.memory_size:
            sample_index = np.random.choice(self.memory_size, size=self.batch_size)
        else:
            sample_index = np.random.choice(self.memory_counter, size=self.batch_size)

        s_batch = [self.memory["s"][index] for index in sample_index]
        a_batch = [self.memory["a"][index] for index in sample_index]
        r_batch = [self.memory["r"][index] for index in sample_index]
        sn_batch = [self.memory["sn"][index] for index in sample_index]
        end_batch = [self.memory["end"][index] for index in sample_index]

        # Construct torch tensor
        s_ts = torch.FloatTensor(np.array(s_batch))
        a_ts = torch.FloatTensor(np.array(a_batch))
        r_ts = torch.FloatTensor(np.array(r_batch))
        sn_ts = torch.FloatTensor(np.array(sn_batch))
        end_ts = torch.FloatTensor(np.array(end_batch))

        # TD-target
        with torch.no_grad():
            a_next = self.actor(sn_ts)
            q_next_target = self.critic_target(sn_ts, a_next)
            q_target = r_ts + end_ts * self.gamma * q_next_target

        # Critic loss
        q_eval = self.critic(s_ts, a_ts)
        self.critic_loss = self.criterion(q_eval, q_target)

        self.critic_optim.zero_grad()
        self.critic_loss.backward()
        self.critic_optim.step()

        # Actor loss
        a_curr = self.actor(s_ts)
        q_current = self.critic(s_ts, a_curr)
        self.actor_loss = -q_current.mean()

        self.actor_optim.zero_grad()
        self.actor_loss.backward()
        self.actor_optim.step()

        # Update target network and noise variance
        self.soft_update()
        if self.epsilon > self.epsilon_params[1]:
            self.epsilon -= self.epsilon_params[2]
        else:
            self.epsilon = self.epsilon_params[1]

        return float(self.actor_loss.detach().cpu().numpy()), float(self.critic_loss.detach().cpu().numpy())

class SAC():
    def __init__(
        self,
        model,
        n_actions,
        learning_rate = [1e-4, 2e-4],
        reward_decay = 0.98,
        replace_target_iter = 300,
        memory_size = 5000,
        batch_size = 64,
        tau = 0.01,
        alpha = 0.5,
        auto_entropy_tuning = True,
        criterion = nn.MSELoss()
    ):
        # initialize parameters
        self.n_actions = n_actions
        self.lr = learning_rate
        self.gamma = reward_decay
        self.memory_size = memory_size
        self.batch_size = batch_size
        self.tau = tau
        self.alpha = alpha
        self.auto_entropy_tuning = auto_entropy_tuning
        self.criterion = criterion
        self._build_net(model[0], model[1])
        self.init_memory()

    def _build_net(self, anet, cnet):
        # Policy Network
        self.actor = anet()
        self.actor_optim = optim.Adam(self.actor.parameters(), lr=self.lr[0])
        # Evaluation Critic Network (new)
        self.critic = cnet()
        self.critic_optim = optim.Adam(self.critic.parameters(), lr=self.lr[1])
        # Target Critic Network (old)
        self.critic_target = cnet()
        self.critic_target.eval()

        if self.auto_entropy_tuning == True:
            self.target_entropy = -torch.Tensor(self.n_actions)
            self.log_alpha = torch.zeros(1, requires_grad=True)
            self.alpha_optim = optim.Adam([self.log_alpha], lr=0.0001)
    
    def save_load_model(self, op, path):
        anet_path = path + "sac_anet.pt"
        cnet_path = path + "sac_cnet.pt"
        if op == "save":
            torch.save(self.critic.state_dict(), cnet_path)
            torch.save(self.actor.state_dict(), anet_path)
        elif op == "load":
            self.critic.load_state_dict(torch.load(cnet_path))
            self.critic_target.load_state_dict(torch.load(cnet_path))
            self.actor.load_state_dict(torch.load(anet_path))

    def choose_action(self, s, eval=False):
        s_ts = torch.FloatTensor(np.expand_dims(s,0))
        if eval == False:
            action, _, _ = self.actor.sample(s_ts)
        else:
            _, _, action = self.actor.sample(s_ts)
        
        action = action.cpu().detach().numpy()[0]
        return action

    def init_memory(self):
        self.memory_counter = 0
        self.memory = {"s":[], "a":[], "r":[], "sn":[], "end":[]}

    def store_transition(self, s, a, r, sn, end):
        if self.memory_counter <= self.memory_size:
            self.memory["s"].append(s)
            self.memory["a"].append(a)
            self.memory["r"].append(r)
            self.memory["sn"].append(sn)
            self.memory["end"].append(end)
        else:
            index = self.memory_counter % self.memory_size
            self.memory["s"][index] = s
            self.memory["a"][index] = a
            self.memory["r"][index] = r
            self.memory["sn"][index] = sn
            self.memory["end"][index] = end

        self.memory_counter += 1

    def soft_update(self, TAU=0.01):
        with torch.no_grad():
            for targetParam, evalParam in zip(self.critic_target.parameters(), self.critic.parameters()):
                targetParam.copy_((1 - self.tau)*targetParam.data + self.tau*evalParam.data)

    def learn(self):
        # sample batch memory from all memory
        if self.memory_counter > self.memory_size:
            sample_index = np.random.choice(self.memory_size, size=self.batch_size)
        else:
            sample_index = np.random.choice(self.memory_counter, size=self.batch_size)
        
        s_batch = [self.memory["s"][index] for index in sample_index]
        a_batch = [self.memory["a"][index] for index in sample_index]
        r_batch = [self.memory["r"][index] for index in sample_index]
        sn_batch = [self.memory["sn"][index] for index in sample_index]
        end_batch = [self.memory["end"][index] for index in sample_index]

        # Construct torch tensor
        s_ts = torch.FloatTensor(np.array(s_batch))
        a_ts = torch.FloatTensor(np.array(a_batch))
        r_ts = torch.FloatTensor(np.array(r_batch))
        sn_ts = torch.FloatTensor(np.array(sn_batch))
        end_ts = torch.FloatTensor(np.array(end_batch))
        
        # TD-target
        with torch.no_grad():
            a_next, logpi_next, _ = self.actor.sample(sn_ts)
            q_next_target = self.critic_target(sn_ts, a_next) - self.alpha * logpi_next
            q_target = r_ts + end_ts * self.gamma * q_next_target
        
        # Critic loss
        q_eval = self.critic(s_ts, a_ts)
        self.critic_loss = self.criterion(q_eval, q_target)

        self.critic_optim.zero_grad()
        self.critic_loss.backward()
        self.critic_optim.step()

        # Actor loss
        a_curr, logpi_curr, _ = self.actor.sample(s_ts)
        q_current = self.critic(s_ts, a_curr)
        self.actor_loss = ((self.alpha*logpi_curr) - q_current).mean()

        self.actor_optim.zero_grad()
        self.actor_loss.backward()
        self.actor_optim.step()

        self.soft_update()
        
        # Adaptive entropy adjustment
        if self.auto_entropy_tuning:
            alpha_loss = -(self.log_alpha * (logpi_curr + self.target_entropy).detach()).mean()
            self.alpha_optim.zero_grad()
            alpha_loss.backward()
            self.alpha_optim.step()
            self.alpha = float(self.log_alpha.exp().detach().cpu().numpy())
        
        return float(self.actor_loss.detach().cpu().numpy()), float(self.critic_loss.detach().cpu().numpy())

class PPO():
    def __init__(
        self,
        model,
        learning_rate = [1e-4, 2e-4],
        reward_decay = 0.99,
        batch_size = 2000,
        eps_clip = 0.2
    ):
        self.lr = learning_rate
        self.gamma = reward_decay
        self.batch_size = batch_size
        self.eps_clip = eps_clip
        self._build_net(model[0], model[1])
        self.init_memory()
    
    def _build_net(self, anet, cnet):
        # Policy Network
        self.actor = anet()
        self.actor_optim = optim.Adam(self.actor.parameters(), lr=self.lr[0])
        # Critic Network 
        self.critic = cnet()
        self.critic_optim = optim.Adam(self.critic.parameters(), lr=self.lr[1])
    
    def save_load_model(self, op, path):
        anet_path = path + "ppo_anet.pt"
        cnet_path = path + "ppo_cnet.pt"
        if op == "save":
            torch.save(self.actor.state_dict(), anet_path)
            torch.save(self.critic.state_dict(), cnet_path)
        elif op == "load":
            self.actor.load_state_dict(torch.load(anet_path))
            self.critic.load_state_dict(torch.load(cnet_path))
            
    def choose_action(self, s, eval=False):
        s_ts = torch.FloatTensor(np.expand_dims(s,0))
        logp = None
        if eval == False:
            a_ts, logp_ts = self.actor.sample(s_ts)
            a_ts = torch.clamp(a_ts, min=-1, max=1)
            action = a_ts.cpu().detach().numpy()[0]
            logp = logp_ts.cpu().detach().numpy()[0]
            return action, logp
        else:
            a_ts, logp_ts = self.actor.sample(s_ts)
            a_ts = torch.clamp(a_ts, min=-1, max=1)
            action = a_ts.cpu().detach().numpy()[0]
            return action
    
    def init_memory(self):
        self.memory_counter = 0
        self.memory = {"s":[], "a":[], "r":[], "sn":[], "end":[], "logp":[], "return":[]}
        
    def store_transition(self, s, a, r, sn, end, logp):
        self.memory["s"].append(s)
        self.memory["a"].append(a)
        self.memory["r"].append(r)
        self.memory["sn"].append(sn)
        self.memory["end"].append(end)
        self.memory["logp"].append(logp)
        self.memory_counter += 1

    def run_return(self):
        self.memory["return"] = []
        discounted_reward = 0
        for reward, end in zip(reversed(self.memory["r"]), reversed(self.memory["end"])):
            if end == 0:
                discounted_reward = reward
            discounted_reward = reward + (self.gamma * discounted_reward)
            self.memory["return"].insert(0, discounted_reward)

    def learn(self, iter):
        self.run_return()
        # Construct torch tensor
        s_ts = torch.FloatTensor(np.array(self.memory["s"]))
        a_ts = torch.FloatTensor(np.array(self.memory["a"]))
        r_ts = torch.FloatTensor(np.expand_dims(np.array(self.memory["r"]), 1))
        sn_ts = torch.FloatTensor(np.array(self.memory["sn"]))
        end_ts = torch.FloatTensor(np.expand_dims(np.array(self.memory["end"]), 1))

        logp_ts = torch.FloatTensor(np.expand_dims(np.array(self.memory["logp"]), 1))
        return_ts = torch.FloatTensor(np.expand_dims(np.array(self.memory["return"]), 1))
        return_ts = (return_ts - return_ts.mean()) / (return_ts.std() + 1e-5)

        for it in range(1):
            # Evaluate policy and state-value
            dist = self.actor.distribution(s_ts)
            logp_curr = dist.log_prob(a_ts)
            ent   = dist.entropy()
            value = self.critic(s_ts)

            # Compute loss
            ratio = (logp_curr - logp_ts.detach()).exp()
            advantage = return_ts - value.detach()
            surr1 = advantage * ratio
            surr2 = advantage * torch.clamp(ratio, 1-self.eps_clip, 1+self.eps_clip)
            pg_loss = (-advantage*logp_curr).mean() #-torch.min(surr1, surr2).mean()
            v_loss = torch.nn.MSELoss()(value, return_ts).mean()
            ent_loss = ent.mean()
            loss = pg_loss + 0.5*v_loss - 0.01*ent_loss
            
            # Optimize parameters
            self.critic_optim.zero_grad()
            self.actor_optim.zero_grad()
            loss.backward()
            self.critic_optim.step()
            self.actor_optim.step()
            if it%10 == 0:
                print(  "Iter", it, \
                        ", pg_loss:", pg_loss.detach().cpu().numpy(), \
                        ", ent_loss:", ent_loss.detach().cpu().numpy(), \
                        ", v_loss:", v_loss.detach().cpu().numpy())

        self.init_memory()

##DDPG Agent training

In [5]:
batch_size = 64
eval_eps = 50
LOG_SIG_MAX = 2
LOG_SIG_MIN = -20
epsilon = 1e-6

class PolicyNet(nn.Module):
    def __init__(self):
        super(PolicyNet, self).__init__()
        self.layer1 = nn.Linear(23, 512)
        self.layer2 = nn.Linear(512, 512)
        self.layer3 = nn.Linear(512, 512)
        self.layer4 = nn.Linear(512, 2)

    def forward(self, s):
        h_fc1 = F.relu(self.layer1(s))
        h_fc2 = F.relu(self.layer2(h_fc1))
        h_fc3 = F.relu(self.layer3(h_fc2))
        mu = torch.tanh(self.layer4(h_fc3))
        return mu

class QNet(nn.Module):
    def __init__(self):
        super(QNet, self).__init__()
        self.layer1 = nn.Linear(23, 512)
        self.layer2 = nn.Linear(512+2, 512)
        self.layer3 = nn.Linear(512, 512)
        self.layer4 = nn.Linear(512, 1)
    
    def forward(self, s, a):
        h_fc1 = F.relu(self.layer1(s))
        h_fc1_a = torch.cat((h_fc1, a), 1)
        h_fc2 = F.relu(self.layer2(h_fc1_a))
        h_fc3 = F.relu(self.layer3(h_fc2))
        q_out = self.layer4(h_fc3)
        return q_out

agent_mind_ddpg = DDPG(
    model = [PolicyNet, QNet],
    learning_rate = [0.0001, 0.0001],
    reward_decay = 0.99,
    memory_size = 10000,
    batch_size = batch_size)

if load_model:
    print("Load model ...", model_path)
    agent_mind_ddpg.save_load_model("load", model_path)

env = IndoorDeepRL(map_path=terrain)
total_steps = 0
best_rate = 0
success_count = 0

print("eps, step, total_steps, action[0], reward, loss_a, loss_c, agent_mind_ddpg.epsilon, acc_reward/step")
for eps in range(4500): # how many episode do you want?
    state = env.createInstance()
    step = 0
    loss_a = loss_c = 0.
    acc_reward = 0.
    while(True):
        # Choose action and run
        if training:
            action = agent_mind_ddpg.choose_action(state, eval=False)
        else:
            action = agent_mind_ddpg.choose_action(state, eval=True)
        state_next, reward, terminated = env.step(action)
        end = 0 if terminated else 1
        agent_mind_ddpg.store_transition(state, action, reward, state_next, end)

        # Render environment
        im = env.render(gui=render)

        # Learn the model
        loss_a = loss_c = 0.
        if total_steps > batch_size and training:
            loss_a, loss_c = agent_mind_ddpg.learn()
        step += 1
        total_steps += 1
        # Print information
        acc_reward += reward
        print('\r{:3d} ; {:4d}; {:6d}; {:+.2f}; {:+.2f}; {:+.2f}; {:+.2f}; {:.3f}; {:.2f}'
              .format(eps, step, total_steps, action[0], reward, loss_a, loss_c, agent_mind_ddpg.epsilon, acc_reward/step), end='')
        state = state_next.copy()
        if terminated or step>200:
            # Count the successful times
            if reward > 2:
                success_count += 1
            print()
            break

    if eps>0 and eps%eval_eps==0:
        # Sucess rate
        success_rate = success_count / eval_eps
        success_count = 0
        # Save the best model
        if success_rate >= best_rate:
            best_rate = success_rate
            if training:
                print("Save model to " + model_path)
                agent_mind_ddpg.save_load_model("save", model_path)
        print("Success Rate (current/max):", success_rate, "/", best_rate)


TypeError: ignored

In [None]:
visualize(agent_mind_ddpg, total_eps=4, map_path=terrain, gif_path=gif_path, gif_name="ddpg_"+str(eps).zfill(4)+".gif")

## SAC Training


In [None]:
batch_size = 64
eval_eps = 50
LOG_SIG_MAX = 2
LOG_SIG_MIN = -20
epsilon = 1e-6

from torch.distributions import Normal

class PolicyNetGaussian(nn.Module):
    def __init__(self):
        super(PolicyNetGaussian, self).__init__()
        self.layer1 = nn.Linear(23, 512)
        self.layer2 = nn.Linear(512, 512)
        self.layer3 = nn.Linear(512, 512)
        self.fc4_mean = nn.Linear(512, 2)
        self.fc4_logstd = nn.Linear(512, 2)

    def forward(self, s):
        h_fc1 = F.relu(self.layer1(s))
        h_fc2 = F.relu(self.layer2(h_fc1))
        h_fc3 = F.relu(self.layer3(h_fc2))
        a_mean = self.fc4_mean(h_fc3)
        a_logstd = self.fc4_logstd(h_fc3)
        a_logstd = torch.clamp(a_logstd, min=LOG_SIG_MIN, max=LOG_SIG_MAX)
        return a_mean, a_logstd
    
    def sample(self, s):
        a_mean, a_logstd = self.forward(s)
        a_std = a_logstd.exp()
        normal = Normal(a_mean, a_std)
        x_t = normal.rsample()
        action = torch.tanh(x_t)
        log_prob = normal.log_prob(x_t)

        # Enforcing action Bound
        log_prob -= torch.log(1 - action.pow(2) + epsilon)
        log_prob = log_prob.sum(1, keepdim=True)
        return action, log_prob, torch.tanh(a_mean)

agent_mind_sac = SAC(
    model = [PolicyNetGaussian, QNet],
    n_actions = 2,
    learning_rate = [0.0001, 0.0001],
    reward_decay = 0.99,
    memory_size = 10000,
    batch_size = batch_size,
    alpha = 0.1,
    auto_entropy_tuning=True)

if load_model:
    print("Load model ...", model_path)
    agent_mind_sac.save_load_model("load", model_path)

env = IndoorDeepRL(map_path=terrain)
total_step = 0
max_success_rate = 0
success_count = 0

print("eps, step, total_step, action[0], reward, loss_a, loss_c, agent_mind_sac.alpha, acc_reward/step")
for eps in range(4500):
    state = env.createInstance()
    step = 0
    loss_a = loss_c = 0.
    acc_reward = 0.

    while(True):
        # Choose action and run
        if training:
            action = agent_mind_sac.choose_action(state, eval=False)
        else:
            action = agent_mind_sac.choose_action(state, eval=True)
        state_next, reward, terminated = env.step(action)
        end = 0 if terminated else 1
        agent_mind_sac.store_transition(state, action, reward, state_next, end)

        # Render environment
        im = env.render(gui=render)

        # Learn the model
        loss_a = loss_c = 0.
        if total_step > batch_size and training:
            loss_a, loss_c = agent_mind_sac.learn()
        step += 1
        total_step += 1

        # Print information
        acc_reward += reward
        print('\r {:3d}; {:4d}; {:6d}; {:+.2f}; {:+.2f}; {:+.2f}; {:+.2f};  {:.3f}; {:.2f}  '\
                .format(eps, step, total_step, action[0], reward, loss_a, loss_c, agent_mind_sac.alpha, acc_reward/step), end='')

        state = state_next.copy()
        if terminated or step>200:
            # Count the successful times
            if reward > 5:
                success_count += 1
            print()
            break

    if eps>0 and eps%eval_eps==0:
        # Sucess rate
        success_rate = success_count / eval_eps
        success_count = 0
        # Save the best model
        if success_rate >= max_success_rate:
            max_success_rate = success_rate
            if training:
                print("Save model to " + model_path)
                agent_mind_sac.save_load_model("save", model_path)
        print("Success Rate (current/max):", success_rate, "/", max_success_rate)

In [None]:
visualize(agent_mind_sac, total_eps=4, map_path=terrain, gif_path=gif_path, gif_name="sac_"+str(eps).zfill(4)+".gif")

## PPO Training 

In [None]:
from torch.distributions import Normal

class PPOPolicy(nn.Module):
    def __init__(self):
        super(PPOPolicy, self).__init__()
        self.layer1 = nn.Linear(23, 512)
        self.layer2 = nn.Linear(512, 512)
        self.layer3 = nn.Linear(512, 512)
        self.fc4_mean = nn.Linear(512, 2)
        self.fc4_logstd = nn.Linear(512, 2)

    def forward(self, s):
        h_fc1 = F.relu(self.layer1(s))
        h_fc2 = F.relu(self.layer2(h_fc1))
        h_fc3 = F.relu(self.layer3(h_fc2))
        a_mean = torch.tanh(self.fc4_mean(h_fc3))
        a_logstd = self.fc4_logstd(h_fc3)
        return a_mean, a_logstd
    
    def distribution(self, s):
        a_mean, a_logstd = self.forward(s)
        a_std = a_logstd.exp()
        dist = Normal(a_mean, a_std)
        return dist

    def sample(self, s):
        dist = self.distribution(s)
        a_samp = dist.sample()
        logp = dist.log_prob(a_samp)
        return a_samp, logp

class ValueNet(nn.Module):
    def __init__(self):
        super(ValueNet, self).__init__()
        self.layer1 = nn.Linear(23, 512)
        self.layer2 = nn.Linear(512, 512)
        self.layer3 = nn.Linear(512, 512)
        self.layer4 = nn.Linear(512, 1)
    
    def forward(self, s):
        h_fc1 = F.relu(self.layer1(s))
        h_fc2 = F.relu(self.layer2(h_fc1))
        h_fc3 = F.relu(self.layer3(h_fc2))
        v_out = self.layer4(h_fc3)
        return v_out

agent_mind_ppo = PPO(
    model = [PPOPolicy, ValueNet],
    learning_rate = [0.0001, 0.0001],
    reward_decay = 0.99,
    batch_size = 10)

render = True
if load_model:
    print("Load model ...", model_path)
    agent_mind_ppo.save_load_model("load", model_path)


env = IndoorDeepRL(map_path=terrain)
total_step = 0
max_success_rate = 0
success_count = 0

print("eps, step, total_step, action[0], reward, acc_reward/step")
for eps in range(4500):
    state = env.createInstance()
    step = 0
    loss_a = loss_c = 0.
    acc_reward = 0.

    while(True):
        # Choose action and run
        if training:
            action, logp = agent_mind_ppo.choose_action(state, eval=False)
        else:
            action, logp = agent_mind_ppo.choose_action(state, eval=True)
        state_next, reward, terminated = env.step(action)
        end = 0 if terminated else 1
        agent_mind_ppo.store_transition(state, action, reward, state_next, end, logp)

        # Render environment
        im = env.render(gui=render)

        # Learn the model
        step += 1
        total_step += 1

        # Print information
        acc_reward += reward
        print('\r {:3d}; {:4d}; {:6d}; {:+.2f}; {:+.2f}; {:.2f} '\
                .format(eps, step, total_step, action[0], reward, acc_reward/step), end='')

        state = state_next.copy()
        if terminated or step>200:
            # Count the successful times
            if reward > 5:
                success_count += 1
            print()
            break

    if agent_mind_ppo.memory_counter >= agent_mind_ppo.batch_size:
        agent_mind_ppo.learn(100)

    if eps>0 and eps%eval_eps==0:
        # Sucess rate
        success_rate = success_count / eval_eps
        success_count = 0
        # Save the best model
        if success_rate >= max_success_rate:
            max_success_rate = success_rate
            if training:
                print("Save model to " + model_path)
                agent_mind_ppo.save_load_model("save", model_path)
        print("Success Rate (current/max):", success_rate, "/", max_success_rate)

In [None]:
visualize(agent_mind_ppo, total_eps=4, map_path=terrain, gif_path=gif_path, gif_name="ppo_"+str(eps).zfill(4)+".gif")