In [None]:
!python -m spacy download en_core_web_md
!python -m spacy download it_core_news_md

In [None]:
import spacy
import random
from tqdm import tqdm
from collections import Counter
from tensorflow.data import Dataset
import tensorflow as tf
import keras
from keras.layers import GRUCell, Embedding, Attention
import json

In [None]:
!wget https://www.manythings.org/anki/ita-eng.zip

--2023-09-14 20:22:15--  https://www.manythings.org/anki/ita-eng.zip
Resolving www.manythings.org (www.manythings.org)... 173.254.30.110
Connecting to www.manythings.org (www.manythings.org)|173.254.30.110|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 8220355 (7.8M) [application/zip]
Saving to: ‘ita-eng.zip’


2023-09-14 20:22:16 (17.9 MB/s) - ‘ita-eng.zip’ saved [8220355/8220355]



In [None]:
!unzip ita-eng.zip

Archive:  ita-eng.zip
  inflating: ita.txt                 
  inflating: _about.txt              


In [None]:
# Special-token strings shared by the English and Italian vocabularies.
UNK = "<UNK>"  # stands in for tokens that are missing from a vocabulary
BOS = "<BOS>"  # prepended to the decoder input sequence
EOS = "<EOS>"  # appended to the source and to the decoder target sequence
PAD = "<PAD>"  # padding token (index 0 in the vocabularies)

In [None]:
# Integer ids of the special tokens; they mirror the first four entries of the
# vocabularies created by build_vocabularies.
PAD_IDX = 0  # the Embedding layers are built with mask_zero=True, so 0 is masked
UNK_IDX = 1
BOS_IDX = 2  # first decoder input during generation
EOS_IDX = 3

# tokenizzazione

In [None]:
class Tokenizer:
    """Lower-casing word tokenizer backed by a spaCy pipeline.

    Args:
        language: "it" selects the Italian pipeline ("it_core_news_md"); any
            other value selects the English one ("en_core_web_md").
    """

    def __init__(self, language):
        model_name = "it_core_news_md" if language == "it" else "en_core_web_md"
        self.language = language
        self.nlp = spacy.load(model_name)

    def tokenize(self, text):
        """Return the lower-cased text of every token the pipeline finds in `text`."""
        lowered = []
        for token in self.nlp(text):
            lowered.append(token.text.lower())
        return lowered

# vocabolario

In [None]:
def build_vocabularies(filepath, num_rows, vocab_size):
    """Build English and Italian token->index vocabularies from a parallel corpus.

    Every line of `filepath` is expected to contain tab-separated fields whose
    first two entries are the English and the Italian sentence; any further
    field is ignored. Lines with fewer than two fields (e.g. blank lines) are
    skipped and do not count towards `num_rows`.

    Args:
        filepath: path of the UTF-8, tab-separated corpus file.
        num_rows: maximum number of sentence pairs to read; a falsy value
            (None or 0) reads the whole file.
        vocab_size: number of most frequent tokens kept per language, in
            addition to the four special tokens.

    Returns:
        (en_vocab, en_vocab_inv, it_vocab, it_vocab_inv): for each language the
        token->index dictionary and its index->token inverse. Indices 0-3 are
        reserved for <PAD>, <UNK>, <BOS> and <EOS>; regular tokens start at 4,
        ordered by decreasing frequency.
    """
    # token frequency counters
    en_counter = Counter()
    it_counter = Counter()

    # tokenizers
    en_tokenizer = Tokenizer("en")
    it_tokenizer = Tokenizer("it")

    # vocabularies, pre-filled with the special tokens
    en_vocab = {"<PAD>": 0, "<UNK>": 1, "<BOS>": 2, "<EOS>": 3}
    it_vocab = {"<PAD>": 0, "<UNK>": 1, "<BOS>": 2, "<EOS>": 3}

    counter = 0
    # The context managers close both the progress bar and the file.
    with tqdm() as pbar, open(filepath, "r", encoding="utf-8") as f:
        # Iterate the file object directly: readline() returns "" (never None)
        # at end of file, so a `... is not None` loop would never terminate
        # cleanly and would fail while unpacking the empty line.
        for line in f:
            fields = line.rstrip("\n").split("\t")
            if len(fields) < 2:
                continue  # blank or malformed line
            en_sentence, it_sentence = fields[0], fields[1]

            # update the counters
            en_counter.update(en_tokenizer.tokenize(en_sentence))
            it_counter.update(it_tokenizer.tokenize(it_sentence))

            pbar.update(1)

            counter += 1
            if num_rows and counter >= num_rows:
                break

    # extend the dictionaries with the most frequent tokens
    most_common_en = [item[0] for item in en_counter.most_common(vocab_size)]
    most_common_it = [item[0] for item in it_counter.most_common(vocab_size)]

    en_vocab |= {word: idx for idx, word in enumerate(most_common_en, start=4)}
    it_vocab |= {word: idx for idx, word in enumerate(most_common_it, start=4)}

    en_vocab_inv = {idx: word for word, idx in en_vocab.items()}
    it_vocab_inv = {idx: word for word, idx in it_vocab.items()}

    return en_vocab, en_vocab_inv, it_vocab, it_vocab_inv



