## Import Libraries

In [None]:
%%capture

# Install the Hugging Face libraries imported below.
# The %%capture cell magic on the first line suppresses pip's console output.
!pip install datasets
!pip install transformers

In [None]:
import tensorflow as tf
from transformers import TFAutoModel, BertTokenizer
from tqdm import tqdm

In [None]:
from datasets import load_dataset
# Load the "LazarusNLP/stsb_mt_id" dataset through the Hugging Face `datasets` library.
# The result is indexed by split name in the cells below (e.g. dataset['train']).
dataset = load_dataset("LazarusNLP/stsb_mt_id")

In [None]:
# Remap the splits so that the original 'validation' split is used for
# training and the original 'test' split is used for validation.
#
# The presence of 'test' marks a dataset that has not been remapped yet, so
# re-running this cell is a no-op. Without the guard, a second run would move
# the former test split into 'train' and leave no 'validation' split at all.
if 'test' in dataset:
    if 'validation' in dataset:
        # Replaces the original 'train' split with the original 'validation' split.
        dataset['train'] = dataset.pop('validation')
    dataset['validation'] = dataset.pop('test')


In [None]:
# Display the dataset object to check which splits exist after the remapping above.
print(dataset)

## Normalize the correlation score

In [None]:
# Raw gold similarity values, one per example, for each split.
train_cor = [example['correlation'] for example in dataset['train']]
val_cor = [example['correlation'] for example in dataset['validation']]

# Divide every raw value by 5.0 (presumably the top of the annotation scale,
# giving targets in [0, 1] -- TODO confirm against the dataset card).
norm_train_cor = [float(raw) / 5.0 for raw in train_cor]
norm_val_cor = [float(raw) / 5.0 for raw in val_cor]

## Tokenizer

In [None]:
# Collect both sentence columns from the validation split.
sentence_1 = [row['text_1'] for row in dataset['validation']]
sentence_2 = [row['text_2'] for row in dataset['validation']]
# Stringify every aligned pair, then keep only the first pair as a sample.
text_cat = [list(map(str, pair)) for pair in zip(sentence_1, sentence_2)][0]

## Define STSBert Model Architecture

In [None]:
class STSBertModel(tf.keras.layers.Layer):
    """Keras layer that turns tokenized text into one embedding per sequence.

    Wraps a transformer loaded with ``TFAutoModel.from_pretrained`` and applies
    attention-mask-weighted mean pooling over the token embeddings, optionally
    followed by L2 normalization.
    """

    def __init__(self, model_name_or_path, **kwargs):
        """Load the underlying transformer.

        Args:
            model_name_or_path: Identifier or path passed to
                ``TFAutoModel.from_pretrained``.
            **kwargs: Forwarded to ``TFAutoModel.from_pretrained`` (they are
                not passed to the Keras ``Layer`` constructor).
        """
        super().__init__()
        self.model = TFAutoModel.from_pretrained(model_name_or_path, **kwargs)

    def call(self, inputs, normalize=True):
        """Embed ``inputs`` and return the pooled sentence vectors.

        Args:
            inputs: Mapping passed unchanged to the transformer; it must
                contain an ``"attention_mask"`` entry, which is used for pooling.
            normalize: When true, L2-normalize each pooled vector.

        Returns:
            The pooled (and optionally normalized) embeddings.
        """
        transformer_output = self.model(inputs)
        pooled = self.mean_pooling(transformer_output, inputs["attention_mask"])
        return self.normalize(pooled) if normalize else pooled

    def mean_pooling(self, model_output, attention_mask):
        """Average the token embeddings over axis 1, ignoring masked positions.

        Args:
            model_output: Transformer output whose first element holds the
                token embeddings (assumed (batch, seq, hidden) -- TODO confirm).
            attention_mask: Mask with one entry per token; it is expanded on
                the last axis and broadcast to the token-embedding shape.

        Returns:
            Sum of the masked token embeddings divided by the mask sum.
        """
        token_embeddings = model_output[0]
        mask = tf.expand_dims(attention_mask, -1)
        mask = tf.broadcast_to(mask, tf.shape(token_embeddings))
        mask = tf.cast(mask, tf.float32)
        masked_sum = tf.math.reduce_sum(token_embeddings * mask, axis=1)
        # Clip the denominator from below so a fully masked row cannot divide by zero.
        mask_total = tf.clip_by_value(
            tf.math.reduce_sum(mask, axis=1), 1e-9, tf.float32.max
        )
        return masked_sum / mask_total

    def normalize(self, embeddings):
        """Return ``embeddings`` scaled to unit L2 norm along axis 1."""
        unit_vectors, _norms = tf.linalg.normalize(embeddings, 2, axis=1)
        return unit_vectors

