# Import libraries

In [1]:
import os
import glob
import numpy  as np
import pandas as pd
from sklearn.naive_bayes import GaussianNB
from sklearn.model_selection  import train_test_split
from sklearn.svm              import OneClassSVM
from sklearn.metrics          import accuracy_score, f1_score, roc_auc_score
from hmmlearn import hmm
from scipy.stats import norm
from scipy.special import softmax

# Get data

In [2]:
# Configurações
DATA_IMU_DIR  = "data/training/imu"
LABELS_CSV    = "data/training/labels/labels.csv"
WINDOW_SIZE   = 50
TEST_SIZE     = 0.2
RANDOM_STATE  = 42
PRIOR         = 0.5

# Carrega rótulos e ordena frames
labels_df = pd.read_csv(LABELS_CSV)
label_map = {row.filename: row.label for _, row in labels_df.iterrows()}

all_files = sorted(glob.glob(os.path.join(DATA_IMU_DIR, "frame_*.npz")))
mags, lbls = [], []
for path in all_files:
    fname = os.path.basename(path)
    data  = np.load(path)
    acc3  = data["accelerometer"]
    mag   = np.linalg.norm(acc3)
    if fname not in label_map:
        continue
    mags.append(mag)
    lbls.append(label_map[fname])

mags = np.array(mags)   # shape (N_total,)
lbls = np.array(lbls)   # shape (N_total,), strings

# mapeia strings para inteiros
classes   = sorted(set(lbls.tolist()))
label2int = {lab:i for i,lab in enumerate(classes)}
lbls      = np.array([label2int[lab] for lab in lbls])
# agora lbls contém valores inteiros 0,1,2,...

# Extrai janelas deslizantes
Xw, yw = [], []
for i in range(len(mags) - WINDOW_SIZE + 1):
    w    = mags[i : i+WINDOW_SIZE]           # vetor (WINDOW_SIZE,)
    ywin = lbls[i : i+WINDOW_SIZE]           # vetor inteiro
    lab  = np.bincount(ywin).argmax()        # classe mais frequente
    Xw.append(w)
    yw.append(lab)

X = np.vstack([w.reshape(1,-1) for w in Xw])  # (M,WINDOW_SIZE)
y = np.array(yw)                              # (M,)

# Split treino/teste
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=TEST_SIZE,
    stratify=y, random_state=RANDOM_STATE
)

# Estima parâmetros TNB
def estimate_parameters(X, n_iter=5):
    mu    = np.mean(X[:,1:])
    alpha = 0.0
    for _ in range(n_iter):
        alpha = np.sum((X[:,1:]-mu)*X[:,:-1]) / np.sum((X[:,:-1]-mu)**2)
        mu    = np.mean(X[:,1:] - alpha*X[:,:-1])
    res   = X[:,1:] - (mu + alpha*X[:,:-1])
    sigma = np.sqrt(np.mean(res**2))
    return mu, sigma, alpha

params = {}
for cls in np.unique(y_train):
    Xc = X_train[y_train==cls]
    params[cls] = estimate_parameters(Xc)

# Predição TNB
def predict_tnb(X, params, prior=PRIOR):
    preds, scores = [], []
    for x in X:
        ll = {cls: np.log(prior) for cls in params}
        for t in range(1, x.size):
            for cls,(mu,sigma,alpha) in params.items():
                ll[cls] += norm.logpdf(x[t], loc=mu + alpha*x[t-1], scale=sigma)
        cls_list = sorted(params)
        scs = [ll[c] for c in cls_list]
        preds.append(cls_list[np.argmax(scs)])
        # usa log-posterior da classe "1" como score
        scores.append(ll.get(1, max(scs)))
    return np.array(preds), np.array(scores)

y_pred_tnb, scores_tnb = predict_tnb(X_test, params)

# Train and evaluate models

In [3]:
results = []

# GaussianNB
gnb = GaussianNB().fit(X_train, y_train)
y_nb    = gnb.predict(X_test)
proba_nb = gnb.predict_proba(X_test)           # (n_samples, n_classes)
results.append({
    "model":    "GaussianNB",
    "accuracy": accuracy_score(y_test,  y_nb),
    "f1":       f1_score(y_test,    y_nb, average="weighted"),
    "auc":      roc_auc_score(y_test, proba_nb,
                              multi_class="ovo", average="weighted")
})

# HMM (um modelo por classe)
models_hmm = {}
for cls, (mu, sigma, _) in params.items():
    m = hmm.GaussianHMM(n_components=2,
                        covariance_type="diag",
                        n_iter=100,
                        random_state=42)
    Xc = X_train[y_train == cls]
    m.fit(Xc.reshape(-1,1), [Xc.shape[1]]*len(Xc))
    models_hmm[cls] = m