In [None]:
# Build both vocabularies from at most 100,000 corpus lines, keeping the 10,000
# most frequent tokens per language on top of the four special tokens.
en_vocab, en_vocab_inv, it_vocab, it_vocab_inv = build_vocabularies(filepath="ita.txt", num_rows=100_000, vocab_size=10_000)

# Dump vocabularies

In [None]:
# Persist the four vocabulary dictionaries as JSON, one file per dictionary.
# JSON object keys are always strings, so the integer keys of the inverse
# vocabularies are written out as strings.
for _json_path, _vocabulary in (
    ("en_vocab.json", en_vocab),
    ("en_vocab_inv.json", en_vocab_inv),
    ("it_vocab.json", it_vocab),
    ("it_vocab_inv.json", it_vocab_inv),
):
    with open(_json_path, "w") as f:
        json.dump(_vocabulary, f)

# Load vocabularies

In [None]:
def _load_json(json_path):
    """Read and return the JSON document stored at `json_path`."""
    with open(json_path) as f:
        return json.load(f)


# Reload the vocabularies written by the dump cell.
en_vocab = _load_json("en_vocab.json")
en_vocab_inv = _load_json("en_vocab_inv.json")
it_vocab = _load_json("it_vocab.json")
it_vocab_inv = _load_json("it_vocab_inv.json")

In [None]:
# json.load returns string keys: turn them back into the integer token ids
# expected when decoding model output.
it_vocab_inv = {int(token_id): word for token_id, word in it_vocab_inv.items()}
en_vocab_inv = {int(token_id): word for token_id, word in en_vocab_inv.items()}

# split train-validation-test

In [None]:
def split_file(filepath, seed=None):
    """Shuffle the lines of `filepath` and write a 60/20/20 train/val/test split.

    The three parts are written to "train.txt", "val.txt" and "test.txt" in the
    current working directory, overwriting any existing file.

    Args:
        filepath: path of the UTF-8 text file to split, one example per line.
        seed: optional seed for a private random generator, which makes the
            split reproducible. With None (the default) the global `random`
            module state is used, as before.
    """
    with open(filepath, "r", encoding="utf-8") as f:
        # Make sure every row ends with a newline: a final line without "\n"
        # would otherwise be glued to whatever row follows it after shuffling.
        lines = [line if line.endswith("\n") else line + "\n" for line in f]

    num_rows = len(lines)
    rng = random if seed is None else random.Random(seed)
    shuffled_row_idxs = rng.sample(range(num_rows), k=num_rows)

    train_end = int(num_rows * 0.6)
    val_end = int(num_rows * 0.8)
    splits = {
        "train.txt": shuffled_row_idxs[:train_end],
        "val.txt": shuffled_row_idxs[train_end:val_end],
        "test.txt": shuffled_row_idxs[val_end:],
    }

    for out_path, row_idxs in splits.items():
        with open(out_path, "w", encoding="utf-8") as f:
            f.writelines(lines[idx] for idx in row_idxs)

In [None]:
# Write train.txt, val.txt and test.txt from the full corpus.
split_file("ita.txt")

# Generatori

