In [None]:
import sys
import cube, client, workflow, experiment

In [None]:
# Resource settings, kept as strings. Later cells pass `hosts` and `threads`
# as the 'nhost' / 'nthreads' import arguments and `cores` as the
# experiment's `ncores`.
hosts, cores, threads = "1", "1", "8"

In [None]:
# Create a single client (read_env=True — presumably picks up connection
# settings from the environment; confirm against the client module) and
# register it on the Cube class first, then on the Workflow class.
cli = client.Client(read_env=True)
for _client_consumer in (cube.Cube, workflow.Workflow):
    _client_consumer.setclient(cli)

In [None]:
# Deploy the cluster on 'partition' before the experiment is submitted.
# This call previously used action='undeploy', which left nothing deployed
# for the workflow (the experiment targets host_partition="partition"); the
# matching undeploy is the last cell of the notebook.
# NOTE(review): nhost is hard-coded to 1 here while the import tasks take
# their 'nhost' from the `hosts` setting — keep the two in sync if changed.
cube.Cube.cluster(action='deploy',host_partition='partition',nhost=1,exec_mode='async')

In [None]:
# Settings of the Cold Spells experiment; every task in the following cells
# is attached to this object through exp.newTask(...).
experiment_settings = dict(
    name="Cold Spells",
    author="CMCC",
    abstract="Perform the computation of Cold Spells indexes using ESDM PAV",
    exec_mode="sync",
    host_partition="partition",
    ncores=cores,
)
exp = experiment.Experiment(**experiment_settings)

# Task t1: create the 'coldspells' container with lat/lon/time dimensions.
# on_error='skip' is passed so a failure of this task is skipped rather than
# treated as fatal (presumably to tolerate an already existing container —
# confirm against the workflow engine documentation).
container_arguments = {
    'container': 'coldspells',
    'dim': 'lat|lon|time',
    'dim_type': 'double|double|double',
    'hierarchy': 'oph_base|oph_base|oph_time',
}
t1 = exp.newTask(
    name="Create container",
    type="ophidia",
    operator='oph_createcontainer',
    on_error='skip',
    arguments=container_arguments,
)

# Task t2: import the climatological mean of 'tasmin' into the 'coldspells'
# container. Its output is consumed as 'cube2' by the "Intercube" task.
#
# The task depends on t1 ("Create container"): it writes into the container
# that t1 creates, so without this dependency the import has no ordering
# relative to container creation and may be scheduled before the container
# exists. The empty dependency type matches how the loop start depends on t1.
t2 = exp.newTask(name="Import climate averages",
                type="ophidia",
                operator='oph_importnc2',
                arguments={'measure': 'tasmin',
                           'container': 'coldspells',
                           'import_metadata': 'yes',
                           'imp_dim': 'time',
                           'imp_concept_level': 'd',
                           'hierarchy': 'oph_base|oph_base|oph_time',
                           'description': 'Min temperature climatological mean',
                           'input': 'climatological_mean.nc',
                           'nhost': hosts,
                           'nthreads': threads},
                dependencies={t1:''})
    
# ---------------------------------------------------------------------------
# Parallel loop: tasks tf0..tf3 are declared once between the 'for' and
# 'endfor' control tasks; the loop key 'source' is referenced as '@source'.
# ---------------------------------------------------------------------------

# Loop start; it waits for the container-creation task t1.
tf0 = exp.newTask(
    name="Begin parallel for",
    type="control",
    operator='for',
    arguments={
        'input': '[future_tasmin_*.nc]',
        'key': 'source',
        'parallel': 'yes',
    },
    dependencies={t1: ''},
)

# Import of the file selected by the loop key into the 'coldspells' container.
loop_import_arguments = {
    'input': '@source',
    'measure': 'tasmin',
    'container': 'coldspells',
    'import_metadata': 'yes',
    'imp_dim': 'time',
    'imp_concept_level': 'd',
    'hierarchy': 'oph_base|oph_base|oph_time',
    'description': 'Min temperature in current year',
    'nhost': hosts,
    'nthreads': threads,
}
tf1 = exp.newTask(
    name="Import",
    type="ophidia",
    operator='oph_importnc2',
    arguments=loop_import_arguments,
    dependencies={tf0: ''},
)

# Combine the imported cube ('cube') with the climatological mean from t2
# ('cube2'). No 'operation' argument is given, so the operator's default
# applies.
tf2 = exp.newTask(
    name="Intercube",
    type="ophidia",
    operator='oph_intercube',
    arguments={'description': 'Result from intercube'},
    dependencies={tf1: 'cube', t2: 'cube2'},
)

