From 095456ccee67cb97913158c2b78a92ad90970745 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 10 Nov 2015 12:31:53 -0800 Subject: [PATCH] KAFKA-2795: fix potential NPE in addGroup --- .../kafka/coordinator/GroupMetadataManager.scala | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala index f98fc7402fe2d..047970e3bd6df 100644 --- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala @@ -67,9 +67,6 @@ class GroupMetadataManager(val brokerId: Int, /* lock for expiring stale offsets, it should be always called BEFORE the group lock if needed */ private val offsetExpireLock = new ReentrantReadWriteLock() - /* lock for removing offsets of a range partition, it should be always called BEFORE the group lock if needed */ - private val offsetRemoveLock = new ReentrantReadWriteLock() - /* shutting down flag */ private val shuttingDown = new AtomicBoolean(false) @@ -116,12 +113,12 @@ class GroupMetadataManager(val brokerId: Int, * Add a group or get the group associated with the given groupId if it already exists */ def addGroup(groupId: String, protocolType: String): GroupMetadata = { - addGroup(groupId, new GroupMetadata(groupId, protocolType)) - } - - private def addGroup(groupId: String, group: GroupMetadata): GroupMetadata = { - groupsCache.putIfNotExists(groupId, group) - groupsCache.get(groupId) + val newGroup = new GroupMetadata(groupId, protocolType) + val currentGroup = groupsCache.putIfNotExists(groupId, newGroup) + if (currentGroup != null) + currentGroup + else + newGroup } /**