diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/FlinkPulsarSource.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/FlinkPulsarSource.java new file mode 100644 index 00000000000..9fb38d238f6 --- /dev/null +++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/FlinkPulsarSource.java @@ -0,0 +1,1019 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.pulsar; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.pulsar.config.StartupMode; +import org.apache.flink.streaming.connectors.pulsar.internal.CachedPulsarClient; +import org.apache.flink.streaming.connectors.pulsar.internal.MessageIdSerializer; +import org.apache.flink.streaming.connectors.pulsar.internal.PulsarCommitCallback; +import org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher; +import 
org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader; +import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions; +import org.apache.flink.streaming.connectors.pulsar.internal.PulsarSourceStateSerializer; +import org.apache.flink.streaming.connectors.pulsar.internal.SerializableRange; +import org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils; +import org.apache.flink.streaming.connectors.pulsar.internal.TopicRange; +import org.apache.flink.streaming.connectors.pulsar.internal.TopicSubscription; +import org.apache.flink.streaming.connectors.pulsar.internal.TopicSubscriptionSerializer; +import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter; +import org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.SerializedValue; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Sets; + +import lombok.extern.slf4j.Slf4j; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; +import org.apache.inlong.sort.base.metric.MetricState; +import org.apache.inlong.sort.base.metric.SourceMetricData; +import org.apache.inlong.sort.pulsar.table.DynamicPulsarDeserializationSchema; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.shade.com.google.common.collect.Maps; +import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import 
java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.COMMITS_FAILED_METRICS_COUNTER; +import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.COMMITS_SUCCEEDED_METRICS_COUNTER; +import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.PULSAR_SOURCE_METRICS_GROUP; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN; + +/** + * Pulsar data source. + * Copied from io.streamnative.connectors:pulsar-flink-connector_2.11:1.13.6.1-rc9 + * Added with inlong metric support + * @param The type of records produced by this data source. + */ +@Slf4j +public class FlinkPulsarSource + extends + RichParallelSourceFunction + implements + ResultTypeQueryable, + CheckpointListener, + CheckpointedFunction { + + /** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks. */ + public static final int MAX_NUM_PENDING_CHECKPOINTS = 100; + + /** Boolean configuration key to disable metrics tracking. **/ + public static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; + + /** State name of the consumer's partition offset states. 
*/ + private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states"; + + private static final String OFFSETS_STATE_NAME_V3 = "topic-offset-states"; + + // ------------------------------------------------------------------------ + // configuration state, set on the client relevant for all subtasks + // ------------------------------------------------------------------------ + + protected String adminUrl; + + protected ClientConfigurationData clientConfigurationData; + + protected final Map caseInsensitiveParams; + + protected final Map readerConf; + + protected volatile PulsarDeserializationSchema deserializer; + + private Map ownedTopicStarts; + + /** + * Optional watermark strategy that will be run per pulsar partition, to exploit per-partition + * timestamp characteristics. The watermark strategy is kept in serialized form, to deserialize + * it into multiple copies. + */ + private SerializedValue> watermarkStrategy; + + /** User configured value for discovery interval, in milliseconds. */ + private final long discoveryIntervalMillis; + + protected final int pollTimeoutMs; + + protected final int commitMaxRetries; + + /** The startup mode for the reader (default is {@link StartupMode#LATEST}). */ + private StartupMode startupMode = StartupMode.LATEST; + + /** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */ + private transient Map specificStartupOffsets; + + /** + * The subscription name to be used; only relevant when startup mode is {@link StartupMode#EXTERNAL_SUBSCRIPTION} + * If the subscription exists for a partition, we would start reading this partition from the subscription cursor. + * At the same time, checkpoint for the job would made progress on the subscription. 
+ */ + private String externalSubscriptionName; + + /** + * The subscription position to use when subscription does not exist (default is {@link MessageId#latest}); + * Only relevant when startup mode is {@link StartupMode#EXTERNAL_SUBSCRIPTION}. + */ + private MessageId subscriptionPosition = MessageId.latest; + + // TODO: remove this when MessageId is serializable itself. + // see: https://github.com/apache/pulsar/pull/6064 + private Map specificStartupOffsetsAsBytes; + + protected final Properties properties; + + protected final UUID uuid = UUID.randomUUID(); + + // ------------------------------------------------------------------------ + // runtime state (used individually by each parallel subtask) + // ------------------------------------------------------------------------ + + /** Data for pending but uncommitted offsets. */ + private final LinkedHashMap> pendingOffsetsToCommit = new LinkedHashMap<>(); + + /** Fetcher implements Pulsar reads. */ + private transient volatile PulsarFetcher pulsarFetcher; + + /** The partition discoverer, used to find new partitions. */ + protected transient volatile PulsarMetadataReader metadataReader; + + /** + * The offsets to restore to, if the consumer restores state from a checkpoint. + * + *
<p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)} method. + * + *
<p>
Using a sorted map as the ordering is important when using restored state + * to seed the partition discoverer. + */ + private transient volatile TreeMap restoredState; + private transient volatile Set excludeStartMessageIds; + + /** + * Accessor for state in the operator state backend. + */ + private transient ListState> unionOffsetStates; + + private int oldStateVersion = 2; + + private volatile boolean stateSubEqualexternalSub = false; + + /** Discovery loop, executed in a separate thread. */ + private transient volatile Thread discoveryLoopThread; + + /** Flag indicating whether the consumer is still running. */ + private volatile boolean running = true; + + /** + * Flag indicating whether or not metrics should be exposed. + * If {@code true}, offset metrics (e.g. current offset, committed offset) and + * other metrics will be registered. + */ + private final boolean useMetrics; + + /** Counter for successful Pulsar offset commits. */ + private transient Counter successfulCommits; + + /** Counter for failed Pulsar offset commits. */ + private transient Counter failedCommits; + + /** Callback interface that will be invoked upon async pulsar commit completion. 
*/ + private transient PulsarCommitCallback offsetCommitCallback; + + private transient int taskIndex; + + private transient int numParallelTasks; + + protected String inlongMetric; + + protected String auditHostAndPorts; + + public FlinkPulsarSource( + String adminUrl, + ClientConfigurationData clientConf, + PulsarDeserializationSchema deserializer, + Properties properties, + String inlongMetric, + String inlongAudit) { + this.adminUrl = checkNotNull(adminUrl); + this.clientConfigurationData = checkNotNull(clientConf); + this.deserializer = deserializer; + this.properties = properties; + this.caseInsensitiveParams = + SourceSinkUtils.validateStreamSourceOptions(Maps.fromProperties(properties)); + this.readerConf = + SourceSinkUtils.getReaderParams(Maps.fromProperties(properties)); + this.discoveryIntervalMillis = + SourceSinkUtils.getPartitionDiscoveryIntervalInMillis(caseInsensitiveParams); + this.pollTimeoutMs = + SourceSinkUtils.getPollTimeoutMs(caseInsensitiveParams); + this.commitMaxRetries = + SourceSinkUtils.getCommitMaxRetries(caseInsensitiveParams); + this.useMetrics = + SourceSinkUtils.getUseMetrics(caseInsensitiveParams); + + CachedPulsarClient.setCacheSize(SourceSinkUtils.getClientCacheSize(caseInsensitiveParams)); + + if (this.clientConfigurationData.getServiceUrl() == null) { + throw new IllegalArgumentException("ServiceUrl must be supplied in the client configuration"); + } + this.oldStateVersion = SourceSinkUtils.getOldStateVersion(caseInsensitiveParams, oldStateVersion); + this.inlongMetric = inlongMetric; + this.auditHostAndPorts = inlongAudit; + } + + // ------------------------------------------------------------------------ + // Configuration + // ------------------------------------------------------------------------ + + /** + * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks + * in a punctuated manner. 
The watermark extractor will run per Pulsar partition, + * watermarks will be merged across partitions in the same way as in the Flink runtime, + * when streams are merged. + * + *
<p>When a subtask of a FlinkPulsarSource source reads multiple Pulsar partitions, + * the streams from the partitions are unioned in a "first come first serve" fashion. + * Per-partition characteristics are usually lost that way. + * For example, if the timestamps are strictly ascending per Pulsar partition, + * they will not be strictly ascending in the resulting Flink DataStream, if the + * parallel source subtask reads more than one partition. + * + *
<p>Running timestamp extractors / watermark generators directly inside the Pulsar source, + * per Pulsar partition, allows users to let them exploit the per-partition characteristics. + * + *
<p>
Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an + * {@link AssignerWithPeriodicWatermarks}, not both at the same time. + * + * @param assigner The timestamp assigner / watermark generator to use. + * @return The reader object, to allow function chaining. + */ + @Deprecated + public FlinkPulsarSource assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks assigner) { + checkNotNull(assigner); + + if (this.watermarkStrategy != null) { + throw new IllegalStateException("Some watermark strategy has already been set."); + } + + try { + ClosureCleaner.clean(assigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + final WatermarkStrategy wms = new AssignerWithPunctuatedWatermarksAdapter.Strategy<>(assigner); + + return assignTimestampsAndWatermarks(wms); + } catch (Exception e) { + throw new IllegalArgumentException("The given assigner is not serializable", e); + } + } + + /** + * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks + * in a punctuated manner. The watermark extractor will run per Pulsar partition, + * watermarks will be merged across partitions in the same way as in the Flink runtime, + * when streams are merged. + * + *
<p>When a subtask of a FlinkPulsarSource source reads multiple Pulsar partitions, + * the streams from the partitions are unioned in a "first come first serve" fashion. + * Per-partition characteristics are usually lost that way. + * For example, if the timestamps are strictly ascending per Pulsar partition, + * they will not be strictly ascending in the resulting Flink DataStream, + * if the parallel source subtask reads more than one partition. + * + *
<p>Running timestamp extractors / watermark generators directly inside the Pulsar source, + * per Pulsar partition, allows users to let them exploit the per-partition characteristics. + * + *
<p>
Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an + * {@link AssignerWithPeriodicWatermarks}, not both at the same time. + * + * @param assigner The timestamp assigner / watermark generator to use. + * @return The reader object, to allow function chaining. + */ + @Deprecated + public FlinkPulsarSource assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks assigner) { + checkNotNull(assigner); + + if (this.watermarkStrategy != null) { + throw new IllegalStateException("Some watermark strategy has already been set."); + } + + try { + ClosureCleaner.clean(assigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + final WatermarkStrategy wms = new AssignerWithPeriodicWatermarksAdapter.Strategy<>(assigner); + + return assignTimestampsAndWatermarks(wms); + } catch (Exception e) { + throw new IllegalArgumentException("The given assigner is not serializable", e); + } + } + + /** + * Sets the given {@link WatermarkStrategy} on this consumer. These will be used to assign + * timestamps to records and generates watermarks to signal event time progress. + * + *
<p>Running timestamp extractors / watermark generators directly inside the Pulsar source + * (which you can do by using this method), per Pulsar partition, allows users to let them + * exploit the per-partition characteristics. + * + *
<p>When a subtask of a FlinkPulsarSource reads multiple pulsar partitions, + * the streams from the partitions are unioned in a "first come first serve" fashion. + * Per-partition characteristics are usually lost that way. For example, if the timestamps are + * strictly ascending per Pulsar partition, they will not be strictly ascending in the resulting + * Flink DataStream, if the parallel source subtask reads more than one partition. + * + *
<p>
Common watermark generation patterns can be found as static methods in the + * {@link org.apache.flink.api.common.eventtime.WatermarkStrategy} class. + * + * @return The consumer object, to allow function chaining. + */ + public FlinkPulsarSource assignTimestampsAndWatermarks( + WatermarkStrategy watermarkStrategy) { + checkNotNull(watermarkStrategy); + + try { + ClosureCleaner.clean(watermarkStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + this.watermarkStrategy = new SerializedValue<>(watermarkStrategy); + } catch (Exception e) { + throw new IllegalArgumentException("The given WatermarkStrategy is not serializable", e); + } + + return this; + } + + public FlinkPulsarSource setStartFromEarliest() { + this.startupMode = StartupMode.EARLIEST; + this.specificStartupOffsets = null; + return this; + } + + public FlinkPulsarSource setStartFromLatest() { + this.startupMode = StartupMode.LATEST; + this.specificStartupOffsets = null; + return this; + } + + public FlinkPulsarSource setStartFromSpecificOffsets(Map specificStartupOffsets) { + checkNotNull(specificStartupOffsets); + this.specificStartupOffsets = specificStartupOffsets.entrySet() + .stream() + .collect(Collectors.toMap(e -> new TopicRange(e.getKey()), Map.Entry::getValue)); + this.startupMode = StartupMode.SPECIFIC_OFFSETS; + this.specificStartupOffsetsAsBytes = new HashMap<>(); + for (Map.Entry entry : this.specificStartupOffsets.entrySet()) { + specificStartupOffsetsAsBytes.put(entry.getKey(), entry.getValue().toByteArray()); + } + return this; + } + + public FlinkPulsarSource setStartFromSubscription(String externalSubscriptionName) { + this.startupMode = StartupMode.EXTERNAL_SUBSCRIPTION; + this.externalSubscriptionName = checkNotNull(externalSubscriptionName); + return this; + } + + public FlinkPulsarSource setStartFromSubscription(String externalSubscriptionName, + MessageId subscriptionPosition) { + this.startupMode = StartupMode.EXTERNAL_SUBSCRIPTION; + this.externalSubscriptionName = 
checkNotNull(externalSubscriptionName); + this.subscriptionPosition = checkNotNull(subscriptionPosition); + return this; + } + + // ------------------------------------------------------------------------ + // Work methods + // ------------------------------------------------------------------------ + + private MetricState metricState; + private SourceMetricData sourceMetricData; + + @Override + public void open(Configuration parameters) throws Exception { + + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withInlongAudit(auditHostAndPorts) + .withRegisterMetric(RegisteredMetric.ALL) + .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_IN) : 0L) + .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_IN) : 0L) + .build(); + + if (metricOption != null) { + log.info("init source"); + sourceMetricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup()); + } + + if (this.deserializer != null) { + DynamicPulsarDeserializationSchema dynamicKafkaDeserializationSchema = + (DynamicPulsarDeserializationSchema) deserializer; + dynamicKafkaDeserializationSchema.setMetricData(sourceMetricData); + + this.deserializer.open( + RuntimeContextInitializationContextAdapters.deserializationAdapter( + getRuntimeContext(), + metricGroup -> metricGroup.addGroup("user"))); + } + this.taskIndex = getRuntimeContext().getIndexOfThisSubtask(); + this.numParallelTasks = getRuntimeContext().getNumberOfParallelSubtasks(); + + this.metadataReader = createMetadataReader(); + + ownedTopicStarts = new HashMap<>(); + excludeStartMessageIds = new HashSet<>(); + Set allTopics = metadataReader.discoverTopicChanges(); + + if (specificStartupOffsets == null && specificStartupOffsetsAsBytes != null) { + specificStartupOffsets = new HashMap<>(); + for (Map.Entry entry : specificStartupOffsetsAsBytes.entrySet()) { + specificStartupOffsets.put(entry.getKey(), 
MessageId.fromByteArray(entry.getValue())); + } + } + Map allTopicOffsets = + offsetForEachTopic(allTopics, startupMode, specificStartupOffsets); + + boolean usingRestoredState = (startupMode != StartupMode.EXTERNAL_SUBSCRIPTION) || stateSubEqualexternalSub; + + if (restoredState != null && usingRestoredState) { + allTopicOffsets.entrySet().stream() + .filter(e -> !restoredState.containsKey(e.getKey())) + .forEach(e -> restoredState.put(e.getKey(), e.getValue())); + + SerializableRange subTaskRange = metadataReader.getRange(); + restoredState.entrySet().stream() + .filter( + e -> SourceSinkUtils.belongsTo( + e.getKey().getTopic(), + subTaskRange, + numParallelTasks, + taskIndex)) + .forEach( + e -> { + TopicRange tr = + new TopicRange( + e.getKey().getTopic(), + subTaskRange.getPulsarRange()); + ownedTopicStarts.put(tr, e.getValue()); + excludeStartMessageIds.add(e.getKey()); + }); + + Set goneTopics = + Sets.difference(restoredState.keySet(), allTopics).stream() + .filter( + k -> SourceSinkUtils.belongsTo( + k.getTopic(), + subTaskRange, + numParallelTasks, + taskIndex)) + .map(k -> new TopicRange(k.getTopic(), subTaskRange.getPulsarRange())) + .collect(Collectors.toSet()); + + for (TopicRange goneTopic : goneTopics) { + log.warn(goneTopic + " is removed from subscription since " + + "it no longer matches with topics settings."); + ownedTopicStarts.remove(goneTopic); + } + + log.info("Source {} will start reading {} topics in restored state {}", + taskIndex, ownedTopicStarts.size(), StringUtils.join(ownedTopicStarts.entrySet())); + } else { + ownedTopicStarts.putAll( + allTopicOffsets.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); + if (ownedTopicStarts.isEmpty()) { + log.info("Source {} initially has no topics to read from.", taskIndex); + } else { + log.info("Source {} will start reading {} topics from initialized positions: {}", + taskIndex, ownedTopicStarts.size(), ownedTopicStarts); + } + } + } + + protected String 
getSubscriptionName() { + if (startupMode == StartupMode.EXTERNAL_SUBSCRIPTION) { + checkNotNull(externalSubscriptionName); + return externalSubscriptionName; + } else { + return "flink-pulsar-" + uuid.toString(); + } + } + + protected PulsarMetadataReader createMetadataReader() throws PulsarClientException { + return new PulsarMetadataReader( + adminUrl, + clientConfigurationData, + getSubscriptionName(), + caseInsensitiveParams, + taskIndex, + numParallelTasks, + startupMode == StartupMode.EXTERNAL_SUBSCRIPTION); + } + + @Override + public void run(SourceContext ctx) throws Exception { + if (ownedTopicStarts == null) { + throw new Exception("The partitions were not set for the source"); + } + + this.successfulCommits = + this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER); + this.failedCommits = + this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER); + + this.offsetCommitCallback = new PulsarCommitCallback() { + + @Override + public void onSuccess() { + successfulCommits.inc(); + } + + @Override + public void onException(Throwable cause) { + log.warn("source {} failed commit by {}", taskIndex, cause.toString()); + failedCommits.inc(); + } + }; + + if (ownedTopicStarts.isEmpty()) { + ctx.markAsTemporarilyIdle(); + } + + log.info("Source {} creating fetcher with offsets {}", + taskIndex, + StringUtils.join(ownedTopicStarts.entrySet())); + + // from this point forward: + // - 'snapshotState' will draw offsets from the fetcher, + // instead of being built from `subscribedPartitionsToStartOffsets` + // - 'notifyCheckpointComplete' will start to do work (i.e. 
commit offsets to + // Pulsar through the fetcher, if configured to do so) + + StreamingRuntimeContext streamingRuntime = (StreamingRuntimeContext) getRuntimeContext(); + + this.pulsarFetcher = createFetcher( + ctx, + ownedTopicStarts, + watermarkStrategy, + streamingRuntime.getProcessingTimeService(), + streamingRuntime.getExecutionConfig().getAutoWatermarkInterval(), + getRuntimeContext().getUserCodeClassLoader(), + streamingRuntime, + useMetrics, + excludeStartMessageIds); + + if (!running) { + return; + } + + if (discoveryIntervalMillis < 0) { + pulsarFetcher.runFetchLoop(); + } else { + runWithTopicsDiscovery(); + } + } + + protected PulsarFetcher createFetcher( + SourceContext sourceContext, + Map seedTopicsWithInitialOffsets, + SerializedValue> watermarkStrategy, + ProcessingTimeService processingTimeProvider, + long autoWatermarkInterval, + ClassLoader userCodeClassLoader, + StreamingRuntimeContext streamingRuntime, + boolean useMetrics, + Set excludeStartMessageIds) throws Exception { + + // readerConf.putIfAbsent(PulsarOptions.SUBSCRIPTION_ROLE_OPTION_KEY, getSubscriptionName()); + + return new PulsarFetcher<>( + sourceContext, + seedTopicsWithInitialOffsets, + excludeStartMessageIds, + watermarkStrategy, + processingTimeProvider, + autoWatermarkInterval, + userCodeClassLoader, + streamingRuntime, + clientConfigurationData, + readerConf, + pollTimeoutMs, + commitMaxRetries, + deserializer, + metadataReader, + streamingRuntime.getMetricGroup().addGroup(PULSAR_SOURCE_METRICS_GROUP), + useMetrics); + } + + public void joinDiscoveryLoopThread() throws InterruptedException { + if (discoveryLoopThread != null) { + discoveryLoopThread.join(); + } + } + + public void runWithTopicsDiscovery() throws Exception { + AtomicReference discoveryLoopErrorRef = new AtomicReference<>(); + createAndStartDiscoveryLoop(discoveryLoopErrorRef); + + pulsarFetcher.runFetchLoop(); + + joinDiscoveryLoopThread(); + + Exception discoveryLoopError = discoveryLoopErrorRef.get(); + if 
(discoveryLoopError != null) { + throw new RuntimeException(discoveryLoopError); + } + } + + private void createAndStartDiscoveryLoop(AtomicReference discoveryLoopErrorRef) { + discoveryLoopThread = new Thread( + () -> { + try { + while (running) { + Set added = metadataReader.discoverTopicChanges(); + + if (running && !added.isEmpty()) { + pulsarFetcher.addDiscoveredTopics(added); + } + + if (running && discoveryIntervalMillis != -1) { + Thread.sleep(discoveryIntervalMillis); + } + } + } catch (PulsarMetadataReader.ClosedException e) { + // break out while and do nothing + } catch (InterruptedException e) { + // break out while and do nothing + } catch (Exception e) { + discoveryLoopErrorRef.set(e); + } finally { + if (running) { + // calling cancel will also let the fetcher loop escape + // (if not running, cancel() was already called) + cancel(); + } + } + }, "Pulsar topic discovery for source " + taskIndex); + discoveryLoopThread.start(); + } + + @Override + public void close() throws Exception { + cancel(); + + joinDiscoveryLoopThread(); + + Exception exception = null; + + if (metadataReader != null) { + try { + metadataReader.close(); + } catch (Exception e) { + exception = e; + } + } + + try { + super.close(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + if (exception != null) { + throw exception; + } + } + + @Override + public void cancel() { + running = false; + + if (discoveryLoopThread != null) { + discoveryLoopThread.interrupt(); + } + + if (pulsarFetcher != null) { + try { + pulsarFetcher.cancel(); + } catch (Exception e) { + log.error("Failed to cancel the Pulsar Fetcher {}", ExceptionUtils.stringifyException(e)); + throw new RuntimeException(e); + } + } + } + + // ------------------------------------------------------------------------ + // ResultTypeQueryable methods + // ------------------------------------------------------------------------ + + @Override + public TypeInformation getProducedType() 
{ + return deserializer.getProducedType(); + } + + // ------------------------------------------------------------------------ + // Checkpoint and restore + // ------------------------------------------------------------------------ + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + OperatorStateStore stateStore = context.getOperatorStateStore(); + + unionOffsetStates = stateStore.getUnionListState( + new ListStateDescriptor<>( + OFFSETS_STATE_NAME_V3, + createStateSerializer())); + + if (context.isRestored()) { + restoredState = new TreeMap<>(); + Iterator> iterator = unionOffsetStates.get().iterator(); + + if (!iterator.hasNext()) { + iterator = tryMigrateState(stateStore); + } + while (iterator.hasNext()) { + final Tuple2 tuple2 = iterator.next(); + final SerializableRange range = + tuple2.f0.getRange() != null ? tuple2.f0.getRange() : SerializableRange.ofFullRange(); + final TopicRange topicRange = + new TopicRange(tuple2.f0.getTopic(), range.getPulsarRange()); + restoredState.put(topicRange, tuple2.f1); + String subscriptionName = tuple2.f0.getSubscriptionName(); + if (!stateSubEqualexternalSub && StringUtils.equals(subscriptionName, externalSubscriptionName)) { + stateSubEqualexternalSub = true; + log.info("Source restored state with subscriptionName {}", subscriptionName); + } + } + log.info("Source subtask {} restored state {}", + taskIndex, + StringUtils.join(restoredState.entrySet())); + } else { + log.info("Source subtask {} has no restore state", taskIndex); + } + } + + @VisibleForTesting + static TupleSerializer> createStateSerializer() { + // explicit serializer will keep the compatibility with GenericTypeInformation and allow to + // disableGenericTypes for users + TypeSerializer[] fieldSerializers = + new TypeSerializer[]{ + TopicSubscriptionSerializer.INSTANCE, + MessageIdSerializer.INSTANCE + }; + @SuppressWarnings("unchecked") + Class> tupleClass = + (Class>) (Class) Tuple2.class; + return new 
TupleSerializer<>(tupleClass, fieldSerializers); + } + + /** + * Try to restore the old save point. + * + * @param stateStore state store + * @return state data + * @throws Exception Type incompatibility, serialization failure + */ + private Iterator> tryMigrateState(OperatorStateStore stateStore) + throws Exception { + log.info("restore old state version {}", oldStateVersion); + PulsarSourceStateSerializer stateSerializer = + new PulsarSourceStateSerializer(getRuntimeContext().getExecutionConfig()); + // Since stateStore.getUnionListState gets the data of a state point, + // it can only be registered once and will fail to register again, + // so it only allows the user to set a version number. + ListState rawStates = stateStore.getUnionListState(new ListStateDescriptor<>( + OFFSETS_STATE_NAME, + stateSerializer.getSerializer(oldStateVersion))); + + ListState oldUnionSubscriptionNameStates = + stateStore.getUnionListState( + new ListStateDescriptor<>( + OFFSETS_STATE_NAME + "_subName", + TypeInformation.of(new TypeHint() { + }))); + final Iterator subNameIterator = oldUnionSubscriptionNameStates.get().iterator(); + + Iterator tuple2s = rawStates.get().iterator(); + log.info("restore old state has data {}", tuple2s.hasNext()); + final List> records = new ArrayList<>(); + while (tuple2s.hasNext()) { + final Object next = tuple2s.next(); + Tuple2 tuple2 = stateSerializer.deserialize(oldStateVersion, next); + + String subName = tuple2.f0.getSubscriptionName(); + if (subNameIterator.hasNext()) { + subName = subNameIterator.next(); + } + final TopicSubscription topicSubscription = TopicSubscription.builder() + .topic(tuple2.f0.getTopic()) + .range(tuple2.f0.getRange()) + .subscriptionName(subName) + .build(); + final Tuple2 record = Tuple2.of(topicSubscription, tuple2.f1); + log.info("migrationState {}", record); + records.add(record); + } + rawStates.clear(); + oldUnionSubscriptionNameStates.clear(); + return records.listIterator(); + } + + @Override + public void 
snapshotState(FunctionSnapshotContext context) throws Exception { + if (!running) { + log.debug("snapshotState() called on closed source"); + } else { + unionOffsetStates.clear(); + + PulsarFetcher fetcher = this.pulsarFetcher; + + if (fetcher == null) { + // the fetcher has not yet been initialized, which means we need to return the + // originally restored offsets or the assigned partitions + for (Map.Entry entry : ownedTopicStarts.entrySet()) { + final TopicSubscription topicSubscription = TopicSubscription.builder() + .topic(entry.getKey().getTopic()) + .range(entry.getKey().getRange()) + .subscriptionName(getSubscriptionName()) + .build(); + unionOffsetStates.add(Tuple2.of(topicSubscription, entry.getValue())); + } + pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState); + } else { + Map currentOffsets = fetcher.snapshotCurrentState(); + pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets); + for (Map.Entry entry : currentOffsets.entrySet()) { + final TopicSubscription topicSubscription = TopicSubscription.builder() + .topic(entry.getKey().getTopic()) + .range(entry.getKey().getRange()) + .subscriptionName(getSubscriptionName()) + .build(); + unionOffsetStates.add(Tuple2.of(topicSubscription, entry.getValue())); + } + + int exceed = pendingOffsetsToCommit.size() - MAX_NUM_PENDING_CHECKPOINTS; + Iterator iterator = pendingOffsetsToCommit.keySet().iterator(); + + while (iterator.hasNext() && exceed > 0) { + iterator.next(); + iterator.remove(); + } + } + } + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + if (!running) { + log.info("notifyCheckpointComplete() called on closed source"); + return; + } + + PulsarFetcher fetcher = this.pulsarFetcher; + + if (fetcher == null) { + log.info("notifyCheckpointComplete() called on uninitialized source"); + return; + } + + log.debug("Source {} received confirmation for unknown checkpoint id {}", + taskIndex, checkpointId); + + try { + if 
(!pendingOffsetsToCommit.containsKey(checkpointId)) { + log.warn("Source {} received confirmation for unknown checkpoint id {}", + taskIndex, checkpointId); + return; + } + + Map offset = pendingOffsetsToCommit.get(checkpointId); + + // remove older checkpoints in map + Iterator iterator = pendingOffsetsToCommit.keySet().iterator(); + while (iterator.hasNext()) { + Long key = iterator.next(); + iterator.remove(); + if (Objects.equals(key, checkpointId)) { + break; + } + } + + if (offset == null || offset.size() == 0) { + log.debug("Source {} has empty checkpoint state", taskIndex); + return; + } + fetcher.commitOffsetToPulsar(offset, offsetCommitCallback); + } catch (Exception e) { + if (running) { + throw e; + } + } + } + + @Override + public void notifyCheckpointAborted(long checkpointId) throws Exception { + log.error("checkpoint aborted, checkpointId: {}", checkpointId); + } + + public Map offsetForEachTopic( + Set topics, + StartupMode mode, + Map specificStartupOffsets) { + + switch (mode) { + case LATEST: + return topics.stream() + .collect(Collectors.toMap(k -> k, k -> MessageId.latest)); + case EARLIEST: + return topics.stream() + .collect(Collectors.toMap(k -> k, k -> MessageId.earliest)); + case SPECIFIC_OFFSETS: + checkArgument(topics.containsAll(specificStartupOffsets.keySet()), + String.format( + "Topics designated in startingOffsets should appear in %s, topics:" + + "%s, topics in offsets: %s", + StringUtils.join(PulsarOptions.TOPIC_OPTION_KEYS), + StringUtils.join(topics.toArray()), + StringUtils.join(specificStartupOffsets.entrySet().toArray()))); + + Map specificOffsets = new HashMap<>(); + for (TopicRange topic : topics) { + if (specificStartupOffsets.containsKey(topic)) { + specificOffsets.put(topic, specificStartupOffsets.get(topic)); + } else { + specificOffsets.put(topic, MessageId.latest); + } + } + return specificOffsets; + case EXTERNAL_SUBSCRIPTION: + Map offsetsFromSubs = new HashMap<>(); + for (TopicRange topic : topics) { + 
offsetsFromSubs.put(topic, metadataReader.getPositionFromSubscription(topic, + subscriptionPosition)); + } + return offsetsFromSubs; + } + return null; + } + + public Map> getPendingOffsetsToCommit() { + return pendingOffsetsToCommit; + } + + public Map getOwnedTopicStarts() { + return ownedTopicStarts; + } +} diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java index 85abc6e2e3c..60212768b66 100644 --- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java +++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java @@ -62,8 +62,6 @@ public SimpleCollector initialValue() { private final TypeInformation producedTypeInfo; private final boolean upsertMode; private SourceMetricData sourceMetricData; - private String inlongMetric; - private String auditHostAndPorts; DynamicPulsarDeserializationSchema( int physicalArity, @@ -74,8 +72,7 @@ public SimpleCollector initialValue() { boolean hasMetadata, MetadataConverter[] metadataConverters, TypeInformation producedTypeInfo, - boolean upsertMode, - String inlongMetric, String auditHostAndPorts) { + boolean upsertMode) { if (upsertMode) { Preconditions.checkArgument( keyDeserialization != null && keyProjection.length > 0, @@ -92,8 +89,6 @@ public SimpleCollector initialValue() { upsertMode); this.producedTypeInfo = producedTypeInfo; this.upsertMode = upsertMode; - this.inlongMetric = inlongMetric; - this.auditHostAndPorts = auditHostAndPorts; } @Override diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java index 74717d416cf..a0eeb961dea 100644 --- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java +++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java @@ -136,11 +136,6 @@ public class PulsarDynamicTableSource implements ScanTableSource, SupportsReadin */ protected final PulsarTableOptions.StartupOptions startupOptions; - /** - * The default value when startup timestamp is not used. - */ - private static final long DEFAULT_STARTUP_TIMESTAMP_MILLIS = 0L; - /** Flag to determine source mode. In upsert mode, it will keep the tombstone message. **/ protected final boolean upsertMode; @@ -269,20 +264,20 @@ private PulsarDeserializationSchema createPulsarDeserialization( hasMetadata, metadataConverters, producedTypeInfo, - upsertMode, - inlongMetric, - auditHostAndPorts); + upsertMode); } private SourceFunction createPulsarSource( ClientConfigurationData clientConfigurationData, PulsarDeserializationSchema deserializationSchema) { - org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource source = - new org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource( + org.apache.inlong.sort.pulsar.FlinkPulsarSource source = + new org.apache.inlong.sort.pulsar.FlinkPulsarSource( adminUrl, clientConfigurationData, deserializationSchema, - properties); + properties, + inlongMetric, + auditHostAndPorts); if (watermarkStrategy != null) { source.assignTimestampsAndWatermarks(watermarkStrategy); diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE index 4c40b8cd880..eb955300fec 100644 --- a/licenses/inlong-sort-connectors/LICENSE +++ b/licenses/inlong-sort-connectors/LICENSE @@ -479,6 +479,7 @@ 
inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarFetcher.java inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarMetadataReader.java inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/ReaderThread.java + inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/FlinkPulsarSource.java Source : pulsar-flink-connector_2.11 1.13.6.1-rc9 (Please note that the software have been modified.) License : https://github.com/streamnative/pulsar-flink/blob/master/LICENSE