Skip to content

CI: Fix - Replace sleep-based Kinesis test sync with deterministic wait_ready()#703

Merged
amotl merged 2 commits intocrate:mainfrom
hampsterx:kinesis/use-consumer-wait-ready
Mar 17, 2026
Merged

CI: Fix - Replace sleep-based Kinesis test sync with deterministic wait_ready()#703
amotl merged 2 commits intocrate:mainfrom
hampsterx:kinesis/use-consumer-wait-ready

Conversation

@hampsterx
Copy link
Contributor

async-kinesis 2.3.1 exposes Consumer.wait_ready() which fires once all shard iterators are established. Use it to eliminate the fragile time.sleep(3) that was flagged in PR #691, and replace the sleep(2) record-arrival wait with a CrateDB poll loop that converges immediately.

@coderabbitai
Copy link

coderabbitai bot commented Mar 15, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: a536821b-b2f4-48ef-a3dd-15080cfa46ff

📥 Commits

Reviewing files that changed from the base of the PR and between f827935 and 24a3df1.

📒 Files selected for processing (2)
  • cratedb_toolkit/io/kinesis/adapter.py
  • tests/io/dynamodb/test_relay.py

Walkthrough

Adds threading-based readiness coordination to KinesisStreamAdapter (new _ready_event/_ready_exception and wait_until_ready), tightens the optional async-kinesis dependency, and refactors a test to replace fixed sleeps with bounded readiness/polling and explicit thread shutdown checks.

Changes

Cohort / File(s) Summary
Kinesis Adapter Readiness
cratedb_toolkit/io/kinesis/adapter.py
Adds threading import, introduces _ready_event and _ready_exception, resets at consume start, awaits consumer.wait_ready(...) capturing/propagating readiness exceptions, sets readiness event, and adds public wait_until_ready(timeout: float = 30) -> bool.
Dependency Update
pyproject.toml
Tightens optional dependency async-kinesis version from >=2.3,<3 to >=2.3.1,<3.
Test Synchronization
tests/io/dynamodb/test_relay.py
Replaces fixed sleeps with bounded readiness wait (up to 30s), polling for processed records (up to 10s, 0.2s intervals), uses thread.join(timeout=10) with termination assertion, ensures at least one record before shutdown, and removes a boolean table-refresh assertion.

Sequence Diagram(s)

sequenceDiagram
    participant Caller
    participant Adapter as KinesisStreamAdapter
    participant Consumer
    participant Kinesis

    Caller->>Adapter: start consumer thread (_consume_forever)
    Adapter->>Adapter: clear _ready_event, reset _ready_exception
    Adapter->>Consumer: consumer.wait_ready(describe_timeout)
    Consumer->>Kinesis: describe stream / get iterator
    Kinesis-->>Consumer: iterator or error
    Consumer-->>Adapter: ready or raise error
    alt ready success
        Adapter->>Adapter: _ready_event.set() (rgba(0,128,0,0.5))
        Adapter-->>Caller: wait_until_ready returns True
    else ready failure
        Adapter->>Adapter: store _ready_exception
        Adapter-->>Caller: wait_until_ready raises stored exception
    end
Loading

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~12 minutes

Possibly related PRs

Suggested reviewers

  • seut
  • matriv

Poem

🐰 I nibble threads and raise a flag so merry,
No sleepy waits that slow the carry.
Events align and consumers wake,
Streams hop swift for data's sake.
Hop, ready, go — the queue I ferry! 🚀

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 66.67% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly summarizes the main change: replacing sleep-based test synchronization with deterministic wait_ready() for Kinesis.
Description check ✅ Passed The description explains the motivation and change: using Consumer.wait_ready() from async-kinesis 2.3.1 to replace fragile sleep calls with deterministic waiting.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
📝 Coding Plan
  • Generate coding plan for human review comments

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Tip

CodeRabbit can generate a title for your PR based on the changes.

Add @coderabbitai placeholder anywhere in the title of your PR and CodeRabbit will replace it with a title based on the changes in the PR. You can change the placeholder by changing the reviews.auto_title_placeholder setting.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
cratedb_toolkit/io/kinesis/adapter.py (1)

