In [None]:
import cv2 as cv
import numpy as np
import serial
import time
import matplotlib.pyplot as plt
from CSI_Camera import CSI_Camera
from collections import namedtuple, deque

import torch
import torch.nn as nn
import torch.optim as optim


In [None]:
class Environment(object):
    """Camera-backed environment for the robot's actor-critic loop.

    Every step reads a frame from the CSI camera, turns it into a grayscale
    tensor (the state) and scores the action that was just executed.

    Args:
        device: Torch device on which state tensors are created. When omitted,
            CUDA is used if available, otherwise the CPU.
    """

    # Reward returned when a collision is detected.
    COLLISION_REWARD = -10
    # Histogram correlation at or above which two consecutive frames are
    # treated as "the view did not change", i.e. the robot is stuck.
    COLLISION_CORRELATION = 0.95

    def __init__(self, device=None):
        self.camera = CSI_Camera(0)
        self.time_last_action = 0
        self.last_action = None
        self.last_frame = None
        self.ACTION_MAP = {0:'s', 1:'f', 2:'b'}
        # get_state() needs a device; it used to read an attribute that was
        # never assigned on this class.
        if device is None:
            device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.DEVICE = device

    def reset(self):
        """Start a new episode and return the initial state tensor."""
        # Check if the camera is opened and running.
        if not self.camera.isRunning() or not self.camera.isOpened():
            # Open and start camera
            self.camera.open()
            self.camera.start()

        # Initialize variables; drop any frame left over from the last episode.
        self.time_last_action = 0
        self.last_action = None
        self.last_frame = None

        # Read frame from camera
        _, frame = self.camera.read()
        return self.get_state(frame)

    def close(self):
        """Stop and release the camera if it is currently opened."""
        # The calls must go to the camera object (Environment itself has no
        # stop()/release()), and only make sense while the camera is open.
        if self.camera.isOpened():
            self.camera.stop()
            self.camera.release()

    def get_state(self, frame):
        """Convert a BGR camera frame into a (1, 240, 240) tensor scaled by 1/255."""
        frame = cv.cvtColor(frame, cv.COLOR_BGR2GRAY, dst=None)
        frame = cv.resize(frame, (240, 240), dst=None, interpolation=cv.INTER_CUBIC)
        state = torch.tensor(frame, device=self.DEVICE).unsqueeze(0)/255
        return state

    def step(self, action):
        """Observe the result of `action` and score it.

        Args:
            action: Integer key into ``ACTION_MAP``.

        Returns:
            Tuple ``(state, reward, collision)``.

        Raises:
            KeyError: If `action` is not a key of ``ACTION_MAP``.
            RuntimeError: Re-raised (after printing a hint) when reading or
                processing the frame fails; previously this was swallowed and
                the method silently returned ``None``.
        """
        try:
            action = self.ACTION_MAP[action]
            # reset if new action.
            if self.last_action != action:
                self.time_last_action = 0

            # Read new frame from camera
            _, frame = self.camera.read()
            state = self.get_state(frame)

            # Calculate reward of the last (state, action). The raw BGR frame
            # is passed on purpose: the collision check converts BGR -> HSV,
            # which cannot be done on the grayscale state tensor.
            reward, collision = self.reward(action, frame)

            # Accumulate time since executing same action.
            self.time_last_action += 1

            # Update last_action and last_frame
            self.last_action = action
            self.last_frame = frame

            return state, reward, collision
        except RuntimeError:
            print("Error: Camera not opened.")
            raise

    def _histogram_correlation(self, frame_a, frame_b):
        """Return the correlation of the normalized hue/saturation histograms of two BGR frames."""
        # Histogram parameters
        hist_size = [50, 60]
        ranges = [0, 180, 0, 256]
        channels = [0, 1]

        histograms = []
        for bgr in (frame_a, frame_b):
            hsv = cv.cvtColor(bgr, cv.COLOR_BGR2HSV)
            hist = cv.calcHist([hsv], channels, None, hist_size, ranges, accumulate=False)
            cv.normalize(hist, hist, alpha=0, beta=1, norm_type=cv.NORM_MINMAX)
            histograms.append(hist)

        return cv.compareHist(histograms[0], histograms[1], cv.HISTCMP_CORREL)

    def reward(self, action, frame):
        """Score `action` given the raw BGR `frame` observed after it.

        Args:
            action: Action letter ('s', 'f', 'b', 'l' or 'r').
            frame: Current BGR camera frame (only used for the collision check).

        Returns:
            Tuple ``(reward, collision)``. Unrecognised actions score 0.
        """
        collision = False
        # Default so an unrecognised action cannot leave `reward` unbound.
        reward = 0
        driving = action in ['f', 'b']

        # A collision is only checked when the same driving action is repeated
        # and there is a previous frame to compare against.
        if driving and self.last_action == action and self.last_frame is not None:
            correlation = self._histogram_correlation(self.last_frame, frame)
            # A near-identical view while driving means the robot is blocked.
            if correlation >= self.COLLISION_CORRELATION:
                collision = True
                reward = self.COLLISION_REWARD

        if not collision:
            if action == 's':
                # Penalty for stopping, growing with the time spent stopped.
                reward = min(-1, -1 * self.time_last_action)
            elif action in ['l', 'r']:
                reward = 1
            elif driving:
                # Going forward or backwards: bonus grows with persistence.
                reward = max(2, 2 * self.time_last_action)
        return reward, collision

