Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-13351] FhirIO GetPatientEverything connector #15936

Merged
merged 7 commits into from Nov 30, 2021

Conversation

msbukal
Copy link
Contributor

@msbukal msbukal commented Nov 10, 2021

Add FhirIO connector for: https://cloud.google.com/healthcare-api/docs/reference/rest/v1/projects.locations.datasets.fhirStores.fhir/Patient-everything


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

ValidatesRunner compliance status (on master branch)

Lang ULR Dataflow Flink Samza Spark Twister2
Go --- Build Status Build Status Build Status Build Status ---
Java Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Python --- Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status ---
XLang Build Status Build Status Build Status Build Status Build Status ---

Examples testing status on various runners

Lang ULR Dataflow Flink Samza Spark Twister2
Go --- --- --- --- --- --- ---
Java --- Build Status
Build Status
Build Status
--- --- --- --- ---
Python --- --- --- --- --- --- ---
XLang --- --- --- --- --- --- ---

Post-Commit SDK/Transform Integration Tests Status (on master branch)

Go Java Python
Build Status Build Status Build Status
Build Status
Build Status

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website Whitespace Typescript
Non-portable Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status Build Status Build Status
Portable --- Build Status Build Status --- --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

@lukecwik
Copy link
Member

R: @pabloem

@msbukal
Copy link
Contributor Author

msbukal commented Nov 11, 2021

Run Java PostCommit

Copy link
Member

@pabloem pabloem left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good. Added a few comments. Aslo it seems that there's a bad merge.

* retrieve a {@link PCollection} containing the successfully fetched {@link String}s and/or {@link
* FhirIO.Read.Result#getFailedReads()}* to retrieve a {@link PCollection} of {@link
* HealthcareIOError}* containing the resource ID that could not be fetched and the exception as a
* <<<<<<< HEAD
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops there seem to be some issues with the merged result of the file

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, missed these somehow because they didn't cause runtime errors haha.

getPatientEverything(
patientEverythingParameter.getResourceName(),
patientEverythingParameter.getFilters()));
} catch (IllegalArgumentException | NoSuchElementException e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC - if there are any other exceptions, they will be thrown out right? And retried by the runner however it chooses to - correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's what I believe.

* PatientEverythingParameter defines required attributes for a FHIR GetPatientEverything request
* in {@link FhirIOPatientEverything}. *
*/
@DefaultCoder(PatientEverythingParameterCoder.class)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you may be able to do @DefaultSchema(JavaBeanSchema.class) for this - the advantage is that you would not have to add your own coder (though you may need to add setters and getters or constructor - or you can make it an autovalue class and use @DefaultSchema(AutoValueSchema.class)). The other advantage is that if it has a schema, it can be used easily as a cross-language transform, and users can use it with other transforms that operate on PCollections with schema.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, I made them AutoValue!

public void processElement(ProcessContext context) {
PatientEverythingParameter patientEverythingParameter = context.element();
try {
context.output(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you want to parallelize the records of a single bundle, you could run something like that:

@StartBundle
tpe = new ThreadPoolExecutor();

@ProcessBundle
future = tpe.submit(param -> getPatientEverything(....))
futures.add(future)

@FinishBundle
for (f : futures) {
  // collect outputs or errors
}

for other healthcare IO we decided to not parallelize things - and this may be a fair choice in this case - but I'm just curious if you think this may be useful

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will follow up for this.

this.fhirStore = StaticValueProvider.of(fhirStore);
@Override
ValueProvider<String> getFhirStore() {
return null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't you return the fhirStore here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Member

@pabloem pabloem left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just added a couple questions. ready to merge after that : )

Comment on lines +1372 to +1378
Optional<ValueProvider<String>> getImportGcsTempPath() {
return Optional.empty();
}

@Override
Optional<ValueProvider<String>> getImportGcsDeadLetterPath() {
return Optional.empty();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you use ValueProviders / templates in your use case? If you do not, then I think it does not make sense to add new properties that depend on value providers. Instead, just make this Optional

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you mean, these methods are only here because I made ExecuteBundles implement FhirIO.Write, which requires them.

}

@Override
public Map<TupleTag<?>, PValue> expand() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are you interested in returning a Result type with getFailedReads and getPatientCompartments? This would be a little nicer to use for others, but maybe you have your reasons for going with the Map?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry... isn't this what I'm doing? The return type of the FhirIOPatientEverything PTransform is Result, and there is the methods getFailedReads and getPatientCompartments, but the type POutput has to implement expand, like all the other .Result methods in FhirIO?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh my bad! I misread this. Thanks!

@pabloem
Copy link
Member

pabloem commented Nov 30, 2021

LGTM. Sorry about the delay!

@pabloem pabloem changed the title FhirIO GetPatientEverything connector [BEAM-13351] FhirIO GetPatientEverything connector Nov 30, 2021
@pabloem pabloem merged commit 5ce29e7 into apache:master Nov 30, 2021
@y1chi
Copy link
Contributor

y1chi commented Dec 6, 2021

The test is failing and breaking post commit, https://issues.apache.org/jira/browse/BEAM-13387 @msbukal could you take a look at it?

@msbukal
Copy link
Contributor Author

msbukal commented Dec 6, 2021

Hi, could you provide more information regarding the failed run? Is this a flake, or consistent error? Where is this test running?

The test passes locally for me. The exception is coming from line 85 but this is the exact same setup that other integration tests use (eg. FhirIOSearchIT). Probably because I run the test independently it passes, will investigate more.

@y1chi
Copy link
Contributor

y1chi commented Dec 6, 2021

Hi, could you provide more information regarding the failed run? Is this a flake, or consistent error? Where is this test running?

The test passes locally for me. The exception is coming from line 85 but this is the exact same setup that other integration tests use (eg. FhirIOSearchIT). Probably because I run the test independently it passes, will investigate more.

You can check the jenkins job from https://issues.apache.org/jira/browse/BEAM-13387

it seems to fail consistently, but only on dataflow runner v2. On dataflow runner v1 it seems passing https://ci-beam.apache.org/job/beam_PostCommit_Java_DataflowV1/993/testReport/org.apache.beam.sdk.io.gcp.healthcare/FhirIOPatientEverythingIT/testFhirIOPatientEverything_R4_/

@msbukal
Copy link
Contributor Author

msbukal commented Dec 7, 2021

How can I run the test that uses v2 dataflow runner locally?

None of the test seem to modify the bundle list, but still I think maybe changing the tests to copy the array instead of just using it might fix the issue. But I have no idea how to test this... especially since the presubmit run here passed.

@y1chi
Copy link
Contributor

y1chi commented Dec 7, 2021

It appears that we are not running other FhirIO tests with dataflow runner v2.

exclude '**/FhirIOReadIT.class'

I think you need either exclude the test from runner v2 if it is not intended to work on runner v2, or you can try to make

task googleCloudPlatformRunnerV2IntegrationTest(type: Test) {
depends on copyGoogleCloudPlatformTestResources like
task googleCloudPlatformLegacyWorkerIntegrationTest(type: Test, dependsOn: copyGoogleCloudPlatformTestResources) {
(since you need to read the resource files).

You can test it locally by running the corresponding gradle task.

@msbukal
Copy link
Contributor Author

msbukal commented Dec 7, 2021

Ah that makes so much sense why my new integration tests always fail.

I'll send a quick PR to disable the new test in v2 for now, and follow up to investigate enabling in v2.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants