Build a training pipeline for models with different hyperparameters (hparams).

In [1]:
#!python -m spacy download en_core_web_sm

In [2]:
import gzip
import inspect
import json
import os
from collections import OrderedDict
from functools import reduce, lru_cache
from typing import Any, Callable, Dict, Iterable, List, Sequence, Tuple, Type

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib
matplotlib.rcParams["figure.facecolor"] = "white"
from tqdm import tqdm

import torch as th
import torch.nn.functional as F
from torch import nn
from torch import optim
from torch.nn import Embedding
from torch.utils.data import Dataset, DataLoader, random_split
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, PackedSequence
from torchtext.vocab import vocab, Vocab, GloVe, build_vocab_from_iterator
from torchtext.data.utils import get_tokenizer

import pytorch_lightning as pl
from pytorch_lightning import Trainer
from pytorch_lightning.loggers import TensorBoardLogger, WandbLogger
from torchmetrics import MeanSquaredError

import optuna
from optuna.visualization import plot_parallel_coordinate, plot_contour
from optuna.importance import get_param_importances

import wandb

from transformers import AutoTokenizer, AutoModelForSequenceClassification

# Special tokens, in this order, passed as `specials` when building vocabularies;
# PAD fills padded positions and UNK is the fallback index for unseen tokens.
SPECIAL_TOKENS = ("[PAD]", "[UNK]")
PAD_TOKEN, UNK_TOKEN = SPECIAL_TOKENS

# Functions

In [4]:
def nums_from_fractions(total: int, fractions: Sequence[float]) -> Tuple[int, ...]:
    """
    Split ``total`` items into integer counts according to ``fractions``.

    :param total: number of items to split.
    :param fractions: fractions of the total number, e.g. ``[0.7, 0.15, -1]``.
        Exactly one element must be -1, which denotes "whatever remains".
    :return: tuple of counts aligned with ``fractions`` and summing to ``total``.
        Explicit fractions are truncated (``0.155 * 100 -> 15``); the -1 slot
        absorbs the remainder.
    :raises AssertionError: if there is not exactly one -1, or if the explicit
        fractions request more than ``total`` items.
    """
    from decimal import Decimal

    assert fractions.count(-1) == 1, (
        "Must have exactly one occurrence of -1 to denote a fraction of 'remaining' items"
    )
    # Multiply in decimal arithmetic on the fraction's shortest string form:
    # with floats, 100 * 0.29 == 28.999999999999996, which int() would
    # truncate to 28 instead of 29.
    nums = [int(Decimal(str(f)) * total) if f != -1 else 0 for f in fractions]
    idx_remaining = fractions.index(-1)
    nums[idx_remaining] = total - sum(nums)
    assert all(n >= 0 for n in nums), (
        f"Fractions {list(fractions)} request more than the {total} available items"
    )
    return tuple(nums)

# Sanity checks for nums_from_fractions: valid splits...
for _fractions, _expected in [
    ([0.7, 0.3, -1], (70, 30, 0)),
    ([0.7, 0.155, -1], (70, 15, 15)),
    ([0.7, 0, -1], (70, 0, 30)),
]:
    assert nums_from_fractions(100, _fractions) == _expected, (_fractions, _expected)

# ...and invalid ones (no -1 marker; fractions overshooting the total) must be rejected.
for _bad_fractions in ([0.7, 0.3, -2], [0.7, 0.5, -1]):
    try:
        nums_from_fractions(100, _bad_fractions)
    except AssertionError:
        pass
    else:
        raise AssertionError(f"expected nums_from_fractions to reject {_bad_fractions}")

def build_vocab_from_texts(
    texts: Iterable[str], tokenizer: Callable, specials=SPECIAL_TOKENS,
    unk_token=UNK_TOKEN, **kwargs
) -> Vocab:
    """
    Build a torchtext vocabulary from raw texts.

    Each text is tokenized with ``tokenizer``; the token lists are handed to
    ``build_vocab_from_iterator`` along with ``specials`` and any extra
    ``kwargs``. Lookups of tokens not in the vocabulary fall back to the index
    of ``unk_token``.
    """
    token_lists = list(map(tokenizer, tqdm(texts)))
    vocabulary = build_vocab_from_iterator(token_lists, specials=specials, **kwargs)
    fallback_index = vocabulary[unk_token]
    vocabulary.set_default_index(fallback_index)
    return vocabulary

def seqs_from_texts(
    texts: List[str], tokenizer: Callable, voc: Vocab, pad_token=PAD_TOKEN
) -> th.Tensor:
    """
    Numericalize ``texts`` with ``tokenizer`` + ``voc`` and pad them together.

    ``pad_sequence`` is used with its default ``batch_first=False``, so the
    result has shape (longest_length, len(texts)) and is padded with
    ``voc[pad_token]``.
    """
    index_tensors = []
    for text in texts:
        token_ids = voc(tokenizer(text))
        index_tensors.append(th.tensor(token_ids))
    return pad_sequence(index_tensors, padding_value=voc[pad_token])

def count_oov_rate(
    seqs: Iterable[th.Tensor],
    voc: Vocab,
    unk_token=UNK_TOKEN,
    pad_token=PAD_TOKEN
) -> float:
    """
    Fraction of non-padding token positions in ``seqs`` equal to the unk index.

    :param seqs: iterable of index tensors, e.g. the padded tensor returned by
        ``seqs_from_texts`` (iterating it yields one row at a time).
    :param voc: vocabulary used to look up the unk and pad indices.
    :return: ``num_unk / num_non_pad``, or ``nan`` when there are no
        non-padding tokens at all (the rate is undefined).
    """
    # Look the special indices up once rather than on every iteration.
    unk_index = voc[unk_token]
    pad_index = voc[pad_token]

    num_oov = 0
    num_tokens = 0
    for item in seqs:
        num_oov += th.sum(item == unk_index).item()
        num_tokens += th.sum(item != pad_index).item()

    if num_tokens == 0:
        return float("nan")
    return num_oov / num_tokens

def glove_voc_and_embedding(
    embedding_dim: int,
    glove_embedding_params: Dict,
    pad_token=PAD_TOKEN,
    unk_token=UNK_TOKEN
) -> Tuple[Vocab, Embedding]:
    """
    Build a vocabulary and a matching ``nn.Embedding`` from pretrained GloVe.

    The vocabulary is ``[pad_token, unk_token, *glove_tokens]``. The embedding
    matrix has one row per vocabulary entry in the same order: the two special
    rows are zero vectors and row ``voc[tok]`` holds ``tok``'s GloVe vector.

    :param embedding_dim: GloVe vector dimension (``dim`` passed to ``GloVe``).
    :param glove_embedding_params: dict with keys ``name`` (GloVe variant) and
        ``freeze_embedding`` (whether the embedding weights are frozen).
    :return: ``(voc, embedding)``; unknown tokens map to ``unk_token`` and
        ``embedding.padding_idx`` is ``voc[pad_token]``.
    """
    embedding_vecs = GloVe(name=glove_embedding_params["name"], dim=embedding_dim)
    specials = (pad_token, unk_token)

    # GloVe rows to keep, in GloVe order. A GloVe token that collides with a
    # special token is dropped together with its row so indices stay aligned.
    keep_rows = [i for i, tok in enumerate(embedding_vecs.itos) if tok not in specials]
    glove_vectors = embedding_vecs.vectors
    if len(keep_rows) != len(embedding_vecs.itos):
        glove_vectors = glove_vectors[keep_rows]

    # Every token gets a dummy count of 1 so vocab()'s default min_freq keeps
    # all of them; insertion order defines the vocabulary indices.
    token_counts = OrderedDict((tok, 1) for tok in specials)
    token_counts.update((embedding_vecs.itos[i], 1) for i in keep_rows)
    voc = vocab(token_counts)
    voc.set_default_index(voc[unk_token])

    # Prepend one zero row per special token so row i corresponds to voc index i.
    special_rows = th.zeros(
        len(specials), glove_vectors.shape[1], dtype=glove_vectors.dtype
    )
    weights = th.cat([special_rows, glove_vectors], dim=0)
    assert weights.shape[0] == len(voc), (
        f"vocab size {len(voc)} != embedding rows {weights.shape[0]}"
    )

    embedding = Embedding.from_pretrained(
        weights, freeze=glove_embedding_params["freeze_embedding"],
        padding_idx=voc[pad_token]
    )

    return voc, embedding