## Embedding Test

In [None]:
# Hugging Face model id
model_id = 'indobenchmark/indobert-base-p2'

# Load model and tokenizer
tokenizer = BertTokenizer.from_pretrained(model_id, model_max_length=128)
model = STSBertModel(model_id)

# Run inference & create embeddings for two sample sentences
sentences = [
    "Pupuk NPK",
    "Pupuk Nitrogen",
]
# Tokenize the `sentences` list defined above (the previous code referenced an
# undefined name and raised NameError).
input_data = tokenizer(sentences, padding=True, truncation=True, return_tensors='tf')
sentence_embedding = model(input_data)

# One pooled embedding is expected per input sentence.
print(sentence_embedding.shape)

In [None]:
class DataSequence(tf.keras.utils.Sequence):
    """Keras ``Sequence`` over sentence pairs and their normalized similarity labels.

    Each index refers to one sentence pair: ``__getitem__`` returns the
    tokenizer output for that pair together with its label.
    """

    def __init__(self, dataset, tokenizer, label_key='correlation'):
        """Extract labels and sentence pairs from ``dataset``.

        Args:
            dataset: Iterable of mappings with ``'text_1'``, ``'text_2'`` and
                ``label_key`` entries.
            tokenizer: Callable used by ``get_batch_texts`` to encode a pair.
            label_key: Name of the field holding the raw similarity value.
                Defaults to ``'correlation'``, the same field the training
                cell normalizes; pass ``'score'`` to read that field instead.
        """
        # Newer Keras versions expect the base-class initializer to run.
        super().__init__()
        # Raw similarity divided by 5.0, matching the normalization used for
        # the training labels elsewhere in this notebook.
        self.label = [float(item[label_key]) / 5.0 for item in dataset]
        self.sentence_1 = [item['text_1'] for item in dataset]
        self.sentence_2 = [item['text_2'] for item in dataset]
        self.text_cat = [[str(x), str(y)] for x, y in zip(self.sentence_1, self.sentence_2)]
        self.tokenizer = tokenizer

    def __len__(self):
        """Return the number of sentence pairs."""
        return len(self.text_cat)

    def get_batch_labels(self, idx):
        """Return the normalized label of pair ``idx``."""
        return self.label[idx]

    def get_batch_texts(self, idx):
        """Tokenize pair ``idx``, padded/truncated to 128 tokens, as TF tensors.

        The pair is passed to the tokenizer as a two-element list, which
        presumably encodes the two sentences as separate rows -- TODO confirm.
        """
        inputs = self.tokenizer(
            self.text_cat[idx],
            padding='max_length',
            max_length=128,
            truncation=True,
            return_tensors="tf"
        )
        return inputs

    def __getitem__(self, idx):
        """Return ``(tokenized_pair, label)`` for index ``idx``."""
        batch_texts = self.get_batch_texts(idx)
        batch_y = self.get_batch_labels(idx)
        return batch_texts, batch_y

def collate_fn(texts):
    """Split a batched encoding into a list of per-example feature dicts.

    Args:
        texts: Mapping with ``'input_ids'`` and ``'attention_mask'`` entries,
            each indexable by example position.

    Returns:
        A list with one ``{'input_ids': ..., 'attention_mask': ...}`` dict per
        entry of ``texts['input_ids']``.
    """
    example_count = len(texts['input_ids'])
    return [
        {
            'input_ids': texts['input_ids'][pos],
            'attention_mask': texts['attention_mask'][pos],
        }
        for pos in range(example_count)
    ]

