# Interacting with the EDS standalone server

This small notebook shows how to interact with the EDS standalone server running locally.
This notebook requires the Docker image to be run with the EDS SDK ("fhir-sdk.jar") mounted read-only at "/opt/fhir-sdk.jar", and with two links: one to the EDS server and one to the OMOP database. The Synthea population can also be cached to save time across runs.
More specifically, if "./eds-fhir-backend-test" is the path to the unzipped distribution of the standalone server, the following Docker command can be used to launch the notebooks:

`./make.py docker_run -v $(pwd)/eds-fhir-backend-test:/opt/fhir-sdk.jar:ro --docker=net=app_default --docker=link=app_backend_1:backend --docker=link=app_db_1:db -v $(pwd)/.cache:/opt/generator/.cache`




## Initialization



In [25]:
// Implementation detail: Synthea currently uses an older version of
// HAPI FHIR than the EDS SDK.  We must ensure that the newer version is
// loaded, not the old one.  We do this by locating the jar that
// contains a HAPI FHIR class.
{
    val klass = classOf[ca.uhn.fhir.rest.client.api.IGenericClient]
    val location =
        klass.getResource(
            '/' + klass.getName().replace('.', '/') + ".class"
        ).toString
    assert(
        location.startsWith("jar:file:/opt/fhir-sdk.jar!"),
        s"unexpected class location ${location}")
}
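
The check above generalizes to any class. As a minimal sketch (the helper name `classLocation` is hypothetical and not part of the EDS SDK), the following function returns the URL from which the JVM loaded a class, which can help when two versions of a library may be on the classpath:

```scala
// Hypothetical helper: returns the URL of the .class file backing `klass`,
// or None if the resource cannot be found.
def classLocation(klass: Class[_]): Option[String] = {
    val resourceName = "/" + klass.getName.replace('.', '/') + ".class"
    Option(klass.getResource(resourceName)).map(_.toString)
}

// Example: locate a class of the Scala standard library.
val optionLocation = classLocation(classOf[Option[_]])
println(optionLocation.getOrElse("not found"))
```

Returning an `Option` rather than asserting lets the caller decide whether a missing or unexpected location is fatal.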

In [1]:
// Let's start by creating a client to connect to the EDS server.
// Its default port is 8003.  There are two groups of endpoints:
// - The FHIR endpoints are under http://backend:8003/fhir
// - The "service" endpoints are under http://backend:8003/eds
//
// Service endpoints are used to reset the databases between tests,
// to seed the databases, to update indexes, and to check health.
//
// The SDK exposes strongly typed clients to both groups of endpoints.
import fr.aphp.wind.eds.sdk._

val client = new Client("http://backend:8003")

In [3]:
// Check that the server is healthy.  This just tries to connect
// to a "health" endpoint.
client.ping()
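
When the notebook starts at the same time as the server, a single health check may run before the server is ready. A minimal sketch of a retry loop follows; the `retry` helper is hypothetical, and it assumes that the wrapped call signals failure by throwing an exception (whether `client.ping()` does so is not confirmed here):

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper: evaluates `action` up to `attempts` times, sleeping
// `delayMillis` between failed tries, and rethrows the last failure.
def retry[T](attempts: Int, delayMillis: Long)(action: => T): T =
    Try(action) match {
        case Success(value) => value
        case Failure(_) if attempts > 1 =>
            Thread.sleep(delayMillis)
            retry(attempts - 1, delayMillis)(action)
        case Failure(error) => throw error
    }

// Self-contained demonstration: the action fails twice, then succeeds.
var calls = 0
val status = retry(attempts = 3, delayMillis = 10) {
    calls += 1
    if (calls < 3) throw new RuntimeException("server not ready")
    "healthy"
}
```

In the notebook, the same helper could wrap the health check, for instance `retry(10, 1000)(client.ping())`, under the assumption stated above.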

In [4]:
// Get DB connection info
import java.util.Properties
import java.sql.DriverManager

// DB connection info can be accessed directly through the
// EDS test server.  This is so that there is only one
// base URL to know in advance.
val dbUrl = {
    val stackInfo = client.eds().test().stackInfo()
    s"${stackInfo.getDbUrl}&" +
        s"user=${stackInfo.getDbUser}&" +
        s"password=${stackInfo.getDbPassword}"
}

// Let's check that we can connect to the database using
// JDBC.  If we can't connect through this procedure, it can
// be because: 
// - The database server is down
// - It is not on the same network as the notebook
// - Its address is wrong
// - The PostgreSQL driver is not on the classpath
{
    val conn = DriverManager.getConnection(dbUrl)
    try {
        conn.createStatement().execute("select 1")
    } finally {
        conn.close()
    }
}

// Additional connection properties
val dbProperties = new Properties()
dbProperties.put("driver", "org.postgresql.Driver")

// Let's try to access the database using Spark
spark.read.jdbc(dbUrl, "person", dbProperties).count()

0
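
The `dbUrl` built in the cell above concatenates the credentials as-is, which assumes they contain no characters with a special meaning in a URL (such as `&` or `=`). A hedged sketch of a more defensive variant follows; `withCredentials` is a hypothetical helper, the example URL is made up, and the approach assumes that the JDBC driver URL-decodes parameter values (to our knowledge the PostgreSQL driver does):

```scala
import java.net.URLEncoder
import java.nio.charset.StandardCharsets

// Hypothetical helper: appends URL-encoded credentials to a JDBC URL,
// whether or not the URL already carries query parameters.
def withCredentials(baseUrl: String, user: String, password: String): String = {
    def encode(value: String): String =
        URLEncoder.encode(value, StandardCharsets.UTF_8.name())
    val separator = if (baseUrl.contains("?")) "&" else "?"
    s"$baseUrl${separator}user=${encode(user)}&password=${encode(password)}"
}

// Made-up connection info, for illustration only.
val exampleUrl = withCredentials(
    "jdbc:postgresql://db:5432/omop?ssl=false", "eds", "p@ss&word")
println(exampleUrl)
```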

## Synthetic data generation



In [5]:
// Let's generate a number of patients using the Synthea generator.
// The default Synthea modules are used (modules contain
// generation rules).
import java.io.File
import fr.aphp.wind.eds.generator.source.synthea
import fr.aphp.wind.eds.generator.source.synthea.SyntheaDataBundle
import fr.aphp.wind.eds.data.DFSupplyCache

// We use a cache to avoid generating the Synthea population again
// on a second run.
val cache = new DFSupplyCache(
    new File("/opt/generator/.cache"),
    format = "parquet")
val syntheaBundle = new SyntheaDataBundle(
    cache.cache(
        "synthea",
        synthea.tables,
        () => synthea.generate(5).genericBundle))
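
The caching idea used above (compute once, reuse on later runs) can be illustrated without Spark. This is only a sketch of the general pattern on plain text files; `cachedText` is a hypothetical helper and says nothing about how `DFSupplyCache` is actually implemented:

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path}

// Hypothetical helper: returns the content of `path` if it exists,
// otherwise evaluates `compute`, stores the result and returns it.
def cachedText(path: Path)(compute: => String): String =
    if (Files.exists(path)) {
        new String(Files.readAllBytes(path), UTF_8)
    } else {
        val value = compute
        Option(path.getParent).foreach(parent => Files.createDirectories(parent))
        Files.write(path, value.getBytes(UTF_8))
        value
    }

// The expensive computation runs only on the first call.
val cacheFile = Files.createTempDirectory("cache-demo").resolve("population.txt")
var generations = 0
def generate(): String = { generations += 1; "5 patients" }

val first = cachedText(cacheFile)(generate())
val second = cachedText(cacheFile)(generate())
```

Passing the computation by name (`compute: => String`) is what keeps it from running when the cache file already exists.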

In [6]:
// Let's get an EDS bundle from the Synthea bundle
import fr.aphp.wind.eds.generator.target.eds._
val edsBundle = FromSynthea(syntheaBundle) 

// Let's validate that the tables have schemas compatible with the EDS
// (all the schemas are automatically generated from the table descriptions).
edsBundle.validate(allowMissingFields=true).throwOnErrors()

In [7]:
// We collect all the dataframes at this point.  This forces
// all the calculations to be made (Spark is a lazy engine).
edsBundle.genericBundle.collect()

List([Lorg.apache.spark.sql.Row;@146d99e0, [Lorg.apache.spark.sql.Row;@70cc3202, [Lorg.apache.spark.sql.Row;@50173a1b, [Lorg.apache.spark.sql.Row;@23c1ab09, [Lorg.apache.spark.sql.Row;@5b1b8408, [Lorg.apache.spark.sql.Row;@4ad6e14b, [Lorg.apache.spark.sq

## Identity and access management



In [8]:
// Let's give the right to view all the patients to a random provider.

// This is the ID of the provider that will receive all-encompassing access rights.
val providerId = edsBundle.providers.select('provider_id.as[Long]).first

// We give access rights through a randomly chosen care site.
val careSiteId = edsBundle.careSites.select('care_site_id.as[Long]).first

// We gather all the person IDs.
val personIds = edsBundle.persons.select('person_id.as[Long])

// edsBundleWithRights is the EDS bundle augmented with identity
// and access management info.
val edsBundleWithRights = {
    // This import adds methods for rights management to
    // [[EDSDataBundle]], using an implicit class.
    import fr.aphp.wind.eds.generator.target.eds.access._

    edsBundle
        // Add the standard roles to the "role" table.
        .withStandardRoles()
        // Associate the chosen provider with one of the
        // standard roles.
        .withProviderRoles(Seq(
            ProviderRole(
                standardRoles.carer,
                providerId,
                careSiteId)
            ).toDS())
        // Add all the persons to a cohort named "foo"
        // that is registered at the chosen care site.
        .withCohorts(
            Seq(CohortSpec("foo", careSiteId, personIds)))
}

In [14]:
// Ensure that the bundle still passes the validation rules.
edsBundleWithRights.validate(allowMissingFields=true).throwOnErrors()

In [18]:
// The username the provider must use in the token used for authentication
// is its "provider_source_value".
val providerUsername = edsBundle.providers
    .where('provider_id === providerId)
    .select('provider_source_value.as[String]).first

## Database synchronization



In [32]:
// Reset the test database by truncating all the tables.
client.eds.test.reset()

In [31]:
// Save the bundle to the test database. 
edsBundleWithRights
    // An additional column "hash" is required.  It is used
    // by the ETL.
    .withHash
    .genericBundle
    .saveToDB(
        dbUrl,
        connectionProperties=dbProperties)

In [33]:
// Update the search indexes.
client.eds.index.update()

## FHIR access



In [19]:
// Let's retrieve, as an example, the Patient resource for a
// random person, through the FHIR client.
val personId = edsBundle.persons.select('person_id.as[Long]).first

In [21]:
// This creates a FHIR client for a given username.  Under the
// hood, the test server creates a test token without first needing
// authentication.
val fhirClient = client.genericFhir(providerUsername)

In [23]:
// The FHIR client above is an R4 client, so we fetch an R4 Patient
// resource.
import org.hl7.fhir.r4.model.Patient
val patient = fhirClient.read()
    .resource(classOf[Patient])
    .withId(personId.toString)
    .execute()

In [24]:
// We encode the resource to a pretty-printed JSON string.
import ca.uhn.fhir.context.FhirContext
val parser = FhirContext.forR4().newJsonParser()
parser.setPrettyPrint(true)
println(parser.encodeResourceToString(patient))

{
  "resourceType": "Patient",
  "id": "5476746449018318943",
  "meta": {
    "lastUpdated": "2020-04-09T20:43:23.249+00:00"
  },
  "extension": [
    {
      "url": "deidentified",
      "valueBoolean": false
    },
    {
      "url": "Age(Years)",
      "valueInteger": 28
    }
  ],
  "active": true,
  "name": [
    {
      "family": "Jacobson885",
      "given": [
        "Jody426"
      ]
    }
  ],
  "gender": "female",
  "birthDate": "1991-05-07",
  "deceasedBoolean": false
}


In [34]:
// As a cursory check, we ensure that the person ID matches
// the patient ID.
val patientId =
    patient.getId.substring(patient.getId.lastIndexOf('/') + 1)
if (patientId.toLong != personId) throw new AssertionError()
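
Taking everything after the last `/` works for IDs such as `Patient/123`, but it would return the version number for a versioned ID such as `Patient/123/_history/2`. A minimal sketch of a more tolerant extraction follows; `logicalId` is a hypothetical helper written in plain Scala. With HAPI FHIR itself, `patient.getIdElement.getIdPart` serves the same purpose.

```scala
// Hypothetical helper: extracts the logical ID from a FHIR resource ID,
// ignoring an optional base URL, resource type and "_history" suffix.
def logicalId(resourceId: String): String = {
    val withoutVersion = resourceId.split("/_history/", 2)(0)
    withoutVersion.substring(withoutVersion.lastIndexOf('/') + 1)
}

val fromRelative = logicalId("Patient/5476746449018318943")
val fromVersioned = logicalId("http://backend:8003/fhir/Patient/42/_history/3")
val fromBare = logicalId("42")
```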