125-130: Note: KinesisRelay.start() doesn't leverage this readiness mechanism.

Looking at cratedb_toolkit/io/kinesis/relay.py (lines 78-80), the start() method calls consume_forever() directly without waiting for readiness. For callers who produce events immediately after calling start() in a thread, there's still a potential race window. Consider whether the relay should expose or use this readiness check internally for non-test scenarios.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cratedb_toolkit/io/kinesis/adapter.py` around lines 125 - 130, The start()
method in KinesisRelay currently calls consume_forever() directly and doesn't
wait on the readiness mechanism, creating a race for producers that send events
immediately; update KinesisRelay.start() (or add an optional parameter like
wait_for_ready) to either spawn consume_forever() in a background thread and
then call self.wait_until_ready(timeout=...) or, if start is meant to block,
call self.wait_until_ready(timeout=...) before returning so callers can rely on
the readiness signalled by _ready_event/ wait_until_ready(); ensure you use the
existing wait_until_ready() helper and preserve existing threading behavior in
consume_forever().
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@cratedb_toolkit/io/kinesis/adapter.py`:
- Around line 136-139: The consumer.wait_ready(self.describe_timeout) call can
raise and currently prevents self._ready_event from being set, causing callers
of wait_until_ready() to hang; wrap the await consumer.wait_ready(...) in a
try/except that catches broad initialization errors (e.g., asyncio.TimeoutError,
ValueError, RuntimeError), store the exception on the instance (e.g.,
self._ready_exception) and set self._ready_event in the except block before
re-raising the exception (or alternatively return a clear failure state),
keeping the successful-path self._ready_event.set() intact; locate symbols
consumer_factory(), wait_ready(), describe_timeout, _ready_event and
wait_until_ready() to implement this change.

In `@tests/io/dynamodb/test_relay.py`:
- Line 113: After calling thread.join(timeout=10) in the test (the variable
named thread in tests/io/dynamodb/test_relay.py), check thread.is_alive() and
fail the test or take corrective action if it’s still running; for example,
assert not thread.is_alive() with a clear message or raise an AssertionError so
the test fails instead of letting the thread run in background, or if
appropriate call thread.join() again (without timeout) only after confirming it
should be allowed to block; ensure the check happens immediately after
thread.join(timeout=10).

---

Nitpick comments:
In `@cratedb_toolkit/io/kinesis/adapter.py`:
- Around line 125-130: The start() method in KinesisRelay currently calls
consume_forever() directly and doesn't wait on the readiness mechanism, creating
a race for producers that send events immediately; update KinesisRelay.start()
(or add an optional parameter like wait_for_ready) to either spawn
consume_forever() in a background thread and then call
self.wait_until_ready(timeout=...) or, if start is meant to block, call
self.wait_until_ready(timeout=...) before returning so callers can rely on the
readiness signalled by _ready_event/ wait_until_ready(); ensure you use the
existing wait_until_ready() helper and preserve existing threading behavior in
consume_forever().
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: fd9baef7-d9ab-4099-b545-eaeffdb0bcb1

📥 Commits

Reviewing files that changed from the base of the PR and between cd73d77 and 4ff2189.

📒 Files selected for processing (3)
  • cratedb_toolkit/io/kinesis/adapter.py
  • pyproject.toml
  • tests/io/dynamodb/test_relay.py

@amotl amotl force-pushed the kinesis/use-consumer-wait-ready branch from f70caa8 to f827935 Compare March 16, 2026 21:47
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@cratedb_toolkit/io/kinesis/adapter.py`:
- Around line 142-151: Wrap the entire async with self.consumer_factory() block
in a try/except Exception to ensure any error raised during __aenter__ or inside
(including asyncio.CancelledError, ConnectionError, botocore ClientError,
StreamDoesNotExist, or internal fetch_task errors) sets self._ready_exception
and self._ready_event before re-raising; replace the narrow except
(asyncio.TimeoutError, ValueError, RuntimeError) with a broader except Exception
as ex (or separate handling for TimeoutError if you want different behavior),
assign self._ready_exception = ex, call self._ready_event.set(), and then
re-raise so wait_until_ready() observers see the real init error (referencing
consumer_factory, consumer.wait_ready, _ready_exception, _ready_event, and
wait_until_ready).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: fbd9f7aa-4947-45bb-850a-59d7ef2ab53d

