Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,6 @@ public Optional<ProcessContinuation> run(
return Optional.empty();
}
// no new data, finish reading data
return cancelQueryOnHeartbeat ? Optional.empty() : Optional.of(ProcessContinuation.resume());
return cancelQueryOnHeartbeat ? Optional.of(ProcessContinuation.resume()) : Optional.empty();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Inverting the ternary operator here makes the behavior of cancelQueryOnHeartbeat opposite to its name:

  • If cancelQueryOnHeartbeat is true, it now returns ProcessContinuation.resume() (meaning it does not cancel the query on heartbeat).
  • If cancelQueryOnHeartbeat is false, it now returns Optional.empty() (meaning it does cancel the query on heartbeat).

This also forced the tests to be inverted in a counter-intuitive way (e.g., testEndTimestampNotReachedOnCancellingAction now asserts resume(), and testEndTimestampNotReachedOnAction now asserts Optional.empty()).

If the issue is that the feature is enabled by default when it shouldn't be, the correct fix is to change the default value of cancelQueryOnHeartbeat to false where it is initialized or configured (e.g., in the SpannerIO connector configuration or builder), rather than inverting the core logic here. Please revert this change and the test changes, and instead update the default configuration value of cancelQueryOnHeartbeat to false.

Suggested change
return cancelQueryOnHeartbeat ? Optional.of(ProcessContinuation.resume()) : Optional.empty();
return cancelQueryOnHeartbeat ? Optional.empty() : Optional.of(ProcessContinuation.resume());

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public void testEndTimestampNotReachedOnCancellingAction() {
watermarkEstimator,
endTimestamp);

assertEquals(Optional.empty(), maybeContinuation);
assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation);
verify(watermarkEstimator).setWatermark(new Instant(timestamp.toSqlTimestamp().getTime()));
}

Expand All @@ -254,7 +254,7 @@ public void testEndTimestampNotReachedOnAction() {
watermarkEstimator,
endTimestamp);

assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation);
assertEquals(Optional.empty(), maybeContinuation);
verify(watermarkEstimator).setWatermark(new Instant(timestamp.toSqlTimestamp().getTime()));
}
}
Loading