Skip to content

Commit

Permalink
Merge pull request #350 from matt-kwong/restructure
Browse files Browse the repository at this point in the history
Add example job using CPS connector and restructure flink-connector directory
  • Loading branch information
hannahrogers-google committed Feb 22, 2024
2 parents 0450c80 + a6862ac commit fa56435
Show file tree
Hide file tree
Showing 40 changed files with 508 additions and 79 deletions.
148 changes: 148 additions & 0 deletions flink-connector/flink-connector-gcp-pubsub/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.google.pubsub.flink</groupId>
<artifactId>flink-connector-parent</artifactId>
<version>0.0.0</version>
</parent>

<groupId>com.google.pubsub.flink</groupId>
<packaging>jar</packaging>
<name>Pub/Sub Flink Connector</name>
<artifactId>flink-connector-gcp-pubsub</artifactId>

<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
<goal>test-compile</goal>
</goals>
</execution>
</executions>
<configuration>
<protocArtifact>
com.google.protobuf:protoc:3.4.0:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>
io.grpc:protoc-gen-grpc-java:1.49.2:exe:${os.detected.classifier}
</pluginArtifact>
<protoSourceRoot>${project.basedir}/src/main/proto</protoSourceRoot>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax</artifactId>
<exclusions>
<exclusion>
<groupId>io.opencensus</groupId>
<artifactId>opencensus-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-core</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.api</groupId>
<artifactId>gax</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client</artifactId>
</exclusion>
</exclusions>
<!-- Version is pulled from google-cloud-bom (loaded via the libraries-bom) -->
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<!-- Version is pulled from google-cloud-bom (loaded via the libraries-bom) -->
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>grpc-google-cloud-pubsub-v1</artifactId>
<!-- Version is pulled from google-cloud-bom (loaded via the libraries-bom) -->
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava-gwt</artifactId>
<version>32.1.2-jre</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-auth</artifactId>
<!-- Version is pulled from google-cloud-bom (loaded via the libraries-bom) -->
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<!-- Version is pulled from google-cloud-bom (loaded via the libraries-bom) -->
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<!-- Version is pulled from google-cloud-bom (loaded via the libraries-bom) -->
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-okhttp</artifactId>
<!-- Version is pulled from google-cloud-bom (loaded via the libraries-bom) -->
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>5.3.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
<version>1.1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
import com.google.auth.Credentials;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.base.Optional;
import com.google.pubsub.flink.internal.sink.PubSubFlushablePublisher;
import com.google.pubsub.flink.internal.sink.PubSubPublisherCache;
import com.google.pubsub.flink.internal.sink.PubSubSinkWriter;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.Optional;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.base.Optional;
import com.google.pubsub.flink.internal.source.enumerator.PubSubCheckpointSerializer;
import com.google.pubsub.flink.internal.source.enumerator.PubSubSplitEnumerator;
import com.google.pubsub.flink.internal.source.reader.AckTracker;
Expand All @@ -32,7 +33,6 @@
import com.google.pubsub.flink.proto.PubSubEnumeratorCheckpoint;
import com.google.pubsub.v1.ProjectSubscriptionName;
import java.util.HashMap;
import java.util.Optional;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package com.google.pubsub.flink.internal.source.reader;

import com.google.api.core.ApiFuture;
import com.google.common.base.Optional;
import com.google.pubsub.v1.PubsubMessage;
import java.util.Optional;

