In [2]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler, random_split
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR, ReduceLROnPlateau
from sklearn.metrics import accuracy_score
from torchvision import datasets, transforms
from torch.cuda.amp import autocast, GradScaler
from torch.utils.data import RandomSampler
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import pandas as pd

In [3]:
# Data augmentation functions
def rotate_point_cloud(points):
    """Rotate the point cloud around the z-axis."""
    angle = np.random.uniform(0, 2 * np.pi)
    cos_val = np.cos(angle)
    sin_val = np.sin(angle)
    rotation_matrix = np.array([
        [cos_val, -sin_val, 0],
        [sin_val, cos_val, 0],
        [0, 0, 1]
    ])
    rotated_points = np.dot(points, rotation_matrix.T)
    return rotated_points

def translate_point_cloud(points):
    """Translate the point cloud by a small random vector."""
    translation_vector = np.random.uniform(-0.2, 0.2, size=(1, 3))
    translated_points = points + translation_vector
    return translated_points

def scale_point_cloud(points):
    """Scale the point cloud by a random factor."""
    scale_factor = np.random.uniform(0.8, 1.2)
    scaled_points = points * scale_factor
    return scaled_points

def jitter_point_cloud(points):
    """Add Gaussian noise to the point cloud."""
    noise = np.random.normal(0, 0.01, points.shape)
    jittered_points = points + noise
    return jittered_points

def augment_point_cloud(points):
    points = rotate_point_cloud(points)
    points = translate_point_cloud(points)
    points = scale_point_cloud(points)
    points = jitter_point_cloud(points)
    return points

def augment_point_cloud(features):
    """Augment the point cloud with random rotations and/or noise."""
    # Extract the spatial coordinates (x, y, z)
    points = features[:, :3]  # Only the x, y, z columns
    
    # Check and remove the header row if present
    if isinstance(points[0, 0], str):  # Check if the first element is a string (indicating header)
        points = points[1:, :]  # Remove the first row (header)
        features = features[1:, :]  # Remove the header row from the whole features array as well
    
    # Convert points to float type explicitly
    points = points.astype(np.float32)
    
    # Print the shape of points to debug
    print("Shape of points:", points.shape)
    
    # Print the first few points to check the data
    print("First few points:", points[:5])
    
    # Generate a random rotation matrix (for rotating around the z-axis)
    angle = np.random.uniform(-np.pi, np.pi)
    rotation_matrix = np.array([
        [np.cos(angle), -np.sin(angle), 0],
        [np.sin(angle), np.cos(angle), 0],
        [0, 0, 1]
    ])
    
    # Print the rotation matrix
    print("Rotation matrix:\n", rotation_matrix)
    
    # Apply the rotation to the points (only x, y, z)
    try:
        rotated_points = np.dot(points, rotation_matrix.T)  # Rotate points
    except Exception as e:
        print("Error during matrix multiplication:", e)
        return features
    
    # Replace the original points with the rotated points
    features[:, :3] = rotated_points
    
    return features

In [4]:


def normalize_point_cloud(points):
    """Normalize the point cloud."""
    centroid = np.mean(points, axis=0)
    points -= centroid
    max_dist = np.max(np.sqrt(np.sum(points**2, axis=1)))
    points /= max_dist
    return points

class PointCloudDataset(Dataset):
    def __init__(self, data_dir, augment=False):
        self.data_dir = data_dir
        self.files = [f for f in os.listdir(data_dir) if f.endswith('.npy')]
        self.augment = augment

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        file_path = os.path.join(self.data_dir, self.files[idx])
        
        data = pd.read_csv(file_path, header=None).values  # Load as NumPy array
        points = data[:, :3]  # x, y, z
        normals = data[:, 3:6]  # nx, ny, nz
        curvature_density = data[:, 6:8]  # curvature, density
        features = np.hstack((points, normals, curvature_density))  # (x, y, z, nx, ny, nz, curvature, density)
        labels = curvature_density  # If you're doing regression, these can be your labels
        
        if self.augment:
            features = augment_point_cloud(features)
        points = normalize_point_cloud(points)
        features = torch.FloatTensor(features)
        labels = torch.FloatTensor(labels)  # If you're working with regression, keep labels as float
        
        return features, labels



def collate_fn(batch):
 
    max_points = min(1024, max(item[0].size(0) for item in batch))  # Limit to 1024 points

    padded_points = []
    padded_labels = []

    for points, labels in batch:
        
        if points.size(0) > max_points:
            points = points[:max_points]
            labels = labels[:max_points]
        
        
        pad_size = max_points - points.size(0)
        
   
        padded = torch.nn.functional.pad(points, (0, 0, 0, pad_size)) 
        padded_points.append(padded)
        

        padded_label = torch.cat((labels, torch.zeros(pad_size)))
        padded_labels.append(padded_label)


    return torch.stack(padded_points), torch.stack(padded_labels)

