# Complex Execution Strategies with Research Module

## Roadmap

1. Introduction to Research Module
    * Basic example
        * 1 pipeline with fixed parameters
             * creating research
             * running several repetitions of an experiment
             * viewing research results
             * saving and loading research
    * Running experiments with different parameters aka grid
        * 1 pipeline with variable parameters
             * creating and viewing grids
             * viewing filtered research results
             
             
2. Complex Execution Strategies with Research Module (**You are here**)
    * Reducing extra dataset loads
        * 1 pipeline with root and branch + grid
    * More complex execution strategies
        * 2 pipelines, train & test + function + root&branch + grid
            * adding test pipeline
            * defining results recording frequency aka execute='%n'
            * adding functions
3. 
    * Cross-validation
    * Performance
        * managing execution tasks
    * Combining it all together
        * Super complex Research

## Reducing extra dataset loads

```python
# import warnings
# warnings.filterwarnings('ignore')

# from tensorflow import logging
# logging.set_verbosity(logging.ERROR)

# import os
# os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
```

```python
import sys
sys.path.append('../../..')  # so that the local batchflow package can be imported

import matplotlib
%matplotlib inline

from batchflow import Pipeline, B, C, V
from batchflow.opensets import MNIST
from batchflow.models.tf import VGG7, VGG16
from batchflow.research import Research, Option
```

```python
BATCH_SIZE = 64
ITERATIONS = 10
REPETITIONS = 4
```

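```python
mnist = MNIST()
# Endless (n_epochs=None) stream of shuffled training batches;
# lazy=True only stores the run parameters, batches are generated later by the research
train_root = mnist.train.p.run(BATCH_SIZE, shuffle=True, n_epochs=None, lazy=True)
```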

```python
grid = Option('layout', ['cna', 'can']) * Option('bias', [True, False])

model_config = {
    'inputs': dict(images={'shape': (28, 28, 1)},
                   labels={'classes': 10, 'transform': 'ohe', 'name': 'targets'}),
    'initial_block/inputs': 'images',
    # C(...) values are taken from the current grid config
    'body/block/layout': C('layout'),
    'common/conv/use_bias': C('bias'),
}

train_template = (Pipeline()
                  .init_variable('loss')
                  .init_model('dynamic', VGG7, 'conv', config=model_config)
                  .to_array()
                  .train_model('conv', fetches='loss',
                               feed_dict={'images': B('images'),
                                          'labels': B('labels')},
                               save_to=V('loss', mode='w'))
)
```

```python
research = (Research()
            .add_pipeline(train_root + train_template, variables='loss', name='train')
            .add_grid(grid))

research.run(n_reps=REPETITIONS, n_iters=ITERATIONS, name='research', bar=True)
```

```python
research.load_results().info()
```

Each experiment can be divided into two stages: a root stage that is roughly the same for all experiments (for example, data loading and preprocessing) and a branch stage that varies. If data preprocessing takes significant time, the batches generated at the root stage can be fed to several branches that belong to different experiments, so each batch is produced only once.

To do so, pass the *root* and *branch* parameters to *add_pipeline()* and define the number of branches per root via the *branches* parameter of *run()*.

A root with corresponding branches is called a **job**. Note that different roots produce different batches due to shuffling.
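The root/branch idea can be illustrated without batchflow. The sketch below is a toy model, not batchflow's implementation: *ToyRoot* and *run_job* are hypothetical names, a "batch" is just a random sample of integers, and a "branch" is any callable that consumes a batch. Within a job, the root produces each batch once and every branch receives that same batch.

```python
import random


class ToyRoot:
    """Hypothetical stand-in for a root pipeline: yields shuffled batches and counts them."""

    def __init__(self, data, batch_size, seed=None):
        self.data = list(data)
        self.batch_size = batch_size
        self.rng = random.Random(seed)
        self.batches_made = 0

    def next_batch(self):
        # In a real root this is where loading and preprocessing would happen
        self.batches_made += 1
        return self.rng.sample(self.data, self.batch_size)


def run_job(root, branches, n_iters):
    """One job: every batch produced by `root` is fed to all `branches`."""
    for _ in range(n_iters):
        batch = root.next_batch()
        for branch in branches:
            branch(batch)


# Four experiments sharing one root, i.e. a single job with four branches
seen_shared = [[] for _ in range(4)]
shared_root = ToyRoot(range(100), batch_size=8, seed=0)
run_job(shared_root, [seen.append for seen in seen_shared], n_iters=10)

# Four experiments with a root each, i.e. four jobs with one branch each
seen_separate = [[] for _ in range(4)]
separate_roots = [ToyRoot(range(100), batch_size=8, seed=i) for i in range(4)]
for root, seen in zip(separate_roots, seen_separate):
    run_job(root, [seen.append], n_iters=10)

print(shared_root.batches_made)                     # 10
print(sum(r.batches_made for r in separate_roots))  # 40
```

Branches of one job see identical batches, while experiments with separate roots (different seeds here) see different data, which mirrors the note that different roots produce different batches due to shuffling.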

```python
research = (Research()
            .add_pipeline(root=train_root, branch=train_template, variables='loss', name='train')
            .add_grid(grid))

# 4 configs x 4 repetitions = 16 experiments, grouped into jobs of 8 branches each
research.run(n_reps=REPETITIONS, n_iters=ITERATIONS, branches=8, name='no_extra_dataload_research', bar=True)
```

In this toy example we use only 10 iterations to make the effect of the reduced dataset load more visible.
Note that the number of result entries is the same as in the previous run.
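A back-of-the-envelope count shows what *branches* saves in the two runs above. The grid has 2 × 2 = 4 configs, and with 4 repetitions that gives 16 experiments. The sketch assumes that experiments are packed into jobs of *branches* experiments each (rounding up) and that every job's root produces one batch per iteration; *plan_jobs* is a hypothetical helper, not part of batchflow.

```python
from math import ceil


def plan_jobs(n_configs, n_reps, n_iters, branches=1):
    """Hypothetical helper: (experiments, jobs, root batches) for one research run.

    Assumes experiments are packed into jobs of `branches` each (rounding up)
    and that every job's root produces exactly one batch per iteration.
    """
    n_experiments = n_configs * n_reps
    n_jobs = ceil(n_experiments / branches)
    return n_experiments, n_jobs, n_jobs * n_iters


print(plan_jobs(n_configs=4, n_reps=4, n_iters=10))              # (16, 16, 160)
print(plan_jobs(n_configs=4, n_reps=4, n_iters=10, branches=8))  # (16, 2, 20)
```

The same arithmetic agrees with the messages printed later in this notebook: 2 configs × 2 repetitions with *branches=2* gives "Distributor has 2 jobs with 10 iterations. Totally: 20", and 2 configs × 4 repetitions without branches gives 8 jobs and 80 iterations in total.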

```python
research.load_results().info()
```

## More complex execution strategies

```python
# The model class itself is a grid option
grid = Option('model', [VGG7]) * Option('layout', ['cna', 'can'])  # * Option('bias', [True, False])

model_config = {
    # 'session/config': tf.ConfigProto(gpu_options=tf.GPUOptions(per_process_gpu_memory_fraction=0.45)),
    'inputs': dict(images={'shape': (28, 28, 1)},
                   labels={'classes': 10, 'transform': 'ohe', 'name': 'targets'}),
    'initial_block/inputs': 'images',
    'body/block/layout': C('layout'),
    # 'common/conv/use_bias': C('bias'),
}

train_template = (Pipeline()
                  .init_variable('train_loss')
                  .init_model('dynamic', C('model'), 'conv', config=model_config)
                  .to_array()
                  .train_model('conv', fetches='loss',
                               feed_dict={'images': B('images'),
                                          'labels': B('labels')},
                               save_to=V('train_loss', mode='w'))
)
```

```python
# Note n_epochs=1: each run of the test pipeline makes exactly one pass over the test set
test_root = mnist.test.p.run(BATCH_SIZE, shuffle=True, n_epochs=1, lazy=True)
test_template = (Pipeline()
                 .init_variable('test_loss')
                 .import_model('conv', C('import_from'))  # take the model from the train pipeline
                 .to_array()
                 .predict_model('conv',
                                fetches='loss',
                                feed_dict={'images': B('images'),
                                           'labels': B('labels')},
                                save_to=V('test_loss', mode='w')))
```

```python
TEST_EXECUTE_FREQ = '%5'  # execute every 5 iterations
```

```python
research = (Research()
            .add_pipeline(root=train_root, branch=train_template, variables='train_loss', name='train')
            # Note run=True; import_from='train' is what C('import_from') resolves to in test_template
            .add_pipeline(root=test_root, branch=test_template, variables='test_loss', name='test',
                          execute=TEST_EXECUTE_FREQ, run=True, import_from='train')
            .add_grid(grid))

research.run(n_iters=ITERATIONS, name='my_research', bar=True)
```

When *add_pipeline()* is called with *run=False* (the default), the pipeline is executed batch-wise and an 'iteration' refers to processing a single batch. With *run=True* the pipeline is executed on the whole dataset, so an 'iteration' is rather an epoch.

To evaluate a model's performance, we naturally want to execute the test pipeline on the whole test set; that is why *test_root* is created with *n_epochs=1* and the test pipeline is added with *run=True*.

Here, for each config from the grid, the train pipeline's *gen_batch()* is run *n_iters* times. After every 5th call (*execute=TEST_EXECUTE_FREQ*, i.e. *'%5'*), the whole test pipeline is run and the model's loss on the test set is evaluated. Gathering the test-set loss might seem rather useless; we do it solely to demonstrate how to control the execution of multiple pipelines, and we will soon move to a more interesting case.
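The interplay between the two pipelines can be mimicked in plain Python. This is a toy scheduler, not batchflow's code: it assumes that *execute='%n'* fires on every n-th iteration, i.e. on zero-based iterations where *(iteration + 1) % n == 0*, which matches the iterations 4 and 9 requested from *load_results()* in the next cell. *fires* and *simulate* are hypothetical names. The train pipeline (*run=False*) consumes one batch per iteration, while the test pipeline (*run=True*, *n_epochs=1*) makes a full pass over its dataset each time it fires; keeping the last incomplete batch is another assumption.

```python
from math import ceil


def fires(iteration, execute):
    """True if a unit with execute='%n' runs at this zero-based iteration (assumed rule)."""
    if not (isinstance(execute, str) and execute.startswith('%')):
        raise ValueError("this sketch only handles the '%n' form")
    return (iteration + 1) % int(execute[1:]) == 0


def simulate(n_iters, execute, test_size, batch_size):
    """Return [(iteration, train_batches, test_batches)] for a single experiment."""
    log = []
    for it in range(n_iters):
        train_batches = 1  # run=False: one batch per iteration
        if fires(it, execute):
            test_batches = ceil(test_size / batch_size)  # run=True: a full pass (n_epochs=1)
        else:
            test_batches = 0
        log.append((it, train_batches, test_batches))
    return log


# MNIST's test split has 10,000 images; batches of 64 as in BATCH_SIZE
log = simulate(n_iters=10, execute='%5', test_size=10000, batch_size=64)
print([it for it, _, test in log if test])  # [4, 9]
print(sum(test for _, _, test in log))      # 314
```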

```python
# Iterations are zero-based: 4 and 9 are the 5th and 10th, when the test pipeline was run
results = research.load_results(iterations=[4, 9])
results
```

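### Functions

Besides pipelines, a research can execute ordinary Python functions added with *add_function()*. Below, the test pipeline gathers classification metrics, and a function reads the accuracy from them every *TEST_EXECUTE_FREQ* iterations.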

```python
test_template = (Pipeline()
                 .init_variable('predictions')
                 .init_variable('metrics')
                 .import_model('conv', C('import_from'))
                 .to_array()
                 .predict_model('conv',
                                fetches='predictions',
                                feed_dict={'images': B('images'),
                                           'labels': B('labels')},
                                save_to=V('predictions'))
                 # compute classification metrics from the predicted logits and store them in 'metrics'
                 .gather_metrics('class', targets=B('labels'), predictions=V('predictions'),
                                 fmt='logits', axis=-1, save_to=V('metrics')))
```

```python
def get_accuracy(iteration, experiment, pipeline):
    """Return the accuracy stored in the 'metrics' variable of the pipeline named `pipeline`."""
    pipeline = experiment[pipeline].pipeline
    metrics = pipeline.get_variable('metrics')
    return metrics.evaluate('accuracy')
```
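Judging from its signature and the *add_function()* call in the next cell, the function receives the current iteration, the experiment, and the extra keyword arguments given to *add_function()* (here *pipeline='test_ppl'*), and *experiment[name].pipeline* is the pipeline registered under that name. Since it touches only a few attributes, it can be checked without running a research. The *Fake\** classes below are hypothetical stand-ins that mimic only what *get_accuracy* uses, and the accuracy value is made up for the check.

```python
from types import SimpleNamespace


def get_accuracy(iteration, experiment, pipeline):
    # Same function as above: read the 'metrics' variable of the named pipeline
    pipeline = experiment[pipeline].pipeline
    metrics = pipeline.get_variable('metrics')
    return metrics.evaluate('accuracy')


class FakeMetrics:
    """Hypothetical stand-in for the object stored by gather_metrics."""

    def __init__(self, values):
        self.values = values

    def evaluate(self, name):
        return self.values[name]


class FakePipeline:
    """Hypothetical stand-in exposing only get_variable()."""

    def __init__(self, variables):
        self.variables = variables

    def get_variable(self, name):
        return self.variables[name]


fake_pipeline = FakePipeline({'metrics': FakeMetrics({'accuracy': 0.5})})
experiment = {'test_ppl': SimpleNamespace(pipeline=fake_pipeline)}
print(get_accuracy(4, experiment, pipeline='test_ppl'))  # 0.5
```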

```python
research = (Research()
            .add_pipeline(root=train_root, branch=train_template, variables='train_loss', name='train_ppl')
            .add_pipeline(root=test_root, branch=test_template, name='test_ppl',
                          execute=TEST_EXECUTE_FREQ, run=True, import_from='train_ppl')
            .add_function(get_accuracy, returns='accuracy', name='test_accuracy_fn',
                          execute=TEST_EXECUTE_FREQ, pipeline='test_ppl')
            .add_grid(grid))

research.run(n_reps=1, n_iters=ITERATIONS, name='my_research', bar=True)
```

```python
results = research.load_results()
results
```

```python
def function_on_root(iteration, experiments):
    print("Running configs", iteration)

research = (Research()
            # on_root=True: called once per job with the list of its experiments
            .add_function(function_on_root, execute=1, on_root=True)
            .add_pipeline(root=train_root, branch=train_template, variables='train_loss', name='train')
            .add_pipeline(root=test_root, branch=test_template, name='test',
                          execute=TEST_EXECUTE_FREQ, run=True, import_from='train')
            .add_function(get_accuracy, returns='accuracy', name='test_accuracy',
                          execute=TEST_EXECUTE_FREQ, pipeline='test')
            .add_grid(grid))

research.run(n_reps=2, n_iters=ITERATIONS, branches=2, name='my_research', bar=True)
# research.run(n_reps=1, n_iters=100, workers=2, branches=2, gpu=[2, 4, 5, 6], name='my_research', bar=True)
```

```
Research my_research_31 is starting...
Distributor has 2 jobs with 10 iterations. Totally: 20

100%|██████████████████████████████████████████████████████████████████████████████████| 20/20 [00:55<00:00,  2.76s/it]
```

## Improving performance

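Each worker starts in a separate process and performs one or several jobs assigned to it; the number of workers is set with the *workers* parameter of *run()*. Moreover, if several GPUs are accessible, one can pass the indices of the GPUs to use via the *gpu* parameter.

*run()* also accepts:
* *timeout*: in minutes, default 5;
* *trials*: default 2.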

```python
import tensorflow as tf

model_config = {
    # Limit each process to 45% of GPU memory, so two workers on gpu=[0] fit on one card
    'session/config': tf.ConfigProto(gpu_options=tf.GPUOptions(per_process_gpu_memory_fraction=0.45)),
    'inputs': dict(images={'shape': (28, 28, 1)},
                   labels={'classes': 10, 'transform': 'ohe', 'name': 'targets'}),
    'initial_block/inputs': 'images',
    'body/block/layout': C('layout'),
    # 'common/conv/use_bias': C('bias'),
}

train_template = (Pipeline()
                  .init_variable('train_loss')
                  .init_model('dynamic', C('model'), 'conv', config=model_config)
                  .to_array()
                  .train_model('conv', fetches='loss',
                               feed_dict={'images': B('images'),
                                          'labels': B('labels')},
                               save_to=V('train_loss', mode='w'))
)

research = (Research()
            # dump: how often the collected results are written to disk
            .add_pipeline(root=train_root, branch=train_template, variables='train_loss', name='train_ppl',
                          dump=TEST_EXECUTE_FREQ)
            .add_pipeline(root=test_root, branch=test_template, name='test_ppl',
                          execute=TEST_EXECUTE_FREQ, run=True, import_from='train_ppl')
            .add_function(get_accuracy, returns='accuracy', name='test_accuracy_fn',
                          execute=TEST_EXECUTE_FREQ, dump=TEST_EXECUTE_FREQ,
                          pipeline='test_ppl')
            .add_grid(grid))

research.run(n_reps=REPETITIONS, n_iters=ITERATIONS, name='my_research', bar=True,
             workers=2, gpu=[0], timeout=2)
```

```
Research my_research_32 is starting...
Distributor has 8 jobs with 10 iterations. Totally: 80

100%|██████████████████████████████████████████████████████████████████████████████████| 80/80 [02:24<00:00,  1.80s/it]
```