
Add max_concurrency to tasks #8

Merged 1 commit into NicolasLM:master on May 6, 2021

Conversation

@bigjools (Collaborator) commented Apr 28, 2021

This change adds the max_concurrency parameter to Task, so that a task
can define the maximum number of its jobs that run simultaneously, no
matter how many are queued and waiting to run.

Other drive-by changes:

  • Add a tox.ini and docker-compose to start Redis.
    This allows a simple tox command to just run all the relevant tests
    and have a temporary Redis start up with which the tests can interact.

  • Amend hacking docs

    • Change pep8 to the recommended pycodestyle invocation
    • Document the tox way to test

Fixes #6
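For readers skimming the thread, here is roughly how the feature is used, pieced together from the test and reproduction snippets later in this conversation (the Engine and broker setup mirrors the project's quickstart; nothing here is new API beyond max_concurrency itself):

from spinach import Engine, MemoryBroker

spin = Engine(MemoryBroker(), namespace='example')

# At most two jobs of this task may run at the same time, no matter
# how many are queued and waiting.
@spin.task(name='compute', max_retries=10, max_concurrency=2)
def compute(a, b):
    print(a + b)

for _ in range(100):
    spin.schedule(compute, 5, 3)

spin.start_workers(number=5)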

@bigjools (Collaborator, Author)

I realised after submitting that I didn't convert remove_job_from_running to a Lua script; I can do this later. I'll also note that keeping the uninitialised max_concurrency as None in Python and -1 in Lua has created some tension between the two language domains. I've had to add checks for both -1 and nil throughout the Lua, because without them the many tests that create Tasks with default values would blow up on nil; the alternative was a ton of changes to those tests.
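To make the tension concrete: Python's natural "unset" value survives json.dumps as null, which Lua's cjson decodes into a sentinel that redis.call() rejects, exactly the failure mode shown in a traceback later in this thread. A quick illustration (not project code):

import json

task = {'name': 'compute', 'max_concurrency': None}
print(json.dumps(task))
# {"name": "compute", "max_concurrency": null}
# On the Lua side, cjson turns that null into a value that cannot be
# passed to redis.call(), hence the guards for both nil and the -1
# sentinel mentioned above.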

@@ -36,7 +52,7 @@ def task_name(self):
     def __repr__(self):
         return 'Task({}, {}, {}, {}, {})'.format(
             self.func, self.name, self.queue, self.max_retries,
-            self.periodicity
+            self.periodicity, self.max_concurrency,
@NicolasLM (Owner)

Does this work, given that the format string has not been changed?

@bigjools (Collaborator, Author)

Good catch, there's no test for it so I'll add one.
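For context, str.format silently ignores surplus positional arguments, which is why the unchanged format string still "works" while never printing max_concurrency:

>>> 'Task({}, {}, {}, {}, {})'.format('f', 'n', 'q', 3, None, 2)
'Task(f, n, q, 3, None)'  # the sixth argument is silently dropped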



def test_set_concurrency_keys_sets_new_keys_on_idempotent_tasks(broker):
    max_c = random.randint(1, 10)
@NicolasLM (Owner)

Could you set it to a fixed int? In my experience randomness is best avoided in tests.

@bigjools (Collaborator, Author)

It's not the first time I've had this from reviewers: an OpenStack guy once said not to use random, but was perfectly happy with a random UUID instead... go figure!

My own experience is the exact opposite: randomness is a great way to show test intent, namely that we take any int and it works. I have a ~38k LOC Python project that uses Spinach, built on a test factory that generates random inputs where required across 1761 tests. Occasionally we find bugs thanks to the randomness as well.

I will change it if you insist, but honestly it seems quite benign here, and it's a nice way to show that the test works with low integers. Let me know whether I've convinced you; if not, I'll change it.

@NicolasLM (Owner)

I'm fine with leaving it as is, but I'm not convinced. I'm not denying that introducing randomness helps uncover bugs, but in my mind there is a clear distinction between a test suite and fuzzing.

I find immense value in knowing that one version of the code always leads to the same result in CI, a bit like seeing the test suite as a pure function: the same code as input always gives the same output.

@bigjools (Collaborator, Author)

I just had a similar conversation on Twitter with someone I've known for a long time. He said it was fuzzing too, but it's not fuzzing. In this particular case, it demonstrates that an input value makes it through to an output. With a fixed input you are not really testing, because the code could have a bug that happens to produce that same fixed output, and unless you vary the input you won't catch it. I have actually seen this happen!

There are two main reasons for using randomness: the one I just gave above, and cases where a required input needs some value that is not itself part of the test, so you need something but don't care what it is. Showing that is an important part of test intent.

Anyway I think I have dragged this out too far now!
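A toy illustration of the failure mode described above (entirely hypothetical, not code from this PR):

import random

def double(x):
    return 4  # buggy: someone hard-coded the common case

def test_double_fixed_input():
    assert double(2) == 4  # passes forever; the bug stays invisible

def test_double_any_input():
    x = random.randint(1, 10)
    assert double(x) == 2 * x  # fails for any x != 2, exposing the bug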

-            *tasks
+            *_tasks
         )
+        self.set_concurrency_keys(tasks)
@NicolasLM (Owner)

The tasks argument that register_periodic_tasks receives contains only periodic tasks.

Let's separate register_periodic_tasks and set_concurrency_keys, adding the latter as an abstract method to the base Broker and calling it explicitly from the Engine._arbiter_func.

@bigjools (Collaborator, Author)

I was thinking the same, actually; it felt like a hack after you suggested it.

@NicolasLM (Owner)

Really great work!

A couple of things in addition to my comments in the code:

  • The tests on Python 3.5 are failing; it's a simple str vs bytes issue.
  • The feature is not implemented in the MemoryBroker. It shouldn't be too difficult, as this broker can take many shortcuts; as long as the implementation is thread-safe, it's good enough.
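As a rough idea of the thread-safe bookkeeping that second item calls for, here is a minimal sketch; the class and attribute names are illustrative, not the project's actual MemoryBroker internals:

import threading
from collections import defaultdict

class ConcurrencyLimiter:
    """Track running jobs per task and refuse to exceed the limit."""

    def __init__(self):
        self._lock = threading.Lock()
        self._max = {}                    # task name -> limit
        self._current = defaultdict(int)  # task name -> running count

    def set_concurrency_keys(self, tasks):
        with self._lock:
            for task in tasks:
                if task.max_concurrency is not None:
                    self._max[task.name] = task.max_concurrency

    def try_acquire(self, task_name: str) -> bool:
        """Claim a slot for task_name; False means the limit is reached."""
        with self._lock:
            limit = self._max.get(task_name)
            if limit is not None and self._current[task_name] >= limit:
                return False
            self._current[task_name] += 1
            return True

    def release(self, task_name: str):
        with self._lock:
            self._current[task_name] -= 1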

With this definition, no more than one instance of the Task will ever be
spawned as a running Job, no matter how many are queued and waiting to
run.

@NicolasLM (Owner)

As it's a nice feature that not many task frameworks have, do you mind adding a one-line description to the README and doc/index.rst as well?

@bigjools (Collaborator, Author)

Certainly!

@@ -67,7 +67,8 @@
     'flush.lua',
     'get_jobs_from_queue.lua',
     'move_future_jobs.lua',
-    'register_periodic_tasks.lua'
+    'register_periodic_tasks.lua',
+    'set_concurrency_keys.lua',
@NicolasLM (Owner)

Good catch!

@bigjools (Collaborator, Author)

> Really great work!

Thank you! The Lua took me a day to get used to but it seems pretty simple. At least the duck typing is familiar :)

> A couple of things in addition to my comments in the code:
>
>   • The tests on Python 3.5 are failing; it's a simple str vs bytes issue.

Right, I can add that to the tox environments so it gets tested locally too. Did you like that use of tox & docker-compose, BTW?

>   • The feature is not implemented in the MemoryBroker. It shouldn't be too difficult, as this broker can take many shortcuts; as long as the implementation is thread-safe, it's good enough.

Yeah, I was going to mention that after submitting the PR and completely forgot. I didn't want to take that on at the same time as this in case it was going in the wrong direction, but since you're happy with things I'll see what I can do.

@NicolasLM (Owner)

> Right, I can add that to the tox environments so it gets tested locally too. Did you like that use of tox & docker-compose, BTW?

I always rely on CI for running the test suite on multiple environments so I don't think that I will personally have a use for tox. That being said I'm completely fine with having it in the code base if it's part of someone's workflow.

@bigjools (Collaborator, Author)

>   • The tests on Python 3.5 are failing; it's a simple str vs bytes issue.

I just realised Python 3.5 EOLed 7 months ago. Perhaps you should just drop the CI run there?

@bigjools (Collaborator, Author)

Hi, I've found a problem in _arbiter_func that I'll need your help with, please. These lines:

                if (stop_when_queue_empty and available_slots > 0
                        and received_jobs == 0):
                    logger.info("Stopping workers because queue '%s' is empty",
                                self._working_queue)

cause a problem in a functional test I've added to check that concurrency limits are observed properly with real tasks. It seems to work fine with the Redis broker, which may be a fluke, but it fails 100% of the time with the Memory broker.
The test stops early because the workers shut down: no job is returned to the second worker due to the concurrency limit, i.e. received_jobs == 0 even though there are still jobs in the queue!

I suppose I can rewrite the test to avoid using stop_when_queue_empty but that makes things quite tricky and I thought you might have some ideas.

@bigjools (Collaborator, Author)

This is the test I'm trying to get working:

import pytest

from spinach import Engine, MemoryBroker, RedisBroker


@pytest.fixture(params=[MemoryBroker, RedisBroker])
def spin(request):
    broker = request.param
    spin = Engine(broker(), namespace='tests')
    yield spin


def test_concurrency_limit(spin):
    count = 0

    @spin.task(name='do_something', max_retries=10, max_concurrency=1)
    def do_something(index):
        nonlocal count
        assert index == count
        count += 1

    for i in range(0, 5):
        spin.schedule(do_something, i)

    # Start two workers; test that only one job runs at once as per the
    # Task definition.
    spin.start_workers(number=2, block=True, stop_when_queue_empty=True)
    assert count == 5

@NicolasLM (Owner)

I think that you uncovered a bug in the stop_when_queue_empty feature. We can no longer rely on the absence of a task from the broker as an indication that the queue is empty.

One way around it would be to add an explicit call to the broker to check whether the queue is actually empty:

                if (stop_when_queue_empty and available_slots > 0
                        and received_jobs == 0
                        and self._broker.is_queue_empty(self._working_queue)):

Since I don't think stop_when_queue_empty is actually used much in production, I don't mind the extra call to Redis once in a while to make this work.

(Note that it probably works on Redis currently because the body of the task in your test makes it very fast. I believe that if you make it slightly longer-running it will fail on Redis as well.)
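For illustration, is_queue_empty might look something like this in each broker; _queues and _r are assumed attribute names, while _to_namespaced appears in a diff later in this thread:

# MemoryBroker: assuming queues live in a dict of queue.Queue objects.
def is_queue_empty(self, queue: str) -> bool:
    return self._queues[queue].empty()

# RedisBroker: LLEN on the namespaced list backing the queue.
def is_queue_empty(self, queue: str) -> bool:
    return self._r.llen(self._to_namespaced(queue)) == 0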

> I just realised Python 3.5 EOLed 7 months ago. Perhaps you should just drop the CI run there?

Yes, I will remove 3.5 from the supported versions, but for you it might be less work to just add the missing .decode(), as I think 3.5 is referenced in a few places.

@@ -85,6 +85,15 @@ def _to_namespaced(self, value: str) -> str:
     def register_periodic_tasks(self, tasks: Iterable[Task]):
         """Register tasks that need to be scheduled periodically."""

+    def set_concurrency_keys(self, tasks: Iterable[Task]):
@NicolasLM (Owner)

Could you add the @abstractmethod decorator please? It acts mostly as documentation.

@bigjools (Collaborator, Author)

I didn't want to force new brokers to implement this method, but given the likelihood of new brokers, it's probably fine :)
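For illustration, the shape being requested, assuming the base Broker derives from ABC (the stand-in Task just keeps the snippet self-contained):

from abc import ABC, abstractmethod
from typing import Iterable

class Task:  # stand-in for spinach's Task
    max_concurrency = None

class Broker(ABC):

    @abstractmethod
    def set_concurrency_keys(self, tasks: Iterable[Task]):
        """Set up whatever state the broker needs to enforce
        per-task max_concurrency limits."""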

@bigjools (Collaborator, Author)

> Yes, I will remove 3.5 from the supported versions, but for you it might be less work to just add the missing .decode(), as I think 3.5 is referenced in a few places.

Yeah, no worries, I already did that. And of course I can't use f-strings on Python 3.5 either.

OK, I think I've addressed everything; let me know how it's looking.

@bigjools (Collaborator, Author)

I'll also note that once you are happy with the code, I'll squash the commits and write a better commit message.

         )

     def register_periodic_tasks(self, tasks: Iterable[Task]):
         """Register tasks that need to be scheduled periodically."""
-        tasks = [task.serialize() for task in tasks]
+        _tasks = [task.serialize() for task in tasks]
@NicolasLM (Owner)

Could you also change the len(tasks) to len(_tasks) in case someone changes the list comprehension without checking the code below it?


-- Override max_concurrency, whatever it is already set to (if anything).
redis.call('hset', max_concurrency_key, task["name"], task["max_concurrency"])
@NicolasLM (Owner)

This fails for tasks with max_retries but no max_concurrency:

['{"max_concurrency": null, "max_retries": 10, "name": "compute", "periodicity": null, "queue": "spinach"}']
Traceback (most recent call last):
  File "/home/nicolas/dev/spinach/spinach/utils.py", line 29, in run_forever
    func(*args, **kwargs)
  File "/home/nicolas/dev/spinach/spinach/engine.py", line 144, in _arbiter_func
    [task for task in self._tasks.tasks.values()]
  File "/home/nicolas/dev/spinach/spinach/brokers/redis.py", line 319, in set_concurrency_keys
    *_tasks,
  File "/home/nicolas/dev/spinach/spinach/brokers/redis.py", line 95, in _run_script
    return script(args=args)
  File "/home/nicolas/dev/spinach/venv/lib/python3.7/site-packages/redis/client.py", line 4079, in __call__
    return client.evalsha(self.sha, len(keys), *args)
  File "/home/nicolas/dev/spinach/venv/lib/python3.7/site-packages/redis/client.py", line 3143, in evalsha
    return self.execute_command('EVALSHA', sha, numkeys, *keys_and_args)
  File "/home/nicolas/dev/spinach/venv/lib/python3.7/site-packages/redis/client.py", line 901, in execute_command
    return self.parse_response(conn, command_name, **options)
  File "/home/nicolas/dev/spinach/venv/lib/python3.7/site-packages/redis/client.py", line 915, in parse_response
    response = connection.read_response()
  File "/home/nicolas/dev/spinach/venv/lib/python3.7/site-packages/redis/connection.py", line 756, in read_response
    raise response
redis.exceptions.ResponseError: Error running script (call to f_2627b9f71ac57670aaa14074c3170b5901428991): @user_script:22: @user_script: 22: Lua redis() command arguments must be strings or integers

@bigjools (Collaborator, Author)

This was caused by the thinko you pointed out in the next comment.


def set_concurrency_keys(self, tasks: Iterable[Task]):
    """For each retryable Task, set up its concurrency keys."""
    _tasks = [task.serialize() for task in tasks if task.max_retries > 0]
@NicolasLM (Owner)

Could you walk me through the reasoning behind filtering by task.max_retries > 0?

@bigjools (Collaborator, Author)

It's a thinko, I meant to use max_concurrency. I've fixed it and added a test.

"""For each retryable Task, set up its concurrency keys."""
_tasks = [task.serialize() for task in tasks if task.max_retries > 0]
if not _tasks:
return
@NicolasLM (Owner)

It seems that if the user code goes from a few tasks with max_concurrency > 0 to none, the keys will be left in Redis.

@bigjools (Collaborator, Author)

Extremely good point.
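One hypothetical way to address it on the Redis side: clear the limits hash before repopulating it. The hash name matches the _max_concurrency key shown later in this thread; _r, _to_namespaced and the overall shape are assumptions, not the eventual fix:

def set_concurrency_keys(self, tasks):
    # Reset the limits hash so a task whose max_concurrency was removed
    # from user code stops being limited. (Clearing _current_concurrency
    # as well would be wrong while jobs are still running, so only the
    # limits hash is rebuilt here.)
    key = self._to_namespaced('_max_concurrency')
    pipe = self._r.pipeline(transaction=True)
    pipe.delete(key)
    for task in tasks:
        if task.max_concurrency is not None:
            pipe.hset(key, task.name, task.max_concurrency)
    pipe.execute()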

@NicolasLM (Owner)

I took the code for a ride and found a couple of issues.

Issue 1

By playing with small variations of the quickstart example (changing the number of workers, max_retries and max_concurrency), I managed to get Redis into an inconsistent state:

127.0.0.1:6379> hgetall spinach/_current_concurrency
1) "compute"
2) "100"
127.0.0.1:6379> hgetall spinach/_max_concurrency
1) "compute"
2) "5"

At this point, workers get stuck because they cannot take tasks anymore. Unfortunately I do not know exactly which combinations got me there but I should note that I got this with a single spinach process, without killing it (so no _running-jobs-on-broker-xxx key) and without jobs failing.

Issue 2

I managed to get the workers stuck in another way. When a worker processes an idempotent job and it fails, the job's status is set to NOT_SET so that it gets re-enqueued for retry. This removes the job from running in the broker, but does not decrement the concurrency key.

https://github.com/NicolasLM/spinach/blob/master/spinach/engine.py#L255-L268

I do not think that the solution is to change the Engine to the code below, because that would not be atomic:

            elif job.status is JobStatus.NOT_SET:
                self._broker.remove_job_from_running(job)
                self._broker.enqueue_jobs([job])

Instead the enqueue_jobs of both brokers should be adapted to take the concurrency into account.
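A rough sketch of that adaptation for the MemoryBroker, assuming spinach's JobStatus enum is imported and with illustrative attribute names (this is the idea, not the eventual fix):

def enqueue_jobs(self, jobs):
    # Free the concurrency slot and re-queue under one lock, so the two
    # steps cannot be observed separately.
    with self._lock:
        for job in jobs:
            if job.status is JobStatus.NOT_SET:  # re-enqueued for retry
                self._current_concurrency[job.task_name] -= 1
            self._queues[job.queue].put(job)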

To reproduce:

@spin.task(name='compute', max_retries=10, max_concurrency=2)
def compute(a, b):
    raise RuntimeError('Error')


# Schedule 100 jobs to be executed ASAP
for _ in range(100):
    spin.schedule(compute, 5, 3)

print('Starting workers, ^C to quit')
spin.start_workers(number=5)

@bigjools (Collaborator, Author) commented May 2, 2021

Thanks for the feedback! I had not expected you to test so much yet, so thank you. I was planning on doing that myself tomorrow, as it's been a three-day weekend here, and I was not expecting it to be bug-free yet.

@bigjools (Collaborator, Author) commented May 4, 2021

I've fixed the smaller points, and issue 2. I am not sure how to recreate issue 1, and I wonder whether it was related to issue 2. I'll keep poking at it.

@NicolasLM (Owner)

Great! Now that I think about it, issue 1 was probably due to issue 2.

Is it ready for a final review?

@bigjools (Collaborator, Author) commented May 5, 2021 via email

This change adds the `max_concurrency` parameter to `Task`, so that a task
can define the maximum number of its jobs that run simultaneously, no
matter how many are queued and waiting to run.

Other drive-by changes:

 - Add a tox.ini and docker-compose to start Redis.
   This allows a simple `tox` command to just run all the relevant tests
   and have a temporary Redis start up with which the tests can interact.

 - Amend hacking docs
   - Change pep8 to the recommended pycodestyle invocation
   - Document the `tox` way to test

Fixes NicolasLM#6
@bigjools (Collaborator, Author) commented May 6, 2021

I've been running this on my existing app all morning and it looks good so far. I've seen the concurrency limit observed correctly across multiple worker processes and threads, and I've killed and restarted the whole app/workers and it all recovers correctly too.

@NicolasLM merged commit 8002ca6 into NicolasLM:master on May 6, 2021
@NicolasLM (Owner)

Awesome work! Merged and released as 0.0.14.

@bigjools (Collaborator, Author) commented May 6, 2021

Excellent! Thanks for the initial hand-holding and for the nice code base which made it easy to get involved.

Linked issue closed by this pull request: Feature idea: Allow a Task to define its maximum number of concurrent jobs (#6)