Notes on Penn Treebank, implementation code below

In [None]:
# pos info not in .dps files (input files)
# parsed has POS tags next to tagged text (after dominating terminals, none = no POS for terminal)

# atis = air travel information system transcripts
# wsj = 1989 wall stret journal articles
# The Brown Corpus dataset includes the following Brown subsets:
	# + cf	popular lore
	# + cg	belles lettres, biography, memoires, etc.
	# + ck	general fiction
	# + cl	mystery and detective fiction
	# + cm	science fiction
	# + cn	adventure and western fiction
	# + cp	romance and love story
	# + cr	humor

# swdb = switchboard dataset includes
  # These files are organized into 3 subdirectories, named "2","3","4",
  # according to the initial digit of the 4-digit file-id number. The
  # number of files per directory is shown here:

  #     tagged and dysfluency-annotated:
  # 	2/  455 files
  # 	3/  477 files
  # 	4/  194 files
  #     parsed:
  # 	2/  236 files
  # 	3/  260 files
  # 	4/  154 files

# tagged has part-of-speech tags only
  # The square brackets surrounding phrases in the texts are the output of a stochastic NP parser that is part of PARTS and are best ignored
  # Words are separated from their part-of-speech tag by a forward slash
  # In cases of uncertainty concerning the proper part-of-speech tag, words are given alternate tags, which are separated from one another by a vertical bar
  # In the Switchboard data, there are also tags including carets (^) which indicate various kinds of transcription errors
# parsed
  # prd = syntactic annotation only
  # mrg = syntactic annotation and part-of-speech tags

## The following two cells were run in my terminal
#### so that I didn't have to upload big parts of the treebank to colab


In [None]:
# merges 100 WSJ articles together, specifically from the 07 category
# merging done in terminal so I would'nt have to upload a huge set of files to colab

import os

output_file = "output07.mrg"

with open(output_file, "w") as outfile:
    for file_name in sorted(os.listdir("./07/")):
        if file_name.endswith(".mrg"):
            print(f"Processing: {file_name}")
            with open("./07/" + file_name, "r") as infile:
                content = infile.read()
                outfile.write(content)
                outfile.write("\n")

print(f"All files have been merged into {output_file}")

FileNotFoundError: [Errno 2] No such file or directory: './07/'

In [None]:
# extracts POS and word and saves it to a new file extracted.txt
import re

def extract_pos_and_words(treebank_str):

    # use regex to fit word format
    pattern = r'\((\S+)\s+(\S+)\)'

    matches = re.findall(pattern, treebank_str)
    pos_word_pairs = [f"{pos} {word}" for pos, word in matches if pos != "-NONE-"]

    return pos_word_pairs

def process_file(input_file, output_file):

    with open(input_file, 'r') as infile:
        content = infile.read()

    pos_word_pairs = extract_pos_and_words(content)

    with open(output_file, 'w') as outfile:
        outfile.write("\n".join(pos_word_pairs))

input_file = "output.mrg07"
output_file = "extracted07.txt"

process_file(input_file, output_file)
print(f"All POS and words from {input_file} have been extracted into {output_file}")

FileNotFoundError: [Errno 2] No such file or directory: 'output.mrg'

In [None]:
import pandas as pd
import random
import numpy as np

# ensure versions are all compatible with one another
!pip uninstall torch torchtext torchvision allennlp cached-path -y
!pip install torch==1.11.0 torchvision==0.12.0 torchtext==0.12.0 allennlp==2.9.3 cached-path==1.0.2

import torch
from torchtext.data.utils import get_tokenizer
from torch.utils.data import DataLoader, Dataset
from torchtext.vocab import build_vocab_from_iterator
import spacy

# use the same seed for reproducibility
# I believe this is necessary when trying to compare different hyperparameters
# to see which ones work best
SEED = 1234

random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

torch.backends.cudnn.deterministic = True
tokenizer = get_tokenizer("basic_english")

# format data into a list of tuples in (word, POS) form
def format_data(file_path):
    data = []
    with open(file_path, "r") as f:
        for line in f:
            pos, word = line.strip().split()
            data.append((word.lower(), pos))
    return data


# load dataset from extracted text above
wsj_dataset = format_data("extracted07.txt")

# split the data into 80% training, 10% validation, and 10% test
train_size = int(0.8 * len(wsj_dataset))
validate_size = int(0.1 * len(wsj_dataset))
test_size = len(wsj_dataset) - train_size - validate_size
train_data, validate_data, test_data = torch.utils.data.random_split(
    wsj_dataset, [train_size, validate_size, test_size], generator=torch.Generator().manual_seed(SEED)
)

# print(train_data)
# print(validate_data)
# print(test_data)



In [None]:
# get tokens
def yield_tokens(data, type_token):
    for words, tags in data:
        yield [type_token]

# build word vocabulary with:
    # the minimum frequency of words requried of a word to be 2
    # special tokens for unknown characters and padding