public interface NotifyingPullSubscriber {
/** Returns a {@link ApiFuture} that will be completed when messages are available to pull */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

public class PubSubNotifyingPullSubscriber implements NotifyingPullSubscriber {
public static class SubscriberWakeupException extends Exception {}
Expand All @@ -41,10 +41,10 @@ public interface SubscriberFactory {
private final Subscriber subscriber;

@GuardedBy("this")
private Optional<Throwable> permanentError = Optional.empty();
private Optional<Throwable> permanentError = Optional.absent();

@GuardedBy("this")
private Optional<SettableApiFuture<Void>> notification = Optional.empty();
private Optional<SettableApiFuture<Void>> notification = Optional.absent();

@GuardedBy("this")
private final Deque<PubsubMessage> messages = new ArrayDeque<>();
Expand Down Expand Up @@ -86,7 +86,7 @@ public synchronized Optional<PubsubMessage> pullMessage() throws Throwable {
throw permanentError.get();
}
if (messages.size() == 0) {
return Optional.empty();
return Optional.absent();
}
return Optional.of(messages.pop());
}
Expand All @@ -106,7 +106,7 @@ private synchronized void receiveMessage(
PubsubMessage message, AckReplyConsumer ackReplyConsumer) {
ackTracker.addPendingAck(ackReplyConsumer);
messages.add(message);
completeNotification(Optional.empty());
completeNotification(Optional.absent());
}

@VisibleForTesting
Expand All @@ -121,7 +121,7 @@ private synchronized void completeNotification(Optional<Throwable> t) {
} else {
notification.get().set(null);
}
notification = Optional.empty();
notification = Optional.absent();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.MoreExecutors;
Expand All @@ -45,7 +46,9 @@ public PubSubSplitReader(Supplier<NotifyingPullSubscriber> factory) {
private Multimap<String, PubsubMessage> getMessages() throws Throwable {
ImmutableListMultimap.Builder<String, PubsubMessage> messages = ImmutableListMultimap.builder();
for (Map.Entry<String, NotifyingPullSubscriber> entry : subscribers.entrySet()) {
entry.getValue().pullMessage().ifPresent(m -> messages.put(entry.getKey(), m));
for (PubsubMessage m : entry.getValue().pullMessage().asSet()) {
messages.put(entry.getKey(), m);
}
}
return messages.build();
}
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.Multimap;
Expand All @@ -33,7 +34,6 @@
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -121,7 +121,7 @@ public void fetch_oneSplitNoMessages() throws Throwable {
when(mockSubscriber1.notifyDataAvailable()).thenReturn(ApiFutures.immediateFuture(null));
when(mockSubscriber2.notifyDataAvailable()).thenReturn(SettableApiFuture.create());
when(mockSubscriber1.pullMessage()).thenReturn(Optional.of(message));
when(mockSubscriber2.pullMessage()).thenReturn(Optional.empty());
when(mockSubscriber2.pullMessage()).thenReturn(Optional.absent());

RecordsWithSplitIds<PubsubMessage> records = reader.fetch();
assertThat(records.finishedSplits()).isEmpty();
Expand Down Expand Up @@ -185,6 +185,8 @@ public void interrupt_returnsEmptyMessages() throws Throwable {
SettableApiFuture<Void> future2 = SettableApiFuture.create();
when(mockSubscriber1.notifyDataAvailable()).thenReturn(future1);
when(mockSubscriber2.notifyDataAvailable()).thenReturn(future2);
when(mockSubscriber1.pullMessage()).thenReturn(Optional.absent());
when(mockSubscriber2.pullMessage()).thenReturn(Optional.absent());
doAnswer(
invocation -> {
future1.setException(new PubSubNotifyingPullSubscriber.SubscriberWakeupException());
Expand Down
53 changes: 53 additions & 0 deletions flink-connector/flink-examples-gcp-pubsub/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.google.pubsub.flink</groupId>
<artifactId>flink-connector-parent</artifactId>
<version>0.0.0</version>
</parent>

<artifactId>flink-examples-gcp-pubsub</artifactId>
<packaging>pom</packaging>
<name>Google Cloud Pub/Sub Connector Examples</name>

<properties>
<japicmp.skip>true</japicmp.skip>
</properties>

<modules>
<module>pubsub-streaming</module>
</modules>

<dependencies>
<dependency>
<groupId>com.google.pubsub.flink</groupId>
<artifactId>flink-connector-gcp-pubsub</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
<version>1.6.13</version>
<configuration>
<skipNexusStagingDeployMojo>true</skipNexusStagingDeployMojo>
</configuration>
</plugin>
</plugins>
</build>
</project>
Loading

0 comments on commit fa56435

Please sign in to comment.