In [14]:
import re
import json
import random
import numpy as np
from collections import defaultdict

import json
import numpy as np

class NpEncoder(json.JSONEncoder):
    """JSON encoder that also accepts NumPy scalars and arrays.

    NumPy integers become ``int``, NumPy floats become ``float`` and arrays
    become (nested) lists. Any other object is handed to the base encoder,
    which raises ``TypeError`` for unsupported types.
    """

    # (NumPy type, conversion to a JSON-serialisable built-in) pairs.
    _CONVERTERS = (
        (np.integer, int),
        (np.floating, float),
        (np.ndarray, lambda array: array.tolist()),
    )

    def default(self, obj):
        for numpy_type, convert in self._CONVERTERS:
            if isinstance(obj, numpy_type):
                return convert(obj)
        return super().default(obj)

class Key:
    """Randomly generated substitution key mapping letters to numbers.

    The numbers 1..99 are shuffled and dealt round-robin over a shuffled
    alphabet, so every capital letter owns several numbers. ``self.k`` maps
    each letter to the list of numbers that may stand for it.
    """

    def __init__(self):
        shuffled_numbers = np.random.permutation(range(1, 100))
        letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
        shuffled_letters = ''.join(random.sample(letters, len(letters)))
        self.k = defaultdict(list)
        for position, number in enumerate(shuffled_numbers):
            owner = shuffled_letters[position % len(shuffled_letters)]
            self.k[owner].append(number)

    def cipher(self, text):
        """Return one randomly chosen number per character of ``text``."""
        codes = []
        for symbol in text:
            codes.append(random.choice(self.k[symbol]))
        return codes

    def __repr__(self):
        """Render the key as indented JSON with each number list on one line."""
        rendered = json.dumps(dict(self.k), cls=NpEncoder, sort_keys=True, indent=4)
        # Collapse the multi-line lists that ``indent=4`` produces.
        for pattern, replacement in (
            (r'": \[\s+', '": ['),
            (r',\s+ (\d)', r', \1'),
            (r'\s+\]', ']'),
        ):
            rendered = re.sub(pattern, replacement, rendered)
        return rendered

def encode(text):
    """Encipher ``text`` with a freshly generated random ``Key``.

    The text is upper-cased and everything except the letters A-Z is dropped
    before enciphering. Returns the list of numbers produced by ``Key.cipher``.
    """
    letters_only = re.sub(r'[^A-Z]', '', text.upper())
    return Key().cipher(letters_only)


# Get sequences of text in capital letters which represent
# telegrams and contain single whitespaces.
import re
from typing import List

def clean(text):
    """
    Cleans a text by keeping only capital letters and single spaces.

    Every character that is neither an ASCII letter nor whitespace is removed,
    each run of whitespace (spaces, tabs, ``\\r``, ``\\n``, ...) is collapsed
    into one space, leading/trailing whitespace is stripped and the result is
    upper-cased.
    """
    letters_and_whitespace = re.sub(r'[^A-Za-z\s]', '', text)
    # Collapsing *every* whitespace run (not only runs of two or more) also
    # normalises a lone "\r", "\f", ... that would otherwise leak through.
    single_space_text = re.sub(r'\s+', ' ', letters_and_whitespace)
    return single_space_text.strip().upper()

def get_start(lines):
    """Return the index of the first line that begins with the Gutenberg START marker.

    When no line matches, ``len(lines)`` is returned.
    """
    marker = "*** START OF THE PROJECT GUTENBERG EBOOK"
    for position, line in enumerate(lines):
        if line.startswith(marker):
            return position
    return len(lines)

def get_end(lines):
    """Return the index of the last line that begins with the Gutenberg END marker.

    The search runs backwards from the final line; ``-1`` is returned when no
    line matches.
    """
    marker = "*** END OF THE PROJECT GUTENBERG EBOOK"
    for position in range(len(lines) - 1, -1, -1):
        if lines[position].startswith(marker):
            return position
    return -1

