In [1]:
%%capture
!yes | pip install trimesh==3.21.6
!yes | pip install open3d==0.17.0
!yes | pip install natsort==8.3.1

In [2]:
import sys 
print(sys.version)

3.10.10 | packaged by conda-forge | (main, Mar 24 2023, 20:08:06) [GCC 11.3.0]


In [3]:
from typing import Tuple
from natsort import natsorted
from tqdm import tqdm

import os
import json
import errno
import random

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
%matplotlib inline
import IPython.display as IPd

import plotly.express as px
import plotly.graph_objs as go

import torch
from torch.utils.data import Dataset, DataLoader

import trimesh
import open3d as o3d

from sklearn.decomposition import PCA

from sklearn.svm import SVC

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


In [4]:
seed = 42

random.seed(seed)
np.random.seed(seed)
os.environ["PYTHONHASHSEED"] = str(seed)

o3d.utility.random.seed(seed)

torch.manual_seed(seed)

<torch._C.Generator at 0x7cb990dfa030>

In [5]:
BASE_PATH = '/kaggle/input/2023-ml-project3/modelnet10'

DATA_PATH   = os.path.join(BASE_PATH, 'dataset')
LABEL_PATH  = os.path.join(BASE_PATH, 'class2Label.json')
SUBMIT_PATH = os.path.join(BASE_PATH, 'sample_submit.csv')


In [6]:
class_name_list = sorted(os.listdir(os.path.join(DATA_PATH, 'train')))

scene_list = list()

for class_name in class_name_list:
    off = random.choice(os.listdir(os.path.join(DATA_PATH, 'train', class_name)))
    
    mesh = trimesh.load(os.path.join(DATA_PATH, 'train', class_name, off))
    
    scene = trimesh.Scene()
    
    scene.add_geometry(mesh)
    
    scene_list.append(scene)

In [7]:
for class_name, scene in zip(class_name_list, scene_list):
    IPd.display(IPd.HTML(f"<h4 style='text-align:center;'>{class_name}</h4>"))
    IPd.display(scene.show())

In [8]:
class ModelNet10(Dataset):
    def __init__(self, root: str, split: str, class2label:dict):
        self.root = root
        self.split = split.lower()
        assert split in ['train', 'test']
        
        if class2label is not None:
            self.class2label = class2label
        
        self.path, self.label = list(), list()
        
        if self.split == 'train':
            self.classes = sorted(os.listdir(os.path.join(self.root, self.split)))
            self.le = {self.classes[i]: self.class2label[self.classes[i]] for i in range(len(self.classes))}
        
            for class_name in self.classes:
                class_path = os.path.join(self.root, self.split, class_name)

                for off in os.listdir(class_path):
                    if off.endswith('off'):
                        self.path.append(os.path.join(class_path, off))
                        self.label.append(self.le[class_name])
                    else:
                        raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), off)
                        
        elif self.split == 'test':
            fname_list = natsorted(os.listdir(os.path.join(self.root, self.split)))
            for fname in fname_list:
                self.path.append(os.path.join(self.root, self.split, fname))
            
    def __getitem__(self, index):
        if self.split == 'train':
            return self.path[index], self.label[index]
        elif self.split == 'test':
            return self.path[index]
    
    def __len__(self):
        return len(self.path)

In [9]:
with open(LABEL_PATH, 'r') as j:
    class2label = json.load(j)
    
train_dataset = ModelNet10(root=DATA_PATH, split='train', class2label=class2label)
train_dataloader = DataLoader(train_dataset, batch_size=1, shuffle=True)

test_dataset = ModelNet10(root=DATA_PATH, split='test', class2label=None)
test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=False)

In [10]:
# 반복문을 통해 학습 데이터로더로부터 데이터 디렉토리(off)와 정수형 라벨(label)을 받음
for off, label in train_dataloader:
    print(f'TRAIN DATALOADER\npath: {off}\nlabel: {label}\n')
    print(f'TRAIN DATALOADER\npath: {off[0]}\nlabel: {label.item()}'); break
    
# 반복문을 통해 평가 데이터로더로부터 데이터 디렉토리(off)를 받음
for off in test_dataloader:
    print(f'\nTEST  DATALODER\npath: {off[0]}'); break

TRAIN DATALOADER
path: ('/kaggle/input/2023-ml-project3/modelnet10/dataset/train/bathtub/bathtub_0098.off',)
label: tensor([0])

TRAIN DATALOADER
path: /kaggle/input/2023-ml-project3/modelnet10/dataset/train/bathtub/bathtub_0098.off
label: 0

TEST  DATALODER
path: /kaggle/input/2023-ml-project3/modelnet10/dataset/test/0.off


