Skip to content

Commit

Permalink
Merge pull request #14691 from Add PatchResources to FhirIO.
Browse files Browse the repository at this point in the history
* Add PatchResources to FhirIO.

References:
https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/patch
https://cloud.google.com/healthcare/docs/how-tos/fhir-resources#patching_a_fhir_resource
https://cloud.google.com/healthcare/docs/how-tos/fhir-resources#conditionally_patching_a_fhir_resource

The Google Cloud FHIR service doesn't support neither Patch nor
Conditional Patch within bundles, so it requires it's own connector.

* Run spotless

* Fix @nullable tags

* Fix guava import

* Prototype AutoValue implementation

* Final AutoValue implementation

* Add GroupIntoBatches to PatchResources.

* Private constructor
  • Loading branch information
msbukal committed May 6, 2021
1 parent d1042c0 commit e37bede
Show file tree
Hide file tree
Showing 6 changed files with 410 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -58,13 +59,16 @@
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.fs.ResourceIdCoder;
import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.PatchResources.Input;
import org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.HealthcareHttpException;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
Expand Down Expand Up @@ -101,19 +105,19 @@
* <h3>Reading</h3>
*
* <p>FHIR resources can be read with {@link FhirIO.Read}, which supports use cases where you have a
* ${@link PCollection} of message IDs. This is appropriate for reading the Fhir notifications from
* ${@link PCollection} of FHIR resource names in the format of projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}. This is appropriate for reading the Fhir notifications from
* a Pub/Sub subscription with {@link PubsubIO#readStrings()} or in cases where you have a manually
* prepared list of messages that you need to process (e.g. in a text file read with {@link
* org.apache.beam.sdk.io.TextIO}*) .
*
* <p>Fetch Resource contents from Fhir Store based on the {@link PCollection} of message ID strings
* <p>Fetch Resource contents from Fhir Store based on the {@link PCollection} of FHIR resource name strings
* {@link FhirIO.Read.Result} where one can call {@link Read.Result#getResources()} to retrieve a
* {@link PCollection} containing the successfully fetched {@link String}s and/or {@link
* {@link PCollection} containing the successfully fetched json resources as {@link String}s and/or {@link
* FhirIO.Read.Result#getFailedReads()}* to retrieve a {@link PCollection} of {@link
* HealthcareIOError}* containing the resource ID that could not be fetched and the exception as a
* HealthcareIOError}* containing the resources that could not be fetched and the exception as a
* {@link HealthcareIOError}, this can be used to write to the dead letter storage system of your
* choosing. This error handling is mainly to transparently surface errors where the upstream {@link
* PCollection}* contains IDs that are not valid or are not reachable due to permissions issues.
* PCollection}* contains FHIR resources that are not valid or are not reachable due to permissions issues.
*
* <h3>Writing</h3>
*
Expand Down Expand Up @@ -382,6 +386,16 @@ public static Deidentify deidentify(
return new Deidentify(sourceFhirStore, destinationFhirStore, deidConfig);
}

/**
* Patch FHIR resources, @see <a
* href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/patch></a>.
*
* @return the patch
*/
public static PatchResources patchResources() {
return new PatchResources();
}

/**
* Increments success and failure counters for an LRO. To be used after the LRO has completed.
* This function leverages the fact that the LRO metadata is always of the format: "counter": {
Expand Down Expand Up @@ -1330,6 +1344,7 @@ public enum ContentStructure {

/** The type Execute bundles. */
public static class ExecuteBundles extends PTransform<PCollection<String>, Write.Result> {

private final ValueProvider<String> fhirStore;

/**
Expand Down Expand Up @@ -1417,8 +1432,109 @@ public void executeBundles(ProcessContext context) {
}
}

/** The type Patch resources. */
public static class PatchResources extends PTransform<PCollection<Input>, Write.Result> {

private PatchResources() {}

/** Represents the input parameters for a single FHIR patch request. */
@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract static class Input implements Serializable {
abstract String getResourceName();

abstract String getPatch();

abstract @Nullable Map<String, String> getQuery();

static Builder builder() {
return new AutoValue_FhirIO_PatchResources_Input.Builder();
}

@AutoValue.Builder
abstract static class Builder {
abstract Builder setResourceName(String resourceName);

abstract Builder setPatch(String patch);

abstract Builder setQuery(Map<String, String> query);

abstract Input build();
}
}

@Override
public FhirIO.Write.Result expand(PCollection<Input> input) {
int numShards = 10;
int batchSize = 10000;
PCollectionTuple bodies =
// Shard input into batches to improve worker performance.
input
.apply(
"Shard input",
WithKeys.of(elm -> ThreadLocalRandom.current().nextInt(0, numShards)))
.setCoder(KvCoder.of(TextualIntegerCoder.of(), input.getCoder()))
.apply("Assemble batches", GroupIntoBatches.ofSize(batchSize))
.setCoder(KvCoder.of(TextualIntegerCoder.of(), IterableCoder.of(input.getCoder())))
.apply(
ParDo.of(new PatchResourcesFn())
.withOutputTags(Write.SUCCESSFUL_BODY, TupleTagList.of(Write.FAILED_BODY)));
bodies.get(Write.SUCCESSFUL_BODY).setCoder(StringUtf8Coder.of());
bodies.get(Write.FAILED_BODY).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of()));
return Write.Result.in(input.getPipeline(), bodies);
}

/** The type Write Fhir fn. */
static class PatchResourcesFn extends DoFn<KV<Integer, Iterable<Input>>, String> {

private static final Counter PATCH_RESOURCES_ERRORS =
Metrics.counter(
PatchResourcesFn.class, BASE_METRIC_PREFIX + "patch_resources_error_count");
private static final Counter PATCH_RESOURCES_SUCCESS =
Metrics.counter(
PatchResourcesFn.class, BASE_METRIC_PREFIX + "patch_resources_success_count");
private static final Distribution PATCH_RESOURCES_LATENCY_MS =
Metrics.distribution(
PatchResourcesFn.class, BASE_METRIC_PREFIX + "patch_resources_latency_ms");

private transient HealthcareApiClient client;
private final ObjectMapper mapper = new ObjectMapper();

/**
* Initialize healthcare client.
*
* @throws IOException If the Healthcare client cannot be created.
*/
@Setup
public void initClient() throws IOException {
this.client = new HttpHealthcareApiClient();
}

@ProcessElement
public void patchResources(ProcessContext context) {
Iterable<Input> batch = context.element().getValue();
for (Input patchParameter : batch) {
try {
long startTime = Instant.now().toEpochMilli();
client.patchFhirResource(
patchParameter.getResourceName(),
patchParameter.getPatch(),
patchParameter.getQuery());
PATCH_RESOURCES_LATENCY_MS.update(Instant.now().toEpochMilli() - startTime);
PATCH_RESOURCES_SUCCESS.inc();
context.output(Write.SUCCESSFUL_BODY, patchParameter.toString());
} catch (IOException | HealthcareHttpException e) {
PATCH_RESOURCES_ERRORS.inc();
context.output(Write.FAILED_BODY, HealthcareIOError.of(patchParameter.toString(), e));
}
}
}
}
}

