In [None]:
import base

import warnings
warnings.filterwarnings('ignore')

import numpy as np
import pandas as pd
from tqdm import notebook as tqdm
import matplotlib.pyplot as plt
import torch 

import iisignature

from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix, classification_report
from tslearn.datasets import UCR_UEA_datasets
from tslearn.preprocessing import TimeSeriesScalerMinMax, TimeSeriesScalerMeanVariance
from sklearn.model_selection import GridSearchCV, TimeSeriesSplit

from tslearn.svm import TimeSeriesSVC

from transformers_sig import AddTime, LeadLag

from sigKer_fast import sig_kernel_Gram_matrix 
# from sigKer_torch import SigKernelGramMat

In [None]:
def transform(paths, scale=1., at=False, ll=False):
    """Rescale a batch of paths and optionally augment them.

    Parameters
    ----------
    paths : array-like supporting ``scale * paths``
        The paths to transform.
    scale : float, default 1.
        Multiplicative factor applied before any augmentation.
    at : bool, default False
        If true, run the result through ``AddTime().fit_transform``.
    ll : bool, default False
        If true, run the result through ``LeadLag().fit_transform``.
        When both flags are set, lead-lag is applied before add-time.

    Returns
    -------
    numpy.ndarray
        The transformed paths wrapped with ``np.array``.
    """
    transformed = scale * paths

    # Collect the requested augmentations in application order; the
    # transformer classes are only looked up when their flag is set.
    stages = []
    if ll:
        stages.append(LeadLag)
    if at:
        stages.append(AddTime)

    for stage in stages:
        transformed = stage().fit_transform(transformed)

    return np.array(transformed)

In [None]:
# Names of the multivariate datasets exposed by tslearn's UCR/UEA loader;
# one of them is selected by position in the next cell.
data = UCR_UEA_datasets().list_multivariate_datasets()

In [None]:
# Select the dataset to benchmark by its position in `data`.
# NOTE(review): the index 26 depends on the order returned by the loader;
# selecting by dataset name would be more robust -- confirm intended dataset.
name = data[26]
print(name)

In [None]:
# Load the train/test splits (inputs and labels) of the selected dataset.
X_train, Y_train, X_test, Y_test = UCR_UEA_datasets(use_cache=True).load_dataset(name)

In [None]:
# Strides used to thin the data: `subsample` keeps every k-th entry along
# the first axis (inputs and labels alike), `subsample_len` keeps every
# k-th entry along the second axis of the inputs. 1 means "keep everything".
subsample = 1
subsample_len = 1

x_train = X_train[::subsample,::subsample_len,:]
y_train = Y_train[::subsample]
x_test = X_test[::subsample,::subsample_len,:]
y_test = Y_test[::subsample]

# x_train = TimeSeriesScalerMinMax().fit_transform(x_train)
# x_train = TimeSeriesScalerMeanVariance().fit_transform(x_train)
# x_test = TimeSeriesScalerMinMax().fit_transform(x_test)
# x_test = TimeSeriesScalerMeanVariance().fit_transform(x_test)

# Fit a single encoder on the training labels and reuse it for the test
# labels. Fitting two independent encoders can assign different integer
# codes to the same class whenever the two splits do not contain exactly
# the same set of labels, which silently corrupts the evaluation.
# With a shared encoder, a test label unseen in training raises instead.
label_encoder = LabelEncoder().fit(y_train)
y_train = label_encoder.transform(y_train)
y_test = label_encoder.transform(y_test)

print(x_train.shape)
print(x_test.shape)

In [None]:
# Pre-processing options forwarded to `transform`; `scale`, `at` and `ll`
# are also used at the end of the notebook to name the results file.
scale = 1e-1
at = True
ll = False

# NOTE: this overwrites x_train / x_test in place, so re-running the cell
# without re-running the previous one applies the transform twice.
x_train = transform(x_train, scale=scale, at=at, ll=ll)
x_test = transform(x_test, scale=scale, at=at, ll=ll)

In [None]:
# Visual sanity check: plot one transformed training sample and print its
# encoded label.
k=6
plt.plot(x_train[k])
print(y_train[k])
plt.show()

In [None]:
# Show the distinct encoded labels present in the training split.
print(f'classes: {np.unique(y_train)}')

In [None]:
# Third and fourth positional arguments of sig_kernel_Gram_matrix below.
# NOTE(review): their meaning is defined in sigKer_fast, not in this
# notebook -- presumably discretisation/solver settings of the kernel; confirm.
n,solver =  0,0

In [None]:
# x_train_ = torch.tensor(x_train).cuda()
# x_test_ = torch.tensor(x_test).cuda()

In [None]:
# Forwarded as the `rbf=` / `sigma=` keyword arguments of
# sig_kernel_Gram_matrix; `rbf` also decides whether `sigma` appears in the
# name of the results file written at the end of the notebook.
rbf, sigma = True, 1e2

In [None]:
# Kernel matrix of the training set against itself (sym=True); used as the
# precomputed kernel when fitting the SVM.
gram_matrix = sig_kernel_Gram_matrix(x_train, x_train, n, solver, sym=True, rbf=rbf, sigma=sigma)
# gram_matrix_ = SigKernelGramMat.apply(x_train_, x_train_, n, solver, True)

In [None]:
# Kernel matrix between training and test samples (sym=False). It is
# transposed before being passed to predict/score below -- presumably it is
# laid out as (n_train, n_test); TODO confirm against sigKer_fast.
test_matrix = sig_kernel_Gram_matrix(x_train, x_test, n, solver, sym=False, rbf=rbf, sigma=sigma)
# test_matrix_ = SigKernelGramMat.apply(x_train_, x_test_, n, solver, False)