## Define CosineLoss Function

In [None]:
def cosine_loss(y_true, y_pred):
    """Return the cosine distance ``1 - cos(y_true, y_pred)`` along the last axis.

    The result is 0 for vectors pointing the same way and grows as they
    diverge, so it can be minimized as a loss.

    Args:
        y_true: Target tensor; cast to float32.
        y_pred: Predicted tensor; cast to float32.

    Returns:
        Tensor of cosine distances, reduced over the last axis.
    """
    y_true = tf.cast(y_true, tf.float32)
    y_pred = tf.cast(y_pred, tf.float32)
    # Keras' cosine_similarity returns the *negated* similarity (-cos), so it
    # must be added, not subtracted, to obtain 1 - cos. It is referenced via
    # tf.keras.losses so this function does not depend on a later import cell.
    return 1.0 + tf.keras.losses.cosine_similarity(y_true, y_pred)

## Train the model

In [None]:
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import cosine_similarity

# Hugging Face model id of the pretrained encoder to fine-tune
model_id = 'indobenchmark/indobert-base-p2'

# Load the BERT tokenizer and the pooling wrapper around the pretrained model
tokenizer = BertTokenizer.from_pretrained(model_id, model_max_length=128)
bert_model = STSBertModel(model_id)


# Tokenize the sentence pairs. text_1 and text_2 are passed as two parallel
# lists, so each pair is presumably encoded as one joint sequence -- TODO confirm.
train_encodings = tokenizer(dataset['train']['text_1'], dataset['train']['text_2'], truncation=True, padding=True)
val_encodings = tokenizer(dataset['validation']['text_1'], dataset['validation']['text_2'], truncation=True, padding=True)
# Targets are the similarity values already divided by 5.0 in the normalization cell.
train_labels = norm_train_cor
val_labels = norm_val_cor

# Create TensorFlow datasets of (features dict, label) examples
train_dataset = tf.data.Dataset.from_tensor_slices((
    dict(train_encodings),
    train_labels
))
val_dataset = tf.data.Dataset.from_tensor_slices((
    dict(val_encodings),
    val_labels
))

# Define the model inputs; the names match the keys passed to bert_model below
# (and presumably the keys of the tokenizer output -- TODO confirm).
input_ids = Input(shape=(None,), dtype=tf.int32, name="input_ids")
attention_mask = Input(shape=(None,), dtype=tf.int32, name="attention_mask")
token_type_ids = Input(shape=(None,), dtype=tf.int32, name="token_type_ids")

# Mean-pooled, L2-normalized embedding from STSBertModel (normalize defaults to True)
bert_outputs = bert_model({"input_ids": input_ids, "attention_mask": attention_mask, "token_type_ids": token_type_ids})

# Single sigmoid unit on top of the embedding: one similarity value in (0, 1) per pair
dense = Dense(1, activation='sigmoid')(bert_outputs)

# Define the end-to-end Keras model
model = Model(inputs=[input_ids, attention_mask, token_type_ids], outputs=dense)

# Training hyperparameters
EPOCHS = 8
LEARNING_RATE = 1e-6
BATCH_SIZE = 8

# Compile the model with the custom cosine loss.
# NOTE(review): the head emits one value per example. Cosine similarity over a
# length-1 axis only compares signs, so this loss may provide no useful gradient
# here -- verify, and consider a regression loss such as MSE.
model.compile(optimizer=Adam(learning_rate=LEARNING_RATE), loss=cosine_loss)

# Shuffle with a buffer covering the whole training set, then batch.
# repeat(4) makes every Keras epoch iterate the training batches four times.
train_dataset = train_dataset.shuffle(len(dataset['train']['text_1'])).batch(BATCH_SIZE).repeat(4)
val_dataset = val_dataset.batch(BATCH_SIZE)

# Train the model, evaluating on the validation set after each epoch
model.fit(train_dataset, epochs=EPOCHS, validation_data=val_dataset)