# Semantic Text Similarity

In [None]:
import os
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
# Requisites
from gensim.utils import simple_preprocess
from gensim.corpora import Dictionary
import numpy as np

In [None]:
from typing import Tuple, List, Optional

In [None]:
WORD_EMBEDDING_FILE = '/Users/salva/Downloads/cc.es.300.bin.gz'
from gensim.models import fasttext
wv_model = fasttext.load_facebook_vectors(WORD_EMBEDDING_FILE)

In [None]:
USE_MMAP = False
if USE_MMAP:
    from gensim.models.fasttext import FastTextKeyedVectors
    MMAP_PATH = 'cc.es.gensim.bin'
    wv_model.save(MMAP_PATH)
    wv_model = FastTextKeyedVectors.load(MMAP_PATH, mmap='r')

In [None]:
# Example
input_pairs_example = [
    ('Me gusta el futbol', 'Disfruto viendo partidos de futbol', 4),
    ('El cielo está despejado', 'Hace un día bonito', 4.5),
    ('Me encanta viajar', 'Explorar nuevos lugares es una pasión', 3.5),
    ('Prefiero el verano', 'No me gusta el frío del invierno', 2.5),
    ('Tengo hambre', '¿Qué hay para cenar?', 2),
    ('La música me relaja', 'Escuchar música es una terapia', 3),
    ('El libro es emocionante', 'No puedo dejar de leerlo', 4),
    ('Me gusta la pizza', 'Es mi comida favorita', 4.5),
    ('Estoy cansado', 'Necesito hacer una siesta', 1.5),
    ('Hoy hace mucho calor', 'Es un día sofocante', 3.5)
]

In [None]:
REMAP_EMBEDDINGS: bool = True
USE_PRETRAINED: bool = True
MAX_LEN: int = 96

In [None]:
!pip install datasets
from datasets import load_dataset
dataset = load_dataset("PlanTL-GOB-ES/sts-es")

In [None]:
input_pairs = [(e["sentence1"], e["sentence2"], e["label"], ) for e in dataset["train"].to_list()]
input_pairs_val = [(e["sentence1"], e["sentence2"], e["label"], ) for e in dataset["validation"].to_list()]
input_pairs_test = [(e["sentence1"], e["sentence2"], e["label"], ) for e in dataset["test"].to_list()]

In [None]:
all_input_pairs = input_pairs + input_pairs_val + input_pairs_test
# Preprocesamiento de las oraciones y creación del diccionario
sentences_1_preproc = [simple_preprocess(sentence_1) for sentence_1, _, _ in all_input_pairs]
sentences_2_preproc = [simple_preprocess(sentence_2) for _, sentence_2, _ in all_input_pairs]
sentence_pairs = list(zip(sentences_1_preproc, sentences_2_preproc))
# Versión aplanada para poder entrenar el modelo
sentences_pairs_flattened = sentences_1_preproc + sentences_2_preproc
diccionario = Dictionary(sentences_pairs_flattened)

In [None]:
print("Max Len:", max([len(s) for s in sentences_1_preproc]), max([len(s) for s in sentences_2_preproc]))
print(list(diccionario.doc2idx(sentences_1_preproc[0])))

In [None]:
def map_word_embeddings(
        sentence: str,
        sequence_len: int = MAX_LEN,
        fixed_dictionary: Optional[Dictionary] = None
) -> np.ndarray:
    """
    Map to word-embedding indices
    :param sentence:
    :param sequence_len:
    :param fixed_dictionary:
    :return:
    """
    sentence_preproc = simple_preprocess(sentence)[:sequence_len]
    _vectors = np.zeros(sequence_len, dtype=np.int32)
    index = 0
    for word in sentence_preproc:
        if fixed_dictionary is not None:
            if word in fixed_dictionary.token2id:
                # Sumo 1 porque el valor 0 está reservado a padding
                _vectors[index] = fixed_dictionary.token2id[word] + 1
                index += 1
        else:
            if word in wv_model.key_to_index:
                _vectors[index] = wv_model.key_to_index[word] + 1
                index += 1
    return _vectors