📥 Commits

Reviewing files that changed from the base of the PR and between f70caa8 and f827935.

📒 Files selected for processing (3)
  • cratedb_toolkit/io/kinesis/adapter.py
  • pyproject.toml
  • tests/io/dynamodb/test_relay.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • tests/io/dynamodb/test_relay.py
  • pyproject.toml

Comment on lines +142 to +151
self._ready_event.clear()
self._ready_exception = None
async with self.consumer_factory() as consumer:
try:
await consumer.wait_ready(timeout=self.describe_timeout)
except (asyncio.TimeoutError, ValueError, RuntimeError) as ex:
self._ready_exception = ex
self._ready_event.set()
raise
self._ready_event.set()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

In async-kinesis 2.3.1, what exceptions can Consumer.wait_ready(timeout=...) raise, including cancellation and propagated fetch-task failures?

💡 Result:

In async-kinesis 2.3.1, Consumer.wait_ready(timeout=...) can raise:

  • ValueError — if skip_describe_stream=True (explicitly unsupported). (raw.githubusercontent.com)
  • asyncio.TimeoutError — on timeout while:
    • connecting/initializing via get_conn() (wrapped into a new TimeoutError with a custom message), or
    • waiting for readiness after connection. (raw.githubusercontent.com)
  • asyncio.CancelledError — if:
    • the caller cancels wait_ready() (it re-raises any BaseException during the wait), or
    • the internal fetch_task was cancelled before readiness. (raw.githubusercontent.com)
  • Any exception raised by the internal fetch_task (propagated with raise exc), if it finishes before readiness with an exception (this includes non-Exception failures, since _fetch() only catches Exception, not BaseException). (raw.githubusercontent.com)
  • RuntimeError — if the fetch_task exits (done, not cancelled) without an exception before readiness. (raw.githubusercontent.com)
  • Any exception from get_conn() other than timeout — because only asyncio.TimeoutError is caught around await asyncio.wait_for(self.get_conn(), ...); e.g. StreamDoesNotExist, StreamStatusInvalid, ConnectionError, botocore.exceptions.ClientError, etc., can propagate from get_conn()/start(). (raw.githubusercontent.com)

Citations:


🏁 Script executed:

#!/bin/bash
# Find and examine the adapter.py file
fd "adapter.py" -t f --exec wc -l {} \;

Repository: crate/cratedb-toolkit

Length of output: 269


🏁 Script executed:

#!/bin/bash
# Read the relevant file to understand the context
cat -n cratedb_toolkit/io/kinesis/adapter.py | head -200

Repository: crate/cratedb-toolkit

Length of output: 8839


🏁 Script executed:

#!/bin/bash
# Search for wait_until_ready method to understand how _ready_event is used
rg "wait_until_ready|_ready_event" cratedb_toolkit/io/kinesis/adapter.py -A 3 -B 1

Repository: crate/cratedb-toolkit

Length of output: 1159


🏁 Script executed:

#!/bin/bash
# Search for tests or usage of wait_until_ready
rg "wait_until_ready|_consume_forever" --type python -B 2 -A 2

Repository: crate/cratedb-toolkit

Length of output: 95


🏁 Script executed:

#!/bin/bash
# Check if there are any other exception types mentioned in comments or docs
rg "CancelledError|StreamDoesNotExist|BaseException" cratedb_toolkit/io/kinesis/ -B 2 -A 2

Repository: crate/cratedb-toolkit

Length of output: 415


🏁 Script executed:

#!/bin/bash
# Search for tests related to kinesis adapter
fd -t f "test.*kinesis|kinesis.*test" --exec grep -l "wait_until_ready\|_consume_forever" {} \;

