In [None]:
import os, sys
# Restrict CUDA to device 0; set here, before torch is imported below.
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
# Put the current working directory on the import path
# (presumably so `diffuser` resolves when the notebook runs from the repo root -- TODO confirm).
sys.path.append('.')
import torch
# Allow TF32 in CUDA matmuls and in cuDNN ops, and do not force deterministic cuDNN algorithms.
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
torch.backends.cudnn.deterministic = False
from diffuser.guides.policies import Policy
import diffuser.datasets as datasets
import diffuser.utils as utils
from diffuser.models import GaussianDiffusionPB
from datetime import datetime
import os.path as osp
from tqdm import tqdm
from diffuser.utils.jupyter_utils import suppress_stdout
import numpy as np
# Print numpy arrays with 3 decimal places.
np.set_printoptions(precision=3)

## You need to download 'maze2d-concave-base' from OneDrive to launch.
## Subclass of `utils.Parser` that only overrides the `config` attribute with the
## path of the config file used by this notebook.
class Parser(utils.Parser):
    config: str = "config/rm2d/rSmazeC43_concave_nw7_exp.py"

#---------------------------------- setup ----------------------------------#

## Parse the same config twice: once for the 'diffusion' (training) args, once for the 'plan' args.
args_train = Parser().parse_args('diffusion', from_jupyter=True)
args = Parser().parse_args('plan', from_jupyter=True)

## Notebook-side overrides of the planning args.
args.savepath = None
args.load_unseen_maze = True

## Read from the training dataset config; falls back to False when the key is absent.
use_normed_wallLoc = args_train.dataset_config.get('use_normed_wallLoc', False)

## `args.dataset` is a string: the name of the env.
print('args.dataset', type(args.dataset), args.dataset)
print('args.dataset_eval', type(args.dataset_eval), args.dataset_eval)

## True -> eval (unseen) env, False -> trained (seen) env
load_unseen_maze = args.load_unseen_maze

with suppress_stdout():
    # ---------- environments and dataset normalizer ----------
    plan_env_list = datasets.load_environment(
        args.dataset,
        is_eval=load_unseen_maze,
    )
    train_normalizer = utils.load_datasetNormalizer(
        plan_env_list.dataset_url,
        args_train,
        plan_env_list,
    )

    # ---------- trained diffusion model ----------
    ld_config = {'env_instance': plan_env_list}
    diffusion_experiment = utils.load_potential_diffusion_model(
        args.logbase,
        args.dataset,
        args_train.exp_name,
        epoch='latest',
        ld_config=ld_config,
    )

    ## the `ema` attribute of the loaded experiment is the model used for sampling below
    diffusion = diffusion_experiment.ema

#### create an env

In [None]:
'''pick an env idx'''
from diffuser.utils.jupyter_utils import create_every_wloc_43

## Where the obstacle layout comes from:
##   'from_env'   : use layout `env_id` provided by `plan_env_list`
##   'user_input' : build a custom env from a hand-written layout
# wloc_source = 'from_env'
wloc_source = 'user_input'

if wloc_source == 'from_env':
    env_id = 10
    wallLoc_list = plan_env_list.recs_grp.save_center_and_mode(is_save=False)
    # wloc = plan_env_list.wallLoc_list[env_id] ### wall locations of 2d square blocks
    wloc_np = wallLoc_list[env_id] ## concave obstacle locations as model input
    env = plan_env_list.create_single_env(env_id)
elif wloc_source == 'user_input':
    ## placeholder: the slot of `plan_env_list.wallLoc_list` that is overwritten below
    env_id = 5

    ## Hand-written layouts, one obstacle per row: [x, y, mode_idx].
    user_layouts = {
        'layout_a': np.array([
            [1.03, 2.82, 4.],
            [1.77, 4.50, 1.],
            [0.88, 0.9, 1.],
            [3.09, 3.52, 4.],
            [3.02, 1.42, 2.],
            [4.30, 2.82, 1.],
            [2.35, 2.58, 1.],
        ]),
        'layout_b': np.array([
            [1.036, 0.872, 4.],
            [1.46 , 3.797, 4.],
            [2.366, 1.987, 2.],
            [2.853, 4.151, 3.],
            [3.213, 0.676, 3.],
            [3.6  , 2.92 , 3.],
            [4.277, 1.938, 1.],
        ]),
    }
    ## Select the layout explicitly; change the key to try the other one.
    wloc_np = user_layouts['layout_b']

    ## Validate before building the env: three columns, and an exact mode index
    ## (a plain range check would also let values such as 2.5 through).
    assert wloc_np.ndim == 2 and wloc_np.shape[1] == 3, 'each row should be [x, y, mode_idx]'
    assert np.isin(wloc_np[:, 2], (1, 2, 3, 4)).all(), 'should be in {1,2,3,4}'

    env_hExts = plan_env_list.model_list[0].wall_hExts #  np.array([0.25, 0.25])
    wloc_np_xy = create_every_wloc_43(wloc_np, env_hExts[0], ) ## (21, 2)
    # print(env_hExts)
    plan_env_list.wallLoc_list[env_id] = wloc_np_xy
    ## create the customized env
    env = plan_env_list.create_env_by_pos(env_id, wloc_np_xy, env_hExts)
