## Get Dataset

In [None]:
# Render matplotlib figures inline in the notebook output.
%matplotlib inline
# Load the autoreload extension and reload imported modules before each cell
# executes, so edits to local modules are picked up without a kernel restart.
%load_ext autoreload
%autoreload 2


### Data Directory Analysis

In [None]:
# Get File Names and Categories and Data Amount
# File exploration
import os
import re
import numpy as np
from collections import defaultdict
import torch
def analyze_directory(path):
    """Summarize the numbered files in ``path`` and inspect one archive per category.

    A file belongs to a category when its name has the form
    ``<category><5 digits>.<extension>``; the category is the text in front of
    the 5-digit number. For every category the file count and the min/max file
    number are printed. Afterwards the first ``.npz`` file of each category (in
    sorted filename order) is opened and the shape of every array in it is
    printed.

    Args:
        path: Directory to scan (not recursive).

    Returns:
        None. All results are written to stdout.
    """
    file_pattern = re.compile(r"^(.*?)(\d{5})\..+$")  # captures category and 5-digit number
    category_files = defaultdict(list)

    # os.listdir() order is arbitrary; sort so the report and the sampled file
    # are the same on every run.
    for filename in sorted(os.listdir(path)):
        match = file_pattern.match(filename)
        if match:
            category, number_str = match.groups()
            category_files[category].append((filename, int(number_str)))

    # Print summary of categories
    for category, files in category_files.items():
        numbers = [num for _, num in files]
        print(f"Category: {category}")
        print(f"  Number of files: {len(files)}")
        print(f"  Number range: {min(numbers)} to {max(numbers)}")

    print("\nInspecting one file per category:")
    for category, files in category_files.items():
        # A category may contain only non-.npz files; without a default,
        # next() would raise StopIteration and abort the whole report.
        sample_file = next((name for name, _ in files if name.endswith('.npz')), None)
        if sample_file is None:
            print(f"\nNo .npz file found for category '{category}'; skipping.")
            continue

        filepath = os.path.join(path, sample_file)
        print(f"\nSample file for category '{category}': {sample_file}")
        try:
            # The archive returned for .npz files keeps the file open until it
            # is closed, so use it as a context manager.
            with np.load(filepath) as data:
                for key in data:
                    print(f"  Key: {key}, Shape: {data[key].shape}")
        except Exception as e:
            # Best effort: report unreadable files and continue with the next category.
            print(f"  Could not load file '{sample_file}': {e}")

# Example usage: print the per-category summary for the tensor directory.
# The path is relative to the notebook's working directory.
analyze_directory("../dexnet_2.1/dexnet_2.1_eps_10/tensors")


In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
import random

# File-name prefixes of the tensor categories handled by the helpers below.
# Only the keys are read by the code in this cell (as the prefix in front of
# the 5-digit file number); the tuple values are not used by it.
# NOTE(review): the values presumably record the per-file array shape reported
# by the directory analysis above — confirm against the data.
categories = {
    'camera_poses_': (1000, 7),
    'hand_poses_': (1000, 6),
    'depth_ims_tf_table_': (466, 32, 32, 1),
    'labels_': (1000,),
    'traj_ids_': (1000,),
    'grasp_metrics_': (1000,),
    'camera_intrs_': (1000, 4),
    'grasped_obj_keys_': (1000,),
    'grasp_collision_metrics_': (1000,),
    'pile_ids_': (1000,)
}

def load_file(path, category, file_num):
    """Load the ``arr_0`` array stored in ``<category><file_num:05d>.npz``.

    Args:
        path: Directory that contains the ``.npz`` files.
        category: File-name prefix, e.g. ``'labels_'``.
        file_num: Integer file number; it is zero-padded to 5 digits.

    Returns:
        The array saved under the key ``'arr_0'``.

    Raises:
        FileNotFoundError: If the file does not exist.
        KeyError: If the archive has no ``'arr_0'`` entry.
    """
    fname = f"{category}{file_num:05d}.npz"
    fpath = os.path.join(path, fname)
    # np.load() on an .npz returns an archive object that keeps the file open;
    # the context manager closes it once the array has been read into memory.
    with np.load(fpath) as archive:
        return archive['arr_0']

def find_common_file_numbers(path, category_names=None):
    """Return the file numbers for which every category has an ``.npz`` file.

    A file is attributed to a category when its name is
    ``<category><number>.npz``. Files whose part between the prefix and the
    extension is not an integer are ignored.

    Args:
        path: Directory to scan (not recursive).
        category_names: Iterable of file-name prefixes to require. When omitted,
            the keys of the module-level ``categories`` mapping are used.

    Returns:
        Sorted list of the integer file numbers present in all categories;
        empty when there are no categories or no common numbers.
    """
    if category_names is None:
        category_names = categories
    prefixes = list(category_names)
    if not prefixes:
        # set.intersection() needs at least one set; no categories means nothing in common.
        return []

    category_to_nums = {cat: set() for cat in prefixes}
    for fname in os.listdir(path):
        if not fname.endswith('.npz'):
            continue
        for cat in prefixes:
            if fname.startswith(cat):
                try:
                    # Strip the prefix and the 4-character ".npz" suffix.
                    category_to_nums[cat].add(int(fname[len(cat):-4]))
                except ValueError:
                    # Not a numbered file of this category; only the parse
                    # failure is swallowed, other errors still surface.
                    continue

    # Keep only the numbers that every category provides.
    return sorted(set.intersection(*category_to_nums.values()))

def visualize_random_example(path):
    """Show one randomly chosen sample from a randomly chosen file number.

    A file number that exists for every category is picked at random, then a
    sample index is drawn from the first dimension of that file's depth-image
    array. The depth image is shown as a grayscale plot, the grasp metric as a
    single bar, and the entry of every other category is printed.

    Args:
        path: Directory that contains the per-category ``.npz`` files.

    Returns:
        None. Output is printed and plotted.
    """
    candidate_nums = find_common_file_numbers(path)
    if len(candidate_nums) == 0:
        print("No common file numbers found across all categories.")
        return

    file_num = random.choice(candidate_nums)

    # The depth-image array determines the range of valid sample indices.
    depth_stack = load_file(path, 'depth_ims_tf_table_', file_num)
    sample_idx = random.randint(0, depth_stack.shape[0] - 1)

    print(f"Selected file number: {file_num:05d}, sample index: {sample_idx}\n")

    # Walk over every category and either plot or print the selected entry.
    for name in categories:
        values = load_file(path, name, file_num)
        sample = values[sample_idx]

        if name == 'grasp_metrics_':
            plt.figure()
            plt.title("Grasp Metric (value)")
            plt.bar([0], [sample])
            plt.xticks([0], ['Grasp Metric'])
            plt.ylabel('Score')
            plt.show()
        elif name == 'depth_ims_tf_table_':
            plt.figure()
            plt.title("Depth Map")
            # squeeze() drops size-1 axes so imshow receives a 2-D image.
            plt.imshow(sample.squeeze(), cmap='gray')
            plt.colorbar()
            plt.show()
        else:
            print(f"{name}{file_num:05d} -> Example[{sample_idx}]: {sample}\n")

# Example usage: plot/print one random sample from the tensor directory.
# Replace this path with the actual path to your data folder
# (it is resolved relative to the notebook's working directory).
visualize_random_example("../dexnet_2.1/dexnet_2.1_eps_10/tensors")


In [None]:
# cd scripts/
# ./download_dexnet_2.sh


In [None]:
# dataset['image']['depth_ims']     # Shape: (N, 32, 32)   — depth image
# dataset['pose']                   # Shape: (N, 4)        — grasp pose (x, y, z, angle)
# dataset['success']                # Shape: (N,)          — binary label: success/failure


In [None]:
"""
For TensorFlow dataset
"""
# import torch
# from torch.utils.data import Dataset
# import h5py
# import numpy as np

# class DexNetDataset(Dataset):
#     def __init__(self, h5_path):
#         self.data = h5py.File(h5_path, 'r')
#         self.depth_images = self.data['image']['depth_ims'][:]
#         self.labels = self.data['grasp_qualities'][:]  # or 'success', depending on file

#     def __len__(self):
#         return len(self.depth_images)

#     def __getitem__(self, idx):
#         img = self.depth_images[idx]
#         img = np.expand_dims(img, axis=0)  # Convert to (1, H, W) for PyTorch CNN
#         label = self.labels[idx]
#         return torch.tensor(img, dtype=torch.float32), torch.tensor(label, dtype=torch.float32)


# from torch.utils.data import DataLoader

