Skip to content

Commit

Permalink
[FLINK-20625] Review comments and code clean-up
Browse files Browse the repository at this point in the history
  • Loading branch information
Ryan Skraba committed Nov 15, 2023
1 parent 55f0cf1 commit b6e38a6
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
* PubSubSourceReader} that joins. The split does not contain any split-specific information because
* Pub/Sub does not allow subscribers to specify a "range" of messages to pull by providing
* partitions or offsets. However, Pub/Sub will automatically load-balance messages between multiple
* readers which use the same subscription.
* readers using same subscription.
*
* <p>A {@link PubSubSource} can be constructed through the {@link PubSubSourceBuilder} like so:
*
Expand Down Expand Up @@ -229,8 +229,8 @@ public PubSubSourceBuilder<OUT> setSubscriptionName(String subscriptionName) {
}

/**
* @param credentials an instance of {@com.google.auth.Credentials} to authenticate against
* Google Cloud
* @param credentials an instance of {@link com.google.auth.Credentials} to authenticate
* against Google Cloud
*/
public PubSubSourceBuilder<OUT> setCredentials(Credentials credentials) {
this.credentials = credentials;
Expand All @@ -245,7 +245,7 @@ public PubSubSourceBuilder<OUT> setPubSubSubscriberFactory(
}

/**
* Create a parameterized {@DefaultPubSubSubscriberFactory} and set it on the builder.
* Create a parameterized {@link DefaultPubSubSubscriberFactory} and set it on the builder.
*
* @param maxMessagesPerPull The maximum number of messages that should be pulled in one go.
* @param perRequestTimeout The timeout per request from the subscriber
Expand Down Expand Up @@ -289,7 +289,7 @@ public PubSubSource<OUT> build() throws IOException {
DEFAULT_PUBSUB_SUBSCRIBER_MAX_MESSAGES_PER_PULL);
}

return new PubSubSource(
return new PubSubSource<>(
deserializationSchema, pubSubSubscriberFactory, props, credentials);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ public class PubSubRecordEmitter<T> implements RecordEmitter<Tuple2<T, Long>, T,

@Override
public void emitRecord(
Tuple2<T, Long> element, SourceOutput<T> output, PubSubSplitState splitState)
throws Exception {
Tuple2<T, Long> element, SourceOutput<T> output, PubSubSplitState splitState) {
output.collect(element.f0, element.f1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.flink.connector.gcp.pubsub.source.reader;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
Expand All @@ -45,8 +46,9 @@ class PubSubSourceFetcherManager<T>

PubSubSourceFetcherManager(
FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<T, Long>>> elementsQueue,
Supplier<SplitReader<Tuple2<T, Long>, PubSubSplit>> splitReaderSupplier) {
super(elementsQueue, splitReaderSupplier);
Supplier<SplitReader<Tuple2<T, Long>, PubSubSplit>> splitReaderSupplier,
Configuration config) {
super(elementsQueue, splitReaderSupplier, config);
}

void prepareForAcknowledgement(long checkpointId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
Expand All @@ -51,7 +51,7 @@ public PubSubSourceReader(
SourceReaderContext context) {
super(
elementsQueue,
new PubSubSourceFetcherManager<>(elementsQueue, splitReaderSupplier::get),
new PubSubSourceFetcherManager<>(elementsQueue, splitReaderSupplier::get, config),
recordEmitter,
config,
context);
Expand All @@ -64,7 +64,7 @@ protected void onSplitFinished(Map<String, PubSubSplitState> finishedSplitIds) {
public List<PubSubSplit> snapshotState(long checkpointId) {
((PubSubSourceFetcherManager<T>) splitFetcherManager)
.prepareForAcknowledgement(checkpointId);
return Arrays.asList(new PubSubSplit());
return Collections.singletonList(new PubSubSplit());
}

/**
Expand All @@ -73,10 +73,9 @@ public List<PubSubSplit> snapshotState(long checkpointId) {
* PubSubSplitReader}.
*
* @param checkpointId the checkpoint ID.
* @throws Exception
*/
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
public void notifyCheckpointComplete(long checkpointId) {
LOG.info("Acknowledging received GCP Pub/Sub messages for checkpoint {}", checkpointId);
((PubSubSourceFetcherManager<T>) splitFetcherManager).acknowledgeMessages(checkpointId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public class PubSubSplitReader<T> implements SplitReader<Tuple2<T, Long>, PubSub
* @param credentials the credentials to use for creating a new subscriber
*/
public PubSubSplitReader(
PubSubDeserializationSchema deserializationSchema,
PubSubDeserializationSchema<T> deserializationSchema,
PubSubSubscriberFactory pubSubSubscriberFactory,
Credentials credentials) {

Expand Down

0 comments on commit b6e38a6

Please sign in to comment.