In [11]:
def read_off(off: str) -> Tuple[np.array, np.array]:
    with open(off, 'r') as f:
        first_line = f.readline().strip()
        if first_line != 'OFF':
            raise ValueError('Not a valid OFF header')
        
        n_vertex, n_face, n_edge = map(int, f.readline().strip().split())
        #print(n_vertex, n_face, n_edge)
        vertices = list()
        for _ in range(n_vertex):
            vertex = list(map(float, f.readline().strip().split()))
            vertices.append(vertex)
        
        faces = list()
        for _ in range(n_face):
            face = list(map(int, f.readline().strip().split()[1:]))
            faces.append(face)
        
        # Vertex들의 정보를 담은 리스트(vertices)와 Face들의 정보를 담은 리스트(faces)를 각각 numpy array로 변환하여 튜플 형식으로 반환
        return np.array(vertices), np.array(faces)

In [12]:
# 3D 데이터 표현 방법 선택: 'point_cloud', 'voxel', 'mesh' 중 하나를 선택
method = 'point_cloud'

In [13]:
def estimate_normal_vector_from_point_cloud(point_cloud, radius: int = None, max_nn: int = None) -> np.array:
    pcd = o3d.geometry.PointCloud()

    pcd.points = o3d.utility.Vector3dVector(point_cloud)
    
    search_param = o3d.geometry.KDTreeSearchParamHybrid(radius=radius, max_nn=max_nn)
    pcd.estimate_normals(search_param=search_param)
    
    normal = np.asarray(pcd.normals)
    return normal

In [14]:
def compute_histogram_of_normal_vector(normal_vector, n_bins: int = 10, n_range: tuple = (-1,1)):

    hist_x, bin_edges_x = np.histogram(normal_vector[:, 0], bins=n_bins, range=n_range)
    hist_y, bin_edges_y = np.histogram(normal_vector[:, 1], bins=n_bins, range=n_range)
    hist_z, bin_edges_z = np.histogram(normal_vector[:, 2], bins=n_bins, range=n_range)

    feature = np.concatenate((hist_x, hist_y, hist_z))

    feature_normalized = feature / np.sum(feature)

    return feature_normalized

In [15]:
if method == 'point_cloud':

    x_train_point_cloud, y_train_point_cloud = list(), list()
    
    pbar = tqdm(enumerate(train_dataloader, start=1))
    for i, (off_, label_) in pbar:
        off, label = off_[0], label_.item()
        
        point_cloud, a = read_off(off)
       
        normal_vector = estimate_normal_vector_from_point_cloud(point_cloud, radius=5, max_nn=30)
        
        training_feature = compute_histogram_of_normal_vector(normal_vector, n_bins=32, n_range=(-1, 1))
        
        x_train_point_cloud.append(training_feature)
        y_train_point_cloud.append(label)

        pbar.set_description(f'Processing: {os.path.basename(off)}\tPercentage: {i / len(train_dataset) * 100:.1f}%')

    # 학습 데이터와 학습 라벨을 numpy array 형식으로 변환
    x_train_point_cloud = np.asarray(x_train_point_cloud)

Processing: desk_0057.off	Percentage: 100.0%: : 1000it [02:00,  8.31it/s]     


In [16]:
if method == 'point_cloud':
    x_test_point_cloud = list()
    
    pbar = tqdm(enumerate(test_dataloader, start=1))
    for i, (off_) in pbar:
        off = off_[0]
        
        point_cloud, faces = read_off(off)
        
        normal_vector = estimate_normal_vector_from_point_cloud(point_cloud, radius=5, max_nn=30)
        
        feature = compute_histogram_of_normal_vector(normal_vector,n_bins=32, n_range=(-1, 1))
        
        x_test_point_cloud.append(feature)
        
        pbar.set_description(f'Processing: {os.path.basename(off)}\tPercentage: {i / len(test_dataset) * 100:.1f}%')

    # 평가 데이터를 numpy array 형식으로 변환
    x_test_point_cloud = np.asarray(x_test_point_cloud)

Processing: 299.off	Percentage: 100.0%: : 300it [00:29, 10.19it/s]


- SVM classification

In [17]:
print(f'Select Method is {method}')

if method == 'point_cloud':
    x_train, y_train = x_train_point_cloud, y_train_point_cloud
    x_test = x_test_point_cloud

elif method == 'voxel':
    x_train, y_train = x_train_voxel, y_train_voxel
    x_test = x_test_voxel

elif method == 'mesh':
    x_train, y_train = x_train_mesh, y_train_mesh
    x_test = x_test_mesh
    