In [None]:
class ActorLoss(nn.Module):
    """Policy-gradient loss: mean of ``-log_prob * advantage``."""
    def __init__(self):
        super(ActorLoss, self).__init__()
    def forward(self, action_log_probs, advantage):
        return torch.mean(-action_log_probs * advantage)

class ActorCriticLoss(nn.Module):
    """Combined one-step actor-critic loss with an entropy bonus.

    loss = actor_loss + critic_loss - beta * policy_entropy
    """
    def __init__(self):
        super(ActorCriticLoss, self).__init__()
        self.actor_criterion = ActorLoss()
        self.critic_criterion = nn.HuberLoss(reduction='mean')
    def forward(self, action_prob, next_value, value, reward, policy_entropy, beta=0.001, discount=0.99):
        """Compute the scalar training loss for one transition (or a batch).

        Args:
            action_prob: Probability the policy assigned to the taken action.
            next_value: Critic estimate for the next state (tensor).
            value: Critic estimate for the current state (tensor).
            reward: Reward received for the transition.
            policy_entropy: Entropy of the policy distribution.
            beta: Weight of the entropy bonus.
            discount: Discount factor applied to `next_value`.

        Returns:
            Scalar tensor holding the combined loss.
        """
        # The TD target is a fixed regression target: no gradient flows
        # through the bootstrap estimate.
        q_value = (reward + discount * next_value).detach()
        # The advantage only weights the policy gradient. Detaching it keeps
        # the actor term from pushing gradients into the critic head.
        advantage = (q_value - value).detach()
        action_log_probs = torch.log(action_prob)

        loss_actor = self.actor_criterion(action_log_probs, advantage).to(torch.float)
        loss_critic = self.critic_criterion(value, q_value).to(torch.float)

        # Entropy is a bonus: it is subtracted so that minimizing the loss
        # favours higher-entropy (more exploratory) policies.
        return loss_actor + loss_critic - (policy_entropy * beta)

In [None]:
class ActorCritic(nn.Module):
    """Shared convolutional trunk feeding a softmax policy head and a scalar value head.

    Args:
        num_actions: Size of the policy head's output.
    """
    def __init__(self, num_actions):
        super().__init__()

        # Feature extractor; the final Linear layer expects 288 flattened
        # inputs and emits a 48-dimensional embedding.
        trunk_layers = [
            nn.Conv2d(1, 8, kernel_size=5, stride=3),
            nn.LeakyReLU(),
            nn.Conv2d(8, 8, kernel_size=5, stride=3),
            nn.LeakyReLU(),
            nn.Conv2d(8, 8, kernel_size=3, stride=2),
            nn.LeakyReLU(),
            nn.AvgPool2d(kernel_size=2, stride=2),
            nn.Flatten(),
            nn.Linear(288, 48),
            nn.LeakyReLU(),
        ]
        self.CNN = nn.Sequential(*trunk_layers)

        # Policy head: probabilities over the actions (softmax along dim 1).
        policy_head = nn.Linear(48, num_actions)
        self.Actor = nn.Sequential(policy_head, nn.Softmax(dim=1))

        # Value head: a single scalar per input.
        value_head = nn.Linear(48, 1)
        self.Critic = nn.Sequential(value_head)

    def forward(self, state):
        """Return ``(value, policy)`` for a batch of states."""
        embedding = self.CNN(state)
        return self.Critic(embedding), self.Actor(embedding)

In [None]:
# One recorded step: probability of the chosen action, critic value, reward.
Transition = namedtuple('Transition', ('action_prob', 'value', 'reward'))
class Memory(object):
  """Bounded FIFO buffer of Transition records.

  Once `capacity` entries are stored, pushing a new one drops the oldest.
  """
  def __init__(self, capacity):
    self.memory = deque([], maxlen=capacity)

  def __iter__(self):
    # Must return an iterator object; returning the bound method
    # `self.memory.__iter__` makes every `for ... in memory` raise TypeError.
    return iter(self.memory)

  def __len__(self):
    return len(self.memory)

  def push(self, *args):
    """Append a Transition built from `args` (action_prob, value, reward)."""
    self.memory.append(Transition(*args))

