In [1]:
from DANE_utils import jobspec
from dummyhandler import DummyHandler
import json

def pprint(obj):
    """Pretty-print an object whose ``str()`` is a JSON document.

    The string form of ``obj`` is parsed as JSON and re-serialised with
    four-space indentation before being printed to stdout.
    """
    parsed = json.loads(str(obj))
    pretty = json.dumps(parsed, indent=4)
    print(pretty)

# Jobspec

In [2]:
# Build a job whose tasks run one after another, with an arbitrary metadata dict.
# Before register() is called the job has no job_id (it serialises as null).
job = jobspec.jobspec(
    source_url='http://127.0.0.1/example',
    source_id='ITM123',
    source_set='NISV',
    tasks=jobspec.taskSequential(['DOWNLOAD', 'CV', 'ASR', 'INDEX', 'DELETE']),
    metadata={'FOO': 'BAR'},
)

pprint(job)

{
    "source_url": "http://127.0.0.1/example",
    "source_id": "ITM123",
    "source_set": "NISV",
    "job_id": null,
    "tasks": {
        "taskSequential": [
            "DOWNLOAD",
            "CV",
            "ASR",
            "INDEX",
            "DELETE"
        ]
    },
    "metadata": {
        "FOO": "BAR"
    },
    "priority": 1,
    "response": {}
}


In [3]:
# Round-trip the job through its JSON form. The copy should be indistinguishable
# from the original; compare the JSON that str() yields (the same form pprint
# parses) instead of relying on eyeballing the two printouts.
new_job = jobspec.jobspec.from_json(job.to_json())
pprint(new_job)
assert json.loads(str(new_job)) == json.loads(str(job)), 'jobspec JSON round-trip changed the job'

{
    "source_url": "http://127.0.0.1/example",
    "source_id": "ITM123",
    "source_set": "NISV",
    "job_id": null,
    "tasks": {
        "taskSequential": [
            "DOWNLOAD",
            "CV",
            "ASR",
            "INDEX",
            "DELETE"
        ]
    },
    "metadata": {
        "FOO": "BAR"
    },
    "priority": 1,
    "response": {}
}


## API

In [4]:
# Dummy endpoint so we can 'simulate' the behaviour of the workflow without a real backend.
# set_api is called on the job; the print below shows the handler is reachable from its tasks.
dummy = DummyHandler()
new_job.set_api(dummy)

print(new_job.tasks[0].api) # a live handler instance; it is not serialised with the job, so it must be set again after from_json

<dummyhandler.DummyHandler object at 0x7febeb66d810>


In [5]:
# Register the job & all its tasks with the api.
# Afterwards the job has a job_id, and each task name has become a Task entry
# carrying its task_key and a task_id (visible in the printout below).
new_job.register()
pprint(new_job)

