In [None]:
!pip install x-transformers

> Hyper-parameter grid search (iterating over every configuration) and embedding generation:

In [None]:
import pandas as pd 
import torch
import numpy as np
from x_transformers import ContinuousTransformerWrapper, Encoder
from sklearn.cluster import KMeans
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.metrics import silhouette_score
from itertools import product
from sklearn.metrics import accuracy_score
import joblib
from sklearn.preprocessing import RobustScaler


import os

# Directory (relative to the working directory) from which the input CSV is read.
path = "../../data"

# Directory that receives the files written by this notebook. It is created
# up front so that later writes do not fail on a missing folder.
output_dir = "."
os.makedirs(output_dir, exist_ok=True)




def split_sequence(tensor, window_size):
    """Cut *tensor* into consecutive, non-overlapping windows along axis 1.

    Args:
        tensor: A 3-D array-like indexed as ``[a, b, c]`` that exposes
            ``.shape`` and supports slicing (e.g. a torch tensor).
        window_size: Number of positions along axis 1 in each window.

    Returns:
        A list of slices ``tensor[:, start:start + window_size, :]``. The last
        window is shorter when the length of axis 1 is not a multiple of
        ``window_size``; the list is empty when axis 1 has length zero.
    """
    total_steps = tensor.shape[1]
    chunks = []
    for start in range(0, total_steps, window_size):
        stop = start + window_size
        chunks.append(tensor[:, start:stop, :])
    return chunks

# Feature columns selected from the CSV. The order is significant: it fixes
# the column order of every matrix derived from this list.
colunas = [
    # *_pacote(s) columns
    'total_pacotes',
    'total_pacotes_icmp',
    'total_pacotes_udp',
    'total_pacotes_tcp',
    'maior_pacote',
    'menor_pacote',
    'soma_pacotes',
    # IP / port / MAC / IP-version columns
    'total_ips_origem',
    'total_ips_destino',
    'porta_origem_mais_frequente',
    'porta_destino_mais_frequente',
    'total_mac_source',
    'total_mac_dst',
    'ip_version',
    # *_ttl columns
    'maior_ttl',
    'menor_ttl',
    'std_ttl',
    'mean_ttl',
    # TCP flag columns
    'total_flags_tcp',
    'total_tcp_flags_fin',
    'total_tcp_flags_syn',
    'total_tcp_flags_reset',
    'total_tcp_flags_push',
    'total_tcp_flags_ack',
    'total_tcp_flags_urg',
    # *_tcp_window_size_value columns
    'maior_tcp_window_size_value',
    'menor_tcp_window_size_value',
    'soma_tcp_window_size_value',
    'std_tcp_window_size_value',
    'mean_tcp_window_size_value',
    # *_tcp_seq columns
    'maior_tcp_seq',
    'menor_tcp_seq',
    'soma_tcp_seq',
    'std_tcp_seq',
    'mean_tcp_seq',
    # *_time_delta columns
    'maior_time_delta',
    'menor_time_delta',
    'soma_time_delta',
    'std_time_delta',
    'mean_time_delta',
    # *_tcp_time_delta columns
    'maior_tcp_time_delta',
    'menor_tcp_time_delta',
    'soma_tcp_time_delta',
    'std_tcp_time_delta',
    'mean_tcp_time_delta',
    # *_tcp_time_relative columns
    'maior_tcp_time_relative',
    'menor_tcp_time_relative',
    'soma_tcp_time_relative',
    'std_tcp_time_relative',
    'mean_tcp_time_relative',
]

# Load the experiment CSV; the file uses ';' as its field separator.
early_warning_url = f"{path}/early_warning_exp_1.csv"
early_warning = pd.read_csv(early_warning_url, sep=";")


# Candidate values for each hyper-parameter; the search below evaluates the
# full Cartesian product of these four lists.
dim = [128, 12, 32, 64, 20]
depth = [256, 128, 32, 16, 64]
heads = [64, 12, 20, 32, 16]
dim_out = [128, 64, 12, 20, 16]

# Row range of the data set that is evaluated.
# NOTE: despite the names, `sup` is used as the slice START and `inf` as the
# slice END.
sup, inf = 0, 5632

# Trackers for the best configuration seen so far. The score starts below any
# achievable accuracy so the first evaluated configuration always replaces it.
melhor_score = -1
melhores_parametros = dict()
melhor_y_pred = y_test_real_melhor = melhor_embeddings = None

# ---------------------------------------------------------------------------
# Grid search over the transformer hyper-parameters.
#
# For every (dim_out, heads, depth, dim) combination a freshly initialised
# ContinuousTransformerWrapper embeds the scaled feature rows window by
# window, KMeans (k=2) clusters the embeddings, and the clustering is scored
# against `has_bot`. The best-scoring configuration's embeddings and labels
# are written to `output_dir`.
#
# NOTE: no training step happens in this block, so the model is used exactly
# as initialised; scores therefore vary between runs unless torch's RNG is
# seeded elsewhere.
# ---------------------------------------------------------------------------

