In [1]:


# Data location and file naming: shard `idx` of each array is stored as
# "<prefix>_<idx>.npy" inside `path`.
path = 'data'
file_prefix_list = ['depth_frames', 'can_pos_quat']
file_suffix_range = range(1, 4)


# Load data
import numpy as np
import os

# Collect the per-file arrays in plain lists first, so that `data` only ever
# holds the final concatenated arrays (never a list under the same key).
can_pos_quat_shards = []
depth_frames_shards = []

# Iterate over all files
for idx in file_suffix_range:
    can_pos_quat_path = f"can_pos_quat_{idx}.npy"
    depth_frames_path = f"depth_frames_{idx}.npy"

    can_pos_quat = np.load(os.path.join(path, can_pos_quat_path))
    depth_frames = np.load(os.path.join(path, depth_frames_path))

    # The two arrays are later paired row-by-row, so a shard whose files
    # disagree on the number of samples would silently misalign every sample
    # that follows it after concatenation. Fail loudly instead.
    if can_pos_quat.shape[0] != depth_frames.shape[0]:
        raise ValueError(
            f"Sample count mismatch for shard {idx}: "
            f"{can_pos_quat_path} has {can_pos_quat.shape[0]} samples, "
            f"{depth_frames_path} has {depth_frames.shape[0]}"
        )

    print(f"Loaded {can_pos_quat.shape[0]} samples from {can_pos_quat_path}")
    can_pos_quat_shards.append(can_pos_quat)
    depth_frames_shards.append(depth_frames)


# Concatenate all shards along the sample axis
data = {
    'depth_frames': np.concatenate(depth_frames_shards, axis=0),
    'can_pos_quat': np.concatenate(can_pos_quat_shards, axis=0),
}

print(f"Concatenated can_pos_quat with shape {data['can_pos_quat'].shape}")
print(f"Concatenated depth_frames with shape {data['depth_frames'].shape}")

Loaded 1000 samples from can_pos_quat_1.npy
Loaded 1000 samples from can_pos_quat_2.npy
Loaded 1000 samples from can_pos_quat_3.npy
Concatenated can_pos_quat with shape (3000, 7)
Concatenated depth_frames with shape (3000, 256, 256, 1)


In [2]:
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np

class DepthCanDataset(Dataset):
    """Pairs each depth frame with its regression target by sample index.

    Args:
        depth_frames: Array-like with 4 dimensions laid out channel-last,
            (N, H, W, C). Stored as a float32 tensor in channel-first layout,
            (N, C, H, W).
        can_pos_quat: Array-like whose first dimension is N. Stored as a
            float32 tensor.

    Raises:
        ValueError: If the two inputs do not hold the same number of samples.
    """

    def __init__(self, depth_frames, can_pos_quat):
        # __len__ is driven by the targets alone, so a length mismatch would
        # otherwise only show up as an IndexError mid-epoch (too few frames)
        # or as silently ignored frames (too many). Check before the costly
        # tensor conversion.
        if len(depth_frames) != len(can_pos_quat):
            raise ValueError(
                f"depth_frames has {len(depth_frames)} samples but "
                f"can_pos_quat has {len(can_pos_quat)}"
            )

        # Permute depth frames from (batch, H, W, C) to (batch, C, H, W),
        # the layout expected by torch convolution layers.
        self.depth_frames = torch.tensor(depth_frames, dtype=torch.float32).permute(0, 3, 1, 2)
        self.can_pos_quat = torch.tensor(can_pos_quat, dtype=torch.float32)

    def __len__(self):
        """Return the number of (frame, target) pairs."""
        return len(self.can_pos_quat)

    def __getitem__(self, idx):
        """Return the ``(depth_frame, target)`` pair stored at ``idx``."""
        depth_frame = self.depth_frames[idx]
        can_pos_quat = self.can_pos_quat[idx]

        return depth_frame, can_pos_quat