def _read_sentiment140() -> pd.DataFrame:
    """Load the Sentiment140 CSV; ``target`` is the raw label (column 0) divided by 4."""
    df = pd.read_csv(
        "data/data_twitter_sentiment.csv", header=None, encoding='latin-1'
    )
    df = df.rename(columns={0: "target_raw", 5: "text"})
    df["target"] = df.target_raw / 4
    return df


def _read_amazon_office_products() -> pd.DataFrame:
    """Load the gzipped JSON-lines Office Products reviews; ``target`` is ``(overall - 1) / 4``."""
    with gzip.open('data/data_reviews_Office_Products_5.json.gz') as f:
        records = [json.loads(line.strip()) for line in tqdm(f)]
    df = pd.DataFrame(records)
    df = df.rename(columns={"reviewText": "text", "overall": "target_raw"})
    df["target"] = (df.target_raw - 1) / 4
    return df


def _read_imdb_reviews() -> pd.DataFrame:
    """Load the aclImdb test+train review files; ``target`` is 1 for 'pos', 0 for 'neg'."""
    basepath = "data/stanford_movie_reviews/aclImdb/"
    labels = {'pos': 1, 'neg': 0}
    # Collect rows in a list and build the frame once: growing a DataFrame
    # row by row is quadratic (and DataFrame.append was removed in pandas 2).
    records = []
    for split in ('test', 'train'):
        for sentiment in ('pos', 'neg'):
            path = os.path.join(basepath, split, sentiment)
            for file in tqdm(sorted(os.listdir(path))):
                with open(os.path.join(path, file), 'r', encoding='utf-8') as infile:
                    records.append((infile.read(), labels[sentiment]))
    return pd.DataFrame.from_records(records, columns=['text', 'target'])


@lru_cache()
def get_raw_data(name: str) -> pd.DataFrame:
    """
    Load dataset ``name`` as a frame with exactly the columns ``text`` and
    ``target`` and a fresh 0..n-1 index.

    Results are memoized by ``lru_cache``, so repeated calls with the same
    name return the *same* DataFrame object — do not mutate it in place.

    :raises NotImplementedError: for an unknown ``name``.
    """
    if name == "twitter_disaster":
        df = pd.read_csv("data/data_disaster_tweets.csv")
    elif name == "twitter_sentiment140":
        df = _read_sentiment140()
    elif name == "twitter_sentiment140_random_small":
        df = _read_sentiment140()
        # Drawn from numpy's global RNG, so the 30k subset differs between
        # sessions unless np.random.seed was called beforehand.
        random_indices = np.random.choice(df.shape[0], int(30e3), replace=False)
        df = df.iloc[random_indices, :]
    elif name == "amazon_office_products":
        df = _read_amazon_office_products()
    elif name == "imdb_reviews":
        df = _read_imdb_reviews()
    else:
        raise NotImplementedError(f"Unknown dataset name: {name!r}")

    return df[["text", "target"]].reset_index(drop=True)

