# Anomaly Detection and Interpretation from Tabular Data

## Anomaly Detector

In [1]:
from transformers import BertTokenizer, BertForSequenceClassification, Trainer, TrainingArguments, AdamW
from datasets import Dataset
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import random
from tqdm import tqdm
from sklearn.metrics import accuracy_score, f1_score
import time

2024-08-09 19:27:18.437742: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-08-09 19:27:19.023548: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2024-08-09 19:27:23.329491: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2024-08-09 19:27:23.345078: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# Data preparation
file_path = 'ecoli.csv'
# Drop the ID column only when the file actually has one; errors='ignore'
# keeps this cell working for datasets that ship without an ID column.
df = pd.read_csv(file_path).drop(columns='ID', errors='ignore')

def row_to_sentence(row):
    """Serialize one row into a comma-separated string of ``[column:value]`` tokens.

    ``row`` only needs an ``items()`` method yielding ``(column, value)`` pairs.
    """
    cells = []
    for col, val in row.items():
        cells.append('[{}:{}]'.format(col, val))
    return ','.join(cells)

# Assuming the label column is called 'Class'.
# 'sentence' is excluded as well so that re-running this cell does not fold the
# previously generated sentence back into the new one.
df['sentence'] = (
    df.drop(columns=['Class', 'sentence'], errors='ignore')
      .apply(row_to_sentence, axis=1)
)
sentences = df['sentence'].tolist()

# Number of distinct values in the label column
df['Class'].value_counts().shape

(8,)

In [4]:
# Generating vocab
def generate_vocab(df):
    """Build the token vocabulary from every column except 'sentence'.

    Each distinct (column, value) pair becomes one token of the form
    ``[column:value]``. Token ids are assigned in column order and, within a
    column, in order of first appearance, so the mapping is identical on every
    run (iterating a ``set`` gave no such guarantee for string values).

    Args:
        df: DataFrame whose columns (other than 'sentence') supply the values.

    Returns:
        (vocab, column_values): ``vocab`` maps token string -> integer id and
        ends with the special tokens "<OOV>" and "<PAD>"; ``column_values``
        maps column name -> dict whose keys are the stringified known values.
    """
    column_values = {
        col: dict.fromkeys(str(val) for val in df[col].unique())  # keeps first-seen order
        for col in df.columns
        if col != 'sentence'
    }

    # Assign consecutive token ids
    vocab = {}
    for col, values in column_values.items():
        for val in values:
            vocab[f'[{col}:{val}]'] = len(vocab)

    # Special tokens take the two ids after the last regular token
    vocab["<OOV>"] = len(vocab)
    vocab["<PAD>"] = len(vocab)

    return vocab, column_values

In [5]:
# Custom Tokenizer based on the closest method
def find_closest_known_token(col_val_pair, column_values, threshold=0.5):
    """Map an unseen ``[column:value]`` token to the nearest known token of that column.

    Args:
        col_val_pair: Token string such as ``"[MCG:0.67]"``.
        column_values: Mapping of column name -> iterable of known values (as strings).
        threshold: Largest absolute numeric difference accepted as a match.

    Returns:
        ``"[column:closest_value]"`` when a known numeric value lies within
        ``threshold``; otherwise ``"<OOV>"``. Malformed tokens, unknown columns
        and non-numeric values also yield ``"<OOV>"`` instead of raising.
    """
    col, sep, raw_val = col_val_pair.strip('[]').partition(':')
    if not sep or col not in column_values:
        return "<OOV>"
    try:
        val = float(raw_val)  # Convert value to float for comparison
    except ValueError:
        return "<OOV>"

    closest_val = None
    closest_diff = float('inf')
    for known_val in column_values[col]:
        try:
            diff = abs(float(known_val) - val)
        except ValueError:
            continue  # a categorical known value cannot be "near" a number
        if diff < closest_diff and diff <= threshold:
            closest_val = known_val
            closest_diff = diff

    if closest_val is None:
        return "<OOV>"
    return f'[{col}:{closest_val}]'

def is_number(string):
    """Return True if ``float(string)`` succeeds, False if it raises ValueError."""
    try:
        float(string)
    except ValueError:
        return False
    else:
        return True

