In [1]:
import numpy as np
import gudhi
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import open3d as o3d
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import os
from dataset import PointCloudDataLoader

class PointCloudProcessor:
    """Normalize and optionally downsample individual point clouds.

    Args:
        normalize: When true, ``process`` centers each cloud on its centroid
            and divides it by its largest absolute coordinate.
        num_points: Optional upper bound on the number of points returned by
            ``process``. Larger clouds are randomly subsampled without
            replacement; ``None`` (or 0) disables downsampling.
    """

    def __init__(self, normalize=True, num_points=None):
        self.normalize = normalize
        self.num_points = num_points
        # Not used by ``process``; retained so existing attribute access
        # (``processor.scaler``) keeps working.
        self.scaler = StandardScaler()

    def process(self, point_cloud):
        """Process a single point cloud.

        Args:
            point_cloud: A NumPy array, an ``open3d.geometry.PointCloud``, or
                any array-like accepted by ``np.asarray``.

        Returns:
            A NumPy array of points, centered/scaled when ``normalize`` is
            set and holding at most ``num_points`` rows when that limit is set.
        """
        # Convert to numpy if needed. Arrays are checked first so the common
        # case never needs to touch open3d.
        if isinstance(point_cloud, np.ndarray):
            points = point_cloud
        elif isinstance(point_cloud, o3d.geometry.PointCloud):
            points = np.asarray(point_cloud.points)
        else:
            points = np.asarray(point_cloud)

        # Normalize if requested. An empty cloud has no centroid or extent,
        # so it is passed through unchanged.
        if self.normalize and points.size > 0:
            centroid = np.mean(points, axis=0)
            points = points - centroid
            scale = np.max(np.abs(points))
            # A degenerate cloud (all points identical) has zero extent;
            # dividing would produce NaNs, so leave it centered at the origin.
            if scale > 0:
                points = points / scale

        # Downsample if needed
        if self.num_points and len(points) > self.num_points:
            indices = np.random.choice(len(points), self.num_points, replace=False)
            points = points[indices]

        return points

class PersistentHomologyFeatures:
    """Summarize Vietoris-Rips persistence diagrams as fixed-length vectors.

    Args:
        max_dimension: Maximum simplex dimension passed to
            ``create_simplex_tree``; features are emitted for homology
            dimensions ``0..max_dimension``.
        max_edge_length: Edge-length threshold passed to ``gudhi.RipsComplex``.
    """

    def __init__(self, max_dimension=2, max_edge_length=1.0):
        self.max_dimension = max_dimension
        self.max_edge_length = max_edge_length

    def compute_persistence(self, points):
        """Compute persistence diagrams for a point cloud.

        Returns:
            The result of ``SimplexTree.persistence()``; ``extract_features``
            consumes it as an iterable of ``(dimension, (birth, death))``.
        """
        # Create Vietoris-Rips complex
        rips = gudhi.RipsComplex(points=points, max_edge_length=self.max_edge_length)

        # Compute persistence
        # NOTE(review): with gudhi's default ``persistence_dim_max=False`` the
        # top dimension of the complex is presumably skipped, so the
        # ``max_dimension`` slot of the feature vector may always be zero --
        # confirm against the gudhi documentation.
        simplex_tree = rips.create_simplex_tree(max_dimension=self.max_dimension)
        persistence = simplex_tree.persistence()

        return persistence

    def extract_features(self, persistence):
        """Extract features from persistence diagram.

        For every homology dimension ``0..max_dimension`` four values are
        emitted: mean, standard deviation and maximum of the interval
        lifetimes, followed by the number of intervals.

        Intervals that never die (``death == inf``) are counted but excluded
        from the lifetime statistics; otherwise a single essential class would
        turn mean/max into ``inf`` and std into ``NaN``, which downstream
        estimators reject.

        Returns:
            A float array of length ``4 * (max_dimension + 1)`` containing
            only finite values.
        """
        features = []

        for dim in range(self.max_dimension + 1):
            # (n, 2) array of (birth, death); stays (0, 2) when nothing matches.
            pairs = np.array(
                [(birth, death) for (d, (birth, death)) in persistence if d == dim],
                dtype=float,
            ).reshape(-1, 2)

            lifetimes = pairs[:, 1] - pairs[:, 0]
            finite_lifetimes = lifetimes[np.isfinite(lifetimes)]

            if finite_lifetimes.size > 0:
                features.extend([
                    np.mean(finite_lifetimes),
                    np.std(finite_lifetimes),
                    np.max(finite_lifetimes),
                    len(pairs),  # number of features in this dimension
                ])
            else:
                # No finite interval: zero statistics, but keep the count.
                features.extend([0.0, 0.0, 0.0, len(pairs)])

        return np.array(features, dtype=float)

class TopologyClassifier:
    """Random-forest classifier over persistent-homology features of point clouds."""

    def __init__(self, processor=None, feature_extractor=None):
        # Fall back to default-configured helpers when none is supplied.
        self.processor = processor if processor else PointCloudProcessor()
        self.feature_extractor = (
            feature_extractor if feature_extractor else PersistentHomologyFeatures()
        )
        self.classifier = RandomForestClassifier(n_estimators=100)

    def _featurize(self, point_cloud):
        """Run one point cloud through preprocessing, persistence and feature extraction."""
        cleaned = self.processor.process(point_cloud)
        diagram = self.feature_extractor.compute_persistence(cleaned)
        return self.feature_extractor.extract_features(diagram)

    def extract_features(self, point_clouds):
        """Return an array with one feature row per point cloud in ``point_clouds``."""
        return np.array([self._featurize(cloud) for cloud in point_clouds])

    def fit(self, point_clouds, labels):
        """Fit the underlying classifier on features extracted from ``point_clouds``."""
        design_matrix = self.extract_features(point_clouds)
        self.classifier.fit(design_matrix, labels)

    def predict(self, point_clouds):
        """Return the classifier's predictions for ``point_clouds``."""
        design_matrix = self.extract_features(point_clouds)
        return self.classifier.predict(design_matrix)

