In [1]:
import torch
import numpy as np
import random
from sklearn.cluster import KMeans
from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score, fowlkes_mallows_score, silhouette_score
from torch import nn
import torch.nn.functional as F
import open3d as o3d
from torch.utils.data import DataLoader
from learning3d.models import PPFNet
from learning3d.data_utils import ClassificationData, ModelNet40Data
from tqdm import tqdm
from SpinNet128 import Descriptor_Net


# Cihaz ayarı
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Test veri kümesini yükleme
test_dataset = ClassificationData(data_class=ModelNet40Data(train=False, num_points=4096))
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False, num_workers=2)

# Modeli oluştur ve ağırlıkları yükle
class SpinNetClassifier(nn.Module):
    def __init__(self, num_classes=40, embedding_dim=128):
        super(SpinNetClassifier, self).__init__()
        self.encoder = self.encoder = Descriptor_Net(
            rad_n = 9,
            azi_n = 80,
            ele_n = 40,
            des_r = 0.30,
            voxel_r = 0.04,
            voxel_sample = 30,
            dataset = "3DMatch"
        )
        self.fc = nn.Sequential(
            nn.Linear(embedding_dim, 512),
            nn.BatchNorm1d(512),
            nn.LeakyReLU(negative_slope=0.1),
            nn.Dropout(0.4),

            nn.Linear(512, 1024),
            nn.BatchNorm1d(1024),
            nn.LeakyReLU(negative_slope=0.1),
            nn.Dropout(0.4),

            nn.Linear(1024, 512),
            nn.BatchNorm1d(512),
            nn.LeakyReLU(negative_slope=0.1),
            nn.Dropout(0.3),

            nn.Linear(512, num_classes)
        )
    
    def forward(self, points):
        features = self.encoder(points)
        global_embedding = features.squeeze()
        logits = self.fc(global_embedding)
        return logits

model = SpinNetClassifier(num_classes=40, embedding_dim=128).to(device)
model.encoder.load_state_dict(torch.load("ppfnet_encoder_epoch21.pth", map_location=device))
model.fc.load_state_dict(torch.load("ppfnet_classifier_epoch21.pth", map_location=device))
model.eval()
  

def random_rotation_single_axis(points, max_angle):
    """Belirli bir eksende rastgele dönüşüm uygular."""
    axis = random.choice([0, 1, 2])  # Rastgele bir eksen seç
    angle = random.uniform(-max_angle, max_angle)  # Rastgele dönüş açısı belirle
    
    rotation_matrix = torch.eye(3)  # 3x3 birim matris
    c, s = torch.cos(torch.tensor(angle)), torch.sin(torch.tensor(angle))
    
    if axis == 0:  # X ekseni dönüşü
        rotation_matrix[1, 1] = c
        rotation_matrix[1, 2] = -s
        rotation_matrix[2, 1] = s
        rotation_matrix[2, 2] = c
    elif axis == 1:  # Y ekseni dönüşü
        rotation_matrix[0, 0] = c
        rotation_matrix[0, 2] = s
        rotation_matrix[2, 0] = -s
        rotation_matrix[2, 2] = c
    else:  # Z ekseni dönüşü
        rotation_matrix[0, 0] = c
        rotation_matrix[0, 1] = -s
        rotation_matrix[1, 0] = s
        rotation_matrix[1, 1] = c

    return torch.matmul(points, rotation_matrix.to(points.device))  # Dönüşümü uygula


def random_rotation(points, max_angle):
    """Rastgele dönüşüm uygular."""
    angles = torch.empty(3).uniform_(-max_angle, max_angle)  # Her eksen için rastgele açı seç
    Rx = torch.tensor([
        [1, 0, 0],
        [0, torch.cos(angles[0]), -torch.sin(angles[0])],
        [0, torch.sin(angles[0]), torch.cos(angles[0])]
    ])
    Ry = torch.tensor([
        [torch.cos(angles[1]), 0, torch.sin(angles[1])],
        [0, 1, 0],
        [-torch.sin(angles[1]), 0, torch.cos(angles[1])]
    ])
    Rz = torch.tensor([
        [torch.cos(angles[2]), -torch.sin(angles[2]), 0],
        [torch.sin(angles[2]), torch.cos(angles[2]), 0],
        [0, 0, 1]
    ])
    rotation_matrix = torch.matmul(Rz, torch.matmul(Ry, Rx))  # Döndürme matrisini oluştur
    return torch.matmul(points, rotation_matrix.to(points.device))  # Dönüşümü uygula 