# Create the dataset. Only the first three columns of `can_pos_quat` are used
# as regression targets (presumably the xyz position, with the quaternion
# columns dropped -- confirm against the data recorder).
dataset = DepthCanDataset(data['depth_frames'], data['can_pos_quat'][:, :3])

# Define batch size
batch_size = 128

# Create DataLoader
train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Example usage. Kept as real comments: a bare triple-quoted string at the end
# of a cell is an expression, so the notebook would echo it as the cell output.
#
# for batch_idx, (depth_batch, target_batch) in enumerate(train_loader):
#     print(f"Batch {batch_idx+1} - Depth batch shape: {depth_batch.shape}, Target batch shape: {target_batch.shape}")
#     # Depth batch shape: [batch_size, 1, 256, 256]
#     # Target batch shape: [batch_size, 3] (assuming 'can_pos_quat' has 3 values to predict)


'\n# Example usage\nfor batch_idx, (depth_batch, target_batch) in enumerate(train_loader):\n    print(f"Batch {batch_idx+1} - Depth batch shape: {depth_batch.shape}, Target batch shape: {target_batch.shape}")\n    # Depth batch shape: [batch_size, 1, 256, 256]\n    # Target batch shape: [batch_size, 3] (assuming \'can_pos_quat\' has 3 values to predict)\n'

In [34]:


import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

class DepthImageRegressor(nn.Module):
    """CNN that regresses 3 values in [-1, 1] from a single-channel image.

    Four conv + ReLU + 2x2 max-pool stages (each halving height and width)
    are followed by three fully connected layers. The final output is passed
    through ``tanh``, so regression targets must lie inside [-1, 1].

    Args:
        input_size: Height and width of the square input images. Defaults to
            256, which gives the original ``256 * 16 * 16`` flattened feature
            size.

    Raises:
        ValueError: If ``input_size`` is smaller than 16, i.e. nothing would
            be left after the four pooling stages.
    """

    def __init__(self, input_size=256):
        super(DepthImageRegressor, self).__init__()
        if input_size < 16:
            raise ValueError(
                f"input_size must be at least 16 to survive four 2x2 poolings, got {input_size}"
            )

        # Convolutional layers (padding=1 with a 3x3 kernel keeps H and W)
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.conv4 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1)

        # Max Pooling layer (halves H and W; applied after every conv)
        self.pool = nn.MaxPool2d(2, 2)

        # Four poolings shrink each spatial side by a factor of 16.
        feature_map_size = input_size // 16
        self.flat_features = 256 * feature_map_size * feature_map_size

        # Fully connected layers
        self.fc1 = nn.Linear(self.flat_features, 512)
        self.fc2 = nn.Linear(512, 128)
        self.fc3 = nn.Linear(128, 3)  # Output layer for 3 real numbers

        # Activation and regularisation
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        """Map a ``(batch, 1, input_size, input_size)`` tensor to ``(batch, 3)``."""
        # Convolutional layers with ReLU activation and MaxPooling
        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))
        x = self.pool(self.relu(self.conv3(x)))
        x = self.pool(self.relu(self.conv4(x)))

        # Flatten per sample. Unlike reshape(-1, features), this never folds
        # surplus spatial features into the batch dimension: an input of the
        # wrong size makes fc1 raise a shape error instead of silently
        # returning more rows than there were samples.
        x = torch.flatten(x, start_dim=1)

        # Fully connected layers with dropout
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.relu(self.fc2(x))
        x = self.dropout(x)

        # Linear output layer
        x = self.fc3(x)

        # Output constrained to -1 to 1
        return torch.tanh(x)

# Hyperparameters
#batch_size = 32
learning_rate = 1e-4
num_epochs = 10

