# Segmentation methodology for Chameleon

In [18]:
import importlib
from time import time
import itertools

import numpy as np
from matplotlib import pyplot as plt
from tsfresh.feature_extraction.settings import (
    EfficientFCParameters,
)

# Project-local modules. Each module is passed to importlib.reload() right after
# its import so that re-running this cell re-executes the module's source and
# picks up edits without restarting the kernel.
import unsupervised_learning.unsupervised_sliding_windows_classification as unsupervised_sliding_windows_classification

importlib.reload(unsupervised_sliding_windows_classification)
import inference_pipeline.heuristic as heuristic
importlib.reload(heuristic)
import unsupervised_learning.vectorization as vectorization
importlib.reload(vectorization)
import unsupervised_learning.classification as classification
importlib.reload(classification)

# Feature calculators dropped from tsfresh's "efficient" preset because they are
# the least efficient ones for this workload.
_EXCLUDED_FEATURES = (
    'query_similarity_count',  # often not applicable
    'augmented_dickey_fuller',
    'number_cwt_peaks',
    'agg_linear_trend',
    'change_quantiles',
    'lempel_ziv_complexity',
    'permutation_entropy',
)

fc_params = EfficientFCParameters()
for _feature_name in _EXCLUDED_FEATURES:
    # A default of None keeps the pop silent when the preset lacks the entry.
    fc_params.pop(_feature_name, None)

# Sliding-window geometry shared by the segmentation and evaluation cells.
window_size = 10_000
stride = 50

# Vectorizer: the autoencoder variant is active. The tsfresh variant is kept for
# reference; re-enabling it also requires `import multiprocessing`.
#vectorizer = vectorization.TSFreshVectorizer(fc_params=fc_params, n_jobs=min(int(multiprocessing.cpu_count()/1.2), 64))
vectorizer = vectorization.AutoencoderVectorizer(
    window_size=window_size,
    base_lr=2e-4,
    lr_decay=0.7,
)

# Classifier: radial threshold is active, k-means is kept as an alternative.
#classifier = classification.KMeansClassifier(n_clusters=6)
classifier = classification.RadialThresholdClassifier()

In [2]:
# Which dataset and chunk of it to segment.
dataset = 'BASE'
chunk = 1

# Input trace container and destination of the per-window classification scores.
_trace_name = f"chameleon_{dataset.lower()}_chunk_{chunk}.h5"
chameleon_file = f"datasets/{dataset}/" + _trace_name
output_file = "temporary_files/classification_file.npy"

# Number of windows per batch, derived from the vectorizer's data-point budget.
batch_size = vectorizer.batch_data_points // window_size

Compute the segmentation and save it to the file `output_file`.
The function `classify_trace_unsupervised` has a few parameters to configure based on the experiment:

- `stride`: Define the stride to use for the sliding window.
- `window_size`: Define the size of the sliding window itself.

In [19]:
# Run the unsupervised sliding-window segmentation, record how long it took and
# store the resulting scores in `output_file`.
# Time needed to process the TSFRESH features: ~1h
start = time()
scores = unsupervised_sliding_windows_classification.classify_trace_unsupervised(
    trace_file=chameleon_file,
    vectorizer=vectorizer,
    classifier=classifier,
    stride=stride,
    window_size=window_size,
    epochs=8,
    tmp_folder="temporary_files",
    batch_size=batch_size,  # a bigger batch size hits the memory limit of the Docker container
    # Previously stored features (path inside temporary_files); remove the
    # argument or pass None to extract new features. The path follows `dataset`
    # so that switching datasets cannot silently reuse another dataset's features.
    # NOTE(review): assumes stored features are named "<dataset>_err" -- confirm.
    stored_features_path=f'autoencoder_features/{dataset}_err',
    limit_traces=4,
)
end = time()

