In [78]:
import re, json, torch, torch.nn as nn
from torch.utils.data import DataLoader

path = "./deu.txt"

lines = open(path, encoding="utf-8").read().strip().split("\n")
lines = lines[:20000]

pairs = [ln.split("\t")[:2] for ln in lines] 
src_texts, tgt_texts = zip(*pairs)

In [79]:
src_texts[:5], tgt_texts[:5]
PAD, UNK, BOS, EOS = 0, 1, 2, 3 # Spezialtokens 
# PAD = Padding, UNK = Unknown, 
# BOS = Begin of Sequence, EOS = End of Sequence
VOCAB_SIZE = 20004
def tokenize(s): return re.findall(r"\b\w+\b", s.lower())
def build_vocab(texts, max_tokens=VOCAB_SIZE):
    from collections import Counter
    freq = Counter(tok for t in texts for tok in tokenize(t))
    itos = ["<pad>", "<unk>", "<bos>", "<eos>"] + [w for w,_ in freq.most_common(max_tokens-4)]
    return {w:i for i,w in enumerate(itos)}, itos
src_texts_vocab, src_itos = build_vocab(src_texts)
tgt_texts_vocab, tgt_itos = build_vocab(tgt_texts)

src_t = "find a job"
tgt_t = "Grüß Gott, Tom"
#src_t = ("<bos>",1,2,3,4,"<pad>","<pad>","<eos>") 
#tgt_t = ("<bos>",5,3,6,2,3,4, "<eos>")


def vectorize(text, stoi, max_len, add_bos_eos=False):
    ids = [stoi.get(tok, UNK) for tok in tokenize(text)]
    if add_bos_eos: ids = [BOS] + ids + [EOS]
    ids = ids[:max_len]
    if len(ids) < max_len: ids += [PAD]*(max_len-len(ids))
    return ids




max_src, max_tgt = 30, 30  # Maximale Länge 

def collate_fn(batch):
    src_batch, tgt_batch = zip(*batch)
    src = torch.tensor([vectorize(t, src_texts_vocab, max_src) for t in src_batch])
    tgt = torch.tensor([vectorize(t, tgt_texts_vocab, max_tgt, add_bos_eos=True) for t in tgt_batch])
    tgt_in, tgt_out = tgt[:, :-1], tgt[:, 1:]
    return src, tgt_in, tgt_out

dataset = list(zip(src_texts, tgt_texts))
loader = DataLoader(dataset, batch_size= 64, shuffle=True, collate_fn=collate_fn)


In [None]:
emb_dim, hid_dim = 128, 256

class Encoder(nn.Module):
    def __init__(self, vocab, emb_dim, hid_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab, emb_dim , padding_idx=PAD)
        self.rnn = nn.GRU(emb_dim, hid_dim , batch_first=True) # [B, S, E] 
# für das Mini-Batch Training [B, S, E]
# B ... Anzahl der Sätze im Batch 
# S ... Länge der Sequenz
# E ... Embedding-Dimension 
# ohne batch_first [S, B, E]
    def forward(self,x):
        x = self.embedding(x)
        _, hidden = self.rnn(x)
        return hidden # Hidden State 

class Decoder(nn.Module):
    def __init__(self, vocab, emb_dim, hid_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab, emb_dim, padding_idx=PAD)
        self.rnn = nn.GRU(emb_dim, hid_dim, num_layers=1 , batch_first=True)
        self.fc = nn.Linear(hid_dim, vocab) # 
    
    def forward(self, x, h):
        x = self.embedding(x)
        out, h = self.rnn(x,h) 
        return self.fc(out), h # logits, hidden_state
 
class Seq2Seq(nn.Module):
    def __init__(self, enc, dec):
        super().__init__()
        self.enc = enc
        self.dec = dec 
    
    def forward(self, src, tgt_in_dec):
    # src ... englische Sätze, 
    # tgt_in_dec ... aktuelle deutsche Sätze als Eingabe für den Decoder 
        hidden_enc = self.enc(src)
        logits, _ = self.dec(tgt_in_dec, hidden_enc) 
        # Hidden State aus dem Encoder
        return logits 





