In [None]:
from mlagents_envs.environment import UnityEnvironment
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import math

import matplotlib.pyplot as plt
# Default figure size (width, height in inches) for the plots in this notebook.
plt.rcParams.update({'figure.figsize': (7, 12)})

# Step history: each evaluation run appends its list of per-episode step counts here.
steps = list()

In [None]:
class TestNNInUnity():
    """Roll out a saved PyTorch policy inside a Unity ML-Agents environment.

    Constructing an instance loads the pickled model from ``model_path``,
    connects to Unity on base port 5004 with ``file_name=None`` and caches the
    first behavior's name and spec.

    ``test`` samples actions straight from the policy's distributions;
    ``test_Modified`` first lowers the most likely action's probability and
    also records a 2-D trajectory per episode.

    Call ``close()`` when finished so the Unity port is released.
    """

    # Offsets added to observation entries 1 and 2 (then negated) when logging a
    # trajectory point. The original debug print labels these entries X and Z;
    # the values are presumably specific to one scene -- TODO confirm.
    X_OFFSET = 0.25
    Z_OFFSET = 3.64

    # Width of the all-ones mask handed to ``agent.get_dists``.
    # NOTE(review): presumably the total number of discrete actions -- confirm
    # against the behavior spec of your environment.
    MASK_WIDTH = 6

    def __init__(self, model_path):
        """Load the model, connect to Unity and read the first behavior's spec.

        Args:
            model_path: path of a model saved with ``torch.save(model, path)``.
        """
        # Choose the device before loading so the checkpoint lands on the same
        # device the observations and masks will later be moved to.
        if torch.cuda.is_available():
            self.device = torch.device("cuda")
        else:
            self.device = torch.device("cpu")

        # NOTE(security): torch.load unpickles the file and can execute
        # arbitrary code; only load checkpoints you trust.
        self.agent = torch.load(model_path, map_location=self.device)
        self.agent.eval()
        print("1. Model Loaded.")

        print("2. Connecting Unity Environment...")
        self.env = UnityEnvironment(file_name=None, base_port=5004)
        self.env.reset()
        print("3. Connected.")

        self.behaviorNames = list(self.env.behavior_specs.keys())
        self.behaviorName = self.behaviorNames[0]
        self.behavior_spec = self.env.behavior_specs[self.behaviorName]
        self.num_inputs = self.behavior_spec.observation_shapes[0][0]
        self.num_outputs = self.behavior_spec.action_spec[0]
        print("4. Connection with Unity is ready!")

        print("5. Going to use device = ", self.device)

    def close(self):
        """Close the Unity connection so the port can be reused."""
        self.env.close()

    # ------------------------------------------------------------------ #
    # Internal helpers
    # ------------------------------------------------------------------ #
    def _action_masks(self):
        """Return the all-ones action mask, placed on the working device."""
        return torch.ones((1, self.MASK_WIDTH), device=self.device)

    @staticmethod
    def _average(step_counts):
        """Mean of ``step_counts``; NaN (without a NumPy warning) when empty."""
        return np.mean(step_counts) if step_counts else float("nan")

    @staticmethod
    def _soften_probs(probs, keep):
        """Scale the largest probability by ``keep`` and share the rest evenly.

        Args:
            probs: array whose first row holds one branch's action
                probabilities (only ``probs[0]`` is used).
            keep: factor applied to the largest probability (0.9 -> -10%).

        Returns:
            ``(1, n)`` float64 array of sampling weights. The removed mass is
            split over the other ``n - 1`` actions, so a normalised input stays
            normalised for any branch size.
        """
        row = np.asarray(probs[0], dtype=np.float64)
        if row.size < 2:
            # Nothing to redistribute to; leave a single-action branch alone.
            return row.reshape(1, -1)
        top = int(np.argmax(row))
        removed = row[top] * (1.0 - keep)
        adjusted = row + removed / (row.size - 1)
        adjusted[top] = row[top] * keep
        return adjusted.reshape(1, -1)

    def _sample_policy_actions(self, state):
        """Sample one action per branch directly from the trained policy."""
        dists = self.agent.get_dists([state], [], masks=self._action_masks())[0]
        return [dist.sample().cpu().numpy()[0][0] for dist in dists]

    def _sample_softened_actions(self, state):
        """Sample one action per branch from the softened distributions."""
        dists = self.agent.get_dists([state], [], masks=self._action_masks())[0]
        actions = []
        for dist in dists:
            weights = self._soften_probs(dist.probs.cpu().detach().numpy(), self.probability)
            weights = torch.tensor(weights, dtype=torch.float32).to(self.device)
            # torch.multinomial accepts unnormalised, non-negative weights.
            actions.append(torch.multinomial(weights, 1).cpu().numpy()[0][0])
        return actions

    def _run_episodes(self, choose_actions, trajectories=None):
        """Shared rollout loop behind ``test`` and ``test_Modified``.

        Args:
            choose_actions: callable mapping a state tensor to a list with one
                action per branch.
            trajectories: optional dict; when given, episode ``i``'s logged
                points are stored under key ``i`` as an ``(n, 2)`` array.

        Returns:
            List with one step count per episode. An episode that reaches
            ``max_steps_per_episode`` without a terminal step is recorded with
            that limit and the environment is reset, so the next episode
            starts fresh instead of continuing the old one.
        """
        total_steps = []
        with torch.no_grad():
            for game in range(self.episodes_to_collect):
                path = []
                if trajectories is not None:
                    trajectories[game] = path
                for step in range(self.max_steps_per_episode):
                    # Get Observation
                    decision_steps, terminal_steps = self.env.get_steps(self.behaviorName)

                    if len(terminal_steps.obs[0]) > 0:
                        self.env.reset()
                        total_steps.append(step)
                        print(f'Episode #{game}. Avg # of steps so far ==> {np.mean(total_steps)}')
                        break

                    state = decision_steps.obs[0][0]
                    if trajectories is not None:
                        path.append([(state[1] + self.X_OFFSET) * -1,
                                     (state[2] + self.Z_OFFSET) * -1])
                    state = torch.FloatTensor(state).to(self.device)

                    # Get Steps
                    actions = choose_actions(state)
                    self.env.set_actions(self.behaviorName, np.array([actions]))
                    self.env.step()
                else:
                    # Step budget exhausted without a terminal step.
                    self.env.reset()
                    total_steps.append(self.max_steps_per_episode)
                    print(f'Episode #{game} reached the step limit. '
                          f'Avg # of steps so far ==> {np.mean(total_steps)}')

                if trajectories is not None:
                    # Always hand back an (n, 2) array, even for empty or
                    # truncated episodes, so plotting code can slice columns.
                    trajectories[game] = np.array(path).reshape(-1, 2)
        return total_steps

    ########################################  Test Unity Environment  ########################################
    def test(self, episodes_to_collect=10, max_steps_per_episode=1000):
        """Run the trained policy unchanged.

        Args:
            episodes_to_collect: number of episodes to run.
            max_steps_per_episode: step budget per episode.

        Returns:
            List of per-episode step counts.
        """
        self.episodes_to_collect = episodes_to_collect
        self.max_steps_per_episode = max_steps_per_episode
        print("6. Testing your trained model...")
        total_Steps = self._run_episodes(self._sample_policy_actions)
        print(f"7. Done. Average steps = {self._average(total_Steps)}\n\n")
        return total_Steps

    ########################################  Test Unity Environment with modified probability  ########################################
    ############################# use this only if you want to take different actions than trained policy ##############################
    def test_Modified(self, episodes_to_collect=10, max_steps_per_episode=1000, prob=0.9):
        """Run the policy with the most likely action's probability reduced.

        Args:
            episodes_to_collect: number of episodes to run.
            max_steps_per_episode: step budget per episode.
            prob: factor applied to each branch's largest action probability
                (0.9 lowers it by 10%); the removed mass is shared equally by
                the other actions of that branch.

        Returns:
            ``(total_Steps, trajectories)``: the per-episode step counts and a
            dict mapping episode index to an ``(n, 2)`` array of logged points.
        """
        self.episodes_to_collect = episodes_to_collect
        self.max_steps_per_episode = max_steps_per_episode
        self.probability = prob

        self.trajectories = {}

        print("6. Testing your trained model...")
        total_Steps = self._run_episodes(self._sample_softened_actions, self.trajectories)
        print(f"7. Done. Average steps = {self._average(total_Steps)}\n\n")
        return total_Steps, self.trajectories

