Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
f9a1a91
+ using older aws sdk version
maghamravi Sep 18, 2021
f373a63
[LYFT] [STRMCMP-1388] pyflink changes
maghamravi Jun 20, 2022
571c3e1
[backport][FLINK-26846][python] Fix the gauge metric
maghamravi Jun 20, 2022
ec31f69
[backport][FLINK-10052][ha] Tolerate temporarily suspended ZooKeeper …
maghamravi Jun 20, 2022
511f5b7
[backport][FLINK-25437][python] Correct grpcio dependency version in …
maghamravi Jun 20, 2022
f818b11
[backport][FLINK-24049][python] Handle properly for field types need …
maghamravi Jun 20, 2022
10c159c
[LYFT][STRMCMP-1388] pyflink changes
maghamravi Jun 20, 2022
2a55298
add options to enable lib
kosigz Jul 18, 2022
f2cec36
Merge pull request #46 from lyft/options_to_afl
kosigz Jul 18, 2022
c4f2a06
test
kosigz Jul 20, 2022
0ed52fa
wip
kosigz Jul 20, 2022
fba04fb
Merge pull request #47 from lyft/fix_tag_loc
kosigz Jul 20, 2022
9e63206
[LYFT][DSP] Copy Hive JAR into apache-flink-libraries at setup time
kosigz Jul 22, 2022
8519bca
Merge pull request #45 from lyft/add_more_jars
kosigz Jul 22, 2022
365ffa8
update zk version to 3.5.6
adilhafeez Jul 26, 2022
cee1518
Merge pull request #48 from lyft/adil/update_zk_version
adilhafeez Jul 26, 2022
c199bf9
Update setup.py
kosigz Aug 5, 2022
4d39752
[RTOP-645] [backport] Job and Execution status metrics
maghamravi Dec 17, 2021
a134200
update to write to lib
kosigz Aug 9, 2022
dc1cfe8
Merge pull request #51 from lyft/update_lib_path
kosigz Aug 9, 2022
db3f91b
lib path
kosigz Aug 9, 2022
ff0f4d5
Merge pull request #52 from lyft/boop
kosigz Aug 9, 2022
e0654a8
unlink the symlink
kosigz Aug 9, 2022
c2b36be
Merge pull request #53 from lyft/unlink_before_release
kosigz Aug 9, 2022
e762062
wip
kosigz Aug 9, 2022
a46a013
Merge pull request #54 from lyft/more_simlink
kosigz Aug 9, 2022
7678c39
remove the explicit JAR target since this JAR should come from the li…
kosigz Aug 9, 2022
b63add5
Merge pull request #55 from lyft/no_exp_simlink_needed
kosigz Aug 9, 2022
e235648
Revert "update zk version to 3.5.6"
maghamravi Aug 9, 2022
d37a92f
Merge pull request #56 from lyft/revert_zk_3.5_changes
maghamravi Aug 9, 2022
97342ed
Merge pull request #49 from lyft/RTOP-645-uptime
maghamravi Aug 9, 2022
df0bc14
[FLINK-24347][connectors/kafka] Keep idle source readers if paralleli…
maghamravi Sep 21, 2021
269a345
Merge pull request #58 from lyft/STRMHELP-206-FLINK-24347
maghamravi Aug 9, 2022
98a866d
apache flink libraries version
kosigz Aug 10, 2022
0bb0318
Merge pull request #59 from lyft/update_afl
kosigz Aug 10, 2022
8733b7b
[STRMHELP-197] Update Global Watermark When Idle (#61)
sethsaperstein-lyft Sep 1, 2022
aec7944
[STRMCMP-1477] Zookeeper 3.5 Upgrade (#65)
sethsaperstein-lyft Sep 22, 2022
dc7d5c2
Update setup.py
kosigz Sep 30, 2022
2196926
unpin pickle
kosigz Oct 11, 2022
b9c36dd
Merge pull request #66 from lyft/unpin_pickle
kosigz Oct 11, 2022
869a2e9
define gauge once
sethsaperstein-lyft Oct 14, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/metric_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
<td>Integer</td>
<td>The thread priority used for Flink's internal metric query service. The thread is created by Akka's thread pool executor. The range of the priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). Warning, increasing this value may bring the main Flink components down.</td>
</tr>
<tr>
<td><h5>metrics.job.status.enable</h5></td>
<td style="word-wrap: break-word;">CURRENT_TIME</td>
<td><p>List&lt;Enum&gt;</p></td>
<td>The selection of job status metrics that should be reported.<br /><br />Possible values:<ul><li>"STATE": For a given state, return 1 if the job is currently in that state, otherwise return 0.</li><li>"CURRENT_TIME": For a given state, if the job is currently in that state, return the time since the job transitioned into that state, otherwise return 0.</li><li>"TOTAL_TIME": For a given state, return how much time the job has spent in that state in total.</li></ul></td>
</tr>
<tr>
<td><h5>metrics.latency.granularity</h5></td>
<td style="word-wrap: break-word;">"operator"</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> createEnumerat
startingOffsetsInitializer,
stoppingOffsetsInitializer,
props,
enumContext);
enumContext,
boundedness);
}

