# Create Kale Workflows from Fireworks Workflows

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import itertools as it
import yaml

In [3]:
from fireworks import Firework, Workflow, LaunchPad, ScriptTask
from fireworks.core.rocket_launcher import rapidfire
import fireworks

In [4]:
from kale import workflow_objects as wo
from kale import workflow_widgets as ww

In [5]:
# Set up the LaunchPad from the connection parameters stored in a YAML file.
# NOTE(review): absolute, environment-specific path; adjust when running elsewhere.
fwconfig_file = '/opt/conda/lib/python3.6/site-packages/my_launchpad.yaml'
with open(fwconfig_file) as param_file:
    # safe_load only builds plain Python types (dict/list/str/...), which is all
    # that is needed for keyword arguments; bare yaml.load() without a Loader can
    # construct arbitrary objects and is deprecated.
    params = yaml.safe_load(param_file)
# The YAML mapping is passed straight through as LaunchPad keyword arguments.
launchpad = LaunchPad(**params)
#launchpad = LaunchPad()

In [6]:
# Reset the LaunchPad, skipping the password check (empty password,
# require_password=False).
# NOTE(review): presumably this discards whatever the configured database already
# holds — confirm before pointing the config at a shared database.
launchpad.reset('', require_password=False)

2018-01-24 19:01:48,573 INFO Performing db tune-up
2018-01-24 19:01:48,596 INFO LaunchPad was RESET.


In [7]:
def sort_fw_links(links):
    """Placeholder for topologically sorting FireWorks links.

    The intent is to use NetworkX to topologically sort tasks so that they
    can be created and added to the workflow.

    Not implemented yet: ``links`` is ignored and ``None`` is returned.
    """
    return None

In [8]:
def kale_from_fw(fw_wf):
    """Create a Kale Workflow from a FireWorks Workflow.

    Currently only implemented for ScriptTasks.

    Every FireTask of every Firework becomes one Kale ``CommandLineTask``
    named ``"<firework name>_<task index>"``. Each task created from a child
    Firework is made to depend on each task created from its parent Firework.

    Args:
        fw_wf: FireWorks Workflow to convert. Its ``name``, ``fws`` and
            ``links`` attributes are read.

    Returns:
        The new ``wo.Workflow`` holding all tasks and their dependencies.

    Raises:
        NotImplementedError: if a FireTask is not a ScriptTask.
    """
    kale_wf = wo.Workflow(fw_wf.name)

    # This dict will be used to index tasks by fw_id:
    # fw_id -> list of Kale tasks created from that Firework.
    kale_tasks = dict()

    # Create tasks. Iterate fw_wf.fws directly instead of binding it to a local
    # called `fireworks`, which would shadow the imported `fireworks` module.
    for fw in fw_wf.fws:
        # FireWorks are composed of FireTasks.
        # We assume that FireTasks are all run simultaneously,
        # so they don't depend on one another.
        # All FireTasks within a FireWork will have the same
        # dependencies and children.
        # NOTE(review): if FireWorks runs a Firework's FireTasks in sequence,
        # that ordering is not carried over here — confirm this is acceptable.
        fw_tasks = []
        for ft_num, ft in enumerate(fw.to_dict()['spec']['_tasks']):
            # Create Kale Task
            if ft['_fw_name'] == 'ScriptTask':
                kale_task = wo.CommandLineTask(
                    # NOTE(review): the script entries are concatenated with no
                    # separator; fine for a single entry, verify for several.
                    command=''.join(ft['script']),
                    name="{}_{}".format(fw.name, ft_num)
                )
            else:
                raise NotImplementedError("Only ScriptTasks are supported now.")

            # Add to workflow without dependencies
            kale_wf.add_task(kale_task)

            # Save tasks in this firework
            fw_tasks.append(kale_task)

        # Save this set of firetasks by firework ID.
        kale_tasks[fw.fw_id] = fw_tasks

    # Link tasks. Use the links of the workflow that was passed in; reading a
    # notebook-level variable here would either fail with NameError or silently
    # link according to a different workflow.
    for parent_id, child_ids in fw_wf.links.items():
        parent_task_list = kale_tasks[parent_id]
        for child_id in child_ids:
            child_task_list = kale_tasks[child_id]
            # Set all elements of child_task_list to depend on all elements of parent_task_list.
            for child_task, parent_task in it.product(child_task_list, parent_task_list):
                kale_wf.add_dependencies(child_task, [parent_task])

    return kale_wf

In [9]:
# Create the worker-pool widget and register one pool on it.
# NOTE(review): the add_pool arguments look like (pool name, number of workers,
# pool type) — confirm against kale.workflow_widgets.
wpw = ww.WorkerPoolWidget()
wpw.add_pool('fw_pool', 4, 'fireworks')
# Bare name as the last expression so the notebook displays the widget.
wpw

In [10]:
# Build the demo FireWorks: each ScriptTask appends one line to ~/tmpmsg.
# "hello" carries two ScriptTasks; "goodbye" and "blank" carry one each.
fw1 = Firework(
    [
        ScriptTask.from_str(script)
        for script in (
            'echo "hello $(date)" >> ~/tmpmsg',
            'echo "hi $(date)" >> ~/tmpmsg',
        )
    ],
    name="hello",
)
fw2 = Firework(ScriptTask.from_str('echo "goodbye $(date)" >> ~/tmpmsg'), name="goodbye")
fw3 = Firework(ScriptTask.from_str('echo "" >> ~/tmpmsg'), name="blank")

# Assemble the workflow as a linear chain: hello -> goodbye -> blank.
wf = Workflow(
    [fw1, fw2, fw3],
    dict([(fw1, fw2), (fw2, fw3)]),
    name="test workflow",
)

