# Pipeline

This notebook builds upon the model defined in the [quickstart](quickstart.ipynb).

The goal of this notebook is to define a full pipeline that not only trains the model but also builds the dataset, and to run this pipeline on a yarn cluster.

We'll see how to

1. Define a custom job to build the dataset.
2. Define a pipeline that builds and trains the model.
3. Use configs to run the pipeline on yarn.

First, some imports

In [1]:
import logging
import sys
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logging.getLogger("tensorflow").setLevel(logging.CRITICAL)
logging.getLogger("cluster_pack").setLevel(logging.CRITICAL)

In [2]:
from dataclasses import dataclass

import tensorflow as tf
import numpy as np
import deepr as dpr

In [3]:
if dpr.io.Path("model").is_dir():
    dpr.io.Path("model").delete_dir()

## 1. Custom Build Dataset Job

The quickstart briefly introduced the concept of a job with the `Trainer` job.

Real-life pipelines consist of multiple jobs. In our example, we want to define a special job that creates the dataset.

Let's see how we would define a custom job that writes the dataset content in a tfrecord file.

### Build Dataset Job

In [4]:
@dataclass
class BuildDataset(dpr.jobs.Job):
    """Build a dummy dataset of random (x, 2*x) as a tfrecord file"""
    
    path_dataset: str
    num_examples: int = 1000
        
    def run(self):
        
        def _generator_fn():
            for _ in range(self.num_examples):
                x = np.random.random()
                yield {"x": x, "y": 2 * x}
                
        def _dict_to_example(data):
            features = {
                "x": dpr.readers.float_feature([data["x"]]),
                "y": dpr.readers.float_feature([data["y"]])
            }
            example = tf.train.Example(features=tf.train.Features(feature=features))
            return example

        with tf.python_io.TFRecordWriter(self.path_dataset) as writer:
            for data in _generator_fn():
                example = _dict_to_example(data)
                writer.write(example.SerializeToString())
        
        print(f"Wrote dataset to '{self.path_dataset}'")

In [5]:
build_job = BuildDataset(path_dataset="data.tfrecord", num_examples=1000)

Let's run the job

In [6]:
build_job.run()

Wrote dataset to 'data.tfrecord'


### Prepro

Because the data is now stored in tfrecord files, the `prepro_fn` needs to deserialize the file's content.

Let's define a preprocessor and check that everything works correctly with the dataset created by the `BuildDataset` job.

In [7]:
@dpr.prepros.prepro
def DefaultPrepro(batch_size, repeat_size):
    return dpr.prepros.Serial(
        dpr.prepros.TFRecordSequenceExample(fields=[
            dpr.Field(name="x", shape=(), dtype=tf.float32),
            dpr.Field(name="y", shape=(), dtype=tf.float32)
        ]),
        dpr.prepros.Batch(batch_size=batch_size),
        dpr.prepros.Repeat(repeat_size, modes=[tf.estimator.ModeKeys.TRAIN]),
    )

The `@prepro` decorator creates a class from the function that would be equivalent to 

```python
class DefaultPrepro(dpr.prepros.Prepro):

    def __init__(self, batch_size, repeat_size):
        super().__init__()
        self.batch_size = batch_size
        self.repeat_size = repeat_size

    def apply(self, dataset: tf.data.Dataset, mode: str = None) -> tf.data.Dataset:
        # The preprocessing is only built here, when the prepro is applied
        prepro_fn = dpr.prepros.Serial(
            dpr.prepros.TFRecordSequenceExample(fields=[
                dpr.Field(name="x", shape=(), dtype=tf.float32),
                dpr.Field(name="y", shape=(), dtype=tf.float32)
            ]),
            dpr.prepros.Batch(batch_size=self.batch_size),
            dpr.prepros.Repeat(self.repeat_size, modes=[tf.estimator.ModeKeys.TRAIN]),
        )
        return prepro_fn(dataset, mode)
```

One of the advantages of the decorator is that the body of the function `DefaultPrepro` does not get executed until the preprocessor is actually applied to the dataset.

This lazy behavior is convenient when the function creates resources, such as lookup tables, that should only be defined at runtime.
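To illustrate the lazy behavior in plain Python, here is a minimal sketch of how such a decorator *could* work. It is not deepr's implementation: the `prepro` function, the `Scale` example and the list used as a "dataset" are hypothetical stand-ins for deepr prepros and `tf.data` datasets.

```python
from typing import Any, Callable, List, Optional


def prepro(factory: Callable[..., Callable]) -> type:
    """Turn a factory function into a lazily evaluated class (hypothetical sketch)."""

    class LazyPrepro:
        def __init__(self, *args: Any, **kwargs: Any):
            # Only store the arguments: the factory body is NOT executed here
            self.args = args
            self.kwargs = kwargs

        def __call__(self, dataset: Any, mode: Optional[str] = None) -> Any:
            # The factory body runs now, when the prepro is applied to a dataset
            inner = factory(*self.args, **self.kwargs)
            return inner(dataset, mode)

    LazyPrepro.__name__ = LazyPrepro.__qualname__ = factory.__name__
    LazyPrepro.__doc__ = factory.__doc__
    return LazyPrepro


calls: List[str] = []


@prepro
def Scale(factor):
    """Multiply every element of a list-based "dataset" by `factor`."""
    calls.append("factory ran")  # stands in for creating a lookup table
    return lambda dataset, mode=None: [factor * x for x in dataset]


scale_fn = Scale(factor=3)
print(calls)                # []
print(scale_fn([1, 2, 3]))  # [3, 6, 9]
print(calls)                # ['factory ran']
```

Creating `scale_fn` only stores `factor=3`; the body of `Scale` (where a real prepro would create its tables) runs when `scale_fn` is applied.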


Let's create an instance of `DefaultPrepro`

In [8]:
prepro_fn = DefaultPrepro(batch_size=32, repeat_size=10)

### Reader

In the quickstart we used a `GeneratorReader`. With tfrecords, let's use a `TFRecordReader`.

In [9]:
reader = dpr.readers.TFRecordReader("data.tfrecord")

In [10]:
for batch in dpr.readers.base.from_dataset(prepro_fn(reader())):
    print(batch)
    break

{'x': array([4.4587484e-01, 7.4005979e-01, 4.4701755e-01, 7.2319156e-01,
       9.4345409e-01, 1.7840658e-01, 7.2584116e-01, 9.9800736e-01,
       2.0697144e-01, 7.9954338e-01, 4.6816257e-01, 5.7633847e-01,
       5.0513422e-01, 7.7767682e-04, 7.4887043e-01, 4.0597153e-01,
       3.5977897e-01, 2.4122241e-01, 7.7473664e-01, 7.0897418e-01,
       2.2657214e-01, 4.1462871e-01, 9.0375322e-01, 5.8769149e-01,
       2.9490185e-01, 8.1388289e-01, 2.5151196e-01, 4.3744230e-01,
       8.1327051e-01, 9.7379398e-01, 1.4938317e-01, 4.8239645e-01],
      dtype=float32), 'y': array([8.9174968e-01, 1.4801196e+00, 8.9403510e-01, 1.4463831e+00,
       1.8869082e+00, 3.5681316e-01, 1.4516823e+00, 1.9960147e+00,
       4.1394287e-01, 1.5990868e+00, 9.3632513e-01, 1.1526769e+00,
       1.0102684e+00, 1.5553536e-03, 1.4977409e+00, 8.1194305e-01,
       7.1955794e-01, 4.8244482e-01, 1.5494733e+00, 1.4179484e+00,
       4.5314428e-01, 8.2925743e-01, 1.8075064e+00, 1.1753830e+00,
       5.8980370e-01, 1.6277

## 2. Define a Pipeline

So far, we have defined

1. A custom `BuildDataset` job
2. Custom layers `Multiply` and `SquaredL2` (in the [quickstart](quickstart.ipynb))
3. A custom preprocessor `DefaultPrepro`


We will need to make these classes available in the `pex` that will be shipped to yarn, so let's add them to a module living alongside the core library.

For example,

```
deepr
├── __init__.py
├── core
├── example
│   ├── __init__.py
│   ├── jobs
│   │   ├── __init__.py
│   │   └── build_dataset.py  # BuildDataset
│   ├── layers
│   │   ├── __init__.py
│   │   ├── loss.py           # SquaredL2
│   │   └── model.py          # Multiply
│   └── prepros
│       ├── __init__.py
│       └── default.py        # DefaultPrepro
```

Now, these classes can easily be imported from anywhere.

Let's replicate the quickstart by defining and running a full pipeline that builds the dataset and then trains a model.

In [11]:
import deepr.example

In [12]:
build_job = deepr.example.jobs.BuildDataset(path_dataset="data.tfrecord", num_examples=1000)

In [13]:
trainer_job = dpr.jobs.Trainer(
    path_model="model", 
    pred_fn=deepr.example.layers.Multiply(), 
    loss_fn=deepr.example.layers.SquaredL2(),
    optimizer_fn=dpr.optimizers.TensorflowOptimizer("Adam", 0.1),
    train_input_fn=dpr.readers.TFRecordReader("data.tfrecord"),
    eval_input_fn=dpr.readers.TFRecordReader("data.tfrecord"),
    prepro_fn=deepr.example.prepros.DefaultPrepro(batch_size=32, repeat_size=10)
)

In [14]:
pipeline = dpr.jobs.Pipeline([build_job, trainer_job])

The pipeline is made of 2 jobs

1. The `BuildDataset` that creates the `tfrecord` file
2. The `Trainer` that trains the model
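Conceptually, running the pipeline means running its jobs one after the other, so that the dataset exists before the trainer reads it. A minimal plain-Python sketch of that behavior, where `Job`, `RecordJob` and `SequentialPipeline` are hypothetical classes and not deepr's `dpr.jobs.Pipeline`:

```python
from dataclasses import dataclass, field
from typing import List


class Job:
    """Hypothetical minimal job interface: anything with a run() method."""

    def run(self) -> None:
        raise NotImplementedError


@dataclass
class RecordJob(Job):
    """Toy job that appends its name to a shared log when run."""

    name: str
    log: List[str]

    def run(self) -> None:
        self.log.append(self.name)


@dataclass
class SequentialPipeline(Job):
    """Run each job in order; an exception stops the remaining jobs."""

    jobs: List[Job] = field(default_factory=list)

    def run(self) -> None:
        for job in self.jobs:
            job.run()


log: List[str] = []
SequentialPipeline([RecordJob("build_dataset", log), RecordJob("train", log)]).run()
print(log)  # ['build_dataset', 'train']
```

Because a pipeline is itself a job in this sketch, pipelines can be nested or wrapped like any other job.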

We can simply run it with

In [15]:
pipeline.run()

INFO:deepr.example.jobs.build_dataset:Wrote dataset to 'data.tfrecord'
INFO:deepr.prepros.core:Not applying Repeat(10) (mode=eval)
INFO:deepr.jobs.trainer:Running final evaluation, using global_step = 320
INFO:deepr.prepros.core:Not applying Repeat(10) (mode=eval)
INFO:deepr.jobs.trainer:{'loss': 0.0, 'global_step': 320}
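The final `global_step = 320` follows from the prepro: 1000 examples batched by 32 give 32 batches (31 full batches plus a last one of 8 examples, consistent with the last partial batch being kept), and `Repeat(10)` repeats them 10 times in training mode only, as the "Not applying Repeat(10) (mode=eval)" logs show. A plain-Python check of that arithmetic (the list-based `batch` and `repeat` helpers below simulate the prepro; they are not deepr code):

```python
import math
from typing import Iterator, List


def batch(examples: List[float], batch_size: int) -> Iterator[List[float]]:
    """Yield consecutive batches, keeping the last (possibly smaller) one."""
    for start in range(0, len(examples), batch_size):
        yield examples[start:start + batch_size]


def repeat(batches: List[List[float]], count: int, mode: str) -> List[List[float]]:
    """Repeat the batches `count` times, but only in training mode."""
    return batches * count if mode == "train" else batches


examples = [float(i) for i in range(1000)]
batches = list(batch(examples, batch_size=32))

train_steps = len(repeat(batches, count=10, mode="train"))
eval_steps = len(repeat(batches, count=10, mode="eval"))

print(len(batches), len(batches[-1]))  # 32 8
print(train_steps, eval_steps)         # 320 32
assert train_steps == math.ceil(1000 / 32) * 10
```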


## 3. Run on Yarn

We can't simply submit Python objects to yarn.

We need to parametrize the execution. Though this could be done in an ad-hoc manner using custom entry points, you can instead use the `config` capabilities of deepr.

To read more about the config system, see the [config introduction](config.ipynb).


In short, you can define arbitrary trees of objects using dictionaries. The special key "type" contains the full import string of the object's class. Other keys will be given as keyword arguments at instantiation time.
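To make the mechanism concrete, here is a hypothetical `from_config_sketch` that instantiates such trees with `importlib`. It is not deepr's `from_config`: it only handles dicts and lists, uses standard-library classes as stand-ins for deepr jobs, and assumes that `"eval": None` (used in the yarn example of this notebook) means "return this sub-config as a plain dict instead of instantiating it".

```python
import importlib
from typing import Any


def import_string(path: str) -> Any:
    """Resolve a full import string like 'fractions.Fraction' to the object."""
    module_name, _, attribute = path.rpartition(".")
    return getattr(importlib.import_module(module_name), attribute)


def from_config_sketch(item: Any) -> Any:
    """Recursively instantiate every dict that has a "type" key (hypothetical)."""
    if isinstance(item, list):
        return [from_config_sketch(value) for value in item]
    if not isinstance(item, dict):
        return item  # str, int, float, None... are returned unchanged
    if "eval" in item and item["eval"] is None:
        # Assumed semantics: keep this sub-tree as a plain dict (minus "eval")
        # so that it can be instantiated later, e.g. on another machine.
        return {key: value for key, value in item.items() if key != "eval"}
    # Other "eval" values are not supported by this sketch and are dropped.
    kwargs = {
        key: from_config_sketch(value)
        for key, value in item.items()
        if key not in ("type", "eval")
    }
    if "type" not in item:
        return kwargs  # plain dict whose values may themselves be configs
    # Remaining keys become keyword arguments of the class named by "type"
    return import_string(item["type"])(**kwargs)


config = {
    "jobs": [
        {"type": "datetime.timedelta", "minutes": 5},
        {"type": "datetime.timedelta", "hours": 2, "eval": None},
    ],
    "ratio": {
        "type": "fractions.Fraction",
        "numerator": {"type": "fractions.Fraction", "numerator": 1, "denominator": 2},
        "denominator": 3,
    },
}
result = from_config_sketch(config)
print(result["jobs"][0])  # 0:05:00
print(result["jobs"][1])  # {'type': 'datetime.timedelta', 'hours': 2}
print(result["ratio"])    # 1/6
```

Nested configs are instantiated bottom-up, so the inner `Fraction(1, 2)` is built before being passed as the `numerator` of the outer one.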

### Build Job

In [16]:
build_job_config = {
    "type": "deepr.example.jobs.BuildDataset",
    "path_dataset": "viewfs://root/user/deepr/dev/example/data.tfrecord",
    "num_examples": 1000
}

### Reader

In [17]:
reader_config = {
    "type": "deepr.readers.TFRecordReader",
    "path": "viewfs://root/user/deepr/dev/example/data.tfrecord",
    "num_parallel_reads": 8,
    "num_parallel_calls": 8,
    "shuffle": True
}

### Prepro

In [18]:
prepro_fn_config = {
    "type": "deepr.example.prepros.DefaultPrepro",
    "batch_size": 32,
    "repeat_size": 10
}

### Prediction Function

In [19]:
pred_fn_config = {"type": "deepr.example.layers.Multiply"}

### Loss Function

In [20]:
loss_fn_config = {"type": "deepr.example.layers.SquaredL2"}

### Trainer Job

This is a good example of what a nested config looks like

In [21]:
trainer_job_config = {
    "type": "deepr.jobs.Trainer",
    "path_model": "viewfs://root/user/deepr/dev/example/model",
    "pred_fn": pred_fn_config,
    "loss_fn": loss_fn_config,
    "optimizer_fn": {
        "type": "deepr.optimizers.TensorflowOptimizer",
        "optimizer": "Adam",
        "learning_rate": 0.1
    },
    "prepro_fn": prepro_fn_config,
    "train_input_fn": reader_config,
    "eval_input_fn": reader_config
}

### Train Locally

We can use these configs to re-instantiate the objects using the `from_config` function, which supports arbitrary nesting of configs.

For example, we can re-create the build and trainer jobs with 

In [22]:
build_job = dpr.from_config(build_job_config)

In [23]:
trainer_job = dpr.from_config(trainer_job_config)

and define a new pipeline that we could then run as above

In [24]:
pipeline = dpr.jobs.Pipeline([build_job, trainer_job])

Instead of training locally (something we've already done twice), let's see how we can leverage the configs to execute the pipeline on yarn.

### Train on Yarn

Let's not run any code on the local machine, but instead submit the pipeline to a `yarn` machine.

Also, instead of running the trainer job on the same machine as the build job, let's use `tf_yarn` distributed training capabilities and launch the trainer job on other yarn machines.

Submitting jobs to yarn is as simple as wrapping job configs into special jobs:

- `YarnLauncher`: submits a job to yarn
- `YarnTrainer`: uses `tf_yarn` to run a `Trainer` job on multiple machines


Let's do it

In [25]:
yarn_launcher_config = dpr.jobs.YarnLauncherConfig(
    path_pex_prefix="viewfs://root/user/deepr/dev/example/envs"
)
job_config = {
    "type": "deepr.jobs.Pipeline",
    "jobs": [
        build_job_config,
        {
            "type": "deepr.jobs.YarnTrainer",
            "trainer": {
                **trainer_job_config, 
                "eval": None  # tells from_config to keep the trainer as a config dict instead of instantiating it
            },
            "config": {
                "type": "deepr.jobs.YarnTrainerConfig"
            }
        }
    ]
}
pipeline_yarn = dpr.jobs.YarnLauncher(config=yarn_launcher_config, job=job_config)

Once the `YarnLauncher` job is defined, we can run it. 

It uploads the current environment as a `pex` to HDFS using the settings provided by the `YarnLauncherConfig`, and then executes the job from its config by doing something equivalent to what we did above, i.e. `from_config(job).run()`.

In [26]:
HAS_HADOOP = False

In [27]:
if HAS_HADOOP:
    pipeline_yarn.run()

When `pipeline_yarn.run()` returns, it only means that the job was successfully submitted to yarn. We still need to wait for the job to finish.

After a few minutes, we can check that the build and training jobs ran successfully by looking at the files on HDFS.

In [28]:
if HAS_HADOOP:
    print(list(dpr.io.Path("viewfs://root/user/deepr/dev/example").glob("*")))

### Using config files

Because it is sometimes convenient to commit config files for reproducibility and production, it is possible (and recommended) to store configs as `.json` files.
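Since configs are plain dictionaries of strings, numbers, lists and `None`, they can be written to and read back from JSON with the standard library alone. A minimal sketch using the `build_job_config` from above (redefined so that the snippet is self-contained; the temporary directory and the `build.json` file name are arbitrary choices):

```python
import json
import os
import tempfile

build_job_config = {
    "type": "deepr.example.jobs.BuildDataset",
    "path_dataset": "viewfs://root/user/deepr/dev/example/data.tfrecord",
    "num_examples": 1000,
}

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "build.json")
    # Write the config as a human-readable, committable JSON file
    with open(path, "w") as file:
        json.dump(build_job_config, file, indent=4)
    # Read it back, as a job runner would
    with open(path) as file:
        loaded_config = json.load(file)

# Configs are plain data, so they survive the round-trip unchanged
assert loaded_config == build_job_config
```

The loaded dictionary can then be given to `dpr.from_config`, exactly like the in-memory dictionaries used earlier.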


A convenient way to compose configs (similar to what we did by defining different dictionaries before putting them together) is to use [jsonnet](https://jsonnet.org/).

For example, we can define a file `build.jsonnet` like so

```json
{
    "type": "deepr.example.jobs.BuildDataset",
    "path_dataset": "viewfs://root/user/deepr/dev/example/data.tfrecord",
    "num_examples": 1000
}
```

and import it into our pipeline config file `config.jsonnet` with

```jsonnet
local build = import 'build.jsonnet';
{
    "type": "deepr.jobs.YarnLauncher",
    "config": {
        "type": "deepr.jobs.YarnLauncherConfig",
    },
    "job": build
}
```

You can run config files defining jobs with

```bash
deepr run config.jsonnet
```