# ОБУЧЕНИЕ POINTNET СЕТЕЙ ПО КЛАССИФИКАЦИИ НА ОСНОВЕ ОБЛАКОВ ТОЧЕК ВЭКГ

In [1]:
import os
import numpy as np
import itertools
import math, random
from glob import glob
random.seed = 42

from func import *

import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
import pandas as pd

import scipy.spatial.distance
import plotly.graph_objects as go
import plotly.express as px
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

Демонстрация нормализации (StandardScaler) исходных облаков точек:

In [2]:
file_path = 'final_pointcloud_dataset/train/00002_period_3.csv'
file_path_normalized = 'final_pointcloud_dataset_normalized/train/00002_period_3_normalized.csv'

print('Исходное облако точек:')
# Чтение CSV-файла
df = pd.read_csv(file_path)
point_cloud_array = df[['x', 'y', 'z']].values
pcshow(*point_cloud_array.T)


print('Нормализованное облако точек:')
# Чтение CSV-файла
df_normalized = pd.read_csv(file_path_normalized)
point_cloud_array = df_normalized[['x', 'y', 'z']].values
pcshow(*point_cloud_array.T)

Исходное облако точек:


Нормализованное облако точек:


---

# Преподготовка данных перд обучением сетей:

Кастомные траснсформации датасета:

In [3]:
# Реализация нормального шума
class RandomNoise(object):
    def __init__(self, std=0.002):
        self.std = std
    def __call__(self, pointcloud):
        assert len(pointcloud.shape)==2

        noise = np.random.normal(0, self.std, (pointcloud.shape))
    
        noisy_pointcloud = pointcloud + noise
        return  noisy_pointcloud


class ToTensor(object):
    def __call__(self, pointcloud):
        assert len(pointcloud.shape)==2

        return torch.from_numpy(pointcloud)

# Реализация нормализации чтобы максильное значение равнялось 1 (схоже с методом в изображениях)
class Normalize(object):
    def __call__(self, pointcloud):
        assert len(pointcloud.shape)==2
        
        norm_pointcloud = pointcloud - np.mean(pointcloud, axis=0) 
        norm_pointcloud /= np.max(np.linalg.norm(norm_pointcloud, axis=1))

        return  norm_pointcloud


# Создание рандомного семплирования (если точек меньше то просто добавлю дубликаты чтобы добить число)
class PointSampler:
    def __init__(self, num_points=512): 
        self.num_points = num_points

    def __call__(self, pointcloud):
        num_total_points = pointcloud.shape[0]
        
        if num_total_points < self.num_points:
              
            # Семплирование оставшихся точек из уже существующих точек
            sampled_indices = np.random.choice(num_total_points, self.num_points - num_total_points, replace=True)
            duplicated_points = pointcloud[sampled_indices, :]
            
            # Собираем все точки в итоговую выборку
            sampled_points = np.vstack((pointcloud, duplicated_points))
            #print("Всего точек меньше чем надо семплировать - будут дубликаты")
        else:
            # Случайно берем точки:
            sampled_indices = np.random.choice(num_total_points, self.num_points, replace=False)
            sampled_points = pointcloud[sampled_indices, :]
        
        return sampled_points

# Создание рандомного семплирования  с попыткой брать больше точек что дальше лежат от (0,0) 
# (если точек меньше то просто добавлю дубликаты чтобы добить число)
class PointSampler_weighted:
    def __init__(self, num_points=512): 
        self.num_points = num_points

    def __call__(self, pointcloud):
        num_total_points = pointcloud.shape[0]
        
        if num_total_points < self.num_points:
              
            # Семплирование оставшихся точек из уже существующих точек
            sampled_indices = np.random.choice(num_total_points, self.num_points - num_total_points, replace=True)
            duplicated_points = pointcloud[sampled_indices, :]
            
            # Собираем все точки в итоговую выборку
            sampled_points = np.vstack((pointcloud, duplicated_points))
            #print("Всего точек меньше чем надо семплировать - будут дубликаты")
        else:
            # Выше вероятность чем дальше от 0,0
            distances = np.linalg.norm(pointcloud, axis=1)
            weights = distances / np.sum(distances)

            sampled_indices = np.random.choice(num_total_points, self.num_points, replace=False, p=weights)
            sampled_points = pointcloud[sampled_indices, :]
        
        return sampled_points

