In [1]:
import numpy as np
import pandas as pd
from tqdm import tqdm
from SAC.Agent import Agent
from environment import Env
import matplotlib.pyplot as plt
from matplotlib.pyplot import imshow, show   
from matplotlib import rcParams
from IPython import display
import time
import copy
import torch
import cv2
from IPython.display import clear_output

In [6]:
class Agent_Training():
    """Roll out a zero-action ("static arms") policy and record rendered frames.

    The class wraps a project ``Env`` instance. It keeps a sliding window of the
    last ``seq_len`` observations/actions and, in ``test_actor_video``, collects
    the two camera views returned by ``env.my_render`` at every step.
    """

    def __init__(self, image_dims, seed = 1):
        """Create the environment and store the rollout settings.

        Args:
            image_dims: forwarded to ``Env(...)`` and later to
                ``env.my_render(dims=...)``.
            seed: forwarded to ``Env(...)``.
        """
        # Initialisations
        self.seed = seed
        self.image_dims = image_dims
        self.n_epochs = 250        # not used by the methods of this class
        self.n_episodes = 15       # episodes recorded by test_actor_video
        self.episode_len = 100     # environment steps per episode
        self.seq_len = 4           # length of the observation/action window
        self.env = Env(self.image_dims, self.seed)
        self.n_actions = self.env.action_space.shape[0]
        self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

    def update_sequences(self, seq_observation, seq_action, observation, actions):
        """Slide both windows one step forward and append the newest entries.

        ``np.roll`` returns a new array, so the arrays passed in are left
        untouched; the shifted copies are returned.

        Args:
            seq_observation: array whose first axis indexes time steps.
            seq_action: array whose first axis indexes time steps.
            observation: value written into the last observation slot.
            actions: value written into the last action slot.

        Returns:
            Tuple ``(seq_observation, seq_action)`` with the oldest entry dropped
            and the new entry in position ``-1``.
        """
        seq_observation = np.roll(seq_observation, -1, axis=0)
        seq_observation[-1] = observation
        seq_action = np.roll(seq_action, -1, axis=0)
        seq_action[-1] = actions

        return seq_observation, seq_action

    @torch.no_grad()
    def initial_window(self):
        """Reset the environment and fill a window with ``seq_len`` zero-action steps.

        Returns:
            Tuple of three arrays, each with ``seq_len`` entries along axis 0:
            the observations before each step, the observations after each step,
            and the (all-zero, float64) actions that were applied.
        """
        observation = self.env.reset()
        seq_observation = []
        seq_observation_ = []
        seq_actions = []

        for _ in range(self.seq_len):
            action = [0 for _ in range(self.n_actions)]
            # env.step returns a pair; only the first element is used here.
            observation_, _obs = self.env.step(action)
            seq_observation.append(observation)
            seq_observation_.append(observation_)
            seq_actions.append(action)
            observation = observation_

        return np.array(seq_observation), np.array(seq_observation_), np.array(seq_actions, dtype=np.float64)


    def initial_window_all(self):
        """Return only the pre-step observation window and the action window."""
        final_seq_observation, _final_seq_observation_, final_seq_actions = self.initial_window()
        return final_seq_observation, final_seq_actions


    def test_actor_video(self):
        """Run ``n_episodes`` zero-action episodes and collect frames and observations.

        For every step the two views from ``env.my_render`` are channel-reversed,
        concatenated along the channel axis, transposed with ``(2, 1, 0)`` and
        scaled by ``1 / 255``. The second value returned by ``env.step`` is
        appended to ``observations``.

        Returns:
            Tuple ``(frames, observations)``; both lists have
            ``n_episodes * episode_len`` entries.
        """
        frames = [] # store the frames for the video
        observations = []
        total_steps = self.n_episodes * self.episode_len

        with tqdm(total=total_steps) as pbar:
            for i in range(self.n_episodes):
                # update_sequences never mutates its inputs (np.roll copies), so
                # the fresh window needs no defensive copy.
                seq_observation, seq_action = self.initial_window_all()

                for _ in range(self.episode_len):
                    # static arms; sized from the environment so it always
                    # matches the width of the action window
                    action = [0.] * self.n_actions

                    # get camera views
                    view1, view2 = self.env.my_render(dims = self.image_dims) # two different views - front and left. Two images instead of 1, test 1 vs 2 views
                    concatenated_image = np.concatenate([view1[:,:,::-1], view2[:,:,::-1]], axis=2).transpose(2, 1, 0) / 255.0
                    frames.append(concatenated_image)

                    # perform a step
                    observation_, obs = self.env.step(action)
                    observations.append(obs)
                    seq_observation, seq_action = self.update_sequences(seq_observation, seq_action, observation_, action)

                    # advance the bar; it was created with a total but never moved
                    pbar.update(1)
                # report the finished episode without corrupting the progress bar
                tqdm.write(str(i))

        return frames, observations

In [8]:
# Build the trainer with image_dims=32 and record the zero-action rollout.
# NOTE(review): the instance is bound to the module-level name `self`; the name
# is kept because other cells may refer to it.
self = Agent_Training(image_dims=32)
frames, observations = self.test_actor_video()

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14


In [None]:
# For every recorded frame, swap the first and last axes and keep only the
# first three entries of the (new) last axis.
side_images = [np.swapaxes(frame, 0, 2)[:, :, :3] for frame in frames]

In [None]:
# Flip through the extracted images one at a time, replacing the cell output
# on every iteration so it reads as an animation.
for image in side_images:
    fig = plt.figure(figsize=(9, 9))
    plt.imshow(image)
    plt.axis('off')  # Turn off the axis
    plt.pause(0.005)  # Pause for 5 ms between images
    # release the figure so a long frame list does not pile up open figures
    plt.close(fig)
    # clear the current image
    clear_output(wait=True)

KeyboardInterrupt: 