# Nested primitive expression producing the "Cold Spell Duration" cube.
# The adjacent literals below concatenate to exactly the original one-line
# query; the only split points are between nested calls.
cold_spell_duration_query = (
    "oph_predicate('OPH_INT','OPH_INT',"
    "oph_sequence('OPH_INT','OPH_INT', "
    "oph_predicate('OPH_FLOAT','OPH_INT',"
    "oph_predicate('OPH_FLOAT','OPH_FLOAT',measure,'x-1000','>0','0','x'),"
    "'x+5','<0','1','0'), "
    "'length', 'yes'),"
    "'x-5','>0','x','0')"
)
tf3 = exp.newTask(
    name="Apply",
    type="ophidia",
    operator='oph_apply',
    arguments={
        'query': cold_spell_duration_query,
        'description': 'Cold Spell Duration cube',
    },
    dependencies={tf2: 'cube'},
)

# Loop end; downstream tasks consume its output as 'cubes'.
t3 = exp.newTask(
    name="End parallel for",
    type="control",
    operator='endfor',
    arguments={},
    dependencies={tf3: 'cube'},
)

def _export_netcdf(task_name, output_file, source_task):
    """Attach an 'oph_exportnc2' task that writes `source_task`'s cube to `output_file`."""
    return exp.newTask(
        name=task_name,
        type="ophidia",
        operator='oph_exportnc2',
        arguments={'output': output_file},
        dependencies={source_task: 'cube'},
    )


# Merge the cubes produced by the parallel loop into a single cube.
t4 = exp.newTask(
    name="Merge",
    type="ophidia",
    operator='oph_mergecubes',
    arguments={
        'mode': 'a',
        'hold_values': 'yes',
        'order': 'source',
    },
    dependencies={t3: 'cubes'},
)

# --- CSD branch: maximum over 'time' of the merged cube, then export --------
t5 = exp.newTask(
    name="Reduce for CSD",
    type="ophidia",
    operator='oph_reduce2',
    arguments={
        'operation': 'max',
        'dim': 'time',
        'description': 'Cold Spell Duration Index cube',
    },
    dependencies={t4: 'cube'},
)
t5e = _export_netcdf("Export CSD", 'CSD.nc', t5)

# --- CSN branch: predicate on the merged cube, sum over 'time', then export --
t6 = exp.newTask(
    name="Apply for CSN",
    type="ophidia",
    operator='oph_apply',
    arguments={
        'query': "oph_predicate('OPH_INT','OPH_INT',measure,'x','>0','1','0')",
        'description': 'Apply for CSN cube',
    },
    dependencies={t4: 'cube'},
)
t7 = exp.newTask(
    name="Reduce for CSN",
    type="ophidia",
    operator='oph_reduce2',
    arguments={
        'operation': 'sum',
        'dim': 'time',
        'description': 'Cold Spell Number cube',
    },
    dependencies={t6: 'cube'},
)
t7e = _export_netcdf("Export CSN", 'CSN.nc', t7)

# --- CSF branch: sum over 'time', scale by 1/365, then export ---------------
t8 = exp.newTask(
    name="Reduce for CSF",
    type="ophidia",
    operator='oph_reduce2',
    arguments={
        'operation': 'sum',
        'dim': 'time',
        'description': 'Reduce for CSF cube',
    },
    dependencies={t4: 'cube'},
)

# The scalar is embedded in the query as the string form of 1/365.
per_day_factor = str(1/365)
t9 = exp.newTask(
    name="Apply for CSF",
    type="ophidia",
    operator='oph_apply',
    arguments={
        'query': "oph_mul_scalar('OPH_INT','OPH_FLOAT',measure," + per_day_factor + ")",
        'description': 'Cold Spell Frequency cube',
    },
    dependencies={t8: 'cube'},
)
t9e = _export_netcdf("Export CSF", 'CSF.nc', t9)

In [None]:
# Run the experiment's own check before building a workflow from it
# (presumably validates the task graph declared above — confirm in the
# experiment module).
exp.check()

In [None]:
# Wrap the experiment in a Workflow object; `wf` is what later cells submit,
# monitor and query for provenance.
wf = workflow.Workflow(exp)

In [None]:
# Submit the workflow for execution.
wf.submit()

In [None]:
# Enable IPython's autoreload extension in mode 2 for this and later cells.
%load_ext autoreload
%autoreload 2
# NOTE(review): the client is registered on Workflow again here, although it
# was already set right after the client was created. This may be needed
# because autoreload can re-import the `workflow` module — confirm before
# removing.
workflow.Workflow.setclient(cli)
# Monitor the submitted workflow (frequency=1, iterative, visual mode).
wf.monitor(frequency=1, iterative=True, visual_mode=True)

In [None]:
# Build the provenance output for the workflow under the name "coldspells",
# in JSON format, and display it.
wf.build_provenance("coldspells", output_format="json", display=True)

In [None]:
# Final cleanup: undeploy the cluster on 'partition' (asynchronous call).
cube.Cube.cluster(action='undeploy',host_partition='partition',exec_mode='async')