Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AMB-2059 Create fields for holding vaccine event type & supplier system in dynamo #197

104 changes: 63 additions & 41 deletions backend/src/fhir_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
ParameterException,
InconsistentIdError,
UnauthorizedVaxError,
UnauthorizedVaxOnRecordError
UnauthorizedVaxOnRecordError,
UnauthorizedSystemError
)

from pds_service import PdsService
Expand Down Expand Up @@ -84,17 +85,20 @@ def get_immunization_by_id(self, aws_event) -> dict:

try:
if aws_event.get("headers"):
try:
imms_vax_type_perms = aws_event["headers"]["VaccineTypePermissions"]
if len(imms_vax_type_perms) == 0:
raise UnauthorizedVaxError()

except UnauthorizedVaxError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())
imms_vax_type_perms = aws_event["headers"]["VaccineTypePermissions"]
app_id = aws_event["headers"]["ApplicationId"]
if len(imms_vax_type_perms) == 0:
raise UnauthorizedVaxError()
if len(app_id) == 0:
raise UnauthorizedSystemError()
else:
raise UnauthorizedVaxError()
raise UnauthorizedError()
except UnauthorizedError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())
except UnauthorizedVaxError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())
except UnauthorizedSystemError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())

try:
if resource := self.fhir_service.get_immunization_by_id(imms_id, imms_vax_type_perms):
Expand All @@ -121,17 +125,23 @@ def get_immunization_by_id(self, aws_event) -> dict:
def create_immunization(self, aws_event):
if response := self.authorize_request(EndpointOperation.CREATE, aws_event):
return response

if aws_event.get("headers"):
try:
try:
if aws_event.get("headers"):
imms_vax_type_perms = aws_event["headers"]["VaccineTypePermissions"]
app_id = aws_event["headers"]["ApplicationId"]
if len(imms_vax_type_perms) == 0:
raise UnauthorizedVaxError()

except UnauthorizedVaxError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())
else:
raise UnauthorizedVaxError()
if len(app_id) == 0:
raise UnauthorizedSystemError()
else:
raise UnauthorizedError()
except UnauthorizedError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())
except UnauthorizedVaxError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())
except UnauthorizedSystemError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())

try:
imms = json.loads(aws_event["body"], parse_float=Decimal)
Expand All @@ -141,7 +151,7 @@ def create_immunization(self, aws_event):
)

try:
resource = self.fhir_service.create_immunization(imms,imms_vax_type_perms)
resource = self.fhir_service.create_immunization(imms,imms_vax_type_perms,app_id)
if "diagnostics" in resource:
exp_error = create_operation_outcome(
resource_id=str(uuid.uuid4()),
Expand All @@ -167,16 +177,22 @@ def update_immunization(self, aws_event):
imms_id = aws_event["pathParameters"]["id"]

# Check vaxx type permissions- start
if aws_event.get("headers"):
try:
try:
if aws_event.get("headers"):
imms_vax_type_perms = aws_event["headers"]["VaccineTypePermissions"]
app_id = aws_event["headers"]["ApplicationId"]
if len(imms_vax_type_perms) == 0:
raise UnauthorizedVaxError()

except UnauthorizedVaxError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())
else:
raise UnauthorizedVaxError()
if len(app_id) == 0:
raise UnauthorizedSystemError()
else:
raise UnauthorizedError()
except UnauthorizedError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())
except UnauthorizedVaxError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())
except UnauthorizedSystemError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())
# Check vaxx type permissions- end

# Validate the imms id -start
Expand Down Expand Up @@ -333,17 +349,20 @@ def delete_immunization(self, aws_event):

try:
if aws_event.get("headers"):
try:
imms_vax_type_perms = aws_event["headers"]["VaccineTypePermissions"]
if len(imms_vax_type_perms) == 0:
raise UnauthorizedVaxError()

except UnauthorizedVaxError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())
imms_vax_type_perms = aws_event["headers"]["VaccineTypePermissions"]
app_id = aws_event["headers"]["ApplicationId"]
if len(imms_vax_type_perms) == 0:
raise UnauthorizedVaxError()
if len(app_id) == 0:
raise UnauthorizedSystemError()
else:
raise UnauthorizedVaxError()
raise UnauthorizedError()
except UnauthorizedError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())
except UnauthorizedVaxError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())
except UnauthorizedSystemError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())

