Fix dataset timestamps #190

Merged · 12 commits · Jul 19, 2024
1 change: 1 addition & 0 deletions requirements-dev.txt
@@ -5,3 +5,4 @@ pytest-env==1.1.3
pytest-cov==5.0.0
pytest-mock==3.14.0
wheel==0.43.0
+time-machine==2.14.1
16 changes: 4 additions & 12 deletions servicelayer/taskqueue.py
@@ -83,7 +83,6 @@ def __init__(self, conn, name):
        self.running_key = make_key(PREFIX, "qdj", name, "running")
        self.pending_key = make_key(PREFIX, "qdj", name, "pending")
        self.start_key = make_key(PREFIX, "qdj", name, "start")
-       self.end_key = make_key(PREFIX, "qdj", name, "end")
        self.last_update_key = make_key(PREFIX, "qdj", name, "last_update")
        self.active_stages_key = make_key(PREFIX, "qds", name, "active_stages")

@@ -115,19 +114,14 @@ def cancel(self):
"""Cancel processing of all tasks belonging to a dataset"""
pipe = self.conn.pipeline()
self.flush_status(pipe)
# What should happen to the end_key in this case?
pipe.delete(self.end_key)
pipe.execute()

    def get_status(self):
        """Status of a given dataset."""
        status = {"finished": 0, "running": 0, "pending": 0, "stages": []}

-       start, end, last_update = self.conn.mget(
-           (self.start_key, self.end_key, self.last_update_key)
-       )
+       start, last_update = self.conn.mget((self.start_key, self.last_update_key))
        status["start_time"] = start
-       status["end_time"] = end
        status["last_update"] = last_update

        for stage in self.conn.smembers(self.active_stages_key):
@@ -226,9 +220,8 @@ def add_task(self, task_id, stage):
        pipe.sadd(self.pending_key, task_id)

        # update dataset timestamps
-       pipe.set(self.start_key, pack_now())
+       pipe.set(self.start_key, pack_now(), nx=True)
        pipe.set(self.last_update_key, pack_now())
-       pipe.delete(self.end_key)
        pipe.execute()

    def remove_task(self, task_id, stage):
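Note on the change above: the fix hinges on the `nx=True` flag. Redis `SET ... NX` writes only when the key is absent, so `start_time` is recorded once per active run of the dataset instead of being overwritten by every enqueued task. A minimal sketch of that behavior, using fakeredis (as the tests below do) and illustrative key names rather than the actual servicelayer keys:

```python
import fakeredis

conn = fakeredis.FakeStrictRedis(decode_responses=True)

# First write wins: with nx=True, SET only succeeds if the key is absent.
conn.set("dataset:start", "2024-01-01T00:00:00", nx=True)
# A later NX write is a no-op because the key already exists.
conn.set("dataset:start", "2024-01-03T00:00:00", nx=True)
assert conn.get("dataset:start") == "2024-01-01T00:00:00"

# Once the key is deleted (as happens when a dataset's status is flushed),
# the next NX write records a fresh start time.
conn.delete("dataset:start")
conn.set("dataset:start", "2024-01-07T00:00:00", nx=True)
assert conn.get("dataset:start") == "2024-01-07T00:00:00"
```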
@@ -279,9 +272,8 @@ def checkout_task(self, task_id, stage):
        pipe.srem(self.pending_key, task_id)

        # update dataset timestamps
-       pipe.set(self.start_key, pack_now())
+       pipe.set(self.start_key, pack_now(), nx=True)
Contributor Author commented:
I’m not entirely sure setting the "start_time" timestamp here is necessary in the first place, as a worker shouldn’t be able to check out a task without add_task having been executed first. (The same probably also applies to pipe.sadd(self.key, self.name).)

Didn’t change it because the primary goal of this PR is to fix the timestamps, not to refactor, but I’d still be interested to understand whether there’s a legitimate case where this is relevant or whether it’s merely a precaution?

Contributor commented:
It seems like some sort of defensive programming approach, which I can't say I dislike :)

        pipe.set(self.last_update_key, pack_now())
-       pipe.delete(self.end_key)
        pipe.execute()

    def mark_done(self, task: Task):
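All of these timestamp updates go through a Redis pipeline: commands are buffered client-side and sent as one batch when `execute()` is called (redis-py wraps the batch in MULTI/EXEC by default, so it applies atomically). A minimal sketch of the pattern, with a fakeredis stand-in and hypothetical key names:

```python
import fakeredis

conn = fakeredis.FakeStrictRedis(decode_responses=True)

pipe = conn.pipeline()
pipe.sadd("dataset:pending", "task-1")
pipe.set("dataset:last_update", "2024-01-01T00:00:00")
# Nothing has been sent to the server yet; execute() flushes the batch.
pipe.execute()

assert conn.smembers("dataset:pending") == {"task-1"}
```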
@@ -309,8 +301,8 @@ def mark_done(self, task: Task):
            pipe.incr(make_key(stage_key, "finished"))

        # update dataset timestamps
-       pipe.set(self.end_key, pack_now())
        pipe.set(self.last_update_key, pack_now())

        pipe.execute()

        if self.is_done():
1 change: 1 addition & 0 deletions setup.py
@@ -51,6 +51,7 @@
"pytest >= 3.6",
"coverage",
"pytest-cov",
"time-machine>=2.14.1, <3.0.0",
],
},
test_suite="tests",
203 changes: 196 additions & 7 deletions tests/test_taskqueue.py
@@ -3,6 +3,7 @@
from unittest.mock import patch
import json
from random import randrange
+import time_machine

import pika
from prometheus_client import REGISTRY
@@ -73,7 +74,6 @@ def test_task_queue(self):
assert status["finished"] == 0, status
assert status["pending"] == 1, status
assert status["running"] == 0, status
assert status["end_time"] is None
started = unpack_datetime(status["start_time"])
last_updated = unpack_datetime(status["last_update"])
assert started < last_updated
@@ -121,10 +121,8 @@ def test_task_queue(self):
assert status["finished"] == 0, status
assert status["pending"] == 0, status
assert status["running"] == 0, status
# started = unpack_datetime(status["start_time"])
# last_updated = unpack_datetime(status["last_update"])
# end_time = unpack_datetime(status["end_time"])
# assert started < end_time < last_updated
assert status["start_time"] is None
assert status["last_update"] is None

@patch("servicelayer.taskqueue.Dataset.should_execute")
def test_task_that_shouldnt_execute(self, mock_should_execute):
@@ -192,6 +190,199 @@ def did_nack():
        assert dataset.is_task_tracked(Task(**body))


def test_dataset_get_status():
    conn = get_fakeredis()
    conn.flushdb()

    dataset = Dataset(conn=conn, name="123")
    status = dataset.get_status()

    assert status["pending"] == 0
    assert status["running"] == 0
    assert status["finished"] == 0
    assert status["start_time"] is None
    assert status["last_update"] is None

    task_one = Task(
        task_id="1",
        job_id="abc",
        delivery_tag="",
        operation="ingest",
        context={},
        payload={},
        priority=5,
        collection_id="1",
    )

    task_two = Task(
        task_id="2",
        job_id="abc",
        delivery_tag="",
        operation="ingest",
        context={},
        payload={},
        priority=5,
        collection_id="1",
    )

    task_three = Task(
        task_id="3",
        job_id="abc",
        delivery_tag="",
        operation="index",
        context={},
        payload={},
        priority=5,
        collection_id="1",
    )

    # Adding a task updates `start_time` and `last_update`
Contributor commented:
I appreciate your comments in this non-trivial test!

with time_machine.travel("2024-01-01T00:00:00"):
dataset.add_task(task_one.task_id, task_one.operation)

status = dataset.get_status()
assert status["pending"] == 1
assert status["running"] == 0
assert status["finished"] == 0
assert status["start_time"].startswith("2024-01-01T00:00:00")
assert status["last_update"].startswith("2024-01-01T00:00:00")

# Once a worker starts processing a task, only `last_update` is updated
with time_machine.travel("2024-01-02T00:00:00"):
dataset.checkout_task(task_one.task_id, task_one.operation)

status = dataset.get_status()
assert status["pending"] == 0
assert status["running"] == 1
assert status["finished"] == 0
assert status["start_time"].startswith("2024-01-01T00:00:00")
assert status["last_update"].startswith("2024-01-02T00:00:00")

# When another task is added, only `last_update` is updated
with time_machine.travel("2024-01-03T00:00:00"):
dataset.add_task(task_two.task_id, task_two.operation)

status = dataset.get_status()
assert status["pending"] == 1
assert status["running"] == 1
assert status["finished"] == 0
assert status["start_time"].startswith("2024-01-01T00:00:00")
assert status["last_update"].startswith("2024-01-03T00:00:00")

# When the first task has been processed, `last_update` is updated
with time_machine.travel("2024-01-04T00:00:00"):
dataset.mark_done(task_one)

status = dataset.get_status()
assert status["pending"] == 1
assert status["running"] == 0
assert status["finished"] == 1
assert status["start_time"].startswith("2024-01-01T00:00:00")
assert status["last_update"].startswith("2024-01-04T00:00:00")

# When the worker starts processing the second task, only `last_update` is updated
with time_machine.travel("2024-01-05T00:00:00"):
dataset.checkout_task(task_two.task_id, task_two.operation)

status = dataset.get_status()
assert status["pending"] == 0
assert status["running"] == 1
assert status["finished"] == 1
assert status["start_time"].startswith("2024-01-01T00:00:00")
assert status["last_update"].startswith("2024-01-05T00:00:00")

# Once all tasks have been processed, status data is flushed
with time_machine.travel("2024-01-06T00:00:00"):
dataset.mark_done(task_two)

status = dataset.get_status()
assert status["pending"] == 0
assert status["running"] == 0
assert status["finished"] == 0
assert status["start_time"] is None
assert status["last_update"] is None

# Adding a new task to an inactive dataset sets `start_time`
with time_machine.travel("2024-01-07T00:00:00"):
dataset.add_task(task_three.task_id, task_three.operation)

status = dataset.get_status()
assert status["pending"] == 1
assert status["running"] == 0
assert status["finished"] == 0
assert status["start_time"].startswith("2024-01-07T00:00:00")
assert status["last_update"].startswith("2024-01-07T00:00:00")

# Cancelling a dataset flushes status data
with time_machine.travel("2024-01-08T00:00:00"):
dataset.checkout_task(task_three.task_id, task_three.operation)
dataset.cancel()

status = dataset.get_status()
assert status["pending"] == 0
assert status["running"] == 0
assert status["finished"] == 0
assert status["start_time"] is None
assert status["last_update"] is None

# Tasks that were already running when the dataset was cancelled
# have no effect
with time_machine.travel("2024-01-09T00:00:00"):
dataset.mark_done(task_three)

assert status["pending"] == 0
assert status["running"] == 0
assert status["finished"] == 0
assert status["start_time"] is None
assert status["last_update"] is None


def test_dataset_cancel():
    conn = get_fakeredis()
    conn.flushdb()

    dataset = Dataset(conn=conn, name="abc")
    assert conn.keys() == []

    # Enqueueing tasks stores status data in Redis
    dataset.add_task("1", "ingest")
    dataset.add_task("2", "index")
    dataset.checkout_task("1", "ingest")
    assert conn.keys() != []

    # Cancelling a dataset removes associated data from Redis
    dataset.cancel()
    assert conn.keys() == []


def test_dataset_mark_done():
    conn = get_fakeredis()
    conn.flushdb()

    dataset = Dataset(conn=conn, name="abc")
    assert conn.keys() == []

    task = Task(
        task_id="1",
        job_id="abc",
        delivery_tag="",
        operation="ingest",
        context={},
        payload={},
        priority=5,
        collection_id="abc",
    )

    # Enqueueing a task stores status data in Redis
    dataset.add_task(task.task_id, task.operation)
    dataset.checkout_task(task.task_id, task.operation)
    assert conn.keys() != []

    # Marking the last task as done cleans up status data in Redis
    dataset.mark_done(task)
    assert conn.keys() == []


@pytest.fixture
def prom_registry():
    # This relies on internal implementation details of the client to reset
@@ -325,7 +516,6 @@ def test_get_priority_bucket():
}
],
"start_time": "2024-06-25T10:58:49.779811",
"end_time": None,
"last_update": "2024-06-25T10:58:49.779819",
}
},
@@ -354,7 +544,6 @@ def test_get_priority_bucket():
}
],
"start_time": "2024-06-25T10:58:49.779811",
"end_time": None,
"last_update": "2024-06-25T10:58:49.779819",
}
},