def map_pairs(
    sentence_pairs: List[Tuple[str, str, float]],
    sequence_len: int = MAX_LEN,
    fixed_dictionary: Optional[Dictionary] = None
) -> List[Tuple[Tuple[np.ndarray, np.ndarray], float]]:
    # Mapeo de los pares de oraciones a pares de vectores
    pares_vectores = []
    for i, (sentence_1, sentence_2, similitud) in enumerate(sentence_pairs):
        vector1 = map_word_embeddings(sentence_1, sequence_len, fixed_dictionary)
        vector2 = map_word_embeddings(sentence_2, sequence_len, fixed_dictionary)
        # Añadir a la lista
        pares_vectores.append(((vector1, vector2), similitud))
    return pares_vectores

In [None]:
mapped_train = map_pairs(input_pairs, fixed_dictionary=diccionario if REMAP_EMBEDDINGS else None)
mapped_val = map_pairs(input_pairs_val, fixed_dictionary=diccionario if REMAP_EMBEDDINGS else None)
mapped_test = map_pairs(input_pairs_test, fixed_dictionary=diccionario if REMAP_EMBEDDINGS else None)

In [None]:
mapped_train[0]

In [None]:
# Define model 1
import tensorflow as tf
def model_1(
    input_length: int = MAX_LEN,
    dictionary_size: int = 1000,
    embedding_size: int = 16,
    pretrained_weights: Optional[np.ndarray] = None,
    learning_rate: float = 1e-3,
    trainable: bool = False,
    use_cosine: bool = False,
) -> tf.keras.Model:
    # Input layers
    input_1 = tf.keras.Input(shape=(input_length,), dtype=tf.int32)
    input_2 = tf.keras.Input(shape=(input_length,), dtype=tf.int32)

    # Embedding layer
    if pretrained_weights is None:
        embedding = tf.keras.layers.Embedding(
            dictionary_size, embedding_size, input_length=input_length, mask_zero=True
        )
    else:
        dictionary_size = pretrained_weights.shape[0]
        embedding_size = pretrained_weights.shape[1]
        initializer = tf.keras.initializers.Constant(pretrained_weights)
        embedding = tf.keras.layers.Embedding(
            dictionary_size,
            embedding_size,
            input_length=input_length,
            mask_zero=True,
            embeddings_initializer=initializer,
            trainable=trainable,
        )

    # Apply embedding to input sequences
    embedded_1 = embedding(input_1)
    embedded_2 = embedding(input_2)

    # Global average pooling
    _input_mask_1, _input_mask_2 = tf.not_equal(input_1, 0), tf.not_equal(input_2, 0)
    pooled_1 = tf.keras.layers.GlobalAveragePooling1D()(embedded_1, mask=_input_mask_1)
    pooled_2 = tf.keras.layers.GlobalAveragePooling1D()(embedded_2, mask=_input_mask_2)

    # Compute similarity/distance
    if use_cosine:   
        # Compute the cosine distance using a Lambda layer
        def cosine_distance(x):
            x1, x2 = x
            x1_normalized = tf.keras.backend.l2_normalize(x1, axis=1)
            x2_normalized = tf.keras.backend.l2_normalize(x2, axis=1)
            return 2.5 * (1.0 + tf.reduce_sum(x1_normalized * x2_normalized, axis=1))
        output = tf.keras.layers.Lambda(cosine_distance)([pooled_1, pooled_2])
    else:
        # Compute the cosine distance using a Lambda layer
        def normalized_product(x):
            x1, x2 = x
            x1_normalized = tf.keras.backend.l2_normalize(x1, axis=1)
            x2_normalized = tf.keras.backend.l2_normalize(x2, axis=1)
            return x1_normalized * x2_normalized
    
        output = tf.keras.layers.Lambda(normalized_product)([pooled_1, pooled_2])
        output = tf.keras.layers.Dropout(0.1)(output)
        output = tf.keras.layers.Dense(
            16,
            activation="relu",
        )(output)
        output = tf.keras.layers.Dropout(0.2)(output)
        output = tf.keras.layers.Dense(
            1,
            activation="sigmoid",
        )(output)
        
        output = tf.keras.layers.Lambda(lambda x: x * 5)(output)

    # Define the model
    model = tf.keras.Model(inputs=[input_1, input_2], outputs=output)

    # Compile the model
    model.compile(loss='mean_squared_error', optimizer=tf.keras.optimizers.Adam(learning_rate))

    return model

