## 0. Load necessary libraries

In [1]:
#!/usr/bin/env python

'''TODO: add high-level description of this Python script'''

import re
import json
# import argparse <- don't need this in colab
import numpy as np
import pandas as pd
from keras.models import Sequential
from keras.layers import Dense, Embedding, LSTM
from keras.initializers import Constant
from sklearn.metrics import accuracy_score, f1_score, classification_report, precision_score, recall_score
from sklearn.preprocessing import LabelBinarizer
from sklearn.utils.class_weight import compute_class_weight

from tensorflow.keras.optimizers import SGD
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Embedding, LSTM, Dropout, TextVectorization, Bidirectional
from tensorflow.keras.initializers import Constant
import tensorflow as tf

# Set random seeds for reproducibility
import random as python_random

## 1. Set up reproducble random seeds

In [2]:
# Make reproducible as much as possible
np.random.seed(1234)
tf.random.set_seed(1234)
python_random.seed(1234)

## 2. Path definitions
Argparth is not supported in colab, which is replaced by direct path definitions

In [3]:
TRAIN_PATH = '/kaggle/input/off-set-rnn/train.tsv'
DEV_PATH = '/kaggle/input/off-set-rnn/dev.tsv'
TEST_PATH = '/kaggle/input/off-set-rnn/test.tsv'
EMBEDDINGS_PATH = '/kaggle/input/glove-embedding/glove.twitter.27B.200d.txt'

## 3. Corpus loading

In [4]:
# Add preprocessing function to clean tweets
def clean_tweet(text):
    # Convert to lowercase
    #text = text.lower()

    # Replace user mentions
    text = re.sub(r'@[A-Za-z0-9_]+', '<user>', text)

    # Replace URLs
    text = re.sub(r'https?://\S+|www\.\S+', '<url>', text)

    # Replace hashtags but keep the text
    text = re.sub(r'#([A-Za-z0-9_]+)', r'\1', text)

    # Replace repeated characters (e.g., 'hellooooo' -> 'helloo')
    text = re.sub(r'(.)\1{2,}', r'\1\1', text)

    return text


In [5]:
def read_corpus(corpus_file):
    '''Read in dataset and returns docs and labels'''
    documents = []
    labels = []

    print(f"Reading corpus from {corpus_file}")

    try:
        # Read the TSV file
        df = pd.read_csv(corpus_file, sep='\t', header=None)

        # Extract text and labels
        documents = df[0].apply(clean_tweet).tolist()  # First column is text
        labels = df[1].tolist()     # Second column is label

        print(f"Read {len(documents)} documents")
        print(f"Label distribution: {pd.Series(labels).value_counts()}")

    except Exception as e:
        print(f"Error reading corpus: {str(e)}")
        return [], []

    return documents, labels

## 4. Load GloVe embedding

In [6]:
def read_embeddings(embeddings_file):
    '''Read in word embeddings from file and save as numpy array'''
    embeddings = json.load(open(embeddings_file, 'r'))
    return {word: np.array(embeddings[word]) for word in embeddings}

def read_embeddings_from_txt(embeddings_file):
    '''Read in word embeddings from a text file and save as numpy array'''
    embeddings = {}
    with open(embeddings_file, 'r', encoding='utf-8') as f:
        for line in f:
            values = line.split()
            word = values[0]
            vector = np.array(values[1:], dtype=float)
            embeddings[word] = vector
    return embeddings

In [7]:
def get_emb_matrix(voc, emb):
    '''Get embedding matrix given vocab and the embeddings'''
    num_tokens = len(voc) + 2
    word_index = dict(zip(voc, range(len(voc))))
    # Bit hacky, get embedding dimension from the word "the"
    embedding_dim = len(emb["the"])
    # Prepare embedding matrix to the correct size
    embedding_matrix = np.zeros((num_tokens, embedding_dim))
    for word, i in word_index.items():
        embedding_vector = emb.get(word)
        if embedding_vector is not None:
            # Words not found in embedding index will be all-zeros.
            embedding_matrix[i] = embedding_vector
    # Final matrix with pretrained embeddings that we can feed to embedding layer
    return embedding_matrix

## 5. Set up model

In [8]:
def create_model(Y_train, emb_matrix):
    '''Create the Keras model to use'''
    # Define settings, you might want to create cmd line args for them
    learning_rate = 0.0005 # 0.001 -> 0.0005
    loss_function = 'binary_crossentropy' # Changed from 'categorical_crossentropy' because of 2-class problem
    optim = tf.keras.optimizers.Adam(learning_rate=learning_rate,
                                     clipnorm = 1.0) # Changed from SGD to Adam

    # Take embedding dim and size from emb_matrix
    embedding_dim = len(emb_matrix[0])
    num_tokens = len(emb_matrix)

    #num_labels = len(set(Y_train)) <- this is removed by claude

    # Now build the model
    model = Sequential()

    # Embedding layer -- keeping embeddings frozen (trainable=False)
    model.add(Embedding(num_tokens,
                        embedding_dim,
                        embeddings_initializer=Constant(emb_matrix),
                        trainable=False))

    # Add LSTM layers with dropout