# Everything below depends only on the data, not on the hyper-parameters, so
# it is computed once instead of once per configuration.
slice_init_unlabeled_test = sup
slice_end_unlabeled_test = inf

x_test_df = early_warning[colunas][slice_init_unlabeled_test:slice_end_unlabeled_test].copy()
y_test_real = early_warning['has_bot'][slice_init_unlabeled_test:slice_end_unlabeled_test].to_numpy()

scaler = RobustScaler()
x_test_normalized = scaler.fit_transform(x_test_df)
# unsqueeze(0) adds a leading axis of size 1 -> (1, rows, features).
x_test_tensor = torch.tensor(x_test_normalized, dtype=torch.float32).unsqueeze(0)
num_features = x_test_tensor.shape[2]
windows = split_sequence(x_test_tensor, window_size=30)

# Loop variables use their own names so the grid lists (`dim_out`, `heads`,
# `depth`, `dim`) are not overwritten by the last tried value.
for cfg_dim_out, cfg_heads, cfg_depth, cfg_dim in product(dim_out, heads, depth, dim):
    print(f"\n- Config: dim={cfg_dim}, depth={cfg_depth}, heads={cfg_heads}, dim_out={cfg_dim_out}")

    model = ContinuousTransformerWrapper(
        dim_in=num_features,
        dim_out=cfg_dim_out,
        max_seq_len=6000,
        attn_layers=Encoder(
            dim=cfg_dim,
            depth=cfg_depth,
            heads=cfg_heads,
            dynamic_pos_bias=True
        )
    )
    # Inference only: switch off any train-time behaviour (e.g. dropout).
    model.eval()

    embeddings_list = []
    with torch.no_grad():
        for window in windows:
            # All positions are valid, so the mask is all True.
            mask = torch.ones_like(window[:, :, 0]).bool()
            emb = model(window, mask=mask)
            # Drop the leading size-1 axis -> (window_len, dim_out).
            embeddings_list.append(emb.squeeze(0).cpu())

    # One embedding row per input row, in the original row order.
    embeddings_test_np = torch.cat(embeddings_list, dim=0).numpy()

    kmeans = KMeans(n_clusters=2, random_state=0, n_init='auto').fit(embeddings_test_np)
    y_pred = kmeans.labels_

    # Cluster ids are arbitrary, so score both label assignments and keep the
    # better one.
    acc = accuracy_score(y_test_real, y_pred)
    acc_inv = accuracy_score(y_test_real, 1 - y_pred)
    acc_corrigida = max(acc, acc_inv)

    print(f"accuracy: {acc_corrigida:.4f}")

    if acc_corrigida > melhor_score:
        melhor_score = acc_corrigida
        # Store the plain integers (the previous `{sup}` / `{inf}` built
        # one-element sets) and record `dim_out`, which is part of the grid.
        melhores_parametros = {
            'dim': cfg_dim,
            'depth': cfg_depth,
            'heads': cfg_heads,
            'dim_out': cfg_dim_out,
            'init': sup,
            'end': inf,
        }
        melhor_y_pred = y_pred if acc >= acc_inv else 1 - y_pred
        melhor_embeddings = embeddings_test_np.copy()
        y_test_real_melhor = y_test_real.copy()

        embeddings_df = pd.DataFrame(
            melhor_embeddings,
            columns=[f'embedding_{i}' for i in range(melhor_embeddings.shape[1])],
        )

        melhor_df = x_test_df.copy()
        melhor_df['predicted_label'] = melhor_y_pred
        melhor_df['real_label'] = y_test_real_melhor

        # Persist immediately so the best result so far survives an
        # interrupted search; each improvement overwrites the previous files.
        np.save(f'{output_dir}/embeddings_ex1.npy', melhor_embeddings)
        embeddings_df.to_csv(f'{output_dir}/embeddings_ex1.csv', index=False)
        melhor_df.to_csv(f'{output_dir}/resultado_ex1.csv', index=False)

# Final report for the best configuration found by the search above.
print("\nBest Configuration:", melhores_parametros, sep="\n")

print("\n - Confusion Matrix:")
matriz_confusao = confusion_matrix(y_test_real_melhor, melhor_y_pred)
print(matriz_confusao)

print("\n - Classification Report:")
relatorio = classification_report(
    y_test_real_melhor,
    melhor_y_pred,
    digits=4,
    zero_division=0,
)
print(relatorio)

# Silhouette of the best embeddings under the selected cluster labels.
score = silhouette_score(melhor_embeddings, melhor_y_pred)
print(f"\n- Silhouette Score: {score:.4f}")