This notebook demonstrates FFCV (Fast Forward Computer Vision), a library that provides an efficient data loading pipeline for training deep learning models. It compares the FFCV Loader with the built-in PyTorch DataLoader to measure how much faster FFCV delivers batches.

The code has several parts:

1. The OttoDataset class definition and the creation of an instance. This dataset is saved in FFCV's custom .beton format using the DatasetWriter.

2. Creation of an FFCV Loader and a PyTorch DataLoader to load the Otto dataset.

3. Definition of a simple neural network model MySuperPooperNetwork and its training setup.

4. Performance comparison of FFCV and PyTorch DataLoader for the Otto dataset.

5. Definition of a WideResNet model MyWideResNet28_10 and its training setup.

6. Loading the CIFAR-10 dataset and saving it in FFCV's custom .beton format using the DatasetWriter.

7. Creation of an FFCV Loader and a PyTorch DataLoader to load the CIFAR-10 dataset.

8. Performance comparison of FFCV and PyTorch DataLoader for the CIFAR-10 dataset with and without the full forward pass.

In [None]:
%env CUDA_VISIBLE_DEVICES=0

env: CUDA_VISIBLE_DEVICES=0


In [None]:
import numpy as np

class OttoDataset:
    def __init__(self,):
        self.X = np.load('../data/otto_ll/X_num_train.npy')
        self.y = np.load('../data/otto_ll/y_train.npy')

    def __getitem__(self, idx):
        return (self.X[idx].astype('float32'), self.y[idx])

    def __len__(self):
        return len(self.X)


dataset = OttoDataset()
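
If the Otto `.npy` files are not available, the rest of the pipeline can still be exercised with a stand-in that exposes the same `__getitem__` / `__len__` interface. The sketch below is only an illustration: `SyntheticOttoDataset` and its random contents are hypothetical. The 93 float32 features, 9 classes, and 39601 rows are the only values taken from this notebook.

```python
import numpy as np


class SyntheticOttoDataset:
    """Hypothetical stand-in for OttoDataset: random counts and random labels."""

    def __init__(self, n_rows=39601, n_features=93, n_classes=9, seed=0):
        rng = np.random.default_rng(seed)
        # Small non-negative counts, similar in spirit to the sample shown by dataset[0].
        self.X = rng.poisson(0.5, size=(n_rows, n_features))
        self.y = rng.integers(0, n_classes, size=n_rows)

    def __getitem__(self, idx):
        # Same contract as OttoDataset: (float32 feature vector, label).
        return (self.X[idx].astype('float32'), self.y[idx])

    def __len__(self):
        return len(self.X)


synthetic = SyntheticOttoDataset()
x0, y0 = synthetic[0]
print(len(synthetic), x0.shape, x0.dtype)
```

Timings measured on such random data only exercise the loading path; they say nothing about model quality.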

In [None]:
dataset[0]

(array([5., 0., 1., 1., 0., 0., 0., 0., 0., 1., 0., 0., 4., 0., 0., 0., 1.,
        0., 0., 0., 0., 2., 0., 0., 4., 0., 0., 0., 2., 0., 0., 0., 4., 0.,
        0., 2., 0., 0., 0., 0., 3., 2., 0., 0., 0., 0., 0., 1., 1., 0., 0.,
        0., 4., 1., 0., 0., 0., 0., 0., 0., 0., 3., 0., 0., 1., 1., 2., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 4., 0., 1., 0., 0., 0., 0., 0., 0.,
        2., 0., 0., 0., 0., 0., 3., 0.], dtype=float32),
 8)

In [None]:
import ffcv
from ffcv.writer import DatasetWriter

dir(ffcv.fields)

['BytesField',
 'Field',
 'FloatField',
 'IntField',
 'JSONField',
 'NDArrayField',
 'RGBImageField',
 '__all__',
 '__builtins__',
 '__cached__',
 '__doc__',
 '__file__',
 '__loader__',
 '__name__',
 '__package__',
 '__path__',
 '__spec__',
 'base',
 'basics',
 'bytes',
 'decoders',
 'json',
 'ndarray',
 'rgb_image']

In [None]:
from ffcv.fields import NDArrayField, IntField
from ffcv.loader import Loader, OrderOption

writer = DatasetWriter('my_easy_data.beton', {
    'x': NDArrayField(shape=(93,), dtype=np.dtype('float32')),
    'y': IntField(),

}, num_workers=16)

writer.from_indexed_dataset(dataset)

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████| 39601/39601 [00:00<00:00, 394486.72it/s]


In [None]:
from ffcv.loader import Loader, OrderOption

ffcv_loader = Loader('my_easy_data.beton',
                batch_size=1024,
                num_workers=1,)

In [None]:
from torch.utils.data import DataLoader


torch_loader = DataLoader(dataset, batch_size=1024, num_workers=1, pin_memory=True)

In [None]:
import torch
from torch import nn


class MySuperPooperNetwork(nn.Module):
    def __init__(self, in_features, out_features):
        super().__init__()
        self.model = nn.Sequential(nn.Linear(in_features, 256), nn.ReLU(),
                              nn.Linear(256, 256), nn.ReLU(),
                              nn.Linear(256, out_features))
    
    def forward(self, x):
        return self.model(x)

In [None]:
model = MySuperPooperNetwork(93, 9).cuda()
optimizer = torch.optim.Adam(model.parameters(), 5e-4)
critetion =  nn.CrossEntropyLoss()

In [None]:
import time
from tqdm import tqdm

ffcv_time = []

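for j in range(10):  # 10 passes over the dataset
    for i, b in enumerate(tqdm(ffcv_loader)):
        x, y = b
        x = x.cuda()
        y = y.cuda().flatten()
        loss = critetion(model(x), y)  # forward pass only: no backward pass, no optimizer step
        torch.cuda.synchronize()  # wait for the GPU so the timestamp covers the whole step
        if i > 10:  # skip the first 11 iterations of each pass as warm-up
            ffcv_time.append(time.time() - now)
        now = time.time()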
    

100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 38/38 [00:00<00:00, 57.90it/s]
100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 38/38 [00:00<00:00, 879.14it/s]
100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 38/38 [00:00<00:00, 1066.70it/s]
100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 38/38 [00:00<00:00, 964.65it/s]
100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 38/38 [00:00<00:00, 1048.61it/s]
100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 38/38 [00:00<00:00, 967.43it/s]
...
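
The timing pattern used in the cell above (synchronize, skip warm-up iterations, record the gap since the previous step) can be factored into a small helper. This is a minimal sketch, not part of the original notebook: `time_iterations`, `step`, and `warmup` are hypothetical names, and `time.perf_counter` is swapped in for `time.time` because it is monotonic and intended for interval measurement.

```python
import time


def time_iterations(iterable, step, warmup=10):
    """Return per-iteration wall-clock durations, skipping warm-up iterations.

    Each duration spans from the end of the previous step to the end of the
    current one, so it includes the time spent waiting for the next item.
    """
    durations = []
    last = time.perf_counter()
    for i, item in enumerate(iterable):
        step(item)
        now = time.perf_counter()
        if i > warmup:  # same cutoff as the `if i > 10` check in the cells
            durations.append(now - last)
        last = now
    return durations


# Toy usage: the "loader" is a range and the "step" just sleeps briefly.
durations = time_iterations(range(20), lambda _: time.sleep(0.001), warmup=10)
print(len(durations), min(durations))
```

In the benchmark cells, `step` would run the forward pass and then call `torch.cuda.synchronize()`, so that queued GPU work is included in the measurement.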

In [None]:
import time
from tqdm import tqdm

torch_time = []

for j in range(10):
    for i, b in enumerate(tqdm(torch_loader)):
        x, y = b
        x = x.cuda()
        y = y.cuda().flatten()
        loss = critetion(model(x), y)
        torch.cuda.synchronize()
        if i > 10:
            torch_time.append(time.time() - now)
        now = time.time()


100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 39/39 [00:00<00:00, 53.03it/s]
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 39/39 [00:00<00:00, 51.84it/s]
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 39/39 [00:00<00:00, 50.65it/s]
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 39/39 [00:00<00:00, 50.19it/s]
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 39/39 [00:00<00:00, 50.42it/s]
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 39/39 [00:00<00:00, 50.62it/s]
...

In [None]:
import pandas as pd

means = []
medians = []
quantiles = []
names = []

for name, times in zip(['FFCV', 'TORCH'], [ffcv_time, torch_time]):
    means.append(np.mean(times))
    medians.append(np.median(times))
    quantiles.append(np.quantile(times, 0.95))
    names.append(name)
    
d = pd.DataFrame({"Mean": means, "Median": medians, "0.95-Quantile": quantiles})
d.index = names
up = ((d.loc['FFCV'] - d.loc['TORCH']) / d.loc['TORCH']) * 100
d.loc['UPLIFT (%)'] = up.values
d

                  Mean      Median  0.95-Quantile
FFCV          0.000952    0.000957        0.00116
TORCH         0.010384    0.010803       0.014767
UPLIFT (%)  -90.832665  -91.143333     -92.144388


Great! With a large batch and a small network, data loading is the bottleneck, which makes this an ideal use case for FFCV.
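
The UPLIFT (%) row is a relative change in time per iteration, so a negative value means FFCV is faster. A short sketch of how to turn it into a speedup factor follows; the helper names `uplift_percent` and `speedup_factor` are hypothetical, and the inputs are the rounded mean values from the table above, in seconds.

```python
def uplift_percent(candidate_s, baseline_s):
    """Relative change in time per iteration, as in the UPLIFT (%) row."""
    return (candidate_s - baseline_s) / baseline_s * 100


def speedup_factor(candidate_s, baseline_s):
    """How many times faster the candidate is than the baseline."""
    return baseline_s / candidate_s


# Rounded mean times per iteration (seconds) copied from the table above.
ffcv_mean_s, torch_mean_s = 0.000952, 0.010384

uplift = uplift_percent(ffcv_mean_s, torch_mean_s)
speedup = speedup_factor(ffcv_mean_s, torch_mean_s)
print(f"uplift: {uplift:.2f}%  speedup: {speedup:.1f}x")
```

With these inputs, an uplift of about -90.8% corresponds to iterations that are roughly 10.9 times faster.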

# CIFAR-10

In [None]:
import torch
import torch.nn as nn

class WideBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride):
        super(WideBlock, self).__init__()
        self.block_layer = nn.Sequential(
            nn.BatchNorm2d(in_channels),
            nn.ReLU(),
            nn.Conv2d(in_channels, out_channels, 3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.Conv2d(out_channels, out_channels, 3, stride=stride, padding=1)
        )
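        # A 1x1 projection on the shortcut is needed only when the block changes shape.
        self.use_residual = stride != 1 or in_channels != out_channels
        res_conv_layer = []
        if self.use_residual:
            res_conv_layer.append(nn.Sequential(nn.Conv2d(in_channels, out_channels, 1, stride=stride)))
        # With no projection, the empty nn.Sequential passes x through unchanged (identity shortcut).
        self.res_conv = nn.Sequential(*res_conv_layer)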

    def forward(self, x):
        out = self.block_layer(x)
        out += self.res_conv(x)
        return out

class WideResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, num_blocks, init_stride):
        super(WideResidualBlock, self).__init__()
        layers = []
        for i in range(num_blocks):
            if i == 0:
                layers.append(WideBlock(in_channels, out_channels, init_stride))
            else:
                layers.append(WideBlock(out_channels, out_channels, 1))
        self.layers = nn.Sequential(*layers)

    def forward(self, x):
        x = self.layers(x)
        return x

class MyWideResNet28_10(nn.Module):
    def __init__(self, num_classes):
        super(MyWideResNet28_10, self).__init__()
        self.in_planes = 16

        self.model = nn.Sequential(
            nn.Conv2d(3, 16, 3, padding=1),
            WideResidualBlock(16, 160, 4, 1),
            WideResidualBlock(160, 320, 4, 2),
            WideResidualBlock(320, 640, 4, 2),
            nn.BatchNorm2d(640, momentum=0.9),
            nn.ReLU(),
            nn.AvgPool2d(8)
        )
        self.linear = nn.Linear(640, num_classes)


    def forward(self, x):
        x = self.model(x)
        return self.linear(x.view(x.shape[0], -1))
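
The `nn.Linear(640, num_classes)` head only works if the feature map is 1x1 after pooling. The sketch below checks that by hand for 32x32 inputs, using the standard output-size formula for convolutions; `conv_out` is a hypothetical helper, not a PyTorch function.

```python
def conv_out(size, kernel, stride=1, padding=0):
    """Output side length of a square conv or pooling layer (floor formula)."""
    return (size + 2 * padding - kernel) // stride + 1


# WRN-28-10: (28 - 4) / 6 = 4 blocks per group, base widths 16/32/64 times 10.
blocks_per_group = (28 - 4) // 6
widths = [16 * 10, 32 * 10, 64 * 10]

size = 32                                        # CIFAR-10 images are 32x32
size = conv_out(size, kernel=3, padding=1)       # stem conv keeps the size
for group_stride in (1, 2, 2):                   # the three WideResidualBlock groups
    # Only the first block of a group can change the size: its second 3x3 conv
    # carries the stride. All other convs in the group use stride 1.
    size = conv_out(size, kernel=3, padding=1)
    size = conv_out(size, kernel=3, stride=group_stride, padding=1)

pooled = conv_out(size, kernel=8, stride=8)      # nn.AvgPool2d(8): stride defaults to the kernel size
flat_features = widths[-1] * pooled * pooled
print(blocks_per_group, widths, size, pooled, flat_features)
```

The spatial size goes 32, 32, 16, 8, the pooling reduces it to 1x1, and the flattened vector has 640 features, matching the linear layer.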

In [None]:
import torchvision

dataset = torchvision.datasets.CIFAR10('/tmp', train=True)

In [None]:
dataset[0]

(<PIL.Image.Image image mode=RGB size=32x32 at 0x7F9CF71D5E80>, 6)

In [None]:
from ffcv.fields import RGBImageField, IntField


cifar_writer = DatasetWriter('cifar.beton', {
        'image': RGBImageField(),
        'label': IntField()
    },
)
cifar_writer.from_indexed_dataset(dataset)

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████| 50000/50000 [00:00<00:00, 496850.66it/s]


In [None]:
dir(ffcv.transforms)

['Convert',
 'Cutout',
 'ImageMixup',
 'LabelMixup',
 'MixupToOneHot',
 'ModuleWrapper',
 'NormalizeImage',
 'Poison',
 'RandomHorizontalFlip',
 'RandomResizedCrop',
 'RandomTranslate',
 'ReplaceLabel',
 'Squeeze',
 'ToDevice',
 'ToTensor',
 'ToTorchImage',
 'View',
 '__all__',
 '__builtins__',
 '__cached__',
 '__doc__',
 '__file__',
 '__loader__',
 '__name__',
 '__package__',
 '__path__',
 '__spec__',
 'common',
 'cutout',
 'flip',
 'mixup',
 'module',
 'normalize',
 'ops',
 'poisoning',
 'random_resized_crop',
 'replace_label',
 'translate',
 'utils']

In [None]:
from ffcv.fields.decoders import IntDecoder

In [None]:
ffcv_loader = Loader('cifar.beton',
    batch_size=128,
    num_workers=1,
    order=OrderOption.QUASI_RANDOM,
    pipelines = {
        'image': [ffcv.fields.decoders.CenterCropRGBImageDecoder((32, 32), 2/3),
                  ffcv.transforms.ToTensor(),
                  ffcv.transforms.ToDevice('cuda', non_blocking=True),
                  ffcv.transforms.ToTorchImage(),],
        'label': [ffcv.fields.decoders.IntDecoder(),
                  ffcv.transforms.ToTensor(),
                  ffcv.transforms.ToDevice('cuda', non_blocking=True),
                  ffcv.transforms.Squeeze()]
    }
)

In [None]:
from torch.utils.data import DataLoader


torch_loader = DataLoader(torchvision.datasets.CIFAR10('/tmp',
                                                       transform=torchvision.transforms.Compose(
                                                           [
                                                           torchvision.transforms.ToTensor(),
                                                           torchvision.transforms.Resize((48, 48)),
                                                           torchvision.transforms.CenterCrop((32, 32))
                                                           ]
                                                       ),train=True), 
                          batch_size=128, num_workers=1, pin_memory=True, drop_last=True)
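
The two loaders are meant to apply comparable preprocessing: a centre crop covering two thirds of the image side, delivered at 32x32. The arithmetic below is a sketch under one assumption, labeled in the code: that the `ratio` argument of `CenterCropRGBImageDecoder` means crop side divided by the shorter image side.

```python
side = 32        # CIFAR-10 image side
ratio = 2 / 3    # second argument of CenterCropRGBImageDecoder in the FFCV pipeline

# FFCV side (assumed semantics): crop int(ratio * side) pixels from the centre,
# then resize the crop to the requested 32x32 output.
ffcv_crop_px = int(ratio * side)

# torchvision side: upscale 32 -> 48, then keep the central 32 of 48 pixels.
resized, cropped = 48, 32
torch_crop_px = side * cropped / resized   # part of the original image that survives

print(ffcv_crop_px, round(torch_crop_px, 2))
```

Under that assumption both pipelines keep roughly the central 21 pixels of each side, so the per-batch work is similar, although the order of resizing and cropping differs.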

In [None]:
model = MyWideResNet28_10(10).cuda()
optimizer = torch.optim.Adam(model.parameters(), 1e-2)
critetion =  nn.CrossEntropyLoss()

In [None]:
import time
from tqdm import tqdm

ffcv_time = []


for i, b in enumerate(tqdm(ffcv_loader)):
    x, y = b
    x = (x / 255)
    loss = critetion(model(x), y)
    torch.cuda.synchronize()
    if i > 10:
        ffcv_time.append(time.time() - now)
    now = time.time()

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 390/390 [00:12<00:00, 32.14it/s]


In [None]:
ffcv_time_load_only = []

for i, b in enumerate(tqdm(ffcv_loader)):
    torch.cuda.synchronize()
    if i > 10:
        ffcv_time_load_only.append(time.time() - now)
    now = time.time()


100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 390/390 [00:00<00:00, 1048.43it/s]


In [None]:
torch_time_load_only = []

for i, b in enumerate(tqdm(torch_loader)):
    torch.cuda.synchronize()
    if i > 10:
        torch_time_load_only.append(time.time() - now)
    now = time.time()

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 390/390 [00:11<00:00, 33.57it/s]


In [None]:
import time
from tqdm import tqdm

torch_time = []


for i, b in enumerate(tqdm(torch_loader)):
    x, y = b
    x = x.cuda()
    y = y.cuda().flatten()
    loss = critetion(model(x), y)
    torch.cuda.synchronize()
    if i > 10:
        torch_time.append(time.time() - now)
    now = time.time()

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 390/390 [00:11<00:00, 33.34it/s]


Full forward pass:

In [None]:
import pandas as pd

means = []
medians = []
quantiles = []
names = []

for name, times in zip(['FFCV', 'TORCH'], [ffcv_time, torch_time]):
    means.append(np.mean(times))
    medians.append(np.median(times))
    quantiles.append(np.quantile(times, 0.95))
    names.append(name)
    
d = pd.DataFrame({"Mean": means, "Median": medians, "0.95-Quantile": quantiles})
d.index = names
up = ((d.loc['FFCV'] - d.loc['TORCH']) / d.loc['TORCH']) * 100
d.loc['UPLIFT (%)'] = up.values
d

                  Mean      Median  0.95-Quantile
FFCV          0.025751    0.025428       0.027036
TORCH         0.029059    0.029022        0.03023
UPLIFT (%)  -11.383726  -12.383448      -10.56397


Data loading only:

In [None]:
import pandas as pd

means = []
medians = []
quantiles = []
names = []

for name, times in zip(['FFCV', 'TORCH'], [ffcv_time_load_only, torch_time_load_only]):
    means.append(np.mean(times))
    medians.append(np.median(times))
    quantiles.append(np.quantile(times, 0.95))
    names.append(name)
    
d = pd.DataFrame({"Mean": means, "Median": medians, "0.95-Quantile": quantiles})
d.index = names
up = ((d.loc['FFCV'] - d.loc['TORCH']) / d.loc['TORCH']) * 100
d.loc['UPLIFT (%)'] = up.values
d

                  Mean      Median  0.95-Quantile
FFCV          0.000815      0.0008       0.000891
TORCH         0.028923    0.028822       0.030172
UPLIFT (%)  -97.182661    -97.2239     -97.045736


We get a very large speedup at the data loading stage, but over the full forward step the difference is much less noticeable. Why? The model is no longer small, and with the PyTorch DataLoader its forward pass takes about as long as loading a batch. Once the CPU has queued all the work for the model's forward pass on the GPU, it is free to prepare the next heavy batch while the model is still busy on the GPU. The two expensive stages therefore overlap rather than add up.
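
The tables above are consistent with this. The FFCV numbers put the forward pass at roughly 0.025 s per step, since loading adds under 0.001 s. If loading and compute simply added up for the PyTorch DataLoader, a step would take about 0.029 + 0.025 = 0.054 s, yet the measured value is 0.029 s, close to the larger of the two. The toy simulation below illustrates the same effect with threads and `time.sleep`; all names and durations in it are hypothetical, and it models only the overlap, not a real DataLoader or GPU.

```python
import queue
import threading
import time

LOAD_S = 0.02      # hypothetical time to prepare one batch
COMPUTE_S = 0.02   # hypothetical time for one forward pass
N_BATCHES = 10


def load_batch(i):
    time.sleep(LOAD_S)       # stands in for decoding and transforming a batch
    return i


def forward(batch):
    time.sleep(COMPUTE_S)    # stands in for the forward pass on the GPU


def run_sequential():
    """Load, then compute, one after the other: the two times add up."""
    start = time.perf_counter()
    for i in range(N_BATCHES):
        forward(load_batch(i))
    return time.perf_counter() - start


def run_overlapped():
    """A background thread prepares the next batch while the current one is computed."""
    batches = queue.Queue(maxsize=2)

    def producer():
        for i in range(N_BATCHES):
            batches.put(load_batch(i))
        batches.put(None)    # sentinel: no more batches

    start = time.perf_counter()
    threading.Thread(target=producer, daemon=True).start()
    while True:
        batch = batches.get()
        if batch is None:
            break
        forward(batch)
    return time.perf_counter() - start


sequential_s = run_sequential()
overlapped_s = run_overlapped()
print(f"sequential: {sequential_s:.2f}s  overlapped: {overlapped_s:.2f}s")
```

With equal load and compute times the overlapped run approaches half the sequential time, which is why a faster loader barely helps once the forward pass takes about as long as loading.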