word_vocab = build_vocab_from_iterator(
    yield_tokens(train_data, 'words'),
    min_freq=2,
    specials=["<unk>", "<pad>"]
)
word_vocab.set_default_index(word_vocab["<unk>"])

# build tag vocabulary with:
    # special tokens for no tag (like in the real treebank) and padding tags
tag_vocab = build_vocab_from_iterator(
    yield_tokens(train_data, 'tags'),
    specials=["-NONE-", "<pad>"]
)
tag_vocab.set_default_index(tag_vocab["-NONE-"])

# print(word_vocab.get_itos())
# print(tag_vocab.get_itos())

In [None]:
import torchtext.vocab

# Load GloVe vectors
embedding_dim = 100
glove = torchtext.vocab.GloVe(name="6B", dim=embedding_dim)
embedding_matrix = torch.zeros(len(word_vocab), embedding_dim)

# map each word in vocab to a glove vector
# if word is not in glove vocabulary, it will randomly initialize its word embedding vecotr
for idx, word in enumerate(word_vocab.get_itos()):
    if word in glove.stoi:
        embedding_matrix[idx] = glove.vectors[glove.stoi[word]]
    else:
        embedding_matrix[idx] = torch.randn(embedding_dim)

# print("embedding for 'example':", embedding_matrix[word_vocab["example"]])

# import elmo library
    # credit to ChatGPT for giving me both the options and weight files
from allennlp.modules.elmo import Elmo, batch_to_ids
options_file = "https://allennlp.s3.amazonaws.com/models/elmo/2x4096_512_2048cnn_2xhighway/elmo_2x4096_512_2048cnn_2xhighway_options.json"
weight_file = "https://allennlp.s3.amazonaws.com/models/elmo/2x4096_512_2048cnn_2xhighway/elmo_2x4096_512_2048cnn_2xhighway_weights.hdf5"

elmo = Elmo(options_file, weight_file, num_output_representations=1, dropout=0.25)


100%|█████████▉| 399999/400000 [00:18<00:00, 21832.71it/s]


In [None]:
# UNCOMMENT BELOW WHEN USING GLOVE (STATIC) WORD EMBEDDINGS

# def collate_fn(batch):
#     words, tags = zip(*batch)
#     word_indices = [word_vocab[word] for word in words]
#     tag_indices = [tag_vocab[tag] for tag in tags]
#     return torch.tensor(word_indices), torch.tensor(tag_indices)

# UNCOMMENT BELOW WHEN USING ELMO WORD EMBEDDINGS

# credit to ChatGPT model-o1 for helping me debug this
from allennlp.modules.elmo import batch_to_ids
def collate_fn(batch):
    words, tags = zip(*batch)

    # Convert words to character IDs and pad them
    word_char_ids = batch_to_ids(words)  # words is a list of lists of tokens

    # word_char_ids has shape [batch_size, seq_length, word_length]
    # Get the max sequence length from word_char_ids
    seq_length = word_char_ids.size(1)

    # Convert tags to indices
    tag_indices = [[tag_vocab[tag] for tag in seq] for seq in tags]

    # Pad tag sequences to match the sequence length
    padded_tag_indices = []
    for seq in tag_indices:
        if len(seq) < seq_length:
            # Pad with tag_pad_idx
            seq = seq + [tag_vocab["<pad>"]] * (seq_length - len(seq))
        else:
            # Truncate if necessary
            seq = seq[:seq_length]
        padded_tag_indices.append(seq)

    tag_tensor = torch.tensor(padded_tag_indices)

    return word_char_ids, tag_tensor

# define batch size and initialize loaders
batch_size = 32

train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
validate_loader = DataLoader(validate_data, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

In [None]:
import torch.nn as nn

class POSTagger(nn.Module):
    def __init__(self,
                 embedding_matrix,
                 elmo,
                 hidden_dim,
                 output_dim,
                 n_layers,
                 bidirectional,
                 dropout,
                 use_elmo=False,
                 rnn_type='LSTM'):

        super().__init__()

        self.use_elmo = use_elmo
        self.elmo = elmo

        if not use_elmo:
            vocab_size, embedding_dim = embedding_matrix.shape
            self.embedding = nn.Embedding.from_pretrained(embedding_matrix, freeze=False)
            embedding_dim = embedding_dim
        else:
            # elmo embeddings have a fixed size of 1024
            embedding_dim = 1024

        if rnn_type == 'LSTM':
            self.rnn = nn.LSTM(embedding_dim,
                               hidden_dim,
                               num_layers=n_layers,
                               bidirectional=bidirectional,
                               dropout=dropout if n_layers > 1 else 0,
                               batch_first=True)

        elif rnn_type == 'RNN':
            self.rnn = nn.RNN(embedding_dim,
                              hidden_dim,
                              num_layers=n_layers,
                              bidirectional=bidirectional,
                              dropout=dropout if n_layers > 1 else 0,
                              batch_first=True)
        else:
            raise ValueError("Invalid rnn_type, please choose either 'LSTM' or 'RNN'.")

        self.fc = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)


    def forward(self, text):
        if self.use_elmo:
            embedded = self.elmo(text)["elmo_representations"][0]
        else:
            embedded = self.dropout(self.embedding(text))

        outputs, hidden = self.rnn(embedded)
        predictions = self.fc(self.dropout(outputs))

        return predictions


