In [2]:
!pip install tensorflow
!pip install datasets

Collecting tensorflow
  Downloading tensorflow-2.19.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.1 kB)
Collecting astunparse>=1.6.0 (from tensorflow)
  Downloading astunparse-1.6.3-py2.py3-none-any.whl.metadata (4.4 kB)
Collecting flatbuffers>=24.3.25 (from tensorflow)
  Downloading flatbuffers-25.2.10-py2.py3-none-any.whl.metadata (875 bytes)
Collecting google-pasta>=0.1.1 (from tensorflow)
  Downloading google_pasta-0.2.0-py3-none-any.whl.metadata (814 bytes)
Collecting libclang>=13.0.0 (from tensorflow)
  Downloading libclang-18.1.1-py2.py3-none-manylinux2010_x86_64.whl.metadata (5.2 kB)
Collecting tensorboard~=2.19.0 (from tensorflow)
  Downloading tensorboard-2.19.0-py3-none-any.whl.metadata (1.8 kB)
Collecting tensorflow-io-gcs-filesystem>=0.23.1 (from tensorflow)
  Downloading tensorflow_io_gcs_filesystem-0.37.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (14 kB)
Collecting wheel<1.0,>=0.23.0 (from astunparse>=1.6.0->tensorflow

In [26]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import TextVectorization
import matplotlib.pyplot as plt
from sklearn.metrics import f1_score, classification_report
import pickle
import os
from datasets import load_dataset
import RNN
import importlib
importlib.reload(RNN)
from RNN import SimpleRNN


In [4]:
vectorize_layer = None
models = {}
histories = {}
vocab_size = 10000
sequence_length = 100
embedding_dim = 128
results={}

In [5]:
#Load dataset
dataset = load_dataset("indonlp/NusaX-senti", "ind")

# Extract texts and labels from the dataset
train_data = dataset['train']
val_data = dataset['validation']
test_data = dataset['test']

# Convert to lists
train_texts = train_data['text']
train_labels = train_data['label']
val_texts = val_data['text']
val_labels = val_data['label']
test_texts = test_data['text']
test_labels = test_data['label']

print(f"Train size: {len(train_texts)}")
print(f"Validation size: {len(val_texts)}")
print(f"Test size: {len(test_texts)}")

unique_labels, counts = np.unique(train_labels, return_counts=True)
print(f"Label distribution in training set:")
for label, count in zip(unique_labels, counts):
    print(f"  Label {label}: {count} samples")



The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md:   0%|          | 0.00/5.61k [00:00<?, ?B/s]

NusaX-senti.py:   0%|          | 0.00/4.39k [00:00<?, ?B/s]

0000.parquet:   0%|          | 0.00/54.2k [00:00<?, ?B/s]

0000.parquet:   0%|          | 0.00/14.1k [00:00<?, ?B/s]

0000.parquet:   0%|          | 0.00/43.7k [00:00<?, ?B/s]

Generating train split:   0%|          | 0/500 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/100 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/400 [00:00<?, ? examples/s]

Train size: 500
Validation size: 100
Test size: 400
Label distribution in training set:
  Label 0: 192 samples
  Label 1: 119 samples
  Label 2: 189 samples


In [6]:
vectorize_layer = TextVectorization(
    max_tokens=vocab_size,
    output_sequence_length=sequence_length,
    output_mode='int'
)

# Adapt to training data
vectorize_layer.adapt(train_texts)

# Create datasets
train_ds = tf.data.Dataset.from_tensor_slices((train_texts, train_labels))
train_ds = train_ds.map(lambda x, y: (vectorize_layer(x), y))
train_ds = train_ds.batch(32).prefetch(tf.data.AUTOTUNE)

val_ds = tf.data.Dataset.from_tensor_slices((val_texts, val_labels))
val_ds = val_ds.map(lambda x, y: (vectorize_layer(x), y))
val_ds = val_ds.batch(32).prefetch(tf.data.AUTOTUNE)

test_ds = tf.data.Dataset.from_tensor_slices((test_texts, test_labels))
test_ds = test_ds.map(lambda x, y: (vectorize_layer(x), y))
test_ds = test_ds.batch(32).prefetch(tf.data.AUTOTUNE)

In [7]:
def create_model(num_rnn_layers=1, rnn_units=64, bidirectional=False):

    model = keras.Sequential()

    # Embedding layer
    model.add(layers.Embedding(
        input_dim=vocab_size,
        output_dim=embedding_dim,
        input_length=sequence_length
    ))

    # RNN layers
    for i in range(num_rnn_layers):
        return_sequences = (i < num_rnn_layers - 1)  # Only last layer returns single output

        if bidirectional:
            model.add(layers.Bidirectional(
                layers.SimpleRNN(rnn_units, return_sequences=return_sequences, dropout=0.2)
            ))
        else:
            model.add(layers.SimpleRNN(rnn_units, return_sequences=return_sequences, dropout=0.2))

    # Dropout layer
    model.add(layers.Dropout(0.5))

    # Determine number of classes dynamically
    num_classes = len(set(train_labels))
    print(f"Number of classes detected: {num_classes}")

    # Dense layer for classification
    model.add(layers.Dense(num_classes, activation='softmax'))

    # Compile model
    model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    return model


In [None]:
def variations_train():
    #variasi konfigurasi model
    epochs = 10

    #number of rnn layers
    layer_configs = [1, 2, 3]

    for num_layers in layer_configs:
        model_name = f"layers_{num_layers}"
        print(f"Training model with {num_layers} RNN layer(s)")

        model = create_model(
            num_rnn_layers=num_layers,
            rnn_units=64,
            bidirectional=False
        )

        history = model.fit(
            train_ds,
            validation_data=val_ds,
            epochs=epochs,
            verbose=1
        )

        models[model_name] = model
        histories[model_name] = history
        predictions = model.predict(test_ds)
        predicted_labels = np.argmax(predictions, axis=1)

        # Calculate macro F1 score
        f1_macro = f1_score(test_labels, predicted_labels, average='macro')

        results[model_name] = {
            'f1_macro': f1_macro,
            'predictions': predicted_labels
        }

    #rnn units per layer amount
    unit_configs = [32, 64, 128]

    for units in unit_configs:
        model_name = f"units_{units}"
        print(f"Training model with {units} RNN units")

        model = create_model(
            num_rnn_layers=1,
            rnn_units=units,
            bidirectional=False
        )

        history = model.fit(
            train_ds,
            validation_data=val_ds,
            epochs=epochs,
            verbose=1
        )

        models[model_name] = model
        histories[model_name] = history
        predictions = model.predict(test_ds)
        predicted_labels = np.argmax(predictions, axis=1)

        # Calculate macro F1 score
        f1_macro = f1_score(test_labels, predicted_labels, average='macro')

        results[model_name] = {
            'f1_macro': f1_macro,
            'predictions': predicted_labels
        }

    #bidirectional vs not
    direction_configs = [False, True]

    for is_bidirectional in direction_configs:
        direction_name = "bidirectional" if is_bidirectional else "unidirectional"
        model_name = f"direction_{direction_name}"
        print(f"Training {direction_name} model")

        model = create_model(
            num_rnn_layers=1,
            rnn_units=64,
            bidirectional=is_bidirectional
        )

        history = model.fit(
            train_ds,
            validation_data=val_ds,
            epochs=epochs,
            verbose=1
        )

        models[model_name] = model
        histories[model_name] = history
        predictions = model.predict(test_ds)
        predicted_labels = np.argmax(predictions, axis=1)

        # Calculate macro F1 score
        f1_macro = f1_score(test_labels, predicted_labels, average='macro')

        results[model_name] = {
            'f1_macro': f1_macro,
            'predictions': predicted_labels
        }

In [None]:
def plot_training_curves():
    #plot loss
    plt.figure(figsize=(15, 12))

    # Group models by variation type
    layer_models = [name for name in models.keys() if name.startswith('layers_')]
    unit_models = [name for name in models.keys() if name.startswith('units_')]
    direction_models = [name for name in models.keys() if name.startswith('direction_')]

    # Plot layer variation
    plt.subplot(2, 3, 1)
    for model_name in layer_models:
        history = histories[model_name]
        plt.plot(history.history['loss'], label=f'{model_name} train')
    plt.title('Training Loss - Layer Variation')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(2, 3, 2)
    for model_name in layer_models:
        history = histories[model_name]
        plt.plot(history.history['val_loss'], label=f'{model_name} val')
    plt.title('Validation Loss - Layer Variation')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    # Plot unit variation
    plt.subplot(2, 3, 3)
    for model_name in unit_models:
        history = histories[model_name]
        plt.plot(history.history['loss'], label=f'{model_name} train')
    plt.title('Training Loss - Unit Variation')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(2, 3, 4)
    for model_name in unit_models:
        history = histories[model_name]
        plt.plot(history.history['val_loss'], label=f'{model_name} val')
    plt.title('Validation Loss - Unit Variation')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    # Plot direction variation
    plt.subplot(2, 3, 5)
    for model_name in direction_models:
        history = histories[model_name]
        plt.plot(history.history['loss'], label=f'{model_name} train')
    plt.title('Training Loss - Direction Variation')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(2, 3, 6)
    for model_name in direction_models:
        history = histories[model_name]
        plt.plot(history.history['val_loss'], label=f'{model_name} val')
    plt.title('Validation Loss - Direction Variation')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    plt.tight_layout()
    plt.show()


In [10]:
def save_models(models):
    """Save trained models and weights"""
    print("\nSaving models...")

    if not os.path.exists('rnn_models'):
        os.makedirs('rnn_models')

    for model_name, model in models.items():
        model.save(f'rnn_models/{model_name}.keras')
        print(f"Saved {model_name}.keras")

    # with open('models/vectorize_layer.pkl', 'wb') as f:
    #     pickle.dump({
    #         'vocab_size': vocab_size,
    #         'sequence_length': sequence_length,
    #         'vocabulary': vectorize_layer.get_vocabulary()
    #     }, f)


In [11]:
print(test_ds)

<_PrefetchDataset element_spec=(TensorSpec(shape=(None, None), dtype=tf.int64, name=None), TensorSpec(shape=(None,), dtype=tf.int32, name=None))>


In [None]:
variations_train()
plot_training_curves()
save_models(models)
xs=[]
xy=[]
for batch_x, batch_y in test_ds:
  xs.append(batch_x)
  xy.append(batch_y)
x_array = np.concatenate(xs, axis=0)
y_array = np.concatenate(xy, axis=0)

#test scratch implementation
for model_name in os.listdir("rnn_models"):
    model_path = os.path.join("rnn_models", model_name)
    if os.path.isfile(model_path) and (model_name.endswith('.keras')):
        print(f"MODEL {model_name}")
        model = tf.keras.models.load_model(model_path)
        if "bidirectional" in model_name:
            rnn = SimpleRNN(bidirectional=True)
        elif "layers_" in model_name:
            rnn = SimpleRNN(num_layers=int(model_name.split("_")[1].split(".")[0]))
        else:
            rnn = SimpleRNN()
        rnn.load_keras_weights(model)
        keras_prediction = model.predict(test_ds)
        keras_prediction_label = np.argmax(keras_prediction, axis = 1)

        rnn_prediction = rnn.predict(x_array)

        keras_f1=f1_score(test_labels, keras_prediction_label, average='macro')
        rnn_f1=f1_score(test_labels, rnn_prediction, average='macro')

        print(f"Keras f1score: {keras_f1:.4f}")
        print(f"Scratch f1score: {rnn_f1:.4f}")
        print(f"Difference: {abs(rnn_f1-keras_f1):.4f}")

        match = np.sum(keras_prediction_label == rnn_prediction)
        percentage = (match/len(keras_prediction_label))*100
        print(f"Match percentage: {percentage:.4f}%")





MODEL direction_unidirectional.keras
Loading weights from Keras model...
Loaded weights - Vocab: 10000, Embedding: 128, RNN units: 64, Classes: 3, Layers: 1, Bidirectional: False
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step
Keras f1score: 0.3758
Scratch f1score: 0.3758
Difference: 0.0000
Match percentage: 100.0000%
MODEL units_128.keras
Loading weights from Keras model...
Loaded weights - Vocab: 10000, Embedding: 128, RNN units: 128, Classes: 3, Layers: 1, Bidirectional: False
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
Keras f1score: 0.2840
Scratch f1score: 0.2840
Difference: 0.0000
Match percentage: 100.0000%
MODEL direction_bidirectional.keras
Loading weights from Keras model...
Loaded weights - Vocab: 10000, Embedding: 128, RNN units: 64, Classes: 3, Layers: 1, Bidirectional: True
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 34ms/step
Keras f1score: 0.4656
Scratch f1score: 0.4656
Difference: 0.0000
Match 