In [1]:
from transformers import GPT2Tokenizer, GPT2Config, GPT2LMHeadModel, AdamW
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from tqdm import tqdm
import torch
import os

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
characters = ['.', '<PAD>', 'a', 'b', 'c', 'd', 'e']

vocab = {c: i for i, c in enumerate(characters)}

In [3]:
class PCFGTokenizer:

    def __init__(self, vocab):
        self.vocab = vocab
        self.inv_vocab = {v: k for k, v in vocab.items()}
    
    def encode(self, text):
        # Split text into sentences, then terminals; encode terminals and dots
        token_ids = []
        sentences = text.split('.')
        for sentence in sentences[:-1]:  # exclude the last, as it will be empty if text ends with a dot
            terminals = sentence.split()
            for terminal in terminals:
                if terminal in self.vocab:
                    token_ids.append(self.vocab[terminal])
            token_ids.append(self.vocab['.'])  # Add the sentence separator
        return token_ids

    def decode(self, token_ids):
        # Convert token IDs back to text, reconstructing spaces and dots
        tokens = [self.inv_vocab[token_id] for token_id in token_ids]
        text = ' '.join(tokens).replace(' .', '.').replace('. ', '.')
        return text

def encode_document(document_name):
    """
    Takes a document name and returns the encoded tokens of the document
    """
    # Read the document and encode it
    with open('documents/' + document_name, 'r') as file:
        document = file.read()
    return tokenizer.encode(document)

def encode_documents(directory_path : str ='documents'):
    encoded_sentences = []
    for document_name in os.listdir(directory_path):
        if document_name.endswith(".txt"):
            with open(os.path.join(directory_path, document_name), 'r') as file:
                document = file.read()
                sentences = document.split('.')
                for sentence in sentences:
                    if sentence:  # Check if sentence is not empty
                        encoded_sentence = tokenizer.encode(sentence + '.')
                        encoded_sentences.append(encoded_sentence)
    return encoded_sentences

# Initialize the tokenizer with the vocabulary
tokenizer = PCFGTokenizer(vocab)

In [4]:
encode_documents('documents')

