Skip to content

Commit

Permalink
Merge pull request #218 from NHSDigital/AMB-2134_search_response_cont…
Browse files Browse the repository at this point in the history
…ained_is_not_backwards_compatible_with_imms_history_response

Amb 2134 search response contained is not backwards compatible with imms history response
  • Loading branch information
AlexandraBenson committed Jul 30, 2024
2 parents eff2881 + 27a2d50 commit dc47d15
Show file tree
Hide file tree
Showing 28 changed files with 1,043 additions and 376 deletions.
1 change: 1 addition & 0 deletions backend/src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

# Constants for use within the test
VALID_NHS_NUMBER = "1345678940" # Valid for pre, FHIR and post validators
NHS_NUMBER_USED_IN_SAMPLE_DATA = "9000000009"
ADDRESS_UNKNOWN_POSTCODE = "ZZ99 3WZ"


Expand Down
87 changes: 39 additions & 48 deletions backend/src/fhir_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,19 @@ def make_controller(
authorizer = Authorization()
service = FhirService(imms_repo=imms_repo, pds_service=pds_service)


return FhirController(
authorizer=authorizer, fhir_service=service)

return FhirController(authorizer=authorizer, fhir_service=service)


class FhirController:
immunization_id_pattern = r"^[A-Za-z0-9\-.]{1,64}$"


def __init__(
self,
authorizer: Authorization,
fhir_service: FhirService,
):
):
self.fhir_service = fhir_service
self.authorizer = authorizer


def get_immunization_by_id(self, aws_event) -> dict:
if response := self.authorize_request(EndpointOperation.READ, aws_event):
Expand All @@ -81,7 +76,7 @@ def get_immunization_by_id(self, aws_event) -> dict:
imms_id = aws_event["pathParameters"]["id"]
if id_error := self._validate_id(imms_id):
return self.create_response(400, id_error)

try:
if aws_event.get("headers"):
imms_vax_type_perms = aws_event["headers"]["VaccineTypePermissions"]
Expand Down Expand Up @@ -119,7 +114,7 @@ def get_immunization_by_id(self, aws_event) -> dict:
def create_immunization(self, aws_event):
if response := self.authorize_request(EndpointOperation.CREATE, aws_event):
return response

try:
if aws_event.get("headers"):
imms_vax_type_perms = aws_event["headers"]["VaccineTypePermissions"]
Expand All @@ -133,12 +128,10 @@ def create_immunization(self, aws_event):
return self.create_response(403, unauthorized.to_operation_outcome())

try:
imms = json.loads(aws_event["body"], parse_float=Decimal)
imms = json.loads(aws_event["body"], parse_float=Decimal)
except json.decoder.JSONDecodeError as e:
return self._create_bad_request(
f"Request's body contains malformed JSON: {e}"
)

return self._create_bad_request(f"Request's body contains malformed JSON: {e}")

try:
resource = self.fhir_service.create_immunization(imms,imms_vax_type_perms)
if "diagnostics" in resource:
Expand All @@ -164,7 +157,7 @@ def update_immunization(self, aws_event):
if response := self.authorize_request(EndpointOperation.UPDATE, aws_event):
return response
imms_id = aws_event["pathParameters"]["id"]

# Check vaxx type permissions- start
try:
if aws_event.get("headers"):
Expand All @@ -178,7 +171,7 @@ def update_immunization(self, aws_event):
except UnauthorizedVaxError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())
# Check vaxx type permissions- end

# Validate the imms id -start
if id_error := self._validate_id(imms_id):
return FhirController.create_response(400, json.dumps(id_error))
Expand Down Expand Up @@ -214,26 +207,26 @@ def update_immunization(self, aws_event):
return self.create_response(404, json.dumps(exp_error))

if "diagnostics" in existing_record and existing_record is not None:
exp_error = create_operation_outcome(
resource_id=str(uuid.uuid4()),
severity=Severity.error,
code=Code.invariant,
diagnostics=existing_record["diagnostics"],
)
return self.create_response(400, json.dumps(exp_error))
exp_error = create_operation_outcome(
resource_id=str(uuid.uuid4()),
severity=Severity.error,
code=Code.invariant,
diagnostics=existing_record["diagnostics"],
)
return self.create_response(400, json.dumps(exp_error))
except ValidationError as error:
return self.create_response(400, error.to_operation_outcome())
# Validate if the imms resource does not exists -end

# Check vaxx type permissions on the existing record - start
try:
vax_type_perms = self._parse_vaccine_permissions(imms_vax_type_perms)
vax_type_perm= self._vaccine_permission(existing_record["VaccineType"], "update")
self._check_permission(vax_type_perm,vax_type_perms)
vax_type_perm = self._vaccine_permission(existing_record["VaccineType"], "update")
self._check_permission(vax_type_perm, vax_type_perms)
except UnauthorizedVaxOnRecordError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())
# Check vaxx type permissions on the existing record - end

