In [1]:
import os
import glob
import random
import itertools
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchinfo import summary
from tqdm import tqdm
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix


seed = 0
random.seed(seed)
torch.manual_seed(seed)
np.random.seed(seed)

DATA_DIR = "ModelNet10"
SAVE_DIR = "ModelNet10Voxel"

num_classes = 10
grid_size = 32
object_size = 24
pitch_rescale = 1.0
no_of_rotations = 1

In [2]:
class ModelNetDatasetVoxel(Dataset):
    def __init__(self, data_dir, train=True):
        self.data_dir = data_dir
        self.train = train
        self.class_map = {}
        self.files, self.labels = self.load_files_and_labels()

    def load_files_and_labels(self):
        folders = glob.glob(os.path.join(self.data_dir, "*"))
        files = []
        labels = []
        for i, folder in enumerate(folders):
            self.class_map[i] = os.path.basename(folder)
            dataset_type = "train" if self.train else "test"
            class_files = glob.glob(os.path.join(folder, f"{dataset_type}/*.pt"))
            files.extend(class_files)
            labels.extend([i] * len(class_files))
        return files, labels

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        file_path = self.files[idx]
        voxel_grid = torch.load(file_path)
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        return voxel_grid, label


batch_size = 16
train_dataset = ModelNetDatasetVoxel(SAVE_DIR, train=True)
test_dataset = ModelNetDatasetVoxel(SAVE_DIR, train=False)
train_dataloader = DataLoader(
    train_dataset, batch_size, shuffle=True, num_workers=1, pin_memory=True
)
test_dataloader = DataLoader(test_dataset, batch_size, shuffle=False, num_workers=1)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Training device: {device}")

Training device: cuda


In [3]:
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv1 = nn.Conv3d(
            in_channels=in_channels, out_channels=out_channels, kernel_size=3, padding=1
        )
        self.bn1 = nn.BatchNorm3d(out_channels)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv3d(
            in_channels=out_channels,
            out_channels=out_channels,
            kernel_size=3,
            padding=1,
        )
        self.bn2 = nn.BatchNorm3d(out_channels)

        self.downsample = None
        if in_channels != out_channels:
            self.downsample = nn.Sequential(
                nn.Conv3d(in_channels, out_channels, kernel_size=1, stride=1),
                nn.BatchNorm3d(out_channels),
            )

    def forward(self, x):
        identity = self.downsample(x) if self.downsample else x

        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)

        x += identity
        x = self.relu(x)

        return x

In [4]:
class VoxResNet(nn.Module):
    def __init__(self, classes_num, in_channels=1, voxelgrid_size=(32, 32, 32)):
        super().__init__()

        # Initial convolution layer
        self.conv = nn.Conv3d(in_channels, out_channels=16, kernel_size=3, padding=1)
        self.bn = nn.BatchNorm3d(16)
        self.relu = nn.ReLU()

        # Residual blocks
        self.layer1 = nn.Sequential(
            ResidualBlock(16, 16),
            ResidualBlock(16, 16),
            ResidualBlock(16, 16),
            nn.AvgPool3d(kernel_size=2, stride=2),
        )
        self.layer2 = nn.Sequential(
            ResidualBlock(16, 32),
            ResidualBlock(32, 32),
            ResidualBlock(32, 32),
            nn.AvgPool3d(kernel_size=2, stride=2),
        )
        self.layer3 = nn.Sequential(
            ResidualBlock(32, 64),
            ResidualBlock(64, 64),
            ResidualBlock(64, 64),
            nn.AvgPool3d(kernel_size=2, stride=2),
        )
        self.layer4 = nn.Sequential(
            ResidualBlock(64, 128),
            ResidualBlock(128, 128),
            ResidualBlock(128, 128),
            nn.AvgPool3d(kernel_size=2, stride=2),
        )

        # Flattening and final linear layer
        self.flatten = nn.Flatten(1)
        self.dropout = nn.Dropout(p=0.2)
        self.linear = nn.Linear(
            in_features=int(voxelgrid_size[0] / 2**4 * voxelgrid_size[1] / 2**4 * 128),
            out_features=classes_num,
        )

    def forward(self, x):
        x = self.relu(self.bn(self.conv(x)))
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.flatten(x)
        x = self.dropout(x)
        x = self.linear(x)

        return x


# model = VoxResNet(ResidualBlock, [2, 2, 2, 2, 2], num_classes=num_classes)

model = VoxResNet(classes_num=num_classes, voxelgrid_size=(32, 32, 32), in_channels=1)
model.to(device)
print(summary(model, input_size=(batch_size, 1, grid_size, grid_size, grid_size)))