max_length = 340
def build_texts_from_book(book_name: str):
    """Split the body of ``books/<book_name>`` into short upper-case texts.

    Only the lines strictly after the Gutenberg START marker and more than two
    lines before the END marker are kept. They are normalised with ``clean``
    and then packed greedily, word by word, into chunks. Every chunk starts
    with a space and is shorter than ``max_length`` characters; the one
    exception is a single word that is itself too long, which is emitted
    alone rather than stalling the loop.

    Returns the list of chunks (empty when the book yields no words).
    """
    with open(f"books/{book_name}") as f:
        lines = list(f)

    start_idx = get_start(lines)
    end_idx = get_end(lines)
    selected_lines = [line for i, line in enumerate(lines) if start_idx < i < end_idx-2]

    words = clean("".join(selected_lines)).split()

    the_texts = []
    i = 0
    while i < len(words):
        # Always consume at least one word: if the first word alone does not
        # fit, the size check below could never succeed and ``i`` would never
        # advance, appending empty chunks forever.
        next_text = " " + words[i]
        i += 1
        # The "+ 1" accounts for the separating space added with each word.
        while i < len(words) and len(next_text + words[i]) + 1 < max_length:
            next_text += " " + words[i]
            i += 1
        the_texts.append(next_text)
    return the_texts

def build_texts_from_books(book_names: List[str]):
    """Concatenate the texts of every book in ``book_names``, in the given order."""
    return [
        text
        for name in book_names
        for text in build_texts_from_book(name)
    ]

import os
# os.listdir() returns entries in arbitrary order; sorting keeps the
# train/validation split identical from one run (and machine) to the next.
the_books = sorted(x for x in os.listdir("books") if x.endswith(".txt"))
the_texts = build_texts_from_books(the_books)

total_samples = len(the_texts)
nb_train_samples = int(total_samples * 0.8)  # first 80% train, remainder validation

print(f"Total samples # = {total_samples}")


def _write_split(directory, stem, texts):
    """Write one data split as two line-aligned files.

    ``<directory>/<stem>.en`` receives the plain texts and
    ``<directory>/<stem>.nb`` the matching ciphertexts (each number
    zero-padded to two digits), one sample per line. The directory is created
    when missing.
    """
    os.makedirs(directory, exist_ok=True)
    with open(f"{directory}/{stem}.en", "w") as plain_file, \
            open(f"{directory}/{stem}.nb", "w") as cipher_file:
        for text in texts:
            plain_file.write(text + "\n")
            cipher_file.write("".join(f"{t:02}" for t in encode(text)) + "\n")


_write_split("train", "train", the_texts[:nb_train_samples])
_write_split("valid", "val", the_texts[nb_train_samples:])

Total samples # = 77865


In [15]:
# Language tags: 'nb' = numbers (source), 'en' = english (target).
SRC_LANGUAGE, TGT_LANGUAGE = 'nb', 'en'

# Per-language registries, filled in by later cells.
token_transform = dict()
vocab_transform = dict()

In [16]:
from typing import List

def token_transform_nb(s) -> List[str]:
    """Cut a digit string into consecutive two-character tokens.

    ``s`` is expected to have even length and contain only digits; with an odd
    length the final token holds a single character. A well-formed ciphertext
    should never yield the token "00".
    """
    tokens = []
    for start in range(0, len(s), 2):
        tokens.append(s[start:start + 2])
    return tokens

def token_transform_en(s) -> List[str]:
    """Tokenize plaintext so that a space is glued to the character after it.

    A space starts a two-character token (the space plus whatever follows);
    any other character is a token by itself. ``s`` is expected to start with
    a space and end with a letter.
    """
    tokens = []
    position, end = 0, len(s)
    while position < end:
        width = 2 if s[position] == " " else 1
        tokens.append(s[position:position + width])
        position += width
    return tokens

# Register one tokenizer per language.
token_transform.update({
    SRC_LANGUAGE: token_transform_nb,
    TGT_LANGUAGE: token_transform_en,
})

In [17]:
from collections import OrderedDict
from torchtext.vocab import vocab

# Special symbols, listed in the order of their indices so that they are
# inserted into the vocab at exactly those positions.
special_symbols = ['<unk>', '<pad>', '<bos>', '<eos>']

# Index of each special symbol: 0, 1, 2, 3.
UNK_IDX, PAD_IDX, BOS_IDX, EOS_IDX = range(len(special_symbols))

