Skip to content

Commit

Permalink
Merge pull request #334 from C0urante/kafka-15090-fix
Browse files Browse the repository at this point in the history
fix: adapt source task shutdown logic to work with newer versions of …
  • Loading branch information
jjaakola-aiven committed Jun 7, 2024
2 parents 7ad3001 + 79818e9 commit 667202b
Showing 1 changed file with 65 additions and 61 deletions.
126 changes: 65 additions & 61 deletions src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,13 @@ public class JdbcSourceTask extends SourceTask {
// Visible for testing
public static final int MAX_QUERY_SLEEP_MS = 100;

private Time time;
private final Time time;
private JdbcSourceTaskConfig config;
private DatabaseDialect dialect;
private CachedConnectionProvider cachedConnectionProvider;
private PriorityQueue<TableQuerier> tableQueue = new PriorityQueue<TableQuerier>();
private final PriorityQueue<TableQuerier> tableQueue = new PriorityQueue<TableQuerier>();
private final AtomicBoolean running = new AtomicBoolean(false);
private final Object pollLock = new Object();

public JdbcSourceTask() {
this.time = Time.SYSTEM;
Expand Down Expand Up @@ -269,8 +270,14 @@ private List<Map<String, String>> possibleTablePartitions(final String table) {
public void stop() throws ConnectException {
log.info("Stopping JDBC source task");
running.set(false);
// All resources are closed at the end of 'poll()' when no longer running or
// if there is an error
// Wait for any in-progress polls to stop before closing resources
// On older versions of Kafka Connect, SourceTask::stop and SourceTask::poll may
// be called concurrently on different threads
// On more recent versions, SourceTask::stop is always called after the last invocation
// of SourceTask::poll
synchronized (pollLock) {
closeResources();
}
}

protected void closeResources() {
Expand All @@ -297,72 +304,69 @@ protected void closeResources() {

@Override
public List<SourceRecord> poll() throws InterruptedException {
log.trace("Polling for new data");

while (running.get()) {
final TableQuerier querier = tableQueue.peek();

if (!querier.querying()) {
// If not in the middle of an update, wait for next update time
final long nextUpdate = querier.getLastUpdate()
+ config.getInt(JdbcSourceTaskConfig.POLL_INTERVAL_MS_CONFIG);
final long untilNext = nextUpdate - time.milliseconds();
final long sleepMs = Math.min(untilNext, MAX_QUERY_SLEEP_MS);
if (sleepMs > 0) {
log.trace("Waiting {} ms to poll {} next ({} ms total left to wait)",
sleepMs, querier.toString(), untilNext);
time.sleep(sleepMs);
// Return control to the Connect runtime periodically
// See https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll():
// "If no data is currently available, this method should block but return control to the caller
// regularly (by returning null)"
return null;
synchronized (pollLock) {
log.trace("Polling for new data");

while (running.get()) {
final TableQuerier querier = tableQueue.peek();

if (!querier.querying()) {
// If not in the middle of an update, wait for next update time
final long nextUpdate = querier.getLastUpdate()
+ config.getInt(JdbcSourceTaskConfig.POLL_INTERVAL_MS_CONFIG);
final long untilNext = nextUpdate - time.milliseconds();
final long sleepMs = Math.min(untilNext, MAX_QUERY_SLEEP_MS);
if (sleepMs > 0) {
log.trace("Waiting {} ms to poll {} next ({} ms total left to wait)",
sleepMs, querier.toString(), untilNext);
time.sleep(sleepMs);
// Return control to the Connect runtime periodically
// See https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll():
// "If no data is currently available, this method should block but return control to the caller
// regularly (by returning null)"
return null;
}
}
}

final List<SourceRecord> results = new ArrayList<>();
try {
log.debug("Checking for next block of results from {}", querier.toString());
querier.maybeStartQuery(cachedConnectionProvider.getConnection());
final List<SourceRecord> results = new ArrayList<>();
try {
log.debug("Checking for next block of results from {}", querier.toString());
querier.maybeStartQuery(cachedConnectionProvider.getConnection());

final int batchMaxRows = config.getInt(JdbcSourceTaskConfig.BATCH_MAX_ROWS_CONFIG);
boolean hadNext = true;
while (results.size() < batchMaxRows && (hadNext = querier.next())) {
results.add(querier.extractRecord());
}
final int batchMaxRows = config.getInt(JdbcSourceTaskConfig.BATCH_MAX_ROWS_CONFIG);
boolean hadNext = true;
while (results.size() < batchMaxRows && (hadNext = querier.next())) {
results.add(querier.extractRecord());
}

if (!hadNext) {
// If we finished processing the results from the current query, we can reset and send
// the querier to the tail of the queue
resetAndRequeueHead(querier);
}
if (!hadNext) {
// If we finished processing the results from the current query, we can reset and send
// the querier to the tail of the queue
resetAndRequeueHead(querier);
}

if (results.isEmpty()) {
log.trace("No updates for {}", querier.toString());
continue;
}
if (results.isEmpty()) {
log.trace("No updates for {}", querier.toString());
continue;
}

log.debug("Returning {} records for {}", results.size(), querier.toString());
return results;
} catch (final SQLException sqle) {
log.error("Failed to run query for table {}: {}", querier.toString(), sqle);
resetAndRequeueHead(querier);
return null;
} catch (final Throwable t) {
resetAndRequeueHead(querier);
// This task has failed, so close any resources (may be reopened if needed) before throwing
closeResources();
throw t;
log.debug("Returning {} records for {}", results.size(), querier.toString());
return results;
} catch (final SQLException sqle) {
log.error("Failed to run query for table {}: {}", querier.toString(), sqle);
resetAndRequeueHead(querier);
return null;
} catch (final Throwable t) {
resetAndRequeueHead(querier);
// This task has failed, so close any resources (may be reopened if needed) before throwing
closeResources();
throw t;
}
}
}

// Only in case of shutdown
final TableQuerier querier = tableQueue.peek();
if (querier != null) {
resetAndRequeueHead(querier);
// Only in case of shutdown
return null;
}
closeResources();
return null;
}

private void resetAndRequeueHead(final TableQuerier expectedHead) {
Expand Down

0 comments on commit 667202b

Please sign in to comment.