From ebf00f2b5edbdf33f668d4a60311ac10053e0a85 Mon Sep 17 00:00:00 2001 From: "wenqi.huang" Date: Thu, 29 Aug 2019 14:28:37 +0800 Subject: [PATCH 1/3] Fix offset maybe lost problem. --- .../broker/offset/ConsumerOffsetManager.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index ebc9dd8acce..f16f0fab8fc 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -129,14 +129,13 @@ private void commitOffset(final String clientHost, final String key, final int q ConcurrentMap map = this.offsetTable.get(key); if (null == map) { map = new ConcurrentHashMap(32); - map.put(queueId, offset); - this.offsetTable.put(key, map); - } else { - Long storeOffset = map.put(queueId, offset); - if (storeOffset != null && offset < storeOffset) { - log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset); - } - } + this.offsetTable.putIfAbsent(key, map); + map = this.offsetTable.get(key); + } + Long storeOffset = map.put(queueId, offset); + if (storeOffset != null && offset < storeOffset) { + log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset); + } } public long queryOffset(final String group, final String topic, final int queueId) { From 8a30f80e86ca5f4d1def0c882f5b1c2b94edd73d Mon Sep 17 00:00:00 2001 From: "wenqi.huang" Date: Thu, 29 Aug 2019 14:58:35 +0800 Subject: [PATCH 2/3] Checkstyle --- .../rocketmq/broker/offset/ConsumerOffsetManager.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index f16f0fab8fc..951526a8f90 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -131,11 +131,11 @@ private void commitOffset(final String clientHost, final String key, final int q map = new ConcurrentHashMap(32); this.offsetTable.putIfAbsent(key, map); map = this.offsetTable.get(key); - } - Long storeOffset = map.put(queueId, offset); - if (storeOffset != null && offset < storeOffset) { - log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset); - } + } + Long storeOffset = map.put(queueId, offset); + if (storeOffset != null && offset < storeOffset) { + log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset); + } } public long queryOffset(final String group, final String topic, final int queueId) { From bf459e50d6db176c85ae033d6dd61f00c7741735 Mon Sep 17 00:00:00 2001 From: "wenqi.huang" Date: Mon, 28 Oct 2019 19:31:24 +0800 Subject: [PATCH 3/3] Add unit test for ConsumerOffsetManager --- .../offset/ConsumerOffsetManagerTest.java | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java new file mode 100644 index 00000000000..d0f59e1bc95 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java @@ -0,0 +1,33 @@ +package org.apache.rocketmq.broker.offset; + +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; + +public class ConsumerOffsetManagerTest { + + @Test + public void testCommitOffset() { + ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(); + String group = "testGroup"; + String topic = "testTopic"; + int queueId = 1; + long offset = 100; + consumerOffsetManager.commitOffset("127.0.0.1", group, topic, queueId, offset); + long actualOffset = consumerOffsetManager.queryOffset(group, topic, queueId); + Assert.assertEquals(offset, actualOffset); + + Map map = consumerOffsetManager.queryOffset(group, topic); + Assert.assertEquals(offset, map.get(queueId).longValue()); + + offset = 101; + consumerOffsetManager.commitOffset("127.0.0.1", group, topic, queueId, offset); + + actualOffset = consumerOffsetManager.queryOffset(group, topic, queueId); + Assert.assertEquals(offset, actualOffset); + + map = consumerOffsetManager.queryOffset(group, topic); + Assert.assertEquals(offset, map.get(queueId).longValue()); + } +}