In [None]:
from IPython.display import clear_output
import warnings
# Suppress every Python warning for the rest of the session (no category or
# module filter is given, so this applies globally).
warnings.filterwarnings('ignore')

In [None]:
!pip install transformers

In [None]:
import pandas as pd
import numpy as np

# Location of the Sentiment140 CSV inside the Kaggle input directory.
file = "/kaggle/input/sentiment140/training.1600000.processed.noemoticon.csv"

# Read only columns 0 and 5 (the file is parsed with header=None), then keep a
# reproducible 30% random sample of the rows.
df = (
    pd.read_csv(file, encoding='ISO-8859-1', usecols=[0,5], header=None)
      .sample(frac=0.3, random_state=42)
)
df.columns = ['label','sentence']

# Collapse label value 4 to 1 and store every label as float64; any other
# value keeps its numeric value.
df['label'] = df['label'].where(df['label'] != 4, 1).astype(np.float64)

print("df.shape =",df.shape)
print(f"label distribution :\n{df.label.value_counts()}")
print(df.head())

In [None]:
from transformers import AutoTokenizer, TFAutoModel

# Single checkpoint id shared by the tokenizer and the encoder.
checkpoint = "google/mobilebert-uncased"

# The encoder is loaded with output_hidden_states=True, i.e. it is asked to
# expose its hidden states in addition to its regular outputs.
model = TFAutoModel.from_pretrained(
    checkpoint,
    output_hidden_states=True,
)
tokenizer = AutoTokenizer.from_pretrained(checkpoint)

# Wipe whatever output the two loads above wrote to the cell.
clear_output()

In [None]:
from sklearn.model_selection import train_test_split
import tensorflow as tf

# First cut: 70% train / 30% held out, stratified on the label.
sequences, test_val_sequences = train_test_split(
    df,
    test_size=0.3,
    stratify=df.label,
    random_state=44,
)
# Second cut: the held-out part is divided 30% validation / 70% test,
# again stratified on the label.
val_sequences, test_sequences = train_test_split(
    test_val_sequences,
    test_size=0.7,
    stratify=test_val_sequences.label,
    random_state=44,
)

# Plain Python lists keyed by split name: raw sentences in `dataset`,
# matching labels in `targets`.
dataset = {
    split: frame['sentence'].tolist()
    for split, frame in zip(("TRAIN", "TEST", "VAL"),
                            (sequences, test_sequences, val_sequences))
}
targets = {
    split: frame['label'].tolist()
    for split, frame in zip(("TRAIN", "TEST", "VAL"),
                            (sequences, test_sequences, val_sequences))
}

In [None]:
def tokenization(data, **kwargs):
    """Tokenize text with the module-level ``tokenizer`` into TensorFlow tensors.

    Args:
        data: Text (or list of texts) handed directly to the tokenizer.

    Keyword Args:
        padding: Padding strategy forwarded to the tokenizer. Defaults to
            ``'max_length'`` so every call yields sequences of exactly
            ``max_length`` tokens. The previous default (``'longest'``)
            produced a width that depended on the longest sentence in the
            call, which only matches a fixed-width Keras input by accident.
        max_length (int): Length sequences are padded/truncated to.
            Defaults to 55.

    Returns:
        The tokenizer output with TensorFlow tensors (``return_tensors="tf"``);
        callers read its ``input_ids`` and ``attention_mask`` attributes.
    """
    return tokenizer(
        data,
        padding=kwargs.get('padding', 'max_length'),
        max_length=kwargs.get('max_length', 55),
        truncation=True,  # cut anything longer than max_length
        return_tensors="tf",
    )