In [None]:
tester = TestNNInUnity("Turtlebot3_0331.pth")

# Option A -- use the trained policy WITHOUT ANY PROBABILITY MODIFICATIONS.
# The list of per-episode step counts returned by test() is appended to `steps`.
steps.append(tester.test(episodes_to_collect=100, max_steps_per_episode=10000))

# Option B -- use the line below instead if you want to MODIFY THE PROBABILITY.
# prob ===> if you set prob = 0.9, the maximum action probability is reduced by 10%.
# Note: this rebinds `steps` to the list returned by test_Modified and defines
# `trajectories` for the plotting cell at the end of the notebook.
#steps, trajectories = tester.test_Modified(episodes_to_collect=100, max_steps_per_episode=10000, prob=0.6)
print("steps = ", steps)

# Read trajectories from CSV and graph them as a transparent PNG
### ***** IMPORTANT: Run the next cell only if you used the `test` function! *****
### ***** DO NOT run it if you used the `test_Modified` function! *****
###  If you used `test_Modified`, skip to the last code cell.

In [None]:
import pandas as pd
import numpy as np
# Raw string: the Windows path contains backslashes that must not be read as
# escape sequences.
pth = r"M:\ML Agents\envs\CarDiscreteLargeMaze\LSTM_Stacked_large.csv"
df = pd.read_csv(pth)