In [18]:
class TextDataset(Dataset):
    """
    Map-style dataset of numericalized, padded texts with float targets.

    All texts are tokenized, numericalized and padded once in the constructor,
    to the length of the longest tokenized text in ``df``.
    Items are ``((token_ids, length), target)``.
    """

    def __init__(self, df: pd.DataFrame, tokenizer: Callable, voc: Vocab) -> None:
        assert "text" in df.columns
        assert "target" in df.columns
        self.tokenizer = tokenizer
        self.voc = voc

        # one 1-D index tensor per text
        numericalized = [
            th.tensor(self.voc(self.tokenizer(text))) for text in tqdm(df.text)
        ]

        # T x B: T is the length of the longest sequence, B the number of texts
        self.seqs = pad_sequence(numericalized, padding_value=self.voc[PAD_TOKEN])
        self.seq_lengths = th.tensor([len(ids) for ids in numericalized])
        self.targets = th.tensor(df.target.values).float()

    def __len__(self) -> int:
        return len(self.targets)

    def __getitem__(self, i: int) -> Tuple[Tuple[th.Tensor, int], float]:
        # column i of the T x B padded matrix is the i-th text
        return (self.seqs[:, i], self.seq_lengths[i]), self.targets[i]
    
class TextDistilbertDataset(Dataset):
    """
    Map-style dataset backed by a Hugging Face tokenizer.

    All texts are tokenized once in the constructor (``padding="max_length"``,
    truncation on, PyTorch tensors). Items are
    ``((input_ids, attention_mask), target)`` with a float target.
    """

    def __init__(self, df: pd.DataFrame, hf_tokenizer: Callable) -> None:
        for required_column in ("text", "target"):
            assert required_column in df.columns

        encoded = hf_tokenizer(
            list(df.text),
            return_tensors="pt",
            padding="max_length",
            truncation=True,
        )
        self.seqs = encoded["input_ids"]
        self.attention_masks = encoded["attention_mask"]
        self.targets = th.tensor(df.target.values).float()

    def __len__(self) -> int:
        return len(self.targets)

    def __getitem__(self, i: int) -> Tuple[Tuple[th.Tensor, th.Tensor], float]:
        # row i of each (num_texts x seq_len) tensor belongs to text i
        features = (self.seqs[i, :], self.attention_masks[i, :])
        return features, self.targets[i]

