# Data Loading

In [None]:
from huggingface_hub import login

import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
from wordcloud import WordCloud
import sklearn

#login()

In [None]:
# Report the versions of the core libraries so results can be reproduced.
for lib_label, lib in (("Pandas", pd), ("Numpy", np), ("scikit learn", sklearn)):
    print(f"{lib_label} version:", lib.__version__)

# Both PL-Guard test splits are read straight from the Hugging Face Hub.
PL_GUARD_ROOT = "hf://datasets/NASK-PIB/PL-Guard"
df = pd.read_parquet(f"{PL_GUARD_ROOT}/test/data.parquet")
df_adversarial = pd.read_parquet(f"{PL_GUARD_ROOT}/test_adversarial/data.parquet")


In [None]:
# Column dtypes, non-null counts and memory footprint of the clean test split.
df.info()

In [None]:
# Same summary for the adversarial test split, for comparison with the clean one.
df_adversarial.info()

In [None]:
# Left-align and wrap long prompts so the two preview rows stay readable.
preview_css = {'text-align': 'left', 'white-space': 'pre-wrap'}
df.head(2).style.set_properties(**preview_css)

In [None]:
# Summary statistics of the clean split as reported by pandas.
df.describe()

In [None]:
# Left-align and wrap long prompts so the two adversarial preview rows stay readable.
preview_css_adv = {'text-align': 'left', 'white-space': 'pre-wrap'}
df_adversarial.head(2).style.set_properties(**preview_css_adv)

In [None]:
import os

# One corpus string built from the first column of the clean split
# (assumed to be the prompt text — TODO confirm against df.info()).
text = " ".join(df.iloc[:, 0].dropna().astype(str).tolist())

# Frequent Polish function words that would otherwise dominate the cloud.
polish_stopwords = {
    "i", "w", "z", "na", "do", "że", "o", "a", "to", "się", 
    "jak", "tak", "jest", "po", "za", "ale", "od", "lub", "nie", "dla", "czy", "które", "takich", "innych", "jeśli", "takie", "ponieważ", "także"
}

wordcloud = WordCloud(
    width=1600, height=800,
    background_color='white',
    stopwords=polish_stopwords,
    min_word_length=5
).generate(text)

# savefig does not create missing directories, so make sure the target exists
# before writing (a fresh checkout has no Plots/ folder).
os.makedirs("Plots", exist_ok=True)

plt.figure(figsize=(15, 10))
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis("off")
plt.savefig("Plots/wordcloud.png")
plt.show()

In [None]:
# Distribution of prompt lengths measured in whitespace-separated words.
print(df['text'].apply(lambda x: len(str(x).split())).describe())

In [None]:
import os
import matplotlib.pyplot as plt

# Words per prompt. Missing texts are dropped and values coerced to str first,
# matching the word-cloud cell; `.map(len)` would raise on a NaN entry.
lengths = df['text'].dropna().astype(str).str.split().str.len()

# savefig does not create missing directories.
os.makedirs("Plots", exist_ok=True)

plt.hist(lengths, bins=50, color='skyblue', edgecolor='black')
plt.title('Distribution of prompt lengths (number of words)')
plt.xlabel('Length')
plt.ylabel('Number of examples')
plt.savefig("Plots/WordDistribution.png")
plt.show()

# Basic Preprocessing

In [None]:
# `category` is split on newlines; the piece at position 1 is kept as the
# category code for both the clean and the adversarial frame.
split_data, split_data_adversarial = (
    frame['category'].str.split('\n', expand=True)
    for frame in (df, df_adversarial)
)

df['category_code'], df_adversarial['category_code'] = split_data[1], split_data_adversarial[1]

def custom_encoder(val):
    """Turn a category code into an integer class id.

    Returns 0 for ``None`` and for values whose stripped, lower-cased string
    form is ``'safe'``, ``'nan'``, ``'none'`` or empty. A code of the form
    ``S<number>`` yields that number. Anything else, including an ``S``
    prefix that is not followed by a parseable integer, yields -1.
    """
    if val is None:
        return 0

    code = str(val).strip()
    if code.lower() in ('safe', 'nan', 'none', ''):
        return 0

    # Only upper-case "S" prefixed codes are recognised as harm categories.
    if not code.startswith('S'):
        return -1

    try:
        return int(code[1:])
    except ValueError:
        return -1

# Encode the category code of both frames into the integer `target` column.
for frame in (df, df_adversarial):
    frame['target'] = frame['category_code'].apply(custom_encoder)

# Show each distinct code next to the integer it was mapped to.
print("Unique mappings:")
mapping_table = df[['category_code', 'target']].drop_duplicates().sort_values('target')
print(mapping_table)

In [None]:
# Binary view of the multi-class target: 0 stays 0, every other value becomes 1.
df['label'] = df['target'].apply(lambda x: 0 if x == 0 else 1)

In [None]:
# Check the columns added during preprocessing on the clean split.
df.head(2)

In [None]:
# Check the columns added during preprocessing on the adversarial split.
df_adversarial.head(2)

In [None]:
# Re-inspect dtypes and null counts now that the derived columns exist.
df.info()

# Text Representation: Using Embeddings

## Vector representation using BERT

In [None]:
import pandas as pd
import numpy as np
import torch
from transformers import AutoTokenizer, AutoModel

# Pretrained multilingual BERT checkpoint; `tokenizer` and `model` are read as
# globals by get_embeddings_from_df below.
model_name = "bert-base-multilingual-cased"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)

def get_embeddings_from_df(text_list, batch_size=32):
    """Embed texts with the module-level ``tokenizer`` and ``model``.

    Each text is tokenized (truncated to 512 tokens), passed through the model
    without gradients, and reduced to one vector by averaging the last hidden
    states over the non-padding tokens.

    Args:
        text_list: list of strings to embed.
        batch_size: number of texts sent through the model at once.

    Returns:
        A 2-D numpy array with one row per input text. For an empty
        ``text_list`` an array of shape ``(0, hidden_size)`` is returned
        instead of letting ``np.vstack`` raise on an empty list.
    """
    if len(text_list) == 0:
        return np.empty((0, model.config.hidden_size), dtype=np.float32)

    # Make sure dropout is disabled even if the model was put in train mode elsewhere.
    model.eval()
    all_embeddings = []

    # Process in batches so the whole dataset is never held in one forward pass.
    for i in range(0, len(text_list), batch_size):
        batch_texts = text_list[i:i+batch_size]

        # Tokenization
        inputs = tokenizer(batch_texts, padding=True, truncation=True,
                           max_length=512, return_tensors="pt")

        with torch.no_grad():
            outputs = model(**inputs)

        # Mean pooling: the attention mask zeroes out padding positions so they
        # contribute neither to the sum nor to the token count.
        token_embeddings = outputs.last_hidden_state
        input_mask_expanded = inputs['attention_mask'].unsqueeze(-1).expand(token_embeddings.size()).float()
        sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)
        # Clamp guards against division by zero for an all-padding row.
        sum_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9)
        batch_embeddings = sum_embeddings / sum_mask

        all_embeddings.append(batch_embeddings.numpy())

    return np.vstack(all_embeddings)

# Embed the clean prompts with the pretrained (not fine-tuned) BERT loaded above.
print("Generating vectors for clean prompts")
vec_original = get_embeddings_from_df(df['text'].tolist())

# The adversarial counterpart is currently commented out.
#print("Generating vectors for adversarial prompts")
#vec_adversarial = get_embeddings_from_df(df_adversarial['text'].tolist())

print(f"Gotowe. Wymiary: df={vec_original.shape}")
#print(f"Shape for adversarial: {vec_adversarial.shape}")

## Fine-tuning BERT and new vectors

### Training for multi-class classification

In [None]:
import pandas as pd
import numpy as np
import torch
from torch import nn
from datasets import Dataset
from transformers import BertTokenizer, AutoModelForSequenceClassification, AutoModel, TrainingArguments, Trainer
from sklearn.model_selection import train_test_split
import evaluate
from tqdm.auto import tqdm
import os
import gc

# Checkpoint that is fine-tuned in both scenarios below.
MODEL_NAME = "bert-base-multilingual-cased"
# Maximum token length handed to the tokenizer (truncation / padding).
MAX_LEN = 512
# Per-device batch size; combined with GRAD_ACC_STEPS this gives 8 * 4 = 32
# samples per optimizer step per device.
BATCH_SIZE = 8
GRAD_ACC_STEPS = 4
EPOCHS = 5

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Used device: {device}")


import warnings

df_orig = df.copy()
df_adv = df_adversarial.copy()

# The mixed scenario selects adversarial rows with positions drawn from the
# clean frame, so both frames must be row-aligned (row i of df_adv being the
# adversarial variant of row i of df_orig — TODO confirm with the dataset card).
if len(df_adv) < len(df_orig):
    raise ValueError(
        f"df_adversarial has {len(df_adv)} rows but df has {len(df_orig)}; "
        "positional pairing of the two frames is impossible."
    )
if len(df_adv) > len(df_orig):
    warnings.warn(
        f"df_adversarial has {len(df_adv)} rows but df has {len(df_orig)}; "
        "rows beyond the clean frame's length are ignored in the mixed scenario."
    )


def _text_and_label(frame, positions=None):
    """Return the text/target columns (optionally a positional subset) with target renamed to label."""
    subset = frame if positions is None else frame.iloc[positions]
    return subset[['text', 'target']].rename(columns={'target': 'label'})


# One stratified 80/20 split of row positions, shared by both scenarios.
train_idx_orig, test_idx_orig = train_test_split(
    np.arange(len(df_orig)),
    test_size=0.2,
    random_state=42,
    stratify=df_orig['target']
)

# Scenario "pure": train on clean prompts only, evaluate on clean and on all adversarial prompts.
train_df_pure = _text_and_label(df_orig, train_idx_orig)
test_df_pure = _text_and_label(df_orig, test_idx_orig)
test_df_adv_full = _text_and_label(df_adv)

# Scenario "mix" reuses the same split. The original code recomputed it with
# identical arguments and seed, which yields the same indices.
train_idx_mix, test_idx_mix = train_idx_orig, test_idx_orig

train_df_mix = pd.concat([
    _text_and_label(df_orig, train_idx_mix),
    _text_and_label(df_adv, train_idx_mix)
])

test_df_mix = pd.concat([
    _text_and_label(df_orig, test_idx_mix).assign(type='original'),
    _text_and_label(df_adv, test_idx_mix).assign(type='adversarial')
])

print(f"Scenario Pure: Train={len(train_df_pure)}, Test Orig={len(test_df_pure)}, Test Adv={len(test_df_adv_full)}")
print(f"Scenario Mix:  Train={len(train_df_mix)}, Test={len(test_df_mix)}")