def custom_tokenizer(examples, return_tensors="pt"):
    """Tokenize comma-separated ``[column:value]`` sentences with the global vocab.

    Args:
        examples: Iterable of sentence strings.
        return_tensors: Accepted for call compatibility; not used.

    Returns:
        dict with
          - "input_ids": one list of token ids per sentence;
          - "numerical_values": one list per sentence of ``(token_index, value)``
            tuples for every token whose value parses as a float.

    Tokens missing from ``vocab`` are mapped to the nearest known token via
    ``find_closest_known_token``; anything that cannot be resolved (including
    tokens that make that lookup raise) becomes "<OOV>".
    """
    oov_id = vocab["<OOV>"]
    tokenized_outputs = []
    numerical_values = []

    for sentence in examples:
        token_ids = []
        nums = []
        for idx, token in enumerate(sentence.split(',')):
            if token in vocab:
                token_id = vocab[token]
            else:
                # Handle OOV tokens; a failed lookup must not abort the whole batch
                try:
                    closest_token = find_closest_known_token(token, column_values)
                except (ValueError, KeyError):
                    closest_token = "<OOV>"
                token_id = vocab.get(closest_token, oov_id)

            # Extract the value part once; tokens without ':' simply carry no number
            parts = token.split(':')
            raw_value = parts[1].strip(']') if len(parts) > 1 else ''
            if is_number(raw_value):
                nums.append((idx, float(raw_value)))
            token_ids.append(token_id)
        tokenized_outputs.append(token_ids)
        numerical_values.append(nums)

    # Compatibility with `datasets` library
    return {"input_ids": tokenized_outputs, "numerical_values": numerical_values}


# Build the token vocabulary and the per-column table of known values
vocab, column_values = generate_vocab(df)
# Inverse lookup (token id -> token string), obtained by flipping vocab
id_to_token = dict((token_id, token) for token, token_id in vocab.items())

def convert_ids_to_tokens(token_ids):
    """Translate token ids back to token strings; ids not in ``id_to_token`` become "<OOV>"."""
    tokens = []
    for token_id in token_ids:
        tokens.append(id_to_token.get(token_id, "<OOV>"))
    return tokens

# Smoke-test the tokenizer on one row
text = df['sentence'].iloc[5]
print(text)

# Tokenize a single-sentence batch
inputs = custom_tokenizer([text])

# Round-trip the ids back to tokens; for a row taken from df the printed
# tokens should match the sentence above token for token
tokens = convert_ids_to_tokens(inputs["input_ids"][0])
print(tokens)

[MCG:0.67],[GVH:0.39],[LIP:0.48],[CHG:0.5],[AAC:0.36],[ALM1:0.38],[ALM2:0.46]
['[MCG:0.67]', '[GVH:0.39]', '[LIP:0.48]', '[CHG:0.5]', '[AAC:0.36]', '[ALM1:0.38]', '[ALM2:0.46]']


In [6]:
# Prepare for training
# df['Class'] supplies the labels and df['sentence'] the serialized rows;
# 10% of the rows are held out for validation with a fixed split seed.
train_sentences, val_sentences, train_labels, val_labels = train_test_split(
    df['sentence'],
    df['Class'],
    test_size=0.1,
    random_state=42,
)


def _tokenize_batch(examples):
    """Adapter for ``Dataset.map``: tokenize the 'text' column of a batch."""
    return custom_tokenizer(examples['text'])


# Wrap each split in a `datasets.Dataset` and tokenize it batch-wise
train_dataset = Dataset.from_dict(
    {'text': train_sentences.tolist(), 'labels': train_labels.tolist()}
).map(_tokenize_batch, batched=True)
val_dataset = Dataset.from_dict(
    {'text': val_sentences.tolist(), 'labels': val_labels.tolist()}
).map(_tokenize_batch, batched=True)

# collate_fn tells the DataLoader how to merge a list of samples into one padded batch.
def collate_fn(batch):
    """Merge tokenized samples into position-aligned, padded batch tensors.

    Args:
        batch: List of samples, each with 'input_ids' (list of int), 'labels'
            and 'numerical_values' (list of ``(token_index, value)`` pairs).

    Returns:
        dict with
          - 'input_ids': LongTensor (batch, max_len), padded with the "<PAD>" id;
          - 'labels': tensor built from the samples' labels;
          - 'numerical_values': FloatTensor (batch, max_len); entry ``[b, t]`` is
            the numeric value of token ``t`` (0 where the token has none);
          - 'num_indices': LongTensor (batch, max_len); entry ``[b, t]`` is ``t``
            when token ``t`` carries a numeric value and -1 otherwise;
          - 'max_len': length of the longest sequence in the batch.

    Values are written at their token position rather than packed to the left,
    so consumers that read ``numerical_values[b, t]`` for token ``t`` stay
    correct even when some tokens in a row are non-numeric. When every token
    is numeric the output equals the left-packed layout.
    """
    pad_id = vocab["<PAD>"]
    batch_size = len(batch)
    max_len = max(len(item['input_ids']) for item in batch)

    input_ids = torch.full((batch_size, max_len), pad_id, dtype=torch.long)
    numerical_values = torch.zeros((batch_size, max_len), dtype=torch.float)
    num_indices = torch.full((batch_size, max_len), -1, dtype=torch.long)  # -1 marks "no numeric value"

    for row, item in enumerate(batch):
        ids = item['input_ids']
        input_ids[row, :len(ids)] = torch.tensor(ids, dtype=torch.long)
        for idx, val in item['numerical_values']:
            position = int(idx)  # the index may come back as a float after dataset serialization
            numerical_values[row, position] = float(val)
            num_indices[row, position] = position

    labels = torch.tensor([item['labels'] for item in batch])

    return {'input_ids': input_ids, 'labels': labels, 'numerical_values': numerical_values, 'num_indices': num_indices, 'max_len': max_len}


