
Conversation

@NicoHinderling
Contributor

An unexpected side effect of moving the analysis to a subprocess: when the Kafka partition is signaled to be revoked, we would normally give the task 10s and then kill it... but in Arroyo we wait until all the threads have completed before moving on...

so it seems like this is what is happening in prod currently:

1. Rebalance triggered (t=0s)
2. Arroyo calls `RunTaskInThreads.close()` & `RunTaskInThreads.join(timeout=10)`
3. join() loops through futures with timeout (t=0-10s)
4. join() calls self.__executor.shutdown() (t=~10s)
   └─ ⚠️ BLOCKS waiting for ALL threads to complete
5. Threads are still blocked on process.join(timeout=720)
   └─ Subprocesses still running, up to 12 minutes remaining
6. Subprocess finally finishes after 6 minutes (t=~370s)
7. All threads finally exit
8. executor.shutdown() finally returns
9. join() completes and returns

and the join taking that long causes Kafka to think our consumer is dead and kick it out of the group. A toy reproduction of the blocking behavior is sketched below.
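Here's a minimal, self-contained toy (not the Launchpad code; the sleep durations are shortened stand-ins for the ~6-minute analysis and the 720s join timeout) showing why `executor.shutdown()` blocks even after the per-future timeouts have expired:

```python
# Toy reproduction (NOT the Launchpad code): a worker thread blocked on
# process.join() keeps ThreadPoolExecutor.shutdown() from returning, even
# though the future.result(timeout=...) call gave up long before that.
import concurrent.futures
import multiprocessing
import time


def run_analysis() -> None:
    # Stand-in for the real analysis subprocess; prod uses ~6 minutes of
    # work and process.join(timeout=720).
    process = multiprocessing.Process(target=time.sleep, args=(30,))
    process.start()
    process.join(timeout=60)  # the worker thread blocks here


if __name__ == "__main__":
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(run_analysis)

    try:
        future.result(timeout=5)  # roughly what Arroyo's join(timeout=10) does
    except concurrent.futures.TimeoutError:
        pass  # timed out, but the worker thread is still inside run_analysis()

    start = time.monotonic()
    executor.shutdown()  # blocks until run_analysis() returns (~30s here, ~6min in prod)
    print(f"shutdown() returned after {time.monotonic() - start:.0f}s")
```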

Instead, if we can kill the subprocess, it should look like this (a rough sketch of the wrapper follows the list):

1. Rebalance triggered (t=0s)
2. Arroyo calls ShutdownAwareStrategy.close()
3. wrapper.close() calls factory.kill_active_processes() (t=0s)
4. wrapper.close() calls inner.close() (RunTaskInThreads.close()) (t=~5s)
5. RunTaskInThreads.join() loops through pending futures (t=~5s)
   ├─ Tries future.result(remaining) for each message
   ├─ Threads see their subprocesses are dead
   └─ Threads exit their finally blocks quickly
6. RunTaskInThreads.join() calls executor.shutdown() (t=~6s)
   └─ Threads already exited, returns immediately
7. RunTaskInThreads.join() returns (t=~6s)
8. join() completes and returns (t=~6s)
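
A minimal sketch of the wrapper idea, assuming a `factory.kill_active_processes()` hook like the one discussed here; this is an illustration drawn from the flow above, not the merged implementation:

```python
# Sketch only: a strategy wrapper that kills in-flight analysis
# subprocesses before closing the wrapped RunTaskInThreads strategy.
# The factory.kill_active_processes() hook is assumed from the
# discussion above, not copied from the merged code.
from __future__ import annotations

from arroyo.backends.kafka import KafkaPayload
from arroyo.processing.strategies.abstract import ProcessingStrategy
from arroyo.types import Message


class ShutdownAwareStrategy(ProcessingStrategy[KafkaPayload]):
    def __init__(self, inner: ProcessingStrategy[KafkaPayload], factory) -> None:
        self.__inner = inner
        self.__factory = factory

    def submit(self, message: Message[KafkaPayload]) -> None:
        self.__inner.submit(message)

    def poll(self) -> None:
        self.__inner.poll()

    def close(self) -> None:
        # Kill in-flight subprocesses first so worker threads unblock
        # from process.join() almost immediately...
        self.__factory.kill_active_processes()
        # ...then close the wrapped RunTaskInThreads strategy.
        self.__inner.close()

    def terminate(self) -> None:
        self.__factory.kill_active_processes()
        self.__inner.terminate()

    def join(self, timeout: float | None = None) -> None:
        self.__inner.join(timeout)
```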

return LaunchpadKafkaConsumer(processor, strategy_factory, healthcheck_path)


class ShutdownAwareStrategy(ProcessingStrategy[KafkaPayload]):
Contributor Author

I'd love to not have to do this.. 🤔

Contributor Author

but I don't think there is another way to intercept the `_close_strategy` call that we care about..

@codecov

codecov bot commented Dec 3, 2025

Codecov Report

❌ Patch coverage is 20.63492% with 50 lines in your changes missing coverage. Please review.
✅ Project coverage is 80.95%. Comparing base (f8153b4) to head (575729d).
⚠️ Report is 1 commit behind head on main.
✅ All tests successful. No failed tests found.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| src/launchpad/kafka.py | 20.63% | 50 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #504      +/-   ##
==========================================
- Coverage   81.13%   80.95%   -0.18%     
==========================================
  Files         164      164              
  Lines       14226    14273      +47     
  Branches     1505     1511       +6     
==========================================
+ Hits        11542    11555      +13     
- Misses       2111     2145      +34     
  Partials      573      573              

☔ View full report in Codecov by Sentry.
            return decoded  # type: ignore[no-any-return]
        finally:
            with registry_lock:
                process_registry.pop(process.pid, None)
Member

So this also handles removing PIDs of successful message processes, correct?

Contributor Author

yes!
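
For context, here is a hedged sketch of the registry pattern this exchange implies: each worker thread records its subprocess under a lock, the `finally` block above removes the entry whether the message succeeded or failed, and the close path kills whatever is still registered. The names `process_registry`, `registry_lock`, and `kill_active_processes` are taken or inferred from the snippets in this thread; the rest is an illustration, not the merged code.

```python
# Illustration only: subprocess registry shared between the worker
# threads and the shutdown path.
from __future__ import annotations

import threading
from multiprocessing import Process

registry_lock = threading.Lock()
process_registry: dict[int, Process] = {}


def run_in_subprocess(process: Process) -> None:
    process.start()
    with registry_lock:
        process_registry[process.pid] = process
    try:
        process.join(timeout=720)
    finally:
        # Runs for successful and failed messages alike, so finished
        # subprocesses never linger in the registry.
        with registry_lock:
            process_registry.pop(process.pid, None)


def kill_active_processes() -> None:
    # Called from the strategy's close() during a rebalance.
    with registry_lock:
        processes = list(process_registry.values())
    for process in processes:
        if process.is_alive():
            process.kill()  # SIGKILL; the worker's process.join() returns promptly
```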

@NicoHinderling NicoHinderling merged commit 128efbf into main Dec 3, 2025
20 of 21 checks passed
@NicoHinderling NicoHinderling deleted the kill-task-subprocess-if-join-timeout-is-triggered branch December 3, 2025 20:10