# dataset = DexNetDataset('path/to/dexnet_dataset.h5')
# dataloader = DataLoader(dataset, batch_size=32, shuffle=True)


In [None]:
from customize_dataset import DexNetNPZDataset
from torch.utils.data import DataLoader

# Build the dataset from the .npz tensor directory and wrap it in a shuffling
# DataLoader (batches of 16, 4 worker processes).
# NOTE(review): the per-sample structure is defined in customize_dataset, not
# here; the image-model training cells below unpack each batch as
# (imgs, labels) — confirm that this dataset yields exactly that pair.
dataset = DexNetNPZDataset('../dexnet_2.1/dexnet_2.1_eps_10/tensors/')
dataloader = DataLoader(dataset, batch_size=16, shuffle=True, num_workers=4)

In [None]:
# Load dataset with regression labels and full 6D poses
from customize_dataset import DexNetNPZDatasetAll
from torch.utils.data import DataLoader
# This dataset yields (image, pose, label) triples. It gets its own names so the
# image-only `dataset` / `dataloader` built in the previous cell stay bound:
# the image-model training cells further down unpack batches as (imgs, labels)
# and would fail on 3-element batches if `dataloader` were rebound here.
pose_dataset = DexNetNPZDatasetAll(tensor_dir='../dexnet_2.1/dexnet_2.1_eps_10/tensors/', use_regression=True, pose_dims=[0, 1, 2, 3, 4, 5])
img, pose, label = pose_dataset[0]
print(img.shape)    # torch.Size([3, 32, 32])
print(pose.shape)   # torch.Size([6])
print(label)        # A float (grasp quality)

pose_dataloader = DataLoader(pose_dataset, batch_size=16, shuffle=True, num_workers=4)


In [None]:
# Peek at the first batch only: print the shape of its first element and the
# second element itself, then stop iterating. `i` stays bound to that batch.
for i in dataloader:
    print(i[0].shape, i[1])
    break


In [None]:
# Shape of the first element of the batch left in `i` by the loop in the previous cell.
i[0].shape

## Image Model

In [None]:
import torch.nn as nn
import torch.nn.functional as F

