## PyTorch 2.0 - TorchData Live Q&A Example

TorchData offers:
- Nightly releases:
  - Wheel: `pip install --pre torchdata -f https://download.pytorch.org/whl/nightly/cpu/torch_nightly.html`
  - Conda: `conda install torchdata -c pytorch-nightly`
- Official releases:
  - Wheel: `pip install torchdata`
  - Conda: `conda install torchdata -c pytorch`

In [None]:
!pip uninstall -y torch torchdata torchvision torchtext torchaudio fastai
!pip install portalocker
!pip install --pre torch torchdata -f https://download.pytorch.org/whl/nightly/cpu/torch_nightly.html

Found existing installation: torch 1.13.1+cu116
Uninstalling torch-1.13.1+cu116:
  Successfully uninstalled torch-1.13.1+cu116
Found existing installation: torchvision 0.14.1+cu116
Uninstalling torchvision-0.14.1+cu116:
  Successfully uninstalled torchvision-0.14.1+cu116
Found existing installation: torchtext 0.14.1
Uninstalling torchtext-0.14.1:
  Successfully uninstalled torchtext-0.14.1
Found existing installation: torchaudio 0.13.1+cu116
Uninstalling torchaudio-0.13.1+cu116:
  Successfully uninstalled torchaudio-0.13.1+cu116
Found existing installation: fastai 2.7.10
Uninstalling fastai-2.7.10:
  Successfully uninstalled fastai-2.7.10
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting portalocker
  Downloading portalocker-2.7.0-py2.py3-none-any.whl (15 kB)
Installing collected packages: portalocker
Successfully installed portalocker-2.7.0
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheel

### SQuAD Dataset

For this example, we will be loading the Stanford Question Answering Dataset (SQuAD).

It is "a reading comprehension dataset, consisting of questions posed by crowdworkers on a set of Wikipedia articles, where the answer to every question is a segment of text, or span, from the corresponding reading passage, or the question might be unanswerable."

The dataset is hosted in a GitHub repo from which we can download it.

In [None]:
import os
from functools import partial

from torchdata.datapipes import functional_datapipe
from torchdata.datapipes.iter import IterableWrapper, IterDataPipe


URL = {
    "train": "https://rajpurkar.github.io/SQuAD-explorer/dataset/train-v1.1.json",
    "dev": "https://rajpurkar.github.io/SQuAD-explorer/dataset/dev-v1.1.json",
}

MD5 = {
    "train": "981b29407e0affa3b1b156f72073b945",
    "dev": "3e85deb501d4e538b6bc56f786231552",
}

NUM_LINES = {
    "train": 87599,
    "dev": 10570,
}

DATASET_NAME = "SQuAD1"

In [None]:
import urllib.request
urllib.request.urlretrieve("https://rajpurkar.github.io/SQuAD-explorer/dataset/train-v1.1.json", "train-v1.1.json")
urllib.request.urlretrieve("https://rajpurkar.github.io/SQuAD-explorer/dataset/dev-v1.1.json", "dev-v1.1.json")

('dev-v1.1.json', <http.client.HTTPMessage at 0x7f46303c87f0>)

### Loading Data with Built-in DataPipes

We will load the dataset using built-in DataPipes provided by TorchData.

DataPipes can be invoked in two ways: by calling the class constructor (which requires importing the class), or by applying their functional form to an existing IterDataPipe (recommended; available for most, but not all, DataPipes). You can chain multiple IterDataPipes together to form a pipeline that performs multiple operations in succession.
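To make the two invocation styles concrete, here is a toy, self-contained imitation of how a decorator like `functional_datapipe` can expose a DataPipe class as a method on the base class. `MiniPipe`, `register`, `ListPipe`, and `MapPipe` are hypothetical names for illustration; this is a sketch of the idea, not torchdata's actual implementation.

```python
class MiniPipe:
    """Hypothetical stand-in for IterDataPipe: the shared base class."""


def register(name):
    """Hypothetical stand-in for functional_datapipe(name): expose the
    decorated class as a method called `name` on every MiniPipe."""

    def decorator(cls):
        def method(self, *args, **kwargs):
            # The functional form calls the constructor with `self` as the source
            return cls(self, *args, **kwargs)

        setattr(MiniPipe, name, method)
        return cls

    return decorator


class ListPipe(MiniPipe):
    """Source pipe over an in-memory iterable (similar in spirit to IterableWrapper)."""

    def __init__(self, items):
        self.items = items

    def __iter__(self):
        return iter(self.items)


@register("map")
class MapPipe(MiniPipe):
    """Applies `fn` to every element produced by the source pipe."""

    def __init__(self, source, fn):
        self.source = source
        self.fn = fn

    def __iter__(self):
        for item in self.source:
            yield self.fn(item)


source = ListPipe([1, 2, 3])
via_constructor = MapPipe(source, lambda x: x * 10)  # class-constructor form
via_functional = source.map(lambda x: x * 10)  # functional form
chained = source.map(lambda x: x + 1).map(lambda x: x * 2)  # chaining two steps
print(list(via_constructor), list(via_functional), list(chained))
```

Both forms build the same object; the functional form simply reads better when several steps are chained.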

In [None]:
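root = "."

# Map each local file path to the expected MD5 checksum of its split, so the
# on-disk cache can verify downloaded files.
HASH_DICT = {}
for split in URL.keys():
    HASH_DICT[os.path.join(root, os.path.basename(URL[split]))] = MD5[split]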

# Helper function to construct a proper file path
def _filepath_fn(path):
    return os.path.join(root, os.path.basename(path))

For this dataset, we need custom logic to parse through the content of the JSON in order to extract the relevant question, context, and answers.

We can create a custom `IterDataPipe` for that purpose.

In [None]:
@functional_datapipe("read_squad")
class _ParseSQuADQAData(IterDataPipe):
    r"""Iterable DataPipe to parse the contents of a stream of JSON objects
    as provided by SQuAD QA. Used in SQuAD1 and SQuAD2.
    """

    def __init__(self, source_datapipe) -> None:
        self.source_datapipe = source_datapipe

    def __iter__(self):
        # parse_json_files() yields (file path, parsed JSON object) pairs
        for _, stream in self.source_datapipe:
            raw_json_data = stream["data"]
            for layer1 in raw_json_data:  # one Wikipedia article
                for layer2 in layer1["paragraphs"]:  # one context paragraph
                    for layer3 in layer2["qas"]:  # one question and its answers
                        _context, _question = layer2["context"], layer3["question"]
                        _answers = [item["text"] for item in layer3["answers"]]
                        _answer_start = [item["answer_start"] for item in layer3["answers"]]
                        if len(_answers) == 0:
                            # No answers (e.g. SQuAD 2.0 unanswerable): use placeholders
                            _answers = [""]
                            _answer_start = [-1]
                        yield _context, _question, _answers, _answer_start
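To see what `read_squad` yields without downloading anything, the same nested traversal (articles, then paragraphs, then question/answer records) can be run on a tiny, made-up dictionary shaped like SQuAD's JSON. `parse_squad` and `sample` are hypothetical names; the second question has an empty `answers` list, as SQuAD 2.0's unanswerable questions do, to exercise the fallback branch.

```python
def parse_squad(stream):
    """Yield (context, question, answers, answer_starts) tuples from one
    parsed SQuAD-style JSON object, mirroring _ParseSQuADQAData.__iter__."""
    for article in stream["data"]:
        for paragraph in article["paragraphs"]:
            for qa in paragraph["qas"]:
                answers = [a["text"] for a in qa["answers"]]
                starts = [a["answer_start"] for a in qa["answers"]]
                if not answers:
                    # Unanswerable question: use placeholder values
                    answers, starts = [""], [-1]
                yield paragraph["context"], qa["question"], answers, starts


# Made-up sample in the SQuAD layout (not real dataset content)
sample = {
    "data": [
        {
            "title": "Example",
            "paragraphs": [
                {
                    "context": "Paris is in France.",
                    "qas": [
                        {"question": "Where is Paris?",
                         "answers": [{"text": "France", "answer_start": 12}]},
                        {"question": "What is the population?", "answers": []},
                    ],
                }
            ],
        }
    ]
}

rows = list(parse_squad(sample))
for row in rows:
    print(row)
# ('Paris is in France.', 'Where is Paris?', ['France'], [12])
# ('Paris is in France.', 'What is the population?', [''], [-1])
```

`answer_start` is a character offset into the context, so `context[12:18]` recovers the answer span `"France"`.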

If you have downloaded the data files ahead of time, you can simply load them and parse them with built-in DataPipes.

The detailed steps are:
1. Open the file
2. Parse through the JSON
3. Custom parsing specific to the SQuAD data format
4. Shuffle the outputs
5. Mark sharding point for DataLoader
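Steps 4 and 5 are ordered deliberately: shuffling before the sharding point lets every worker shuffle the full stream in the same order and then keep a disjoint slice of it. The plain-Python sketch below imitates that, assuming every worker uses the same shuffle seed and keeps the elements whose position `i` satisfies `i % num_shards == shard_id` (round-robin). `shuffle_then_shard` is a hypothetical helper, not a torchdata API.

```python
import random


def shuffle_then_shard(items, seed, num_shards, shard_id):
    """Hypothetical helper: shuffle with a shared seed, then keep this
    shard's round-robin slice (position i goes to shard i % num_shards)."""
    shuffled = list(items)
    random.Random(seed).shuffle(shuffled)  # identical order in every worker
    return [x for i, x in enumerate(shuffled) if i % num_shards == shard_id]


shards = [shuffle_then_shard(range(10), seed=0, num_shards=3, shard_id=k) for k in range(3)]
for k, shard in enumerate(shards):
    print(f"worker {k}: {shard}")
```

If each worker shuffled with a different seed, the slices taken from different permutations could overlap or miss elements, which is why the shuffle order needs to agree up to the sharding point.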

In [None]:
squad_dp = IterableWrapper(["./train-v1.1.json"]) \
    .open_files(encoding="utf-8") \
    .parse_json_files() \
    .read_squad() \
    .shuffle() \
    .sharding_filter()

next(iter(squad_dp))


('Despite being an original story, Spectre draws on Ian Fleming\'s source material, most notably in the character of Franz Oberhauser, played by Christoph Waltz. Oberhauser shares his name with Hannes Oberhauser, a background character in the short story "Octopussy" from the Octopussy and The Living Daylights collection, and who is named in the film as having been a temporary legal guardian of a young Bond in 1983. Similarly, Charmian Bond is shown to have been his full-time guardian, observing the back story established by Fleming. With the acquisition of the rights to Spectre and its associated characters, screenwriters Neal Purvis and Robert Wade revealed that the film would provide a minor retcon to the continuity of the previous films, with the Quantum organisation alluded to in Casino Royale and introduced in Quantum of Solace reimagined as a division within Spectre rather than an independent organisation.',
 "Who was Bond's full time guardian?",
 ['Charmian Bond'],
 [427])

For a more advanced use case, you can download (stream) the source files and cache them before the rest of the pre-processing steps.

In [None]:
def make_dp(split):
    # First, we download the file via the URL and cache it locally.
    cache_dp = IterableWrapper([URL[split]]).on_disk_cache(
        filepath_fn=_filepath_fn,
        hash_dict=HASH_DICT,
        hash_type="md5",
    )
    cache_dp = cache_dp.read_from_http() \
        .end_caching(mode="wb", same_filepath_fn=True)

    # Then, we read and parse the cached file the same way as we did before.
    cache_squad_dp = cache_dp.open_files(encoding="utf-8") \
        .parse_json_files() \
        .read_squad() \
        .shuffle() \
        .sharding_filter()
    return cache_squad_dp
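Conceptually, `on_disk_cache(...)` plus `end_caching(...)` wrap the download in a check-then-fetch step: if the file at `filepath_fn(url)` already exists and its MD5 matches the entry in `hash_dict`, the download is skipped. Below is a stdlib-only sketch of that idea, not torchdata's implementation; `md5_of`, `cached_fetch`, and `fake_fetch` (standing in for `read_from_http`) are hypothetical, and the placeholder URL is never requested.

```python
import hashlib
import os
import tempfile


def md5_of(path):
    """Hex MD5 digest of a file, read in 1 MiB chunks."""
    h = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()


def cached_fetch(url, filepath_fn, hash_dict, fetch):
    """Return the local path for `url`, calling `fetch(url)` only when the
    cached file is missing or fails the MD5 check against hash_dict."""
    path = filepath_fn(url)
    if os.path.exists(path) and md5_of(path) == hash_dict.get(path):
        return path  # cache hit: skip the download
    data = fetch(url)
    with open(path, "wb") as f:  # analogous to end_caching(mode="wb")
        f.write(data)
    return path


calls = []


def fake_fetch(url):
    """Hypothetical stand-in for read_from_http: records the call, returns bytes."""
    calls.append(url)
    return b"hello"


with tempfile.TemporaryDirectory() as tmp_root:
    def filepath_fn(url):
        return os.path.join(tmp_root, os.path.basename(url))

    url = "https://example.com/sample.json"  # placeholder URL, never requested
    hash_dict = {filepath_fn(url): hashlib.md5(b"hello").hexdigest()}

    cached_fetch(url, filepath_fn, hash_dict, fake_fetch)  # miss: downloads
    cached_fetch(url, filepath_fn, hash_dict, fake_fetch)  # hit: no download
    with open(filepath_fn(url), "wb") as f:
        f.write(b"corrupted")  # simulate a damaged cache file
    cached_fetch(url, filepath_fn, hash_dict, fake_fetch)  # MD5 mismatch: re-downloads
    print(len(calls))  # 2
```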

You can quickly sanity-check the DataPipe on its own by pulling a single element from it.

In [None]:
it = iter(make_dp("train"))
print(next(it))

('The Portuguese language is derived from the Latin spoken by the romanized Pre-Roman peoples of the Iberian Peninsula around 2000 years ago—particularly the Celts, Tartessians, Lusitanians and Iberians. In the 15th and 16th centuries, the language spread worldwide as Portugal established a colonial and commercial empire between 1415 and 1999. Portuguese is now spoken as a native language in five different continents, with Brazil accounting for the largest number of native Portuguese speakers of any country (200 million speakers in 2012).', 'Between what years did Portugal establish a colonial and commercial empire?', ['1415 and 1999'], [329])


After the DataPipe is ready, we can pass it into `DataLoader2` with a `ReadingService`.

`MultiProcessingReadingService` works similarly to the multiprocessing mode of `torch.utils.data.DataLoader`. Aside from feature improvements, it also provides prefetching at the worker level (whereas the old `DataLoader` prefetches only in the main process). This helps improve performance, especially when workers need some time to establish a remote connection.
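Prefetching means a producer keeps fetching the next few elements while the consumer is still busy with the current one, so slow reads (for example, from a remote source) overlap with downstream work. The rough single-process illustration below uses a background thread and a bounded `queue.Queue`; `prefetch` and `slow_source` are hypothetical helpers, and this is not how `MultiProcessingReadingService` is implemented internally.

```python
import queue
import threading
import time


def prefetch(iterable, buffer_size=2):
    """Yield items from `iterable` while a background thread reads up to
    `buffer_size` items ahead of the consumer."""
    buf = queue.Queue(maxsize=buffer_size)

    def producer():
        try:
            for item in iterable:
                buf.put(("item", item))  # blocks while the buffer is full
        except Exception as exc:  # forward errors to the consumer
            buf.put(("error", exc))
        else:
            buf.put(("done", None))

    # Daemon thread: it will not keep the interpreter alive if the consumer
    # stops early and leaves the producer blocked on a full buffer.
    threading.Thread(target=producer, daemon=True).start()
    while True:
        kind, value = buf.get()
        if kind == "error":
            raise value
        if kind == "done":
            return
        yield value


def slow_source(n):
    """Simulated slow source, e.g. a remote read (hypothetical)."""
    for i in range(n):
        time.sleep(0.01)
        yield i


result = list(prefetch(slow_source(5), buffer_size=2))
print(result)
```

The bounded queue is the key design choice: it caps how far ahead the producer can run, which bounds memory use just as a prefetch buffer size would.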

In [None]:
from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService

rs = MultiProcessingReadingService(num_workers=2)
dl = DataLoader2(make_dp("train"), reading_service=rs)
for i, x in enumerate(dl):
    print(x)
    if i == 10:
        break
dl.shutdown()

('Internationally, the torch and its accompanying party traveled in a chartered Air China Airbus A330 (registered B-6075), painted in the red and yellow colors of the Olympic Games. Air China was chosen by the Beijing Committees of the Olympic Game as the designated Olympic torch carrier in March 2008 for its long-standing participation in the Olympic cause. The plane traveled a total of 137,000 km (85,000 mi) for a duration of 130 days through 21 countries and regions.', 'What color was the chartered plane?', ['red and yellow'], [136])
('Solar power is the conversion of sunlight into electricity, either directly using photovoltaics (PV), or indirectly using concentrated solar power (CSP). CSP systems use lenses or mirrors and tracking systems to focus a large area of sunlight into a small beam. PV converts light into electric current using the photoelectric effect.', 'What does a concentrated solar power system use?', ['lenses or mirrors and tracking systems'], [170])
("China Mobile h

You can switch the same pipeline to distributed training with `DistributedReadingService`.

In [None]:
import torch.distributed as dist
import torch.multiprocessing as mp


MASTER_ADDR = "127.0.0.1"
WORLD_SIZE = 2

In [None]:
import socket


def _get_open_port():
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind(("", 0))
    port = s.getsockname()[1]
    s.close()
    return str(port)


os.environ["MASTER_ADDR"] = MASTER_ADDR
os.environ["MASTER_PORT"] = _get_open_port()

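There are 87599 elements in the SQuAD training set, so with `WORLD_SIZE = 2` the data cannot be sharded evenly: one rank would receive one more element than the other. In distributed training, every rank runs an `all_reduce` (or a similar collective) in each iteration, so a rank that tries to run an extra iteration would hang waiting for peers that have already finished. `DistributedReadingService` prevents this hang by keeping the ranks in sync and stopping all of them once the shortest shard is exhausted.
Each rank should therefore receive 43799 elements without hanging.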
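The arithmetic behind those numbers, as a quick sketch: with round-robin sharding, rank `r` gets positions `r, r + WORLD_SIZE, r + 2 * WORLD_SIZE, ...`, so shard sizes differ by at most one, and stopping every rank when the shortest shard runs out leaves each rank with the minimum. `shard_sizes` is a hypothetical helper.

```python
def shard_sizes(total, world_size):
    """Number of elements each rank gets under round-robin sharding."""
    return [len(range(rank, total, world_size)) for rank in range(world_size)]


sizes = shard_sizes(87599, 2)
print(sizes)  # [43800, 43799]
print(min(sizes))  # 43799: elements per rank once all ranks stop together
```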

In [None]:
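from torchdata.dataloader2.reading_service import DistributedReadingService

def distributed_training(rank, world_size, q):
    dist.init_process_group("gloo", rank=rank, world_size=world_size)
    dl = DataLoader2(make_dp("train"), reading_service=DistributedReadingService())
    cnt = 0
    for _ in dl:
        cnt += 1
        dist.barrier()  # Mimic a synchronized distributed training step
    q.put(cnt)  # Report this rank's element count to the parent process
    dl.shutdown()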
In [None]:
ctx = mp.get_context("fork")  # Notebook doesn't work well with spawn
pqs = []
for rank in range(WORLD_SIZE):
    q = ctx.Queue()
    p = ctx.Process(target=distributed_training, args=(rank, WORLD_SIZE, q))
    pqs.append((p, q))
    p.start()

for rank in range(WORLD_SIZE):
    cnt = pqs[rank][1].get()
    print(f"DataLoader2 on rank {rank} received {cnt} data")
    pqs[rank][0].join()

DataLoader2 on rank 0 received 43799 data
DataLoader2 on rank 1 received 43799 data


You can use `SequentialReadingService` to combine multiple ReadingServices, achieving multiprocessing and distributed training at the same time.
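With `SequentialReadingService(dist_rs, mp_rs)`, the data is split across ranks and then across the worker processes within each rank. The sketch below models that as two nested round-robin splits to show that the resulting slices are disjoint and together cover the dataset; the exact index arithmetic torchdata uses internally may differ, and `two_level_shard` is a hypothetical helper.

```python
def two_level_shard(items, world_size, rank, num_workers, worker_id):
    """Hypothetical model: split across ranks first, then split the rank's
    slice across its worker processes (both round-robin)."""
    rank_slice = [x for i, x in enumerate(items) if i % world_size == rank]
    return [x for i, x in enumerate(rank_slice) if i % num_workers == worker_id]


world, workers = 2, 2
slices = {
    (rank, worker): two_level_shard(list(range(10)), world, rank, workers, worker)
    for rank in range(world)
    for worker in range(workers)
}
for (rank, worker), part in slices.items():
    print(f"rank {rank}, worker {worker}: {part}")
# rank 0, worker 0: [0, 4, 8]
# rank 0, worker 1: [2, 6]
# rank 1, worker 0: [1, 5, 9]
# rank 1, worker 1: [3, 7]
```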



In [None]:
from torchdata.dataloader2.reading_service import DistributedReadingService, MultiProcessingReadingService, SequentialReadingService


def mp_distributed_training(rank, world_size, q):
    dist.init_process_group("gloo", rank=rank, world_size=world_size)
    mp_rs = MultiProcessingReadingService(num_workers=2)
    dist_rs = DistributedReadingService()
    rs = SequentialReadingService(dist_rs, mp_rs)
    dl = DataLoader2(make_dp("train"), reading_service=rs)
    cnt = 0
    for d in dl:
        cnt += 1
        # Mimic distributed training step
        dist.barrier()
    q.put(cnt)
    dl.shutdown()

In [None]:
ctx = mp.get_context("fork")  # Notebook doesn't work well with spawn
pqs = []
for rank in range(WORLD_SIZE):
    q = ctx.Queue()
    p = ctx.Process(target=mp_distributed_training, args=(rank, WORLD_SIZE, q))
    pqs.append((p, q))
    p.start()

for rank in range(WORLD_SIZE):
    cnt = pqs[rank][1].get()
    print(f"DataLoader2 on rank {rank} received {cnt} data")
    pqs[rank][0].join()

DataLoader2 on rank 0 received 43799 data
DataLoader2 on rank 1 received 43799 data
