# Benchmarks

In this notebook, we show timings of different parts of **seismiQB**: data generation, segmentation mask creation, model training. We perform each of them in multiple ways, showing different approaches and interfaces provided by our framework (and by **BatchFlow**).

Note that this is advanced notebook that requires you to read our other [tutorials and notebooks](./Carcass%20interpolation/01_M_cube.ipynb) to understand what is going on: we don't pay too much time explaining what exactly cells are doing and what is achieved by their code, we merely time it.

* [Data loading](data)
* [Model architecture](architecture)
* [Pipelines: loading, augmentation and training](pipelines)
* [Load + augmentation profile](profile1)
* [Model training profile](profile2)
* [Inference profile](inference)
* [Conclusion](conclusion)

In [None]:
# Necessary imports
import os
import sys
import warnings
warnings.filterwarnings("ignore")

from tqdm import tqdm

import numpy as np
import pandas as pd
pd.set_option('display.max_rows', 2000)

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, IterableDataset
from pytorch_lightning import LightningModule, Trainer

sys.path.append('..')
from seismiqb.batchflow import Pipeline, FilesIndex
from seismiqb.batchflow import B, V, C, F, D, P, R, W
from seismiqb.batchflow.batchflow.models.torch import EncoderDecoder, ResBlock # Note the import!

from seismiqb import SeismicCubeset, Horizon, plot_image

# Set GPU
%env CUDA_VISIBLE_DEVICES=0

In [None]:
# Global parameters
FREQUENCIES = [50]               # carcass frequency at `hard` and `easy` locations
CROP_SHAPE = (1, 256, 256)       # shape of sampled 3D crops
ITERS = 100                      # number of train iterations
BATCH_SIZE = 64                  # number of crops inside one batch

<a id='data'></a>
# Load everything

First of all, we load dataset with seismic cube and a horizon. This operation is performed once per dataset and does not take more that one minute even for all of our cubes (10 total) and horizons (50+ total).

In [None]:
%%time
cube_path = '/data/seismic/CUBE_2/M_cube.hdf5'
horizon_path = '/data/seismic/CUBE_2/RAW/t0_B_anon'

dsi = FilesIndex(path=[cube_path], no_ext=True)
dataset = SeismicCubeset(dsi)

dataset.load_geometries()
dataset.create_labels({dataset.indices[0]: [horizon_path]})

geometry = dataset.geometries[0]
horizon = dataset.labels[0][0]

In [None]:
# Create carcass to train on
quality_grid = geometry.make_quality_grid(FREQUENCIES)
grid_coverage = (np.nansum(geometry.quality_grid) /
                 (np.prod(geometry.cube_shape[:2]) - np.nansum(geometry.zero_traces)))

# Create sampler, according to carcass
dataset.create_sampler(quality_grid=True)
dataset.modify_sampler('train_sampler', finish=True)

<a id='architecture'></a>
# Model architecture

In [None]:
MODEL_CONFIG = {
    # Defining input shapes here allows to build model at initialization
    'inputs': {
        'images/shape': CROP_SHAPE,
        'masks/shape': CROP_SHAPE,
    },
    
    # Model layout
    'initial_block': {
        'inputs': 'images',
        'base_block': ResBlock,
        'filters': 16,
        'kernel_size': 5,
        'downsample': False,
    },

    'body/encoder': {
        'num_stages': 4,
        'order': 'sbd',
        'blocks': {
            'base': ResBlock,
            'n_reps': 1,
            'filters': [16, 32, 64, 128],
        },
    },
    'body/embedding': {
        'base': ResBlock,
        'n_reps': 1,
        'filters': 256,
    },
    'body/decoder': {
        'num_stages': 4,
        'upsample': {
            'layout': 'bna',
            'scale_factor': 2,
            'kernel_size': 2,
        },
        'blocks': {
            'base': ResBlock,
            'filters': [128, 64, 32, 16],
        },
    },
    'head': {
        'base_block': ResBlock,
        'filters': [16, 8],
    },
    'output': 'sigmoid',
    # Train configuration
    'loss': 'bdice',
    'optimizer': {'name': 'Adam', 'lr': 0.01,},
    'decay': {'name': 'exp', 'gamma': 0.1, 'frequency': 150},
    'microbatch': 4,
    'common/activation': 'relu6',
}

The first call to a GPU takes some time in order to initialize CUDA states; to eliminate this time from actual model initialization time, we manually put some data to the GPU of choice:

