Skip to content

Commit

Permalink
Rename variables
Browse files Browse the repository at this point in the history
  • Loading branch information
AmatyaAvadhanula committed Dec 13, 2023
1 parent 5d76823 commit 499dcfd
Showing 1 changed file with 11 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,41 +125,41 @@ protected void possiblyResetDataSourceMetadata(
{
if (!task.getTuningConfig().isSkipSequenceNumberAvailabilityCheck()) {
final ConcurrentMap<String, String> currOffsets = getCurrentOffsets();
final Map<StreamPartition<String>, String> shardResetMap = new HashMap<>();
final Map<StreamPartition<String>, String> partitionToSequenceResetMap = new HashMap<>();
for (final StreamPartition<String> streamPartition : assignment) {
String sequence = currOffsets.get(streamPartition.getPartitionId());
if (!recordSupplier.isOffsetAvailable(streamPartition, KinesisSequenceNumber.of(sequence))) {
shardResetMap.put(streamPartition, sequence);
partitionToSequenceResetMap.put(streamPartition, sequence);
}
}

if (!shardResetMap.isEmpty()) {
for (Map.Entry<StreamPartition<String>, String> partitionToReset : shardResetMap.entrySet()) {
if (!partitionToSequenceResetMap.isEmpty()) {
for (Map.Entry<StreamPartition<String>, String> partitionToSequence : partitionToSequenceResetMap.entrySet()) {
log.warn("Starting sequenceNumber[%s] is no longer available for partition[%s].",
partitionToReset.getValue(),
partitionToReset.getKey()
partitionToSequence.getValue(),
partitionToSequence.getKey()
);
}
if (task.getTuningConfig().isResetOffsetAutomatically()) {
log.info(
"Attempting to reset offsets for [%d] partitions with ids[%s].",
shardResetMap.size(),
shardResetMap.keySet()
partitionToSequenceResetMap.size(),
partitionToSequenceResetMap.keySet()
);
try {
sendResetRequestAndWait(shardResetMap, toolbox);
sendResetRequestAndWait(partitionToSequenceResetMap, toolbox);
}
catch (IOException e) {
throw new ISE(
e,
"Exception while attempting to automatically reset sequences for partitions[%s]",
shardResetMap.keySet()
partitionToSequenceResetMap.keySet()
);
}
} else {
throw new ISE(
"Automatic offset reset is disabled, but there are partitions with unavailable sequence numbers [%s].",
shardResetMap
partitionToSequenceResetMap
);
}
}
Expand Down

0 comments on commit 499dcfd

Please sign in to comment.