In [25]:
import sys
sys.path.append('../libs')  # Update this path according to the location of your 'dataset' module
import dataset
import preprocessing
import classes
import numpy as np
from sklearn.preprocessing import normalize
from sklearn.model_selection import train_test_split
from sklearn.decomposition import PCA
from sklearn.metrics import f1_score, accuracy_score
from sklearn.preprocessing import RobustScaler
from sklearn import svm
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay


In [26]:
# Chosen hyper-parameters: these are the default constructor arguments of
# SVMClassifier defined further down in this notebook.
chosen_svm_kernel = "rbf"      # kernel handed to svm.SVC
chosen_svm_cost = 10           # handed to svm.SVC as C
chosen_pca_components = 15     # n_components of the PCA step
chosen_scaler_q = 5            # RobustScaler quantile range is (q, 100 - q)

In [27]:
# Load the raw data and map every label to its class via classes.label_to_class.
X = dataset.data()
y = np.array([classes.label_to_class(label) for label in dataset.labels_array()])

# Work on the first 20000 entries only.
X, y = X[:20000], y[:20000]

# Hold out 15% of the data for testing; the split is seeded so it is repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.15, random_state=42
)

# X_test, X_validation, y_test, y_validation = train_test_split(X_test_validation, y_test_validation, test_size=0.25, random_state=999)

# The training array is 3-dimensional; keep its dimensions around by name.
samples, features, frames = X_train.shape


In [28]:


# scaler = RobustScaler(quantile_range=(scaler_q_min, scaler_q_max))
# X_transposed = X_train.transpose(0,2,1)
# arr = np.reshape(X_transposed, (samples * frames, features))
# scaled = scaler.fit_transform(arr)

# pca = PCA(n_components=pca_components)
# reduced_X = pca.fit(scaled)

In [29]:
# flat_X = np.empty((samples, frames, pca_components))

# for i in range(samples):
#     flat_X[i] = pca.transform(scaler.transform(X_transposed[i]))

# flat_X = np.reshape(flat_X, (samples, frames * pca_components))
# print(flat_X.shape)

In [30]:
# clf = svm.SVC(cache_size=1000, verbose=True, class_weight='balanced', C = svm_cost, kernel=svm_kernel)
# clf.fit(flat_X, y_train)

In [31]:
# (samples, features, frames) = X_test.shape

# X_test_transposed = X_test.transpose(0,2,1)
# arr = np.reshape(X_test_transposed, (samples * frames, features))
# scaled = scaler.transform(arr)

# reduced_X_test = pca.transform(scaled)

# flat_X_test = np.empty((samples, frames, pca_components))

# for i in range(samples):
#     flat_X_test[i] = pca.transform(scaler.transform(X_test_transposed[i]))

# flat_X_test = np.reshape(flat_X_test, (samples, frames * pca_components))
# print(flat_X_test.shape)

# y_pred = clf.predict(flat_X_test)

In [32]:
# print(y_test[0:10])
# print(y_pred[0:10])
# print(np.array(list(classes.REVERSE_CLASSES.keys())))

# cm = confusion_matrix(y_test, y_pred, labels=np.array(list(classes.REVERSE_CLASSES.keys())))
# disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=classes.CLASSES)
# disp.plot()

In [33]:
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import RandomizedSearchCV
from sklearn.model_selection import ShuffleSplit
from skopt import BayesSearchCV
import json


class SVMClassifier(BaseEstimator):
    """Robust scaling + PCA + SVM classifier for 3-D input arrays.

    Input arrays are unpacked as ``(samples, features, frames)``. Every frame
    of every sample is treated as one row of ``features`` values: the rows are
    scaled with a ``RobustScaler``, projected with a ``PCA``, and the projected
    frames of each sample are concatenated into a single vector that is fed to
    an ``svm.SVC`` (``class_weight='balanced'``).

    Parameters
    ----------
    pca_components :
        ``n_components`` passed to ``PCA``.
    svm_cost :
        ``C`` passed to ``svm.SVC``.
    svm_kernel :
        ``kernel`` passed to ``svm.SVC``.
    scaler_q :
        Lower percentile of the ``RobustScaler`` quantile range; the range
        used is ``(scaler_q, 100 - scaler_q)``.

    Attributes
    ----------
    scaler, pca, clf :
        The fitted ``RobustScaler``, ``PCA`` and ``svm.SVC`` (``None`` until
        ``fit`` has been called).
    is_fitted_ :
        Set to ``True`` by ``fit``.
    """

    def __init__(self, pca_components = chosen_pca_components, svm_cost = chosen_svm_cost, svm_kernel = chosen_svm_kernel, scaler_q = chosen_scaler_q):
        self.pca_components = pca_components
        self.svm_cost = svm_cost
        self.svm_kernel = svm_kernel
        self.scaler_q = scaler_q

        # Sub-models are created in fit().
        self.clf = None
        self.scaler = None
        self.pca = None

    @staticmethod
    def _stack_frames(X):
        """Return ``(rows, samples, frames)`` with ``rows`` shaped (samples * frames, features)."""
        X = np.asarray(X)
        samples, features, frames = X.shape
        # Move the frame axis next to the sample axis so that the rows of one
        # sample are contiguous and ordered by frame.
        rows = X.transpose(0, 2, 1).reshape(samples * frames, features)
        return rows, samples, frames

    @staticmethod
    def _concat_frames(reduced, samples, frames):
        """Concatenate the projected frames of each sample into one vector."""
        return reduced.reshape(samples, frames * reduced.shape[1])

    def fit(self, X, y):
        """Fit the scaler, the PCA and the SVM.

        Parameters
        ----------
        X : array-like, unpacked as (samples, features, frames)
        y : labels, one per sample

        Returns
        -------
        self
        """
        rows, samples, frames = self._stack_frames(X)

        self.scaler = RobustScaler(quantile_range=(self.scaler_q, 100 - self.scaler_q))
        scaled = self.scaler.fit_transform(rows)

        self.pca = PCA(n_components=self.pca_components)
        # fit() followed by transform() (rather than fit_transform()) so the
        # training features go through the same code path as in predict().
        reduced = self.pca.fit(scaled).transform(scaled)

        flat_X = self._concat_frames(reduced, samples, frames)

        self.clf = svm.SVC(cache_size=1000, class_weight='balanced', C = self.svm_cost, kernel=self.svm_kernel)
        self.clf.fit(flat_X, y)

        self.is_fitted_ = True
        return self

    def predict(self, X):
        """Predict a class for every sample in ``X``.

        Parameters
        ----------
        X : array-like, unpacked as (samples, features, frames)

        Returns
        -------
        The predictions of the underlying ``svm.SVC``.

        Raises
        ------
        sklearn.exceptions.NotFittedError
            If ``fit`` has not been called yet.
        """
        # is_fitted_ only exists after fit(); check_is_fitted turns its
        # absence into a clear "not fitted" error.
        check_is_fitted(self)

        rows, samples, frames = self._stack_frames(X)
        reduced = self.pca.transform(self.scaler.transform(rows))
        flat_X_test = self._concat_frames(reduced, samples, frames)

        return self.clf.predict(flat_X_test)