def display_vocab(v):
    """Print every token of vocab ``v``, in index order, separated by ", "."""
    all_indices = list(range(len(v)))
    print(", ".join(v.lookup_tokens(all_indices)))

def vocab_from(arr):
    """Build a vocab holding the special symbols followed by the tokens of ``arr``.

    Each token is stored with its position in ``arr`` as its value in the
    ordered dict handed to ``vocab``; ``min_freq=0`` is passed so that the
    first token (value 0) is not filtered out. Unknown tokens map to
    ``UNK_IDX``.
    """
    token_positions = OrderedDict(
        (token, position) for position, token in enumerate(arr)
    )
    voc = vocab(token_positions, min_freq=0, specials=special_symbols, special_first=True)
    voc.set_default_index(UNK_IDX)
    return voc

def build_numbers_vocab():
    """Return the source vocab: the two-digit strings "01" .. "99"."""
    return vocab_from([str(value).zfill(2) for value in range(1, 100)])

def build_english_vocab():
    """Return the target vocab: "A".."Z" followed by " A".." Z" (space-prefixed)."""
    capitals = [chr(code) for code in range(ord("A"), ord("Z") + 1)]
    spaced_capitals = [" " + letter for letter in capitals]
    return vocab_from(capitals + spaced_capitals)
    
# Register one vocab per language (numbers first, then english).
vocab_transform.update({
    SRC_LANGUAGE: build_numbers_vocab(),
    TGT_LANGUAGE: build_english_vocab(),
})

In [18]:
import torchdata.datapipes as dp

from torch.utils.data import DataLoader
from torchdata.datapipes.iter import FileOpener

def get_data_iter(split: str):
    """Build a datapipe of (ciphertext line, plaintext line) pairs for ``split``.

    ``split`` is the directory to list. Lines are read (newline stripped) from
    the files ending in '.nb' and from those ending in '.en', and the two
    streams are zipped together. A shuffle stage is attached but switched off
    via ``set_shuffle(False)``, followed by a sharding filter.
    """
    def lines_of(suffix):
        # All lines of the files in ``split`` whose name ends with ``suffix``.
        listing = dp.iter.FileLister([split]).filter(
            filter_fn=lambda filename: filename.endswith(suffix)
        )
        return FileOpener(listing, encoding="utf-8").readlines(
            return_path=False, strip_newline=True
        )

    src_data_dp = lines_of('.nb')
    tgt_data_dp = lines_of('.en')
    paired = src_data_dp.zip(tgt_data_dp)
    return paired.shuffle().set_shuffle(False).sharding_filter()

In [19]:
from torch import Tensor
import torch
import torch.nn as nn
from torch.nn import Transformer
import math

