In [1]:
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
import sys
import argparse
import numpy as np
import matplotlib.pyplot as plt

import torch
from torch.autograd import Variable

#from PCHID.pchid import *
from PCHID.dataset.dataset import *
from PCHID.utility.utils import *
from PCHID.model_vin import *
from PCHID.domains.gridworld import *
from PCHID.generators.obstacle_gen import *

IMAGE_SIZE = 16
N_ACTIONS = 8
N_STATES = 2 + IMAGE_SIZE * IMAGE_SIZE
FILE_NAME = '_size16_50obs_5000dom'
DOMAIN_NUM = 5000
K = 20


class conf():
    def __init__(self,
                 datafile = 'PCHID/dataset/gridworld_RL_{0}x{1}'.format(IMAGE_SIZE, IMAGE_SIZE) + FILE_NAME ,
                 image_size = IMAGE_SIZE,
                 lr = 0.005,
                 epochs = 30,
                 k = K,
                 l_i = 2,
                 l_h = 150,
                 l_q = 10,
                 batch_size = 128,
                 DOMAIN_NUM = DOMAIN_NUM,
                 algorithm = 'K_step',
                 experiment = '1step'):
        self.domain_num = DOMAIN_NUM
        self.l_i = l_i
        self.l_h = l_h
        self.image_size = image_size
        self.lr = lr
        self.epochs = epochs
        self.k = k
        self.l_q = l_q
        self.batch_size = batch_size
        self.datafile = datafile
        self.algorithm = algorithm
        self.experiment = experiment
        self.file_name = FILE_NAME

