Skip to content

Commit

Permalink
[FLINK-35548] Add E2E tests for PubSubSinkV2
Browse files Browse the repository at this point in the history
  • Loading branch information
vahmed-hamdy committed Jun 7, 2024
1 parent baafc39 commit 1e45ee3
Show file tree
Hide file tree
Showing 10 changed files with 219 additions and 12 deletions.
12 changes: 12 additions & 0 deletions flink-connector-gcp-pubsub-e2e-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
Expand All @@ -73,6 +79,12 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package org.apache.flink.connector.gcp.pubsub.sink;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper;
import org.apache.flink.connector.gcp.pubsub.sink.util.TestChannelProvider;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider;
import org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.TestLogger;

import com.google.pubsub.v1.ReceivedMessage;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.testcontainers.containers.PubSubEmulatorContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;
import java.util.List;

/** Integration tests for {@link PubSubSinkV2} using {@link PubSubEmulatorContainer}. */
@ExtendWith(MiniClusterExtension.class)
@Execution(ExecutionMode.CONCURRENT)
@Testcontainers
class PubSubSinkV2ITTests extends TestLogger {

private static final String PROJECT_ID = "test-project";

private static final String TOPIC_ID = "test-topic";

private static final String SUBSCRIPTION_ID = "test-subscription";

private StreamExecutionEnvironment env;

@Container
private static final PubSubEmulatorContainer PUB_SUB_EMULATOR_CONTAINER =
new PubSubEmulatorContainer(
DockerImageName.parse(DockerImageVersions.GOOGLE_CLOUD_PUBSUB_EMULATOR));

private PubsubHelper pubSubHelper;

@BeforeEach
void setUp() throws IOException {
env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
pubSubHelper = new PubsubHelper(PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint());

pubSubHelper.createTopic(PROJECT_ID, TOPIC_ID);
pubSubHelper.createSubscription(PROJECT_ID, SUBSCRIPTION_ID, PROJECT_ID, TOPIC_ID);
}

@AfterEach
void tearDown() throws IOException {
pubSubHelper.deleteSubscription(PROJECT_ID, SUBSCRIPTION_ID);
pubSubHelper.deleteTopic(PROJECT_ID, TOPIC_ID);
pubSubHelper.close();
}

@Test
void pubSubSinkV2DeliversRecords() throws Exception {
String[] elements = new String[] {"test1", "test2", "test3"};
DataStreamSource<String> stream =
env.fromSource(
new DataGeneratorSource<>(
new FromElementsGeneratorFunction<>(
BasicTypeInfo.STRING_TYPE_INFO, elements),
elements.length,
TypeInformation.of(String.class)),
WatermarkStrategy.noWatermarks(),
"DataGeneratorSource");

GcpPublisherConfig gcpPublisherConfig =
GcpPublisherConfig.builder()
.setCredentialsProvider(EmulatorCredentialsProvider.create())
.setTransportChannelProvider(
new TestChannelProvider(
PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint()))
.build();

PubSubSinkV2<String> sink =
PubSubSinkV2.<String>builder()
.setProjectId(PROJECT_ID)
.setTopicId(TOPIC_ID)
.setSerializationSchema(new SimpleStringSchema())
.setGcpPublisherConfig(gcpPublisherConfig)
.setFailOnError(true)
.build();

stream.sinkTo(sink);

env.execute("PubSubSinkV2ITTests");
List<ReceivedMessage> receivedMessages =
pubSubHelper.pullMessages(PROJECT_ID, SUBSCRIPTION_ID, 100);

Assertions.assertThat(receivedMessages).hasSameSizeAs(elements);
Assertions.assertThat(receivedMessages)
.extracting(ReceivedMessage::getMessage)
.extracting(message -> message.getData().toStringUtf8())
.containsExactlyInAnyOrder(elements);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.gcp.pubsub.emulator;
package org.apache.flink.connector.gcp.pubsub.sink.util;

import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider;

import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.NotFoundException;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.MessageReceiver;
Expand All @@ -36,27 +40,50 @@
import com.google.pubsub.v1.ReceivedMessage;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/** A helper class to make managing the testing topics a bit easier. */
public class PubsubHelper {

private static final Logger LOG = LoggerFactory.getLogger(PubsubHelper.class);

private static final Duration SHUTDOWN_TIMEOUT = Duration.ofSeconds(5);

private ManagedChannel channel;

private TransportChannelProvider channelProvider;

private TopicAdminClient topicClient;

private SubscriptionAdminClient subscriptionAdminClient;

public PubsubHelper(String endpoint) {
channel = ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build();
channelProvider =
FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
}

public PubsubHelper(TransportChannelProvider channelProvider) {
this.channelProvider = channelProvider;
}

public TransportChannelProvider getChannelProvider() {
return channelProvider;
}

public ManagedChannel getChannel() {
return channel;
}

public TopicAdminClient getTopicAdminClient() throws IOException {
if (topicClient == null) {
TopicAdminSettings topicAdminSettings =
Expand Down Expand Up @@ -90,12 +117,6 @@ public void deleteTopic(TopicName topicName) throws IOException {
return;
}

// If it exists we delete all subscriptions and the topic itself.
LOG.info("DeleteTopic {} first delete old subscriptions.", topicName);
adminClient
.listTopicSubscriptions(topicName)
.iterateAll()
.forEach(subscriptionAdminClient::deleteSubscription);
LOG.info("DeleteTopic {}", topicName);
adminClient.deleteTopic(topicName);
}
Expand Down Expand Up @@ -222,4 +243,34 @@ public Publisher createPublisher(String project, String topic) throws IOExceptio
.setCredentialsProvider(EmulatorCredentialsProvider.create())
.build();
}

public void close() {
if (topicClient != null) {
try {
topicClient.shutdown();
topicClient.awaitTermination(SHUTDOWN_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
} catch (Exception e) {
LOG.warn("Error shutting down topic client", e);
}
}

if (subscriptionAdminClient != null) {
try {
subscriptionAdminClient.shutdown();
subscriptionAdminClient.awaitTermination(
SHUTDOWN_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
} catch (Exception e) {
LOG.warn("Error shutting down subscription admin client", e);
}
}

if (channel != null) {
try {
channel.shutdown();
channel.awaitTermination(SHUTDOWN_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
} catch (Exception e) {
LOG.warn("Error shutting down channel", e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.apache.flink.connector.gcp.pubsub.sink.util;

import org.apache.flink.connector.gcp.pubsub.sink.config.SerializableTransportChannelProvider;

import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

/** A test channel provider for {@link GrpcTransportChannel}. */
public class TestChannelProvider extends SerializableTransportChannelProvider {

private final String endpoint;

public TestChannelProvider(String endpoint) {
this.endpoint = endpoint;
}

@Override
protected void open() {
ManagedChannel managedChannel =
ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build();
this.transportChannelProvider =
FixedTransportChannelProvider.create(GrpcTransportChannel.create(managedChannel));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.flink.streaming.connectors.gcp.pubsub;

import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper;

import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper;

import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper;

import com.google.pubsub.v1.ReceivedMessage;
import org.apache.commons.lang3.StringUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
package org.apache.flink.streaming.connectors.gcp.pubsub;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper;

import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.streaming.connectors.gcp.pubsub.emulator;

import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper;
import org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions;
import org.apache.flink.util.TestLogger;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
import com.google.api.gax.core.CredentialsProvider;
import com.google.auth.Credentials;

import java.io.Serializable;

/**
* A CredentialsProvider that simply provides the right credentials that are to be used for
* connecting to an emulator. NOTE: The Google provided NoCredentials and NoCredentialsProvider do
* not behave as expected. See https://github.com/googleapis/gax-java/issues/1148
*/
public final class EmulatorCredentialsProvider implements CredentialsProvider {
public final class EmulatorCredentialsProvider implements CredentialsProvider, Serializable {
@Override
public Credentials getCredentials() {
return EmulatorCredentials.getInstance();
Expand Down

0 comments on commit 1e45ee3

Please sign in to comment.