# Set random seeds for reproducibility.
# This must happen before any random draw below (in particular the random
# embedding table), otherwise the embeddings differ on every run.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)  # If using CUDA

train_loader = DataLoader(train_dataset, batch_size=64, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=64, collate_fn=collate_fn)

# Initialize custom embeddings: one random row per vocabulary token
vocab_size = len(vocab)
embedding_dim = 768  # Adjust as needed
custom_embeddings = torch.FloatTensor(np.random.rand(vocab_size, embedding_dim))

# Independent float copy of the embedding table. clone().detach() avoids the
# UserWarning that torch.tensor() emits when handed an existing tensor.
custom_embeddings_tensor = custom_embeddings.clone().detach().to(torch.float)

# Load the model
# NOTE(review): the captured output warns that an 'albert' checkpoint is being
# loaded into a BERT class and that the checkpoint weights are not used, so the
# encoder appears to start from random initialization - confirm this is intended.
# NOTE(review): num_labels=2 assumes binary labels, while the label column was
# shown to hold 8 distinct values for this file - confirm the labels are
# binarized before training.
model = BertForSequenceClassification.from_pretrained('albert-base-v2', num_labels=2, output_attentions=True)

# Replace the model's word embeddings with the custom embeddings
model.bert.embeddings.word_embeddings = nn.Embedding(num_embeddings=custom_embeddings_tensor.size(0), embedding_dim=custom_embeddings_tensor.size(1))
model.bert.embeddings.word_embeddings.weight = nn.Parameter(custom_embeddings_tensor)

# Optionally, freeze the custom embeddings to prevent them from being updated during training
# model.bert.embeddings.word_embeddings.weight.requires_grad = False

# Move the model to the appropriate device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Optimizer
optimizer = AdamW(model.parameters(), lr=5e-5)



  0%|          | 0/1 [00:00<?, ?ba/s]

  0%|          | 0/1 [00:00<?, ?ba/s]

  custom_embeddings_tensor = torch.tensor(custom_embeddings, dtype=torch.float)
You are using a model of type albert to instantiate a model of type bert. This is not supported for all configurations of models and can yield errors.
Some weights of the model checkpoint at albert-base-v2 were not used when initializing BertForSequenceClassification: ['albert.pooler.weight', 'predictions.bias', 'albert.pooler.bias', 'albert.embeddings.LayerNorm.weight', 'albert.encoder.albert_layer_groups.0.albert_layers.0.full_layer_layer_norm.weight', 'albert.encoder.albert_layer_groups.0.albert_layers.0.full_layer_layer_norm.bias', 'albert.encoder.albert_layer_groups.0.albert_layers.0.ffn.bias', 'predictions.LayerNorm.bias', 'albert.encoder.embedding_hidden_mapping_in.weight', 'albert.encoder.albert_layer_groups.0.albert_layers.0.attention.LayerNorm.weight', 'albert.embeddings.token_type_embeddings.weight', 'albert.embeddings.position_embeddings.weight', 'albert.encoder.albert_layer_groups.0.albert_laye

In [None]:
# Training
start_time = time.time()
model.train()
for epoch in range(3):  # number of epochs
    total_loss = 0.0
    for batch in tqdm(train_loader):
        input_ids = batch['input_ids'].to(device)
        labels = batch['labels'].to(device)
        numerical_values = batch['numerical_values'].to(device)
        num_indices = batch['num_indices'].to(device)

        # Get the embeddings for the input_ids
        embeddings = model.bert.embeddings.word_embeddings(input_ids)

        # Multiply each token embedding by its numerical value. Positions marked
        # -1 (no numeric value / padding) are multiplied by 1, i.e. left as is.
        # One broadcast multiply replaces the per-token Python loop, which did a
        # tensor comparison and an in-place edit for every single token.
        scale = torch.where(num_indices != -1, numerical_values, torch.ones_like(numerical_values))
        embeddings = embeddings * scale.unsqueeze(-1)

        # Forward pass on the scaled embeddings; passing labels makes the model return the loss
        model_output = model(inputs_embeds=embeddings, labels=labels)
        loss = model_output.loss
        total_loss += loss.item()

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch+1} finished. Total Loss: {total_loss}")
training_time=time.time() - start_time

