Skip to content

Release v0.6.0: result handling, messaging resilience, and status sync#22

Merged
Nightknight3000 merged 58 commits into
mainfrom
canary
May 20, 2026
Merged

Release v0.6.0: result handling, messaging resilience, and status sync#22
Nightknight3000 merged 58 commits into
mainfrom
canary

Conversation

@Nightknight3000
Copy link
Copy Markdown
Member

@Nightknight3000 Nightknight3000 commented May 20, 2026

Summary by CodeRabbit

Release Notes v0.6.0

  • New Features

    • Added stream log level filtering to control output verbosity.
    • Enabled filename specification when submitting final results.
    • Improved status synchronization control for better workflow management.
  • Bug Fixes

    • Enhanced error handling for network connectivity and timeout scenarios across all client APIs.
    • Implemented automatic retry logic for message broker operations to improve reliability.
    • Fixed terminal status preservation to prevent unintended status overwrites.
  • Improvements

    • Refined intermediate data retrieval with simplified query parameters.
    • Better logging controls and debug information for troubleshooting.

Review Change Stack

Nightknight3000 and others added 30 commits October 14, 2025 11:40
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: Nightknight3000 <alexander.roehl@uni-tuebingen.de>
Co-authored-by: Nightknight3000 <alexander.roehl@uni-tuebingen.de>
Co-authored-by: Nightknight3000 <alexander.roehl@uni-tuebingen.de>
Co-authored-by: Nightknight3000 <alexander.roehl@uni-tuebingen.de>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: Nightknight3000 <alexander.roehl@uni-tuebingen.de>
Co-authored-by: antidodo <albin2993@gmail.com>
# Conflicts:
#	flamesdk/flame_core.py
#	flamesdk/resources/client_apis/clients/data_api_client.py
#	flamesdk/resources/client_apis/clients/po_client.py
#	flamesdk/resources/client_apis/clients/storage_client.py
#	flamesdk/resources/client_apis/storage_api.py
Co-authored-by: Nightknight3000 <alexander.roehl@uni-tuebingen.de>
Co-authored-by: antidodo <albin2993@gmail.com>
Nightknight3000 and others added 24 commits April 30, 2026 15:30
# Conflicts:
#	flamesdk/flame_core.py
#	flamesdk/resources/rest_api.py
#	flamesdk/resources/utils/logging.py
#	pyproject.toml
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
…sending messages

Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: Nightknight3000 <alexander.roehl@uni-tuebingen.de>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: Nightknight3000 <alexander.roehl@uni-tuebingen.de>
Co-authored-by: Nightknight3000 <alexander.roehl@uni-tuebingen.de>
Co-authored-by: Nightknight3000 <alexander.roehl@uni-tuebingen.de>
Co-authored-by: Nightknight3000 <alexander.roehl@uni-tuebingen.de>
Co-authored-by: Nightknight3000 <alexander.roehl@uni-tuebingen.de>
Co-authored-by: antidodo <albin2993@gmail.com>
Co-authored-by: antidodo <albin2993@gmail.com>
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 20, 2026

Warning

Rate limit exceeded

@Nightknight3000 has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 33 minutes and 41 seconds before requesting another review.

You’ve run out of usage credits. Purchase more in the billing tab.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: b7cd399c-1114-4e61-b6ee-22513348156e

📥 Commits

Reviewing files that changed from the base of the PR and between e608813 and 5ba4593.

📒 Files selected for processing (3)
  • flamesdk/resources/client_apis/clients/data_api_client.py
  • flamesdk/resources/client_apis/clients/storage_client.py
  • flamesdk/resources/rest_api.py
📝 Walkthrough

Walkthrough

This PR updates the SDK's core parameter signatures, implements log-level filtering infrastructure, refactors the intermediate-data query API, adds filename support to result submission, standardizes HTTP error handling across all clients, and improves message broker resilience with retries and deduplication.

Changes

SDK Core Refactoring and Infrastructure

