# 🔥 Using Azure ML Datastore URIs with PyTorch DataPipes

This notebook shows how to use Azure ML datastore URIs directly in PyTorch DataPipes. The `azureml` fsspec implementation reads data through the Azure ML data runtime, which is built for fast, efficient data access in ML tasks.

## What are PyTorch DataPipes?

Early on, we observed widespread confusion between PyTorch `Dataset` classes that represented reusable loading tooling (e.g. TorchVision's `ImageFolder`) and those that represented pre-built iterators/accessors over actual data corpora (e.g. TorchVision's ImageNet). This led to an unfortunate pattern of siloed inheritance of data tooling rather than composition.

DataPipe is simply a renaming and repurposing of the PyTorch `Dataset` for composed usage. A DataPipe takes in some access function over Python data structures, `__iter__` for `IterDataPipes` and `__getitem__` for `MapDataPipes`, and returns a new access function with a slight transformation applied. For example, take a look at this `JsonParser`, which accepts an `IterDataPipe` over file names and raw streams, and produces a new iterator over the filenames and deserialized data:

```python
import json

from torchdata.datapipes.iter import IterDataPipe

class JsonParserIterDataPipe(IterDataPipe):
    def __init__(self, source_datapipe, **kwargs) -> None:
        self.source_datapipe = source_datapipe
        self.kwargs = kwargs

    def __iter__(self):
        for file_name, stream in self.source_datapipe:
            data = stream.read()
            yield file_name, json.loads(data, **self.kwargs)

    def __len__(self):
        return len(self.source_datapipe)
```

You can see in this example how DataPipes can be easily chained together to compose graphs of transformations that reproduce sophisticated data pipelines, with streamed operation as a first-class citizen.
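
To make the composition idea concrete without requiring `torchdata`, here is a minimal plain-Python sketch. The classes `ListSource`, `Filter`, and `Mapper` are hypothetical stand-ins, not the library's API; each one wraps a source iterable and exposes a new `__iter__` with one small transformation applied.

```python
from typing import Any, Callable, Iterable, Iterator


class ListSource:
    """Hypothetical stand-in for a source DataPipe: iterates over stored items."""

    def __init__(self, items: Iterable[Any]) -> None:
        self.items = list(items)

    def __iter__(self) -> Iterator[Any]:
        return iter(self.items)


class Filter:
    """Wraps a source and yields only the items accepted by `predicate`."""

    def __init__(self, source: Iterable[Any], predicate: Callable[[Any], bool]) -> None:
        self.source = source
        self.predicate = predicate

    def __iter__(self) -> Iterator[Any]:
        for item in self.source:
            if self.predicate(item):
                yield item


class Mapper:
    """Wraps a source and yields `fn(item)` for every item."""

    def __init__(self, source: Iterable[Any], fn: Callable[[Any], Any]) -> None:
        self.source = source
        self.fn = fn

    def __iter__(self) -> Iterator[Any]:
        for item in self.source:
            yield self.fn(item)


# Compose a small graph: source -> keep even numbers -> multiply by 10.
pipeline = Mapper(Filter(ListSource(range(6)), lambda x: x % 2 == 0), lambda x: x * 10)
result = list(pipeline)
print(result)  # [0, 20, 40]
```

Nothing is computed until the outermost object is iterated, which is what makes streamed operation natural in this style.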

Under this naming convention, `Dataset` simply refers to a graph of `DataPipes`, and a dataset module like `ImageNet` can be rebuilt as a factory function returning the requisite composed DataPipes. Note that the vast majority of initial support is focused on `IterDataPipes`, while more `MapDataPipes` support will come later.

## Define the URI to read
Here we define the Azure ML URIs to read: the well-known CIFAR-10 dataset in tar format and the Titanic dataset as a CSV file.

In [1]:
cifar_uri = "azureml://subscriptions/5b0bd56f-84b3-4ec2-b7a1-41fd06c7edd3/resourcegroups/azureml-rg/workspaces/azureml-ws-dev/datastores/adls/paths/cifar-10-python.tar.gz"
titanic_uri = "azureml://subscriptions/5b0bd56f-84b3-4ec2-b7a1-41fd06c7edd3/resourcegroups/azureml-rg/workspaces/azureml-ws-dev/datastores/adls/paths/titanic.csv"
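
The URIs above follow a fixed pattern: subscription, resource group, workspace, datastore, and path. As a small sketch, a helper can assemble such a URI from its parts. The function `build_datastore_uri` and the placeholder values are hypothetical and not part of any Azure SDK.

```python
def build_datastore_uri(
    subscription_id: str,
    resource_group: str,
    workspace: str,
    datastore: str,
    path: str,
) -> str:
    """Assemble a datastore URI in the same shape as the URIs used in this notebook."""
    return (
        f"azureml://subscriptions/{subscription_id}"
        f"/resourcegroups/{resource_group}"
        f"/workspaces/{workspace}"
        f"/datastores/{datastore}"
        f"/paths/{path.lstrip('/')}"  # avoid a double slash if the path starts with "/"
    )


# Placeholder values for illustration only.
example_uri = build_datastore_uri(
    subscription_id="00000000-0000-0000-0000-000000000000",
    resource_group="my-rg",
    workspace="my-ws",
    datastore="my-datastore",
    path="/folder/data.csv",
)
print(example_uri)
```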

In [3]:
from torchdata.datapipes.iter import IterableWrapper

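# Folder URI for the listing example, kept in its own variable so it does not overwrite the CSV URI defined above.
titanic_folder_uri = "azureml://subscriptions/5b0bd56f-84b3-4ec2-b7a1-41fd06c7edd3/resourcegroups/azureml-rg/workspaces/azureml-ws-dev/datastores/adls/paths/titanic"

dp = IterableWrapper([titanic_folder_uri]).list_files_by_fsspec()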
print(list(dp))

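ValueError: Protocol not known: azureml
This exception is thrown by __iter__ of FSSpecFileListerIterDataPipe(masks='')

Note: fsspec raises this error when no filesystem implementation is registered for the `azureml` protocol in the current environment. Installing the package that provides it (`azureml-fsspec`, as far as we know) and restarting the kernel should let this cell run.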

## Some simple loading examples

### Load a tar file

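Below is an example of loading a tar file from an Azure ML datastore. Each item produced by the pipe is a pair of the member's path inside the archive and a stream over its content.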

In [None]:
from torchdata.datapipes.iter import IterableWrapper

dp = IterableWrapper([cifar_uri]) \
        .open_files_by_fsspec(mode="rb") \
        .load_from_tar()

for path, filestream in dp:
    print(path)

azureml:/subscriptions/5b0bd56f-84b3-4ec2-b7a1-41fd06c7edd3/resourcegroups/azureml-rg/workspaces/azureml-ws-dev/datastores/adls/paths/cifar-10-python.tar.gz/cifar-10-batches-py/data_batch_4
azureml:/subscriptions/5b0bd56f-84b3-4ec2-b7a1-41fd06c7edd3/resourcegroups/azureml-rg/workspaces/azureml-ws-dev/datastores/adls/paths/cifar-10-python.tar.gz/cifar-10-batches-py/readme.html
azureml:/subscriptions/5b0bd56f-84b3-4ec2-b7a1-41fd06c7edd3/resourcegroups/azureml-rg/workspaces/azureml-ws-dev/datastores/adls/paths/cifar-10-python.tar.gz/cifar-10-batches-py/test_batch
azureml:/subscriptions/5b0bd56f-84b3-4ec2-b7a1-41fd06c7edd3/resourcegroups/azureml-rg/workspaces/azureml-ws-dev/datastores/adls/paths/cifar-10-python.tar.gz/cifar-10-batches-py/data_batch_3
azureml:/subscriptions/5b0bd56f-84b3-4ec2-b7a1-41fd06c7edd3/resourcegroups/azureml-rg/workspaces/azureml-ws-dev/datastores/adls/paths/cifar-10-python.tar.gz/cifar-10-batches-py/batches.meta
azureml:/subscriptions/5b0bd56f-84b3-4ec2-b7a1-41fd06

### Load a CSV file

The following example loads a CSV file, skips the header line, and maps each row to a label and a feature array.

In [None]:
import numpy as np
from torchdata.datapipes.iter import IterableWrapper

def row_processer(row):
    # If the age (column 5) is missing, set it to 50.
    if row[5] == "":
        row[5] = 50.0
    # Column 1 becomes the label; columns 2 and 5 become the features.
    return {
        "label": np.array(row[1], np.int32),
        "data": np.array([row[2], row[5]], dtype=np.float32),
    }

dp = IterableWrapper([titanic_uri]) \
        .open_files_by_fsspec() \
        .parse_csv(delimiter=",", skip_lines=1) \
        .map(row_processer)

print(list(dp)[:3])

[{'label': array(0, dtype=int32), 'data': array([ 3., 22.], dtype=float32)}, {'label': array(1, dtype=int32), 'data': array([ 1., 38.], dtype=float32)}, {'label': array(1, dtype=int32), 'data': array([ 3., 26.], dtype=float32)}]
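
To see what the row-mapping step does without access to the datastore, here is a stdlib-only sketch that parses a tiny in-memory CSV. The sample rows are made up for illustration, `process_row` is a hypothetical variant of `row_processer` that returns plain Python values instead of NumPy arrays, and skipping the header with `next(reader)` stands in for `skip_lines=1`.

```python
import csv
import io

# Made-up sample rows; only the column positions mirror the example above.
SAMPLE_CSV = (
    "PassengerId,Survived,Pclass,Name,Sex,Age\n"
    "1,0,3,Passenger A,male,22\n"
    "2,1,1,Passenger B,female,38\n"
    "3,1,3,Passenger C,female,\n"
)


def process_row(row):
    """Map one CSV row to a label and two features, defaulting a missing age to 50."""
    age = float(row[5]) if row[5] != "" else 50.0
    return {"label": int(row[1]), "data": [float(row[2]), age]}


reader = csv.reader(io.StringIO(SAMPLE_CSV))
next(reader)  # skip the header line
records = [process_row(row) for row in reader]

for record in records:
    print(record)
```

The third row has an empty age field, so it falls back to the default value.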


## A more in-depth example

### Define functions to parse data

In [None]:
import pickle
import io
import pathlib
import numpy as np
from typing import cast, Tuple, Dict, Any, Iterator
from torchdata.datapipes.iter import IterDataPipe
from torchdata.datapipes import functional_datapipe


# Keep only archive members whose file name starts with "data" (e.g. data_batch_4).
def is_data_file(data: Tuple[str, Any]) -> bool:
    path = pathlib.Path(data[0])
    return path.name.startswith("data")

# Unpickle an archive member and return its content as a dict.
def unpickle(data: Tuple[str, io.BytesIO]) -> Dict[str, Any]:
    _, file = data  # the path is not needed here
    content = cast(Dict[str, Any], pickle.load(file, encoding="latin1"))
    file.close()
    return content

@functional_datapipe("read_cifar")
class CifarFileReader(IterDataPipe[Tuple[np.ndarray, int]]):
    def __init__(self, datapipe: IterDataPipe[Dict[str, Any]], *, labels_key: str) -> None:
        self.datapipe = datapipe
        self.labels_key = labels_key

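    def __iter__(self) -> Iterator[Tuple[np.ndarray, int]]:
        for mapping in self.datapipe:
            # Reshape each flat row to (channels, height, width) and scale pixel values to [0, 1].
            image_arrays = np.float32(mapping["data"].reshape((-1, 3, 32, 32)) / 255.0)
            category_idcs = mapping[self.labels_key]
            # Yield one (image, label) pair per sample.
            yield from zip(image_arrays, category_idcs)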
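
The filter and unpickle steps can be tried on in-memory data before wiring them into a DataPipe. The sketch below is stdlib-only; the archive member paths and payloads are fabricated for illustration, and the two functions are simplified copies of the ones above without type annotations.

```python
import io
import pathlib
import pickle


def is_data_file(item):
    """Return True when the member's file name starts with "data"."""
    path, _stream = item
    return pathlib.Path(path).name.startswith("data")


def unpickle(item):
    """Load the pickled payload of one (path, stream) pair."""
    _path, stream = item
    content = pickle.load(stream, encoding="latin1")
    stream.close()
    return content


# Fabricated (path, stream) pairs, shaped like the items a tar-loading pipe yields.
members = [
    ("archive.tar.gz/cifar-10-batches-py/data_batch_1", io.BytesIO(pickle.dumps({"labels": [3, 8]}))),
    ("archive.tar.gz/cifar-10-batches-py/test_batch", io.BytesIO(pickle.dumps({"labels": [1]}))),
    ("archive.tar.gz/cifar-10-batches-py/readme.html", io.BytesIO(b"<html></html>")),
]

kept = [member for member in members if is_data_file(member)]
kept_names = [pathlib.Path(path).name for path, _ in kept]
contents = [unpickle(member) for member in kept]

print(kept_names)
print(contents)
```

Only the member whose name starts with `data` survives the filter, so the test batch and the HTML file are never unpickled.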

### Create the data pipe

In [1]:
from torchdata.datapipes.iter import IterableWrapper

dp = IterableWrapper(iterable=[cifar_uri]) \
    .open_files_by_fsspec(mode='rb') \
    .load_from_tar() \
    .filter(is_data_file) \
    .map(unpickle) \
    .read_cifar(labels_key="labels")

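NameError: name 'cifar_uri' is not defined

Note: this `NameError` means the cell that defines `cifar_uri` (see "Define the URI to read") has not been run in the current kernel session. Run that cell first, then re-run this one.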

### Leveraging DataPipe in `DataLoader`

In [7]:
from torch.utils.data import DataLoader

dl = DataLoader(dataset=dp, batch_size=5)
first = next(iter(dl))
features, labels = first[0], first[1]
print(f"Labels batch shape: {labels.size()}")
print(f"Feature batch shape: {features.size()}")
print(f"{labels = }\n{features = }")

Labels batch shape: torch.Size([5])
Feature batch shape: torch.Size([5, 3, 32, 32])
labels = tensor([0, 6, 0, 2, 7])
features = tensor([[[[0.6980, 0.6980, 0.6980,  ..., 0.6667, 0.6588, 0.6471],
          [0.7059, 0.7020, 0.7059,  ..., 0.6784, 0.6706, 0.6588],
          [0.6941, 0.6941, 0.6980,  ..., 0.6706, 0.6627, 0.6549],
          ...,
          [0.4392, 0.4431, 0.4471,  ..., 0.3922, 0.3843, 0.3961],
          [0.4392, 0.4392, 0.4431,  ..., 0.4000, 0.4000, 0.4000],
          [0.4039, 0.3922, 0.4039,  ..., 0.3608, 0.3647, 0.3569]],

         [[0.6902, 0.6902, 0.6902,  ..., 0.6588, 0.6510, 0.6392],
          [0.6980, 0.6941, 0.6980,  ..., 0.6706, 0.6627, 0.6510],
          [0.6863, 0.6863, 0.6902,  ..., 0.6627, 0.6549, 0.6471],
          ...,
          [0.4196, 0.4275, 0.4314,  ..., 0.3804, 0.3686, 0.3725],
          [0.4000, 0.4039, 0.4039,  ..., 0.3725, 0.3647, 0.3608],
          [0.3765, 0.3647, 0.3725,  ..., 0.3294, 0.3373, 0.3294]],

         [[0.7412, 0.7412, 0.7412,  ..., 0.705

### Train a model using PyTorch

#### Define Neural Net

In [8]:
import torch
import torch.nn as nn
import torch.nn.functional as F


class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1) # flatten all dimensions except batch
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

net = Net()
net.to(device=device)

Net(
  (conv1): Conv2d(3, 6, kernel_size=(5, 5), stride=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(6, 16, kernel_size=(5, 5), stride=(1, 1))
  (fc1): Linear(in_features=400, out_features=120, bias=True)
  (fc2): Linear(in_features=120, out_features=84, bias=True)
  (fc3): Linear(in_features=84, out_features=10, bias=True)
)
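
The `16 * 5 * 5` input size of `fc1` follows from the layer shapes. As a quick check, the sketch below applies the usual output-size formula for convolution and pooling without padding or dilation to a 32x32 input. The helper names `conv_out` and `pool_out` are hypothetical.

```python
def conv_out(size: int, kernel: int, stride: int = 1, padding: int = 0) -> int:
    """Spatial output size of a convolution (no dilation)."""
    return (size + 2 * padding - kernel) // stride + 1


def pool_out(size: int, kernel: int, stride: int) -> int:
    """Spatial output size of max pooling (no padding, no dilation)."""
    return (size - kernel) // stride + 1


size = 32                        # CIFAR-10 images are 32x32
size = conv_out(size, kernel=5)  # conv1: 32 -> 28
size = pool_out(size, 2, 2)      # pool:  28 -> 14
size = conv_out(size, kernel=5)  # conv2: 14 -> 10
size = pool_out(size, 2, 2)      # pool:  10 -> 5

flattened = 16 * size * size     # 16 output channels from conv2
print(size, flattened)           # 5 400
```

The result matches `in_features=400` in the printed model summary.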

#### Define loss function and optimization strategy

In [9]:
import torch.optim as optim

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)
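
For intuition about what `optimizer.step()` does with `lr=0.001` and `momentum=0.9`, here is a scalar sketch of SGD with momentum. It assumes zero dampening, no weight decay, and no Nesterov momentum (the defaults, as far as we know), and the function `sgd_momentum_step` is hypothetical, not part of PyTorch.

```python
def sgd_momentum_step(param, grad, velocity, lr=0.001, momentum=0.9):
    """One scalar update: fold the gradient into the velocity, then step along it."""
    velocity = momentum * velocity + grad
    param = param - lr * velocity
    return param, velocity


# Minimize the toy objective f(p) = p**2, whose gradient is 2 * p.
param, velocity = 1.0, 0.0
history = []
for _ in range(3):
    grad = 2.0 * param
    param, velocity = sgd_momentum_step(param, grad, velocity)
    history.append(param)

print(history)
```

Because the velocity accumulates past gradients, each step in this toy run is larger than the previous one while the gradient still points the same way.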

#### Train model

In [10]:
for epoch in range(2):  # loop over the dataset multiple times

    running_loss = 0.0
    for i, data in enumerate(dl, 0):
        # get the inputs; data is a tuple of [inputs, labels]
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item()
        if i % 2000 == 1999:    # print every 2000 mini-batches
            print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
            running_loss = 0.0

print('Finished Training')

[1,  2000] loss: 2.248
[1,  4000] loss: 2.014
[1,  6000] loss: 1.868
[1,  8000] loss: 1.751
[1, 10000] loss: 1.657
[2,  2000] loss: 1.608
[2,  4000] loss: 1.534
[2,  6000] loss: 1.526
[2,  8000] loss: 1.487
[2, 10000] loss: 1.447
Finished Training
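
The log positions above are consistent with simple arithmetic on the dataset size. The sketch below assumes the standard CIFAR-10 archive layout, in which the files matching the `data` prefix are five training batches of 10,000 images each; that layout is an assumption, not something the pipeline checks.

```python
import math

num_training_files = 5    # assumption: data_batch_1 .. data_batch_5
images_per_file = 10_000  # assumption: standard CIFAR-10 batch size on disk
batch_size = 5            # as passed to DataLoader above
log_every = 2000          # as used in the training loop above

num_images = num_training_files * images_per_file
batches_per_epoch = math.ceil(num_images / batch_size)
log_points = list(range(log_every, batches_per_epoch + 1, log_every))

print(batches_per_epoch)  # 10000
print(log_points)         # [2000, 4000, 6000, 8000, 10000]
```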