In [None]:
# Evaluation
model.eval()
all_true_labels = []
all_predictions = []

# The whole pass runs without autograd, including the embedding lookup
with torch.no_grad():
    for batch in tqdm(val_loader):
        input_ids = batch['input_ids'].to(device)
        labels = batch['labels'].to(device)
        numerical_values = batch['numerical_values'].to(device)
        num_indices = batch['num_indices'].to(device)

        # Get the embeddings for the input_ids
        embeddings = model.bert.embeddings.word_embeddings(input_ids)

        # Same numeric scaling as in training: positions marked -1 keep a factor of 1
        scale = torch.where(num_indices != -1, numerical_values, torch.ones_like(numerical_values))
        embeddings = embeddings * scale.unsqueeze(-1)

        logits = model(inputs_embeds=embeddings).logits
        predictions = torch.argmax(logits, dim=-1)

        all_true_labels.extend(labels.cpu().tolist())
        all_predictions.extend(predictions.cpu().tolist())

# Calculate accuracy and F1 score over all validation samples at once.
# Averaging per-batch accuracies would over-weight a smaller final batch.
bert_accuracy = accuracy_score(all_true_labels, all_predictions)
bert_f1 = f1_score(all_true_labels, all_predictions, average='binary')  # Use 'binary' for binary classification

print(f"Validation Accuracy: {bert_accuracy}")
print(f"Validation F1 Score: {bert_f1}")


## Anomaly Interpreter

In [None]:
import torch
import torch.nn.functional as F

# Reuse the `device` selected in the model-setup cell and make sure the model is on it.
model.to(device)

def get_prediction_attention(sentence):
    """Classify one sentence and return the model's attention maps.

    The forward pass mirrors training and evaluation: token embeddings are
    multiplied by the token's numeric value before being fed to the model via
    ``inputs_embeds``. Feeding raw ``input_ids`` would skip that scaling and
    present inputs the model was never trained on.

    Args:
        sentence: Comma-separated ``[column:value]`` string.

    Returns:
        (predicted_class, predicted_prob, attentions): the argmax class index,
        its softmax probability, and ``outputs.attentions`` as returned by the
        model when called with ``output_attentions=True``.
    """
    # custom_tokenizer expects a batch, so wrap the sentence in a list
    inputs = custom_tokenizer([sentence])

    # Shape (1, seq_len): the tokenizer already returns a list of id lists
    input_ids = torch.tensor(inputs['input_ids'], dtype=torch.long).to(device)

    # Single unpadded sentence: every position is attended to
    attention_mask = torch.ones_like(input_ids)

    # Per-token scale factor: the numeric value where there is one, 1 elsewhere
    scale = torch.ones(input_ids.shape, dtype=torch.float, device=device)
    for idx, val in inputs['numerical_values'][0]:
        scale[0, int(idx)] = float(val)

    model.eval()  # Set the model to evaluation mode
    with torch.no_grad():
        embeddings = model.bert.embeddings.word_embeddings(input_ids) * scale.unsqueeze(-1)
        outputs = model(inputs_embeds=embeddings, attention_mask=attention_mask, output_attentions=True)

    logits = outputs.logits
    attentions = outputs.attentions

    probs = F.softmax(logits, dim=-1)
    predicted_class = torch.argmax(probs, dim=-1).cpu().numpy()[0]  # Unwrap batch dimension
    predicted_prob = probs[0, predicted_class].item()

    return predicted_class, predicted_prob, attentions

# Example: classify one row and show the predicted class with its probability
example_sentence = df['sentence'].iloc[5]
prediction, probability, attention_weights = get_prediction_attention(example_sentence)
print(f"Prediction: {prediction}")
print(f"Probability: {probability}")

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