In [34]:
# clf = SVMClassifier()
# clf.fit(X,y)
# y_pred = clf.predict(X_test)

In [35]:
# Candidate values for each SVMClassifier constructor argument; this dict is
# what gets handed to BayesSearchCV as its search space.
parameters = dict([
    ("svm_cost", [0.5, 1, 5, 10]),
    ("pca_components", [7, 10, 15]),
    ("svm_kernel", ["poly", "rbf"]),
    ("scaler_q", [25, 15, 5]),
])

In [36]:
# Switches: OPTIMIZE runs a Bayesian hyper-parameter search, NORMAL_RUN fits a
# single model with the default (chosen_*) hyper-parameters. With both False
# this cell does nothing.
OPTIMIZE = False
NORMAL_RUN = False

clf = None

if OPTIMIZE:
    estimator = SVMClassifier()
    # Seed both the hold-out split and the optimizer so the search is repeatable.
    bayes = BayesSearchCV(
        estimator,
        parameters,
        n_iter=20,
        n_jobs=1,
        scoring="f1_weighted",
        cv=ShuffleSplit(n_splits=1, test_size=0.2, random_state=42),
        random_state=42,
    )
    bayes.fit(X_train, y_train)

    # The search refits the best configuration on the full training data by
    # default, so reuse that model instead of fitting it a second time.
    clf = bayes.best_estimator_

elif NORMAL_RUN:
    clf = SVMClassifier()
    clf.fit(X_train, y_train)

if clf is not None:
    y_pred = clf.predict(X_test)
    # Weighted F1 matches the metric the search optimizes; for single-label
    # predictions micro-averaged F1 is equal to accuracy, so it would only
    # repeat the number below.
    f1 = str(f1_score(y_test, y_pred, average='weighted'))
    accuracy = str(accuracy_score(y_test, y_pred))

    print(f1, accuracy, clf.get_params())

    # Output file layout: accuracy, F1, then the parameters as JSON.
    with open('svm_params.txt', 'w') as file:
        file.write(accuracy)
        file.write("\n")
        file.write(f1)
        file.write("\n")
        # Parameter values coming back from the search may be NumPy scalars,
        # which json cannot encode; convert those to plain Python values.
        file.write(json.dumps(
            clf.get_params(),
            default=lambda value: value.item() if hasattr(value, "item") else str(value),
        ))

    cm = confusion_matrix(y_test, y_pred, labels=np.array(list(classes.REVERSE_CLASSES.keys())))
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=classes.CLASSES)
    disp.plot()


In [37]:
# Switch for the overfitting check below.
OVERFITTING = True

if OVERFITTING:
    # For each SVM cost, fit a fresh model and record the accuracy on the
    # training data next to the accuracy on the held-out test data.
    accuracies = []

    for cost in (0.5, 1, 5, 10, 20, 40, 80):
        clf = SVMClassifier(svm_cost=cost)
        clf.fit(X_train, y_train)

        y_pred_train, y_pred_test = clf.predict(X_train), clf.predict(X_test)

        accuracy_train = str(accuracy_score(y_train, y_pred_train))
        accuracy_test = str(accuracy_score(y_test, y_pred_test))

        print(accuracy_train, accuracy_test)
        accuracies.append((accuracy_train, accuracy_test))

    print(accuracies)


# Recorded run with costs [1,5,10,20,40,80] -- one (train accuracy, test accuracy) pair per cost:
# [('0.8222941176470588', '0.832'), ('0.9428823529411765', '0.9453333333333334'), ('0.974', '0.9753333333333334'), ('0.9888823529411764', '0.9903333333333333'), ('0.9972352941176471', '0.9986666666666667'), ('0.9997647058823529', '1.0')]

# Recorded run with costs [0.5, 160, 320] -- one (train accuracy, test accuracy) pair per cost:
# [('0.7744705882352941', '0.7903333333333333'), ('1.0', '1.0'), ('1.0', '1.0')]