Layer / File(s) Summary
Log level infrastructure and runstatus guards
flamesdk/resources/utils/constants.py, flamesdk/resources/utils/logging.py
LogTypeLiteral enum members now pair log names with numeric severity levels via a custom __new__ method; FlameLogger.set_runstatus and raise_error now guard against overwriting terminal runstatus states; root logger level set to DEBUG instead of INFO.
Core SDK parameter refactoring
flamesdk/flame_core.py, flamesdk/resources/client_apis/po_api.py, flamesdk/resources/rest_api.py
FlameCoreSDK, POAPI, and FlameAPI constructors replace suggestible with stream_log_level and status_sync parameters; POAPI.stream_logs gates log forwarding by log level; FlameAPI introduces status_sync and _SYNC_TIMER_IN_SECONDS for delayed status synchronization; wiring threads parameters through initialization stack.
Intermediate data query API refactoring
flamesdk/flame_core.py, flamesdk/resources/client_apis/storage_api.py, flamesdk/resources/client_apis/clients/storage_client.py
get_intermediate_data signature changes from (id, sender_node_id) to (query) across all layers; await_intermediate_data derives per-node query from result_id body; StorageClient.get_intermediate_data retrieves via f"/{type}/{query}".
File submission with filename support
flamesdk/flame_core.py, flamesdk/resources/client_apis/storage_api.py, flamesdk/resources/client_apis/clients/storage_client.py
submit_final_result gains optional filename parameter; StorageClient.push_result accepts filename and uses EXT_TO_OUTPUT_TYPE mapping to resolve extensions; StorageAPI adds _check_multi_result_validity and _warn_filename_extension helpers for validation and extension warnings.
HTTP error handling expansion
flamesdk/resources/client_apis/clients/data_api_client.py, flamesdk/resources/client_apis/clients/message_broker_client.py, flamesdk/resources/client_apis/clients/po_client.py, flamesdk/resources/client_apis/clients/storage_client.py
All HTTP clients expand exception handling from generic HTTPError to specific HTTPStatusError, ConnectError, and TimeoutException; FHIR/S3/broker/storage methods now consistently wrap HTTP calls in try/except blocks catching all three exception types.
Message broker resilience improvements
flamesdk/resources/client_apis/clients/message_broker_client.py, flamesdk/resources/client_apis/message_broker_api.py
send_message implements up to 10 retry attempts with per-attempt warning logs; receive_message conditionally adds sender IDs to deduplication only when sender is not local node; await_message_acknowledgement removes matched messages during wait phase; outgoing meta.id uses shortened node_id[:4] prefix; added debug logging for send attempts and elapsed time tracking.
API status and lifecycle management
flamesdk/resources/rest_api.py
/partner_status gates status updates until _SYNC_TIMER_IN_SECONDS elapses; /healthz computes and logs response before returning; webhook handler removes conditional sender-mismatch logging; _finished adds debug logging and preserves STOPPED runstatus as terminal.
Supporting utilities and version
flamesdk/resources/utils/fhir.py, pyproject.toml
FHIR pagination logging uses LogTypeLiteral.DEBUG; CSV generation logging includes halt_submission=True; success message changed to "success"; project version bumped to 0.6.0.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • PrivateAIM/python-sdk#14: Both PRs modify FlameCoreSDK.submit_final_result(...) and _node_finished lifecycle handling, including parameter threading and runstatus management logic.
  • PrivateAIM/python-sdk#17: Both PRs update flamesdk/resources/utils/fhir.py logging (pagination debug logs and CSV generation messages).

Poem

🐰 A hop through refactored params,
Status sync delays that tap,
Queries flow where IDs once sat,
Filenames now wear logging's hat!
Retries keep the brokers spry,
Terminal states stand resolute and dry. 🚀

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 47.27% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Title check ✅ Passed The PR title accurately summarizes the three main change themes evident in the changeset: result handling enhancements (filename parameter), messaging resilience improvements (retry logic and error handling), and status sync implementation (replacing suggestible parameter).

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch canary

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
flamesdk/resources/client_apis/clients/data_api_client.py (1)

63-68: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Move the S3 request into the try block.

Lines 63-65 execute asyncio.run(self.client.get(...)) outside the try block. When this request raises ConnectError or TimeoutException, the exception handler at lines 68-70 cannot catch it. This inconsistency leaves transient S3 failures unhandled, unlike the FHIR path (lines 47-52) and the helper method _get_s3_dataset_names (lines 76-78), which both wrap the request in the try block.