In [34]:
class GeneralizedTextRNN(pl.LightningModule):
    """
    Text regressor: embedding -> recurrent layers -> linear -> sigmoid.

    Produces one score in (0, 1) per text and is trained with MSE loss against
    float targets. Subclasses choose the recurrent class (passed as
    ``rnn_cls``) and implement ``hidden_state_from_rnn_outputs`` to extract
    ``h_n`` from that class's return value.
    """

    def __init__(
        self,
        rnn_cls: Type,
        embedding: nn.Embedding,
        hidden_size: int = 128,
        num_layers: int = 1,
        lr: float = 1e-3,
        dropout: float = 0.5
    ) -> None:
        """
        :param rnn_cls: recurrent module class (e.g. ``nn.RNN`` / ``nn.LSTM``),
            instantiated with ``batch_first=True``.
        :param embedding: token embedding; its ``embedding_dim`` is the RNN
            input size. Excluded from the saved hyperparameters.
        :param hidden_size: size of the recurrent hidden state.
        :param num_layers: number of stacked recurrent layers.
        :param lr: Adam learning rate.
        :param dropout: dropout between stacked recurrent layers; it only has
            an effect when ``num_layers > 1``.
        """
        super().__init__()
        self.rnn_cls = rnn_cls
        self.embedding = embedding
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lr = lr
        self.dropout = dropout

        self.save_hyperparameters(ignore=['embedding'])

        # torch's recurrent layers apply dropout only *between* stacked layers
        # (and warn when dropout > 0 with a single layer, where it is a no-op),
        # so pass 0 in that case: same behaviour, no warning.
        rnn_dropout = self.dropout if self.num_layers > 1 else 0.0

        # TODO: try using bidirectional in rnn
        self.rnn = self.rnn_cls(
            self.embedding.embedding_dim, self.hidden_size, batch_first=True,
            dropout=rnn_dropout, num_layers=self.num_layers
        )
        self.fc = nn.Linear(self.hidden_size, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x: List[th.Tensor]) -> th.Tensor:
        """
        :param x: ``(seqs, seq_lengths)``: batch-first token ids (B x T) and
            the unpadded length of each sequence.
        :return: tensor of shape (B, 1) with values in (0, 1).
        """
        assert len(x) == 2
        seqs, seq_lengths = x

        # pack_padded_sequence needs the lengths on the CPU, even when the
        # model and the sequences are on a GPU
        seq_lengths = seq_lengths.to("cpu")

        embedded = self.embedding(seqs)
        packed = pack_padded_sequence(
            embedded, seq_lengths, batch_first=True, enforce_sorted=False
        )

        # TODO: try using a randomly generated initial hidden state
        # (instead of the zero vector default)
        rnn_outputs = self.rnn(packed)
        h_n = self.hidden_state_from_rnn_outputs(rnn_outputs)

        # h_n is (num_layers, B, hidden_size) for a unidirectional RNN
        assert (h_n.shape[0], h_n.shape[2]) == (self.num_layers, self.hidden_size), (
            f"unexpected h_n shape {tuple(h_n.shape)}"
        )

        x = h_n[-1, :, :]  # final hidden state of the top layer: B x hidden_size
        x = self.fc(x)
        x = self.sigmoid(x)
        return x

    def hidden_state_from_rnn_outputs(self, rnn_outputs: Any) -> th.Tensor:
        """
        Given the outputs from the forward pass through the torch
        RNN/LSTM, return only the h_n (n'th hidden state) tensor.
        Not implemented here, but must be implemented in subclasses.
        """
        raise NotImplementedError

    def training_step(self, batch: th.Tensor, batch_idx: int) -> th.Tensor:
        return self.generalized_step(batch, batch_idx, "train")

    def validation_step(self, batch: th.Tensor, batch_idx: int) -> th.Tensor:
        return self.generalized_step(batch, batch_idx, "val")

    def test_step(self, batch: th.Tensor, batch_idx: int) -> th.Tensor:
        return self.generalized_step(batch, batch_idx, "test")

    def generalized_step(
        self, batch: th.Tensor, batch_idx: int, label: str
    ) -> th.Tensor:
        """Shared step: MSE between predictions and targets, logged as ``{label}_loss``."""
        x, y = batch
        predicted = self(x).squeeze(1)  # (B, 1) -> (B,) to match y
        loss = F.mse_loss(predicted, y)
        self.log(f"{label}_loss", loss)
        return loss

    def configure_optimizers(self):
        optimizer = optim.Adam(self.parameters(), self.lr)
        return optimizer
    
class TextVanillaRNN(GeneralizedTextRNN):
    """``GeneralizedTextRNN`` whose recurrent layer is a plain ``nn.RNN``."""

    def __init__(
        self,
        embedding: nn.Embedding,
        hidden_size: int = 128,
        num_layers: int = 1,
        lr: float = 1e-3,
        dropout: float = 0.5
    ) -> None:
        super().__init__(
            rnn_cls=nn.RNN,
            embedding=embedding,
            hidden_size=hidden_size,
            num_layers=num_layers,
            lr=lr,
            dropout=dropout,
        )

    def hidden_state_from_rnn_outputs(self, rnn_outputs: Any) -> th.Tensor:
        """``nn.RNN`` returns ``(output, h_n)``; hand back ``h_n``."""
        _output, h_n = rnn_outputs
        return h_n
        
class TextLSTM(GeneralizedTextRNN):
    """``GeneralizedTextRNN`` whose recurrent layer is an ``nn.LSTM``."""

    def __init__(
        self,
        embedding: nn.Embedding,
        hidden_size: int = 128,
        num_layers: int = 1,
        lr: float = 1e-3,
        dropout: float = 0.5
    ) -> None:
        super().__init__(
            rnn_cls=nn.LSTM,
            embedding=embedding,
            hidden_size=hidden_size,
            num_layers=num_layers,
            lr=lr,
            dropout=dropout,
        )

    def hidden_state_from_rnn_outputs(self, rnn_outputs: Any) -> th.Tensor:
        """``nn.LSTM`` returns ``(output, (h_n, c_n))``; hand back ``h_n``."""
        _output, (h_n, _c_n) = rnn_outputs
        return h_n

