From 16ab2fd39e95d82721bba39bbf4ce01e4a01beb2 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 21 Apr 2016 15:54:19 -0700 Subject: [PATCH 1/2] KAFKA-3602: rename RecordAccumulator dequeFor() and fix usage --- .../producer/internals/RecordAccumulator.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index d96398152443..ef220df63f2d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -167,7 +167,7 @@ public RecordAppendResult append(TopicPartition tp, appendsInProgress.incrementAndGet(); try { // check if we have an in-progress batch - Deque dq = dequeFor(tp); + Deque dq = getOrCreateDeque(tp); synchronized (dq) { if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); @@ -259,7 +259,7 @@ public void reenqueue(RecordBatch batch, long now) { batch.lastAttemptMs = now; batch.lastAppendTime = now; batch.setRetry(); - Deque deque = dequeFor(batch.topicPartition); + Deque deque = getOrCreateDeque(batch.topicPartition); synchronized (deque) { deque.addFirst(batch); } @@ -369,7 +369,7 @@ public Map> drain(Cluster cluster, TopicPartition tp = new TopicPartition(part.topic(), part.partition()); // Only proceed if the partition has no in-flight batches. if (!muted.contains(tp)) { - Deque deque = dequeFor(new TopicPartition(part.topic(), part.partition())); + Deque deque = getDeque(new TopicPartition(part.topic(), part.partition())); if (deque != null) { synchronized (deque) { RecordBatch first = deque.peekFirst(); @@ -401,10 +401,14 @@ public Map> drain(Cluster cluster, return batches; } + private Deque getDeque(TopicPartition tp) { + return batches.get(tp); + } + /** * Get the deque for the given topic-partition, creating it if necessary. */ - private Deque dequeFor(TopicPartition tp) { + private Deque getOrCreateDeque(TopicPartition tp) { Deque d = this.batches.get(tp); if (d != null) return d; @@ -478,7 +482,7 @@ public void abortIncompleteBatches() { */ private void abortBatches() { for (RecordBatch batch : incomplete.all()) { - Deque dq = dequeFor(batch.topicPartition); + Deque dq = getDeque(batch.topicPartition); // Close the batch before aborting synchronized (dq) { batch.records.close(); From d7499c50b24c20983412ae9b5b77eb485693b3d0 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Fri, 22 Apr 2016 09:34:14 -0700 Subject: [PATCH 2/2] remove unused argument in abortExpiredBatches --- .../producer/internals/RecordAccumulator.java | 2 +- .../kafka/clients/producer/internals/Sender.java | 2 +- .../producer/internals/RecordAccumulatorTest.java | 14 +++++++------- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index ef220df63f2d..1766609ace02 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -213,7 +213,7 @@ public RecordAppendResult append(TopicPartition tp, * Abort the batches that have been sitting in RecordAccumulator for more than the configured requestTimeout * due to metadata being unavailable */ - public List abortExpiredBatches(int requestTimeout, Cluster cluster, long now) { + public List abortExpiredBatches(int requestTimeout, long now) { List expiredBatches = new ArrayList(); int count = 0; for (Map.Entry> entry : this.batches.entrySet()) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index db8918c2a49d..29077b6bb6b3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -202,7 +202,7 @@ void run(long now) { } } - List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, cluster, now); + List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now); // update sensors for (RecordBatch expiredBatch : expiredBatches) this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 904aa73bd33c..a39d2e82840a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -316,11 +316,11 @@ public void testExpiredBatches() throws InterruptedException { // Advance the clock to expire the batch. time.sleep(requestTimeout + 1); accum.mutePartition(tp1); - List expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + List expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds()); assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size()); accum.unmutePartition(tp1); - expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds()); assertEquals("The batch should be expired", 1, expiredBatches.size()); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); @@ -330,11 +330,11 @@ public void testExpiredBatches() throws InterruptedException { time.sleep(requestTimeout + 1); accum.mutePartition(tp1); - expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds()); assertEquals("The batch should not be expired when metadata is still available and partition is muted", 0, expiredBatches.size()); accum.unmutePartition(tp1); - expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds()); assertEquals("The batch should be expired when the partition is not muted", 1, expiredBatches.size()); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); @@ -351,16 +351,16 @@ public void testExpiredBatches() throws InterruptedException { // test expiration. time.sleep(requestTimeout + retryBackoffMs); - expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds()); assertEquals("The batch should not be expired.", 0, expiredBatches.size()); time.sleep(1L); accum.mutePartition(tp1); - expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds()); assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size()); accum.unmutePartition(tp1); - expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds()); assertEquals("The batch should be expired when the partition is not muted.", 1, expiredBatches.size()); }