RuntimeError: Failed to run torchinfo. See above stack traces for more details. Executed layers up to: [Conv3d: 1, BatchNorm3d: 1, ReLU: 1, Sequential: 1, ResidualBlock: 2, Conv3d: 3, BatchNorm3d: 3, ReLU: 3, Conv3d: 3, BatchNorm3d: 3, ReLU: 3, ResidualBlock: 2, Conv3d: 3, BatchNorm3d: 3, ReLU: 3, Conv3d: 3, BatchNorm3d: 3, ReLU: 3, ResidualBlock: 2, Conv3d: 3, BatchNorm3d: 3, ReLU: 3, Conv3d: 3, BatchNorm3d: 3, ReLU: 3, AvgPool3d: 2, Sequential: 1, ResidualBlock: 2, Sequential: 3, Conv3d: 4, BatchNorm3d: 4, Conv3d: 3, BatchNorm3d: 3, ReLU: 3, Conv3d: 3, BatchNorm3d: 3, ReLU: 3, ResidualBlock: 2, Conv3d: 3, BatchNorm3d: 3, ReLU: 3, Conv3d: 3, BatchNorm3d: 3, ReLU: 3, ResidualBlock: 2, Conv3d: 3, BatchNorm3d: 3, ReLU: 3, Conv3d: 3, BatchNorm3d: 3, ReLU: 3, AvgPool3d: 2, Sequential: 1, ResidualBlock: 2, Sequential: 3, Conv3d: 4, BatchNorm3d: 4, Conv3d: 3, BatchNorm3d: 3, ReLU: 3, Conv3d: 3, BatchNorm3d: 3, ReLU: 3, ResidualBlock: 2, Conv3d: 3, BatchNorm3d: 3, ReLU: 3, Conv3d: 3, BatchNorm3d: 3, ReLU: 3, ResidualBlock: 2, Conv3d: 3, BatchNorm3d: 3, ReLU: 3, Conv3d: 3, BatchNorm3d: 3, ReLU: 3, AvgPool3d: 2, Sequential: 1, ResidualBlock: 2, Sequential: 3, Conv3d: 4, BatchNorm3d: 4, Conv3d: 3, BatchNorm3d: 3, ReLU: 3, Conv3d: 3, BatchNorm3d: 3, ReLU: 3, ResidualBlock: 2, Conv3d: 3, BatchNorm3d: 3, ReLU: 3, Conv3d: 3, BatchNorm3d: 3, ReLU: 3, ResidualBlock: 2, Conv3d: 3, BatchNorm3d: 3, ReLU: 3, Conv3d: 3, BatchNorm3d: 3, ReLU: 3, AvgPool3d: 2, Flatten: 1, Dropout: 1]

In [5]:
train_class_distribution = np.zeros(num_classes)
test_class_distribution = np.zeros(num_classes)
for label in train_dataset.labels:
    train_class_distribution[label] += 1

for label in test_dataset.labels:
    test_class_distribution[label] += 1


weights = np.sum(train_class_distribution) / train_class_distribution
weights /= np.sum(weights)
weights = torch.tensor(weights, dtype=torch.float32).to(device)

In [6]:
# Define your model, optimizer, and loss function
opt = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-4)
loss_fn = nn.CrossEntropyLoss(weight=weights)

# Define the number of epochs and initialize logs
num_epochs = 20
train_loss_log = []
val_loss_log = []
best_val = np.inf
start_epoch = 0

# Check if there's a saved checkpoint
checkpoint_path = "checkpoint.pt"
if os.path.exists(checkpoint_path):
    checkpoint = torch.load(checkpoint_path)
    model.load_state_dict(checkpoint["model_state_dict"])
    opt.load_state_dict(checkpoint["optimizer_state_dict"])
    start_epoch = checkpoint["epoch"] + 1
    train_loss_log = checkpoint["train_loss_log"]
    val_loss_log = checkpoint["val_loss_log"]
    best_val = checkpoint["best_val"]
    print(f"Resuming from epoch {start_epoch}")


for epoch_num in range(start_epoch, num_epochs):
    print("#################")
    print(f"# EPOCH {epoch_num}")

    ### TRAIN
    model.train()  # Training mode (e.g. enable dropout, batchnorm updates,...)
    train_losses = []
    iterator = tqdm(train_dataloader)
    for batch_x, batch_y in iterator:
        # Move data to device
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)

        # Forward pass
        out = model(batch_x)
        # Compute loss
        loss = loss_fn(out, batch_y)

        # Backpropagation
        opt.zero_grad()
        loss.backward()

        # Update the weights
        opt.step()

        train_losses.append(loss.item())
        iterator.set_description(f"Train loss: {round(loss.item(), 2)}")

    avg_train_loss = np.mean(train_losses)
    train_loss_log.append(avg_train_loss)

    ### VALIDATION
    model.eval()  # Evaluation mode (e.g. disable dropout, batchnorm,...)
    val_losses = []
    correct = 0
    total = 0
    with torch.no_grad():  # Disable gradient tracking
        for batch_x, batch_y in tqdm(test_dataloader):
            # Move data to device
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)

            # Forward pass
            out = model(batch_x)

            val_losses.append(loss_fn(out, batch_y).item())

            _, predicted = torch.max(out, 1)
            total += batch_y.size(0)
            correct += (predicted == batch_y).sum().item()

    avg_val_loss = np.mean(val_losses)
    val_loss_log.append(avg_val_loss)
    val_acc = correct / total

    print(
        f"Average validation loss: {round(avg_val_loss,2)}\tValidation accuracy: {round(val_acc,2)}"
    )

    # Save the model and optimizer state at each epoch
    torch.save(
        {
            "epoch": epoch_num,
            "model_state_dict": model.state_dict(),
            "optimizer_state_dict": opt.state_dict(),
            "train_loss_log": train_loss_log,
            "val_loss_log": val_loss_log,
            "best_val": best_val,
        },
        checkpoint_path,
    )

    if avg_val_loss < best_val:
        print("Update model!!!")
        torch.save(model.state_dict(), f"best_model_{avg_val_loss}.pt")
        best_val = avg_val_loss

