Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
289f5c5
feat: rename result files
Nightknight3000 Oct 14, 2025
32e6eca
feat: add progress tracking
Nightknight3000 Oct 14, 2025
1b62e69
fix: append UUID to result file names for uniqueness
antidodo Oct 17, 2025
5ee1639
fix: append UUID to result file names for uniqueness
antidodo Oct 17, 2025
b6a2f38
fix: correct string formatting in result file naming
antidodo Oct 17, 2025
6c8b37e
fix: call get_progress method to retrieve current progress status
antidodo Oct 17, 2025
0d35aff
feat: set runstatus to finished and push final log
Nightknight3000 Oct 21, 2025
9c32142
fix: tiny logic change for progress
Nightknight3000 Oct 22, 2025
725ab40
feat: move progress to log streaming and remove from status health
Nightknight3000 Oct 22, 2025
71ada9d
fix: temporarily comment out progress stream
Nightknight3000 Oct 22, 2025
8d8936b
Merge branch 'main' into canary
Nightknight3000 Oct 22, 2025
2a9b73f
fix: missing progress on log stream
Nightknight3000 Oct 30, 2025
e493788
fix: add control print to error in log stream
Nightknight3000 Oct 30, 2025
ff820c8
fix: add control print to error in log stream
Nightknight3000 Oct 31, 2025
cdd9bc8
refactor: remove log tails
Nightknight3000 Nov 6, 2025
f0d1b3c
refactor: remove control prints
Nightknight3000 Nov 6, 2025
9e2a264
refactor: enhance send/receive msg logs
Nightknight3000 Nov 6, 2025
9b2baf3
fix: delete message_by_id
Nightknight3000 Nov 6, 2025
6bb7cd2
fix: log error in msgbroker
Nightknight3000 Nov 6, 2025
0477e5c
refactor: remove control print
Nightknight3000 Nov 6, 2025
399215c
refactor: linebreaks for readability
Nightknight3000 Nov 19, 2025
4c47d62
feat: small revision of data retrieval logic
Nightknight3000 Jan 19, 2026
4b6f2cd
fix: add debug prints for finished check status and main thread activity
antidodo Jan 21, 2026
e2f5fa8
Merge remote-tracking branch 'origin/canary' into canary
antidodo Jan 21, 2026
c245392
refactor: remove todos
Nightknight3000 Jan 27, 2026
4f7e170
Merge branch 'main' into canary
Nightknight3000 Feb 5, 2026
08d990d
fix: improve logging and refactor key sequence handling in FHIR to CS…
antidodo Feb 6, 2026
8de9e1b
feat: move keys split out of the loop
antidodo Feb 9, 2026
51bb89e
refactor: remove debug print statements from finished check method
antidodo Feb 9, 2026
ad156fa
Merge branch 'canary' into fhir_to_csv
antidodo Feb 9, 2026
db553c7
refactor: remove warning msg in fhir-to-csv
Nightknight3000 Feb 9, 2026
ec82896
Merge remote-tracking branch 'origin/fhir_to_csv' into fhir_to_csv
Nightknight3000 Feb 9, 2026
aae3672
refactor: remove warning msg in fhir-to-csv
Nightknight3000 Feb 9, 2026
d8b574a
feat: enhance logging behaviour in fhir-to-csv
Nightknight3000 Feb 9, 2026
6cd922d
feat: reduce dict_to_csv complexity
Nightknight3000 Feb 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion flamesdk/resources/rest_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import uvicorn
from typing import Any, Callable, Union, Optional


from fastapi import FastAPI, APIRouter, Request, Depends
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
Expand Down Expand Up @@ -107,7 +108,6 @@ def get_message(msg: dict = Depends(get_body)) -> None:
def _finished(self, clients: list[Any]) -> str:
init_failed = None in clients
main_alive = threading.main_thread().is_alive()

if init_failed:
return "stuck"
elif (not main_alive) and (not self.finished_check()):
Expand Down
98 changes: 44 additions & 54 deletions flamesdk/resources/utils/fhir.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,25 @@ def fhir_to_csv(fhir_data: dict[str, Any],
f"(given={row_key_seq})")