In [None]:
class Agent(object):
    """Actor-critic agent that drives the robot through a serial link.

    Args:
        mem_cap: Capacity of the transition memory.
        LEARNING_RATE: Learning rate handed to the Adam optimizer.
    """
    def __init__(self, mem_cap, LEARNING_RATE=25e-4):
        self.ACTION_MAP = {0:'s', 1:'f', 2:'b', 3:'l', 4:'r'}
        use_cuda = torch.cuda.is_available()
        self.DEVICE = torch.device('cuda' if use_cuda else 'cpu')

        # Serial link to the motor controller; discard anything already buffered.
        self.driver = serial.Serial("/dev/ttyTHS1", 9600, timeout=1)
        self.driver.reset_input_buffer()
        self.last_action = None

        # Model (three policy outputs), optimizer, loss and rollout memory.
        self.model = ActorCritic(3).to(self.DEVICE)
        self.optimizer = optim.Adam(params=self.model.parameters(), lr=LEARNING_RATE)
        self.criterion = ActorCriticLoss()
        self.memory = Memory(mem_cap)

    def select_action(self, policy):
        """Sample an action index from the flattened `policy` probabilities."""
        distribution = torch.distributions.Categorical(probs=policy.view(-1))
        return distribution.sample().item()

    def execute_action(self, action):
        """Send the command mapped to `action` to the driver, pausing 1s after each write."""
        command = self.ACTION_MAP[action]
        drive_commands = ('f', 'b')
        reversing = (
            command in drive_commands
            and self.last_action in drive_commands
            and command != self.last_action
        )
        if reversing:
            # Have to stop first to go opposite direction
            self.driver.write('s'.encode('ascii'))
            time.sleep(1)
        self.driver.write(command.encode('ascii'))
        time.sleep(1)
        self.last_action = command

    def wait_for_next_epsiode(self):
        """Send 's', wait one second, then send 'c'."""
        stop_cmd = 's'.encode('ascii')
        idle_cmd = 'c'.encode('ascii')
        self.driver.write(stop_cmd)
        time.sleep(1)
        self.driver.write(idle_cmd)

    # NOTE: an earlier, disabled draft of optimize_policy took
    # (action_prob, next_value, value, reward, policy_entropy), computed the
    # loss with self.criterion, back-propagated it and stepped the optimizer.
    # The method below is still a placeholder for that update.
    def optimize_policy(self, next_value):
        """Placeholder: print the reward of every stored transition (`next_value` is unused)."""
        for _, _, reward in self.memory:
            print(reward)

    def add_memory(self, *args):
        """Store one transition; `args` are forwarded to ``Memory.push``."""
        self.memory.push(*args)

In [None]:
# Build the environment; its constructor creates the CSI camera handle (camera 0).
env = Environment()

In [None]:
# Hyperparameters
EPISODES = 2  # number of episodes for the training loop
STEPS = 2  # steps per episode; also used below as the agent's memory capacity

# The agent's transition memory is sized to hold one episode's worth of steps.
agent = Agent(STEPS)

In [None]:
# Have robot spin to keep arduino on.
agent.wait_for_next_epsiode()
print('what is going on?')
# NOTE(review): the training loop below is disabled. Before re-enabling it,
# check the `env.step(action)` call: step() returns three values
# (state, reward, collision) but the loop unpacks only two.
# # policy_entropy = torch.tensor(0, dtype=torch.float, requires_grad=True, device=DEVICE)
# returns = []
# for episode in range(EPISODES):
#     action_sequence = []
#     episode_reward = 0

#     state = env.reset()
#     for step in range(STEPS):
#         print(f'\rStep: {step+1}', end='')
#         value, policy = agent.model(state.unsqueeze(0))

#         with torch.autograd.no_grad():
#         # Select action from policy
#             action = agent.select_action(policy)

#         action_prob = policy.view(-1)[action]

#         # Calculate policy entropy
#         # policy_entropy = -torch.sum(policy.view(-1) * torch.log(policy.view(-1))) 

#         if step == 0:
#             # Stop robot from spinning.
#             agent.execute_action(0)
#         agent.execute_action(action)

#         # Transition to next state
#         new_state, reward = env.step(action)

#         action_sequence.append(action)
#         agent.add_memory(action_prob, value, reward)
#         episode_reward += reward

#         state = new_state
#         if 0 < step < STEPS:
#             with torch.inference_mode():
#                 next_value, _ = agent.model(state.unsqueeze(0))
#             # agent.optimize_policy(action_prob, next_value.view(-1), value.view(-1), reward, policy_entropy)
#             agent.optimize_policy(next_value)

#     returns.append(episode_reward)
#     if episode % 1 == 0:
#         print(f'\nEpisode {episode+1}/{EPISODES}: Reward {episode_reward}')

#     # Have robot spin to keep arduino on.
#     agent.wait_for_next_epsiode()
    
# agent.execute_action(0)

In [None]:
# Display the model's current state dict (every parameter tensor is shown in full).
agent.model.state_dict()