diff --git a/healthchain/gateway/fhir/aio.py b/healthchain/gateway/fhir/aio.py index 5a941832..de88d6c3 100644 --- a/healthchain/gateway/fhir/aio.py +++ b/healthchain/gateway/fhir/aio.py @@ -1,7 +1,7 @@ import logging from contextlib import asynccontextmanager -from typing import Any, Dict, Type +from typing import Any, Dict, Optional, Type from fhir.resources.bundle import Bundle from fhir.resources.capabilitystatement import CapabilityStatement @@ -173,6 +173,8 @@ async def search( source: str = None, add_provenance: bool = False, provenance_tag: str = None, + follow_pagination: bool = False, + max_pages: Optional[int] = None, ) -> Bundle: """ Search for FHIR resources. @@ -183,6 +185,8 @@ async def search( source: Source name to search in (uses first available if None) add_provenance: If True, automatically add provenance metadata to resources provenance_tag: Optional tag code for provenance (e.g., "aggregated", "transformed") + follow_pagination: If True, automatically fetch all pages + max_pages: Maximum number of pages to fetch (None for unlimited) Returns: Bundle containing search results @@ -212,6 +216,33 @@ async def search( client_kwargs={"params": params}, ) + # Handle pagination if requested + if follow_pagination: + all_entries = bundle.entry or [] + page_count = 1 + + while bundle.link: + next_link = next((link for link in bundle.link if link.relation == "next"), None) + if not next_link or (max_pages and page_count >= max_pages): + break + + # Extract the relative URL from the next link + next_url = next_link.url.split("/")[-2:] # Get resource_type/_search part + next_params = dict(pair.split("=") for pair in next_link.url.split("?")[1].split("&")) + + bundle = await self._execute_with_client( + "search", + source=source, + resource_type=resource_type, + client_args=(resource_type, next_params), + ) + + if bundle.entry: + all_entries.extend(bundle.entry) + page_count += 1 + + bundle.entry = all_entries + if add_provenance and bundle.entry: source_name = source or next(iter(self.connection_manager.sources.keys())) for entry in bundle.entry: diff --git a/healthchain/gateway/fhir/sync.py b/healthchain/gateway/fhir/sync.py index 96a7b7d4..ab0ebdb6 100644 --- a/healthchain/gateway/fhir/sync.py +++ b/healthchain/gateway/fhir/sync.py @@ -1,6 +1,6 @@ import logging -from typing import Any, Dict, Type +from typing import Any, Dict, Type, Optional from fhir.resources.bundle import Bundle from fhir.resources.capabilitystatement import CapabilityStatement @@ -235,6 +235,8 @@ def search( source: str = None, add_provenance: bool = False, provenance_tag: str = None, + follow_pagination: bool = False, + max_pages: Optional[int] = None, ) -> Bundle: """ Search for FHIR resources (sync version). @@ -245,6 +247,8 @@ def search( source: Source name to search in (uses first available if None) add_provenance: If True, automatically add provenance metadata to resources provenance_tag: Optional tag code for provenance (e.g., "aggregated", "transformed") + follow_pagination: If True, automatically fetch all pages + max_pages: Maximum number of pages to fetch (None for unlimited) Returns: Bundle containing search results @@ -270,6 +274,33 @@ def search( client_args=(resource_type, params), ) + # Handle pagination if requested + if follow_pagination: + all_entries = bundle.entry or [] + page_count = 1 + + while bundle.link: + next_link = next((link for link in bundle.link if link.relation == "next"), None) + if not next_link or (max_pages and page_count >= max_pages): + break + + # Extract the relative URL from the next link + next_url = next_link.url.split("/")[-2:] # Get resource_type/_search part + next_params = dict(pair.split("=") for pair in next_link.url.split("?")[1].split("&")) + + bundle = self._execute_with_client( + "search", + source=source, + resource_type=resource_type, + client_args=(resource_type, next_params), + ) + + if bundle.entry: + all_entries.extend(bundle.entry) + page_count += 1 + + bundle.entry = all_entries + # Add provenance metadata if requested if add_provenance and bundle.entry: source_name = source or next(iter(self.connection_manager.sources.keys())) diff --git a/tests/gateway/test_fhir_gateway.py b/tests/gateway/test_fhir_gateway.py index a009b3cb..14cf289e 100644 --- a/tests/gateway/test_fhir_gateway.py +++ b/tests/gateway/test_fhir_gateway.py @@ -284,3 +284,124 @@ def test_search_with_empty_bundle(): provenance_tag="aggregated", ) assert result.entry is None + +def test_search_with_pagination(fhir_gateway): + """Gateway.search fetches all pages when pagination is enabled.""" + # Create mock bundles for pagination + page1 = Bundle( + type="searchset", + entry=[BundleEntry(resource=Patient(id="1"))], + link=[{"relation": "next", "url": "Patient?page=2"}] + ) + page2 = Bundle( + type="searchset", + entry=[BundleEntry(resource=Patient(id="2"))], + link=[{"relation": "next", "url": "Patient?page=3"}] + ) + page3 = Bundle( + type="searchset", + entry=[BundleEntry(resource=Patient(id="3"))] + ) + + with patch.object( + fhir_gateway, "_execute_with_client", side_effect=[page1, page2, page3] + ) as mock_execute: + result = fhir_gateway.search( + Patient, + {"name": "Smith"}, + follow_pagination=True + ) + + assert mock_execute.call_count == 3 + assert result.entry is not None + assert len(result.entry) == 3 + assert [entry.resource.id for entry in result.entry] == ["1", "2", "3"] + + +def test_search_with_max_pages(fhir_gateway): + """Gateway.search respects maximum page limit.""" + # Create mock bundles for pagination + page1 = Bundle( + type="searchset", + entry=[BundleEntry(resource=Patient(id="1"))], + link=[{"relation": "next", "url": "Patient?page=2"}] + ) + page2 = Bundle( + type="searchset", + entry=[BundleEntry(resource=Patient(id="2"))], + link=[{"relation": "next", "url": "Patient?page=3"}] + ) + + with patch.object( + fhir_gateway, "_execute_with_client", side_effect=[page1, page2] + ) as mock_execute: + result = fhir_gateway.search( + Patient, + {"name": "Smith"}, + follow_pagination=True, + max_pages=2 + ) + + assert mock_execute.call_count == 2 + assert result.entry is not None + assert len(result.entry) == 2 + assert [entry.resource.id for entry in result.entry] == ["1", "2"] + + +def test_search_with_pagination_empty_next_link(fhir_gateway): + """Gateway.search handles missing next links correctly.""" + # Create mock bundle without next link + bundle = Bundle( + type="searchset", + entry=[BundleEntry(resource=Patient(id="1"))], + link=[{"relation": "self", "url": "Patient?name=Smith"}] + ) + + with patch.object( + fhir_gateway, "_execute_with_client", return_value=bundle + ) as mock_execute: + result = fhir_gateway.search( + Patient, + {"name": "Smith"}, + follow_pagination=True + ) + + mock_execute.assert_called_once() + assert result.entry is not None + assert len(result.entry) == 1 + assert result.entry[0].resource.id == "1" + + +def test_search_with_pagination_and_provenance(fhir_gateway): + """Gateway.search combines pagination with provenance metadata.""" + page1 = Bundle( + type="searchset", + entry=[BundleEntry(resource=Patient(id="1"))], + link=[{"relation": "next", "url": "Patient?page=2"}] + ) + page2 = Bundle( + type="searchset", + entry=[BundleEntry(resource=Patient(id="2"))] + ) + + with patch.object( + fhir_gateway, "_execute_with_client", side_effect=[page1, page2] + ) as mock_execute: + result = fhir_gateway.search( + Patient, + {"name": "Smith"}, + source="test_source", + follow_pagination=True, + add_provenance=True, + provenance_tag="aggregated" + ) + + assert mock_execute.call_count == 2 + assert result.entry is not None + assert len(result.entry) == 2 + + # Check provenance metadata + for entry in result.entry: + assert entry.resource.meta is not None + assert entry.resource.meta.source == "urn:healthchain:source:test_source" + assert entry.resource.meta.tag[0].code == "aggregated" \ No newline at end of file diff --git a/tests/gateway/test_fhir_gateway_async.py b/tests/gateway/test_fhir_gateway_async.py index 2fbf996c..f0e948f9 100644 --- a/tests/gateway/test_fhir_gateway_async.py +++ b/tests/gateway/test_fhir_gateway_async.py @@ -126,6 +126,130 @@ async def test_search_operation_with_parameters(fhir_gateway): assert result == mock_bundle + +@pytest.mark.asyncio +async def test_search_with_pagination(fhir_gateway): + """AsyncFHIRGateway.search fetches all pages when pagination is enabled.""" + # Create mock bundles for pagination + page1 = Bundle( + type="searchset", + entry=[{"resource": Patient(id="1")}], + link=[{"relation": "next", "url": "Patient?page=2"}] + ) + page2 = Bundle( + type="searchset", + entry=[{"resource": Patient(id="2")}], + link=[{"relation": "next", "url": "Patient?page=3"}] + ) + page3 = Bundle( + type="searchset", + entry=[{"resource": Patient(id="3")}] + ) + + with patch.object( + fhir_gateway, "_execute_with_client", side_effect=[page1, page2, page3] + ) as mock_execute: + result = await fhir_gateway.search( + Patient, + {"name": "Smith"}, + follow_pagination=True + ) + + assert mock_execute.call_count == 3 + assert result.entry is not None + assert len(result.entry) == 3 + assert [entry.resource.id for entry in result.entry] == ["1", "2", "3"] + + +@pytest.mark.asyncio +async def test_search_with_max_pages(fhir_gateway): + """AsyncFHIRGateway.search respects maximum page limit.""" + page1 = Bundle( + type="searchset", + entry=[{"resource": Patient(id="1")}], + link=[{"relation": "next", "url": "Patient?page=2"}] + ) + page2 = Bundle( + type="searchset", + entry=[{"resource": Patient(id="2")}], + link=[{"relation": "next", "url": "Patient?page=3"}] + ) + + with patch.object( + fhir_gateway, "_execute_with_client", side_effect=[page1, page2] + ) as mock_execute: + result = await fhir_gateway.search( + Patient, + {"name": "Smith"}, + follow_pagination=True, + max_pages=2 + ) + + assert mock_execute.call_count == 2 + assert result.entry is not None + assert len(result.entry) == 2 + assert [entry.resource.id for entry in result.entry] == ["1", "2"] + + +@pytest.mark.asyncio +async def test_search_with_pagination_empty_next_link(fhir_gateway): + """AsyncFHIRGateway.search handles missing next links correctly.""" + bundle = Bundle( + type="searchset", + entry=[{"resource": Patient(id="1")}], + link=[{"relation": "self", "url": "Patient?name=Smith"}] + ) + + with patch.object( + fhir_gateway, "_execute_with_client", return_value=bundle + ) as mock_execute: + result = await fhir_gateway.search( + Patient, + {"name": "Smith"}, + follow_pagination=True + ) + + mock_execute.assert_called_once() + assert result.entry is not None + assert len(result.entry) == 1 + assert result.entry[0].resource.id == "1" + + +@pytest.mark.asyncio +async def test_search_with_pagination_and_provenance(fhir_gateway): + """AsyncFHIRGateway.search combines pagination with provenance metadata.""" + page1 = Bundle( + type="searchset", + entry=[{"resource": Patient(id="1")}], + link=[{"relation": "next", "url": "Patient?page=2"}] + ) + page2 = Bundle( + type="searchset", + entry=[{"resource": Patient(id="2")}] + ) + + with patch.object( + fhir_gateway, "_execute_with_client", side_effect=[page1, page2] + ) as mock_execute: + result = await fhir_gateway.search( + Patient, + {"name": "Smith"}, + source="test_source", + follow_pagination=True, + add_provenance=True, + provenance_tag="aggregated" + ) + + assert mock_execute.call_count == 2 + assert result.entry is not None + assert len(result.entry) == 2 + + # Check provenance metadata + for entry in result.entry: + assert entry.resource.meta is not None + assert entry.resource.meta.source == "urn:healthchain:source:test_source" + assert entry.resource.meta.tag[0].code == "aggregated" + @pytest.mark.asyncio async def test_modify_context_for_existing_resource(fhir_gateway, test_patient): """Modify context manager fetches, yields, and updates existing resources."""