In [None]:
def dataset_generator(filepath, en_dict, it_dict, en_tokenizer, it_tokenizer):
    """Create a generator function yielding encoded translation examples.

    Args:
        filepath: UTF-8 file with one tab-separated example per line; the first
            field is the English sentence, the second the Italian one, any
            further field is ignored.
        en_dict: English token->index mapping; must contain the UNK token.
        it_dict: Italian token->index mapping; must contain the UNK token.
        en_tokenizer: object whose `tokenize(text)` returns the English tokens.
        it_tokenizer: object whose `tokenize(text)` returns the Italian tokens.

    Returns:
        A zero-argument callable. Each call re-opens `filepath` and returns a
        generator of `((src, tgt_in), tgt_out)` tuples of integer lists:
        `src` is the English sentence followed by EOS, `tgt_in` is BOS followed
        by the Italian sentence and `tgt_out` is the Italian sentence followed
        by EOS. Tokens missing from a dictionary are mapped to its UNK index.
    """
    def gen():
        with open(filepath, "r", encoding="utf-8") as f:
            # Iterate the file object directly: readline() returns "" (never
            # None) at end of file, so the previous `... is not None` loop ran
            # past the last line and failed while unpacking the empty string.
            for line in f:
                fields = line.rstrip("\n").split("\t")
                if len(fields) < 2:
                    continue  # blank or malformed line
                en_sentence_tokenized = en_tokenizer.tokenize(fields[0])
                it_sentence_tokenized = it_tokenizer.tokenize(fields[1])

                src_sentence_tokenized = en_sentence_tokenized + [EOS]
                tgt_sentence_in_tokenized = [BOS] + it_sentence_tokenized
                tgt_sentence_out_tokenized = it_sentence_tokenized + [EOS]

                src_sentence_encoded = [en_dict.get(token, en_dict[UNK]) for token in src_sentence_tokenized]
                tgt_sentence_in_encoded = [it_dict.get(token, it_dict[UNK]) for token in tgt_sentence_in_tokenized]
                tgt_sentence_out_encoded = [it_dict.get(token, it_dict[UNK]) for token in tgt_sentence_out_tokenized]

                yield (src_sentence_encoded, tgt_sentence_in_encoded), tgt_sentence_out_encoded

    return gen

In [None]:
# for (src, tgt_in), tgt_out in dataset_generator("val.txt", en_vocab, it_vocab, Tokenizer("en"), Tokenizer("it"))():
#     print(src)
#     print(tgt_in)
#     print(tgt_out)
#     break

In [None]:
def _make_dataset(split_path):
    """Wrap the encoded examples of one split file in a tf.data Dataset.

    Every element is ((src, tgt_in), tgt_out), three variable-length int32
    vectors produced by dataset_generator.
    """
    def _ids_spec():
        # 1-D int32 tensor of unknown length
        return tf.TensorSpec(shape=(None,), dtype=tf.int32)

    return Dataset.from_generator(
        generator=dataset_generator(
            split_path, en_vocab, it_vocab, Tokenizer("en"), Tokenizer("it")),
        output_signature=((_ids_spec(), _ids_spec()), _ids_spec()),
    )


trainset = _make_dataset("train.txt")

valset = _make_dataset("val.txt")

testset = _make_dataset("test.txt")

In [None]:
# Shuffle through a 1000-element buffer (reshuffled at every iteration), then
# group 32 variable-length examples per batch; padded_batch pads with 0 by
# default, which is the <PAD> index.
trainset = trainset.shuffle(
    buffer_size=1000, reshuffle_each_iteration=True
).padded_batch(batch_size=32)

