In [None]:
import os
import torch
import torch.nn as nn
import math
import time
import sentencepiece as spm


from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from speakleash import Speakleash

# Directory handed to Speakleash as its local data directory.
base_dir = "speakleash_data"
# Name of the Speakleash dataset used as the text corpus.
dataset_name = "wolne_lektury_corpus"
# Number of parallel token columns batchify() creates for the training split.
batch_size = 20
# Number of parallel token columns for the validation and test splits.
eval_batch_size = 10
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [None]:
# Download (if necessary) and load the corpus documents.
# exist_ok avoids the check-then-create race of a separate os.path.exists() test.
os.makedirs(base_dir, exist_ok=True)

sl = Speakleash(base_dir)

# NOTE(review): this check assumes Speakleash materialises the dataset at
# <base_dir>/<dataset_name>. Confirm against the library's on-disk layout;
# if the files are named differently the "already exists" branch is never taken.
dataset_already_present = os.path.exists(os.path.join(base_dir, dataset_name))
if dataset_already_present:
    print(f"Zbiór danych {dataset_name} już istnieje.")
else:
    print(f"Pobieranie zbioru danych: {dataset_name}...")

# Look the dataset up exactly once and reuse the handle in both branches.
dataset = sl.get(dataset_name)
if dataset is None:
    # Fail here with a clear message instead of an AttributeError on `.data` below.
    raise ValueError(f"Dataset '{dataset_name}' was not found in Speakleash.")

if not dataset_already_present:
    print("Pobieranie zakończone.")

# Only a small number of documents is loaded so the notebook runs quickly.
max_documents = 20

texts = []
for doc in dataset.data:
    texts.append(doc)
    if len(texts) >= max_documents:
        break

# The documents are concatenated into one long string separated by single spaces.
corpus = " ".join(texts)
print(f"Wczytano {len(texts)} dokumentów.")
print(f"Rozmiar korpusu: {len(corpus)} znaków.")

