In [40]:
#!pip install torchtext torchdata spacy
#!python -m spacy download en_core_web_md

In [41]:
import os
from functools import partial
from pathlib import Path

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import pytorch_lightning as pl
from torchdata.datapipes.iter import FileOpener, HttpReader, IterableWrapper, MapToIterConverter, Demultiplexer
from torch.utils.data import DataLoader
from torchtext.data.utils import get_tokenizer
#from torchtext.data.functional import to_map_style_dataset
from torch.utils.data.dataset import random_split
from torchtext.vocab import build_vocab_from_iterator

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [42]:
# Utils Included
# The following utility functions are copied from torchtext
# https://github.com/pytorch/text/blob/main/torchtext/data/datasets_utils.py
import functools
import inspect
import os


def _check_default_set(split, target_select, dataset_name):
    # Check whether given object split is either a tuple of strings or string
    # and represents a valid selection of options given by the tuple of strings
    # target_select.
    if isinstance(split, str):
        split = (split,)
    if isinstance(target_select, str):
        target_select = (target_select,)
    if not isinstance(split, tuple):
        raise ValueError("Internal error: Expected split to be of type tuple.")
    if not set(split).issubset(set(target_select)):
        raise TypeError(
            "Given selection {} of splits is not supported for dataset {}. Please choose from {}.".format(
                split, dataset_name, target_select
            )
        )
    return split


def _wrap_datasets(datasets, split):
    # Wrap return value for _setup_datasets functions to support singular values instead
    # of tuples when split is a string.
    if isinstance(split, str):
        if len(datasets) != 1:
            raise ValueError("Internal error: Expected number of datasets is not 1.")
        return datasets[0]
    return datasets


def _dataset_docstring_header(fn, num_lines=None, num_classes=None):
    """
    Returns docstring for a dataset based on function arguments.
    Assumes function signature of form (root='.data', split=<some tuple of strings>, **kwargs)
    """
    argspec = inspect.getfullargspec(fn)
    if not (argspec.args[0] == "root" and argspec.args[1] == "split"):
        raise ValueError(
            f"Internal Error: Given function {fn} did not adhere to standard signature."
        )
    default_split = argspec.defaults[1]

    if not (isinstance(default_split, tuple) or isinstance(default_split, str)):
        raise ValueError(
            f"default_split type expected to be of string or tuple but got {type(default_split)}"
        )

    header_s = fn.__name__ + " dataset\n"

    if isinstance(default_split, tuple):
        header_s += "\nSeparately returns the {} split".format("/".join(default_split))

    if isinstance(default_split, str):
        header_s += f"\nOnly returns the {default_split} split"

    if num_lines is not None:
        header_s += "\n\nNumber of lines per split:"
        for k, v in num_lines.items():
            header_s += f"\n    {k}: {v}\n"

    if num_classes is not None:
        header_s += "\n\nNumber of classes"
        header_s += f"\n    {num_classes}\n"

    args_s = "\nArgs:"
    args_s += "\n    root: Directory where the datasets are saved."
    args_s += "\n        Default: .data"

    if isinstance(default_split, tuple):
        args_s += "\n    split: split or splits to be returned. Can be a string or tuple of strings."
        args_s += "\n        Default: {}" "".format(str(default_split))

    if isinstance(default_split, str):
        args_s += "\n     split: Only {default_split} is available."
        args_s += (
            "\n         Default: {default_split}.format(default_split=default_split)"
        )

    return "\n".join([header_s, args_s]) + "\n"


def _add_docstring_header(docstring=None, num_lines=None, num_classes=None):
    def docstring_decorator(fn):
        old_doc = fn.__doc__
        fn.__doc__ = _dataset_docstring_header(fn, num_lines, num_classes)
        if docstring is not None:
            fn.__doc__ += docstring
        if old_doc is not None:
            fn.__doc__ += old_doc
        return fn

    return docstring_decorator


def _wrap_split_argument_with_fn(fn, splits):
    """
    Wraps given function of specific signature to extend behavior of split
    to support individual strings. The given function is expected to have a split
    kwarg that accepts tuples of strings, e.g. ('train', 'valid') and the returned
    function will have a split argument that also accepts strings, e.g. 'train', which
    are then turned single entry tuples. Furthermore, the return value of the wrapped
    function is unpacked if split is only a single string to enable behavior such as
    train = AG_NEWS(split='train')
    train, valid = AG_NEWS(split=('train', 'valid'))
    """
    argspec = inspect.getfullargspec(fn)
    if not (
        argspec.args[0] == "root"
        and argspec.args[1] == "split"
        and argspec.varargs is None
        and argspec.varkw is None
        and len(argspec.kwonlyargs) == 0
        and len(argspec.annotations) == 0
    ):
        raise ValueError(
            f"Internal Error: Given function {fn} did not adhere to standard signature."
        )

    @functools.wraps(fn)
    def new_fn(root=os.path.expanduser("~/.torchtext/cache"), split=splits, **kwargs):
        result = []
        for item in _check_default_set(split, splits, fn.__name__):
            result.append(fn(root, item, **kwargs))
        return _wrap_datasets(tuple(result), split)

    new_sig = inspect.signature(new_fn)
    new_sig_params = new_sig.parameters
    new_params = []
    new_params.append(new_sig_params["root"].replace(default=".data"))
    new_params.append(new_sig_params["split"].replace(default=splits))
    new_params += [entry[1] for entry in list(new_sig_params.items())[2:]]
    new_sig = new_sig.replace(parameters=tuple(new_params))
    new_fn.__signature__ = new_sig

    return new_fn


def _wrap_split_argument(splits):
    def new_fn(fn):
        return _wrap_split_argument_with_fn(fn, splits)

    return new_fn


def _create_dataset_directory(dataset_name):
    def decorator(func):
        argspec = inspect.getfullargspec(func)
        if not (
            argspec.args[0] == "root"
            and argspec.args[1] == "split"
            and argspec.varargs is None
            and argspec.varkw is None
            and len(argspec.kwonlyargs) == 0
            and len(argspec.annotations) == 0
        ):
            raise ValueError(
                f"Internal Error: Given function {func} did not adhere to standard signature."
            )

        @functools.wraps(func)
        def wrapper(root=os.path.expanduser("~/.torchtext/cache"), *args, **kwargs):
            new_root = os.path.join(root, dataset_name)
            if not os.path.exists(new_root):
                os.makedirs(new_root)
            return func(root=new_root, *args, **kwargs)

        return wrapper

    return decorator


In [43]:
# Loading IMDB Pipeline From https://github.com/pytorch/data/blob/main/examples/text/imdb.py
URL = "http://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz"

MD5 = "7c2ac02c03563afcf9b574c7e56c153a"

NUM_LINES = {
    "train": 25000,
    "test": 25000,
}

_PATH = "aclImdb_v1.tar.gz"

DATASET_NAME = "IMDB"


def _path_fn(root, path):
    return os.path.join(root, os.path.basename(path))


def _filter_fn(split, t):
    return Path(t[0]).parts[-3] == split and Path(t[0]).parts[-2] in ["pos", "neg"]


def _file_to_sample(t):
    return Path(t[0]).parts[-2], t[1].read().decode("utf-8")


In [44]:
@_add_docstring_header(num_lines=NUM_LINES, num_classes=2)
@_create_dataset_directory(dataset_name=DATASET_NAME)
@_wrap_split_argument(("train", "test"))
def IMDB(root, split):
    """Demonstrates complex use case where each sample is stored in separate file and compressed in tar file
    Here we show some fancy filtering and mapping operations.
    Filtering is needed to know which files belong to train/test and neg/pos label
    Mapping is needed to yield proper data samples by extracting label from file name
        and reading data from file
    """

    url_dp = IterableWrapper([URL])
    # cache data on-disk
    cache_dp = url_dp.on_disk_cache(
        filepath_fn=partial(_path_fn, root),
        hash_dict={_path_fn(root, URL): MD5},
        hash_type="md5",
    )
    cache_dp = HttpReader(cache_dp).end_caching(mode="wb", same_filepath_fn=True)

    cache_dp = FileOpener(cache_dp, mode="b")

    # stack TAR extractor on top of load files data pipe
    extracted_files = cache_dp.load_from_tar()

    # filter the files as applicable to create dataset for given split (train or test)
    filter_files = extracted_files.filter(partial(_filter_fn, split))

    # map the file to yield proper data samples
    sample = filter_files.map(_file_to_sample)

    def convlabel(x):
        r = None
        if x[0] == 'pos':
            r = 1
        elif x[0] == 'neg':
            r = 0
        else:
            r = -1
            raise (ValueError(f'Error: {x[0]} is not proper value'))

        return r, x[1]

    # sample = sample.map(lambda x: (1 if x[0]=='pos' else 0, x[1]))
    sample = sample.map(convlabel)
    # sample = sample.shuffle().set_shuffle(False).sharding_filter()
    sample = sample.shuffle()
    sample = sample.shuffle().sharding_filter()
    return sample


In [45]:
train_iter = IMDB(split='train')

# Tokenize
tokenizer = get_tokenizer('spacy', 'en_core_web_md')


def yield_tokens(data_iter):
    for _, text in data_iter:
        yield (tokenizer(text))


vocab = build_vocab_from_iterator(yield_tokens(train_iter), specials=["<unk>"])

print(vocab(['here', 'is', 'an', 'example']))


[161, 8, 42, 490]


In [46]:
# Hyperparameters
BATCH_SIZE = 64
SPLIT_SIZE = 0.8
LR = 0.001
MAX_EPOCHS = 15
vocab_size = len(vocab)
n_classes = 2


