From 499dcfdc3c6aadb4df0f701c54b708290d98f6ce Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 13 Dec 2023 10:01:34 +0530 Subject: [PATCH] Rename variables --- .../kinesis/KinesisIndexTaskRunner.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java index 7e13cc7631bc..75f23da0e1f6 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java @@ -125,41 +125,41 @@ protected void possiblyResetDataSourceMetadata( { if (!task.getTuningConfig().isSkipSequenceNumberAvailabilityCheck()) { final ConcurrentMap currOffsets = getCurrentOffsets(); - final Map, String> shardResetMap = new HashMap<>(); + final Map, String> partitionToSequenceResetMap = new HashMap<>(); for (final StreamPartition streamPartition : assignment) { String sequence = currOffsets.get(streamPartition.getPartitionId()); if (!recordSupplier.isOffsetAvailable(streamPartition, KinesisSequenceNumber.of(sequence))) { - shardResetMap.put(streamPartition, sequence); + partitionToSequenceResetMap.put(streamPartition, sequence); } } - if (!shardResetMap.isEmpty()) { - for (Map.Entry, String> partitionToReset : shardResetMap.entrySet()) { + if (!partitionToSequenceResetMap.isEmpty()) { + for (Map.Entry, String> partitionToSequence : partitionToSequenceResetMap.entrySet()) { log.warn("Starting sequenceNumber[%s] is no longer available for partition[%s].", - partitionToReset.getValue(), - partitionToReset.getKey() + partitionToSequence.getValue(), + partitionToSequence.getKey() ); } if (task.getTuningConfig().isResetOffsetAutomatically()) { log.info( "Attempting to reset offsets for [%d] partitions with ids[%s].", - shardResetMap.size(), - shardResetMap.keySet() + partitionToSequenceResetMap.size(), + partitionToSequenceResetMap.keySet() ); try { - sendResetRequestAndWait(shardResetMap, toolbox); + sendResetRequestAndWait(partitionToSequenceResetMap, toolbox); } catch (IOException e) { throw new ISE( e, "Exception while attempting to automatically reset sequences for partitions[%s]", - shardResetMap.keySet() + partitionToSequenceResetMap.keySet() ); } } else { throw new ISE( "Automatic offset reset is disabled, but there are partitions with unavailable sequence numbers [%s].", - shardResetMap + partitionToSequenceResetMap ); } }