{
    "source_url": "http://127.0.0.1/example",
    "source_id": "ITM123",
    "source_set": "NISV",
    "job_id": "9e44026e-40c8-467c-9d89-ded7ef5dfb8d",
    "tasks": {
        "taskSequential": [
            {
                "Task": {
                    "task_key": "DOWNLOAD",
                    "task_id": "0"
                }
            },
            {
                "Task": {
                    "task_key": "CV",
                    "task_id": "1"
                }
            },
            {
                "Task": {
                    "task_key": "ASR",
                    "task_id": "2"
                }
            },
            {
                "Task": {
                    "task_key": "INDEX",
                    "task_id": "3"
                }
            },
            {
                "Task": {
                    "task_key": "DELETE",
                    "task_id": "4"
                }
            }
        ]
    },
    "metadata": {
        "FOO": "BAR"
   

In [6]:
# Explicitly run a specific task, addressed by its index in the task container.
# No guarantee that this task can run successfully if preceding tasks haven't been run.
new_job.tasks[0].run()

DummyEndpoint: Executed task DOWNLOAD for job: 9e44026e-40c8-467c-9d89-ded7ef5dfb8d


In [7]:
# Run the next task via the container rather than by index.
# Judging by the output, this picks the first task that has not run yet:
# here CV, because DOWNLOAD was run explicitly in the previous cell.
new_job.tasks.run()

DummyEndpoint: Executed task CV for job: 9e44026e-40c8-467c-9d89-ded7ef5dfb8d


In [8]:
# Attach the dummy api to the original (unregistered) job, register it, which
# assigns a job_id, then keep calling run() until isDone() reports completion.
# MAX_RUN_STEPS bounds the loop. Without it, a job that never reaches "done"
# (e.g. a task that keeps failing without raising) would hang the kernel.
MAX_RUN_STEPS = 100

job.set_api(dummy)
job.register()

run_steps = 0
while not job.isDone():
    if run_steps >= MAX_RUN_STEPS:
        raise RuntimeError('job not done after {} run() calls'.format(MAX_RUN_STEPS))
    job.run()
    run_steps += 1

DummyEndpoint: Executed task DOWNLOAD for job: 5377eb93-a0fb-4013-8844-5aff009a5088
DummyEndpoint: Executed task CV for job: 5377eb93-a0fb-4013-8844-5aff009a5088
DummyEndpoint: Executed task ASR for job: 5377eb93-a0fb-4013-8844-5aff009a5088
DummyEndpoint: Executed task INDEX for job: 5377eb93-a0fb-4013-8844-5aff009a5088
DummyEndpoint: Executed task DELETE for job: 5377eb93-a0fb-4013-8844-5aff009a5088


# Nesting Task Containers

In [9]:
# Containers can be nested: DOWNLOAD runs first, then the {CV, ASR} group,
# then the {INDEX, DELETE} group, each group declared as a parallel container.
new_tasks = jobspec.taskSequential([
    'DOWNLOAD',
    jobspec.taskParallel(['CV', 'ASR']),
    jobspec.taskParallel(['INDEX', 'DELETE']),
])
pprint(new_tasks)

{
    "taskSequential": [
        "DOWNLOAD",
        {
            "taskParallel": [
                "CV",
                "ASR"
            ]
        },
        {
            "taskParallel": [
                "INDEX",
                "DELETE"
            ]
        }
    ]
}


In [10]:
# Nested containers survive a JSON round-trip as well. The generic
# taskContainer.from_json reconstructs the sequential/parallel structure.
# Assert on the JSON that str() yields rather than comparing printouts by eye.
newer_tasks = jobspec.taskContainer.from_json(new_tasks.to_json())
pprint(newer_tasks)
assert json.loads(str(newer_tasks)) == json.loads(str(new_tasks)), 'taskContainer JSON round-trip changed the tasks'

{
    "taskSequential": [
        "DOWNLOAD",
        {
            "taskParallel": [
                "CV",
                "ASR"
            ]
        },
        {
            "taskParallel": [
                "INDEX",
                "DELETE"
            ]
        }
    ]
}


In [11]:
# A job can be built around an existing (here: deserialised) task container.
newer_job = jobspec.jobspec(
    source_url='http://127.0.0.1/example2',
    source_id='ITM124',
    source_set='NISV',
    tasks=newer_tasks,
)

In [12]:
# Attach the dummy api and register the job and its nested tasks,
# so the serialised copy made in the next cell already carries registration data.
newer_job.set_api(dummy)
newer_job.register()

In [13]:
# Deserialise the registered job and run its first two top-level entries.
newest_job = jobspec.jobspec.from_json(newer_job.to_json())
newest_job.set_api(dummy) # api isn't serialised, so need to set again

newest_job.tasks[0].run()  # DOWNLOAD
newest_job.tasks[1].run()  # the taskParallel(['CV', 'ASR']) container: the output shows both CV and ASR executing

DummyEndpoint: Executed task DOWNLOAD for job: 8bef1ab4-504a-4e44-afbd-a7c0fb0968be
DummyEndpoint: Executed task CV for job: 8bef1ab4-504a-4e44-afbd-a7c0fb0968be
DummyEndpoint: Executed task ASR for job: 8bef1ab4-504a-4e44-afbd-a7c0fb0968be