def visualize_attention(input_text, attentions):
    """Draw a heatmap of the last attention layer, averaged over heads.

    Args:
        input_text: Sentence to tokenize; its tokens label both heatmap axes.
        attentions: Sequence of attention tensors whose last element is used;
            it is squeezed on dim 0 and averaged over the next dim.
    """
    # Token strings for the axis labels
    token_ids = custom_tokenizer([input_text])['input_ids'][0]
    tick_labels = list(map(id_to_token.__getitem__, token_ids))

    # Last layer -> drop batch dim -> mean over heads -> numpy
    last_layer = attentions[-1]
    head_mean = last_layer.squeeze(0).mean(0)  # [seq_len, seq_len]
    heat = head_mean.cpu().detach().numpy()

    # Plotting
    plt.figure(figsize=(10, 8))
    sns.heatmap(heat, xticklabels=tick_labels, yticklabels=tick_labels, cmap='viridis')
    plt.show()

# Example: plot the attention returned by get_prediction_attention for the example sentence
visualize_attention(input_text=example_sentence, attentions=attention_weights)


In [None]:
from collections import defaultdict
# Create Association Matrix from Training Data
# An association matrix M captures the average attention between all pairs of tokens across the training data.
# The function below iterates over the training sentences, computes the attention for each one, and aggregates the results to form M.


def create_full_association_matrix(sentences, model, tokenizer):
    """
    Create an association matrix over the vocabulary from last-layer attention.

    For every sentence the last layer's attention (averaged over heads) is added
    to the cells addressed by the sentence's token-id pairs; each cell is then
    divided by the summed occurrence counters of its two tokens.

    The forward pass applies the same numeric embedding scaling used during
    training whenever the tokenizer output provides "numerical_values".

    Args:
    sentences (list of str): List of sentences to process.
    model (transformers model): Model exposing `get_input_embeddings()` and
        accepting `inputs_embeds` / `output_attentions`.
    tokenizer (callable): Takes a list of sentences and returns a dict with
        "input_ids" and, optionally, "numerical_values".

    Returns:
    np.ndarray: float32 matrix of shape (vocab_size, vocab_size).
    """
    association_matrix = np.zeros((vocab_size, vocab_size), dtype=np.float32)
    token_counts = np.zeros(vocab_size, dtype=np.int64)

    model.eval()  # Ensure model is in evaluation mode
    model.to(device)  # Ensure model is on the correct device
    embedding_layer = model.get_input_embeddings()

    for sentence in sentences:
        inputs = tokenizer([sentence])  # tokenizer processes a list of sentences
        input_ids = inputs["input_ids"][0]
        if not input_ids:
            continue  # nothing to attend over

        # Shape (1, seq_len) on the model's device
        input_ids_tensor = torch.tensor([input_ids], dtype=torch.long).to(device)

        # Per-token scale: numeric value where present, 1 elsewhere
        scale = torch.ones(input_ids_tensor.shape, dtype=torch.float, device=device)
        numeric_entries = inputs.get("numerical_values") or [[]]
        for idx, val in numeric_entries[0]:
            scale[0, int(idx)] = float(val)

        with torch.no_grad():
            embeddings = embedding_layer(input_ids_tensor) * scale.unsqueeze(-1)
            outputs = model(inputs_embeds=embeddings, output_attentions=True)
        # Last layer, averaged over heads; moved to host memory once per sentence
        attention = outputs.attentions[-1].squeeze(0).mean(dim=0).cpu().numpy()

        ids = np.asarray(input_ids, dtype=np.int64)
        keep = np.flatnonzero(ids < vocab_size)  # positions whose id fits the matrix
        kept_ids = ids[keep]

        # Accumulate attention[i, j] into M[id_i, id_j]; np.add.at sums correctly
        # even when the same token id occurs more than once in a sentence.
        np.add.at(
            association_matrix,
            (kept_ids[:, None], kept_ids[None, :]),
            attention[np.ix_(keep, keep)],
        )
        # Each kept position takes part in len(kept_ids) pairs as the first
        # element and as many as the second, so its counter grows by 2 * len.
        np.add.at(token_counts, kept_ids, 2 * len(kept_ids))

    # Normalize the association scores; cells whose tokens never occurred stay 0
    totals = token_counts[:, None] + token_counts[None, :]
    np.divide(association_matrix, totals, out=association_matrix, where=totals > 0)

    return association_matrix



In [None]:
# The association matrix is built from normal rows only (Class == 0)
normal_df = df.loc[df['Class'] == 0]
normal_sentences = normal_df['sentence'].to_list()

# Time the construction of M
start_time = time.time()
M = create_full_association_matrix(
    normal_sentences,
    model,
    custom_tokenizer,
)
time_to_build_M = time.time() - start_time