# Pick the device first so the model can be placed on it right away.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Model, loss function, optimizer
# The training loop moves every batch to `device`; the model's weights must
# live on the same device or the forward pass fails on CUDA machines. The
# optimizer is built after the move so it tracks the on-device parameters.
model = DepthImageRegressor().to(device)
criterion = nn.MSELoss()  # Mean squared error for regression
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Example of training loop
def train_model(train_loader):
    """Train the module-level ``model`` on ``train_loader`` for ``num_epochs`` epochs.

    Relies on the module-level ``model``, ``criterion``, ``optimizer``,
    ``device`` and ``num_epochs``. After every epoch the mean of the per-batch
    losses is printed.

    Args:
        train_loader: Sized iterable yielding ``(images, targets)`` batches.
    """
    model.train()
    for epoch_number in range(1, num_epochs + 1):
        epoch_loss_total = 0.0
        for depth_batch, target_batch in train_loader:
            # Put the batch on the device used for training
            depth_batch = depth_batch.to(device)
            target_batch = target_batch.to(device)

            # Clear gradients left over from the previous step
            optimizer.zero_grad()

            # Forward pass and loss
            predictions = model(depth_batch)
            batch_loss = criterion(predictions, target_batch)

            # Backward pass and parameter update
            batch_loss.backward()
            optimizer.step()

            # Accumulate the scalar loss for the epoch summary
            epoch_loss_total += batch_loss.item()

        mean_epoch_loss = epoch_loss_total / len(train_loader)
        print(f'Epoch [{epoch_number}/{num_epochs}], Loss: {mean_epoch_loss}')



In [None]:

# Train the model: runs the training loop in `train_model` over `train_loader`
# for `num_epochs` epochs, printing the mean batch loss after each epoch.
train_model(train_loader)

Epoch [1/10], Loss: 0.057587733414644994


# Evaluate

In [33]:
import torch
import gymnasium as gym
import numpy as np
import mediapy as media
# Import the required classes and functions
from cleanrl.cleanrl.ppo_continuous_action import Agent, Args, ppo_make_env
import cv2
from robosuite.utils.camera_utils import CameraMover

# Visualisation state: evaluation frames are collected in `frames`.
visualize = True
frames = []

# Evaluation arguments
task_id = 'pickplace'
seed = 0
gamma = 0.99
num_episodes = 1
# Cameras to render. Names listed by the author as options:
# 'frontview', 'birdview', 'agentview', 'robot0_robotview', 'robot0_eye_in_hand'
render_camera = ['birdview']
camera_names = render_camera


# Create the environment: a single environment wrapped in a synchronous
# vector env, built from the keyword arguments collected below.
env_kwargs = dict(
    task_id=task_id,
    reward_shaping=True,
    idx=0,
    control_mode="OSC_POSITION",
    capture_video=False,
    run_name="eval",
    gamma=gamma,
    active_rewards="r",
    active_image=True,
    fix_object=False,
    wandb_enabled=False,
    verbose=False,
    control_freq=20,
    render_camera=render_camera,
    camera_names=camera_names,
)
env = gym.vector.SyncVectorEnv([ppo_make_env(**env_kwargs)])


def colorize_depth(frame):
    """Convert a depth frame into a JET-colormapped 8-bit image.

    The frame is min-max normalised to 0-255, truncated to ``uint8`` and
    passed to ``cv2.applyColorMap`` with ``cv2.COLORMAP_JET``.

    Args:
        frame: Array-like of depth values.

    Returns:
        The colormapped image returned by ``cv2.applyColorMap``.
    """
    depth = np.asarray(frame)
    min_depth = np.min(depth)
    max_depth = np.max(depth)
    depth_range = max_depth - min_depth

    if depth_range > 0:
        # Normalize the depth image to 0-255 for visualization
        normalized_depth = (255 * (depth - min_depth) / depth_range).astype(np.uint8)
    else:
        # A perfectly flat frame has zero range; dividing by it would yield
        # NaNs and an undefined uint8 cast. Render it as a uniform image.
        normalized_depth = np.zeros(depth.shape, dtype=np.uint8)

    # Apply a colormap for better visualization (COLORMAP_JET is commonly used)
    colorized_depth = cv2.applyColorMap(normalized_depth, cv2.COLORMAP_JET)
    return colorized_depth