In [None]:
# gram_matrix = gram_matrix_.cpu().numpy()
# test_matrix = test_matrix_.cpu().numpy()

In [None]:
# Display the training Gram matrix for a quick visual inspection.
gram_matrix

In [None]:
# Hyper-parameter grid shared by every GridSearchCV in this notebook:
# C over 10^0..10^5 (6 values) and gamma over 10^-5..10^5 (11 values).
# NOTE(review): grid entries replace the value given to the estimator's
# constructor, so any `gamma=` passed to TimeSeriesSVC is overridden when
# this full grid is used.
svc_parameters = {'C': np.logspace(0, 5, 6), 'gamma': np.logspace(-5, 5, 11)}
# svc_parameters = {'C': np.logspace(0, 4, 5), 'gamma': np.logspace(-2, 2, 5)}

In [None]:
# SVM on the precomputed signature-kernel Gram matrix, tuned by grid search.
# NOTE(review): this search uses 2 CV splits whereas the baselines below use
# 5 -- confirm the difference is intentional.
svc = TimeSeriesSVC(decision_function_shape='ovo', kernel='precomputed')

svc_model = GridSearchCV(
    svc,
    svc_parameters,
    cv=TimeSeriesSplit(2),
    n_jobs=-1,
)

In [None]:
# Run the grid search on the train-vs-train Gram matrix.
svc_model.fit(gram_matrix, y_train)

In [None]:
# Predict test labels from the transposed train/test kernel matrix.
predictions = svc_model.predict(test_matrix.T)

In [None]:
# Test-set score of the tuned signature-kernel SVM.
svc_model.score(test_matrix.T, y_test)

In [None]:
# scikit-learn expects (y_true, y_pred). With the arguments swapped,
# precision and recall are exchanged and "support" counts predictions
# instead of true class frequencies.
print(classification_report(y_test, predictions))

In [None]:
# scikit-learn expects (y_true, y_pred): rows are true classes, columns are
# predicted classes. Swapping the arguments yields the transposed matrix.
confusion_matrix(y_test, predictions)

In [None]:
# Collects the test-set score of every method, keyed by method name.
# Re-running this cell discards the results recorded so far.
final = {}

In [None]:
# Record the signature-kernel SVM result; `svc_model` must still be the
# precomputed-kernel search here, since later cells rebind that name.
final['sig-ker-PDE'] = svc_model.score(test_matrix.T, y_test)

In [None]:
# Baselines: SVMs with the built-in linear and rbf kernels applied directly
# to the transformed time series, tuned over the shared grid.
for ker in tqdm.tqdm(['linear', 'rbf']):

    # gamma='auto' is only the constructor value; the grid search replaces
    # it with each gamma from `svc_parameters`.
    svc = TimeSeriesSVC(decision_function_shape='ovo', gamma='auto', kernel=ker)

    svc_model = GridSearchCV(
        svc,
        svc_parameters,
        cv=TimeSeriesSplit(5),
        n_jobs=-1,
    )
    svc_model.fit(x_train, y_train)

    final[ker] = svc_model.score(x_test, y_test)

In [None]:
# Baseline: global alignment kernel (GAK) SVM, one model per fixed gamma.
for gamma in tqdm.tqdm([1., 0.1, 0.01, 0.001]):

    svc = TimeSeriesSVC(kernel='gak', gamma=gamma, decision_function_shape='ovo')

    # Tune C only. The shared grid also contains 'gamma', and grid entries
    # override the estimator's constructor arguments -- with the full grid
    # the loop's gamma was ignored and every `gak_{gamma}` entry reported the
    # same gamma-tuned search rather than the result for its own gamma.
    svc_model = GridSearchCV(estimator=svc,
                             param_grid={'C': svc_parameters['C']},
                             cv=TimeSeriesSplit(n_splits=5),
                             n_jobs=-1)

    svc_model.fit(x_train, y_train)

    final[f'gak_{gamma}'] = svc_model.score(x_test, y_test)

In [None]:
### truncated signature kernel
# Linear SVM on iisignature features of depth d = 1, 2, 3.
for d in tqdm.tqdm(range(1, 4)):

    svc = TimeSeriesSVC(decision_function_shape='ovo', kernel='linear')

    svc_model = GridSearchCV(
        svc,
        svc_parameters,
        cv=TimeSeriesSplit(5),
        n_jobs=-1,
    )

    # Signature features of the train and test paths at depth d.
    sig_train = iisignature.sig(x_train, d)
    sig_test = iisignature.sig(x_test, d)

    svc_model.fit(sig_train, y_train)

    final[f'sig-ker-{d}'] = svc_model.score(sig_test, y_test)

In [None]:
# Display the collected scores of all methods.
final

In [None]:
# Tabulate the collected scores (one row per method) and write them to a CSV
# whose name encodes the dataset and the pre-processing / kernel settings:
#   svm-<name>-<scale>[-<sigma> if rbf][-LL if ll][-AT if at].csv
# NOTE(review): the column is labelled 'accuracy (%)' but the stored values
# are whatever `score` returned -- presumably fractions in [0, 1]; confirm.
final_csv = pd.DataFrame.from_dict(final, orient='index').rename(columns={0:'accuracy (%)'})
print(final_csv)

name_parts = [f'svm-{name}', f'{np.round(scale,3)}']
if rbf:
    # sigma is only meaningful (and only recorded) for the rbf variant.
    name_parts.append(f'{np.round(sigma,1)}')
if ll:
    name_parts.append('LL')
if at:
    name_parts.append('AT')

final_csv.to_csv('../results/' + '-'.join(name_parts) + '.csv')