In [None]:
# Inspect how the association scores in M are distributed (log-scaled counts)
fig, ax = plt.subplots()
ax.hist(M.flatten(), bins=50, log=True)
ax.set_title('Distribution of Association Scores')
ax.set_xlabel('Association Score')
ax.set_ylabel('Frequency (log scale)')
plt.show()


In [None]:
# Both cut-offs are set to the mean association score in M
# (np.percentile(M.flatten(), 75) is an alternative that was considered)
threshold = np.mean(M.flatten())
violation_threshold = threshold
print(violation_threshold)

In [None]:
#Identify Associated Words
def identify_high_attention_pairs(attention_matrix, tokens, attention_threshold=None):
    """Return token pairs whose attention weight exceeds a cut-off.

    Only positions ``i < j`` are inspected, so each unordered pair is reported
    at most once and self-attention is skipped.

    Args:
        attention_matrix: 2-D array indexed as ``[i, j]``; its first dimension
            determines how many positions are scanned.
        tokens: Token strings aligned with the matrix positions.
        attention_threshold: Cut-off to apply. When None, the module-level
            ``threshold`` is read at call time.

    Returns:
        List of ``(token_i, token_j)`` tuples in scan order. Pairs containing a
        padding/separator token are dropped; "<OOV>" is kept on purpose because
        downstream checks treat it as a violation marker.
    """
    cutoff = threshold if attention_threshold is None else attention_threshold
    # BERT-style specials plus this notebook's own padding token
    ignored_tokens = {'[PAD]', '[SEP]', '[CLS]', '<PAD>'}

    high_attention_pairs = []
    seq_len = attention_matrix.shape[0]

    for i in range(seq_len):
        for j in range(i + 1, seq_len):  # pairs without repetition and without self-attention
            if attention_matrix[i, j] > cutoff:
                token1, token2 = tokens[i], tokens[j]
                if token1 not in ignored_tokens and token2 not in ignored_tokens:
                    high_attention_pairs.append((token1, token2))

    return high_attention_pairs

In [None]:
def process_test_sentence(sentence):
    """Label a sentence as "normal" or "anomalous" and collect its high-attention pairs.

    Relies on the module-level ``get_prediction_attention``, ``custom_tokenizer``,
    ``id_to_token`` and ``identify_high_attention_pairs``.

    Args:
        sentence: Comma-separated ``[column:value]`` string.

    Returns:
        ``("normal", [])`` when the predicted class is 0 and no token is
        "<OOV>"; otherwise ``("anomalous", pairs)`` where ``pairs`` are the
        high-attention token pairs from the last layer (averaged over heads).
    """
    predicted_class, _, attentions = get_prediction_attention(sentence)

    # Convert token IDs back to tokens. Unknown ids fall back to "<OOV>", the
    # same marker the check below and the rest of the file use.
    token_ids = custom_tokenizer([sentence])['input_ids'][0]
    tokens = [id_to_token.get(token_id, "<OOV>") for token_id in token_ids]

    if predicted_class == 0 and "<OOV>" not in tokens:
        return "normal", []

    # Last layer, batch dimension removed, averaged across heads
    attention_matrix = attentions[-1].squeeze(0).mean(dim=0).cpu().numpy()

    high_attention_pairs = identify_high_attention_pairs(attention_matrix, tokens)

    return "anomalous", high_attention_pairs




In [None]:
def check_association_violations(high_attention_pairs, association_matrix, violation_threshold=None):
    """Return the pairs that contradict the associations learned from normal data.

    A pair is a violation when its association score is below the threshold or
    when either of its tokens is "<OOV>".

    Args:
        high_attention_pairs: Iterable of ``(token1, token2)`` tuples.
        association_matrix: Square 2-D array indexed by token id.
        violation_threshold: Score below which a pair counts as a violation.
            When None, the module-level ``violation_threshold`` is read at call
            time, so re-running the threshold cell takes effect without having
            to re-define this function.

    Returns:
        List of violating ``(token1, token2)`` tuples, in input order.
    """
    if violation_threshold is None:
        # The parameter shadows the module-level name, hence the explicit lookup
        violation_threshold = globals()['violation_threshold']

    oov_id = vocab["<OOV>"]
    # Bounds come from the matrix itself rather than from a separate global
    matrix_size = association_matrix.shape[0]

    violations = []
    for word1, word2 in high_attention_pairs:
        id1 = vocab.get(word1, oov_id)  # Use <OOV> ID for unknown tokens
        id2 = vocab.get(word2, oov_id)

        # Ensure both IDs are within the bounds of the association matrix
        if id1 < matrix_size and id2 < matrix_size:
            if association_matrix[id1, id2] < violation_threshold or "<OOV>" in (word1, word2):
                violations.append((word1, word2))
        else:
            # Not expected when the matrix covers the whole vocab; kept for safety
            print(f"Skipping pair ({word1}, {word2}) with IDs ({id1}, {id2}) outside association matrix bounds.")

    return violations