In [47]:
class BasicRNN(pl.LightningModule):
    def __init__(
        self, n_layers, hidden_dim, n_vocab, embed_dim, n_classes, dropout_p=0.2
    ):
        super(BasicRNN, self).__init__()
        self.n_layers = n_layers  # ------ RNN 계층에 대한 개수
        self.embed = nn.Embedding(n_vocab, embed_dim)  # ------ 워드 임베딩 적용
        self.hidden_dim = hidden_dim
        self.dropout = nn.Dropout(dropout_p)  # ------ 드롭아웃 적용
        self.rnn = nn.RNN(
            embed_dim, self.hidden_dim, num_layers=self.n_layers, batch_first=True
        )
        self.out = nn.Linear(self.hidden_dim, n_classes)

    def forward(self, x):
        x = self.embed(x)  # ------ 문자를 숫자/벡터로 변환
        h_0 = self._init_state(batch_size=x.size(0))  # ------ 최초 은닉 상태의 값을 0으로 초기화
        x, _ = self.rnn(x, h_0)  # ------ RNN 계층을 의미하며, 파라미터로 입력과 이전 은닉 상태의 값을 받습니다.
        h_t = x[:, -1, :]  # ------ 모든 네트워크를 거쳐서 가장 마지막에 나온 단어의 임베딩 값(마지막 은닉 상태의 값)
        self.dropout(h_t)
        logit = torch.sigmoid(self.out(h_t))
        return logit

    def _init_state(self, batch_size=1):
        weight = next(self.parameters()).data  # ------ 모델의 파라미터 값을 가져와서 weight 변수에 저장
        return weight.new(self.n_layers, batch_size, self.hidden_dim).zero_()

    def training_step(self, batch, batch_idx):
        return self._common_step(batch, batch_idx, "train")

    def validation_step(self, batch, batch_idx):
        self._common_step(batch, batch_idx, "val")

    def test_step(self, batch, batch_idx):
        self._common_step(batch, batch_idx, "test")

    def predict_step(self, batch, batch_idx, dataloader_idx=None):
        label, text = self._prepare_batch(batch)
        return self(text)

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=LR)
        return optimizer

    def _prepare_batch(self, batch):
        label, text = batch
        return label, text

    def _common_step(self, batch, batch_idx, stage: str):
        # x = self._prepare_batch(batch)
        label, text = batch
        criterion = nn.CrossEntropyLoss()
        loss = criterion(self(text), label)
        self.log(f"{stage}_loss", loss, on_step=True)
        return loss


In [48]:
text_pipeline = lambda x: vocab(tokenizer(x))


def collate_batch(batch):
    label_list, text_list = [], []
    for (_label, _text) in batch:
        label_list.append(_label)
        processed_text = torch.tensor(text_pipeline(_text), dtype=torch.int64)
        text_list.append(processed_text)

    label_list = torch.tensor(label_list, dtype=torch.int64)
    text_list = torch.cat(text_list)
    return label_list.to(device), text_list.to(device)

count_data = 0
def spliter(n):
    print(n)
    if count_data < int(NUM_LINES['train'] * SPLIT_SIZE):
        count_data += 1
        return 0
    else:
        return 1

# Refill Generators & Put in the DataLoader
train_iter, test_iter = IMDB(split=('train', 'test'))
split_train_, split_valid_ = train_iter.demux(classifier_fn=spliter, num_instances=2, buffer_size=NUM_LINES['train'])
print(count_data)
print(split_train_, split_valid_)

#num_train = int(len(train_dataset) * SPLIT_SIZE)

# split_train_, split_valid_ = random_split(
#     train_dataset, [num_train, len(train_dataset) - num_train]
# )

train_dataloader = DataLoader(
    split_train_,
    batch_size=BATCH_SIZE,
    shuffle=False,
    collate_fn=collate_batch,
    num_workers=8,
)
valid_dataloader = DataLoader(
    split_valid_,
    batch_size=BATCH_SIZE,
    shuffle=False,
    collate_fn=collate_batch,
    num_workers=8,
)
test_dataloader = DataLoader(
    test_iter,
    batch_size=BATCH_SIZE,
    shuffle=False,
    collate_fn=collate_batch,
    num_workers=8,
)


0
_ChildDataPipe _ChildDataPipe


In [49]:
model = BasicRNN(
    n_layers=1,
    hidden_dim=256,
    n_vocab=vocab_size,
    embed_dim=128,
    n_classes=n_classes,
    dropout_p=0.5,
)

if torch.cuda.is_available():
    trainer = pl.Trainer(max_epochs=MAX_EPOCHS, accelerator='gpu', devices=1)
else:
    trainer = pl.Trainer(max_epochs=MAX_EPOCHS)

trainer.fit(
    model=model, train_dataloaders=train_dataloader, val_dataloaders=valid_dataloader,
)
trainer.test(model, dataloaders=test_dataloader)


GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name    | Type      | Params
--------------------------------------
0 | embed   | Embedding | 15.5 M
1 | dropout | Dropout   | 0     
2 | rnn     | RNN       | 98.8 K
3 | out     | Linear    | 514   
--------------------------------------
15.6 M    Trainable params
0         Non-trainable params
15.6 M    Total params
62.383    Total estimated model params size (MB)


Sanity Checking: 0it [00:00, ?it/s]

RuntimeError: DataLoader worker (pid(s) 13496, 19524, 5712, 17624, 9104, 32224, 31296, 32360) exited unexpectedly