Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion healthchain/gateway/fhir/aio.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging

from contextlib import asynccontextmanager
from typing import Any, Dict, Type
from typing import Any, Dict, Optional, Type

from fhir.resources.bundle import Bundle
from fhir.resources.capabilitystatement import CapabilityStatement
Expand Down Expand Up @@ -173,6 +173,8 @@ async def search(
source: str = None,
add_provenance: bool = False,
provenance_tag: str = None,
follow_pagination: bool = False,
max_pages: Optional[int] = None,
) -> Bundle:
"""
Search for FHIR resources.
Expand All @@ -183,6 +185,8 @@ async def search(
source: Source name to search in (uses first available if None)
add_provenance: If True, automatically add provenance metadata to resources
provenance_tag: Optional tag code for provenance (e.g., "aggregated", "transformed")
follow_pagination: If True, automatically fetch all pages
max_pages: Maximum number of pages to fetch (None for unlimited)

Returns:
Bundle containing search results
Expand Down Expand Up @@ -212,6 +216,33 @@ async def search(
client_kwargs={"params": params},
)

# Handle pagination if requested
if follow_pagination:
all_entries = bundle.entry or []
page_count = 1

while bundle.link:
next_link = next((link for link in bundle.link if link.relation == "next"), None)
if not next_link or (max_pages and page_count >= max_pages):
break

# Extract the relative URL from the next link
next_url = next_link.url.split("/")[-2:] # Get resource_type/_search part
next_params = dict(pair.split("=") for pair in next_link.url.split("?")[1].split("&"))

bundle = await self._execute_with_client(
"search",
source=source,
resource_type=resource_type,
client_args=(resource_type, next_params),
)

if bundle.entry:
all_entries.extend(bundle.entry)
page_count += 1

bundle.entry = all_entries

if add_provenance and bundle.entry:
source_name = source or next(iter(self.connection_manager.sources.keys()))
for entry in bundle.entry:
Expand Down
33 changes: 32 additions & 1 deletion healthchain/gateway/fhir/sync.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging

from typing import Any, Dict, Type
from typing import Any, Dict, Type, Optional

from fhir.resources.bundle import Bundle
from fhir.resources.capabilitystatement import CapabilityStatement
Expand Down Expand Up @@ -235,6 +235,8 @@ def search(
source: str = None,
add_provenance: bool = False,
provenance_tag: str = None,
follow_pagination: bool = False,
max_pages: Optional[int] = None,
) -> Bundle:
"""
Search for FHIR resources (sync version).
Expand All @@ -245,6 +247,8 @@ def search(
source: Source name to search in (uses first available if None)
add_provenance: If True, automatically add provenance metadata to resources
provenance_tag: Optional tag code for provenance (e.g., "aggregated", "transformed")
follow_pagination: If True, automatically fetch all pages
max_pages: Maximum number of pages to fetch (None for unlimited)

Returns:
Bundle containing search results
Expand All @@ -270,6 +274,33 @@ def search(
client_args=(resource_type, params),
)

# Handle pagination if requested
if follow_pagination:
all_entries = bundle.entry or []
page_count = 1

while bundle.link:
next_link = next((link for link in bundle.link if link.relation == "next"), None)
if not next_link or (max_pages and page_count >= max_pages):
break

# Extract the relative URL from the next link
next_url = next_link.url.split("/")[-2:] # Get resource_type/_search part
next_params = dict(pair.split("=") for pair in next_link.url.split("?")[1].split("&"))

bundle = self._execute_with_client(
"search",
source=source,
resource_type=resource_type,
client_args=(resource_type, next_params),
)

if bundle.entry:
all_entries.extend(bundle.entry)
page_count += 1

bundle.entry = all_entries

# Add provenance metadata if requested
if add_provenance and bundle.entry:
source_name = source or next(iter(self.connection_manager.sources.keys()))
Expand Down
121 changes: 121 additions & 0 deletions tests/gateway/test_fhir_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,3 +284,124 @@ def test_search_with_empty_bundle():
provenance_tag="aggregated",
)
assert result.entry is None