In [None]:
# from github repo
def categorical_accuracy(preds, y, tag_pad_idx):
    """
    Returns accuracy per batch, i.e. if you get 8/10 right, this returns 0.8, NOT 8
    """
    max_preds = preds.argmax(dim = 1, keepdim = True) # get the index of the max probability
    non_pad_elements = (y != tag_pad_idx).nonzero()
    correct = max_preds[non_pad_elements].squeeze(1).eq(y[non_pad_elements])
    return correct.sum() / y[non_pad_elements].shape[0]

def train(model, iterator, optimizer, criterion, tag_pad_idx, device):
    model.train()
    epoch_loss = 0
    epoch_acc = 0

    for words, tags in iterator:
        words, tags = words.to(device), tags.to(device)
        optimizer.zero_grad()

        predictions = model(words)
        predictions = predictions.view(-1, predictions.shape[-1])
        tags = tags.view(-1)

        loss = criterion(predictions, tags)
        acc = categorical_accuracy(predictions, tags, tag_pad_idx)

        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        epoch_acc += acc.item()

    return epoch_loss / len(iterator), epoch_acc / len(iterator)


def evaluate(model, iterator, criterion, tag_pad_idx, device):
    epoch_loss = 0
    epoch_acc = 0

    model.eval()

    with torch.no_grad():
        for words, tags in iterator:
            words, tags = words.to(device), tags.to(device)

            predictions = model(words)
            predictions = predictions.view(-1, predictions.shape[-1])
            tags = tags.view(-1)

            loss = criterion(predictions, tags)
            acc = categorical_accuracy(predictions, tags, tag_pad_idx)

            epoch_loss += loss.item()
            epoch_acc += acc.item()

    return epoch_loss / len(iterator), epoch_acc / len(iterator)



In [None]:
import time

HIDDEN_DIM = 128
OUTPUT_DIM = len(tag_vocab)
N_LAYERS = 2
BIDIRECTIONAL = False
DROPOUT = 0.25
PAD_IDX = word_vocab["<pad>"]

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# initialize model with LSTM
# model = POSTagger(embedding_matrix, HIDDEN_DIM, OUTPUT_DIM, N_LAYERS, BIDIRECTIONAL, DROPOUT, rnn_type='LSTM').to(device)

# RNN with glove embeddings
# model = POSTagger(embedding_matrix, None, HIDDEN_DIM, OUTPUT_DIM, N_LAYERS, BIDIRECTIONAL, DROPOUT, use_elmo=False, rnn_type='RNN').to(device)

# RNN with elmo embeddings
model = POSTagger(None, elmo, HIDDEN_DIM, OUTPUT_DIM, N_LAYERS, BIDIRECTIONAL, DROPOUT, use_elmo=True, rnn_type='RNN')

# optimizer and loss function
optimizer = torch.optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss(ignore_index=tag_vocab["<pad>"])  # Ignore pad tokens in loss calculation

def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

N_EPOCHS = 10
best_valid_loss = float('inf')

total_mins = 0
total_secs = 0

for epoch in range(N_EPOCHS):
    start_time = time.time()

    train_loss, train_acc = train(model, train_loader, optimizer, criterion, tag_vocab["<pad>"], device)
    valid_loss, valid_acc = evaluate(model, validate_loader, criterion, tag_vocab["<pad>"], device)

    end_time = time.time()

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    total_mins += epoch_mins
    total_secs += epoch_secs

    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'pos-tagger-model.pt')

    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')


Epoch: 01 | Epoch Time: 53m 9s
	Train Loss: 0.030 | Train Acc: 99.60%
	 Val. Loss: 0.001 |  Val. Acc: 99.99%


KeyboardInterrupt: 

In [None]:
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
model.load_state_dict(torch.load('pos-tagger-model.pt'))
test_loss, test_acc = evaluate(model, test_loader, criterion, tag_vocab["<pad>"], device)

print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')
print(f'Total time: {total_mins + int(total_secs // 60)}:{total_secs % 60 + 1:02}')

new_test = format_data("extracted13.txt")
test_loader = DataLoader(new_test, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
model.load_state_dict(torch.load('pos-tagger-model.pt'))
test_loss, test_acc = evaluate(model, test_loader, criterion, tag_vocab["<pad>"], device)

print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')
print(f'Total time: {total_mins + int(total_secs // 60)}:{total_secs % 60 + 1:02}')

Test Loss: 0.003 | Test Acc: 99.96%
Total time: 53:10
Test Loss: 0.004 | Test Acc: 99.95%
Total time: 53:10