/** Export FHIR resources from a FHIR store to new line delimited json files on GCS. */
public static class Export extends PTransform<PBegin, PCollection<String>> {

private final ValueProvider<String> fhirStore;
private final ValueProvider<String> exportGcsUriPrefix;

Expand Down Expand Up @@ -1481,6 +1597,7 @@ public void exportResourcesToGcs(ProcessContext context)

/** Deidentify FHIR resources from a FHIR store to a destination FHIR store. */
public static class Deidentify extends PTransform<PBegin, PCollection<String>> {

private final ValueProvider<String> sourceFhirStore;
private final ValueProvider<String> destinationFhirStore;
private final ValueProvider<DeidentifyConfig> deidConfig;
Expand Down Expand Up @@ -1551,6 +1668,7 @@ public void deidentify(ProcessContext context)
/** The type Search. */
public static class Search<T>
extends PTransform<PCollection<FhirSearchParameter<T>>, FhirIO.Search.Result> {

private static final Logger LOG = LoggerFactory.getLogger(Search.class);

private final ValueProvider<String> fhirStore;
Expand All @@ -1564,6 +1682,7 @@ public static class Search<T>
}

public static class Result implements POutput, PInput {

private PCollection<KV<String, JsonArray>> keyedResources;
private PCollection<JsonArray> resources;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,26 @@ private FhirSearchParameter(
this.queries = queries;
}

/**
* Creates a FhirSearchParameter to represent a FHIR Search request.
*
* @param resourceType resource type for search, leave empty for all
* @param key optional key to index searches by
* @param queries search query, with field as key and search as value
* @return FhirSearchParameter
*/
public static <T> FhirSearchParameter<T> of(
String resourceType, @Nullable String key, @Nullable Map<String, T> queries) {
return new FhirSearchParameter<>(resourceType, key, queries);
}

/**
* Creates a FhirSearchParameter to represent a FHIR Search request.
*
* @param resourceType resource type for search, leave empty for all
* @param queries search query, with field as key and search as value
* @return FhirSearchParameter
*/
public static <T> FhirSearchParameter<T> of(
String resourceType, @Nullable Map<String, T> queries) {
return new FhirSearchParameter<>(resourceType, null, queries);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,28 @@ Operation pollOperation(Operation operation, Long sleepMs)
HttpBody executeFhirBundle(String fhirStore, String bundle)
throws IOException, HealthcareHttpException;

/**
* Patch fhir resource http body.
*
* @param resourceName the resource name, in format
* projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}[/{id}], id not
* present when queryString is specified.
* @param patch the patch operation
* @param query optional query for conditional patches
* @return the http body
*/
HttpBody patchFhirResource(String resourceName, String patch, @Nullable Map<String, String> query)
throws IOException, HealthcareHttpException;

/**
* Read fhir resource http body.
*
* @param resourceId the resource
* @param resourceName the resource name, in format
* projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}
* @return the http body
* @throws IOException the io exception
*/
HttpBody readFhirResource(String resourceId) throws IOException;
HttpBody readFhirResource(String resourceName) throws IOException;

/**
* Search fhir resource http body.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,15 @@
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class HttpHealthcareApiClient implements HealthcareApiClient, Serializable {

private static final String USER_AGENT =
String.format(
"apache-beam-io-google-cloud-platform-healthcare/%s",
ReleaseInfo.getReleaseInfo().getSdkVersion());
private static final String FHIRSTORE_HEADER_CONTENT_TYPE = "application/fhir+json";
private static final String FHIRSTORE_HEADER_ACCEPT = "application/fhir+json; charset=utf-8";
private static final String FHIRSTORE_HEADER_ACCEPT_CHARSET = "utf-8";
private static final String FHIRSTORE_PATCH_CONTENT_TYPE = "application/json-patch+json";
private static final Logger LOG = LoggerFactory.getLogger(HttpHealthcareApiClient.class);
private transient CloudHealthcare client;
private transient HttpClient httpClient;
Expand Down Expand Up @@ -594,11 +596,61 @@ public HttpBody executeFhirBundle(String fhirStore, String bundle)
return responseModel;
}

@Override
public HttpBody patchFhirResource(
String resourceName, String patch, @Nullable Map<String, String> query)
throws IOException, HealthcareHttpException {
if (httpClient == null || client == null) {
initClient();
}

credentials.refreshIfExpired();
StringEntity requestEntity = new StringEntity(patch, ContentType.APPLICATION_JSON);
URI uri;
try {
URIBuilder uriBuilder = new URIBuilder(client.getRootUrl() + "v1beta1/" + resourceName);
if (query != null) {
for (Map.Entry<String, String> q : query.entrySet()) {
uriBuilder.addParameter(q.getKey(), q.getValue());
}
}
uri = uriBuilder.build();
} catch (URISyntaxException e) {
LOG.error("URL error when making patch request to FHIR API. " + e.getMessage());
throw new IllegalArgumentException(e);
}

RequestBuilder requestBuilder =
RequestBuilder.patch()
.setUri(uri)
.setEntity(requestEntity)
.addHeader("Authorization", "Bearer " + credentials.getAccessToken().getTokenValue())
.addHeader("User-Agent", USER_AGENT)
.addHeader("Content-Type", FHIRSTORE_PATCH_CONTENT_TYPE)
.addHeader("Accept", FHIRSTORE_HEADER_ACCEPT)
.addHeader("Accept-Charset", FHIRSTORE_HEADER_ACCEPT_CHARSET);

HttpUriRequest request = requestBuilder.build();
HttpResponse response = httpClient.execute(request);
HttpEntity responseEntity = response.getEntity();
String content = EntityUtils.toString(responseEntity);

// Check 2XX code.
int statusCode = response.getStatusLine().getStatusCode();
if (!(statusCode / 100 == 2)) {
throw HealthcareHttpException.of(statusCode, content);
}
HttpBody responseModel = new HttpBody();
responseModel.setData(content);
return responseModel;
}

/**
* Wraps {@link HttpResponse} in an exception with a statusCode field for use with {@link
* HealthcareIOError}.
*/
public static class HealthcareHttpException extends Exception {

private final int statusCode;

private HealthcareHttpException(int statusCode, String message) {
Expand Down Expand Up @@ -630,8 +682,15 @@ int getStatusCode() {
}

@Override
public HttpBody readFhirResource(String resourceId) throws IOException {
return client.projects().locations().datasets().fhirStores().fhir().read(resourceId).execute();
public HttpBody readFhirResource(String resourceName) throws IOException {
return client
.projects()
.locations()
.datasets()
.fhirStores()
.fhir()
.read(resourceName)
.execute();
}

@Override
Expand Down
Loading

0 comments on commit e37bede

Please sign in to comment.