# **CIS 5300 Fall 2025 - Homework 3 - Language Modeling with N-Grams and Transformers**

In this assignment you will build language models (LMs) that perform next-token prediction. You will implement and optimize an n-gram LM and a transformer-based LM.

***

#### TASK A: N-Gram LM
Implement an n-gram model from scratch, based on the starter code. This implementation needs to be compatible with different levels of tokenization (e.g. word-level, sub-word level, character-level).
#### TASK B: Transformer LM
You are provided with an implementation in PyTorch in the starter code. The implementation includes errors that you will need to find and fix. There are hints in the function docstrings about the errors.

***

### Evaluation
You will evaluate your language models in two ways: (a) Perplexity on a held-out test set, and (b) Word error rate on a speech recognition re-ranking task.
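
As a rough illustration of the two metrics (not the grading code), the sketch below computes perplexity from base-2 log-probabilities and word error rate from a word-level edit distance. The function names `perplexity` and `word_error_rate` are hypothetical, and the reference sentence is assumed to be non-empty.

```python
from typing import List


def perplexity(log2_probs: List[float], num_tokens: int) -> float:
    """Perplexity from per-sentence base-2 log-probabilities."""
    total_log2_prob = sum(log2_probs)
    return 2 ** (-total_log2_prob / num_tokens)


def word_error_rate(reference: str, hypothesis: str) -> float:
    """Word-level edit distance divided by the reference length (assumed > 0)."""
    ref, hyp = reference.split(), hypothesis.split()
    # dist[j] is the edit distance between the reference prefix processed so far and hyp[:j].
    dist = list(range(len(hyp) + 1))
    for i, ref_word in enumerate(ref, start=1):
        prev_diag, dist[0] = dist[0], i
        for j, hyp_word in enumerate(hyp, start=1):
            cost = 0 if ref_word == hyp_word else 1
            prev_diag, dist[j] = dist[j], min(
                dist[j] + 1,       # deletion of a reference word
                dist[j - 1] + 1,   # insertion of a hypothesis word
                prev_diag + cost,  # substitution or exact match
            )
    return dist[-1] / len(ref)


# A model that assigns probability 1/4 to each of 3 tokens has perplexity 4.
uniform_ppl = perplexity([-6.0], num_tokens=3)
# One dropped word out of six reference words.
dropped_word_wer = word_error_rate("the cat sat on the mat", "the cat sat on mat")
```

Lower is better for both metrics: perplexity measures how well the LM predicts held-out text, while word error rate measures how far a chosen transcription is from the ground truth.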

This assignment consists of two parts. Completing the coding portion will help you complete the report:
1.   (75 points) A coding portion within this notebook, with test cases evaluated through PennGrader.
2.   (75 points) A written report asking you to evaluate and analyze variants of the systems you design. Please answer all questions within the following template: ![todo](https://img.shields.io/badge/allen_todo:-overleaf_link-8A2BE2)

Please read the written report requirements before you start the coding portion.

## Setup 1: PennGrader Setup

In [1]:
%%capture
## DO NOT CHANGE ANYTHING, JUST RUN
!pip install penngrader-client dill

In [2]:
%%writefile notebook-config.yaml

grader_api_url: 'https://23whrwph9h.execute-api.us-east-1.amazonaws.com/default/Grader23'
grader_api_key: 'flfkE736fA6Z8GxMDJe2q8Kfk8UDqjsG3GVqOFOa'

Writing notebook-config.yaml


In [3]:
!cat notebook-config.yaml


grader_api_url: 'https://23whrwph9h.execute-api.us-east-1.amazonaws.com/default/Grader23'
grader_api_key: 'flfkE736fA6Z8GxMDJe2q8Kfk8UDqjsG3GVqOFOa'


In [4]:
from penngrader.grader import *

## TODO - Start
STUDENT_ID = 31109579 # YOUR PENN-ID GOES HERE AS AN INTEGER#
## TODO - End

SECRET = STUDENT_ID
grader = PennGrader('notebook-config.yaml', 'cis5300_25f_HW3', STUDENT_ID, SECRET)

PennGrader initialized with Student ID: 31109579

Make sure this correct or we will not be able to store your grade


In [5]:
def reload_grader():
    grader = PennGrader('notebook-config.yaml', 'cis5300_25f_HW3', STUDENT_ID, SECRET)
    return grader

In [6]:
# check if the PennGrader is set up correctly
# do not change this cell, see if you get 1/1!
name_str = 'Ravi Raghavan'
grader.grade(test_case_id = 'name_test', answer = name_str)

Correct! You earned 4/4 points. You are a star!

Your submission has been successfully recorded in the gradebook.


## Setup 2: Dataset/Packages

Importing packages

In [7]:
!pip install evaluate==0.4.0
!pip install jiwer==3.0.3

Collecting evaluate==0.4.0
  Downloading evaluate-0.4.0-py3-none-any.whl.metadata (9.4 kB)
Collecting responses<0.19 (from evaluate==0.4.0)
  Downloading responses-0.18.0-py3-none-any.whl.metadata (29 kB)
Downloading evaluate-0.4.0-py3-none-any.whl (81 kB)
Downloading responses-0.18.0-py3-none-any.whl (38 kB)
Installing collected packages: responses, evaluate
Successfully installed evaluate-0.4.0 responses-0.18.0
Collecting jiwer==3.0.3
  Downloading jiwer-3.0.3-py3-none-any.whl.metadata (2.6 kB)
Collecting rapidfuzz<4,>=3 (from jiwer==3.0.3)
  Downloading rapidfuzz-3.14.1-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (12 kB)
Downloading jiwer-3.0.3-py3-none-any.whl (21 kB)
Downloading rapidfuzz-3.14.1-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (3.2 MB)

In [8]:
import os
import json
from tqdm.notebook import tqdm

import urllib.request

import math
import numpy as np
import pandas as pd
import tokenizers
from tokenizers import Tokenizer

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset

from evaluate import load

from collections import defaultdict
from typing import List, Any, Tuple

## Preliminary Task: Dataloader

Downloading Required Files:

- The cell below downloads the training, development, and test data for the WSJ dataset, and the development and test data for the HUB dataset, into the `data/` directory.

- The subdirectory `lm_data` contains the WSJ corpus from the Penn Treebank, and `wer_data` contains the HUB dataset.

- `lm_data` will be used to train your LMs and evaluate perplexity

- `wer_data` will be used to evaluate word error rate for speech recognition re-ranking (i.e., with a good LM, you should be able to help rank sentence predictions with better grammar above those that are unlikely to have been said)
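
One way to picture re-ranking, sketched under assumptions: each candidate transcription comes with an acoustic score, the LM supplies a log-probability, and the candidate with the highest combined score wins. The `rerank` helper, the `lm_weight` parameter, the toy scoring function, and the convention that higher scores are better are all illustrative assumptions, not part of the starter code.

```python
from typing import Callable, Sequence


def rerank(
    sentences: Sequence[str],
    acoustic_scores: Sequence[float],
    lm_log_prob: Callable[[str], float],
    lm_weight: float = 1.0,
) -> str:
    """Return the candidate with the highest acoustic + weighted LM score."""
    combined = [
        acoustic + lm_weight * lm_log_prob(sentence)
        for sentence, acoustic in zip(sentences, acoustic_scores)
    ]
    best_index = max(range(len(combined)), key=combined.__getitem__)
    return sentences[best_index]


# Toy stand-in for a real LM (assumption): each out-of-vocabulary word costs 5.
KNOWN_WORDS = {"i", "shall", "be", "late"}


def toy_lm_log_prob(sentence: str) -> float:
    return -5.0 * sum(word not in KNOWN_WORDS for word in sentence.split())


candidates = ["i shall be late", "eye shall be late"]
acoustic = [-10.0, -9.5]  # hypothetical scores; the second sounds slightly better
best_with_lm = rerank(candidates, acoustic, toy_lm_log_prob)
best_without_lm = rerank(candidates, acoustic, toy_lm_log_prob, lm_weight=0.0)
```

With the LM term switched off, the acoustically preferred but implausible candidate wins; with it switched on, the more fluent sentence is chosen. The weight on the LM term is a hyperparameter you would tune on the development split.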

In [9]:
## DO NOT CHANGE ANYTHING, JUST RUN

# Downloading Required Files
!gdown 1BfEyyk7q_ZBPtZspTh-SKjA3OWGpSDqM
!gdown 1JbCMyMWuifPu8SdzW68HkprKtyKgqhcN
!gdown 141wBLF4Rzip9SJx-5MWQOUJmnBVXVaEn
!unzip -o data_dependencies.zip
!rm data_dependencies.zip

Downloading...
From: https://drive.google.com/uc?id=1BfEyyk7q_ZBPtZspTh-SKjA3OWGpSDqM
To: /content/data_dependencies.zip
100% 2.02M/2.02M [00:00<00:00, 169MB/s]
Downloading...
From: https://drive.google.com/uc?id=1JbCMyMWuifPu8SdzW68HkprKtyKgqhcN
To: /content/unit_test_state_dict.pth
100% 299k/299k [00:00<00:00, 143MB/s]
Downloading...
From: https://drive.google.com/uc?id=141wBLF4Rzip9SJx-5MWQOUJmnBVXVaEn
To: /content/testing_data.pth
100% 52.2k/52.2k [00:00<00:00, 61.5MB/s]
Archive:  data_dependencies.zip
   creating: data/
   creating: data/wer_data/
  inflating: data/wer_data/dev_sentences.json  
  inflating: data/wer_data/dev_ground_truths.csv  
  inflating: data/wer_data/test_sentences.json  
   creating: data/lm_data/
  inflating: data/lm_data/treebank-sentences-test.txt  
  inflating: data/lm_data/treebank-sentences-dev.txt  
  inflating: data/lm_data/treebank-sentences-train.txt  


First, build out your tokenizer for different **tokenization levels** and **model types**. The purpose of the tokenizer is to map sentences into sequences of token IDs.

Helpful hints:
1. Do not forget about [pre-tokenizers](https://huggingface.co/docs/tokenizers/en/api/pre-tokenizers), [normalizers](https://huggingface.co/docs/tokenizers/en/api/normalizers), [post-processors](https://huggingface.co/docs/tokenizers/en/api/post-processors), or padding. These steps will be important to improve the performance of your LMs.
2. It is optional to train your tokenizer on the `train_data` input.
3. You will want to pad inputs for the transformer, but not necessarily for the n-gram model.


⚠️ **Important**: You can get tokenizers, including pre-trained BPE tokenizers, from the [tokenizers](https://github.com/huggingface/tokenizers) package.

⚠️ **Important**: The transformer code assumes the ID of the `pad_token` to be the last index of the vocabulary. Be sure to assert this behavior or to change the transformer implementation accordingly.
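
The pad-last requirement can be checked, and if necessary enforced, on a plain token-to-ID mapping. The sketch below is an assumption-laden illustration using an ordinary dictionary rather than a `tokenizers` object; `move_pad_last` is a hypothetical helper name.

```python
from typing import Dict


def move_pad_last(vocab: Dict[str, int], pad_token: str = "[PAD]") -> Dict[str, int]:
    """Return a re-indexed vocabulary in which pad_token has the largest ID."""
    # Keep the existing relative order of every other token.
    ordered = sorted((tok for tok in vocab if tok != pad_token), key=vocab.__getitem__)
    ordered.append(pad_token)
    return {token: index for index, token in enumerate(ordered)}


vocab = {"[PAD]": 0, "[BOS]": 1, "[EOS]": 2, "[UNK]": 3, "a": 4, "b": 5}
new_vocab = move_pad_last(vocab)

# The invariant the transformer code relies on.
assert new_vocab["[PAD]"] == len(new_vocab) - 1
```

Note that re-indexing shifts the IDs of the other tokens as well, so it has to happen before any data is encoded, and any component that stores special-token IDs (such as a post-processor) needs the new values.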

In [10]:
### STUDENT CELL ###: DONE

## Steps to Building a Tokenizer: https://huggingface.co/learn/llm-course/en/chapter6/8
from tokenizers.models import WordLevel
from tokenizers.pre_tokenizers import Whitespace, Split
from tokenizers.normalizers import NFD, Lowercase, StripAccents, Sequence
from tokenizers.trainers import WordLevelTrainer
from tokenizers.processors import TemplateProcessing

def get_tokenizer(train_data: List[str], tokenization_level: str, model_type: str):
    """
    Function for creating the appropriate tokenizer.

    Inputs:
    train_data (list): A list of data to build your tokenizer vocabulary
    tokenization_level (str): The level at which to tokenize the input
        Options are: "word", "subword", or "character"
    model_type (str): "n_gram" or "transformer"
    """

    SPECIAL_TOKENS = {'pad_token': '[PAD]', 'bos_token': '[BOS]', 'eos_token': '[EOS]', 'unk_token': '[UNK]'}

    #### STUDENT CODE HERE ####
    match tokenization_level:
        case "word":
            # Create a word-level tokenizer
            trainer = WordLevelTrainer(special_tokens = list(SPECIAL_TOKENS.values()))
            tokenizer = Tokenizer(WordLevel(unk_token = SPECIAL_TOKENS['unk_token']))
            tokenizer.normalizer = Sequence([NFD(), Lowercase(), StripAccents()])
            tokenizer.pre_tokenizer = Whitespace() # Splits into Words
            tokenizer.train_from_iterator(train_data, trainer=trainer)

            tokenizer.enable_padding(pad_id=tokenizer.token_to_id(SPECIAL_TOKENS['pad_token']),
                                     pad_token=SPECIAL_TOKENS['pad_token'])

            tokenizer.post_processor = TemplateProcessing(
                single=f"{SPECIAL_TOKENS['bos_token']} $0 {SPECIAL_TOKENS['eos_token']}",
                special_tokens=[
                    (SPECIAL_TOKENS['bos_token'], tokenizer.token_to_id(SPECIAL_TOKENS['bos_token'])),
                    (SPECIAL_TOKENS['eos_token'], tokenizer.token_to_id(SPECIAL_TOKENS['eos_token']))
                ]
            )

            print(f"Case: word, PAD ID: {tokenizer.token_to_id(SPECIAL_TOKENS['pad_token'])}")

        case "subword":
            # Loads a pre-trained subword tokenizer. No need to edit. Padding ID is 0
            tokenizer = Tokenizer.from_pretrained("bert-base-uncased")
            print(f"Case: subword, PAD ID: {tokenizer.token_to_id(SPECIAL_TOKENS['pad_token'])}")

        case "character":
            # Character-level tokenizer
            trainer = WordLevelTrainer(special_tokens = list(SPECIAL_TOKENS.values()))
            tokenizer = Tokenizer(WordLevel(unk_token = SPECIAL_TOKENS['unk_token']))
            tokenizer.normalizer = Sequence([NFD(), Lowercase(), StripAccents()])
            tokenizer.pre_tokenizer = Split(pattern = r"", behavior = "isolated")  # splits into chars
            tokenizer.train_from_iterator(train_data, trainer=trainer)

            tokenizer.enable_padding(pad_id=tokenizer.token_to_id(SPECIAL_TOKENS['pad_token']),
                                     pad_token=SPECIAL_TOKENS['pad_token'])

            tokenizer.post_processor = TemplateProcessing(
                single=f"{SPECIAL_TOKENS['bos_token']} $0 {SPECIAL_TOKENS['eos_token']}",
                special_tokens=[
                    (SPECIAL_TOKENS['bos_token'], tokenizer.token_to_id(SPECIAL_TOKENS['bos_token'])),
                    (SPECIAL_TOKENS['eos_token'], tokenizer.token_to_id(SPECIAL_TOKENS['eos_token']))
                ]
            )

            # Re-index the vocabulary so that [PAD] is last when the tokenizer is built for the transformer
            vocab = tokenizer.get_vocab()
            vocab_size = len(vocab)
            pad_id = tokenizer.token_to_id(SPECIAL_TOKENS['pad_token'])

            if model_type == "transformer":
                # Transformer requires pad_id = vocab_size - 1
                if pad_id != vocab_size - 1:
                    # Rebuild vocab so [PAD] is last
                    vocab_items = sorted(vocab.items(), key=lambda x: x[1])
                    new_vocab = [token for token, _ in vocab_items if token != SPECIAL_TOKENS['pad_token']]
                    new_vocab.append(SPECIAL_TOKENS['pad_token'])  # put PAD last
                    tokenizer = Tokenizer(WordLevel(vocab={t: i for i, t in enumerate(new_vocab)},
                                                    unk_token=SPECIAL_TOKENS['unk_token']))

                    tokenizer.normalizer = Sequence([NFD(), Lowercase(), StripAccents()])
                    tokenizer.pre_tokenizer = Split(pattern=r"", behavior="isolated")  # this branch is always character-level
                    tokenizer.post_processor = TemplateProcessing(
                        single=f"{SPECIAL_TOKENS['bos_token']} $0 {SPECIAL_TOKENS['eos_token']}",
                        special_tokens=[
                            (SPECIAL_TOKENS['bos_token'], new_vocab.index(SPECIAL_TOKENS['bos_token'])),
                            (SPECIAL_TOKENS['eos_token'], new_vocab.index(SPECIAL_TOKENS['eos_token']))
                        ]
                    )
                    pad_id = len(new_vocab) - 1  # last index

            if model_type == "transformer":
                tokenizer.enable_padding(pad_id=pad_id, pad_token=SPECIAL_TOKENS['pad_token'])

            print(f"Case: character, PAD ID: {tokenizer.token_to_id(SPECIAL_TOKENS['pad_token'])}")

        case _:
            raise ValueError(f"Unknown tokenization level: {tokenization_level}")
    #### STUDENT CODE ENDS HERE ####

    return tokenizer

In [11]:
### Check that your tokenizer works. Feel free to change these examples
### to get a sense of the tokenizer behavior.

train_data = [
    "Curiouser and curiouser!",
    "Oh dear! Oh dear! I shall be late!",
    "Sentence first, verdict afterwards",
]

tokens = {}
for tokenization_level in ["word", "subword", "character"]:
    print("Tokenization Level:", tokenization_level)

    tokenizer = get_tokenizer(train_data, tokenization_level, "n_gram")

    print("1. Sentence from training")
    print(f'{"TOKEN":10}', "ID")
    print("-" * 20)
    toks = tokenizer.encode(train_data[0])
    for token, id in zip(toks.tokens, toks.ids):
        print(f'{token:10}', id)
    print()

    print("2. Sentence not from training")
    print(f'{"TOKEN":10}', "ID")
    print("-" * 20)
    toks = tokenizer.encode("Most curious thing I ever saw")
    for token, id in zip(toks.tokens, toks.ids):
        print(f'{token:10}', id)
    print()
    print("=" * 20)

    # Accumulate for PennGrader
    tokens[tokenization_level] = {
        'ids': toks.ids,
        'tokens': toks.tokens
    }


### If you have implemented your tokenizer correctly, the first few lines of
### your output should be similar to the following (note IDs
### may not match exactly, and that is OK):

# Tokenization Level: word
# 1. Sentence from training
# TOKEN      ID
# --------------------
# [BOS]      1
# curiouser  4
# and        5
# curiouser  4
# !          6
# [EOS]      2

# 2. Sentence not from training
# TOKEN      ID
# --------------------
# [BOS]      1
# [UNK]      3
# [UNK]      3
# [UNK]      3
# i          9
# [UNK]      3
# [UNK]      3
# [EOS]      2

Tokenization Level: word
Case: word, PAD ID: 0
1. Sentence from training
TOKEN      ID
--------------------
[BOS]      1
curiouser  5
and        10
curiouser  5
!          4
[EOS]      2

2. Sentence not from training
TOKEN      ID
--------------------
[BOS]      1
[UNK]      3
[UNK]      3
[UNK]      3
i          13
[UNK]      3
[UNK]      3
[EOS]      2

Tokenization Level: subword



Case: subword, PAD ID: 0
1. Sentence from training
TOKEN      ID
--------------------
[CLS]      101
curious    8025
##er       2121
and        1998
curious    8025
##er       2121
!          999
[SEP]      102

2. Sentence not from training
TOKEN      ID
--------------------
[CLS]      101
most       2087
curious    8025
thing      2518
i          1045
ever       2412
saw        2387
[SEP]      102

Tokenization Level: character
Case: character, PAD ID: 0
1. Sentence from training
TOKEN      ID
--------------------
[BOS]      1
c          13
u          15
r          6
i          10
o          14
u          15
s          8
e          5
r          6
           4
a          7
n          18
d          9
           4
c          13
u          15
r          6
i          10
o          14
u          15
s          8
e          5
r          6
!          12
[EOS]      2

2. Sentence not from training
TOKEN      ID
--------------------
[BOS]      1
[UNK]      3
o          14
s          8
t        

In [12]:
# PennGrader Grading Cell, do not modify.
# This tests that your tokenizer has the correct splitting lengths and
# includes minimal special tokens.

grader.grade(test_case_id = 'test_tokenizer', answer = tokens)

Correct! You earned 6/6 points. You are a star!

Your submission has been successfully recorded in the gradebook.


Next, we will need to load the data from `lm_data` and `wer_data` from the `data` directory.

- For the n-gram model, this function should simply return a list of tokens.

- For the transformer model, this function should return a PyTorch [DataLoader](https://docs.pytorch.org/docs/stable/data.html#torch.utils.data.DataLoader) that wraps `LMDataset`.
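
For next-token prediction, each training example pairs a sequence with the same sequence shifted by one position, and batches are padded to a common length. The sketch below shows one common convention on plain Python lists; `make_example` and `pad_batch` are hypothetical names, and the starter code may expect a different input/target shape.

```python
from typing import List, Tuple


def make_example(token_ids: List[int]) -> Tuple[List[int], List[int]]:
    """Split one tokenized sentence into (input, target) for next-token prediction."""
    # The target at position t is the token that follows input position t.
    return token_ids[:-1], token_ids[1:]


def pad_batch(sequences: List[List[int]], pad_id: int) -> List[List[int]]:
    """Right-pad every sequence to the length of the longest one in the batch."""
    max_len = max(len(seq) for seq in sequences)
    return [seq + [pad_id] * (max_len - len(seq)) for seq in sequences]


# Hypothetical IDs: 1 = [BOS], 2 = [EOS], 9 = [PAD].
inputs, targets = make_example([1, 5, 6, 2])
batch = pad_batch([[1, 5], [1, 5, 6, 7]], pad_id=9)
```

With tensors, `torch.nn.utils.rnn.pad_sequence` performs the same right-padding inside a `collate_fn`, so sequences only grow to the longest length in their own batch.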

In [13]:
### STUDENT CELL ###

class LMDataset(Dataset):
    """
    Class for loading tokens as input and target token IDs
    (only used for transformer implementation)
    """

    def __init__(self, tokenized_sentences: List[List[int]]):
        self.tokenized_sentences = tokenized_sentences

    def __len__(self):
        return len(self.tokenized_sentences)

    def __getitem__(self, idx) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Returns input and output token IDs
        """
        ### STUDENT CODE HERE ###
        # Inputs are the full token sequence; targets are the same sequence shifted left by one.
        input_tokens = torch.tensor(self.tokenized_sentences[idx])
        # TODO: the appended 1 is hard-coded. It matches the [EOS] ID only in the re-indexed
        # character-level transformer vocabulary; pass the ID in explicitly instead.
        target_tokens = torch.tensor(self.tokenized_sentences[idx][1:] + [1])

        ### STUDENT CODE ENDS HERE ###

        return input_tokens, target_tokens

### DHRUV CHANGE FUNCTION
def collate_fn(batch, pad_token_id: int):
    """
    Pads a batch of variable-length input and target sequences to the same length.
    """
    input_seqs, target_seqs = zip(*batch)

    # Pad sequences dynamically for this batch
    input_padded = torch.nn.utils.rnn.pad_sequence(input_seqs, batch_first=True, padding_value=pad_token_id)
    target_padded = torch.nn.utils.rnn.pad_sequence(target_seqs, batch_first=True, padding_value=pad_token_id)

    return input_padded, target_padded
### DHRUV CHANGE FUNCTION


# Load datasets
def load_data(tokenization_level: str, model_type: str, batch_size: int = 16):
    """
    Function for loading data for language modeling and WER computation. You
    may modify the function header and outputs as necessary.

    Inputs:
        tokenization_level (str): The level at which to tokenize the input
        model_type (str): n_gram or transformer
        batch_size (int): The batch size. You may want to update this value.
    """
    data = defaultdict(dict)

    # Load data from files
    treebank_datadir = "data/lm_data"
    for f in os.listdir(treebank_datadir):
        data['treebank'][f.split('-')[-1][:-4]] = open(os.path.join(treebank_datadir, f), 'r').readlines()
    hub_datadir = "data/wer_data"
    for f in os.listdir(hub_datadir):
        if 'ground_truths' in f: continue
        data['hub'][f.split('_')[0]] = json.load(open(os.path.join(hub_datadir, f), 'r'))

    # Load tokenizer
    tokenizer = get_tokenizer(
        train_data=data['treebank']['train'],
        tokenization_level=tokenization_level,
        model_type=model_type
    )

    # Tokenize data
    tokenized_data = {}
    for split, sentences in tqdm(data['treebank'].items(), desc="Tokenizing WSJ"):

        ### STUDENT CODE HERE ###
        # Update tokenized_data['treebank', split] with the tokenized inputs
        li = []
        for sentence in sentences:
            li.append(tokenizer.encode(sentence).ids)
        tokenized_data['treebank', split] = li

        ### STUDENT CODE ENDS HERE ###

        if model_type == "transformer":
            # Wrap in dataset with input_tokens and target_tokens
            tokenized_data['treebank', split] = LMDataset(tokenized_data['treebank', split])

            ### STUDENT CODE STARTS HERE ###
            # Wrap the tokens in a DataLoader
            pad_token_id = tokenizer.token_to_id("[PAD]")  # Dhruv Change
            tokenized_data['treebank', split] = DataLoader(tokenized_data['treebank', split], batch_size = batch_size, shuffle = True, collate_fn=lambda batch: collate_fn(batch, pad_token_id))  # Dhruv Change
            ### STUDENT CODE ENDS HERE ###

    for split, values in tqdm(data['hub'].items(), desc="Tokenizing HUB"):
        wer_data = {}
        for id, generations in values.items():
            asr_item = {}
            asr_item['sentences'] = generations['sentences']
            ### STUDENT CODE HERE ###
            # Update asr_item['tokens'] with the tokenized inputs

            li = []
            for sentence in generations['sentences']:
                li.append(tokenizer.encode(sentence).ids)
            asr_item['tokens'] = li

            ### STUDENT CODE ENDS HERE ###
            asr_item['acoustic_scores'] = generations['acoustic_scores']
            wer_data[id] = asr_item
        tokenized_data['hub', split] = wer_data

    return tokenized_data['treebank', 'train'], \
        tokenized_data['treebank', 'dev'], \
        tokenized_data['treebank', 'test'], \
        tokenized_data['hub', 'dev'], \
        tokenized_data['hub', 'test'], \
        tokenizer

In [14]:
### Check that the code runs without errors. This does not necessarily mean
### that it has been implemented correctly.

_ = load_data("character", "n_gram")
_ = load_data("character", "transformer")

Case: character, PAD ID: 0
Case: character, PAD ID: 47

Data loading is now complete. If you have difficulty training or evaluating your model, you may need to revisit these cells and verify that they are implemented correctly.

## Task A: N-Gram Language Model

Implement an n-gram model from scratch, based on the starter code. This implementation needs to be compatible with different levels of tokenization (e.g. word-level, sub-word level, character-level).

Helpful hints:
1. It may be helpful to write helper functions, such as transforming a sequence into n-grams or computing `Pr(w | ctx)`.
2. For higher order n-grams, you will want to pad the beginning of sequences, so that the starting tokens are used.
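
The two hints above can be sketched as a pair of helpers: one that left-pads a sequence and slices it into n-grams, and one that computes the maximum-likelihood estimate of `Pr(w | ctx)` from counts. The names `extract_ngrams` and `conditional_probability` and the `"<pad>"` symbol are assumptions for illustration, not part of the starter code.

```python
from collections import Counter
from typing import Hashable, List, Sequence, Tuple


def extract_ngrams(tokens: Sequence[Hashable], n: int, pad: Hashable = "<pad>") -> List[Tuple]:
    """Left-pad with n - 1 pad symbols and return one n-gram per original token."""
    padded = [pad] * (n - 1) + list(tokens)
    return [tuple(padded[i:i + n]) for i in range(len(tokens))]


def conditional_probability(ngram_counts: Counter, context_counts: Counter, ngram: Tuple) -> float:
    """Maximum-likelihood estimate Pr(w | ctx) = count(ctx, w) / count(ctx)."""
    context = ngram[:-1]
    if context_counts[context] == 0:
        return 0.0  # unseen context: no estimate without smoothing
    return ngram_counts[ngram] / context_counts[context]


tokens = ["a", "b", "a", "c"]
bigrams = extract_ngrams(tokens, 2)
ngram_counts = Counter(bigrams)
context_counts = Counter(bigram[:-1] for bigram in bigrams)

# "a" is followed once by "b" and once by "c", so Pr(b | a) = 1/2.
p_b_given_a = conditional_probability(ngram_counts, context_counts, ("a", "b"))
```

Because of the left padding, the first real token still gets an n-gram of full length, so every token in the sentence contributes to the counts.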


In [15]:
### STUDENT CELL ###
class NGramLM():
    """
    N-gram language model
    """


    def __init__(self, n):
        """
        Initializes the n-gram model. You may add keyword arguments to this function
        to modify the behavior of the n-gram model. The default behavior for unit tests should
        be that of an n-gram model without any label smoothing.

        Important for unit tests: If you add <bos> or <eos> tokens to model inputs, this should
        be done in data processing, outside of the NGramLM class.

        Inputs:
            n (int): The value of n to use in the n-gram model
        """
        self.n = n
        ### STUDENT CODE HERE ###
        self.counts = defaultdict(lambda:0) # Store counts of tuples
        self.vocab = set() # Store the set of tokens seen in training
        ### STUDENT CODE ENDS HERE ###

    # Getter Function
    def get_counts(self):
        return self.counts

    # Helper Function to Pad Sentence. Ideally, we need self.n - 1 padding at beginning of sentence
    def pad_sentence(self, sentence):
        padding_id = 0
        existing_padding_amount = 0
        for idx in range(len(sentence)):
            if sentence[idx] == 0:
                existing_padding_amount += 1
            else:
                break

        required_padding = max(self.n - 1 - existing_padding_amount, 0)
        sentence = [padding_id] * required_padding + sentence
        return sentence

    # Compute Log Prob of a Sentence
    def log_probability(self, model_input: List[Any], base=np.e):
        """
        Returns the log-probability of the provided model input.

        Inputs:
            model_input (List[Any]): The list of tokens associated with the input text.
            base (float): The base with which to compute the log-probability

        Returns:
            float: Log probability of the sequence
        """
        ### STUDENT CODE HERE ###
        # Start by padding the sentence
        sentence = self.pad_sentence(model_input)

        # Cumulative log prob
        log_prob = 0.0

        # Start iterating at a non-padding index
        start_index = self.n - 1
        for idx in range(start_index, len(sentence)):
            prev = tuple(sentence[idx - (self.n - 1):idx]) # previous n - 1 tokens
            curr = tuple([sentence[idx]]) # current token
            overall = prev + curr # total set of n tokens

            # Skip n-grams whose count or context count is zero; they contribute 0 to the log-probability
            if self.counts[overall] == 0 or self.counts[prev] == 0:
                continue

            log_prob += np.log(self.counts[overall] / self.counts[prev])

        return log_prob / np.log(base)

        ### STUDENT CODE ENDS HERE ###

    # Train the Model
    def learn(self, training_data: List[List[Any]]):
        """
        Function for learning n-grams from the provided training data. You may
        add keywords to this function as needed, provided that the default behavior
        is that of an n-gram model without any label smoothing.

        Inputs:
            training_data (List[List[Any]]): A list of model inputs, which should each be lists
                                             of input tokens
        """
        ### STUDENT CODE HERE ###
        # First as sanity check, we pad sentences
        sentences = [self.pad_sentence(sentence) for sentence in training_data]

        # Iterate thru sentences from non-padding value
        start_index = self.n - 1
        for sentence in sentences:
            # Iterate through sentence
            for idx in range(start_index, len(sentence)):
                prev = tuple(sentence[idx - (self.n - 1):idx]) # previous n - 1 tokens
                curr = tuple([sentence[idx]]) # current token
                overall = prev + curr # total set of n tokens

                # Update vocab
                self.vocab.add(sentence[idx]) # Add current token to vocab set

                # Update counts
                self.counts[overall] += 1 # Adjust statistics for overall = prev + curr
                self.counts[prev] += 1 # Adjust statistics for prev

        ### STUDENT CODE ENDS HERE ###

    # Generate Sentences: TODO
    def generate(self, num_samples: int, max_steps: int):
        """
        Function for generating text using the n-gram model.

        Inputs:
            num_samples (int): How many samples to generate
            max_steps (int): The maximum length of any sampled output
        """
        ## OPTIONAL, but helpful to get understanding of n-gram performance
        ## Note you will have to decode text before being able to read it

        ### STUDENT CODE HERE ###
        return []
        ### STUDENT CODE ENDS HERE ###

We have provided some test cases below for you to test your n-gram LM implementation. PennGrader will use some of these test cases and evaluate you on additional hidden ones.

In [16]:
## Test script provided to assist you
def num_tokens_in_corpus(model, data):
    """
    Helper function returning the number of tokens in the corpus
    """
    if type(data) == list:
        total = sum([len(dp) for dp in data])
    else:
        padding_idx = model.padding_idx

        total = 0
        for input_tokens, target_tokens in data:
            total += torch.sum(input_tokens != padding_idx).item()
            total += torch.sum(target_tokens[:, -1] != padding_idx).item()
    return total

def corpus_log_probability(model, data):
    """
    Helper function computing the total log-probability of the input corpus
    """
    log_p = 0
    if type(data) == list:
        for datapoint in data:
            log_p += model.log_probability(datapoint, base=2)
    else:
        device = next(model.parameters()).device
        for input_tokens, target_tokens in data:
            input_tokens = input_tokens.to(device)
            target_tokens = target_tokens.to(device)
            log_p += torch.sum(model.log_probability(input_tokens, target_tokens, base=2)).item()
    return log_p

def evaluate_perplexity(model: Any, data: Any):
    """
    Function for computing perplexity on the provided dataset.

    Inputs:
        model (Any): An n-gram or Transformer model.
        data (Any):  Data in the form suitable for the input model. For the n-gram model,
                    data should be of type List[List[Any]]. For the transformer, the data
                    should be of type torch.utils.data.DataLoader.
    """

    m = num_tokens_in_corpus(model, data)
    l = corpus_log_probability(model, data)
    return 2 ** (- l / m)

# Evaluation data
DATA = [
    [0, 8, 143, 144, 145, 8, 146, 147, 75, 148, 149, 58, 11, 150, 151, 29, 141, 139, 58, 13],
    [0, 59, 74, 75, 76, 9, 77, 78, 79, 80, 81, 9, 82, 83, 9, 84, 85, 86, 87, 13],
    [0, 6, 63, 9, 64, 65, 66, 67, 1, 68, 69, 70, 71, 25, 72, 73, 13],
    [0, 202, 240, 202, 241, 242, 20, 243, 39, 244, 245, 59, 246, 61, 247, 248, 182, 249, 13],
    [0, 9, 133, 134, 135, 136, 39, 137, 138, 139, 11, 139, 140, 97, 21, 141, 20, 137, 97, 142, 13],
    [0, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 11, 57, 58, 59, 60, 61, 62, 58, 13],
    [0, 175, 2, 176, 177, 43, 178, 9, 179, 94, 180, 181, 182, 183, 184, 185, 38, 186, 187, 184, 188, 189, 13],
    [0, 14, 15, 16, 17, 18, 19, 9, 20, 21, 22, 23, 13],
    [0, 152, 94, 153, 154, 155, 38, 156, 157, 2, 158, 159, 160, 161, 13],
    [0, 99, 100, 6, 101, 102, 103, 6, 104, 105, 29, 106, 59, 107, 108, 109, 110, 94, 111, 19, 9, 112, 38, 9, 113, 1, 100, 30, 114, 115, 116, 117, 118, 13],
    [0, 202, 203, 9, 204, 205, 206, 94, 207, 29, 81, 208, 59, 209, 210, 211, 212, 11, 213, 188, 214, 100, 215, 13],
    [0, 162, 25, 190, 191, 192, 9, 193, 1, 194, 195, 196, 197, 94, 178, 198, 199, 38, 200, 201, 13],
    [0, 9, 216, 118, 6, 30, 217, 9, 218, 94, 9, 219, 8, 220, 221, 29, 222, 223, 224, 225, 226, 13],
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13],
    [0, 116, 119, 120, 1, 35, 121, 122, 123, 9, 124, 94, 125, 126, 9, 127, 128, 129, 130, 30, 114, 83, 131, 132, 29, 39, 58, 13],
    [0, 35, 9, 36, 37, 38, 39, 40, 41, 42, 43, 44, 13],
    [0, 162, 63, 59, 163, 164, 94, 84, 165, 75, 166, 9, 167, 168, 169, 170, 9, 171, 172, 173, 174, 13],
    [0, 142, 227, 59, 228, 229, 38, 9, 230, 231, 94, 200, 232, 233, 29, 234, 235, 236, 9, 237, 223, 238, 239, 13],
    [0, 9, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 13],
    [0, 88, 89, 69, 90, 91, 92, 93, 56, 94, 52, 95, 96, 97, 98, 13],
]
SUBSETS = [
    [4, 19, 6, 5, 2],
    [8, 5, 10, 18, 12, 1, 14, 9],
    [15, 17, 4, 10, 9, 6, 12, 13, 5, 7, 18, 19, 14]
]
EXPECTED = {
    (1, 5): 177.899204,
    (1, 8): 161.944637,
    (1, 13): 160.453624,
    (2, 5): 1.899786,
    (2, 8): 2.077985,
    (2, 13): 2.084503
}

# iterate through all test cases
for n in range(1, 3):
    model = NGramLM(n)
    model.learn(DATA)

    for subset in SUBSETS:
        subset_len = len(subset)
        eval_data = [DATA[idx] for idx in subset]
        perplexity = round(evaluate_perplexity(model, eval_data), 6)

        assert(EXPECTED[(n, subset_len)] == perplexity), \
              f"Expected {EXPECTED[(n, subset_len)]} but got {perplexity}"

print("All test cases passed successfully!")

All test cases passed successfully!


In [17]:
# PennGrader Grading Cell, do not modify.
# This tests that your NGramLM is implemented correctly.
# - 6 of 12 points: 6 visible test cases
# - 6 of 12 points: 6 hidden test cases

grader.grade(test_case_id = 'test_n_gram', answer = NGramLM)

Correct! You earned 12/12 points. You are a star!

Your submission has been successfully recorded in the gradebook.


You will now evaluate the performance of various n-gram LMs.

For the word-level n-gram models, to improve the performance of your model, you will need to experiment with:
1. Values of n: how does performance change as you vary n? At a minimum, experiment with n = 1, 2, 3.
2. Smoothing techniques: at a minimum, experiment with linear interpolation.
3. Tokenization: compare word-level and BPE sub-word-level tokenization. How does the choice of tokenization influence your model's performance?

If you have implemented the smoothed n-gram model correctly, it should achieve lower perplexity than any other n-gram model. Try to find hyperparameters that perform well, since they will matter when you evaluate the re-ranking performance of your n-gram model.
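
To make the interpolation idea concrete, here is a minimal sketch on a made-up toy corpus. It does not use the starter code's `NGramLM` API: the `Counter` tables and the `mle` and `interpolated_prob` helpers are hypothetical names chosen for illustration, and the weights are assumed to be ordered (unigram, bigram, trigram).

```python
from collections import Counter

# Made-up toy corpus of token ids; 0 marks the sentence start (an assumption of this sketch).
corpus = [[0, 1, 2, 3], [0, 1, 2, 4], [0, 1, 3, 4]]

unigrams, bigrams, trigrams = Counter(), Counter(), Counter()
for sent in corpus:
    for i, tok in enumerate(sent):
        unigrams[(tok,)] += 1
        if i >= 1:
            bigrams[tuple(sent[i - 1:i + 1])] += 1
        if i >= 2:
            trigrams[tuple(sent[i - 2:i + 1])] += 1

total_tokens = sum(unigrams.values())


def mle(count, context_count):
    """Unsmoothed relative-frequency estimate; 0 when the context was never seen."""
    return count / context_count if context_count > 0 else 0.0


def interpolated_prob(w, prev2, prev1, lambdas):
    """Weighted mix of unigram, bigram, and trigram estimates of P(w | prev2, prev1)."""
    l1, l2, l3 = lambdas
    p_uni = mle(unigrams[(w,)], total_tokens)
    p_bi = mle(bigrams[(prev1, w)], unigrams[(prev1,)])
    p_tri = mle(trigrams[(prev2, prev1, w)], bigrams[(prev2, prev1)])
    return l1 * p_uni + l2 * p_bi + l3 * p_tri


# The trigram (2, 3, 4) never occurs in the toy corpus.
print(interpolated_prob(4, 2, 3, (0, 0, 1)))        # trigram only
print(interpolated_prob(4, 2, 3, (0.2, 0.3, 0.5)))  # interpolated
```

With weights `(0, 0, 1)` the unseen trigram receives probability 0, whereas mixing in the lower-order estimates gives it non-zero mass. That is the property that keeps held-out perplexity finite.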

In [18]:
### STUDENT CELL ###
# Smoothed N Gram Model
class SmoothedNGram():
    """
    Smoothed N-gram language model
    """
    # Assumption: ngram_models contains unigram, bigram, then trigram
    # Assumption: Lambdas are ordered same way
    def __init__(self,
            ngram_models: List[Any],
            lambdas: List[float],
            epsilon = 1):
        self.ngram_models = ngram_models
        self.lambdas = lambdas
        self.n = 3
        self.epsilon = epsilon

        assert self.ngram_models[2].n == 3
        assert self.ngram_models[1].n == 2
        assert self.ngram_models[0].n == 1

    # Call learn for each of the individual ngram models
    def learn(self, training_data: List[List[Any]]):
        for ngram_model in self.ngram_models:
            ngram_model.learn(training_data)

    # Pad sentence to the necessary length for the longest n-gram model we have
    def pad_sentence(self, sentence):
        padding_id = 0
        existing_padding_amount = 0
        for idx in range(len(sentence)):
            if sentence[idx] == 0:
                existing_padding_amount += 1
            else:
                break

        required_padding = max(self.n - 1 - existing_padding_amount, 0)
        sentence = [padding_id] * required_padding + sentence
        return sentence

    # Compute log probability
    def log_probability(self,
            model_input: List[Any],
            base=np.e):
        """
        Function for generating a linear-interpolated log prob
        """
        ### STUDENT CODE HERE ###
        # Pad sentence just for sanity check
        model_input = self.pad_sentence(model_input)

        # Get unigram, bigram, and trigram models
        trigram_model = self.ngram_models[2]
        bigram_model = self.ngram_models[1]
        unigram_model = self.ngram_models[0]

        # Get count dictionaries for the above models
        counts_trigram = trigram_model.get_counts()
        counts_bigram = bigram_model.get_counts()
        counts_unigram = unigram_model.get_counts()

        # Store lambda values
        lambda3 = self.lambdas[2]
        lambda2 = self.lambdas[1]
        lambda1 = self.lambdas[0]

        start_idx = self.n - 1 # start iteration where there is no padding
        log_prob = 0 # Store cumulative log prob

        # Iterate!
        for idx in range(start_idx, len(model_input)):
            curr = tuple([model_input[idx]])
            trigram_prev = tuple(model_input[idx-2:idx])
            bigram_prev = tuple(model_input[idx-1:idx])
            unigram_prev = tuple(model_input[idx:idx])

            ### When we want to use the basic NGram Model 1 of the lambdas will be 1 and the rest will be 0 (one hot encoded)
            # Handle case where lambda3 = 1 (We just want to run the Trigram Model or Trigram Model + Laplace Smoothing)
            if lambda3 == 1:
                # Ignore unknowns if we have 0 epsilon(i.e. no smoothing)
                if (counts_trigram[trigram_prev + curr] == 0 or counts_trigram[trigram_prev] == 0) and self.epsilon == 0:
                    log_prob += -9999999 # Add a very large negative number instead of -inf just for stability
                    continue
                gram_prob = (counts_trigram[trigram_prev + curr] + self.epsilon) / (counts_trigram[trigram_prev] + self.epsilon * len(trigram_model.vocab))
                log_prob += (np.log(gram_prob) / np.log(base))
                continue

            # Handle case where lambda2 = 1 (We just want to run the Bigram model or Bigram model + Laplace Smoothing)
            if lambda2 == 1:
                # Ignore unknowns if we have 0 epsilon(i.e. no smoothing)
                if (counts_bigram[bigram_prev + curr] == 0 or counts_bigram[bigram_prev] == 0) and self.epsilon == 0:
                    log_prob += -9999999 # Add a very large negative number instead of -inf just for stability
                    continue
                gram_prob = (counts_bigram[bigram_prev + curr] + self.epsilon) / (counts_bigram[bigram_prev] + self.epsilon * len(bigram_model.vocab))
                log_prob += (np.log(gram_prob) / np.log(base))
                continue

            # Handle case where lambda1 = 1 (We just want to run a Unigram model or Unigram model + Laplace Smoothing)
            if lambda1 == 1:
                # Ignore unknowns if we have 0 epsilon(i.e. no smoothing)
                if (counts_unigram[unigram_prev + curr] == 0 or counts_unigram[unigram_prev] == 0) and self.epsilon == 0:
                    log_prob += -9999999  # Add a very large negative number instead of -inf just for stability
                    continue
                gram_prob = (counts_unigram[unigram_prev + curr] + self.epsilon) / (counts_unigram[unigram_prev] + self.epsilon * len(unigram_model.vocab))
                log_prob += (np.log(gram_prob) / np.log(base))
                continue

            # Now we move towards Linear Interpolation + Laplace Smoothing
            # If epsilon == 0 and counts are 0, we ignore unknowns!
            if self.epsilon == 0 and (counts_trigram[trigram_prev + curr] == 0 or counts_trigram[trigram_prev] == 0):
                log_prob += -9999999 # Add a very large negative number instead of -inf just for stability
                continue
            if self.epsilon == 0 and (counts_bigram[bigram_prev + curr] == 0 or counts_bigram[bigram_prev] == 0):
                log_prob += -9999999 # Add a very large negative number instead of -inf just for stability
                continue
            if self.epsilon == 0 and (counts_unigram[unigram_prev + curr] == 0 or counts_unigram[unigram_prev] == 0):
                log_prob += -9999999 # Add a very large negative number instead of -inf just for stability
                continue

            # Compute trigram prob w/Laplace Smoothing
            trigram_prob = (counts_trigram[trigram_prev + curr] + self.epsilon) / (counts_trigram[trigram_prev] + self.epsilon * len(trigram_model.vocab))

            # Compute bigram prob w/Laplace Smoothing
            bigram_prob = (counts_bigram[bigram_prev + curr] + self.epsilon) / (counts_bigram[bigram_prev] + self.epsilon * len(bigram_model.vocab))

            # Compute unigram prob w/Laplace Smoothing
            unigram_prob = (counts_unigram[unigram_prev + curr] + self.epsilon) / (counts_unigram[unigram_prev] + self.epsilon * len(unigram_model.vocab))

            # Linear Interpolation among the unigram, bigram, and trigram probabilities
            gram_prob = lambda1 * unigram_prob + lambda2 * bigram_prob + lambda3 * trigram_prob
            log_prob += (np.log(gram_prob) / np.log(base))

        return log_prob

        ### STUDENT CODE ENDS HERE ###
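
The add-epsilon term used in the class above can be examined in isolation. The sketch below is a standalone illustration with made-up counts and a made-up vocabulary size; `add_epsilon_prob` is a hypothetical helper, not part of the starter code.

```python
def add_epsilon_prob(ngram_count, context_count, vocab_size, epsilon):
    """Additive smoothing: (c + eps) / (C + eps * V)."""
    return (ngram_count + epsilon) / (context_count + epsilon * vocab_size)


VOCAB_SIZE = 1000  # made-up value for illustration

for eps in (1e-7, 1e-2, 10):
    seen = add_epsilon_prob(3, 4, VOCAB_SIZE, eps)    # n-gram seen 3 times in a context seen 4 times
    unseen = add_epsilon_prob(0, 4, VOCAB_SIZE, eps)  # n-gram never seen in that context
    print(f"eps={eps:g}  seen={seen:.6f}  unseen={unseen:.2e}")
```

A tiny epsilon leaves the seen estimate close to its relative frequency of 0.75, while a large epsilon pushes both estimates toward the uniform value `1 / VOCAB_SIZE`. This is one plausible reason why large epsilon values tend to raise perplexity on both training and held-out data.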


In [19]:
### STUDENT CODE HERE ###
import itertools

# Store best model w/best perplexity for use later on
best_perplexity = float("inf")
best_config = None
best_model = None

results = [] # Store Results

# Store tokenization levels
tokenization_levels = ["word", "subword"]

# Store different linear interpolations
lambda_sets = [[0.33, 0.33, 0.33], [0.05, 0.05, 0.9], [0.1, 0.1, 0.8],
               [0.15, 0.15, 0.7], [0.2, 0.2, 0.6], [0.25, 0.25, 0.5], [1, 0, 0], [0, 1, 0], [0, 0, 1]]

# Store different levels of Laplacian Smoothing
epsilon_values = [1e-7, 1e-2, 10]

product_list = list(itertools.product(tokenization_levels, lambda_sets, epsilon_values))

## Test NGram Models with N = 1, 2, 3 (no Smoothing, no Linear Interpolation, and Different Tokenizations)
product_list.append(("word", [1, 0, 0], 0))
product_list.append(("word", [0, 1, 0], 0))
product_list.append(("word", [0, 0, 1], 0))

product_list.append(("subword", [1, 0, 0], 0))
product_list.append(("subword", [0, 1, 0], 0))
product_list.append(("subword", [0, 0, 1], 0))

# Pre-load data and store in dictionaries
loaded_data = dict()
loaded_data["word"] = load_data("word", "n_gram")
loaded_data["subword"] = load_data("subword", "n_gram")
loaded_data["character"] = load_data("character", "n_gram")


for token_level, lambdas, epsilon in product_list:
    print(f"\n=== Tokenization level: {token_level} ===")
    print(f"-> Testing lambdas = {lambdas}")
    print(f"-> Testing Epsilon: {epsilon}")

    train_data, dev_data, test_data, \
        dev_wer_data, test_wer_data, \
        tokenizer = loaded_data[token_level]

    ngram_models = [NGramLM(1), NGramLM(2), NGramLM(3)]
    model = SmoothedNGram(ngram_models, lambdas, epsilon)
    model.learn(train_data)

    train_perplexity = evaluate_perplexity(model, train_data)
    print(f'Model perplexity on the train set: {train_perplexity}')
    dev_perplexity = evaluate_perplexity(model, dev_data)
    print(f'Model perplexity on the dev set: {dev_perplexity}')
    test_perplexity = evaluate_perplexity(model, test_data)
    print(f'Model perplexity on the test set: {test_perplexity}')
    print()

    results.append({
        "tokenization": token_level,
        "lambda1": lambdas[0],
        "lambda2": lambdas[1],
        "lambda3": lambdas[2],
        "epsilon": epsilon,
        "train_perplexity": train_perplexity,
        "dev_perplexity": dev_perplexity,
        "test_perplexity": test_perplexity
    })

    if dev_perplexity < best_perplexity:
        best_perplexity = dev_perplexity
        best_config = (token_level, lambdas, epsilon)
        best_model = model

### STUDENT CODE END HERE ###
results_df = pd.DataFrame(results)
results_df.to_csv("ngram_perplexity_results.csv", index=False)
print("All results saved to 'ngram_perplexity_results.csv'")

Case: word, PAD ID: 0
Case: subword, PAD ID: 0
Case: character, PAD ID: 0


=== Tokenization level: word ===
-> Testing lambdas = [0.33, 0.33, 0.33]
-> Testing Epsilon: 1e-07
Model perplexity on the train set: 12.859275925055545
Model perplexity on the dev set: 235.62702635233936
Model perplexity on the test set: 216.81872529946102


=== Tokenization level: word ===
-> Testing lambdas = [0.33, 0.33, 0.33]
-> Testing Epsilon: 0.01
Model perplexity on the train set: 113.67415518534337
Model perplexity on the dev set: 319.58979883510705
Model perplexity on the test set: 302.59009913503536


=== Tokenization level: word ===
-> Testing lambdas = [0.33, 0.33, 0.33]
-> Testing Epsilon: 10
Model perplexity on the train set: 1637.2130451577648
Model perplexity on the dev set: 1646.3877167853848
Model perplexity on the test set: 1623.414117479101


=== Tokenization level: word ===
-> Testing lambdas = [0.05, 0.05, 0.9]
-> Testing Epsilon: 1e-07
Model perplexity on the train set: 6.380171365532968
Model perplexity on the dev set: 451.08727392102645
Model perplexity on t

  return 2 ** (- l / m)


Model perplexity on the dev set: inf
Model perplexity on the test set: inf


=== Tokenization level: word ===
-> Testing lambdas = [0, 1, 0]
-> Testing Epsilon: 0
Model perplexity on the train set: 47.51167568119998
Model perplexity on the dev set: inf
Model perplexity on the test set: inf


=== Tokenization level: word ===
-> Testing lambdas = [0, 0, 1]
-> Testing Epsilon: 0
Model perplexity on the train set: 5.887677312979145
Model perplexity on the dev set: inf
Model perplexity on the test set: inf


=== Tokenization level: subword ===
-> Testing lambdas = [1, 0, 0]
-> Testing Epsilon: 0
Model perplexity on the train set: 1005.8047643782653
Model perplexity on the dev set: inf
Model perplexity on the test set: inf


=== Tokenization level: subword ===
-> Testing lambdas = [0, 1, 0]
-> Testing Epsilon: 0
Model perplexity on the train set: 46.41336662211598
Model perplexity on the dev set: inf
Model perplexity on the test set: inf


=== Tokenization level: subword ===
-> Testing lambd

In [20]:
best_config, best_model

(('subword', [0.33, 0.33, 0.33], 1e-07),
 <__main__.SmoothedNGram at 0x7eed8dca3200>)

You will now evaluate your model on word error rate (WER; lower is better) in a speech recognition re-ranking task. You are provided a development set that you can use to tune your model, but you will ultimately be graded on a held-out test set.

You will need to implement the re-ranking function, which should use your model to help re-rank the transcription predictions. Each transcription is already paired with an `acoustic_score`, the initial log probability assigned by the automatic speech recognition model.

Helpful hints:
1. You may want to play around with different weights of `acoustic_score` and `lm_score`.
2. A simple baseline that randomly chooses among the candidate sentences would achieve a WER of around 14% on the HUB task. If your results are worse than this baseline, something has gone horribly wrong.
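
As a rough illustration of the first hint, the sketch below combines the two scores with tunable weights and picks the highest-scoring candidate. The candidate sentences and their scores are made up, and `pick_best` is a hypothetical helper rather than the required `rerank_sentences_for_wer` interface.

```python
# Made-up candidates for a single utterance; both scores are log probabilities.
candidates = [
    {"sentence": "recognize speech", "acoustic_score": -12.0, "lm_score": -8.0},
    {"sentence": "wreck a nice beach", "acoustic_score": -11.0, "lm_score": -15.0},
]


def pick_best(cands, acoustic_weight, lm_weight):
    """Return the sentence with the highest weighted sum of the two scores."""
    best = max(
        cands,
        key=lambda c: acoustic_weight * c["acoustic_score"] + lm_weight * c["lm_score"],
    )
    return best["sentence"]


# Sweep a few weightings to see how the selected sentence changes.
for acoustic_weight, lm_weight in [(1.0, 0.0), (0.4, 0.6), (0.0, 1.0)]:
    print((acoustic_weight, lm_weight), "->", pick_best(candidates, acoustic_weight, lm_weight))
```

In practice the weights would be chosen by sweeping a grid like this and keeping the setting with the lowest WER on the development set.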


In [21]:
### STUDENT CELL ###

def rerank_sentences_for_wer(model: Any, wer_data: List[Any], acoustic_weight = 0.33, log_prob_sentence_weight = 0.33, sentence_length_weight = 0.33) -> pd.DataFrame:
    """
    Function to rerank candidate sentences in the HUB dataset. For each set of sentences,
    you must assign each sentence a score in the form of the sentence's acoustic score plus
    the sentence's log probability. You should then save the top scoring sentences in a .csv
    file similar to those found in the results directory.

    Inputs:
        model (Any): An n-gram or Transformer model.
        wer_data (List[Any]): Processed data from the HUB dataset.

    Returns:
        A pandas DataFrame pairing sentence set ids and the top ranked sentences.
        - The DataFrame should have two columns, `id` and `sentences`.
        - See `data/wer_data/dev_ground_truths.csv` for an example.
    """
    ### STUDENT CODE HERE ###
    df_ids = []
    df_sentences = []
    sentence_set_ids = list(wer_data.keys())
    for sentence_set_id in sentence_set_ids:
        dic = wer_data[sentence_set_id]
        sentences = dic['sentences']
        token_lists = dic['tokens']
        acoustic_scores = dic['acoustic_scores']

        highest_ranked_sentence = None
        highest_score = -float("inf")

        for sentence, token_list, acoustic_score in zip(sentences, token_lists, acoustic_scores):
            log_prob_sentence = model.log_probability(token_list)
            score = acoustic_weight * acoustic_score + log_prob_sentence_weight * log_prob_sentence + sentence_length_weight * len(token_list)

            if score > highest_score:
                highest_score = score
                highest_ranked_sentence = sentence

        df_ids.append(sentence_set_id)
        df_sentences.append(highest_ranked_sentence)

    return pd.DataFrame({
                'id': df_ids,
                'sentences': df_sentences
            })

    ### STUDENT CODE END HERE ###

### EVALUATION CODE -- DO NOT EDIT ###
def compute_wer(gt_path, pred_csv):
    # Re-index pred_csv
    pred_csv.index = pred_csv.id

    # Load the sentences
    ground_truths = pd.read_csv(gt_path)
    guesses = pred_csv.loc[ground_truths.id]['sentences'].tolist()
    ground_truths = ground_truths['sentences'].tolist()

    # Compute wer
    wer = load("wer")
    wer_value = wer.compute(predictions=guesses, references=ground_truths)
    return wer_value

In [22]:
train_data, dev_data, test_data, \
    dev_wer_data, test_wer_data, \
    tokenizer = load_data(best_config[0], "n_gram")

ngram_models = [NGramLM(1), NGramLM(2), NGramLM(3)]
best_model = SmoothedNGram(ngram_models, best_config[1], best_config[2])
best_model.learn(train_data)

alpha_beta_gamma_tuples = [[0.4, 0.6, 0]]

best_dev_wer = float("inf")
best_alpha_beta_gamma = None

for alpha, beta, gamma in alpha_beta_gamma_tuples:
    preds = rerank_sentences_for_wer(best_model, dev_wer_data, alpha, beta, gamma)
    dev_wer = compute_wer('data/wer_data/dev_ground_truths.csv', preds)
    print(f"{(alpha, beta, gamma)} -> Dev set WER was: ", dev_wer)

    if dev_wer < best_dev_wer:
        best_dev_wer = dev_wer
        best_alpha_beta_gamma = (alpha, beta, gamma)

print(f"best alpha_beta_gamma: {best_alpha_beta_gamma}")

ngram_models = [NGramLM(1), NGramLM(2), NGramLM(3)]
best_model = SmoothedNGram(ngram_models, best_config[1], best_config[2])
best_model.learn(train_data)

preds = rerank_sentences_for_wer(best_model, dev_wer_data, best_alpha_beta_gamma[0], best_alpha_beta_gamma[1], best_alpha_beta_gamma[2])
dev_wer = compute_wer('data/wer_data/dev_ground_truths.csv', preds)
print("Dev set WER was: ", dev_wer)

score = math.floor(20 * max(0, min(1,2.2 - 15 * dev_wer)))
score

Case: subword, PAD ID: 0

(0.4, 0.6, 0) -> Dev set WER was:  0.066815144766147
best alpha_beta_gamma: (0.4, 0.6, 0)
Dev set WER was:  0.066815144766147


20

In [23]:
# PennGrader Grading Cell, do not modify.

# This tests your re-ranking word error rate performance.
# Your credit will be assigned with the following function:
#  SCORE(x) = FLOOR[20 * max(0, min(1,2.2 - 15x))]
# where x is your word error rate

preds = rerank_sentences_for_wer(best_model, test_wer_data, best_alpha_beta_gamma[0], best_alpha_beta_gamma[1], best_alpha_beta_gamma[2])
grader.grade(test_case_id = 'test_n_gram_wer', answer = preds)

Correct! You earned 20/20 points. You are a star!

Your submission has been successfully recorded in the gradebook.
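
For reference, the credit formula from the grading cell, `SCORE(x) = FLOOR[20 * max(0, min(1, 2.2 - 15x))]`, can be worked through for a few WER values. The function name `wer_to_points` is a hypothetical label for this sketch.

```python
import math


def wer_to_points(wer, max_points=20):
    """Map a word error rate to points using the formula from the grading cell."""
    return math.floor(max_points * max(0, min(1, 2.2 - 15 * wer)))


for wer in (0.0668, 0.10, 0.15):
    print(f"WER={wer:.4f} -> {wer_to_points(wer)} points")
```

Solving `2.2 - 15x >= 1` gives `x <= 0.08`, so any WER comfortably below 8% earns full credit, and the score falls to zero once the WER reaches roughly 14% to 15%.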


## Task B: Character-Level Transformer Model

You are provided with an implementation in PyTorch in the starter code. The implementation includes errors that you will need to find and fix. There are hints in the function docstrings about the errors.

Helpful hints:
1. You will want to use the Google Colab GPU resources, but do not switch to a GPU runtime until you need it (debug on CPU first!) so that you do not exhaust your GPU usage limit.
2. You may want to use [wandb](https://wandb.ai/) to log experiments and determine which hyperparameter settings are the best.

In [24]:
### STUDENT CELL ###

DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

class CharacterLevelTransformer(nn.Module):
    """
    For this part of the assignment, we provide you with a skeleton for the Transformer
    decoder. However, we've introduced numerous errors into the code! The model currently runs,
    but performs incorrect computations. You must fix them to pass the unit tests.

    You may introduce additional keyword arguments after fixing the transformer, as long as the
    default behavior does not stray from the skeleton provided to you.
    """

    def __init__(self, num_layers: int, hidden_dim: int, num_heads: int,
                 ff_dim: int, dropout: float, vocab_size: int):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.embed = nn.Embedding(vocab_size, hidden_dim, padding_idx=vocab_size - 1)  # assumes the padding id is the last vocabulary index
        self.pos_embed = PositionalEncoding(hidden_dim, dropout)
        self.decoder = Decoder(num_layers, hidden_dim, num_heads, ff_dim, dropout)
        self.lm_head = nn.Linear(hidden_dim, vocab_size)
        self.padding_idx = vocab_size - 1  # used to mask padded targets in log_probability

    def log_probability(self, input_tokens: torch.Tensor, target_tokens: torch.Tensor, base=np.e):
        """
        Computes the log-probabilities for the inputs in the given minibatch.

        Input:
            input_tokens (torch.Tensor): A tensor of shape (B, T), where B is the
                                         batch-size and T is the input length.
            target_tokens (torch.Tensor): A tensor of shape (B, T). For a given (i, j),
                                          target_tokens[i, j] should be the token following
                                          input_tokens[i, j]
        Output (torch.Tensor): A tensor of shape (B,) containing the log-probability for each
                               example in the minibatch
        """

        ### STUDENT CODE HERE ###
        # Get log probabilities for each time step for each batch
        log_probabilities = F.log_softmax(self.forward(input_tokens), dim=-1)  # (Batch Size, Token Length, Vocab Size)

        # Gather log-probabilities corresponding to target tokens
        # target_tokens.unsqueeze(-1): (B, T, 1)
        # after gather → (B, T, 1) → squeeze → (B, T)
        target_log_probabilities = log_probabilities.gather(dim=-1, index=target_tokens.unsqueeze(-1)).squeeze(-1)

        # Mask out padding tokens
        target_log_probabilities = target_log_probabilities * (target_tokens != self.padding_idx) # Shape: (B, T)

        # Sum log-probabilities across sequence length and convert log base
        return target_log_probabilities.sum(dim=-1) / np.log(base)
        ### STUDENT CODE ENDS HERE ###

    # Forward Pass for Transformer Model
    def forward(self, model_input):
        # Perform the embedding
        embeds = self.embed(model_input) * math.sqrt(self.hidden_dim)
        embeds = self.pos_embed(embeds)

        # Pass through the decoder
        mask = construct_self_attn_mask(model_input)
        decoder_output = self.decoder(embeds, mask)
        output = self.lm_head(decoder_output)
        return output

    def generate(self, num_samples, max_steps, savepath, tokenizer):
        """
        Generate samples from the model, and save them to a pickle file.

        Input:
            num_samples (int): How many samples to generate
            max_steps (int): The maximum number of generation steps to take
            savepath (str): Where to save the generated samples
        """
        ## OPTIONAL, but helpful to get understanding of transformer performance
        ## Note you will have to decode text before being able to read it

        ### STUDENT CODE HERE ###
        return []
        ### STUDENT CODE ENDS HERE


# Fixed Error in torch.triu :)
def construct_self_attn_mask(x: torch.Tensor):
    """
    The output to this function should be a mask of shape
    (1, T, T). Indices that a token can attend to should be
    set to true.

    There are two errors in this function.
    """
    ### STUDENT CODE HERE ###
    T = x.size(1)
    all_ones = torch.ones(T, T)

    mask = torch.triu(all_ones, diagonal=1) == 0
    mask = mask.unsqueeze(0)
    return mask.to(x.device)
    ### STUDENT CODE ENDS HERE ###

# Fixed Error Here by using nn.ModuleList
class Decoder(nn.Module):

    def __init__(self, num_layers, hidden_dim, num_heads, ff_dim, dropout):
        """
        There is a single error in this function that will prevent the model from learning.
        """
        super().__init__()
        ### STUDENT CODE HERE ###
        layers = []
        for i in range(num_layers):
            layers.append(TransformerBlock(num_heads, hidden_dim, ff_dim, dropout))
        self.layers = nn.ModuleList(layers)
        ### STUDENT CODE ENDS HERE ###

    def forward(self, x, mask):
        for layer in self.layers:
            x = layer(x, mask)
        return x

# Fixed this by adding residual connections in the forward pass
class TransformerBlock(nn.Module):

    def __init__(self, num_heads, hidden_dim, ff_dim, dropout):
        super().__init__()

        # Attention block
        self.attn_block = MultiHeadAttention(num_heads, hidden_dim, dropout)
        self.attn_dropout = nn.Dropout(dropout)
        self.attn_norm = nn.LayerNorm(hidden_dim)

        # Feedforward block
        self.mlp_block = TransformerMLP(hidden_dim, ff_dim, dropout)
        self.mlp_dropout = nn.Dropout(dropout)
        self.mlp_norm = nn.LayerNorm(hidden_dim)

    def forward(self, x, mask):
        """
        There are two types of errors in this function.
        """
        ### STUDENT CODE HERE ###
        # Attention Block
        attn_out = self.attn_block(x, mask)
        x = self.attn_norm(x + self.attn_dropout(attn_out))

        # Feedforward Block
        mlp_out = self.mlp_block(x)
        x = self.mlp_norm(x + self.mlp_dropout(mlp_out))

        return x
        ### STUDENT CODE ENDS HERE ###

# Fixed this one: :)
class MultiHeadAttention(nn.Module):

    def __init__(self, num_heads, hidden_dim, dropout=0.1):
        super().__init__()

        self.h = num_heads
        self.qkv_dim = hidden_dim // num_heads
        self.dropout = nn.Dropout(dropout)

        self.q_proj = nn.Linear(hidden_dim, hidden_dim)
        self.k_proj = nn.Linear(hidden_dim, hidden_dim)
        self.v_proj = nn.Linear(hidden_dim, hidden_dim)
        self.out_proj = nn.Linear(hidden_dim, hidden_dim)

    def attention(self, query, key, value, mask):
        """
        There are three errors in this function to fix.
        """
        ### STUDENT CODE HERE ###
        dot_products = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(self.qkv_dim) # Error 1
        dot_products = dot_products.masked_fill(mask == 0, -1e9) # Error 2
        attn = self.dropout(F.softmax(dot_products, dim=-1)) # Error 3. Softmax must normalize over the key dimension (last axis) so each query's weights sum to 1
        return torch.matmul(attn, value)
        ### STUDENT CODE ENDS HERE ###

    def forward(self, x, mask):
        """
        There are two errors in this function to fix
        """
        ### STUDENT CODE HERE ###
        mask = mask.unsqueeze(1)
        B = x.size(0)

        # Compute the query, key and value vectors
        query = self.q_proj(x).view(B, -1, self.h, self.qkv_dim).transpose(1, 2)
        key = self.k_proj(x).view(B, -1, self.h, self.qkv_dim).transpose(1, 2)
        value = self.v_proj(x).view(B, -1, self.h, self.qkv_dim).transpose(1, 2)

        # Perform self-attention
        x = self.attention(query, key, value, mask)

        # Concatenate the outputs for each attention head
        x = x.transpose(1, 2).contiguous().view(B, -1, self.h * self.qkv_dim)
        return self.out_proj(x)
        ### STUDENT CODE ENDS HERE ###

# Fixed: Used the same formulation as the "Attention Is All You Need" paper, with GELU in place of ReLU
class TransformerMLP(nn.Module):

    def __init__(self, hidden_dim, ff_dim, dropout=0.1):
        super().__init__()

        self.fc1 = nn.Linear(hidden_dim, ff_dim)
        self.fc2 = nn.Linear(ff_dim, hidden_dim)
        self.dropout = nn.Dropout(dropout)
        self.gelu = nn.GELU()

    def forward(self, x):
        """
        There is a single error in this function to fix.
        """
        ### STUDENT CODE HERE ###
        return self.fc2(self.dropout(self.gelu(self.fc1(x))))
        ### STUDENT CODE ENDS HERE ###

# Fixed: Same as Original so no need for fixing!
class PositionalEncoding(nn.Module):

    def __init__(self, hidden_dim, dropout, max_len=1000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        positional_encodings = torch.zeros(max_len, hidden_dim)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, hidden_dim, 2) * (- math.log(10000) / hidden_dim))
        positional_encodings[:, 0::2] = torch.sin(position * div_term)
        positional_encodings[:, 1::2] = torch.cos(position * div_term)
        positional_encodings = positional_encodings.unsqueeze(0)

        self.register_buffer('positional_encodings', positional_encodings, persistent=False)

    def forward(self, x):
        x = x + self.positional_encodings[:, :x.size(1)]
        return self.dropout(x)
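
To illustrate how the causal mask interacts with the softmax in `attention`, here is a dependency-free sketch in plain Python on made-up scores. It mirrors the idea (allowed positions on and below the diagonal, masked scores replaced by `-1e9`) but it is not the PyTorch implementation above; `causal_mask` and `masked_softmax` are hypothetical helpers.

```python
import math


def causal_mask(T):
    """mask[i][j] is True when query position i may attend to key position j (j <= i)."""
    return [[j <= i for j in range(T)] for i in range(T)]


def masked_softmax(scores, mask_row, fill=-1e9):
    """Softmax over one query's scores after replacing masked positions with `fill`."""
    masked = [s if keep else fill for s, keep in zip(scores, mask_row)]
    m = max(masked)  # subtract the max for numerical stability
    exps = [math.exp(s - m) for s in masked]
    z = sum(exps)
    return [e / z for e in exps]


T = 4
mask = causal_mask(T)
scores = [[0.5, 1.0, -0.3, 2.0] for _ in range(T)]  # made-up attention scores
weights = [masked_softmax(row, mask[i]) for i, row in enumerate(scores)]

for row in weights:
    print([round(w, 3) for w in row])
```

Each row is a distribution over key positions: the first query can only attend to itself, and only the last query spreads weight over all four positions.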

⚠️ Important: If you haven't changed to a GPU runtime yet, you should do that before proceeding, and you will see a significant speedup.

In [25]:
# PennGrader Grading Cell, do not modify.
# This tests your Transformer is implemented correctly.

test_model = CharacterLevelTransformer(2, 64, 4, 128, 0.1, 36)
test_model.load_state_dict(torch.load('unit_test_state_dict.pth'))
input_tokens, target_tokens = torch.load('testing_data.pth')

test_model.eval()
log_probs = test_model.log_probability(input_tokens, target_tokens, base=2)

grader.grade(test_case_id = 'test_transformer', answer = log_probs)

Correct! You earned 13/13 points. You are a star!

Your submission has been successfully recorded in the gradebook.


You will now write the training loop for your transformer, including any intermediate logging that will be helpful to tune your model (e.g., validation perplexity).

Helpful hints:
1. In order to obtain a performant model, it is important that you test different hyperparameters relating to model architecture and optimization.
2. `evaluate_perplexity` was implemented for you before in Task A, so you don't have to re-implement it.
3. You may want to use [wandb](https://wandb.ai/) to help with logging different experiments.
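
When logging validation perplexity, it helps to remember that perplexity is the exponentiated average negative log-likelihood per token, and that the log base cancels out as long as the same base is used for exponentiation. The sketch below checks this on made-up token probabilities; the `perplexity` helper is hypothetical and separate from the provided `evaluate_perplexity`.

```python
import math


def perplexity(token_log_probs, base):
    """base ** (-(sum of per-token log probabilities) / number of tokens)."""
    total = sum(token_log_probs)
    return base ** (-total / len(token_log_probs))


token_probs = [0.5, 0.25, 0.125, 0.125]  # made-up model probabilities for four target tokens

ppl_base2 = perplexity([math.log2(p) for p in token_probs], base=2)
ppl_base_e = perplexity([math.log(p) for p in token_probs], base=math.e)

# A mean cross-entropy loss in nats gives the same number via exp(loss).
mean_nll = -sum(math.log(p) for p in token_probs) / len(token_probs)
ppl_from_loss = math.exp(mean_nll)

print(ppl_base2, ppl_base_e, ppl_from_loss)
```

This is why a training loop can report `exp(mean loss)` as a perplexity estimate, provided the loss is averaged over non-padding tokens only.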

In [26]:
### STUDENT CELL ###

def train(model, train_data, val_data, dev_wer_data, loss_fct, optimizer, max_epochs):
    """
    Training loop for the transformer model. You may change the header as you see fit.
    """
    ### STUDENT CODE HERE ###
    model.train()
    best_val_ppl = float('inf')

    for epoch in range(1, max_epochs + 1):
        total_loss = 0.0
        total_tokens = 0

        for batch_idx, (input_tokens, target_tokens) in enumerate(train_data):
            input_tokens = input_tokens.to(DEVICE)
            target_tokens = target_tokens.to(DEVICE)

            # Forward pass
            logits = model(input_tokens)  # (B, T, V)
            logits_flat = logits.view(-1, logits.size(-1))  # (B*T, V)
            targets_flat = target_tokens.view(-1)           # (B*T,)

            # Compute loss (ignore padding)
            loss = loss_fct(logits_flat, targets_flat)

            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()

            # Track statistics
            num_tokens = (target_tokens != model.padding_idx).sum().item()
            total_loss += loss.item() * num_tokens
            total_tokens += num_tokens

            if (batch_idx + 1) % 100 == 0:
                avg_loss = total_loss / total_tokens
                ppl = math.exp(avg_loss)
                print(f"Epoch {epoch} | Step {batch_idx+1} | Train Loss: {avg_loss:.4f} | PPL: {ppl:.2f}")

        # End of epoch
        avg_epoch_loss = total_loss / total_tokens
        train_ppl = math.exp(avg_epoch_loss)
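        model.eval()  # disable dropout while measuring validation perplexity
        val_ppl = evaluate_perplexity(model, val_data)
        model.train()  # restore training mode for the next epoch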

        print(f"\nEpoch {epoch} done.")
        print(f"  Train Perplexity: {train_ppl:.3f}")
        print(f"  Validation Perplexity: {val_ppl:.3f}\n")

        # Save best model (optional)
        if val_ppl < best_val_ppl:
            best_val_ppl = val_ppl
            torch.save(model.state_dict(), "best_transformer.pt")

    print(f"Training complete. Best Validation Perplexity: {best_val_ppl:.3f}")

    # raise(NotImplementedError)
    ### STUDENT CODE ENDS HERE ###


### STUDENT CODE HERE ###
train_data, dev_data, test_data, \
    dev_wer_data, test_wer_data, \
    tokenizer = load_data("character", "transformer", batch_size=16) # TODO

num_layers = 6
hidden_dim = 512
num_heads = 8
ff_dim = 2048
dropout_p = 0.1
vocab_size = tokenizer.get_vocab_size()

model = CharacterLevelTransformer(num_layers, hidden_dim, num_heads, ff_dim,
                                    dropout_p, vocab_size).to(DEVICE)

optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4, betas=(0.9, 0.98), eps=1e-9) # TODO
loss_fct = nn.CrossEntropyLoss(ignore_index=vocab_size - 1) # TODO
max_epochs = 3 # TODO
### STUDENT CODE ENDS HERE ###

train(model, train_data, dev_data, dev_wer_data, loss_fct, optimizer, max_epochs)

model.eval()
dev_perplexity = evaluate_perplexity(model, dev_data)
print(f'Model perplexity on the dev set: {dev_perplexity}')
test_perplexity = evaluate_perplexity(model, test_data)
print(f'Model perplexity on the test set: {test_perplexity}')

Case: character, PAD ID: 47


Tokenizing WSJ:   0%|          | 0/3 [00:00<?, ?it/s]

Tokenizing HUB:   0%|          | 0/2 [00:00<?, ?it/s]

Epoch 1 | Step 100 | Train Loss: 2.4564 | PPL: 11.66
Epoch 1 | Step 200 | Train Loss: 2.4036 | PPL: 11.06
Epoch 1 | Step 300 | Train Loss: 2.3374 | PPL: 10.35
Epoch 1 | Step 400 | Train Loss: 2.2701 | PPL: 9.68
Epoch 1 | Step 500 | Train Loss: 2.2115 | PPL: 9.13
Epoch 1 | Step 600 | Train Loss: 2.1603 | PPL: 8.67
Epoch 1 | Step 700 | Train Loss: 2.1137 | PPL: 8.28
Epoch 1 | Step 800 | Train Loss: 2.0708 | PPL: 7.93
Epoch 1 | Step 900 | Train Loss: 2.0317 | PPL: 7.63
Epoch 1 | Step 1000 | Train Loss: 1.9968 | PPL: 7.37
Epoch 1 | Step 1100 | Train Loss: 1.9645 | PPL: 7.13
Epoch 1 | Step 1200 | Train Loss: 1.9345 | PPL: 6.92
Epoch 1 | Step 1300 | Train Loss: 1.9080 | PPL: 6.74
Epoch 1 | Step 1400 | Train Loss: 1.8844 | PPL: 6.58
Epoch 1 | Step 1500 | Train Loss: 1.8615 | PPL: 6.43
Epoch 1 | Step 1600 | Train Loss: 1.8403 | PPL: 6.30
Epoch 1 | Step 1700 | Train Loss: 1.8205 | PPL: 6.17
Epoch 1 | Step 1800 | Train Loss: 1.8024 | PPL: 6.06
Epoch 1 | Step 1900 | Train Loss: 1.7856 | PPL: 5.96
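
The `PPL` column in the log above is the exponential of the running average loss, where each batch's loss is weighted by its number of non-padding tokens. Below is a minimal sketch of that bookkeeping. The per-batch numbers are made up for illustration, and it assumes the loss is a mean cross-entropy in nats per non-padding token.

```python
import math

# Hypothetical (mean_loss, num_non_pad_tokens) pairs for three batches.
# These values are invented for illustration, not taken from the run above.
batches = [(2.50, 400), (2.40, 380), (2.30, 420)]

# Weight each batch's mean loss by its token count, then normalize.
total_loss = sum(loss * n for loss, n in batches)
total_tokens = sum(n for _, n in batches)
avg_loss = total_loss / total_tokens

# Perplexity is the exponential of the average per-token loss.
ppl = math.exp(avg_loss)
print(f"avg loss {avg_loss:.4f} | PPL {ppl:.2f}")

# Sanity check against the first logged line: loss 2.4564 gives PPL 11.66.
print(f"{math.exp(2.4564):.2f}")
```

Weighting by token count keeps short or heavily padded batches from skewing the epoch-level perplexity.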

⚠️ This part is the same as before.

You will now evaluate your model on word error rate (lower is better) on a speech recognition re-ranking task. You are provided with a development set that you can use to tune your model, but you will ultimately be graded on a held-out test set.

You will need to implement the re-ranking function, which should use your model to help re-rank the transcription predictions. Each transcription is already paired with an `acoustic_score`, which is the initial log probability assigned by the automatic speech recognition model.

Helpful hints:
1. You may want to play around with different weights of `acoustic_score` and `lm_score`.
2. A simple baseline that randomly chooses among the candidate sentences would achieve a WER of around 14% on the HUB task. If your results are worse than this baseline, something has gone horribly wrong.
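
The first hint can be illustrated with a small sketch of the weighted score combination. The helper `pick_best` and the toy scores are hypothetical and exist only for this example; they are not part of the starter code.

```python
from typing import List, Tuple

# Each candidate is (sentence, acoustic_score, lm_logprob).
Candidate = Tuple[str, float, float]

def pick_best(candidates: List[Candidate],
              acoustic_weight: float = 1.0,
              lm_weight: float = 0.5) -> str:
    """Return the sentence with the highest weighted sum of the two log scores."""
    best = max(candidates,
               key=lambda c: acoustic_weight * c[1] + lm_weight * c[2])
    return best[0]

# Made-up scores: the acoustic model slightly prefers the wrong transcription,
# while the language model strongly prefers the right one.
toy_candidates = [
    ("the cat sat", -10.0, -12.0),
    ("the cat sad", -9.5, -20.0),
]

# Sweep the LM weight and watch the selected sentence change.
for lm_weight in (0.0, 0.5, 1.0):
    print(lm_weight, pick_best(toy_candidates, lm_weight=lm_weight))
```

With `lm_weight=0.0` the acoustic score alone decides, so the second candidate wins; with a larger weight the language model overrides it. On the real task, one option is to sweep a few weight values and keep the setting with the lowest dev-set WER.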


In [27]:
### STUDENT CELL ###

def rerank_sentences_for_wer(model: Any, wer_data: List[Any]) -> pd.DataFrame:
    """
    Function to rerank candidate sentences in the HUB dataset. For each set of sentences,
    you must assign each sentence a score in the form of the sentence's acoustic score plus
    the sentence's log probability. You should then save the top scoring sentences in a .csv
    file similar to those found in the results directory.

    Inputs:
        model (Any): An n-gram or Transformer model.
        wer_data (List[Any]): Processed data from the HUB dataset.

    Returns:
        A pandas DataFrame pairing sentence set ids and the top ranked sentences.
        - The DataFrame should have two columns, `id` and `sentences`.
        - See `data/wer_data/dev_ground_truths.csv` for an example.
    """
    # You may copy this cell from your above implementation,
    # but note that the parameters for model.log_probability()
    # are different.

    ### STUDENT CODE HERE ###
    acoustic_weight = 1.0  # TUNE THIS
    lm_weight = 0.5  # TUNE THIS
    df_ids = []
    df_sentences = []

    for sentence_set_id, dic in wer_data.items():
        sentences = dic["sentences"]
        token_lists = dic["tokens"]
        acoustic_scores = dic["acoustic_scores"]

        best_sentence = None
        best_score = -float("inf")

        # For each candidate transcription
        for sentence, token_list, acoustic_score in zip(sentences, token_lists, acoustic_scores):
            # Convert token_list to tensor
            token_tensor = torch.tensor(token_list, dtype=torch.long).unsqueeze(0).to(DEVICE)

            # Prepare input and target (shifted version)
            input_tokens = token_tensor[:, :-1]
            target_tokens = token_tensor[:, 1:]

            # Compute LM log-probability (sum over tokens)
            with torch.no_grad():
                lm_logprob = model.log_probability(input_tokens, target_tokens).item()

            # Weighted combined score
            combined_score = acoustic_weight * acoustic_score + lm_weight * lm_logprob

            if combined_score > best_score:
                best_score = combined_score
                best_sentence = sentence

        df_ids.append(sentence_set_id)
        df_sentences.append(best_sentence)

    # Return final DataFrame
    return pd.DataFrame({
        "id": df_ids,
        "sentences": df_sentences
    })
    ### STUDENT CODE ENDS HERE ###

preds = rerank_sentences_for_wer(model, dev_wer_data)
dev_wer = compute_wer('data/wer_data/dev_ground_truths.csv', preds)
print("Dev set WER was: ", dev_wer)

Dev set WER was:  0.0801781737193764


In [28]:
# PennGrader Grading Cell, do not modify.

# This tests your re-ranking word error rate performance.
# Your credit will be assigned with the following function:
#  SCORE(x) = FLOOR[20 * max(0, min(1,2.4 - 20x))]
# where x is your word error rate

preds = rerank_sentences_for_wer(model, test_wer_data)
grader.grade(test_case_id = 'test_transformer_wer', answer = preds)

You earned 5/20 points.

But, don't worry, you can re-submit and we will keep only your latest score.
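
To see how a word error rate maps to points, the scoring formula quoted in the grading cell can be written out directly. This is a sketch only: the function name `wer_to_points` is hypothetical, and the actual grader may compute the score differently behind the scenes.

```python
import math

def wer_to_points(wer: float) -> int:
    """Apply SCORE(x) = FLOOR[20 * max(0, min(1, 2.4 - 20x))] to a word error rate x."""
    return math.floor(20 * max(0.0, min(1.0, 2.4 - 20 * wer)))

# The dev-set WER reported above; the graded score uses the held-out test set instead.
print(wer_to_points(0.0801781737193764))  # → 15
```

Under this formula, credit is full at a WER of 0.07 or lower and zero at 0.12 or higher, with a linear ramp in between (floating-point rounding can shift the result exactly at a boundary).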


## Report

You have successfully built an n-gram LM and a transformer LM!

Great job, you should be proud of yourself :).

Last part: go to the homework template, ![todo](https://img.shields.io/badge/allen_todo:-overleaf_link-8A2BE2), make a copy of the LaTeX file, and complete all sections and questions.


## Submission:

These are the files you have to submit to Gradescope:

1.   Report, with all sections answered
2.   Download your Jupyter notebook as a Python file and submit it as "homework3.ipynb".