class TextDistilbert(pl.LightningModule):
    """
    Lightning wrapper around a Hugging Face sequence-classification model.

    Its two logits are turned into one score in (0, 1), trained with MSE loss
    against float targets.
    """

    def __init__(self, hf_model: Callable, lr: float = 1e-3) -> None:
        """
        :param hf_model: model called as ``hf_model(input_ids, attention_mask=...)``
            whose output has ``.logits``; assumed to be a two-label
            classifier (TODO confirm for other checkpoints).
        :param lr: Adam learning rate. NOTE(review): 1e-3 is high for
            fine-tuning a pretrained transformer; consider ~1e-5..5e-5.
        """
        super().__init__()
        self.lr = lr
        # Keep the pretrained model out of the hyperparameters (as
        # GeneralizedTextRNN does for its embedding): it is already part of
        # the module's state, and hparams should stay small and serializable.
        self.save_hyperparameters(ignore=["hf_model"])

        self.hf_model = hf_model
        self.sigmoid = nn.Sigmoid()

    def forward(self, x: List[th.Tensor]) -> th.Tensor:
        """
        :param x: ``(input_ids, attention_masks)`` for a batch.
        :return: 1-D tensor of per-example scores in (0, 1).
        """
        assert len(x) == 2
        seqs, attention_masks = x

        # pass the mask by keyword so it can't bind to a different positional slot
        logits = self.hf_model(seqs, attention_mask=attention_masks).logits
        # sigmoid(l1 - l0) equals softmax(logits)[:, 1] for two classes
        return self.sigmoid(logits[:, 1] - logits[:, 0])

    def training_step(self, batch: th.Tensor, batch_idx: int) -> th.Tensor:
        return self.generalized_step(batch, batch_idx, "train")

    def validation_step(self, batch: th.Tensor, batch_idx: int) -> th.Tensor:
        return self.generalized_step(batch, batch_idx, "val")

    def test_step(self, batch: th.Tensor, batch_idx: int) -> th.Tensor:
        return self.generalized_step(batch, batch_idx, "test")

    def generalized_step(
        self, batch: th.Tensor, batch_idx: int, label: str
    ) -> th.Tensor:
        """Shared step: MSE between predictions and targets, logged as ``{label}_loss``."""
        x, y = batch
        predicted = self(x)
        loss = F.mse_loss(predicted, y)
        self.log(f"{label}_loss", loss)
        return loss

    def configure_optimizers(self):
        optimizer = optim.Adam(self.parameters(), self.lr)
        return optimizer

def construct_model(model_config: Dict, embedding: Embedding):
    """
    Build the RNN-family model described by ``model_config``.

    ``model_config["model_arch"]`` selects the class ("VanillaRNN" or "LSTM").
    Only the remaining keys that match that class's constructor parameters
    (other than ``embedding``) are forwarded; any others, such as
    ``layer_norm`` or ``loss_fn``, are silently ignored.

    :raises NotImplementedError: for an unsupported ``model_arch``.
    """
    arch = model_config["model_arch"]
    if arch == "VanillaRNN":
        rnn_cls = TextVanillaRNN
    elif arch == "LSTM":
        rnn_cls = TextLSTM
    else:
        raise NotImplementedError(
            f"Unsupported model_arch {arch!r}; expected 'VanillaRNN' or 'LSTM'"
        )

    accepted = set(inspect.signature(rnn_cls).parameters) - {"embedding"}
    hparams = {k: v for k, v in model_config.items() if k in accepted}
    return rnn_cls(embedding, **hparams)