else:
    ## Fail here instead of with a NameError on `env` further down.
    raise ValueError(f"unknown wloc_source {wloc_source!r}; expected 'from_env' or 'user_input'")

print(wloc_np[:2]) ## [ [x, y, mode_idx], ..., ]
env.render()

In [None]:
# Display the obstacle array chosen in the previous cell (rows are [x, y, mode_idx]).
wloc_np

#### planning

In [None]:
# env.wall_hExts
# plan_env_list.rand_rec_group.rec_loc_grp_list
# type(diffusion)

In [None]:
from diffuser.utils.rm2d_render import RandStaticMazeRenderer
import imageio, time

diffusion: GaussianDiffusionPB
bs = 10 ## number of start/goal pairs (rows) passed to the policy at once
horizon = 48 ## if the distance between start and goal is long, increase the horizon
policy = Policy(diffusion, train_normalizer, use_ddim=True)
diffusion.ddim_num_inference_steps = 8
diffusion.condition_guidance_w = 2.0 ## guidance scale set to 2.0
diffusion.horizon = horizon

## the same start / goal point is repeated for every row of the batch -> shape (bs, 2)
start = np.array( [ [ 1.22, 4.74 ] ], dtype=np.float32 ).repeat( bs, 0 )
goal = np.array( [ [3.86, 0.20] ], dtype=np.float32  ).repeat( bs, 0 )

## flatten the obstacle array into a single row, then tile it over the batch
wloc_tensor = utils.to_torch(wloc_np).reshape(1, -1).repeat( (bs, 1) )
print('wloc_np: ', wloc_np.shape)

## conditions are keyed by timestep: first step -> start, last step -> goal
cond = {0: start,
        horizon-1: goal}

tic = time.time()
## samples: _, final trajectories, _, per_steps_state_trajs
samples = policy(cond, batch_size=-1, wall_locations=wloc_tensor,use_normed_wallLoc=use_normed_wallLoc, return_diffusion=True)
plan_time = time.time() - tic

unnm_traj = samples[1].observations
print(f'unnm_traj: {unnm_traj.shape}; plan time:{plan_time}')

rd = RandStaticMazeRenderer(plan_env_list)
rootdir = './visualization/'
## Make sure the output folder exists (e.g. on a fresh checkout) before anything is written into it.
os.makedirs(rootdir, exist_ok=True)
tmp_path = osp.join(rootdir, 'concave_test.png')
## The picture is read back from `tmp_path` right after this call, so the value
## returned by `composite` is not kept.
rd.composite( tmp_path, unnm_traj,  np.array([env_id]))
img = imageio.imread(tmp_path)
utils.plt_img(img)

In [None]:
# len(samples[2]) # empty, ignore
# samples[2]
# env.maze_size # [5, 5]

### create a denoising video

In [None]:
from pb_diff_envs.environment.static.rand_maze2d_renderer import RandMaze2DRenderer

traj_idx = 0 # which traj of the batch to visualize

x0_perstep = samples[3] # shape: B, n_steps, Horizon, Dim
x0_perstep.shape # NOTE(review): not the last expression of the cell, so nothing is displayed

n_steps = len(x0_perstep[0]) ## number of denoising steps + 1
trajs_ps, imgs_ps = [], [] ## per-step trajectories and the frame rendered for each

## renderer used for the individual video frames
mz_renderer = RandMaze2DRenderer(num_walls_c=[30,10], fig_dpi=200)
mz_renderer.up_right = 'empty'
mz_renderer.config['no_draw_traj_line'] = False
mz_renderer.config['no_draw_col_info'] = True # False
# mz_renderer.num_walls_c = [0, 30]

## index 0 is skipped; one frame is rendered for every later entry
for step_idx in range(1, n_steps):
    step_traj = x0_perstep[traj_idx][step_idx]
    trajs_ps.append(step_traj)
    ## wall positions and half-extents are read from the env created earlier
    frame = mz_renderer.renders(
        step_traj,
        plan_env_list.maze_size,
        env.wall_locations,
        env.wall_hExts,
    )
    imgs_ps.append(frame)

In [None]:
## show the video
print('rootdir:', rootdir)
# import diffuser.utils.video as duv; reload(duv)
from diffuser.utils.video import save_images_to_mp4, read_mp4_to_numpy

video_fps = 15 ## one value for both encoding and playback
## Make sure the output folder exists even if this cell runs before anything else wrote there.
os.makedirs(rootdir, exist_ok=True)
mp4_name = osp.join(rootdir, f'concave_{env_id}.mp4') # {utils.get_time()}
## Save to video
save_images_to_mp4(imgs_ps, mp4_name, fps=video_fps, st_sec=1, end_sec=1)
## load the saved file back and display it inline
import mediapy as media
from mediapy import show_video, read_video
show_video(read_mp4_to_numpy(mp4_name), fps=video_fps, width=400)
print(f'mp4_name: {mp4_name}')