Skip to content

Commit

Permalink
Merge pull request #220 from NHSDigital/AMB-1891_Amend_s_flag_integra…
Browse files Browse the repository at this point in the history
…tion

Amb 1891 amend s flag integration
  • Loading branch information
AlexandraBenson authored Jul 31, 2024
2 parents 5d7fbdc + 207bc6c commit abcef80
Show file tree
Hide file tree
Showing 19 changed files with 759 additions and 320 deletions.
4 changes: 2 additions & 2 deletions backend/src/base_utils/base_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from models.field_locations import FieldLocations


def obtain_field_value(imms, field_name):
def obtain_field_value(imms: dict, field_name: str) -> any:
"""Finds and returns the field value from the imms json data. Returns none if field not found."""

# Obtain the function for extracting the field value from the json data
Expand All @@ -19,6 +19,6 @@ def obtain_field_value(imms, field_name):
return field_value


def obtain_field_location(field_name):
def obtain_field_location(field_name: str) -> str:
"""Returns the field location string"""
return getattr(FieldLocations, field_name)
2 changes: 2 additions & 0 deletions backend/src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ class Urls:
nhs_number_verification_status_code_system = (
"https://fhir.hl7.org.uk/CodeSystem/UKCore-NHSNumberVerificationStatusEngland"
)
ods_organization_code = "https://fhir.nhs.uk/Id/ods-organization-code"
urn_school_number = "https://fhir.hl7.org.uk/Id/urn-school-number"
2 changes: 2 additions & 0 deletions backend/src/fhir_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ def create_immunization(self, aws_event):
except UnauthorizedVaxError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())



def update_immunization(self, aws_event):
if response := self.authorize_request(EndpointOperation.UPDATE, aws_event):
return response
Expand Down
51 changes: 21 additions & 30 deletions backend/src/fhir_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import datetime
import os
from enum import Enum
from typing import Optional
from typing import Optional, Union

from fhir.resources.R4B.bundle import (
Bundle as FhirBundle,
Expand All @@ -15,7 +15,9 @@

import parameter_parser
from fhir_repository import ImmunizationRepository
from models.errors import InvalidPatientId, CustomValidationError
from base_utils.base_utils import obtain_field_value
from models.field_names import FieldNames
from models.errors import InvalidPatientId, CustomValidationError, UnhandledResponseError
from models.fhir_immunization import ImmunizationValidator
from models.utils.generic_utils import nhs_number_mod11_check, get_occurrence_datetime, create_diagnostics
from models.constants import Constants
Expand Down Expand Up @@ -61,37 +63,25 @@ def get_immunization_by_id(self, imms_id: str, imms_vax_type_perms: str) -> Opti
Get an Immunization by its ID. Return None if not found. If the patient doesn't have an NHS number,
return the Immunization without calling PDS or checking S flag.
"""
imms_resp = self.immunization_repo.get_immunization_by_id(imms_id, imms_vax_type_perms)
imms = dict()
version = str()
resp = dict()
nhs_number = str()
if not imms_resp:
if not (imms_resp := self.immunization_repo.get_immunization_by_id(imms_id, imms_vax_type_perms)):
return None

else:

if imms_resp.get("Resource"):
imms = imms_resp["Resource"]
if imms_resp.get("Version"):
version = imms_resp["Version"]

# Remove fields which are not to be returned for read
imms_filtered_for_read = Filter.read(imms)
# Remove fields rom the imms resource which are not to be returned for read
imms_filtered_for_read = Filter.read(imms_resp.get("Resource", {}))

# Handle s-flag filtering, where applicable
try:
nhs_number = [x for x in imms["contained"] if x["resourceType"] == "Patient"][0]["identifier"][0][
"value"
]
patient = self.pds_service.get_patient_details(nhs_number)
# Handle s-flag filtering, where applicable
if not (nhs_number := obtain_field_value(imms_filtered_for_read, FieldNames.patient_identifier_value)):
imms_filtered_for_read_and_s_flag = imms_filtered_for_read
else:
if patient := self.pds_service.get_patient_details(nhs_number):
imms_filtered_for_read_and_s_flag = handle_s_flag(imms_filtered_for_read, patient)
except (KeyError, IndexError):
imms_filtered_for_read_and_s_flag = imms_filtered_for_read
else:
raise UnhandledResponseError("unable to validate NHS number with downstream service")

resp["Version"] = version
resp["Resource"] = Immunization.parse_obj(imms_filtered_for_read_and_s_flag)
return resp
return {
"Version": imms_resp.get("Version", ""),
"Resource": Immunization.parse_obj(imms_filtered_for_read_and_s_flag),
}

def get_immunization_by_id_all(self, imms_id: str, imms: dict) -> Optional[dict]:
"""
Expand Down Expand Up @@ -141,6 +131,7 @@ def create_immunization(self, immunization: dict, imms_vax_type_perms) -> Immuni
raise ValueError(error)
except (ValidationError, ValueError, MandatoryError) as error:
raise CustomValidationError(message=str(error)) from error

patient = self._validate_patient(immunization)

if "diagnostics" in patient:
Expand Down Expand Up @@ -204,7 +195,7 @@ def delete_immunization(self, imms_id, imms_vax_type_perms) -> Immunization:
return Immunization.parse_obj(imms)

@staticmethod
def is_valid_date_from(immunization: dict, date_from: datetime.date):
def is_valid_date_from(immunization: dict, date_from: Union[datetime.date, None]):
"""
Returns False if immunization occurrence is earlier than the date_from, or True otherwise
(also returns True if date_from is None)
Expand All @@ -219,7 +210,7 @@ def is_valid_date_from(immunization: dict, date_from: datetime.date):
return occurrence_datetime.date() >= date_from

@staticmethod
def is_valid_date_to(immunization: dict, date_to: datetime.date):
def is_valid_date_to(immunization: dict, date_to: Union[datetime.date, None]):
"""
Returns False if immunization occurrence is later than the date_to, or True otherwise
(also returns True if date_to is None)
Expand Down
54 changes: 46 additions & 8 deletions backend/src/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ def remove_reference_to_contained_practitioner(imms: dict) -> dict:
return imms

# Remove reference to the contained practitioner from imms[performer]
contained_practitioner_id = contained_practitioner["id"]
imms["performer"] = [
x for x in imms["performer"] if not is_actor_referencing_contained_resource(x, contained_practitioner_id)
x for x in imms["performer"] if not is_actor_referencing_contained_resource(x, contained_practitioner["id"])
]

return imms
Expand All @@ -24,7 +23,7 @@ def remove_reference_to_contained_practitioner(imms: dict) -> dict:
def create_reference_to_patient_resource(patient_full_url: str, patient: dict) -> dict:
"""
Returns a reference to the given patient which includes the patient nhs number identifier (system and value fields
only) and patient uuid. "Type" field is set to "Patient".
only) and a reference to patient full url. "Type" field is set to "Patient".
"""
patient_nhs_number_identifier = [x for x in patient["identifier"] if x.get("system") == Urls.nhs_number][0]

Expand All @@ -38,6 +37,38 @@ def create_reference_to_patient_resource(patient_full_url: str, patient: dict) -
}