In [None]:
np_tensor = np.random.random((10,)).astype(np.float32)

tensor = torch.from_numpy(np_tensor)
%time tensor = tensor.cuda()

tensor = torch.from_numpy(np_tensor)
%time tensor = tensor.cuda()

This time we create instance of model directly, without `init_model` action of `Pipeline`:

In [None]:
%%time
model = EncoderDecoder(MODEL_CONFIG)

<a id='pipelines'></a>
# All the pipelines

In [None]:
load_pipeline = (
    Pipeline()

    # Load data/masks
    .make_locations(points=D('train_sampler')(BATCH_SIZE),
          shape=CROP_SHAPE, adaptive_slices=True)
    .create_masks(dst='masks', width=5)
    .load_cubes(dst='images')
    .adaptive_reshape(src=['images', 'masks'], shape=CROP_SHAPE)
    .normalize(mode='q', src='images')
) << dataset

In [None]:
aug_pipeline = (
    Pipeline()

    # Augmentations
    .transpose(src=['images', 'masks'], order=(1, 2, 0))
    .flip(axis=1, src=['images', 'masks'], seed=P(R('uniform', 0, 1)))
    .additive_noise(scale=0.005, src='images', dst='images')
    .rotate(angle=P(R('uniform', -15, 15)),
            src=['images', 'masks'])
    .scale_2d(scale=P(R('uniform', 0.85, 1.15)),
              src=['images', 'masks'])
    .elastic_transform(alpha=P(R('uniform', 35, 45)),
                       sigma=P(R('uniform', 4, 4.5)),
                       src=['images', 'masks'])
    .transpose(src=['images', 'masks'], order=(2, 0, 1))
) << dataset

In [None]:
model_pipeline = (
    Pipeline()

    # Initialize pipeline variables and model
    .init_variable('loss_history', [])
    .import_model(model, name='model')

    # Training
    .train_model('model',
                 fetches='loss',
                 images=B('images'),
                 masks=B('masks'),
                 save_to=V('loss_history', mode='a'))
) << dataset

<a id='profile1'></a>
# Data generation and augmentations profile

## Regular pipeline usage

That is how we usually use pipelines: simple `run` is enough

In [None]:
%%time
data_pipeline = load_pipeline + aug_pipeline
data_pipeline.run(D.size, n_iters=ITERS, bar=True)

One of the design decisions of our framework is that items in our datasets are cubes: in that notebook, there is only one cube in the `dataset`, therefore, `D.size` evaluates to 1. The logic of converting *batch of cubes* into *batch of crops* is conveniently resides inside `crop` action: under the hood the conversion is performed by creating entirely new batch with generated crop locations. That somewhat confusing behaviour, where one needs to pass *number of cubes* as the `batch size` in `Pipeline.run` method while setting the actual amount of crops elsewhere, allows us to threat tasks with one or multiple cubes the same: there is virtually no changes to do in order to move from the task of carcass interpolation (one cube) to the inter-cube generalization (multiple cubes, as can be deduced by the name).

Setting `profile` argument of `run` to `True` allows us to monitor timings of every individual action; it takes some time to parse and log the profiling results, thus giving (a lot of) overhead. Note that the more iteration you run with `profile` on, the slower it becomes.

In [None]:
%%time
data_pipeline = load_pipeline + aug_pipeline
data_pipeline.run(D.size, n_iters=ITERS, bar=True, profile=True)

Method `show_profile_info` returns a formatted dataframe that can be further explored:

In [None]:
result = data_pipeline.show_profile_info()
print('Total time of actions running: ', result['total_time']['sum'].sum())
result

Each row in the dataframe corresponds to individual action: note that we have two `transpose` actions in our augmentation pipeline, thus multiple transposes appear in the table. `total_time` is time take by both action and pipeline inner workings; `pipeline_time` counts only the time of action running. Sub-columns `sum`, `mean` and `max` provide a more detailed description.

We can get a more detailed description of which exactly *lines of code* take the most time inside our actions by passing `detailed` argument to the `show_profile_info` method. To avoid cluttering in the notebook, we limit the output to two slowest calls per action:

In [None]:
data_pipeline.show_profile_info(detailed=True, limit=2)['tottime'][['sum']]

We can also show individual timings for each iteration. That can be helpful to detect memory leaks and other cumulative errors:

