In [None]:
import ast
import os
# Switch the working directory to the parent folder; the relative 'data' and
# 'estimator' paths used further down are resolved from there.
# NOTE(review): not idempotent - re-running this cell moves up one more level.
os.chdir(os.pardir)

import numpy as np
import pandas as pd
from sklearn.metrics import make_scorer
from sklearn.model_selection import GridSearchCV
from tqdm.auto import tqdm

from src import load_point_cloud
from src import show_point_cloud
from src import rms_angle_error

from estimator import NormalEstimator

# Baseline

In [None]:
# Load the cloud used for the baseline run. `xyz` is what the estimator is fitted
# on below; `n` is the reference the predictions are later compared against.
xyz, n = load_point_cloud('Cup33100k_ddist_minmax')

In [None]:
# Plot the loaded cloud; only the second returned object is kept (as `ax`).
_, ax = show_point_cloud(xyz, n)

In [None]:
# Values passed as `k` and `s` to NormalEstimator in the next cell.
k = 30
# s = k - sqrt(2k).
# NOTE(review): presumably the lower end of the smoothing-factor range that SciPy
# suggests for smoothing splines (m - sqrt(2m)) - confirm against NormalEstimator.
s = k - np.sqrt(2 * k)
s

In [None]:
# Baseline configuration: deg=3 with the k/s chosen above, kernel and gamma unset.
estimator = NormalEstimator(k=k, deg=3, s=s, kernel=None, gamma=None)

In [None]:
# Fit on the loaded points and rebind `estimator` to whatever fit() returns.
# NOTE(review): assumes fit() returns the fitted estimator itself - confirm.
estimator = estimator.fit(xyz)

In [None]:
# Predictions for the same points the estimator was fitted on.
n_estim = estimator.predict(xyz)

In [None]:
# Score the predictions from the previous cell against the loaded reference `n`.
# Reusing `n_estim` avoids running predict() over the whole cloud a second time.
# orient=False matches the "unoriented" error named in the printed label.
angle_err = rms_angle_error(n_estim, n, orient=False)
print(f'Unoriented normal angle error = {angle_err:.4f}°')

# Hyperparameter tuning

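Tuning uses the training set only: a separate validation split is not possible in this case, so each configuration is fitted and scored on the same point cloud.
The validation set is therefore omitted.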

In [None]:
# Folder holding the training-set listing files, relative to the working directory.
data_path = 'data'

In [None]:
def _read_names(list_file):
    """Return the whitespace-stripped lines of `list_file` found inside `data_path`."""
    with open(os.path.join(data_path, list_file)) as handle:
        return [entry.strip() for entry in handle]


# One list of names per listing file.
trainingset_no_noise = _read_names('trainingset_no_noise.txt')
trainingset_vardensity_whitenoise = _read_names('trainingset_vardensity_whitenoise.txt')
trainingset_vardensity = _read_names('trainingset_vardensity.txt')
trainingset_whitenoise = _read_names('trainingset_whitenoise.txt')

In [None]:
# All listed names in listing order; duplicates across listings are not removed here.
trainingset_all = [
    *trainingset_no_noise,
    *trainingset_vardensity_whitenoise,
    *trainingset_vardensity,
    *trainingset_whitenoise,
]

In [None]:
# Load every distinct training cloud once, keyed by its name.
# dict.fromkeys() drops repeated names while keeping the listing order, so the
# key order (and with it the order in which tuning() walks the datasets) is the
# same on every run; the iteration order of a set of strings is not.
# Unpacking straight into the two dicts also leaves the baseline `xyz`/`n`
# untouched instead of overwriting them with the last training cloud.
X_train_d = {}
y_train_d = {}
for trainingset in dict.fromkeys(trainingset_all):
    X_train_d[trainingset], y_train_d[trainingset] = load_point_cloud(trainingset)

In [None]:
# Three sub-grids handed to GridSearchCV as a list; each one is expanded on its own.
param_grid = [
    # kernel disabled: vary k and deg
    dict(k=[10, 30], deg=[2, 3], s=[None], kernel=[None]),
    # gaussian / inverse multiquadric kernels: vary k and gamma, s left unset
    dict(k=[30, 50, 100],
         s=[None],
         kernel=['gaussian', 'inverse_multiquadric'],
         gamma=[0.1, 0.3, 1]),
    # rbf kernel: vary k, s and gamma
    dict(k=[30, 50, 100],
         s=[None, 10, 0],
         kernel=['rbf'],
         gamma=[1, 3, 10]),
]

In [None]:
def scorer(y, y_pred):
    """Return rms_angle_error of `y_pred` against `y`, evaluated with orient=False."""
    return rms_angle_error(y_pred, y, orient=False)


# greater_is_better=False marks this as an error metric: scikit-learn flips its
# sign so that a larger score still means a better configuration.
custom_scorer = make_scorer(scorer, greater_is_better=False)

In [None]:
def tuning():
    """Run the grid search on every training cloud and collect all results.

    For each entry of ``X_train_d`` a fresh ``GridSearchCV`` over ``param_grid``
    is fitted using ``custom_scorer``. The single CV "split" selects the whole
    cloud for both fitting and scoring.

    Returns
    -------
    pandas.DataFrame
        The ``cv_results_`` tables of all datasets stacked under a fresh integer
        index, with an extra ``dataset`` column holding the dataset key.
        An empty frame when ``X_train_d`` has no entries.
    """
    frames = []
    for key in tqdm(X_train_d.keys()):
        grid = GridSearchCV(estimator=NormalEstimator(),
                            param_grid=param_grid,
                            scoring=custom_scorer,
                            n_jobs=-1,
                            cv=[(slice(None), slice(None))],
                            verbose=2)
        grid.fit(X_train_d[key], y_train_d[key])
        df_temp = pd.DataFrame(grid.cv_results_)
        df_temp['dataset'] = key
        frames.append(df_temp)
    # Return only after the loop has covered every dataset; concatenating once
    # here also avoids re-copying the accumulated frame on every iteration.
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)

In [None]:
force_train = False  # True: ignore the stored CSV and always run the search
save = False         # True: write the result table to `fname`
fname = os.path.join('estimator', 'grid_res.csv')

if not force_train:
    try:
        print('Trying to restore the grid...')
        # 'params' is kept as the text stored in the CSV; it is not parsed back.
        df_grid_res = pd.read_csv(fname, index_col=0)
        print('Restoring successful.')
    except Exception as e:
        # Any failure to read the stored table falls back to a fresh search.
        print(e)
        print('Restoring failed. Fitting the surrogate model...')
        df_grid_res = tuning()
else:
    print('Optimization started...')
    df_grid_res = tuning()

if save:
    df_grid_res.to_csv(fname)

## Optimal parameters

In [None]:
# Columns of df_grid_res shown in the result tables below.
sel = ['params', 'mean_test_score', 'dataset']

In [None]:
rank_test_score = 1  # best performing
# Keep the rows holding that rank and list them ordered by dataset name.
is_selected = df_grid_res['rank_test_score'].eq(rank_test_score)
df_grid_res.loc[is_selected, sel].sort_values('dataset')

In [None]:
# Bucket the white-noise listing by the last '_'-separated token of each name.
# A name whose last token is none of the three known levels counts as noise-free.
# The loop variable is deliberately not called `set`: rebinding the builtin at
# notebook level makes the `set(trainingset_all)` cell fail when it is re-run.
no_noise, low_noise, med_noise, high_noise = [], [], [], []
noise_buckets = {
    '1.00e-02': low_noise,
    '5.00e-02': med_noise,
    '1.00e-01': high_noise,
}
for dataset_name in trainingset_whitenoise:
    ind = dataset_name.split('_')[-1]
    noise_buckets.get(ind, no_noise).append(dataset_name)

In [None]:
# Bucket the variable-density listing by the last '_'-separated token of each
# name: 'layers' -> striped, 'minmax' -> gradient; any other name is left out.
# The loop variable is deliberately not called `set`: rebinding the builtin at
# notebook level makes the `set(trainingset_all)` cell fail when it is re-run.
striped, gradient = [], []
for dataset_name in trainingset_vardensity:
    ind = dataset_name.split('_')[-1]
    if ind == 'layers':
        striped.append(dataset_name)
    elif ind == 'minmax':
        gradient.append(dataset_name)

In [None]:
def mapper(row):
    """Return the class label of ``row['dataset']``.

    The class lists are checked in a fixed order (noise levels first, then the
    two density patterns) and the first list containing the name decides the
    label. ``None`` is returned when the name is in none of them.
    """
    name = row['dataset']
    labelled_groups = (
        ('no_noise', no_noise),
        ('low_noise', low_noise),
        ('med_noise', med_noise),
        ('high_noise', high_noise),
        ('striped', striped),
        ('gradient', gradient),
    )
    for label, members in labelled_groups:
        if name in members:
            return label
    return None
# Label every result row with its dataset class; this adds a 'class' column to
# df_grid_res in place. Rows whose dataset is in none of the class lists get None.
df_grid_res['class'] = df_grid_res[sel].apply(mapper, axis=1)

In [None]:
# Group by params and "class"; for each params-class combination, take the mean
# score (negated RMS angle error) over all available datasets within that class.
# 'params' holds dicts when the table comes straight from tuning() and strings
# when it was restored from CSV. Dicts are unhashable and cannot be used as group
# keys, so the column is normalised to its string form first (a no-op for the
# restored table).
df_grouped = (
    df_grid_res[['params', 'class', 'mean_test_score']]
    .assign(params=lambda frame: frame['params'].astype(str))
    .groupby(by=['params', 'class'])
    .mean()
    .reset_index()
)
df_grouped

In [None]:
# The scores are negated errors, so within each class the maximum is the mean
# RMS angle error closest to zero; those parameter sets are the ones to carry
# over to the test set.
idx_max = df_grouped.groupby('class')['mean_test_score'].idxmax()
df_param_opt = df_grouped.loc[idx_max, ['class', 'params', 'mean_test_score']]
df_param_opt

In [None]:
# Opt-in export of the per-class optimum table; nothing is written while `save`
# is False. Note that this rebinds `save` and `fname` from the earlier cell.
save = False
fname = os.path.join('estimator', 'param_opt.csv')

if save:
    df_param_opt.to_csv(fname)