# **Tutorial 3**

### **Specialized Operators**

In this tutorial, we will present how a developer can create your own processing block to be used and connected into the pipeline. In the current stage of the framework, DASF Core has many specialized operators: `Transform`, `Fit`, `Preict`, `MappedTransform` and others.

The `Transform` object is a generic operator that can be used to any purpose. Developers are free to implement the logic they want with some facilities provided by the object inheritance. The main idea of this operator is to extend a operation that transform the content of a dataset. The same idea behing machine learning algorithms implemented by Scikit Learn for example. This operator defines a method called `transform()` that does the magic.

The `MappedTransform` applies a pre-defined function to each dask block if the geographic dependency is strongly connected. It also uses the `transform()` method exactly how the original class does. One example that we will present is the `mean()` method. It requires all data processed to calculate one value. It also requires some special logic to unify the results. The `MappedTransform` should change only that block with minimum effort.

Let's create a simple example of plotting an attribute like we did for **Tutorial 1**.

In [None]:
from dasf_seismic.datasets import F3
from dasf_seismic.attributes.complex_trace import Envelope
from dasf_seismic.visualization import Plot2DIline
from dasf.transforms import ExtractData

dataset = F3(chunks={"iline": 5})

extracted_data = ExtractData()

envelope = Envelope()

plot = Plot2DIline(name="Plot F3 block iline", iline_index=100, swapaxes=(0, 1))

Now, we can create our cluster instance.

In [None]:
from dasf.pipeline.executors import DaskPipelineExecutor

dask = DaskPipelineExecutor()

We can create our simple pipeline.

In [None]:
from dasf.pipeline import Pipeline

pipeline = Pipeline("F3 Block plot pipeline", executor=dask)

pipeline.add(extracted_data, X=dataset) \
        .add(envelope, X=extracted_data) \
        .add(plot.plot, X=envelope) \
        .visualize()

In [None]:
%time pipeline.run()

Now, let's create our own operator to modify the result.

In [None]:
import cupy as cp
import numpy as np
import dask.array as da

from dasf.transforms import Transform

class MyNormalize(Transform):
    def _lazy_transform_gpu(self, X):
        mean = da.mean(X)
        std = da.std(X, ddof=0)
        
        return (X - mean)/std
    
    def _lazy_transform_cpu(self, X):
        mean = da.mean(X)
        std = da.std(X, ddof=0)
        
        return (X - mean)/std

    def _transform_gpu(self, X):
        mean = cp.mean(X)
        std = cp.std(X, ddof=0)
        
        return (X - mean)/std
    
    def _transform_cpu(self, X):
        mean = np.mean(X)
        std = np.std(X, ddof=0)
        
        return (X - mean)/std

    
normalize = MyNormalize()

Notice here that we need to implement 4 methods to cover all possible architectures implemented by the default handlers. The Dask handlers usually manipulate `_lazy_*{cpu,gpu}` functions. On the other hand, local and simple pipelines do not require lazy operations and can be directly applied into the default target: the machine (CPU) or a GPU.

Now, we can append this new block. Let's see what happens when we execute a new pipeline.

In [None]:
pipeline = Pipeline("F3 Block plot pipeline", executor=dask)

pipeline.add(extracted_data, X=dataset) \
        .add(envelope, X=extracted_data) \
        .add(normalize, X=envelope) \
        .add(plot.plot, X=normalize) \
        .visualize()

In [None]:
%time pipeline.run()

As we can see, the output is the same, but now the data is normalized.

Now, let's create a different example using `MappedTransform`. This operator processes each chunk of a dask data (Array or DataFrame) and it works only when the executor is **Dask**.

In [None]:
# For a GPU example only. It is not necessary.
import cupy as cp
import numpy as np

from dasf.transforms import MappedTransform


