# Semantic Text Similarity
Este modelo utiliza gensim para convertir pares de vectores + puntuaciones en vectores (word embeddings).
Dado un dataset, infiere la puntuación de similitud entre ambas frases.

In [1]:
import os
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
# Requisitos
from gensim.utils import simple_preprocess
from gensim.corpora import Dictionary
import numpy as np

In [2]:
# Tipado
from typing import Tuple, List, Optional

In [3]:
# Modelos pre-entrenados
WV_MODEL_PATH = "/Users/salva/Downloads/cc.ca.300.bin.gz"
# from gensim.models import Word2Vec
# wv_model = Word2Vec.load('path_to_word2vec_model').wv
# from gensim.models import fasttext
# wv_model = fasttext.load_facebook_vectors(WV_MODEL_PATH)

In [4]:
# Llavors podeu carregar el model com a mmap
from gensim.models.fasttext import FastTextKeyedVectors
wv_model = FastTextKeyedVectors.load('cc.ca.gensim.bin', mmap='r')

FileNotFoundError: [Errno 2] No such file or directory: 'cc.ca.gensim.bin'

In [None]:
# Ejemplo de 10 pares de oraciones con puntuación de similitud asociada
input_pairs = [
    ('M\'agrada el futbol', 'Disfruto veient partits de futbol', 4),
    ('El cel està despejat', 'Fa un dia bonic', 4.5),
    ('M\'encanta viatjar', 'Explorar nous llocs és una passió', 3.5),
    ('Prefereixo l\'estiu', 'No m\'agrada el fred de l\'hivern', 2.5),
    ('Tinc gana', 'Què hi ha per sopar?', 2),
    ('La música em relaxa', 'Escoltar música és una teràpia', 3),
    ('El llibre és emocionant', 'No puc deixar de llegir-lo', 4),
    ('M\'agrada la pizza', 'És el meu menjar preferit', 4.5),
    ('Estic cansat', 'Necessito fer una migdiada', 1.5),
    ('Avui fa molta calor', 'És un dia sofocant', 3.5)
    ]

In [None]:
REMAP_EMBEDDINGS: bool = True
USE_PRETRAINED: bool = True

In [None]:
# Datos reales
TRAIN_DATA_FILE: str = "/Users/salva/Downloads/train.tsv"
import pandas as pd
tsv_data = pd.read_csv(TRAIN_DATA_FILE, sep='\t', header=None, usecols=[1, 2, 3])
input_pairs = tsv_data.values.tolist()

In [None]:
# Preprocesamiento de las oraciones y creación del diccionario
sentences_1_preproc = [simple_preprocess(sentence_1) for sentence_1, _, _ in input_pairs]
sentences_2_preproc = [simple_preprocess(sentence_2) for _, sentence_2, _ in input_pairs]
sentence_pairs = list(zip(sentences_1_preproc, sentences_2_preproc))
# Versión aplanada para poder entrenar el modelo
sentences_pairs_flattened = sentences_1_preproc + sentences_2_preproc
diccionario = Dictionary(sentences_pairs_flattened)

In [None]:
print("Max Len:", max([len(s) for s in sentences_1_preproc]), max([len(s) for s in sentences_2_preproc]))
print(list(diccionario.doc2idx(sentences_1_preproc[0])))

In [None]:
def map_word_embeddings(
        sentence: str,
        sequence_len: int = 32,
        fixed_dictionary: Optional[Dictionary] = None
) -> np.ndarray:
    """
    Map to word-embedding indices
    :param sentence:
    :param sequence_len:
    :param fixed_dictionary:
    :return:
    """
    sentence_preproc = simple_preprocess(sentence)
    _vectors = np.zeros(sequence_len, dtype=np.int32)
    index = 0
    for word in sentence_preproc:
        if fixed_dictionary is not None:
            if word in fixed_dictionary.token2id:
                # Sumo 1 porque el valor 0 está reservado a padding
                _vectors[index] = fixed_dictionary.token2id[word] + 1
                index += 1
        else:
            if word in wv_model.key_to_index:
                _vectors[index] = wv_model.key_to_index[word] + 1
                index += 1
    return _vectors


def map_pairs(
        sentence_pairs: List[Tuple[str, str, float]],
        sequence_len: int = 32,
        fixed_dictionary: Optional[Dictionary] = None
) -> List[Tuple[Tuple[np.ndarray, np.ndarray], float]]:
    """
    Mapea los tripletes de oraciones a listas de (x, y), (pares de vectores, score)
    :param sentence_pairs:
    :param sequence_len:
    :param fixed_dictionary:
    :return:
    """
    # Mapeo de los pares de oraciones a pares de vectores
    pares_vectores = []
    for i, (sentence_1, sentence_2, similitud) in enumerate(sentence_pairs):
        vector1 = map_word_embeddings(sentence_1, sequence_len, fixed_dictionary)
        vector2 = map_word_embeddings(sentence_2, sequence_len, fixed_dictionary)
        # Añadir a la lista
        pares_vectores.append(((vector1, vector2), similitud))
    return pares_vectores

In [None]:
# Imprimir los pares de vectores y la puntuación de similitud asociada
mapped = map_pairs(input_pairs, fixed_dictionary=diccionario if REMAP_EMBEDDINGS else None)
# for vectors, similitud in mapped:
#     print(f"Pares de vectores: {vectors[0].shape}, {vectors[1].shape}")
#     print(f"Puntuación de similitud: {similitud}")
print(mapped[0])

In [None]:
# Definir el Modelo
import tensorflow as tf
tf.config.set_visible_devices([], 'GPU')
def build_and_compile_model(
        input_length: int = 32,
        hidden_size: int = 64,
        dictionary_size: int = 1000,
        embedding_size: int = 16,
        pretrained_weights: Optional[np.ndarray] = None,
        learning_rate: float = 0.001,
        trainable: bool = False,
) -> tf.keras.Model:
    """
    Este es un modelo muy básico. Hace lo mismo que el modelo single_vector. La puntuación es mejor por no eliminar stopwords.
    :param input_length:
    :param hidden_size:
    :param dictionary_size:
    :param embedding_size:
    :param pretrained_weights:
    :param learning_rate:
    :param trainable:
    :return:
    """
    input_1, input_2 = tf.keras.Input((input_length, ), dtype=tf.int32, ), tf.keras.Input((input_length, ), dtype=tf.int32, )
    # Define Layers
    if pretrained_weights is None:
        embedding = tf.keras.layers.Embedding(
            dictionary_size, embedding_size, input_length=input_length, mask_zero=True, )
    else:
        dictionary_size = pretrained_weights.shape[0]
        embedding_size = pretrained_weights.shape[1]
        initializer = tf.keras.initializers.Constant(pretrained_weights)
        embedding = tf.keras.layers.Embedding(
            dictionary_size, embedding_size, input_length=input_length, mask_zero=True,
            embeddings_initializer=initializer, trainable=trainable, )
    pooling = tf.keras.layers.GlobalAveragePooling1D()
    # Pass through the layers
    _input_mask_1, _input_mask_2 = tf.not_equal(input_1, 0), tf.not_equal(input_2, 0)
    _embedded_1, _embedded_2 = embedding(input_1, ), embedding(input_2, )
    _pooled_1, _pooled_2 = pooling(_embedded_1, mask=_input_mask_1), pooling(_embedded_2, mask=_input_mask_2)

    # Compute the cosine distance
    projected_1 = tf.linalg.l2_normalize(_pooled_1, axis=1, )
    projected_2 = tf.linalg.l2_normalize(_pooled_2, axis=1, )
    output = 2.5 * (1.0 + tf.reduce_sum(projected_1 * projected_2, axis=1, ))

    # Define the model
    model = tf.keras.Model(inputs=(input_1, input_2, ), outputs=output,)
    model.compile(loss='mean_squared_error',
                optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate))
    return model

In [None]:
# Definir el Modelo
import tensorflow as tf
def build_and_compile_model_2(
        input_length: int = 32,
        dictionary_size: int = 1000,
        embedding_size: int = 16,
        learning_rate: float = 0.001,
        trainable: bool = False,
        pretrained_weights:bool = None
) -> tf.keras.Model:
    """
    Este es un modelo algo más avanzado. Calcula internamente una media ponderada de los word embeddings. Calcula también la proyección.
    :param input_length:
    :param hidden_size:
    :param dictionary_size:
    :param embedding_size:
    :param pretrained_weights:
    :param learning_rate:
    :param trainable:
    :return:
    """
    input_1, input_2 = tf.keras.Input((input_length, ), dtype=tf.int32, ), tf.keras.Input((input_length, ), dtype=tf.int32, )
    # Define Layers
    if pretrained_weights is None:
        embedding = tf.keras.layers.Embedding(
            dictionary_size, embedding_size, input_length=input_length, mask_zero=True, )
    else:
        dictionary_size = pretrained_weights.shape[0]
        embedding_size = pretrained_weights.shape[1]
        initializer = tf.keras.initializers.Constant(pretrained_weights)
        embedding = tf.keras.layers.Embedding(
            dictionary_size, embedding_size, input_length=input_length, mask_zero=True,
            embeddings_initializer=initializer, trainable=trainable, )
    # Pass through the layers
    _input_mask_1, _input_mask_2 = tf.not_equal(input_1, 0), tf.not_equal(input_2, 0)
    _embedded_1, _embedded_2 = embedding(input_1, ), embedding(input_2, )

    # Compute custom weights
    weights_computation = tf.keras.layers.Dense(1, name="weight_computation")
    dropout = tf.keras.layers.Dropout(0.2, name="dropout_in")
    _weights_1 = weights_computation(dropout(_embedded_1))
    weights_1 = tf.squeeze(_weights_1, axis=[-1])
    _weights_2 = weights_computation(dropout(_embedded_2))
    weights_2 = tf.squeeze(_weights_2, axis=[-1])
    # Define softmax
    softmax = tf.keras.layers.Softmax(name="weighted_sum_softmax")
    scores_1 = softmax(weights_1, mask=_input_mask_1)
    _pooled_1 = tf.math.reduce_sum(_embedded_1 * tf.expand_dims(scores_1, axis=-1), axis=1)
    scores_2 = softmax(weights_2, mask=_input_mask_2)
    _pooled_2 = tf.math.reduce_sum(_embedded_2 * tf.expand_dims(scores_2, axis=-1) , axis=1)
    # Compute the distance
    dense_output = tf.keras.layers.Dense(1)
    dropout_out = tf.keras.layers.Dropout(0.2, name="dropout_out")
    projected_1 = tf.linalg.l2_normalize(_pooled_1, axis=1, )
    projected_2 = tf.linalg.l2_normalize(_pooled_2, axis=1, )
    output = dense_output(dropout_out(projected_1 * projected_2), )

    # Define the model
    model = tf.keras.Model(inputs=(input_1, input_2, ), outputs=output,)
    model.compile(loss='mean_absolute_error',
                optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate))
    return model

In [None]:
# Definir constantes de entrenamiento
batch_size: int = 64
num_epochs: int = 64
train_val_split: float = 0.8

In [None]:
# Obtener x_train e y_train
train_slice: int = int(len(mapped) * train_val_split)

def pair_list_to_x_y(pair_list: List[Tuple[Tuple[np.ndarray, np.ndarray], int]]) -> Tuple[Tuple[np.ndarray, np.ndarray], np.ndarray]:
    """
    Otiene las matrices X_1 (N x d) , X_2 (N x d), e Y (n) a partir de listas de parejas de vectores de oraciones - Listas de (d, d, 1)
    :param pair_list:
    :return:
    """
    _x, _y = zip(*pair_list)
    _x_1, _x_2 = zip(*_x)
    return (np.row_stack(_x_1), np.row_stack(_x_2)), np.array(_y)

# Obtener las listas de train y test
x_train, y_train = pair_list_to_x_y(mapped[:train_slice])
x_val, y_val = pair_list_to_x_y(mapped[train_slice:])

In [None]:
# Preparar los conjuntos de datos de entrenamiento y validación
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=len(x_train)).batch(batch_size)

val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(batch_size)

In [None]:
pretrained_weights: Optional[np.ndarray] = None
if USE_PRETRAINED:
    if REMAP_EMBEDDINGS:
        pretrained_weights = np.zeros(
            (len(diccionario.token2id) + 1, wv_model.vector_size),  dtype=np.float32)
        for token, _id in diccionario.token2id.items():
            if token in wv_model:
                pretrained_weights[_id + 1] = wv_model[token]
            else:
                # In W2V, OOV will not have a representation. We will use 0.
                pass
    else:
        # Not recommended (this will consume A LOT of RAM)
        pretrained_weights = np.zeros((wv_model.vectors.shape[0] + 1, wv_model.vector_size,),  dtype=np.float32)
        pretrained_weights[1:, :] = wv_model.vectors


In [None]:
# Construir y compilar el modelo
model = build_and_compile_model(pretrained_weights=pretrained_weights, )
# Entrenar el modelo
model.fit(train_dataset, epochs=num_epochs, validation_data=val_dataset)

In [None]:
from scipy.stats import pearsonr
# Obtener las predicciones del modelo para los datos de prueba. En este ejemplo vamos a utilizar el corpus de training.
y_pred = model.predict(x_val)
# Calcular la correlación de Pearson entre las predicciones y los datos de prueba
correlation, _ = pearsonr(y_pred.flatten(), y_val.flatten())
# Imprimir el coeficiente de correlación de Pearson
print(f"Correlación de Pearson: {correlation}")


In [None]:
list(zip(y_pred, y_train))