# Device setup (use CUDA when it is available)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device.type == "cuda":
    print("Using CUDA")

# Every input below is moved to `device`, so the model's weights must live
# there as well; otherwise the forward pass fails on CUDA machines.
model.to(device)

# Run the evaluation
total_rewards = []
viewer_image_key = 'birdview'+'_depth'

# generate samples
num_samples = 10

# NOTE(review): in the recorded output of this cell the predictions are almost
# identical for every sample while the ground truth varies. This suggests the
# network is ignoring its input -- check that the preprocessing here (row
# flip, value range) matches how the training depth frames were produced.
model.eval()
for i in range(num_samples):
    obs, _ = env.reset()
    obs = torch.Tensor(obs).to(device)
    done = False
    episode_reward = 0

    # Grab the rendered frame and reverse its first (row) axis. Depth frames
    # stay float32; any other image is converted to uint8.
    image_frame = env.envs[0].image_states[viewer_image_key]
    frame_dtype = np.float32 if viewer_image_key.endswith('depth') else np.uint8
    image_frame = np.array(image_frame[::-1, :, :], dtype=frame_dtype)

    # (H, W, C) -> (1, C, H, W), the layout the model was trained on
    model_input = torch.tensor(image_frame, dtype=torch.float32).unsqueeze(0).permute(0, 3, 1, 2).to(device)

    # Inference only: skip autograd bookkeeping
    with torch.no_grad():
        predict_target = model(model_input).cpu().numpy()

    # Ground-truth pose of the can, read straight from the simulator
    can_pos = env.envs[0].sim.data.get_body_xpos('Can_main')
    can_quat = env.envs[0].sim.data.get_body_xquat('Can_main')

    print(f"Predicted target: {predict_target}")
    print(f"GT target: {can_pos}")

    # Overlay the ground-truth pose on the colorized depth frame
    image_frame = colorize_depth(image_frame)
    pos_text = f"Pos: {can_pos}"
    quat_text = f"Quat: {can_quat}"
    cv2.putText(image_frame, pos_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.2, (255, 255, 255), 1)
    cv2.putText(image_frame, quat_text, (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.2, (255, 255, 255), 1)

    frames.append(image_frame)

media.show_video(frames, fps=20)


### controller_config: OSC_POSITION ###
#### J PickPlace ####
fix_object:False
start with grasp lock: True
control_freq: 20
ignore_done: False
Predicted target: [[ 0.05333922 -0.24155368  0.711333  ]]
GT target: [-0.01927948 -0.12532923  0.86      ]
Predicted target: [[ 0.05333966 -0.24155438  0.71133363]]
GT target: [ 0.15282304 -0.23320474  0.86      ]
Predicted target: [[ 0.05333911 -0.24155405  0.7113336 ]]
GT target: [ 0.1806715  -0.38088449  0.86      ]
Predicted target: [[ 0.05333944 -0.24155438  0.7113343 ]]
GT target: [ 0.0656172  -0.11452292  0.86      ]
Predicted target: [[ 0.05333943 -0.24155432  0.71133333]]
GT target: [ 0.01505211 -0.12958697  0.86      ]
Predicted target: [[ 0.05333968 -0.24155435  0.7113335 ]]
GT target: [-2.86206572e-05 -3.99099092e-01  8.60000000e-01]
Predicted target: [[ 0.0533396  -0.24155405  0.7113336 ]]
GT target: [ 0.17425978 -0.30358682  0.86      ]
Predicted target: [[ 0.05333933 -0.24155438  0.71133345]]
GT target: [-0.00369851 -0.29816124  0

0
This browser does not support the video tag.