Proposed fix
                    if (len(s3_keys) == 0) or (res_name in s3_keys):
-                        response = asyncio.run(self.client.get(f"{source['name']}/s3/{res_name}",
-                                                               headers=[('Connection', 'close')],
-                                                               timeout=Timeout(5, write=None, read=None)))
                         try:
+                            response = asyncio.run(self.client.get(f"{source['name']}/s3/{res_name}",
+                                                                   headers=[('Connection', 'close')],
+                                                                   timeout=Timeout(5, write=None, read=None)))
                             response.raise_for_status()
                         except (HTTPStatusError, ConnectError, TimeoutException) as e:
                             self.flame_logger.raise_error(f"Failed to retrieve s3 data for key {res_name} "
                                                           f"from source {source['name']}: {repr(e)}")
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@flamesdk/resources/client_apis/clients/data_api_client.py` around lines 63 -
68, The S3 GET call is made outside the exception handler so
ConnectError/TimeoutException escape the except; move the
asyncio.run(self.client.get(...)) invocation into the same try block that calls
response.raise_for_status() (the block handling HTTPStatusError, ConnectError,
TimeoutException) so transient S3 failures are caught consistently; mirror the
pattern used for the FHIR path and _get_s3_dataset_names by wrapping the request
and subsequent response.raise_for_status() together inside the try surrounding
those exception types (update the block around the S3 request in
data_api_client.py where response is assigned).
flamesdk/resources/client_apis/clients/storage_client.py (1)

159-198: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Switch the required-parameter guards to query.

This method was refactored to accept query, but Lines 175-178 still test id, which is the Python built-in here and therefore never None. Calls like get_intermediate_data(type="global") now fall through to /_/None instead of failing fast.

Suggested fix
-        if (type == "global") and (id is None):
+        if (type == "global") and (query is None):
             self.flame_logger.raise_error("Global intermediate data retrieval requires storage id specification")
-        if (type == "local") and (id is None) and (tag is None):
+        if (type == "local") and (query is None) and (tag is None):
             self.flame_logger.raise_error("For local data a tag or storage id has to be specified")
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@flamesdk/resources/client_apis/clients/storage_client.py` around lines 159 -
198, In get_intermediate_data, the guards incorrectly check the built-in id
instead of the refactored query parameter; replace the id checks so that when
type == "global" you raise via flame_logger.raise_error if query is None, and
when type == "local" you raise if both query is None and tag is None; also
ensure the final non-tag branch fails fast if query is None before calling
_get_file(f"/{type}/{query}"). Use the function name get_intermediate_data,
parameters query and tag, and flame_logger.raise_error/new_log to locate and
update the checks.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@flamesdk/flame_core.py`:
- Around line 664-668: The code currently treats any non-EXECUTED status as
success, causing STOPPED/FAILED runs to get progress=100 and a "Node finished
successfully" log; update the guard around the success path in flame_core.py to
only run for non-terminal successful completion by checking that
self._flame_logger.runstatus is not in the terminal set (use
AnalysisStatus.EXECUTED.value, AnalysisStatus.STOPPED.value,
AnalysisStatus.FAILED.value) before calling set_progress,
_flame_logger.set_runstatus, flame_log, and config.finish_analysis so
aborted/failed analyses are not marked as successful.

In `@flamesdk/resources/client_apis/clients/message_broker_client.py`:
- Around line 201-207: The code only adds message IDs to
list_of_known_message_ids for non-local senders, so messages originated locally
can be redelivered and appended multiple times; update the handling in the
message receipt path (use the symbols message.body['meta']['sender'],
self.nodeConfig.node_id, self.list_of_known_message_ids,
self.list_of_incoming_messages, and self.flame_logger.new_log) so that you
always add message.body["meta"]["id"] to self.list_of_known_message_ids before
appending to self.list_of_incoming_messages; keep the existing conditional
logging for non-local messages but move the deduplication
(self.list_of_known_message_ids.add(...)) outside or into an else branch that
still marks local messages as known.

In `@flamesdk/resources/client_apis/clients/storage_client.py`:
- Around line 133-144: The generated filename uses
EXT_TO_OUTPUT_TYPE[output_type] even when local_dp is enabled and file_body is
actually UTF-8 text; change the logic so the chosen extension reflects the
actual wire format: compute an effective_output_type (e.g., set
effective_output_type = output_type normally, but if local_dp is true and
file_body is UTF-8-decodable text then set effective_output_type to the
textual/wire format you expect such as "json" or "text"), then use
EXT_TO_OUTPUT_TYPE[effective_output_type][0] when building resolved_name; update
the code paths around filename/resolved_name and references to
output_type/file_body in push_result (and the local_dp branch) so the uploaded
file extension matches the real wire format.

In `@flamesdk/resources/client_apis/message_broker_api.py`:
- Around line 91-97: The timeout check in the loop uses time_passed.seconds
which discards fractional seconds; update the condition in message_broker_api.py
(the loop computing time_passed = datetime.now() - start_time and using
acknowledged/receivers) to use time_passed.total_seconds() and compare with >=
timeout (and keep the existing break when counts match). Ensure the logged time
string (the flame_logger.new_log call that reports time_passed.microseconds) is
left as-is or adjusted separately, but the timeout enforcement must use
total_seconds() >= timeout for precise behavior.

In `@flamesdk/resources/client_apis/storage_api.py`:
- Around line 35-40: Do not overwrite the caller-provided multiple_results flag;
compute a separate derived flag for execution. Keep the original
multiple_results variable intact, compute result_len = len(result) if
multiple_results and isinstance(result, (list, tuple)) else 1, then set a new
local variable (e.g., actual_multiple = result_len != 1) and pass
actual_multiple as the multiple_result argument to
self._check_multi_result_validity(...) while leaving output_type and filename
handling unchanged so warnings for caller-requested multiple_results still fire.

In `@flamesdk/resources/rest_api.py`:
- Around line 59-63: The constructor leaves self.status_sync as None which
causes membership checks in apply_partner_status_to_self to raise; in the class
__init__ initialize/normalize self.status_sync to an empty set or list (e.g.,
set()) when the provided status_sync is None, and if you expect
iterable/deduplicated membership use a set; update any code that assumes a
specific type so apply_partner_status_to_self, status_sync, and any callers
consistently use that collection type.

In `@flamesdk/resources/utils/logging.py`:
- Around line 147-153: The raise_error method currently blocks by sleeping 1000
seconds by default; change its signature and behavior so it does not sleep by
default (e.g., set seconds=0 or remove the sleep entirely) and only sleep when
an explicit non-zero seconds is passed; update raise_error (and any callers like
FlameCoreSDK.flame_log) to rely on immediate logging via set_runstatus and
new_log without a long blocking time.sleep call. Ensure raise_error,
set_runstatus, and new_log remain called in the same order and preserve error
logging semantics, but eliminate the 1000-second default delay.

---

Outside diff comments:
In `@flamesdk/resources/client_apis/clients/data_api_client.py`:
- Around line 63-68: The S3 GET call is made outside the exception handler so
ConnectError/TimeoutException escape the except; move the
asyncio.run(self.client.get(...)) invocation into the same try block that calls
response.raise_for_status() (the block handling HTTPStatusError, ConnectError,
TimeoutException) so transient S3 failures are caught consistently; mirror the
pattern used for the FHIR path and _get_s3_dataset_names by wrapping the request
and subsequent response.raise_for_status() together inside the try surrounding
those exception types (update the block around the S3 request in
data_api_client.py where response is assigned).

In `@flamesdk/resources/client_apis/clients/storage_client.py`:
- Around line 159-198: In get_intermediate_data, the guards incorrectly check
the built-in id instead of the refactored query parameter; replace the id checks
so that when type == "global" you raise via flame_logger.raise_error if query is
None, and when type == "local" you raise if both query is None and tag is None;
also ensure the final non-tag branch fails fast if query is None before calling
_get_file(f"/{type}/{query}"). Use the function name get_intermediate_data,
parameters query and tag, and flame_logger.raise_error/new_log to locate and
update the checks.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 619cf84a-c531-4399-ab2a-5961f398a781

📥 Commits

Reviewing files that changed from the base of the PR and between 7935d96 and e608813.

⛔ Files ignored due to path filters (1)
  • poetry.lock is excluded by !**/*.lock
📒 Files selected for processing (13)
  • flamesdk/flame_core.py
  • flamesdk/resources/client_apis/clients/data_api_client.py
  • flamesdk/resources/client_apis/clients/message_broker_client.py
  • flamesdk/resources/client_apis/clients/po_client.py
  • flamesdk/resources/client_apis/clients/storage_client.py
  • flamesdk/resources/client_apis/message_broker_api.py
  • flamesdk/resources/client_apis/po_api.py
  • flamesdk/resources/client_apis/storage_api.py
  • flamesdk/resources/rest_api.py
  • flamesdk/resources/utils/constants.py
  • flamesdk/resources/utils/fhir.py
  • flamesdk/resources/utils/logging.py
  • pyproject.toml

Comment thread flamesdk/flame_core.py
Comment on lines +664 to +668
if self._flame_logger.runstatus != AnalysisStatus.EXECUTED.value:
self.set_progress(100)
self._flame_logger.set_runstatus(AnalysisStatus.EXECUTED.value)
self.flame_log("Node finished successfully")
self.config.finish_analysis()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Don’t run the success path for STOPPED/FAILED runs.

With the new terminal-state guard in FlameLogger.set_runstatus(), this block no longer changes the status, but it still sets progress to 100 and logs "Node finished successfully" for aborted runs. That makes stopped/failed analyses look successful.

💡 Suggested fix
-        if self._flame_logger.runstatus != AnalysisStatus.EXECUTED.value:
+        if self._flame_logger.runstatus not in {
+            AnalysisStatus.EXECUTED.value,
+            AnalysisStatus.STOPPED.value,
+            AnalysisStatus.FAILED.value,
+        }:
             self.set_progress(100)
             self._flame_logger.set_runstatus(AnalysisStatus.EXECUTED.value)
             self.flame_log("Node finished successfully")
-            self.config.finish_analysis()
+        if not self.config.finished:
+            self.config.finish_analysis()
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@flamesdk/flame_core.py` around lines 664 - 668, The code currently treats any
non-EXECUTED status as success, causing STOPPED/FAILED runs to get progress=100
and a "Node finished successfully" log; update the guard around the success path
in flame_core.py to only run for non-terminal successful completion by checking
that self._flame_logger.runstatus is not in the terminal set (use
AnalysisStatus.EXECUTED.value, AnalysisStatus.STOPPED.value,
AnalysisStatus.FAILED.value) before calling set_progress,
_flame_logger.set_runstatus, flame_log, and config.finish_analysis so
aborted/failed analyses are not marked as successful.