def test_search_with_pagination(fhir_gateway):
"""Gateway.search fetches all pages when pagination is enabled."""
# Create mock bundles for pagination
page1 = Bundle(
type="searchset",
entry=[BundleEntry(resource=Patient(id="1"))],
link=[{"relation": "next", "url": "Patient?page=2"}]
)
page2 = Bundle(
type="searchset",
entry=[BundleEntry(resource=Patient(id="2"))],
link=[{"relation": "next", "url": "Patient?page=3"}]
)
page3 = Bundle(
type="searchset",
entry=[BundleEntry(resource=Patient(id="3"))]
)

with patch.object(
fhir_gateway, "_execute_with_client", side_effect=[page1, page2, page3]
) as mock_execute:
result = fhir_gateway.search(
Patient,
{"name": "Smith"},
follow_pagination=True
)

assert mock_execute.call_count == 3
assert result.entry is not None
assert len(result.entry) == 3
assert [entry.resource.id for entry in result.entry] == ["1", "2", "3"]


def test_search_with_max_pages(fhir_gateway):
"""Gateway.search respects maximum page limit."""
# Create mock bundles for pagination
page1 = Bundle(
type="searchset",
entry=[BundleEntry(resource=Patient(id="1"))],
link=[{"relation": "next", "url": "Patient?page=2"}]
)
page2 = Bundle(
type="searchset",
entry=[BundleEntry(resource=Patient(id="2"))],
link=[{"relation": "next", "url": "Patient?page=3"}]
)

with patch.object(
fhir_gateway, "_execute_with_client", side_effect=[page1, page2]
) as mock_execute:
result = fhir_gateway.search(
Patient,
{"name": "Smith"},
follow_pagination=True,
max_pages=2
)

assert mock_execute.call_count == 2
assert result.entry is not None
assert len(result.entry) == 2
assert [entry.resource.id for entry in result.entry] == ["1", "2"]


def test_search_with_pagination_empty_next_link(fhir_gateway):
"""Gateway.search handles missing next links correctly."""
# Create mock bundle without next link
bundle = Bundle(
type="searchset",
entry=[BundleEntry(resource=Patient(id="1"))],
link=[{"relation": "self", "url": "Patient?name=Smith"}]
)

with patch.object(
fhir_gateway, "_execute_with_client", return_value=bundle
) as mock_execute:
result = fhir_gateway.search(
Patient,
{"name": "Smith"},
follow_pagination=True
)

mock_execute.assert_called_once()
assert result.entry is not None
assert len(result.entry) == 1
assert result.entry[0].resource.id == "1"


def test_search_with_pagination_and_provenance(fhir_gateway):
"""Gateway.search combines pagination with provenance metadata."""
page1 = Bundle(
type="searchset",
entry=[BundleEntry(resource=Patient(id="1"))],
link=[{"relation": "next", "url": "Patient?page=2"}]
)
page2 = Bundle(
type="searchset",
entry=[BundleEntry(resource=Patient(id="2"))]
)

with patch.object(
fhir_gateway, "_execute_with_client", side_effect=[page1, page2]
) as mock_execute:
result = fhir_gateway.search(
Patient,
{"name": "Smith"},
source="test_source",
follow_pagination=True,
add_provenance=True,
provenance_tag="aggregated"
)

assert mock_execute.call_count == 2
assert result.entry is not None
assert len(result.entry) == 2

# Check provenance metadata
for entry in result.entry:
assert entry.resource.meta is not None
assert entry.resource.meta.source == "urn:healthchain:source:test_source"
assert entry.resource.meta.tag[0].code == "aggregated"
124 changes: 124 additions & 0 deletions tests/gateway/test_fhir_gateway_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,130 @@ async def test_search_operation_with_parameters(fhir_gateway):
assert result == mock_bundle