def test_model_vin_module(image_size_test, file_name, K=20):
    lb = (image_size_test / 2)**2
    ub = (image_size_test - 1)**2
    obs_list = [10]
    Acc = []
    L_list = []
    for obs_num in obs_list:
        L_temp = []

        def X2R(X):
            X = X.cpu().numpy()
            xargmax = np.argmax(X[1])
            goal = [xargmax // config.image_size, xargmax % config.image_size]
            G = gridworld(1 - X[0], goal[0], goal[1])
            R = X[1] + 1 - (1 - X[0]) * 0.02 - 2 * X[0]
            return R

        def main(config, n_domains=1000, max_obs=obs_num, max_obs_size=None, n_traj=1, n_actions=8):
            correct, total = 0.0, 0.0
            use_GPU = True
            vin = VIN(config)
            vin.load_state_dict(torch.load(config.weights))
            if use_GPU:
                vin = vin.cuda()
            for dom in range(n_domains):
                try:
                    (G,value_prior,states_xy,states_xy,goal) = np.load('test_data_16/test_map_test{}.npy'.format(dom))
                except:
                    continue
                for i in range(n_traj):
                    if len(states_xy[i]) > 1:
                        # Get number of steps to goal
                        L = len(states_xy[i]) * 2
                        L_temp.append(L)
                        # Allocate space for predicted steps
                        pred_traj = np.zeros((L, 2))
                        # Set starting position
                        pred_traj[0, :] = states_xy[i][0, :]

                        for j in range(1, L):
                            state_data = pred_traj[j - 1, :]
                            state_data = state_data.astype(np.int)

                            # Transform domain to Networks expected input shape
                            im_data = G.image.astype(np.int)
                            im_data = 1 - im_data
                            im_data = im_data.reshape(1, 1, config.image_size, config.image_size)
                            # Transfrom value prior to Networks expected input shape
                            value_data = value_prior.astype(np.int)
                            value_data = value_data.reshape(1, 1, config.image_size, config.image_size)
                            # Get inputs as expected by network
                            X_in = torch.from_numpy(np.append(im_data, value_data, axis=1)).float()
                            S1_in = torch.from_numpy(state_data[0].reshape([1, 1])).float()
                            S2_in = torch.from_numpy(state_data[1].reshape([1, 1])).float()
                            # Send Tensors to GPU if available
                            if use_GPU:
                                X_in = X_in.cuda()
                                S1_in = S1_in.cuda()
                                S2_in = S2_in.cuda()
                            # Wrap to autograd.Variable
                            X_in, S1_in, S2_in = Variable(X_in), Variable(S1_in), Variable(S2_in)
                            R = torch.FloatTensor(X2R(X_in[0])).cuda()
                            S = torch.cat([R.flatten(), S1_in.flatten(), S2_in.flatten()], 0)
                            # Forward pass in our neural net
                            predictions = vin(S, config)
                            _, indices = torch.max(predictions.cpu(), 1, keepdim=True)
                            a = indices.data.numpy()[0][0]
                            # Transform prediction to indices
                            s = G.map_ind_to_state(pred_traj[j - 1, 0], pred_traj[j - 1, 1])
                            ns = G.sample_next_state(s, a)
                            nr, nc = G.get_coords(ns)
                            pred_traj[j, 0] = nr
                            pred_traj[j, 1] = nc
                            if nr == goal[0] and nc == goal[1]:
                                # We hit goal so fill remaining steps
                                pred_traj[j + 1:, 0] = nr
                                pred_traj[j + 1:, 1] = nc
                                break
                        # Plot optimal and predicted path (also start, end)
                        if pred_traj[-1, 0] == goal[0] and pred_traj[-1, 1] == goal[1]:
                            correct += 1
                        total += 1
                        if config.plot == True:
                            visualize(G.image.T, states_xy[i], pred_traj)
                sys.stdout.write("\r" + str(int((float(dom) / n_domains) * 100.0)) + "%")
                sys.stdout.flush()
            sys.stdout.write("\n")
            print('Rollout Accuracy: {:.2f}%'.format(100 * (correct / total)))
            Acc.append((correct / total))
            L_list.append(L_temp)

        def visualize(dom, states_xy, pred_traj):
            fig, ax = plt.subplots()
            implot = plt.imshow(dom, cmap="Greys_r")
            ax.plot(states_xy[:, 0], states_xy[:, 1], c='b', label='Optimal Path')
            ax.plot(pred_traj[:, 0], pred_traj[:, 1], '-X', c='r', label='Predicted Path')
            ax.plot(states_xy[0, 0], states_xy[0, 1], '-o', label='Start')
            ax.plot(states_xy[-1, 0], states_xy[-1, 1], '-s', label='Goal')
            legend = ax.legend(loc='upper right', shadow=False)
            for label in legend.get_texts():
                label.set_fontsize('x-small')  # the legend text size
            for label in legend.get_lines():
                label.set_linewidth(0.5)  # the legend line width
            plt.draw()
            plt.waitforbuttonpress(0)
            plt.close(fig)

        class conf():
            def __init__(self,
                         weights=file_name,
                         image_size=image_size_test,
                         plot='store_true',
                         epochs=30,
                         k=K,
                         l_i=2,
                         l_h=150,
                         l_q=10,
                         batch_size=128):
                self.l_i = l_i
                self.l_h = l_h
                self.image_size = image_size
                self.plot = plot
                self.epochs = epochs
                self.k = k
                self.l_q = l_q
                self.batch_size = batch_size
                self.weights = weights
                #self.weights_sup = weights_sup

        config = conf()
        # Compute Paths generated by network and plot
        main(config)
    return Acc




In [None]:
for repeat in range(10):
    for exp_i in range(1,9):
        config = conf(experiment='{}step'.format(exp_i))
        
        test_list = [60,100,200,480]
        print("test in following pth files:\n", test_list)
        test_result = []
        for i in test_list:
            print(i)
            name_total = '16_16_ablation/' + 'K_step/' + str(i) + config.file_name + '_repeat{}'.format(repeat) + '{}.pth'.format(config.experiment)
            acc = test_model_vin_module(config.image_size, name_total, K=20)
            test_result.append(acc)

        test_result = np.asarray(test_result)
        np.savetxt('16_16_ablation/K_step_results/16x16_PCHID_VIN_result_{0}_repeat{1}.txt'.format(config.experiment,repeat), test_result)