In [None]:
def get_model(**kwargs):
    """Build and compile a binary classifier on top of the module-level ``model``.

    The encoder is the notebook-level ``model`` object; it is only read here,
    never reassigned. Its ``pooler_output`` feeds a small dense head ending in
    a single sigmoid unit.

    Keyword Args:
        max_seq_length (int): Fixed token length declared for both Keras
            inputs. Defaults to 55.

    Returns:
        tf.keras.Model: Model taking ``[input_ids, attention_mask]`` and
        producing one sigmoid output per example, compiled with Adam
        (learning rate 1e-4), binary cross-entropy and binary accuracy.
    """
    max_seq_length = kwargs.get('max_seq_length', 55)

    # No tokenizer is created here: the graph consumes already-tokenized ids,
    # so loading one on every call was unused work.

    # Symbolic inputs for the token ids and their attention mask.
    input_ids = tf.keras.Input(shape=(max_seq_length,), dtype='int32', name='input_ids')
    attention_mask = tf.keras.Input(shape=(max_seq_length,), dtype='int32', name='attention_mask')

    # Run the symbolic inputs through the encoder and take its pooled output.
    inputs = {'input_ids': input_ids, 'attention_mask': attention_mask}
    outputs = model(inputs)
    pooler_output = outputs['pooler_output']

    # Classification head: Dense(128, relu) -> Dropout(0.2) -> Dense(1, sigmoid).
    h1 = tf.keras.layers.Dense(128, activation='relu')(pooler_output)
    dropout = tf.keras.layers.Dropout(0.2)(h1)
    output = tf.keras.layers.Dense(1, activation='sigmoid')(dropout)

    # Assemble and compile the full model.
    new_model = tf.keras.models.Model(inputs=[input_ids, attention_mask], outputs=output)
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
    # from_logits=False because the last layer already applies a sigmoid.
    loss = tf.keras.losses.BinaryCrossentropy(from_logits=False)
    metrics = [tf.keras.metrics.BinaryAccuracy()]
    new_model.compile(optimizer=optimizer, loss=loss, metrics=metrics)

    return new_model

In [None]:
from sklearn.metrics import classification_report

def test_result(model):
    """Score ``model`` on the TEST split and print a classification report.

    Args:
        model: Object whose ``predict`` accepts ``[input_ids, attention_mask]``
            and returns an array of probabilities.

    Returns:
        tuple: ``(result_proba, result)`` where ``result_proba`` is the raw
        output of ``model.predict`` and ``result`` is a list of 0/1 labels
        (1 when the probability is strictly greater than 0.5).
    """
    encoded = tokenization(dataset["TEST"])
    features = [encoded.input_ids, encoded.attention_mask]
    result_proba = model.predict(features)

    # Threshold the flattened probabilities at 0.5 into plain Python ints.
    result = (result_proba.ravel() > 0.5).astype(int).tolist()

    print(classification_report(targets['TEST'],result))
    return result_proba, result

In [None]:
# Build the compiled classifier using get_model()'s default settings.
new_model = get_model()
# Optional baseline (currently disabled): score the model on the TEST split
# before the training cell is run.
#result_proba_before, result_before = test_result(new_model)

In [None]:
# Tokenize the TRAIN and VAL sentences once and convert their labels to tensors.
inputs = tokenization(dataset['TRAIN'])
train_targets = tf.convert_to_tensor(targets['TRAIN'])

val_inputs = tokenization(dataset['VAL'])
val_targets = tf.convert_to_tensor(targets['VAL'])

# Stop once validation accuracy has not improved for 5 epochs.
# restore_best_weights=True rolls the model back to the best epoch; without it
# the model that gets evaluated and saved afterwards carries the weights of the
# last (up to 5 epochs stale) epoch rather than the best one.
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor='val_binary_accuracy',
    mode='max',
    patience=5,
    restore_best_weights=True,
)

# Train the model; epochs=100 is an upper bound, early stopping normally ends it sooner.
new_model.fit([inputs.input_ids, inputs.attention_mask], train_targets,
              validation_data = ([val_inputs.input_ids, val_inputs.attention_mask], val_targets),
              epochs=100, batch_size=128, callbacks=[early_stop])

In [None]:
# Score new_model on the TEST split: test_result prints a classification report
# and returns the raw probabilities plus the thresholded 0/1 predictions.
result_proba_after, result_after = test_result(new_model)

In [None]:
# SAVE MODEL WEIGHTS
new_model.save_weights(f'sentiment_weights_MobileBert_final.h5')
!zip -r sentiment_weights_MobileBert_final.zip sentiment_weights_MobileBert_final.h5
from IPython.display import FileLink
FileLink(r'sentiment_weights_MobileBert_final.zip')