In [None]:
# Example: run the full interpretation pipeline on one row
example_sentence = df['sentence'].iloc[15]
print(example_sentence)

status, high_attention_pairs = process_test_sentence(example_sentence)
print(f"Status: {status}")
if status != "anomalous":
    print("Sentence is normal.")
else:
    print("Highly associated words based on attention weights:", high_attention_pairs)
    # M is the association matrix built from the normal rows
    violations = check_association_violations(high_attention_pairs, M)
    if not violations:
        print("No violations of association rules.")
    else:
        print("Violations of association rules:", violations)


## Mutation Analysis
To perform mutation analysis on the dataset, we follow these steps. The code selects 5% of the normal data and randomly mutates two column values per row. Each mutated row is then checked in two ways: whether it is flagged as anomalous, and whether a mutated column (or an out-of-vocabulary token) appears among the reported violations. Two mutation scores are reported, each as a percentage of the total number of mutants: the share flagged as anomalous (anomaly detection), and the share that is both flagged and explained by a violation involving a mutated column (violation detection).



In [None]:
import random

# Assuming df is your DataFrame and M is your association matrix

# Re-seed so that both the sampled rows and the mutations applied to them are
# the same every time this cell is run.
random.seed(SEED)

# Select 5% of normal data
normal_data = df[df['Class'] == 0].sample(frac=0.05, random_state=SEED)

def mutate_sentence(sentence, n_mutations=2, value_range=(1, 2)):
    """Replace the values of randomly chosen columns in a serialized row.

    Args:
        sentence: String of the form ``"[col:val],[col:val],..."``.
        n_mutations: Number of distinct columns to mutate. Capped at the number
            of columns so that short rows do not raise.
        value_range: Inclusive ``(low, high)`` integer range the replacement
            value is drawn from.

    Returns:
        (mutated_sentence, mutated_columns): the rewritten sentence and the list
        of mutated tokens, each formatted as ``"[col:new_value]"``. An empty
        sentence is returned unchanged with an empty list.
    """
    if not sentence:
        return sentence, []

    columns = sentence.split('],[')
    columns[0] = columns[0][1:]  # Remove the first '['
    columns[-1] = columns[-1][:-1]  # Remove the last ']'

    low, high = value_range
    mutation_count = min(n_mutations, len(columns))
    mutation_indices = random.sample(range(len(columns)), mutation_count)
    mutated_columns = []  # Track mutated columns

    for idx in mutation_indices:
        col = columns[idx].split(':', 1)[0]
        mutated_val = str(random.randint(low, high))

        columns[idx] = f"{col}:{mutated_val}"
        mutated_columns.append(f"[{col}:{mutated_val}]")  # Store mutated column for comparison

    mutated_sentence = '[' + '],['.join(columns) + ']'
    return mutated_sentence, mutated_columns

killed_mutants_1 = 0  # mutants flagged as anomalous
killed_mutants_2 = 0  # flagged mutants whose violations involve a mutated column or <OOV>
total_mutants = len(normal_data)

for sentence in normal_data['sentence']:
    mutated_sentence, mutated_columns = mutate_sentence(sentence)

    status, high_attention_pairs = process_test_sentence(mutated_sentence)
    print(f"Status: {status}")
    if status != "anomalous":
        print("Sentence is normal.")
        continue

    print("Highly associated words based on attention:", high_attention_pairs)
    killed_mutants_1 += 1

    violations = check_association_violations(high_attention_pairs, M)
    if not violations:
        print("No violations of association rules.")
        continue

    print("Violations of association rules found:", violations)
    # Compare whole tokens instead of relying on substring matches inside each word
    violating_tokens = {token for pair in violations for token in pair}
    if "<OOV>" in violating_tokens or violating_tokens.intersection(mutated_columns):
        killed_mutants_2 += 1

# With no sampled rows there is nothing to score; report 0 instead of dividing by zero
if total_mutants:
    mutation_score_1 = (killed_mutants_1 / total_mutants) * 100
    mutation_score_2 = (killed_mutants_2 / total_mutants) * 100
else:
    mutation_score_1 = 0.0
    mutation_score_2 = 0.0
print(f"Mutation Score for anomaly detection: {mutation_score_1:.2f}%")
print(f"Mutation Score for violation detection: {mutation_score_2:.2f}%")

