# Semantic Text Similarity
Este cuaderno utiliza gensim para convertir pares de oraciones (con su puntuación de similitud) en vectores de índices de word embeddings.
A partir de un dataset de pares de oraciones, entrena un modelo que infiere la puntuación de similitud entre ambas frases.

In [1]:
import os
# Order CUDA devices by PCI bus id and expose only device "0" to this process.
# NOTE(review): presumably set here so it takes effect before TensorFlow is imported further down — confirm.
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
# Requirements
from gensim.utils import simple_preprocess
from gensim.corpora import Dictionary
import numpy as np

In [2]:
# Tipado
from typing import Tuple, List, Optional

In [5]:
# Pre-trained word vectors
# WV_MODEL_PATH = "/Users/salva/Downloads/cc.ca.300.bin.gz"
# NOTE(review): machine-specific absolute path; adjust it before running on another machine.
WV_MODEL_PATH = '/Users/salva/Downloads/cc.ca.300.vec.gz'
import gensim
# Load the vectors from the word2vec text format (binary=False).
wv_model =  gensim.models.KeyedVectors.load_word2vec_format(WV_MODEL_PATH, binary=False)
wv_model

<gensim.models.keyedvectors.KeyedVectors at 0x13c007490>

In [7]:
# Toy example: 10 sentence pairs, each with its associated similarity score.
# Every item is (sentence_1, sentence_2, score).
input_pairs = [
    ("M'agrada el futbol", "Disfruto veient partits de futbol", 4),
    ("El cel està despejat", "Fa un dia bonic", 4.5),
    ("M'encanta viatjar", "Explorar nous llocs és una passió", 3.5),
    ("Prefereixo l'estiu", "No m'agrada el fred de l'hivern", 2.5),
    ("Tinc gana", "Què hi ha per sopar?", 2),
    ("La música em relaxa", "Escoltar música és una teràpia", 3),
    ("El llibre és emocionant", "No puc deixar de llegir-lo", 4),
    ("M'agrada la pizza", "És el meu menjar preferit", 4.5),
    ("Estic cansat", "Necessito fer una migdiada", 1.5),
    ("Avui fa molta calor", "És un dia sofocant", 3.5),
]

In [8]:
# When True, token ids come from the corpus dictionary (`diccionario`) instead of the
# full word2vec vocabulary, and the embedding matrix is built only for those tokens.
REMAP_EMBEDDINGS: bool = True
# When True, the embedding matrix is initialised from the pre-trained word vectors.
USE_PRETRAINED: bool = True

In [17]:
from datasets import load_dataset
# Text Similarity (STS) dataset (main dataset for Practical 4)
train = load_dataset("projecte-aina/sts-ca", split="train")
test = load_dataset("projecte-aina/sts-ca", split="test")
val = load_dataset("projecte-aina/sts-ca", split="validation")
# The "all" split is the one the preprocessing cell below iterates over.
all_data = load_dataset("projecte-aina/sts-ca", split="all")
all_data

Dataset({
    features: ['id', 'sentence_1', 'sentence_2', 'label'],
    num_rows: 3073
})

In [58]:
# Tokenise both sentence columns and build the corpus dictionary
sentences_1_preproc = [simple_preprocess(d["sentence_1"]) for d in all_data]
sentences_2_preproc = [simple_preprocess(d["sentence_2"]) for d in all_data]
scores = [d["label"] for d in all_data]
# One (tokens_1, tokens_2, score) triple per dataset row
sentence_pairs = list(zip(sentences_1_preproc, sentences_2_preproc, scores))
# Flattened version (all tokenised sentences from both columns) used to build the dictionary
sentences_pairs_flattened = sentences_1_preproc + sentences_2_preproc
diccionario = Dictionary(sentences_pairs_flattened)
diccionario

<gensim.corpora.dictionary.Dictionary at 0x39bebcb50>

In [59]:
# Longest tokenised sentence in each column, followed by the dictionary ids of the first sentence.
print("Max Len:", max(map(len, sentences_1_preproc)), max(map(len, sentences_2_preproc)))
print([*diccionario.doc2idx(sentences_1_preproc[0])])

Max Len: 30 30
[0, 11, 13, 1, 9, 10, 5, 14, 8, 7, 2, 8, 12, 2, 6, 4, 3, 15]