In [None]:
import tensorflow as tf

def model_2(
    input_length: int = MAX_LEN,
    dictionary_size: int = 1000,
    embedding_size: int = 16,
    learning_rate: float = 1e-3,
    pretrained_weights: Optional[np.ndarray] = None,
    trainable: bool = False,
    use_cosine: bool = False,
) -> tf.keras.Model:
    # Inputs
    input_1 = tf.keras.Input((input_length,), dtype=tf.int32)
    input_2 = tf.keras.Input((input_length,), dtype=tf.int32)

    # Embedding Layer
    if pretrained_weights is None:
        embedding = tf.keras.layers.Embedding(
            dictionary_size, embedding_size, input_length=input_length, mask_zero=True
        )
    else:
        dictionary_size = pretrained_weights.shape[0]
        embedding_size = pretrained_weights.shape[1]
        initializer = tf.keras.initializers.Constant(pretrained_weights)
        embedding = tf.keras.layers.Embedding(
            dictionary_size,
            embedding_size,
            input_length=input_length,
            mask_zero=True,
            embeddings_initializer=initializer,
            trainable=trainable,
        )

    # Embed the inputs
    embedded_1 = embedding(input_1)
    embedded_2 = embedding(input_2)
    # Pass through the embedding layer
    _input_mask_1, _input_mask_2 = tf.not_equal(input_1, 0), tf.not_equal(input_2, 0)

    # Attention Mechanism
    attention_mlp = tf.keras.Sequential([
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(16, activation='tanh'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(1)
    ])
    # Apply attention to each embedding
    attention_weights_1 = attention_mlp(embedded_1)  
    attention_weights_2 = attention_mlp(embedded_2) 
    # Mask the attention weights
    attention_weights_1 = tf.exp(attention_weights_1) * tf.cast(_input_mask_1[:, :, None], tf.float32)
    attention_weights_2 = tf.exp(attention_weights_2) * tf.cast(_input_mask_2[:, :, None], tf.float32)
    # Normalize attention weights
    attention_weights_1 = attention_weights_1 / tf.reduce_sum(attention_weights_1, axis=1, keepdims=True)
    attention_weights_2 = attention_weights_2 / tf.reduce_sum(attention_weights_2, axis=1, keepdims=True)
    # Compute context vectors
    projected_1 = tf.reduce_sum(embedded_1 * attention_weights_1, axis=1) 
    projected_2 = tf.reduce_sum(embedded_2 * attention_weights_2, axis=1) 
    
    
    if use_cosine:
        # Compute the cosine distance using a Lambda layer
        def cosine_distance(x):
            x1, x2 = x
            x1_normalized = tf.keras.backend.l2_normalize(x1, axis=1)
            x2_normalized = tf.keras.backend.l2_normalize(x2, axis=1)
            return 2.5 * (1.0 + tf.reduce_sum(x1_normalized * x2_normalized, axis=1))
        output = tf.keras.layers.Lambda(cosine_distance)([projected_1, projected_2])
    else:
         # Compute the cosine distance using a Lambda layer
        def normalized_product(x):
            x1, x2 = x
            x1_normalized = tf.keras.backend.l2_normalize(x1, axis=1)
            x2_normalized = tf.keras.backend.l2_normalize(x2, axis=1)
            return x1_normalized * x2_normalized
    
        output = tf.keras.layers.Lambda(normalized_product)([projected_1, projected_2])
        output = tf.keras.layers.Dropout(0.1)(output)
        output = tf.keras.layers.Dense(
            16,
            activation="relu",
        )(output)
        output = tf.keras.layers.Dropout(0.2)(output)
        output = tf.keras.layers.Dense(
            1,
            activation="sigmoid",
        )(output)
        
        output = tf.keras.layers.Lambda(lambda x: x * 5)(output)
    # Model Definition
    model = tf.keras.Model(inputs=(input_1, input_2), outputs=output)
    model.compile(
        loss="mean_squared_error", optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate)
    )
    return model

