In [None]:
# Only when the IPython shell's string form mentions 'google.colab': mount
# Google Drive and move into the project folder, so that the relative file
# paths used by later cells are resolved from there.
if 'google.colab' in str(get_ipython()):
    from google.colab import drive
    drive.mount('/content/drive')
    # IPython magic (not plain Python): change the kernel's working directory.
    %cd "/content/drive/MyDrive/Colab Notebooks/SIP_LSTM/"

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/MyDrive/Colab Notebooks/SIP_LSTM


In [None]:
import numpy as np
import tensorflow as tf
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix, precision_score, recall_score, f1_score
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dropout, Dense
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score
from scipy.stats import skew, kurtosis

In [None]:
# Load the dataset
df = pd.read_csv('Cleaned_scenari_validi.csv')

# One raw dialog string per row. Rows whose description is missing are dropped
# here: a NaN cell is a float and would crash the `.split(':')` call below.
dialogs = df['Replaced Signalling Description'].dropna().to_list()

# Build the two vocabularies (method and code) while tokenising every dialog.
methods = set()
codes   = set()
seqs = []
max_seq_len = 0
for d in dialogs:
    seq = []
    # Messages are ':'-separated; the third ','-separated field of each message
    # is the token. A token containing '-' is split once into (method, code),
    # and both halves become separate sequence elements.
    for msg in d.split(':'):
        tok = msg.split(',')[2]
        if '-' in tok:
            m, c = tok.split('-', 1)
            seq.extend((m, c))
            methods.add(m)
            codes.add(c)
        else:
            seq.append(tok)
            methods.add(tok)

    # Track the longest tokenised dialog.
    if len(seq) > max_seq_len:
        max_seq_len = len(seq)

    seqs.append(seq)

# `max` is kept as a module-level alias because other cells may read the
# longest sequence length from it. Be aware that it shadows the builtin
# `max()` for the rest of the notebook.
max = max_seq_len

# Single symbol table shared by methods and codes, plus the padding symbol.
symbols = methods | codes
symbols.add('<PAD>')

# Sorted so that the symbol -> index mapping is stable across runs.
message2idx = {m: i for i, m in enumerate(sorted(symbols))}

# Show a summary and a small sample instead of dumping every sequence.
print(f"{len(seqs)} dialogs tokenised, longest has {max_seq_len} tokens")
print(seqs[:3])
print(message2idx)

NameError: name 'pd' is not defined

In [None]:
# String form of every tokenised dialog, used to count the distinct dialogs.
# Tokens are joined with a space: without a separator, two different token
# sequences could concatenate to the same string and be counted as one dialog,
# which would make the class count disagree with the space-joined labels that
# are factorised further down.
dialogs = [' '.join(s) for s in seqs]

In [None]:
# -------------------
# 1) Hyperparameters
# -------------------
# M: number of distinct symbols in the vocabulary, not counting '<PAD>'.
M = len(message2idx) - 1
# LM: length of the one-hot vector used to encode one symbol ('<PAD>' included).
LM = len(message2idx)
# LN: fixed length of the padded input sequences, i.e. the length of the longest
# tokenised dialog. Computed from `seqs` with NumPy so that it does not depend
# on the notebook-level name `max`, which shadows the builtin.
LN = int(np.max([len(s) for s in seqs], initial=0))
# N: number of output classes = number of distinct strings in `dialogs`
# (counted over the whole dataset, before any train/test split).
N = len(set(dialogs))
units = 256 # 1043
dropout_rate = 0.5
batch_size = 64
learning_rate = 0.001
max_epochs = 200

opt = tf.keras.optimizers.Adam(learning_rate=learning_rate)
# Stop when the validation loss has not improved for 10 epochs and roll the
# model back to the best weights seen.
early_stop = EarlyStopping(monitor='val_loss', mode='min', patience=10, restore_best_weights=True)

print("M = ", M)
print("LM = ", LM)
print("LN = ", LN)
print("N = ", N)

M =  27
LM =  28
LN =  250
N =  2823


