Skip to content

Commit

Permalink
Merge branch 'experience/7078/node-18' of https://github.com/CDCgov/p…
Browse files Browse the repository at this point in the history
…rime-reportstream into experience/7078/node-18
  • Loading branch information
jpandersen87 committed Mar 10, 2023
2 parents 5bef3b2 + a192b20 commit 99ed291
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 5 deletions.
50 changes: 45 additions & 5 deletions prime-router/src/main/kotlin/fhirengine/engine/FHIRTranslator.kt
Expand Up @@ -18,9 +18,13 @@ import gov.cdc.prime.router.azure.QueueAccess
import gov.cdc.prime.router.azure.db.Tables
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.fhirengine.translation.hl7.FhirToHl7Converter
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.FhirPathUtils
import gov.cdc.prime.router.fhirengine.utils.FHIRBundleHelpers.deleteResource
import gov.cdc.prime.router.fhirengine.utils.FHIRBundleHelpers.getResourceReferences
import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder
import gov.cdc.prime.router.fhirengine.utils.HL7MessageHelpers
import org.hl7.fhir.r4.model.Bundle
import org.hl7.fhir.r4.model.DiagnosticReport
import org.hl7.fhir.r4.model.Endpoint
import org.hl7.fhir.r4.model.Provenance

Expand Down Expand Up @@ -59,14 +63,15 @@ class FHIRTranslator(
actionHistory.trackExistingInputReport(message.reportId)

val provenance = bundle.entry.first { it.resource.resourceType.name == "Provenance" }.resource as Provenance
val receivers = provenance.target.map { it.resource }
.filterIsInstance<Endpoint>().map { it.identifier[0].value }
val receiverEndpoints = provenance.target.map { it.resource }.filterIsInstance<Endpoint>()

receivers.forEach { recName ->
val receiver = settings.findReceiver(recName)
receiverEndpoints.forEach { receiverEndpoint ->
val receiverName = receiverEndpoint.identifier[0].value
val receiver = settings.findReceiver(receiverName)
// We only process receivers that are active and for this pipeline.
if (receiver != null && receiver.topic == Topic.FULL_ELR) {
val hl7Message = getHL7MessageFromBundle(bundle, receiver)
val updatedBundle = removeUnwantedConditions(bundle, receiverEndpoint)
val hl7Message = getHL7MessageFromBundle(updatedBundle, receiver)
val bodyBytes = hl7Message.encode().toByteArray()

// get a Report from the hl7 message
Expand Down Expand Up @@ -142,4 +147,39 @@ class FHIRTranslator(

return hl7Message
}

/**
* Removes observations from a [bundle] that are not referenced in [receiverEndpoint]
*
* @return [Bundle] with the unwanted observations removed
*/
internal fun removeUnwantedConditions(bundle: Bundle, receiverEndpoint: Endpoint): Bundle {

// Copy bundle to make sure original stays untouched
val newBundle = bundle.copy()

// Get observation references to keep from the receiver endpoint
val observationsToKeep = receiverEndpoint.extension.flatMap { it.getResourceReferences() }

// If endpoint doesn't have any references don't remove any
if (observationsToKeep.isNotEmpty()) {
// Get all diagnostic reports in the bundle
val diagnosticReports =
FhirPathUtils.evaluate(null, newBundle, newBundle, "Bundle.entry.resource.ofType(DiagnosticReport)")

// Get all observation references in the diagnostic reports
val allObservations =
diagnosticReports.filterIsInstance<DiagnosticReport>().flatMap { it.result }.map { it.reference }

// Determine observations ids to remove
val observationsIdsToRemove = allObservations - observationsToKeep.toSet()

// Get observation resources to be removed from the bundle
val observationsToRemove = newBundle.entry.filter { it.resource.id in observationsIdsToRemove }

observationsToRemove.forEach { newBundle.deleteResource(it.resource) }
}

return newBundle
}
}
139 changes: 139 additions & 0 deletions prime-router/src/test/kotlin/fhirengine/engine/FhirTranslatorTests.kt
Expand Up @@ -2,6 +2,8 @@ package gov.cdc.prime.router.fhirengine.engine

import assertk.assertThat
import assertk.assertions.isEqualTo
import assertk.assertions.isNotEmpty
import assertk.assertions.isNotNull
import ca.uhn.hl7v2.util.Terser
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.CustomerStatus
Expand All @@ -19,6 +21,7 @@ import gov.cdc.prime.router.azure.BlobAccess
import gov.cdc.prime.router.azure.DatabaseAccess
import gov.cdc.prime.router.azure.QueueAccess
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.FhirPathUtils
import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder
import gov.cdc.prime.router.unittest.UnitTestUtils
import io.mockk.clearAllMocks
Expand All @@ -28,6 +31,9 @@ import io.mockk.mockkClass
import io.mockk.mockkObject
import io.mockk.spyk
import io.mockk.verify
import org.hl7.fhir.r4.model.Bundle
import org.hl7.fhir.r4.model.Endpoint
import org.hl7.fhir.r4.model.Provenance
import org.jooq.tools.jdbc.MockConnection
import org.jooq.tools.jdbc.MockDataProvider
import org.jooq.tools.jdbc.MockResult
Expand Down Expand Up @@ -331,4 +337,137 @@ class FhirTranslatorTests {
// assert
assertThat(terser.get("MSH-11-1")).isEqualTo("T")
}

@Test
fun `test full elr translation happy path, receiver with condition filter so extensions`() {
mockkObject(BlobAccess)

// set up
val settings = FileSettings().loadOrganizations(oneOrganization)
val one = Schema(name = "None", topic = Topic.FULL_ELR, elements = emptyList())
val metadata = Metadata(schema = one)
val actionHistory = mockk<ActionHistory>()
val actionLogger = mockk<ActionLogger>()

val message = spyk(RawSubmission(UUID.randomUUID(), "http://blob.url", "test", "test-sender"))

val bodyFormat = Report.Format.FHIR
val bodyUrl = "http://anyblob.com"

every { actionLogger.hasErrors() } returns false
every { message.downloadContent() }
.returns(File("src/test/resources/fhirengine/engine/valid_data_with_extensions.fhir").readText())
every { BlobAccess.Companion.uploadBlob(any(), any()) } returns "test"
every { accessSpy.insertTask(any(), bodyFormat.toString(), bodyUrl, any()) }.returns(Unit)
every { actionHistory.trackCreatedReport(any(), any(), any()) }.returns(Unit)
every { actionHistory.trackExistingInputReport(any()) }.returns(Unit)
every { queueMock.sendMessage(any(), any()) }
.returns(Unit)

val engine = spyk(makeFhirEngine(metadata, settings, TaskAction.translate) as FHIRTranslator)

// act
engine.doWork(message, actionLogger, actionHistory)

// assert
verify(exactly = 0) {
queueMock.sendMessage(any(), any())
}
verify(exactly = 1) {
actionHistory.trackExistingInputReport(any())
actionHistory.trackCreatedReport(any(), any(), any())
BlobAccess.Companion.uploadBlob(any(), any())
accessSpy.insertTask(any(), any(), any(), any())
engine.removeUnwantedConditions(any(), any())
}
}
@Test
fun `Test removing some filtered observations from a DiagnosticReport`() {
val settings = FileSettings().loadOrganizations(oneOrganization)
val one = Schema(name = "None", topic = Topic.FULL_ELR, elements = emptyList())
val metadata = Metadata(schema = one)
val actionLogger = ActionLogger()
val fhirBundle = File("src/test/resources/fhirengine/engine/bundle_some_filtered_observations.fhir").readText()
val messages = FhirTranscoder.getBundles(fhirBundle, actionLogger)
assertThat(messages).isNotEmpty()
val bundle = messages[0]
assertThat(bundle).isNotNull()
val engine = (makeFhirEngine(metadata, settings, TaskAction.translate) as FHIRTranslator)
val provenance = bundle.entry.first { it.resource.resourceType.name == "Provenance" }.resource as Provenance
val endpoint = provenance.target.map { it.resource }.filterIsInstance<Endpoint>()[0]

var observations = getResource(bundle, "Observation")

assertThat(observations.count()).isEqualTo(5)

val updatedBundle = engine.removeUnwantedConditions(bundle, endpoint)

observations = getResource(updatedBundle, "Observation")

assertThat(observations.count()).isEqualTo(2)
}

@Test
fun `Test removing all filtered observations from a DiagnosticReport`() {
val settings = FileSettings().loadOrganizations(oneOrganization)
val one = Schema(name = "None", topic = Topic.FULL_ELR, elements = emptyList())
val metadata = Metadata(schema = one)
val actionLogger = ActionLogger()
val fhirBundle = File("src/test/resources/fhirengine/engine/bundle_all_filtered_observations.fhir").readText()
val messages = FhirTranscoder.getBundles(fhirBundle, actionLogger)
assertThat(messages).isNotEmpty()
val bundle = messages[0]
assertThat(bundle).isNotNull()
val engine = (makeFhirEngine(metadata, settings, TaskAction.translate) as FHIRTranslator)
val provenance = bundle.entry.first { it.resource.resourceType.name == "Provenance" }.resource as Provenance
assertThat(provenance).isNotNull()
val endpoint = provenance.target.map { it.resource }.filterIsInstance<Endpoint>()[0]
assertThat(endpoint).isNotNull()
var observations = getResource(bundle, "Observation")
var diagnosticReport = getResource(bundle, "DiagnosticReport")

assertThat(observations.count()).isEqualTo(3)
assertThat(diagnosticReport.count()).isEqualTo(3)

val updatedBundle = engine.removeUnwantedConditions(bundle, endpoint)

observations = getResource(updatedBundle, "Observation")
diagnosticReport = getResource(updatedBundle, "DiagnosticReport")
assertThat(observations.count()).isEqualTo(1)
assertThat(diagnosticReport.count()).isEqualTo(1)
}

@Test
fun `Test observations are not removed if receiver Endpoint is not populated`() {
val settings = FileSettings().loadOrganizations(oneOrganization)
val one = Schema(name = "None", topic = Topic.FULL_ELR, elements = emptyList())
val metadata = Metadata(schema = one)
val actionLogger = ActionLogger()
val fhirBundle = File("src/test/resources/fhirengine/engine/valid_data.fhir").readText()
val messages = FhirTranscoder.getBundles(fhirBundle, actionLogger)
assertThat(messages).isNotEmpty()
val bundle = messages[0]
assertThat(bundle).isNotNull()
val engine = (makeFhirEngine(metadata, settings, TaskAction.translate) as FHIRTranslator)
val provenance = bundle.entry.first { it.resource.resourceType.name == "Provenance" }.resource as Provenance
assertThat(provenance).isNotNull()
val endpoint = provenance.target.map { it.resource }.filterIsInstance<Endpoint>()[0]
assertThat(endpoint).isNotNull()
var observations = getResource(bundle, "Observation")
var diagnosticReport = getResource(bundle, "DiagnosticReport")
val observationsCount = observations.count()
val diagnosticReportCount = diagnosticReport.count()
assertThat(observationsCount).isEqualTo(3)
assertThat(diagnosticReportCount).isEqualTo(3)

val updatedBundle = engine.removeUnwantedConditions(bundle, endpoint)

observations = getResource(updatedBundle, "Observation")
diagnosticReport = getResource(updatedBundle, "DiagnosticReport")
assertThat(observations.count()).isEqualTo(observationsCount)
assertThat(diagnosticReport.count()).isEqualTo(diagnosticReportCount)
}

private fun getResource(bundle: Bundle, resource: String) =
FhirPathUtils.evaluate(null, bundle, bundle, "Bundle.entry.resource.ofType($resource)")
}

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

0 comments on commit 99ed291

Please sign in to comment.