Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-23308] Optimize checks if SourceContext is closed #16429

Merged
merged 1 commit into from Jul 8, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -173,7 +173,6 @@ private AutomaticWatermarkContext(

@Override
protected void processAndCollect(T element) {
checkNotClosed();
lastRecordTime = this.timeService.getCurrentProcessingTime();
output.collect(reuse.replace(element, lastRecordTime));

Expand All @@ -193,7 +192,6 @@ protected void processAndCollect(T element) {

@Override
protected void processAndCollectWithTimestamp(T element, long timestamp) {
checkNotClosed();
processAndCollect(element);
}

Expand All @@ -207,7 +205,6 @@ protected boolean allowWatermark(Watermark mark) {
/** This will only be called if allowWatermark returned {@code true}. */
@Override
protected void processAndEmitWatermark(Watermark mark) {
checkNotClosed();
nextWatermarkTime = Long.MAX_VALUE;
output.emitWatermark(mark);

Expand All @@ -222,7 +219,6 @@ protected void processAndEmitWatermark(Watermark mark) {

@Override
protected void processAndEmitStreamStatus(StreamStatus streamStatus) {
checkNotClosed();
if (idle != streamStatus.isIdle()) {
output.emitStreamStatus(streamStatus);
}
Expand Down Expand Up @@ -320,25 +316,21 @@ private ManualWatermarkContext(

@Override
protected void processAndCollect(T element) {
checkNotClosed();
output.collect(reuse.replace(element));
}

@Override
protected void processAndCollectWithTimestamp(T element, long timestamp) {
checkNotClosed();
output.collect(reuse.replace(element, timestamp));
}

@Override
protected void processAndEmitWatermark(Watermark mark) {
checkNotClosed();
output.emitWatermark(mark);
}

@Override
protected void processAndEmitStreamStatus(StreamStatus streamStatus) {
checkNotClosed();
if (idle != streamStatus.isIdle()) {
output.emitStreamStatus(streamStatus);
}
Expand Down Expand Up @@ -410,7 +402,7 @@ public WatermarkContext(
}

@Override
public void collect(T element) {
public final void collect(T element) {
checkNotClosed();
pnowojski marked this conversation as resolved.
Show resolved Hide resolved

synchronized (checkpointLock) {
Expand All @@ -426,14 +418,14 @@ public void collect(T element) {
}
}

protected void checkNotClosed() {
private void checkNotClosed() {
if (closed) {
throw new FlinkRuntimeException("The Source Context has been closed already.");
}
}

@Override
public void collectWithTimestamp(T element, long timestamp) {
public final void collectWithTimestamp(T element, long timestamp) {
checkNotClosed();

synchronized (checkpointLock) {
Expand All @@ -450,7 +442,7 @@ public void collectWithTimestamp(T element, long timestamp) {
}

@Override
public void emitWatermark(Watermark mark) {
public final void emitWatermark(Watermark mark) {
checkNotClosed();

if (allowWatermark(mark)) {
Expand All @@ -469,7 +461,7 @@ public void emitWatermark(Watermark mark) {
}

@Override
public void markAsTemporarilyIdle() {
public final void markAsTemporarilyIdle() {
checkNotClosed();

synchronized (checkpointLock) {
Expand Down