In [64]:
from typing import Union


def map_word_embeddings(
        sentence: Union[str, List[str]],
        sequence_len: int = 32,
        fixed_dictionary: Optional[Dictionary] = None
) -> np.ndarray:
    """
    Map a sentence to a fixed-length vector of word-embedding indices.

    Known tokens are written left to right as ``id + 1`` (0 is reserved for
    padding); tokens missing from the vocabulary are skipped. Sentences with
    more than ``sequence_len`` known tokens are truncated instead of raising
    ``IndexError``.

    :param sentence: raw string (tokenised with ``simple_preprocess``) or an
        already tokenised list of words
    :param sequence_len: length of the returned vector
    :param fixed_dictionary: object exposing ``token2id``; when ``None`` the
        ids come from ``wv_model.key_to_index``
    :return: int32 array of shape ``(sequence_len,)``, zero-padded on the right
    """
    if isinstance(sentence, list):
        tokens = sentence
    else:
        tokens = simple_preprocess(sentence)

    # Choose the token -> id mapping once instead of on every iteration.
    if fixed_dictionary is not None:
        vocabulary = fixed_dictionary.token2id
    else:
        vocabulary = wv_model.key_to_index

    _vectors = np.zeros(sequence_len, dtype=np.int32)
    index = 0
    for word in tokens:
        if index >= sequence_len:
            # Vector is full: drop the remaining tokens (truncation).
            break
        if word in vocabulary:
            # Add 1 because the value 0 is reserved for padding.
            _vectors[index] = vocabulary[word] + 1
            index += 1
    return _vectors


def map_pairs(
        sentence_pairs: List[Tuple[str, str, float]],
        sequence_len: int = 32,
        fixed_dictionary: Optional[Dictionary] = None
) -> List[Tuple[Tuple[np.ndarray, np.ndarray], float]]:
    """
    Turn (sentence_1, sentence_2, score) triples into ((vector_1, vector_2), score) items.

    :param sentence_pairs: iterable of (sentence_1, sentence_2, score) triples
    :param sequence_len: forwarded to ``map_word_embeddings``
    :param fixed_dictionary: forwarded to ``map_word_embeddings``
    :return: list with one ((vector_1, vector_2), score) item per input triple
    """
    def _encode(sentence):
        # Both sentences of a pair are encoded with the same settings.
        return map_word_embeddings(sentence, sequence_len, fixed_dictionary)

    return [
        ((_encode(first), _encode(second)), score)
        for first, second, score in sentence_pairs
    ]

In [65]:
# Map every sentence pair to index vectors and print the first mapped item.
# The corpus dictionary is only used for the ids when REMAP_EMBEDDINGS is enabled.
mapped = map_pairs(sentence_pairs, fixed_dictionary=diccionario if REMAP_EMBEDDINGS else None)
# for vectors, similitud in mapped:
#     print(f"Pares de vectores: {vectors[0].shape}, {vectors[1].shape}")
#     print(f"Puntuación de similitud: {similitud}")
print(mapped[0])

((array([ 1, 12, 14,  2, 10, 11,  6, 15,  9,  8,  3,  9, 13,  3,  7,  5,  4,
       16,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0],
      dtype=int32), array([10010,     9,  2784,     6,    15,     9,     8,     3,     9,
          13,     3,     7,     5,     4,    16,     0,     0,     0,
           0,     0,     0,     0,     0,     0,     0,     0,     0,
           0,     0,     0,     0,     0], dtype=int32)), 3.5)


In [318]:
# Training constants
batch_size: int = 64
num_epochs: int = 128
# Fraction of the mapped pairs used for training; the rest is used for validation.
train_val_split: float = 0.8

In [287]:
# Number of ((vector_1, vector_2), score) items produced by map_pairs
len(mapped)

3073

In [288]:
# Index that separates the training items from the validation items in `mapped`
train_slice: int = int(len(mapped) * train_val_split)