#################
# EPOCH 0


  0%|          | 0/250 [00:00<?, ?it/s]


RuntimeError: Given groups=1, weight of size [16, 1, 3, 3, 3], expected input[1, 16, 32, 32, 32] to have 1 channels, but got 16 channels instead

In [None]:
# Load the model
model.load_state_dict(torch.load("best_model.pt"))
model.to(device)
# Set model to evaluation mode
model.eval()

# Lists to store predictions and ground truth labels
all_predictions = []
all_targets = []

iterator = tqdm(test_dataloader)
for inputs, targets in iterator:
    # Move data to device
    inputs = inputs.to(device)
    targets = targets.to(device)

    # Forward pass
    with torch.no_grad():
        outputs = model(inputs)

    # Convert outputs to predicted labels
    _, predicted = torch.max(outputs, 1)

    # Append to lists
    all_predictions.extend(predicted.cpu().numpy())
    all_targets.extend(targets.cpu().numpy())

# Calculate overall accuracy
overall_accuracy = accuracy_score(all_targets, all_predictions)
print(f"Overall Accuracy: {overall_accuracy}")

# Calculate accuracy per class
class_names = [train_dataset.class_map.get(key) for key in range(num_classes)]
class_report = classification_report(
    all_targets, all_predictions, target_names=class_names
)
print("Classification Report:")
print(class_report)

# Compute confusion matrix
cm = confusion_matrix(all_targets, all_predictions)

# Plot confusion matrix
plt.figure(figsize=(8, 8))
plt.imshow(cm, interpolation="nearest", cmap=plt.cm.Blues)
plt.title("Confusion Matrix")
plt.colorbar()
tick_marks = np.arange(len(class_names))
plt.xticks(tick_marks, class_names, rotation=45)
plt.yticks(tick_marks, class_names)

plt.ylabel("True label")
plt.xlabel("Predicted label")
plt.tight_layout()

# Print numerical values in each cell of the matrix
fmt = "d"
thresh = cm.max() / 2.0
for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
    plt.text(
        j,
        i,
        format(cm[i, j], fmt),
        horizontalalignment="center",
        color="white" if cm[i, j] > thresh else "black",
    )

plt.show()

In [None]:
# Choose random samples
num_samples = 5  # Number of random samples to predict
random_indices = random.sample(range(len(test_dataset)), num_samples)

fig = plt.figure(figsize=(15, 5))

for i, idx in enumerate(random_indices):
    # Get a random sample from the test set
    sample, target = test_dataset[idx]
    sample = sample.unsqueeze(0).to(device)  # Add batch dimension and move to device

    # Predict with the model
    with torch.no_grad():
        output = model(sample)
        _, predicted = torch.max(output, 1)

    # Convert predicted and target labels to numpy if needed
    predicted_label = predicted.item()
    true_label = target.item()  # Assuming target is a scalar label

    # Plot the voxel grid
    sample_np = (
        sample.cpu().squeeze().numpy()
    )  # Move sample to CPU and convert to numpy
    ax = fig.add_subplot(1, num_samples, i + 1, projection="3d")

    # Voxel plot
    sample_np = (sample_np + 1) / 2
    ax.voxels(sample_np, edgecolor="k")

    ax.set_title(
        f"Pred: {train_dataset.class_map.get(predicted_label)}, True: {train_dataset.class_map.get(true_label)}"
    )
    ax.set_xlabel("X")
    ax.set_ylabel("Y")
    ax.set_zlabel("Z")
    ax.set_xlim(0, sample_np.shape[0])
    ax.set_ylim(0, sample_np.shape[1])
    ax.set_zlim(0, sample_np.shape[2])

plt.tight_layout()
plt.show()

In [None]:
# Get the class names from the dataset
class_names = [train_dataset.class_map[i] for i in range(10)]

fig, axs = plt.subplots(2, 1, tight_layout=True, figsize=(10, 8))

# Plot training class distribution
axs[0].bar(range(10), train_class_distribution, color="skyblue")
axs[0].set_xticks(range(num_classes))
axs[0].set_xticklabels(class_names, rotation=45, ha="right")
axs[0].set_title("Training Class Distribution")
axs[0].set_ylabel("Number of Samples")

# Plot test class distribution
axs[1].bar(range(num_classes), test_class_distribution, color="salmon")
axs[1].set_xticks(range(num_classes))
axs[1].set_xticklabels(class_names, rotation=45, ha="right")
axs[1].set_title("Test Class Distribution")
axs[1].set_ylabel("Number of Samples")

plt.show()