In [None]:
data_pipeline.show_profile_info(per_iter=True).ix[:2]

Refer to [profiling tutorial](https://github.com/analysiscenter/batchflow/blob/master/examples/tutorials/08_profiling.ipynb) to learn more about exact collected information and how to format it.

## Pipeline as data generator

`Pipeline` has multiple interfaces; `gen_batch` allows to iterate over batches, using their attributes with data as usual `NumPy` arrays:

In [None]:
%%time
data_pipeline = load_pipeline + aug_pipeline 

for batch in tqdm(data_pipeline.gen_batch(D.size, n_iters=ITERS), total=ITERS):
    images, masks = batch.images, batch.masks

You can pass the `profile` argument to the `gen_batch` method. For now, let's just make sure that `images` and `masks` variables contain what we expect:

In [None]:
print(f'images: {(type(images), images.dtype, images.shape)}')
print(f'masks:  {(type(masks), masks.dtype, masks.shape)}')

plot_image((images[0, 0, ...], masks[0, 0, ...]), mode='overlap', y=1.,
           xlabel='xlines', ylabel='depth', title='images and masks')

## Convert pipeline to DataLoader

Sometimes, `PyTorch DataLoader` is convenient to use. Pipeline is already of a generative nature, so all we need to do is to wrap it into iterable dataset:

In [None]:
class PipelineDataset(IterableDataset):
    def __init__(self, pipeline, microbatch=4):
        self.pipeline = pipeline
        self.microbatch = microbatch
    
    def get_data(self):
        while True:
            batch = self.pipeline.next_batch(D('size'))
            images, masks = batch.images, batch.masks
                
            for i in range(0, len(images), self.microbatch):
                yield images[i:i+self.microbatch, ...], masks[i:i+self.microbatch, ...]

    
    def __iter__(self):
        return self.get_data()

In [None]:
%%time
pds = PipelineDataset(load_pipeline + aug_pipeline, microbatch=64)

# Note the `None` batch_size: it is already set as part of the loading pipeline
loader =  DataLoader(pds, batch_size=None, pin_memory=True)

for batch, _ in tqdm(zip(loader, range(ITERS)), total=ITERS):
    images, masks = batch

In [None]:
print(f'images: {(type(images), images.dtype, images.shape)}')
print(f'masks:  {(type(masks), masks.dtype, masks.shape)}')

`DataLoader` converts all the data from regular `NumPy` arrays to `Torch.Tensor`s; nevertheless, underlying data is [shared](https://pytorch.org/docs/stable/tensors.html).

<a id='profile2'></a>
# Model train profile

Most of the cells do exactly the same, yet implore model training step as well.

## Regular pipeline usage

We run one iteration of something-GPU-related to ensure warm start for all of the subsequent cells:

In [None]:
%%time
train_pipeline = load_pipeline + aug_pipeline + model_pipeline
train_pipeline.run(D.size, n_iters=1, bar=True)

In [None]:
%%time
train_pipeline.run(D.size, n_iters=ITERS, bar=True)

As before, we can use `profile` flag to get more detailed information:

In [None]:
%%time
train_pipeline = load_pipeline + aug_pipeline + model_pipeline
train_pipeline.run(D.size, n_iters=ITERS, bar=True, profile=True)

In [None]:
result = train_pipeline.show_profile_info()
print('Total time of actions running: ', result['total_time']['sum'].sum())
result['pipeline_time']

Note that this does not allow to profile individual GPU operations. You can this option in model configuration, in `train` method call or directly into the config of already existing model:

In [None]:
%%time
model.full_config['profile'] = True
train_pipeline.run(D.size, n_iters=ITERS, bar=True)
model.full_config['profile'] = False

As you can see, it takes a lot of time, though allows us to granularly inspect every cuda kernel time taken:

In [None]:
model.show_profile_info()

## Pipeline as data generator

That is roughly the same as what goes under the hood of `model_train` action of `Pipeline`:

In [None]:
%%time
data_pipeline = load_pipeline + aug_pipeline

for batch in tqdm(data_pipeline.gen_batch(D.size, n_iters=ITERS), total=ITERS):
    images, masks = batch.images, batch.masks
    model.train(fetches='loss', images=images, masks=masks)

## Convert pipeline to DataLoader; use Lightning to train the model

Obviously, behind our `PyTorch` wrapper lies a plain old `PyTorch` model, that can be accessed via `model` attribute. There are also other attributes to store loss function, optimizer, etc. The `Lightning` wrapper just simply borrows them from our model:

In [None]:
class LightningModel(LightningModule):
    def __init__(self, bf_model, pipeline=None):
        super().__init__()
        self.bf_model = bf_model
        self.pipeline = pipeline
        
        self.loss_list = []
        
    def forward(self, x):
        return self.bf_model.model(x)
    
    def configure_optimizers(self):
        return self.bf_model.train_steps['']['optimizer']
    
    def training_step(self, batch, batch_idx):
        images, targets = batch
        predictions = self(images)
        
        loss_func = self.bf_model.train_steps['']['loss'][0]
        loss = loss_func(predictions, targets)
        
        self.loss_list.append(loss.detach().cpu().numpy())
        logs = {'loss': loss}
        return {'loss': loss, 'log': logs}
    
    def train_dataloader(self):
        if self.pipeline is not None:
            pds = PipelineDataset(self.pipeline,
                                  microbatch=self.bf_model.config.get('microbatch'))
            return DataLoader(pds, batch_size=None, pin_memory=True)

In [None]:
%%time
l_model = LightningModel(bf_model=model, pipeline=(load_pipeline + aug_pipeline) << dataset)

trainer = Trainer(gpus=1,
                  accumulate_grad_batches=16,    # reverse microbatch
                  max_steps=ITERS,               # number of iterations
                  weights_summary=None)

In [None]:
%%time
# Total iterations number is 800 = 16 * 100 = accumulated batches * ITERS 
trainer.fit(model=l_model)

<a id='inference'></a>
# Inference profile

During training we were sampling points along some sparce carcass: that procedure heavily benefits of caching mechanisms in our framework: we need to load very few slices of data during the whole process, so they are instantly loaded into cache and the cube is never touched again.

During inference the whole dynamic changes: we are moving from slice to slice sequentially, so each slide is used multiple times (and cache still works wonders), but the overall amount of cube access increases dramatically. Making more overlapping predictions (contolled by the `stride` argument of `make_grid`) benefits more from caching.

Action `assemble_crops` is not like the others: it is performed only once, at the end of pipeline run. Specifically:

- pipeline iterates over crops, cut from the cube in order to cover the required volume
- for each crop, it makes a prediction with trained neural network
- after all predictions are available, `assemble_crop` creates a huge 3D array from them, taking overlapping crops into account

In [None]:
inference_pipeline = (
    Pipeline()
    # Initialize everything
    .init_variable('result_preds', [])
    .import_model(model, name='model')

    # Load data
    .make_locations(points=D('grid_gen')(),
          shape=CROP_SHAPE)
    .load_cubes(dst='images')
    .adaptive_reshape(src='images', shape=CROP_SHAPE)
    .normalize(mode='q', src='images')

    # Predict with model, then aggregate
    .predict_model('model',
                   B('images'),
                   fetches='predictions',
                   save_to=V('result_preds', mode='e'))
) << dataset

In [None]:
%%time
dataset.make_grid(dataset.indices[0], CROP_SHAPE,
                  [0, 417], [0, 868], [800, 1000],
                  overlap=(1, 96, 96),
                  batch_size=BATCH_SIZE*2)

inference_pipeline.run(D('size'), n_iters=dataset.grid_iters, bar='n')
assembled_pred = dataset.assemble_crops(inference_pipeline.v('result_preds'), order=(0, 1, 2))

There is no use for the created 3D array: we need an actual 2D surface. That is done via `from_mask` staticmethod of `Horizon` class:

In [None]:
%%prun -l 10
horizons = Horizon.from_mask(assembled_pred, dataset.grid_info,
                             minsize=50, threshold=0.5)

In [None]:
horizons[-1].show()

<a id='conclusion'></a>
# Conclusion

We presented detailed benchmarks for various parts of **seismiQB**, as well as showcased multiple interfaces to do data loading and model training. The timings of each approach are roughly the same, so the real difference is API.

This notebook is assumed to be run from time to time to monitor progress and speed-ups with following table to log them:

| date, DD.MM.YYYY | load + augmentations, s | load + augmentations + train, s | inference, s |
| --- | --- | --- | --- |
| 01.06.2019 | ~1600 | ~3600 | ~10000 |
| 01.06.2020 | 50 | 115 | 55 |
| 26.08.2020 | 45 | 110 | 36 |