In [None]:
class TextEncoderDecoder:
    """Convert between raw text and vocabulary indices for English and Italian.

    Args:
        en_vocab: English token->index mapping.
        en_vocab_inv: English index->token mapping.
        it_vocab: Italian token->index mapping.
        it_vocab_inv: Italian index->token mapping.
    """

    def __init__(self, en_vocab, en_vocab_inv, it_vocab, it_vocab_inv):
        self.en_vocab, self.en_vocab_inv = en_vocab, en_vocab_inv
        self.it_vocab, self.it_vocab_inv = it_vocab, it_vocab_inv
        self.nlp_it = spacy.load("it_core_news_md")
        self.nlp_en = spacy.load("en_core_web_md")

    def encode(self, text, language):
        """Tokenize `text`, lower-case it and map every token to its index.

        `language` equal to "en" selects the English resources; any other value
        selects the Italian ones. Unknown tokens map to the UNK index.
        """
        if language == "en":
            vocab, nlp = self.en_vocab, self.nlp_en
        else:
            vocab, nlp = self.it_vocab, self.nlp_it

        codes = []
        for token in nlp(text):
            codes.append(vocab.get(token.text.lower(), vocab[UNK]))
        return codes

    def decode(self, coded_text, language):
        """Map every index of `coded_text` back to its token.

        `language` equal to "en" selects the English inverse vocabulary; any
        other value selects the Italian one.
        """
        if language == "en":
            vocab_inv = self.en_vocab_inv
        else:
            vocab_inv = self.it_vocab_inv

        words = []
        for code in coded_text:
            words.append(vocab_inv[code])
        return words

# Modello

In [None]:
class Encoder(keras.Model):
    """Source-sentence encoder: an embedding followed by a stack of GRU cells."""

    def __init__(self, vocabulary_size, embedding_size, recurrent_layers, recurrent_units, **kwargs):
        """
        args
        ----
        - vocabulary_size (int): number of token ids, special tokens included
        - embedding_size (int): dimension of the embedding space
        - recurrent_layers (int): number of stacked GRU cells
        - recurrent_units (int): units of every GRU cell
        - **kwargs: forwarded to keras.Model

        """
        super().__init__(**kwargs)

        # token embedding; index 0 is padding and gets masked
        self.embedding = Embedding(
            input_dim=vocabulary_size, output_dim=embedding_size, mask_zero=True)

        # one GRU cell per recurrent layer, wrapped in a single RNN layer
        cells = []
        for _ in range(recurrent_layers):
            cells.append(GRUCell(recurrent_units))

        self.gru_layer = tf.keras.layers.RNN(
            tf.keras.layers.StackedRNNCells(cells),
            return_state=True,
            return_sequences=True,
        )

    def call(self, data, training=None):
        """Encode a batch of token-id sequences.

        Returns the per-timestep outputs of the recurrent stack together with
        the list of final states the RNN layer returns after them.
        """
        embedded = self.embedding(data, training=training)
        sequence_output, *final_state = self.gru_layer(embedded, training=training)

        return sequence_output, final_state