[[5, 6, 4, 4, 4, 4, 6, 4, 4, 6, 5, 6, 4, 4, 6, 0],
 [4, 4, 0],
 [5, 4, 0],
 [6, 0],
 [6, 4, 4, 3, 4, 0],
 [4, 4, 4, 0],
 [6, 4, 0],
 [5, 4, 6, 4, 4, 5, 0],
 [3, 4, 0],
 [6, 0],
 [6, 6, 6, 4, 4, 5, 6, 4, 6, 6, 5, 0],
 [5, 4, 0],
 [4, 4, 6, 4, 4, 4, 5, 4, 0],
 [4, 6, 4, 4, 4, 6, 6, 6, 0],
 [6, 4, 5, 6, 0],
 [6, 4, 4, 5, 4, 6, 6, 0],
 [5,
  5,
  6,
  6,
  6,
  4,
  6,
  5,
  4,
  6,
  6,
  4,
  5,
  6,
  4,
  3,
  4,
  4,
  6,
  6,
  6,
  6,
  6,
  4,
  6,
  4,
  4,
  6,
  5,
  4,
  4,
  4,
  4,
  5,
  4,
  4,
  4,
  4,
  4,
  4,
  6,
  5,
  4,
  5,
  5,
  4,
  4,
  4,
  5,
  4,
  6,
  4,
  4,
  4,
  3,
  4,
  4,
  4,
  0],
 [5, 4, 6, 3, 5, 0],
 [6, 0],
 [4, 5, 0],
 [5, 6, 6, 4, 4, 5, 0],
 [6, 6, 6, 4, 6, 6, 3, 4, 4, 3, 4, 5, 5, 4, 4, 6, 5, 4, 6, 4, 5, 4, 4, 0],
 [6, 0],
 [6, 0],
 [5, 6, 6, 0],
 [3, 4, 6, 6, 6, 4, 6, 4, 4, 4, 0],
 [6, 0],
 [6, 6, 6, 4, 4, 5, 4, 5, 4, 5, 4, 6, 6, 4, 4, 4, 4, 4, 5, 0],
 [5, 4, 4, 4, 3, 4, 4, 0],
 [6, 0],
 [4,
  4,
  6,
  6,
  4,
  4,
  4,
  4,
  4,
  4,
  5

In [5]:
class PCFGDataset(Dataset):
    
    def __init__(self, encoded_texts):
        self.encoded_texts = encoded_texts
    
    def __len__(self):
        return len(self.encoded_texts)
    
    def __getitem__(self, idx):
        return torch.tensor(self.encoded_texts[idx], dtype=torch.long)

In [6]:
# custom collate_fn
def collate_fn(batch):
    # Pad sequences so they are all the same length as the longest sequence
    batch_padded = pad_sequence(batch, batch_first=True, padding_value=vocab['<PAD>'])
    
    return batch_padded


In [7]:
# encode all documents
enc = encode_documents()

# create a dataset
dataset = PCFGDataset(enc)
dataloader = DataLoader(dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)

In [8]:
# test, if the encoding and decoding works
# iterate over all documents, encode them and decode them again. If the result is the same as the original document, the encoding and decoding works
for i in range(1, 101):
    document = f'document_{i}.txt'

    # load original document
    with open('documents/' + document, 'r') as file:
        original_document = file.read()

        # encode document
        enc = encode_document(document)

        # are they the same?
        print(f'{i}: {original_document == tokenizer.decode(enc)}')

1: True
2: True
3: True
4: True
5: True
6: True
7: True
8: True
9: True
10: True
11: True
12: True
13: True
14: True
15: True
16: True
17: True
18: True
19: True
20: True
21: True
22: True
23: True
24: True
25: True
26: True
27: True
28: True
29: True
30: True
31: True
32: True
33: True
34: True
35: True
36: True
37: True
38: True
39: True
40: True
41: True
42: True
43: True
44: True
45: True
46: True
47: True
48: True
49: True
50: True
51: True
52: True
53: True
54: True
55: True
56: True
57: True
58: True
59: True
60: True
61: True
62: True
63: True
64: True
65: True
66: True
67: True
68: True
69: True
70: True
71: True
72: True
73: True
74: True
75: True
76: True
77: True
78: True
79: True
80: True
81: True
82: True
83: True
84: True
85: True
86: True
87: True
88: True
89: True
90: True
91: True
92: True
93: True
94: True
95: True
96: True
97: True
98: True
99: True
100: True


In [9]:
encoded = tokenizer.encode('a c.a b a b.')
print(f"Encoded: {encoded}")

# Decoding back
decoded = tokenizer.decode(encoded)
print(f"Decoded: {decoded}")

Encoded: [2, 4, 0, 2, 3, 2, 3, 0]
Decoded: a c.a b a b.


-----
### Training the model

In [10]:
# model config
config = GPT2Config(
    vocab_size=len(tokenizer.vocab),
    n_positions=512,
    n_ctx=512,
    n_embd=768,
    n_layer=12,
    n_head=12    
)

model = GPT2LMHeadModel(config)

In [11]:
# optimizer
optimizer = AdamW(model.parameters(), lr=1e-5)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)



GPT2LMHeadModel(
  (transformer): GPT2Model(
    (wte): Embedding(7, 768)
    (wpe): Embedding(512, 768)
    (drop): Dropout(p=0.1, inplace=False)
    (h): ModuleList(
      (0-11): 12 x GPT2Block(
        (ln_1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        (attn): GPT2Attention(
          (c_attn): Conv1D()
          (c_proj): Conv1D()
          (attn_dropout): Dropout(p=0.1, inplace=False)
          (resid_dropout): Dropout(p=0.1, inplace=False)
        )
        (ln_2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        (mlp): GPT2MLP(
          (c_fc): Conv1D()
          (c_proj): Conv1D()
          (act): NewGELUActivation()
          (dropout): Dropout(p=0.1, inplace=False)
        )
      )
    )
    (ln_f): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
  )
  (lm_head): Linear(in_features=768, out_features=7, bias=False)
)

In [12]:
epochs = 3
for epoch in tqdm(range(epochs)):

    model.train()

    for batch in dataloader:
        inputs = batch.to(device)
        outputs = model(inputs, labels=inputs)
        loss = outputs.loss
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    
    model.eval()

  0%|          | 0/3 [12:30<?, ?it/s]


IndexError: index out of range in self