# Pipeline

In [35]:
def perform_run(run_config: Dict) -> float:
    """
    Train one model as described by ``run_config``; return the last logged
    ``val_loss`` as a plain float.

    Keys read: ``data_config`` (``name``, ``fractions``, ``batch_size``,
    ``num_workers``, optional ``split_seed``), ``hf_model_name``,
    ``tokenizer_config``, ``embedding_dim``, ``glove_embedding_config``,
    ``model_config``, ``wandb_config``, ``trainer_config``.

    If ``hf_model_name`` is truthy, that Hugging Face model is fine-tuned and
    the tokenizer/embedding keys are unused (``model_config["lr"]`` is used if
    present); otherwise a vocabulary, embedding and RNN model are built.
    """
    rc = run_config
    data_config = rc["data_config"]
    labels = ("train", "val", "test")

    df = get_raw_data(data_config["name"])

    # Optional seed so the train/val/test split is reproducible across runs.
    split_kwargs = {}
    if data_config.get("split_seed") is not None:
        split_kwargs["generator"] = th.Generator().manual_seed(data_config["split_seed"])
    texts = {}
    texts["train"], texts["val"], texts["test"] = random_split(
        df.text,
        nums_from_fractions(len(df.text), data_config["fractions"]),
        **split_kwargs,
    )

    voc = None
    oov_rates = {}
    dss = {}  # datasets
    if rc.get("hf_model_name"):
        hf_tokenizer = AutoTokenizer.from_pretrained(rc["hf_model_name"])
        for label in labels:
            # TODO: test count_oov_rate with hf_tokenizer
            oov_rates[label] = -1
            dss[label] = TextDistilbertDataset(
                df.iloc[texts[label].indices], hf_tokenizer
            )
        hf_model = AutoModelForSequenceClassification.from_pretrained(rc["hf_model_name"])
        model_config = rc.get("model_config") or {}
        hf_kwargs = {"lr": model_config["lr"]} if "lr" in model_config else {}
        model = TextDistilbert(hf_model, **hf_kwargs)
    else:  # use selected config to get tokenizer, vocab, embedding, datasets
        tokenizer = get_tokenizer(**rc["tokenizer_config"])

        if rc["glove_embedding_config"]:
            voc, embedding = glove_voc_and_embedding(
                rc["embedding_dim"], rc["glove_embedding_config"]
            )
        else:
            voc = build_vocab_from_texts(texts["train"], tokenizer)
            embedding = Embedding(len(voc), rc["embedding_dim"], padding_idx=voc[PAD_TOKEN])

        for label in labels:
            dss[label] = TextDataset(df.iloc[texts[label].indices], tokenizer, voc)
            # The dataset already holds the padded, numericalized split, so
            # reuse it instead of tokenizing the same texts a second time.
            oov_rates[label] = count_oov_rate(dss[label].seqs, voc)

        model = construct_model(rc["model_config"], embedding)

    dls = {  # dataloaders
        label: DataLoader(
            dss[label],
            batch_size=data_config["batch_size"],
            shuffle=(label == "train"),
            num_workers=data_config["num_workers"],
        )
        for label in labels
    }

    logger = WandbLogger(**rc["wandb_config"])
    # Finish the wandb run even if setup or training raises, so it is not
    # left open for the next call.
    try:
        logger.watch(model, log="all")

        # log more stuff
        wandb.log(dict(
            run_config=wandb.Table(
                columns=list(rc.keys()),
                data=[list(rc.values())],
            ),
        ))
        wandb.log(dict(
            oov_rate_train=oov_rates["train"],
            oov_rate_val=oov_rates["val"],
            oov_rate_test=oov_rates["test"],
            voc_size=len(voc) if voc else None,
        ))

        trainer = pl.Trainer(logger=logger, **rc["trainer_config"])
        trainer.fit(model, dls["train"], dls["val"])
        val_loss = trainer.logged_metrics['val_loss']
    finally:
        wandb.finish()

    # logged_metrics holds tensors; callers such as optuna's study.tell want a float
    return float(val_loss)

