Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a RepeatedTaskQueue in SqsJobConsumer #1071

Merged
merged 2 commits into from Jun 21, 2019

Conversation

@ryanhall07
Copy link
Collaborator

commented Jun 21, 2019

This fixes #1069, which was originally a bug in RepeatedTaskQueue back
in the day. We should prefer using RepeatedTaskQueue instead of hand
rolling threads to prevent repeating the same mistakes. In addition, job
consumers will backoff if there is no work or run into errors.

Now every parallel job consumer runs as a RepeatedTask and each Task
consumes N messages and dispatches them to separate executor service for
running.

@ryanhall07 ryanhall07 requested review from alecholmes, frojasg and mmihic Jun 21, 2019

Use a RepeatedTaskQueue in SqsJobConsumer
This fixes #1069, which was originally a bug in RepeatedTaskQueue back
in the day. We should prefer using RepeatedTaskQueue instead of hand
rolling threads to prevent repeating the same mistakes. In addition, job
consumers will backoff if there is no work or run into errors.

Now every parallel job consumer runs as a RepeatedTask and each Task
consumes N messages and dispatches them to separate executor service for
running.

@ryanhall07 ryanhall07 force-pushed the rhall-sqs-repeated-task branch from 16ae68a to 23cd661 Jun 21, 2019

log.info { "closing subscription to queue ${queueName.value}" }
running.set(false)
return when {
failure.get() -> Status.FAILED

This comment has been minimized.

Copy link
@mightyguava

mightyguava Jun 21, 2019

Collaborator

don't you need to wait for the dispatched tasks to complete before checking this?

This comment has been minimized.

Copy link
@ryanhall07

ryanhall07 Jun 21, 2019

Author Collaborator

umm yea. should probably have a test :(

This comment has been minimized.

Copy link
@ryanhall07

ryanhall07 Jun 21, 2019

Author Collaborator

fixed in 9cdece6

This comment has been minimized.

Copy link
@ryanhall07

ryanhall07 Jun 21, 2019

Author Collaborator

added a test too

*
* Can be overridden when scheduling a tasks.
*/
val default_jitter_ms: Long = 50,

This comment has been minimized.

Copy link
@mightyguava

mightyguava Jun 21, 2019

Collaborator

is this just for tests?

This comment has been minimized.

Copy link
@ryanhall07

ryanhall07 Jun 21, 2019

Author Collaborator

well i'm using it for tests, but it seems reasonable to make it configurable

for (future in futures) {
try {
future.get()
} catch (e: Exception) {

This comment has been minimized.

Copy link
@mightyguava

mightyguava Jun 21, 2019

Collaborator

not Throwable?

This comment has been minimized.

Copy link
@ryanhall07

ryanhall07 Jun 21, 2019

Author Collaborator

it doesn't really matter here, you're not trying to prevent an Exception from being leaked (RepeatedTaskQueue handles that). Just trying to catch the signal that the dispatched handler threw an exception.

@ryanhall07 ryanhall07 force-pushed the rhall-sqs-repeated-task branch from 9cdece6 to deb6ee0 Jun 21, 2019

@ryanhall07 ryanhall07 force-pushed the rhall-sqs-repeated-task branch from deb6ee0 to cacbf21 Jun 21, 2019

@ryanhall07 ryanhall07 merged commit 18d41ac into master Jun 21, 2019

2 checks passed

ci/circleci: java Your tests passed on CircleCI!
Details
ci/circleci: node Your tests passed on CircleCI!
Details
@Target(AnnotationTarget.FIELD, AnnotationTarget.FUNCTION, AnnotationTarget.VALUE_PARAMETER)

This comment has been minimized.

Copy link
@mmihic

mmihic Jun 21, 2019

Collaborator

hm, what necessitated this change (it's fine by me, just curious)

This comment has been minimized.

Copy link
@ryanhall07

ryanhall07 Jun 21, 2019

Author Collaborator

so you could field inject in the test

@Inject @ForSqsConsumer RepeatedTaskQueue

This comment has been minimized.

Copy link
@ryanhall07

ryanhall07 Jun 21, 2019

Author Collaborator

without having to do @field:ForSqsConsumer

// If the incoming job has an original trace id, set that as a tag on the new span.
// We don't turn that into the parent of the current span because that would
// incorrectly include the execution time of the job in the execution time of the
// action that triggered the job q

This comment has been minimized.

Copy link
@mmihic

mmihic Jun 21, 2019

Collaborator

trailing q

yan pushed a commit that referenced this pull request Jun 21, 2019

Use a RepeatedTaskQueue in SqsJobConsumer (#1071)
* Use a RepeatedTaskQueue in SqsJobConsumer

This fixes #1069, which was originally a bug in RepeatedTaskQueue back
in the day. We should prefer using RepeatedTaskQueue instead of hand
rolling threads to prevent repeating the same mistakes. In addition, job
consumers will backoff if there is no work or run into errors.

Now every parallel job consumer runs as a RepeatedTask and each Task
consumes N messages and dispatches them to separate executor service for
running.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants
You can’t perform that action at this time.