From 2b5f603dfd6b2182b3537d6aad457a1bbf53a1c5 Mon Sep 17 00:00:00 2001
From: Jason Gustafson
Date: Fri, 12 May 2017 19:58:41 -0700
Subject: [PATCH 1/7] MINOR: Some minor cleanups from TxnOffsetCommit patch

---
 .../coordinator/group/GroupCoordinator.scala  |  84 ++--
 core/src/main/scala/kafka/log/Log.scala       |  14 +-
 .../src/main/scala/kafka/log/LogSegment.scala |   3 +-
 .../kafka/log/ProducerStateManager.scala      |  28 +-
 .../main/scala/kafka/server/KafkaApis.scala   |  74 ++--
 .../group/GroupCoordinatorResponseTest.scala  |  14 +-
 .../group/GroupCoordinatorTest.scala          |  36 --
 .../test/scala/unit/kafka/log/LogTest.scala   |   4 +-
 .../kafka/log/ProducerStateManagerTest.scala  | 407 +++++++++---------
 9 files changed, 334 insertions(+), 330 deletions(-)
 delete mode 100644 core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 031a9c10466b3..ee93fa54586e0 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -25,10 +25,9 @@ import kafka.message.ProducerCompressionCodec
 import kafka.server._
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.RecordBatch
-import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetFetchResponse, TransactionResult}
+import org.apache.kafka.common.record.RecordBatch.{NO_PRODUCER_EPOCH, NO_PRODUCER_ID}
+import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.Time
 
 import scala.collection.{Map, Seq, immutable}
@@ -48,6 +47,8 @@ class GroupCoordinator(val brokerId: Int,
                        val heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
                        val joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
                        time: Time) extends Logging {
+  import GroupCoordinator._
+
   type JoinCallback = JoinGroupResult => Unit
   type SyncCallback = (Array[Byte], Errors) => Unit
 
@@ -395,42 +396,51 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
+  def handleTxnCommitOffsets(groupId: String,
+                             producerId: Long,
+                             producerEpoch: Short,
+                             offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
+                             responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
+    validateGroup(groupId) match {
+      case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error))
+      case None =>
+        val group = groupManager.getGroup(groupId).getOrElse(groupManager.addGroup(new GroupMetadata(groupId)))
+        doCommitOffsets(group, NoMemberId, NoGeneration, producerId, producerEpoch, offsetMetadata, responseCallback)
+    }
+  }
+
   def handleCommitOffsets(groupId: String,
                           memberId: String,
                           generationId: Int,
                           offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
-                          responseCallback: immutable.Map[TopicPartition, Errors] => Unit,
-                          producerId: Long = RecordBatch.NO_PRODUCER_ID,
-                          producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH) {
-    if (!isActive.get) {
-      responseCallback(offsetMetadata.mapValues(_ => Errors.COORDINATOR_NOT_AVAILABLE))
-    } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR))
-    } else if (isCoordinatorLoadInProgress(groupId)) {
-      responseCallback(offsetMetadata.mapValues(_ => Errors.COORDINATOR_LOAD_IN_PROGRESS))
-    } else {
-      groupManager.getGroup(groupId) match {
-        case None =>
-          if (generationId < 0) {
-            // the group is not relying on Kafka for group management, so allow the commit
-            val group = groupManager.addGroup(new GroupMetadata(groupId))
-            doCommitOffsets(group, memberId, generationId, producerId, producerEpoch, offsetMetadata, responseCallback)
-          } else {
-            // or this is a request coming from an older generation. either way, reject the commit
-            responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION))
-          }
+                          responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
+    validateGroup(groupId) match {
+      case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error))
+      case None =>
+        groupManager.getGroup(groupId) match {
+          case None =>
+            if (generationId < 0) {
+              // the group is not relying on Kafka for group management, so allow the commit
+              val group = groupManager.addGroup(new GroupMetadata(groupId))
+              doCommitOffsets(group, memberId, generationId, NO_PRODUCER_ID, NO_PRODUCER_EPOCH,
+                offsetMetadata, responseCallback)
+            } else {
+              // or this is a request coming from an older generation. either way, reject the commit
+              responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION))
+            }
 
-        case Some(group) =>
-          doCommitOffsets(group, memberId, generationId, producerId, producerEpoch, offsetMetadata, responseCallback)
-      }
+          case Some(group) =>
+            doCommitOffsets(group, memberId, generationId, NO_PRODUCER_ID, NO_PRODUCER_EPOCH,
+              offsetMetadata, responseCallback)
+        }
     }
   }
 
   def handleTxnCompletion(producerId: Long,
-                          topicPartitions: Seq[TopicPartition],
+                          offsetsPartitions: Iterable[TopicPartition],
                           transactionResult: TransactionResult) {
-    val offsetPartitions = topicPartitions.filter(_.topic == Topic.GROUP_METADATA_TOPIC_NAME).map(_.partition).toSet
-    groupManager.handleTxnCompletion(producerId, offsetPartitions, transactionResult == TransactionResult.COMMIT)
+    val isCommit = transactionResult == TransactionResult.COMMIT
+    groupManager.handleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit)
   }
 
   private def doCommitOffsets(group: GroupMetadata,
@@ -444,7 +454,7 @@ class GroupCoordinator(val brokerId: Int,
     group synchronized {
       if (group.is(Dead)) {
         responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
-      } else if ((generationId < 0 && group.is(Empty)) || (producerId != RecordBatch.NO_PRODUCER_ID)) {
+      } else if ((generationId < 0 && group.is(Empty)) || (producerId != NO_PRODUCER_ID)) {
         // the group is only using Kafka to store offsets
         // Also, for transactional offset commits we don't need to validate group membership and the generation.
        delayedOffsetStore = groupManager.prepareStoreOffsets(group, memberId, generationId,
@@ -514,6 +524,18 @@ class GroupCoordinator(val brokerId: Int,
     groupManager.cleanupGroupMetadata(Some(topicPartitions))
   }
 
+
+  private def validateGroup(groupId: String): Option[Errors] = {
+    if (!isActive.get)
+      Some(Errors.COORDINATOR_NOT_AVAILABLE)
+    else if (!isCoordinatorForGroup(groupId))
+      Some(Errors.NOT_COORDINATOR)
+    else if (isCoordinatorLoadInProgress(groupId))
+      Some(Errors.COORDINATOR_LOAD_IN_PROGRESS)
+    else
+      None
+  }
+
   private def onGroupUnloaded(group: GroupMetadata) {
     group synchronized {
       info(s"Unloading group metadata for ${group.groupId} with generation ${group.generationId}")
@@ -791,6 +813,8 @@ object GroupCoordinator {
   val NoProtocolType = ""
   val NoProtocol = ""
   val NoLeader = ""
+  val NoGeneration = -1
+  val NoMemberId = ""
   val NoMembers = List[MemberSummary]()
   val EmptyGroup = GroupSummary(NoState, NoProtocolType, NoProtocol, NoMembers)
   val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol, NoMembers)
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index e3a21d1006497..dd22a263dce62 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -446,10 +446,8 @@ class Log(@volatile var dir: File,
     val loadedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
     val completedTxns = ListBuffer.empty[CompletedTxn]
     records.batches.asScala.foreach { batch =>
-      if (batch.hasProducerId) {
-        val lastEntry = producerStateManager.lastEntry(batch.producerId)
-        updateProducers(batch, loadedProducers, completedTxns, lastEntry, loadingFromLog = true)
-      }
+      if (batch.hasProducerId)
+        updateProducers(batch, loadedProducers, completedTxns, loadingFromLog = true)
     }
     loadedProducers.values.foreach(producerStateManager.update)
     completedTxns.foreach(producerStateManager.completeTxn)
@@ -695,7 +693,7 @@ class Log(@volatile var dir: File,
           // the last appended entry to the client.
          if (isFromClient && maybeLastEntry.exists(_.isDuplicate(batch)))
            return (updatedProducers, completedTxns.toList, maybeLastEntry)
 
-          updateProducers(batch, updatedProducers, completedTxns, maybeLastEntry, loadingFromLog = false)
+          updateProducers(batch, updatedProducers, completedTxns, loadingFromLog = false)
       }
     (updatedProducers, completedTxns.toList, None)
   }
@@ -780,12 +778,10 @@ class Log(@volatile var dir: File,
   private def updateProducers(batch: RecordBatch,
                               producers: mutable.Map[Long, ProducerAppendInfo],
                               completedTxns: ListBuffer[CompletedTxn],
-                              lastEntry: Option[ProducerIdEntry],
                               loadingFromLog: Boolean): Unit = {
     val producerId = batch.producerId
-    val appendInfo = producers.getOrElseUpdate(producerId, new ProducerAppendInfo(producerId, lastEntry, loadingFromLog))
-    val shouldValidateSequenceNumbers = topicPartition.topic != Topic.GROUP_METADATA_TOPIC_NAME
-    val maybeCompletedTxn = appendInfo.append(batch, shouldValidateSequenceNumbers)
+    val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, loadingFromLog))
+    val maybeCompletedTxn = appendInfo.append(batch)
     maybeCompletedTxn.foreach(completedTxns += _)
   }
 
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index cf3ef0e40ca95..70269bb793789 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -147,8 +147,7 @@ class LogSegment(val log: FileRecords,
   private def updateProducerState(producerStateManager: ProducerStateManager, batch: RecordBatch): Unit = {
     if (batch.hasProducerId) {
       val producerId = batch.producerId
-      val lastEntry = producerStateManager.lastEntry(producerId)
-      val appendInfo = new ProducerAppendInfo(batch.producerId, lastEntry, loadingFromLog = true)
+      val appendInfo = producerStateManager.prepareUpdate(producerId, loadingFromLog = true)
       val maybeCompletedTxn = appendInfo.append(batch)
       producerStateManager.update(appendInfo)
       maybeCompletedTxn.foreach { completedTxn =>
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index ba7c470f1d0af..45188ba21b400 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -26,6 +26,7 @@ import kafka.server.LogOffsetMetadata
 import kafka.utils.{Logging, nonthreadsafe, threadsafe}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors._
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.types._
 import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, RecordBatch}
 import org.apache.kafka.common.utils.{ByteUtils, Crc32C}
@@ -63,7 +64,10 @@ private[log] case class ProducerIdEntry(producerId: Long, producerEpoch: Short,
 * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata
 * as the incoming records are validated.
 */
-private[log] class ProducerAppendInfo(val producerId: Long, initialEntry: ProducerIdEntry, loadingFromLog: Boolean = false) {
+private[log] class ProducerAppendInfo(val producerId: Long,
+                                      initialEntry: ProducerIdEntry,
+                                      validateSequenceNumbers: Boolean = true,
+                                      loadingFromLog: Boolean = false) {
   private var producerEpoch = initialEntry.producerEpoch
   private var firstSeq = initialEntry.firstSeq
   private var lastSeq = initialEntry.lastSeq
@@ -73,14 +77,14 @@ private[log] class ProducerAppendInfo(val producerId: Long, initialEntry: Produc
   private var coordinatorEpoch = initialEntry.coordinatorEpoch
   private val transactions = ListBuffer.empty[TxnMetadata]
 
-  def this(producerId: Long, initialEntry: Option[ProducerIdEntry], loadingFromLog: Boolean) =
-    this(producerId, initialEntry.getOrElse(ProducerIdEntry.Empty), loadingFromLog)
+  def this(producerId: Long, initialEntry: Option[ProducerIdEntry], validateSequenceNumbers: Boolean, loadingFromLog: Boolean) =
+    this(producerId, initialEntry.getOrElse(ProducerIdEntry.Empty), validateSequenceNumbers, loadingFromLog)
 
-  private def validateAppend(producerEpoch: Short, firstSeq: Int, lastSeq: Int, shouldValidateSequenceNumbers: Boolean) = {
+  private def validateAppend(producerEpoch: Short, firstSeq: Int, lastSeq: Int) = {
     if (this.producerEpoch > producerEpoch) {
       throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably another producer " +
         s"with a newer epoch. $producerEpoch (request epoch), ${this.producerEpoch} (server epoch)")
-    } else if (shouldValidateSequenceNumbers) {
+    } else if (validateSequenceNumbers) {
       if (this.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || this.producerEpoch < producerEpoch) {
         if (firstSeq != 0)
           throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $producerEpoch " +
@@ -100,7 +104,7 @@
     }
   }
 
-  def append(batch: RecordBatch, shouldValidateSequenceNumbers: Boolean = true): Option[CompletedTxn] = {
+  def append(batch: RecordBatch): Option[CompletedTxn] = {
     if (batch.isControlBatch) {
       val record = batch.iterator.next()
       val endTxnMarker = EndTransactionMarker.deserialize(record)
@@ -108,7 +112,7 @@
       Some(completedTxn)
     } else {
       append(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp, batch.lastOffset,
-        batch.isTransactional, shouldValidateSequenceNumbers)
+        batch.isTransactional)
       None
     }
   }
@@ -118,12 +122,11 @@ private[log] class ProducerAppendInfo(val producerId: Long,
              lastSeq: Int,
              lastTimestamp: Long,
              lastOffset: Long,
-             isTransactional: Boolean,
-             shouldValidateSequenceNumbers: Boolean): Unit = {
+             isTransactional: Boolean): Unit = {
     if (epoch != RecordBatch.NO_PRODUCER_EPOCH && !loadingFromLog)
      // skip validation if this is the first entry when loading from the log. Log retention
      // will generally have removed the beginning entries from each producer id
-      validateAppend(epoch, firstSeq, lastSeq, shouldValidateSequenceNumbers)
+      validateAppend(epoch, firstSeq, lastSeq)
 
     this.producerEpoch = epoch
     this.firstSeq = firstSeq
@@ -325,6 +328,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   import ProducerStateManager._
   import java.util
 
+  private val validateSequenceNumbers = topicPartition.topic != Topic.GROUP_METADATA_TOPIC_NAME
   private val producers = mutable.Map.empty[Long, ProducerIdEntry]
   private var lastMapOffset = 0L
   private var lastSnapOffset = 0L
@@ -448,6 +452,10 @@ class ProducerStateManager(val topicPartition: TopicPartition,
     }
   }
 
+  def prepareUpdate(producerId: Long, loadingFromLog: Boolean): ProducerAppendInfo = {
+    new ProducerAppendInfo(producerId, lastEntry(producerId), validateSequenceNumbers, loadingFromLog)
+  }
+
   /**
    * Update the mapping with the given append information
    */
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5e9cd9f54621c..755e5e6b60f48 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -39,7 +39,7 @@ import kafka.security.auth._
 import kafka.utils.{Exit, Logging, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
-import org.apache.kafka.common.internals.Topic.{isInternal, GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME}
+import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol}
@@ -1454,17 +1454,20 @@ class KafkaApis(val requestChannel: RequestChannel,
     def sendResponseCallback(producerId: Long, result: TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
       errors.put(producerId, responseStatus.mapValues(_.error).asJava)
 
-      val successfulPartitions = responseStatus.filter { case (_, partitionResponse) =>
-        partitionResponse.error == Errors.NONE
-      }.keys.toSeq
+      val offsetsPartitions = responseStatus.filterKeys(_.topic == GROUP_METADATA_TOPIC_NAME)
+      if (offsetsPartitions.nonEmpty) {
+        val successfulOffsetsPartitions = offsetsPartitions.filter { case (_, partitionResponse) =>
+          partitionResponse.error == Errors.NONE
+        }.keys
 
-      try {
-        groupCoordinator.handleTxnCompletion(producerId = producerId, topicPartitions = successfulPartitions, transactionResult = result)
-      } catch {
-        case e: Exception =>
-          error(s"Received an exception while trying to update the offsets cache on transaction completion: $e")
-          val producerIdErrors = errors.get(producerId)
-          successfulPartitions.foreach(producerIdErrors.put(_, Errors.UNKNOWN))
+        try {
+          groupCoordinator.handleTxnCompletion(producerId, successfulOffsetsPartitions, result)
+        } catch {
+          case e: Exception =>
+            error(s"Received an exception while trying to update the offsets cache on transaction marker append", e)
+            val partitionErrors = errors.get(producerId)
+            successfulOffsetsPartitions.foreach(partitionErrors.put(_, Errors.UNKNOWN))
+        }
       }
 
       if (numAppends.decrementAndGet() == 0)
@@ -1597,8 +1600,9 @@ class KafkaApis(val requestChannel: RequestChannel,
       val authorizedForDescribe = authorize(request.session, Describe, new Resource(Topic, topicPartition.topic))
       val exists = metadataCache.contains(topicPartition.topic)
       if (!authorizedForDescribe && exists)
-        debug(s"Transaction Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
-          s"on partition $topicPartition failing due to user not having DESCRIBE authorization, but returning UNKNOWN_TOPIC_OR_PARTITION")
+        debug(s"TxnOffsetCommit with correlation id ${header.correlationId} from client ${header.clientId} " +
+          s"on partition $topicPartition failing due to user not having DESCRIBE authorization, but returning " +
+          s"UNKNOWN_TOPIC_OR_PARTITION")
       authorizedForDescribe && exists
     }
 
@@ -1607,7 +1611,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     // the callback for sending an offset commit response
-    def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Errors]) {
+    def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]) {
       val combinedCommitStatus = commitStatus ++
         unauthorizedForReadTopics.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED) ++
         nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => Errors.UNKNOWN_TOPIC_OR_PARTITION)
@@ -1615,7 +1619,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       if (isDebugEnabled)
         combinedCommitStatus.foreach { case (topicPartition, error) =>
           if (error != Errors.NONE) {
-            debug(s"TxnOffsetCommit request with correlation id ${header.correlationId} from client ${header.clientId} " +
+            debug(s"TxnOffsetCommit with correlation id ${header.correlationId} from client ${header.clientId} " +
               s"on partition $topicPartition failed due to ${error.exceptionName}")
           }
         }
@@ -1626,33 +1630,29 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     if (authorizedTopics.isEmpty)
       sendResponseCallback(Map.empty)
     else {
-      val offsetRetention = groupCoordinator.offsetConfig.offsetsRetentionMs
-
-      // commit timestamp is always set to now.
- // "default" expiration timestamp is now + retention (and retention may be overridden if v2) - val currentTimestamp = time.milliseconds - val defaultExpireTimestamp = offsetRetention + currentTimestamp - val partitionData = authorizedTopics.mapValues { partitionData => - val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata - new OffsetAndMetadata( - offsetMetadata = OffsetMetadata(partitionData.offset, metadata), - commitTimestamp = currentTimestamp, - expireTimestamp = defaultExpireTimestamp - ) - } - - // call coordinator to handle commit offset - groupCoordinator.handleCommitOffsets( + val offsetMetadata = convertTxnOffsets(authorizedTopics) + groupCoordinator.handleTxnCommitOffsets( txnOffsetCommitRequest.consumerGroupId, - null, - -1, - partitionData, - sendResponseCallback, txnOffsetCommitRequest.producerId, - txnOffsetCommitRequest.producerEpoch) + txnOffsetCommitRequest.producerEpoch, + offsetMetadata, + sendResponseCallback) } } + } + private def convertTxnOffsets(offsetsMap: immutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]): immutable.Map[TopicPartition, OffsetAndMetadata] = { + val offsetRetention = groupCoordinator.offsetConfig.offsetsRetentionMs + val currentTimestamp = time.milliseconds + val defaultExpireTimestamp = offsetRetention + currentTimestamp + offsetsMap.mapValues { partitionData => + val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata + new OffsetAndMetadata( + offsetMetadata = OffsetMetadata(partitionData.offset, metadata), + commitTimestamp = currentTimestamp, + expireTimestamp = defaultExpireTimestamp + ) + } } def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit = { diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala index 0ace2e7408ba0..efa0a3b413f41 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala @@ -39,9 +39,6 @@ import scala.collection._ import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future, Promise, TimeoutException} -/** - * Test GroupCoordinator responses - */ class GroupCoordinatorResponseTest extends JUnitSuite { type JoinGroupCallback = JoinGroupResult => Unit type SyncGroupCallbackParams = (Array[Byte], Errors) @@ -115,6 +112,15 @@ class GroupCoordinatorResponseTest extends JUnitSuite { groupCoordinator.shutdown() } + @Test + def testOffsetsRetentionMsIntegerOverflow() { + val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "") + props.setProperty(KafkaConfig.OffsetsRetentionMinutesProp, Integer.MAX_VALUE.toString) + val config = KafkaConfig.fromProps(props) + val offsetConfig = GroupCoordinator.offsetConfig(config) + assertEquals(offsetConfig.offsetsRetentionMs, Integer.MAX_VALUE * 60L * 1000L) + } + @Test def testJoinGroupWrongCoordinator() { val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID @@ -1462,7 +1468,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite { EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V2)).anyTimes() EasyMock.replay(replicaManager) - groupCoordinator.handleCommitOffsets(groupId, null, -1, offsets, responseCallback, producerId, producerEpoch) + groupCoordinator.handleTxnCommitOffsets(groupId, producerId, 
producerEpoch, offsets, responseCallback) val result = Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS)) EasyMock.reset(replicaManager) result diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala deleted file mode 100644 index fd981b200ff74..0000000000000 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.coordinator.group - -import kafka.server.KafkaConfig -import kafka.utils.TestUtils -import org.junit.Assert._ -import org.junit.Test - -class GroupCoordinatorTest { - - @Test - def testOffsetsRetentionMsIntegerOverflow() { - val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "") - props.setProperty(KafkaConfig.OffsetsRetentionMinutesProp, Integer.MAX_VALUE.toString) - val config = KafkaConfig.fromProps(props) - val offsetConfig = GroupCoordinator.offsetConfig(config) - assertEquals(offsetConfig.offsetsRetentionMs, Integer.MAX_VALUE * 60L * 1000L) - } - -} diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 8c330ed86060b..1ef2d67ba2ac5 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -2270,7 +2270,7 @@ class LogTest { } @Test - def testRecoverTransactionIndex(): Unit = { + def testFullTransactionIndexRecovery(): Unit = { val log = createLog(128) val epoch = 0.toShort @@ -2589,7 +2589,7 @@ class LogTest { new Log(logDir, config, logStartOffset = 0L, - recoveryPoint = 0L, + recoveryPoint = recoveryPoint, scheduler = time.scheduler, brokerTopicStats = brokerTopicStats, time = time, diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala index ad26339fc4712..04335be68adff 100644 --- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala @@ -31,22 +31,22 @@ import org.junit.{After, Before, Test} import org.scalatest.junit.JUnitSuite class ProducerStateManagerTest extends JUnitSuite { - var idMappingDir: File = null - var idMapping: ProducerStateManager = null + var logDir: File = null + var stateManager: ProducerStateManager = null val partition = new TopicPartition("test", 0) - val pid = 1L + val producerId = 1L val maxPidExpirationMs = 60 * 1000 val time = new MockTime @Before def setUp(): Unit = { - idMappingDir = TestUtils.tempDir() - idMapping = new ProducerStateManager(partition, idMappingDir, maxPidExpirationMs) + logDir = TestUtils.tempDir() + stateManager = new 
ProducerStateManager(partition, logDir, maxPidExpirationMs) } @After def tearDown(): Unit = { - Utils.delete(idMappingDir) + Utils.delete(logDir) } @Test @@ -54,27 +54,27 @@ class ProducerStateManagerTest extends JUnitSuite { val epoch = 0.toShort // First entry for id 0 added - append(idMapping, pid, 0, epoch, 0L, 0L) + append(stateManager, producerId, epoch, 0, 0L, 0L) // Second entry for id 0 added - append(idMapping, pid, 1, epoch, 0L, 1L) + append(stateManager, producerId, epoch, 1, 0L, 1L) // Duplicate sequence number (matches previous sequence number) assertThrows[DuplicateSequenceNumberException] { - append(idMapping, pid, 1, epoch, 0L, 1L) + append(stateManager, producerId, epoch, 1, 0L, 1L) } // Invalid sequence number (greater than next expected sequence number) assertThrows[OutOfOrderSequenceException] { - append(idMapping, pid, 5, epoch, 0L, 2L) + append(stateManager, producerId, epoch, 5, 0L, 2L) } // Change epoch - append(idMapping, pid, 0, (epoch + 1).toShort, 0L, 3L) + append(stateManager, producerId, (epoch + 1).toShort, 0, 0L, 3L) // Incorrect epoch assertThrows[ProducerFencedException] { - append(idMapping, pid, 0, epoch, 0L, 4L) + append(stateManager, producerId, epoch, 0, 0L, 4L) } } @@ -83,9 +83,9 @@ class ProducerStateManagerTest extends JUnitSuite { val epoch = 5.toShort val sequence = 16 val offset = 735L - append(idMapping, pid, sequence, epoch, offset, isLoadingFromLog = true) + append(stateManager, producerId, epoch, sequence, offset, isLoadingFromLog = true) - val maybeLastEntry = idMapping.lastEntry(pid) + val maybeLastEntry = stateManager.lastEntry(producerId) assertTrue(maybeLastEntry.isDefined) val lastEntry = maybeLastEntry.get @@ -99,17 +99,17 @@ class ProducerStateManagerTest extends JUnitSuite { @Test def testControlRecordBumpsEpoch(): Unit = { val epoch = 0.toShort - append(idMapping, pid, 0, epoch, 0L) + append(stateManager, producerId, epoch, 0, 0L) val bumpedEpoch = 1.toShort - val (completedTxn, lastStableOffset) = appendEndTxnMarker(idMapping, pid, bumpedEpoch, ControlRecordType.ABORT, 1L) + val (completedTxn, lastStableOffset) = appendEndTxnMarker(stateManager, producerId, bumpedEpoch, ControlRecordType.ABORT, 1L) assertEquals(1L, completedTxn.firstOffset) assertEquals(1L, completedTxn.lastOffset) assertEquals(2L, lastStableOffset) assertTrue(completedTxn.isAborted) - assertEquals(pid, completedTxn.producerId) + assertEquals(producerId, completedTxn.producerId) - val maybeLastEntry = idMapping.lastEntry(pid) + val maybeLastEntry = stateManager.lastEntry(producerId) assertTrue(maybeLastEntry.isDefined) val lastEntry = maybeLastEntry.get @@ -119,8 +119,8 @@ class ProducerStateManagerTest extends JUnitSuite { assertEquals(RecordBatch.NO_SEQUENCE, lastEntry.lastSeq) // should be able to append with the new epoch if we start at sequence 0 - append(idMapping, pid, 0, bumpedEpoch, 2L) - assertEquals(Some(0), idMapping.lastEntry(pid).map(_.firstSeq)) + append(stateManager, producerId, bumpedEpoch, 0, 2L) + assertEquals(Some(0), stateManager.lastEntry(producerId).map(_.firstSeq)) } @Test @@ -128,16 +128,15 @@ class ProducerStateManagerTest extends JUnitSuite { val producerEpoch = 0.toShort val offset = 992342L val seq = 0 - val producerAppendInfo = new ProducerAppendInfo(pid, None, false) - producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional = true, - shouldValidateSequenceNumbers = true) + val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.Empty) + 
producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional = true) val logOffsetMetadata = new LogOffsetMetadata(messageOffset = offset, segmentBaseOffset = 990000L, relativePositionInSegment = 234224) producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata) - idMapping.update(producerAppendInfo) + stateManager.update(producerAppendInfo) - assertEquals(Some(logOffsetMetadata), idMapping.firstUnstableOffset) + assertEquals(Some(logOffsetMetadata), stateManager.firstUnstableOffset) } @Test @@ -145,18 +144,17 @@ class ProducerStateManagerTest extends JUnitSuite { val producerEpoch = 0.toShort val offset = 992342L val seq = 0 - val producerAppendInfo = new ProducerAppendInfo(pid, None, false) - producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional = true, - shouldValidateSequenceNumbers = true) + val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.Empty) + producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional = true) // use some other offset to simulate a follower append where the log offset metadata won't typically // match any of the transaction first offsets val logOffsetMetadata = new LogOffsetMetadata(messageOffset = offset - 23429, segmentBaseOffset = 990000L, relativePositionInSegment = 234224) producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata) - idMapping.update(producerAppendInfo) + stateManager.update(producerAppendInfo) - assertEquals(Some(LogOffsetMetadata(offset)), idMapping.firstUnstableOffset) + assertEquals(Some(LogOffsetMetadata(offset)), stateManager.firstUnstableOffset) } @Test @@ -164,11 +162,10 @@ class ProducerStateManagerTest extends JUnitSuite { val producerEpoch = 0.toShort val coordinatorEpoch = 15 val offset = 9L - append(idMapping, pid, 0, producerEpoch, offset) + append(stateManager, producerId, producerEpoch, 0, offset) - val appendInfo = new ProducerAppendInfo(pid, idMapping.lastEntry(pid), loadingFromLog = false) - appendInfo.append(producerEpoch, 1, 5, time.milliseconds(), 20L, isTransactional = true, - shouldValidateSequenceNumbers = true) + val appendInfo = stateManager.prepareUpdate(producerId, loadingFromLog = false) + appendInfo.append(producerEpoch, 1, 5, time.milliseconds(), 20L, isTransactional = true) var lastEntry = appendInfo.lastEntry assertEquals(producerEpoch, lastEntry.producerEpoch) assertEquals(1, lastEntry.firstSeq) @@ -176,10 +173,9 @@ class ProducerStateManagerTest extends JUnitSuite { assertEquals(16L, lastEntry.firstOffset) assertEquals(20L, lastEntry.lastOffset) assertEquals(Some(16L), lastEntry.currentTxnFirstOffset) - assertEquals(List(new TxnMetadata(pid, 16L)), appendInfo.startedTransactions) + assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions) - appendInfo.append(producerEpoch, 6, 10, time.milliseconds(), 30L, isTransactional = true, - shouldValidateSequenceNumbers = true) + appendInfo.append(producerEpoch, 6, 10, time.milliseconds(), 30L, isTransactional = true) lastEntry = appendInfo.lastEntry assertEquals(producerEpoch, lastEntry.producerEpoch) assertEquals(6, lastEntry.firstSeq) @@ -187,11 +183,11 @@ class ProducerStateManagerTest extends JUnitSuite { assertEquals(26L, lastEntry.firstOffset) assertEquals(30L, lastEntry.lastOffset) assertEquals(Some(16L), lastEntry.currentTxnFirstOffset) - assertEquals(List(new TxnMetadata(pid, 16L)), appendInfo.startedTransactions) + assertEquals(List(new TxnMetadata(producerId, 16L)), 
appendInfo.startedTransactions) val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch) val completedTxn = appendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, 40L, time.milliseconds()) - assertEquals(pid, completedTxn.producerId) + assertEquals(producerId, completedTxn.producerId) assertEquals(16L, completedTxn.firstOffset) assertEquals(40L, completedTxn.lastOffset) assertFalse(completedTxn.isAborted) @@ -204,112 +200,112 @@ class ProducerStateManagerTest extends JUnitSuite { assertEquals(40L, lastEntry.lastOffset) assertEquals(coordinatorEpoch, lastEntry.coordinatorEpoch) assertEquals(None, lastEntry.currentTxnFirstOffset) - assertEquals(List(new TxnMetadata(pid, 16L)), appendInfo.startedTransactions) + assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions) } @Test(expected = classOf[OutOfOrderSequenceException]) def testOutOfSequenceAfterControlRecordEpochBump(): Unit = { val epoch = 0.toShort - append(idMapping, pid, 0, epoch, 0L) - append(idMapping, pid, 1, epoch, 1L) + append(stateManager, producerId, epoch, 0, 0L) + append(stateManager, producerId, epoch, 1, 1L) val bumpedEpoch = 1.toShort - appendEndTxnMarker(idMapping, pid, bumpedEpoch, ControlRecordType.ABORT, 1L) + appendEndTxnMarker(stateManager, producerId, bumpedEpoch, ControlRecordType.ABORT, 1L) // next append is invalid since we expect the sequence to be reset - append(idMapping, pid, 2, bumpedEpoch, 2L) + append(stateManager, producerId, bumpedEpoch, 2, 2L) } @Test(expected = classOf[InvalidTxnStateException]) def testNonTransactionalAppendWithOngoingTransaction(): Unit = { val epoch = 0.toShort - append(idMapping, pid, 0, epoch, 0L, isTransactional = true) - append(idMapping, pid, 1, epoch, 1L, isTransactional = false) + append(stateManager, producerId, epoch, 0, 0L, isTransactional = true) + append(stateManager, producerId, epoch, 1, 1L, isTransactional = false) } @Test def testTruncateAndReloadRemovesOutOfRangeSnapshots(): Unit = { val epoch = 0.toShort - append(idMapping, pid, epoch, 0, 0L) - idMapping.takeSnapshot() - append(idMapping, pid, epoch, 1, 1L) - idMapping.takeSnapshot() - append(idMapping, pid, epoch, 2, 2L) - idMapping.takeSnapshot() - append(idMapping, pid, epoch, 3, 3L) - idMapping.takeSnapshot() - append(idMapping, pid, epoch, 4, 4L) - idMapping.takeSnapshot() - - idMapping.truncateAndReload(1L, 3L, time.milliseconds()) - - assertEquals(Some(2L), idMapping.oldestSnapshotOffset) - assertEquals(Some(3L), idMapping.latestSnapshotOffset) + append(stateManager, producerId, epoch, 0, 0L) + stateManager.takeSnapshot() + append(stateManager, producerId, epoch, 1, 1L) + stateManager.takeSnapshot() + append(stateManager, producerId, epoch, 2, 2L) + stateManager.takeSnapshot() + append(stateManager, producerId, epoch, 3, 3L) + stateManager.takeSnapshot() + append(stateManager, producerId, epoch, 4, 4L) + stateManager.takeSnapshot() + + stateManager.truncateAndReload(1L, 3L, time.milliseconds()) + + assertEquals(Some(2L), stateManager.oldestSnapshotOffset) + assertEquals(Some(3L), stateManager.latestSnapshotOffset) } @Test def testTakeSnapshot(): Unit = { val epoch = 0.toShort - append(idMapping, pid, 0, epoch, 0L, 0L) - append(idMapping, pid, 1, epoch, 1L, 1L) + append(stateManager, producerId, epoch, 0, 0L, 0L) + append(stateManager, producerId, epoch, 1, 1L, 1L) // Take snapshot - idMapping.takeSnapshot() + stateManager.takeSnapshot() // Check that file exists and it is not empty - assertEquals("Directory doesn't contain a single file as 
expected", 1, idMappingDir.list().length) - assertTrue("Snapshot file is empty", idMappingDir.list().head.length > 0) + assertEquals("Directory doesn't contain a single file as expected", 1, logDir.list().length) + assertTrue("Snapshot file is empty", logDir.list().head.length > 0) } @Test def testRecoverFromSnapshot(): Unit = { val epoch = 0.toShort - append(idMapping, pid, 0, epoch, 0L) - append(idMapping, pid, 1, epoch, 1L) + append(stateManager, producerId, epoch, 0, 0L) + append(stateManager, producerId, epoch, 1, 1L) - idMapping.takeSnapshot() - val recoveredMapping = new ProducerStateManager(partition, idMappingDir, maxPidExpirationMs) + stateManager.takeSnapshot() + val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs) recoveredMapping.truncateAndReload(0L, 3L, time.milliseconds) // entry added after recovery - append(recoveredMapping, pid, 2, epoch, 2L) + append(recoveredMapping, producerId, epoch, 2, 2L) } @Test(expected = classOf[OutOfOrderSequenceException]) def testRemoveExpiredPidsOnReload(): Unit = { val epoch = 0.toShort - append(idMapping, pid, 0, epoch, 0L, 0) - append(idMapping, pid, 1, epoch, 1L, 1) + append(stateManager, producerId, epoch, 0, 0L, 0) + append(stateManager, producerId, epoch, 1, 1L, 1) - idMapping.takeSnapshot() - val recoveredMapping = new ProducerStateManager(partition, idMappingDir, maxPidExpirationMs) + stateManager.takeSnapshot() + val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs) recoveredMapping.truncateAndReload(0L, 1L, 70000) // entry added after recovery. The pid should be expired now, and would not exist in the pid mapping. Hence // we should get an out of order sequence exception. - append(recoveredMapping, pid, 2, epoch, 2L, 70001) + append(recoveredMapping, producerId, epoch, 2, 2L, 70001) } @Test def testDeleteSnapshotsBefore(): Unit = { val epoch = 0.toShort - append(idMapping, pid, 0, epoch, 0L) - append(idMapping, pid, 1, epoch, 1L) - idMapping.takeSnapshot() - assertEquals(1, idMappingDir.listFiles().length) + append(stateManager, producerId, epoch, 0, 0L) + append(stateManager, producerId, epoch, 1, 1L) + stateManager.takeSnapshot() + assertEquals(1, logDir.listFiles().length) assertEquals(Set(2), currentSnapshotOffsets) - append(idMapping, pid, 2, epoch, 2L) - idMapping.takeSnapshot() - assertEquals(2, idMappingDir.listFiles().length) + append(stateManager, producerId, epoch, 2, 2L) + stateManager.takeSnapshot() + assertEquals(2, logDir.listFiles().length) assertEquals(Set(2, 3), currentSnapshotOffsets) - idMapping.deleteSnapshotsBefore(3L) - assertEquals(1, idMappingDir.listFiles().length) + stateManager.deleteSnapshotsBefore(3L) + assertEquals(1, logDir.listFiles().length) assertEquals(Set(3), currentSnapshotOffsets) - idMapping.deleteSnapshotsBefore(4L) - assertEquals(0, idMappingDir.listFiles().length) + stateManager.deleteSnapshotsBefore(4L) + assertEquals(0, logDir.listFiles().length) assertEquals(Set(), currentSnapshotOffsets) } @@ -317,25 +313,25 @@ class ProducerStateManagerTest extends JUnitSuite { def testTruncate(): Unit = { val epoch = 0.toShort - append(idMapping, pid, 0, epoch, 0L) - append(idMapping, pid, 1, epoch, 1L) - idMapping.takeSnapshot() - assertEquals(1, idMappingDir.listFiles().length) + append(stateManager, producerId, epoch, 0, 0L) + append(stateManager, producerId, epoch, 1, 1L) + stateManager.takeSnapshot() + assertEquals(1, logDir.listFiles().length) assertEquals(Set(2), currentSnapshotOffsets) - append(idMapping, pid, 2, epoch, 2L) - 
idMapping.takeSnapshot() - assertEquals(2, idMappingDir.listFiles().length) + append(stateManager, producerId, epoch, 2, 2L) + stateManager.takeSnapshot() + assertEquals(2, logDir.listFiles().length) assertEquals(Set(2, 3), currentSnapshotOffsets) - idMapping.truncate() + stateManager.truncate() - assertEquals(0, idMappingDir.listFiles().length) + assertEquals(0, logDir.listFiles().length) assertEquals(Set(), currentSnapshotOffsets) - append(idMapping, pid, 0, epoch, 0L) - idMapping.takeSnapshot() - assertEquals(1, idMappingDir.listFiles().length) + append(stateManager, producerId, epoch, 0, 0L) + stateManager.takeSnapshot() + assertEquals(1, logDir.listFiles().length) assertEquals(Set(1), currentSnapshotOffsets) } @@ -344,80 +340,80 @@ class ProducerStateManagerTest extends JUnitSuite { val epoch = 0.toShort val sequence = 0 - append(idMapping, pid, sequence, epoch, offset = 99, isTransactional = true) - assertEquals(Some(99), idMapping.firstUnstableOffset.map(_.messageOffset)) - idMapping.takeSnapshot() + append(stateManager, producerId, epoch, sequence, offset = 99, isTransactional = true) + assertEquals(Some(99), stateManager.firstUnstableOffset.map(_.messageOffset)) + stateManager.takeSnapshot() - appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 105) - idMapping.onHighWatermarkUpdated(106) - assertEquals(None, idMapping.firstUnstableOffset.map(_.messageOffset)) - idMapping.takeSnapshot() + appendEndTxnMarker(stateManager, producerId, epoch, ControlRecordType.COMMIT, offset = 105) + stateManager.onHighWatermarkUpdated(106) + assertEquals(None, stateManager.firstUnstableOffset.map(_.messageOffset)) + stateManager.takeSnapshot() - append(idMapping, pid, sequence + 1, epoch, offset = 106) - idMapping.truncateAndReload(0L, 106, time.milliseconds()) - assertEquals(None, idMapping.firstUnstableOffset.map(_.messageOffset)) + append(stateManager, producerId, epoch, sequence + 1, offset = 106) + stateManager.truncateAndReload(0L, 106, time.milliseconds()) + assertEquals(None, stateManager.firstUnstableOffset.map(_.messageOffset)) - idMapping.truncateAndReload(0L, 100L, time.milliseconds()) - assertEquals(Some(99), idMapping.firstUnstableOffset.map(_.messageOffset)) + stateManager.truncateAndReload(0L, 100L, time.milliseconds()) + assertEquals(Some(99), stateManager.firstUnstableOffset.map(_.messageOffset)) } @Test def testFirstUnstableOffsetAfterEviction(): Unit = { val epoch = 0.toShort val sequence = 0 - append(idMapping, pid, sequence, epoch, offset = 99, isTransactional = true) - assertEquals(Some(99), idMapping.firstUnstableOffset.map(_.messageOffset)) - append(idMapping, 2L, 0, epoch, offset = 106, isTransactional = true) - idMapping.evictUnretainedProducers(100) - assertEquals(Some(106), idMapping.firstUnstableOffset.map(_.messageOffset)) + append(stateManager, producerId, epoch, sequence, offset = 99, isTransactional = true) + assertEquals(Some(99), stateManager.firstUnstableOffset.map(_.messageOffset)) + append(stateManager, 2L, epoch, 0, offset = 106, isTransactional = true) + stateManager.evictUnretainedProducers(100) + assertEquals(Some(106), stateManager.firstUnstableOffset.map(_.messageOffset)) } @Test def testEvictUnretainedPids(): Unit = { val epoch = 0.toShort - append(idMapping, pid, 0, epoch, 0L) - append(idMapping, pid, 1, epoch, 1L) - idMapping.takeSnapshot() + append(stateManager, producerId, epoch, 0, 0L) + append(stateManager, producerId, epoch, 1, 1L) + stateManager.takeSnapshot() val anotherPid = 2L - append(idMapping, anotherPid, 0, epoch, 
2L) - append(idMapping, anotherPid, 1, epoch, 3L) - idMapping.takeSnapshot() + append(stateManager, anotherPid, epoch, 0, 2L) + append(stateManager, anotherPid, epoch, 1, 3L) + stateManager.takeSnapshot() assertEquals(Set(2, 4), currentSnapshotOffsets) - idMapping.evictUnretainedProducers(2) + stateManager.evictUnretainedProducers(2) assertEquals(Set(4), currentSnapshotOffsets) - assertEquals(Set(anotherPid), idMapping.activeProducers.keySet) - assertEquals(None, idMapping.lastEntry(pid)) + assertEquals(Set(anotherPid), stateManager.activeProducers.keySet) + assertEquals(None, stateManager.lastEntry(producerId)) - val maybeEntry = idMapping.lastEntry(anotherPid) + val maybeEntry = stateManager.lastEntry(anotherPid) assertTrue(maybeEntry.isDefined) assertEquals(3L, maybeEntry.get.lastOffset) - idMapping.evictUnretainedProducers(3) - assertEquals(Set(anotherPid), idMapping.activeProducers.keySet) + stateManager.evictUnretainedProducers(3) + assertEquals(Set(anotherPid), stateManager.activeProducers.keySet) assertEquals(Set(4), currentSnapshotOffsets) - assertEquals(4, idMapping.mapEndOffset) + assertEquals(4, stateManager.mapEndOffset) - idMapping.evictUnretainedProducers(5) - assertEquals(Set(), idMapping.activeProducers.keySet) + stateManager.evictUnretainedProducers(5) + assertEquals(Set(), stateManager.activeProducers.keySet) assertEquals(Set(), currentSnapshotOffsets) - assertEquals(5, idMapping.mapEndOffset) + assertEquals(5, stateManager.mapEndOffset) } @Test def testSkipSnapshotIfOffsetUnchanged(): Unit = { val epoch = 0.toShort - append(idMapping, pid, 0, epoch, 0L, 0L) + append(stateManager, producerId, epoch, 0, 0L, 0L) - idMapping.takeSnapshot() - assertEquals(1, idMappingDir.listFiles().length) + stateManager.takeSnapshot() + assertEquals(1, logDir.listFiles().length) assertEquals(Set(1), currentSnapshotOffsets) // nothing changed so there should be no new snapshot - idMapping.takeSnapshot() - assertEquals(1, idMappingDir.listFiles().length) + stateManager.takeSnapshot() + assertEquals(1, logDir.listFiles().length) assertEquals(Set(1), currentSnapshotOffsets) } @@ -425,16 +421,16 @@ class ProducerStateManagerTest extends JUnitSuite { def testStartOffset(): Unit = { val epoch = 0.toShort val pid2 = 2L - append(idMapping, pid2, 0, epoch, 0L, 1L) - append(idMapping, pid, 0, epoch, 1L, 2L) - append(idMapping, pid, 1, epoch, 2L, 3L) - append(idMapping, pid, 2, epoch, 3L, 4L) - idMapping.takeSnapshot() + append(stateManager, pid2, epoch, 0, 0L, 1L) + append(stateManager, producerId, epoch, 0, 1L, 2L) + append(stateManager, producerId, epoch, 1, 2L, 3L) + append(stateManager, producerId, epoch, 2, 3L, 4L) + stateManager.takeSnapshot() intercept[OutOfOrderSequenceException] { - val recoveredMapping = new ProducerStateManager(partition, idMappingDir, maxPidExpirationMs) + val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs) recoveredMapping.truncateAndReload(0L, 1L, time.milliseconds) - append(recoveredMapping, pid2, 1, epoch, 4L, 5L) + append(recoveredMapping, pid2, epoch, 1, 4L, 5L) } } @@ -442,10 +438,10 @@ class ProducerStateManagerTest extends JUnitSuite { def testPidExpirationTimeout() { val epoch = 5.toShort val sequence = 37 - append(idMapping, pid, sequence, epoch, 1L) + append(stateManager, producerId, epoch, sequence, 1L) time.sleep(maxPidExpirationMs + 1) - idMapping.removeExpiredProducers(time.milliseconds) - append(idMapping, pid, sequence + 1, epoch, 1L) + stateManager.removeExpiredProducers(time.milliseconds) + append(stateManager, 
producerId, epoch, sequence + 1, 1L) } @Test @@ -453,33 +449,33 @@ class ProducerStateManagerTest extends JUnitSuite { val epoch = 5.toShort val sequence = 0 - assertEquals(None, idMapping.firstUndecidedOffset) + assertEquals(None, stateManager.firstUndecidedOffset) - append(idMapping, pid, sequence, epoch, offset = 99, isTransactional = true) - assertEquals(Some(99L), idMapping.firstUndecidedOffset) - assertEquals(Some(99L), idMapping.firstUnstableOffset.map(_.messageOffset)) + append(stateManager, producerId, epoch, sequence, offset = 99, isTransactional = true) + assertEquals(Some(99L), stateManager.firstUndecidedOffset) + assertEquals(Some(99L), stateManager.firstUnstableOffset.map(_.messageOffset)) val anotherPid = 2L - append(idMapping, anotherPid, sequence, epoch, offset = 105, isTransactional = true) - assertEquals(Some(99L), idMapping.firstUndecidedOffset) - assertEquals(Some(99L), idMapping.firstUnstableOffset.map(_.messageOffset)) + append(stateManager, anotherPid, epoch, sequence, offset = 105, isTransactional = true) + assertEquals(Some(99L), stateManager.firstUndecidedOffset) + assertEquals(Some(99L), stateManager.firstUnstableOffset.map(_.messageOffset)) - appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 109) - assertEquals(Some(105L), idMapping.firstUndecidedOffset) - assertEquals(Some(99L), idMapping.firstUnstableOffset.map(_.messageOffset)) + appendEndTxnMarker(stateManager, producerId, epoch, ControlRecordType.COMMIT, offset = 109) + assertEquals(Some(105L), stateManager.firstUndecidedOffset) + assertEquals(Some(99L), stateManager.firstUnstableOffset.map(_.messageOffset)) - idMapping.onHighWatermarkUpdated(100L) - assertEquals(Some(99L), idMapping.firstUnstableOffset.map(_.messageOffset)) + stateManager.onHighWatermarkUpdated(100L) + assertEquals(Some(99L), stateManager.firstUnstableOffset.map(_.messageOffset)) - idMapping.onHighWatermarkUpdated(110L) - assertEquals(Some(105L), idMapping.firstUnstableOffset.map(_.messageOffset)) + stateManager.onHighWatermarkUpdated(110L) + assertEquals(Some(105L), stateManager.firstUnstableOffset.map(_.messageOffset)) - appendEndTxnMarker(idMapping, anotherPid, epoch, ControlRecordType.ABORT, offset = 112) - assertEquals(None, idMapping.firstUndecidedOffset) - assertEquals(Some(105L), idMapping.firstUnstableOffset.map(_.messageOffset)) + appendEndTxnMarker(stateManager, anotherPid, epoch, ControlRecordType.ABORT, offset = 112) + assertEquals(None, stateManager.firstUndecidedOffset) + assertEquals(Some(105L), stateManager.firstUnstableOffset.map(_.messageOffset)) - idMapping.onHighWatermarkUpdated(113L) - assertEquals(None, idMapping.firstUnstableOffset.map(_.messageOffset)) + stateManager.onHighWatermarkUpdated(113L) + assertEquals(None, stateManager.firstUnstableOffset.map(_.messageOffset)) } @Test @@ -487,17 +483,28 @@ class ProducerStateManagerTest extends JUnitSuite { val epoch = 5.toShort val sequence = 0 - append(idMapping, pid, sequence, epoch, offset = 99, isTransactional = true) - assertEquals(Some(99L), idMapping.firstUndecidedOffset) + append(stateManager, producerId, epoch, sequence, offset = 99, isTransactional = true) + assertEquals(Some(99L), stateManager.firstUndecidedOffset) time.sleep(maxPidExpirationMs + 1) - idMapping.removeExpiredProducers(time.milliseconds) + stateManager.removeExpiredProducers(time.milliseconds) - assertTrue(idMapping.lastEntry(pid).isDefined) - assertEquals(Some(99L), idMapping.firstUndecidedOffset) + assertTrue(stateManager.lastEntry(producerId).isDefined) + 
assertEquals(Some(99L), stateManager.firstUndecidedOffset) + + stateManager.removeExpiredProducers(time.milliseconds) + assertTrue(stateManager.lastEntry(producerId).isDefined) + } + + @Test + def testSequenceNotValidatedForGroupMetadataTopic(): Unit = { + val partition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) + val stateManager = new ProducerStateManager(partition, logDir, maxPidExpirationMs) + + val epoch = 0.toShort + append(stateManager, producerId, epoch, RecordBatch.NO_SEQUENCE, offset = 99, isTransactional = true) + append(stateManager, producerId, epoch, RecordBatch.NO_SEQUENCE, offset = 100, isTransactional = true) - idMapping.removeExpiredProducers(time.milliseconds) - assertTrue(idMapping.lastEntry(pid).isDefined) } @Test(expected = classOf[ProducerFencedException]) @@ -505,10 +512,10 @@ class ProducerStateManagerTest extends JUnitSuite { val epoch = 5.toShort val sequence = 0 - assertEquals(None, idMapping.firstUndecidedOffset) + assertEquals(None, stateManager.firstUndecidedOffset) - append(idMapping, pid, sequence, epoch, offset = 99, isTransactional = true) - appendEndTxnMarker(idMapping, pid, 3.toShort, ControlRecordType.COMMIT, offset=100) + append(stateManager, producerId, epoch, sequence, offset = 99, isTransactional = true) + appendEndTxnMarker(stateManager, producerId, 3.toShort, ControlRecordType.COMMIT, offset=100) } @Test @@ -516,21 +523,21 @@ class ProducerStateManagerTest extends JUnitSuite { val epoch = 5.toShort val sequence = 0 - append(idMapping, pid, sequence, epoch, offset = 99, isTransactional = true) - appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 100, coordinatorEpoch = 1) + append(stateManager, producerId, epoch, sequence, offset = 99, isTransactional = true) + appendEndTxnMarker(stateManager, producerId, epoch, ControlRecordType.COMMIT, offset = 100, coordinatorEpoch = 1) - val lastEntry = idMapping.lastEntry(pid) + val lastEntry = stateManager.lastEntry(producerId) assertEquals(Some(1), lastEntry.map(_.coordinatorEpoch)) // writing with the current epoch is allowed - appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 101, coordinatorEpoch = 1) + appendEndTxnMarker(stateManager, producerId, epoch, ControlRecordType.COMMIT, offset = 101, coordinatorEpoch = 1) // bumping the epoch is allowed - appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 102, coordinatorEpoch = 2) + appendEndTxnMarker(stateManager, producerId, epoch, ControlRecordType.COMMIT, offset = 102, coordinatorEpoch = 2) // old epochs are not allowed try { - appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 103, coordinatorEpoch = 1) + appendEndTxnMarker(stateManager, producerId, epoch, ControlRecordType.COMMIT, offset = 103, coordinatorEpoch = 1) fail("Expected coordinator to be fenced") } catch { case e: TransactionCoordinatorFencedException => @@ -539,49 +546,49 @@ class ProducerStateManagerTest extends JUnitSuite { @Test(expected = classOf[TransactionCoordinatorFencedException]) def testCoordinatorFencedAfterReload(): Unit = { - val epoch = 0.toShort - append(idMapping, pid, 0, epoch, offset = 99, isTransactional = true) - appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 100, coordinatorEpoch = 1) - idMapping.takeSnapshot() + val producerEpoch = 0.toShort + append(stateManager, producerId, producerEpoch, 0, offset = 99, isTransactional = true) + appendEndTxnMarker(stateManager, producerId, producerEpoch, ControlRecordType.COMMIT, offset 
= 100, coordinatorEpoch = 1) + stateManager.takeSnapshot() - val recoveredMapping = new ProducerStateManager(partition, idMappingDir, maxPidExpirationMs) + val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs) recoveredMapping.truncateAndReload(0L, 2L, 70000) // append from old coordinator should be rejected - appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 100, coordinatorEpoch = 0) + appendEndTxnMarker(stateManager, producerId, producerEpoch, ControlRecordType.COMMIT, offset = 100, coordinatorEpoch = 0) } private def appendEndTxnMarker(mapping: ProducerStateManager, - pid: Long, - epoch: Short, + producerId: Long, + producerEpoch: Short, controlType: ControlRecordType, offset: Long, coordinatorEpoch: Int = 0, timestamp: Long = time.milliseconds()): (CompletedTxn, Long) = { - val producerAppendInfo = new ProducerAppendInfo(pid, mapping.lastEntry(pid).getOrElse(ProducerIdEntry.Empty)) + val producerAppendInfo = stateManager.prepareUpdate(producerId, loadingFromLog = false) val endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch) - val completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker, epoch, offset, timestamp) + val completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp) mapping.update(producerAppendInfo) val lastStableOffset = mapping.completeTxn(completedTxn) mapping.updateMapEndOffset(offset + 1) (completedTxn, lastStableOffset) } - private def append(mapping: ProducerStateManager, - pid: Long, + private def append(stateManager: ProducerStateManager, + producerId: Long, + producerEpoch: Short, seq: Int, - epoch: Short, offset: Long, timestamp: Long = time.milliseconds(), isTransactional: Boolean = false, isLoadingFromLog: Boolean = false): Unit = { - val producerAppendInfo = new ProducerAppendInfo(pid, mapping.lastEntry(pid), isLoadingFromLog) - producerAppendInfo.append(epoch, seq, seq, timestamp, offset, isTransactional, shouldValidateSequenceNumbers = true) - mapping.update(producerAppendInfo) - mapping.updateMapEndOffset(offset + 1) + val producerAppendInfo = stateManager.prepareUpdate(producerId, isLoadingFromLog) + producerAppendInfo.append(producerEpoch, seq, seq, timestamp, offset, isTransactional) + stateManager.update(producerAppendInfo) + stateManager.updateMapEndOffset(offset + 1) } private def currentSnapshotOffsets = - idMappingDir.listFiles().map(file => Log.offsetFromFilename(file.getName)).toSet + logDir.listFiles().map(file => Log.offsetFromFilename(file.getName)).toSet } From 888c49d4b6ca06118c75eb98a38e2bc07abffa5b Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Mon, 15 May 2017 16:09:22 -0700 Subject: [PATCH 2/7] Use Files.delete in log test case to ensure exception is raised on failure --- core/src/test/scala/unit/kafka/log/LogTest.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 1ef2d67ba2ac5..99ebd15476b15 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -19,6 +19,7 @@ package kafka.log import java.io._ import java.nio.ByteBuffer +import java.nio.file.Files import java.util.Properties import org.apache.kafka.common.errors._ @@ -2405,7 +2406,7 @@ class LogTest { // delete all snapshot files logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.PidSnapshotFileSuffix)).foreach { file => - file.delete() + 
From 0a4c7e69bf872d061b1d46a42b39c723544b315a Mon Sep 17 00:00:00 2001
From: Jason Gustafson
Date: Tue, 16 May 2017 11:37:50 -0700
Subject: [PATCH 3/7] Validate the offsets topic in handleTxnCompletion and clean up KafkaApis TxnOffsetCommit handling

---
 .../coordinator/group/GroupCoordinator.scala  |  2 ++
 .../main/scala/kafka/server/KafkaApis.scala   | 18 +++++++++---------
 2 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index ee93fa54586e0..36e3c63c940ec 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -25,6 +25,7 @@ import kafka.message.ProducerCompressionCodec
 import kafka.server._
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.RecordBatch.{NO_PRODUCER_EPOCH, NO_PRODUCER_ID}
 import org.apache.kafka.common.requests._
@@ -439,6 +440,7 @@ class GroupCoordinator(val brokerId: Int,
   def handleTxnCompletion(producerId: Long,
                           offsetsPartitions: Iterable[TopicPartition],
                           transactionResult: TransactionResult) {
+    require(offsetsPartitions.forall(_.topic == Topic.GROUP_METADATA_TOPIC_NAME))
     val isCommit = transactionResult == TransactionResult.COMMIT
     groupManager.handleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit)
   }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 755e5e6b60f48..192432cd0aec5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1454,12 +1454,13 @@ class KafkaApis(val requestChannel: RequestChannel,
     def sendResponseCallback(producerId: Long, result: TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
       errors.put(producerId, responseStatus.mapValues(_.error).asJava)
 
-      val offsetsPartitions = responseStatus.filterKeys(_.topic == GROUP_METADATA_TOPIC_NAME)
-      if (offsetsPartitions.nonEmpty) {
-        val successfulOffsetsPartitions = offsetsPartitions.filter { case (_, partitionResponse) =>
-          partitionResponse.error == Errors.NONE
-        }.keys
+      val successfulOffsetsPartitions = responseStatus.filter { case (topicPartition, partitionResponse) =>
+        topicPartition.topic == GROUP_METADATA_TOPIC_NAME && partitionResponse.error == Errors.NONE
+      }.keys
 
+      if (successfulOffsetsPartitions.nonEmpty) {
+        // as soon as the end transaction marker has been written for a transactional offset commit,
+        // we call the group coordinator to materialize the offsets into the cache
         try {
           groupCoordinator.handleTxnCompletion(producerId, successfulOffsetsPartitions, result)
         } catch {
@@ -1602,7 +1603,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       if (!authorizedForDescribe && exists)
         debug(s"TxnOffsetCommit with correlation id ${header.correlationId} from client ${header.clientId} " +
           s"on partition $topicPartition failing due to user not having DESCRIBE authorization, but returning " +
-          s"UNKNOWN_TOPIC_OR_PARTITION")
+          s"${Errors.UNKNOWN_TOPIC_OR_PARTITION.name}")
 
       authorizedForDescribe && exists
     }
@@ -1645,13 +1646,12 @@ class KafkaApis(val requestChannel: RequestChannel,
     val offsetRetention = groupCoordinator.offsetConfig.offsetsRetentionMs
     val currentTimestamp = time.milliseconds
     val defaultExpireTimestamp = offsetRetention + currentTimestamp
-    offsetsMap.mapValues { partitionData =>
+    offsetsMap.values.map { partitionData =>
       val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata
       new OffsetAndMetadata(
         offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
         commitTimestamp = currentTimestamp,
-        expireTimestamp = defaultExpireTimestamp
-      )
+        expireTimestamp = defaultExpireTimestamp)
     }
   }
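The reworked callback collapses the earlier two-step filter into a single pass that keeps only partitions of the offsets topic whose marker write succeeded, and the new require(...) records the invariant that only such partitions may reach the group coordinator. A self-contained sketch of that filtering, using toy stand-ins for TopicPartition, PartitionResponse, and the error codes (not the Kafka types):

    // Toy stand-ins for the Kafka types; illustrative only.
    case class TopicPartition(topic: String, partition: Int)
    case class PartitionResponse(error: String) // "NONE" marks a successful write

    object TxnCompletionDemo extends App {
      val GroupMetadataTopic = "__consumer_offsets"

      val responseStatus = Map(
        TopicPartition(GroupMetadataTopic, 0) -> PartitionResponse("NONE"),
        TopicPartition(GroupMetadataTopic, 1) -> PartitionResponse("NOT_LEADER_FOR_PARTITION"),
        TopicPartition("some-user-topic", 0)  -> PartitionResponse("NONE"))

      // Single pass: offsets-topic partitions whose marker write succeeded.
      val successfulOffsetsPartitions = responseStatus.filter { case (tp, response) =>
        tp.topic == GroupMetadataTopic && response.error == "NONE"
      }.keys

      // The invariant the new require(...) documents on the coordinator side.
      require(successfulOffsetsPartitions.forall(_.topic == GroupMetadataTopic))
      println(successfulOffsetsPartitions.map(_.partition).toSet) // prints: Set(0)
    }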
From 2511fc42dffd745bcf7f24f8d56e2c4633671ff1 Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Tue, 16 May 2017 11:44:23 +0100
Subject: [PATCH 4/7] Remove one of the ProducerAppendInfo constructors

---
 .../main/scala/kafka/log/ProducerStateManager.scala | 13 +++++--------
 .../unit/kafka/log/ProducerStateManagerTest.scala   |  6 ++++--
 2 files changed, 9 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 45188ba21b400..339a9d8201893 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -66,8 +66,8 @@ private[log] case class ProducerIdEntry(producerId: Long, producerEpoch: Short,
  */
 private[log] class ProducerAppendInfo(val producerId: Long,
                                       initialEntry: ProducerIdEntry,
-                                      validateSequenceNumbers: Boolean = true,
-                                      loadingFromLog: Boolean = false) {
+                                      validateSequenceNumbers: Boolean,
+                                      loadingFromLog: Boolean) {
   private var producerEpoch = initialEntry.producerEpoch
   private var firstSeq = initialEntry.firstSeq
   private var lastSeq = initialEntry.lastSeq
@@ -77,9 +77,6 @@ private[log] class ProducerAppendInfo(val producerId: Long,
   private var coordinatorEpoch = initialEntry.coordinatorEpoch
   private val transactions = ListBuffer.empty[TxnMetadata]
 
-  def this(producerId: Long, initialEntry: Option[ProducerIdEntry], validateSequenceNumbers: Boolean, loadingFromLog: Boolean) =
-    this(producerId, initialEntry.getOrElse(ProducerIdEntry.Empty), validateSequenceNumbers, loadingFromLog)
-
   private def validateAppend(producerEpoch: Short, firstSeq: Int, lastSeq: Int) = {
     if (this.producerEpoch > producerEpoch) {
       throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably another producer " +
@@ -452,9 +449,9 @@ class ProducerStateManager(val topicPartition: TopicPartition,
     }
   }
 
-  def prepareUpdate(producerId: Long, loadingFromLog: Boolean): ProducerAppendInfo = {
-    new ProducerAppendInfo(producerId, lastEntry(producerId), validateSequenceNumbers, loadingFromLog)
-  }
+  def prepareUpdate(producerId: Long, loadingFromLog: Boolean): ProducerAppendInfo =
+    new ProducerAppendInfo(producerId, lastEntry(producerId).getOrElse(ProducerIdEntry.Empty), validateSequenceNumbers,
+      loadingFromLog)
 
   /**
    * Update the mapping with the given append information
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 04335be68adff..7227671beaff6 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -128,7 +128,8 @@ class ProducerStateManagerTest extends JUnitSuite {
     val producerEpoch = 0.toShort
     val offset = 992342L
     val seq = 0
-    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.Empty)
+    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.Empty, validateSequenceNumbers = true,
+      loadingFromLog = false)
     producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional = true)
 
     val logOffsetMetadata = new LogOffsetMetadata(messageOffset = offset, segmentBaseOffset = 990000L,
@@ -144,7 +145,8 @@ class ProducerStateManagerTest extends JUnitSuite {
     val producerEpoch = 0.toShort
     val offset = 992342L
     val seq = 0
-    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.Empty)
+    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.Empty, validateSequenceNumbers = true,
+      loadingFromLog = false)
     producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional = true)
 
     // use some other offset to simulate a follower append where the log offset metadata won't typically
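The deleted auxiliary constructor existed only to unwrap an Option; moving getOrElse(ProducerIdEntry.Empty) to the call site leaves a single, explicit constructor. A toy illustration of the idiom (Entry and AppendInfo are invented stand-ins, not the Kafka classes):

    // Toy illustration: unwrap the Option at the call site instead of
    // keeping an auxiliary constructor that does it internally.
    case class Entry(epoch: Int)
    object Entry { val Empty = Entry(-1) }

    class AppendInfo(val initialEntry: Entry) // the one remaining constructor

    object ConstructorDemo extends App {
      def lastEntry(producerId: Long): Option[Entry] = None // no state recorded yet

      // Previously this went through an Option-taking overload of the constructor.
      val info = new AppendInfo(lastEntry(42L).getOrElse(Entry.Empty))
      println(info.initialEntry) // prints: Entry(-1)
    }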
From d79404107198af8440e4d871a2396049a9799608 Mon Sep 17 00:00:00 2001
From: Jason Gustafson
Date: Tue, 16 May 2017 11:54:39 -0700
Subject: [PATCH 5/7] Document parameters of ProducerAppendInfo

---
 .../main/scala/kafka/log/ProducerStateManager.scala | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 339a9d8201893..99b7b5b257c57 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -63,6 +63,17 @@ private[log] case class ProducerIdEntry(producerId: Long, producerEpoch: Short,
  * It is initialized with the producer's state after the last successful append, and transitively validates the
  * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata
  * as the incoming records are validated.
+ *
+ * @param producerId The id of the producer appending to the log
+ * @param initialEntry The last entry associated with the producer id. Validation of the first append will be
+ *                     based off of this entry initially
+ * @param validateSequenceNumbers Whether or not sequence numbers should be validated. The only current use
+ *                                of this is the consumer offsets topic which uses producer ids from incoming
+ *                                TxnOffsetCommit, but has no sequence number to validate and does not depend
+ *                                on the deduplication which sequence numbers provide.
+ * @param loadingFromLog This parameter indicates whether the new append is from . The only difference in behavior is
+ *                       that we do not validate the sequence number of the first append since we may have lost previous
+ *                       sequence numbers when segments were removed due to log retention enforcement.
  */
 private[log] class ProducerAppendInfo(val producerId: Long,
                                       initialEntry: ProducerIdEntry,
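The validateSequenceNumbers flag described above exists because TxnOffsetCommit records on the offsets topic carry a producer id but no usable sequence number, so sequence checks must be skippable for that topic. A toy sketch of such conditional validation (invented names, not the Kafka code):

    // Toy sketch of a skippable sequence check; illustrative only.
    object SequenceValidationDemo extends App {
      val NoSequence = -1

      def validateAppend(lastSeq: Int, incomingSeq: Int, validateSequenceNumbers: Boolean): Unit =
        if (validateSequenceNumbers && incomingSeq != lastSeq + 1)
          throw new IllegalStateException(s"out-of-order sequence: expected ${lastSeq + 1}, got $incomingSeq")

      validateAppend(lastSeq = 5, incomingSeq = 6, validateSequenceNumbers = true)           // regular topic: checked
      validateAppend(lastSeq = 5, incomingSeq = NoSequence, validateSequenceNumbers = false) // offsets topic: skipped
      println("both appends accepted")
    }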
From 350707ec7ccccc2fd6f0513fae2d006669d4010e Mon Sep 17 00:00:00 2001
From: Jason Gustafson
Date: Tue, 16 May 2017 12:05:21 -0700
Subject: [PATCH 6/7] Fix compilation error in KafkaApis.convertTxnOffsets

---
 core/src/main/scala/kafka/server/KafkaApis.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 192432cd0aec5..31680b0cd429f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1646,9 +1646,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     val offsetRetention = groupCoordinator.offsetConfig.offsetsRetentionMs
     val currentTimestamp = time.milliseconds
     val defaultExpireTimestamp = offsetRetention + currentTimestamp
-    offsetsMap.values.map { partitionData =>
+    offsetsMap.map { case (topicPartition, partitionData) =>
       val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata
-      new OffsetAndMetadata(
+      topicPartition -> new OffsetAndMetadata(
         offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
         commitTimestamp = currentTimestamp,
         expireTimestamp = defaultExpireTimestamp)
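Two Scala pitfalls meet in convertTxnOffsets: offsetsMap.values.map drops the keys, yielding an Iterable where a Map is expected (the compilation error this patch fixes), and the mapValues it replaced returns a lazy view in the Scala versions Kafka used at the time, re-running the function on every access. Mapping over the entries and rebuilding the key -> value pairs is strict and keeps the Map shape; a self-contained sketch with plain strings standing in for the Kafka types:

    object MapTransformDemo extends App {
      val offsetsMap = Map("partition-0" -> 100L, "partition-1" -> 200L)

      // values.map loses the keys: this is an Iterable[Long], not a Map.
      val valuesOnly: Iterable[Long] = offsetsMap.values.map(_ + 1)

      // Mapping the entries keeps the Map shape and is evaluated strictly.
      val converted: Map[String, Long] = offsetsMap.map { case (tp, offset) =>
        tp -> (offset + 1)
      }

      println(valuesOnly) // the offsets, without their partitions
      println(converted)  // Map(partition-0 -> 101, partition-1 -> 201)
    }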
From 1ae72999f5cd9625479dddaa54adbe68fb28b2c1 Mon Sep 17 00:00:00 2001
From: Jason Gustafson
Date: Tue, 16 May 2017 12:35:58 -0700
Subject: [PATCH 7/7] Complete param sentence in ProducerAppendInfo

---
 core/src/main/scala/kafka/log/ProducerStateManager.scala | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 99b7b5b257c57..5ec91ce9f2552 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -71,9 +71,11 @@ private[log] case class ProducerIdEntry(producerId: Long, producerEpoch: Short,
  *                                of this is the consumer offsets topic which uses producer ids from incoming
  *                                TxnOffsetCommit, but has no sequence number to validate and does not depend
  *                                on the deduplication which sequence numbers provide.
- * @param loadingFromLog This parameter indicates whether the new append is from . The only difference in behavior is
- *                       that we do not validate the sequence number of the first append since we may have lost previous
- *                       sequence numbers when segments were removed due to log retention enforcement.
+ * @param loadingFromLog This parameter indicates whether the new append is being loaded directly from the log.
+ *                       This is used to repopulate producer state when the broker is initialized. The only
+ *                       difference in behavior is that we do not validate the sequence number of the first append
+ *                       since we may have lost previous sequence numbers when segments were removed due to log
+ *                       retention enforcement.
  */
 private[log] class ProducerAppendInfo(val producerId: Long,
                                       initialEntry: ProducerIdEntry,