In [5]:
import torch
import torch.nn as nn

class PointNetPlusPlus(nn.Module):
    def __init__(self):
        super(PointNetPlusPlus, self).__init__()

        # First MLP block with dropout
        self.mlp1 = nn.Sequential(
            nn.Linear(3, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, 512),
            nn.ReLU(),
            nn.Dropout(0.5)
        )

        self.bn1 = nn.BatchNorm1d(512)  

       
        self.residual1 = nn.Linear(3, 512) 

       
        self.abstraction = nn.Sequential(
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU()
        )

        self.mlp2 = nn.Sequential(
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 1)  
        )

       
        self.residual2 = nn.Linear(128, 1) 

        self.mlp3 = nn.Sequential(
            nn.Linear(1, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 1) 
        )
        
       
        self.residual3 = nn.Linear(1, 1)  

    def forward(self, x):
        # Apply the first MLP
        identity1 = self.residual1(x)
        x = self.mlp1(x)
        x = x + identity1

        # Apply the second MLP with residual connection
        x = self.abstraction(x)
        identity2 = self.residual2(x)
        x = self.mlp2(x)
        x = x + identity2

        # Apply the third MLP with residual connection
        identity3 = self.residual3(x)
        x = self.mlp3(x)
        x = x + identity3

        # Final output
        return x.squeeze(-1) 


In [6]:
import torch
import torch.optim as optim
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import StepLR
import os
import numpy as np
import pandas as pd
import time
from tqdm import tqdm
import matplotlib.pyplot as plt

data_dir = '/home/jovyan/RnD/dataset_npy.zip' #provide input directory of the filtered data

# Hyperparameters
batch_size = 16
num_epochs = 100
learning_rate = 0.001
step_size = 20  
gamma = 0.7  
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Dataset and DataLoader
train_dataset = PointCloudDataset(data_dir=data_dir, augment=True)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)

val_dataset = PointCloudDataset(data_dir=data_dir, augment=False)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

# Model
model = PointNetPlusPlus().to(device)


criterion = nn.BCEWithLogitsLoss()


optimizer = optim.Adam(model.parameters(), lr=learning_rate)


scheduler = StepLR(optimizer, step_size=step_size, gamma=gamma)


epoch_durations = []
train_losses = []


start_time = time.time()

# Training Loop
best_val_loss = float('inf')
def train_one_epoch(model, train_loader, optimizer, criterion, device):
    model.train()
    running_loss = 0.0
    for points, labels in tqdm(train_loader, desc="Training"):
        points, labels = points.to(device), labels.to(device)
        
        optimizer.zero_grad()  # Zero the gradients
        
        # Forward pass
        outputs = model(points)
        
        # Calculate loss
        loss = criterion(outputs.squeeze(), labels.float())  # Squeeze for binary labels
        loss.backward()  # Backpropagate
        optimizer.step()  # Update weights
        
        running_loss += loss.item()
    
    return running_loss / len(train_loader)
def validate(model, val_loader, criterion, device):
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for points, labels in tqdm(val_loader, desc="Validating"):
            points, labels = points.to(device), labels.to(device)
            
            # Forward pass
            outputs = model(points)
            loss = criterion(outputs.squeeze(), labels.float())  # Squeeze for binary labels
            
            val_loss += loss.item()
            predictions = (outputs.squeeze() > 0.5).long()  # Assuming binary classification (0 or 1)
            correct += (predictions == labels).sum().item()
            total += labels.size(0)
    
    val_loss = val_loss / len(val_loader)
    val_accuracy = correct / total
    return val_loss, val_accuracy

for epoch in range(num_epochs):
    print(f'Epoch {epoch + 1}/{num_epochs}')
    
    
    epoch_start_time = time.time()
    
    train_loss = train_one_epoch(model, train_loader, optimizer, criterion, device)
    val_loss, val_accuracy = validate(model, val_loader, criterion, device)
    
    
    epoch_duration = time.time() - epoch_start_time
    epoch_durations.append(epoch_duration)
    train_losses.append(train_loss)
    
   
    scheduler.step()
    
    print(f'Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}')
    
    # Save the best model
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'edgesegment_pointnet_model.pth')
        print('Saved the best model!')

print('Training finished!')


elapsed_times = [sum(epoch_durations[:i+1]) for i in range(num_epochs)] 
df = pd.DataFrame({
    'Elapsed Time (s)': elapsed_times,
    'Train Loss': train_losses
})
df.to_csv('training_time_vs_loss.csv', index=False)