# Only the first two columns are used as trajectory points.
array = np.array(df.iloc[:, :2])

# Split the rows into trajectories: a row whose first value is 0 ends the
# current trajectory. Leading or repeated separator rows are ignored, and a
# final trajectory without a closing separator row is kept as well.
counter = 0
trajectories = {}
current_points = []
for row in array:
    if row[0] != 0:
        current_points.append([float(x) for x in row])
    elif current_points:
        trajectories[counter] = np.array(current_points)
        counter += 1
        current_points = []
if current_points:
    trajectories[counter] = np.array(current_points)
    counter += 1

# Save the trajectories as a transparent PNG, ready to paste on top of a
# picture of your environment.
plt.rcParams['figure.figsize'] = (7,12)
plt.axis('off')
for traj in trajectories.values():
    plt.plot(traj[:, 0], traj[:, 1], color='black')
plt.savefig("fig.png", transparent=True)

# Graph the trajectories
# IMPORTANT: Run the cells below only if you used "test_Modified"!

In [None]:
# Figure size (width, height in inches) for figures created after this cell.
plt.rcParams.update({'figure.figsize': (7, 10)})

In [None]:
# Save the trajectories as a transparent PNG, ready to paste on top of a
# picture of your environment.
plt.rcParams['figure.figsize'] = (7,12)
plt.axis('off')
for traj in trajectories.values():
    # test_Modified can leave a trajectory as a plain list when its episode
    # never reached a terminal step; convert before slicing columns.
    points = np.asarray(traj)
    if points.ndim != 2 or len(points) == 0:
        # Nothing drawable (no points were logged for this episode).
        continue
    plt.plot(points[:, 0], points[:, 1], color='black')
plt.savefig("fig.png", transparent=True)