# В этом ноутбуке тренируется и тестируется модель PointNet

In [None]:
import pandas as pd
import numpy as np
from tqdm import trange

import os
import torch
from torch import nn
from torch.utils.data import Dataset
import sys

sys.path.append('/Users/alexander.savelyev/PycharmProjects/Snow_recognition')
from PointNet import PointNetDenseCls

from matplotlib import pyplot as plt
import plotly.offline as py
import plotly.figure_factory as ff
import plotly.graph_objs as go
import tqdm
from IPython.display import clear_output
py.init_notebook_mode(connected=True)

EQUAL_ASPECT_RATIO_LAYOUT = dict(
    margin={
        'l': 0,
        'r': 0,
        'b': 0,
        't': 0
    }, scene=dict(
    aspectmode='data'
))

from sklearn.metrics import roc_auc_score

%matplotlib inline

Мы будем использовать оригинальный набор данных, потому что архитектура своими нейросетевыми кодировщиками сама будет решать как нормализовать данные. А также эта модель задумывалась на применение ее сразу после получения результатов работы лидара, чтобы не тратить время на препроцессинг 

In [None]:
class Snow3D(Dataset):


    def __init__(self, path: str, train: bool, features=['x', 'y', 'z']):
        file = '/Volumes/HP P800/itmo/Lidar data/data_train.csv' if train else '/Volumes/HP P800/itmo/Lidar data/data_test.csv'
        self.data = pd.read_csv(file, index_col='scene_id')
        self.features = features
        self.train = train

    def __len__(self):
        return len(self.data.index.unique())

    def __getitem__(self, idx):
        if idx >= len(self):
            raise IndexError
        queried_data = self.data.loc[idx]
        X = torch.tensor(queried_data[self.features].to_numpy()).type(torch.FloatTensor)
        if self.train:
            y = torch.tensor(queried_data['label'].to_numpy()).type(torch.LongTensor)
            return X, y
        return X

Генерируем тренировочный, валидационный и тестовый датасеты. Валидационный датасет нужен, чтобы модель не переобучилась на тренировочных данных

In [None]:
train_data = Snow3D(dataset_path, train=True)
train_size = int(0.9 * len(train_data))
val_size = len(train_data) - train_size

train_data, val_data = torch.utils.data.random_split(
    train_data,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(42),
)
test_data = Snow3D(dataset_path, train=False)

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Using {} device".format(device))

Определяем модель

In [None]:
model = PointNetDenseCls()

Посмотрим на ее архитектуру

In [None]:
model

Определяем функцию потерь, и оптимизатор

In [None]:
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

In [None]:
interval = 50
train_ts, train_loss = [], []
val_ts, val_loss, val_acc = [], [], []

In [None]:
# функция для отслживания прогресса при обучении
def show_progress(t):
    clear_output(wait=True)
    fig, (ax1, ax2) = plt.subplots(1, 2, constrained_layout=True, figsize=(20, 5))
    fig.suptitle(f'Epoch {t:3.3f}', fontsize=16)
    ax1.set_title('loss')
    ax1.set_xlabel('time (epochs)')
    ax1.set_ylabel('loss')
    ax1.plot(train_ts, train_loss, c='darkblue', lw=3)
    ax1.plot(val_ts, val_loss, c='green', marker='o', lw=5)
    ax2.set_title('accuracy')
    ax2.set_xlabel('time (epochs)')
    ax2.plot(val_ts, val_acc, c='green', marker='o', lw=5)
    plt.show()

# функция для тренировки
def train(epoch, dataset, model, loss_fn, optimizer):
    model.train()
    n_scenes = len(dataset)
    for scene, (X, y) in enumerate(dataset):
        # Send data to training device
        X, y = X.to(device), y.to(device)
        # Compute prediction error

        #print(X.shape)
        X = torch.unsqueeze(X, 0).swapaxes(1,2)
        #print(g.shape)
        
        pred = model(X)
        #print(pred[0].shape)
        print(pred[0][0,:,1].shape)
        print(y.type(torch.FloatTensor).shape)
        loss = loss_fn(pred[0][0,:,1].unsqueeze(0), y.type(torch.FloatTensor).unsqueeze(0))
        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Progress output
        if scene % interval == 0:
            t = epoch + (scene + 1) / n_scenes
            train_ts.append(t)
            train_loss.append(loss.item())
            show_progress(t)

# функция для тестирования
def test(epoch, dataset, model, loss_fn):
    model.eval()
    n_scenes = len(dataset)
    n_points = 0
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataset:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
            n_points += len(y)
    test_loss /= n_scenes
    correct /= n_points
    val_ts.append(epoch + 1)
    val_loss.append(test_loss)
    val_acc.append(correct)
    show_progress(epoch + 1)

In [None]:
epochs = 100
for t in range(epochs):
    train(t, train_data, model, loss_fn, optimizer)
    test(t, val_data, model, loss_fn)

In [None]:
predictions = []

model.eval()
with torch.no_grad():
    for X in test_data:
        X = X.to(device)
        pred = model(X).argmax(1).cpu().numpy()
        predictions.extend(list(pred))

In [None]:
df_test = pd.read_csv('/Volumes/HP P800/itmo/Lidar data/data_test.csv', index_col='scene_id')
y_test = df_test["label"]

оценка результатов на тестовых данных

In [None]:
roc_auc_score(y_test, predictions)