Skip to content

Commit

Permalink
Updating start_time after worker start working
Browse files Browse the repository at this point in the history
  • Loading branch information
dperezrada committed Aug 31, 2015
1 parent d6b286b commit 2b5e54c
Showing 1 changed file with 29 additions and 14 deletions.
43 changes: 29 additions & 14 deletions poliglo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,16 @@ def update_workflow_instance(
)
pipe.execute()

def update_workflow_instance_key(
connection, workflow, workflow_instance_id, key, value
):
connection.hset(
REDIS_KEY_ONE_INSTANCE % (workflow, workflow_instance_id),
key,
value
)


def workflow_instance_exists(connection, workflow, workflow_instance_id):
return connection.exists(REDIS_KEY_ONE_INSTANCE % (workflow, workflow_instance_id))

Expand Down Expand Up @@ -197,24 +207,21 @@ def start_workflow_instance(
exists_workflow_instance_before = workflow_instance_exists(
connection, workflow, workflow_instance_id
)
workflow_instance_data = {
'workflow': workflow,
'id': workflow_instance_id,
'name': workflow_instance_name,
'creation_time': time.time(),
'start_worker_id': start_worker_id,
'start_meta_worker': start_meta_worker
}

if not exists_workflow_instance_before:
workflow_instance_data = {
'name': workflow_instance_name,
'start_time': time.time(),
'start_worker_id': start_worker_id,
'start_meta_worker': start_meta_worker
}
update_workflow_instance(connection, workflow, workflow_instance_id, workflow_instance_data)

to_send_data = {
'inputs': initial_data,
'workflow_instance': {
'workflow': workflow,
'id': workflow_instance_id,
'name': workflow_instance_name,
'start_time': time.time(),
'start_worker_id': start_worker_id,
'start_meta_worker': start_meta_worker
},
'workflow_instance': workflow_instance_data,
'jobs_ids': [],
'workers_output': {
'initial': initial_data
Expand Down Expand Up @@ -266,6 +273,14 @@ def default_main_inside(
raw_data = queue_message[1]
try:
workflow_instance_data = get_job_data(raw_data)
if not workflow_instance_data.get('start_time'):
update_workflow_instance_key(
connection,
workflow_instance_data['workflow_instance']['workflow'],
workflow_instance_data['workflow_instance']['id'],
'start_time',
process_message_start_time
)
last_job_id = workflow_instance_data['jobs_ids'][-1]
worker_id = workflow_instance_data['workflow_instance']['worker_id']
worker_workflow_data = get_worker_workflow_data(
Expand Down

0 comments on commit 2b5e54c

Please sign in to comment.