cupy_raw_kernel = cp.RawKernel(r'''
                extern "C" __global__
                void limit_percent(const float *a, float *out,
                                   float percent,
                                   unsigned int nx, unsigned int ny,
                                   unsigned int nz) {
                    unsigned int idx = threadIdx.x + blockIdx.x * blockDim.x;
                    unsigned int idy = threadIdx.y + blockIdx.y * blockDim.y;
                    unsigned int idz = threadIdx.z + blockIdx.z * blockDim.z;
                    
                    unsigned int center = 0;
                    
                    if ((idx >= 0 && idy >= 0 && idz >= 0) &&
                        (idx < nx) && (idy < ny) && (idz < nz)) {
                        center = ((ny * nz) * idx) + (idy * nz + idz);
                        
                        if (a[center] > percent) {
                            out[center] = percent;
                        } else {
                            if (a[center] < -percent) {
                                out[center] = -percent;
                            } else {
                                out[center] = a[center];
                            }
                        }
                    }
                }
            ''', 'limit_percent')

def limit_percent_block(block):
    """
    A Simple function to calculate the 75% limits for a block.
    """
    dimx = block.shape[0]
    dimy = block.shape[1]
    dimz = block.shape[2]

    out = cp.zeros((dimz * dimy * dimx), dtype=cp.float32)
    inp = cp.asarray(block.flatten(), dtype=cp.float32)

    block_size = 10

    grid = (int(np.ceil(dimx/block_size)),
            int(np.ceil(dimy/block_size)),
            int(np.ceil(dimz/block_size)),)
    block = (block_size, block_size, block_size,)
    
    cupy_raw_kernel(grid, block, (inp, out, cp.float32(0.75),
                                  cp.int32(dimx), cp.int32(dimy), cp.int32(dimz)))
                    
    return cp.asnumpy(out).reshape(dimx, dimy, dimz)
                    

class BlockPercent(MappedTransform):
    def __init__(self):
        super().__init__(function=limit_percent_block)
        
block_percent = BlockPercent()

In [None]:
import os

os.environ["CUDA_VISIBLE_DEVICES"] = "0" # The old executor set CUDA_VISIBLE_DEVICES to "", in order to avoid using GPU memory
dask_gpu = DaskPipelineExecutor(use_gpu=True)

Now, we can append the new block and recreate the pipeline.

In [None]:
pipeline = Pipeline("F3 Block plot pipeline", executor=dask_gpu)

pipeline.add(extracted_data, X=dataset) \
        .add(envelope, X=extracted_data) \
        .add(normalize, X=envelope) \
        .add(block_percent, X=normalize) \
        .add(plot.plot, X=block_percent) \
        .visualize()

Let's test our new block implemented using a Raw Cupy Kernel.

In [None]:
%time pipeline.run()

Now, let's reduce the size of each block to understand how we can take advantage of map/reduce capabilities. For this, we need to have a clear view of which shape each block will return.

For the next example, our function will calculate the `mean()` of each block and return it into an array. Let's create a debug block to read the output either.

In [None]:
def block_mean(block):
    """
    A Simple function the mean of each block.
    """
    return cp.mean(block)
                    

class BlockMean(MappedTransform):
    def __init__(self):
        super().__init__(function=block_mean, output_chunk=(1,))
        
class BlockDebug(Transform):
    def transform(self, X):
        # Aggregate debug function
        print("Mean of the whole cube ---> " + str(np.mean(X.compute())))

        return X

block_mean = BlockMean()
debug_mean = BlockDebug()

Let's see what happens now.

In [None]:
pipeline = Pipeline("F3 Block plot pipeline", executor=dask)

pipeline.add(extracted_data, X=dataset) \
        .add(envelope, X=extracted_data) \
        .add(normalize, X=envelope) \
        .add(block_percent, X=normalize) \
        .add(block_mean, X=block_percent) \
        .add(debug_mean, X=block_mean) \
        .visualize()

Time to test it again.

In [None]:
%time pipeline.run()