Skip to content

Commit

Permalink
Make logs / exceptions more informative
Browse files Browse the repository at this point in the history
  • Loading branch information
AmatyaAvadhanula committed Dec 12, 2023
1 parent 93e7fac commit 5d76823
Showing 1 changed file with 16 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,21 +135,32 @@ protected void possiblyResetDataSourceMetadata(

if (!shardResetMap.isEmpty()) {
for (Map.Entry<StreamPartition<String>, String> partitionToReset : shardResetMap.entrySet()) {
log.warn("Starting sequence number [%s] is no longer available for partition [%s]",
log.warn("Starting sequenceNumber[%s] is no longer available for partition[%s].",
partitionToReset.getValue(),
partitionToReset.getKey().getPartitionId()
partitionToReset.getKey()
);
}
if (task.getTuningConfig().isResetOffsetAutomatically()) {
log.info("Attempting to reset offsets for [%d] partitions.", shardResetMap.size());
log.info(
"Attempting to reset offsets for [%d] partitions with ids[%s].",
shardResetMap.size(),
shardResetMap.keySet()
);
try {
sendResetRequestAndWait(shardResetMap, toolbox);
}
catch (IOException e) {
throw new ISE(e, "Exception while attempting to automatically reset sequences");
throw new ISE(
e,
"Exception while attempting to automatically reset sequences for partitions[%s]",
shardResetMap.keySet()
);
}
} else {
throw new ISE("Sequence numbers are unavailable but automatic offset reset is disabled.");
throw new ISE(
"Automatic offset reset is disabled, but there are partitions with unavailable sequence numbers [%s].",
shardResetMap
);
}
}
}
Expand Down Expand Up @@ -194,5 +205,4 @@ protected TreeMap<Integer, Map<String, String>> getCheckPointsFromContext(
return null;
}
}

}

0 comments on commit 5d76823

Please sign in to comment.