In [81]:
source = "I am a boy."	
target = "Ich bin ein Junge."

# ----------

# Encoder [ "I am a boy." ] -> h
# Next Token Prediction 
# 1. Schritt Decoder [ h, <bos> ] -> "ich"
# 2. Schritt Decoder [ h, (<bos>,"ich")] -> "bin" 
# 3. Schritt Decoder [ h, (<bos>,"ich", "bin")] -> "ein"
# 4. Schritt Decoder [ h, (<bos>,"ich", "bin", "ein")] -> "Junge"
# 5. Schritt Decoder [ h, (<bos>,"ich", "bin", "ein", "Junge")] -> "<eos>"

# X = (h, (<bos>,"ich", "bin", "ein", "Junge")) -> Eingabe für Decoder 
# y = ("ich", "bin", "ein", "Junge", "<eos>") -> Zielvorgaben für den Loss 


In [82]:
device = torch.device("mps")

model = Seq2Seq(
    Encoder(len(src_texts_vocab), emb_dim=emb_dim, hid_dim=hid_dim),
    Decoder(len(tgt_texts_vocab), emb_dim=emb_dim, hid_dim=hid_dim),
                ).to(device)

crit = nn.CrossEntropyLoss(ignore_index=PAD)
opt = torch.optim.Adam(model.parameters(), lr = 1e-3)
epochs = 12

@torch.no_grad()
def translate(prompt, max_len=max_tgt):
    model.eval()
    src = torch.tensor([vectorize(prompt, src_texts_vocab, max_src)], device=device)
    h = model.enc(src)
    ys = torch.tensor([[BOS]], device=device)
    out_tokens = []
    for _ in range(max_len):
        logits, h = model.dec(ys, h)
        next_id = logits[0, -1].argmax().item()
        if next_id in (EOS, PAD): break
        out_tokens.append(next_id)
        ys = torch.cat([ys, torch.tensor([[next_id]], device=device)], dim=1)
    return " ".join(tgt_itos[t] for t in out_tokens)


for epoch in range(epochs):
    model.train()
    running_loss = 0.0 
    for src, tgt_in, tgt_out in loader:
        src, tgt_in, tgt_out = src.to(device), tgt_in.to(device), tgt_out.to(device)
        logits = model(src, tgt_in) 
        loss = crit(logits.reshape(-1, logits.size(-1)), tgt_out.reshape(-1))
        opt.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) # Gradient-Clipping -> falls Gradient explodiert 
        opt.step()
        running_loss += loss.item()
    print(f"epoch {epoch+1}: loss {running_loss/len(loader):.4f}")
    print(translate("I will do my best."))


KeyboardInterrupt: 

In [None]:
# h_0 -> Zufällig
#[145,
# 10,
# 170,
# 0,
# 0,
# 0,...,
# 
# ]

# h_1 = Wh * h_0 + Wx*145 
# h_2 = .... 


# RNN1: h_t = Wh*h_{t-1} + Wx*x_t 
# RNN2: h_t = Wh*h_{t-1} + Wx*x_t  



# RNN2(RNN1) 

# RNN-Encoder-Decoder with Attention (ab 2014 - 2017)

In [None]:
def translate(prompt, max_len=max_tgt):
    model.eval()
    src = torch.tensor([vectorize(prompt, src_texts_vocab, max_src)], device=device)
    enc_out, h = model.enc(src)
    ys = torch.tensor([[BOS]], device=device)
    out_tokens = []
    for _ in range(max_len):
        logits, h = model.dec(ys, h, enc_out)
        next_id = logits[0, -1].argmax().item()
        if next_id in (EOS, PAD): break
        out_tokens.append(next_id)
        ys = torch.cat([ys, torch.tensor([[next_id]], device=device)], dim=1)
    return " ".join(tgt_itos[t] for t in out_tokens)