from sklearn.svm import SVC
svm = SVC(C=10, random_state=seed)
svm.fit(x_train, y_train)
print(sum(svm.predict(x_train) == y_train) / len(y_train)) #train set predict 정확도 체크

pred = svm.predict(x_test)

submit = pd.read_csv(SUBMIT_PATH, index_col=0)
submit['Label'] = pred

submit.to_csv(f"{method}_baseline.csv")

Select Method is point_cloud
0.766


In [18]:
# 3D 데이터 표현 방법 선택: 'point_cloud', 'voxel', 'mesh' 중 하나를 선택
method = 'voxel'

In [19]:
def create_voxel_grid(point_cloud, voxel_size):
    min_bounds = np.min(point_cloud, axis=0)
    max_bounds = np.max(point_cloud, axis=0)

    voxel_counts = np.ceil((max_bounds - min_bounds) / voxel_size).astype(int)
    x_count, y_count, z_count = voxel_counts

    voxel_grid = np.zeros((x_count, y_count, z_count), dtype=bool)

    voxel_indices = ((point_cloud - min_bounds) / voxel_size).astype(int)

    voxel_grid[voxel_indices[:, 0], voxel_indices[:, 1], voxel_indices[:, 2]] = True

    return voxel_grid

In [20]:
def create_voxel_grid(point_cloud, voxel_size: int) -> np.array:
    pcd = o3d.geometry.PointCloud()
     
    pcd.points = o3d.utility.Vector3dVector(point_cloud)

    voxel_grid = o3d.geometry.VoxelGrid.create_from_point_cloud(pcd, voxel_size=voxel_size)
    
    voxels = voxel_grid.get_voxels()
    #print(voxels)
    
    grid_index = np.stack([voxel.grid_index for voxel in voxels])
    
    return grid_index

In [21]:
def compute_occupancy(voxel_grid) -> Tuple[np.array, np.array]:
    n_voxels = voxel_grid.shape[0]
    n_occupied_voxels = np.count_nonzero(voxel_grid[:, -1])
    occupancy = n_occupied_voxels / n_voxels
    
    x_occupancy = np.count_nonzero(np.sum(np.column_stack((voxel_grid[:, 1], voxel_grid[:, 2])), axis=1))
    y_occupancy = np.count_nonzero(np.sum(np.column_stack((voxel_grid[:, 0], voxel_grid[:, 2])), axis=1))
    z_occupancy = np.count_nonzero(np.sum(np.column_stack((voxel_grid[:, 0], voxel_grid[:, 1])), axis=1))
    #print(x_occupancy, y_occupancy, z_occupancy)
    
    axis_occupancy = np.column_stack((x_occupancy, y_occupancy, z_occupancy)) / n_voxels
    
    return np.array([occupancy]), axis_occupancy

In [22]:
if method == 'voxel':
    x_train_voxel  = list()
    y_train_voxel = list()
    
    pbar = tqdm(enumerate(train_dataloader, start=1))
    for i, (off_, label_) in pbar:
        off, label = off_[0], label_.item()
        
        point_cloud, faces = read_off(off)

        voxel_grid = create_voxel_grid(point_cloud, voxel_size=8)
        
        occupancy, axis_occupancy = compute_occupancy(voxel_grid)
        
        feature = np.concatenate((occupancy, axis_occupancy.reshape(-1)), axis=0)
        
        x_train_voxel.append(feature)
        y_train_voxel.append(label)

        pbar.set_description(f'Processing: {os.path.basename(off)}\tPercentage: {i / len(train_dataset) * 100:.1f}%')
    
    x_train_voxel = np.array(x_train_voxel)
    y_train_voxel = np.array(y_train_voxel)

Processing: sofa_0024.off	Percentage: 100.0%: : 1000it [01:34, 10.59it/s]     


In [23]:
if method == 'voxel':
    x_test_voxel = list()
    
    pbar = tqdm(enumerate(test_dataloader, start=1))
    for i, (off_) in pbar:
        off = off_[0]
        point_cloud, faces = read_off(off)
        
        voxel_grid = create_voxel_grid(point_cloud, voxel_size=8)
        
        occupancy, axis_occupancy = compute_occupancy(voxel_grid)
        
        feature = np.concatenate((occupancy, axis_occupancy.reshape(-1)), axis=0)
        
        x_test_voxel.append(feature)
        
        pbar.set_description(f'Processing: {os.path.basename(off)}\tPercentage: {i / len(test_dataset) * 100:.1f}%')
    
    # 평가 데이터를 numpy array 형식으로 변환
    x_test_voxel = np.array(x_test_voxel)

Processing: 299.off	Percentage: 100.0%: : 300it [00:22, 13.45it/s]