In [11]:
# Convert the FireWorks workflow `wf` into a Kale workflow.
kale_wf = kale_from_fw(wf)

Adding deps: <kale.workflow_objects.CommandLineTask object at 0x7f8a652dc978> <- [<kale.workflow_objects.CommandLineTask object at 0x7f8a652dc8d0>]
Adding deps: <kale.workflow_objects.CommandLineTask object at 0x7f8a652dc978> <- [<kale.workflow_objects.CommandLineTask object at 0x7f8a652dc7f0>]
Adding deps: <kale.workflow_objects.CommandLineTask object at 0x7f8a652dc940> <- [<kale.workflow_objects.CommandLineTask object at 0x7f8a652dc978>]


In [12]:
# Build a WorkflowWidget from the Kale workflow and the worker-pool widget.
w = ww.WorkflowWidget(kale_wf, wpw)
# Bare name as the last expression so the notebook displays the widget.
w


Adding task hello_1
Adding link hello_1 -> goodbye_0
Adding task blank_0


In [None]:
# Inspect the workflow's links mapping (iterated by kale_from_fw as
# parent id -> child ids).
wf.links

# Run workflow with FireWorks alone

In [59]:
# Store the workflow in the LaunchPad, then launch it locally with rapidfire.
# NOTE(review): the scripts append to ~/tmpmsg, so re-running this cell adds
# more lines to that file rather than replacing it.
launchpad.add_wf(wf)
rapidfire(launchpad)

2018-01-24 19:38:26,761 INFO Added a workflow. id_map: {-3: 1, -2: 2, -1: 3}
2018-01-24 19:38:27,120 INFO Created new dir /opt/kale/kale/examples/notebooks/fireworks/launcher_2018-01-24-19-38-27-120477
2018-01-24 19:38:27,121 INFO Launching Rocket
2018-01-24 19:38:27,150 INFO RUNNING fw_id: 3 in directory: /opt/kale/kale/examples/notebooks/fireworks/launcher_2018-01-24-19-38-27-120477
2018-01-24 19:38:27,162 INFO Task started: ScriptTask.
2018-01-24 19:38:27,257 INFO Task completed: ScriptTask 
2018-01-24 19:38:27,267 INFO Task started: ScriptTask.
2018-01-24 19:38:27,279 INFO Task completed: ScriptTask 
2018-01-24 19:38:27,305 INFO Rocket finished
2018-01-24 19:38:27,311 INFO Created new dir /opt/kale/kale/examples/notebooks/fireworks/launcher_2018-01-24-19-38-27-311528
2018-01-24 19:38:27,312 INFO Launching Rocket
2018-01-24 19:38:27,332 INFO RUNNING fw_id: 2 in directory: /opt/kale/kale/examples/notebooks/fireworks/launcher_2018-01-24-19-38-27-311528
2018-01-24 19:38:27,345 INFO Tas

In [70]:
cat ~/tmpmsg

hello Wed Jan 24 19:38:27 UTC 2018
hi Wed Jan 24 19:38:27 UTC 2018
goodbye Wed Jan 24 19:38:27 UTC 2018



In [67]:
ls -lrth ~

total 8.0K
drwxr-xr-x 3 jovyan  1000 4.0K Dec 28 06:20 [0m[01;34mwork[0m/
lrwxrwxrwx 1 jovyan users    9 Jan 24 18:02 [01;36mkale[0m -> [01;34m/opt/kale[0m/
-rw-r--r-- 1 jovyan users  105 Jan 24 19:38 tmpmsg


In [69]:
!date

Wed Jan 24 19:43:04 UTC 2018


# Custom Kale LaunchPad

In [None]:
# FIXME(review): unfinished stub — this line is not valid Python (dotted name in
# the parameter list, no trailing colon, no body), so the cell cannot run as is.
# Presumably it was meant to build on FireWorks' rocket launcher; confirm the
# intent before completing or deleting it.
def KaleRocketLauncher(fw.core.rocket_)

In [None]:
class KaleLaunchPad(LaunchPad):
    """LaunchPad subclass intended as the hook for Kale-specific behavior.

    Subclasses the ``LaunchPad`` class imported from ``fireworks``. The earlier
    base expression ``fw.core.launchpad`` used a name (``fw``) that no cell
    defines and was spelled like a module path, not a class.

    Relies on ``_upsert_fws``, ``workflows`` and ``m_logger`` being provided by
    the base class.
    NOTE(review): ``add_wf`` looks like a copy of the stock implementation kept
    as a starting point for customization — confirm.
    """

    def add_wf(self, wf, reassign_all=True):
        """
        Add workflow(or firework) to the launchpad. The firework ids will be reassigned.

        Args:
            wf (Workflow/Firework): a bare Firework is first wrapped in a Workflow.
            reassign_all (bool): forwarded unchanged to ``_upsert_fws``.

        Returns:
            dict: mapping between old and new Firework ids
        """
        if isinstance(wf, Firework):
            wf = Workflow.from_Firework(wf)
        # sets the root FWs as READY
        # prefer to wf.refresh() for speed reasons w/many root FWs
        for fw_id in wf.root_fw_ids:
            wf.id_fw[fw_id].state = 'READY'
            wf.fw_states[fw_id] = 'READY'
        # insert the FireWorks and get back mapping of old to new ids
        old_new = self._upsert_fws(list(wf.id_fw.values()), reassign_all=reassign_all)
        # update the Workflow with the new ids
        wf._reassign_ids(old_new)
        # insert the WFLinks
        self.workflows.insert_one(wf.to_db_dict())
        self.m_logger.info('Added a workflow. id_map: {}'.format(old_new))
        return old_new