__Пример нормализации с приведением максильного значения к 1:__

In [4]:
point_cloud_array = df[['x', 'y', 'z']].values
norm_pointcloud = Normalize()(point_cloud_array)
pcshow(*norm_pointcloud.T)

__Пример добавления небольшого шума:__

In [5]:
sampled_point_cloud = RandomNoise(std=0.01)(norm_pointcloud)
pcshow(*sampled_point_cloud.T)

__Семплирование точек:__ <br>
Пример с семплированием без весов:

In [6]:
sampled_point_cloud = PointSampler(150)(norm_pointcloud)
pcshow(*sampled_point_cloud.T)

Пример с семплированием c весами (делает шанс выбора точек более удаленных от начала координат более высоким):

In [7]:
sampled_point_cloud = PointSampler_weighted(150)(norm_pointcloud)
pcshow(*sampled_point_cloud.T)

In [8]:
# Класс Датасета
class PointCloudData(Dataset):
    def __init__(self, root, subset_type, transform=None):

        self.df_gt = pd.read_csv(f"{root}/{subset_type}/ground_truth.csv", index_col=0)

        self.df_gt['class'] = self.df_gt['EF'].apply(lambda x: 1 if x < 50 else 0)

        self.csv_paths = glob(f"{root}/{subset_type}/*.csv")
        self.csv_paths = [path for path in self.csv_paths if path != f"{root}/{subset_type}\\ground_truth.csv"]
        self.transform = transform
        self.classes = {0 : 'normal', 1 : 'pathology'}

    def __len__(self):
        return len(self.csv_paths)

    def __getitem__(self, idx):
        file_path = self.csv_paths[idx]
        df = pd.read_csv(file_path)
        point_cloud = df[['x', 'y', 'z']].values

        if self.transform:
            pointcloud = self.transform(point_cloud)

        target = self.df_gt.loc[os.path.basename(file_path)]['class'] 

        return  {'pointcloud': pointcloud, 
                'category': target}

In [9]:
# КОНСТАНТЫ:
BATCH_SIZE = 32

# Путь к корневой папке датасета
DATASET_PATH = "final_pointcloud_dataset"

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [15]:
train_transforms = transforms.Compose([
                    Normalize(),
                    PointSampler_weighted(512),
                    RandomNoise(std=0.002),
                    ToTensor()
                    ])

val_transforms = transforms.Compose([
                    Normalize(),
                    PointSampler_weighted(512),
                    ToTensor()
                    ])

train_ds = PointCloudData(DATASET_PATH, "train", transform=train_transforms)
valid_ds = PointCloudData(DATASET_PATH, "val", transform=val_transforms)

train_loader = DataLoader(dataset=train_ds, batch_size=32, shuffle=True)
valid_loader = DataLoader(dataset=valid_ds, batch_size=32)

In [11]:
inv_classes = {0 : 'normal', 1 : 'pathology'}
print('Train dataset size: ', len(train_ds))
print('Valid dataset size: ', len(valid_ds))
print('Number of classes: ', len(train_ds.classes))

print('\nПример первого элемент тренировочного датасета:')
print('Sample pointcloud shape: ', train_ds[0]['pointcloud'].size())
print('Class: ', inv_classes[train_ds[0]['category']])
pcshow(*train_ds[0]['pointcloud'].T)

Train dataset size:  1757
Valid dataset size:  578
Number of classes:  2

Пример первого элемент тренировочного датасета:
Sample pointcloud shape:  torch.Size([512, 3])
Class:  normal
