Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-9779] Patch HL7v2IOWriteIT Flakiness #11450

Merged
merged 24 commits into from Apr 27, 2020
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -27,10 +27,8 @@

import java.io.IOException;
import java.security.SecureRandom;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.ListHL7v2MessageIDs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
Expand All @@ -39,6 +37,7 @@
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -54,6 +53,7 @@ public class HL7v2IOReadIT {
+ "_"
+ (new SecureRandom().nextInt(32))
+ "_read_it";
@Rule public transient TestPipeline pipeline = TestPipeline.create();

@BeforeClass
public static void createHL7v2tore() throws IOException {
Expand Down Expand Up @@ -86,36 +86,6 @@ public void tearDown() throws Exception {
deleteAllHL7v2Messages(this.client, healthcareDataset + "/hl7V2Stores/" + HL7V2_STORE_NAME);
}

@Test
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these tests are actually redundant with the non-deleted testHL7v2IO_ListHL7v2Messages and testHL7v2IO_ListHL7v2Messages_filtered

public void testHL7v2IORead() throws Exception {
// Should read all messages.
Pipeline pipeline = Pipeline.create();
HL7v2IO.Read.Result result =
pipeline
.apply(
new ListHL7v2MessageIDs(
Collections.singletonList(
healthcareDataset + "/hl7V2Stores/" + HL7V2_STORE_NAME)))
.apply(HL7v2IO.getAll());
PCollection<Long> numReadMessages =
result.getMessages().setCoder(new HL7v2MessageCoder()).apply(Count.globally());
PAssert.thatSingleton(numReadMessages).isEqualTo((long) MESSAGES.size());
PAssert.that(result.getFailedReads()).empty();

PAssert.that(result.getMessages())
.satisfies(
input -> {
for (HL7v2Message elem : input) {
assertFalse(elem.getName().isEmpty());
assertFalse(elem.getData().isEmpty());
assertFalse(elem.getMessageType().isEmpty());
}
return null;
});

pipeline.run();
}

@Test
public void testHL7v2IO_ListHL7v2Messages() throws Exception {
// Should read all messages.
Expand Down Expand Up @@ -164,33 +134,4 @@ public void testHL7v2IO_ListHL7v2Messages_filtered() throws Exception {

pipeline.run();
}

@Test
public void testHL7v2IORead_filtered() throws Exception {
final String adtFilter = "messageType = \"ADT\"";
// Should read only messages matching the filter.
Pipeline pipeline = Pipeline.create();
HL7v2IO.Read.Result result =
pipeline
.apply(
new ListHL7v2MessageIDs(
Collections.singletonList(
healthcareDataset + "/hl7V2Stores/" + HL7V2_STORE_NAME),
adtFilter))
.apply(HL7v2IO.getAll());
PCollection<Long> numReadMessages =
result.getMessages().setCoder(new HL7v2MessageCoder()).apply(Count.globally());
PAssert.thatSingleton(numReadMessages).isEqualTo(NUM_ADT);
PAssert.that(result.getFailedReads()).empty();

PAssert.that(result.getMessages())
.satisfies(
input -> {
for (HL7v2Message elem : input) {
assertEquals("ADT", elem.getMessageType());
}
return null;
});
pipeline.run();
}
}
Expand Up @@ -18,17 +18,22 @@
package org.apache.beam.sdk.io.gcp.healthcare;

import com.google.api.client.util.Base64;
import com.google.api.client.util.Sleeper;
import com.google.api.services.healthcare.v1beta1.model.Message;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.HL7v2MessagePages;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;

class HL7v2IOTestUtil {
/** Google Cloud Healthcare Dataset in Apache Beam integration test project. */
Expand Down Expand Up @@ -81,20 +86,51 @@ class HL7v2IOTestUtil {
/** Clear all messages from the HL7v2 store. */
static void deleteAllHL7v2Messages(HealthcareApiClient client, String hl7v2Store)
throws IOException {
for (String msgId :
client
.getHL7v2MessageStream(hl7v2Store)
.map(HL7v2Message::getName)
.collect(Collectors.toList())) {
client.deleteHL7v2Message(msgId);
for (List<HL7v2Message> page : new HL7v2MessagePages(client, hl7v2Store)) {
for (String msgId : page.stream().map(HL7v2Message::getName).collect(Collectors.toList())) {
client.deleteHL7v2Message(msgId);
}
}
}

/** Utiliy for waiting on HL7v2 Store indexing to be complete see BEAM-9779. */
public static void waitForHL7v2Indexing(
HealthcareApiClient client, String hl7v2Store, long expectedNumMessages, Duration timeout)
throws InterruptedException, TimeoutException {

Instant start = Instant.now();
long sleepMs = 50;
long numListedMessages = 0;
while (new Duration(start, Instant.now()).isShorterThan(timeout)) {
numListedMessages = 0;
// count messages in HL7v2 Store.
for (List<HL7v2Message> page :
new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store)) {
numListedMessages += page.size();
}
if (numListedMessages == expectedNumMessages) {
return;
}
// exponential backoff.
sleepMs *= 2;
Sleeper.DEFAULT.sleep(sleepMs);
}
throw new TimeoutException(
String.format(
"Timed out waiting for %s to reach %s messages. last list request returned %s messages.",
hl7v2Store, expectedNumMessages, numListedMessages));
}

/** Populate the test messages into the HL7v2 store. */
static void writeHL7v2Messages(HealthcareApiClient client, String hl7v2Store) throws IOException {
static void writeHL7v2Messages(HealthcareApiClient client, String hl7v2Store)
throws IOException, InterruptedException, TimeoutException {
for (HL7v2Message msg : MESSAGES) {
client.createHL7v2Message(hl7v2Store, msg.toModel());
}
// [BEAM-9779] HL7v2 indexing is asyncronous. Block until indexing completes to stabilize this
// IT.
HL7v2IOTestUtil.waitForHL7v2Indexing(
client, hl7v2Store, MESSAGES.size(), Duration.standardMinutes(1));
}

/**
Expand Down Expand Up @@ -170,10 +206,11 @@ public void initClient() throws IOException {
public void listMessages(ProcessContext context) throws IOException {
String hl7v2Store = context.element();
// Output all elements of all pages.
this.client
.getHL7v2MessageStream(hl7v2Store, this.filter)
.map(HL7v2Message::getName)
.forEach(context::output);
HttpHealthcareApiClient.HL7v2MessagePages pages =
new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, this.filter);
for (List<HL7v2Message> page : pages) {
page.stream().map(HL7v2Message::getName).forEach(context::output);
}
}
}

Expand Down
Expand Up @@ -20,20 +20,22 @@
import static org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.HEALTHCARE_DATASET_TEMPLATE;
import static org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.MESSAGES;
import static org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.deleteAllHL7v2Messages;
import static org.junit.Assert.assertEquals;

import com.google.api.services.healthcare.v1beta1.model.Hl7V2Store;
import java.io.IOException;
import java.security.SecureRandom;
import org.apache.beam.sdk.Pipeline;
import java.util.concurrent.TimeoutException;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -46,12 +48,15 @@ public class HL7v2IOWriteIT {
private static final String HL7V2_STORE_NAME =
"hl7v2_store_write_it_" + System.currentTimeMillis() + "_" + (new SecureRandom().nextInt(32));

@Rule public transient TestPipeline pipeline = TestPipeline.create();
jaketf marked this conversation as resolved.
Show resolved Hide resolved

@BeforeClass
public static void createHL7v2tore() throws IOException {
String project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
healthcareDataset = String.format(HEALTHCARE_DATASET_TEMPLATE, project);
HealthcareApiClient client = new HttpHealthcareApiClient();
client.createHL7v2Store(healthcareDataset, HL7V2_STORE_NAME);
Hl7V2Store store = client.createHL7v2Store(healthcareDataset, HL7V2_STORE_NAME);
store.getParserConfig();
}

@AfterClass
Expand All @@ -65,7 +70,6 @@ public void setup() throws Exception {
if (client == null) {
client = new HttpHealthcareApiClient();
}
PipelineOptions options = TestPipeline.testingPipelineOptions().as(GcpOptions.class);
}

@After
Expand All @@ -74,8 +78,7 @@ public void tearDown() throws Exception {
}

@Test
public void testHL7v2IOWrite() throws IOException {
Pipeline pipeline = Pipeline.create();
public void testHL7v2IOWrite() throws Exception {
HL7v2IO.Write.Result result =
pipeline
.apply(Create.of(MESSAGES).withCoder(new HL7v2MessageCoder()))
Expand All @@ -84,10 +87,15 @@ public void testHL7v2IOWrite() throws IOException {
PAssert.that(result.getFailedInsertsWithErr()).empty();

pipeline.run().waitUntilFinish();
long numWrittenMessages =
client
.getHL7v2MessageStream(healthcareDataset + "/hl7V2Stores/" + HL7V2_STORE_NAME)
.count();
assertEquals(MESSAGES.size(), numWrittenMessages);

try {
HL7v2IOTestUtil.waitForHL7v2Indexing(
client,
healthcareDataset + "/hl7V2Stores/" + HL7V2_STORE_NAME,
MESSAGES.size(),
Duration.standardMinutes(2));
} catch (TimeoutException e) {
Assert.fail(e.getMessage());
}
}
}