In [24]:
print(f'Select Method is {method}')

if method == 'point_cloud':
    x_train, y_train = x_train_point_cloud, y_train_point_cloud
    x_test = x_test_point_cloud

elif method == 'voxel':
    x_train, y_train = x_train_voxel, y_train_voxel
    x_test = x_test_voxel

elif method == 'mesh':
    x_train, y_train = x_train_mesh, y_train_mesh
    x_test = x_test_mesh
    
from sklearn.svm import SVC
svm = SVC(C=10, random_state=seed)
svm.fit(x_train, y_train)
print(sum(svm.predict(x_train) == y_train) / len(y_train)) #train set predict 정확도 체크

pred = svm.predict(x_test)

submit = pd.read_csv(SUBMIT_PATH, index_col=0)
submit['Label'] = pred

submit.to_csv(f"{method}_baseline.csv")

Select Method is voxel
0.458


In [25]:
# 3D 데이터 표현 방법 선택: 'point_cloud', 'voxel', 'mesh' 중 하나를 선택
method = 'mesh'

In [26]:
def calculate_normal_vector_from_mesh(vertices, faces) -> np.array:

    normal_vector = list()
    

    for face in faces:

        P = vertices[face[0]]
        Q = vertices[face[1]]
        R = vertices[face[2]]


        PQ = Q - P
        PR = R - P
        normal = np.cross(PQ, PR)
        normal_vector.append(normal)

    return np.array(normal_vector)

In [27]:
def pca_normal_vector(normal_vector) -> np.array:
    from sklearn.decomposition import PCA
    PCA = PCA(n_components=3)
    PCA.fit_transform(normal_vector)

    #print(PCA.components_)
    pca_x, pca_y, pca_z = PCA.components_
    #pc = PCA.components_

    feature = np.concatenate((pca_x, pca_y, pca_z), axis=0)
    #feature = np.concatenate((pc[0], pc[1], pc[2]), axis=0)

    return feature

In [None]:
if method == 'mesh':
    # 학습 데이터 (x_train_mesh) 담을 리스트 생성
    x_train_mesh, y_train_mesh = list(), list()

    pbar = tqdm(enumerate(train_dataloader, start=1))
    for i, (off_, label_) in pbar:
        off, label = off_[0], label_.item()
        
        point_cloud, faces = read_off(off)
        
        normal_vector = calculate_normal_vector_from_mesh(point_cloud, faces)
        
        feature = pca_normal_vector(normal_vector)
        
        x_train_mesh.append(feature)
        y_train_mesh.append(label)

        pbar.set_description(f'Processing: {os.path.basename(off)}\tPercentage: {i / len(train_dataset) * 100:.1f}%')

    x_train_mesh = np.array(x_train_mesh)
    y_train_mesh = np.array(y_train_mesh)

Processing: sofa_0029.off	Percentage: 30.3%: : 303it [04:01,  1.63it/s]       

In [None]:
if method == 'mesh':
    # 평가 데이터 (x_test_mesh) 담을 리스트 생성
    x_test_mesh = list()

    pbar = tqdm(enumerate(test_dataloader, start=1))
    for i, (off_) in pbar:
        off = off_[0]
        
        point_cloud, faces = read_off(off)
        
        normal_vector = calculate_normal_vector_from_mesh(point_cloud, faces)
        
        feature = pca_normal_vector(normal_vector)
        
        x_test_mesh.append(feature)

        pbar.set_description(f'Processing: {os.path.basename(off)}\tPercentage: {i / len(test_dataset) * 100:.1f}%')

    x_test_mesh = np.array(x_test_mesh)

In [None]:
# method는 상단 셀의 Select Method에서 정한 값으로 설정됩니다.
print(f'Select Method is {method}')

if method == 'point_cloud':
    x_train, y_train = x_train_point_cloud, y_train_point_cloud
    x_test = x_test_point_cloud

elif method == 'voxel':
    x_train, y_train = x_train_voxel, y_train_voxel
    x_test = x_test_voxel

elif method == 'mesh':
    x_train, y_train = x_train_mesh, y_train_mesh
    x_test = x_test_mesh
    
from sklearn.svm import SVC # 분류기 불러오기
svm = SVC(C=10, random_state=seed) # 분류기 정의
svm.fit(x_train, y_train) # 학습
print(sum(svm.predict(x_train) == y_train) / len(y_train)) #train set predict 정확도 체크

pred = svm.predict(x_test)

In [None]:
submit = pd.read_csv(SUBMIT_PATH, index_col=0) # submit 불러오기
submit['Label'] = pred # 제출 파일 label 열을 예측값으로 바꾼다

submit.to_csv(f"{method}_baseline.csv")