In [None]:
# Reload all modules (autoreload mode 2) before executing each cell, so edits to
# the imported modules are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

# Test and use the library

In [None]:
from pathlib import Path
from typing import cast

import matplotlib.pyplot as plt
import numpy as np
import spacy
import torch
from scipy.signal.windows import triang
from sentence_transformers import SentenceTransformer, util
from transformers.pipelines import pipeline
from transformers.pipelines.text2text_generation import TranslationPipeline
from sklearn.metrics.pairwise import cosine_similarity

from cached_pipe import TranslationPipelineCache
from epub import Chapter, EPub
from utils import enumerate_sent, get_ebook_folder, spacy_load_cached, sentence_encode_np


### Load NLP objects

In [None]:
# The two language tags handled by the notebook, every ordered
# (language, other language) pair, and the "<lt>_<lt_other>" tag of each pair.
lts = ["en", "fr"]
lts_pair = [(lt, lt_other) for lt, lt_other in zip(lts, reversed(lts))]
lts_pair_tags = ["_".join(pair) for pair in lts_pair]
lts, lts_pair, lts_pair_tags


In [None]:
# Folder handed to spacy_load_cached for the spaCy models.
cache_dir = Path("~/.cache/spacy_my_models").expanduser()

# spaCy model name for each language tag.
spacy_model_name = {
    "en": "en_core_web_md",
    "fr": "fr_core_news_md",
}

# One spaCy pipeline per language, loaded through the caching helper
# (the direct alternative is spacy.load(model_name)).
nlp = {
    lt: spacy_load_cached(model_name, cache_dir)
    for lt, model_name in spacy_model_name.items()
}


In [None]:
# TODO:
# Export the Hugging Face offline-mode environment variables (e.g. HF_HUB_OFFLINE=1)
# before loading the models, so that no internet connection is needed.
# https://huggingface.co/docs/transformers/installation#offline-mode


In [None]:
# Both translation models together take 3.4Gb of GPU memory :(
# so each one is loaded only when its source language is switched on here.
load_pipeline = {
    "en": False,
    "fr": False,
}


def _load_translation_pipeline(lt_src, lt_dst):
    """Return the Helsinki-NLP ``lt_src`` -> ``lt_dst`` translation pipeline.

    Returns None instead when ``load_pipeline[lt_src]`` is False, so that the
    model is never loaded.
    """
    if not load_pipeline[lt_src]:
        return None
    translator = pipeline("translation", model=f"Helsinki-NLP/opus-mt-{lt_src}-{lt_dst}")
    return cast(TranslationPipeline, translator)


# One entry per "<lt>_<lt_other>" tag: the pipeline, or None when not loaded.
pipe = {
    f"{lt}_{lt_other}": _load_translation_pipeline(lt, lt_other)
    for lt, lt_other in lts_pair
}


In [None]:
# Sentence-embedding model used for the similarity computations
# (alternative model name: "paraphrase-MiniLM-L6-v2").
sentence_model_name = "sentence-transformers/all-MiniLM-L6-v2"
sentence_transformer = SentenceTransformer(sentence_model_name)


## Load cached translator pipeline

In [None]:
# JSON file used by the translation cache of each language pair,
# keyed by the "<lt>_<lt_other>" tag.
cache_file_path = {
    lt_pair_tag: Path(f"translated_{lt_pair_tag}.json")
    for lt_pair_tag in (f"{lt}_{lt_other}" for lt, lt_other in lts_pair)
}


In [None]:
# Wrap every translation pipeline (possibly None when it was not loaded)
# together with its cache file, under the same "<lt>_<lt_other>" keys as `pipe`.
pipe_cache = {}
for lt_src, lt_dst in lts_pair:
    lt_pair = f"{lt_src}_{lt_dst}"
    pipe_cache[lt_pair] = TranslationPipelineCache(
        pipe[lt_pair], cache_file_path[lt_pair], lt_src, lt_dst
    )


In [None]:
# Smoke test: the cache object is callable, so it is used like a function.
sample_sentence_en = "Let's try this cool way to create a callable class."
pipe_cache["en_fr"](sample_sentence_en)


## Load epubs

In [None]:
# File name of the book in each language, resolved against the ebook folder.
ebook_folder = get_ebook_folder()
epub_file_name = {
    "fr": "Gaston_Leroux_-_Le_Mystere_de_la_chambre_jaune.epub",
    "en": "mystery_yellow_room.epub",
}
epub_path = {lt: ebook_folder / file_name for lt, file_name in epub_file_name.items()}
print(epub_path)