#     model.add(Bidirectional(LSTM(units = 100, # 100 -> 128 -> 256 -> 100
#                    return_sequences = True, # Return full sequence for next layer
#                    dropout = 0.3, # Dropout on inputs 0.2 -> 0.3
#                    recurrent_dropout = 0.3,))) # Dropout on recurrent connections 0.2 -> 0.3

#     # Add another LSTM layer
#     model.add(Bidirectional(LSTM(units = 50, # 50 -> 64 -> 128 -> 50
#                    dropout = 0.2,
#                    recurrent_dropout = 0.2,)))
    
    model.add((LSTM(units = 100, # 100 -> 128 -> 256 -> 100
                   return_sequences = True, # Return full sequence for next layer
                   dropout = 0.3, # Dropout on inputs 0.2 -> 0.3
                   recurrent_dropout = 0.3,))) # Dropout on recurrent connections 0.2 -> 0.3

    # Add another LSTM layer
    model.add((LSTM(units = 50, # 50 -> 64 -> 128 -> 50
                   dropout = 0.2,
                   recurrent_dropout = 0.2,)))

    # Dense layer -- Ultimately, end with dense layer with softmax
    model.add(Dense(50,
                   activation='relu',
                   kernel_regularizer=tf.keras.regularizers.l2(0.01)))  # Fixed syntax
    model.add(Dropout(0.3))

    model.add(Dense(32,
                   activation='relu',
                   kernel_regularizer=tf.keras.regularizers.l2(0.01)))
    model.add(Dropout(0.4))  # Increased dropout before final layer

    # Output layer
        model.add(Dense(units=1, activation="sigmoid"))

    # Compile model using our settings, check for accuracy
    model.compile(loss=loss_function,
                  optimizer=optim,
                  metrics=['accuracy'])
    return model

## 6. Model training

In [9]:
def train_model(model, X_train, Y_train, X_dev, Y_dev):
    '''Train the model here. Note the different settings you can experiment with!'''
    # Potentially change these to cmd line args again
    # And yes, don't be afraid to experiment!
    verbose = 1
    batch_size = 32 # 16 -> 32 -> 64 -> 32
    epochs = 50 # 50 -> 10 -> 50 -> 30 -> 50

    # Add learning rate scheduling
    lr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(monitor = 'val_loss',
                                                        factor = 0.1, # 0.5 -> 0.2 -> 0.1
                                                        patience = 2,
                                                        verbose = 1,
                                                        min_lr = 1e-6,
                                                        min_delta = 0.0001,
                                                        )

    # Create callback to stop training early if no improvement
    checkpoint_cb = tf.keras.callbacks.ModelCheckpoint("best_model.keras",
                                                       save_best_only=True,
                                                       monitor='val_accuracy',) # Use val_accuracy to monitor

    # Early stopping callback
    early_stopping_cb = tf.keras.callbacks.EarlyStopping(patience=7,
                                                         monitor='val_accuracy',
                                                         restore_best_weights = True)

    # Add class weights
    class_weights = compute_class_weight(class_weight='balanced',
                                         classes=np.unique(Y_train),
                                         y=Y_train)
    class_weight_dict = dict(enumerate(class_weights))

    # Finally fit the model to our data
    history = model.fit(
        X_train, Y_train,
        class_weight = class_weight_dict,
        verbose=verbose,
        epochs=epochs,
        callbacks=[checkpoint_cb, early_stopping_cb, lr_scheduler],
        batch_size=batch_size,
        validation_data=(X_dev, Y_dev))



    return model, history

## 7. Testing and evaluating predictions

In [10]:
def test_set_predict(model, X_test, Y_test, ident):
    '''Do predictions and measure accuracy on our own test set (that we split off train)'''
    # Get predictions using the trained model
    Y_pred = model.predict(X_test)

    # Finally, convert to numerical labels to get scores with sklearn
    Y_pred_binary = (Y_pred > 0.5).astype(int)

    # Calculate accuracy
    accuracy = accuracy_score(Y_test, Y_pred_binary)

    # Calculate F1 score
    f1 = f1_score(Y_test, Y_pred_binary, average='macro')
    precision = precision_score(Y_test, Y_pred_binary, average='macro')
    recall = recall_score(Y_test, Y_pred_binary, average='macro')

    # Print out the results
    print(f'Results on {ident} set:')
    print(f"Accuracy on {ident} set: {accuracy:.4f}")
    print(f"Macro F1 score on {ident} set: {f1:.4f}")
    print(f"Macro Recall score on {ident} set: {recall:.4f}")
    print(f"Macro Precision score on {ident} set: {precision:.4f}")
    print('\nClassification Report:')
    print(classification_report(Y_test, Y_pred_binary))

    return accuracy, f1