In [None]:
# -------------------
# 2) Costruzione modelli
# -------------------
def build_model_1():
    """Build and compile the single-layer LSTM classifier.

    Architecture: Input(LN, LM) -> LSTM(units) -> Dropout(dropout_rate)
    -> Dense(N, softmax), trained with categorical cross-entropy.

    A new Adam optimizer is created on every call. An optimizer instance is
    tied to the variables of the first model it trains, so sharing one
    instance between models makes training the second model fail.

    The input shape is declared with an explicit ``Input`` object rather than
    ``input_shape=`` on the first layer, which Keras warns about.

    Returns:
        The compiled ``Sequential`` model.
    """
    m = Sequential([
        tf.keras.Input(shape=(LN, LM)),
        LSTM(units),
        Dropout(dropout_rate),
        Dense(N, activation='softmax')
    ])
    m.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    return m

def build_model_2():
    """Build and compile the two-layer (stacked) LSTM classifier.

    Architecture: Input(LN, LM) -> LSTM(units, return_sequences=True)
    -> Dropout(dropout_rate) -> LSTM(units) -> Dropout(dropout_rate)
    -> Dense(N, softmax), trained with categorical cross-entropy.

    A new Adam optimizer is created on every call. An optimizer instance is
    tied to the variables of the first model it trains, so sharing one
    instance between models makes training the second model fail.

    The input shape is declared with an explicit ``Input`` object rather than
    ``input_shape=`` on the first layer, which Keras warns about.

    Returns:
        The compiled ``Sequential`` model.
    """
    m = Sequential([
        tf.keras.Input(shape=(LN, LM)),
        # The first LSTM emits its full output sequence so the second can consume it.
        LSTM(units, return_sequences=True),
        Dropout(dropout_rate),
        LSTM(units),
        Dropout(dropout_rate),
        Dense(N, activation='softmax')
    ])
    m.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    return m

In [None]:
# -------------------
# 3) Data loading and one-hot encoding
# -------------------
pad_idx = message2idx['<PAD>']
time_steps = np.arange(LN)

encoded_dialogs = []
for s in seqs:
    # Start from an all-'<PAD>' row of symbol indices, then overwrite the
    # leading positions with the dialog's own symbols. Anything beyond LN
    # positions is truncated.
    symbol_ids = np.full(LN, pad_idx, dtype=int)
    kept = s[:LN]
    symbol_ids[:len(kept)] = [message2idx[tok] for tok in kept]

    # One-hot encode the whole dialog with a single fancy-indexing assignment
    # instead of one to_categorical() call per time step. float32 halves the
    # memory of the default float64 buffer.
    encoded_dialog = np.zeros((LN, LM), dtype=np.float32)
    encoded_dialog[time_steps, symbol_ids] = 1.0

    encoded_dialogs.append(encoded_dialog)

# Summarise instead of printing every encoded array.
print(f"Encoded {len(encoded_dialogs)} dialogs, each of shape ({LN}, {LM})")