In [None]:
# One parsed book per language. Each EPub receives its own language tag and the
# other one (presumably the language its sentences get translated into).
epub = {
    lt: EPub(epub_path[lt], nlp, pipe_cache, lt, lt_other)
    for lt, lt_other in lts_pair
}


In [None]:
# ### Translate a sentence (fr -> en) and check its similarity with spaCy

# sent_fr = epub["fr"].chapters[0].paragraphs[0].sents_orig[0]
# sent_fr.text

# sent_fr_to_en = pipe["fr_en"](sent_fr.text)
# sent_fr_to_en

# doc_fr_to_en = nlp["en"](sent_fr_to_en[0]["translation_text"])
# print(type(doc_fr_to_en))
# doc_fr_to_en

# sent_en = epub["en"].chapters[0].paragraphs[2].sents_orig[0]
# print(type(sent_en))
# sent_en

# doc_fr_to_en.similarity(sent_en)

# sent_en2 = epub["en"].chapters[0].paragraphs[2].sents_orig[2]
# print(sent_en2)

# doc_fr_to_en.similarity(sent_en2)


### Iterate over sentences

In [None]:
# Chapter index in the French book, plus the offset added to it to pick the
# English chapter (presumably for editions whose chapter numbering differs).
ch_id = 0
ch_delta = 0

ch_en, ch_fr = epub["en"].chapters[ch_id + ch_delta], epub["fr"].chapters[ch_id]

# English sentences as written vs. French sentences after translation.
# These chapter attributes stand in for the earlier hand-written
# enumerate_sent() loops that collected the same texts one by one.
sent_text_en, sent_text_fr_tran = ch_en.sents_text_orig, ch_fr.sents_text_tran

# Spot-check a few sentences.
print(sent_text_en[4])
print(ch_en.sents_text_orig[4])
print(sent_text_fr_tran[2])


## Sentence encoder used for similarity

In [None]:
# The model itself is created in an earlier cell; these are the two variants:
# sentence_transformer = SentenceTransformer("paraphrase-MiniLM-L6-v2")
# sentence_transformer = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

# Sample sentences to encode
sentences = [
    "This framework generates embeddings for each input sentence",
    "Sentences are passed as a list of string.",
    "The quick brown fox jumps over the lazy dog.",
]

# All sentences are encoded in a single call to sentence_transformer.encode()
embeddings = sentence_transformer.encode(sentences)

# # Print the embeddings
# for sentence, embedding in zip(sentences, embeddings):
#     print("Sentence:", sentence)
#     print("Embedding:", embedding)
#     print("")


### Convert sentences to embeddings and compute the cosine similarity

In [None]:
# sent_num_en = len(sent_text_en)
# enc_en = cast(
#     torch.Tensor,
#     sentence_transformer.encode(sent_text_en, convert_to_tensor=True),
# )
# print("en", enc_en.shape, sent_num_en, enc_en[0].shape)

# sent_num_fr = len(sent_text_fr_tran)
# enc_fr_tran = cast(
#     torch.Tensor,
#     sentence_transformer.encode(sent_text_fr_tran, convert_to_tensor=True),
# )
# print("fr", enc_fr_tran.shape, sent_num_fr, enc_fr_tran[0].shape)

# sent_num_en, ch_en.sents_num
# # chop off the horrible chapter 0 beginning
# sent_text_en_cheat = sent_text_en[3:]
# sent_num_en_cheat = len(sent_text_en_cheat)
# enc_en_cheat = cast(
#     torch.Tensor,
# sentence_transformer.encode(sent_text_en_cheat, convert_to_tensor=True),
# )
# # sim = np.zeros((sent_num_en, sent_num_fr))
# # for i in range(sent_num_en):
# #     for ii in range(sent_num_fr):
# #         sim[i][ii] = util.pytorch_cos_sim(enc_en[i], enc_fr_tran[ii])
# #         # sim[i][ii] = util.pytorch_cos_sim(enc_en[i], enc_en[ii])
# # plt.imshow(sim)

# sim_torch = util.pytorch_cos_sim(enc_en, enc_fr_tran)
# sim = sim_torch.detach().cpu().numpy()


# enc_en_np = enc_en.detach().cpu().numpy()
# enc_fr_tran_np = enc_fr_tran.detach().cpu().numpy()
# sim_np = cosine_similarity(enc_en_np, enc_fr_tran_np)