def evaluate_clustering():
    """Orijinal ve dönüşümlü objeler için kümeleme skorlarını hesaplar."""
    scores = {
        "Original": [],
        "Single Small": [],
        "All Small": [],
        "Single Large": [],
        "All Large": [],
    }

    all_labels, original_encs, enc_1s, enc_2s, enc_3s, enc_4s = [], [], [], [], [], []

    with torch.no_grad():
        for points, labels in tqdm(test_loader):
            points = points.to(device)

            # 4 Farklı Dönüşüm
            rotated_1 = random_rotation_single_axis(points, torch.pi / 18)  # Single Small (~10°)
            rotated_2 = random_rotation(points, torch.pi / 18)  # All Small (~10°)
            rotated_3 = random_rotation_single_axis(points, torch.pi / 3)   # Single Large (~60°)
            rotated_4 = random_rotation(points, torch.pi / 3)   # All Large (~60°)


            # **Encoder Çıktıları**
            original_encoding = model.encoder(points)
            original_encoding = torch.squeeze(original_encoding)
            original_encoding = original_encoding.cpu().numpy().astype(np.float16)
            enc_1 = model.encoder(rotated_1)
            enc_1 = torch.squeeze(enc_1)
            enc_1 = enc_1.cpu().numpy().astype(np.float16)
            enc_2 = model.encoder(rotated_2)
            enc_2 = torch.squeeze(enc_2)
            enc_2 = enc_2.cpu().numpy().astype(np.float16)
            enc_3 = model.encoder(rotated_3)
            enc_3 = torch.squeeze(enc_3)
            enc_3 = enc_3.cpu().numpy().astype(np.float16)
            enc_4 = model.encoder(rotated_4)
            enc_4 = torch.squeeze(enc_4)
            enc_4 = enc_4.cpu().numpy().astype(np.float16)

            original_encs.extend(original_encoding)
            enc_1s.extend(enc_1)
            enc_2s.extend(enc_2)
            enc_3s.extend(enc_3)
            enc_4s.extend(enc_4)
            all_labels.extend(labels.cpu().numpy().astype(np.float16))
    all_encs = [original_encs, enc_1s, enc_2s, enc_3s, enc_4s]

    return all_encs, all_labels, scores

all_encs, all_labels, scores = evaluate_clustering()

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.
Using device: cuda


100%|█████████████████████████████████████████| 309/309 [02:07<00:00,  2.42it/s]


In [2]:
def evaluate(all_encs, all_labels, scores):
    # **Clustering Performansını Ölç**
    for key, encs in zip(scores.keys(), all_encs):
        
        ari, nmi, fmi, silhouette = clustering_performance(encs, all_labels)
        scores[key] = [ari, nmi, fmi, silhouette]

    # **Sonuçları Yazdır**
    print("\n==== Kümeleme Performans Sonuçları ====")
    for key, values in scores.items():
        print(f"{key}: ARI={values[0]:.4f}, NMI={values[1]:.4f}, FMI={values[2]:.4f}, Silhouette={values[3]:.4f}")

In [5]:
def clustering_performance(encodings, true_labels):
    """Kümeleme algoritması ile 40 cluster'a ayır ve performansı değerlendir."""
    kmeans = KMeans(n_clusters=40, random_state=42, n_init="auto")  
    predicted_clusters = kmeans.fit_predict(encodings)
    true_labels = np.squeeze(true_labels, axis=-1)

    ari = adjusted_rand_score(true_labels, predicted_clusters)  
    nmi = normalized_mutual_info_score(true_labels, predicted_clusters)  
    fmi = fowlkes_mallows_score(true_labels, predicted_clusters)  
    silhouette = silhouette_score(encodings, predicted_clusters)

    return ari, nmi, fmi, silhouette

In [6]:
evaluate(all_encs, all_labels, scores)


==== Kümeleme Performans Sonuçları ====
Original: ARI=0.1112, NMI=0.3396, FMI=0.1391, Silhouette=0.0675
Single Small: ARI=0.1056, NMI=0.3402, FMI=0.1338, Silhouette=0.0651
All Small: ARI=0.1103, NMI=0.3399, FMI=0.1381, Silhouette=0.0670
Single Large: ARI=0.1153, NMI=0.3437, FMI=0.1432, Silhouette=0.0706
All Large: ARI=0.1109, NMI=0.3390, FMI=0.1386, Silhouette=0.0683