@pytest.mark.asyncio
async def test_search_with_pagination(fhir_gateway):
"""AsyncFHIRGateway.search fetches all pages when pagination is enabled."""
# Create mock bundles for pagination
page1 = Bundle(
type="searchset",
entry=[{"resource": Patient(id="1")}],
link=[{"relation": "next", "url": "Patient?page=2"}]
)
page2 = Bundle(
type="searchset",
entry=[{"resource": Patient(id="2")}],
link=[{"relation": "next", "url": "Patient?page=3"}]
)
page3 = Bundle(
type="searchset",
entry=[{"resource": Patient(id="3")}]
)

with patch.object(
fhir_gateway, "_execute_with_client", side_effect=[page1, page2, page3]
) as mock_execute:
result = await fhir_gateway.search(
Patient,
{"name": "Smith"},
follow_pagination=True
)

assert mock_execute.call_count == 3
assert result.entry is not None
assert len(result.entry) == 3
assert [entry.resource.id for entry in result.entry] == ["1", "2", "3"]


@pytest.mark.asyncio
async def test_search_with_max_pages(fhir_gateway):
"""AsyncFHIRGateway.search respects maximum page limit."""
page1 = Bundle(
type="searchset",
entry=[{"resource": Patient(id="1")}],
link=[{"relation": "next", "url": "Patient?page=2"}]
)
page2 = Bundle(
type="searchset",
entry=[{"resource": Patient(id="2")}],
link=[{"relation": "next", "url": "Patient?page=3"}]
)

with patch.object(
fhir_gateway, "_execute_with_client", side_effect=[page1, page2]
) as mock_execute:
result = await fhir_gateway.search(
Patient,
{"name": "Smith"},
follow_pagination=True,
max_pages=2
)

assert mock_execute.call_count == 2
assert result.entry is not None
assert len(result.entry) == 2
assert [entry.resource.id for entry in result.entry] == ["1", "2"]


@pytest.mark.asyncio
async def test_search_with_pagination_empty_next_link(fhir_gateway):
"""AsyncFHIRGateway.search handles missing next links correctly."""
bundle = Bundle(
type="searchset",
entry=[{"resource": Patient(id="1")}],
link=[{"relation": "self", "url": "Patient?name=Smith"}]
)

with patch.object(
fhir_gateway, "_execute_with_client", return_value=bundle
) as mock_execute:
result = await fhir_gateway.search(
Patient,
{"name": "Smith"},
follow_pagination=True
)

mock_execute.assert_called_once()
assert result.entry is not None
assert len(result.entry) == 1
assert result.entry[0].resource.id == "1"


@pytest.mark.asyncio
async def test_search_with_pagination_and_provenance(fhir_gateway):
"""AsyncFHIRGateway.search combines pagination with provenance metadata."""
page1 = Bundle(
type="searchset",
entry=[{"resource": Patient(id="1")}],
link=[{"relation": "next", "url": "Patient?page=2"}]
)
page2 = Bundle(
type="searchset",
entry=[{"resource": Patient(id="2")}]
)

with patch.object(
fhir_gateway, "_execute_with_client", side_effect=[page1, page2]
) as mock_execute:
result = await fhir_gateway.search(
Patient,
{"name": "Smith"},
source="test_source",
follow_pagination=True,
add_provenance=True,
provenance_tag="aggregated"
)

assert mock_execute.call_count == 2
assert result.entry is not None
assert len(result.entry) == 2

# Check provenance metadata
for entry in result.entry:
assert entry.resource.meta is not None
assert entry.resource.meta.source == "urn:healthchain:source:test_source"
assert entry.resource.meta.tag[0].code == "aggregated"

@pytest.mark.asyncio
async def test_modify_context_for_existing_resource(fhir_gateway, test_patient):
"""Modify context manager fetches, yields, and updates existing resources."""
Expand Down
Loading