df_dict = {}
col_keys = col_key_seq.split('.')
value_keys = value_key_seq.split('.')
row_keys = row_key_seq.split('.') if row_key_seq else None
flame_logger.new_log(f"Converting fhir data resource of type={input_resource} to csv")
total_count = int(fhir_data['total'])
count_mod = 10 ** (len(str(total_count)) - 2)
count_mod = count_mod if count_mod > 1 else 1
current_count = 0
while True:
# extract from resource
if input_resource == 'Observation':
for i, entry in enumerate(fhir_data['entry']):
flame_logger.new_log(f"Parsing fhir data entry no={i + 1} of {len(fhir_data['entry'])}")
col_id = _search_fhir_resource(fhir_entry=entry, flame_logger=flame_logger, key_sequence=col_key_seq)
row_id = _search_fhir_resource(fhir_entry=entry, flame_logger=flame_logger, key_sequence=row_key_seq)
value = _search_fhir_resource(fhir_entry=entry, flame_logger=flame_logger, key_sequence=value_key_seq)
for i, entry in enumerate(fhir_data['entry']):
current_count += 1
if (current_count == 1) or not (current_count % count_mod):
flame_logger.new_log(f"Parsing fhir data entry no={current_count} of {total_count}")

# extract from resource
if input_resource == 'Observation':
col_id = _search_fhir_resource(fhir_entry=entry, flame_logger=flame_logger, keys=col_keys)
row_id = _search_fhir_resource(fhir_entry=entry, flame_logger=flame_logger, keys=row_keys)
value = _search_fhir_resource(fhir_entry=entry, flame_logger=flame_logger, keys=value_keys)
if row_id_filters is not None:
if (row_id is None) or (not any([row_id_filter in row_id for row_id_filter in row_id_filters])):
continue
Expand All @@ -47,29 +57,28 @@ def fhir_to_csv(fhir_data: dict[str, Any],
if row_id not in df_dict[col_id].keys():
df_dict[col_id][row_id] = ''
df_dict[col_id][row_id] = value
elif input_resource == 'QuestionnaireResponse':
for i, entry in enumerate(fhir_data['entry']):
flame_logger.new_log(f"Parsing fhir data entry no={i + 1} of {len(fhir_data['entry'])}")

elif input_resource == 'QuestionnaireResponse':
for item in entry['resource']['item']:
col_id = _search_fhir_resource(fhir_entry=item,
flame_logger=flame_logger,
key_sequence=col_key_seq,
keys=col_keys,
current=2)
value = _search_fhir_resource(fhir_entry=item,
flame_logger=flame_logger,
key_sequence=value_key_seq,
keys=value_keys,
current=2)
if col_id_filters is not None:
if (col_id is None) or (not any([col_id_filter in col_id for col_id_filter in col_id_filters])):
continue
if col_id not in df_dict.keys():
df_dict[col_id] = {}
df_dict[col_id][str(i)] = value
Comment on lines +61 to 76
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Row IDs for QuestionnaireResponse can collide across pages.
The per-page `i` index resets on each paged batch, so later pages can overwrite earlier rows for the same column. Use a stable row key (e.g., `current_count` or the resource id).

🛠️ Proposed fix
         for i, entry in enumerate(fhir_data['entry']):
             current_count += 1
@@
-            elif input_resource == 'QuestionnaireResponse':
-                for item in entry['resource']['item']:
+            elif input_resource == 'QuestionnaireResponse':
+                row_key = str(entry.get("resource", {}).get("id", current_count))
+                for item in entry['resource']['item']:
@@
-                    df_dict[col_id][str(i)] = value
+                    df_dict[col_id][row_key] = value
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
elif input_resource == 'QuestionnaireResponse':
for item in entry['resource']['item']:
col_id = _search_fhir_resource(fhir_entry=item,
flame_logger=flame_logger,
key_sequence=col_key_seq,
keys=col_keys,
current=2)
value = _search_fhir_resource(fhir_entry=item,
flame_logger=flame_logger,
key_sequence=value_key_seq,
keys=value_keys,
current=2)
if col_id_filters is not None:
if (col_id is None) or (not any([col_id_filter in col_id for col_id_filter in col_id_filters])):
continue
if col_id not in df_dict.keys():
df_dict[col_id] = {}
df_dict[col_id][str(i)] = value
elif input_resource == 'QuestionnaireResponse':
row_key = str(entry.get("resource", {}).get("id", current_count))
for item in entry['resource']['item']:
col_id = _search_fhir_resource(fhir_entry=item,
flame_logger=flame_logger,
keys=col_keys,
current=2)
value = _search_fhir_resource(fhir_entry=item,
flame_logger=flame_logger,
keys=value_keys,
current=2)
if col_id_filters is not None:
if (col_id is None) or (not any([col_id_filter in col_id for col_id_filter in col_id_filters])):
continue
if col_id not in df_dict.keys():
df_dict[col_id] = {}
df_dict[col_id][row_key] = value
🤖 Prompt for AI Agents
In `@flamesdk/resources/utils/fhir.py` around lines 61 - 76, The loop handling
input_resource == 'QuestionnaireResponse' uses the per-page index i as the row
key which can collide across paged batches; update the logic in that branch
(where col_id and value are populated via _search_fhir_resource and df_dict is
updated) to use a stable row identifier instead of str(i) — for example derive
row_key from the QuestionnaireResponse resource id (entry['resource']['id']) or
a running current_count variable that is incremented across pages; ensure you
check for None id and fall back to a globally incremented counter, then use
df_dict[col_id][row_key] = value instead of df_dict[col_id][str(i)] to avoid
overwrites across pages.

else:
try:
raise IOError(f"Unknown resource specified (given={input_resource}, known={_KNOWN_RESOURCES})")
except IOError as e:
flame_logger.raise_error(f"Error while parsing fhir data: {repr(e)}")
else:
try:
raise IOError(f"Unknown resource specified (given={input_resource}, known={_KNOWN_RESOURCES})")
except IOError as e:
flame_logger.raise_error(f"Error while parsing fhir data: {repr(e)}")

# get next data
if (data_client is None) or (isinstance(data_client, bool)):
Expand Down Expand Up @@ -100,58 +109,39 @@ def _dict_to_csv(data: dict[Any, dict[Any, Any]],
row_col_name: str,
separator: str,
flame_logger: FlameLogger) -> StringIO:
io = StringIO()
headers = [f"{row_col_name}"]
headers.extend(list(data.keys()))
headers = [f"{header}" for header in headers]
file_content = separator.join(headers)

flame_logger.new_log("Writing fhir data dict to csv...")
visited_rows = []
for i, rows in enumerate(data.values()):
flame_logger.new_log(f"Writing row {i + 1} of {len(data.values())}")
for row_id in rows.keys():
if row_id in visited_rows:
continue
line_list = [row_id]
visited_rows.append(row_id)
for col_id in data.keys():
try:
line_list.append(data[col_id][row_id])
except KeyError:
line_list.append('')
line_list = [f"{e}" for e in line_list]
file_content += '\n' + separator.join(line_list)
columns = list(data.keys())
row_ids = dict.fromkeys(row_id for col in data.values() for row_id in col)
lines = [separator.join([row_col_name] + [str(c) for c in columns])]
for row_id in row_ids:
line = [str(row_id)]
for col in columns:
line.append(str(data[col].get(row_id, '')))
lines.append(separator.join(line))

io.write(file_content)
io = StringIO()
io.write('\n'.join(lines))
io.seek(0)
flame_logger.new_log("Fhir data converted to csv")
return io


def _search_fhir_resource(fhir_entry: Union[dict[str, Any], list[Any]],
flame_logger: FlameLogger,
key_sequence: str,
keys: list[str],
current: int = 0) -> Optional[Any]:
keys = key_sequence.split('.')
key = keys[current]
if (current < (len(keys) - 1)) or (type(fhir_entry) == list):
if type(fhir_entry) == dict:
for field in fhir_entry.keys():
try:
if field == key:
fhir_entry = fhir_entry[key]
next_value = _search_fhir_resource(fhir_entry, flame_logger, key_sequence, current + 1)
if next_value is not None:
return next_value
except KeyError:
flame_logger.new_log(f"Unable to find field '{key}' in fhir data at level={current + 1} "
f"(keys found: {fhir_entry.keys()})",
log_type='warning')
return None
if key in fhir_entry.keys():
next_value = _search_fhir_resource(fhir_entry[key], flame_logger, keys, current + 1)
if next_value is not None:
return next_value
else:
return None
elif type(fhir_entry) == list:
for e in fhir_entry:
next_value = _search_fhir_resource(e, flame_logger, key_sequence, current)
next_value = _search_fhir_resource(e, flame_logger, keys, current)
if next_value is not None:
return next_value
else:
Expand Down