In [1]:
import random
from parsl import *
import pprint
import json

# Thread-backed executor limited to four concurrent workers.
workers = ThreadPoolExecutor(max_workers=4)
# DataFlowKernel built on that executor; the @App decorators in the
# following cells are bound to this object.
dfk = DataFlowKernel(executors=[workers])

In [2]:
import time    
@App('python', dfk)
def sleep_fail(sleep_dur, sleep_rand_max, fail_prob, inputs=[]):
    """Sleep for a randomised duration, then fail with probability ``fail_prob``.

    Args:
        sleep_dur: Base sleep time in seconds.
        sleep_rand_max: The sleep time is shifted by a random integer drawn
            from ``[-sleep_rand_max, sleep_rand_max]``.
        fail_prob: Probability in ``[0, 1]`` that the call raises. ``0`` never
            fails and ``1`` always fails.
        inputs: Not read by the body. Callers in this notebook pass futures
            here -- presumably so the framework can order the tasks; confirm
            against the parsl documentation.

    Raises:
        Exception: ``"App failure"``, raised with probability ``fail_prob``.
    """
    import time
    import random

    # The random offset may exceed sleep_dur; time.sleep() rejects negative
    # durations, so clamp the result at zero.
    s = max(0, sleep_dur + random.randint(-sleep_rand_max, sleep_rand_max))
    print("Sleeping for : ", s)
    time.sleep(s)
    # random.random() lies in [0, 1), so a strict comparison honours both
    # extremes. The previous `randint(0, 100) / 100 <= fail_prob` check still
    # failed about 1% of the time when fail_prob was 0.
    if random.random() < fail_prob:
        print("Fail")
        raise Exception("App failure")
    else:
        print("Succeed")

@App('python', dfk)
def test1(numtasks):
    """Launch ``numtasks`` independent ``sleep_fail`` tasks and count failures.

    Each task is started with no sleep and a 0.1 failure probability. Every
    future is then waited on; exceptions are caught and tallied rather than
    propagated.

    Args:
        numtasks: Number of tasks to launch.
    """
    # Honour the requested task count (it was previously hard-coded to 10).
    fus = [sleep_fail(0, 0, .1) for _ in range(numtasks)]

    count = 0
    for fu in fus:
        try:
            fu.result()
        except Exception:
            # A failed task surfaces its exception from result(); count it.
            count += 1
            print("Caught fail ")

    print("Caught failures of  {0}/{1}".format(count, len(fus)))

@App('python', dfk)
def test_deps(numtasks):
    """Build a three-stage graph of ``sleep_fail`` tasks and wait for the end.

    Stage 1 launches ``numtasks`` independent tasks. Stage 2 launches one task
    per stage-1 future, passing that future through ``inputs``. The final
    task receives every stage-2 future through ``inputs``.

        App1   App2  ... AppN
          |       |        |
          V       V        V
        App1   App2  ... AppN
           \\      |       /
            \\     |      /
              App_Final

    Args:
        numtasks: Width of the first two stages.
    """
    # Stage 1: independent tasks, 1 second sleep, 0.4 failure probability.
    first_stage = [sleep_fail(1, 0, .4) for _ in range(0, numtasks)]

    # Stage 2: one task per stage-1 future, no sleep, 0.1 failure probability.
    second_stage = [
        sleep_fail(0, 0, .1, inputs=[parent]) for parent in first_stage
    ]

    # Final task: takes all stage-2 futures as inputs, failure probability 0.
    final_future = sleep_fail(1, 0, 0, inputs=second_stage)

    # result() blocks until the final task finishes and re-raises its error.
    final_status = final_future.result()
    print("Final status : ", final_status)
    time.sleep(1)

In [3]:
# Launch the dependency-graph test with 15 tasks in each of the first two stages.
test_deps(15)

Sleeping for : Sleeping for : Sleeping for : 

<AppFuture at 0x107bac400 state=running>

   111




In [4]:
from parsl.dataflow.states import States
from IPython.core.display import Javascript, HTML, Javascript, display
import json
from threading import Thread
import time

