In [26]:
import os
import tensorflow as tf
from transformers import BertTokenizer, TFBertForSequenceClassification
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
import numpy as np
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

In [27]:
class CustomTFBertForSequenceClassification(TFBertForSequenceClassification):
    def train_step(self, data):
        x, y, sample_weight = self.unpack_data(data)

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)
            loss = self.compiled_loss(y, y_pred, sample_weight, regularization_losses=self.losses)

        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        self.compiled_metrics.update_state(y, y_pred, sample_weight)
        
        return {m.name: m.result() for m in self.metrics}

    def test_step(self, data):
        x, y, sample_weight = self.unpack_data(data)

        y_pred = self(x, training=False)
        loss = self.compiled_loss(y, y_pred, sample_weight, regularization_losses=self.losses)

        self.compiled_metrics.update_state(y, y_pred, sample_weight)

        return {m.name: m.result() for m in self.metrics}

    def unpack_data(self, data):
        if len(data) == 2:
            return data[0], data[1], None
        elif len(data) == 3:
            return data
        else:
            raise ValueError("Unexpected number of elements in `data`")

In [28]:
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = CustomTFBertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2)

All PyTorch model weights were used when initializing CustomTFBertForSequenceClassification.

Some weights or buffers of the TF 2.0 model CustomTFBertForSequenceClassification were not initialized from the PyTorch model and are newly initialized: ['classifier.weight', 'classifier.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [29]:
# Function to load texts, labels, and file names
def load_texts_labels_and_filenames(base_directory):
    texts = []
    labels = []
    file_names = []
    label_map = {}
    label_index = 0
    
    for label in os.listdir(base_directory):
        label_dir = os.path.join(base_directory, label)
        if os.path.isdir(label_dir):
            if label not in label_map:
                label_map[label] = label_index
                label_index += 1
            for filename in os.listdir(label_dir):
                filepath = os.path.join(label_dir, filename)
                if os.path.isfile(filepath) and filename.endswith('.txt'):
                    with open(filepath, 'r', encoding='utf-8') as file:
                        texts.append(file.read())
                        labels.append(label_map[label])
                        file_names.append(filename)  # Store the file name
    
    return texts, labels, label_map, file_names

In [30]:
def tokenize_texts(texts, tokenizer, max_len=128):
    input_ids = []
    attention_masks = []
    for text in texts:
        encoded_dict = tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=max_len,
            padding='max_length',
            return_attention_mask=True,
            truncation=True,
            return_tensors='tf'
        )
        input_ids.append(encoded_dict['input_ids'])
        attention_masks.append(encoded_dict['attention_mask'])
    return tf.concat(input_ids, axis=0), tf.concat(attention_masks, axis=0)

In [31]:
from sklearn.model_selection import StratifiedShuffleSplit

# Base directory where your data is stored
base_directory = 'data_v3'  # Replace with the actual path
texts, labels, label_map, file_names = load_texts_labels_and_filenames(base_directory)

# Tokenize texts
input_ids, attention_masks = tokenize_texts(texts, tokenizer)

# Convert labels into tensors
labels = tf.convert_to_tensor(labels)

input_ids_np = input_ids.numpy()
attention_masks_np = attention_masks.numpy()
labels_np = labels.numpy()

# Use StratifiedShuffleSplit to maintain category distribution
sss = StratifiedShuffleSplit(n_splits=1, test_size=0.3, random_state=42)

for train_index, temp_index in sss.split(input_ids_np, labels_np):
    train_inputs, temp_inputs = input_ids_np[train_index], input_ids_np[temp_index]
    train_labels, temp_labels = labels_np[train_index], labels_np[temp_index]
    train_masks, temp_masks = attention_masks_np[train_index], attention_masks_np[temp_index]
    train_file_names, temp_file_names = [file_names[i] for i in train_index], [file_names[i] for i in temp_index]

# Now split the temp set into validation and test sets, stratifying again
sss_val_test = StratifiedShuffleSplit(n_splits=1, test_size=0.5, random_state=42)

for val_index, test_index in sss_val_test.split(temp_inputs, temp_labels):
    validation_inputs, test_inputs = temp_inputs[val_index], temp_inputs[test_index]
    validation_labels, test_labels = temp_labels[val_index], temp_labels[test_index]
    validation_masks, test_masks = temp_masks[val_index], temp_masks[test_index]
    validation_file_names, test_file_names = [temp_file_names[i] for i in val_index], [temp_file_names[i] for i in test_index]

