
CI: Fix Kinesis integration test race condition in LocalStack readiness and stream reset #691

Merged
amotl merged 5 commits into crate:main from hampsterx:fix/kinesis-test-race-condition
Mar 13, 2026

Conversation

@hampsterx
Contributor

Summary

  • Fix LocalStackContainerWithKeepalive not waiting for LocalStack readiness due to MRO bypassing LocalStackContainer.start()
  • Replace fragile delete/recreate stream pattern with unique stream names per test
  • Forward describe-timeout URL parameter to async-kinesis Consumer/Producer
  • Re-enable Kinesis CI tests across Python 3.10–3.14

Problem

Kinesis integration tests were flaky (and disabled in CI) due to two race conditions:

  1. Container readiness: KeepaliveContainer.start() overrides LocalStackContainer.start() in the MRO, so the wait_for_logs("Ready") call never executes. The Kinesis API receives requests before LocalStack is ready.

  2. Stream reset: reset_streams() deletes and recreates the testdrive stream between tests, but LocalStack's eventual consistency means create_stream can fail if called too soon after delete_stream.
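The first race can be reproduced without Docker. The sketch below uses hypothetical stand-in classes (`BaseContainer`, `ReadyWaitingContainer`, `KeepaliveMixin`, `Broken`, `Fixed`), not the real testcontainers or cratedb-toolkit classes, and it assumes the keepalive `start()` does not chain to the next `start()` in the MRO. It only illustrates why the readiness wait is skipped and how a hook method brings it back.

```python
class BaseContainer:
    """Hypothetical stand-in for a generic container."""

    def __init__(self):
        self.calls = []

    def start(self):
        self.calls.append("base.start")
        return self


class ReadyWaitingContainer(BaseContainer):
    """Stand-in for a container whose start() also waits for a readiness log."""

    def start(self):
        super().start()
        self.calls.append("wait_for_ready")
        return self


class KeepaliveMixin:
    """Stand-in for a keepalive mixin that takes over start() entirely."""

    def start(self):
        # Assumption: the mixin calls the base implementation directly,
        # so ReadyWaitingContainer.start() is never reached.
        BaseContainer.start(self)
        self._connect()
        return self

    def _connect(self):
        """Hook for subclasses; does nothing by default."""


class Broken(KeepaliveMixin, ReadyWaitingContainer):
    """The readiness wait lives in a start() that the MRO skips."""


class Fixed(KeepaliveMixin, ReadyWaitingContainer):
    """The readiness wait lives in a hook that the mixin always calls."""

    def _connect(self):
        self.calls.append("wait_for_ready")


print([cls.__name__ for cls in Broken.__mro__])
print(Broken().start().calls)
print(Fixed().start().calls)
```

Moving the wait into a hook keeps it independent of which `start()` wins method resolution.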

Changes

  • localstack.py: Add _configure() and _connect() hooks to wait for LocalStack's "Ready" log, matching the pattern used by CrateDBContainer and InfluxDB2Container
  • conftest.py: Replace stream deletion with unique stream names per test (testdrive-<uuid>), eliminating the race entirely
  • adapter.py: Forward describe-timeout URL query parameter to async-kinesis Consumer and Producer, giving more time for stream activation after creation
  • kinesis.yml: Re-enable CI matrix for Python 3.10–3.14
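A minimal sketch of the per-test naming idea follows. The helper name `unique_stream_name` is hypothetical, and the 8-character suffix is an assumption borrowed from the review suggestion further down; the actual fixture code may differ.

```python
import uuid


def unique_stream_name(prefix: str = "testdrive") -> str:
    """Return a stream name that is unique for each call.

    Because no two tests share a name, nothing has to be deleted and
    recreated between tests, so the delete/create race cannot occur.
    """
    # Hypothetical format: prefix plus the first 8 hex digits of a random UUID.
    return f"{prefix}-{uuid.uuid4().hex[:8]}"


first = unique_stream_name()
second = unique_stream_name()
print(first, second)
```

In a test suite, a helper like this would typically be called from a function-scoped fixture so that each test creates its own stream.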

References

Test plan

  • All 6 Kinesis tests pass locally (fresh container start)
  • Tests pass with TC_KEEPALIVE=true (container reuse path)
  • Linting and formatting clean
  • CI passes across Python 3.10–3.14

@coderabbitai

coderabbitai bot commented Mar 9, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 74271448-df49-4229-a970-d1cb9f4bf56d

📥 Commits

Reviewing files that changed from the base of the PR and between 314cded and 370e092.

📒 Files selected for processing (1)
  • tests/io/dynamodb/conftest.py

Walkthrough

Per-test unique Kinesis stream names replace shared delete/recreate logic; LocalStack container now waits for a "Ready." log before use; Kinesis adapter carries a new describe_timeout parameter from the connection URL; CI Kinesis workflow Python matrix explicitly lists 3.11–3.14.

Changes

| Cohort | File(s) | Summary |
|---|---|---|
| CI Workflow | .github/workflows/kinesis.yml | Replace commented matrix with explicit Python versions: 3.11, 3.12, 3.13, 3.14 (3.10 kept as commented note). |
| Kinesis Adapter | cratedb_toolkit/io/kinesis/adapter.py | Add describe_timeout (parsed from URL query describe-timeout, default 60) and pass it into consumer_factory and producer calls. |
| LocalStack Container Readiness | cratedb_toolkit/testing/testcontainers/localstack.py | Add _configure() to set a LogMessageWaitStrategy for "Ready." and _connect() to wait_for_logs("Ready.", 60s); integrate into start flow. |
| Kinesis test fixtures | tests/io/kinesis/conftest.py | Switch to per-test unique stream names (UUID) instead of global delete/recreate; remove reset_streams() flow; kinesis_test_manager now takes kinesis (function-scoped); kinesis_service no longer calls stack.reset(). |
| DynamoDB fixtures / tests | tests/io/dynamodb/conftest.py, tests/io/dynamodb/test_relay.py | Remove pre-test stream deletion and Kinesis adapter setup; add private _stream_name (UUID) for CDC URL; update CDC URLs to omit trailing /demo; increase sleeps to allow shard iterator readiness and record delivery. |
| Dependency pinning | pyproject.toml | Bump optional-dependencies.kinesis: async-kinesis minimum tightened to >=2.3,<3. |
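For illustration, reading the `describe-timeout` query parameter with a default of 60 could look like the sketch below. It uses only the standard library; the function name `describe_timeout_from_url` and the example URL are hypothetical, and the real adapter may parse its URL with a different library.

```python
from urllib.parse import parse_qs, urlparse

DEFAULT_DESCRIBE_TIMEOUT = 60  # default value named in the summary above


def describe_timeout_from_url(url: str, default: int = DEFAULT_DESCRIBE_TIMEOUT) -> int:
    """Return the describe-timeout query parameter as an integer, or the default."""
    query = parse_qs(urlparse(url).query)
    values = query.get("describe-timeout")
    return int(values[0]) if values else default


# Hypothetical connection URLs, for illustration only.
with_param = "kinesis://localhost:4566/testdrive-1a2b3c4d?describe-timeout=120"
without_param = "kinesis://localhost:4566/testdrive-1a2b3c4d"

print(describe_timeout_from_url(with_param))
print(describe_timeout_from_url(without_param))
```

The parsed value would then be passed on as `describe_timeout` when the consumer and producer are created.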

Sequence Diagram(s)

sequenceDiagram
  participant Test as Test Runner
  participant Fixture as Test Fixture
  participant LocalStack as LocalStack Container
  participant Kinesis as Kinesis Service
  participant Adapter as Kinesis Adapter

  rect rgba(200,200,255,0.5)
  Test->>Fixture: request kinesis fixture
  Fixture->>LocalStack: ensure container started
  LocalStack->>LocalStack: _configure() sets log wait strategy
  LocalStack->>LocalStack: _connect() wait_for_logs("Ready.", 60s)
  LocalStack-->>Fixture: ready
  end

  rect rgba(200,255,200,0.5)
  Fixture->>Fixture: generate unique stream name (UUID)
  Fixture->>Kinesis: create stream with unique name
  Fixture-->>Test: return connection URL (includes describe-timeout)
  end

  rect rgba(255,200,200,0.5)
  Test->>Adapter: Adapter init (reads describe-timeout)
  Adapter->>Kinesis: create consumer/producer with describe_timeout
  Adapter-->>Test: stream operations use per-test stream
  end
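The readiness step in the first block of the diagram amounts to polling the container log until a marker appears or a deadline passes. The following is a generic sketch of that idea, not the testcontainers implementation: `wait_for_log` and the `read_logs` callable are hypothetical names, while the "Ready." marker and the 60-second timeout are taken from the summary above.

```python
import time
from typing import Callable


def wait_for_log(
    read_logs: Callable[[], str],
    marker: str = "Ready.",
    timeout: float = 60.0,
    interval: float = 0.1,
) -> None:
    """Block until `marker` appears in the text returned by `read_logs`.

    Raises TimeoutError if the marker has not appeared within `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        if marker in read_logs():
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f"log marker {marker!r} not seen within {timeout}s")
        time.sleep(interval)


# Usage with a fake log source that becomes ready on the third poll.
polls = {"count": 0}


def fake_logs() -> str:
    polls["count"] += 1
    return "Ready." if polls["count"] >= 3 else "starting up"


wait_for_log(fake_logs, timeout=2.0, interval=0.01)
print(polls["count"])
```

Only after this call returns should the fixture create streams or hand out a connection URL.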

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Suggested labels

dependencies

Suggested reviewers

  • surister
  • hammerhead

Poem

🐰
I hopped through logs until they read "Ready." bright,
Gave every test its own stream to sleep soundly at night.
Timeouts tucked softly in every new call,
Streams now avoid racing and tests stand tall. ✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 47.83%, which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title accurately summarizes the main changes: fixing Kinesis race conditions in LocalStack readiness and stream reset for CI. |
| Description check | ✅ Passed | The description provides clear context on the race conditions, solution approach, and test status, directly relating to the changeset. |
| Linked Issues check | ✅ Passed | All code changes address the linked issue #690 objectives: LocalStack readiness is fixed via _configure/_connect hooks, stream reset race is eliminated via unique per-test streams, and describe-timeout forwarding improves resilience. |
| Out of Scope Changes check | ✅ Passed | Changes are scoped to fixing the race conditions and related resilience improvements; CI matrix adjustments for Python versions are necessary context for test enablement, and async-kinesis version bump supports the timeout forwarding feature. |


Member

@amotl amotl left a comment


Excellent! 🙇

@hampsterx
Contributor Author

@amotl hmm, looks like you might need to disable py3.10 again, but it seems DynamoDB-related?

FAILED tests/io/dynamodb/test_relay.py::test_kinesis_latest_dynamodb_cdc_insert_update - assert 0 == 1

@amotl
Member

amotl commented Mar 9, 2026

@hampsterx: Yes, please disable Python 3.10 for the time being. Please also ignore integration tests against CrateDB Cloud: I've created an issue GH-693 to track this, certainly unrelated to your patch.

@hampsterx
Contributor Author

> @hampsterx: Yes, please disable Python 3.10 for the time being. Please also ignore integration tests against CrateDB Cloud: I've created an issue GH-693 to track this, certainly unrelated to your patch.

ok even better, have applied the same fixes from kinesis fixture to the dynamo one. 🤞

@amotl amotl requested review from matriv and seut March 9, 2026 23:31
@hampsterx
Contributor Author

> @hampsterx: Yes, please disable Python 3.10 for the time being. Please also ignore integration tests against CrateDB Cloud: I've created an issue GH-693 to track this, certainly unrelated to your patch.

yup, have just committed with py3.10 removed, but it should be noted that it's kind of luck timing-wise that it works on 3.11 (which has had some async improvements), so under load the CI could fail on any version.

I think async-kinesis needs some way to say it has produced records and wait for them before stopping, which would eliminate this timing issue. Will have a look :)

@hampsterx
Contributor Author

hampsterx commented Mar 12, 2026

async-kinesis 2.3.0 fixes:

  • Producer close() now reliably flushes all records before closing
  • Consumer idle_timeout=2.0s waits for records instead of immediately giving up on empty queue
  • The _consume_forever loop correctly retries after idle timeout, so even if the consumer starts waiting before events arrive, it picks them up on the next iteration
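The difference between giving up on an empty queue and waiting for an idle timeout can be sketched with a plain `asyncio.Queue`. This is not the async-kinesis implementation; `drain` and `demo` are hypothetical names, and the short timeouts are chosen only to keep the example fast.

```python
import asyncio


async def drain(queue: asyncio.Queue, idle_timeout: float = 2.0) -> list:
    """Collect records until the queue has stayed empty for `idle_timeout` seconds."""
    records = []
    while True:
        try:
            # Wait for the next record instead of returning on the first empty check.
            record = await asyncio.wait_for(queue.get(), timeout=idle_timeout)
        except asyncio.TimeoutError:
            return records
        records.append(record)


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()

    async def late_producer() -> None:
        # Records arrive shortly after the consumer has started waiting.
        await asyncio.sleep(0.05)
        for i in range(3):
            await queue.put(i)

    producer = asyncio.create_task(late_producer())
    records = await drain(queue, idle_timeout=0.5)
    await producer
    return records


print(asyncio.run(demo()))
```

A consumer that checked the queue once and returned immediately would see nothing here, because the producer has not run yet when the consumer starts.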


@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (1)
tests/io/dynamodb/conftest.py (1)

38-38: Initialize _stream_name uniquely at construction time (optional hardening).

Line 38 can use the same UUID-based pattern as reset() so the fixture is isolation-safe even before the first reset call.

Suggested diff
-        self._stream_name = "demo"
+        self._stream_name = f"demo-{uuid.uuid4().hex[:8]}"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/io/dynamodb/conftest.py` at line 38, Initialize the fixture's
_stream_name at construction to a unique value instead of the hardcoded "demo":
in the fixture's __init__ (or where self._stream_name is set) generate a
UUID-based name using the same pattern used in reset() so the attribute is
isolation-safe before reset() is called; update the assignment to use that UUID
pattern and keep reset() behavior intact (refer to _stream_name and reset()).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: eff61ead-0f4f-476d-81ef-39ad953e2945

📥 Commits

Reviewing files that changed from the base of the PR and between 237b76d and 314cded.

📒 Files selected for processing (4)
  • .github/workflows/kinesis.yml
  • pyproject.toml
  • tests/io/dynamodb/conftest.py
  • tests/io/dynamodb/test_relay.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • .github/workflows/kinesis.yml
  • tests/io/dynamodb/test_relay.py

Comment on lines +96 to +107
# Allow enough time for the consumer to create its LATEST shard iterator
# before producing events. With LocalStack, stream creation + shard iterator
# setup can take longer than 1 second on slow systems.
time.sleep(3)

# Populate source database with data.
for event in events:
    table_loader.kinesis_adapter.produce(event)

# Allow time for the consumer's fetch tasks to poll Kinesis
# and deliver the records to the processing handler.
time.sleep(2)
Member


Oh. We need to sleep longer than before? Based on your efforts with async-kinesis 2.3.0, I would have expected that we wouldn't need this any longer?

Contributor Author


hah fair question, the 2.3.0 fixes addressed the consumer/producer flush race (records getting lost on close), but this sleep is for a different race: the shard iterator setup 🤦‍♂️

With start=LATEST, the consumer needs to obtain its shard iterator before any events are produced, otherwise they're missed. The iterator is obtained lazily inside a background fetch task, so there's currently no way for the caller to know when it's actually ready, hence the sleep.

I'll add a wait_ready() method to async-kinesis that exposes an asyncio.Event fired once shard iterators are obtained, then raise another PR to bump the package on crate, and we should be good :)
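A rough sketch of that idea, using a hypothetical `SketchConsumer` rather than the real async-kinesis consumer: a background task sets an `asyncio.Event` once its simulated iterator exists, and the caller awaits that event instead of sleeping for a fixed time. The `deliver` method and the setup delay are assumptions made only to imitate LATEST semantics.

```python
import asyncio


class SketchConsumer:
    """Hypothetical consumer that only sees records produced after its setup."""

    def __init__(self, setup_delay: float = 0.05):
        self._setup_delay = setup_delay
        self._ready = asyncio.Event()
        self.received = []

    async def run(self) -> None:
        # Simulates obtaining the shard iterator lazily in a background task.
        await asyncio.sleep(self._setup_delay)
        self._ready.set()

    async def wait_ready(self, timeout: float = 5.0) -> None:
        """Return once the iterator exists; raise on timeout."""
        await asyncio.wait_for(self._ready.wait(), timeout=timeout)

    def deliver(self, record: str) -> None:
        # LATEST semantics: records produced before the iterator exists are missed.
        if self._ready.is_set():
            self.received.append(record)


async def demo(wait: bool) -> list:
    consumer = SketchConsumer()
    task = asyncio.create_task(consumer.run())
    if wait:
        await consumer.wait_ready()
    consumer.deliver("event-1")
    await task
    return consumer.received


print(asyncio.run(demo(wait=False)))
print(asyncio.run(demo(wait=True)))
```

Waiting on the event takes exactly as long as the setup needs, whereas a fixed sleep is either too short on slow systems or wastes time on fast ones.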

Member

@amotl amotl left a comment


Thank you so much!

@amotl amotl merged commit c83a622 into crate:main Mar 13, 2026
26 of 27 checks passed

Development

Successfully merging this pull request may close these issues.

CI: Timing issue / race condition with Kinesis integration tests in reset_streams()

2 participants