[array([[0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 1., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       ...,
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.]]), array([[0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 1., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       ...,
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.]]), array([[0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 1., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       ...,
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.]]), array([[0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 1., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       ...,
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.]]), array([[0., 0., 0., ..., 0., 0.

In [None]:
X = np.array(encoded_dialogs)  # shape = (num_dialogs, LN, LM)

# Every distinct token sequence is its own class: factorize maps each distinct
# space-joined dialog string to an integer id.
dialogs_str = [' '.join(s) for s in seqs]
# Pass an object ndarray rather than a plain list: factorize on a list is
# deprecated in pandas and emits a FutureWarning.
labels, uniques = pd.factorize(np.asarray(dialogs_str, dtype=object))
if len(uniques) > N:
    # Fail early with a readable message instead of an index error from
    # inside to_categorical().
    raise ValueError(
        f"Found {len(uniques)} distinct dialogs but the model is sized for N={N} classes"
    )
y_int = labels                    # integers 0..N-1
y = to_categorical(y_int, num_classes=N)  # one-hot

# Fixed random_state keeps the 80/20 split reproducible across runs.
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.2,
    random_state=42
)

  labels, uniques = pd.factorize(dialogs_str)


[[1. 0. 0. ... 0. 0. 0.]
 [0. 1. 0. ... 0. 0. 0.]
 [0. 0. 1. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]
10000
10000


In [None]:
# -------------------
# 4) Training
# -------------------
# Both models are fitted on the same data with the same settings; 20% of the
# training split is held out for validation (used by early stopping).
fit_options = dict(
    validation_split=0.2,
    epochs=max_epochs,
    batch_size=batch_size,
    callbacks=[early_stop],
    verbose=2,
)

# Single-layer LSTM
model1 = build_model_1()
history1 = model1.fit(X_train, y_train, **fit_options)

# Stacked two-layer LSTM
model2 = build_model_2()
history2 = model2.fit(X_train, y_train, **fit_options)

  super().__init__(**kwargs)


Epoch 1/200
100/100 - 103s - 1s/step - accuracy: 0.2092 - loss: 5.6760 - val_accuracy: 0.2212 - val_loss: 5.3192
Epoch 2/200
100/100 - 150s - 1s/step - accuracy: 0.2170 - loss: 5.2286 - val_accuracy: 0.2212 - val_loss: 5.4135
Epoch 3/200


KeyboardInterrupt: 

In [None]:
# -------------------
# IV.B Detection Performance
# -------------------
def detection_perf(model, X, y_true):
    """Return the accuracy of `model` on the one-hot labelled set (X, y_true).

    The model's outputs and the one-hot targets are both reduced to class
    indices with argmax over axis 1 before being compared.
    """
    scores = model.predict(X, batch_size=batch_size)
    predicted_ids = np.argmax(scores, axis=1)
    expected_ids = np.argmax(y_true, axis=1)
    return accuracy_score(expected_ids, predicted_ids)

# Model 1: accuracy on the training split, then on the held-out test split.
pd_train_1, pd_test_1 = (
    detection_perf(model1, X_train, y_train),
    detection_perf(model1, X_test, y_test),
)
print(f"IV.B – Model1 Detection PD_train={pd_train_1:.4f}, PD_test={pd_test_1:.4f}")

# Model 2: same evaluation.
pd_train_2, pd_test_2 = (
    detection_perf(model2, X_train, y_train),
    detection_perf(model2, X_test, y_test),
)
print(f"IV.B – Model2 Detection PD_train={pd_train_2:.4f}, PD_test={pd_test_2:.4f}")

In [None]:
# -------------------
# IV.C Prediction Performance
# -------------------
def prediction_perf(model, X_pref, y_pref):
    """Return the fraction of rows in X_pref whose predicted class matches y_pref.

    Both the model's outputs and the one-hot targets are reduced to class
    indices with argmax over axis 1; the number of matches is divided by the
    number of targets.
    """
    scores = model.predict(X_pref, batch_size=batch_size)
    hits = np.argmax(scores, 1) == np.argmax(y_pref, 1)
    return hits.sum() / len(y_pref)

# NOTE(review): X_train_prefixes / y_train_prefixes / X_test_prefixes /
# y_test_prefixes are not defined in any cell visible in this notebook; on a
# fresh kernel this cell raises NameError. Confirm where they are built.

# Model 1: prefix-prediction accuracy on the training and test prefixes.
pe_train_1, pe_test_1 = (
    prediction_perf(model1, X_train_prefixes, y_train_prefixes),
    prediction_perf(model1, X_test_prefixes, y_test_prefixes),
)
print(f"IV.C – Model1 Prediction PE_train={pe_train_1:.4f}, PE_test={pe_test_1:.4f}")

# Model 2: same evaluation.
pe_train_2, pe_test_2 = (
    prediction_perf(model2, X_train_prefixes, y_train_prefixes),
    prediction_perf(model2, X_test_prefixes, y_test_prefixes),
)
print(f"IV.C – Model2 Prediction PE_train={pe_train_2:.4f}, PE_test={pe_test_2:.4f}")

In [None]:
# -------------------
# IV.D Detection of Unknown SIP Dialogs
# -------------------
# 1) Threshold λM = mean(max_i yhat_i) over all known dialogs (train_full)
# NOTE(review): X_train_full is not defined in any cell visible in this
# notebook — confirm where it is built.
yhat_train_full = model1.predict(X_train_full, batch_size=batch_size)
max_train = yhat_train_full.max(axis=1)   # highest output score per dialog
lambda_M = np.mean(max_train)

# 2) Thresholds λS, λK from the skewness and kurtosis of the outputs of the
#    known dialogs: each threshold is (mean - variance) of the per-dialog moment.
sk_train = skew(yhat_train_full, axis=1)
ku_train = kurtosis(yhat_train_full, axis=1)
mu_S = sk_train.mean()
var_S = sk_train.var()
mu_K = ku_train.mean()
var_K = ku_train.var()
lambda_S = mu_S - var_S
lambda_K = mu_K - var_K

# 3) Classification functions
def classify_max_threshold(yhat):
    """Label each row of `yhat` as unknown (-1) or known (0).

    A row is unknown when its largest value is strictly below `lambda_M`.
    """
    top_score = np.max(yhat, axis=1)
    is_unknown = top_score < lambda_M
    return np.where(is_unknown, -1, 0)

def classify_moments(yhat):
    """Label each row of `yhat` as unknown (-1) or known (0) from its moments.

    A row is unknown only when both its skewness is strictly below `lambda_S`
    and its kurtosis is strictly below `lambda_K`.
    """
    low_skew = skew(yhat, axis=1) < lambda_S
    low_kurtosis = kurtosis(yhat, axis=1) < lambda_K
    return np.where(low_skew & low_kurtosis, -1, 0)

# 4) Predictions on the "unknown" set (anomalous + unknown test dialogs merged)
# NOTE(review): X_unknown_full and X_train_full are not defined in any cell
# visible in this notebook — confirm where they are built.
X_u = X_unknown_full
y_true = np.full(len(X_u), -1)           # ground truth = unknown
yhat_u = model1.predict(X_u, batch_size=batch_size)

# 5) Build the "known" set used as negatives: X_train_full
X_k = X_train_full
y_true_k = np.zeros(len(X_k), dtype=int)  # ground truth = known

# 6) Merge for evaluation (known rows first, then unknown rows)
X_all = np.vstack([X_k, X_u])
y_true_all = np.concatenate([y_true_k, y_true])

# The model outputs for both halves already exist: `yhat_train_full` for the
# known dialogs and `yhat_u` for the unknown ones. Stack them in the same
# order as X_all instead of running the model a second time over everything.
yhat_all = np.vstack([yhat_train_full, yhat_u])

# 7) Classify
y_pred_max = classify_max_threshold(yhat_all)
y_pred_moments = classify_moments(yhat_all)

# 8) Confusion matrix and metrics
def report(y_true, y_pred):
    """Compute the confusion matrix and summary metrics for known/unknown labels.

    Labels equal to 0 mean "known". The confusion matrix, precision, recall
    and F1 are computed on the boolean "is known" view of both label arrays;
    accuracy is computed on the raw labels.

    Returns:
        Tuple (confusion_matrix, accuracy, precision, recall, f1).
    """
    true_known = (y_true == 0)
    pred_known = (y_pred == 0)

    cm = confusion_matrix(true_known, pred_known)
    acc = accuracy_score(y_true, y_pred)
    prec = precision_score(true_known, pred_known)
    rec = recall_score(true_known, pred_known)
    f1 = f1_score(true_known, pred_known)
    return cm, acc, prec, rec, f1

# Evaluate both unknown-dialog classifiers against the same ground truth.
cm1, acc1, prec1, rec1, f11 = report(y_true_all, y_pred_max)
cm2, acc2, prec2, rec2, f12 = report(y_true_all, y_pred_moments)


def _show_report(title, cm, acc, prec, rec, f1):
    """Print one classifier's title, confusion matrix and summary metrics."""
    print(title)
    print(" Confusion Matrix:\n", cm)
    print(f" Accuracy={acc:.4f}, Precision={prec:.4f}, Recall={rec:.4f}, F1={f1:.4f}")


_show_report("IV.D – Max-Threshold Classifier", cm1, acc1, prec1, rec1, f11)
_show_report("IV.D – Skew/Kurtosis Classifier", cm2, acc2, prec2, rec2, f12)