Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 40 additions & 28 deletions example_use.py
Original file line number Diff line number Diff line change
@@ -1,56 +1,68 @@
from healthchain.use_cases.cds import ClinicalDecisionSupport
from healthchain.decorators import ehr
from healthchain.decorators import ehr, api, sandbox

import dataclasses
import uuid


def Run():
# ehr = EHR()
# ehr = EHR.from_doppeldata(data)
# ehr = EHR.from_path(path)

# ehr.UseCase = ClinicalDecisionSupport()
# print(ehr.UseCase)

# ehr.add_database(data)

# ehr.send_request("http://0.0.0.0:8000", Workflow("patient-view"))
# ehr.send_request("http://0.0.0.0:8000", Workflow("notereader-sign-inpatient"))

def run():
@dataclasses.dataclass
class synth_data:
context: dict
uuid: str
prefetch: dict

# @sandbox(use_case=ClinicalDecisionSupport())
class myCDS:
@sandbox(service_config={"port": 9000})
class myCDS(ClinicalDecisionSupport):
def __init__(self) -> None:
self.data_generator = None
self.use_case = ClinicalDecisionSupport()

# decorator sets up an instance of ehr configured with use case CDS
@ehr(workflow="patient-view", num=5)
@ehr(workflow="encounter-discharge", num=3)
def load_data(self, data_spec):
# data = "hello, " + data_spec
data = synth_data(
context={"userId": "Practitioner/123", "patientId": data_spec},
context={
"userId": "Practitioner/123",
"patientId": data_spec,
"encounterId": "123",
},
uuid=str(uuid.uuid4()),
prefetch={},
)
return data

# @service(langserve=True)
# def llm(self):
# chain = llm | output_parser
# return chain
@api
def llm(self, text: str):
print(f"processing {text}...")
return {
"cards": [
{
"summary": "example",
"indicator": "info",
"source": {"label": "website"},
}
]
}

cds = myCDS()
ehr_client = cds.load_data("123")
request = ehr_client.request_data
for i in range(len(request)):
print(request[i].model_dump_json(exclude_none=True))
cds.start_sandbox()
print(cds.responses)

# ehr_client = cds.load_data("123")
# request = ehr_client.request_data
# for i in range(len(request)):
# print(request[i].model_dump_json(exclude_none=True))

# cds_dict = {
# "hook": "patient-view",
# "hookInstance": "29e93987-c345-4cb7-9a92-b5136289c2a4",
# "context": {"userId": "Practitioner/123", "patientId": "123"},
# }
# request = CDSRequest(**cds_dict)
# result = cds.llm(request)
# print(result)


if __name__ == "__main__":
Run()
run()
5 changes: 3 additions & 2 deletions healthchain/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from .utils.logger import ColorLogger
from .utils.logger import add_handlers


logging.setLoggerClass(ColorLogger)
logger = logging.getLogger(__name__)
add_handlers(logger)
logger.setLevel(logging.INFO)
30 changes: 28 additions & 2 deletions healthchain/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from enum import Enum
from typing import Dict

from .utils.endpoints import Endpoint


# a workflow is a specific event that may occur in an EHR that triggers a request to server
class Workflow(Enum):
Expand Down Expand Up @@ -66,9 +68,9 @@ def send_request(self) -> None:
"""


class BaseUseCase(ABC):
class BaseStrategy(ABC):
"""
Abstract class for a specific use case of an EHR object
Abstract class for the strategy for validating and constructing a request
Use cases will differ by:
- the data it accepts (FHIR or CDA)
- the format of the request it constructs (CDS Hook or NoteReader workflows)
Expand All @@ -81,3 +83,27 @@ def _validate_data(self, data) -> bool:
@abstractmethod
def construct_request(self, data, workflow: Workflow) -> Dict:
pass


class BaseUseCase(ABC):
"""
Abstract class for a specific use case of an EHR object
Use cases will differ by:
- the data it accepts (FHIR or CDA)
- the format of the request it constructs (CDS Hook or NoteReader workflows)
"""

@property
@abstractmethod
def type(self) -> UseCaseType:
pass

@property
@abstractmethod
def strategy(self) -> BaseStrategy:
pass

@property
@abstractmethod
def endpoints(self) -> Dict[str, Endpoint]:
pass
34 changes: 18 additions & 16 deletions healthchain/clients.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import logging
import requests
import httpx

from typing import Any, Callable, List, Dict

from .base import BaseUseCase, BaseClient, Workflow
from .base import BaseStrategy, BaseClient, Workflow

log = logging.getLogger(__name__)


class EHRClient(BaseClient):
def __init__(
self, func: Callable[..., Any], workflow: Workflow, use_case: BaseUseCase
self, func: Callable[..., Any], workflow: Workflow, strategy: BaseStrategy
):
"""
Initializes the EHRClient with a data generator function and optional workflow and use case.
Expand All @@ -23,7 +23,7 @@ def __init__(
"""
self.data_generator_func: Callable[..., Any] = func
self.workflow: Workflow = workflow
self.use_case: BaseUseCase = use_case
self.strategy: BaseStrategy = strategy
self.request_data: List[Dict] = []

def generate_request(self, *args: Any, **kwargs: Any) -> None:
Expand All @@ -39,9 +39,9 @@ def generate_request(self, *args: Any, **kwargs: Any) -> None:
ValueError: If the use case is not configured.
"""
data = self.data_generator_func(*args, **kwargs)
self.request_data.append(self.use_case.construct_request(data, self.workflow))
self.request_data.append(self.strategy.construct_request(data, self.workflow))

def send_request(self, url: str) -> List[Dict]:
async def send_request(self, url: str) -> List[Dict]:
"""
Sends all queued requests to the specified URL and collects the responses.

Expand All @@ -52,15 +52,17 @@ def send_request(self, url: str) -> List[Dict]:
Notes:
This method logs errors rather than raising them, to avoid interrupting the batch processing of requests.
"""
json_responses: List[Dict] = []
for request in self.request_data:
try:
response = requests.post(
url=url, data=request.model_dump_json(exclude_none=True)
)
json_responses.append(response.json())
except Exception as e:
log.error(f"Error sending request: {e}")
json_responses.append({})

async with httpx.AsyncClient() as client:
json_responses: List[Dict] = []
for request in self.request_data:
try:
response = await client.post(
url=url, data=request.model_dump_json(exclude_none=True)
)
json_responses.append(response.json())
except Exception as e:
log.error(f"Error sending request: {e}")
json_responses.append({})

return json_responses
Loading