existing_resource_version = int(existing_record["Version"])
try:
# Validate if the imms resource to be updated is a logically deleted resource-start
Expand Down Expand Up @@ -293,13 +286,11 @@ def update_immunization(self, aws_event):
# Check if the record is reinstated record -start
if existing_record["Reinstated"] == True:
outcome, resource = self.fhir_service.update_reinstated_immunization(

imms_id, imms, existing_resource_version, imms_vax_type_perms
imms_id, imms, existing_resource_version, imms_vax_type_perms
)
else:
outcome, resource = self.fhir_service.update_immunization(
imms_id, imms, existing_resource_version, imms_vax_type_perms

)

# Check if the record is reinstated record -end
Expand Down Expand Up @@ -330,7 +321,7 @@ def delete_immunization(self, aws_event):

if id_error := self._validate_id(imms_id):
return FhirController.create_response(400, id_error)

try:
if aws_event.get("headers"):
imms_vax_type_perms = aws_event["headers"]["VaccineTypePermissions"]
Expand All @@ -341,6 +332,7 @@ def delete_immunization(self, aws_event):
except UnauthorizedError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())
except UnauthorizedVaxError as unauthorized:

return self.create_response(403, unauthorized.to_operation_outcome())

try:
Expand All @@ -355,14 +347,14 @@ def delete_immunization(self, aws_event):
def search_immunizations(self, aws_event: APIGatewayProxyEventV1) -> dict:
if response := self.authorize_request(EndpointOperation.SEARCH, aws_event):
return response

try:
search_params = process_search_params(process_params(aws_event))
except ParameterException as e:
return self._create_bad_request(e.message)
if search_params is None:
raise Exception("Failed to parse parameters.")

# Check vaxx type permissions- start
try:
if aws_event.get("headers"):
Expand All @@ -374,26 +366,25 @@ def search_immunizations(self, aws_event: APIGatewayProxyEventV1) -> dict:
except UnauthorizedError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())
except UnauthorizedVaxError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())
return self.create_response(403, unauthorized.to_operation_outcome())
# Check vaxx type permissions on the existing record - start
try:
vax_type_perms = self._parse_vaccine_permissions(imms_vax_type_perms)
vax_type_perm= self._new_vaccine_request(search_params.immunization_targets, "search", vax_type_perms)
vax_type_perm = self._new_vaccine_request(search_params.immunization_targets, "search", vax_type_perms)
if not vax_type_perm:
raise UnauthorizedVaxError
except UnauthorizedVaxError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())
# Check vaxx type permissions on the existing record - end

result = self.fhir_service.search_immunizations(
search_params.patient_identifier,
vax_type_perm,
create_query_string(search_params),
search_params.date_from,
search_params.date_to
search_params.date_to,
)


if "diagnostics" in result:
exp_error = create_operation_outcome(
resource_id=str(uuid.uuid4()),
Expand All @@ -419,9 +410,9 @@ def search_immunizations(self, aws_event: APIGatewayProxyEventV1) -> dict:
resource_id=str(uuid.uuid4()),
severity=Severity.warning,
code=Code.unauthorized,
diagnostics="Your search contains details that you are not authorised to request"
diagnostics="Your search contains details that you are not authorised to request",
)
result_json_dict['entry'].append({'resource': exp_error})
result_json_dict["entry"].append({"resource": exp_error})
if "entry" not in result_json_dict:
result_json_dict["entry"] = []
result_json_dict["total"] = 0
Expand Down Expand Up @@ -478,35 +469,35 @@ def create_response(status_code, body=None, headers=None):
"headers": headers if headers else {},
**({"body": body} if body else {}),
}

@staticmethod
def _vaccine_permission( vaccine_type, operation) -> set:
def _vaccine_permission(vaccine_type, operation) -> set:
vaccine_permission = set()
if isinstance(vaccine_type, list):
for x in vaccine_type:
vaccine_permission.add(str.lower(f"{x}:{operation}"))
return vaccine_permission
return vaccine_permission
else:
vaccine_permission.add(str.lower(f"{vaccine_type}:{operation}"))
return vaccine_permission

@staticmethod
def _parse_vaccine_permissions(imms_vax_type_perms) -> set:
parsed = [str.strip(str.lower(s)) for s in imms_vax_type_perms.split(",")]
vaccine_permissions = set()
for s in parsed:
vaccine_permissions.add(s)
return vaccine_permissions

@staticmethod
def _check_permission( requested: set, allowed: set) -> set:
def _check_permission(requested: set, allowed: set) -> set:
if not requested.issubset(allowed):
raise UnauthorizedVaxOnRecordError()
else:
return None

@staticmethod
def _new_vaccine_request( vaccine_type, operation, vaccine_type_permissions: None) -> Optional[list]:
@staticmethod
def _new_vaccine_request(vaccine_type, operation, vaccine_type_permissions: None) -> Optional[list]:
vaccine_permission = list()
if isinstance(vaccine_type, list):
for x in vaccine_type:
Expand Down
Loading

0 comments on commit dc47d15

Please sign in to comment.