# Convert everything back to tensors if needed for the model input
train_inputs = tf.convert_to_tensor(train_inputs)
validation_inputs = tf.convert_to_tensor(validation_inputs)
test_inputs = tf.convert_to_tensor(test_inputs)

train_masks = tf.convert_to_tensor(train_masks)
validation_masks = tf.convert_to_tensor(validation_masks)
test_masks = tf.convert_to_tensor(test_masks)

train_labels = tf.convert_to_tensor(train_labels)
validation_labels = tf.convert_to_tensor(validation_labels)
test_labels = tf.convert_to_tensor(test_labels)

In [32]:
print(validation_masks.shape)
print(train_masks.shape)
print(test_masks.shape)

(22, 128)
(105, 128)
(23, 128)


In [33]:
# Define the optimizer, loss function, and metrics
optimizer = Adam(learning_rate=2e-5, epsilon=1e-8)
loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
metric = tf.keras.metrics.SparseCategoricalAccuracy('accuracy')



In [34]:
model.compile(optimizer=optimizer, loss=loss, metrics=[metric])

# Train the model
history = model.fit(
    [train_inputs, train_masks],
    train_labels,
    validation_data=([validation_inputs, validation_masks], validation_labels),
    epochs=4,
    batch_size=4
)



Epoch 1/4
Epoch 2/4
Epoch 3/4
Epoch 4/4


In [35]:
# Evaluate the model on the test set
test_loss, test_accuracy = model.evaluate([test_inputs, test_masks], test_labels)
print(f"Test Accuracy: {test_accuracy}")

Test Accuracy: 1.0


In [36]:
# Making predictions on the test set (optional)
predictions = model.predict([test_inputs, test_masks])
predicted_labels = tf.argmax(predictions.logits, axis=-1)
print("Predicted labels on the test set:", predicted_labels.numpy())





Predicted labels on the test set: [0 0 1 0 1 1 0 1 0 0 0 0 0 1 0 1 0 0 1 0 0 1 0]


In [37]:
predicted_labels_np = predicted_labels.numpy()
true_labels_np = test_labels

# Compute the confusion matrix
conf_matrix = confusion_matrix(true_labels_np, predicted_labels_np)

print("Confusion Matrix:")
for row in conf_matrix:
    print(' '.join(map(str, row)))

# Optionally print the labels for reference
print("\nLabel mapping (index -> label name):")
for label_name, index in label_map.items():
    print(f"{index}: {label_name}")

Confusion Matrix:
15 0
0 8

Label mapping (index -> label name):
0: neutral
1: commands


In [38]:
commands_label = label_map['commands']  # Assuming "commands" is one of the labels

predictions = model.predict([test_inputs, test_masks])
predicted_labels = tf.argmax(predictions.logits, axis=-1)

predicted_labels_np = predicted_labels.numpy()

# Define the label index for commands
commands_label = label_map['commands']

commands_indices = np.where(predicted_labels_np == commands_label)[0]

commands_filenames = [test_file_names[i] for i in commands_indices]

print("File names classified as commands:")
for fname in commands_filenames:
    print(fname)

File names classified as commands:
keys_3.txt
eraser_4.txt
keys_6.txt
keys_12.txt
eraser_10.txt
keys_4.txt
eraser_14.txt
eraser_21.txt


In [39]:
# Optionally, use the new model for further processing
if len(commands_indices) > 0:
    commands_inputs = tf.gather(test_inputs, commands_indices)
    commands_masks = tf.gather(test_masks, commands_indices)

    # Load the new model for further processing
    new_model = CustomTFBertForSequenceClassification.from_pretrained('models/trained_v1')  # Replace with your new model path
    new_tokenizer = BertTokenizer.from_pretrained('models/trained_v1')  # Load new tokenizer if different

    # Predict using the new model on the command inputs
    new_predictions = new_model.predict([commands_inputs, commands_masks])
    new_predicted_labels = tf.argmax(new_predictions.logits, axis=-1)
    
    print("Predicted labels from the new model for command inputs:", new_predicted_labels.numpy())
else:
    print("No inputs classified as commands.")

Some layers from the model checkpoint at models/trained_v1 were not used when initializing CustomTFBertForSequenceClassification: ['dropout_37']
- This IS expected if you are initializing CustomTFBertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing CustomTFBertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
All the layers of CustomTFBertForSequenceClassification were initialized from the model checkpoint at models/trained_v1.
If your task is similar to the task the model of the checkpoint was trained on, you can already use CustomTFBertForSequenceClassification for predictions without further training.


Predicted labels from the new model for command inputs: [2 0 2 2 0 2 0 0]
