From f6efd565023d440c6d15091609442ff61ad6f85a Mon Sep 17 00:00:00 2001 From: Apurva Mehta Date: Fri, 19 May 2017 11:31:48 -0700 Subject: [PATCH 1/8] KAFKA-5247 materialize offset commits in offset order Updated the GroupMetadata to keep track of the offset in the __consumer_offsets topic for the commit record for a given offset commit. We only update the offsets cache when a given offset is committed if the offset of the commit record in the offsets topic is greater than the offset of the existing materialized offset. This way, if we have a mix of transactional and non transactional offset commits for the same group, we will always materialize the offset commtis in offset order. --- .../coordinator/group/GroupMetadata.scala | 81 +++++++++++++++---- .../group/GroupMetadataManager.scala | 50 +++++++----- .../coordinator/group/GroupMetadataTest.scala | 9 ++- 3 files changed, 97 insertions(+), 43 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala index 2f76d635dbf1b..6eb0cf16b9f68 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala @@ -126,6 +126,17 @@ case class GroupSummary(state: String, protocol: String, members: List[MemberSummary]) +/** + * We cache offset commits along with their commit record offset. This enables us to ensure that the latest offset + * commit is always materialized when we have a mix of transactional and regular offset commits. Without preserving + * information of the commit record offset, compaction of the offsets topic it self may result in the wrong offset commit + * being materialized. 
+ */ +case class CommitRecordMetadataAndOffset(commitRecordOffset: Long, + offsetAndMetadata: OffsetAndMetadata) { + def olderThan(that: CommitRecordMetadataAndOffset) = commitRecordOffset < that.commitRecordOffset +} + /** * Group contains the following metadata: * @@ -143,12 +154,17 @@ case class GroupSummary(state: String, private[group] class GroupMetadata(val groupId: String, initialState: GroupState = Empty) extends Logging { private var state: GroupState = initialState + private val members = new mutable.HashMap[String, MemberMetadata] - private val offsets = new mutable.HashMap[TopicPartition, OffsetAndMetadata] + + private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset] + private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata] - // A map from a producer id to the open offset commits for that producer id. - private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, OffsetAndMetadata]]() + + private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]() + private var receivedTransactionalOffsetCommits = false + private var receivedConsumerOffsetCommits = false var protocolType: Option[String] = None @@ -271,18 +287,23 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState GroupOverview(groupId, protocolType.getOrElse("")) } - def initializeOffsets(offsets: collection.Map[TopicPartition, OffsetAndMetadata], - pendingTxnOffsets: Map[Long, mutable.Map[TopicPartition, OffsetAndMetadata]]) { + def initializeOffsets(offsets: collection.Map[TopicPartition, CommitRecordMetadataAndOffset], + pendingTxnOffsets: Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]) { this.offsets ++= offsets this.pendingTransactionalOffsetCommits ++= pendingTxnOffsets } - def completePendingOffsetWrite(topicPartition: TopicPartition, offset: OffsetAndMetadata) 
{ - if (pendingOffsetCommits.contains(topicPartition)) - offsets.put(topicPartition, offset) + def completePendingOffsetWrite(topicPartition: TopicPartition, offsetWithCommitRecordMetadata: CommitRecordMetadataAndOffset) { + if (pendingOffsetCommits.contains(topicPartition)) { + // TODO(apurva) : Is this check necessary? Is it ever possible that a transactional offset commit would + // be written and then committed in the time where a regular offset commit was written but not acknowledged. + if (!offsets.contains(topicPartition) || offsets(topicPartition).olderThan(offsetWithCommitRecordMetadata)) + offsets.put(topicPartition, offsetWithCommitRecordMetadata) + } pendingOffsetCommits.get(topicPartition) match { - case Some(stagedOffset) if offset == stagedOffset => pendingOffsetCommits.remove(topicPartition) + case Some(stagedOffset) if offsetWithCommitRecordMetadata.offsetAndMetadata == stagedOffset => + pendingOffsetCommits.remove(topicPartition) case _ => } } @@ -301,8 +322,12 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState def prepareTxnOffsetCommit(producerId: Long, offsets: Map[TopicPartition, OffsetAndMetadata]) { receivedTransactionalOffsetCommits = true - val producerOffsets = pendingTransactionalOffsetCommits.getOrElseUpdate(producerId, mutable.Map.empty[TopicPartition, OffsetAndMetadata]) - producerOffsets ++= offsets + val producerOffsets = pendingTransactionalOffsetCommits.getOrElseUpdate(producerId, + mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset]) + + offsets.foreach { case (topicPartition, offsetAndMetadata) => + producerOffsets.put(topicPartition, CommitRecordMetadataAndOffset(-1, offsetAndMetadata)) + } } def hasReceivedConsistentOffsetCommits : Boolean = { @@ -323,14 +348,28 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState } } + def updateCommitRecordMetadataForPendingTxnOffsetWrite(producerId: Long, topicPartition: TopicPartition, + 
commitRecordMetadataAndOffset: CommitRecordMetadataAndOffset) { + pendingTransactionalOffsetCommits.get(producerId) match { + case Some(pendingOffset) => + if (pendingOffset.contains(topicPartition) + && pendingOffset(topicPartition).offsetAndMetadata == commitRecordMetadataAndOffset.offsetAndMetadata) + pendingOffset.update(topicPartition, commitRecordMetadataAndOffset) + case _ => + // We may hit this case if the partition in question has emigrated. + } + } /* Complete a pending transactional offset commit. This is called after a commit or abort marker is fully written * to the log. */ def completePendingTxnOffsetCommit(producerId: Long, isCommit: Boolean): Unit = { trace(s"Completing transactional offset commit for producer $producerId and group $groupId. isCommit: $isCommit") if (isCommit) { - val producerOffsets = pendingTransactionalOffsetCommits.getOrElse(producerId, Map.empty[TopicPartition, OffsetAndMetadata]) - offsets ++= producerOffsets + val producerOffsets = pendingTransactionalOffsetCommits.getOrElse(producerId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset]) + producerOffsets.foreach { case (topicPartition, commitRecordMetadataAndOffset) => + if (!offsets.contains(topicPartition) || offsets(topicPartition).olderThan(commitRecordMetadataAndOffset)) + offsets.put(topicPartition, commitRecordMetadataAndOffset) + } } pendingTransactionalOffsetCommits.remove(producerId) } @@ -340,7 +379,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState def hasPendingOffsetCommitsFromProducer(producerId: Long) = pendingTransactionalOffsetCommits.contains(producerId) - def removeOffsets(topicPartitions: Seq[TopicPartition]): immutable.Map[TopicPartition, OffsetAndMetadata] = { + def removeOffsets(topicPartitions: Seq[TopicPartition]): immutable.Map[TopicPartition, CommitRecordMetadataAndOffset] = { topicPartitions.flatMap { topicPartition => pendingOffsetCommits.remove(topicPartition) pendingTransactionalOffsetCommits.foreach { 
case (_, pendingOffsets) => @@ -353,15 +392,23 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState def removeExpiredOffsets(startMs: Long) = { val expiredOffsets = offsets.filter { - case (topicPartition, offset) => offset.expireTimestamp < startMs && !pendingOffsetCommits.contains(topicPartition) + case (topicPartition, commitRecordMetadataAndOffset) => + commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp < startMs && !pendingOffsetCommits.contains(topicPartition) } offsets --= expiredOffsets.keySet expiredOffsets.toMap } - def allOffsets = offsets.toMap + def allOffsets = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) => + (topicPartition, commitRecordMetadataAndOffset.offsetAndMetadata) + }.toMap - def offset(topicPartition: TopicPartition) = offsets.get(topicPartition) + def offset(topicPartition: TopicPartition) : Option[OffsetAndMetadata] = offsets.get(topicPartition) match { + case Some(commitRecordMetadataAndOffset) => + Option(commitRecordMetadataAndOffset.offsetAndMetadata) + case None => + None + } def numOffsets = offsets.size diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 74a3f7b6a4a5e..b5e014dc31efb 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -309,9 +309,12 @@ class GroupMetadataManager(brokerId: Int, val responseError = group synchronized { if (status.error == Errors.NONE) { - if (!group.is(Dead) && !isTxnOffsetCommit) { + if (!group.is(Dead)) { filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) => - group.completePendingOffsetWrite(topicPartition, offsetAndMetadata) + if (isTxnOffsetCommit) + group.updateCommitRecordMetadataForPendingTxnOffsetWrite(producerId, topicPartition, CommitRecordMetadataAndOffset(status.baseOffset, 
offsetAndMetadata)) + else + group.completePendingOffsetWrite(topicPartition, CommitRecordMetadataAndOffset(status.baseOffset, offsetAndMetadata)) } } Errors.NONE @@ -473,8 +476,8 @@ class GroupMetadataManager(brokerId: Int, lazy val buffer = ByteBuffer.allocate(config.loadBufferSize) // loop breaks if leader changes at any time during the load, since getHighWatermark is -1 - val loadedOffsets = mutable.Map[GroupTopicPartition, OffsetAndMetadata]() - val pendingOffsets = mutable.Map[Long, mutable.Map[GroupTopicPartition, OffsetAndMetadata]]() + val loadedOffsets = mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]() + val pendingOffsets = mutable.Map[Long, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]]() val loadedGroups = mutable.Map[String, GroupMetadata]() val removedGroups = mutable.Set[String]() @@ -496,10 +499,11 @@ class GroupMetadataManager(brokerId: Int, val record = batch.iterator.next() val controlRecord = ControlRecordType.parse(record.key) if (controlRecord == ControlRecordType.COMMIT) { - pendingOffsets.getOrElse(batch.producerId, mutable.Map[GroupTopicPartition, OffsetAndMetadata]()) + pendingOffsets.getOrElse(batch.producerId, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]()) .foreach { - case (groupTopicPartition, offsetAndMetadata) => - loadedOffsets.put(groupTopicPartition, offsetAndMetadata) + case (groupTopicPartition, commitRecordMetadataAndOffset) => + if (!loadedOffsets.contains(groupTopicPartition) || loadedOffsets(groupTopicPartition).olderThan(commitRecordMetadataAndOffset)) + loadedOffsets.put(groupTopicPartition, commitRecordMetadataAndOffset) } } pendingOffsets.remove(batch.producerId) @@ -510,7 +514,7 @@ class GroupMetadataManager(brokerId: Int, case offsetKey: OffsetKey => if (isTxnOffsetCommit && !pendingOffsets.contains(batch.producerId)) - pendingOffsets.put(batch.producerId, mutable.Map[GroupTopicPartition, OffsetAndMetadata]()) + pendingOffsets.put(batch.producerId, 
mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]()) // load offset val groupTopicPartition = offsetKey.key @@ -520,11 +524,11 @@ class GroupMetadataManager(brokerId: Int, else loadedOffsets.remove(groupTopicPartition) } else { - val value = GroupMetadataManager.readOffsetMessageValue(record.value) + val offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(record.value) if (isTxnOffsetCommit) - pendingOffsets(batch.producerId).put(groupTopicPartition, value) + pendingOffsets(batch.producerId).put(groupTopicPartition, CommitRecordMetadataAndOffset(batch.baseOffset, offsetAndMetadata)) else - loadedOffsets.put(groupTopicPartition, value) + loadedOffsets.put(groupTopicPartition, CommitRecordMetadataAndOffset(batch.baseOffset, offsetAndMetadata)) } case groupMetadataKey: GroupMetadataKey => @@ -554,15 +558,15 @@ class GroupMetadataManager(brokerId: Int, .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset) }) .partition { case (group, _) => loadedGroups.contains(group) } - val pendingOffsetsByGroup = mutable.Map[String, mutable.Map[Long, mutable.Map[TopicPartition, OffsetAndMetadata]]]() + val pendingOffsetsByGroup = mutable.Map[String, mutable.Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]]() pendingOffsets.foreach { case (producerId, producerOffsets) => producerOffsets.keySet.map(_.group).foreach(addProducerGroup(producerId, _)) producerOffsets .groupBy(_._1.group) .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset)}) .foreach { case (group, offsets) => - val groupPendingOffsets = pendingOffsetsByGroup.getOrElseUpdate(group, mutable.Map.empty[Long, mutable.Map[TopicPartition, OffsetAndMetadata]]) - val groupProducerOffsets = groupPendingOffsets.getOrElseUpdate(producerId, mutable.Map.empty[TopicPartition, OffsetAndMetadata]) + val groupPendingOffsets = pendingOffsetsByGroup.getOrElseUpdate(group, mutable.Map.empty[Long, 
mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]) + val groupProducerOffsets = groupPendingOffsets.getOrElseUpdate(producerId, mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset]) groupProducerOffsets ++= offsets } } @@ -571,8 +575,8 @@ class GroupMetadataManager(brokerId: Int, .partition { case (group, _) => loadedGroups.contains(group)} loadedGroups.values.foreach { group => - val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, OffsetAndMetadata]) - val pendingOffsets = pendingGroupOffsets.getOrElse(group.groupId, Map.empty[Long, mutable.Map[TopicPartition, OffsetAndMetadata]]) + val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset]) + val pendingOffsets = pendingGroupOffsets.getOrElse(group.groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]) loadGroup(group, offsets, pendingOffsets) onGroupLoaded(group) } @@ -581,8 +585,8 @@ class GroupMetadataManager(brokerId: Int, // metadata stored in the log (emptyGroupOffsets.keySet ++ pendingEmptyGroupOffsets.keySet).foreach { case(groupId) => val group = new GroupMetadata(groupId) - val offsets = emptyGroupOffsets.getOrElse(groupId, Map.empty[TopicPartition, OffsetAndMetadata]) - val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, Map.empty[Long, mutable.Map[TopicPartition, OffsetAndMetadata]]) + val offsets = emptyGroupOffsets.getOrElse(groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset]) + val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]) loadGroup(group, offsets, pendingOffsets) onGroupLoaded(group) } @@ -603,17 +607,19 @@ class GroupMetadataManager(brokerId: Int, } } - private def loadGroup(group: GroupMetadata, offsets: Map[TopicPartition, OffsetAndMetadata], - pendingTransactionalOffsets: Map[Long, mutable.Map[TopicPartition, OffsetAndMetadata]]): Unit = { + 
private def loadGroup(group: GroupMetadata, offsets: Map[TopicPartition, CommitRecordMetadataAndOffset], + pendingTransactionalOffsets: Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]): Unit = { // offsets are initialized prior to loading the group into the cache to ensure that clients see a consistent // view of the group's offsets - val loadedOffsets = offsets.mapValues { offsetAndMetadata => + val loadedOffsets = offsets.mapValues { case CommitRecordMetadataAndOffset(commitRecordOffset, offsetAndMetadata) => // special handling for version 0: // set the expiration time stamp as commit time stamp + server default retention time - if (offsetAndMetadata.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP) + val updatedOffsetAndMetadata = + if (offsetAndMetadata.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP) offsetAndMetadata.copy(expireTimestamp = offsetAndMetadata.commitTimestamp + config.offsetsRetentionMs) else offsetAndMetadata + CommitRecordMetadataAndOffset(commitRecordOffset, updatedOffsetAndMetadata) } trace(s"Initialized offsets $loadedOffsets for group ${group.groupId}") group.initializeOffsets(loadedOffsets, pendingTransactionalOffsets.toMap) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala index e62c0d39ba69f..db99d1a1647ae 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala @@ -296,12 +296,13 @@ class GroupMetadataTest extends JUnitSuite { def testOffsetCommit(): Unit = { val partition = new TopicPartition("foo", 0) val offset = OffsetAndMetadata(37) + val commitRecordOffset = 3 group.prepareOffsetCommit(Map(partition -> offset)) assertTrue(group.hasOffsets) assertEquals(None, group.offset(partition)) - 
group.completePendingOffsetWrite(partition, offset) + group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(commitRecordOffset, offset)) assertTrue(group.hasOffsets) assertEquals(Some(offset), group.offset(partition)) } @@ -337,7 +338,7 @@ class GroupMetadataTest extends JUnitSuite { assertTrue(group.hasOffsets) assertEquals(None, group.offset(partition)) - group.completePendingOffsetWrite(partition, secondOffset) + group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(3L, secondOffset)) assertTrue(group.hasOffsets) assertEquals(Some(secondOffset), group.offset(partition)) } @@ -355,11 +356,11 @@ class GroupMetadataTest extends JUnitSuite { group.prepareOffsetCommit(Map(partition -> secondOffset)) assertTrue(group.hasOffsets) - group.completePendingOffsetWrite(partition, firstOffset) + group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(4L, firstOffset)) assertTrue(group.hasOffsets) assertEquals(Some(firstOffset), group.offset(partition)) - group.completePendingOffsetWrite(partition, secondOffset) + group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(5L, secondOffset)) assertTrue(group.hasOffsets) assertEquals(Some(secondOffset), group.offset(partition)) } From 20ee45422130f197791600891a9872826d510ca7 Mon Sep 17 00:00:00 2001 From: Apurva Mehta Date: Fri, 19 May 2017 15:35:27 -0700 Subject: [PATCH 2/8] Update the return values of the GroupMetadata.remove* methods --- .../kafka/coordinator/group/GroupMetadata.scala | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala index 6eb0cf16b9f68..33cfd0cd5bc9c 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala @@ -359,6 +359,7 @@ private[group] class GroupMetadata(val groupId: String, 
initialState: GroupState // We may hit this case if the partition in question has emigrated. } } + /* Complete a pending transactional offset commit. This is called after a commit or abort marker is fully written * to the log. */ @@ -379,22 +380,26 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState def hasPendingOffsetCommitsFromProducer(producerId: Long) = pendingTransactionalOffsetCommits.contains(producerId) - def removeOffsets(topicPartitions: Seq[TopicPartition]): immutable.Map[TopicPartition, CommitRecordMetadataAndOffset] = { + def removeOffsets(topicPartitions: Seq[TopicPartition]): immutable.Map[TopicPartition, OffsetAndMetadata] = { topicPartitions.flatMap { topicPartition => pendingOffsetCommits.remove(topicPartition) pendingTransactionalOffsetCommits.foreach { case (_, pendingOffsets) => pendingOffsets.remove(topicPartition) } val removedOffset = offsets.remove(topicPartition) - removedOffset.map(topicPartition -> _) + removedOffset.map(topicPartition -> _.offsetAndMetadata) }.toMap } def removeExpiredOffsets(startMs: Long) = { - val expiredOffsets = offsets.filter { - case (topicPartition, commitRecordMetadataAndOffset) => - commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp < startMs && !pendingOffsetCommits.contains(topicPartition) - } + val expiredOffsets = offsets + .filter { + case (topicPartition, commitRecordMetadataAndOffset) => + commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp < startMs && !pendingOffsetCommits.contains(topicPartition) + } + .map { case (topicPartition, commitRecordOffsetAndMetadata) => + (topicPartition, commitRecordOffsetAndMetadata.offsetAndMetadata) + } offsets --= expiredOffsets.keySet expiredOffsets.toMap } From 2fd79d1680711cdd746233dfbeaea957e65e67d8 Mon Sep 17 00:00:00 2001 From: Apurva Mehta Date: Fri, 19 May 2017 16:49:08 -0700 Subject: [PATCH 3/8] Minor cleanups and added unit tests --- .../coordinator/group/GroupMetadata.scala | 9 +- 
.../group/GroupMetadataManager.scala | 2 +- .../group/GroupMetadataManagerTest.scala | 92 ++++++++++++++++ .../coordinator/group/GroupMetadataTest.scala | 100 ++++++++++++++++++ 4 files changed, 198 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala index 33cfd0cd5bc9c..daff4b159c00c 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala @@ -348,8 +348,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState } } - def updateCommitRecordMetadataForPendingTxnOffsetWrite(producerId: Long, topicPartition: TopicPartition, - commitRecordMetadataAndOffset: CommitRecordMetadataAndOffset) { + def updateCommitRecordMetadataForPendingTxnOffsetCommit(producerId: Long, topicPartition: TopicPartition, + commitRecordMetadataAndOffset: CommitRecordMetadataAndOffset) { pendingTransactionalOffsetCommits.get(producerId) match { case Some(pendingOffset) => if (pendingOffset.contains(topicPartition) @@ -397,8 +397,9 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState case (topicPartition, commitRecordMetadataAndOffset) => commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp < startMs && !pendingOffsetCommits.contains(topicPartition) } - .map { case (topicPartition, commitRecordOffsetAndMetadata) => - (topicPartition, commitRecordOffsetAndMetadata.offsetAndMetadata) + .map { + case (topicPartition, commitRecordOffsetAndMetadata) => + (topicPartition, commitRecordOffsetAndMetadata.offsetAndMetadata) } offsets --= expiredOffsets.keySet expiredOffsets.toMap diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index b5e014dc31efb..51f385f9631f5 100644 --- 
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -312,7 +312,7 @@ class GroupMetadataManager(brokerId: Int, if (!group.is(Dead)) { filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) => if (isTxnOffsetCommit) - group.updateCommitRecordMetadataForPendingTxnOffsetWrite(producerId, topicPartition, CommitRecordMetadataAndOffset(status.baseOffset, offsetAndMetadata)) + group.updateCommitRecordMetadataForPendingTxnOffsetCommit(producerId, topicPartition, CommitRecordMetadataAndOffset(status.baseOffset, offsetAndMetadata)) else group.completePendingOffsetWrite(topicPartition, CommitRecordMetadataAndOffset(status.baseOffset, offsetAndMetadata)) } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index f76eb7b68a7f4..d21b2d5ec9fa2 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -364,6 +364,98 @@ class GroupMetadataManagerTest { } } + @Test + def testGroupLoadWithConsumerAndTransactionalOffsetCommitsConsumerWins(): Unit = { + val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) + val producerId = 1000L + val producerEpoch: Short = 2 + + val transactionalOffsetCommits = Map( + new TopicPartition("foo", 0) -> 23L + ) + + val consumerOffsetCommits = Map( + new TopicPartition("foo", 0) -> 24L + ) + + val buffer = ByteBuffer.allocate(1024) + var nextOffset = 0 + nextOffset += appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, nextOffset, transactionalOffsetCommits) + nextOffset += appendConsumerOffsetCommit(buffer, nextOffset, consumerOffsetCommits) + nextOffset += completeTransactionalOffsetCommit(buffer, producerId, producerEpoch, 
nextOffset, isCommit = true) + buffer.flip() + + val records = MemoryRecords.readableRecords(buffer) + expectGroupMetadataLoad(groupMetadataTopicPartition, 0, records) + + EasyMock.replay(replicaManager) + + groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ()) + + // The group should be loaded with pending offsets. + val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache")) + assertEquals(groupId, group.groupId) + assertEquals(Empty, group.currentState) + // Ensure that no offsets are materialized, but that we have offsets pending. + assertEquals(1, group.allOffsets.size) + assertTrue(group.hasOffsets) + assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId)) + assertEquals(consumerOffsetCommits.size, group.allOffsets.size) + consumerOffsetCommits.foreach { case (topicPartition, offset) => + assertEquals(Some(offset), group.offset(topicPartition).map(_.offset)) + } + } + + @Test + def testGroupLoadWithConsumerAndTransactionalOffsetCommitsTransactionWins(): Unit = { + val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) + val producerId = 1000L + val producerEpoch: Short = 2 + + val transactionalOffsetCommits = Map( + new TopicPartition("foo", 0) -> 23L + ) + + val consumerOffsetCommits = Map( + new TopicPartition("foo", 0) -> 24L + ) + + val buffer = ByteBuffer.allocate(1024) + var nextOffset = 0 + nextOffset += appendConsumerOffsetCommit(buffer, nextOffset, consumerOffsetCommits) + nextOffset += appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, nextOffset, transactionalOffsetCommits) + nextOffset += completeTransactionalOffsetCommit(buffer, producerId, producerEpoch, nextOffset, isCommit = true) + buffer.flip() + + val records = MemoryRecords.readableRecords(buffer) + expectGroupMetadataLoad(groupMetadataTopicPartition, 0, records) + + EasyMock.replay(replicaManager) + + 
groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ()) + + // The group should be loaded with pending offsets. + val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache")) + assertEquals(groupId, group.groupId) + assertEquals(Empty, group.currentState) + // Ensure that no offsets are materialized, but that we have offsets pending. + assertEquals(1, group.allOffsets.size) + assertTrue(group.hasOffsets) + assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId)) + assertEquals(consumerOffsetCommits.size, group.allOffsets.size) + transactionalOffsetCommits.foreach { case (topicPartition, offset) => + assertEquals(Some(offset), group.offset(topicPartition).map(_.offset)) + } + } + + private def appendConsumerOffsetCommit(buffer: ByteBuffer, baseOffset: Long, offsets: Map[TopicPartition, Long]) = { + val builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, baseOffset) + val commitRecords = createCommittedOffsetRecords(offsets) + commitRecords.foreach(builder.append) + builder.build() + offsets.size + } + private def appendTransactionalOffsetCommits(buffer: ByteBuffer, producerId: Long, producerEpoch: Short, baseOffset: Long, offsets: Map[TopicPartition, Long]): Int = { val builder = MemoryRecords.builder(buffer, CompressionType.NONE, baseOffset, producerId, producerEpoch, 0, true) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala index db99d1a1647ae..9450055d9a96a 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala @@ -365,6 +365,106 @@ class GroupMetadataTest extends JUnitSuite { assertEquals(Some(secondOffset), group.offset(partition)) } + @Test + def testConsumerBeatsTransactionalOffsetCommit(): Unit = { + val partition = new 
TopicPartition("foo", 0) + val producerId = 13232L + val txnOffsetCommit = OffsetAndMetadata(37) + val consumerOffsetCommit = OffsetAndMetadata(57) + + group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit)) + assertTrue(group.hasOffsets) + assertEquals(None, group.offset(partition)) + + group.prepareOffsetCommit(Map(partition -> consumerOffsetCommit)) + assertTrue(group.hasOffsets) + + group.updateCommitRecordMetadataForPendingTxnOffsetCommit(producerId, partition, CommitRecordMetadataAndOffset(3L, txnOffsetCommit)) + group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(4L, consumerOffsetCommit)) + assertTrue(group.hasOffsets) + assertEquals(Some(consumerOffsetCommit), group.offset(partition)) + + group.completePendingTxnOffsetCommit(producerId, isCommit = true) + assertTrue(group.hasOffsets) + // This is the crucial assertion which validates that we materialize offsets in offset order, not transactional order. + assertEquals(Some(consumerOffsetCommit), group.offset(partition)) + } + + @Test + def testTransactionBeatsConsumerOffsetCommit(): Unit = { + val partition = new TopicPartition("foo", 0) + val producerId = 13232L + val txnOffsetCommit = OffsetAndMetadata(37) + val consumerOffsetCommit = OffsetAndMetadata(57) + + group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit)) + assertTrue(group.hasOffsets) + assertEquals(None, group.offset(partition)) + + group.prepareOffsetCommit(Map(partition -> consumerOffsetCommit)) + assertTrue(group.hasOffsets) + + group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(3L, consumerOffsetCommit)) + group.updateCommitRecordMetadataForPendingTxnOffsetCommit(producerId, partition, CommitRecordMetadataAndOffset(4L, txnOffsetCommit)) + assertTrue(group.hasOffsets) + // The transactional offset commit hasn't been committed yet, so we should materialize the consumer offset commit. 
+ assertEquals(Some(consumerOffsetCommit), group.offset(partition)) + + group.completePendingTxnOffsetCommit(producerId, isCommit = true) + assertTrue(group.hasOffsets) + // The transactional offset commit has been materialized and the transactional commit record is later in the log, + // so it should be materialized. + assertEquals(Some(txnOffsetCommit), group.offset(partition)) + } + + @Test + def testTransactionalCommitIsAbortedAndConsumerCommitWins(): Unit = { + val partition = new TopicPartition("foo", 0) + val producerId = 13232L + val txnOffsetCommit = OffsetAndMetadata(37) + val consumerOffsetCommit = OffsetAndMetadata(57) + + group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit)) + assertTrue(group.hasOffsets) + assertEquals(None, group.offset(partition)) + + group.prepareOffsetCommit(Map(partition -> consumerOffsetCommit)) + assertTrue(group.hasOffsets) + + group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(3L, consumerOffsetCommit)) + group.updateCommitRecordMetadataForPendingTxnOffsetCommit(producerId, partition, CommitRecordMetadataAndOffset(4L, txnOffsetCommit)) + assertTrue(group.hasOffsets) + // The transactional offset commit hasn't been committed yet, so we should materialize the consumer offset commit. + assertEquals(Some(consumerOffsetCommit), group.offset(partition)) + + group.completePendingTxnOffsetCommit(producerId, isCommit = false) + assertTrue(group.hasOffsets) + // The transactional offset commit should be discarded and the consumer offset commit should continue to be + // materialized. 
+ assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId)) + assertEquals(Some(consumerOffsetCommit), group.offset(partition)) + } + + @Test + def testFailedTxnOffsetCommitLeavesNoPendingState(): Unit = { + val partition = new TopicPartition("foo", 0) + val producerId = 13232L + val txnOffsetCommit = OffsetAndMetadata(37) + + group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit)) + assertTrue(group.hasPendingOffsetCommitsFromProducer(producerId)) + assertTrue(group.hasOffsets) + assertEquals(None, group.offset(partition)) + group.failPendingTxnOffsetCommit(producerId, partition, txnOffsetCommit) + assertFalse(group.hasOffsets) + assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId)) + + // The commit marker should now have no effect. + group.completePendingTxnOffsetCommit(producerId, isCommit = true) + assertFalse(group.hasOffsets) + assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId)) + } + private def assertState(group: GroupMetadata, targetState: GroupState) { val states: Set[GroupState] = Set(Stable, PreparingRebalance, AwaitingSync, Dead) val otherStates = states - targetState From 7e5f2820809d9a085333e1fa97efd13207e5a4e0 Mon Sep 17 00:00:00 2001 From: Apurva Mehta Date: Fri, 19 May 2017 17:02:13 -0700 Subject: [PATCH 4/8] Remove erroneous comment --- .../unit/kafka/coordinator/group/GroupMetadataManagerTest.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index d21b2d5ec9fa2..17d46755356ee 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -396,7 +396,6 @@ class GroupMetadataManagerTest { val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache")) 
assertEquals(groupId, group.groupId) assertEquals(Empty, group.currentState) - // Ensure that no offsets are materialized, but that we have offsets pending. assertEquals(1, group.allOffsets.size) assertTrue(group.hasOffsets) assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId)) @@ -438,7 +437,6 @@ class GroupMetadataManagerTest { val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache")) assertEquals(groupId, group.groupId) assertEquals(Empty, group.currentState) - // Ensure that no offsets are materialized, but that we have offsets pending. assertEquals(1, group.allOffsets.size) assertTrue(group.hasOffsets) assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId)) From 4667ea2da51b66f4ecbac8c3ea389fa3b6ae08cb Mon Sep 17 00:00:00 2001 From: Apurva Mehta Date: Mon, 22 May 2017 17:43:35 -0700 Subject: [PATCH 5/8] Address PR Comments --- .../coordinator/group/GroupMetadata.scala | 23 +++++++++++-------- .../group/GroupMetadataManager.scala | 8 +++---- .../coordinator/group/GroupMetadataTest.scala | 20 ++++++++-------- 3 files changed, 28 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala index daff4b159c00c..9f092700f71b7 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala @@ -132,9 +132,8 @@ case class GroupSummary(state: String, * information of the commit record offset, compaction of the offsets topic it self may result in the wrong offset commit * being materialized. 
*/ -case class CommitRecordMetadataAndOffset(commitRecordOffset: Long, - offsetAndMetadata: OffsetAndMetadata) { - def olderThan(that: CommitRecordMetadataAndOffset) = commitRecordOffset < that.commitRecordOffset +case class CommitRecordMetadataAndOffset(appendedBatchOffset: Option[Long], offsetAndMetadata: OffsetAndMetadata) { + def olderThan(that: CommitRecordMetadataAndOffset) : Boolean = appendedBatchOffset.get < that.appendedBatchOffset.get } /** @@ -295,8 +294,9 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState def completePendingOffsetWrite(topicPartition: TopicPartition, offsetWithCommitRecordMetadata: CommitRecordMetadataAndOffset) { if (pendingOffsetCommits.contains(topicPartition)) { - // TODO(apurva) : Is this check necessary? Is it ever possible that a transactional offset commit would - // be written and then committed in the time where a regular offset commit was written but not acknowledged. + if (offsetWithCommitRecordMetadata.appendedBatchOffset.isEmpty) + throw new IllegalStateException("Cannot complete offset commit write without providing the metadata of the record" + + "in the log.") if (!offsets.contains(topicPartition) || offsets(topicPartition).olderThan(offsetWithCommitRecordMetadata)) offsets.put(topicPartition, offsetWithCommitRecordMetadata) } @@ -305,6 +305,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState case Some(stagedOffset) if offsetWithCommitRecordMetadata.offsetAndMetadata == stagedOffset => pendingOffsetCommits.remove(topicPartition) case _ => + // The pendingOffsetCommits for this partition could be empty if the topic was deleted, in which case + // its entries would be removed from the cache by the `removeOffsets` method. 
} } @@ -326,7 +328,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset]) offsets.foreach { case (topicPartition, offsetAndMetadata) => - producerOffsets.put(topicPartition, CommitRecordMetadataAndOffset(-1, offsetAndMetadata)) + producerOffsets.put(topicPartition, CommitRecordMetadataAndOffset(None, offsetAndMetadata)) } } @@ -348,8 +350,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState } } - def updateCommitRecordMetadataForPendingTxnOffsetCommit(producerId: Long, topicPartition: TopicPartition, - commitRecordMetadataAndOffset: CommitRecordMetadataAndOffset) { + def onTxnOffsetCommitAppend(producerId: Long, topicPartition: TopicPartition, + commitRecordMetadataAndOffset: CommitRecordMetadataAndOffset) { pendingTransactionalOffsetCommits.get(producerId) match { case Some(pendingOffset) => if (pendingOffset.contains(topicPartition) @@ -368,6 +370,9 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState if (isCommit) { val producerOffsets = pendingTransactionalOffsetCommits.getOrElse(producerId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset]) producerOffsets.foreach { case (topicPartition, commitRecordMetadataAndOffset) => + if (commitRecordMetadataAndOffset.appendedBatchOffset.isEmpty) + throw new IllegalStateException(s"Trying to complete a transactional offset commit for producerId $producerId " + + s"and groupId $groupId even though the offset commit record itself hasn't been appended to the log.") if (!offsets.contains(topicPartition) || offsets(topicPartition).olderThan(commitRecordMetadataAndOffset)) offsets.put(topicPartition, commitRecordMetadataAndOffset) } @@ -391,7 +396,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState }.toMap } - def removeExpiredOffsets(startMs: Long) = { + def removeExpiredOffsets(startMs: Long) : Map[TopicPartition, 
OffsetAndMetadata] = { val expiredOffsets = offsets .filter { case (topicPartition, commitRecordMetadataAndOffset) => diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 51f385f9631f5..fa3eb0e375ae2 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -312,9 +312,9 @@ class GroupMetadataManager(brokerId: Int, if (!group.is(Dead)) { filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) => if (isTxnOffsetCommit) - group.updateCommitRecordMetadataForPendingTxnOffsetCommit(producerId, topicPartition, CommitRecordMetadataAndOffset(status.baseOffset, offsetAndMetadata)) + group.onTxnOffsetCommitAppend(producerId, topicPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata)) else - group.completePendingOffsetWrite(topicPartition, CommitRecordMetadataAndOffset(status.baseOffset, offsetAndMetadata)) + group.completePendingOffsetWrite(topicPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata)) } } Errors.NONE @@ -526,9 +526,9 @@ class GroupMetadataManager(brokerId: Int, } else { val offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(record.value) if (isTxnOffsetCommit) - pendingOffsets(batch.producerId).put(groupTopicPartition, CommitRecordMetadataAndOffset(batch.baseOffset, offsetAndMetadata)) + pendingOffsets(batch.producerId).put(groupTopicPartition, CommitRecordMetadataAndOffset(Some(batch.baseOffset), offsetAndMetadata)) else - loadedOffsets.put(groupTopicPartition, CommitRecordMetadataAndOffset(batch.baseOffset, offsetAndMetadata)) + loadedOffsets.put(groupTopicPartition, CommitRecordMetadataAndOffset(Some(batch.baseOffset), offsetAndMetadata)) } case groupMetadataKey: GroupMetadataKey => diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala index 9450055d9a96a..0a38d6c1a93a1 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala @@ -302,7 +302,7 @@ class GroupMetadataTest extends JUnitSuite { assertTrue(group.hasOffsets) assertEquals(None, group.offset(partition)) - group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(commitRecordOffset, offset)) + group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(Some(commitRecordOffset), offset)) assertTrue(group.hasOffsets) assertEquals(Some(offset), group.offset(partition)) } @@ -338,7 +338,7 @@ class GroupMetadataTest extends JUnitSuite { assertTrue(group.hasOffsets) assertEquals(None, group.offset(partition)) - group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(3L, secondOffset)) + group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(Some(3L), secondOffset)) assertTrue(group.hasOffsets) assertEquals(Some(secondOffset), group.offset(partition)) } @@ -356,11 +356,11 @@ class GroupMetadataTest extends JUnitSuite { group.prepareOffsetCommit(Map(partition -> secondOffset)) assertTrue(group.hasOffsets) - group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(4L, firstOffset)) + group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(Some(4L), firstOffset)) assertTrue(group.hasOffsets) assertEquals(Some(firstOffset), group.offset(partition)) - group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(5L, secondOffset)) + group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(Some(5L), secondOffset)) assertTrue(group.hasOffsets) assertEquals(Some(secondOffset), group.offset(partition)) } @@ -379,8 +379,8 @@ class GroupMetadataTest extends JUnitSuite { 
group.prepareOffsetCommit(Map(partition -> consumerOffsetCommit)) assertTrue(group.hasOffsets) - group.updateCommitRecordMetadataForPendingTxnOffsetCommit(producerId, partition, CommitRecordMetadataAndOffset(3L, txnOffsetCommit)) - group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(4L, consumerOffsetCommit)) + group.onTxnOffsetCommitAppend(producerId, partition, CommitRecordMetadataAndOffset(Some(3L), txnOffsetCommit)) + group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(Some(4L), consumerOffsetCommit)) assertTrue(group.hasOffsets) assertEquals(Some(consumerOffsetCommit), group.offset(partition)) @@ -404,8 +404,8 @@ class GroupMetadataTest extends JUnitSuite { group.prepareOffsetCommit(Map(partition -> consumerOffsetCommit)) assertTrue(group.hasOffsets) - group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(3L, consumerOffsetCommit)) - group.updateCommitRecordMetadataForPendingTxnOffsetCommit(producerId, partition, CommitRecordMetadataAndOffset(4L, txnOffsetCommit)) + group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(Some(3L), consumerOffsetCommit)) + group.onTxnOffsetCommitAppend(producerId, partition, CommitRecordMetadataAndOffset(Some(4L), txnOffsetCommit)) assertTrue(group.hasOffsets) // The transactional offset commit hasn't been committed yet, so we should materialize the consumer offset commit. 
assertEquals(Some(consumerOffsetCommit), group.offset(partition)) @@ -431,8 +431,8 @@ class GroupMetadataTest extends JUnitSuite { group.prepareOffsetCommit(Map(partition -> consumerOffsetCommit)) assertTrue(group.hasOffsets) - group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(3L, consumerOffsetCommit)) - group.updateCommitRecordMetadataForPendingTxnOffsetCommit(producerId, partition, CommitRecordMetadataAndOffset(4L, txnOffsetCommit)) + group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(Some(3L), consumerOffsetCommit)) + group.onTxnOffsetCommitAppend(producerId, partition, CommitRecordMetadataAndOffset(Some(4L), txnOffsetCommit)) assertTrue(group.hasOffsets) // The transactional offset commit hasn't been committed yet, so we should materialize the consumer offset commit. assertEquals(Some(consumerOffsetCommit), group.offset(partition)) From 261573d9526599b1b4d84ab3ede79381add1c89e Mon Sep 17 00:00:00 2001 From: Apurva Mehta Date: Mon, 22 May 2017 17:57:14 -0700 Subject: [PATCH 6/8] Add test cases to verify the log position of offset commits is what we expect --- .../scala/kafka/coordinator/group/GroupMetadata.scala | 3 +++ .../coordinator/group/GroupMetadataManagerTest.scala | 10 +++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala index 9f092700f71b7..798d05a295839 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala @@ -421,6 +421,9 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState None } + // visible for testing + private[group] def offsetWithRecordMetadata(topicPartition: TopicPartition): Option[CommitRecordMetadataAndOffset] = offsets.get(topicPartition) + def numOffsets = offsets.size def hasOffsets = offsets.nonEmpty || 
pendingOffsetCommits.nonEmpty || pendingTransactionalOffsetCommits.nonEmpty diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index 17d46755356ee..323c8d55bc7e6 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -278,6 +278,7 @@ class GroupMetadataManagerTest { val buffer = ByteBuffer.allocate(1024) var nextOffset = 0 + val commitOffsetsLogPosition = nextOffset nextOffset += appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, nextOffset, committedOffsets) nextOffset += completeTransactionalOffsetCommit(buffer, producerId, producerEpoch, nextOffset, isCommit = true) nextOffset += appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, nextOffset, abortedOffsets) @@ -301,6 +302,7 @@ class GroupMetadataManagerTest { assertEquals(committedOffsets.size, group.allOffsets.size) committedOffsets.foreach { case (topicPartition, offset) => assertEquals(Some(offset), group.offset(topicPartition).map(_.offset)) + assertEquals(Some(commitOffsetsLogPosition), group.offsetWithRecordMetadata(topicPartition).head.appendedBatchOffset) } // We should have pending commits. 
@@ -335,9 +337,13 @@ class GroupMetadataManagerTest { ) val buffer = ByteBuffer.allocate(1024) - var nextOffset = 0 + var nextOffset = 0L + + val firstProducerRecordOffset = nextOffset nextOffset += appendTransactionalOffsetCommits(buffer, firstProducerId, firstProducerEpoch, nextOffset, committedOffsetsFirstProducer) nextOffset += completeTransactionalOffsetCommit(buffer, firstProducerId, firstProducerEpoch, nextOffset, isCommit = true) + + val secondProducerRecordOffset = nextOffset nextOffset += appendTransactionalOffsetCommits(buffer, secondProducerId, secondProducerEpoch, nextOffset, committedOffsetsSecondProducer) nextOffset += completeTransactionalOffsetCommit(buffer, secondProducerId, secondProducerEpoch, nextOffset, isCommit = true) buffer.flip() @@ -358,9 +364,11 @@ class GroupMetadataManagerTest { assertEquals(committedOffsetsFirstProducer.size + committedOffsetsSecondProducer.size, group.allOffsets.size) committedOffsetsFirstProducer.foreach { case (topicPartition, offset) => assertEquals(Some(offset), group.offset(topicPartition).map(_.offset)) + assertEquals(Some(firstProducerRecordOffset), group.offsetWithRecordMetadata(topicPartition).head.appendedBatchOffset) } committedOffsetsSecondProducer.foreach { case (topicPartition, offset) => assertEquals(Some(offset), group.offset(topicPartition).map(_.offset)) + assertEquals(Some(secondProducerRecordOffset), group.offsetWithRecordMetadata(topicPartition).head.appendedBatchOffset) } } From 05cbd775926a3be4ebd0c1633e9e00758abe5586 Mon Sep 17 00:00:00 2001 From: Apurva Mehta Date: Mon, 22 May 2017 21:45:46 -0700 Subject: [PATCH 7/8] Verify that the offset for the consumer offset commit record is materialized in the cache correctly --- .../unit/kafka/coordinator/group/GroupMetadataManagerTest.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index 323c8d55bc7e6..83187413c055a 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -389,6 +389,7 @@ class GroupMetadataManagerTest { val buffer = ByteBuffer.allocate(1024) var nextOffset = 0 nextOffset += appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, nextOffset, transactionalOffsetCommits) + val consumerRecordOffset = nextOffset nextOffset += appendConsumerOffsetCommit(buffer, nextOffset, consumerOffsetCommits) nextOffset += completeTransactionalOffsetCommit(buffer, producerId, producerEpoch, nextOffset, isCommit = true) buffer.flip() @@ -410,6 +411,7 @@ class GroupMetadataManagerTest { assertEquals(consumerOffsetCommits.size, group.allOffsets.size) consumerOffsetCommits.foreach { case (topicPartition, offset) => assertEquals(Some(offset), group.offset(topicPartition).map(_.offset)) + assertEquals(Some(consumerRecordOffset), group.offsetWithRecordMetadata(topicPartition).head.appendedBatchOffset) } } From cb423e1c67fe7362631409d03a6870c98b4071d6 Mon Sep 17 00:00:00 2001 From: Apurva Mehta Date: Mon, 22 May 2017 22:57:19 -0700 Subject: [PATCH 8/8] Address Jason's comments --- .../kafka/coordinator/group/GroupMetadata.scala | 11 +++-------- .../coordinator/group/GroupMetadataManager.scala | 2 +- .../coordinator/group/GroupMetadataTest.scala | 14 +++++++------- 3 files changed, 11 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala index 798d05a295839..302fcb5784d53 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala @@ -292,10 +292,10 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState 
this.pendingTransactionalOffsetCommits ++= pendingTxnOffsets } - def completePendingOffsetWrite(topicPartition: TopicPartition, offsetWithCommitRecordMetadata: CommitRecordMetadataAndOffset) { + def onOffsetCommitAppend(topicPartition: TopicPartition, offsetWithCommitRecordMetadata: CommitRecordMetadataAndOffset) { if (pendingOffsetCommits.contains(topicPartition)) { if (offsetWithCommitRecordMetadata.appendedBatchOffset.isEmpty) - throw new IllegalStateException("Cannot complete offset commit write without providing the metadata of the record" + + throw new IllegalStateException("Cannot complete offset commit write without providing the metadata of the record " + "in the log.") if (!offsets.contains(topicPartition) || offsets(topicPartition).olderThan(offsetWithCommitRecordMetadata)) offsets.put(topicPartition, offsetWithCommitRecordMetadata) @@ -414,12 +414,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState (topicPartition, commitRecordMetadataAndOffset.offsetAndMetadata) }.toMap - def offset(topicPartition: TopicPartition) : Option[OffsetAndMetadata] = offsets.get(topicPartition) match { - case Some(commitRecordMetadataAndOffset) => - Option(commitRecordMetadataAndOffset.offsetAndMetadata) - case None => - None - } + def offset(topicPartition: TopicPartition) : Option[OffsetAndMetadata] = offsets.get(topicPartition).map(_.offsetAndMetadata) // visible for testing private[group] def offsetWithRecordMetadata(topicPartition: TopicPartition): Option[CommitRecordMetadataAndOffset] = offsets.get(topicPartition) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index fa3eb0e375ae2..f8f536efc9b53 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -314,7 +314,7 @@ class GroupMetadataManager(brokerId: Int, if 
(isTxnOffsetCommit) group.onTxnOffsetCommitAppend(producerId, topicPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata)) else - group.completePendingOffsetWrite(topicPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata)) + group.onOffsetCommitAppend(topicPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata)) } } Errors.NONE diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala index 0a38d6c1a93a1..0e13f89ec5873 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala @@ -302,7 +302,7 @@ class GroupMetadataTest extends JUnitSuite { assertTrue(group.hasOffsets) assertEquals(None, group.offset(partition)) - group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(Some(commitRecordOffset), offset)) + group.onOffsetCommitAppend(partition, CommitRecordMetadataAndOffset(Some(commitRecordOffset), offset)) assertTrue(group.hasOffsets) assertEquals(Some(offset), group.offset(partition)) } @@ -338,7 +338,7 @@ class GroupMetadataTest extends JUnitSuite { assertTrue(group.hasOffsets) assertEquals(None, group.offset(partition)) - group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(Some(3L), secondOffset)) + group.onOffsetCommitAppend(partition, CommitRecordMetadataAndOffset(Some(3L), secondOffset)) assertTrue(group.hasOffsets) assertEquals(Some(secondOffset), group.offset(partition)) } @@ -356,11 +356,11 @@ class GroupMetadataTest extends JUnitSuite { group.prepareOffsetCommit(Map(partition -> secondOffset)) assertTrue(group.hasOffsets) - group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(Some(4L), firstOffset)) + group.onOffsetCommitAppend(partition, CommitRecordMetadataAndOffset(Some(4L), firstOffset)) 
assertTrue(group.hasOffsets) assertEquals(Some(firstOffset), group.offset(partition)) - group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(Some(5L), secondOffset)) + group.onOffsetCommitAppend(partition, CommitRecordMetadataAndOffset(Some(5L), secondOffset)) assertTrue(group.hasOffsets) assertEquals(Some(secondOffset), group.offset(partition)) } @@ -380,7 +380,7 @@ class GroupMetadataTest extends JUnitSuite { assertTrue(group.hasOffsets) group.onTxnOffsetCommitAppend(producerId, partition, CommitRecordMetadataAndOffset(Some(3L), txnOffsetCommit)) - group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(Some(4L), consumerOffsetCommit)) + group.onOffsetCommitAppend(partition, CommitRecordMetadataAndOffset(Some(4L), consumerOffsetCommit)) assertTrue(group.hasOffsets) assertEquals(Some(consumerOffsetCommit), group.offset(partition)) @@ -404,7 +404,7 @@ class GroupMetadataTest extends JUnitSuite { group.prepareOffsetCommit(Map(partition -> consumerOffsetCommit)) assertTrue(group.hasOffsets) - group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(Some(3L), consumerOffsetCommit)) + group.onOffsetCommitAppend(partition, CommitRecordMetadataAndOffset(Some(3L), consumerOffsetCommit)) group.onTxnOffsetCommitAppend(producerId, partition, CommitRecordMetadataAndOffset(Some(4L), txnOffsetCommit)) assertTrue(group.hasOffsets) // The transactional offset commit hasn't been committed yet, so we should materialize the consumer offset commit. 
@@ -431,7 +431,7 @@ class GroupMetadataTest extends JUnitSuite { group.prepareOffsetCommit(Map(partition -> consumerOffsetCommit)) assertTrue(group.hasOffsets) - group.completePendingOffsetWrite(partition, CommitRecordMetadataAndOffset(Some(3L), consumerOffsetCommit)) + group.onOffsetCommitAppend(partition, CommitRecordMetadataAndOffset(Some(3L), consumerOffsetCommit)) group.onTxnOffsetCommitAppend(producerId, partition, CommitRecordMetadataAndOffset(Some(4L), txnOffsetCommit)) assertTrue(group.hasOffsets) // The transactional offset commit hasn't been committed yet, so we should materialize the consumer offset commit.