# # Plot Training Loss vs Elapsed Time
# plt.figure(figsize=(10, 6))
# plt.plot(elapsed_times, train_losses, label='Training Loss')
# plt.xlabel('Elapsed Time (s)')
# plt.ylabel('Training Loss')
# plt.title('Training Loss vs Elapsed Time')
# plt.legend()
# plt.grid(True)
# plt.savefig('training_loss_vs_time.png')
# plt.show()


NotADirectoryError: [Errno 20] Not a directory: '/home/jovyan/RnD/dataset_npy.zip'

In [7]:
import torch
import numpy as np
from torch.utils.data import DataLoader


model.load_state_dict(torch.load('edgesegment_pointnet_model.pth', map_location=device))
model.to(device)
model.eval()  

# Load the test data
test_file = '/home/jovyan/dlrv_project_ws/test_pico_2024-07-04_215824.npy' #provide appropriate test data location
test_data = np.load(test_file)
test_points = torch.tensor(test_data[:, :3], dtype=torch.float32) 
test_labels = torch.tensor(test_data[:, 3], dtype=torch.float32) 

# Prepare test data loader
test_dataset = torch.utils.data.TensorDataset(test_points, test_labels)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Testing function
def test_model(model, test_loader, device):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for points, labels in test_loader:
            points, labels = points.to(device), labels.to(device)
            
            outputs = model(points)
            outputs = outputs.squeeze(-1)  # Shape: [batch_size, num_points]
            
            predicted = torch.round(torch.sigmoid(outputs))  # Apply sigmoid and round to get [0, 1]
            correct += (predicted == labels).sum().item()
            total += labels.numel()
    
    accuracy = correct / total
    return accuracy

# Test the model
test_accuracy = test_model(model, test_loader, device)
print(f'Test Accuracy: {test_accuracy * 100:.2f}%')


# dlrv_project_ws/test_pico_2024-07-04_215824.npy

NameError: name 'model' is not defined

In [11]:
# Function to plot the point cloud with predicted edge and non-edge points
def plot_point_cloud_with_predictions(points, predictions):
    

    fig = plt.figure()
    ax = fig.add_subplot(111, projection='3d')


    if isinstance(predictions, torch.Tensor):
        predictions = predictions.cpu().numpy()


    if predictions.ndim == 1:
        predictions = predictions.reshape(-1)


    edges = points[predictions == 1]
    non_edges = points[predictions == 0]


    ax.scatter(edges[:, 0], edges[:, 1], edges[:, 2], c='r', label='Edge', s=1)


    ax.scatter(non_edges[:, 0], non_edges[:, 1], non_edges[:, 2], c='b', label='Non-Edge', s=1)


    ax.set_xlabel('X')
    ax.set_ylabel('Y')
    ax.set_zlabel('Z')
    plt.legend()
    plt.title("Detected Edges (Red) and Non-Edges (Blue)")
    

    plt.show()

# provide Test point cloud file path
test_file = '/home/jovyan/dlrv_project_ws/test_Schmersal_AM-T100_2024-07-04_215814.npy'

# Load the test point cloud
data = np.load(test_file)
points = data[:, :3] 


points = normalize_point_cloud(points)


points_tensor = torch.FloatTensor(points).unsqueeze(0).to(device) 

model = PointNetPlusPlus().to(device)
model.load_state_dict(torch.load('edgesegment_pointnet_model.pth')) #load the model using correct path
with torch.no_grad():
    outputs = model(points_tensor)
    print(f"Model output shape: {outputs.shape}")
    

    if outputs.ndimension() == 3:  
        outputs = outputs.squeeze(-1)  

    predictions = torch.round(torch.sigmoid(outputs)).squeeze()  # [num_points]


if predictions.ndimension() == 1:
    predictions = predictions.cpu().numpy()
else:
    predictions = predictions.squeeze().cpu().numpy()


# Plot the point cloud with predicted edges and non-edges
plot_point_cloud_with_predictions(points, predictions)


FileNotFoundError: [Errno 2] No such file or directory: '/home/jovyan/dlrv_project_ws/test_Schmersal_AM-T100_2024-07-04_215814.npy'

In [12]:

file_path= r'/home/jovyan/dlrv_project_ws/training_time_vs_loss.csv' #provide the correct filepath for the same
data = pd.read_csv(file_path)

# Plot training loss vs time
plt.figure(figsize=(8, 6))
plt.plot(data['Elapsed Time (s)'], data['Train Loss'], label='Training Loss', color='blue', marker='o')
plt.xlabel('Time (seconds)')
plt.ylabel('Training Loss')
plt.title('Training Loss vs Time')
plt.legend()
plt.grid(True)
plt.show()

FileNotFoundError: [Errno 2] No such file or directory: '/home/jovyan/dlrv_project_ws/training_time_vs_loss.csv'