from torch.utils.tensorboard import SummaryWriter
# TensorBoard writer; the training loop logs the per-batch loss through it.
writer = SummaryWriter()

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(DEVICE)
# helper Module that adds positional encoding to the token embedding to introduce a notion of word order.
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position signals to embeddings, then apply dropout.

    A ``(maxlen, 1, emb_size)`` table is precomputed and registered as the
    buffer ``pos_embedding``: even feature columns hold sines and odd columns
    cosines of ``position * frequency``.
    """

    def __init__(self,
                 emb_size: int,
                 dropout: float,
                 maxlen: int = 5000):
        super().__init__()
        frequencies = torch.exp(- torch.arange(0, emb_size, 2)* math.log(10000) / emb_size)
        positions = torch.arange(0, maxlen).reshape(maxlen, 1)
        angles = positions * frequencies

        table = torch.zeros((maxlen, emb_size))
        table[:, 0::2] = torch.sin(angles)
        table[:, 1::2] = torch.cos(angles)

        self.dropout = nn.Dropout(dropout)
        # Insert a singleton axis so the table broadcasts over dimension 1 of the input.
        self.register_buffer('pos_embedding', table.unsqueeze(-2))

    def forward(self, token_embedding: Tensor):
        """Return ``dropout(token_embedding + positions)``; positions are indexed by dimension 0."""
        seq_len = token_embedding.size(0)
        return self.dropout(token_embedding + self.pos_embedding[:seq_len, :])

# helper Module to convert tensor of input indices into corresponding tensor of token embeddings
class TokenEmbedding(nn.Module):
    """Embedding lookup whose output is scaled by ``sqrt(emb_size)``."""

    def __init__(self, vocab_size: int, emb_size):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, emb_size)
        self.emb_size = emb_size

    def forward(self, tokens: Tensor):
        """Embed ``tokens`` (cast to int64) and multiply by ``sqrt(emb_size)``."""
        scale = math.sqrt(self.emb_size)
        return scale * self.embedding(tokens.long())

# Seq2Seq Network
class Seq2SeqTransformer(nn.Module):
    """Encoder-decoder transformer with token embeddings and an output projection.

    Source and target tokens go through separate ``TokenEmbedding`` layers and
    a shared ``PositionalEncoding`` before entering ``nn.Transformer``;
    ``generator`` projects the transformer output to target-vocab logits.
    """

    def __init__(self,
                 num_encoder_layers: int,
                 num_decoder_layers: int,
                 emb_size: int,
                 nhead: int,
                 src_vocab_size: int,
                 tgt_vocab_size: int,
                 dim_feedforward: int = 512,
                 dropout: float = 0.1):
        super().__init__()
        # Submodules are created in this order on purpose: it fixes both the
        # random-initialisation sequence and the order of ``parameters()``.
        self.transformer = Transformer(
            d_model=emb_size,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
        )
        self.generator = nn.Linear(emb_size, tgt_vocab_size)
        self.src_tok_emb = TokenEmbedding(src_vocab_size, emb_size)
        self.tgt_tok_emb = TokenEmbedding(tgt_vocab_size, emb_size)
        self.positional_encoding = PositionalEncoding(emb_size, dropout=dropout)

    def forward(self,
                src: Tensor,
                trg: Tensor,
                src_mask: Tensor,
                tgt_mask: Tensor,
                src_padding_mask: Tensor,
                tgt_padding_mask: Tensor,
                memory_key_padding_mask: Tensor):
        """Run the full encoder-decoder pass and return target-vocab logits."""
        embedded_src = self.positional_encoding(self.src_tok_emb(src))
        embedded_tgt = self.positional_encoding(self.tgt_tok_emb(trg))
        hidden = self.transformer(
            embedded_src,
            embedded_tgt,
            src_mask=src_mask,
            tgt_mask=tgt_mask,
            memory_mask=None,
            src_key_padding_mask=src_padding_mask,
            tgt_key_padding_mask=tgt_padding_mask,
            memory_key_padding_mask=memory_key_padding_mask,
        )
        return self.generator(hidden)

    def encode(self, src: Tensor, src_mask: Tensor):
        """Return the encoder output (memory) for ``src``."""
        embedded_src = self.positional_encoding(self.src_tok_emb(src))
        return self.transformer.encoder(embedded_src, src_mask)

    def decode(self, tgt: Tensor, memory: Tensor, tgt_mask: Tensor):
        """Return the decoder output for ``tgt`` given the encoder ``memory``."""
        embedded_tgt = self.positional_encoding(self.tgt_tok_emb(tgt))
        return self.transformer.decoder(embedded_tgt, memory, tgt_mask)

cpu


In [8]:
def generate_square_subsequent_mask(sz):
    """Return an ``sz x sz`` float mask on ``DEVICE`` that hides future positions.

    Entries on and below the diagonal are 0.0; entries above it are ``-inf``.
    """
    blocked = torch.full((sz, sz), float('-inf'), device=DEVICE)
    # triu(diagonal=1) keeps -inf strictly above the diagonal and zeroes the rest.
    return torch.triu(blocked, diagonal=1)


def create_mask(src, tgt):
    """Build the attention and padding masks for one batch.

    Returns ``(src_mask, tgt_mask, src_padding_mask, tgt_padding_mask)``:
    an all-False boolean source mask, the subsequent-position target mask,
    and transposed boolean masks marking where ``src`` / ``tgt`` equal
    ``PAD_IDX``. Sequence length is read from dimension 0 of each input.
    """
    src_len, tgt_len = src.shape[0], tgt.shape[0]

    causal_tgt_mask = generate_square_subsequent_mask(tgt_len)
    open_src_mask = torch.zeros((src_len, src_len), device=DEVICE).type(torch.bool)

    src_is_pad = (src == PAD_IDX).transpose(0, 1)
    tgt_is_pad = (tgt == PAD_IDX).transpose(0, 1)
    return open_src_mask, causal_tgt_mask, src_is_pad, tgt_is_pad

In [9]:
# Seed torch's RNG before the model is built so the initial weights are reproducible.
torch.manual_seed(0)

# Model and training hyper-parameters.
SRC_VOCAB_SIZE = len(vocab_transform[SRC_LANGUAGE])
TGT_VOCAB_SIZE = len(vocab_transform[TGT_LANGUAGE])
EMB_SIZE = 512
NHEAD = 8
FFN_HID_DIM = 512
BATCH_SIZE = 128
NUM_ENCODER_LAYERS = 3
NUM_DECODER_LAYERS = 3

transformer = Seq2SeqTransformer(NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, EMB_SIZE,
                                 NHEAD, SRC_VOCAB_SIZE, TGT_VOCAB_SIZE, FFN_HID_DIM)

# Re-initialise every parameter with more than one dimension using Xavier-uniform;
# one-dimensional parameters keep their default initialisation.
for p in transformer.parameters():
    if p.dim() > 1:
        nn.init.xavier_uniform_(p)

transformer = transformer.to(DEVICE)

# Positions whose target is the padding index do not contribute to the loss.
loss_fn = torch.nn.CrossEntropyLoss(ignore_index=PAD_IDX)

optimizer = torch.optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)



In [10]:
from torch.nn.utils.rnn import pad_sequence

# helper function to club together sequential operations
def sequential_transforms(*transforms):
    """Compose ``transforms`` into one callable that applies them left to right."""
    def func(txt_input):
        result = txt_input
        for step in transforms:
            result = step(result)
        return result
    return func

# function to add BOS/EOS and create tensor for input sequence indices
def tensor_transform(token_ids: List[int]):
    """Return a 1-D tensor holding ``BOS_IDX``, then ``token_ids``, then ``EOS_IDX``."""
    bos = torch.tensor([BOS_IDX])
    body = torch.tensor(token_ids)
    eos = torch.tensor([EOS_IDX])
    return torch.cat((bos, body, eos))

# ``src`` and ``tgt`` language text transforms to convert raw strings into tensors indices
# Pipeline per language: tokenize -> map tokens to vocab indices -> add BOS/EOS and build the tensor.
text_transform = {}
for ln in (SRC_LANGUAGE, TGT_LANGUAGE):
    text_transform[ln] = sequential_transforms(
        token_transform[ln],
        vocab_transform[ln],
        tensor_transform,
    )

from torch.nn.utils.rnn import pad_sequence
# function to collate data samples into batch tensors
def collate_fn(batch):
    """Turn a list of (source line, target line) pairs into two padded tensors.

    Trailing newlines are stripped, each line goes through its language's
    ``text_transform``, and the resulting sequences are padded with
    ``PAD_IDX`` to a common length. Returns ``(src_batch, tgt_batch)``.
    """
    to_src = text_transform[SRC_LANGUAGE]
    to_tgt = text_transform[TGT_LANGUAGE]

    pairs = [
        (to_src(src_sample.rstrip("\n")), to_tgt(tgt_sample.rstrip("\n")))
        for src_sample, tgt_sample in batch
    ]

    src_batch = pad_sequence([src for src, _ in pairs], padding_value=PAD_IDX)
    tgt_batch = pad_sequence([tgt for _, tgt in pairs], padding_value=PAD_IDX)
    return src_batch, tgt_batch

In [11]:
from torch.utils.data import DataLoader

def train_epoch(model, optimizer, epoch_nb):
    """Train ``model`` for one pass over the "train" split.

    Each batch loss is also logged to TensorBoard under the tag
    "Loss/train epoch #<epoch_nb>", with the batch index as the step.

    Returns the mean of the per-batch losses. Raises ``ZeroDivisionError``
    when the split yields no batch.
    """
    model.train()
    losses = 0
    train_iter = get_data_iter("train")
    train_dataloader = DataLoader(train_iter, batch_size=BATCH_SIZE, collate_fn=collate_fn)

    cnt = 0  # batches processed so far; also the TensorBoard step

    for src, tgt in train_dataloader:
        src = src.to(DEVICE)
        tgt = tgt.to(DEVICE)

        # Teacher forcing: the decoder is fed every token but the last and is
        # scored against every token but the first.
        tgt_input = tgt[:-1, :]
        tgt_out = tgt[1:, :]

        src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)

        logits = model(src, tgt_input, src_mask, tgt_mask,src_padding_mask, tgt_padding_mask, src_padding_mask)

        optimizer.zero_grad()
        loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))

        writer.add_scalar(f"Loss/train epoch #{epoch_nb}", loss, cnt)

        loss.backward()

        optimizer.step()
        losses += loss.item()
        cnt += 1

    # Average over the batches actually seen. Materialising the dataloader a
    # second time just to count them would re-read and re-tokenise the split.
    return losses / cnt


def evaluate(model):
    """Return the mean per-batch loss of ``model`` on the "valid" split.

    The model is switched to eval mode and no gradients are tracked.
    Raises ``ZeroDivisionError`` when the split yields no batch.
    """
    model.eval()
    losses = 0
    batch_count = 0

    val_iter = get_data_iter("valid")
    val_dataloader = DataLoader(val_iter, batch_size=BATCH_SIZE, collate_fn=collate_fn)

    # Validation never calls backward(), so skip building the autograd graph.
    with torch.no_grad():
        for src, tgt in val_dataloader:
            src = src.to(DEVICE)
            tgt = tgt.to(DEVICE)

            # Same input/target shift as in training.
            tgt_input = tgt[:-1, :]
            tgt_out = tgt[1:, :]

            src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)

            logits = model(src, tgt_input, src_mask, tgt_mask,src_padding_mask, tgt_padding_mask, src_padding_mask)

            loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
            losses += loss.item()
            batch_count += 1

    # Count batches while iterating instead of replaying the dataloader afterwards.
    return losses / batch_count

In [12]:
from timeit import default_timer as timer
# Number of full passes over the training split.
NUM_EPOCHS = 18

# Train, then validate, once per epoch. The reported epoch time covers the
# training pass only: the timer is stopped before evaluate() runs.
for epoch in range(1, NUM_EPOCHS+1):
    start_time = timer()
    train_loss = train_epoch(transformer, optimizer, epoch)
    end_time = timer()
    val_loss = evaluate(transformer)
    print((f"Epoch: {epoch}, Train loss: {train_loss:.3f}, Val loss: {val_loss:.3f}, "f"Epoch time = {(end_time - start_time):.3f}s"))


# function to generate output sequence using greedy algorithm
def greedy_decode(model, src, src_mask, max_len, start_symbol):
    """Decode ``src`` one token at a time, always taking the highest-scoring token.

    Generation starts from ``start_symbol`` and stops after ``EOS_IDX`` is
    produced or once the sequence holds ``max_len`` tokens. Returns the
    generated indices (start symbol included) as a column tensor.
    """
    src = src.to(DEVICE)
    src_mask = src_mask.to(DEVICE)

    memory = model.encode(src, src_mask).to(DEVICE)
    ys = torch.ones(1, 1).fill_(start_symbol).type(torch.long).to(DEVICE)

    for _ in range(max_len - 1):
        causal_mask = generate_square_subsequent_mask(ys.size(0)).type(torch.bool).to(DEVICE)
        decoded = model.decode(ys, memory, causal_mask).transpose(0, 1)
        # Only the newest position is needed to pick the next token.
        scores = model.generator(decoded[:, -1])
        next_word = torch.max(scores, dim=1)[1].item()

        appended = torch.ones(1, 1).type_as(src.data).fill_(next_word)
        ys = torch.cat([ys, appended], dim=0)
        if next_word == EOS_IDX:
            break
    return ys


# actual function to translate input sentence into target language
def translate(model: torch.nn.Module, src_sentence: str):
    """Decode the ciphertext ``src_sentence`` into plaintext with ``model``.

    The sentence is converted to indices with the source ``text_transform``,
    decoded greedily (at most 5 tokens longer than the input) and mapped back
    to target tokens. The "<bos>" / "<eos>" markers are removed from the
    returned string.
    """
    model.eval()
    src = text_transform[SRC_LANGUAGE](src_sentence).view(-1, 1)
    num_tokens = src.shape[0]
    src_mask = (torch.zeros(num_tokens, num_tokens)).type(torch.bool)
    tgt_tokens = greedy_decode(
        model,  src, src_mask, max_len=num_tokens + 5, start_symbol=BOS_IDX).flatten()
    decoded = vocab_transform[TGT_LANGUAGE].lookup_tokens(list(tgt_tokens.cpu().numpy()))
    # Target tokens already carry their own leading space (" A" vs "A"), so
    # they are concatenated directly; joining with " " would put a blank
    # between every letter.
    return "".join(decoded).replace("<bos>", "").replace("<eos>", "")



Epoch: 1, Train loss: 3.207, Val loss: 2.786, Epoch time = 1076.006s
Epoch: 2, Train loss: 2.792, Val loss: 2.721, Epoch time = 1145.103s
Epoch: 3, Train loss: 2.740, Val loss: 2.699, Epoch time = 1126.058s
Epoch: 4, Train loss: 2.712, Val loss: 2.683, Epoch time = 1193.360s


KeyboardInterrupt: 

In [None]:
# Decode one sample ciphertext (a string of two-digit tokens) with the trained model.
print(translate(transformer, "422213246754308062694958558135649599833490518985738370693929612690755512535741419051487090515799865122852458421691796001346118397857912334993025919397152649255512789594307078759295419536994381625703039590701695023926943008061647127826122264797834485905414828978375584893457083483953951244284305343079066197830889157826925157839545792276285970837559699390571552098559035764260578121991236990128061700191275745131889799096349006394841956603909193577920347223698152261941303077342348387030236448595812488167697064951255245348991286956936093878699290789539"))

In [None]:
# NOTE(review): scratch cell holding a bare string. Presumably a plaintext
# fragment kept for comparison with the decoded output; confirm or remove.
"OF US AND ALL OF US AND SO AS TINY TIM OBSERVED"

In [95]:
# Number of two-digit tokens in the ciphertext string (its length divided by 2).
len("422213246754308062694958558135649599833490518985738370693929612690755512535741419051487090515799865122852458421691796001346118397857912334993025919397152649255512789594307078759295419536994381625703039590701695023926943008061647127826122264797834485905414828978375584893457083483953951244284305343079066197830889157826925157839545792276285970837559699390571552098559035764260578121991236990128061700191275745131889799096349006394841956603909193577920347223698152261941303077342348387030236448595812488167697064951255245348991286956936093878699290789539")/2.

276.0

In [80]:
# Scratch check: length of a plaintext sample minus 11.
# NOTE(review): the saved result (36) corresponds to a 47-character string, not
# to this text, so the recorded output looks stale. Re-run or remove the cell.
len("WOULD BE BLIND ANYWAY HE THOUGHT IT QUITE AS WELL THAT THEY SHOULD WRINKLE UP THEIR EYES IN GRINS AS HAVE THE MALADY IN LESS ATTRACTIVE FORMS HIS OWN HEART LAUGHED AND THAT WAS QUITE ENOUGH FOR HIM HE HAD NO FURTHER INTERCOURSE WITH SPIRITS BUT LIVED UPON THE TOTAL ABSTINENCE PRINCIPLE EVER AFTERWARDS AND IT WAS ALWAYS SAID OF HIM THAT")-11

36

In [15]:
!pip install torch torchvision

Collecting torchvision
  Downloading torchvision-0.18.1-cp310-cp310-macosx_11_0_arm64.whl.metadata (6.6 kB)
Collecting torch
  Downloading torch-2.3.1-cp310-none-macosx_11_0_arm64.whl.metadata (26 kB)
Downloading torchvision-0.18.1-cp310-cp310-macosx_11_0_arm64.whl (1.6 MB)
[2K   [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0mm eta [36m0:00:01[0m0:01[0m:01[0m
[?25hDownloading torch-2.3.1-cp310-none-macosx_11_0_arm64.whl (61.0 MB)
[2K   [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.0/61.0 MB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0mm eta [36m0:00:01[0m[36m0:00:01[0m
[?25hInstalling collected packages: torch, torchvision
  Attempting uninstall: torch
    Found existing installation: torch 2.2.0
    Uninstalling torch-2.2.0:
      Successfully uninstalled torch-2.2.0
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. Th