# Deep learning extension
class TopologyNet(nn.Module):
    """Fully connected classifier head for topological feature vectors.

    Two hidden layers (128 and 64 units), each followed by ReLU and
    dropout (p=0.3), then a linear layer producing ``num_classes`` outputs.
    """

    def __init__(self, input_dim, num_classes):
        super().__init__()

        hidden_widths = (128, 64)
        layers = []
        in_features = input_dim
        for width in hidden_widths:
            layers.append(nn.Linear(in_features, width))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(0.3))
            in_features = width
        layers.append(nn.Linear(in_features, num_classes))

        # Same layer order and indices as a literal Sequential, so state-dict
        # keys remain ``network.0``, ``network.3`` and ``network.6``.
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the layer stack to ``x`` and return its output."""
        return self.network(x)

class PointCloudDataset(Dataset):
    """Torch dataset of precomputed topological features and labels.

    Every point cloud is processed and turned into a feature vector once, at
    construction time; indexing then only reads from the cached tensors.

    Args:
        point_clouds: Iterable of point clouds accepted by ``processor.process``.
        labels: Integer class labels, one per point cloud.
        processor: Object with a ``process(point_cloud)`` method; defaults to
            ``PointCloudProcessor()``.
        feature_extractor: Object with ``compute_persistence(points)`` and
            ``extract_features(persistence)`` methods; defaults to
            ``PersistentHomologyFeatures()``.
    """

    def __init__(self, point_clouds, labels, processor=None, feature_extractor=None):
        self.processor = processor or PointCloudProcessor()
        self.feature_extractor = feature_extractor or PersistentHomologyFeatures()

        # Pre-compute features
        feature_rows = []
        for pc in point_clouds:
            processed_pc = self.processor.process(pc)
            persistence = self.feature_extractor.compute_persistence(processed_pc)
            feature_rows.append(self.feature_extractor.extract_features(persistence))

        # Stack into one float32 array before handing it to torch: building a
        # tensor directly from a list of ndarrays is slow and makes torch warn.
        feature_matrix = np.asarray(feature_rows, dtype=np.float32)
        self.features = torch.from_numpy(feature_matrix)
        self.labels = torch.LongTensor(labels)

    def __len__(self):
        """Return the number of labelled samples."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return the ``(features, label)`` pair stored at ``idx``."""
        return self.features[idx], self.labels[idx]


# Example usage
def main():
    """Load the point-cloud dataset and print the type and shape of its data.

    The commented-out section below is a disabled reference workflow
    (synthetic data -> random forest -> TopologyNet training loop).
    """
    # --- Disabled reference workflow -------------------------------------
    # num_samples = 100
    # num_points = 1000
    # num_classes = 4
    #
    # Generate random point clouds (replace with real data)
    # point_clouds = [
    #     np.random.randn(np.random.randint(500, 1000), 3)
    #     for _ in range(num_samples)
    # ]
    # labels = np.random.randint(0, num_classes, num_samples)
    #
    # Split data
    # X_train, X_test, y_train, y_test = train_test_split(
    #     point_clouds, labels, test_size=0.2, random_state=42
    # )
    #
    # Train classical classifier
    # classifier = TopologyClassifier(processor=PointCloudProcessor(num_points=200))
    # print("Training classifier...")
    # classifier.fit(X_train, y_train)
    #
    # Predict
    # print("Predicting...")
    # predictions = classifier.predict(X_test)
    # y_hat = np.array(predictions)
    # accuracy = accuracy_score(y_test, y_hat)
    # print(f"Accuracy: {accuracy}")
    #
    # Train deep learning model
    # train_dataset = PointCloudDataset(X_train, y_train)
    # test_dataset = PointCloudDataset(X_test, y_test)
    # train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    # test_loader = DataLoader(test_dataset, batch_size=32)
    #
    # input_dim = train_dataset.features.shape[1]
    # model = TopologyNet(input_dim=input_dim, num_classes=num_classes)
    #
    # Training loop (simplified)
    # criterion = nn.CrossEntropyLoss()
    # optimizer = torch.optim.Adam(model.parameters())
    # num_epochs = 10
    # for epoch in range(num_epochs):
    #     model.train()
    #     epoch_loss = 0
    #     for features, labels in train_loader:
    #         optimizer.zero_grad()
    #         outputs = model(features)
    #         loss = criterion(outputs, labels)
    #         loss.backward()
    #         optimizer.step()
    #         epoch_loss += loss.item()
    #     print(f"Epoch {epoch+1}, loss: {epoch_loss / len(train_loader)}")
    # ----------------------------------------------------------------------

    # Caching is switched off for this load.
    loader = PointCloudDataLoader('merged_sampled_objects', use_cache=False)
    loaded = loader.data

    # Quick sanity output: container type first, then its shape.
    print(type(loaded))
    print(loaded.shape)

# Run the example only when this file is executed directly, not on import.
if __name__ == "__main__":
    main()

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


ModuleNotFoundError: No module named 'dataset.pcd_dataloader'

# TODO:
# - Sample enough objects from the cropped classes (1500 per class).
# - Test the classifier on the sampled data.