@Override
Expand All @@ -177,6 +178,7 @@ public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> restoreEnumera
stoppingOffsetsInitializer,
props,
enumContext,
boundedness,
checkpoint.assignedPartitions());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
Expand Down Expand Up @@ -64,6 +65,7 @@ public class KafkaSourceEnumerator
private final Properties properties;
private final long partitionDiscoveryIntervalMs;
private final SplitEnumeratorContext<KafkaPartitionSplit> context;
private final Boundedness boundedness;

// The internal states of the enumerator.
/**
Expand Down Expand Up @@ -97,13 +99,15 @@ public KafkaSourceEnumerator(
OffsetsInitializer startingOffsetInitializer,
OffsetsInitializer stoppingOffsetInitializer,
Properties properties,
SplitEnumeratorContext<KafkaPartitionSplit> context) {
SplitEnumeratorContext<KafkaPartitionSplit> context,
Boundedness boundedness) {
this(
subscriber,
startingOffsetInitializer,
stoppingOffsetInitializer,
properties,
context,
boundedness,
Collections.emptySet());
}

Expand All @@ -113,12 +117,14 @@ public KafkaSourceEnumerator(
OffsetsInitializer stoppingOffsetInitializer,
Properties properties,
SplitEnumeratorContext<KafkaPartitionSplit> context,
Boundedness boundedness,
Set<TopicPartition> assignedPartitions) {
this.subscriber = subscriber;
this.startingOffsetInitializer = startingOffsetInitializer;
this.stoppingOffsetInitializer = stoppingOffsetInitializer;
this.properties = properties;
this.context = context;
this.boundedness = boundedness;

this.discoveredPartitions = new HashSet<>();
this.assignedPartitions = new HashSet<>(assignedPartitions);
Expand Down Expand Up @@ -296,7 +302,7 @@ private void assignPendingPartitionSplits(Set<Integer> pendingReaders) {

// If periodic partition discovery is disabled and the initial discovery has finished,
// signal NoMoreSplitsEvent to pending readers (only when the source is bounded)
if (noMoreNewPartitionSplits) {
if (noMoreNewPartitionSplits && boundedness == Boundedness.BOUNDED) {
LOG.debug(
"No more KafkaPartitionSplits to assign. Sending NoMoreSplitsEvent to reader {}"
+ " in consumer group {}.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.connector.kafka.source.enumerator;

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
Expand Down Expand Up @@ -432,6 +433,7 @@ private KafkaSourceEnumerator createEnumerator(
stoppingOffsetsInitializer,
props,
enumContext,
Boundedness.CONTINUOUS_UNBOUNDED,
assignedPartitions);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1104,8 +1104,10 @@ public void runMultipleSourcesOnePartitionExactlyOnceTest() throws Exception {

getStream(env, topic, schema, props)
.map(new PartitionValidatingMapper(numPartitions, 1))
// Job only fails after a checkpoint is taken and the necessary number of elements
// is seen
.map(new FailingIdentityMapper<Integer>(failAfterElements))
.addSink(new ValidatingExactlyOnceSink(totalElements))
.addSink(new ValidatingExactlyOnceSink(totalElements, true))
.setParallelism(1);

FailingIdentityMapper.failedBefore = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.streaming.connectors.kafka.testutils;

import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
Expand All @@ -32,20 +33,26 @@

/** A {@link RichSinkFunction} that verifies that no duplicate records are generated. */
public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer>
implements ListCheckpointed<Tuple2<Integer, BitSet>> {
implements ListCheckpointed<Tuple2<Integer, BitSet>>, CheckpointListener {

private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);

private static final long serialVersionUID = 1748426382527469932L;

private final int numElementsTotal;
private final boolean waitForFinalCheckpoint;

private BitSet duplicateChecker = new BitSet(); // this is checkpointed

private int numElements; // this is checkpointed

/**
 * Creates a sink that expects {@code numElementsTotal} elements and validates as soon as
 * elements arrive; delegates to the two-argument constructor with
 * {@code waitForFinalCheckpoint = false}.
 *
 * @param numElementsTotal number of elements the sink expects to receive in total
 */
public ValidatingExactlyOnceSink(int numElementsTotal) {
this(numElementsTotal, false);
}

/**
 * Creates a sink that expects {@code numElementsTotal} elements.
 *
 * @param numElementsTotal number of elements the sink expects to receive in total
 * @param waitForFinalCheckpoint if {@code true}, {@code invoke} does not run the completion
 *     check itself; the check then only runs from {@code notifyCheckpointComplete}
 */
public ValidatingExactlyOnceSink(int numElementsTotal, boolean waitForFinalCheckpoint) {
this.numElementsTotal = numElementsTotal;
this.waitForFinalCheckpoint = waitForFinalCheckpoint;
}

@Override
Expand All @@ -56,15 +63,8 @@ public void invoke(Integer value) throws Exception {
throw new Exception("Received a duplicate: " + value);
}
duplicateChecker.set(value);
if (numElements == numElementsTotal) {
// validate
if (duplicateChecker.cardinality() != numElementsTotal) {
throw new Exception("Duplicate checker has wrong cardinality");
} else if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
throw new Exception("Received sparse sequence");
} else {
throw new SuccessException();
}
if (!waitForFinalCheckpoint) {
checkFinish();
}
}