In [None]:
# Encode both sentence lists straight to numpy, English first, then compare
# every English sentence (rows of `sim`) with every translated French sentence
# (columns of `sim`).
enc_en_np_direct, enc_fr_np_direct = (
    sentence_encode_np(sentence_transformer, sent_texts)
    for sent_texts in (sent_text_en, sent_text_fr_tran)
)
sim = cosine_similarity(enc_en_np_direct, enc_fr_np_direct)


In [None]:
# Heat map of the whole similarity matrix, saved as a PDF tagged with ch_delta.
fig_sim, ax_sim = plt.subplots()
ax_sim.imshow(sim)
ax_sim.set_title("Similarity *en* vs *fr_translated*")
ax_sim.set_ylabel("en")
ax_sim.set_xlabel("fr_tran")
fig_sim.savefig(f"Similarity_en_vs_fr_translated_{ch_delta}.pdf")
plt.show()


In [None]:
# Sentence counts of the two chapters being aligned.
# NOTE(review): assumed to match sim.shape (rows = en, columns = fr_tran) - confirm.
sent_num_en, sent_num_fr = ch_en.sents_num, ch_fr.sents_num

In [None]:
# Half-width of the window of French sentences inspected around the expected
# match; the full window is win_len sentences on each side plus the centre.
win_len = 20
win_width = win_len * 2 + 1

# The two chapters have a different number of sentences: dividing an English
# sentence index by `ratio` gives the expected French sentence index.
ratio = sent_num_en / sent_num_fr
print(f"{ratio=}")

# Row i holds the similarities of English sentence i to the window of French
# sentences around its expected position. NaN marks cells without data, which
# only happens when the French chapter is shorter than one full window.
sim_center = np.full((sent_num_en, win_width), np.nan)
# Last start index that keeps a full window inside the French chapter.
max_win_start = max(0, sent_num_fr - win_width)
for i in range(sent_num_en):
    # expected French index for this English sentence
    ii = int(i / ratio)
    # Clamp the window inside the chapter. Clamping the start (instead of the
    # centre) never produces a negative index, which would silently wrap around
    # to the end of the row when the chapter is shorter than the window.
    win_start = min(max(ii - win_len, 0), max_win_start)
    some_sent_sim = sim[i][win_start : win_start + win_width]
    sim_center[i, : len(some_sent_sim)] = some_sent_sim

fig, ax = plt.subplots()
ax.imshow(sim_center, aspect="auto")
ax.set_title("Similarity en vs fr_tran, shifted")
ax.set_ylabel("en")
ax.set_xlabel("fr_tran shifted")