## Compare with other models

In [None]:
# LSTM
from keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from keras.models import Sequential
from keras.layers import Embedding, LSTM, Dense
from keras.callbacks import EarlyStopping
from sklearn.metrics import accuracy_score, f1_score

# Assume the same data preprocessing steps as before, resulting in train_sentences and train_labels

# Tokenize the text for LSTM
max_words = 5000  # This is the size of the vocabulary
max_len = 50  # This should be adjusted to the length of the longest sentence after tokenization

# Keep every "[col:val]" cell as ONE token. The Tokenizer's default filters
# strip '[', ']', ':' and '.', which would split "[MCG:0.67]" into "mcg", "0"
# and "67" and lose the link between a column and its value.
tokenizer = Tokenizer(num_words=max_words, filters='', split=',')
tokenizer.fit_on_texts(train_sentences)
sequences = tokenizer.texts_to_sequences(train_sentences)
padded_sequences = pad_sequences(sequences, maxlen=max_len)

# Define the LSTM model
lstm_model = Sequential()
lstm_model.add(Embedding(max_words, 128, input_length=max_len))
lstm_model.add(LSTM(64))
lstm_model.add(Dense(1, activation='sigmoid'))

lstm_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Train the LSTM model; labels are passed as a plain array rather than a Series
# that still carries the shuffled index from the train/validation split
early_stopping = EarlyStopping(monitor='val_loss', patience=3)
lstm_model.fit(padded_sequences, np.asarray(train_labels), batch_size=32, epochs=10, validation_split=0.1, callbacks=[early_stopping])

# Prepare the validation data
val_sequences = tokenizer.texts_to_sequences(val_sentences)
val_padded_sequences = pad_sequences(val_sequences, maxlen=max_len)
val_labels = np.array(val_labels)

# Threshold the sigmoid output at 0.5 to obtain class predictions
lstm_preds = (lstm_model.predict(val_padded_sequences) > 0.5).astype('int32').flatten()

lstm_accuracy = accuracy_score(val_labels, lstm_preds)
lstm_f1 = f1_score(val_labels, lstm_preds)


In [None]:
# MLP
from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.callbacks import EarlyStopping
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, f1_score

# Assuming df is your DataFrame and the last column 'Class' is the label

# Separate features and labels
X = df.drop(['Class','sentence'], axis=1).values
y = df['Class'].values

# Split FIRST, then standardize. Fitting the scaler on all rows would let the
# validation rows influence the training features (data leakage).
X_train_raw, X_val_raw, y_train, y_val = train_test_split(X, y, test_size=0.1, random_state=42)

# Standardize the features using statistics from the training split only
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train_raw)
X_val = scaler.transform(X_val_raw)

# Define the MLP model
mlp_model = Sequential()
mlp_model.add(Dense(64, activation='relu', input_shape=(X_train.shape[1],)))
mlp_model.add(Dropout(0.5))
mlp_model.add(Dense(32, activation='relu'))
mlp_model.add(Dropout(0.5))
mlp_model.add(Dense(1, activation='sigmoid'))

mlp_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Train the MLP model with early stopping
early_stopping = EarlyStopping(monitor='val_loss', patience=5)
mlp_model.fit(X_train, y_train, batch_size=32, epochs=100, validation_split=0.1, callbacks=[early_stopping])

# Make predictions with the MLP model (sigmoid output thresholded at 0.5)
mlp_preds = (mlp_model.predict(X_val) > 0.5).astype('int32').flatten()

# Evaluate the MLP model
mlp_accuracy = accuracy_score(y_val, mlp_preds)
mlp_f1 = f1_score(y_val, mlp_preds)



# Summary of Evaluation Results

In [None]:
# Show which dataset file the results below were computed from.
print(file_path)

In [None]:

# Compare the models: one (name, accuracy, F1) record per model, pivoted into columns
model_rows = [
    ('BERT', bert_accuracy, bert_f1),
    ('LSTM', lstm_accuracy, lstm_f1),
    ('MLP', mlp_accuracy, mlp_f1),
]
model_names, accuracies, f1_scores = (list(column) for column in zip(*model_rows))
comparison_dict = {
    'Model': model_names,
    'Accuracy': accuracies,
    'F1 Score': f1_scores,
}

comparison_df = pd.DataFrame(comparison_dict)
comparison_df

In [None]:
# Mutation Scores (percent): first for anomaly detection, then for violation detection
print(mutation_score_1, mutation_score_2)

In [None]:
# Time (s): wall-clock seconds spent in the training cell, then in building the association matrix M
print(training_time)
print(time_to_build_M)