# ROCKET

In [3]:
import numpy as np
from sklearn.linear_model import RidgeClassifierCV

from rocket_functions import generate_kernels, apply_kernels

dataset = "Computers"

training_data = np.loadtxt(f"../Univariate_arff/{dataset}/{dataset}_TRAIN.txt")
Y_training, X_training = training_data[:, 0].astype(int), training_data[:, 1:]

# generate "dummy" kernels -> compiles *generate_kernels(...)*
_ = generate_kernels(100, 10)

# apply "dummy" kernels to "dummy" data -> compiles *apply_kernels(...)*
_ = apply_kernels(np.zeros_like(training_data)[:, 1:], _)

kernels = generate_kernels(X_training.shape[1], 100)

X_training_transform = apply_kernels(X_training, kernels)

classifier = RidgeClassifierCV(alphas = np.logspace(-3, 3, 10))
classifier.fit(X_training_transform, Y_training)

print(end = "") # suppress print output of classifier.fit(...)

test_data = np.loadtxt(f"../Univariate_arff/{dataset}/{dataset}_TEST.txt")
Y_test, X_test = test_data[:, 0].astype(int), test_data[:, 1:]

X_test_transform = apply_kernels(X_test, kernels)

predictions = classifier.predict(X_test_transform)

print(f"predictions = {', '.join(predictions.astype(str))}")
print(f"accuracy    = {(predictions == Y_test).mean()}") 
print(classifier.score(X_test_transform, Y_test))

predictions = 1, 1, 1, 1, 1, 2, 1, 2, 1, 2, 2, 1, 1, 2, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 2, 2, 1, 1, 2, 1, 1, 2, 1, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 2, 1, 1, 2, 1, 2, 2, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 2, 2, 2, 1, 1, 1, 1, 1, 1, 2, 1, 2, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 2, 2, 2, 1, 1, 1, 2, 2, 2, 2, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 1, 2, 2, 2, 2, 2, 1, 2, 1, 2, 1, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 2, 1, 1, 1, 1, 1, 2, 1, 2, 2, 2, 2, 2, 1, 2, 1, 1, 2, 1, 1, 1, 2, 1, 2, 2, 1, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 1, 1, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 1, 1, 2, 1, 2
accuracy    = 0.664
0.664


# Mio Codice

In [4]:
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression
from numba import njit, prange
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, matthews_corrcoef, average_precision_score, roc_auc_score
from pyod.utils.data import precision_n_scores
from scipy.special import softmax

@njit("Tuple((float64[:],int32[:],float64[:],int32[:],int32[:]))(int64,int64)")
def generate_kernels(input_length, num_kernels):

    candidate_lengths = np.array((7, 9, 11), dtype = np.int32)
    lengths = np.random.choice(candidate_lengths, num_kernels)

    weights = np.zeros(lengths.sum(), dtype = np.float64)
    biases = np.zeros(num_kernels, dtype = np.float64)
    dilations = np.zeros(num_kernels, dtype = np.int32)
    paddings = np.zeros(num_kernels, dtype = np.int32)

    a1 = 0

    for i in range(num_kernels):

        _length = lengths[i]

        _weights = np.random.normal(0, 1, _length)

        b1 = a1 + _length
        weights[a1:b1] = _weights - _weights.mean()

        biases[i] = np.random.uniform(-1, 1)

        dilation = 2 ** np.random.uniform(0, np.log2((input_length - 1) / (_length - 1)))
        dilation = np.int32(dilation)
        dilations[i] = dilation

        padding = ((_length - 1) * dilation) // 2 if np.random.randint(2) == 1 else 0
        paddings[i] = padding

        a1 = b1

    return weights, lengths, biases, dilations, paddings

@njit(fastmath = True)
def apply_kernel(X, weights, length, bias, dilation, padding):

    input_length = len(X)

    output_length = (input_length + (2 * padding)) - ((length - 1) * dilation)

    _ppv = 0
    _max = np.NINF

    end = (input_length + padding) - ((length - 1) * dilation)

    for i in range(-padding, end):

        _sum = bias

        index = i

        for j in range(length):

            if index > -1 and index < input_length:

                _sum = _sum + weights[j] * X[index]

            index = index + dilation

        if _sum > _max:
            _max = _sum

        if _sum > 0:
            _ppv += 1

    return _ppv / output_length, _max