class Encoder(nn.Module):
    def __init__(self, vocab, emb_dim, hid_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab, emb_dim , padding_idx=PAD)
        self.rnn = nn.GRU(emb_dim, hid_dim , batch_first=True) # [B, S, E] 
    
    def forward(self, x):
        token_idx = (x != PAD).long() # 1 = Token, 0 = Pad
        x = self.embedding(x)
        enc_out, h = self.rnn(x)
        return enc_out,h

class Attention(nn.Module):

    def __init__(self, hid_dim, attn_dim):
        super().__init__()
        self.W_q = nn.Linear(hid_dim, attn_dim, bias=False) # Q
        self.W_k = nn.Linear(hid_dim, attn_dim, bias=False) # K 
        self.W_v = nn.Linear(hid_dim, attn_dim, bias=False) # V
        self.scale = attn_dim ** 0.5
    
    def forward(self, enc_out, h):
        q = self.W_q(h.transpose(0,1)) # [B, 1, D ]
        #print(q.shape)
        k = self.W_k(enc_out)   # [B ,S , D ]
        #print(k.shape)
        v = self.W_v(enc_out)  # [B ,S , D ]
        #print(v.shape)
        #print(k.T.shape)
        #print(k.transpose(1,2)) # [B, D, 1]
        scores = q @ k.transpose(1,2) / self.scale # [B, 1 , S]

        attn = torch.softmax(scores, dim=1)
        ctx = attn @ v

        return ctx


class AttentionDecoder(nn.Module):
    def __init__(self, vocab, emb_dim, hid_dim, attn_dim):
        super().__init__()
        self.emb = nn.Embedding(vocab, emb_dim, padding_idx=PAD)
        self.attn = Attention(hid_dim=hid_dim, attn_dim=attn_dim)
        self.rnn = nn.RNN(emb_dim + attn_dim , hid_dim, batch_first=True)
        self.fc = nn.Linear(hid_dim,vocab) 

    def forward(self, x , h, enc_out):
        emb = self.emb(x)
        outputs = []

        for t in range(emb.size(1)):
            ctx = self.attn(enc_out,h)
            rnn_in = torch.cat([emb[:, t:t+1, :], ctx], dim=-1) 
            # Kombination aktuelle Decoder-Embedding + Attention Context
            out, h = self.rnn(rnn_in, h)
            outputs.append(out)
        
        out = torch.cat(outputs, dim=1)
        return self.fc(out), h

class Seq2Seq(nn.Module):
    def __init__(self, enc, dec):
        super().__init__()
        self.enc, self.dec = enc, dec
    def forward(self, src, tgt_in):
        enc_out, h = self.enc(src)
        logits, _ = self.dec(tgt_in, h, enc_out)
        return logits
 

model = Seq2Seq(
    Encoder(len(src_texts_vocab), emb_dim=emb_dim, hid_dim=hid_dim),
    AttentionDecoder(len(tgt_texts_vocab), emb_dim=emb_dim, hid_dim=hid_dim, attn_dim=hid_dim),
                ).to(device)
crit = nn.CrossEntropyLoss(ignore_index=PAD)
opt = torch.optim.Adam(model.parameters(), lr=1e-3)

for epoch in range(epochs):
    model.train()
    running_loss = 0.0 
    for src, tgt_in, tgt_out in loader:
        src, tgt_in, tgt_out = src.to(device), tgt_in.to(device), tgt_out.to(device)
        logits = model(src, tgt_in) 
        loss = crit(logits.reshape(-1, logits.size(-1)), tgt_out.reshape(-1))
        opt.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) # Gradient-Clipping -> falls Gradient explodiert 
        opt.step()
        running_loss += loss.item()
    print(f"epoch {epoch+1}: loss {running_loss/len(loader):.4f}")
    print(translate("I rested"))


epoch 1: loss 3.8980
ich habe
epoch 2: loss 2.6728
ich habe
epoch 3: loss 2.0711
ich habe
epoch 4: loss 1.6220
ich bin
epoch 5: loss 1.2892
ich habe
epoch 6: loss 1.0363
ich habe
epoch 7: loss 0.8505
ich habe geschummelt
epoch 8: loss 0.7165
ich habe
epoch 9: loss 0.6195
ich rastete


KeyboardInterrupt: 