In [None]:
# Transformer z lab 1 

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a sequence of embeddings.

    The table is precomputed once for ``max_len`` positions and stored as a
    non-trainable buffer of shape ``(max_len, 1, d_model)``; even feature
    indices hold sines and odd indices hold cosines.

    Args:
        d_model: Size of the feature (embedding) dimension. Both even and odd
            values are supported.
        dropout: Dropout probability applied after the encoding is added.
        max_len: Number of positions for which encodings are precomputed.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one cosine column fewer than sine columns,
        # so only the first d_model // 2 frequencies are used here.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # Reshape to (max_len, 1, d_model) so it broadcasts over the batch dimension.
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(0)`` positions.

        Args:
            x: Tensor whose first dimension is the sequence position and whose
                last dimension has size ``d_model``.

        Raises:
            ValueError: If the sequence is longer than the precomputed table.
        """
        seq_len = x.size(0)
        if seq_len > self.pe.size(0):
            raise ValueError(
                f"Sequence length {seq_len} exceeds max_len={self.pe.size(0)} of the positional encoding."
            )
        x = x + self.pe[:seq_len, :]
        return self.dropout(x)

class TransformerModel(nn.Module):
    """Token-level language model: embedding, positional encoding, Transformer encoder, linear head.

    Args:
        ntoken: Vocabulary size (embedding rows and output logits).
        ninp: Embedding / model dimension.
        nhead: Number of attention heads per encoder layer.
        nhid: Feed-forward dimension inside each encoder layer.
        nlayers: Number of stacked encoder layers.
        dropout: Dropout probability used by the positional encoding and encoder layers.
    """

    def __init__(self, ntoken, ninp, nhead, nhid, nlayers, dropout=0.5):
        super().__init__()
        self.model_type = 'Transformer'
        self.ninp = ninp
        self.pos_encoder = PositionalEncoding(ninp, dropout)
        single_layer = nn.TransformerEncoderLayer(ninp, nhead, nhid, dropout)
        self.transformer_encoder = nn.TransformerEncoder(single_layer, nlayers)
        self.encoder = nn.Embedding(ntoken, ninp)
        self.decoder = nn.Linear(ninp, ntoken)
        self.init_weights()

    def generate_square_subsequent_mask(self, sz):
        """Return a ``(sz, sz)`` float mask: 0.0 on and below the diagonal, ``-inf`` above it."""
        # Entries strictly above the diagonal correspond to future positions.
        future = torch.triu(torch.ones(sz, sz), diagonal=1) == 1
        return torch.zeros(sz, sz).masked_fill(future, float('-inf'))

    def init_weights(self):
        """Initialise embedding and output weights uniformly in [-0.1, 0.1] and zero the output bias."""
        bound = 0.1
        with torch.no_grad():
            self.encoder.weight.uniform_(-bound, bound)
            self.decoder.bias.zero_()
            self.decoder.weight.uniform_(-bound, bound)

    def forward(self, src, src_mask):
        """Map token ids ``src`` to per-position vocabulary logits using attention mask ``src_mask``."""
        # Embeddings are scaled by sqrt(ninp) before the positional encoding is added.
        scaled_embeddings = self.encoder(src) * math.sqrt(self.ninp)
        hidden = self.transformer_encoder(self.pos_encoder(scaled_embeddings), src_mask)
        return self.decoder(hidden)

In [None]:
# funkcje pomocnicze z lab 1

# Maximum number of time steps per chunk that get_batch() cuts out of the data.
bptt = 35

def batchify(data, bsz):
    """Arrange a flat token tensor into ``bsz`` parallel columns on ``device``.

    Trailing tokens that do not fill a complete row are dropped. The result
    has shape ``(data.size(0) // bsz, bsz)``.
    """
    rows = data.size(0) // bsz
    usable = data[:rows * bsz]
    # View as (bsz, rows) first, then transpose so each column is one contiguous stream.
    columns = usable.view(bsz, rows).t().contiguous()
    return columns.to(device)

def get_batch(source, i):
    """Return the input chunk starting at row ``i`` and its next-token targets.

    The chunk holds at most ``bptt`` rows; the targets are the same rows
    shifted by one position and flattened to 1-D.
    """
    length = min(bptt, len(source) - 1 - i)
    stop = i + length
    inputs = source[i:stop]
    labels = source[i + 1:stop + 1].reshape(-1)
    return inputs, labels

def train(model, data_source, optimizer, scheduler, criterion, ntokens):
    """Run one training pass over ``data_source`` in chunks of ``bptt`` rows.

    Gradients are clipped to a norm of 0.5 before every optimizer step, and a
    progress line is printed every 200 batches. ``scheduler`` is only read
    for the current learning rate; it is not stepped here.
    """
    log_interval = 200
    model.train()
    running_loss = 0.
    window_start = time.time()
    src_mask = model.generate_square_subsequent_mask(bptt).to(device)

    chunk_starts = range(0, data_source.size(0) - 1, bptt)
    for batch, offset in enumerate(chunk_starts):
        data, targets = get_batch(data_source, offset)
        if data.size(0) != bptt:
            # A shorter chunk needs a mask that matches its length.
            src_mask = model.generate_square_subsequent_mask(data.size(0)).to(device)

        optimizer.zero_grad()
        logits = model(data, src_mask)
        loss = criterion(logits.view(-1, ntokens), targets)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
        optimizer.step()

        running_loss += loss.item()
        if batch == 0 or batch % log_interval != 0:
            continue

        # Report the average over the logging window, then start a new window.
        cur_loss = running_loss / log_interval
        elapsed = time.time() - window_start
        print(f'| {batch:5d}/{len(data_source) // bptt:5d} paczek | '
              f'lr {scheduler.get_last_lr()[0]:02.2f} | ms/paczkę {elapsed * 1000 / log_interval:5.2f} | '
              f'strata {cur_loss:5.2f} | ppl {math.exp(cur_loss):8.2f}')
        running_loss = 0
        window_start = time.time()

def evaluate(model, eval_data, criterion, ntokens):
    """Return the average per-row loss of ``model`` on ``eval_data``.

    The data is processed in chunks of ``bptt`` rows without gradient
    tracking. Each chunk's loss is weighted by its number of rows and the sum
    is divided by the number of target rows (``eval_data.size(0) - 1``).

    Raises:
        ValueError: If ``eval_data`` has fewer than two rows, because there is
            then no (input, target) pair to score.
    """
    num_target_rows = eval_data.size(0) - 1
    if num_target_rows < 1:
        # Without this guard an empty split silently yields a loss of -0.0 and
        # a one-row split fails with ZeroDivisionError.
        raise ValueError(
            f"evaluate() needs at least 2 rows of data, got {eval_data.size(0)}."
        )

    model.eval()
    total_loss = 0.
    src_mask = model.generate_square_subsequent_mask(bptt).to(device)
    with torch.no_grad():
        for i in range(0, num_target_rows, bptt):
            data, targets = get_batch(eval_data, i)
            if data.size(0) != bptt:
                # A shorter chunk needs a mask that matches its length.
                src_mask = model.generate_square_subsequent_mask(data.size(0)).to(device)
            output = model(data, src_mask)
            output_flat = output.view(-1, ntokens)
            total_loss += len(data) * criterion(output_flat, targets).item()
    return total_loss / num_target_rows

In [None]:
# Write the corpus to a UTF-8 text file; the SentencePiece training cells
# read their input from corpus_path.
corpus_path = "corpus.txt"
with open(corpus_path, "w", encoding="utf-8") as f:
    f.write(corpus)

print(f"Korpus zapisany do pliku: {corpus_path}")

In [None]:
# Tokenizer training parameters
# Target vocabulary size passed to both SentencePiece trainings.
vocab_size = 10000
# File-name prefixes of the trained models; the models are loaded as '<prefix>.model'.
model_prefix_bpe = 'bpe'
model_prefix_unigram = 'unigram'

In [None]:
# Train the BPE tokenizer
print("\nRozpoczynam trening tokenizatora BPE...")
# Keyword arguments are the equivalent of the '--flag=value' command string.
spm.SentencePieceTrainer.train(
    input=corpus_path,
    model_prefix=model_prefix_bpe,
    vocab_size=vocab_size,
    model_type='bpe',
)
print("Trening BPE zakończony.")

In [None]:
# Train the Unigram tokenizer

print("\nRozpoczynam trening tokenizatora Unigram...")
# Keyword arguments are the equivalent of the '--flag=value' command string.
spm.SentencePieceTrainer.train(
    input=corpus_path,
    model_prefix=model_prefix_unigram,
    vocab_size=vocab_size,
    model_type='unigram',
)
print("Trening Unigram zakończony.")

In [None]:
# Try both trained tokenizers on one sample sentence
print("\nTestowanie tokenizatorów...")

sp_bpe = spm.SentencePieceProcessor()
sp_unigram = spm.SentencePieceProcessor()
sp_bpe.load(f'{model_prefix_bpe}.model')
sp_unigram.load(f'{model_prefix_unigram}.model')

test_sentence = "Dawno, dawno temu za siedmioma górami żył smok."

# Split the same sentence into subword pieces with each model.
tokens_bpe, tokens_unigram = (
    processor.encode_as_pieces(test_sentence) for processor in (sp_bpe, sp_unigram)
)

print("\n--- Wyniki Tokenizacji ---")
print(f"Zdanie: {test_sentence}")
print(f"BPE:\n{tokens_bpe}")
print(f"Unigram:\n{tokens_unigram}")

In [None]:
# Stworzenie wrapperów dla tokenizatorów, aby ujednolicić interfejs

class BasicEnglishTokenizer:
    """Word-level tokenizer wrapper built on torchtext's 'basic_english' tokenizer.

    The vocabulary does not exist until ``train()`` has been called.
    """

    def __init__(self):
        self.tokenizer = get_tokenizer('basic_english')
        # Filled in by train(); None means "not trained yet".
        self.vocab = None

    def train(self, corpus):
        """Build the vocabulary from ``corpus`` and map unknown tokens to ``<unk>``."""
        corpus_tokens = self.tokenizer(corpus)
        special_tokens = ["<unk>", "<pad>", "<bos>", "<eos>"]
        self.vocab = build_vocab_from_iterator([corpus_tokens], specials=special_tokens)
        unk_index = self.vocab["<unk>"]
        self.vocab.set_default_index(unk_index)

    def encode(self, text):
        """Return the vocabulary ids of the tokens in ``text``; raises if no vocabulary is set."""
        if self.vocab:
            return self.vocab(self.tokenizer(text))
        raise Exception("Tokenizer not trained. Call train() first.")

    def get_vocab_size(self):
        """Return the number of vocabulary entries, or 0 when no vocabulary is set."""
        if not self.vocab:
            return 0
        return len(self.vocab)

class SentencePieceTokenizer:
    """Wrapper exposing ``encode`` / ``get_vocab_size`` for a trained SentencePiece model."""

    def __init__(self, model_path):
        """Load the SentencePiece model stored at ``model_path``."""
        processor = spm.SentencePieceProcessor()
        processor.load(model_path)
        self.sp = processor

    def encode(self, text):
        """Return the list of piece ids for ``text``."""
        piece_ids = self.sp.encode_as_ids(text)
        return piece_ids

    def get_vocab_size(self):
        """Return the number of pieces in the loaded model."""
        piece_count = self.sp.get_piece_size()
        return piece_count

# Funkcja do przygotowania danych
def prepare_data_for_tokenizer(tokenizer, corpus):
    """Encode ``corpus`` with ``tokenizer`` and return batched train/val/test splits.

    The encoded ids are split 90% / 5% / 5% in order and each part is passed
    through ``batchify``. A ``BasicEnglishTokenizer`` is trained on the corpus
    first because its vocabulary has to be built before encoding.

    Returns:
        ``((train_data, val_data, test_data), vocab_size)``.
    """
    # Only the word-level tokenizer needs its vocabulary built here.
    if isinstance(tokenizer, BasicEnglishTokenizer):
        tokenizer.train(corpus)

    token_ids = torch.tensor(tokenizer.encode(corpus), dtype=torch.long)

    # Split boundaries: first 90% train, next 5% validation, remainder test.
    total = token_ids.size(0)
    train_end = int(total * 0.9)
    val_end = int(total * 0.95)
    train_data = batchify(token_ids[:train_end], batch_size)
    val_data = batchify(token_ids[train_end:val_end], eval_batch_size)
    test_data = batchify(token_ids[val_end:], eval_batch_size)

    vocab_size = tokenizer.get_vocab_size()

    print(f"Zakończono przetwarzanie dla tokenizatora: {type(tokenizer).__name__}")
    print(f"Rozmiar słownika: {vocab_size}")
    print(f"Kształt danych treningowych: {train_data.shape}\n")

    splits = (train_data, val_data, test_data)
    return splits, vocab_size

# Registry of the tokenizers being compared. The SentencePiece model paths are
# derived from the configured prefixes so this cell always loads the models
# produced by the training cells, even if the prefixes are changed.
tokenizers = {
    'basic_english': BasicEnglishTokenizer(),
    'bpe': SentencePieceTokenizer(f'{model_prefix_bpe}.model'),
    'unigram': SentencePieceTokenizer(f'{model_prefix_unigram}.model')
}

# Per-tokenizer (train, val, test) tensors and vocabulary sizes, keyed by tokenizer name.
processed_data = {}
vocab_sizes = {}

for name, tokenizer_instance in tokenizers.items():
    processed_data[name], vocab_sizes[name] = prepare_data_for_tokenizer(tokenizer_instance, corpus)

In [None]:
# Model training: one Transformer language model per tokenizer

# --- Model hyperparameters ---
emsize = 200  # embedding dimension
nhid = 200    # feed-forward dimension inside each TransformerEncoderLayer
nlayers = 2   # number of TransformerEncoderLayer layers
nhead = 2     # number of attention heads
dropout = 0.2 # dropout probability

# --- Training hyperparameters ---
epochs = 100  # upper bound on epochs; the time limit below usually stops training earlier
lr = 5.0      # initial learning rate for SGD, decayed by the StepLR scheduler
training_time_limit_seconds = 30  # time budget per model, checked at the start of every epoch

results = {}
trained_models = {}

for name, data_tuple in processed_data.items():
    train_data, val_data, test_data = data_tuple
    # Deliberately a separate name: re-binding the notebook-level `vocab_size`
    # here would silently change the tokenizer-training setting on a re-run.
    ntokens = vocab_sizes[name]

    print(f"\n{'='*40}")
    print(f"Rozpoczynam trening dla tokenizatora: {name.upper()}")
    print(f"Rozmiar słownika: {ntokens}")
    print(f"{'='*40}\n")

    model = TransformerModel(ntokens, emsize, nhead, nhid, nlayers, dropout).to(device)
    criterion = nn.CrossEntropyLoss()
    # A learning rate of 5.0 with StepLR decay and gradient clipping is an SGD
    # recipe; with Adam this learning rate makes training diverge.
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.95)

    best_val_loss = float("inf")
    model_save_path = f'model_{name}.pt'
    # Tracks whether THIS run wrote a checkpoint, so a stale file left over
    # from an earlier run is never loaded by accident.
    checkpoint_saved = False
    start_training_time = time.time()

    for epoch in range(1, epochs + 1):
        epoch_start_time = time.time()

        # Stop once the time budget is used up (an epoch in progress is not interrupted).
        if time.time() - start_training_time > training_time_limit_seconds:
            print(f"Przekroczono limit czasu ({training_time_limit_seconds} s). Zakończenie treningu dla '{name}'.")
            break

        train(model, train_data, optimizer, scheduler, criterion, ntokens)
        val_loss = evaluate(model, val_data, criterion, ntokens)

        elapsed = time.time() - epoch_start_time

        print('-' * 89)
        print(f'| koniec epoki {epoch:3d} | czas: {elapsed:5.2f}s | '
              f'val loss {val_loss:5.2f} | val ppl {math.exp(val_loss):8.2f}')
        print('-' * 89)

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), model_save_path)
            checkpoint_saved = True
            print(f"Zapisano lepszy model do pliku: {model_save_path}")

        scheduler.step()

    # Restore the best checkpoint of this run and record the results.
    if checkpoint_saved:
        final_model = TransformerModel(ntokens, emsize, nhead, nhid, nlayers, dropout).to(device)
        # map_location keeps loading working when the checkpoint device differs from `device`.
        final_model.load_state_dict(torch.load(model_save_path, map_location=device))
    else:
        # Happens when the validation loss never improved on infinity (e.g. it was NaN).
        print(f"Uwaga: nie zapisano żadnego modelu dla '{name}' - używam wag z ostatniej epoki.")
        final_model = model

    trained_models[name] = final_model
    results[name] = {
        'best_val_loss': best_val_loss,
        'best_val_ppl': math.exp(best_val_loss),
        'model_path': model_save_path
    }

    print(f"\nTrening dla '{name}' zakończony. Najlepszy PPL walidacji: {results[name]['best_val_ppl']:.2f}")

print("\n\n--- Podsumowanie wszystkich treningów ---")
for name, result in results.items():
    print(f"Tokenizator: {name.upper():<15} | Najlepszy PPL walidacji: {result['best_val_ppl']:.2f}")

In [None]:
# Final evaluation on the held-out test split
print("\n\n--- Końcowa ewaluacja na zbiorze testowym ---")

# Use a loss object owned by this cell instead of relying on the `criterion`
# variable that happens to be left behind by the training loop.
test_criterion = nn.CrossEntropyLoss()

for name, model in trained_models.items():
    _, _, test_data = processed_data[name]
    # Separate name so the notebook-level `vocab_size` setting is not overwritten.
    ntokens = vocab_sizes[name]

    test_loss = evaluate(model, test_data, test_criterion, ntokens)
    results[name]['test_loss'] = test_loss
    results[name]['test_ppl'] = math.exp(test_loss)

    print(f"Tokenizator: {name.upper():<15} | Test PPL: {results[name]['test_ppl']:8.2f}")

# Text generation and qualitative comparison

def decode_basic_english(ids):
    """Turn token ids from the 'basic_english' tokenizer back into a space-joined string."""
    # The vocabulary's own get_itos() is called directly. It must not be
    # replaced by a lambda that calls vocab.get_itos() again: the instance
    # attribute would shadow the method and recurse until RecursionError.
    itos = tokenizers['basic_english'].vocab.get_itos()
    return " ".join(itos[i] for i in ids)

def decode_sentencepiece(name, ids):
    """Decode piece ids back to text with the SentencePiece tokenizer registered under ``name``."""
    processor = tokenizers[name].sp
    return processor.decode(ids)

# Text generation helper
def generate_text(model, tokenizer_name, prompt, num_words_to_generate):
    """Sample a continuation of ``prompt`` from ``model`` and return the decoded text.

    Args:
        model: Trained language model; it is switched to eval mode.
        tokenizer_name: Key into ``tokenizers`` selecting the matching tokenizer.
        prompt: Text the generation starts from.
        num_words_to_generate: Maximum number of tokens to sample (tokens, not
            necessarily whole words for the subword tokenizers).

    Returns:
        The decoded prompt followed by the sampled tokens.

    Raises:
        ValueError: If ``prompt`` encodes to no tokens, since the model then
            has no position to predict from.
    """
    model.eval()

    tokenizer = tokenizers[tokenizer_name]
    # Both tokenizer wrappers expose encode(), so no per-tokenizer branch is needed.
    input_ids = tokenizer.encode(prompt)
    if not input_ids:
        raise ValueError("Prompt must encode to at least one token.")

    uses_sentencepiece = tokenizer_name != 'basic_english'
    # Looked up once; only the SentencePiece models have an end-of-sequence id to stop on.
    eos_id = tokenizer.sp.eos_id() if uses_sentencepiece else None

    # Shape (sequence_length, 1): a single sequence in the batch dimension.
    input_tensor = torch.tensor(input_ids, dtype=torch.long).unsqueeze(1).to(device)

    generated_ids = list(input_ids)

    with torch.no_grad():
        for _ in range(num_words_to_generate):
            src_mask = model.generate_square_subsequent_mask(input_tensor.size(0)).to(device)
            output = model(input_tensor, src_mask)

            # Sample the next token from the distribution at the last position.
            last_word_logits = output[-1, 0, :]
            predicted_id = torch.multinomial(torch.softmax(last_word_logits, dim=0), 1).item()

            generated_ids.append(predicted_id)
            next_token = torch.tensor([[predicted_id]], dtype=torch.long, device=device)
            input_tensor = torch.cat([input_tensor, next_token], dim=0)

            if uses_sentencepiece and predicted_id == eos_id:
                print("(Model wygenerował token końca sekwencji)")
                break

    if uses_sentencepiece:
        generated_text = decode_sentencepiece(tokenizer_name, generated_ids)
    else:
        generated_text = decode_basic_english(generated_ids)

    return generated_text


print("\n\n--- Generowanie tekstu ---")
# The same prompt is given to every model so the continuations can be compared.
prompt_text = "Pewnego dnia"

for name, model in trained_models.items():
    print(f"\n--- Model z tokenizatorem: {name.upper()} ---")
    # Sampling is random (torch.multinomial), so the output differs between runs.
    generated_output = generate_text(model, name, prompt_text, num_words_to_generate=15)
    print(f"Prompt: '{prompt_text}'")
    print(f"Wygenerowany tekst:\n{generated_output}")