# pontua cada janela de teste
class_list = sorted(models_hmm.keys())
scores_hmm = np.zeros((len(X_test), len(class_list)))
for i, x in enumerate(X_test):
    for idx, cls in enumerate(class_list):
        scores_hmm[i, idx] = models_hmm[cls].score(x.reshape(-1,1))

# normaliza para pseudo-probabilidades
proba_hmm = softmax(scores_hmm, axis=1)
y_hmm     = np.argmax(scores_hmm, axis=1)
results.append({
    "model":    "HMM",
    "accuracy": accuracy_score(y_test, y_hmm),
    "f1":       f1_score(y_test, y_hmm, average="weighted"),
    "auc":      roc_auc_score(y_test, proba_hmm,
                              multi_class="ovo", average="weighted")
})

# One-Class SVM (um detector para cada classe)
cls_list = sorted(np.unique(y_train))
ocsvm_models = {}
for cls in cls_list:
    ocsvm_models[cls] = OneClassSVM(gamma="auto").fit(
        X_train[y_train == cls]
    )

# para cada janela de teste, cada modelo dá um score
scores_svm = np.zeros((len(X_test), len(cls_list)))
for i, x in enumerate(X_test):
    for j, cls in enumerate(cls_list):
        scores_svm[i, j] = ocsvm_models[cls].decision_function(x.reshape(1, -1))

# normaliza para pseudo-probabilidades
proba_svm = softmax(scores_svm, axis=1)
y_svm     = np.argmax(scores_svm, axis=1)

results.append({
    "model":    "OneClassSVM",
    "accuracy": accuracy_score(y_test, y_svm),
    "f1":       f1_score(y_test, y_svm, average="weighted"),
    "auc":      roc_auc_score(y_test, proba_svm,
                              multi_class="ovo", average="weighted")
})

# Temporal Naive Bayes (TNB)
def predict_tnb_all(X, params, prior=0.5):
    cls_list   = sorted(params.keys())
    K          = len(cls_list)
    scores_mat = np.zeros((len(X), K))
    y_pred     = []
    for i, x in enumerate(X):
        ll = {cls: np.log(prior) for cls in cls_list}
        for t in range(1, len(x)):
            for cls in cls_list:
                mu, sigma, alpha = params[cls]
                ll[cls] += norm.logpdf(x[t], loc=mu + alpha*x[t-1], scale=sigma)
        vec = np.array([ll[c] for c in cls_list])
        scores_mat[i] = vec
        y_pred.append(cls_list[np.argmax(vec)])
    return np.array(y_pred), scores_mat

y_tnb, scores_tnb = predict_tnb_all(X_test, params)
proba_tnb        = softmax(scores_tnb, axis=1)
results.append({
    "model":    "TNB",
    "accuracy": accuracy_score(y_test, y_tnb),
    "f1":       f1_score(y_test, y_tnb, average="weighted"),
    "auc":      roc_auc_score(y_test, proba_tnb,
                              multi_class="ovo", average="weighted")
})

# Salva CSV
df = pd.DataFrame(results, columns=["model","accuracy","f1","auc"])
df.to_csv("carla_benchmark_results.csv", index=False)
print(df)


  scores_svm[i, j] = ocsvm_models[cls].decision_function(x.reshape(1, -1))
  scores_svm[i, j] = ocsvm_models[cls].decision_function(x.reshape(1, -1))
  scores_svm[i, j] = ocsvm_models[cls].decision_function(x.reshape(1, -1))
  scores_svm[i, j] = ocsvm_models[cls].decision_function(x.reshape(1, -1))
  scores_svm[i, j] = ocsvm_models[cls].decision_function(x.reshape(1, -1))
  scores_svm[i, j] = ocsvm_models[cls].decision_function(x.reshape(1, -1))
  scores_svm[i, j] = ocsvm_models[cls].decision_function(x.reshape(1, -1))
  scores_svm[i, j] = ocsvm_models[cls].decision_function(x.reshape(1, -1))
  scores_svm[i, j] = ocsvm_models[cls].decision_function(x.reshape(1, -1))
  scores_svm[i, j] = ocsvm_models[cls].decision_function(x.reshape(1, -1))
  scores_svm[i, j] = ocsvm_models[cls].decision_function(x.reshape(1, -1))
  scores_svm[i, j] = ocsvm_models[cls].decision_function(x.reshape(1, -1))
  scores_svm[i, j] = ocsvm_models[cls].decision_function(x.reshape(1, -1))
  scores_svm[i, j] = ocsv

         model  accuracy        f1       auc
0   GaussianNB      0.64  0.612000  0.685026
1          HMM      0.68  0.678050  0.914868
2  OneClassSVM      0.24  0.216774  0.640741
3          TNB      0.72  0.784689  0.966032