Expand All @@ -87,4 +87,22 @@ public void restoreState(List<Tuple2<Integer, BitSet>> state) throws Exception {
this.numElements = s.f0;
this.duplicateChecker = s.f1;
}

/**
 * Runs the completion check each time a checkpoint is reported as complete.
 *
 * @param checkpointId id of the completed checkpoint (not used by this implementation)
 * @throws Exception whatever {@code checkFinish()} throws once the expected number of
 *     elements has been counted
 */
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
checkFinish();
}

/**
 * Validates the received elements once exactly {@code numElementsTotal} have been counted.
 *
 * <p>Does nothing while {@code numElements} differs from {@code numElementsTotal}. Otherwise
 * the method always throws: a {@code SuccessException} when the duplicate checker holds
 * exactly the bits {@code 0 .. numElementsTotal - 1}, or a plain {@code Exception} naming
 * the inconsistency that was found.
 *
 * @throws Exception to signal either successful completion or a validation failure
 */
private void checkFinish() throws Exception {
    // Not all expected elements have been counted yet; nothing to validate.
    if (numElements != numElementsTotal) {
        return;
    }

    final int distinctValues = duplicateChecker.cardinality();
    if (distinctValues != numElementsTotal) {
        throw new Exception("Duplicate checker has wrong cardinality");
    }

    // With the right cardinality, the first unset bit must sit directly after the last
    // expected value; anything else means there is a gap in the sequence.
    final int firstMissingValue = duplicateChecker.nextClearBit(0);
    if (firstMissingValue != numElementsTotal) {
        throw new Exception("Received sparse sequence");
    }

    throw new SuccessException();
}
}
2 changes: 1 addition & 1 deletion flink-connectors/flink-connector-kinesis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ under the License.
<artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
<name>Flink : Connectors : Kinesis</name>
<properties>
<aws.sdk.version>1.12.7</aws.sdk.version>
<aws.sdk.version>1.11.603</aws.sdk.version>
<aws.sdkv2.version>2.16.86</aws.sdkv2.version>
<aws.kinesis-kcl.version>1.11.2</aws.kinesis-kcl.version>
<aws.kinesis-kpl.version>0.14.0</aws.kinesis-kpl.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,9 @@ public enum EFORegistrationType {
/** The maximum delta allowed for the reader to advance ahead of the shared global watermark. */
public static final String WATERMARK_LOOKAHEAD_MILLIS = "flink.watermark.lookahead.millis";

/** Feature flag to update global watermark when idle. */
public static final String WATERMARK_SYNC_GLOBAL = "flink.watermark.sync.global";

/**
* The maximum number of records that will be buffered before suspending consumption of a shard.
*/
Expand Down Expand Up @@ -403,6 +406,8 @@ public enum EFORegistrationType {

public static final long DEFAULT_WATERMARK_SYNC_MILLIS = 30_000;

public static final boolean DEFAULT_WATERMARK_SYNC_GLOBAL = false;

public static final int DEFAULT_EFO_HTTP_CLIENT_MAX_CONURRENCY = 10_000;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
Expand Down Expand Up @@ -142,6 +143,9 @@ public class KinesisDataFetcher<T> {
/** The metric group that all metrics should be registered to. */
private final MetricGroup consumerMetricGroup;

/** The metric group for the individual subtask. */
private final MetricGroup shardMetricsGroup;

// ------------------------------------------------------------------------
// Subtask-specific settings
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -415,7 +419,10 @@ protected KinesisDataFetcher(
runtimeContext
.getMetricGroup()
.addGroup(KinesisConsumerMetricConstants.KINESIS_CONSUMER_METRICS_GROUP);

this.shardMetricsGroup =
consumerMetricGroup.addGroup(
"subtaskId", String.valueOf(indexOfThisConsumerSubtask));
this.shardMetricsGroup.gauge("isIdle", () -> isIdle ? 1 : 0);
this.error = checkNotNull(error);
this.subscribedShardsState = checkNotNull(subscribedShardsState);
this.subscribedStreamsToLastDiscoveredShardIds =
Expand Down Expand Up @@ -626,7 +633,19 @@ public void runFetcher() throws Exception {
watermarkTracker.setUpdateTimeoutMillis(
watermarkSyncMillis * 3); // synchronization latency
watermarkTracker.open(runtimeContext);
new WatermarkSyncCallback(timerService, watermarkSyncMillis).start();
boolean updateGlobalWatermarkForIdleSubtask =
Boolean.parseBoolean(
getConsumerConfiguration()
.getProperty(
ConsumerConfigConstants.WATERMARK_SYNC_GLOBAL,
Boolean.toString(
ConsumerConfigConstants
.DEFAULT_WATERMARK_SYNC_GLOBAL)));
new WatermarkSyncCallback(
timerService,
watermarkSyncMillis,
updateGlobalWatermarkForIdleSubtask)
.start();
// emit records ahead of watermark to offset synchronization latency
long lookaheadMillis =
Long.parseLong(
Expand Down Expand Up @@ -1172,6 +1191,14 @@ protected void emitWatermark() {
}
}

LOG.debug(
"WatermarkEmitter subtask: {}, last watermark: {}, potential watermark: {}"
+ ", potential next watermark: {}",
indexOfThisConsumerSubtask,
lastWatermark,
potentialWatermark,
potentialNextWatermark);

// advance watermark if possible (watermarks can only be ascending)
if (potentialWatermark == Long.MAX_VALUE) {
if (shardWatermarks.isEmpty() || shardIdleIntervalMillis > 0) {
Expand Down Expand Up @@ -1239,17 +1266,20 @@ private class WatermarkSyncCallback implements ProcessingTimeCallback {

private final ProcessingTimeService timerService;
private final long interval;
private final boolean updateGlobalWatermarkForIdleSubtask;
private long lastGlobalWatermark = Long.MIN_VALUE;
private long propagatedLocalWatermark = Long.MIN_VALUE;
private int stalledWatermarkIntervalCount = 0;
private long lastLogged;

WatermarkSyncCallback(ProcessingTimeService timerService, long interval) {
WatermarkSyncCallback(
ProcessingTimeService timerService,
long interval,
boolean updateGlobalWatermarkForIdleSubtask) {
this.timerService = checkNotNull(timerService);
this.interval = interval;
MetricGroup shardMetricsGroup =
consumerMetricGroup.addGroup(
"subtaskId", String.valueOf(indexOfThisConsumerSubtask));
this.updateGlobalWatermarkForIdleSubtask = updateGlobalWatermarkForIdleSubtask;

shardMetricsGroup.gauge("localWatermark", () -> nextWatermark);
shardMetricsGroup.gauge("globalWatermark", () -> lastGlobalWatermark);
}
Expand All @@ -1263,11 +1293,13 @@ public void start() {
public void onProcessingTime(long timestamp) {
if (nextWatermark != Long.MIN_VALUE) {
long globalWatermark = lastGlobalWatermark;
// TODO: refresh watermark while idle
if (!(isIdle && nextWatermark == propagatedLocalWatermark)) {
globalWatermark = watermarkTracker.updateWatermark(nextWatermark);
propagatedLocalWatermark = nextWatermark;
} else {
if (updateGlobalWatermarkForIdleSubtask) {
globalWatermark = watermarkTracker.getWatermark();
}
LOG.info(
"WatermarkSyncCallback subtask: {} is idle",
indexOfThisConsumerSubtask);
Expand All @@ -1277,12 +1309,14 @@ public void onProcessingTime(long timestamp) {
lastLogged = System.currentTimeMillis();
LOG.info(
"WatermarkSyncCallback subtask: {} local watermark: {}"
+ ", global watermark: {}, delta: {} timeouts: {}, emitter: {}",
+ ", global watermark: {}, delta: {} timeouts: {}, idle: {}"
+ ", emitter: {}",
indexOfThisConsumerSubtask,
nextWatermark,
globalWatermark,
nextWatermark - globalWatermark,
watermarkTracker.getUpdateTimeoutCount(),
isIdle,
recordEmitter.printInfo());

// Following is for debugging non-reproducible issue with stalled watermark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ public long updateWatermark(long localWatermark) {
WatermarkUpdate update = new WatermarkUpdate();
update.id = getSubtaskId();
update.watermark = localWatermark;
return updateWatermark(update);
}

/**
 * Queries the aggregate without contributing a local watermark for this subtask.
 *
 * <p>Sends a {@code WatermarkUpdate} that carries only this subtask's id and has {@code
 * updateLocalWatermark} switched off, then returns whatever {@code
 * updateWatermark(WatermarkUpdate)} yields for it.
 *
 * @return the result of the read-only update (presumably the current global watermark)
 */
@Override
public long getWatermark() {
    final WatermarkUpdate readOnlyRequest = new WatermarkUpdate();
    readOnlyRequest.updateLocalWatermark = false;
    readOnlyRequest.id = getSubtaskId();
    return updateWatermark(readOnlyRequest);
}

public long updateWatermark(WatermarkUpdate update) {
try {
byte[] resultBytes =
aggregateManager.updateGlobalAggregate(
Expand Down Expand Up @@ -92,6 +104,7 @@ public long getUpdateTimeoutCount() {
protected static class WatermarkUpdate implements Serializable {
/** Local watermark to report; remains {@code Long.MIN_VALUE} unless the sender sets it. */
protected long watermark = Long.MIN_VALUE;
/** Identifier of the reporting subtask; senders fill it from {@code getSubtaskId()}. */
protected String id;
/**
 * When {@code false}, the update is a read-only query: the aggregate function returns the
 * accumulator unchanged instead of recording a watermark for {@link #id}.
 */
protected boolean updateLocalWatermark = true;
}

/** Watermark aggregation result. */
Expand Down Expand Up @@ -129,6 +142,11 @@ public Map<String, WatermarkState> add(
} catch (Exception e) {
throw new RuntimeException(e);
}
// no op to get global watermark without updating it
if (!value.updateLocalWatermark) {
addCount--;
return accumulator;
}
WatermarkState ws = accumulator.get(value.id);
if (ws == null) {
accumulator.put(value.id, ws = new WatermarkState());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ public void setUpdateTimeoutMillis(long updateTimeoutMillis) {
*/
public abstract long updateWatermark(final long localWatermark);

/**
 * Returns the global watermark without submitting a new local watermark for this subtask.
 *
 * <p>NOTE(review): contract inferred from the method name and its use for idle subtasks;
 * confirm against every implementation.
 *
 * @return the global watermark as currently known to the tracker
 */
public abstract long getWatermark();

/**
 * Returns the current time in milliseconds as reported by {@link System#currentTimeMillis()}.
 *
 * <p>NOTE(review): presumably {@code protected} so subclasses or tests can substitute a
 * different clock — confirm.
 */
protected long getCurrentTime() {
return System.currentTimeMillis();
}
Expand Down
Loading