In [None]:
class Decoder(keras.Model):
    """GRU decoder producing a probability distribution over the target vocabulary.

    `call` runs the whole target sequence at once (teacher forcing, used for
    training); `generate` decodes greedily one token at a time.
    """

    # Fallback length cap used by generate() when max_sentence_length is None.
    DEFAULT_MAX_SENTENCE_LENGTH = 20

    def __init__(self, vocabulary_size, embedding_size, recurrent_layers, recurrent_units, attention=False, **kwargs):
        """
        args
        ----
        - vocabulary_size (int): number of token ids, special tokens included;
          also the size of the output distribution
        - embedding_size (int): dimension of the embedding space
        - recurrent_layers (int): number of stacked GRU cells
        - recurrent_units (int): units of every GRU cell
        - attention (bool): when True a dot-score Attention layer is created
        - **kwargs: forwarded to keras.Model
        """
        super().__init__(**kwargs)

        self.recurrent_layers = recurrent_layers
        self.recurrent_units = recurrent_units

        # index 0 is padding and gets masked
        self.embedding = Embedding(
            vocabulary_size, embedding_size, mask_zero=True)

        gru_cells = [GRUCell(recurrent_units) for _ in range(recurrent_layers)]

        stacked_cells = tf.keras.layers.StackedRNNCells(gru_cells)
        self.gru_layer = tf.keras.layers.RNN(stacked_cells, return_sequences=True, return_state=True)

        # projects every recurrent output onto the vocabulary
        self.dense = tf.keras.layers.Dense(vocabulary_size)

        # NOTE(review): the attention layer is created here but neither call()
        # nor generate() uses it yet.
        if attention:
            self.attention = Attention(score_mode="dot")

    def call(self, target_in, encoder_output, encoder_state, training=None, max_sentence_length=None):
        """Teacher-forced decoding of a whole batch of target sequences.

        args
        ----
        - target_in: target token ids, batch x length
        - encoder_output: encoder outputs (currently unused)
        - encoder_state: encoder final states, used as initial decoder state
        - training: forwarded to the layers
        - max_sentence_length: unused, accepted for interface symmetry

        Returns (probabilities, state): the softmax over the vocabulary for
        every position and the list of final recurrent states.
        """
        # x.shape = batch x length x embedding_size
        x = self.embedding(target_in, training=training)
        # disabled: concatenate the encoder context to every input step
        # x = tf.concat([x, tf.repeat(tf.expand_dims(encoder_output[:, -1, :], axis=1), repeats=x.shape[1], axis=1)], axis=2)

        output, *state = self.gru_layer(x, training=training, initial_state=encoder_state)
        return tf.keras.activations.softmax(self.dense(output)), state

    def generate(self, encoder_output, encoder_state, training=None, max_sentence_length=None):
        """Greedy decoding of a single sentence.

        Starting from <BOS>, the most probable token is fed back as next input
        until <EOS> is produced or `max_sentence_length` tokens were emitted.

        args
        ----
        - encoder_output: encoder outputs; only the batch size is read from it
          and it must be 1
        - encoder_state: encoder final states, used as initial decoder state
        - training: forwarded to the layers
        - max_sentence_length: maximum number of generated tokens; None falls
          back to DEFAULT_MAX_SENTENCE_LENGTH

        Returns the list of generated token ids; when <EOS> is produced it is
        the last element of the list.

        Raises ValueError when the batch size is not 1.
        """
        if max_sentence_length is None:
            # range(None) would raise a TypeError
            max_sentence_length = self.DEFAULT_MAX_SENTENCE_LENGTH

        batch_size = encoder_output.shape[0]
        if batch_size != 1:
            # the generated ids are collected as Python scalars, one per step
            raise ValueError(
                f"generate() supports a batch size of 1, got {batch_size}")

        x = tf.fill([batch_size, 1], BOS_IDX)
        state = encoder_state
        out_words_list = []
        for _ in range(max_sentence_length):
            # x.shape = batch_size x 1 x embedding_size
            x = self.embedding(x, training=training)
            # disabled: append the encoder context to the input
            # x = tf.concat([x, tf.expand_dims(encoder_output[:, -1, :], axis=1)], axis=2)
            output, *state = self.gru_layer(x, training=training, initial_state=state)
            # output.shape = batch_size x 1 x recurrent_units

            # pick the most probable token
            # probs.shape = batch_size x 1 x vocabulary_size
            probs = tf.keras.activations.softmax(self.dense(output))
            x = tf.argmax(probs, axis=-1)
            word_idx = x.numpy().item()
            out_words_list.append(word_idx)

            # stop at the end-of-sentence marker instead of decoding past it
            if word_idx == EOS_IDX:
                break

        return out_words_list

In [None]:
# Default cap on the number of tokens produced when generating a translation.
MAX_SENTENCE_LENGTH = 20

In [None]:
class EncoderDecoder(keras.Model):
    """Sequence-to-sequence model chaining an Encoder and a Decoder.

    The same vocabulary size, embedding size and recurrent configuration are
    used for both halves.
    """

    def __init__(self, vocabulary_size, embedding_size, recurrent_layers, recurrent_units, **kwargs):
        super().__init__(**kwargs)

        # keep the hyper-parameters around for inspection
        self.vocabulary_size = vocabulary_size
        self.embedding_size = embedding_size
        self.recurrent_layers = recurrent_layers
        self.recurrent_units = recurrent_units

        shared_args = (vocabulary_size, embedding_size, recurrent_layers, recurrent_units)
        self.encoder = Encoder(*shared_args)
        self.decoder = Decoder(*shared_args)

    def call(self, data, training=None, max_sentence_length=MAX_SENTENCE_LENGTH):
        """Teacher-forced forward pass.

        `data` is a (source, decoder input) pair of token-id batches; the
        decoder output for every target position is returned.
        """
        source_ids, target_in_ids = data

        # enc_out.shape = batch x len_sentences x encoder_recurrent_units
        enc_out, enc_state = self.encoder(source_ids, training=training)

        predictions, _ = self.decoder(
            target_in_ids, enc_out, enc_state,
            training=training, max_sentence_length=max_sentence_length)

        return predictions

    def generate(self, data, training=None, max_sentence_length=MAX_SENTENCE_LENGTH):
        """Encode the source token ids in `data` and return the decoder's generated ids."""
        # enc_out.shape = batch x len_sentences x encoder_recurrent_units
        enc_out, enc_state = self.encoder(data, training=training)

        return self.decoder.generate(
            enc_out, enc_state,
            training=training, max_sentence_length=max_sentence_length)