Comment on lines +201 to 207
if message.body['meta']['sender'] != self.nodeConfig.node_id:
self.flame_logger.new_log(f"received message from {message.body['meta']['sender']}",
log_type=LogTypeLiteral.INFO.value)
self.flame_logger.new_log(f"message body: {message.body}",
log_type=LogTypeLiteral.DEBUG.value)
self.list_of_known_message_ids.add(message.body["meta"]["id"])
self.list_of_incoming_messages.append(message)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Local-sender deliveries are no longer deduplicated.

Lines 201-206 only mark non-local messages as known. A redelivered acknowledgement will keep is_new_message == True, so the same message can be appended multiple times and later treated as fresh traffic.

Proposed fix
         if is_new_message:
             if message.body['meta']['sender'] != self.nodeConfig.node_id:
                 self.flame_logger.new_log(f"received message from {message.body['meta']['sender']}",
                                           log_type=LogTypeLiteral.INFO.value)
                 self.flame_logger.new_log(f"message body: {message.body}",
                                           log_type=LogTypeLiteral.DEBUG.value)
-                self.list_of_known_message_ids.add(message.body["meta"]["id"])
+            self.list_of_known_message_ids.add(message.body["meta"]["id"])
             self.list_of_incoming_messages.append(message)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@flamesdk/resources/client_apis/clients/message_broker_client.py` around lines
201 - 207, The code only adds message IDs to list_of_known_message_ids for
non-local senders, so messages originated locally can be redelivered and
appended multiple times; update the handling in the message receipt path (use
the symbols message.body['meta']['sender'], self.nodeConfig.node_id,
self.list_of_known_message_ids, self.list_of_incoming_messages, and
self.flame_logger.new_log) so that you always add message.body["meta"]["id"] to
self.list_of_known_message_ids before appending to
self.list_of_incoming_messages; keep the existing conditional logging for
non-local messages but move the deduplication
(self.list_of_known_message_ids.add(...)) outside or into an else branch that
still marks local messages as known.

Comment thread flamesdk/resources/client_apis/clients/storage_client.py
Comment on lines +91 to 97
time_passed = datetime.now() - start_time
self.message_broker_client.flame_logger.new_log(
f"{len(acknowledged)} acknowledged (time={time_passed.microseconds}{chr(956)}s)",
log_type=LogTypeLiteral.DEBUG.value
)
if (len(acknowledged) == len(receivers)) or ((timeout is not None) and (time_passed.seconds > timeout)):
break
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify current timeout comparison usage in the changed file.
rg -nP 'time_passed\.seconds\s*>\s*timeout|time_passed\.microseconds' flamesdk/resources/client_apis/message_broker_api.py

# Verify Python timedelta behavior relevant to this condition.
python - <<'PY'
from datetime import timedelta
d = timedelta(seconds=10, microseconds=500000)
print("timedelta.seconds =", d.seconds)             # integer seconds component only
print("timedelta.total_seconds() =", d.total_seconds())  # precise elapsed seconds
PY

Repository: PrivateAIM/python-sdk

Length of output: 342


Use total_seconds() for timeout enforcement

Line 96 uses time_passed.seconds > timeout, which returns only the integer seconds component and can delay timeout handling by truncating fractional seconds. Use total_seconds() with >= for precise timeout behavior.

Proposed fix
-            time_passed = datetime.now() - start_time
+            time_passed = datetime.now() - start_time
+            elapsed_seconds = time_passed.total_seconds()
             self.message_broker_client.flame_logger.new_log(
-                f"{len(acknowledged)} acknowledged (time={time_passed.microseconds}{chr(956)}s)",
+                f"{len(acknowledged)} acknowledged (time={elapsed_seconds:.6f}s)",
                 log_type=LogTypeLiteral.DEBUG.value
             )
-            if (len(acknowledged) == len(receivers)) or ((timeout is not None) and (time_passed.seconds > timeout)):
+            if (len(acknowledged) == len(receivers)) or ((timeout is not None) and (elapsed_seconds >= timeout)):
                 break
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
time_passed = datetime.now() - start_time
self.message_broker_client.flame_logger.new_log(
f"{len(acknowledged)} acknowledged (time={time_passed.microseconds}{chr(956)}s)",
log_type=LogTypeLiteral.DEBUG.value
)
if (len(acknowledged) == len(receivers)) or ((timeout is not None) and (time_passed.seconds > timeout)):
break
time_passed = datetime.now() - start_time
elapsed_seconds = time_passed.total_seconds()
self.message_broker_client.flame_logger.new_log(
f"{len(acknowledged)} acknowledged (time={elapsed_seconds:.6f}s)",
log_type=LogTypeLiteral.DEBUG.value
)
if (len(acknowledged) == len(receivers)) or ((timeout is not None) and (elapsed_seconds >= timeout)):
break
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@flamesdk/resources/client_apis/message_broker_api.py` around lines 91 - 97,
The timeout check in the loop uses time_passed.seconds which discards fractional
seconds; update the condition in message_broker_api.py (the loop computing
time_passed = datetime.now() - start_time and using acknowledged/receivers) to
use time_passed.total_seconds() and compare with >= timeout (and keep the
existing break when counts match). Ensure the logged time string (the
flame_logger.new_log call that reports time_passed.microseconds) is left as-is
or adjusted separately, but the timeout enforcement must use total_seconds() >=
timeout for precise behavior.