Repository: crate/cratedb-toolkit

Length of output: 47


🏁 Script executed:

#!/bin/bash
# Search for any test files in the io directory
fd -t f -p "tests/io" | grep kinesis

Repository: crate/cratedb-toolkit

Length of output: 235


🏁 Script executed:

#!/bin/bash
# Look for test files more broadly
find tests -name "*kinesis*" -type f 2>/dev/null || echo "No kinesis tests found"

Repository: crate/cratedb-toolkit

Length of output: 47


🏁 Script executed:

#!/bin/bash
# Check test files for wait_until_ready or error handling tests
rg "wait_until_ready" tests/io/kinesis/ -A 5 -B 2

Repository: crate/cratedb-toolkit

Length of output: 47


🏁 Script executed:

#!/bin/bash
# Check test_relay.py which likely tests the adapter
wc -l tests/io/kinesis/test_relay.py && head -100 tests/io/kinesis/test_relay.py

Repository: crate/cratedb-toolkit

Length of output: 2227


🏁 Script executed:

#!/bin/bash
# Search for any exception handling or error simulation in tests
rg "exception|Exception|error|Error" tests/io/kinesis/ -A 2 -B 1 | head -50

Repository: crate/cratedb-toolkit

Length of output: 1355


Broaden startup failure capture so waiters always get the real init error.

The exception catch at line 147 is too narrow. Consumer.wait_ready() can raise asyncio.CancelledError, StreamDoesNotExist, ConnectionError, botocore.exceptions.ClientError, and exceptions propagated from the internal fetch_task—not just the three currently caught types. Additionally, failures during async with entry (line 144) are not caught at all. When these uncaught exceptions occur, neither _ready_event nor _ready_exception is set, causing wait_until_ready() to time out and return False instead of surfacing the root cause.

Proposed fix
 async def _consume_forever(self, handler: t.Callable):
@@
-        async with self.consumer_factory() as consumer:
-            try:
-                await consumer.wait_ready(timeout=self.describe_timeout)
-            except (asyncio.TimeoutError, ValueError, RuntimeError) as ex:
-                self._ready_exception = ex
-                self._ready_event.set()
-                raise
-            self._ready_event.set()
+        try:
+            async with self.consumer_factory() as consumer:
+                try:
+                    await consumer.wait_ready(timeout=self.describe_timeout)
+                except BaseException as ex:
+                    self._ready_exception = ex
+                    self._ready_event.set()
+                    raise
+                self._ready_event.set()
+                while True:
+                    async for item in consumer:
+                        handler(item)
+                    if self.stopping:
+                        self.stopping = False
+                        break
+        except BaseException as ex:
+            if not self._ready_event.is_set():
+                self._ready_exception = ex
+                self._ready_event.set()
+            raise
-            while True:
-                async for item in consumer:
-                    handler(item)
-                if self.stopping:
-                    self.stopping = False
-                    break
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cratedb_toolkit/io/kinesis/adapter.py` around lines 142 - 151, Wrap the
entire async with self.consumer_factory() block in a try/except Exception to
ensure any error raised during __aenter__ or inside (including
asyncio.CancelledError, ConnectionError, botocore ClientError,
StreamDoesNotExist, or internal fetch_task errors) sets self._ready_exception
and self._ready_event before re-raising; replace the narrow except
(asyncio.TimeoutError, ValueError, RuntimeError) with a broader except Exception
as ex (or separate handling for TimeoutError if you want different behavior),
assign self._ready_exception = ex, call self._ready_event.set(), and then
re-raise so wait_until_ready() observers see the real init error (referencing
consumer_factory, consumer.wait_ready, _ready_exception, _ready_event, and
wait_until_ready).

@amotl amotl force-pushed the kinesis/use-consumer-wait-ready branch from f827935 to 24a3df1 Compare March 17, 2026 19:49
Copy link
Member

@amotl amotl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

@amotl amotl merged commit 792af48 into crate:main Mar 17, 2026
27 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants