## Practical examples of simple workflow building and integration with PyOphidia and ESDM-PAV Client modules

This notebook will show the basic usage and some advanced features (dependency specification, iterative and parallel interfaces, selection interface, task error handling) related to the Ophidia workflows.

We will be using the **PyOphidia** module and the **ESDM-PAV Client**, a Python module that provides the features needed to model and execute a *Post-processing, Analytics and Visualisation* (PAV) experiment (https://github.com/OphidiaBigData/esdm-pav-client).

It implements three main classes:

- **Workflow**: it submits, cancels and monitors a PAV experiment execution (a workflow)
- **Experiment**: it creates or loads a PAV experiment, that is, a sequence of tasks
- **Task**: it creates a Task object that can be embedded in a PAV experiment workflow

First of all, import the **ESDM-PAV Client** and **PyOphidia** modules and set up a connection to the **Ophidia Server**.

In [None]:
from esdm_pav_client import Workflow, Experiment, Task
import sys
from PyOphidia import cube,client
cube.Cube.setclient(read_env=True)

Make sure you restart the container in the right folder. The cube space should be completely **empty**.

In [None]:
cube.Cube.list(level=2)

<hr style="height:7px;border-top:2px solid #0000FF" />

### 1. Basic workflow: import + subset + export

Create a simple PAV experiment consisting of 3 tasks: 
- **one independent task**: *Import* to import a NetCDF file into a datacube
- **two dependent tasks**: *Subset* and *Export* to perform a subsetting operation along the dimensions of the  datacube and export the result into a new NetCDF file.


<img src="imgs/1_Basic_workflow.svg" alt="basic_workflow" width="100">


Let's start building the first workflow step by step!

First of all, we define a few **global attributes** in the experiment, which include metadata and default parameter values common to all the tasks.
Some of these keywords are mandatory:
- ```name```: the title of the workflow
- ```author```: the author’s name
- ```abstract```: a short description of the workflow

The ```exec_mode``` parameter specifies the execution mode, synchronous or asynchronous, and applies to the entire workflow, not to single tasks. In synchronous mode the workflow is executed in a blocking way, so the submitter has to wait until it has finished before the results are displayed. In asynchronous mode the workflow is processed in a non-blocking way (like a batch job), so the submitter regains control immediately and can submit other requests.

Since many tasks can be launched in parallel, an important parameter is the number of cores per task (```ncores```), which sets the default value for all the workflow’s tasks. This value can be overridden for an individual task that has different requirements.

By using the ```on_exit``` parameter, the user can select which cubes are deleted when a workflow ends.
The default value of ```on_exit``` can be set as a global attribute. Allowed values are:
- oph_delete: remove the output cube
- oph_deletecontainer: remove the output container (valid only for OPH_CREATECONTAINER)
- nop: no operation has to be applied (default).

In [None]:
e1 = Experiment(
    name= "Basic workflow",
    author= "CMCC",
    abstract= "Perform some basics operations using workflows",
    exec_mode= "sync",
    ncores="2",
    on_exit="oph_delete"
)
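
The difference between the two ```exec_mode``` values described above can be illustrated with the Python standard library alone. This is only a sketch of blocking versus non-blocking submission: `run_workflow` is a hypothetical stand-in, not part of the ESDM-PAV Client or of the Ophidia engine.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def run_workflow(name):
    """Stand-in for a workflow execution (hypothetical, not the Ophidia engine)."""
    time.sleep(0.1)  # pretend the workflow takes some time
    return f"{name}: done"


# Synchronous style: the call blocks until the result is available.
sync_result = run_workflow("sync workflow")

# Asynchronous style: submission returns a handle straight away, and the
# caller can do other work before collecting the result.
with ThreadPoolExecutor(max_workers=1) as pool:
    handle = pool.submit(run_workflow, "async workflow")
    # ... other requests could be submitted here ...
    async_result = handle.result()  # wait only when the result is needed

print(sync_result)
print(async_result)
```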

<hr style="height:1px;border-top:1px solid #0000FF" />

Each task is uniquely identified within the workflow by its ```name``` and it is related to a specific Ophidia Operator set as ```operator```. According to that operator, the user can optionally insert an array of key-value pairs (```arguments```) in order to call the operator with the appropriate arguments.

The ```type``` field can be used to specify the tool for task execution (e.g. Ophidia or CDO), or to identify tasks that “control” the flow execution, as for example: iterative loops, parallel branches, selection statements, waits for data availability, etc. 

The first task of the workflow is related to the *oph_importnc* operator. According to the operator specification, we need to provide the mandatory arguments: *input* (i.e. the src_path) and *measure*. In this example, the two argument values, ```$1``` and ```$2```, will be replaced with the workflow input parameters before the request is sent to the Ophidia Server.

In addition, we can specify other arguments, such as:
- *import_metadata=yes* to import also metadata from the input NetCDF file
- *imp_dim=time* to arrange data in order to operate on time series
- *imp_concept_level=d* to set the concept level to *day*
- *description* to add a datacube description

In [None]:
t1 = e1.newTask(name="Import",
               type="ophidia",
               operator='oph_importnc',
               arguments={'measure': '$2',
                          'import_metadata': 'yes',
                          'imp_dim': 'time', 
                          'imp_concept_level': 'd',
                          'hierarchy': 'oph_base|oph_base|oph_time',
                          'description': 'Max Temp', 
                          'input': '/home/ophidia/notebooks/$1'})

<hr style="height:1px;border-top:1px solid #0000FF" />

The second task is related to the *oph_subset* operator. 

For example, we can consider the whole spatial domain and specify a subset only on the time range, as indicated by the *subset_dims* parameter.
We can select a particular season by using the corresponding code for the *subset filter* argument:
 - *DJF* for winter
 - *MAM* for spring
 - *JJA* for summer
 - *SON* for autumn

The ```dependencies``` argument can be used to specify a list of dependencies in a compact format using a Python dictionary. Each dependency is defined by the task object to be linked (e.g., task *t1*) followed by the name of the input argument in the dependent task.
Here the input cube is the output cube of the previous task (*Import*), so we specify a dependency between the two tasks: the ```cube``` parameter of the *oph_subset* operator will be set to the PID of the cube imported by the *Import* task.


In [None]:
t2 = e1.newTask(name="Subset",
               type="ophidia", 
               operator='oph_subset', 
               arguments={'subset_filter': 'JJA', 'subset_dims': 'time', 'subset_type': 'coord', 
                          'description': 'JJA'},
               dependencies={t1:'cube'})

<hr style="height:1px;border-top:1px solid #0000FF" />

Finally, we can export the subsetted cube by using the *oph_exportnc2* operator.

In this case we have to:
- provide the argument for the oph_exportnc2 operator: *output*
- set a dependency from the previous (i.e. Subset) task

In [None]:
t3 = e1.newTask(name="Export",
               type="ophidia",
               operator='oph_exportnc2', 
               arguments={'output': '/home/ophidia/notebooks/JJA.nc'},
               dependencies={t2:'cube'})
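
The chain of dependencies defined so far (Import, then Subset, then Export) determines the order in which the tasks can run. A minimal sketch of that idea, using only the standard library and plain task names rather than the real Task objects, could look like this. It is an illustration of dependency ordering in general, not the scheduling code used by the Workflow manager.

```python
from graphlib import TopologicalSorter  # available from Python 3.9

# Task name -> names of the tasks it depends on (mirrors t1, t2 and t3 above).
dependencies = {
    "Import": [],
    "Subset": ["Import"],
    "Export": ["Subset"],
}

# static_order() yields each task only after all of its predecessors.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```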

<hr style="height:1px;border-top:1px solid #0000FF" />

The first workflow is ready!

We can save it as a JSON object by using the ```save``` method, in case we need to submit it later from a batch script. The JSON file is created in the same folder. 

In [None]:
e1.save("simple_exp")
# Print the saved definition; the context manager closes the file afterwards
with open("simple_exp.json", "r") as f:
    print(f.read())

Thanks to the ```check``` method, the user can verify the experiment structure before submitting the execution.
The argument *visual* (set to ```True```) can be used to visualise the experiment definition graph as a picture.

In [None]:
e1.check(visual=True)

<hr style="height:1px;border-top:1px solid #0000FF" />

Before submitting the experiment, we have to define its input arguments:
- *NetCDF filename* under the */home/ophidia/notebooks/* folder
- *variable* to be imported

In [None]:
file="tasmax_day_CMCC-CESM_rcp85_r1i1p1_20960101-21001231.nc"
variable="tasmax"

<hr style="height:1px;border-top:1px solid #0000FF" />

Once a new workflow has been instantiated from an experiment definition, it can be executed with the ```submit``` method, providing the input parameters (```$1``` and ```$2```):
- the source path used in the oph_importnc operator to load the NetCDF file
- the variable name used to set the measure argument in the import operator

In [None]:
w1 = Workflow(e1)
w1.submit(file,variable)
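
To make the role of ```$1``` and ```$2``` more concrete, the following sketch mimics the positional substitution on the arguments of the *Import* task. The helper `fill_placeholders` is hypothetical and written only for this illustration; the actual replacement is performed when the workflow is submitted.

```python
import re


def fill_placeholders(template, *params):
    """Replace $1, $2, ... with positional parameters (illustrative helper only)."""
    def lookup(match):
        index = int(match.group(1)) - 1
        if index >= len(params):
            raise ValueError(f"no value supplied for ${match.group(1)}")
        return str(params[index])
    return re.sub(r"\$(\d+)", lookup, template)


# Same placeholders as in the "Import" task defined earlier.
arguments = {"measure": "$2", "input": "/home/ophidia/notebooks/$1"}
filename = "tasmax_day_CMCC-CESM_rcp85_r1i1p1_20960101-21001231.nc"

resolved = {key: fill_placeholders(value, filename, "tasmax")
            for key, value in arguments.items()}
print(resolved)
```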

<hr style="height:1px;border-top:1px solid #0000FF" />

Finally, the ```monitor``` method allows users to check the experiment execution progress and the status of all the tasks composing the experiment; the status can be displayed in a graphical format, similar to what is shown by the ```check``` method.
The function takes 3 optional parameters:
- *frequency* parameter, i.e., an integer that determines how often the status is updated
- *iterative* parameter which defines if the status has to be periodically updated (with ```True```) or not (with ```False```)
- *visual_mode* parameter which defines if the status is displayed as a graph (with ```True```) or as text (with ```False```).

In [None]:
w1.monitor(frequency=1, iterative=True, visual_mode=True)
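
The interplay between *frequency* and *iterative* can be pictured as a simple polling loop. The sketch below is a generic illustration under our own assumptions: `poll_status`, the fake status source and the status names are all hypothetical, and this is not how the client implements ```monitor```.

```python
import time


def poll_status(get_status, frequency=1, iterative=True, sleep=time.sleep):
    """Query get_status() once, or repeatedly every `frequency` seconds
    until a final state is reported (all names here are hypothetical)."""
    history = [get_status()]
    while iterative and history[-1] not in ("COMPLETED", "ERROR"):
        sleep(frequency)
        history.append(get_status())
    return history


# Fake status source that reports completion on the third query.
states = iter(["PENDING", "RUNNING", "COMPLETED"])
history = poll_status(lambda: next(states), frequency=0.01)
print(history)
```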

<hr style="height:1px;border-top:1px solid #0000FF" />

Let's check the exported file.

In [None]:
! ls -lh /home/ophidia/notebooks | grep "\.nc"

<hr style="height:1px;border-top:1px solid #0000FF" />

The cube workspace is already empty (we have no datacubes) because we set ```on_exit="oph_delete"``` as a global attribute of the workflow.

In [None]:
cube.Cube.list(level=2)

<hr style="height:1px;border-top:1px solid #0000FF" />

We just need to remove the container automatically created by the *oph_importnc* operator. By default, the container name is equal to the name of the imported file.

In [None]:
cube.Cube.deletecontainer(container=file,force='yes')

<hr style="height:7px;border-top:2px solid #0000FF" />

### 2. Workflows: iterative and parallel interfaces

Let's now consider a slightly more complex workflow, in which we are going to:

- import and subset multiple NetCDF files
- merge all the subsetted datacubes
- perform a reduction (avg, max, min, ...) operation
- export the output datacube

<img src="imgs/2_Iterative_parallel.svg" alt="iterative_parallel" width="900">

As input files, we can use the daily NetCDF files produced by the CMCC-CESM model and related to the *tasmax* variable for the years 2096-2100. 

In [None]:
! ls /home/ophidia/notebooks | grep "tasmax"

<hr style="height:1px;border-top:1px solid #0000FF" />

In addition to the Ophidia Data Import/Export & analysis operators, we are going to exploit the ```for``` and ```endfor``` flow control operators to implement a **for** loop. 

Unlike other operators, these do not operate on data or metadata, but can be adopted to set particular flow control rules for the Workflow manager. In particular, the operators are used to begin/end a sub-section that has to be executed several times.

Let's build the workflow step by step.

In the following cell we define some global attributes as in the previous example.

In [None]:
e2 = Experiment(
    name="Loop operations",
    author="CMCC",
    abstract="Perform some basics operations using workflows",
    exec_mode="sync",
    ncores="1",
    on_exit="oph_delete"
)

<hr style="height:1px;border-top:1px solid #0000FF" />

**Create a new container**

This is the first task, so it has no dependencies. We just have to provide the proper arguments to the *oph_createcontainer* operator:
- the container name
- the name and the type of the dimensions allowed
- the concept hierarchy name of the dimensions

In [None]:
t1 = e2.newTask(name="Create container",
                type="ophidia",
                operator='oph_createcontainer',
                on_error='skip',
                arguments={'container': 'workflow',
                           'dim': 'lat|lon|time',
                           'dim_type': 'double|double|double',
                           'hierarchy': 'oph_base|oph_base|oph_time'})

<hr style="height:1px;border-top:1px solid #0000FF" />

**FOR statement**

The FOR operator is used to configure the iterative block and, in particular, to set the number N of loops to be executed. To this end, we have to provide an ordered list of N labels, one per cycle, so that each cycle can be distinguished from the others. The list is assigned to the ```values``` parameter, with values separated by | ("pipe").

In our example, we provide a list of years in order to import the corresponding NetCDF file in the next task.

A name has to be associated to the list values by setting the ```key``` parameter (e.g. ```year```), which is used in the inner tasks in the form **@{key_name}** to access the current value of the counter/label. 

The ```type="control"``` identifies a flow control operator.  

Finally, we define a simple flow dependency (```t1:''```), since this task has to begin only after the previous "Create container" task has finished, but no data is generated.

In [None]:
t2 = e2.newTask(name="Start loop",
                type="control",
                operator='for',
                arguments={"key": "year", "values": "2096|2097|2098|2099|2100"},
                dependencies={t1:''})

<hr style="height:1px;border-top:1px solid #0000FF" />

**Import and subset multiple datacubes in parallel**

The two inner tasks to be repeated (import and subset) have to depend on the FOR task, either directly or indirectly (that is, through other tasks in the iterative block).

When setting the parameters of these tasks, the user can use the value of the label associated with the current iteration.

*IMPORT task*

The **input** as well as the **description** parameters of the *oph_importnc* operator are parametrized so that they take the current value of the **year** key at each iteration.

This task has a flow dependency on the "Start loop" task, so that it starts after that task and retrieves the right value of the label associated with the current iteration.

In [None]:
t3 = e2.newTask(name="Import",
                type="ophidia",
                operator='oph_importnc',
                arguments={'measure': 'tasmax',
                           'container': 'workflow',
                           'import_metadata': 'yes',
                           'imp_dim': 'time', 
                           'imp_concept_level': 'd',
                           'hierarchy': 'oph_base|oph_base|oph_time',
                           'description': 'Max Temp @{year}', 
                           'input': '/home/ophidia/notebooks/tasmax_day_CMCC-CESM_rcp85_r1i1p1_@{year}0101-@{year}1231.nc'},
                dependencies={t2:''})
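
The effect of the ```@{year}``` label on the *input* argument can be sketched as a plain string expansion over the pipe-separated ```values```. The `expand_loop` helper is hypothetical and only illustrates the substitution; the real expansion is carried out by the Workflow manager.

```python
def expand_loop(template, key, values):
    """Return one string per loop label, replacing @{key} with the label."""
    return [template.replace("@{" + key + "}", label)
            for label in values.split("|")]


paths = expand_loop(
    "/home/ophidia/notebooks/tasmax_day_CMCC-CESM_rcp85_r1i1p1_@{year}0101-@{year}1231.nc",
    key="year",
    values="2096|2097|2098|2099|2100",
)
for path in paths:
    print(path)
```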

*SUBSET task*

This task depends on the "Import" task, since each subset operation has to be performed on the corresponding datacube imported in the previous step: the output of the *Import* task is the input of the *Subset* task.

In [None]:
t4 = e2.newTask(name="Subset",
                type="ophidia",
                operator='oph_subset', 
                arguments={'subset_filter': 'JJA', 
                           'subset_dims': 'time', 
                           'subset_type': 'coord', 
                           'description': 'JJA @{year}'},
                dependencies={t3:'cube'})

<hr style="height:1px;border-top:1px solid #0000FF" />

**End loop**

The ENDFOR operator ends an iterative block, has no arguments and depends on the inner tasks.

In our example, it depends on the "Subset" task. In this way, it can gather the PIDs of all the cubes generated by the inner (subset) task and pass them to the next tasks.

In [None]:
t5 = e2.newTask(name="End loop",
                type="control",
                operator='endfor',
                arguments={},
                dependencies={t4:'cube'})

<hr style="height:1px;border-top:1px solid #0000FF" />

**Merge all the subsetted datacubes into a single datacube**

All the subsetted datacubes can be now merged into a single datacube by using the **oph_mergecubes** operator: the resulting datacube will contain the JJA subset for each of the imported years. 

As for the previous task, we need to specify a dependency to get all the datacube PIDs from the previous task. In addition, the dependency has to name ```cubes``` as the input argument, so that the *cubes* parameter of the *oph_mergecubes* operator is set to the list of pipe-separated PIDs retrieved from the "End loop" task.

In [None]:
t6 = e2.newTask(name="Merge",
                type="ophidia",
                operator='oph_mergecubes', 
                arguments={'description': 'Merged cube'}, 
                dependencies={t5:'cubes'})
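
As a small illustration of the "pipe-separated PIDs" mentioned above, the sketch below builds such a value from a list. The PIDs are made up for the example (real PIDs are assigned by the Ophidia Server), and the joining itself is handled by the Workflow manager, not by user code.

```python
# Hypothetical PIDs, one per loop iteration (real PIDs are assigned by the server).
subset_pids = [f"http://hostname/ophidia/1/{n}" for n in (11, 12, 13, 14, 15)]

# The value handed to the `cubes` argument is the pipe-separated list.
cubes_argument = "|".join(subset_pids)
print(cubes_argument)
```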

<hr style="height:1px;border-top:1px solid #0000FF" />

**Perform a reduction operation**

Starting from the merged datacube, we can perform a reduction operation with respect to the implicit dimension (time).

We just need to define a datacube dependency between the **Reduce** task and the previous **Merge** task.

The reduced cube will contain the average value for the tasmax variable over the 2096-2100 JJA period for each point in the spatial domain.

In [None]:
t7 = e2.newTask(name="Reduce",
                type="ophidia",
                operator='oph_reduce', 
                arguments={'operation': 'avg', 
                           'dim': 'time',
                           'description': 'Reduced cube'},
                dependencies={t6:'cube'})
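
To clarify what an *avg* reduction along time produces, here is a toy example in plain Python. The numbers are invented and the nested lists only stand in for a datacube; the point is that the time axis collapses to a single value for each (lat, lon) point.

```python
from statistics import mean

# Toy cube: values[lat][lon] is a short time series (made-up numbers, in kelvin).
values = [
    [[300.0, 302.0, 304.0], [290.0, 291.0, 292.0]],
    [[280.0, 282.0, 284.0], [270.0, 270.0, 273.0]],
]

# Collapse the time axis: one average per (lat, lon) point.
reduced = [[mean(series) for series in row] for row in values]
print(reduced)
```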

<hr style="height:1px;border-top:1px solid #0000FF" />

**Export the averaged datacube**

In a similar way, we can define an *"Export"* task that depends on the *"Reduce"* task to export data into a single NetCDF file.

In [None]:
t8 = e2.newTask(name="Export",
                type="ophidia",
                operator='oph_exportnc2', 
                arguments={'output': '/home/ophidia/notebooks/avg_JJA.nc'},
                dependencies={t7:'cube'})

<hr style="height:1px;border-top:1px solid #0000FF" />

**Empty workspace**

Finally, we can remove the container created by the first task of the workflow.

In [None]:
t9 = e2.newTask(name="Delete container",
                type="ophidia",
                operator='oph_deletecontainer', 
                arguments={'container': 'workflow', 
                           'force': 'yes'},
                dependencies={t8:''})

<hr style="height:1px;border-top:1px solid #0000FF" />

**Run workflow**

We can submit the workflow and monitor the experiment execution progress

In [None]:
w2 = Workflow(e2)
w2.submit()

In [None]:
w2.monitor(frequency=1, iterative=True, visual_mode=True)

<hr style="height:1px;border-top:1px solid #0000FF" />

Let's check the exported file.

In [None]:
! ls -lh /home/ophidia/notebooks/ | grep "\.nc"

<hr style="height:7px;border-top:2px solid #0000FF" />

### Parallel loop statement

The FOR operator used to configure the iterative block can also be executed in parallel to speed up the execution, if there is no dependency between the iteration branches.

In this case we set the ```parallel``` argument to ```yes``` to enable parallel processing.

If this option is enabled, the engine, before executing the workflow, transforms it into an equivalent version in which iterative blocks are expanded into N independent sub-workflows, where N is the number of initial iterations. The new workflow is then executed taking into account the usual rules based on task dependencies.

For example, the previous definition of the "Start loop" task becomes:

```
t2 = e3.newTask(name="Start loop",
                type="control",
                operator='for',
                arguments={"key": "year", "values": "2096|2097|2098|2099|2100", 
                           'parallel': 'yes'},
                dependencies={t1:''})
```




The resulting workflow with parallel loop is:

In [None]:
e3 = Experiment(
    name="Loop operations",
    author="CMCC",
    abstract="Perform some basics operations using workflows",
    exec_mode="sync",
    ncores="1",
    on_exit="oph_delete"
)
t1 = e3.newTask(name="Create container",
                type="ophidia",
                operator='oph_createcontainer',
                on_error='skip',
                arguments={'container': 'workflow',
                           'dim': 'lat|lon|time',
                           'dim_type': 'double|double|double',
                           'hierarchy': 'oph_base|oph_base|oph_time'})
t2 = e3.newTask(name="Start loop",
                type="control",
                operator='for',
                arguments={"key": "year", "values": "2096|2097|2098|2099|2100",
                           'parallel': 'yes'},
                dependencies={t1:''})
t3 = e3.newTask(name="Import",
                type="ophidia",
                operator='oph_importnc',
                arguments={'measure': 'tasmax',
                           'container': 'workflow',
                           'import_metadata': 'yes',
                           'imp_dim': 'time', 
                           'imp_concept_level': 'd',
                           'hierarchy': 'oph_base|oph_base|oph_time',
                           'description': 'Max Temp @{year}', 
                           'input': '/home/ophidia/notebooks/tasmax_day_CMCC-CESM_rcp85_r1i1p1_@{year}0101-@{year}1231.nc'},
                dependencies={t2:''})
t4 = e3.newTask(name="Subset",
                type="ophidia",
                operator='oph_subset', 
                arguments={'subset_filter': 'JJA', 
                           'subset_dims': 'time', 
                           'subset_type': 'coord', 
                           'description': 'JJA @{year}'},
                dependencies={t3:'cube'})
t5 = e3.newTask(name="End loop",
                type="control",
                operator='endfor',
                arguments={},
                dependencies={t4:'cube'})
t6 = e3.newTask(name="Merge",
                type="ophidia",
                operator='oph_mergecubes', 
                arguments={'description': 'Merged cube'}, 
                dependencies={t5:'cubes'})
t7 = e3.newTask(name="Reduce",
                type="ophidia",
                operator='oph_reduce', 
                arguments={'operation': 'avg', 
                           'dim': 'time',
                           'description': 'Reduced cube'},
                dependencies={t6:'cube'})
t8 = e3.newTask(name="Export",
                type="ophidia",
                operator='oph_exportnc2', 
                arguments={'output': '/home/ophidia/notebooks/avg_JJA_parallel.nc'},
                dependencies={t7:'cube'})
t9 = e3.newTask(name="Delete container",
                type="ophidia",
                operator='oph_deletecontainer', 
                arguments={'container': 'workflow', 
                           'force': 'yes'},
                dependencies={t8:''})
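
The expansion of the iterative block into N independent sub-workflows can be pictured with a thread pool. This is only a conceptual sketch: `run_branch` is a hypothetical stand-in for the Import and Subset steps of one year, and the strings it returns are placeholders rather than real cubes.

```python
from concurrent.futures import ThreadPoolExecutor


def run_branch(year):
    """Stand-in for one expanded sub-workflow: Import, then Subset, for one year."""
    imported = f"cube({year})"
    return f"JJA[{imported}]"


years = "2096|2097|2098|2099|2100".split("|")

# The branches share no data, so they can run concurrently;
# map() still returns the results in the order of the input labels.
with ThreadPoolExecutor(max_workers=len(years)) as pool:
    branch_outputs = list(pool.map(run_branch, years))

print(branch_outputs)
```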

<hr style="height:1px;border-top:1px solid #0000FF" />

**Run workflow**

We can submit the workflow using the **submit** method again

*Note the execution time.*

In [None]:
w3 = Workflow(e3)
w3.submit()

<hr style="height:1px;border-top:1px solid #0000FF" />

We can check the experiment execution graph

In [None]:
w3.monitor(frequency=1, iterative=True, visual_mode=True)

<hr style="height:1px;border-top:1px solid #0000FF" />

Let's check the exported file.

In [None]:
! ls -lh /home/ophidia/notebooks/ | grep "\.nc"

<hr style="height:7px;border-top:2px solid #0000FF" />

###  3. Workflows: Selection interface

The Selection interface provides further flexibility by enabling the Workflow manager to execute one or more tasks based on boolean conditions that could be checked at run-time and depend on input parameters, data, metadata, etc.

The development of the Selection interface involves specific control operators:
 - IF
 - ELSEIF
 - ELSE
 - ENDIF

Like other flow control operators, they do not process data or metadata directly, but they can be used to enable (or skip) the execution of a set of tasks based on run-time conditions.

In the following workflow, we'll consider a selection statement with two selection blocks.

<img src="imgs/3_Selection_Interface.svg" alt="selection_interface" width="800">



**Workflow global attributes**

In [None]:
e4 = Experiment(
    name="Selection Interface",
    author="CMCC",
    abstract="Selection statement with two selection blocks",
    exec_mode="sync",
    ncores="1"
)

<hr style="height:1px;border-top:1px solid #0000FF" />

**IF block**

The selection interface is used to code two possible implementations of a task that imports data into the Ophidia platform from an external source:
 1. import only the subset from the input file
 2. import all the dataset and then extract a data subset

The actual implementation to be adopted is selected by means of the input parameter ```$1```: a non-zero numerical value selects the first option (case A), while 0 selects the second one (case B).

In [None]:
t1 = e4.newTask(name="IF block",
                type="control",
                operator='if',
                arguments={'condition': '$1'})

<hr style="height:1px;border-top:1px solid #0000FF" />

**CASE A: Import only the subset from the input file**

In general, the set of tasks belonging to the branch that begins at IF and ends at ENDIF is the sub-workflow executed when the IF condition is satisfied.

In our example, there is only one task, named *"Import and subset"*, which is related to the *oph_importnc* operator and has a flow dependency on the "IF block" task.

The *input* and the *measure* arguments will be set according to the second and third workflow input arguments (```$2``` and ```$3```).

To import only a subset from the input file we have to specify in addition the following parameters:
- **subset_dims**: the dimension names used for the subsetting;
- **subset_type=coord** so that the filter is considered on dimension values;
- **subset_filter**: list of pipe-separated filters associated to each dimension specified in *subset_dims* (set according to the fourth workflow input argument ```$4```).



In [None]:
t2 = e4.newTask(name="Import and subset",
                type="ophidia",
                operator='oph_importnc',
                arguments={'measure': '$3', 
                           'import_metadata': 'yes',
                           'imp_dim': 'time', 
                           'imp_concept_level': 'd',
                           'hierarchy': 'oph_base|oph_base|oph_time',
                           'description': 'Max Temp imported and subsetted',
                           'subset_dims': 'lat|lon|time',
                           'subset_filter': '$4',
                           'subset_type': 'coord',
                           'time_filter': 'no',
                           'input': '/home/ophidia/notebooks/$2'},
                dependencies={t1:''})

<hr style="height:1px;border-top:1px solid #0000FF" />

**ELSE block**

The task with the ELSE operator has to be a child of the task with the IF operator. It has no arguments: it simply starts the last sub-block of a selection block "if".

In [None]:
t3 = e4.newTask(name="ELSE block",
                type="control",
                operator='else',
                arguments={},
                dependencies={t1:''})

<hr style="height:1px;border-top:1px solid #0000FF" />

**CASE B: import all the dataset and then extract a data subset**

The set of tasks belonging to the branch that begins at ELSE and ends at ENDIF is the sub-workflow executed when the IF condition is not satisfied.

In our example, we have two tasks:
- the first one, **"Import data"**, is related to the *oph_importnc* operator and is a child of the task with the "ELSE" operator.
- the second one, **"Subset data"**, is related to the *oph_subset* operator and depends on the "Import data" task, since the input datacube to be subsetted is the one generated by the import task.

In [None]:
t4 = e4.newTask(name="Import data",
                type="ophidia",
                operator='oph_importnc',
                arguments={'measure': '$3', 
                           'import_metadata': 'yes',
                           'imp_dim': 'time', 
                           'imp_concept_level': 'd',
                           'hierarchy': 'oph_base|oph_base|oph_time',
                           'description': 'Max Temp imported',
                           'input': '/home/ophidia/notebooks/$2'},
                dependencies={t3:''})

In [None]:
t5 = e4.newTask(name="Subset data",
                type="ophidia",
                operator='oph_subset', 
                arguments={'subset_dims': 'lat|lon|time', 
                           'subset_filter': '$4', 
                           'subset_type': 'coord', 
                           'description': 'Max Temp subsetted'},
                dependencies={t4:'cube'})

<hr style="height:1px;border-top:1px solid #0000FF" />

**ENDIF block**

The *endif* operator simply closes a selection block "if".

If we want to gather the PID of the output datacube produced in each of the two branches, we have to specify a dependency on the final task of each sub-workflow (*"Subset data"* and *"Import and subset"*).

In [None]:
t6 = e4.newTask(name="Selection block end",
                type="control",
                operator='endif',
                arguments={},
                dependencies={t2:'', t5:''})
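
Which branch is executed for a given value of ```$1``` can be summarised with an ordinary Python conditional. The function below is a simplified sketch that assumes any non-zero number counts as true; it only returns the task names of this example and does not reproduce the full condition syntax accepted by the IF operator.

```python
def select_branch(condition):
    """Return the task names that would run for a given condition value
    (simplified assumption: any non-zero number counts as true)."""
    if float(condition) != 0:
        return ["Import and subset"]        # branch between IF and ELSE
    return ["Import data", "Subset data"]   # branch between ELSE and ENDIF


print(select_branch(1))
print(select_branch(0))
```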

<hr style="height:1px;border-top:1px solid #0000FF" />

Let's define the workflow arguments:
- *if condition* ```$1```
- *NetCDF filename* ```$2```
- *variable* to be imported ```$3```
- *subset filter* (lat|lon|time) ```$4```

In [None]:
condition=0
file="tasmax_day_CMCC-CESM_rcp85_r1i1p1_20960101-21001231.nc"
variable="tasmax"
subset="-50:10|20:140|150:240"
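
The subset filter is positional: each pipe-separated range applies to the dimension in the same position of *subset_dims*. The pairing can be sketched as follows; the `ranges` dictionary is just an illustration and is not something the operator returns.

```python
subset_dims = "lat|lon|time"
subset_filter = "-50:10|20:140|150:240"

# Pair each dimension with its own "start:end" range.
ranges = {}
for dim, bounds in zip(subset_dims.split("|"), subset_filter.split("|")):
    start, end = bounds.split(":")
    ranges[dim] = (float(start), float(end))

print(ranges)
```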

<hr style="height:1px;border-top:1px solid #0000FF" />

Let's run the workflow

In [None]:
w4 = Workflow(e4)
w4.submit(condition, file, variable, subset)

<hr style="height:1px;border-top:1px solid #0000FF" />

and monitor it

In [None]:
w4.monitor(frequency=1, iterative=True, visual_mode=True)

<hr style="height:1px;border-top:1px solid #0000FF" />

Check the produced datacube. As we can note:
- if **condition** is non-zero (e.g. **1**) ---> the datacube is imported and subsetted at the same time
- otherwise ---> the datacube is first imported, then subsetted
    

In [None]:
cube.Cube.list(level=2)

<hr style="height:1px;border-top:1px solid #0000FF" />

Check the subsetted datacube

In [None]:
subsetted_cube = cube.Cube(pid='http://127.0.0.1/ophidia/.../...')
subsetted_cube.info()

<hr style="height:1px;border-top:1px solid #0000FF" />

and delete the container

In [None]:
cube.Cube.deletecontainer(container=file,force='yes')

<hr style="height:7px;border-top:2px solid #0000FF" />

### 4. Handling task errors

The Workflow manager also provides support for handling errors in task executions.

Four behaviours are supported by setting the proper value in the ```on_error``` argument:
- *skip*: the task is skipped and execution continues on the descendant tasks
- *continue*: the task and all its dependent tasks are ignored, while the other tasks are executed
- *abort*: the workflow is interrupted (default)
- *repeat N*: the task is re-executed N times

Apart from the "Create container" tasks, which set ```on_error='skip'```, all the previous examples used the default behaviour of interrupting the workflow in case of errors (*abort*).

Note that the behaviour can be defined both at general workflow level and at the task level.
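
The four behaviours can be compared with a small simulation. This is a simplified reading of the descriptions above, not the engine's implementation: `run_task` is hypothetical, the policy string is assumed to have the form `repeat N`, and we assume that a task still failing after its repetitions aborts the workflow.

```python
def run_task(action, on_error="abort"):
    """Run `action` under a simplified reading of the four policies.
    Returns 'ok', 'skipped' or 'failed'; raises RuntimeError on abort."""
    policy, _, count = on_error.partition(" ")
    attempts = 1 + (int(count) if policy == "repeat" else 0)
    for _ in range(attempts):
        try:
            action()
            return "ok"
        except RuntimeError:
            pass  # try again if attempts remain
    if policy == "skip":
        return "skipped"   # descendant tasks still run
    if policy == "continue":
        return "failed"    # descendants are ignored, other tasks go on
    raise RuntimeError("workflow aborted")  # 'abort', or 'repeat' exhausted (assumption)


def missing_file():
    raise RuntimeError("file not found")


calls = {"n": 0}

def flaky():
    """Fails twice, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("temporary failure")


skip_outcome = run_task(missing_file, "skip")
continue_outcome = run_task(missing_file, "continue")
repeat_outcome = run_task(flaky, "repeat 2")
print(skip_outcome, continue_outcome, repeat_outcome)
```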

We can now define a workflow where some of the tasks are expected to fail. In particular, we want to import NetCDF files related to different years and compute the average along the time dimension. Let's suppose we try to access a file that does not exist.

<hr style="height:1px;border-top:1px solid #0000FF" />

Here we define the global workflow attributes and the ```on_error``` argument through the ```$1``` variable to control the general workflow execution.

In [None]:
e5 = Experiment(
    name="error_handling",
    author="CMCC",
    abstract="Perform some basics operations using workflows",
    exec_mode="sync",
    ncores="1",
    on_error="$1"
)
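Conceptually, placeholders such as `$1` are filled with the positional values passed when the workflow is submitted. The snippet below is a minimal sketch of that idea in plain Python; `substitute_params` and the template string are hypothetical and are not part of the ESDM-PAV Client API.

```python
import re


def substitute_params(text, *params):
    """Replace $1, $2, ... with positional values; leave unmatched placeholders as they are."""
    def replace(match):
        index = int(match.group(1)) - 1
        return str(params[index]) if index < len(params) else match.group(0)
    return re.sub(r"\$(\d+)", replace, text)


# Hypothetical fragment of an experiment description with two placeholders
template = '{"on_error": "$1", "tasks": [{"name": "Import", "on_error": "$2"}]}'
print(substitute_params(template, "skip", "continue"))
```

Keeping the behaviour as a parameter means the same experiment definition can be submitted several times with different error-handling settings.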

<hr style="height:1px;border-top:1px solid #0000FF" />

We then define the first two tasks: "Create container" and "Start loop". 

**Note** that the ```values``` argument includes a year (*2095*) for which we do not have any file.

In [None]:
t1 = e5.newTask(name="Create container",
                type="ophidia",
                operator='oph_createcontainer',
                arguments={'container': 'workflow',
                           'dim': 'lat|lon|time',
                           'dim_type': 'double|double|double',
                           'hierarchy': 'oph_base|oph_base|oph_time'})
t2 = e5.newTask(name="Start loop",
                type="control",
                operator='for',
                arguments={"key": "year", "values": "2095|2096|2097", 
                           'parallel': 'yes'},
                dependencies={t1:''})

<hr style="height:1px;border-top:1px solid #0000FF" />

Let's now specify the import operator. Here we are defining a specific ```on_error``` behaviour at the task level, with ```$2```, which supersedes the one defined at the global level.

In [None]:
t3 = e5.newTask(name="Import",
                type="ophidia",
                operator='oph_importnc',
                on_error='$2',
                arguments={'measure': 'tasmax', 
                           'container': 'workflow',
                           'import_metadata': 'yes',
                           'imp_dim': 'time', 
                           'imp_concept_level': 'd',
                           'hierarchy': 'oph_base|oph_base|oph_time',
                           'description': 'Max Temp @{year}',
                           'input': '/home/ophidia/notebooks/tasmax_day_CMCC-CESM_rcp85_r1i1p1_@{year}0101-@{year}1231.nc',},
                dependencies={t2:''})
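To see which files the loop will try to import, the `@{year}` expansion can be reproduced in plain Python. This is only an illustration of the substitution: `expand_loop` is a hypothetical helper, and the actual expansion is performed by the workflow engine.

```python
def expand_loop(template, key, values):
    """Return one string per loop value, replacing every @{key} with that value."""
    return [template.replace("@{" + key + "}", value) for value in values.split("|")]


input_template = "/home/ophidia/notebooks/tasmax_day_CMCC-CESM_rcp85_r1i1p1_@{year}0101-@{year}1231.nc"
for path in expand_loop(input_template, "year", "2095|2096|2097"):
    print(path)
```

The first path refers to year 2095, for which no file is available, so that iteration of the import task is the one expected to fail.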

<hr style="height:1px;border-top:1px solid #0000FF" />

and then specify the remaining tasks

In [None]:
t4 = e5.newTask(name="Reduce",
                type="ophidia",
                operator='oph_reduce', 
                arguments={'operation': 'avg',
                           'dim': 'time',
                           'description': 'Reduced cube'},
                dependencies={t3:'cube'})
t5 = e5.newTask(name="End loop year",
                type="control",
                operator='endfor',
                arguments={},
                dependencies={t4:'cube'})

<hr style="height:1px;border-top:1px solid #0000FF" />

We can try to run it with the default behaviour and see what happens...

In [None]:
w5 = Workflow(e5)
w5.submit('abort','abort')

In [None]:
w5.monitor(frequency=1, iterative=True, visual_mode=True)

<hr style="height:1px;border-top:1px solid #0000FF" />

We could then set a specific behaviour for the whole workflow to skip a failed task

In [None]:
w5 = Workflow(e5)
w5.submit('skip','skip')

In [None]:
w5.monitor(frequency=1, iterative=True, visual_mode=True)

<hr style="height:1px;border-top:1px solid #0000FF" />

Or define a specific one for the import operator we expect to fail

In [None]:
w5 = Workflow(e5)
w5.submit('skip','continue')

In [None]:
w5.monitor(frequency=1, iterative=True, visual_mode=True)

<hr style="height:1px;border-top:1px solid #0000FF" />

List the cubes created

In [None]:
cube.Cube.list(level=2)

<hr style="height:1px;border-top:1px solid #0000FF" />

Before running the last example, empty the workspace

In [None]:
cube.Cube.deletecontainer(container="workflow",force='yes')

<hr style="height:7px;border-top:2px solid #0000FF" />

## Compute the Heat Waves Duration Indices as a workflow of operators

Let's now build a workflow of Ophidia operators that computes some Heat Wave Duration Indices:
- the **HWD (Heat Wave Duration)**: starting from the daily maximum temperature TSMX, the Heat Wave Duration index is the maximum length, in days, of the periods of at least 6 consecutive days in which TSMX exceeds by more than 5°C the average calculated for each calendar day (based on 20 years, 1980 - 2000) using a running 5-day window
- the **HWF (Heat Wave Frequency)**: the number of days that contribute to heatwaves in a year
- the **HWN (Heat Wave Number)**: the number of heatwaves in a year
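The three definitions can be checked on a single grid point with a few lines of plain Python. This is a minimal sketch, assuming the input is already the daily anomaly (TSMX minus the calendar-day average) in °C. The function name and its parameters are hypothetical, and HWF is expressed here as the fraction of the year covered by heatwave days (total days divided by 365), which is the form produced by the final task of the workflow.

```python
def heat_wave_indices(anomalies, threshold=5.0, min_length=6, days_in_year=365):
    """Return (HWD, HWN, HWF) for one grid point from daily temperature anomalies."""
    runs = []      # lengths of runs of consecutive days above the threshold
    current = 0
    for value in anomalies:
        if value > threshold:
            current += 1
        else:
            if current:
                runs.append(current)
            current = 0
    if current:
        runs.append(current)

    # Only runs of at least min_length days count as heatwaves
    waves = [length for length in runs if length >= min_length]
    hwd = max(waves, default=0)           # longest heatwave, in days
    hwn = len(waves)                      # number of heatwaves
    hwf = sum(waves) / days_in_year       # heatwave days as a fraction of the year
    return hwd, hwn, hwf


# Synthetic year: a 7-day wave, a 3-day warm spell (too short) and a 6-day wave
anomalies = [0.0] * 10 + [6.0] * 7 + [0.0] * 5 + [6.0] * 3 + [0.0] * 5 + [7.0] * 6 + [0.0] * 329
print(heat_wave_indices(anomalies))
```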

<hr style="height:1px;border-top:1px solid #0000FF" />

The task list includes the following tasks:

1. **Create container**
 - the *oph_createcontainer* operator creates a container called "heatwaves"

2. **Import climatological mean**
 - the climatological mean represents the average computed for each calendar day (based on 20 years)
 - the input NetCDF data set located at ```input``` (set to the last workflow input parameter) is imported into the Ophidia platform, with maximum temperature in K (see http://ophidia.cmcc.it/documentation/users/operators/OPH_IMPORTNC.html)
 - the ```measure``` is set to *tasmax*
 - data is arranged in order to operate on time series (as indicated by the ```imp_dim``` parameter)
 - the task depends on the previous task
 
3. **Import**
 - the input NetCDF data set located at ```input``` is imported into the Ophidia platform, with maximum temperature in K; it represents the new year for which the indicators are computed
 - the task depends on the **Create container** task and is executed in parallel with the second task
 
4. **Intercube**
 - the *oph_intercube* operator (see http://ophidia.cmcc.it/documentation/users/operators/OPH_INTERCUBE.html) is used to subtract the elements of the second cube from the first cube one by one
 - the task depends on both the **Import climatological mean** task and the **Import** task
 

5. **Apply**
 - the *oph_apply* operator (see http://ophidia.cmcc.it/documentation/users/operators/OPH_APPLY.html) is used to compute a sequence of operations in order to identify the Heat Wave durations: $\{day \mid TSMX(day) > 5°C\}$ 
 - the task depends on the **Intercube** task
 
6. **Reduce**
 - the *oph_reduce* operator (see http://ophidia.cmcc.it/documentation/users/operators/OPH_REDUCE.html) is used with ```operation=max``` and ```dim=time``` to extract the maximum duration (**HWDI**)
 - the task depends on the previous task
 
7. **Apply for HWN**
 - the *oph_apply* operator is used to create a mask with the *oph_predicate* primitive (see http://ophidia.cmcc.it/documentation/users/primitives/OPH_PREDICATE.html), setting to 1 the durations greater than 0
 - the task depends on the **Apply** task
 
8. **Reduce for HWN**
 - the *oph_reduce2* operator is used with ```operation=sum``` and ```dim=time``` to count the heatwaves in a year (**HWN**)
 - the task depends on the previous task
 
9. **Reduce for HWF**
 - the *oph_reduce2* operator is used with ```operation=sum``` and ```dim=time``` to sum the heatwaves durations
 - the task depends on the **Apply** task
 
10. **Apply for HWF**
 - the *oph_apply* operator is used to divide the sum of durations by 365 using the *oph_mul_scalar* primitive (see http://ophidia.cmcc.it/documentation/users/primitives/OPH_MUL_SCALAR.html) to identify the **HWF** index
 - the task depends on the **Reduce for HWF** task

In [None]:
e6 = Experiment(
    name="Heat Waves",
    author="CMCC",
    abstract="Perform the computation of Heat Waves indices using workflows",
    exec_mode="sync",
    ncores="1",
)
t1 = e6.newTask(name="Create container",
                type="ophidia",
                operator='oph_createcontainer',
                on_error='skip',
                arguments={'container': 'heatwaves',
                           'dim': 'lat|lon|time',
                           'dim_type': 'double|double|double',
                           'hierarchy': 'oph_base|oph_base|oph_time'})
t2 = e6.newTask(name="Import climatological mean",
                type="ophidia",
                operator='oph_importnc',
                arguments={'measure': 'tasmax',
                           'container': 'heatwaves',
                           'import_metadata': 'yes',
                           'imp_dim': 'time', 
                           'imp_concept_level': 'd',
                           'hierarchy': 'oph_base|oph_base|oph_time',
                           'description': 'Max Temp climatological mean', 
                           'input': '/home/ophidia/notebooks/climatological_mean.nc'},
                dependencies={t1:''})
t3 = e6.newTask(name="Import",
                type="ophidia",
                operator='oph_importnc',
                arguments={'measure': 'tasmax',
                           'container': 'heatwaves',
                           'import_metadata': 'yes',
                           'imp_dim': 'time', 
                           'imp_concept_level': 'd',
                           'hierarchy': 'oph_base|oph_base|oph_time',
                           'description': 'Max Temp year 2100', 
                           'input': '/home/ophidia/notebooks/tasmax_day_CMCC-CESM_rcp85_r1i1p1_21000101-21001231.nc'},
                dependencies={t1:''})
t4 = e6.newTask(name="Intercube",
                type="ophidia",
                operator='oph_intercube', 
                arguments={'operation': 'sub', 
                           'description': 'Result from intercube'},
                dependencies={t2:'cube2', t3:'cube'})
t5 = e6.newTask(name="Apply",
                type="ophidia",
                operator='oph_apply',
                arguments={'query': "oph_predicate('OPH_INT','OPH_INT',oph_sequence('OPH_INT','OPH_INT', oph_predicate('OPH_FLOAT','OPH_INT',oph_predicate('OPH_FLOAT','OPH_FLOAT',measure,'x-100','>0','0','x'),'x-5','>0','1','0'), 'length', 'yes'),'x-5','>0','x','0')",
                           'description': 'Heat Wave Duration cube'},
                dependencies={t4:'cube'})
t6 = e6.newTask(name="Reduce",
                type="ophidia",
                operator='oph_reduce', 
                arguments={'operation': 'max', 
                           'dim': 'time', 
                           'description': 'Heat Wave Duration Index cube'}, 
                dependencies={t5:'cube'})
t7 = e6.newTask(name="Apply for HWN",
                type="ophidia",
                operator='oph_apply', 
                arguments={'query': "oph_predicate('OPH_INT','OPH_INT',measure,'x','>0','1','0')",
                           'description': 'Apply for HWN cube'},
                dependencies={t5:'cube'})
t8 = e6.newTask(name="Reduce for HWN",
                type="ophidia",
                operator='oph_reduce2', 
                arguments={'operation': 'sum',
                           'dim': 'time', 
                           'description': 'Heat Wave Number cube'},
                dependencies={t7:'cube'})
t9 = e6.newTask(name="Reduce for HWF",
                type="ophidia",
                operator='oph_reduce2', 
                arguments={'operation': 'sum',
                           'dim': 'time', 
                           'description': 'Reduce for HWF cube'},
                dependencies={t5:'cube'})
t10 = e6.newTask(name="Apply for HWF",
                type="ophidia",
                operator='oph_apply', 
                arguments={'query': "oph_mul_scalar('OPH_INT', 'OPH_FLOAT', measure,"+ str(1/365) +")",
                           'description': 'Heat Wave Frequency cube'},
                dependencies={t9:'cube'})
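The `dependencies` arguments declared above form a directed acyclic graph. As a rough sketch, the graph can be written as a plain dictionary and grouped into stages of tasks that could run at the same time, using `graphlib` from the Python standard library (Python 3.9 or later). The `execution_stages` helper is hypothetical, and the actual scheduling is decided by the workflow engine, so the stages below only show what the dependencies allow.

```python
from graphlib import TopologicalSorter

# Task name -> set of tasks it depends on (copied from the cell above)
dependencies = {
    "Create container": set(),
    "Import climatological mean": {"Create container"},
    "Import": {"Create container"},
    "Intercube": {"Import climatological mean", "Import"},
    "Apply": {"Intercube"},
    "Reduce": {"Apply"},
    "Apply for HWN": {"Apply"},
    "Reduce for HWN": {"Apply for HWN"},
    "Reduce for HWF": {"Apply"},
    "Apply for HWF": {"Reduce for HWF"},
}


def execution_stages(graph):
    """Group tasks into successive stages; tasks in the same stage are independent."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())   # all tasks whose dependencies are done
        stages.append(ready)
        sorter.done(*ready)
    return stages


for number, stage in enumerate(execution_stages(dependencies), start=1):
    print(number, stage)
```

The two import tasks fall in the same stage, as do the three tasks that consume the output of **Apply**.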

<hr style="height:1px;border-top:1px solid #0000FF" />

Submit the workflow...

In [None]:
w6 = Workflow(e6)
w6.submit()

<hr style="height:1px;border-top:1px solid #0000FF" />

...and check the experiment execution progress

In [None]:
w6.monitor(frequency=1, iterative=True, visual_mode=True)

<hr style="height:1px;border-top:1px solid #0000FF" />

List the cubes created

In [None]:
cube.Cube.list(level=2)

<hr style="height:1px;border-top:1px solid #0000FF" />

Get PID of 'Heat Wave Duration Index cube'

In [None]:
HWDI = cube.Cube(pid='http://127.0.0.1/ophidia/.../...')

<hr style="height:1px;border-top:1px solid #0000FF" />

Plot a map of HWDI

In [None]:
%matplotlib inline
import cartopy.crs as ccrs
import matplotlib.pyplot as plt
from cartopy.mpl.geoaxes import GeoAxes
from cartopy.util import add_cyclic_point
import numpy as np
import warnings
warnings.filterwarnings("ignore")

fig = plt.figure(figsize=(15, 6), dpi=100)

#Add Geo axes to the figure with the specified projection (PlateCarree)
projection = ccrs.PlateCarree()
ax = plt.axes(projection=projection)

#Draw coastline and gridlines
ax.coastlines()

gl = ax.gridlines(crs=projection, draw_labels=True, linewidth=1, color='black', alpha=0.9, linestyle=':')
gl.xlabels_top = False
gl.ylabels_right = False

data = HWDI.export_array(show_time='yes')
lat = data['dimension'][0]['values'][ : ]
lon = data['dimension'][1]['values'][ : ]
var = data['measure'][0]['values'][ : ]
var = np.reshape(var, (len(lat), len(lon)))

#Wraparound points in longitude
var_cyclic, lon_cyclic = add_cyclic_point(var, coord=np.asarray(lon))
x, y = np.meshgrid(lon_cyclic,lat)

#Define color levels for color bar
levStep = (np.nanmax(var)-np.nanmin(var))/20
clevs = np.arange(np.nanmin(var),np.nanmax(var)+levStep,levStep)

#Set filled contour plot
cnplot = ax.contourf(x, y, var_cyclic, clevs, transform=projection,cmap=plt.cm.Oranges)
plt.colorbar(cnplot,ax=ax)

ax.set_aspect('auto', adjustable=None)

plt.title('HWDI (year 2100)')
plt.show()

<hr style="height:1px;border-top:1px solid #0000FF" />

Empty the workspace

In [None]:
cube.Cube.deletecontainer(container="heatwaves",force='yes')

The virtual file system should now be "clean"