class SimpleCNN(nn.Module):
    """Small two-convolution CNN that outputs one sigmoid score per image.

    The layer sizes are fixed for 1-channel 32x32 inputs: after the two
    conv + max-pool stages the feature map is (32, 6, 6), which is what ``fc1``
    expects. Inputs of another spatial size raise a shape error in ``fc1``.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 16, 3)
        self.pool = nn.MaxPool2d(2)
        self.conv2 = nn.Conv2d(16, 32, 3)
        self.fc1 = nn.Linear(32 * 6 * 6, 64)
        self.fc2 = nn.Linear(64, 1)  # Binary classification (grasp success)

    def forward(self, x):
        """Map a (B, 1, 32, 32) batch to (B, 1) scores in [0, 1]."""
        x = self.pool(F.relu(self.conv1(x)))  # (B, 16, 15, 15) for 32x32 input
        x = self.pool(F.relu(self.conv2(x)))  # (B, 32, 6, 6) for 32x32 input
        # Flatten per sample. The previous view(-1, 32 * 6 * 6) folded surplus
        # features of wrong-sized inputs into the batch dimension and silently
        # returned more rows than samples; now such inputs fail in fc1.
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        return torch.sigmoid(self.fc2(x))


In [None]:
import torch
import torch.nn as nn
from torchvision.models import resnet50, ResNet50_Weights

class ResNetGraspNet(nn.Module):
    """
    Grasp-success classifier built on a ResNet-50 backbone.

    The backbone's final fully connected layer is replaced by
    Dropout -> Linear -> Sigmoid, so the network returns ``num_classes``
    values in [0, 1] per input image.
    """
    def __init__(self, pretrained: bool = True, num_classes: int = 1, dropout_rate: float = 0.2):
        super().__init__()
        # Build the backbone, with the IMAGENET1K_V2 weights when requested.
        backbone = resnet50(weights=ResNet50_Weights.IMAGENET1K_V2 if pretrained else None)
        # New head, sized from the input width of the layer it replaces.
        head = nn.Sequential(
            nn.Dropout(dropout_rate, inplace=True),
            nn.Linear(backbone.fc.in_features, num_classes),
            nn.Sigmoid(),
        )
        backbone.fc = head
        self.backbone = backbone

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # The replaced head already applies the sigmoid.
        scores = self.backbone(x)
        return scores

In [None]:
import torch
import torch.nn as nn
from torchvision.models import efficientnet_v2_s, EfficientNet_V2_S_Weights

class EfficientNetV2GraspNet(nn.Module):
    """
    Grasp-success classifier built on an EfficientNet-V2-S backbone.

    The backbone's classifier is replaced by Dropout -> Linear -> Sigmoid, so
    the network returns ``num_classes`` values in [0, 1] per input image.
    """
    def __init__(self, pretrained: bool = True, num_classes: int = 1, dropout_rate: float = 0.2):
        super().__init__()
        # Build the backbone, with the IMAGENET1K_V1 weights when requested.
        backbone = efficientnet_v2_s(weights=EfficientNet_V2_S_Weights.IMAGENET1K_V1 if pretrained else None)
        # New head, sized from the input width of the original classifier's
        # second module (index 1).
        head = nn.Sequential(
            nn.Dropout(dropout_rate, inplace=True),
            nn.Linear(backbone.classifier[1].in_features, num_classes),
            nn.Sigmoid(),
        )
        backbone.classifier = head
        self.backbone = backbone

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # The replaced classifier already applies the sigmoid.
        scores = self.backbone(x)
        return scores

### Forward Pass and Evaluation Metrics

In [None]:
# Display the repr of the `dataloader` object currently bound in the notebook.
dataloader

### Model Training

In [None]:
# Print the shape of every parameter tensor of `model`.
# NOTE(review): in top-to-bottom order `model` is first assigned in the next
# cell, so on a fresh kernel this cell looks like it raises NameError — confirm
# and move it below the cell that creates the model.
for i in model.parameters():
    print(i.shape)

In [None]:
# model = SimpleCNN()
model = EfficientNetV2GraspNet()
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

from tqdm import tqdm

# Smoke test of the optimisation step: the `break` stops each of the 3 epochs
# after its first batch.
for epoch in range(3):
    for imgs, labels in tqdm(dataloader):
        # Drop only the trailing size-1 output dimension. A bare squeeze()
        # would also remove the batch dimension for a batch of one, and BCELoss
        # then rejects the shape mismatch with the labels.
        outputs = model(imgs).squeeze(-1)
        loss = criterion(outputs, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        break


In [None]:
import torch
from tqdm import tqdm

num_epochs = 3
# Number of batches to train on per epoch. 1 keeps the previous quick-check
# behaviour (stop after the first batch); set to None to use every batch.
max_batches_per_epoch = 1
loss_history = []

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    progress_bar = tqdm(dataloader, desc=f"Epoch {epoch+1}/{num_epochs}")

    for batch_idx, (imgs, labels) in enumerate(progress_bar, start=1):
        # Drop only the trailing size-1 output dimension; a bare squeeze()
        # would also collapse the batch dimension for a batch of one.
        outputs = model(imgs).squeeze(-1)
        loss = criterion(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Accumulate metrics (loss is a batch mean, so weight it by batch size)
        running_loss += loss.item() * imgs.size(0)

        # Optional: compute accuracy with a 0.5 decision threshold
        predicted = (outputs >= 0.5).float()
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

        if max_batches_per_epoch is not None and batch_idx >= max_batches_per_epoch:
            break

    if total == 0:
        # An empty dataloader would otherwise end in a ZeroDivisionError below.
        print("No batches were produced by the dataloader; stopping training.")
        break

    epoch_loss = running_loss / total
    accuracy = correct / total

    loss_history.append(epoch_loss)

    print(f"Epoch [{epoch+1}/{num_epochs}] — Loss: {epoch_loss:.4f} — Accuracy: {accuracy:.4f}")


In [None]:
import matplotlib.pyplot as plt

# Line chart of the loss value recorded for each epoch in `loss_history`.
plt.plot(loss_history, marker='o')
plt.gca().set(xlabel='Epoch', ylabel='Loss', title='Training Loss per Epoch')
plt.grid(True)
plt.show()


## Image + Pose Model

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class SimpleGQCNN(nn.Module):
    """Two-stream network: a small CNN over the image and an MLP over the pose.

    Both streams produce a 64-dimensional feature vector. The vectors are merged
    (element-wise product or concatenation) and passed through two linear
    layers to a single output per sample. The image stream is sized for
    1-channel 32x32 inputs.
    """

    def __init__(self, pose_dim=4, output_type='binary', merge_methods="element_dot"):
        """
        pose_dim: number of dimensions in the pose vector (e.g., x, y, z, theta)
        output_type: 'binary' applies a sigmoid to the output; any other value
            (e.g. 'regression') returns the raw linear output
        merge_methods: "element_dot" multiplies the two stream features
            element-wise; any other value concatenates them
        """
        super().__init__()
        self.output_type = output_type

        # Image stream
        self.conv1 = nn.Conv2d(1, 16, 3)           # → (B, 16, 30, 30)
        self.pool = nn.MaxPool2d(2, 2)             # → (B, 16, 15, 15)
        self.conv2 = nn.Conv2d(16, 32, 3)          # → (B, 32, 13, 13) → pool → (B, 32, 6, 6)
        self.im_fc = nn.Linear(32 * 6 * 6, 64)     # → (B, 64)

        # Pose stream
        self.pose_fc1 = nn.Linear(pose_dim, 64)
        self.pose_fc2 = nn.Linear(64, 64)

        self.merge_methods = merge_methods
        if self.merge_methods == "element_dot":
            # Merge stream after element-wise multiplication: 64 → 32 → 1
            self.merge_fc1 = nn.Linear(64, 32)
            self.merge_fc2 = nn.Linear(32, 1)
        else:
            # Merge stream after concatenation: 128 → 64 → 1
            self.merge_fc1 = nn.Linear(64 + 64, 64)
            self.merge_fc2 = nn.Linear(64, 1)  # Single output for binary or regression

    def forward(self, image, pose):
        """Return a (B, 1) tensor for a (B, 1, 32, 32) image batch and a (B, pose_dim) pose batch."""
        # Image stream
        x = self.pool(F.relu(self.conv1(image)))   # (B, 16, 15, 15)
        x = self.pool(F.relu(self.conv2(x)))       # (B, 32, 6, 6)
        # flatten() copies when required, so it also works when the activations
        # are in channels_last memory format; view() raises a stride error there.
        x = torch.flatten(x, 1)                    # (B, 32 * 6 * 6)
        x = F.relu(self.im_fc(x))                  # (B, 64)

        # Pose stream
        p = F.relu(self.pose_fc1(pose))            # (B, 64)
        p = F.relu(self.pose_fc2(p))               # (B, 64)

        if self.merge_methods == "element_dot":
            # Element-wise multiplication
            combined = x * p                       # (B, 64)
        else:
            # Concatenate along the feature dimension
            combined = torch.cat((x, p), dim=1)    # (B, 128)

        # Final layers
        out = F.relu(self.merge_fc1(combined))     # (B, 32) for element_dot, (B, 64) otherwise
        out = self.merge_fc2(out)                  # (B, 1)

        if self.output_type == 'binary':
            out = torch.sigmoid(out)               # Squash to [0, 1] for binary prediction
        return out


In [None]:
# Shape check: push random tensors through an untrained SimpleGQCNN and print
# the output shape. Note that this rebinds the notebook-level name `model`.
model = SimpleGQCNN(pose_dim=4, output_type='regression')  # or 'binary'

image = torch.randn(8, 1, 32, 32)  # Batch of 8 grayscale images
pose = torch.randn(8, 4)          # Corresponding batch of 4D poses

output = model(image, pose)
print(output.shape)  # torch.Size([8, 1])


In [None]:
# Report the visible CUDA devices. get_device_name() raises when no GPU is
# available, so only query it when CUDA can actually be used.
print(torch.cuda.device_count())
if torch.cuda.is_available():
    print(torch.cuda.get_device_name())
else:
    print("CUDA is not available; running on CPU.")

In [None]:
import torch.optim as optim
from tqdm import tqdm
import os
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.amp import GradScaler, autocast

# Set config
tensor_dir = '../dexnet_2.1/dexnet_2.1_eps_10/tensors/'  # replace with actual path
batch_size = 32
use_regression = False  # or True
pose_dims = [2]          # or full [0, 1, 2, 3, 4, 5]
torch.backends.cudnn.benchmark = True

# Create dataset and dataloader
dataset = DexNetNPZDatasetAll(tensor_dir=tensor_dir, use_regression=use_regression, pose_dims=pose_dims)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=8, pin_memory=True)

# The pose stream width follows the number of selected pose dimensions.
model = SimpleGQCNN(pose_dim=len(pose_dims), output_type='regression' if use_regression else 'binary')
# NOTE: CUDA_VISIBLE_DEVICES is only honoured if it is set before CUDA is first
# initialised in this process. An earlier cell already queries torch.cuda, so
# on a top-to-bottom run this assignment may have no effect.
os.environ["CUDA_VISIBLE_DEVICES"]="1"
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
model.to(device)

# MSE for regression targets, binary cross-entropy on the sigmoid output otherwise.
criterion = nn.MSELoss() if use_regression else nn.BCELoss()

optimizer = optim.Adam(model.parameters(), lr=1e-4)
# `verbose=True` was dropped: the argument is deprecated in recent PyTorch
# releases, and train() already prints the learning rate after every epoch.
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=2)


In [None]:
def train(model, dataloader, criterion, optimizer, scheduler=None, device=None, num_epochs=10, save_path="model.pth"):
    """Train an (image, pose) -> score model and save the best checkpoint.

    Each epoch iterates over all batches, records the mean batch loss, steps the
    scheduler with that value and saves ``model.state_dict()`` whenever the
    epoch loss improves. After training, the loss curve is written to
    ``loss_plot.png`` and shown.

    Args:
        model: Module called as ``model(images, poses)``.
        dataloader: Yields ``(images, poses, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer over the model's parameters.
        scheduler: Optional scheduler stepped as ``scheduler.step(avg_loss)``
            once per epoch (e.g. ReduceLROnPlateau).
        device: Device to train on; defaults to the device of the model's
            first parameter.
        num_epochs: Number of passes over the dataloader.
        save_path: File that receives the best (lowest-loss) state dict.

    Returns:
        None.
    """
    if device is None:
        device = next(model.parameters()).device
    device = torch.device(device)
    # Mixed precision and gradient scaling are only used on CUDA.
    use_amp = device.type == "cuda"

    model = model.to(device, memory_format=torch.channels_last)
    model.train()
    scaler = GradScaler(device.type, enabled=use_amp)

    loss_history = []
    best_loss = float("inf")

    for epoch in range(num_epochs):
        running_loss = 0.0

        for images, poses, labels in tqdm(dataloader, desc=f"Epoch {epoch+1}/{num_epochs}"):
            images = images.to(device, non_blocking=True).contiguous(memory_format=torch.channels_last)
            poses = poses.to(device, non_blocking=True)
            # Float targets with an explicit trailing dimension to match the (B, 1) outputs.
            labels = labels.to(device, dtype=torch.float32, non_blocking=True).unsqueeze(1)

            optimizer.zero_grad()
            # torch.amp.autocast requires the device type as its first argument.
            with autocast(device_type=device.type, enabled=use_amp):
                outputs = model(images, poses)
            # Compute the loss in float32 outside the autocast region: BCELoss
            # is not safe to autocast and is rejected inside it.
            loss = criterion(outputs.float(), labels)

            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            running_loss += loss.item()

        # Mean of the per-batch losses; max() avoids dividing by zero for an empty dataloader.
        avg_loss = running_loss / max(len(dataloader), 1)
        loss_history.append(avg_loss)
        if scheduler is not None:
            scheduler.step(avg_loss)

        current_lr = optimizer.param_groups[0]['lr']
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}, LR: {current_lr:.2e}")

        # Save model if it's the best so far
        if avg_loss < best_loss:
            best_loss = avg_loss
            torch.save(model.state_dict(), save_path)
            print(f"✅ Saved best model with loss {best_loss:.4f} to '{save_path}'")

    # Visualize training loss
    plt.figure(figsize=(8, 4))
    plt.plot(range(1, num_epochs + 1), loss_history, marker='o')
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.title("Training Loss Over Epochs")
    plt.grid(True)
    plt.tight_layout()
    plt.savefig("loss_plot.png")
    plt.show()


In [None]:
# train() takes the scheduler and the device as its 5th and 6th arguments;
# both were missing from this call.
train(model, dataloader, criterion, optimizer, scheduler, device, num_epochs=10)

In [None]:
# train() already wrote the best-loss checkpoint to its default save_path
# "model.pth"; store the final-epoch weights under a separate name so that
# checkpoint is not overwritten.
torch.save(model.state_dict(), "model_final.pth")