In [1]:
# Downloads required packages and files
required_files = "https://github.com/jhu-intro-hlt/jhu-intro-hlt.github.io/raw/master/assignments/hw3-files/student/required_files.zip"
! wget $required_files && unzip -o required_files.zip
! pip install -r requirements.txt

--2025-10-29 20:04:53--  https://github.com/jhu-intro-hlt/jhu-intro-hlt.github.io/raw/master/assignments/hw3-files/student/required_files.zip
Resolving github.com (github.com)... 20.205.243.166
Connecting to github.com (github.com)|20.205.243.166|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/jhu-intro-hlt/jhu-intro-hlt.github.io/master/assignments/hw3-files/student/required_files.zip [following]
--2025-10-29 20:04:53--  https://raw.githubusercontent.com/jhu-intro-hlt/jhu-intro-hlt.github.io/master/assignments/hw3-files/student/required_files.zip
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 5675 (5.5K) [application/zip]
Saving to: ‘required_files.zip’


2025-10-29 20:04:54 (84.9 MB/s) - ‘re

In [2]:
# Initialize Otter
import otter

grader = otter.Notebook(colab=True)

# Assignment 3

You have now learnt about sequence-to-sequence models in the context of machine translation. In the upcoming lectures, you will see that these models are useful for a wide variety of tasks, wherever the source sequence and the target sequence have different lengths. You have also been introduced to phonemes, which are the building blocks of speech.

In this assignment, you will build sequence-to-sequence models for pronunciation prediction of English words, which simply means that given a word (sequence of characters), the model should predict its pronunciation (sequence of phonemes). You should be able to see why this is a straightforward application of sequence-to-sequence models.

