Refactor cleanup of task IDs in order to fix zombie tasks #202

Merged · 1 commit · Jul 19, 2024
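For orientation before the diff: the refactor consolidates all per-dataset key cleanup into a single `flush_status(pipe)` helper and guards it with an `is_done()` check, so every terminal path (`cancel`, `cleanup_dataset_status`, `remove_task`, `mark_done`) clears task IDs the same way and no stale "zombie" entries survive in Redis. The following is a minimal sketch of that pattern, not the servicelayer implementation; the key names, the `DatasetSketch` class, and the assumption of a redis-py client with `decode_responses=True` are illustrative.

```python
# Minimal sketch of the consolidated cleanup pattern; key names and the
# DatasetSketch class are hypothetical, only the call shape mirrors the PR.
import redis  # assumes redis-py is available


class DatasetSketch:
    """Per-dataset task bookkeeping kept in Redis sets and counters."""

    def __init__(self, conn: redis.Redis, name: str):
        self.conn = conn
        self.name = name
        self.key = "tq:qdatasets"                      # set of active datasets
        self.pending_key = f"tq:qdj:{name}:pending"    # set of pending task IDs
        self.running_key = f"tq:qdj:{name}:running"    # set of running task IDs
        self.finished_key = f"tq:qdj:{name}:finished"  # finished-task counter
        self.start_key = f"tq:qdj:{name}:start"
        self.end_key = f"tq:qdj:{name}:end"
        self.last_update_key = f"tq:qdj:{name}:last_update"
        self.active_stages_key = f"tq:qds:{name}:active_stages"

    def get_stage_key(self, stage: str) -> str:
        return f"tq:qds:{self.name}:{stage}"

    def is_done(self) -> bool:
        # A dataset is done once nothing is pending or running.
        return (
            self.conn.scard(self.pending_key) == 0
            and self.conn.scard(self.running_key) == 0
        )

    def flush_status(self, pipe: redis.client.Pipeline) -> None:
        # Queue deletion of every per-dataset and per-stage key on the pipeline.
        pipe.srem(self.key, self.name)
        pipe.delete(self.start_key, self.end_key, self.last_update_key)
        for stage in self.conn.smembers(self.active_stages_key):
            stage_key = self.get_stage_key(stage)
            pipe.delete(
                stage_key,
                f"{stage_key}:pending",
                f"{stage_key}:running",
                f"{stage_key}:finished",
            )
        pipe.delete(self.pending_key, self.running_key, self.finished_key)
        pipe.delete(self.active_stages_key)

    def mark_done(self, task_id: str) -> None:
        pipe = self.conn.pipeline()
        pipe.srem(self.running_key, task_id)
        pipe.incr(self.finished_key)
        pipe.execute()
        # The crux of the fix: every terminal path runs the same flush once
        # the dataset is idle, so no task IDs linger as "zombies".
        if self.is_done():
            pipe = self.conn.pipeline()
            self.flush_status(pipe)
            pipe.execute()


# Usage (assumes a local Redis; decode_responses keeps set members as str):
# conn = redis.Redis(decode_responses=True)
# ds = DatasetSketch(conn, "my-dataset")
# ds.mark_done("task-1")
```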
147 changes: 78 additions & 69 deletions servicelayer/taskqueue.py
@@ -87,25 +87,36 @@ def __init__(self, conn, name):
self.last_update_key = make_key(PREFIX, "qdj", name, "last_update")
self.active_stages_key = make_key(PREFIX, "qds", name, "active_stages")

def cancel(self):
"""Cancel processing of all tasks belonging to a dataset"""
pipe = self.conn.pipeline()
def flush_status(self, pipe):
# remove the dataset from active datasets
pipe.srem(self.key, self.name)
# clean up tasks and task counts
pipe.delete(self.finished_key)
pipe.delete(self.running_key)
pipe.delete(self.pending_key)

# reset timestamps
pipe.delete(self.start_key)
pipe.delete(self.end_key)
pipe.delete(self.last_update_key)

# delete information about running stages
for stage in self.conn.smembers(self.active_stages_key):
stage_key = self.get_stage_key(stage)
pipe.delete(stage_key)
pipe.delete(make_key(stage_key, "pending"))
pipe.delete(make_key(stage_key, "running"))
pipe.delete(make_key(stage_key, "finished"))

# delete information about tasks per dataset
pipe.delete(self.pending_key)
pipe.delete(self.running_key)
pipe.delete(self.finished_key)

# delete stages key
pipe.delete(self.active_stages_key)

def cancel(self):
"""Cancel processing of all tasks belonging to a dataset"""
pipe = self.conn.pipeline()
self.flush_status(pipe)
# What should happen to the end_key in this case?
pipe.delete(self.end_key)
pipe.execute()

def get_status(self):
@@ -167,24 +178,9 @@ def cleanup_dataset_status(cls, conn):
datasets_key = make_key(PREFIX, "qdatasets")
for name in conn.smembers(datasets_key):
dataset = cls(conn, name)
status = dataset.get_status()
if status["running"] == 0 and status["pending"] == 0:
if dataset.is_done():
pipe = conn.pipeline()
# remove the dataset from active datasets
pipe.srem(dataset.key, dataset.name)
# reset finished task count
pipe.delete(dataset.finished_key)
# delete information about running stages
for stage in dataset.conn.smembers(dataset.active_stages_key):
stage_key = dataset.get_stage_key(stage)
pipe.delete(stage_key)
pipe.delete(make_key(stage_key, "pending"))
pipe.delete(make_key(stage_key, "running"))
pipe.delete(make_key(stage_key, "finished"))
# delete stages key
pipe.delete(dataset.active_stages_key)
pipe.set(dataset.last_update_key, pack_now())

dataset.flush_status(pipe)
pipe.execute()

def should_execute(self, task_id):
@@ -217,13 +213,19 @@ def add_task(self, task_id, stage):
# add the dataset to active datasets
pipe.sadd(self.key, self.name)

# update status of stages per dataset
stage_key = self.get_stage_key(stage)
# add the stage to the list of active stages per dataset
pipe.sadd(self.active_stages_key, stage)

# add the task to the set of tasks per stage
# and the set of pending tasks per stage
stage_key = self.get_stage_key(stage)
pipe.sadd(stage_key, task_id)
pipe.sadd(make_key(stage_key, "pending"), task_id)

# add the task to the set of pending tasks per dataset
pipe.sadd(self.pending_key, task_id)

# update dataset timestamps
pipe.set(self.start_key, pack_now())
pipe.set(self.last_update_key, pack_now())
pipe.delete(self.end_key)
@@ -233,48 +235,50 @@ def remove_task(self, task_id, stage):
"""Remove a task that's not going to be executed"""
log.info(f"Removing task: {task_id}")
pipe = self.conn.pipeline()

# remove the task from the set of pending tasks per dataset
pipe.srem(self.pending_key, task_id)

# remove the task from the set of tasks per stage
# and the set of pending tasks per stage
stage_key = self.get_stage_key(stage)
pipe.srem(stage_key, task_id)
pipe.srem(make_key(stage_key, "pending"), task_id)

# delete the retry key for this task
pipe.delete(make_key(PREFIX, "qdj", self.name, "taskretry", task_id))

status = self.get_status()
if status["running"] == 0 and status["pending"] == 0:
# remove the dataset from active datasets
pipe.srem(self.key, self.name)
# reset finished task count
pipe.delete(self.finished_key)
# delete information about running stages
for stage in self.conn.smembers(self.active_stages_key):
stage_key = self.get_stage_key(stage)
pipe.delete(stage_key)
pipe.delete(make_key(stage_key, "pending"))
pipe.delete(make_key(stage_key, "running"))
pipe.delete(make_key(stage_key, "finished"))
# delete stages key
pipe.delete(self.active_stages_key)
pipe.set(self.last_update_key, pack_now())
pipe.execute()

if self.is_done():
pipe = self.conn.pipeline()
self.flush_status(pipe)
pipe.execute()

def checkout_task(self, task_id, stage):
"""Update state when a task is checked out for execution"""
log.info(f"Checking out task: {task_id}")
pipe = self.conn.pipeline()
# add the dataset to active datasets
pipe.sadd(self.key, self.name)

# update status of stages per dataset
stage_key = self.get_stage_key(stage)
# add the stage to the list of active stages per dataset
pipe.sadd(self.active_stages_key, stage)

# add the task to the set of tasks per stage
# and the set of running tasks per stage
stage_key = self.get_stage_key(stage)
pipe.sadd(stage_key, task_id)
pipe.srem(make_key(stage_key, "pending"), task_id)
pipe.sadd(make_key(stage_key, "running"), task_id)
# remove the task from the set of pending tasks per stage
pipe.srem(make_key(stage_key, "pending"), task_id)

pipe.srem(self.pending_key, task_id)
# add the task to the set of running tasks per dataset
pipe.sadd(self.running_key, task_id)
# remove the task from the set of pending tasks per dataset
pipe.srem(self.pending_key, task_id)

# update dataset timestamps
pipe.set(self.start_key, pack_now())
pipe.set(self.last_update_key, pack_now())
pipe.delete(self.end_key)
@@ -284,51 +288,55 @@ def mark_done(self, task: Task):
"""Update state when a task is finished executing"""
log.info(f"Finished executing task: {task.task_id}")
pipe = self.conn.pipeline()

# remove the task from the pending and running sets of tasks per dataset
pipe.srem(self.pending_key, task.task_id)
pipe.srem(self.running_key, task.task_id)

# increase the number of finished tasks per dataset
pipe.incr(self.finished_key)

# delete the retry key for the task
pipe.delete(task.retry_key)

# remove the task from the set of tasks per stage
# and the pending and running tasks per stage
stage_key = self.get_stage_key(task.operation)
pipe.srem(stage_key, task.task_id)
pipe.srem(make_key(stage_key, "pending"), task.task_id)
pipe.srem(make_key(stage_key, "running"), task.task_id)
# increase the number of finished tasks per stage
pipe.incr(make_key(stage_key, "finished"))

# update dataset timestamps
pipe.set(self.end_key, pack_now())
pipe.set(self.last_update_key, pack_now())
pipe.execute()

status = self.get_status()
if status["running"] == 0 and status["pending"] == 0:
# remove the dataset from active datasets
pipe.srem(self.key, self.name)
# reset finished task count
pipe.delete(self.finished_key)
# delete information about running stages
for stage in self.conn.smembers(self.active_stages_key):
stage_key = self.get_stage_key(stage)
pipe.delete(stage_key)
pipe.delete(make_key(stage_key, "pending"))
pipe.delete(make_key(stage_key, "running"))
pipe.delete(make_key(stage_key, "finished"))
# delete stages key
pipe.delete(self.active_stages_key)

if self.is_done():
pipe = self.conn.pipeline()
self.flush_status(pipe)
pipe.execute()

def mark_for_retry(self, task):
pipe = self.conn.pipeline()
stage_key = self.get_stage_key(task.operation)

log.info(
f"Marking task {task.task_id} (stage {task.operation})"
f" for retry after NACK"
)

# remove the task from the pending and running sets of tasks per dataset
pipe.srem(self.pending_key, task.task_id)
pipe.srem(self.running_key, task.task_id)

# remove the task from the set of tasks per stage
# and the set of running tasks per stage
stage_key = self.get_stage_key(task.operation)
pipe.srem(stage_key, task.task_id)
pipe.srem(make_key(stage_key, "running"), task.task_id)

# delete the retry key for the task
pipe.delete(task.retry_key)
pipe.srem(stage_key, task.task_id)

pipe.set(self.last_update_key, pack_now())

@@ -345,11 +353,12 @@ def get_stage_key(self, stage):
def is_task_tracked(self, task: Task):
tracked = True

stage_key = self.get_stage_key(task.operation)
dataset = dataset = dataset_from_collection_id(task.collection_id)
dataset = dataset_from_collection_id(task.collection_id)
task_id = task.task_id
stage = task.operation

stage_key = self.get_stage_key(stage)

# A task is considered tracked if
# the dataset is in the list of active datasets
if dataset not in self.conn.smembers(self.key):
Expand Down Expand Up @@ -662,9 +671,9 @@ def nack_message(self, task, channel, requeue=True):
dataset = task.get_dataset(conn=self.conn)
# Sync state to redis
if requeue:
dataset.mark_for_retry(task)
if not dataset.is_task_tracked(task):
dataset.add_task(task.task_id, task.operation)
dataset.mark_for_retry(task)
else:
dataset.mark_done(task)
if channel.is_open:
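A side note on the `nack_message` hunk above: when a message is requeued, the PR now re-registers an untracked task before marking it for retry. A rough sketch of that ordering, using the Dataset methods from the diff (the wrapper function itself is hypothetical):

```python
# Illustrative ordering only; `dataset` stands for the Dataset object above.
def sync_nack_state(dataset, task, requeue: bool = True) -> None:
    if requeue:
        # Re-add the task first so the pending/stage sets track it again ...
        if not dataset.is_task_tracked(task):
            dataset.add_task(task.task_id, task.operation)
        # ... and only then record the retry against a tracked task.
        dataset.mark_for_retry(task)
    else:
        # A message that is not requeued is treated as finished.
        dataset.mark_done(task)
```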
8 changes: 4 additions & 4 deletions tests/test_taskqueue.py
@@ -121,10 +121,10 @@ def test_task_queue(self):
assert status["finished"] == 0, status
assert status["pending"] == 0, status
assert status["running"] == 0, status
started = unpack_datetime(status["start_time"])
last_updated = unpack_datetime(status["last_update"])
end_time = unpack_datetime(status["end_time"])
assert started < end_time < last_updated
# started = unpack_datetime(status["start_time"])
# last_updated = unpack_datetime(status["last_update"])
# end_time = unpack_datetime(status["end_time"])
# assert started < end_time < last_updated

@patch("servicelayer.taskqueue.Dataset.should_execute")
def test_task_that_shouldnt_execute(self, mock_should_execute):