In [None]:
# Definir constantes de entrenamiento
batch_size: int = 64
num_epochs: int = 128

In [None]:
# Obtener x_train e y_train
def pair_list_to_x_y(pair_list: List[Tuple[Tuple[np.ndarray, np.ndarray], int]]) -> Tuple[Tuple[np.ndarray, np.ndarray], np.ndarray]:
    _x, _y = zip(*pair_list)
    _x_1, _x_2 = zip(*_x)
    return (np.row_stack(_x_1), np.row_stack(_x_2)), np.array(_y)

# Obtener las listas de train y test
x_train, y_train = pair_list_to_x_y(mapped_train)
x_val, y_val = pair_list_to_x_y(mapped_val)

In [None]:
# Preparar los conjuntos de datos de entrenamiento y validación
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=len(x_train)).batch(batch_size)

val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(batch_size)

In [None]:
_pretrained_weights: Optional[np.ndarray] = None
if USE_PRETRAINED:
    if REMAP_EMBEDDINGS:
        _pretrained_weights = np.zeros(
            (len(diccionario.token2id) + 1, wv_model.vector_size),  dtype=np.float32)
        for token, _id in diccionario.token2id.items():
            if token in wv_model:
             _pretrained_weights[_id + 1] = wv_model[token]

            else:
                # In W2V, OOV will not have a representation. We will use 0.
                pass
    else:
        # Not recommended (this will consume A LOT of RAM)
        _pretrained_weights = np.zeros((wv_model.vectors.shape[0] + 1, wv_model.vector_size,),  dtype=np.float32)
        _pretrained_weights[1:, :] = wv_model.vectors

In [None]:
_pretrained_weights[:5,:]

In [None]:
# Build and compile the model
model = model_1(pretrained_weights=_pretrained_weights, trainable=False, use_cosine=False, )
model.summary()

In [None]:
# Train the model
model.fit(train_dataset, epochs=num_epochs, validation_data=val_dataset)

In [None]:
from scipy.stats import pearsonr

x_test, y_test = pair_list_to_x_y(mapped_test)
def compute_pearson(x_, y_, model):
    # Get predictions for the model
    y_pred = model.predict(x_)
    # Compute pearson correlation
    correlation, _ = pearsonr(y_pred.flatten(), y_.flatten())
    return correlation

In [None]:
# Print results
print(f"Correlación de Pearson (train): {compute_pearson(x_train, y_train, model)}")
print(f"Correlación de Pearson (validation): {compute_pearson(x_val, y_val, model)}")
print(f"Correlación de Pearson (test): {compute_pearson(x_test, y_test, model)}")


Alternative model

In [None]:
# Build Model 2
model2 = model_2(pretrained_weights=_pretrained_weights, trainable=False, use_cosine=False)
model2.summary()

In [None]:
# Train model 2
model2.fit(train_dataset, epochs=num_epochs, validation_data=val_dataset)

In [None]:
# Print results
print(f"Correlación de Pearson (train): {compute_pearson(x_train, y_train, model2)}")
print(f"Correlación de Pearson (validation): {compute_pearson(x_val, y_val, model2)}")
print(f"Correlación de Pearson (test): {compute_pearson(x_test, y_test, model2)}")