def pair_list_to_x_y(
        pair_list: List[Tuple[Tuple[np.ndarray, np.ndarray], float]],
        max_score: float = 5.0,
) -> Tuple[Tuple[np.ndarray, np.ndarray], np.ndarray]:
    """
    Build the matrices X_1 (N x d), X_2 (N x d) and the target vector Y (N)
    from a list of ((vector_1, vector_2), score) items.

    :param pair_list: non-empty list of ((vector_1, vector_2), score) items
    :param max_score: value the scores are divided by (5.0 by default)
    :return: ((X_1, X_2), Y) where Y holds the scores divided by ``max_score``
    :raises ValueError: if ``pair_list`` is empty
    """
    if len(pair_list) == 0:
        # Fail with a clear message instead of an opaque tuple-unpacking error.
        raise ValueError("pair_list must contain at least one ((x_1, x_2), score) item")
    _x, _y = zip(*pair_list)
    _x_1, _x_2 = zip(*_x)
    # np.vstack replaces np.row_stack, which is deprecated since NumPy 2.0.
    return (np.vstack(_x_1), np.vstack(_x_2)), np.array(_y) / max_score

# Build the training and validation arrays: the first `train_slice` items are used
# for training, the remaining ones for validation.
x_train, y_train = pair_list_to_x_y(mapped[:train_slice])
x_val, y_val = pair_list_to_x_y(mapped[train_slice:])

In [289]:
import tensorflow as tf

# Build the training and validation tf.data pipelines
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
# x_train is the (X_1, X_2) tuple, so len(x_train) is always 2; size the shuffle
# buffer by the number of samples so the whole training set is actually shuffled.
train_dataset = train_dataset.shuffle(buffer_size=len(y_train)).batch(batch_size)

# Validation data is batched but not shuffled
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(batch_size)

In [290]:
# Embedding matrix handed to the model; stays None when USE_PRETRAINED is disabled.
pretrained_weights: Optional[np.ndarray] = None
if USE_PRETRAINED and REMAP_EMBEDDINGS:
    # One row per dictionary token plus row 0, which is left at zero (ids are shifted by +1).
    pretrained_weights = np.zeros(
        (len(diccionario.token2id) + 1, wv_model.vector_size),  dtype=np.float32)
    for token, _id in diccionario.token2id.items():
        # Tokens without a word2vec vector (OOV) keep their all-zero row.
        if token in wv_model:
            pretrained_weights[_id + 1] = wv_model[token]
elif USE_PRETRAINED:
    # Not recommended (this will consume A LOT of RAM): copies the whole vector table.
    pretrained_weights = np.zeros((wv_model.vectors.shape[0] + 1, wv_model.vector_size,),  dtype=np.float32)
    pretrained_weights[1:, :] = wv_model.vectors


In [291]:
# Inspect the first five rows of the embedding matrix.
# NOTE: this fails when USE_PRETRAINED is False, because pretrained_weights is then None.
pretrained_weights[:5]

array([[ 0.    ,  0.    ,  0.    , ...,  0.    ,  0.    ,  0.    ],
       [-0.0307,  0.0032,  0.0128, ..., -0.0154,  0.0374,  0.0234],
       [ 0.0519, -0.0079, -0.0013, ..., -0.0154, -0.0353, -0.0235],
       [ 0.0058, -0.0161,  0.062 , ...,  0.0129,  0.019 ,  0.0177],
       [-0.042 , -0.0113,  0.0837, ..., -0.0396, -0.0253, -0.0045]],
      dtype=float32)

In [292]:
import tensorflow as tf
import numpy as np
from typing import Optional

class SimpleAttention(tf.keras.layers.Layer):
    """
    Attention pooling over the sequence axis.

    Each position is scored by a tanh Dense layer followed by a bias-free Dense(1)
    layer; the scores are softmax-normalised along axis 1 and used to compute a
    weighted sum of the inputs. Masked positions receive a large negative score.
    """

    def __init__(self, units: int, **kwargs):
        """
        :param units: width of the hidden tanh transform used to score positions
        :param kwargs: forwarded to ``tf.keras.layers.Layer``
        """
        super().__init__(**kwargs)
        self.units = units
        self.dropout_s1 = tf.keras.layers.Dropout(0.3)
        self.dropout_s2 = tf.keras.layers.Dropout(0.2)
        # Hidden transform applied to every position
        self.W_s1 = tf.keras.layers.Dense(units, activation='tanh', use_bias=True, name="attention_transform")
        # Projects each hidden state to a single attention score
        self.W_s2 = tf.keras.layers.Dense(1, use_bias=False, name="attention_scorer")
        # Declare that this layer accepts an incoming mask
        self.supports_masking = True

    def call(self, inputs: tf.Tensor, mask: Optional[tf.Tensor] = None) -> tf.Tensor:
        """
        :param inputs: tensor of shape (batch_size, sequence_length, embedding_dim)
        :param mask: optional boolean tensor of shape (batch_size, sequence_length)
        :return: weighted sum of ``inputs`` over axis 1
        """
        # Hidden representation of every position, with dropout
        transformed = self.W_s1(inputs)
        transformed = self.dropout_s1(transformed)

        # One raw score per position, with dropout
        logits = self.dropout_s2(self.W_s2(transformed))

        if mask is not None:
            # Push masked positions towards -inf so the softmax gives them ~0 weight
            keep = tf.expand_dims(tf.cast(mask, dtype=tf.float32), axis=-1)
            logits = logits + (1.0 - keep) * -1e9

        # Normalise the scores along the sequence axis
        weights = tf.nn.softmax(logits, axis=1)

        # Weighted sum of the input embeddings
        return tf.reduce_sum(inputs * weights, axis=1)

    def get_config(self) -> dict:
        """Return the base layer config extended with ``units``."""
        base_config = super().get_config()
        base_config.update({"units": self.units})
        return base_config

    def compute_mask(self, inputs: tf.Tensor, mask: Optional[tf.Tensor] = None) -> Optional[tf.Tensor]:
        """Return ``None``: no mask is propagated past this layer."""
        return None


def build_and_compile_model_2(
        input_length: int = 32,
        dictionary_size: int = 1000,
        embedding_size: int = 300,
        learning_rate: float = 0.001,
        trainable_embedding: bool = False,
        pretrained_weights: Optional[np.ndarray] = None,
        attention_units: int = 4,
) -> tf.keras.Model:
    """
    Build and compile a two-input sentence-similarity model.

    Both inputs go through the same embedding, attention and projection layers;
    the output is the cosine similarity of the two projected vectors rescaled
    with ``0.5 * (1 + cos)``. The model is compiled with MSE loss, the Adam
    optimizer and MAE as metric.

    :param input_length: number of token ids per input sequence
    :param dictionary_size: embedding vocabulary size; ignored when
        ``pretrained_weights`` is given
    :param embedding_size: embedding dimension; ignored when
        ``pretrained_weights`` is given
    :param learning_rate: learning rate of the Adam optimizer
    :param trainable_embedding: whether pre-trained embeddings are fine-tuned;
        without ``pretrained_weights`` the embedding is always trainable
    :param pretrained_weights: optional (vocabulary, dimension) matrix used to
        initialise the embedding layer
    :param attention_units: hidden units of the ``SimpleAttention`` layer
    :return: the compiled Keras model
    """
    input_1 = tf.keras.Input((input_length,), dtype=tf.int32, name="input_1")
    input_2 = tf.keras.Input((input_length,), dtype=tf.int32, name="input_2")

    # Determine effective embedding parameters
    if pretrained_weights is not None:
        effective_dictionary_size = pretrained_weights.shape[0]
        effective_embedding_size = pretrained_weights.shape[1]
        embedding_initializer = tf.keras.initializers.Constant(pretrained_weights)
        is_embedding_trainable = trainable_embedding
        embedding_layer_name = "embedding_pretrained"
    else:
        effective_dictionary_size = dictionary_size
        effective_embedding_size = embedding_size
        embedding_initializer = 'uniform'
        is_embedding_trainable = True
        embedding_layer_name = "embedding"

    # Shared Embedding Layer.
    # `input_length` is intentionally not passed: the argument is deprecated in
    # Keras 3 and the sequence length is already fixed by the Input layers.
    embedding_layer = tf.keras.layers.Embedding(
        input_dim=effective_dictionary_size,
        output_dim=effective_embedding_size,
        mask_zero=True,
        embeddings_initializer=embedding_initializer,
        trainable=is_embedding_trainable,
        name=embedding_layer_name
    )

    # Apply embedding layer to both inputs
    embedded_1 = embedding_layer(input_1)  # Shape: (batch_size, input_length, effective_embedding_size)
    embedded_2 = embedding_layer(input_2)  # Shape: (batch_size, input_length, effective_embedding_size)

    # Shared Attention Layer
    # Input: (batch_size, input_length, effective_embedding_size) with a mask
    # Output: (batch_size, effective_embedding_size)
    sentence_attention_layer = SimpleAttention(units=attention_units, name="sentence_attention")

    sentence_vector_1 = sentence_attention_layer(embedded_1)
    sentence_vector_2 = sentence_attention_layer(embedded_2)

    # Shared projection layer, initialised as the identity mapping (before tanh)
    first_projection_layer = tf.keras.layers.Dense(
        effective_embedding_size,
        activation='tanh',
        kernel_initializer=tf.keras.initializers.Identity(),
        bias_initializer=tf.keras.initializers.Zeros(),
        name="projection_layer"
    )
    dropout = tf.keras.layers.Dropout(0.2, name="projection_dropout")
    projected_1 = dropout(first_projection_layer(sentence_vector_1))
    projected_2 = dropout(first_projection_layer(sentence_vector_2))

    # Normalize the projected vectors (L2 normalization)
    normalized_1 = tf.keras.layers.Lambda(
        lambda x: tf.linalg.l2_normalize(x, axis=1), name="normalize_1"
    )(projected_1)
    normalized_2 = tf.keras.layers.Lambda(
        lambda x: tf.linalg.l2_normalize(x, axis=1), name="normalize_2"
    )(projected_2)

    # Compute Cosine Similarity (dot product of the normalized vectors)
    similarity_score = tf.keras.layers.Lambda(
        lambda x: tf.reduce_sum(x[0] * x[1], axis=1, keepdims=True), name="cosine_similarity"
    )([normalized_1, normalized_2])

    # Scale similarity from [-1, 1] to [0, 1]
    output_layer = tf.keras.layers.Lambda(
        lambda x: 0.5 * (1.0 + x), name="output_scaling"
    )(similarity_score)

    # Define the Keras Model
    model = tf.keras.Model(
        inputs=[input_1, input_2],
        outputs=output_layer,
        name="sequence_similarity_attention_model"
    )

    # Compile the model
    model.compile(
        loss='mean_squared_error',
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        metrics=['mae'],
    )

    return model

In [319]:
# Build and compile the model
model = build_and_compile_model_2(pretrained_weights=pretrained_weights, learning_rate=1e-3)
# Train the model, evaluating on the validation dataset after every epoch
model.fit(train_dataset, epochs=num_epochs, validation_data=val_dataset)

Epoch 1/128
[1m39/39[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 9ms/step - loss: 0.1281 - mae: 0.3198 - val_loss: 0.1467 - val_mae: 0.3441
Epoch 2/128
[1m39/39[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - loss: 0.0865 - mae: 0.2520 - val_loss: 0.1344 - val_mae: 0.3236
Epoch 3/128
[1m39/39[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 0.0735 - mae: 0.2248 - val_loss: 0.1296 - val_mae: 0.3162
Epoch 4/128
[1m39/39[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 0.0691 - mae: 0.2179 - val_loss: 0.1272 - val_mae: 0.3136
Epoch 5/128
[1m39/39[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 0.0686 - mae: 0.2151 - val_loss: 0.1267 - val_mae: 0.3120
Epoch 6/128
[1m39/39[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 0.0647 - mae: 0.2066 - val_loss: 0.1246 - val_mae: 0.3089
Epoch 7/128
[1m39/39[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 0

<keras.src.callbacks.history.History at 0x465ec6650>

In [320]:
# Print the summary of the trained model
model.summary()

In [321]:
from scipy.stats import pearsonr
# Get the model predictions for the validation split (x_val).
y_pred = model.predict(x_val)
# Compute the Pearson correlation between the predictions and the validation targets
correlation, _ = pearsonr(y_pred.flatten(), y_val.flatten())
# Print the Pearson correlation coefficient
print(f"Correlación de Pearson: {correlation}")


[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step
Correlación de Pearson: 0.5462249601414344


In [322]:
from scipy.stats import pearsonr
# Get the model predictions for the training split (x_train).
y_pred = model.predict(x_train)
# Compute the Pearson correlation between the predictions and the training targets
correlation, _ = pearsonr(y_pred.flatten(), y_train.flatten())
# Print the Pearson correlation coefficient
print(f"Correlación de Pearson: {correlation}")


[1m77/77[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step
Correlación de Pearson: 0.7561164912323477


In [323]:
# TensorFlow version this notebook was run with
tf.__version__

'2.19.0'