## 8. Main execution

In [11]:

def main():
    '''Main function to train and test neural network given cmd line arguments'''

    # Read in the data and embeddings
    X_train, Y_train = read_corpus(TRAIN_PATH)
    X_dev, Y_dev = read_corpus(DEV_PATH)
    X_test, Y_test = read_corpus(DEV_PATH)

    print('Reading embeddings...')
    embeddings = read_embeddings_from_txt(EMBEDDINGS_PATH)

    # Transform words to indices using a vectorizer
    print("Vectorizing data...")
    vectorizer = TextVectorization(standardize='lower_and_strip_punctuation',
                                   output_sequence_length=75) # Needs to adjust based on test results

    # Use train and dev to create vocab - could also do just train
    text_ds = tf.data.Dataset.from_tensor_slices(X_train + X_dev + X_test)
    vectorizer.adapt(text_ds)

    # Dictionary mapping words to idx
    voc = vectorizer.get_vocabulary()

    # Get embedding matrix
    emb_matrix = get_emb_matrix(voc, embeddings)

    # Transform string labels to one-hot encodings (binary)
    label_dict = {'NOT': 0, 'OFF': 1}
    Y_train =  np.array([label_dict[y] for y in Y_train])  # NOT Use encoder.classes_ to find mapping back
    Y_dev = np.array([label_dict[y] for y in Y_dev])
    Y_test = np.array([label_dict[y] for y in Y_test])

    # Vectorize text data
    X_train_vect = vectorizer(np.array([[s] for s in X_train])).numpy()
    X_dev_vect = vectorizer(np.array([[s] for s in X_dev])).numpy()
    X_test_vect = vectorizer(np.array([[s] for s in X_test])).numpy()

    # Create model
    #model = create_model(Y_train, emb_matrix) <- Claude removed it

    # Transform input to vectorized input
    # Create and train model
    print('Creating model...')
    model = create_model(Y_train, emb_matrix)
    print('Training model...')
    model, history = train_model(model, X_train_vect, Y_train, X_dev_vect, Y_dev)

    # Train the model
    #model = train_model(model, X_train_vect, Y_train_bin, X_dev_vect, Y_dev_bin) <- clause removed it

    return model, history, X_dev_vect, Y_dev, X_test_vect, Y_test






 # Do predictions on specified test set -- test if specified

In [12]:
# # Do predictions on specified test set -- test if specified
# if TEST_PATH:
#     # Read in test set and vectorize
#     print('Predicting on test set...')
#     X_test, Y_test = read_corpus(TEST_PATH)
#     Y_test = np.array([label_dict[y] for y in Y_test])
#     X_test_vect = vectorizer(np.array([[s] for s in X_test])).numpy()

#     # Finally do the predictions
#     test_set_predict(model, X_test_vect, Y_test, "test")

## 9. Add visualisation to training history

In [13]:
def plot_training_history(history):
    """Plot training history"""
    import matplotlib.pyplot as plt

    # Plot training & validation accuracy values
    plt.figure(figsize=(12, 4))

    plt.subplot(1, 2, 1)
    plt.plot(history.history['accuracy'])
    plt.plot(history.history['val_accuracy'])
    plt.title('Model accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc='upper left')

    # Plot training & validation loss values
    plt.subplot(1, 2, 2)
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title('Model loss')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc='upper left')

    plt.tight_layout()
    plt.show()

## Results & Evaluation


In [14]:
if __name__ == '__main__':
    # Train model and get history
    model, history, X_dev_vect, Y_dev, X_test_vect, Y_test = main()

    # Get final evaluation results
    print("\nFinal Evaluation Results")
    test_set_predict(model, X_test_vect, Y_test, 'test')

    # Plot training history
    plot_training_history(history)

Reading corpus from /kaggle/input/off-set-rnn/train.tsv
Read 12240 documents
Label distribution: NOT    8192
OFF    4048
Name: count, dtype: int64
Reading corpus from /kaggle/input/off-set-rnn/dev.tsv
Read 1000 documents
Label distribution: NOT    648
OFF    352
Name: count, dtype: int64
Reading corpus from /kaggle/input/off-set-rnn/dev.tsv
Read 1000 documents
Label distribution: NOT    648
OFF    352
Name: count, dtype: int64
Reading embeddings...


KeyboardInterrupt: 