# One tick every `step` columns, labelled with the offset from the window
# centre (column win_len is offset 0). The positions are derived from the
# labels, so the two lists always have the same length whatever win_len/step.
step = 3
first_lab = -(win_len // step) * step
xticks_lab = list(range(first_lab, win_len + 1, step))
xticks_pos = [lab + win_len for lab in xticks_lab]

ax.set_xticks(xticks_pos)
ax.set_xticklabels(xticks_lab)

plt.savefig(f"Similarity_en_vs_fr_tran_shifted_{ch_delta}.pdf")
plt.show()


### Analyze the similarity scores

In [None]:
# For every English sentence, find the most similar translated French sentence
# inside a window centred on its expected (rescaled) position.

# best French index for every English sentence
all_max = []

# the same pairs, restricted to those where both sentences have len() > 4
# (presumably a token count - confirm against the chapter class)
all_good_max = []
all_good_i = []

ratio = sent_num_en / sent_num_fr

# number of leading sentences for which the match is printed and plotted
n_preview = 6

for i in range(sent_num_en):
    # the similarity of this English sentence to all the translated ones
    this_sent_sim = sim[i]

    # expected French index, rescaled because the chapters differ in length
    ii = int(i / ratio)
    # the window around it, chopped at the chapter borders
    win_left = max(0, ii - win_len)
    win_right = min(sent_num_fr, ii + win_len + 1)
    some_sent_sim = this_sent_sim[win_left:win_right]

    # argmax is relative to the window, so shift it back to a chapter index
    max_id = some_sent_sim.argmax() + win_left
    all_max.append(max_id)

    doc_en = ch_en.sents_doc_orig[i]
    doc_fr_tran = ch_fr.sents_doc_tran[max_id]
    if len(doc_en) > 4 and len(doc_fr_tran) > 4:
        all_good_i.append(i)
        all_good_max.append(max_id)
    else:
        print(f"skipping {doc_en} or {doc_fr_tran}")

    if i < n_preview:
        print(max_id)
        print(sent_text_en[i][: 120 * 2])
        print(sent_text_fr_tran[max_id][: 120 * 2])

        plt.bar(range(win_left, win_right), some_sent_sim)
        plt.axvline(ii)
        plt.axvline(max_id, c="r")
        # Look the sentences up in ch_en / ch_fr, the chapters the similarities
        # were computed from; indexing epub["en"].chapters[ch_id] would show the
        # wrong English chapter as soon as ch_delta != 0.
        title = f"{i} {ch_en.sent_to_parsent[i]}"
        title += f" - {ii} {ch_fr.sent_to_parsent[ii]}"
        title += f" - {max_id} {ch_fr.sent_to_parsent[max_id]}"
        plt.title(title)
        plt.show()


In [None]:
# Best-matching French index for every English sentence, drawn against the
# straight line from (0, 0) to (sent_num_en, sent_num_fr).
ax_max = plt.gca()
ax_max.plot(all_max)
ax_max.plot([0, sent_num_en], [0, sent_num_fr])


In [None]:
# Scatter of the kept (English index, French index) pairs.
ax_pairs = plt.gca()
ax_pairs.scatter(all_good_i, all_good_max, s=0.1)

# thin vertical line at the first sentence of every English paragraph
for par_id, par in enumerate(ch_en.paragraphs):
    par_en_start = ch_en.parsent_to_sent[(par_id, 0)]
    ax_pairs.axvline(par_en_start, linewidth=0.15)

# thin horizontal line at the first sentence of every French paragraph
for par_id, par in enumerate(ch_fr.paragraphs):
    par_fr_start = ch_fr.parsent_to_sent[(par_id, 0)]
    ax_pairs.axhline(par_fr_start, linewidth=0.15)

# straight line from the first to the last sentence of both chapters
x_ends = [0, sent_num_en]
ax_pairs.plot(x_ends, [0, sent_num_fr], linewidth=0.3)

# degree-1 least-squares fit through the kept pairs; fit_func maps an English
# sentence index to the fitted French sentence index
fit_coeff = np.polyfit(all_good_i, all_good_max, 1)
fit_func = np.poly1d(fit_coeff)
fit_y = fit_func(x_ends)
ax_pairs.plot(x_ends, fit_y)

fig = plt.gcf()
fig.set_size_inches(15, 10)


In [None]:
# Re-weight every similarity window with a triangular filter whose apex sits on
# the fitted en -> fr line, then pick the best match again.

# The filter spans 4 * win_len + 1 samples, so it fully covers the
# (2 * win_len + 1)-wide similarity window for shifts of up to win_len.
triang_height = 1
triang_filt = triang(win_len * 4 + 1) * triang_height + (1 - triang_height)
# 1-based position of the apex inside triang_filt, and its 0-based index
triang_center = win_len * 2 + 1
triang_apex_idx = triang_center - 1
# weight given to positions that fall outside the filter
triang_floor = 1 - triang_height
# set to True to print/plot every sentence, not only those whose match changed
show_all = False
print(f"{sent_num_en=} {sent_num_fr=}")

all_max_rescaled = []
all_good_i_rescaled = []
all_good_max_rescaled = []

for i in range(sent_num_en):

    # the similarity of this English sentence to all the translated ones
    this_sent_sim = sim[i]

    # expected French index, rescaled because the chapters differ in length
    ii = int(i / ratio)

    # the chopped similarity array, centered on ii
    win_left = max(0, ii - win_len)
    win_right = min(sent_num_fr, ii + win_len + 1)
    some_sent_sim = this_sent_sim[win_left:win_right]

    # the French index predicted by the fitted line, kept inside the chapter
    ii_fit = int(fit_func([i])[0])
    ii_fit = min(max(ii_fit, 0), sent_num_fr - 1)

    # Filter weight for every French position of the window: position x maps
    # to filter index x - ii_fit + apex, which puts the apex on ii_fit.
    # Indexing position by position handles windows chopped on either (or
    # both) borders, and positions further than the filter reaches (when
    # |ii - ii_fit| > win_len) get the floor weight instead of a wrapped or
    # truncated slice.
    filt_idx = np.arange(win_left, win_right) - ii_fit + triang_apex_idx
    inside_filt = (filt_idx >= 0) & (filt_idx < len(triang_filt))
    triang_filt_chop = np.full(len(filt_idx), triang_floor, dtype=float)
    triang_filt_chop[inside_filt] = triang_filt[filt_idx[inside_filt]]

    assert len(triang_filt_chop) == len(some_sent_sim)

    # rescale the similarity
    sim_rescaled = some_sent_sim * triang_filt_chop

    # best match before and after the rescaling
    max_id = all_max[i]
    max_id_rescaled = sim_rescaled.argmax() + win_left
    all_max_rescaled.append(max_id_rescaled)

    if len(ch_en.sents_doc_orig[i]) > 4 and len(ch_fr.sents_doc_tran[max_id_rescaled]) > 4:
        all_good_i_rescaled.append(i)
        all_good_max_rescaled.append(max_id_rescaled)

    match_changed = max_id != max_id_rescaled
    if match_changed or show_all:

        diff_tag = " <><><><>" if match_changed else ""
        print(f"{i=} {ii=} {ii_fit=} {max_id=} {max_id_rescaled=}{diff_tag}")
        print(sent_text_en[i][: 120 * 2])
        print(sent_text_fr_tran[max_id_rescaled][: 120 * 2])

        x_ii = range(win_left, win_right)
        plt.bar(x_ii, some_sent_sim)
        plt.bar(x_ii, sim_rescaled)
        plt.plot(x_ii, triang_filt_chop)
        plt.axvline(ii)
        plt.axvline(ii_fit, c="r")
        plt.axvline(max_id_rescaled + 0.1, c="g")
        # Look the sentences up in ch_en / ch_fr, the chapters the similarities
        # were computed from; indexing epub["en"].chapters[ch_id] would show the
        # wrong English chapter as soon as ch_delta != 0.
        title = f"{i} {ch_en.sent_to_parsent[i]}"
        title += f" - {ii} {ch_fr.sent_to_parsent[ii]}"
        title += f" - {ii_fit} {ch_fr.sent_to_parsent[ii_fit]}"
        plt.title(title)
        plt.show()


In [None]:
# Matches before the triangular re-weighting (dots) against the matches after
# it (line), for the kept sentence pairs.
ax_cmp = plt.gca()
ax_cmp.scatter(all_good_i, all_good_max, s=0.9)
# ax_cmp.scatter(all_good_i_rescaled, all_good_max_rescaled, s=0.9, marker="x")
ax_cmp.plot(all_good_i_rescaled, all_good_max_rescaled, linewidth=0.9, c="C1")

fig = plt.gcf()
fig.set_size_inches(18, 10)


### InterPOLLO

In [None]:
# Flag every kept match whose French index is out of order with respect to one
# of its direct neighbours (larger than the next one or smaller than the
# previous one).
n_good = len(all_good_max_rescaled)
is_ooo = []

for j, (good_i, good_max_rescaled) in enumerate(
    zip(all_good_i_rescaled, all_good_max_rescaled)
):
    # A neighbour is compared only when it exists: the first value is checked
    # to the right only, the last one to the left only, and a single match is
    # never out of order (indexing j + 1 unconditionally for j == 0 raised
    # IndexError in that case).
    after_next = j + 1 < n_good and good_max_rescaled > all_good_max_rescaled[j + 1]
    before_prev = j > 0 and good_max_rescaled < all_good_max_rescaled[j - 1]
    # bool() keeps plain Python booleans in is_ooo even for numpy indices
    ooo = bool(after_next or before_prev)

    if ooo:
        print(j, good_i, good_max_rescaled)

    is_ooo.append(ooo)


In [None]:
# (English index, matched French index, out-of-order flag) for every kept pair.
list(zip(all_good_i_rescaled, all_good_max_rescaled, is_ooo))


In [None]:
def print_sents_by_paragraph(chapter, which_sent="orig"):
    """Print every sentence of ``chapter``, one per line, preceded by its key.

    The key yielded by ``enumerate_sent`` is unpacked as
    ``(paragraph id, sentence id)``; a blank line is printed each time the
    paragraph id changes from the previous sentence (starting from 0).

    Args:
        chapter: the chapter passed to ``enumerate_sent``.
        which_sent: forwarded to ``enumerate_sent`` to select which sentences
            are listed.
    """
    par_id_old = 0
    for k, sent in enumerate_sent(chapter, which_sent=which_sent):
        par_id, _sent_id = k
        if par_id != par_id_old:
            print()
            par_id_old = par_id

        print(k, sent.text)


# Full listing of both chapters, English first, to inspect the pairing by eye.
print_sents_by_paragraph(ch_en)
print_sents_by_paragraph(ch_fr)


## Use similarity to pair up sentences

In [None]:
# Shapes of one row (one English sentence against every translated French
# sentence), of one column, and of the whole similarity matrix.
sim[0].shape, sim[:, 0].shape, sim.shape