# Elapsed wall-clock time in whole minutes (floor division of a float).
with open("time.txt", "w") as f:
    f.write(str((end - start) // 60))

# Experiment notes
# ----------------
# Recon error
#   Base: 0.021
#   DFS: 0.053
#
# Best balanced radius at 0.7
#
# choose_from_error:
#   5, 95:
#     Precision: 1.0000
#     Recall: 0.7076
#     F1: 0.8287
#     Balanced accuracy: 0.8538
#
#   10, 99

unsupervised_sliding_windows_classification.saveClassification(scores, output_file)

0.09474612466060044 noise windows found
0.09530943106053606 noise windows found
0.09569465514620633 noise windows found
0.09390638086069641 noise windows found


KeyboardInterrupt: 

In [17]:
import numpy as np
import plotly.express as px
import plotly.io as pio

# Interactive 3D scatter of the first three columns of the classifier's
# post-pipeline data, coloured by class.
pio.renderers.default = "plotly_mimetype"   # stays inline

# -------- options --------
max_points = 500_000
classes_to_plot = [0, 1]      # e.g. [1] or [0, 1]; None = all
subsample_seed = 0            # fixed seed: re-running the cell draws the same subsample
# -------------------------

# `post_pipeline_data` is unpacked as a (features, labels) pair.
Xp, y = classifier.post_pipeline_data
assert Xp.ndim == 2 and Xp.shape[1] >= 3, "3D scatter needs at least three feature columns"
assert Xp.shape[0] == len(y), "features and labels must have the same number of rows"

# filter by class (before subsampling); the option itself is left untouched
if classes_to_plot is not None:
    mask = np.isin(y, classes_to_plot)
    Xp = Xp[mask]
    y = y[mask]

# subsample (without replacement) to keep the figure responsive
if Xp.shape[0] > max_points:
    subsample_rng = np.random.default_rng(subsample_seed)
    idx = subsample_rng.choice(Xp.shape[0], max_points, replace=False)
    Xp_plot = Xp[idx]
    y_plot = y[idx]
else:
    Xp_plot = Xp
    y_plot = y

fig = px.scatter_3d(
    x=Xp_plot[:, 0],
    y=Xp_plot[:, 1],
    z=Xp_plot[:, 2],
    color=y_plot.astype(str),  # string labels give a discrete colour per class
    title="Post-pipeline data (first three dimensions), coloured by class",
)

fig.update_traces(
    marker=dict(
        size=2,
        opacity=0.5,
    )
)

fig.show()


In [4]:
# Turn the per-window classification of one trace into per-sample labels and
# build the matching per-sample ground-truth mask.
n_trace = 0
trace_len = 134_217_550  # NOTE(review): hard-coded trace length in samples -- confirm it matches the trace file
classification_file = output_file

# A sample is covered by at most ceil(window_size / stride) windows. The vote
# counters below are uint16 and would wrap around silently beyond that range.
assert stride > 0 and window_size > 0, "stride and window_size must be positive"
max_windows_per_sample = -(-window_size // stride)
assert max_windows_per_sample <= np.iinfo(np.uint16).max, (
    "window_size / stride is too large for the uint16 vote counters"
)

labels = unsupervised_sliding_windows_classification.loaderGt(chameleon_file)

# Use memmap to avoid loading the whole file into RAM
classification = np.load(classification_file, mmap_mode="r")[n_trace]  # (num_windows, num_classes)

# --- Ground-truth starts/ends of the selected trace ---
label = next(itertools.islice(labels, n_trace, None))
gts_starts = label["start"]
gts_ends = label["end"]

# Per-sample ground truth: 1 inside every [start, end] interval (end inclusive).
ground_truth_labels = np.zeros(trace_len, dtype=np.uint8)
for s, e in zip(gts_starts, gts_ends):
    # Skip intervals with negative bounds or starting beyond the trace.
    if s < 0 or e < 0 or s >= trace_len:
        continue
    e = min(e, trace_len - 1)
    ground_truth_labels[s:e + 1] = 1

# Per-sample votes: how many covering windows were classified as class 1, and
# how many windows cover the sample at all.
found_labels_ones = np.zeros(trace_len, dtype=np.uint16)
found_labels_count = np.zeros(trace_len, dtype=np.uint16)

# One vectorised argmax over all windows instead of one call per window.
window_is_one = np.argmax(classification, axis=1) == 1
for w, is_one in enumerate(window_is_one):
    # Distinct names: `start`/`end` hold the timing of the segmentation cell.
    win_start = w * stride
    if win_start >= trace_len:
        break
    win_end = min(win_start + window_size, trace_len)

    if is_one:
        found_labels_ones[win_start:win_end] += 1
    found_labels_count[win_start:win_end] += 1

# A covered sample becomes class 1 when at least this fraction of its covering
# windows voted for class 1; uncovered samples stay 0.
co_classification_ratio = 0.47
found_labels = np.zeros(trace_len, dtype=np.uint8)

covered = found_labels_count > 0
found_labels[covered] = (found_labels_ones[covered] >= (co_classification_ratio * found_labels_count[covered])).astype(np.uint8)


In [None]:
# Plot ground truth against the found labels for the sample range `lim`.
center = 100_000
margin = center
lim = (center - margin, center + margin)
show_gt_boundaries = False  # True: also mark every GT start/end inside `lim` with a vertical line

# rcParams are picked up when artists are created, so set the font size before
# creating the figure; otherwise the first run and a re-run can look different.
plt.rcParams.update({'font.size': 18})
fig, ax = plt.subplots(1, figsize=(13, 7))

# Only hand the visible samples to matplotlib instead of the full-length arrays.
lo = max(lim[0], 0)
hi = min(lim[1] + 1, len(ground_truth_labels), len(found_labels))
sample_idx = np.arange(lo, hi)

# GT dashed
ax.plot(sample_idx, ground_truth_labels[lo:hi], color="orange", linestyle="--", linewidth=3, label="Ground truth")

# Found labels
ax.plot(sample_idx, found_labels[lo:hi], color="blue", linewidth=2, alpha=1, label="Found labels")

# Optional dashed vertical lines for GT starts / ends inside limits
if show_gt_boundaries:
    for s, e in zip(gts_starts, gts_ends):
        if lim[0] <= s <= lim[1]:
            ax.axvline(s, color="red", linestyle="--", alpha=0.4)
        if lim[0] <= e <= lim[1]:
            # green instead of white: white is not visible on a white axes background
            ax.axvline(e, color="green", linestyle="--", alpha=0.4)

ax.set_xlim(lim)
ax.set_xlabel("Sample index")
ax.set_ylabel("Label")
ax.legend(loc="upper right")

# Layout is computed last so that it accounts for labels and legend.
fig.tight_layout(pad=2.0)

plt.show()

In [None]:
# Per-label IoU + TPR/FPR/TNR/FNR (sample-level)


def _safe_ratio(numerator, denominator):
    """Return numerator / denominator, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


def _class_metrics(tp, fp, tn, fn):
    """Return (IoU, TPR, FPR, TNR, FNR) for one class from its confusion counts.

    Every ratio falls back to 0.0 when its denominator is zero.
    """
    positives = tp + fn
    negatives = fp + tn
    return (
        _safe_ratio(tp, tp + fp + fn),  # IoU
        _safe_ratio(tp, positives),     # TPR (recall / sensitivity)
        _safe_ratio(fp, negatives),     # FPR
        _safe_ratio(tn, negatives),     # TNR (specificity)
        _safe_ratio(fn, positives),     # FNR
    )


gt = ground_truth_labels.astype(bool)   # True = class 1
fd = found_labels.astype(bool)
assert gt.shape == fd.shape, "ground truth and found labels must cover the same samples"

# Sample-level confusion counts with class 1 as the positive class.
TP = np.count_nonzero(gt & fd)
FP = np.count_nonzero(~gt & fd)
TN = np.count_nonzero(~gt & ~fd)
FN = np.count_nonzero(gt & ~fd)

# |gt & fd| is TP and |gt | fd| is TP + FP + FN, so the overall IoU needs no
# further passes over the arrays.
intersection = TP
union = TP + FP + FN
iou = _safe_ratio(intersection, union)

# Class 1 (positive) metrics
IoU_1, TPR_1, FPR_1, TNR_1, FNR_1 = _class_metrics(TP, FP, TN, FN)

# Class 0 (treat class 0 as the "positive" class by swapping)
TP0, FP0, TN0, FN0 = TN, FN, TP, FP
IoU_0, TPR_0, FPR_0, TNR_0, FNR_0 = _class_metrics(TP0, FP0, TN0, FN0)

print(f"IoU: {iou:.4f}\n")

print("Class 1 (CO):")
print(f"  IoU={IoU_1:.4f}\n  TPR={TPR_1:.4f}  FPR={FPR_1:.4f}  TNR={TNR_1:.4f}  FNR={FNR_1:.4f}\n")

print("Class 0 (non-CO):")
print(f"  IoU={IoU_0:.4f}\n  TPR={TPR_0:.4f}  FPR={FPR_0:.4f}  TNR={TNR_0:.4f}  FNR={FNR_0:.4f}")
