Skip to content

Commit

Permalink
Fixes ITs
Browse files Browse the repository at this point in the history
  • Loading branch information
chamikaramj committed Apr 9, 2021
1 parent 29ff0c0 commit 3d29f9e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 11 deletions.
Expand Up @@ -45,6 +45,7 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.hamcrest.Matchers;
import org.hamcrest.text.MatchesPattern;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -93,8 +94,11 @@ public void testConstructPubsubRead() throws Exception {
RunnerApi.PTransform transform = result.getTransform();
assertThat(
transform.getSubtransformsList(),
Matchers.contains(
"test_namespacetest/PubsubUnboundedSource", "test_namespacetest/MapElements"));
Matchers.hasItem(MatchesPattern.matchesPattern("PubsubUnboundedSource")));
assertThat(
transform.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern("MapElements")));

assertThat(transform.getInputsCount(), Matchers.is(0));
assertThat(transform.getOutputsCount(), Matchers.is(1));
}
Expand Down Expand Up @@ -150,8 +154,10 @@ public void testConstructPubsubWrite() throws Exception {
RunnerApi.PTransform transform = result.getTransform();
assertThat(
transform.getSubtransformsList(),
Matchers.contains(
"test_namespacetest/MapElements", "test_namespacetest/PubsubUnboundedSink"));
Matchers.hasItem(MatchesPattern.matchesPattern("MapElements")));
assertThat(
transform.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern("PubsubUnboundedSink")));
assertThat(transform.getInputsCount(), Matchers.is(1));
assertThat(transform.getOutputsCount(), Matchers.is(0));

Expand Down
Expand Up @@ -54,6 +54,7 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.hamcrest.Matchers;
import org.hamcrest.text.MatchesPattern;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -120,19 +121,24 @@ public void testConstructKafkaRead() throws Exception {
RunnerApi.PTransform transform = result.getTransform();
assertThat(
transform.getSubtransformsList(),
Matchers.contains(
"test_namespacetest/KafkaIO.Read", "test_namespacetest/Remove Kafka Metadata"));
Matchers.hasItem(MatchesPattern.matchesPattern("KafkaIO-Read")));
assertThat(
transform.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern("KafkaIO-Read")));
assertThat(transform.getInputsCount(), Matchers.is(0));
assertThat(transform.getOutputsCount(), Matchers.is(1));

RunnerApi.PTransform kafkaComposite =
result.getComponents().getTransformsOrThrow(transform.getSubtransforms(0));
assertThat(
kafkaComposite.getSubtransformsList(),
Matchers.contains(
"test_namespacetest/KafkaIO.Read/Impulse",
"test_namespacetest/KafkaIO.Read/ParDo(GenerateKafkaSourceDescriptor)",
"test_namespacetest/KafkaIO.Read/KafkaIO.ReadSourceDescriptors"));
transform.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern("Impulse")));
assertThat(
transform.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern("GenerateKafkaSourceDescriptor")));
assertThat(
transform.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern("ReadSourceDescriptors")));
RunnerApi.PTransform kafkaSdfParDo =
result.getComponents().getTransformsOrThrow(kafkaComposite.getSubtransforms(2));
RunnerApi.ParDoPayload parDoPayload =
Expand Down

0 comments on commit 3d29f9e

Please sign in to comment.