class DFKListener():
    """Render a DataFlowKernel's task graph as a vis.js network in the notebook.

    The listener reads ``dfk.tasks`` and turns every task into a vis.js node
    (coloured by state) and every dependency into an edge. ``auto_updater``
    starts a background thread that redraws the graph periodically; ``stop``
    ends it.
    """

    def __init__(self, dfk):
        """Store the kernel to observe and set up the state lookup tables.

        Args:
            dfk: Object whose ``tasks`` mapping is read by ``create_nodes``
                and ``create_edges``.
        """
        # Function-scope import: only Thread is imported at cell level.
        from threading import Event

        self.dfk = dfk
        self.nodes_list = []
        self.edges_list = []
        # Task state -> node colour.
        self.color_lookup_table = {
            States.unsched: "Pink",
            States.pending: "Yellow",
            States.runnable: "Blue",
            States.running: "Lime",
            States.done: "Green",
            States.failed: "Red",
            States.dep_fail: "orange",
        }
        # Task state -> hover title.
        self.state_lookup_table = {
            States.unsched: "Unscheduled",
            States.pending: "Pending",
            States.runnable: "Runnable",
            States.running: "Running",
            States.done: "Finished",
            States.failed: "Failed",
            States.dep_fail: "Dependency Failure",
        }
        # Set by stop() to make the refresh loops exit.
        self._stop_event = Event()
        # Refresher threads started by auto_updater().
        self._threads = []

    @property
    def nodes(self):
        """Current list of node dicts."""
        return self.nodes_list

    @property
    def edges(self):
        """Current list of edge dicts."""
        return self.edges_list

    def create_nodes(self, dfke=None):
        """Append one node per task and return the list, de-duplicated by id.

        Args:
            dfke: Optional kernel to read instead of ``self.dfk``.

        Returns:
            The node list; when an id occurs more than once the most recently
            built node wins.
        """
        dfk = dfke if dfke else self.dfk
        # Iterate over a snapshot of the keys so that tasks registered while
        # this loop runs (the refresher works from a background thread) cannot
        # invalidate the iterator.
        for task_id in list(dfk.tasks):
            record = dfk.tasks[task_id]
            state = record['status']
            # A task marked done whose execution future holds an exception
            # is displayed as failed.
            if state == States.done and record["exec_fu"]._exception is not None:
                state = States.failed
            tid = record['app_fu'].tid
            fname = record['func'].__name__
            self.nodes_list.append({
                "id": tid,
                "label": "{}\n{}".format(str(tid), fname),
                "color": self.color_lookup_table[state],
                "font": {"background": "white"},
                "title": self.state_lookup_table[state],
            })
        # De-duplicate once after the loop instead of on every iteration;
        # the resulting list is the same.
        self.nodes_list = list({v['id']: v for v in self.nodes_list}.values())
        return self.nodes_list

    def create_edges(self, dfke=None):
        """Append one edge per task dependency and return the edge list.

        Edges already present (same ``from`` and ``to``) are not added again,
        so repeated calls do not duplicate edges.

        Args:
            dfke: Optional kernel to read instead of ``self.dfk``.

        Returns:
            The edge list.
        """
        dfk = dfke if dfke else self.dfk
        seen = {(edge["from"], edge["to"]) for edge in self.edges_list}
        for task_id in list(dfk.tasks):
            for dep in dfk.tasks[task_id]['depends']:
                key = (dep.tid, task_id)
                if key in seen:
                    continue
                seen.add(key)
                self.edges_list.append(
                    {"from": dep.tid, "to": task_id, "arrows": 'to'})
        return self.edges

    def update(self):
        """Rebuild nodes and edges from scratch and return them as JSON text."""
        self.nodes_list = []
        self.edges_list = []
        self.create_nodes()
        self.create_edges()
        return json.dumps({"nodes": self.nodes, "edges": self.edges})

    def set_javascript(self):
        """Refresh the graph and publish it as ``window.nodesList`` /
        ``window.edgesList`` in the browser."""
        self.update()
        nodes = json.dumps(self.nodes)
        edges = json.dumps(self.edges)
        display(Javascript("""window.nodesList={};
                      window.edgesList={};
                   """.format(nodes,edges)))

    def show_window(self):
        """Draw the published node and edge lists with vis.js.

        The script creates the ``mynetwork`` div on first use and then builds
        a hierarchical, top-down network in it.
        """
        display(Javascript("""require.config({paths: {vis: 'https://cdnjs.cloudflare.com/ajax/libs/vis/4.21.0/vis.min'}});
require(['vis'], function (vis){
    if (document.getElementById("mynetwork") == null) {
        element.append('<div id="mynetwork" style="width:950px;height:750px;border:1px solid lightgray;"></div>');
    }
    var nodes = new vis.DataSet(window.nodesList);
    var edges = new vis.DataSet(window.edgesList);
    var container = document.getElementById('mynetwork');
    var data = {
         nodes: nodes,
         edges: edges
     };
     var options = {interaction:{hover:true}, 
                   layout: {
                    hierarchical: {
                        direction: 'UD'
                    }}
                   };
     var network = new vis.Network(container, data, options);
     network.on("hoverNode", function (params) {
             
     });
     network.on("hoverEdge", function (params) {
            
     });
     network.on("blurNode", function (params) {
             
     });
     network.on("blurEdge", function (params) {
             
     });
});"""))

    def update_thread_handler(self, secs):
        """Redraw the graph every ``secs`` seconds until ``stop`` is called.

        Args:
            secs: Delay between two refreshes, in seconds.
        """
        while not self._stop_event.is_set():
            self.set_javascript()
            self.show_window()
            # Wait on the event rather than sleeping so that stop() takes
            # effect immediately instead of after a full interval.
            self._stop_event.wait(secs)

    def auto_updater(self, time):
        """Start a background thread that redraws the graph periodically.

        Args:
            time: Delay between refreshes, in seconds. The name shadows the
                ``time`` module inside this method only.
        """
        # Re-arm after a previous stop() so the new loop actually runs.
        self._stop_event.clear()
        # Daemon thread: it must not keep the interpreter alive on shutdown.
        t = Thread(target=self.update_thread_handler, args=(time,), daemon=True)
        self._threads.append(t)
        t.start()

    def stop(self):
        """Stop every refresher started by ``auto_updater`` and wait for it."""
        self._stop_event.set()
        for t in self._threads:
            t.join()
        self._threads = []
        

In [8]:
# Attach a listener to the DataFlowKernel created in the first cell.
dfkl = DFKListener(dfk)

In [9]:
# Start the background refresher; the argument is the delay in seconds
# between two redraws of the task graph.
dfkl.auto_updater(10)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>