Comment on lines +35 to +40
result_len = len(result) if multiple_results and isinstance(result, (list, tuple)) else 1
multiple_results = result_len != 1
output_type, filename = self._check_multi_result_validity(multiple_result=multiple_results,
output_type=output_type,
filename=filename,
result_length=result_len)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Preserve the caller’s multiple_results intent.

Lines 35-36 overwrite multiple_results based on result_len, so submit_final_result([item], multiple_results=True) now falls into the single-result path and uploads the whole list as one artifact. It also prevents the warning on Lines 66-70 from ever firing for multiple_results=True with non-sequence inputs. Keep the requested flag separate from the derived execution path.

Suggested fix
-        result_len = len(result) if multiple_results and isinstance(result, (list, tuple)) else 1
-        multiple_results = result_len != 1
-        output_type, filename = self._check_multi_result_validity(multiple_result=multiple_results,
+        requested_multiple_results = multiple_results
+        effective_multiple_results = requested_multiple_results and isinstance(result, (list, tuple))
+        result_len = len(result) if effective_multiple_results else 1
+        output_type, filename = self._check_multi_result_validity(multiple_result=effective_multiple_results,
                                                                   output_type=output_type,
                                                                   filename=filename,
                                                                   result_length=result_len)
@@
-        if multiple_results and isinstance(result, (list, tuple)):
+        if effective_multiple_results:
@@
-            if multiple_results:
+            if requested_multiple_results and not isinstance(result, (list, tuple)):
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@flamesdk/resources/client_apis/storage_api.py` around lines 35 - 40, Do not
overwrite the caller-provided multiple_results flag; compute a separate derived
flag for execution. Keep the original multiple_results variable intact, compute
result_len = len(result) if multiple_results and isinstance(result, (list,
tuple)) else 1, then set a new local variable (e.g., actual_multiple =
result_len != 1) and pass actual_multiple as the multiple_result argument to
self._check_multi_result_validity(...) while leaving output_type and filename
handling unchanged so warnings for caller-requested multiple_results still fire.

Comment thread flamesdk/resources/rest_api.py
Comment on lines +147 to 153
def raise_error(self, message: str, seconds: int = 1000) -> None:
if self.runstatus not in [AnalysisStatus.EXECUTED.value,
AnalysisStatus.STOPPED.value,
AnalysisStatus.FAILED.value]:
self.set_runstatus(AnalysisStatus.FAILED.value)
self.new_log(message, log_type=LogTypeLiteral.ERROR.value)
time.sleep(seconds)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Remove the 1000-second default sleep from raise_error.

FlameCoreSDK.flame_log() sends every ERROR log through this method, and the REST endpoints call it before returning failures. With the new default, routine startup/API errors now block for ~16 minutes instead of surfacing immediately.

💡 Suggested fix
-    def raise_error(self, message: str, seconds: int = 1000) -> None:
+    def raise_error(self, message: str, seconds: int = 0) -> None:
         if self.runstatus not in [AnalysisStatus.EXECUTED.value,
                                   AnalysisStatus.STOPPED.value,
                                   AnalysisStatus.FAILED.value]:
             self.set_runstatus(AnalysisStatus.FAILED.value)
             self.new_log(message, log_type=LogTypeLiteral.ERROR.value)
-        time.sleep(seconds)
+        if seconds > 0:
+            time.sleep(seconds)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@flamesdk/resources/utils/logging.py` around lines 147 - 153, The raise_error
method currently blocks by sleeping 1000 seconds by default; change its
signature and behavior so it does not sleep by default (e.g., set seconds=0 or
remove the sleep entirely) and only sleep when an explicit non-zero seconds is
passed; update raise_error (and any callers like FlameCoreSDK.flame_log) to rely
on immediate logging via set_runstatus and new_log without a long blocking
time.sleep call. Ensure raise_error, set_runstatus, and new_log remain called in
the same order and preserve error logging semantics, but eliminate the
1000-second default delay.

@Nightknight3000 Nightknight3000 changed the title Canary feat: create release 0.6.0 May 20, 2026
@antidodo antidodo changed the title feat: create release 0.6.0 Release v0.6.0: result handling, messaging resilience, and status sync May 20, 2026
Co-authored-by: antidodo <albin2993@gmail.com>
@Nightknight3000 Nightknight3000 merged commit f058128 into main May 20, 2026
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants