In this project, I use the ParTy (Parallel Typology Corpus) dataset developed by Natalia Levshina:

Levshina, Natalia. 2016. Verbs of letting in Germanic and Romance languages: A quantitative investigation based on a parallel corpus of film subtitles. Languages in Contrast 16(1): 84–117.

ParTy is a multilingual parallel corpus constructed from film subtitles. It contains dialogue segments aligned at the sentence level across a wide range of languages. Each film is therefore represented by a set of subtitles in multiple languages. Given the genre of the texts in the corpus, the sentences are typically short and dialogic in nature.

In this project, the dataset serves as the basis for a model that determines whether two subtitle segments in different languages express the same semantic content. The goal is to build a cross-lingual textual similarity model using a Siamese LSTM network.

English is the sole source language in the corpus, paired with 42 target languages. The number of available films differs across languages, however. The objective is to predict similarity not for a single language pair, but from English to every available target language.

Potential applications of the model include the automatic identification of semantically equivalent sentences in large multilingual collections, such as parallel corpora or subtitle datasets in multiple languages.

In [None]:
from pathlib import Path
import zipfile
import shutil
import requests

In [None]:
import numpy as np

In [None]:
import pandas as pd

In [None]:
import matplotlib.pyplot as plt

In [None]:
import re

In [None]:
from tqdm.auto import tqdm

In [None]:
from time import perf_counter

In [None]:
import timeit

In [None]:
import gc

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

In [None]:
from tensorflow.keras.callbacks import Callback

In [None]:
from sklearn.metrics import confusion_matrix, classification_report

In [None]:
from sklearn.metrics import precision_recall_curve

In [None]:
party_data_root = Path('party_data')
party_zip_path = party_data_root.joinpath('party_repository.zip')
party_extract_root = party_data_root.joinpath('extracted_repository')

In [None]:
party_data_root.mkdir(parents = True, exist_ok = True)

In [None]:
party_repository_url = 'https://github.com/levshina/ParTy-1.0/archive/refs/heads/master.zip'

In [None]:
response = requests.get(party_repository_url, stream = True)
response.raise_for_status()

In [None]:
with open(party_zip_path, "wb") as zip_file:
    for chunk in response.iter_content(chunk_size = 1024 * 1024):
        if chunk:
            zip_file.write(chunk)

In [None]:
party_zip_path

In [None]:
party_extract_root.mkdir(parents=True, exist_ok = True)

In [None]:
with zipfile.ZipFile(party_zip_path, "r") as zip_file:
    zip_file.extractall(party_extract_root)

In [None]:
party_repository_root = next(
    path for path in party_extract_root.iterdir()
    if path.is_dir()
)

In [None]:
party_repository_root

In [None]:
movie_directories = [path for path in party_repository_root.iterdir() if path.is_dir()]

In [None]:
movie_directories

In [None]:
len(movie_directories)

In [None]:
movie_names = sorted([path.name for path in movie_directories])

In [None]:
print(movie_names)

In [None]:
for i, t in enumerate(movie_names):
    print(f'{i+1}. {t}')

In [None]:
sample_movie_directory = movie_directories[0]

In [None]:
sample_movie_directory, list(sample_movie_directory.iterdir())

__Now let's inspect the structure__

Since the English–Arabic pair appears first in alphabetical order, we will use it as an illustrative example.

In [None]:
# iterdir() yields entries in arbitrary order, so sort to get alphabetical order
sample_txt_file_paths = sorted(
    file_path for file_path in sample_movie_directory.iterdir()
    if file_path.is_file() and file_path.suffix == ".txt"
)

In [None]:
sample_txt_file_path = sample_txt_file_paths[0]

In [None]:
sample_txt_file_path

In [None]:
n_lines_to_lookup = 30

with open(sample_txt_file_path, 'r', encoding = 'utf-8', errors = 'replace') as text_file:
    # zip stops cleanly if the file has fewer than n_lines_to_lookup lines
    sample_txt_first_lines = [line for _, line in zip(range(n_lines_to_lookup), text_file)]

In [None]:
print(''.join(sample_txt_first_lines))

In [None]:
sample_txt_file_path.name

In [None]:
with open(sample_txt_file_path, "r", encoding="utf-8", errors = "replace") as subtitle_file:
    subtitle_lines = [line.rstrip("\n") for line in subtitle_file]

In [None]:
subtitle_lines_df = pd.DataFrame({"raw_line": subtitle_lines})

The subtitle segments of the two languages are not cleanly aligned in the files, so the data cannot be read directly as a tab-separated table. Some lines contain doubled tab characters or other typographical inconsistencies, so preprocessing is required.
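As a minimal sketch of the kind of preprocessing this calls for, the helper below splits one raw line into an ID, a source text, and a target text while tolerating repeated tabs. The function name `split_aligned_line` and the sample lines are hypothetical, and the `id<TAB>source<TAB>target` layout is an assumption about the file format.

```python
import re

def split_aligned_line(raw_line):
    """Split 'id<TAB>source<TAB>target' into (id, source, target).

    Runs of consecutive tabs are treated as a single separator.
    Returns None when the line does not yield an integer ID and two texts.
    """
    # Collapse repeated tabs, then split into at most three fields
    fields = re.split(r"\t+", raw_line.strip(), maxsplit=2)
    if len(fields) != 3 or not fields[0].strip().isdigit():
        return None
    return int(fields[0]), fields[1].strip(), fields[2].strip()

# Hypothetical sample lines, not taken from the corpus
print(split_aligned_line("12\tGood morning.\tBuongiorno."))
print(split_aligned_line("13\t\tThank you.\tGrazie."))
print(split_aligned_line("no id here"))
```

Collapsing tab runs has a cost: a line whose source field is genuinely empty ends up with only two fields and is rejected rather than kept with an empty text.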

Since XML files are also provided with the corpus, we will first examine what they contain and then decide how to proceed.

In [None]:
subtitle_lines_df.head(20)

Within each film directory, there is an XML folder. Let us take the film Amélie as an example and examine its contents.

In [None]:
sample_movie_directory = party_repository_root.joinpath("Amelie")
sample_movie_xml_directory = sample_movie_directory.joinpath("XML")

In [None]:
sample_movie_xml_directory.exists(), sample_movie_xml_directory

In [None]:
xml_entry_paths = sorted(list(sample_movie_xml_directory.iterdir()))

In [None]:
xml_entry_paths[:20], len(xml_entry_paths)

In [None]:
# Open a single XML file for inspection. We select the Italian version.

sample_readable_xml_file_path = sample_movie_xml_directory.joinpath("Amelie_ita.xml")

with open(sample_readable_xml_file_path, "r", encoding="utf-8", errors="replace") as xml_file:
    for _ in range(50):
        print(xml_file.readline().rstrip())

Here we can see that each XML file is monolingual and contains timestamps, text segments (\<s>), and sequences of tokenized words (\<w>).

For the purposes of this project, we use the text data in the bilingual txt files, as they are better suited to our goal.

Let's start by parsing one of the text files.

In [None]:
bilingual_filename_pattern = re.compile(
    r"^(?P<movie>.+)_(?P<source>[a-z]{3})_(?P<target>[a-z]{3})\.txt$",
    re.IGNORECASE
)

In [None]:
def parse_bilingual_subtitle_txt_file(subtitle_txt_file_path):
    filename_match = bilingual_filename_pattern.match(subtitle_txt_file_path.name)
    if filename_match is None:
        raise ValueError(f"unrecognized file name: {subtitle_txt_file_path.name}")

    movie_id = filename_match.group("movie")
    source_language = filename_match.group("source").lower()
    target_language = filename_match.group("target").lower()

    parsed_rows = []
    skipped_line_count = 0

    with open(subtitle_txt_file_path, "r", encoding="utf-8", errors="replace") as subtitle_file:
        for raw_line in subtitle_file:
            stripped_line = raw_line.rstrip("\n")
            if not stripped_line.strip():
                continue

            fields = stripped_line.split("\t", maxsplit=2)
            if len(fields) < 3:
                skipped_line_count += 1
                continue

            segment_id_text = fields[0].strip()
            if not segment_id_text.isdigit():
                skipped_line_count += 1
                continue

            parsed_rows.append({
                "movie_id": movie_id,
                "source_language": source_language,
                "target_language": target_language,
                "segment_id": int(segment_id_text),
                "text_source": fields[1].strip(),
                "text_target": fields[2].strip(),
            })

    subtitle_pairs_df = pd.DataFrame(parsed_rows)
    return subtitle_pairs_df, skipped_line_count

In [None]:
amelie_eng_ita_path = party_repository_root.joinpath("Amelie").joinpath("Amelie_eng_ita.txt")

In [None]:
subtitle_pairs_df, skipped_line_count = parse_bilingual_subtitle_txt_file(amelie_eng_ita_path)

In [None]:
subtitle_pairs_df.head(10), skipped_line_count, subtitle_pairs_df.shape

In [None]:
subtitle_pairs_df.head()

We have split each line of text in English and Italian, apparently successfully, using one film and the English–Italian language pair as an example.

Now we will apply the same procedure to all films and collect everything into a DataFrame.

First, we will index all files corresponding to the English–Italian language pair.
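To illustrate how files can be selected by language pair, the sketch below applies a filename pattern of the form `<movie>_<src>_<tgt>.txt` to a few names. `Amelie_eng_ita.txt` appears in the corpus; the other names and the helper `language_pair` are made up for illustration.

```python
import re

filename_pattern = re.compile(
    r"^(?P<movie>.+)_(?P<src>[a-z]{3})_(?P<tgt>[a-z]{3})\.txt$",
    re.IGNORECASE,
)

def language_pair(filename):
    """Return (movie, source, target) or None if the name does not match."""
    match = filename_pattern.match(filename)
    if match is None:
        return None
    return match.group("movie"), match.group("src").lower(), match.group("tgt").lower()

# Only the first name is a real corpus file; the others are hypothetical
names = ["Amelie_eng_ita.txt", "Some_Movie_ENG_fra.txt", "README.txt"]
eng_ita = [n for n in names if (p := language_pair(n)) and p[1:] == ("eng", "ita")]
print(eng_ita)
```

Because the movie group is greedy, underscores inside a film title stay in the movie name and only the last two three-letter fields are read as language codes.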

In [None]:
subtitle_filename_pattern = re.compile(
    r"^(?P<movie>.+)_(?P<src>[a-z]{3})_(?P<tgt>[a-z]{3})\.txt$",
    re.IGNORECASE
)

In [None]:
def collect_bilingual_txt_paths(party_repository_root, source_language="eng", target_language="ita"):
    bilingual_txt_paths = []
    for movie_directory in party_repository_root.iterdir():
        if not movie_directory.is_dir():
            continue

        for file_path in movie_directory.iterdir():
            if not (file_path.is_file() and file_path.suffix.lower() == ".txt"):
                continue

            match = subtitle_filename_pattern.match(file_path.name)
            if match is None:
                continue

            if match.group("src").lower() == source_language and match.group("tgt").lower() == target_language:
                bilingual_txt_paths.append(file_path)

    return sorted(bilingual_txt_paths)

In [None]:
eng_ita_txt_paths = collect_bilingual_txt_paths(
    party_repository_root,
    source_language="eng",
    target_language="ita"
)

In [None]:
len(eng_ita_txt_paths), eng_ita_txt_paths

There are only 5 eng-ita films, so let's look at the other languages.

In [None]:
def collect_all_eng_txt_paths(party_repository_root):
    eng_txt_paths = []
    for movie_directory in party_repository_root.iterdir():
        if not movie_directory.is_dir():
            continue
        for file_path in movie_directory.iterdir():
            if not (file_path.is_file() and file_path.suffix.lower() == ".txt"):
                continue
            match = subtitle_filename_pattern.match(file_path.name)
            if match and match.group("src").lower() == "eng":
                eng_txt_paths.append(file_path)
    return sorted(eng_txt_paths)

In [None]:
eng_all_txt_paths = collect_all_eng_txt_paths(party_repository_root)

In [None]:
len(eng_all_txt_paths), eng_all_txt_paths

Parse all files and concatenate the results

In [None]:
all_pairs_dfs = []

In [None]:
total_skipped_lines = 0

for txt_file_path in eng_all_txt_paths:
    subtitle_pairs_df, skipped_line_count = parse_bilingual_subtitle_txt_file(txt_file_path)
    all_pairs_dfs.append(subtitle_pairs_df)
    total_skipped_lines += skipped_line_count

In [None]:
full_subtitle_pairs_df = pd.concat(all_pairs_dfs, ignore_index = True)

In [None]:
full_subtitle_pairs_df.shape, full_subtitle_pairs_df["target_language"].nunique(), total_skipped_lines

There are 42 target languages, 30 discarded rows, and a total of 414,954 source–target language entries.

In [None]:
full_subtitle_pairs_df.columns

In [None]:
full_subtitle_pairs_df['source_language'].value_counts()

In [None]:
full_subtitle_pairs_df['target_language'].value_counts()

In [None]:
len(full_subtitle_pairs_df['target_language'].unique())

In [None]:
def build_similarity_pairs_multilingual(
    full_subtitle_pairs_df: pd.DataFrame,
    negatives_intra_movie: int = 1,
    negatives_cross_movie: int = 1,
    random_seed: int = 42,
    max_rows: int | None = None
) -> pd.DataFrame:
    rng = np.random.default_rng(random_seed)

    usable_pairs_df = full_subtitle_pairs_df[
        full_subtitle_pairs_df["text_source"].astype(str).str.strip().ne("") &
        full_subtitle_pairs_df["text_target"].astype(str).str.strip().ne("")
    ].reset_index(drop=True)

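    if max_rows is not None:
        # Cap at the number of available rows so that sample() cannot raise
        n_rows = min(max_rows, len(usable_pairs_df))
        usable_pairs_df = usable_pairs_df.sample(n=n_rows, random_state=random_seed).reset_index(drop=True)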

    # Precompute index arrays grouped by target language for efficient access
    lang_to_indices = {
        lang: usable_pairs_df.index[usable_pairs_df["target_language"].eq(lang)].to_numpy()
        for lang in usable_pairs_df["target_language"].unique()
    }

    # Precompute language+movie indices (negatives sampled within the same movie)
    lang_movie_to_indices = {}
    for lang in usable_pairs_df["target_language"].unique():
        lang_df = usable_pairs_df[usable_pairs_df["target_language"].eq(lang)]
        for movie_id, movie_df in lang_df.groupby("movie_id"):
            lang_movie_to_indices[(lang, movie_id)] = movie_df.index.to_numpy()

    similarity_records = []

    for row_index, row in tqdm(usable_pairs_df.iterrows(), total=len(usable_pairs_df), desc = "building similarity pairs"):
        movie_id = row["movie_id"]
        lang = row["target_language"]
        segment_id = row["segment_id"]

        # positive
        similarity_records.append({
            "movie_id": movie_id,
            "target_language": lang,
            "segment_id_source": int(segment_id),
            "segment_id_target": int(segment_id),
            "text_source": row["text_source"],
            "text_target": row["text_target"],
            "label": 1
        })

        # Negative samples within the same movie (same language).
        # Always drop the current segment: if it were the only candidate, keeping it
        # would label a true translation pair as a negative.
        intra_candidates = lang_movie_to_indices.get((lang, movie_id), np.array([], dtype=int))
        intra_candidates = intra_candidates[usable_pairs_df.loc[intra_candidates, "segment_id"].ne(segment_id).to_numpy()]

        for _ in range(negatives_intra_movie):
            if intra_candidates.size == 0:
                break
            neg_idx = int(rng.choice(intra_candidates))
            similarity_records.append({
                "movie_id": movie_id,
                "target_language": lang,
                "segment_id_source": int(segment_id),
                "segment_id_target": int(usable_pairs_df.at[neg_idx, "segment_id"]),
                "text_source": row["text_source"],
                "text_target": usable_pairs_df.at[neg_idx, "text_target"],
                "label": 0
            })

        # Negative samples across different movies (same language)
        lang_candidates = lang_to_indices[lang]
        if lang_candidates.size > 1:
            cross_candidates = lang_candidates[usable_pairs_df.loc[lang_candidates, "movie_id"].ne(movie_id).to_numpy()]
        else:
            cross_candidates = np.array([], dtype=int)

        for _ in range(negatives_cross_movie):
            if cross_candidates.size == 0:
                break
            neg_idx = int(rng.choice(cross_candidates))
            similarity_records.append({
                "movie_id": movie_id,
                "target_language": lang,
                "segment_id_source": int(segment_id),
                "segment_id_target": int(usable_pairs_df.at[neg_idx, "segment_id"]),
                "text_source": row["text_source"],
                "text_target": usable_pairs_df.at[neg_idx, "text_target"],
                "label": 0
            })

    similarity_pairs_df = pd.DataFrame(similarity_records)
    similarity_pairs_df = similarity_pairs_df.sample(frac=1, random_state=random_seed).reset_index(drop = True)
    return similarity_pairs_df

In [None]:
class timer:
    def __init__(self, label: str):
        self.label = label
        self.start = None

    def __enter__(self):
        self.start = perf_counter()
        return self

    def __exit__(self, exc_type, exc, tb):
        elapsed = perf_counter() - self.start
        print(f"{self.label}: {elapsed:.2f} s")

In [None]:
with timer("build similarity_pairs_df FULL DATASET"):
    similarity_pairs_df = build_similarity_pairs_multilingual(
        full_subtitle_pairs_df=full_subtitle_pairs_df,
        negatives_intra_movie = 1,
        negatives_cross_movie = 1,
        random_seed = 42,
        max_rows = None
    )

In [None]:
#full_subtitle_pairs_df.to_excel('full_subtitle_pairs.xlsx')

In [None]:
del full_subtitle_pairs_df

In [None]:
gc.collect()

In [None]:
similarity_pairs_df.shape

In [None]:
# backup
similarity_pairs_df.to_parquet(
    "similarity_pairs_df.parquet",
    index = False,
    compression = "snappy"
)

In [None]:
similarity_pairs_df["label"].value_counts()

In [None]:
similarity_pairs_df["target_language"].nunique()

In [None]:
def split_by_movie(similarity_pairs_df, train_ratio = 0.8, val_ratio = 0.1, random_seed = 42):
    rng = np.random.default_rng(random_seed)

    movie_ids = similarity_pairs_df["movie_id"].unique()
    rng.shuffle(movie_ids)

    num_movies = len(movie_ids)
    train_cut = int(num_movies * train_ratio)
    val_cut = int(num_movies * (train_ratio + val_ratio))

    train_movies = set(movie_ids[:train_cut])
    val_movies = set(movie_ids[train_cut:val_cut])
    test_movies = set(movie_ids[val_cut:])

    train_pairs_df = similarity_pairs_df[similarity_pairs_df["movie_id"].isin(train_movies)].reset_index(drop=True)
    val_pairs_df = similarity_pairs_df[similarity_pairs_df["movie_id"].isin(val_movies)].reset_index(drop=True)
    test_pairs_df = similarity_pairs_df[similarity_pairs_df["movie_id"].isin(test_movies)].reset_index(drop=True)

    return train_pairs_df, val_pairs_df, test_pairs_df

In [None]:
with timer("split train/val/test by movie"):
    train_pairs_df, val_pairs_df, test_pairs_df = split_by_movie(
        similarity_pairs_df,
        train_ratio=0.8,
        val_ratio=0.1,
        random_seed=42
    )

In [None]:
train_pairs_df.shape, val_pairs_df.shape, test_pairs_df.shape

In [None]:
# How many films are there in each split?
train_pairs_df["movie_id"].nunique(), val_pairs_df["movie_id"].nunique(), test_pairs_df["movie_id"].nunique()

In [None]:
# Normalized label distribution in each split
train_pairs_df["label"].value_counts(normalize = True), val_pairs_df["label"].value_counts(normalize = True), test_pairs_df["label"].value_counts(normalize = True)

In [None]:
# Back up the splits
with timer('save splits parquet'):
    train_pairs_df.to_parquet('train_pairs_df.parquet', index = False, compression = "snappy")
    val_pairs_df.to_parquet('val_pairs_df.parquet', index = False, compression = "snappy")
    test_pairs_df.to_parquet('test_pairs_df.parquet', index = False, compression = "snappy")

__Char-level TextVectorization__

Word-level tokenization would require a separate pipeline for each language; therefore, character-level vectorization is adopted.
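The idea can be shown without any framework. The sketch below builds a frequency-ranked character vocabulary shared across languages, reserves index 0 for padding and index 1 for out-of-vocabulary characters, and pads or truncates each sequence to a fixed length. The function names and toy sentences are illustrative assumptions; the code mirrors the general scheme of character-level integer encoding, not the exact internals of any library layer.

```python
from collections import Counter

PAD_ID, OOV_ID = 0, 1

def build_char_vocab(texts, max_tokens):
    """Map the most frequent lowercase characters to integer IDs starting at 2."""
    counts = Counter(ch for text in texts for ch in text.lower())
    # Two slots are reserved for padding and out-of-vocabulary characters
    most_common = [ch for ch, _ in counts.most_common(max_tokens - 2)]
    return {ch: idx for idx, ch in enumerate(most_common, start=2)}

def encode(text, vocab, length):
    """Convert text to a fixed-length list of character IDs."""
    ids = [vocab.get(ch, OOV_ID) for ch in text.lower()][:length]
    return ids + [PAD_ID] * (length - len(ids))

texts = ["Good morning.", "Buongiorno.", "Guten Morgen."]  # toy examples
vocab = build_char_vocab(texts, max_tokens=50)
encoded = encode("Bonjour!", vocab, length=12)
print(encoded)
```

A single vocabulary covers every language at once, at the price of mapping rare characters to the out-of-vocabulary ID when the vocabulary cap is small relative to the number of scripts.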

In [None]:
print(tf.config.list_physical_devices("GPU"))

In [None]:
max_char_length = 250
max_char_vocab_size = 200

In [None]:
char_vectorizer = layers.TextVectorization(
    standardize="lower",
    split="character",
    output_mode="int",
    output_sequence_length=max_char_length,
    max_tokens=max_char_vocab_size
)

In [None]:
with timer("char_vectorizer.adapt"):
    train_source_text = tf.constant(train_pairs_df["text_source"].astype(str).values)
    train_target_text = tf.constant(train_pairs_df["text_target"].astype(str).values)

    train_text_for_vocab = tf.data.Dataset.from_tensor_slices(
        tf.concat([train_source_text, train_target_text], axis=0)
    )
    char_vectorizer.adapt(train_text_for_vocab.batch(2048))

In [None]:
char_vocab_size = len(char_vectorizer.get_vocabulary())

In [None]:
char_vocab_size

In [None]:
def make_tf_dataset(pairs_df, batch_size = 256, shuffle = True):
    source_text = tf.constant(pairs_df["text_source"].astype(str).values)
    target_text = tf.constant(pairs_df["text_target"].astype(str).values)
    labels = tf.constant(pairs_df["label"].astype(int).values)

    source_ids = char_vectorizer(source_text)
    target_ids = char_vectorizer(target_text)

    ds = tf.data.Dataset.from_tensor_slices(((source_ids, target_ids), labels))
    if shuffle:
        ds = ds.shuffle(buffer_size=min(len(pairs_df), 20000), reshuffle_each_iteration=True)
    ds = ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return ds

In [None]:
with timer("build tf.data datasets"):
    train_ds = make_tf_dataset(train_pairs_df, batch_size = 256, shuffle = True)
    val_ds = make_tf_dataset(val_pairs_df, batch_size = 256, shuffle = False)
    test_ds = make_tf_dataset(test_pairs_df, batch_size = 256, shuffle = False)

In [None]:
for (source_batch, target_batch), label_batch in train_ds.take(1):
    print(source_batch.shape, target_batch.shape, label_batch.shape)

__Siamese BiLSTM__

In [None]:
embedding_dim = 64
lstm_units = 64

In [None]:
def build_shared_encoder(char_vocab_size, max_char_length, embedding_dim, lstm_units):
    text_input = keras.Input(shape=(max_char_length,), dtype="int32", name="text_ids")

    x = layers.Embedding(
        input_dim=char_vocab_size,
        output_dim=embedding_dim,
        mask_zero=True,
        name="char_embedding"
    )(text_input)

    x = layers.Bidirectional(
        layers.LSTM(lstm_units, name="lstm"),
        name="bilstm"
    )(x)

    encoder_model = keras.Model(text_input, x, name="shared_encoder")
    return encoder_model

In [None]:
shared_encoder = build_shared_encoder(
    char_vocab_size=char_vocab_size,
    max_char_length=max_char_length,
    embedding_dim=embedding_dim,
    lstm_units=lstm_units
)

In [None]:
source_input = keras.Input(shape=(max_char_length,), dtype="int32", name="source_ids")
target_input = keras.Input(shape=(max_char_length,), dtype="int32", name="target_ids")

source_vec = shared_encoder(source_input)
target_vec = shared_encoder(target_input)

abs_diff = layers.Lambda(lambda tensors: tf.abs(tensors[0] - tensors[1]), name="abs_diff")([source_vec, target_vec])

mult = layers.Multiply(name="elementwise_product")([source_vec, target_vec])

combined = layers.Concatenate(name="combine_features")([abs_diff, mult])
combined = layers.Dense(128, activation="relu", name="dense_128")(combined)
combined = layers.Dropout(0.2, name="dropout_02")(combined)

output = layers.Dense(1, activation="sigmoid", name="similarity")(combined)

In [None]:
siamese_lstm_model = keras.Model(
    inputs=[source_input, target_input],
    outputs=output,
    name="siamese_bilstm_similarity"
)

In [None]:
siamese_lstm_model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=1e-3),
    loss="binary_crossentropy",
    metrics=[keras.metrics.BinaryAccuracy(name="accuracy"), keras.metrics.AUC(name="auc")]
)

In [None]:
siamese_lstm_model.summary()

In [None]:
class TqdmProgressCallback(Callback):

    def on_train_begin(self, logs=None):
        self.epochs = self.params["epochs"]
        self.steps_per_epoch = self.params["steps"]
        self.epoch_bar = tqdm(total=self.epochs, desc="epochs", position=0)

    def on_epoch_begin(self, epoch, logs=None):
        self.batch_bar = tqdm(
            total=self.steps_per_epoch,
            desc=f"epoch {epoch+1}",
            position=1,
            leave=False
        )

    def on_train_batch_end(self, batch, logs=None):
        self.batch_bar.update(1)

    def on_epoch_end(self, epoch, logs=None):
        self.batch_bar.close()
        self.epoch_bar.update(1)

    def on_train_end(self, logs=None):
        self.epoch_bar.close()

In [None]:
early_stopping = keras.callbacks.EarlyStopping(
    monitor = "val_auc",
    mode = "max",
    patience = 2,
    restore_best_weights = True
)

In [None]:
with timer("training siamese_lstm_model"):
    history = siamese_lstm_model.fit(
        train_ds,
        validation_data=val_ds,
        epochs=5,
        callbacks=[
            early_stopping,
            TqdmProgressCallback()
        ],
        verbose = 0  # otherwise it conflicts with tqdm...
    )

In [None]:
import json
with open("history.json", "w") as f:
    json.dump(history.history, f)

In [None]:
history_df = pd.DataFrame(history.history)
history_df.to_csv("history.csv", index=False)

In [None]:
siamese_lstm_model.save("siamese_lstm_model.keras")

In [None]:
# List the validation metrics recorded during training
print([c for c in history_df.columns if c.startswith("val_")])

plt.figure()
if "loss" in history_df:
    plt.plot(history_df["loss"], label="loss")
if "val_loss" in history_df:
    plt.plot(history_df["val_loss"], label="val_loss")
plt.legend()
plt.title("Loss per epoch")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.show()

The model shows slight overfitting.

In [None]:
plt.figure()
if "auc" in history_df:
    plt.plot(history_df["auc"], label="auc")
if "val_auc" in history_df:
    plt.plot(history_df["val_auc"], label="val_auc")
plt.legend()
plt.title("AUC per epoch")
plt.xlabel("Epoch")
plt.ylabel("AUC")
plt.show()

In [None]:
history.history.keys(), {k: history.history[k][-1] for k in history.history}

In [None]:
with timer("evaluate on test"):
    test_metrics = siamese_lstm_model.evaluate(test_ds, return_dict=True)

test_metrics

In [None]:
# test_ds is built with shuffle=False, so predictions keep the row order of test_pairs_df.
# A single predict() call over the dataset avoids calling predict() once per batch.
y_score = siamese_lstm_model.predict(test_ds, verbose=0).ravel()
y_true = test_pairs_df["label"].to_numpy().astype(int)

In [None]:
threshold = 0.5
y_pred = (y_score >= threshold).astype(int)

cm = confusion_matrix(y_true, y_pred)

In [None]:
cm

In [None]:
print(classification_report(y_true, y_pred, digits=4))

**Test set** (210,768 examples):

Accuracy = 0.8154
AUC = 0.8591
Loss = 0.4212

An AUC of 0.859 indicates good discriminative ability.

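The accuracy of 0.815 should be read against the class imbalance: each positive pair is accompanied by up to two sampled negatives, so a classifier that always predicts "non-match" would already score well above 0.5.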
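The size of the imbalance can be recovered from the reported figures, because overall accuracy is the class-weighted average of the per-class recalls. The short calculation below solves for the share of negatives using the test accuracy and the per-class recalls from the classification report; it is a consistency check, not a new measurement.

```python
# Values reported for the test set at threshold 0.5
accuracy = 0.8154
recall_negative = 0.9164  # class 0
recall_positive = 0.6165  # class 1

# accuracy = p * recall_negative + (1 - p) * recall_positive, solved for p
negative_share = (accuracy - recall_positive) / (recall_negative - recall_positive)

# Accuracy of a trivial classifier that always predicts the majority class
majority_baseline = max(negative_share, 1 - negative_share)

print(f"share of negatives: {negative_share:.3f}")
print(f"majority-class baseline accuracy: {majority_baseline:.3f}")
```

The recovered share is close to two thirds, which is what one intra-movie and one cross-movie negative per positive pair would produce.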

**Class 0** (non-match)

Precision = 0.8246
Recall = 0.9164
F1 = 0.8681

The model performs well in identifying non-matches, as reflected by the high recall.

**Class 1** (match)

Precision = 0.7895
Recall = 0.6165
F1 = 0.6923

When the model predicts a match, it is generally reliable (reasonably high precision), but it misses a portion of true matches (lower recall).
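F1 is the harmonic mean of precision and recall, so it is dominated by the smaller of the two; the low recall of class 1 is what holds its F1 down. The snippet below recomputes F1 from the precision and recall values reported above. The helper name `f1_score_from` is hypothetical.

```python
def f1_score_from(precision, recall):
    """Harmonic mean of precision and recall (0.0 if both are zero)."""
    if precision + recall == 0:
        return 0.0
    return 2 * precision * recall / (precision + recall)

# Precision and recall as reported for the test set at threshold 0.5
f1_class_0 = f1_score_from(0.8246, 0.9164)
f1_class_1 = f1_score_from(0.7895, 0.6165)

print(f"class 0 F1: {f1_class_0:.4f}")
print(f"class 1 F1: {f1_class_1:.4f}")
```

Both values agree with the classification report up to the rounding of the inputs.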

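Since recall for class 1 is the weak point, we now select the decision threshold that maximizes F1. The threshold is tuned on the test set here, which makes the figures that follow optimistic; tuning it on the validation split would give an unbiased estimate.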

In [None]:
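precision, recall, thresholds = precision_recall_curve(y_true, y_score)

# precision[i] and recall[i] correspond to thresholds[i]; the final (1, 0) point
# has no threshold, so it is excluded before taking the argmax.
f1 = (2 * precision[:-1] * recall[:-1]) / (precision[:-1] + recall[:-1] + 1e-12)

best_idx = int(np.argmax(f1))
best_threshold = thresholds[best_idx]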

In [None]:
best_threshold

Repeat the evaluation with the updated threshold

In [None]:
y_pred_best = (y_score >= best_threshold).astype(int)
print("best_threshold =", best_threshold)
print(classification_report(y_true, y_pred_best, digits=4))

The results improved with the updated threshold.

The recall for Class 1 has increased.
The precision for Class 1 has decreased slightly, but remains acceptable.
The F1 score for Class 1 has improved.
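The threshold selection applied above can be illustrated on toy data. The sketch below tries each distinct score as a threshold, predicts a match when `score >= threshold`, and keeps the threshold with the highest F1 for the positive class. The scores, labels, and function names are invented for illustration, and the brute-force search stands in for a library routine.

```python
def f1_at(y_true, y_score, threshold):
    """F1 for the positive class when predicting 1 if score >= threshold."""
    tp = sum(1 for t, s in zip(y_true, y_score) if s >= threshold and t == 1)
    fp = sum(1 for t, s in zip(y_true, y_score) if s >= threshold and t == 0)
    fn = sum(1 for t, s in zip(y_true, y_score) if s < threshold and t == 1)
    return 2 * tp / (2 * tp + fp + fn) if tp else 0.0

def best_f1_threshold(y_true, y_score):
    """Try every distinct score as a threshold and keep the one with the best F1."""
    candidates = sorted(set(y_score))
    return max(candidates, key=lambda th: f1_at(y_true, y_score, th))

# Toy labels and scores, not model output
y_true = [0, 0, 1, 0, 1, 1, 0, 1]
y_score = [0.10, 0.20, 0.35, 0.40, 0.45, 0.60, 0.65, 0.90]

best = best_f1_threshold(y_true, y_score)
print("best threshold:", best)
print("F1 at best threshold:", round(f1_at(y_true, y_score, best), 3))
print("F1 at 0.5:", round(f1_at(y_true, y_score, 0.5), 3))
```

On this toy data the selected threshold falls below 0.5 and raises recall for the positive class, the same direction of change as in the results above.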

In [None]:
with open("best_threshold.txt", "w") as f:
    f.write(str(best_threshold))

In [None]:
test_pairs_with_scores_df = test_pairs_df.copy()
test_pairs_with_scores_df["score"] = y_score
test_pairs_with_scores_df["pred"] = (test_pairs_with_scores_df["score"] >= best_threshold).astype(int)

In [None]:
test_pairs_with_scores_df.head()

In [None]:
test_pairs_with_scores_df.to_parquet("test_pairs_with_scores.parquet")

In [None]:
test_pairs_with_scores_df.shape

In [None]:
test_metrics

In [None]:
with open("test_metrics.json", "w", encoding="utf-8") as f:
    json.dump(test_metrics, f, indent = 2)

In [None]:
history

In [None]:
with open("training_history.json", "w", encoding="utf-8") as f:
    json.dump(history.history, f, indent=2)

In [None]:
from google.colab import files

files.download("siamese_lstm_model.keras")
files.download("test_metrics.json")
files.download("training_history.json")
files.download("best_threshold.txt")
files.download("test_pairs_with_scores.parquet")