try:
self.fhir_service.delete_immunization(imms_id, imms_vax_type_perms)
Expand All @@ -369,16 +388,19 @@ def search_immunizations(self, aws_event: APIGatewayProxyEventV1) -> dict:
# Check vaxx type permissions- start
try:
if aws_event.get("headers"):
try:
imms_vax_type_perms = aws_event["headers"]["VaccineTypePermissions"]
if len(imms_vax_type_perms) == 0:
raise UnauthorizedVaxError()

except UnauthorizedVaxError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())
imms_vax_type_perms = aws_event["headers"]["VaccineTypePermissions"]
app_id = aws_event["headers"]["ApplicationId"]
if len(imms_vax_type_perms) == 0:
raise UnauthorizedVaxError()
if len(app_id) == 0:
raise UnauthorizedSystemError()
else:
raise UnauthorizedVaxError()
raise UnauthorizedError()
except UnauthorizedError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())
except UnauthorizedVaxError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())
except UnauthorizedSystemError as unauthorized:
return self.create_response(403, unauthorized.to_operation_outcome())
# Check vaxx type permissions on the existing record - start
try:
Expand Down
8 changes: 5 additions & 3 deletions backend/src/fhir_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,10 @@ def __init__(self, imms: dict, patient: dict):
self.resource = imms
self.timestamp = int(time.time())
self.vaccine_type = get_vaccine_type(imms)

self.system_id = imms["identifier"][0]["system"]
self.system_value = imms["identifier"][0]["value"]
self.patient_sk = f"{self.vaccine_type}#{imms_id}"
self.identifier = imms["identifier"][0]["value"]
self.identifier = f"{self.system_id}#{self.system_value}"


class ImmunizationRepository:
Expand Down Expand Up @@ -143,7 +144,7 @@ def get_immunization_by_id_all(self, imms_id: str,imms:dict ) -> Optional[dict]:
else:
return None

def create_immunization(self, immunization: dict, patient: dict , imms_vax_type_perms) -> dict:
def create_immunization(self, immunization: dict, patient: dict , imms_vax_type_perms, app_id) -> dict:
new_id = str(uuid.uuid4())
immunization["id"] = new_id
attr = RecordAttributes(immunization, patient)
Expand All @@ -165,6 +166,7 @@ def create_immunization(self, immunization: dict, patient: dict , imms_vax_type_
"Resource": json.dumps(attr.resource, cls=DecimalEncoder),
"Patient": attr.patient,
"IdentifierPK": attr.identifier,
"AppId": app_id,
"Operation": "CREATE",
"Version": 1,
}
Expand Down
4 changes: 2 additions & 2 deletions backend/src/fhir_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def get_immunization_by_id_all(self, imms_id: str,imms: dict) -> Optional[dict]:
imms_resp = self.immunization_repo.get_immunization_by_id_all(imms_id,imms)
return imms_resp

def create_immunization(self, immunization: dict, imms_vax_type_perms) -> Immunization:
def create_immunization(self, immunization: dict, imms_vax_type_perms, app_id) -> Immunization:
try:
self.validator.validate(immunization)
except (ValidationError, ValueError, MandatoryError, NotApplicableError) as error:
Expand All @@ -128,7 +128,7 @@ def create_immunization(self, immunization: dict, imms_vax_type_perms) -> Immuni

if "diagnostics" in patient:
return patient
imms = self.immunization_repo.create_immunization(immunization, patient, imms_vax_type_perms)
imms = self.immunization_repo.create_immunization(immunization, patient, imms_vax_type_perms,app_id)

return Immunization.parse_obj(imms)

Expand Down
12 changes: 12 additions & 0 deletions backend/src/models/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ def to_operation_outcome() -> dict:
diagnostics=msg,
)

@dataclass
class UnauthorizedSystemError(RuntimeError):
@staticmethod
def to_operation_outcome() -> dict:
msg = f"Unauthorized system"
return create_operation_outcome(
resource_id=str(uuid.uuid4()),
severity=Severity.error,
code=Code.forbidden,
diagnostics=msg,
)

@dataclass
class UnauthorizedVaxError(RuntimeError):
@staticmethod
Expand Down
Loading
Loading