The input is a sequence of characters making up an English word e.g. `a l g e b r a i c a l l y`. The output should be a sequence of phonemes that describe the pronunciation. For the example above, the desired output should be `AE2 L JH AH0 B R EY1 IH0 K L IY0`. The data for this task was obtained from the [CMU Pronunciation Dictionary](http://www.speech.cs.cmu.edu/cgi-bin/cmudict). We will use a small subset of the CMU dict data.

# Setup

For this assignment, as in the previous one, we will be using Google Colab, for both code as well as descriptive questions. Your task is to finish all the questions in the Colab notebook and then upload a PDF version of the notebook, and a viewable link on Gradescope.

### Google colaboratory

Before getting started, get familiar with google colaboratory:
https://colab.research.google.com/notebooks/welcome.ipynb

This is a neat python environment that works in the cloud and does not require you to
set up anything on your personal machine
(it also has some built-in IDE features that make writing code easier).
Moreover, it allows you to copy any existing collaboratory file, alter it and share
with other people.

__Note:__
1. You may need to change your Runtime setting to GPU in order to run the following code blocks.
2. On changing the Runtime setting, you would be required to run the previous code-blocks again.

### Submission

Before you start working on this homework do the following steps:

1. Press __File > Save a copy in Drive...__ tab. This will allow you to have your own copy and change it.
2. Follow all the steps in this collaboratory file and write / change / uncomment code as necessary.
3. Do not forget to occasionally press __File > Save__ tab to save your progress.
4. After all the changes are done and progress is saved press __Share__ button (top right corner of the page), press __get shareable link__ and make sure you have the option __Anyone with the link can view__ selected. Copy the link and paste it in the box below.
5. After completing the notebook, press __File > Download .ipynb__ to download a local copy on your computer, and then upload the file to Gradescope.
6. Please export the notebook to PDF and upload the PDF to the writing part.

__Special handling for model checkpoints.__

6. As the homework requires training neural models, such trained model checkpoints should also be submitted together with the notebook, hence avoiding re-training during the grading phase. For such model checkpoints, they would be stored at `./lightning_logs` directory. You have to first locate the directory from the left side panel (`Files`) on Colab.
7. Enter `./lightning_logs` and find the training label that you would like to submit. The versions are labelled with respect to the training calls.
8. Download the `.ckpt` file from `./lightning_logs/<your_version>/checkpoints/<name>.ckpt`.
9. Rename the downloaded checkpoint and re-name it as the corresponding name shown in the question, say `vanilla_rnn_model.ckpt`.
10. Submit checkpoint file(s) together with your notebook to the autograder. Please make sure that checkpoint files should be put at the same directory level as the notebook (content root).
11. Please enter your leaderboard name the same as how it is written on your gradescope account.



__Paste your notebook link in the box below.__ _(0 points)_



```
# Paste your Colab notebook link here
```



In [3]:
import os
import urllib
import enum
import json
from itertools import zip_longest
from dataclasses import dataclass
from typing import Optional, List, Dict, Tuple, Any, Union

import torch
import torchmetrics
import pytorch_lightning as pl
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from transformers import PreTrainedModel, PretrainedConfig, GenerationMixin
from transformers.modeling_outputs import BaseModelOutput, BaseModelOutputWithPastAndCrossAttentions, Seq2SeqLMOutput

In [4]:
# Checks whether it is in the autograder grading mode
# Checks whether GPU accelerators are available
is_autograder = os.path.exists('is_autograder.py')
if torch.cuda.is_available() and not is_autograder:
    accelerator = 'gpu'
else:
    accelerator = 'cpu'
print(f'The notebook is running for "{"autograder" if is_autograder else "student"}".')
print('Students should make sure you are running under the "student" mode.')
print(f'You are using "{accelerator}".')

The notebook is running for "student".
Students should make sure you are running under the "student" mode.
You are using "gpu".


In [5]:
# Seed everything to make sure all experiments are reproducible
pl.seed_everything(seed=777)

INFO:lightning_fabric.utilities.seed:Seed set to 777


777

In [6]:
# Defines constants
HOMEWORK_DATA_URL = "https://github.com/jhu-intro-hlt/jhu-intro-hlt.github.io/raw/master/assignments/hw3-files/student/"

SRC_SUFFIX = '.src'
TGT_SUFFIX = '.tgt'

CMUDICT_BASE = 'cmudict'
CMUDICT_TRAIN_SMALL = CMUDICT_BASE + '.small.train'
CMUDICT_TRAIN = CMUDICT_BASE + '.train'
CMUDICT_DEV = CMUDICT_BASE + '.dev'

CMUDICT_SRC_VOCAB = 'cmudict.src.vocab.json'
CMUDICT_TGT_VOCAB = 'cmudict.tgt.vocab.json'

# Special tokens
class SpecialToken(enum.Enum):
    BOS = '<BOS>'
    EOS = '<EOS>'
    UNK = '<UNK>'
    PAD = '<PAD>'

In [7]:
def download_data():
    def _download(url: str, filename: str) -> str:
        txt = urllib.request.urlopen(url)
        with open(filename, 'w') as f:
            f.write(txt.read().decode('utf-8'))

    for suffix in (SRC_SUFFIX, TGT_SUFFIX):
        _download(f'{HOMEWORK_DATA_URL}/{CMUDICT_TRAIN_SMALL}{suffix}', f'{CMUDICT_TRAIN_SMALL}{suffix}')
        _download(f'{HOMEWORK_DATA_URL}/{CMUDICT_TRAIN}{suffix}', f'{CMUDICT_TRAIN}{suffix}')
        _download(f'{HOMEWORK_DATA_URL}/{CMUDICT_DEV}{suffix}', f'{CMUDICT_DEV}{suffix}')
        _download(f'{HOMEWORK_DATA_URL}/{CMUDICT_SRC_VOCAB}', f'{CMUDICT_SRC_VOCAB}')
        _download(f'{HOMEWORK_DATA_URL}/{CMUDICT_TGT_VOCAB}', f'{CMUDICT_TGT_VOCAB}')

download_data()

# Vocabulary

In the previous homework, we did not specifically define a class to serve as `Vocabulary`. In this homework, as we have to deal with both the source side vocabulary and target side vocabulary, which are different, it is easier to have a class to perform the index-string and string-index mappings.

**Although there is nothing for you to implement, we suggest you to walk through the whole implementation to understand each function and think about what they might be used.** If you are not familiar with any concepts here, you should either figure it out from previous homework or post questions on Piazza.

In [8]:
class Vocabulary(object):
    def __init__(self,
                 tokens: Optional[List[str]] = None):
        # Registers `SpecialToken`
        self.special_token_enum = SpecialToken
        self._special_tokens = set([s.value for s in SpecialToken])
        if tokens is not None:
            tokens = set(tokens) | set(self._special_tokens)
        else:
            tokens = set(self._special_tokens)
        self._idx2token = list(tokens)
        self._token2idx = {t: i for i, t in enumerate(self._idx2token)}

    def add_token(self, token: str) -> int:
        if token not in self._idx2token:
            self._token2idx[token] = len(self._idx2token)
            self._idx2token.append(token)
        return self.token2idx(token)

    def to_file(self, file_path: str):
        with open(file_path, 'w') as f:
            json.dump({
                'idx2token': self._idx2token,
                'special_tokens': list(self._special_tokens),
                'token2idx': self._token2idx
            }, f, indent=2)

    @classmethod
    def from_file(cls, file_path: str) -> 'Vocabulary':
        with open(file_path) as f:
            read_data = json.load(f)
        vocab = Vocabulary()
        vocab._special_tokens = set(read_data['special_tokens'])
        vocab._idx2token = read_data['idx2token']
        vocab._token2idx = read_data['token2idx']
        return vocab

    def is_special(self, token: str) -> bool:
        return token in self._special_tokens

    def token2idx(self, token: str) -> int:
        return self._token2idx.get(token, self._token2idx[str(self.unk().value)])

    def idx2token(self, index: int) -> str:
        return self._idx2token[index]

    def bos(self) -> SpecialToken:
        return self.special_token_enum.BOS

    def eos(self) -> SpecialToken:
        return self.special_token_enum.EOS

    def unk(self) -> SpecialToken:
        return self.special_token_enum.UNK

    def pad(self) -> SpecialToken:
        return self.special_token_enum.PAD

    def special_to_id(self, st: SpecialToken) -> int:
        return self.token2idx(token=str(st.value))

    def bos_id(self) -> int:
        return self.special_to_id(st=self.bos())

    def eos_id(self) -> int:
        return self.special_to_id(st=self.eos())

    def unk_id(self) -> int:
        return self.special_to_id(st=self.unk())

    def pad_id(self) -> int:
        return self.special_to_id(st=self.pad())

    def __len__(self):
        return len(self._idx2token)

In [9]:
def decode_as_str(tgt_vocab: Vocabulary, output_tensor: torch.Tensor) -> List[str]:
    """Decodes generation outputs to a list of strings."""
    outputs: List[List[int]] = output_tensor.detach().tolist()
    ignore_token_ids = (tgt_vocab.pad_id(), tgt_vocab.bos_id(), tgt_vocab.eos_id())
    decoded_strs: List[str] = [
        ' '.join([
            tgt_vocab.idx2token(index=tid)  # Converts token id to token
            for tid in b
            if tid not in ignore_token_ids  # Filters special tokens
        ])  # Creates a decoded str
        for b in outputs  # Iterates over the batch
    ]
    return decoded_strs


def encode_as_tensor(src_vocab: Vocabulary, sentence: str) -> torch.Tensor:
    """Encodes a sentence to a tensor via the source vocabulary."""
    return torch.tensor(
        [
            [src_vocab.bos_id()]
            + [
                src_vocab.token2idx(token)
                for token in sentence.strip().split()
            ]
            + [src_vocab.eos_id()]
        ],
        dtype=torch.long
    )

In [13]:
# We have pre-built vocabs to ensure deterministic mappings across models
# So we can directly load these vocabs from files
cmudict_src_vocab = Vocabulary.from_file(CMUDICT_SRC_VOCAB)
cmudict_tgt_vocab = Vocabulary.from_file(CMUDICT_TGT_VOCAB)

In [12]:
len(cmudict_src_vocab), len(cmudict_tgt_vocab)

(33, 76)

# Dataset

For your convenience, we have preselected a subset of the CMU pronunciation dictionary (input-output pairs) and split the subset into training, validation (dev) and test sets.

# Data Reader

We will use the `ParallelDataset` class below to load, process and iterate through the data. Since you worked quite a bit on data loading implementation in the previous assignment, we will provide you the loader for this one, so you can get on with the more interesting parts of this assignment. Why is it parallel? Each source sequence is paired with a target sequence. The task is to generate a target sequence given the source sequence.

Note the use of special symbols `<BOS>`, `<EOS>`, `<UNK>`, and `<PAD>`. All the sequences (both input and output) are made to begin with `<BOS>` (Begin of sequence) and end with `<EOS>` (End of sequence). The `<UNK>` symbol is used if we encouter any new symbol that we have not seen (unknown symbol).

In [10]:
@dataclass
class ParallelInstance:
    src_seq: torch.Tensor  # Shape: (seq_len)
    src_tokens: List[str]
    tgt_seq: Optional[torch.Tensor]  # Shape: (seq_len)
    tgt_tokens: Optional[List[str]]


@dataclass
class ParallelBatch:
    src_seqs: torch.Tensor  # Shape: (batch_size, src_seq_len)
    src_attention_mask: torch.Tensor  # Shape: (batch_size, src_seq_len)
    src_tokens: List[List[str]]
    tgt_seqs: Optional[torch.Tensor]  # Shape: (batch_size, tgt_seq_len)
    tgt_attention_mask: Optional[torch.Tensor]  # Shape: (batch_size, src_seq_len)
    tgt_tokens: Optional[List[List[str]]]


class ParallelDataset(Dataset):
    """A dataset class that reads the parallel data.
    """

    def __init__(self,
                 src_file: str,
                 tgt_file: Optional[str] = None,
                 src_vocab: Optional[Vocabulary] = None,
                 tgt_vocab: Optional[Vocabulary] = None):
        """
        Parameters
        ----------
        src_file : str
            Path to the source side file.
        tgt_file : str, optional
            Path to the target side file. During the testing time, it could be `None`.
        src_vocab : Vocabulary, optional
            An existing `Vocabulary` object for the source side. If not provided,
            one will be created by iterating over source data.
        tgt_vocab : Vocabulary, optional
            An existing `Vocabulary` object for the target side. If not provided,
            one will be created by iterating over target data.
        """
        src_token_sequences = self.read_data(src_file)
        tgt_token_sequences = self.read_data(tgt_file) if tgt_file is not None else []
        self.src_vocab = src_vocab if src_vocab is not None else self.build_vocab(src_token_sequences)
        self.tgt_vocab = tgt_vocab if tgt_vocab is not None else self.build_vocab(tgt_token_sequences)
        src_tensors = self.tensorize(vocab=self.src_vocab, sequences=src_token_sequences)
        tgt_tensors = self.tensorize(vocab=self.tgt_vocab,
                                     sequences=tgt_token_sequences) if tgt_file is not None else []

        self.instances: List[ParallelInstance] = [
            ParallelInstance(s, st, t, tt)
            for s, t, st, tt in zip_longest(
                src_tensors, tgt_tensors, src_token_sequences, tgt_token_sequences,
                fillvalue=None
            )
        ]

    def tensorize(self, vocab: Vocabulary, sequences: List[List[str]]) -> List[torch.Tensor]:
        indexed_tensors: List[torch.Tensor] = []
        for seq in sequences:
            # Shape: (1, seq_len)
            new_tensor = torch.tensor(
                [vocab.token2idx(t) for t in self.add_special_tokens(vocab, seq)],
                dtype=torch.long
            )
            indexed_tensors.append(new_tensor)
        return indexed_tensors

    @staticmethod
    def read_data(filepath: str) -> List[List[str]]:
        data: List[List[str]] = []
        with open(filepath, 'r', encoding='utf8') as f:
            for l in f:
                d = [tok for tok in l.strip().split()]
                data.append(d)
        return data

    @staticmethod
    def add_special_tokens(vocab: Vocabulary, seq: List[str]) -> List[str]:
        return [str(vocab.bos().value)] + seq + [str(vocab.eos().value)]

    @staticmethod
    def build_vocab(sequences: List[List[str]]) -> Vocabulary:
        return Vocabulary(tokens=[t for s in sequences for t in s])

    def __len__(self) -> int:
        """Returns the number of instances read in the dataset."""
        return len(self.instances)

    def __getitem__(self, index: int) -> ParallelInstance:
        return self.instances[index]

In [11]:
class ParallelDataModule(pl.LightningDataModule):
    """Wraps PyTorch dataset as a lightning data module."""

    def __init__(self,
                 dataset_paths: Dict[str, str],
                 batch_size: int = 32,
                 shuffle: bool = True,
                 src_vocab: Optional[Vocabulary] = None,
                 tgt_vocab: Optional[Vocabulary] = None):
        super(ParallelDataModule, self).__init__()

        self.datasets: Dict[str, Dataset] = {}
        self.src_vocab = src_vocab
        self.tgt_vocab = tgt_vocab
        for split in ('train', 'val', 'test'):
            if split not in dataset_paths:
                continue
            split_path = dataset_paths[split]
            new_dataset = ParallelDataset(src_file=split_path + SRC_SUFFIX,
                                          tgt_file=split_path + TGT_SUFFIX,
                                          src_vocab=self.src_vocab,
                                          tgt_vocab=self.tgt_vocab)
            if self.src_vocab is None:
                self.src_vocab = new_dataset.src_vocab
            if self.tgt_vocab is None:
                self.tgt_vocab = new_dataset.tgt_vocab
            self.datasets[split] = new_dataset

        self.batch_size = batch_size
        self.shuffle = shuffle

    @staticmethod
    def _pad_sequence(seq: torch.Tensor,
                      max_length: int,
                      padding_value:
                      Union[int, float]
                      ) -> torch.Tensor:
        seq_len = seq.shape[-1]
        if seq_len < max_length:
            return torch.cat(
                [seq, torch.tensor([padding_value] * (max_length - seq_len), dtype=seq.dtype, device=seq.device)],
                dim=-1
            )
        else:
            return seq

    def collate_fn(self, instances: List[ParallelInstance]) -> ParallelBatch:
        """Collates a list of instances and composes a batch.

        Parameters
        ----------
        instances : List[ParallelInstance]
            A list of `ParallelInstance` to comprise.

        Returns
        -------
        batch : ParallelBatch
            A single `ParallelBatch` where tensors are batched instances.
        """
        max_src_seq_len = max([x.src_seq.shape[0] for x in instances])
        has_tgt = instances[0].tgt_seq is not None
        max_tgt_seq_len = max([x.tgt_seq.shape[0] for x in instances]) if has_tgt else -1
        return ParallelBatch(
            src_seqs=torch.stack(
                [
                    self._pad_sequence(x.src_seq, max_length=max_src_seq_len,
                                       padding_value=self.src_vocab.token2idx(str(self.src_vocab.pad().value)))
                    for x in instances
                ],
                dim=0
            ),
            src_attention_mask=torch.stack(
                [
                    self._pad_sequence(
                        seq=torch.ones_like(x.src_seq, dtype=torch.bool),
                        max_length=max_src_seq_len,
                        padding_value=0
                    )
                    for x in instances
                ],
                dim=0
            ),
            tgt_seqs=torch.stack(
                [
                    self._pad_sequence(x.tgt_seq, max_length=max_tgt_seq_len,
                                       padding_value=self.tgt_vocab.token2idx(str(self.tgt_vocab.pad().value)))
                    for x in instances
                ],
                dim=0
            ) if has_tgt else None,
            tgt_attention_mask=torch.stack(
                [
                    self._pad_sequence(
                        seq=torch.ones_like(x.tgt_seq, dtype=torch.bool),
                        max_length=max_tgt_seq_len,
                        padding_value=0
                    )
                    for x in instances
                ],
                dim=0
            ) if has_tgt else None,
            src_tokens=[x.src_tokens for x in instances],
            tgt_tokens=[x.tgt_tokens for x in instances] if has_tgt else None
        )

    def train_dataloader(self):
        return DataLoader(self.datasets['train'],
                          batch_size=self.batch_size,
                          shuffle=self.shuffle,
                          collate_fn=lambda x: self.collate_fn(x))

    def val_dataloader(self):
        return DataLoader(self.datasets['val'],
                          batch_size=self.batch_size,
                          shuffle=False,
                          collate_fn=lambda x: self.collate_fn(x))

    def test_dataloader(self):
        return DataLoader(self.datasets['test'],
                          batch_size=self.batch_size,
                          shuffle=False,
                          collate_fn=lambda x: self.collate_fn(x))

In [14]:
cmudict_corpus = ParallelDataModule(
    dataset_paths={'train': CMUDICT_TRAIN, 'val': CMUDICT_DEV},
    src_vocab=cmudict_src_vocab,
    tgt_vocab=cmudict_tgt_vocab
)

In [15]:
# Please take a look at the output to get the sense of what the data looks like
for i, x in enumerate(cmudict_corpus.train_dataloader()):
    if i > 0:
        break
    print(x)

ParallelBatch(src_seqs=tensor([[18,  9, 26, 10, 10, 22,  7, 30, 11, 23,  6, 15, 15, 15, 15, 15, 15],
        [18, 28, 23, 14, 26, 11, 21, 12, 22, 26,  7, 22, 13, 22,  7, 30,  6],
        [18, 21,  7, 19, 21,  5, 11, 22,  2, 25, 23,  4,  6, 15, 15, 15, 15],
        [18, 32,  1,  7,  4,  1, 25,  1, 28,  6, 15, 15, 15, 15, 15, 15, 15],
        [18, 30, 28,  1,  7,  4, 22, 11, 11, 26,  6, 15, 15, 15, 15, 15, 15],
        [18, 19, 28, 26, 12, 26, 12, 20, 19, 22,  9,  1, 11,  6, 15, 15, 15],
        [18,  5, 28, 23,  1, 12, 25, 12,  1, 32, 22,  7, 30,  6, 15, 15, 15],
        [18,  5, 23,  7, 23,  4, 22,  9, 12,  1,  6, 15, 15, 15, 15, 15, 15],
        [18, 21,  7, 26, 29, 29, 22,  9, 22,  1, 11,  6, 15, 15, 15, 15, 15],
        [18,  2, 12, 21,  7,  7, 22,  7, 30, 11, 20,  6, 15, 15, 15, 15, 15],
        [18,  4, 22,  2,  9, 21,  2,  2, 23,  2,  6, 15, 15, 15, 15, 15, 15],
        [18, 28, 23,  9, 26,  7, 29, 22, 30, 21, 28, 23,  4,  6, 15, 15, 15],
        [18, 29, 28, 21,  2, 12, 28,  1, 

## Evaluation Routine

The evaluation routine is to take model predictions and compare them with gold labels. The measurements for the quality of predictions is called metric. The routine also includes the mechanism to integrate such metrics to the training loop so that model selection techniques such as [early stopping](https://en.wikipedia.org/wiki/Early_stopping) can be employed.

In this section, we will walk you through how to implement a metric, say Character Error Rate, and how to integrate it into the existing Lightning training loop.

### Metric: Character Error Rate (CER)

We are going to evaluate our model's predictions using Character Error Rate (CER). This measures the number of edits (insertions, deletions and substitutions) needed to convert our model's prediction to the correct output sequence. [Edit distance computation](https://nlp.stanford.edu/IR-book/html/htmledition/edit-distance-1.html) is one of the popular applications of dynamic programming, and is used for measuring character/phone/word error rates in speech recognition.

Complete the following function which takes two sequences (list of characters) and computes the edit distance between them. Another function computes statistics (errors and totals) that will later be used to get CER. These are what we called functional primitives, which themselves can be used to compute edit distance and CER without getting involved in any Lightning contexts.  _(12 points)_

In [16]:
def compute_edit_distance(prediction_tokens: List[str], reference_tokens: List[str]) -> int:
    """Computes edit distance for two sequences using dynamic programming.
    This is actually a LeetCode problem :-)

    Parameters
    ----------
    prediction_tokens : List[str]
        A tokenized predicted sentence.
    reference_tokens : List[str]
        A tokenized reference sentence.

    Returns
    -------
    distance : int
        Edit distance between the predicted sentence and the reference sentence.
    """
    # TODO: Your implementation here
    n, p = len(prediction_tokens), len(reference_tokens)
    first_row = list(range(p + 1))
    for i in range(1, n + 1):
        curr = [i] + [0] * p
        for j in range(1, p + 1):
            if prediction_tokens[i - 1] == reference_tokens[j - 1]:
                curr[j] = first_row[j - 1]
            else:
                curr[j] = 1 + min(first_row[j], curr[j - 1], first_row[j - 1])
        first_row = curr
    distance = first_row[p]
    ...
    return distance


def update_cer(
        preds: Union[str, List[str]],
        targets: Union[str, List[str]]
) -> Tuple[torch.Tensor, torch.Tensor]:
    """Updates the CER score with the current set of references and predictions.

    Parameters
    ----------
        preds: Transcription(s) to score as a string or list of strings
        targets: Reference(s) for each speech input as a string or list of strings

    Returns
    -------
        Number of edit operations to get from the reference to the prediction, summed over all samples
        Number of character overall references
    """
    if isinstance(preds, str):
        preds = [preds]
    if isinstance(targets, str):
        targets = [targets]
    errors = torch.tensor(0, dtype=torch.float)
    total = torch.tensor(0, dtype=torch.float)

    # TODO: you have to compute edit distance for each pair of prediction and target
    # Then update total errors and total number of references.

    for p,t in zip(preds, targets):
        errors += compute_edit_distance(p, t)
        total += len(t)
    ...
    return errors, total


def compute_cer(errors: torch.Tensor, total: torch.Tensor) -> torch.Tensor:
    """Computes the CER."""
    if total.item() == 0:
        return torch.tensor(0.0)
    return errors / total
    ...


def character_error_rate(preds: Union[str, List[str]], targets: Union[str, List[str]]) -> torch.Tensor:
    """character error rate is a common metric of the performance of an automatic speech recognition system. This
    value indicates the percentage of characters that were incorrectly predicted. The lower the value, the better
    the performance of the ASR system with a CER of 0 being a perfect score.

    Parameters
    ----------
        preds: Transcription(s) to score as a string or list of strings
        targets: Reference(s) for each speech input as a string or list of strings

    Returns
    -------
        Character error rate score

    Examples
    --------
        preds = ["this is the prediction", "there is an other sample"]
        target = ["this is the reference", "there is another one"]
        character_error_rate(preds=preds, targets=targets)
        tensor(0.3415)
    """
    errors, total = update_cer(preds, targets)
    return compute_cer(errors, total)

In [18]:
grader.check("metric-cer-impl")

With the above functional primitives implemented, we will then wrap them to be a `TorchMetric`, which can be integrated into Lightning.  _(5 points)_

In [17]:
class CharErrorRate(torchmetrics.Metric):
    """ Character Error Rate metric wrapper.

    Parameters
    ----------
        kwargs: Additional keyword arguments, see :ref:`Metric kwargs` for more info.

    Returns
    -------
        Character error rate score
    """
    is_differentiable: bool = False
    higher_is_better: bool = False
    full_state_update: bool = False

    error: torch.Tensor
    total: torch.Tensor

    def __init__(
            self,
            **kwargs: Any,
    ):
        super().__init__(**kwargs)
        self.add_state("errors", torch.tensor(0, dtype=torch.float), dist_reduce_fx="sum")
        self.add_state("total", torch.tensor(0, dtype=torch.float), dist_reduce_fx="sum")

    def update(self, preds: Union[str, List[str]], targets: Union[str, List[str]]) -> None:
        """Stores references/predictions for computing Character Error Rate scores.

        Parameters
        ----------
            preds : Union[str, List[str]]
                Transcription(s) to score as a string or list of strings.
            targets: Union[str, List[str]]
                Reference(s) for each speech input as a string or list of strings.
        """
        errors, total = update_cer(preds, targets)
        self.errors += errors
        self.total += total
        ...

    def compute(self) -> torch.Tensor:
        """Calculates the character error rate.

        Returns
        -------
           Character error rate score
        """
        return compute_cer(self.errors, self.total)
        ...

In [20]:
grader.check("metric-torchmetric-impl")

## Sequence-to-sequence model with encoder-decoder architecture

In this homework, we require you to implement a sequence-to-sequence model that uses encoder-decoder architecture. You are free to implement the model with either RNN or Transformer. However, you have to bear in mind that the computing resource provided by Colab is limited, and the autograder is configured to run with CPU-only for up to 40 mins with 6GB memory. The autograder runtime is for inference only, so if your training takes relatively longer, it might still be fine with the autograder. To emphasize, you have to test your submission runs with the autograder as in previous homework.

For people that are not familiar with the concept of encoder-decoder model, we refer you to read the [Chapter 10 in Speech and Language Processing](https://web.stanford.edu/~jurafsky/slp3/10.pdf). You have at least understand what is an encoder-decoder architecture, what is sequence-to-sequence model, how the model is going to be trained, and how decoding works. Some important connections among the described task in the book chapter and this homework, and previous homework:
- Machine Translation task is to translate from one language to the other. In this homework, you can imagine that the task is translating from English to phonemes. One obvious difference is that the target vocabulary is no longer another language, but phonemes.
- In the previous homework, we played with language models, which is very similar to a decoder. For each step, we feed the decoder with what has been generated and ask it to predict a next token. This is the behavior happened at the inference time (using the model to do something). However, we also noticed that the behavior for training is different as we offset the sequence by 1. This is actually exactly the same as the inference, in which given a previous one you have to predict the next one; the only difference is that it happens in parallel as we assume a gold previous one is fed to the model. This is what we called [teacher forcing](https://cedar.buffalo.edu/~srihari/CSE676/10.2.1%20TeacherForcing.pdf). Please make sure you have understood these concepts before proceeding.
- With the trained model, you have again to use decoding methods learned from the previous homework to decode output sequences.

Note:
- The homework is open to different architecture designs as long as your model takes in one English sentence and spits one phoneme sequence.

Grading
- Simple checks of model function total 7 points.
- There is a baseline on Gradescope using the similar architecture as in the previous homework. Any submission reaches a comparable test score would be given __30 points__. This baseline achieves a value of 0.6834.
- We set a cutoff above the baseline. This cutoff is 0.617. Any submission passes the cutoff would be given another __10 points__. This problem is considered to have __40 points__ in total.
- There will also be a leaderboard. The top 15% performers would be given __10 points__, and top 30% performers would be given __5 points__. **These are considered as extra credits.**


In [20]:
from transformers.modeling_outputs import BaseModelOutput
import torch
import torch.nn as nn
from typing import Optional
from transformers import PreTrainedModel, PretrainedConfig
from transformers.modeling_outputs import BaseModelOutputWithPastAndCrossAttentions

In [36]:
# This cell provides an example skeleton that you can implement encoder-decoder model
# You are free to change the skeleton as long as your final model implements a `generate()` method (its signature is commented out).
# To use the provided skeleton, it would provide you with access to Huggingface transformers' generation API, which means that you can directly use GreedyDecoding and SamplingDecoding. However, we did not make the effort to make the implementation compatible with BeamDecoding. If you are interested, you could implement yourself (which requires you to dig into the hugging face `generation_utils.py` to understand how their interfaces work).

def shift_tokens_right(input_ids: torch.Tensor, pad_token_id: int, decoder_start_token_id: int):
    """Shifts input ids one token to the right."""
    shifted_input_ids = input_ids.new_zeros(input_ids.shape)
    shifted_input_ids[:, 1:] = input_ids[:, :-1].clone()
    shifted_input_ids[:, 0] = decoder_start_token_id

    if pad_token_id is None:
        raise ValueError("self.model.config.pad_token_id has to be defined.")
    # replace possible -100 values in labels by `pad_token_id`
    shifted_input_ids.masked_fill_(shifted_input_ids == -100, pad_token_id)

    return shifted_input_ids


class Encoder(PreTrainedModel):
    def __init__(self,
                 vocab_size: int,
                 embedding_size: int,
                 hidden_size: int,
                 num_layers: int,
                 dropout: float,
                 pad_token_id: int,
                 bos_token_id: int,
                 eos_token_id: int,
                 bidirectional: bool = True,
        ):
      #for RNN
        super(Encoder, self).__init__(config=PretrainedConfig(
            vocab_size=vocab_size,
            pad_token_id=pad_token_id,
            bos_token_id=bos_token_id,
            eos_token_id=eos_token_id,
            hidden_size=hidden_size,
            num_hidden_layers=num_layers,
            hidden_dropout_prob=dropout,
        ))

        self.pad_token_id = pad_token_id
        self.bidirectional = bidirectional
        self.num_layers = num_layers
        self.hidden_size = hidden_size

        # Configures the embedding - source side
        # We name the embedding as `self.embed_tokens`
        # which is aligned with the following get and set embeddings methods.

        self.embed_tokens = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embedding_size,
            padding_idx=pad_token_id,
        )
        self.dropout = nn.Dropout(dropout)
        # Configures the network
        # We assume using RNN here - this aligns with the output
        #Setting up RNN Encoder
        self.rnn = nn.LSTM(
            input_size=embedding_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0.0,
            bidirectional=bidirectional,
        )
        self.post_init()

    def get_input_embeddings(self) -> nn.Module:
        return self.embed_tokens

    def set_input_embeddings(self, new_emb):
        self.embed_tokens = new_emb

    def _lengths_from_mask(self, attention_mask: torch.Tensor) -> torch.Tensor:
        return attention_mask.long().sum(dim=1)

    def forward(self,
                input_ids: torch.LongTensor = None,
                attention_mask: Optional[torch.Tensor] = None,
                **kwargs) -> BaseModelOutput:
        device = input_ids.device
        if attention_mask is not None:
            lengths = self._lengths_from_mask(attention_mask)
        else:
            if self.pad_token_id is not None:
                lengths = (input_ids != self.pad_token_id).sum(dim=1)
            else:
                lengths = torch.full(
                    (input_ids.size(0),), input_ids.size(1), dtype=torch.long, device=device
                )
        emb = self.dropout(self.embed_tokens(input_ids))
        packed = nn.utils.rnn.pack_padded_sequence(
            emb, lengths.cpu(), batch_first=True, enforce_sorted=False
        )
        packed_out, (h_n, c_n) = self.rnn(packed)
        enc_out, _ = nn.utils.rnn.pad_packed_sequence(packed_out, batch_first=True)

        return BaseModelOutput(
            last_hidden_state=enc_out,
            hidden_states=(h_n, c_n),
        )

        """Encodes `input_ids` using some encoding networks.

        Parameters
        ----------
        input_ids : torch.Tensor
            `src_seqs` in the shape of (batch_size, src_seq_len).
        attention_mask : torch.Tensor
            `1` means valid tokens, and `0` means invalid tokens.
            `attention_mask.cpu().sum(-1)` can result in corresponding sequence length for
            inputs.
        kwargs

        Returns
        -------
        `BaseModelOutput` that describes the encoder output.
        """
        # Shape: (batch_size, sequence_length, embedding_size)
        # Embeds the `input_ids` to embeddings
        ...

        # It would output a tuple consists:
        #   `outputs` shape: (batch_size, sequence_length, hidden_size)
        #   `updated_hidden` shape: (num_layers, batch_size, hidden_size)
        # Feed input embeddings to the RNN
        ...

        # Unpacks (back to padded)
        # Unpack the `outputs` to:
        #   `unpacked_outputs` shape: (batch_size, sequence_length, hidden_size)
        #   `output_lengths` shape: (batch_size)
        # Using `torch.nn.utils.rnn.pad_packed_sequence`
        ...

        # You are free to change the outputs


class Decoder(PreTrainedModel):
    def __init__(self,
                 vocab_size: int,
                 embedding_size: int,
                 hidden_size: int,
                 num_layers: int,
                 dropout: float,
                 pad_token_id: int,
                 bos_token_id: int,
                 eos_token_id: int):
        super(Decoder, self).__init__(config=PretrainedConfig(
            vocab_size=vocab_size,
            pad_token_id=pad_token_id,
            bos_token_id=bos_token_id,
            eos_token_id=eos_token_id,
            is_decoder=True,
            num_hidden_layers=num_layers,
            hidden_dropout_prob=dropout,
            hidden_size=hidden_size
        ))

        # Similar configurations as in the encoder
        # But this time they are configured to use the target side vocab
        self.embed_tokens = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embedding_size,
            padding_idx=pad_token_id,
        )
        self.rnn = nn.LSTM(
            input_size=embedding_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0.0
        )
        # self.output_projection = nn.Linear(hidden_size, vocab_size)
        self.attn_W = nn.Linear(hidden_size, hidden_size, bias=False)
        self.attn_combine = nn.Linear(hidden_size + hidden_size, hidden_size)
        self.dropout = nn.Dropout(dropout)


    def get_input_embeddings(self) -> nn.Module:
        return self.embed_tokens

    def set_input_embeddings(self, value: nn.Module):
        self.embed_tokens = value

    def _masked_softmax(self, scores: torch.Tensor, mask: torch.Tensor, store: bool = False) -> torch.Tensor:
      masked_scores = scores.clone()
      if store:
        self._last_raw_scores = scores.detach().clone()
        if mask is not None:
          expanded_mask = ~mask.unsqueeze(1)  # shape: (batch_size, 1, T_enc
          masked_scores = scores.masked_fill(expanded_mask, float('-inf'))
        else:
          masked_scores = scores
      if store:
        self._last_masked_scores = masked_scores.detach().clone()


      attn_weights = torch.softmax(masked_scores, dim=-1)
      if store:
        self._last_attn_weights = attn_weights.detach().clone()

      return attn_weights

    def forward(self,
                input_ids: torch.LongTensor = None,
                encoder_hidden_states: Optional[Tuple[torch.FloatTensor, torch.FloatTensor]] = None,
                decoder_attention_mask: Optional[torch.Tensor] = None,
                encoder_sequence: Optional[torch.FloatTensor] = None,  # (B, T_enc, H_dec)
                encoder_mask: Optional[torch.BoolTensor] = None,
                **kwargs) -> BaseModelOutputWithPastAndCrossAttentions:
        """Decodes

        Parameters
        ----------
        input_ids : torch.Tensor, optional
            Input sequence ids on the decoder-side. This is different from what is
            fed on the encoder-side. Please make sure you understand why they are
            different from encoder-decoder slides.
        encoder_hidden_states : torch.Tensor, optional
            Hidden states from the encoder. This enables the model to proceed with
            the information from the encoder.
        decoder_attention_mask : torch.Tensor, optional
            This is similar to the encoder attention mask.
        kwargs

        Returns
        -------
        `BaseModelOutputWithPastAndCrossAttentions` similar to the encoder outputs,
        but this also leaves the space for implementing attention.
        """
        # If the decoder mask is not provided, we create one by using non-pad tokens.
        if decoder_attention_mask is None:
            decoder_attention_mask = input_ids != self.config.pad_token_id
        lengths = decoder_attention_mask.long().sum(dim=1)
        embed = self.dropout(self.embed_tokens(input_ids))
        packed_input = nn.utils.rnn.pack_padded_sequence(
            embed, lengths.cpu(), batch_first=True, enforce_sorted=False
        )
        H_dec, (h_n, c_n) = self.rnn(embed, encoder_hidden_states)
        proj_dec = self.attn_W(H_dec)                                    # (B, T_dec, H)
        scores = torch.bmm(proj_dec, encoder_sequence.transpose(1, 2))   # (B, T_dec, T_enc)
        alphas = self._masked_softmax(scores, encoder_mask)
        context = torch.bmm(alphas, encoder_sequence)
        attn_out = torch.tanh(self.attn_combine(torch.cat([H_dec, context], dim=-1)))
        attn_out = self.dropout(attn_out)
        # Shape: (batch_size, sequence_length, hidden_size)
        # This is to give a hint on how encoder hidden states are passed to the module
        # You are free to change it for your implementations.


        # Shape: (batch_size, sequence_length, embedding_size)
        # Similar thing for embeddings
        ...

        # It would output a tuple consists:
        #   `outputs` shape: (batch_size, sequence_length, hidden_size)
        #   `updated_hidden` shape: (num_layers, batch_size, hidden_size)
        # Similar to the encoder, but we can now use the hidden states from the encoder instead of `None`
        ...

        # Unpacks (back to padded)
        # Unpack the `outputs` to:
        #   `unpacked_outputs` shape: (batch_size, sequence_length, hidden_size)
        #   `output_lengths` shape: (batch_size)
        # Using `torch.nn.utils.rnn.pad_packed_sequence`
        ...

        # You are free to change the outputs

        return BaseModelOutputWithPastAndCrossAttentions(
            last_hidden_state=attn_out,           # feed this to lm_head
            hidden_states=(H_dec,),               # raw decoder states if you need them
            attentions=alphas                  # (B, T_dec, T_enc)
        )


class EncoderDecoder(PreTrainedModel, GenerationMixin):
    def __init__(self,
                 src_vocab: Vocabulary,
                 tgt_vocab: Vocabulary,
                 embedding_size: int,
                 hidden_size: int,
                 num_layers: int,
                 dropout: float, ):
        self.src_vocab = src_vocab
        self.tgt_vocab = tgt_vocab
        self.src_vocab_size = len(src_vocab)
        self.tgt_vocab_size = len(tgt_vocab)

        # You are free to modify the `__init__` to accommodate your model design.
        config = PretrainedConfig(
            vocab_size=76,          # <-- explicitly set vocab size
            decoder_start_token_id=self.tgt_vocab.bos_id(),
            is_encoder_decoder=True,
            bos_token_id=self.tgt_vocab.bos_id(),
            eos_token_id=self.tgt_vocab.eos_id(),
            pad_token_id=self.tgt_vocab.pad_id(),
            hidden_size=hidden_size,
            num_hidden_layers=num_layers,
        )

        super().__init__(config=config)

        self.encoder = Encoder(
            vocab_size=self.src_vocab_size,
            embedding_size=embedding_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=dropout,
            pad_token_id=self.src_vocab.pad_id(),
            bos_token_id=self.src_vocab.bos_id(),
            eos_token_id=self.src_vocab.eos_id(),
            bidirectional=True
        )
        self.decoder = Decoder(
            vocab_size=self.tgt_vocab_size,
            embedding_size=embedding_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=dropout,
            pad_token_id=self.tgt_vocab.pad_id(),
            bos_token_id=self.tgt_vocab.bos_id(),
            eos_token_id=self.tgt_vocab.eos_id()
        )

        # Classification head for generating tokens for the target side
        self.lm_head = nn.Linear(self.decoder.rnn.hidden_size, 76)
        if self.lm_head.out_features != 76:
          print(f"Reinitializing lm_head: {self.lm_head.out_features} → {76}")
          self.lm_head = nn.Linear(self.decoder.rnn.hidden_size, 76)

        enc_dim = hidden_size * 2 if self.encoder.bidirectional else hidden_size
        self.h_bridge = nn.Linear(enc_dim, hidden_size) if self.encoder.bidirectional else None
        self.c_bridge = nn.Linear(enc_dim, hidden_size) if self.encoder.bidirectional else None
        self.enc2dec  = nn.Linear(enc_dim, hidden_size)

        self.dropout = nn.Dropout(dropout)
        # if self.encoder.bidirectional:
        #     self.h_bridge = nn.Linear(hidden_size * 2, hidden_size)
        #     self.c_bridge = nn.Linear(hidden_size * 2, hidden_size)
        # else:
        #     self.h_bridge = None
        #     self.c_bridge = None

    #     # self.post_init()
    #     self._validate_output_projection()

    # def _validate_output_projection(self):
    #     if self.lm_head.in_features != self.decoder.rnn.hidden_size:
    #         raise ValueError

    def get_input_embeddings(self):
        return self.encoder.get_input_embeddings()

    def get_output_embeddings(self) -> nn.Module:
        return self.lm_head

    def set_input_embeddings(self, value: nn.Module):
        if hasattr(self, "encoder"):
          self.encoder.set_input_embeddings(value)

    def get_encoder(self):
        return self.encoder

    def get_decoder(self):
        return self.decoder
    def set_output_embeddings(self, new_emb):
        pass

    # The `generate()` method is commented out
    # It should take `input_ids`, which are batched `src_seqs`, and generate their
    # corresponding outputs using some decoding algorithms; such algorithms can be
    # what we learned from the previous homework.
    # If you decided to base your submission on the current skeleton, then you should
    # be able to directly use the huggingface generation APIs.
    #
    # def generate(self, input_ids, attention_mask, **kwargs) -> torch.Tensor:
    #     raise NotImplementedError

    def _bridge(self, h_n, c_n):
        """Handle bidirectional encoder output to initialize decoder state."""
        if not self.encoder.bidirectional:
            return h_n, c_n
        num_layers = self.encoder.num_layers
        batch_size = h_n.size(1)
        h_n = h_n.view(num_layers, 2, batch_size, -1)
        c_n = c_n.view(num_layers, 2, batch_size, -1)
        h_cat = torch.cat([h_n[:, 0], h_n[:, 1]], dim=-1)
        c_cat = torch.cat([c_n[:, 0], c_n[:, 1]], dim=-1)
        return self.h_bridge(h_cat), self.c_bridge(c_cat)

    def forward(
            self,
            input_ids: torch.LongTensor = None,
            attention_mask: Optional[torch.Tensor] = None,
            decoder_input_ids: Optional[torch.LongTensor] = None,
            decoder_attention_mask: Optional[torch.Tensor] = None,
            encoder_outputs: Optional[BaseModelOutput] = None,
            labels: Optional[torch.LongTensor] = None,
            **kwargs
    ) -> Seq2SeqLMOutput:
        """Runs the encoder-decdoer model.

        Parameters
        ----------
        input_ids : torch.LongTensor, optional
            `src_seqs` to the encoder.
        attention_mask : torch.Tensor, optional
            Encoder side mask for indicating valid tokens (non-pad).
        decoder_input_ids : torch.LongTensor, optional
            Decoder side input tokens.
        decoder_attention_mask : torch.Tensor, optional
            Decoder side mask for indivating valid tokens (non-pad).
        encoder_outputs : BaseModelOutput, optional
            The output from encoder. This is to avoid re-running the encoder
            during the decoding as the encoded information will not be changed.
        labels : torch.LongTensor, optional
            This is used to indicate output labels. Recall that we did language
            modelling in hw2, where the labels are input tokens shifted by 1.
            In this homework, as we have output different from the input, we use
            the target sequence as the label.
        kwargs

        Returns
        -------
        `Seq2SeqLMOutput` see corresponding definitions.
        """
        # Automatically creates decoder_input_ids from
        # input_ids if no decoder_input_ids are provided
        if labels is not None:
            if decoder_input_ids is None:
                decoder_input_ids = shift_tokens_right(
                    labels, self.decoder.config.pad_token_id, self.decoder.config.bos_token_id
                )

        if encoder_outputs is None:
          encoder_outputs = self.encoder(input_ids=input_ids,attention_mask=attention_mask)

        enc_seq = encoder_outputs.last_hidden_state              # (B, T_enc, 2H or H)
        enc_mask = attention_mask.bool() if attention_mask is not None else (input_ids != self.src_vocab.pad_id())

        # Project encoder sequence to decoder hidden size (handles BiLSTM)
        enc_for_attn = self.enc2dec(enc_seq)

        # RUN decoder
        enc_h, enc_c = encoder_outputs.hidden_states
        dec_init_h, dec_init_c = self._bridge(enc_h, enc_c)

        # Passes the decoder output to LM head to get a distribution
        if decoder_attention_mask is None and decoder_input_ids is not None:
          decoder_attention_mask = decoder_input_ids != self.tgt_vocab.pad_id()
        # over the target vocabulary
        # This part is a hint for how to use the decoder output.
        decoder_outputs = self.decoder(
            input_ids=decoder_input_ids,
            encoder_hidden_states=(dec_init_h, dec_init_c),
            decoder_attention_mask=decoder_attention_mask,
            encoder_sequence=enc_for_attn,
            encoder_mask=enc_mask,
        )
        lm_logits = self.lm_head(decoder_outputs.last_hidden_state)

        lm_loss = None
        if labels is not None:
            # Compute the loss function - similar to hw2
            vocab_size = lm_logits.size(-1)
            loss_fct = nn.CrossEntropyLoss(ignore_index=self.tgt_vocab.pad_id())
            loss = loss_fct(lm_logits.view(-1, vocab_size), labels.view(-1))
            lm_loss = loss

        return Seq2SeqLMOutput(loss=lm_loss,
                               logits=lm_logits,
                               decoder_hidden_states=decoder_outputs.hidden_states,
                               encoder_last_hidden_state=encoder_outputs.last_hidden_state,
                               encoder_hidden_states=encoder_outputs.hidden_states,
                               decoder_attentions=getattr(decoder_outputs, "attentions", None),
                               cross_attentions=None,
                               encoder_attentions=None,)

    def prepare_inputs_for_generation(
            self,
            decoder_input_ids: torch.Tensor,
            # attention_mask: Optional[torch.Tensor]=None,
            encoder_outputs: Optional[BaseModelOutput] = None,
            **kwargs
    ):


        """Prepares the inputs for the generation mixin from huggingface.
        This is the key interface to make sure that you could use the generations
        implemented by huggingface.

        You can imagine that:
        - At the beginning, the generation module calls to get encoder outputs, which will
        then be used across all decoding steps.
        - This method prepares input for each decoding step. After each step, the newly
        decoded token would be added to update `decoder_input_ids`.
        - As we implemented `decoder_attention_mask` generation in `Decoder`, there is no
        need to process attention mask in this method.

        Parameters
        ----------
        decoder_input_ids : torch.Tensor
            Input ids on the decoder side (not the encoder side)! Be careful with the vocab.
        encoder_outputs : BaseModelOutput, optional
            The encoded information once the encoding is done with the encoder. This is to
            have the encoder outputs reused across the decoding steps.
        kwargs

        Returns
        -------
        Inputs to the `Decoder`. This is literally what being fed into the decoder.
        """
        input_ids = kwargs.get("input_ids", None)
        attention_mask = kwargs.get("attention_mask", None)


        return {
            "input_ids": None,  # encoder_outputs is defined. input_ids not needed
            "attention_mask": attention_mask,
            "encoder_outputs": encoder_outputs,
            "decoder_input_ids": decoder_input_ids,
        }

    def prepare_decoder_input_ids_from_labels(self, labels: torch.Tensor):
        return shift_tokens_right(labels, self.config.pad_token_id, self.config.bos_token_id)

In [37]:
# Please instantiate your model here.
# This will be used to do the sanity check for test cases.
embedding_size = 512
hidden_size = 512
num_layers = 2
dropout = 0.2
src_vocab = cmudict_corpus.src_vocab
tgt_vocab = cmudict_corpus.tgt_vocab

# Initialize encoder-decoder model
test_model = EncoderDecoder(
    src_vocab=src_vocab,
    tgt_vocab=tgt_vocab,
    embedding_size=embedding_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
    dropout=dropout
)

In [27]:
test_model.lm_head

Linear(in_features=512, out_features=76, bias=True)

In [24]:
print("lm_head:", test_model.lm_head)
print("decoder hidden size:", test_model.decoder.rnn.hidden_size)
print("lm_head in_features:", test_model.lm_head.in_features)

assert test_model.lm_head.in_features == test_model.decoder.rnn.hidden_size, \
    f"LM head in_features ({test_model.lm_head.in_features}) must equal decoder hidden size ({test_model.decoder.rnn.hidden_size})"


lm_head: Linear(in_features=512, out_features=76, bias=True)
decoder hidden size: 512
lm_head in_features: 512


In [25]:
test_model.lm_head.out_features

76

In [45]:
len(test_model.tgt_vocab)

76

In [38]:
grader.check("model-generate-checks")

In [28]:
import inspect
print(inspect.signature(test_model.decoder._masked_softmax))

(scores: torch.Tensor, mask: torch.Tensor, store: bool = False) -> torch.Tensor


In [39]:
def wrapped_generate(model_to_wrap: nn.Module, **kwargs) -> torch.Tensor:
    """Wraps the generate method. This function is what will be actually
    called by the evaluation routine for the leaderboard.

    In this function, you can wrap your `generate()` method to allow
    different generation configurations to be used at the test time.

    Parameters
    ----------
    model_to_wrap : nn.Module
        Your encoder-decoder model.
    kwargs
        Argument dict that passes all arguments to the generate function.

    Returns
    -------
        Generated phoneme sequences in the form of torch.Tensor.
    """
    # This is a compatible version with the above `EncoderDecoder` model.
    if hasattr(model_to_wrap, "tie_weights"):
      model_to_wrap.tie_weights = lambda: None

    # Safe-guard against overwriting during generation
    if hasattr(model_to_wrap, "set_output_embeddings"):
        model_to_wrap.set_output_embeddings = lambda new_emb: None
    if hasattr(model_to_wrap, "set_input_embeddings"):
        model_to_wrap.set_input_embeddings = lambda new_emb: None
    # If you would like to enable SamplingDecoding here, you can make it
    #       return model.generate(do_sample=True, **kwargs)
    return model_to_wrap.generate(
        do_sample=False,
        max_length=50,
        **kwargs
    )

In [40]:
class PhonemeGenerationTask(pl.LightningModule):
    """Wraps a PyTorch module as a Lightning Module for the phoneme generation task."""

    def __init__(self,
                 model: nn.Module,
                 src_vocab: Vocabulary,
                 tgt_vocab: Vocabulary,
                 learning_rate: float = 0.001):
        super(PhonemeGenerationTask, self).__init__()
        self.model = model
        self.learning_rate = learning_rate
        self.src_vocab = src_vocab
        self.tgt_vocab = tgt_vocab

        self.cer = CharErrorRate()

    def training_step(self, batch: ParallelBatch) -> torch.Tensor:
        """Defines the training step.

        Parameters
        ----------
        batch : ParallelBatch
            The batched training instances.

        Returns
        -------
        loss : torch.Tensor
            The loss computed from the seq-to-seq model.
        """
        # Please make necessary modifications to accommodate your design
        outputs = self.model(
            input_ids=batch.src_seqs,
            attention_mask=batch.src_attention_mask,
            labels=batch.tgt_seqs
        )
        loss = outputs.loss
        self.log("train_loss", loss, on_step=True, on_epoch=True, prog_bar=True, logger=True)
        return loss

    def validation_step(self, batch: ParallelBatch, batch_idx: int) -> torch.Tensor:
        """Defines the validation step - for this module, we have the same
        training and validation behaviors. Usually, we would compute a metric that is
        used to select the best performing model checkpoint.

        Parameters
        ----------
        batch : ParallelBatch
            The batched training instances.
        batch_idx: int
            The index of the batch.

        Returns
        -------
        loss : torch.Tensor
            The loss computed using CrossEntropyLoss.
        """
        # You are free to modify this function to ensure they are being called correctly.
        outputs = self.model.generate(
            input_ids=batch.src_seqs,
            attention_mask=batch.src_attention_mask,
            max_length=batch.tgt_seqs.size(1)
        )
        decoded_outputs = decode_as_str(self.tgt_vocab, outputs)
        curr_cer = self.cer(decoded_outputs, [' '.join(s) for s in batch.tgt_tokens])
        self.log('val_cer', curr_cer, on_epoch=True, prog_bar=True)

    def configure_optimizers(self):
        """Configures optimizers for the training."""
        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        return optimizer

In [41]:
# Please modify the model and trainer configurations to accommodate your design
encoder_decoder_model = EncoderDecoder(
    src_vocab=cmudict_corpus.src_vocab,
    tgt_vocab=cmudict_corpus.tgt_vocab,
    embedding_size=512,
    hidden_size=512,
    num_layers=2,
    dropout=0.2,
)
if is_autograder:  # You have to make sure that you checkpoint can be correctly loaded by the autograder
    CHECKPOINT_TO_LOAD = "vanilla_rnn_checkpoint_large.ckpt"  # Please replace this to your checkpoint file name
    phoneme_gen_pl_module = PhonemeGenerationTask.load_from_checkpoint(
        checkpoint_path=CHECKPOINT_TO_LOAD,
        src_vocab=cmudict_corpus.src_vocab,
        tgt_vocab=cmudict_corpus.tgt_vocab,
        model=encoder_decoder_model
    )
else:
    # In the student mode, a new model would be trained
    # You are allowed to change training hyperparameters
    # But you are not allowed to create a new task
    phoneme_gen_pl_module = PhonemeGenerationTask(
        model=encoder_decoder_model,
        src_vocab=cmudict_corpus.src_vocab,
        tgt_vocab=cmudict_corpus.tgt_vocab,
        learning_rate=0.001
    )
    phoneme_gen_trainer = pl.Trainer(
        accelerator=accelerator,
        max_epochs=16,
        callbacks=[pl.callbacks.EarlyStopping(monitor='val_cer', mode='min')]
    )
    phoneme_gen_trainer.fit(model=phoneme_gen_pl_module,
                        datamodule=cmudict_corpus)

INFO:pytorch_lightning.utilities.rank_zero:💡 Tip: For seamless cloud uploads and versioning, try installing [litmodels](https://pypi.org/project/litmodels/) to enable LitModelCheckpoint, which syncs automatically with the Lightning model registry.
INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs
INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:pytorch_lightning.callbacks.model_summary:
  | Name  | Type           | Params | Mode 
-------------------------------------------------
0 | model | EncoderDecoder | 17.2 M | train
1 | cer   | CharErrorRate  | 0      | train
-------------------------------------------------
17.2 M    Trainable params
0         Non-trainable params
17.2 M    Total params
68.643    Total estimated model params size (MB)
17        Modu

Sanity Checking: |          | 0/? [00:00<?, ?it/s]

/usr/local/lib/python3.12/dist-packages/pytorch_lightning/utilities/data.py:79: Trying to infer the `batch_size` from an ambiguous collection. The batch size we found is 32. To avoid any miscalculations, use `self.log(..., batch_size=batch_size)`.


Training: |          | 0/? [00:00<?, ?it/s]

/usr/local/lib/python3.12/dist-packages/pytorch_lightning/utilities/data.py:79: Trying to infer the `batch_size` from an ambiguous collection. The batch size we found is 4. To avoid any miscalculations, use `self.log(..., batch_size=batch_size)`.


Validation: |          | 0/? [00:00<?, ?it/s]

/usr/local/lib/python3.12/dist-packages/pytorch_lightning/utilities/data.py:79: Trying to infer the `batch_size` from an ambiguous collection. The batch size we found is 16. To avoid any miscalculations, use `self.log(..., batch_size=batch_size)`.


Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=16` reached.


In [28]:
print("LM head out_features:", encoder_decoder_model.lm_head.out_features)
print("Decoder vocab size:", encoder_decoder_model.tgt_vocab_size)
print("True target vocab size:", len(cmudict_tgt_vocab))
print("Target vocab size:", len(cmudict_corpus.tgt_vocab))
print("Decoder Hidden Size", test_model.decoder.rnn.hidden_size )


LM head out_features: 76
Decoder vocab size: 76
True target vocab size: 76
Target vocab size: 76
Decoder Hidden Size 512


In [29]:
print(len(cmudict_corpus.tgt_vocab))
print(len(encoder_decoder_model.tgt_vocab))

# Get a sample batch from the training dataloader
batch = next(iter(cmudict_corpus.train_dataloader()))

print("Max label id in training data:", batch.tgt_seqs.max().item())
print("Target vocab size:", len(cmudict_tgt_vocab))

print("lm_head out_features:", encoder_decoder_model.lm_head.out_features)
print("tgt_vocab size:", len(cmudict_tgt_vocab))


76
76
Max label id in training data: 75
Target vocab size: 76
lm_head out_features: 76
tgt_vocab size: 76


In [32]:
if "phoneme_gen_pl_module" not in globals():
    encoder_decoder_model = EncoderDecoder(
        src_vocab=cmudict_corpus.src_vocab,
        tgt_vocab=cmudict_corpus.tgt_vocab,
        embedding_size=512,
        hidden_size=512,
        num_layers=2,
        dropout=0.2,
    )
    phoneme_gen_pl_module = PhonemeGenerationTask(
        model=encoder_decoder_model,
        src_vocab=cmudict_corpus.src_vocab,
        tgt_vocab=cmudict_corpus.tgt_vocab,
        learning_rate=0.001,
    )

In [42]:
# You can run this cell to test whether you have get your model trained
# You are expecting to see a phoneme sequence
decode_as_str(
    phoneme_gen_pl_module.model.tgt_vocab,
    wrapped_generate(
        model_to_wrap=phoneme_gen_pl_module.model,
        input_ids=encode_as_tensor(phoneme_gen_pl_module.model.src_vocab,
                         'v a n l a n i n g h a m ').to(phoneme_gen_pl_module.device)
    )
)

['V AE2 N L AE1 N IH0 NG HH AE2 M']

In [44]:
decode_as_str(
    cmudict_corpus.tgt_vocab,
    wrapped_generate(
        model_to_wrap=phoneme_gen_pl_module.model,
        input_ids=torch.tensor([[6, 13, 12, 10], [6, 2, 10, 13]], dtype=torch.long),
        attention_mask=torch.tensor([[True, True, True, True],
                                     [True, True, True, False]], dtype=torch.bool)
    )
)

['Z T M IH1 L Z M AH0 N', 'S M M AY1 L D AH0 N']

In [43]:
grader.check("model-generate-test")