@njit("float64[:,:](float64[:,:],Tuple((float64[::1],int32[:],float64[:],int32[:],int32[:])))", parallel = True, fastmath = True)
def apply_kernels(X, kernels):

    weights, lengths, biases, dilations, paddings = kernels

    num_examples, _ = X.shape
    num_kernels = len(lengths)

    _X = np.zeros((num_examples, num_kernels * 2), dtype = np.float64) # 2 features per kernel

    for i in prange(num_examples):

        a1 = 0 # for weights
        a2 = 0 # for features

        for j in range(num_kernels):

            b1 = a1 + lengths[j]
            b2 = a2 + 2

            _X[i, a2:b2] = \
            apply_kernel(X[i], weights[a1:b1], lengths[j], biases[j], dilations[j], paddings[j])

            a1 = b1
            a2 = b2

    return _X

    res = {
        "Accuracy": round(accuracy_score(y_test, y_pred), digits),
        "Precision": precision_score(y_test, y_pred, average='weighted').round(digits),
        "Recall": recall_score(y_test, y_pred, average='weighted').round(digits),
        "F1": f1_score(y_test, y_pred, average='weighted').round(digits),
        "MCC": round(matthews_corrcoef(y_test, y_pred), ndigits=digits)
    }
    if y_proba is not None:
        res["AUC_ROC"] = roc_auc_score(y_test, y_proba, multi_class='ovr', average='weighted').round(digits)
    return res

def evaluate_metrics(y_test, y_pred, y_proba=None, digits=3):
    res = {"Accuracy": round(accuracy_score(y_test, y_pred), digits),
           "Precision": precision_score(y_test, y_pred, average='weighted').round(digits),
            "Recall": recall_score(y_test, y_pred, average='weighted').round(digits),
            "F1": f1_score(y_test, y_pred, average='weighted').round(digits),
           "MCC": round(matthews_corrcoef(y_test, y_pred), ndigits=digits)}
    if y_proba is not None:
        res["AUC_PR"] = average_precision_score(y_test, y_proba).round(digits)
        res["AUC_ROC"] = roc_auc_score(y_test, y_proba, multi_class='ovr', average='weighted').round(digits)
    return res

# Genera kernel convoluzionali casuali
input_length = X_training.shape[1]
num_kernels = 1000
kernels = generate_kernels(input_length, num_kernels)

# Applica i kernel alle serie temporali
features_train = apply_kernels(X_training, kernels)
features_test = apply_kernels(X_test, kernels)


# Addestramento del modello supervisionato
model = RidgeClassifierCV(alphas = np.logspace(-3, 3, 10))
model.fit(features_train, Y_training)

# Predizione delle anomalie nei dati di test
y_pred = model.predict(features_test)
if  len(np.unique(Y_test)) > 2:
    y_proba = softmax(model.decision_function(features_test), axis=1)
else:
    y_proba = softmax(model.decision_function(features_test), axis=0)

# Visualizzazione dei risultati
print("Predizioni nel test set:", y_pred)

# Eseguiamo la valutazione delle metriche
metrics = evaluate_metrics(Y_test, y_pred, y_proba)
print("Metriche di valutazione:\n", metrics)
# {'Accuracy': 0.977, 'Precision': 0.972, 'Recall': 0.92, 'F1': 0.945, 'MCC': 0.932, 'AUC_PR': 0.962, 'AUC_ROC': 0.984, 'PREC_N_SCORES': 0.929}

Predizioni nel test set: [1 1 1 1 1 1 1 1 1 2 2 1 1 1 1 1 1 1 2 1 1 1 1 1 1 1 2 1 1 2 1 1 2 2 1 1 2
 2 1 2 1 1 1 1 1 2 1 1 1 1 1 2 1 1 1 1 1 2 1 2 2 1 1 1 1 1 1 1 1 1 2 1 1 1
 1 1 2 1 1 1 2 2 2 2 1 1 1 1 1 1 2 1 1 1 1 1 1 1 1 1 2 1 1 1 1 1 1 1 1 1 1
 2 1 2 1 1 1 1 1 2 1 1 2 1 1 2 2 2 1 1 1 2 2 2 2 1 1 1 1 2 2 2 2 1 2 1 2 1
 2 1 1 1 2 2 2 1 2 1 1 1 1 2 2 1 1 1 1 1 2 1 1 1 2 1 2 1 2 2 1 1 1 2 1 2 1
 2 2 2 2 2 1 1 1 2 2 1 1 2 2 1 2 1 1 2 2 2 2 2 2 1 1 2 2 2 2 2 2 2 2 1 2 1
 1 2 2 2 2 2 2 2 1 1 1 1 2 2 2 2 2 2 2 2 2 2 2 1 2 2 1 2]
Metriche di valutazione:
 {'Accuracy': 0.684, 'Precision': 0.692, 'Recall': 0.684, 'F1': 0.681, 'MCC': 0.376, 'AUC_PR': 0.36, 'AUC_ROC': 0.748}