In [36]:
# In-memory hyperparameter study. perform_run returns a validation loss, so the
# objective is minimized (optuna's default, spelled out for readers).
study = optuna.create_study(direction="minimize")

[32m[I 2022-01-19 13:58:00,388][0m A new study created in memory with name: no-name-4ea436b8-748d-413b-8f6a-1d01cf35732c[0m


In [None]:
# Single fine-tuning run of the DistilBERT SST-2 checkpoint on the 30k-tweet
# Sentiment140 sample. Because hf_model_name is set, perform_run does not use
# the tokenizer/embedding configuration keys below.
run_config = {
    "data_config": {
        "name": "twitter_sentiment140_random_small",
        "fractions": [0.7, 0.15, -1],
        "batch_size": 64,
        "num_workers": 0,  # default is 0
    },
    "hf_model_name": "distilbert-base-uncased-finetuned-sst-2-english",
    "tokenizer_config": None,
    "embedding_dim": None,
    "glove_embedding_config": None,  # contains keys: name, freeze_embedding
    "model_config": None,
    "wandb_config": {
        "project": "scratch",
        "log_model": False,
    },
    "trainer_config": {
        "max_epochs": 10,
        "gpus": None,
    },
}

val_loss = perform_run(run_config)

[34m[1mwandb[0m: logging graph, to disable use `wandb.watch(log_graph=False)`
GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs

  | Name     | Type                                | Params
-----------------------------------------------------------------
0 | hf_model | DistilBertForSequenceClassification | 67.0 M
1 | sigmoid  | Sigmoid                             | 0     
-----------------------------------------------------------------
67.0 M    Trainable params
0         Non-trainable params
67.0 M    Total params
267.820   Total estimated model params size (MB)


Validation sanity check: 0it [00:00, ?it/s]

  rank_zero_warn(


Training: 0it [00:00, ?it/s]

  rank_zero_warn("Detected KeyboardInterrupt, attempting graceful shutdown...")


VBox(children=(Label(value=' 0.53MB of 0.53MB uploaded (0.00MB deduped)\r'), FloatProgress(value=1.0, max=1.0)…

In [None]:
# for i in range(1):
#     trial = study.ask()
#     trial_hparams = dict(
#         hidden_size=trial.suggest_int('hidden_size', 64, 256),
#         dropout=trial.suggest_uniform('dropout', 0.1, 0.8),
#         lr=trial.suggest_loguniform('lr', 1e-5, 1e-3),
#         num_layers=trial.suggest_categorical('num_layers', [1,2,3,4,5,6])
#     )
    
#     run_config = dict(
#         data_config = dict(
#             name="twitter_sentiment140_random_small",
#             fractions=[0.7, 0.15, -1],
#             batch_size=64,
#             num_workers=0,  # default is 0
#         ),
#         hf_model_name=None,  # must be falsy, otherwise perform_run fine-tunes the HF model and ignores the LSTM model_config below
#         tokenizer_config = dict(
#             tokenizer="spacy",
#             language="en_core_web_sm"
#         ),
#         embedding_dim = 100,
#         glove_embedding_config = None,  # contains keys: name, freeze_embedding
#         model_config = dict(
#             model_arch="LSTM",
#             num_layers=trial_hparams["num_layers"],
#             hidden_size=trial_hparams["hidden_size"],
#             lr=trial_hparams["lr"],
#             dropout=trial_hparams["dropout"],
#             layer_norm=False,
#             residual_connections=False,
#             loss_fn="MSELoss",
#         ),
#         wandb_config = dict(
#             project='expt2b_datasetSentiment140Small30k', 
#             log_model=False
#         ),
#         trainer_config = dict(
#             max_epochs=40,
#             gpus=None,
#         )
#     )

#     val_loss = perform_run(run_config)
#     study.tell(trial, val_loss)

# Scratch