In [None]:
def custom_loss(y_true, y_pred):
    """Sparse categorical cross-entropy averaged over non-padding targets.

    Args:
        y_true: integer target ids; entries equal to 0 are treated as padding
            and do not contribute to the loss.
        y_pred: predictions whose last axis has one entry per class. The loss
            object is built with its default settings, so these are presumably
            probabilities rather than logits.

    Returns:
        Scalar tensor: sum of the per-token losses of the non-padding
        positions divided by the number of such positions.
    """
    num_classes = y_pred.shape[-1]
    flat_targets = tf.reshape(y_true, [-1])
    flat_predictions = tf.reshape(y_pred, [-1, num_classes])

    # reduction NONE keeps one loss value per token so padding can be masked
    per_token_loss = tf.keras.losses.SparseCategoricalCrossentropy(
        reduction=tf.keras.losses.Reduction.NONE)(flat_targets, flat_predictions)

    keep = tf.cast(flat_targets != 0, tf.float32)
    return tf.reduce_sum(per_token_loss * keep) / tf.reduce_sum(keep)

# Addestramento

In [None]:
# 10_000 + 4: the 10,000 most frequent tokens requested from build_vocabularies
# plus the four special tokens; the same size is used for encoder and decoder.
encoder_decoder = EncoderDecoder(vocabulary_size=10_000+4, embedding_size=128, recurrent_layers=1, recurrent_units=128)

In [None]:
# Adam with its default settings and custom_loss, which ignores targets equal to 0.
encoder_decoder.compile(optimizer=keras.optimizers.Adam(), loss=custom_loss, run_eagerly=False)

In [None]:
# 100 batches per epoch. NOTE(review): initial_epoch=71 only offsets the epoch
# counter (epochs 72-100 are run); on a fresh kernel the model starts from
# untrained weights, so this presumably resumed an earlier session — confirm
# before treating "weights_100.h5" as a 100-epoch checkpoint.
encoder_decoder.fit(x=trainset, steps_per_epoch=100, epochs=100, initial_epoch=71)

Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78/100
Epoch 79/100
Epoch 80/100
Epoch 81/100
Epoch 82/100
Epoch 83/100
Epoch 84/100
Epoch 85/100
Epoch 86/100
Epoch 87/100
Epoch 88/100
Epoch 89/100
Epoch 90/100
Epoch 91/100
Epoch 92/100
Epoch 93/100
Epoch 94/100
Epoch 95/100
Epoch 96/100
Epoch 97/100
Epoch 98/100
Epoch 99/100
Epoch 100/100


<keras.src.callbacks.History at 0x7f7c1a76a470>

In [None]:
# Persist the model weights after training.
encoder_decoder.save_weights("weights_100.h5")

# Text generation

In [None]:
# Translate one English sentence with the trained model.
sentence = "i am a good guy and my home is beautiful"
nlp = spacy.load("en_core_web_md")
tokens = [t.text.lower() for t in nlp(sentence)]
# Every training source sequence ends with <EOS> (see dataset_generator), so
# the inference input must end with it too to match what the encoder was
# trained on.
tokens.append(EOS)
idxs = [en_vocab.get(t, en_vocab[UNK]) for t in tokens]
# generate() expects a batch: reshape the ids to 1 x length
translation = encoder_decoder.generate(tf.reshape(tf.constant(idxs), [1, -1]))
print(" ".join([it_vocab_inv[idx] for idx in translation]))

sono un insegnante di francese , non sono bravo in un amico . <EOS> . <EOS> . <EOS> . <EOS>
