Skip to content

Commit

Permalink
Merge pull request #45 from confluentinc/logs
Browse files Browse the repository at this point in the history
added some logging. hopefully more useful than annoying while trouble…
  • Loading branch information
gwenshap committed Feb 23, 2016
2 parents c647674 + a34c6b4 commit 25bd681
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 3 deletions.
7 changes: 4 additions & 3 deletions src/main/java/io/confluent/connect/jdbc/JdbcSourceTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public List<SourceRecord> poll() throws InterruptedException {

List<SourceRecord> results = new ArrayList<>();
try {
log.trace("Checking for next block of results from {}", querier.toString());
log.debug("Checking for next block of results from {}", querier.toString());
querier.maybeStartQuery(db);

int batchMaxRows = config.getInt(JdbcSourceTaskConfig.BATCH_MAX_ROWS_CONFIG);
Expand All @@ -211,9 +211,10 @@ public List<SourceRecord> poll() throws InterruptedException {
results.add(querier.extractRecord());
}


// If we finished processing the results from this query, we can clear it out
if (!hadNext) {
log.trace("Closing this query for {}", querier.toString());
log.debug("Closing this query for {}", querier.toString());
TableQuerier removedQuerier = tableQueue.poll();
assert removedQuerier == querier;
now = time.milliseconds();
Expand All @@ -226,7 +227,7 @@ public List<SourceRecord> poll() throws InterruptedException {
continue;
}

log.trace("Returning {} records for {}", results.size(), querier.toString());
log.debug("Returning {} records for {}", results.size(), querier.toString());
return results;
} catch (SQLException e) {
log.error("Failed to run query for table {}: {}", querier.toString(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -103,6 +104,7 @@ private boolean updateTables() {
final List<String> tables;
try {
tables = JdbcUtils.getTables(db);
log.debug("Got the following tables: " + Arrays.toString(tables.toArray()));
} catch (SQLException e) {
log.error("Error while trying to get updated table list, ignoring and waiting for next "
+ "table poll interval", e);
Expand All @@ -129,6 +131,7 @@ private boolean updateTables() {
}

if (!filteredTables.equals(this.tables)) {
log.debug("After filtering we got tables: " + Arrays.toString(filteredTables.toArray()));
List<String> previousTables = this.tables;
this.tables = filteredTables;
db.notifyAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,14 @@ protected ResultSet executeQuery() throws SQLException {
stmt.setTimestamp(1, ts, UTC_CALENDAR);
stmt.setLong(2, (incrementingOffset == null ? -1 : incrementingOffset));
stmt.setTimestamp(3, ts, UTC_CALENDAR);
log.debug("Executing prepared statement with timestamp value = " + timestampOffset + " and incrementing value = " + incrementingOffset);
} else if (incrementingColumn != null) {
stmt.setLong(1, (incrementingOffset == null ? -1 : incrementingOffset));
log.debug("Executing prepared statement with incrementing value = " + incrementingOffset);
} else if (timestampColumn != null) {
Timestamp ts = new Timestamp(timestampOffset == null ? 0 : timestampOffset);
stmt.setTimestamp(1, ts, UTC_CALENDAR);
log.debug("Executing prepared statement with timestamp value = " + timestampOffset);
}
return stmt.executeQuery();
}
Expand Down

0 comments on commit 25bd681

Please sign in to comment.