class WeightedTrainer(Trainer):
    """Trainer whose loss is a class-weighted cross-entropy.

    ``class_weights`` is stored on the instance and passed as the ``weight``
    argument of ``nn.CrossEntropyLoss`` on every loss computation.
    """

    def __init__(self, class_weights, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.class_weights = class_weights

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Return the weighted loss, optionally together with the model outputs."""
        targets = inputs.get("labels")
        outputs = model(**inputs)

        # Flatten to (N, num_labels) / (N,) as expected by CrossEntropyLoss.
        flat_logits = outputs.get("logits").view(-1, self.model.config.num_labels)
        criterion = nn.CrossEntropyLoss(weight=self.class_weights)
        loss = criterion(flat_logits, targets.view(-1))

        if return_outputs:
            return (loss, outputs)
        return loss

def get_embeddings(text_list, model, tokenizer, batch_size=32):
    """Embed texts with the given model using attention-masked mean pooling.

    The model is switched to eval mode, texts are tokenized in batches
    (truncated to ``MAX_LEN``), moved to the global ``device`` and run without
    gradients. Each text is represented by the mean of its last hidden states
    over non-padding tokens.

    Args:
        text_list: list of strings to embed.
        model: encoder whose output exposes ``last_hidden_state``.
        tokenizer: tokenizer matching ``model``.
        batch_size: number of texts per forward pass.

    Returns:
        A 2-D numpy array with one row per input text. For an empty
        ``text_list`` an array of shape ``(0, hidden_size)`` is returned
        instead of letting ``np.vstack`` raise on an empty list.
    """
    model.eval()
    if len(text_list) == 0:
        return np.empty((0, model.config.hidden_size), dtype=np.float32)

    all_embeddings = []
    iterator = range(0, len(text_list), batch_size)
    for i in tqdm(iterator, desc="Generating Vectors", unit="batch", leave=False):
        batch_texts = text_list[i:i+batch_size]
        inputs = tokenizer(batch_texts, padding=True, truncation=True, max_length=MAX_LEN, return_tensors="pt").to(device)
        with torch.no_grad():
            outputs = model(**inputs)

        # Mean pooling: padding positions are masked out of both the sum and the count.
        token_embeddings = outputs.last_hidden_state
        input_mask_expanded = inputs['attention_mask'].unsqueeze(-1).expand(token_embeddings.size()).float()
        sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)
        # Clamp guards against division by zero for an all-padding row.
        sum_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9)
        batch_embeddings = sum_embeddings / sum_mask
        all_embeddings.append(batch_embeddings.cpu().numpy())
    return np.vstack(all_embeddings)

def run_training_pipeline(run_name, output_dir, train_df, test_df, dfs_to_embed, num_labels=15):
    """Fine-tune a sequence classifier, save it, and embed the given frames.

    Steps: tokenize ``train_df``/``test_df``, train ``MODEL_NAME`` with a
    class-weighted loss, save model and tokenizer to ``output_dir``, reload the
    saved checkpoint as a plain encoder and write mean-pooled embeddings for
    every frame in ``dfs_to_embed`` to ``<name>_with_embeddings.pkl``.

    Args:
        run_name: label used in progress messages.
        output_dir: directory for checkpoints and the final model.
        train_df: frame with ``text`` and integer ``label`` columns.
        test_df: frame with the same columns. It is used as the evaluation set
            during training and therefore also drives best-checkpoint selection.
        dfs_to_embed: mapping ``name -> DataFrame`` (each with a ``text``
            column) to embed with the fine-tuned encoder.
        num_labels: size of the classification head; labels must lie in
            ``[0, num_labels)``.

    Returns:
        dict mapping each name in ``dfs_to_embed`` to a copy of its frame with
        an added ``embedding`` column.

    Raises:
        ValueError: if ``train_df`` is empty or a label falls outside
            ``[0, num_labels)`` (e.g. the -1 that ``custom_encoder`` emits for
            unparseable codes).
    """
    print(f"\n{'='*40}")
    print(f"starting scenario: {run_name}")
    print(f"{'='*40}")

    # Fail fast on labels the classifier head cannot represent; otherwise the
    # failure only surfaces inside the loss computation.
    for split_name, frame in (("train_df", train_df), ("test_df", test_df)):
        in_range = frame['label'].between(0, num_labels - 1)
        if not in_range.all():
            bad_values = sorted(frame.loc[~in_range, 'label'].astype(str).unique())
            raise ValueError(
                f"[{run_name}] {split_name} contains labels outside [0, {num_labels - 1}]: {bad_values}"
            )
    if len(train_df) == 0:
        raise ValueError(f"[{run_name}] train_df is empty")

    train_ds = Dataset.from_pandas(train_df[['text', 'label']])
    test_ds = Dataset.from_pandas(test_df[['text', 'label']])

    # Tokenization
    tokenizer = BertTokenizer.from_pretrained(MODEL_NAME)
    def tokenize(examples):
        return tokenizer(examples["text"], padding="max_length", truncation=True, max_length=MAX_LEN)

    print(f"[{run_name}] Tokenization")
    train_ds = train_ds.map(tokenize, batched=True)
    test_ds = test_ds.map(tokenize, batched=True)

    # Inverse-frequency class weights. bincount with minlength keeps the vector
    # aligned with the head size even when a class is missing from train_df
    # (value_counts would silently produce a shorter vector). Absent classes
    # get weight 0; the rest are normalised to average 1 over present classes,
    # which matches the previous result when every class is present.
    class_counts = np.bincount(
        train_df['label'].to_numpy(dtype=np.int64), minlength=num_labels
    ).astype(np.float64)
    present = class_counts > 0
    weights = np.zeros(num_labels, dtype=np.float64)
    weights[present] = 1.0 / class_counts[present]
    weights = weights / weights.sum() * present.sum()
    class_weights_tensor = torch.tensor(weights, dtype=torch.float).to(device)

    model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=num_labels).to(device)

    acc_metric = evaluate.load("accuracy")
    f1_metric = evaluate.load("f1")

    def compute_metrics(eval_pred):
        logits, labels = eval_pred
        predictions = np.argmax(logits, axis=-1)
        acc = acc_metric.compute(predictions=predictions, references=labels)
        f1 = f1_metric.compute(predictions=predictions, references=labels, average="macro")
        return {**acc, **f1}

    # NOTE(review): newer transformers releases spell `evaluation_strategy` as
    # `eval_strategy` — confirm against the installed version.
    args = TrainingArguments(
        output_dir=output_dir,
        num_train_epochs=EPOCHS,
        per_device_train_batch_size=BATCH_SIZE,
        per_device_eval_batch_size=BATCH_SIZE,
        gradient_accumulation_steps=GRAD_ACC_STEPS,
        # Mixed precision needs a GPU; requesting it on CPU aborts the run.
        fp16=torch.cuda.is_available(),
        learning_rate=2e-5,
        evaluation_strategy="epoch",
        save_strategy="epoch",
        load_best_model_at_end=True,
        metric_for_best_model="f1",
        save_total_limit=1,
        report_to="none"
    )

    trainer = WeightedTrainer(
        class_weights=class_weights_tensor,
        model=model,
        args=args,
        train_dataset=train_ds,
        eval_dataset=test_ds,
        compute_metrics=compute_metrics
    )

    print(f"[{run_name}] Training")
    trainer.train()

    print(f"[{run_name}] Saving the model")
    trainer.save_model(output_dir)
    tokenizer.save_pretrained(output_dir)

    # Release the classifier before loading the encoder to keep peak memory down.
    del model, trainer
    torch.cuda.empty_cache()
    gc.collect()

    print(f"[{run_name}] Loading the model for feature extraction")
    embed_model = AutoModel.from_pretrained(output_dir).to(device)

    results = {}
    for name, df_target in dfs_to_embed.items():
        print(f"[{run_name}] Generating vectors for: {name}...")
        embeddings = get_embeddings(df_target['text'].tolist(), embed_model, tokenizer, batch_size=BATCH_SIZE*2)
        df_copy = df_target.copy()
        # list(...) assigns one vector per row by position, independent of the index.
        df_copy['embedding'] = list(embeddings)

        # Persist to disk
        save_path = f"{name}_with_embeddings.pkl"
        df_copy.to_pickle(save_path)
        print(f"Saved to: {save_path}")
        results[name] = df_copy

    del embed_model
    torch.cuda.empty_cache()

    return results


# SCENARIO 1: "naive" model, fine-tuned on clean prompts only.
# Embeddings are generated for:
# 1. the clean test split (performance on in-distribution data)
# 2. the full adversarial set (how the model copes with prompts it never saw)
output_pure = run_training_pipeline(
    run_name="PURE_MODEL",
    output_dir="./bert_pure_finetuned",
    train_df=train_df_pure,
    test_df=test_df_pure,
    dfs_to_embed={
        "test_pure_orig": test_df_pure,
        "test_pure_adv": test_df_adv_full
    }
)

# SCENARIO 2: "robust" model, fine-tuned on clean + adversarial prompts.
output_mix = run_training_pipeline(
    run_name="ROBUST_MODEL",
    output_dir="./bert_robust_finetuned",
    train_df=train_df_mix,
    test_df=test_df_mix,
    dfs_to_embed={
        "test_df_mix": test_df_mix,
        # "train_df_mix": train_df_mix 
    }
)


# Mixed test frame with its `embedding` column, bound to the global `test_df`.
test_df = output_mix["test_df_mix"]

print("PROCESS SUCCESSFULLY COMPLETED")
print("Available files with results (you can load them with pd.read_pickle):")
print("1. test_pure_orig_with_embeddings.pkl -> Model Pure on Orig data")
print("2. test_pure_adv_with_embeddings.pkl  -> Model Pure on Adv data")
print("3. test_df_mix_with_embeddings.pkl    -> Model Mix on Mix data")

## Vector representation using FastText

In [None]:
import fasttext
import numpy as np
import pandas as pd

path_to_model = "cc.pl.300.bin"

print("Model Loading...")
# Bound to its own name: the previous `model = ...` / `del model` pair replaced
# and then removed the global BERT `model` that get_embeddings_from_df reads,
# so re-running the BERT embedding cell after this one raised NameError.
ft_model = fasttext.load_model(path_to_model)
print("Model is ready")

def get_vector_efficient(text):
    """Return the fastText sentence vector of ``text``.

    Newlines are replaced by spaces first, so the model receives the text as a
    single line. Reads the global ``ft_model``, which this cell deletes at the
    end; the function is only usable while the cell is running.
    """
    clean_text = text.replace("\n", " ")
    return ft_model.get_sentence_vector(clean_text)


print("Vectors generating dla df...")
vectors_orig = [get_vector_efficient(t) for t in df['text']]
# We convert to numpy (float32 for RAM savings)
vec_ft_original = np.array(vectors_orig, dtype=np.float32)

print("Vectors generating for df_adversarial...")
vectors_adv = [get_vector_efficient(t) for t in df_adversarial['text']]
vec_ft_adversarial = np.array(vectors_adv, dtype=np.float32)

print(f"Finished, Shape: {vec_ft_original.shape}")

# Drop the reference to the fastText model once the vectors are materialised.
del ft_model

# Dimensionality Reduction

## PCA

In [None]:
import pandas as pd
import numpy as np
import random as rd
from sklearn.decomposition import PCA
from sklearn import preprocessing
import matplotlib.pyplot as plt
from sklearn.pipeline import Pipeline
import plotly.express as px

### PCA visualizations for BERT vector representation

#### 2D (two principal components)

In [None]:
n_components = 2

# L2-normalise every embedding so PCA works on directions rather than magnitudes.
normalizer = preprocessing.Normalizer(norm='l2')

scaled_vectors_b = normalizer.fit_transform(vec_original)

# With the default solver policy PCA may pick the randomized SVD for a matrix
# of this size; a fixed seed keeps the projection identical across re-runs
# (the fine-tuned section of this notebook already seeds its PCA the same way).
pca = PCA(n_components=n_components, random_state=42)
pca.fit(scaled_vectors_b)

pca_data = pca.transform(scaled_vectors_b)

In [None]:
def plot_scree_pca(pca, method: str = "BERT", output_dir: str = "Plots/PcaPlots"):
    """Draw and save a scree plot for a fitted PCA.

    Args:
        pca: fitted object exposing ``explained_variance_ratio_``.
        method: tag for the embedding source, used in the output file name.
        output_dir: directory the PNG is written to; created when missing
            (``savefig`` itself does not create directories).

    The figure is saved as ``ScreePlot_<method>_<k>D.png`` where ``k`` is the
    number of components, and then shown.
    """
    import os

    per_var = np.round(pca.explained_variance_ratio_ * 100, decimals=1)

    n_components = len(per_var)

    labels = ['PC' + str(x) for x in range(1, n_components + 1)]

    plt.figure(figsize=(10, 6))
    plt.bar(x=range(1, n_components + 1), height=per_var, tick_label=labels, color='skyblue', edgecolor='black')

    # Print the percentage just above each bar.
    for i, v in enumerate(per_var):
        plt.text(i + 1, v + 0.5, str(v) + '%', ha='center', fontweight='bold')

    plt.ylabel('Percentage of Explained Variance')
    plt.xlabel('Principal Component')
    plt.title(f'Scree Plot for {n_components} Principal Components')

    os.makedirs(output_dir, exist_ok=True)
    plt.savefig(f'{output_dir}/ScreePlot_{method}_{n_components}D.png', bbox_inches='tight')
    plt.show()

In [None]:
# Scree plot of the 2-component PCA fitted on the raw BERT embeddings.
plot_scree_pca(pca, 'RawBERT')

In [None]:
# Per-component and cumulative share of variance captured by the BERT PCA.
explained_variance_ratio = pca.explained_variance_ratio_
cumulative_variance = np.sum(explained_variance_ratio)

print(explained_variance_ratio)
print("\nVariance Explained")
for pc_number, ratio in enumerate(explained_variance_ratio, start=1):
    print(f"PC {pc_number}: {ratio:.4f} ({ratio*100:.2f}% of variance)")

print("...")
print(f"Total variance explained by {n_components} components: {cumulative_variance:.4f} ({cumulative_variance*100:.2f}%)")

In [None]:
pca_df = pd.DataFrame(pca_data, columns=['PC1', 'PC2'])

# Attach metadata from the original frame; reset_index aligns rows by position.
pca_df['target'] = df['target'].reset_index(drop=True)
pca_df['text'] = df['text'].reset_index(drop=True)
pca_df['category_code'] = df['category_code'].reset_index(drop=True)

# `target` is the multi-class code (0 = safe, otherwise the S-category number),
# so a {0, 1} map would leave every class above 1 as NaN. Collapse it to
# safe vs. not-safe, mirroring how the binary `label` column is derived.
pca_df['label'] = np.where(pca_df['target'] == 0, 'Safe', 'Malicious')

print(pca_df.head())

In [None]:
# Interactive 2D scatter of the BERT PCA, axes annotated with explained variance.
per_var = np.round(pca.explained_variance_ratio_ * 100, decimals=1)

axis_titles = {f'PC{k}': f'PC{k} ({per_var[k - 1]:.1f}%)' for k in (1, 2)}
class_palette = {'Safe': 'green', 'Malicious': 'red'}

fig = px.scatter(
    pca_df,
    x='PC1',
    y='PC2',
    color='label',
    hover_data=['text'],
    title='PCA analysis',
    color_discrete_map=class_palette,
    opacity=0.7,
    labels=axis_titles,
)

fig.update_traces(marker={'size': 8})
fig.update_layout(
    width=1200,
    height=800,
    autosize=False,
    hoverlabel={'bgcolor': "white", 'font_size': 12},
)

fig.show()

#### 3D (3 principal components)

In [None]:
# PCA with 3 components on the same normalised BERT vectors as the 2D case.
# Seeded so the projection is identical across re-runs.
pca_3d = PCA(n_components=3, random_state=42)
pca_3d.fit(scaled_vectors_b)
pca_data_3d = pca_3d.transform(scaled_vectors_b)

print(f"Wariancja wyjaśniona: {pca_3d.explained_variance_ratio_}")
print(f"Suma: {sum(pca_3d.explained_variance_ratio_)*100:.2f}%")

df_3d = pd.DataFrame(pca_data_3d, columns=['PC1', 'PC2', 'PC3'])

# Attach labels and text; reset_index aligns rows by position.
df_3d['target'] = df['target'].reset_index(drop=True)
df_3d['text'] = df['text'].reset_index(drop=True)
# `target` is multi-class (0 = safe, otherwise the S-category number); a {0, 1}
# map would turn every class above 1 into NaN, so collapse to safe vs. not-safe.
df_3d['label'] = np.where(df_3d['target'] == 0, 'Safe', 'Malicious')

In [None]:
# Scree plot of the 3-component PCA fitted on the raw BERT embeddings.
plot_scree_pca(pca_3d, 'RawBERT')

In [None]:
# Interactive 3D scatter of the first three principal components (BERT).
class_palette_3d = {'Safe': 'green', 'Malicious': 'red'}
scene_axes = {'xaxis_title': 'PC1', 'yaxis_title': 'PC2', 'zaxis_title': 'PC3'}

fig = px.scatter_3d(
    df_3d,
    x='PC1',
    y='PC2',
    z='PC3',
    color='label',
    color_discrete_map=class_palette_3d,
    hover_data=['text'],
    title='Wizualizacja PCA 3D',
    opacity=0.7,
)

fig.update_traces(marker={'size': 4})

fig.update_layout(
    width=1200,
    height=800,
    autosize=False,
    scene=scene_axes,
    margin={'l': 0, 'r': 0, 'b': 0, 't': 30},
)

fig.show()

### PCA visualizations for FastText

#### 2D

In [None]:
n_components_ft = 2

# L2-normalise the fastText vectors, as was done for BERT.
normalizer = preprocessing.Normalizer(norm='l2')

scaled_vectors_ft = normalizer.fit_transform(vec_ft_original)

# Use this section's own component count (the BERT section's `n_components`
# was referenced here by mistake) and seed the solver for reproducibility.
pca_ft = PCA(n_components=n_components_ft, random_state=42)
pca_ft.fit(scaled_vectors_ft)

pca_data_ft = pca_ft.transform(scaled_vectors_ft)

In [None]:
# Scree plot of the 2-component PCA fitted on the fastText vectors.
plot_scree_pca(pca_ft, 'FastText')

In [None]:
# Per-component and cumulative share of variance captured by the fastText PCA.
explained_variance_ratio = pca_ft.explained_variance_ratio_
cumulative_variance = np.sum(explained_variance_ratio)

print(explained_variance_ratio)
print("\nVariance Explained")
for pc_number, ratio in enumerate(explained_variance_ratio, start=1):
    print(f"PC {pc_number}: {ratio:.4f} ({ratio*100:.2f}% of variance)")

print("...")
print(f"Total variance explained by {n_components_ft} components: {cumulative_variance:.4f} ({cumulative_variance*100:.2f}%)")

In [None]:
# NOTE: this rebinds `pca_df`, which previously held the BERT projection.
pca_df = pd.DataFrame(pca_data_ft, columns=['PC1', 'PC2'])

# Attach metadata from the original frame; reset_index aligns rows by position.
pca_df['target'] = df['target'].reset_index(drop=True)
pca_df['text'] = df['text'].reset_index(drop=True)
pca_df['category_code'] = df['category_code'].reset_index(drop=True)

# `target` is multi-class (0 = safe, otherwise the S-category number); a {0, 1}
# map would turn every class above 1 into NaN, so collapse to safe vs. not-safe.
pca_df['label'] = np.where(pca_df['target'] == 0, 'Safe', 'Malicious')

print(pca_df.head())

In [None]:
# Axis percentages must come from the fastText PCA; the BERT `pca` object was
# referenced here before, so the axes showed the wrong explained variance.
per_var_ft = np.round(pca_ft.explained_variance_ratio_ * 100, decimals=1)

fig = px.scatter(
    pca_df,
    x='PC1',
    y='PC2',
    color='label',
    hover_data=['text'],
    title='Interaktywna analiza PCA',
    color_discrete_map={'Safe': 'green', 'Malicious': 'red'},
    opacity=0.7,
    labels={'PC1': f'PC1 ({per_var_ft[0]:.1f}%)', 'PC2': f'PC2 ({per_var_ft[1]:.1f}%)'}
)

fig.update_traces(marker=dict(size=8))
fig.update_layout(width=1200,
    height=800,
    autosize=False,
    hoverlabel=dict(bgcolor="white", font_size=12))

fig.show()

#### 3D

In [None]:
# 3-component PCA on the normalised fastText vectors; seeded for reproducibility.
pca_3d_ft = PCA(n_components=3, random_state=42)
pca_3d_ft.fit(scaled_vectors_ft)
pca_data_3d_ft = pca_3d_ft.transform(scaled_vectors_ft)

print(f"Explained variance: {pca_3d_ft.explained_variance_ratio_}")
print(f"Summ: {sum(pca_3d_ft.explained_variance_ratio_)*100:.2f}%")

df_3d_ft = pd.DataFrame(pca_data_3d_ft, columns=['PC1', 'PC2', 'PC3'])

# Attach labels and text; reset_index aligns rows by position.
df_3d_ft['target'] = df['target'].reset_index(drop=True)
df_3d_ft['text'] = df['text'].reset_index(drop=True)
# `target` is multi-class (0 = safe, otherwise the S-category number); a {0, 1}
# map would turn every class above 1 into NaN, so collapse to safe vs. not-safe.
df_3d_ft['label'] = np.where(df_3d_ft['target'] == 0, 'Safe', 'Malicious')

In [None]:
# Scree plot of the 3-component PCA fitted on the fastText vectors.
plot_scree_pca(pca_3d_ft, 'FastText')

In [None]:
# Interactive 3D scatter of the first three principal components (fastText).
class_palette_ft = {'Safe': 'green', 'Malicious': 'red'}
scene_axes_ft = {'xaxis_title': 'PC1', 'yaxis_title': 'PC2', 'zaxis_title': 'PC3'}

fig = px.scatter_3d(
    df_3d_ft,
    x='PC1',
    y='PC2',
    z='PC3',
    color='label',
    color_discrete_map=class_palette_ft,
    hover_data=['text'],
    title='Visualisation PCA 3D',
    opacity=0.7,
)

fig.update_traces(marker={'size': 4})

fig.update_layout(
    width=1200,
    height=800,
    autosize=False,
    scene=scene_axes_ft,
    margin={'l': 0, 'r': 0, 'b': 0, 't': 40},
)

fig.show()

## t-SNE

In [None]:
from sklearn.manifold import TSNE

### t-SNE for BERT vector representation

#### 2D

In [None]:
def t_sne_vis(scaled_vectors, pp, metadata_df=None, n_pca_components=100):
    """Show a 2D t-SNE scatter of the given vectors, coloured safe vs. malicious.

    The vectors are first reduced with PCA and then embedded with t-SNE.

    Args:
        scaled_vectors: 2-D array, one row per sample.
        pp: t-SNE perplexity (values between 30 and 50 were used so far).
        metadata_df: frame providing ``target`` and ``text`` row-aligned with
            ``scaled_vectors``; defaults to the global ``df``.
        n_pca_components: dimensionality of the PCA pre-reduction. The default
            keeps the 100 components this function has always used (the local
            variable was merely named ``pca_50``). It is clamped to what the
            input shape allows, since PCA rejects larger values.
    """
    if metadata_df is None:
        metadata_df = df

    n_pca = min(n_pca_components, *scaled_vectors.shape)
    # Seeded so the pre-reduction, and hence the t-SNE input, is reproducible.
    pca_reduced = PCA(n_components=n_pca, random_state=42).fit_transform(scaled_vectors)

    # t-SNE
    tsne = TSNE(n_components=2, verbose=1, perplexity=pp, random_state=42)
    tsne_results = tsne.fit_transform(pca_reduced)

    df_tsne = pd.DataFrame(tsne_results, columns=['x', 'y'])
    df_tsne['target'] = metadata_df['target'].reset_index(drop=True)
    df_tsne['text'] = metadata_df['text'].reset_index(drop=True)
    # `target` is multi-class (0 = safe, otherwise the S-category number); a
    # {0, 1} map would turn every class above 1 into NaN.
    df_tsne['label'] = np.where(df_tsne['target'] == 0, 'Safe', 'Malicious')

    fig = px.scatter(
        df_tsne, x='x', y='y',
        color='label',
        hover_data=['text'],
        title='Wizualizacja t-SNE',
        color_discrete_map={'Safe': 'green', 'Malicious': 'red'},
        opacity=0.7,
        width=1000,
        height=800
    )
    fig.show()

# 2D t-SNE of the normalised raw-BERT vectors at perplexity 35.
t_sne_vis(scaled_vectors_b, 35)

In [None]:
# 50-dimensional PCA pre-reduction of the BERT vectors, shared with the 3D
# t-SNE cell below. Seeded so the sweep is reproducible.
pca_50 = PCA(n_components=50, random_state=42)
pca_result_50 = pca_50.fit_transform(scaled_vectors_b)
perplexities = [5, 30, 50, 100]

plt.figure(figsize=(20, 5))

for i, perp in enumerate(perplexities):
    tsne = TSNE(n_components=2, perplexity=perp, random_state=42, init='pca', learning_rate='auto')
    tsne_results = tsne.fit_transform(pca_result_50)

    # One panel per perplexity; the grid width follows the list length.
    ax = plt.subplot(1, len(perplexities), i+1)

    # Points are coloured by the multi-class `target` code on a continuous colormap.
    scatter = ax.scatter(tsne_results[:, 0], tsne_results[:, 1],
                         c=df['target'], cmap='RdYlGn_r', alpha=0.6, s=10)

    ax.set_title(f'Perplexity: {perp}')
    ax.axis('off')

plt.tight_layout()
plt.show()

#### 3D

In [None]:
# 3D t-SNE on the 50-dimensional PCA reduction of the BERT vectors
# (`pca_result_50` from the perplexity-sweep cell).
tsne_3d = TSNE(n_components=3, verbose=1, perplexity=45, random_state=42)
tsne_results_3d = tsne_3d.fit_transform(pca_result_50)

df_tsne_3d = pd.DataFrame(tsne_results_3d, columns=['x', 'y', 'z'])
df_tsne_3d['target'] = df['target'].reset_index(drop=True)
df_tsne_3d['text'] = df['text'].reset_index(drop=True)
# `target` is multi-class (0 = safe, otherwise the S-category number); a {0, 1}
# map would turn every class above 1 into NaN, so collapse to safe vs. not-safe.
df_tsne_3d['label'] = np.where(df_tsne_3d['target'] == 0, 'Safe', 'Malicious')

fig = px.scatter_3d(
    df_tsne_3d, x='x', y='y', z='z',
    color='label',
    hover_data=['text'],
    title='t-SNE 3D',
    color_discrete_map={'Safe': 'green', 'Malicious': 'red'},
    opacity=0.6
)

fig.update_traces(marker=dict(size=4))
fig.update_layout(width=1000, height=800)
fig.show()

### t-SNE for FastText vector representation

#### 2D

In [None]:
# 2D t-SNE of the normalised fastText vectors at perplexity 37.
t_sne_vis(scaled_vectors_ft, 37)

In [None]:
# The sweep previously reused `pca_result_50`, which was fitted on the BERT
# vectors, so this fastText section actually plotted BERT data. Fit the
# pre-reduction on the fastText vectors instead (seeded for reproducibility).
pca_result_50_ft = PCA(n_components=50, random_state=42).fit_transform(scaled_vectors_ft)
perplexities = [5, 30, 50, 100]

plt.figure(figsize=(20, 5))

for i, perp in enumerate(perplexities):
    tsne = TSNE(n_components=2, perplexity=perp, random_state=42, init='pca', learning_rate='auto')
    tsne_results = tsne.fit_transform(pca_result_50_ft)

    # One panel per perplexity; the grid width follows the list length.
    ax = plt.subplot(1, len(perplexities), i+1)

    # Points are coloured by the multi-class `target` code on a continuous colormap.
    scatter = ax.scatter(tsne_results[:, 0], tsne_results[:, 1],
                         c=df['target'], cmap='RdYlGn_r', alpha=0.6, s=10)

    ax.set_title(f'Perplexity: {perp}')
    ax.axis('off')

plt.tight_layout()
plt.show()

#### 3D

In [None]:
# `pca_result_50` holds the BERT reduction, so using it here plotted BERT data
# under the fastText heading. Reduce the fastText vectors in this cell instead
# (seeded for reproducibility).
pca_result_50_ft = PCA(n_components=50, random_state=42).fit_transform(scaled_vectors_ft)

tsne_3d = TSNE(n_components=3, verbose=1, perplexity=30, random_state=42)
tsne_results_3d = tsne_3d.fit_transform(pca_result_50_ft)

df_tsne_3d = pd.DataFrame(tsne_results_3d, columns=['x', 'y', 'z'])
df_tsne_3d['target'] = df['target'].reset_index(drop=True)
df_tsne_3d['text'] = df['text'].reset_index(drop=True)
# `target` is multi-class (0 = safe, otherwise the S-category number); a {0, 1}
# map would turn every class above 1 into NaN, so collapse to safe vs. not-safe.
df_tsne_3d['label'] = np.where(df_tsne_3d['target'] == 0, 'Safe', 'Malicious')

fig = px.scatter_3d(
    df_tsne_3d, x='x', y='y', z='z',
    color='label',
    hover_data=['text'],
    title='t-SNE 3D',
    color_discrete_map={'Safe': 'green', 'Malicious': 'red'},
    opacity=0.6
)

fig.update_traces(marker=dict(size=4))
fig.update_layout(width=1000, height=800)
fig.show()

## UMAP

In [None]:
import umap

In [None]:
def visualize_umap(scaled_vectors, n_components, model_type, metadata_df):
    """Project vectors with UMAP and show an interactive 2D or 3D scatter.

    Args:
        scaled_vectors: 2-D array, one row per sample.
        n_components: 2 or 3. Any other value prints an error and returns
            before the UMAP fit is started.
        model_type: embedding name shown in the plot title.
        metadata_df: frame providing ``text`` and ``target`` row-aligned with
            ``scaled_vectors``.

    Returns:
        None. The figure is displayed via ``fig.show()``.
    """
    # Validate first: previously an unsupported value only surfaced after the
    # fit, as a column-count mismatch while building the DataFrame.
    if n_components not in (2, 3):
        print("Error: n_components must be 2 or 3.")
        return

    reducer = umap.UMAP(
        n_neighbors=15,
        min_dist=0.5,
        n_components=n_components,
        metric='cosine',
        random_state=42
    )

    umap_embedding = reducer.fit_transform(scaled_vectors)

    cols = ['x', 'y'] if n_components == 2 else ['x', 'y', 'z']
    df_umap = pd.DataFrame(umap_embedding, columns=cols)

    df_umap['text'] = metadata_df['text'].reset_index(drop=True)
    df_umap['target'] = metadata_df['target'].reset_index(drop=True)

    # `target` is multi-class (0 = safe, otherwise the S-category number); a
    # {0, 1} map would turn every class above 1 into NaN.
    df_umap['label'] = np.where(df_umap['target'] == 0, 'Safe', 'Malicious')

    title_text = f'UMAP Visualization ({model_type}) - {n_components}D'

    common_params = dict(
        data_frame=df_umap,
        color='label',
        hover_data=['text'],
        title=title_text,
        color_discrete_map={'Safe': 'green', 'Malicious': 'red'},
        opacity=0.7
    )

    if n_components == 2:
        fig = px.scatter(x='x', y='y', **common_params)
    else:
        fig = px.scatter_3d(x='x', y='y', z='z', **common_params)

    marker_size = 6 if n_components == 2 else 4

    fig.update_traces(marker=dict(size=marker_size))
    fig.update_layout(
        width=1000,
        height=800,
        legend_title_text='Label',
        margin=dict(l=0, r=0, b=0, t=40)
    )

    fig.show()

### UMAP for BERT

In [None]:
# 2D UMAP of the normalised raw-BERT vectors, with metadata from the clean split.
visualize_umap(
    scaled_vectors=scaled_vectors_b, 
    n_components=2, 
    model_type="RawBERT", 
    metadata_df=df
)

In [None]:
# 3D UMAP of the normalised raw-BERT vectors, with metadata from the clean split.
visualize_umap(
     scaled_vectors=scaled_vectors_b, 
     n_components=3, 
     model_type="RawBERT", 
     metadata_df=df
)

### UMAP for FastText

In [None]:
# 2D UMAP of the normalised fastText vectors, with metadata from the clean split.
visualize_umap(
     scaled_vectors=scaled_vectors_ft, 
     n_components=2, 
     model_type="FastText", 
     metadata_df=df
)

In [None]:
# 3D UMAP of the normalised fastText vectors, with metadata from the clean split.
visualize_umap(
     scaled_vectors=scaled_vectors_ft, 
     n_components=3, 
     model_type="FastText", 
     metadata_df=df
)

# Dimensionality Reduction for fine-tuned BERT for 15 classes

## PCA

### 2D

In [None]:
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn import preprocessing
import matplotlib.pyplot as plt
import plotly.express as px
# NOTE: read_pickle executes arbitrary code on load; only open files produced
# by this notebook's own training run.
try:
    test_df = pd.read_pickle("test_df_mix_with_embeddings.pkl")
    print(f"Loaded vectors for {len(test_df)} samples")
except FileNotFoundError as err:
    print("Error: didn't find 'test_df_mix_with_embeddings.pkl' ")
    # Continuing is only meaningful when the training cell already left a
    # `test_df` in this session; otherwise stop here with a clear message
    # instead of failing later on an undefined name.
    if 'test_df' not in globals():
        raise FileNotFoundError(
            "test_df_mix_with_embeddings.pkl is missing and no in-memory test_df exists; "
            "run the fine-tuning cell first."
        ) from err


# Stack the per-row embedding vectors into one (n_samples, dim) matrix.
vectors_fine = np.stack(test_df['embedding'].values)

n_components_tuned = 2

normalizer = preprocessing.Normalizer(norm='l2')
scaled_vectors_tuned = normalizer.fit_transform(vectors_fine)

# Seeded so the projection is identical across re-runs.
pca_tuned = PCA(n_components=n_components_tuned, random_state=42)
pca_tuned.fit(scaled_vectors_tuned)
pca_data_tuned = pca_tuned.transform(scaled_vectors_tuned)

# Explained variance per component, in percent
per_var_tuned = np.round(pca_tuned.explained_variance_ratio_ * 100, decimals=1)
labels_tuned = ['PC' + str(x) for x in range(1, n_components_tuned+1)]

cumulative_variance_tuned = np.sum(pca_tuned.explained_variance_ratio_)
print("-" * 30)
print(f"Variance Explained PC1: {per_var_tuned[0]}%")
print(f"Variance Explained PC2: {per_var_tuned[1]}%")
print(f"Total variance explained: {cumulative_variance_tuned*100:.2f}%")
print("-" * 30)

In [None]:
# Scree plot of the 2-component PCA fitted on the fine-tuned BERT embeddings.
plot_scree_pca(pca_tuned, 'FineTunedBERT')

In [None]:
pca_df_tuned = pd.DataFrame(pca_data_tuned, columns=['PC1', 'PC2'])

# Attach metadata; the index is reset so rows line up with the PCA output by position.
for source_col, dest_col in (('text', 'text'), ('label', 'label_id')):
    pca_df_tuned[dest_col] = test_df[source_col].reset_index(drop=True)

# Class id -> display name: 0 is "Safe", ids 1..14 are "S1".."S14".
label_map = {0: "Safe", **{class_id: f"S{class_id}" for class_id in range(1, 15)}}

# Readable category column; ids missing from the map fall back to "Class <id>".
pca_df_tuned['category_name'] = pca_df_tuned['label_id'].apply(lambda x: label_map.get(x, f"Class {x}"))
pca_df_tuned = pca_df_tuned.sort_values('label_id')

In [None]:
# 2D PCA scatter of the fine-tuned embeddings, one colour per category.
# The title previously read "15 KCategories" (stray "K").
fig = px.scatter(
    pca_df_tuned,
    x='PC1',
    y='PC2',
    color='category_name',
    hover_data=['text'],
    title=f'PCA: Fine-Tuned BERT (15 Categories) - Total Variance: {cumulative_variance_tuned*100:.1f}%',
    opacity=0.8,
    color_discrete_sequence=px.colors.qualitative.Dark24,
    labels={
        'PC1': f'PC1 ({per_var_tuned[0]:.1f}%)',
        'PC2': f'PC2 ({per_var_tuned[1]:.1f}%)',
        'category_name': 'Category'
    }
)

fig.update_traces(marker=dict(size=8))
fig.update_layout(width=1100,
    height=800,
    autosize=False,
    legend_title_text='Category',
    hoverlabel=dict(bgcolor="white", font_size=12)
)

fig.show()

### 3D

In [None]:
n_components_tuned_3d = 3
# Seeded so the projection is identical across re-runs.
pca_tuned_3d = PCA(n_components=n_components_tuned_3d, random_state=42)
pca_tuned_3d.fit(scaled_vectors_tuned)
pca_data_tuned_3d = pca_tuned_3d.transform(scaled_vectors_tuned)

# Explained variance per component (percent) and in total (fraction)
per_var_tuned_3d = np.round(pca_tuned_3d.explained_variance_ratio_ * 100, decimals=1)
cumulative_variance_tuned_3d = np.sum(pca_tuned_3d.explained_variance_ratio_)

print(f"Variance Explained: PC1={per_var_tuned_3d[0]}%, PC2={per_var_tuned_3d[1]}%, PC3={per_var_tuned_3d[2]}%")
print(f"Total variance explained: {cumulative_variance_tuned_3d*100:.2f}%")

In [None]:
# Scree plot of the 3-component PCA fitted on the fine-tuned BERT embeddings.
plot_scree_pca(pca_tuned_3d, 'FineTunedBERT')

In [None]:
df_3d_fine = pd.DataFrame(pca_data_tuned_3d, columns=['PC1', 'PC2', 'PC3'])

# Attach metadata; the index is reset so rows line up with the PCA output by position.
for source_col, dest_col in (('text', 'text'), ('label', 'label_id')):
    df_3d_fine[dest_col] = test_df[source_col].reset_index(drop=True)

# Class id -> display name: 0 is "Safe", ids 1..14 are "S1".."S14".
label_map = {0: "Safe", **{class_id: f"S{class_id}" for class_id in range(1, 15)}}

# Ids missing from the map fall back to "Class <id>".
df_3d_fine['category_name'] = df_3d_fine['label_id'].apply(lambda x: label_map.get(x, f"Class {x}"))

df_3d_fine = df_3d_fine.sort_values('label_id')

In [None]:
# 3D PCA scatter of the fine-tuned embeddings; axes carry the explained variance.
tuned_scene = {
    f'{axis}axis_title': f'PC{k + 1} ({per_var_tuned_3d[k]}%)'
    for k, axis in enumerate('xyz')
}

fig = px.scatter_3d(
    df_3d_fine,
    x='PC1',
    y='PC2',
    z='PC3',
    color='category_name',
    hover_data=['text'],
    title='PCA 3D: Fine-Tuned BERT (15 Categories)',
    opacity=0.7,
    color_discrete_sequence=px.colors.qualitative.Dark24,
)

fig.update_traces(marker={'size': 3})

fig.update_layout(
    width=1200,
    height=800,
    autosize=False,
    scene=tuned_scene,
    legend_title_text='Kategoria',
    margin={'l': 0, 'r': 0, 'b': 0, 't': 40},
)

fig.show()

## t-SNE

In [None]:
import plotly.express as px
from sklearn.manifold import TSNE
import pandas as pd
import numpy as np
from sklearn.decomposition import PCA

n_samples = scaled_vectors_tuned.shape[0]
# PCA accepts at most min(n_samples, n_features) components, so clamp by both
# dimensions rather than by the sample count alone.
n_comp = min(50, *scaled_vectors_tuned.shape)

pca_50 = PCA(n_components=n_comp, random_state=42)
pca_result_50 = pca_50.fit_transform(scaled_vectors_tuned)

def t_sne_vis_fnBERT(data_pca, pp, n_components=2):
    """Run t-SNE on ``data_pca`` and show an interactive scatter coloured by class.

    Parameters
    ----------
    data_pca : array-like
        Feature matrix passed to ``TSNE.fit_transform``. Its rows are paired
        positionally with the rows of the notebook-level ``test_df`` (columns
        ``text`` and ``label``) to build hover text and colours.
    pp : float
        t-SNE perplexity.
    n_components : int, default 2
        Dimensionality of the embedding; only 2 and 3 can be plotted.

    Returns
    -------
    None
        The figure is displayed with ``fig.show()``.

    Raises
    ------
    ValueError
        If ``n_components`` is neither 2 nor 3. The check runs before the
        t-SNE fit so an invalid call fails immediately instead of after the
        expensive computation.
    """
    if n_components not in (2, 3):
        raise ValueError("n_components must be 2 or 3.")

    tsne = TSNE(n_components=n_components, verbose=1, perplexity=pp, random_state=42, init='pca', learning_rate='auto')
    tsne_results = tsne.fit_transform(data_pca)

    cols = ['x', 'y', 'z'][:n_components]
    df_tsne = pd.DataFrame(tsne_results, columns=cols)

    # reset_index aligns test_df rows with the 0..n-1 index of the embedding frame.
    df_tsne['text'] = test_df['text'].reset_index(drop=True)
    df_tsne['label_id'] = test_df['label'].reset_index(drop=True)

    label_map = {
        0: "Safe", 1: "S1", 2: "S2", 3: "S3", 4: "S4", 5: "S5",
        6: "S6", 7: "S7", 8: "S8", 9: "S9", 10: "S10",
        11: "S11", 12: "S12", 13: "S13", 14: "S14"
    }
    # Ids missing from the mapping fall back to a generic "Class <id>" name.
    df_tsne['category_name'] = df_tsne['label_id'].apply(lambda x: label_map.get(x, f"Class {x}"))
    # Sorting by id makes the legend follow the class order.
    df_tsne = df_tsne.sort_values('label_id')

    title_text = f't-SNE {n_components}D (Perplexity={pp}) - 15 Categories'

    # Options shared by the 2-D and 3-D scatter.
    scatter_kwargs = dict(
        color='category_name',
        hover_data=['text'],
        title=title_text,
        color_discrete_sequence=px.colors.qualitative.Dark24,
        opacity=0.8,
        labels={'category_name': 'Category'}
    )

    if n_components == 2:
        fig = px.scatter(df_tsne, x='x', y='y', **scatter_kwargs)
    else:
        fig = px.scatter_3d(df_tsne, x='x', y='y', z='z', **scatter_kwargs)

    marker_size = 7 if n_components == 2 else 4
    fig.update_traces(marker=dict(size=marker_size))

    fig.update_layout(
        width=1000,
        height=800,
        legend_title_text='Category',
        hoverlabel=dict(bgcolor="white", font_size=12)
    )

    fig.show()

In [None]:
# 2-D t-SNE of the PCA-reduced vectors (`pca_result_50`) at perplexity 30.
t_sne_vis_fnBERT(pca_result_50, pp=30, n_components=2)

In [None]:
print("\nGenerating Perplexity comparison...")

n_samples = pca_result_50.shape[0]
proposed_perplexities = [5, 30, 50, 100]
perplexities = [p for p in proposed_perplexities if p < n_samples]

if len(perplexities) < 4:
    # Small-dataset fallback. t-SNE requires perplexity < n_samples, so the
    # fallback values are filtered as well; the set removes duplicates
    # (e.g. when n_samples // 2 equals one of the fixed values).
    fallback_perplexities = {5, 15, 30, n_samples // 2}
    perplexities = sorted(p for p in fallback_perplexities if 0 < p < n_samples)

plt.figure(figsize=(20, 6))

# Class ids used to colour the points; index reset so rows match the embedding order.
categories = test_df['label'].reset_index(drop=True)

# `perplexities` holds at most four values, so one row of subplots is enough.
for i, perp in enumerate(perplexities):
    print(f"Calculating t-SNE for perplexity={perp}...")

    # running t-SNE on the data after PCA (from the previous step)
    tsne = TSNE(n_components=2, perplexity=perp, random_state=42, init='pca', learning_rate='auto')
    tsne_results = tsne.fit_transform(pca_result_50)

    ax = plt.subplot(1, len(perplexities), i+1)

    ax.scatter(
        tsne_results[:, 0],
        tsne_results[:, 1],
        c=categories,
        cmap='tab20',
        alpha=0.7,
        s=20
    )

    ax.set_title(f'Perplexity: {perp}')
    ax.axis('off')

plt.suptitle("The impact of the Perplexity parameter on cluster structure (Test Set)", fontsize=16)
plt.tight_layout()
plt.show()

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# Vector quality test: a logistic regression fitted on the PCA-reduced vectors.
# The accuracy is measured on the same rows the classifier was fitted on, so it
# is a training-set score (a linear-separability indicator, not a held-out one).
X, y = pca_result_50, test_df['label']

clf = LogisticRegression(max_iter=1000).fit(X, y)
preds = clf.predict(X)
print(f"Linear separation quality (Accuracy): {accuracy_score(y, preds):.4f}")

### 3D

In [None]:
# 3-D t-SNE of the PCA-reduced vectors (`pca_result_50`) at perplexity 30.
t_sne_vis_fnBERT(pca_result_50, pp=30, n_components=3)

## UMAP

In [None]:
# Stack the per-row values of test_df['embedding'] into one array for UMAP
# (assumes every embedding has the same shape - np.stack requires it).
vectors = np.stack(test_df['embedding'].values)

def run_and_plot_umap(vectors, n_components=2, n_neighbors=30, min_dist=0.9, embeddings=True, figure=True):
    """Project ``vectors`` with UMAP (cosine metric) and optionally plot the result.

    Parameters
    ----------
    vectors : array-like
        Input matrix passed to ``umap.UMAP.fit_transform``.
    n_components : int, default 2
        Dimensionality of the UMAP embedding. Must be 2 or 3 when a figure is
        requested; any value UMAP accepts is allowed with ``figure=False``.
    n_neighbors : int, default 30
        UMAP ``n_neighbors``.
    min_dist : float, default 0.9
        UMAP ``min_dist``.
    embeddings : bool, default True
        When truthy the embedding array is returned, otherwise ``None``.
    figure : bool, default True
        When truthy an interactive scatter is shown. Hover text and colours
        come from the notebook-level ``test_df`` (columns ``text`` and
        ``label``), paired positionally with the rows of ``vectors``.

    Returns
    -------
    numpy.ndarray or None
        The UMAP embedding if ``embeddings`` is truthy, else ``None``.

    Raises
    ------
    ValueError
        If ``figure`` is truthy and ``n_components`` is neither 2 nor 3. The
        check runs before the UMAP fit so the call fails immediately.
    """
    if figure and n_components not in (2, 3):
        raise ValueError("n_components must be 2 or 3 when figure=True")

    reducer = umap.UMAP(
        n_neighbors=n_neighbors,
        min_dist=min_dist,
        n_components=n_components,
        metric='cosine',
        random_state=42
    )

    umap_embedding = reducer.fit_transform(vectors)

    if figure:
        # The plotting frame is only needed (and only built) when a figure is drawn.
        cols = ['x', 'y', 'z'][:n_components]
        df_umap = pd.DataFrame(umap_embedding, columns=cols)

        # reset_index aligns test_df rows with the 0..n-1 index of the embedding frame.
        df_umap['text'] = test_df['text'].reset_index(drop=True)
        df_umap['label_id'] = test_df['label'].reset_index(drop=True)

        label_map = {
            0: "Safe", 1: "S1", 2: "S2", 3: "S3", 4: "S4", 5: "S5",
            6: "S6", 7: "S7", 8: "S8", 9: "S9", 10: "S10",
            11: "S11", 12: "S12", 13: "S13", 14: "S14"
        }
        # Ids missing from the mapping fall back to a generic "Class <id>" name.
        df_umap['category_name'] = df_umap['label_id'].apply(lambda x: label_map.get(x, f"Class {x}"))
        # Sorting by id makes the legend follow the class order.
        df_umap = df_umap.sort_values('label_id')

        title_text = f'UMAP {n_components}D (n_neighbors={n_neighbors}, min_dist={min_dist})'

        # Options shared by the 2-D and 3-D scatter.
        scatter_kwargs = dict(
            color='category_name',
            hover_data=['text'],
            title=title_text,
            opacity=0.8,
            color_discrete_sequence=px.colors.qualitative.Dark24
        )

        if n_components == 2:
            fig = px.scatter(df_umap, x='x', y='y', **scatter_kwargs)
        else:
            fig = px.scatter_3d(df_umap, x='x', y='y', z='z', **scatter_kwargs)

        marker_size = 6 if n_components == 2 else 4

        fig.update_traces(marker=dict(size=marker_size))
        fig.update_layout(
            width=1000,
            height=800,
            legend_title_text='Category',
            hoverlabel=dict(bgcolor="white", font_size=12)
        )

        fig.show()

    return umap_embedding if embeddings else None

In [None]:
# 2-D UMAP figure only: with embeddings=False the call returns None, so the
# cell shows just the plot.
run_and_plot_umap(vectors, n_components=2, n_neighbors=30, min_dist=0.9, embeddings=False)

In [None]:
# 3-D UMAP figure only (min_dist lowered to 0.5 for this view); returns None
# because embeddings=False.
run_and_plot_umap(vectors, n_components=3, n_neighbors=30, min_dist=0.5, embeddings=False)

# Clustering using fine-tuned BERT vector representation (umap)

### kmeans

In [None]:
from sklearn.cluster import KMeans

In [None]:
# UMAP coordinates without a figure (figure=False); n_components and min_dist
# keep the function defaults (2 and 0.9). This array is the input of every
# clustering / outlier cell below.
umap_embedding_tuned = run_and_plot_umap(vectors, n_neighbors=30, embeddings=True, figure=False)

In [None]:
# Elbow method: K-Means inertia for every k in a range around the expected
# number of 15 classes.
k_range = range(2, 20)
inertias = [
    KMeans(n_clusters=k, random_state=42, n_init=20).fit(umap_embedding_tuned).inertia_
    for k in k_range
]

plt.figure(figsize=(10, 6))
plt.plot(k_range, inertias, 'bx-')
# Reference line at the true number of classes.
plt.axvline(x=15, color='r', linestyle='--', label='The actual number of classes (15)')
plt.title('Elbow method')
plt.xlabel('Number of clusters (k)')
plt.ylabel('Sum of square distances')
plt.legend()
plt.grid(True)
plt.savefig("Plots/Clustering/kmeans_elbow.png")
plt.show()

- Silhouette Score

In [None]:
import matplotlib.cm as cm
from sklearn.metrics import silhouette_score, silhouette_samples

# Silhouette analysis of K-Means on the UMAP embedding: one silhouette plot per
# k, followed by the average score as a function of k.
X = umap_embedding_tuned
silhouette_scores = []

range_n_clusters = range(2, 16)

for n_clusters in range_n_clusters:
    fig, ax1 = plt.subplots(1, 1)
    fig.set_size_inches(7, 5)

    ax1.set_xlim([-0.1, 1])
    # Vertical room for every sample plus a 10-row gap between cluster bands.
    ax1.set_ylim([0, len(X) + (n_clusters + 1) * 10])

    kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
    cluster_labels = kmeans.fit_predict(X)

    # The average silhouette is the mean of the per-sample values, so the
    # pairwise-distance computation is done once instead of twice.
    sample_silhouette_values = silhouette_samples(X, cluster_labels)
    silhouette_avg = float(np.mean(sample_silhouette_values))
    silhouette_scores.append(silhouette_avg)
    print(f"For n_clusters = {n_clusters}, avg silhouette score: {silhouette_avg:.4f}")

    y_lower = 10
    for i in range(n_clusters):
        # Sorted silhouette values of cluster i form one horizontal band.
        ith_cluster_silhouette_values = sample_silhouette_values[cluster_labels == i]
        ith_cluster_silhouette_values.sort()

        size_cluster_i = ith_cluster_silhouette_values.shape[0]
        y_upper = y_lower + size_cluster_i

        color = cm.nipy_spectral(float(i) / n_clusters)

        ax1.fill_betweenx(np.arange(y_lower, y_upper),
                          0, ith_cluster_silhouette_values,
                          facecolor=color, edgecolor=color, alpha=0.7)

        # Cluster id written at the middle of its band.
        ax1.text(-0.05, y_lower + 0.5 * size_cluster_i, str(i))

        y_lower = y_upper + 10

    ax1.set_title(f" Silhouette for K = {n_clusters}")
    ax1.set_xlabel("Silhouette coefficient value")
    # The vertical axis stacks the samples grouped by cluster, it does not
    # count clusters.
    ax1.set_ylabel("Cluster label")

    # Average silhouette score for this k.
    ax1.axvline(x=silhouette_avg, color="red", linestyle="--")

    ax1.set_yticks([])
    ax1.set_xticks([-0.1, 0, 0.2, 0.4, 0.6, 0.8, 1])

    plt.show()

plt.figure(figsize=(8, 5))
plt.plot(range_n_clusters, silhouette_scores, 'bo-', linewidth=2, markersize=8)
plt.title('Dependence of the average Silhouette Score on the number of clusters')
plt.xlabel('Number of clusters (K)')
plt.ylabel('Silhouette Score')
plt.grid(True)
plt.savefig("Plots/Clustering/kmeans_silhouette_scores.png")
plt.show()

In [None]:
# K-Means with two clusters on the UMAP coordinates.
kmeans_2 = KMeans(n_clusters=2, random_state=42, n_init=10)
labels_k2 = kmeans_2.fit_predict(umap_embedding_tuned)

# Class id -> display name; ids outside the mapping are shown as "Unknown".
label_map = {0: "Safe", 1: "S1 (Violence)", **{class_id: f"S{class_id}" for class_id in range(2, 15)}}

# Coordinates plus cluster id (as string, so plotly treats it as a category)
# and the true label of each row.
df_k2 = pd.DataFrame(umap_embedding_tuned, columns=['x', 'y']).assign(
    Cluster=labels_k2.astype(str),
    True_Label=test_df['label'].reset_index(drop=True),
)
df_k2['Category_Name'] = df_k2['True_Label'].map(label_map).fillna("Unknown")
df_k2['Text'] = test_df['text'].reset_index(drop=True)

# Plot for K=2
fig2 = px.scatter(
    df_k2,
    x='x',
    y='y',
    color='Cluster',
    hover_data=['Category_Name', 'Text'],
    title='K-Means (k=2): Attempt at binary space partitioning',
    color_discrete_sequence=['#FF0000', '#0000FF'],
    opacity=0.7,
    width=1000,
    height=800
)
fig2.update_traces(marker=dict(size=6))
fig2.show()

In [None]:
from sklearn.metrics import adjusted_rand_score

# K-Means with as many clusters as there are true classes, scored against the
# true labels with the Adjusted Rand Index.
kmeans_15 = KMeans(n_clusters=15, random_state=42, n_init=10)
labels_k15 = kmeans_15.fit_predict(umap_embedding_tuned)

ari = adjusted_rand_score(test_df['label'], labels_k15)
print(f"Adjusted Rand Index (ARI) dla k=15: {ari:.4f}")

# Reuse the k=2 frame (coordinates, names, texts) and swap in the new cluster ids.
df_k15 = df_k2.copy()
df_k15['Cluster'] = labels_k15.astype(str)
# Sort by the numeric value of the cluster id: a plain string sort would order
# the legend "0, 1, 10, 11, ..., 2", unlike the numeric order used for k=12.
df_k15 = df_k15.sort_values('Cluster', key=lambda ids: ids.astype(int))

fig15 = px.scatter(
    df_k15, x='x', y='y',
    color='Cluster',
    hover_data=['Category_Name', 'Text'],
    title=f'K-Means (k=15): Reproduction of 15 categories (ARI={ari:.2f})',
    color_discrete_sequence=px.colors.qualitative.Dark24,
    opacity=0.8,
    width=1000,
    height=800
)
fig15.update_traces(marker=dict(size=6))
fig15.show()

In [None]:
# K-Means with 12 clusters, plus a report of which true classes share a cluster.
kmeans_12 = KMeans(n_clusters=12, random_state=42, n_init=10)
labels_k12 = kmeans_12.fit_predict(umap_embedding_tuned)

ari_12 = adjusted_rand_score(test_df['label'], labels_k12)
print(f"Adjusted Rand Index (ARI) dla k=12: {ari_12:.4f}")

# Class id -> display name; ids outside the mapping are shown as "Unknown".
label_map = {0: "Safe", 1: "S1 (Violence)", **{class_id: f"S{class_id}" for class_id in range(2, 15)}}

df_k12 = pd.DataFrame(umap_embedding_tuned, columns=['x', 'y'])
df_k12['Cluster'] = labels_k12
df_k12['True_Label_ID'] = test_df['label'].reset_index(drop=True)
df_k12['Category_Name'] = df_k12['True_Label_ID'].map(label_map).fillna("Unknown")
df_k12['Text'] = test_df['text'].reset_index(drop=True)

print(f"\n{'=' * 50}")
print("CLASS MERGER REPORT")
print("=" * 50)

# Category x cluster contingency table (kept for inspection).
crosstab = pd.crosstab(df_k12['Category_Name'], df_k12['Cluster'])

for cluster_id in sorted(df_k12['Cluster'].unique()):
    in_cluster = df_k12['Cluster'] == cluster_id
    cluster_composition = df_k12.loc[in_cluster, 'Category_Name'].value_counts()

    # A category is a "major component" when it holds more than 10% of the cluster.
    share_threshold = cluster_composition.sum() * 0.1
    major_components = cluster_composition[cluster_composition > share_threshold]

    print(f"\nKlaster {cluster_id}:")
    if len(major_components) != 1:
        print("MERGED:")
        for name, count in major_components.items():
            print(f"     - {name}: {count} samples")
    else:
        print(f"CLEAN: Class domination {major_components.index[0]} ({major_components.values[0]} samples)")

# String copy of the cluster id for categorical colouring; the numeric column
# is used for sorting so the legend is in numeric order.
df_k12['Cluster_Label'] = df_k12['Cluster'].astype(str)
df_k12 = df_k12.sort_values('Cluster')

fig12 = px.scatter(
    df_k12,
    x='x',
    y='y',
    color='Cluster_Label',
    hover_data=['Category_Name', 'Text'],
    title=f'K-Means (k=12): Analysis of combined classes (ARI={ari_12:.4f})',
    color_discrete_sequence=px.colors.qualitative.Dark24,
    opacity=0.8,
    width=1000,
    height=800
)
fig12.update_traces(marker=dict(size=6))
fig12.show()

### Hierarchical clustering

In [None]:
import matplotlib.pyplot as plt
from scipy.cluster.hierarchy import dendrogram, linkage, fcluster
from sklearn.metrics import silhouette_score, adjusted_rand_score
import numpy as np

# Agglomerative clustering of the UMAP coordinates with every SciPy linkage
# method, each cut into 15 flat clusters and scored against the true labels.
X = umap_embedding_tuned
y = test_df['label']

methods = ['single', 'complete', 'average', 'weighted', 'centroid', 'median', 'ward']

print(f"{'Method':<15} | {'Silhouette':<10} | {'ARI':<10}")
print("-" * 45)

for m in methods:
    linked = linkage(X, method=m)

    # Cut the tree so that at most 15 flat clusters remain.
    labels = fcluster(linked, t=15, criterion='maxclust')

    sil_score = silhouette_score(X, labels)
    ari_score = adjusted_rand_score(y, labels)
    print(f"{m:<15} | {sil_score:.3f} | {ari_score:.3f}")

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(18, 6))

    # Left panel: truncated dendrogram.
    dendrogram(
        linked,
        ax=ax1,
        orientation='top',
        distance_sort='descending',
        show_leaf_counts=True,
        truncate_mode='lastp',  # only the last p merged clusters are drawn
        p=40,                   # number of branches at the bottom of the chart
        show_contracted=True,   # mark how many points sit in the collapsed branches
    )
    ax1.set_title(f'Dendrogram ({m}) - shortened')
    ax1.set_xlabel("Cluster size or sample index")
    ax1.set_ylabel("Distance")

    # Right panel: the flat clusters on the UMAP plane.
    scatter = ax2.scatter(X[:, 0], X[:, 1], c=labels, cmap='tab20', s=20, alpha=0.7)
    ax2.set_title(f'Method: {m}\nSilhouette: {sil_score:.3f} | ARI: {ari_score:.3f}')
    ax2.set_xlabel("UMAP Dimension 1")
    ax2.set_ylabel("UMAP Dimension 2")

    plt.tight_layout()
    # Only the Ward figure is written to disk.
    if m == 'ward':
        plt.savefig('Plots/Clustering/HC_ward.png')
    plt.show()

### DBSCAN 

In [None]:
import pandas as pd
import numpy as np
import plotly.express as px
from sklearn.neighbors import NearestNeighbors

# k-distance plot used to pick DBSCAN's eps: distance of every point to its
# farthest neighbour among the `min_samples_val` nearest (the query points are
# the fitted points, so each point is its own first neighbour).
min_samples_val = 30

neighbors = NearestNeighbors(n_neighbors=min_samples_val)
neighbors_fit = neighbors.fit(umap_embedding_tuned)
distances, indices = neighbors_fit.kneighbors(umap_embedding_tuned)

# The last column holds the distance to the farthest of the returned neighbours.
sorted_distances = np.sort(distances[:, -1])

df_knee = pd.DataFrame({
    'Index': np.arange(sorted_distances.shape[0]),
    'Distance': sorted_distances
})

fig = px.line(
    df_knee,
    x='Index',
    y='Distance',
    title=f'k-NN Chart (K-Nearest Neighbours Method) for k={min_samples_val}',
    labels={'Index': 'Points sorted by distance', 'Distance': 'Epsilon value (Distance)'}
)
fig.update_layout(hovermode="x unified", width=1000, height=600)

fig.show()

In [None]:
import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.metrics import silhouette_score, adjusted_rand_score

# Grid search over DBSCAN's eps (min_samples fixed at 30); the eps with the
# highest Adjusted Rand Index against the true labels is remembered.
eps_candidates = np.arange(0.1, 2.5, 0.1)
# -1 is the "nothing found yet" sentinel for all three trackers.
best_score = -1
best_eps = -1
best_n_clusters = -1

print(f"{'EPS':<6} | {'Clusters':<8} | {'Noise':<6} | {'Silhouette':<10} | {'ARI':<10}")

for eps in eps_candidates:
    db = DBSCAN(eps=eps, min_samples=30)
    labels = db.fit_predict(umap_embedding_tuned)

    # DBSCAN marks noise with -1; it is not counted as a cluster.
    n_noise = int(np.sum(labels == -1))
    n_clusters = len(set(labels)) - (1 if n_noise else 0)

    # Scores are only defined when there are at least two clusters.
    if 1 < n_clusters < len(umap_embedding_tuned):
        # Noise points (label -1) take part in both scores as one extra group.
        sil = silhouette_score(umap_embedding_tuned, labels)
        ari = adjusted_rand_score(test_df['label'], labels)

        print(f"{eps:<6.1f} | {n_clusters:<8} | {n_noise:<6} | {sil:.4f}     | {ari:.4f}")

        if ari > best_score:
            best_score = ari
            best_eps = eps
            best_n_clusters = n_clusters
    else:
        print(f"{eps:<6.1f} | {n_clusters:<8} | {n_noise:<6} | {'-':<10} | {'-':<10}")

if best_n_clusters == -1:
    # No candidate qualified: report that instead of printing the sentinels
    # as if they were a result.
    print("No eps candidate produced at least two clusters; no best eps to report.")
else:
    print(f"Best ARI score({best_score:.4f}) obtained for eps={best_eps:.1f}")

In [None]:
from sklearn.cluster import DBSCAN
import pandas as pd
import plotly.express as px

EPS_VALUE = 1.7     # best from research above
MIN_SAMPLES = 30

dbscan = DBSCAN(eps=EPS_VALUE, min_samples=MIN_SAMPLES)
labels_db = dbscan.fit_predict(umap_embedding_tuned)

# Cluster count and noise presence are derived from the labels so the plot
# title stays correct when EPS_VALUE or the data change (-1 marks noise).
n_clusters_db = len(set(labels_db) - {-1})
has_noise_db = bool((labels_db == -1).any())

df_db = pd.DataFrame(umap_embedding_tuned, columns=['x', 'y'])
df_db['Cluster'] = labels_db.astype(str)

# Legend-friendly name for the noise label.
df_db['Cluster_Legend'] = df_db['Cluster'].replace('-1', 'Noise (Outliers)')

# Relies on the `label_map` dictionary defined in an earlier cell.
df_db['Category_Name'] = test_df['label'].reset_index(drop=True).map(label_map).fillna("Unknown")
df_db['Text'] = test_df['text'].reset_index(drop=True)

df_db = df_db.sort_values('Cluster_Legend')

title_db = f'DBSCAN Clustering (eps={EPS_VALUE}): {n_clusters_db} Clusters'
if has_noise_db:
    title_db += ' + Noise'

fig = px.scatter(
    df_db, x='x', y='y',
    color='Cluster_Legend',
    hover_data=['Category_Name', 'Text'],
    title=title_db,
    color_discrete_sequence=px.colors.qualitative.G10,
    opacity=0.8,
    width=1000,
    height=800
)

fig.update_traces(marker=dict(size=6))

fig.show()

# Outliers Detection

## Outliers using DBSCAN

In [None]:
# Rows that DBSCAN labelled as noise (-1), looked up positionally in test_df.
noise_indices = np.flatnonzero(labels_db == -1)
noise_texts = test_df[['text', 'label']].iloc[noise_indices]

print(f"Number of outliers according to DBSCAN: {len(noise_texts)}")
print("\nSample texts from DBSCAN noise:")
print(noise_texts.head(10))

## Isolation Forest + Raw / Fine-tuned BERT

In [None]:
from sklearn.ensemble import IsolationForest
import pandas as pd
import plotly.express as px

# Isolation Forest on the UMAP coordinates; fit_predict returns -1 for
# outliers and 1 for inliers.
iso_forest = IsolationForest(contamination=0.05, random_state=42)
labels_iso = iso_forest.fit_predict(umap_embedding_tuned)

n_outliers_iso = int((labels_iso == -1).sum())
print(f"Number of outliers according to Isolation Forest: {n_outliers_iso}")

df_iso = pd.DataFrame(umap_embedding_tuned, columns=['x', 'y'])
df_iso['Anomaly'] = np.where(labels_iso == -1, 'Outlier', 'Normal')
df_iso['Text'] = test_df['text'].reset_index(drop=True)
# Uses the `label_map` dictionary defined in an earlier cell.
df_iso['Label'] = test_df['label'].reset_index(drop=True).map(label_map)

# Outliers in red on top of a grey background of normal points.
anomaly_colors = {'Outlier': 'red', 'Normal': 'lightgrey'}
fig = px.scatter(
    df_iso,
    x='x',
    y='y',
    color='Anomaly',
    color_discrete_map=anomaly_colors,
    hover_data=['Label', 'Text'],
    title='Isolation Forest Outlier Detection',
    opacity=0.8,
    width=1000,
    height=800
)
fig.show()

In [None]:
# Row positions flagged by each detector (-1 marks an outlier in both).
iso_outlier_indices = set(np.where(labels_iso == -1)[0])
dbscan_outlier_indices = set(np.where(labels_db == -1)[0])

# Points flagged by both detectors.
common_outliers = iso_outlier_indices.intersection(dbscan_outlier_indices)

print(f"Unique DBSCAN outliers: {len(dbscan_outlier_indices)}")
print(f"Unique IsoForest outliers: {len(iso_outlier_indices)}")
print(f"Common section (CONFIRMED ANOMALIES): {len(common_outliers)}")

if len(common_outliers) > 0:
    print("\nMOST ANOMALOUS TEXTS (Detected by both methods)")
    # A set has no meaningful order, so the shared outliers are ranked by the
    # Isolation Forest score (lower = more abnormal). The first rows printed
    # are then really the most anomalous ones, and the output is reproducible.
    anomaly_scores = iso_forest.score_samples(umap_embedding_tuned)
    common_indices_list = sorted(common_outliers, key=lambda idx: anomaly_scores[idx])
    print(test_df.iloc[common_indices_list][['text', 'label']].head(18))

# Impact of Adversarial Changes

In [None]:
from scipy.spatial.distance import cosine
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np


# NOTE(review): the pairing below assumes test_df (and embeddings_test) hold
# the original prompts in the first half and their adversarial versions, in
# the same order, in the second half - confirm against the cell that builds
# test_df.
half_point = len(test_df) // 2
# Upper bound of the second half. With an odd number of rows the last,
# unpaired row is dropped so both halves always have the same length.
paired_end = 2 * half_point

embeddings_orig = embeddings_test[:half_point]
embeddings_adv = embeddings_test[half_point:paired_end]


texts_orig = test_df.iloc[:half_point]['text'].tolist()
texts_adv = test_df.iloc[half_point:paired_end]['text'].tolist()
labels_orig = test_df.iloc[:half_point]['label'].tolist()

print(f"divided data")
print(f"amount of pairs to compare: {len(embeddings_orig)}")


print("Calculating cosine distans for every pair")

# Cosine distance between each original embedding and its adversarial partner.
distances = [
    cosine(vec_orig, vec_adv)
    for vec_orig, vec_adv in zip(embeddings_orig, embeddings_adv)
]

plt.figure(figsize=(10, 6))
sns.histplot(distances, bins=50, kde=True, color='purple')
plt.title('Adversarial Drift')
plt.xlabel('Cosine distance (Original vs Adversarial)')
plt.ylabel('Number of samples')
plt.grid(True, alpha=0.3)
plt.savefig("Plots/Adversial_Drift.png")
plt.show()


df_drift = pd.DataFrame({
    'Original': texts_orig,
    'Adversarial': texts_adv,
    'Distance': distances,
    'Label': labels_orig
})

# Pairs whose embedding moved the most.
top_movers = df_drift.sort_values('Distance', ascending=False).head(10)

print("\n" + "="*60)
print("TOP 10 CHANGES THAT MOST CONFUSED THE MODEL (High Drift):")
print("="*60)

for index, row in top_movers.iterrows():
    print(f"\n[Dystans: {row['Distance']:.4f}] Klasa: {row['Label']}")
    print(f"ORG: {row['Original']}")
    print(f"ADV: {row['Adversarial']}")

# Classification

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, precision_recall_fscore_support
from sklearn.model_selection import train_test_split


# NOTE: pd.read_pickle can execute arbitrary code stored in the file - only
# load pickles produced by this project.
try:
    df_pure_orig = pd.read_pickle("test_pure_orig_with_embeddings.pkl") # only original
    df_pure_adv = pd.read_pickle("test_pure_adv_with_embeddings.pkl")   # only adversarial
    df_mix = pd.read_pickle("test_df_mix_with_embeddings.pkl")          # Mix (Orig + Adv)
except FileNotFoundError as exc:
    # Falling back to empty frames would only postpone the failure to an
    # unrelated KeyError in get_Xy, so stop here and name the missing file.
    raise FileNotFoundError(
        f"Error occured during data loading: {exc.filename or exc} not found. "
        "Create the *_with_embeddings.pkl files before running the classification cells."
    ) from exc

def get_Xy(df):
    """Split a frame into a feature matrix and a label vector.

    The per-row values of the ``embedding`` column are stacked vertically into
    one 2-D array; the ``label`` column is returned as its underlying array.

    Returns
    -------
    tuple
        ``(features, targets)``.
    """
    features = np.vstack(df['embedding'].tolist())
    targets = df['label'].values
    return features, targets


# Data for both variants: A uses only original prompts, B the original +
# adversarial mix. Each set gets a 70/30 train/test split with a fixed seed.
X_orig, y_orig = get_Xy(df_pure_orig)
X_mix_all, y_mix_all = get_Xy(df_mix)

X_train_A, X_test_A, y_train_A, y_test_A = train_test_split(
    X_orig, y_orig, test_size=0.3, random_state=42
)
X_train_B, X_test_B, y_train_B, y_test_B = train_test_split(
    X_mix_all, y_mix_all, test_size=0.3, random_state=42
)

# method A
print("Training option A (on original prompts)...")
clf_A = LogisticRegression(max_iter=2000, n_jobs=-1, random_state=42).fit(X_train_A, y_train_A)

# Method B (mix)
print("Training option B (on mixed data)...")
clf_B = LogisticRegression(max_iter=2000, n_jobs=-1, random_state=42).fit(X_train_B, y_train_B)

# evaluation: three (true labels, predictions, caption) scenarios

# Case 1: model A on the held-out part of the ORIGINAL data
label_1 = "1. Model A (Baseline)\nTest: Original"
y_true_1, preds_1 = y_test_A, clf_A.predict(X_test_A)

# Case 2: model A on ADVERSARIAL data
# (the whole adversarial set serves as the test set, because none of it was
# used when fitting model A)
label_2 = "2. Model A (Atak)\nTest: Adversarial"
X_adv, y_adv = get_Xy(df_pure_adv)
y_true_2, preds_2 = y_adv, clf_A.predict(X_adv)

# Case 3: model B on MIX (Orig + Adv)
# (tested on the separate X_test_B split, which contains both types)
label_3 = "3. Model B (Robust)\nTest: Mix (Orig+Adv)"
y_true_3, preds_3 = y_test_B, clf_B.predict(X_test_B)

# results table

def calculate_metrics(y_true, y_pred, scenario_name):
    """Build one row of the comparison table for a scenario.

    Returns a dict with the scenario name, the accuracy, and the
    support-weighted precision, recall and F1 (``zero_division=0`` scores
    classes without predictions as 0 instead of warning).
    """
    precision, recall, f_score, _support = precision_recall_fscore_support(
        y_true, y_pred, average='weighted', zero_division=0
    )
    row = {"Scenerio": scenario_name, "Accuracy": accuracy_score(y_true, y_pred)}
    row.update({"Precision": precision, "Recall": recall, "F1-Score": f_score})
    return row

# One metrics row per scenario, in the order baseline / attack / robust.
results_list = [
    calculate_metrics(truth, predicted, scenario)
    for truth, predicted, scenario in (
        (y_true_1, preds_1, "Baseline (Orig)"),
        (y_true_2, preds_2, "Under Attack (Adv)"),
        (y_true_3, preds_3, "Robust Mix"),
    )
]

df_results = pd.DataFrame(results_list)

print("\nComparison Table")
print(df_results.set_index("Scenerio").round(4))

# visualisation: long format (one row per scenario/metric pair) for a grouped bar chart
df_melted = pd.melt(df_results, id_vars="Scenerio", var_name="Metric", value_name="Value")

plt.figure(figsize=(12, 6))
chart = sns.barplot(
    data=df_melted,
    x="Metric",
    y="Value",
    hue="Scenerio",
    palette=["green", "red", "blue"],
)
plt.title("Comparison of model effectiveness in three scenarios", fontsize=14)
plt.ylabel("Value (0-1)")
plt.ylim(0, 1.1)
plt.grid(axis='y', alpha=0.3)
plt.legend(loc='lower right', title="Scenerio")

# Print the value on top of every bar.
for bar_group in chart.containers:
    chart.bar_label(bar_group, fmt='%.2f', padding=3, fontsize=9)

plt.tight_layout()
plt.savefig("Plots/Classification/MethodsComparison.png")
plt.show()

# confusion matrices
fig, axes = plt.subplots(1, 3, figsize=(24, 7))

scenarios = [
    (y_true_1, preds_1, label_1),
    (y_true_2, preds_2, label_2),
    (y_true_3, preds_3, label_3)
]

# One shared, sorted class list for all panels: without it a scenario whose
# test set lacks a class gets a smaller matrix with shifted rows/columns, and
# the three heatmaps are no longer comparable cell by cell.
all_labels = np.unique(np.concatenate(
    [np.asarray(values) for truth, predicted, _ in scenarios for values in (truth, predicted)]
))

for ax, (y_true, y_pred, title) in zip(axes, scenarios):
    # Named `conf_mat`, not `cm`: `cm` is the matplotlib.cm alias imported for
    # the silhouette plots and must not be overwritten.
    conf_mat = confusion_matrix(y_true, y_pred, labels=all_labels)

    # Row-normalise (each true class sums to 1); the epsilon avoids a division
    # by zero for classes absent from y_true.
    cm_norm = conf_mat.astype('float') / (conf_mat.sum(axis=1)[:, np.newaxis] + 1e-9)

    sns.heatmap(cm_norm, annot=True, fmt='.2f', cmap='Greens', ax=ax, cbar=False, vmin=0, vmax=1,
                xticklabels=all_labels, yticklabels=all_labels)

    ax.set_title(title + " (Normalized)", fontsize=12, fontweight='bold')
    ax.set_xlabel("Predicted class")
    ax.set_ylabel("True class")

plt.suptitle("Comparison of effectiveness (scale 0.00 - 1.00)", fontsize=16)
plt.tight_layout()
plt.savefig("Plots/Classification/ConfusionMatrices.png")
plt.show()

# Accuracies of the three scenarios, in the row order of df_results
# (baseline, attack, robust).
acc_base, acc_attack, acc_robust = (df_results.iloc[row]['Accuracy'] for row in range(3))

# Differences expressed in percentage points.
drop = (acc_base - acc_attack) * 100
recovery = (acc_robust - acc_attack) * 100

print("\nSummary:")
print(f"1. Spadek jakości przez testowanie na adversarial: {drop:.2f} p.p. (Baseline vs adversarial)")
print(f"2. Improvement through training on Mix: {recovery:.2f} p.p. (Attack vs Robust Mix)")

# Verdict: more than 1% (relative) above the baseline, within 5% of it, or below that.
if acc_robust > acc_base * 1.01:
    conclusion_lines = [
        "3. Conclusion: SUCCESS! The Robust (Mix) model achieved SIGNIFICANTLY BETTER results than the baseline model.",
        f"   (Increase by {(acc_robust - acc_base)*100:.2f} p.p. comparing to the original).",
        "   Adding adversarial examples worked like data augmentation, improving generalisation.",
    ]
elif acc_robust >= acc_base * 0.95:
    conclusion_lines = [
        "3. Conclusion: The Robust (Mix) model has regained almost full efficiency of the base model.",
    ]
else:
    conclusion_lines = [
        "3. Conclusion: The Robust (Mix) model performs better than the attacked model, but still worse than on clean data.",
    ]
print("\n".join(conclusion_lines))

In [None]:
from sklearn.ensemble import RandomForestClassifier


print("\nRandom Forest vs Logistic Regression")


# Random Forest trained and tested on the same mixed split as model B, so its
# accuracy is directly comparable with the "Robust Mix" row of df_results.
rf_clf = RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)

# Training
rf_clf.fit(X_train_B, y_train_B)

# prediction
rf_preds = rf_clf.predict(X_test_B)

rf_acc = accuracy_score(y_test_B, rf_preds)
lr_acc = df_results.iloc[2]['Accuracy']  # row 2 = "Robust Mix" (logistic regression B)

print(f"\nResult for Logistic Regression: {lr_acc:.4f}")
print(f"Result for Random Forest:       {rf_acc:.4f}")

# Difference in percentage points; a tie is reported as a tie rather than as
# "worse by 0.00 p.p.".
diff = (rf_acc - lr_acc) * 100
if diff > 0:
    print(f"Conclusion: Random Forest is better by {diff:.2f} p.p.")
elif diff < 0:
    print(f"Conclusion: Random Forest is worse by {abs(diff):.2f} p.p.")
else:
    print("Conclusion: Random Forest and Logistic Regression achieve the same accuracy.")