# Demonstrating Provenance for netcdf workflows.


In [None]:
import xarray
import netCDF4
import json

from dispel4py.workflow_graph import WorkflowGraph 
from dispel4py.provenance import *

import time
import random

from dispel4py.base import create_iterative_chain, ConsumerPE, IterativePE, SimpleFunctionPE

import matplotlib.pyplot as plt
import traceback

Simple Workflow, xarray in xarray out.

In [None]:


# project: CLIP_C
# authors: Alessandro and Andrej
# CLIPC combine function using dispel4py and running over jupyter

#print __dir__

class Read(GenericPE):
    
    def __init__(self):
        GenericPE.__init__(self)
        self._add_input('input')
        self._add_output('xarray')

    def _process(self,inputs):
        self.log('read_process')
     
        self.log(inputs)

        ds = xarray.open_dataset(inputs['input'][0])
    
        self.write( 'xarray' , (ds , inputs['input'][1]) )

class Write(GenericPE):
    
    def __init__(self):
        GenericPE.__init__(self)
        self._add_input('write')
        self._add_output('end')
        
    def _process(self,inputs):
        self.log('WriteFunction')
        
        inputs['write'][0].to_netcdf( inputs['write'][1] )
        
        self.write('end', inputs )
        
        
class Workflow(GenericPE):
        
    def __init__(self):
        GenericPE.__init__(self)
        self._add_input('input')
        self._add_output('output')
        
    def _process(self,inputs):
        self.log('Workflow_process')
        
        #self.log(inputs)
        
        #nc = inputs['input'][0]
        nc = inputs['input'][0]
        #self.log(nc)
        
        #
        self.write('output', (nc , inputs['input'][1] ))


readX  = Read()
readX.name = 'collector'

wf     = Workflow()       
writeX = Write()

#Initialise the graph
def createWorkflowGraph():
    graph = WorkflowGraph()    
    graph.connect(readX ,'xarray'   , wf     ,'input')
    graph.connect(wf    ,'output'   , writeX , 'write')

    return graph

graph = createWorkflowGraph()

from dispel4py.visualisation import display

display(graph)

In [None]:
infile = 'input_simple.json'
with open(infile) as data_file:    
    input_data = json.load(data_file)

print infile                                                     
print input_data                   
                                                     
#Launch in simple process
simple_process.process_and_return(graph, input_data)

In [None]:
#Store via service
#ProvenancePE.REPOS_URL='http://127.0.0.1:8080/j2ep-1.0/prov/workflow/insert'
ProvenancePE.REPOS_URL='http://portal.verce.eu/j2ep-1.0/prov/workflow/insert'

#Store to local path
ProvenancePE.PROV_PATH='./prov-files/'

#Size of the provenance bulk before sent to storage or sensor
ProvenancePE.BULK_SIZE=20

#ProvenancePE.REPOS_URL='http://climate4impact.eu/prov/workflow/insert'
 

def createGraphWithProv():
    
    graph=createWf()
    #Location of the remote repository for runtime updates of the lineage traces. Shared among ProvenanceRecorder subtypes

    # Ranomdly generated unique identifier for the current run
    rid='JUP_SIMPLE_'+getUniqueId()

    
    # Finally, provenance enhanced graph is prepared:


    ##Initialise provenance storage in files:
    #profile_prov_run(graph,None,provImpClass=(ProvenancePE,),componentsType={'CorrCoef':(ProvenancePE,)},username='aspinuso',runId=rid,w3c_prov=False,description="provState",workflowName="test_rdwd",workflowId="xx",save_mode='file')
                  # skip_rules={'CorrCoef':{'ro':{'$lt':0}}})

    #Initialise provenance storage to service:
    profile_prov_run(graph,None,provImpClass=(ProvenancePE,),username='aspinuso',runId=rid,w3c_prov=False,description="provState",workflowName="test_rdwd",workflowId="xx",save_mode='service')
                   #skip_rules={'CorrCoef':{'ro':{'$lt':0}}})

    #clustersRecorders={'record0':ProvenanceRecorderToFileBulk,'record1':ProvenanceRecorderToFileBulk,'record2':ProvenanceRecorderToFileBulk,'record6':ProvenanceRecorderToFileBulk,'record3':ProvenanceRecorderToFileBulk,'record4':ProvenanceRecorderToFileBulk,'record5':ProvenanceRecorderToFileBulk}
    #Initialise provenance storage to sensors and Files:
    #profile_prov_run(graph,ProvenanceRecorderToFile,provImpClass=(ProvenancePE,),username='aspinuso',runId=rid,w3c_prov=False,description="provState",workflowName="test_rdwd",workflowId="xx",save_mode='sensor')
    #clustersRecorders=clustersRecorders)
    
    #Initialise provenance storage to sensors and service:
    #profile_prov_run(graph,ProvenanceRecorderToService,provImpClass=(ProvenancePE,),username='aspinuso',runId=rid,w3c_prov=False,description="provState",workflowName="test_rdwd",workflowId="xx",save_mode='sensor')
   
    #Summary view on each component
    #profile_prov_run(graph,ProvenanceTimedSensorToService,provImpClass=(ProvenancePE,),username='aspinuso',runId=rid,w3c_prov=False,description="provState",workflowName="test_rdwd",workflowId="xx",save_mode='sensor')
   
   
   
    #Configuring provenance feedback-loop
    #profile_prov_run(graph,ProvenanceTimedSensorToService,provImpClass=(ProvenancePE,),username='aspinuso',runId=rid,w3c_prov=False,description="provState",workflowName="test_rdwd",workflowId="xx",save_mode='sensor',feedbackPEs=['Source','MaxClique'])
   
   
    #Initialise provenance storage end associate a Provenance type with specific components:
    #profile_prov_run(graph,provImpClass=ProvenancePE,componentsType={'Source':(ProvenanceStock,)},username='aspinuso',runId=rid,w3c_prov=False,description="provState",workflowName="test_rdwd",workflowId="xx",save_mode='service')

    #
    return graph


graph=createGraphWithProv()

display(graph)