def replace_address_postal_codes(imms: dict) -> dict:
"""Replace any postal codes found in contained patient address with 'ZZ99 3CZ'"""
for resource in imms.get("contained", [{}]):
if resource.get("resourceType") == "Patient":
for address in resource.get("address", [{}]):
if address.get("postalCode") is not None:
address["postalCode"] = "ZZ99 3CZ"

return imms


def replace_organization_values(imms: dict) -> dict:
"""
Replace organization_identifier_values with N2N9I, organization_identifier_systems with
https://fhir.nhs.uk/Id/ods-organization-code, and remove any organization_displays
"""
for performer in imms.get("performer", [{}]):
if performer.get("actor", {}).get("type") == "Organization":

if performer["actor"].get("identifier", {}).get("value") is not None:
performer["actor"]["identifier"]["value"] = "N2N9I"
performer["actor"]["identifier"]["system"] = Urls.ods_organization_code

elif performer["actor"].get("identifier", {}).get("system") is not None:
performer["actor"]["identifier"]["system"] = Urls.ods_organization_code

if performer["actor"].get("display") is not None:
del performer["actor"]["display"]

return imms


def add_use_to_identifier(imms: dict) -> dict:
"""
Add use of "offical" to immunisation identifier if no use currently specified
Expand All @@ -52,21 +83,28 @@ class Filter:
"""Functions for filtering a FHIR Immunization Resource"""

@staticmethod
def read(imms: dict):
def read(imms: dict) -> dict:
"""Apply filtering for READ request"""
imms.pop("identifier")
return imms

@staticmethod
def search(imms: dict, patient_full_url: str, bundle_patient: dict = None):
def search(imms: dict, patient_full_url: str, bundle_patient: dict = None) -> dict:
"""Apply filtering for an individual FHIR Immunization Resource as part of SEARCH request"""
imms = remove_reference_to_contained_practitioner(imms)
imms.pop("contained")
imms["patient"] = create_reference_to_patient_resource(patient_full_url, bundle_patient)
imms = add_use_to_identifier(imms)
# Location identifier system and value are to be overwritten
# (for backwards compatibility with Immunisation History API, as agreed with VDS team)
imms["location"]["identifier"]["system"] = "urn:iso:std:iso:3166"
imms["location"]["identifier"]["value"] = "GB"
imms["location"]["type"] = "Location"
imms["location"] = {"identifier": {"system": "urn:iso:std:iso:3166", "value": "GB"}}
return imms

@staticmethod
def s_flag(imms: dict) -> dict:
"""Apply filtering for patients with 'RESTRICTED' flag"""
imms = replace_address_postal_codes(imms)
imms = replace_organization_values(imms)
if imms.get("location"):
imms["location"] = {"identifier": {"system": "urn:iso:std:iso:3166", "value": "GB"}}
return imms
13 changes: 1 addition & 12 deletions backend/src/mappings.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,6 @@ class VaccineTypes:
)


@dataclass
class Mandation:
"""Mandation types"""

mandatory: str = "M"
conditional_mandatory: str = "CM"
required: str = "R"
optional: str = "O"
not_applicable: str = "N/A"


@dataclass
class DiseaseDisplayTerms:
"""Disease display terms which correspond to disease codes"""
Expand Down Expand Up @@ -59,7 +48,7 @@ class DiseaseCodes:
([DiseaseCodes.hpv], VaccineTypes.hpv),
# IMPORTANT: FOR VACCINE_TYPES WHICH TARGET MULTIPLE DISEASES ENSURE THAT DISEASE CODES ARE SORTED ALPHABETICALLY
# This allows order-insensitive comparison with other lists, by alphabetically sorting the list for comparison
(sorted([DiseaseCodes.measles, DiseaseCodes.rubella, DiseaseCodes.mumps]), VaccineTypes.mmr),
(sorted([DiseaseCodes.measles, DiseaseCodes.mumps, DiseaseCodes.rubella]), VaccineTypes.mmr),
]


Expand Down
88 changes: 13 additions & 75 deletions backend/src/s_flag_handler.py
Original file line number Diff line number Diff line change
@@ -1,84 +1,22 @@
import copy
from filter import Filter


def handle_s_flag(imms, patient):
"""
See https://nhsd-confluence.digital.nhs.uk/pages/viewpage.action?pageId=758110223 for details.
Checks if the patient has a restricted flag, and returns the imms resource with the appropriate filtering
applied where necessary.
NOTE: the term 's_flag' is not found in the PDS response. Instead we are searching for a code of
'R' for RESTRICTED in the meta.security field.
"""
try:
patient_is_restricted = str.lower(patient["meta"]["security"][0]["code"]) == "r"
except (KeyError, IndexError):
return imms

if not patient_is_restricted:
return imms
patient_is_restricted = False

result = copy.deepcopy(imms)
# NOTE: meta.security is currently restricted to max length of 1 according to PDS, however we are looping
# through all of the items in meta.security for safety in case PDS ever start sending more than one security item.
for item in patient["meta"]["security"]:
if str.upper(item["code"]) == "R":
patient_is_restricted = True
break

contained_questionnaire = next(
(
record
for record in result.get("contained", [])
if record["resourceType"] == "QuestionnaireResponse"
),
None,
)

contained_patient = next(
(
record
for record in result.get("contained", [])
if record["resourceType"] == "Patient"
),
None,
)

# Handle Questionnaire SiteCode
performer_actor_organization = next(
(
item
for item in result["performer"]
if item.get("actor", {}).get("type") == "Organization"
),
None,
)

if performer_actor_organization:
try:
performer_actor_organization["actor"]["identifier"]["value"] = "N2N9I"
performer_actor_organization["actor"]["identifier"][
"system"
] = "https://fhir.nhs.uk/Id/ods-organization-code"
del performer_actor_organization["actor"]["display"]
except KeyError:
pass

if contained_patient:
try:
contained_patient["address"][0]["postalCode"] = "ZZ99 3CZ"
except (KeyError, IndexError):
pass


# Handle Questionnaire removals
if contained_questionnaire and contained_questionnaire.get("item"):
for item in contained_questionnaire["item"]:
if "linkId" in item and item["linkId"] == "Consent":
if "answer" in item:
for answer in item["answer"]:
if "valueCoding" in answer:
answer["valueCoding"]["display"] = None # Set display to null

# Handle reportOrigin
try:
del result["reportOrigin"]
except KeyError:
pass

# Handle Location
try:
del result["location"]
except KeyError:
pass

return result
return Filter.s_flag(imms) if patient_is_restricted else imms
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@
]
}
],
"location": {
"identifier": { "system": "urn:iso:std:iso:3166", "value": "GB" }
},
"protocolApplied": [
{
"targetDisease": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@
]
}
],
"location": {
"identifier": { "system": "urn:iso:std:iso:3166", "value": "GB" }
},
"protocolApplied": [
{
"targetDisease": [
Expand Down
Loading

0 comments on commit abcef80

Please sign in to comment.