
How Kafka guarantees message reliability #8

@2pc

Configuration

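1. Make sure every ISR replica has a write before it is acknowledged. Producer config: acks=-1 (all); producer.type=sync (a setting that only exists in the legacy Scala producer).
2. Make sure replicas outside the ISR cannot be elected leader. Broker config: unclean.leader.election.enable=false.
3. Make sure at least two ISR replicas hold each write. Topic config: at least 3 replicas (replication.factor>=3), with 2<=min.insync.replicas<=replication.factor. In practice min.insync.replicas is kept below replication.factor (e.g. 2 with 3 replicas) so that one replica can fail without blocking writes.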
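The three rules can be expressed as a small checker. This is a minimal sketch: `DurabilityConfig` and `durabilityProblems` are hypothetical names, and only the config keys mentioned in the comments (`acks`, `unclean.leader.election.enable`, `min.insync.replicas`) come from Kafka itself.

```scala
// Hypothetical checker for the three durability rules above.
// Only the config key names are Kafka's; the types and function are illustrative.
final case class DurabilityConfig(
    acks: String,                   // producer "acks"
    uncleanLeaderElection: Boolean, // "unclean.leader.election.enable"
    replicationFactor: Int,         // topic replication factor
    minInsyncReplicas: Int          // "min.insync.replicas"
)

def durabilityProblems(c: DurabilityConfig): List[String] = {
  val problems = List.newBuilder[String]
  // Rule 1: the leader must wait for every ISR replica before acknowledging.
  if (c.acks != "all" && c.acks != "-1")
    problems += s"acks=${c.acks}: use acks=all (-1)"
  // Rule 2: replicas outside the ISR must never become leader.
  if (c.uncleanLeaderElection)
    problems += "unclean.leader.election.enable=true allows data loss on failover"
  // Rule 3: at least 3 replicas, and 2 <= min.insync.replicas <= replication.factor.
  if (c.replicationFactor < 3)
    problems += s"replication.factor=${c.replicationFactor}: use at least 3"
  if (c.minInsyncReplicas < 2 || c.minInsyncReplicas > c.replicationFactor)
    problems += s"min.insync.replicas=${c.minInsyncReplicas}: need 2 <= value <= replication.factor"
  problems.result()
}
```

For example, `durabilityProblems(DurabilityConfig("all", false, 3, 2))` returns an empty list; this is the setup used in the scenarios below.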

Possible failure scenarios

Suppose a partition has 3 replicas R0, R1 and R2 on broker0, broker1 and broker2, with AR = (0,1,2) and ISR = (0,1).

With request.required.acks=all (the old producer's name for acks), min.insync.replicas=2 and unclean.leader.election.enable=false:

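A write is acknowledged to the client only after at least two ISR replicas have it, which also guarantees that if a single leader fails, the newly promoted leader has not lost any acknowledged data.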

Case 1: R0 (the leader) fails and R1 is elected as the new leader

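  1. Reads keep working normally.
  2. Writes are rejected: with request.required.acks=all and min.insync.replicas=2, at least two ISR replicas must acknowledge each write, but the ISR now contains only R1, so producers receive NotEnoughReplicas errors.
  3. If R0 recovers and rejoins the ISR, normal service resumes; if it cannot, lowering min.insync.replicas to 1 restores writes at the cost of weaker durability.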
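Item 2 comes down to one check made when an acks=all write arrives. This is a simplified sketch with hypothetical names; in the broker the equivalent check happens when the leader appends the records, and the request fails with a `NOT_ENOUGH_REPLICAS` error.

```scala
// Simplified model (hypothetical names) of how a partition leader treats an acks=all write:
// the write is rejected up front when the ISR is smaller than min.insync.replicas.
sealed trait ProduceResult
case object Accepted extends ProduceResult
case object NotEnoughReplicas extends ProduceResult

def handleAcksAllWrite(isr: Set[Int], minInsyncReplicas: Int): ProduceResult =
  if (isr.size < minInsyncReplicas) NotEnoughReplicas
  else Accepted
```

With ISR (0,1) the write is accepted; once R0 fails and the ISR shrinks to (1), it is rejected under min.insync.replicas=2 and accepted again after the setting is lowered to 1.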

Case 2: R0 and R1 fail one after the other

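  1. Neither reads nor writes are available.
  2. If only one of the ISR replicas recovers, service can continue by setting min.insync.replicas=1.
  3. If neither recovers, the only option is to degrade by setting unclean.leader.election.enable=true, which lets a non-ISR replica (R2) be elected leader; any messages that R2 had not replicated are lost.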
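Both cases depend on how a new leader is chosen. The sketch below follows the usual rule: pick the first live replica in assignment order that is also in the ISR, and fall back to any live replica only when unclean election is enabled. The function name and signature are hypothetical.

```scala
// Hypothetical model of choosing a new partition leader.
// Clean election: first live replica (in AR order) that is also in the ISR.
// Unclean election (only if enabled): any live replica, which may lose data.
def electLeader(assignment: Seq[Int], isr: Set[Int], live: Set[Int],
                uncleanElectionEnabled: Boolean): Option[Int] = {
  val clean = assignment.find(r => live.contains(r) && isr.contains(r))
  if (clean.isDefined || !uncleanElectionEnabled) clean
  else assignment.find(live.contains)
}
```

With AR (0,1,2) and ISR (0,1): if only R0 is down, R1 is chosen; if R0 and R1 are both down, no leader is elected unless unclean election is enabled, in which case R2 becomes leader.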

ISR

On startup, ReplicaManager schedules periodic tasks, including one that regularly calls maybeShrinkIsr:

def startup(): Unit = {
  // start ISR expiration thread
  // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR
  scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
  scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _, period = 2500L, unit = TimeUnit.MILLISECONDS)
  // ... (rest of startup elided)
}
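The "1.5x" in the startup() comment follows from the check period. A worked sketch (the function name is hypothetical): the shrink check runs every replicaLagTimeMaxMs / 2, so a follower whose lag crosses the threshold just after one check is only removed at the next one.

```scala
// Worst-case time a lagging follower can stay in the ISR, given that the
// isr-expiration task runs every replicaLagTimeMaxMs / 2.
def worstCaseIsrRemovalMs(replicaLagTimeMaxMs: Long): Long = {
  val checkPeriodMs = replicaLagTimeMaxMs / 2
  // The lag must first exceed the threshold, then up to one full period can
  // pass before the next scheduled check notices it.
  replicaLagTimeMaxMs + checkPeriodMs
}
```

With replica.lag.time.max.ms=30000 the check runs every 15000 ms, so a lagging follower can remain in the ISR for up to 45000 ms.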

It iterates over every online partition and calls Partition.maybeShrinkIsr on each:

private def maybeShrinkIsr(): Unit = {
  trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")

  // Shrink ISRs for non offline partitions
  allPartitions.keys.foreach { topicPartition =>
    onlinePartition(topicPartition).foreach(_.maybeShrinkIsr()) // iterate over every partition
  }
}

// In Partition:
def maybeShrinkIsr(): Unit = {
  def needsIsrUpdate: Boolean = {
    !isrState.isInflight && inReadLock(leaderIsrUpdateLock) {
      needsShrinkIsr() // check whether the ISR needs to shrink
    }
  }

  if (needsIsrUpdate) {
    val alterIsrUpdateOpt = inWriteLock(leaderIsrUpdateLock) {
      leaderLogIfLocal.flatMap { leaderLog =>
        val outOfSyncReplicaIds = getOutOfSyncReplicas(replicaLagTimeMaxMs) // collect the out-of-sync replicas (OSR)
        if (!isrState.isInflight && outOfSyncReplicaIds.nonEmpty) {
          val outOfSyncReplicaLog = outOfSyncReplicaIds.map { replicaId =>
            val logEndOffsetMessage = getReplica(replicaId)
              .map(_.logEndOffset.toString)
              .getOrElse("unknown")
            s"(brokerId: $replicaId, endOffset: $logEndOffsetMessage)"
          }.mkString(" ")
          val newIsrLog = (isrState.isr -- outOfSyncReplicaIds).mkString(",")
          info(s"Shrinking ISR from ${isrState.isr.mkString(",")} to $newIsrLog. " +
            s"Leader: (highWatermark: ${leaderLog.highWatermark}, " +
            s"endOffset: ${leaderLog.logEndOffset}). " +
            s"Out of sync replicas: $outOfSyncReplicaLog.")
          Some(prepareIsrShrink(outOfSyncReplicaIds))
        } else {
          None
        }
      }
    }
    // Send the AlterIsr request outside of the LeaderAndIsr lock since the completion logic
    // may increment the high watermark (and consequently complete delayed operations).
    alterIsrUpdateOpt.foreach(submitAlterIsr)
  }
}
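maybeShrinkIsr checks cheaply under the read lock and then re-checks under the write lock before changing the ISR: a ReentrantReadWriteLock read lock cannot be upgraded in place, and the state may change between releasing one lock and acquiring the other. A minimal sketch of that pattern; `IsrHolder` is a hypothetical class, and it leaves out the in-flight AlterIsr state and the request sent to the controller.

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock

// Hypothetical holder illustrating "check under the read lock, re-check under the write lock".
final class IsrHolder(initial: Set[Int]) {
  private val lock = new ReentrantReadWriteLock()
  private var isr: Set[Int] = initial

  private def withRead[T](body: => T): T = {
    lock.readLock().lock()
    try body finally lock.readLock().unlock()
  }

  private def withWrite[T](body: => T): T = {
    lock.writeLock().lock()
    try body finally lock.writeLock().unlock()
  }

  def current: Set[Int] = withRead(isr)

  /** Removes replicas for which `outOfSync` is true and returns the removed ids. */
  def maybeShrink(outOfSync: Int => Boolean): Set[Int] = {
    // 1. Cheap check; many threads can hold the read lock at once.
    val needsUpdate = withRead(isr.exists(outOfSync))
    if (!needsUpdate) Set.empty
    else withWrite {
      // 2. The ISR may have changed after the read lock was released,
      //    so recompute under the write lock before mutating.
      val removed = isr.filter(outOfSync)
      isr = isr -- removed
      removed
    }
  }
}
```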

How it decides whether to shrink

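A follower is treated as out of sync when its log end offset differs from the leader's and the time since it last caught up (current time minus lastCaughtUpTimeMs) exceeds replica.lag.time.max.ms: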

  private def needsShrinkIsr(): Boolean = {
    leaderLogIfLocal.exists { _ => getOutOfSyncReplicas(replicaLagTimeMaxMs).nonEmpty }
  }

  private def isFollowerOutOfSync(replicaId: Int,
                                  leaderEndOffset: Long,
                                  currentTimeMs: Long,
                                  maxLagMs: Long): Boolean = {
    getReplica(replicaId).fold(true) { followerReplica =>
      followerReplica.logEndOffset != leaderEndOffset &&
        (currentTimeMs - followerReplica.lastCaughtUpTimeMs) > maxLagMs // out of sync if it has not caught up with the leader within replica.lag.time.max.ms
    }
  }
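The test is time based rather than offset based: a follower that is behind in offsets but keeps catching up stays in the ISR. A standalone version, with a hypothetical `FollowerState` standing in for Kafka's `Replica`:

```scala
// Hypothetical stand-in for the follower replica state read by isFollowerOutOfSync.
final case class FollowerState(logEndOffset: Long, lastCaughtUpTimeMs: Long)

def isOutOfSync(follower: Option[FollowerState], leaderEndOffset: Long,
                nowMs: Long, maxLagMs: Long): Boolean =
  follower.fold(true) { f => // an unknown replica counts as out of sync
    // A follower at the leader's LEO is in sync regardless of time; otherwise it is
    // out of sync once it has not caught up for longer than maxLagMs.
    f.logEndOffset != leaderEndOffset && (nowMs - f.lastCaughtUpTimeMs) > maxLagMs
  }
```

With replica.lag.time.max.ms=30000 at time 60000: a follower at the leader's LEO is in sync; a follower behind the leader that last caught up at 0 is out of sync; one that last caught up at 40000 is still in sync.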

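maybePropagateIsrChanges

ISR changes are collected in isrChangeSet and written to ZooKeeper only when no new change has arrived for the blackout period, or when the last propagation is older than the propagation interval: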

  def maybePropagateIsrChanges(): Unit = {
    val now = System.currentTimeMillis()
    isrChangeSet synchronized {
      if (isrChangeSet.nonEmpty &&
        (lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBlackOut < now ||
          lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {
        // propagate the batched ISR changes to ZooKeeper
        zkClient.propagateIsrChanges(isrChangeSet)
        isrChangeSet.clear()
        lastIsrPropagationMs.set(now)
      }
    }
  }
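The condition acts as a debounce: changes are flushed once a burst settles, with a periodic flush as a fallback so that a steady stream of changes cannot postpone propagation indefinitely. A pure-function sketch; the 5000 ms and 60000 ms constants are assumptions believed to match ReplicaManager's IsrChangePropagationBlackOut and IsrChangePropagationInterval, so check them against your Kafka version.

```scala
// Assumed values (believed to match ReplicaManager's constants).
val IsrChangePropagationBlackOutMs = 5000L
val IsrChangePropagationIntervalMs = 60000L

// Pure-function model of the condition in maybePropagateIsrChanges.
def shouldPropagate(pendingChanges: Int, lastIsrChangeMs: Long,
                    lastPropagationMs: Long, nowMs: Long): Boolean =
  pendingChanges > 0 && (
    // no new ISR change for a while: the burst has settled
    lastIsrChangeMs + IsrChangePropagationBlackOutMs < nowMs ||
    // changes keep arriving, but the last propagation was long ago
    lastPropagationMs + IsrChangePropagationIntervalMs < nowMs)
```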

Followers fetching from the leader drive ISR expansion

  def fetchMessages(timeout: Long,
                    replicaId: Int,
                    fetchMinBytes: Int,
                    fetchMaxBytes: Int,
                    hardMaxBytesLimit: Boolean,
                    fetchInfos: Seq[(TopicIdPartition, PartitionData)],
                    quota: ReplicaQuota,
                    responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
                    isolationLevel: IsolationLevel,
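                    clientMetadata: Option[ClientMetadata]): Unit = {
    // (isFromFollower, fetchOnlyFromLeader and fetchIsolation are derived from replicaId and the other arguments here; elided)
    def readFromLog(): Seq[(TopicIdPartition, LogReadResult)] = {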
      val result = readFromLocalLog(
        replicaId = replicaId,
        fetchOnlyFromLeader = fetchOnlyFromLeader,
        fetchIsolation = fetchIsolation,
        fetchMaxBytes = fetchMaxBytes,
        hardMaxBytesLimit = hardMaxBytesLimit,
        readPartitionInfo = fetchInfos,
        quota = quota,
        clientMetadata = clientMetadata)
      // if the request comes from a follower, update that follower's fetch state
      if (isFromFollower) updateFollowerFetchState(replicaId, result)
      else result
    }

    val logReadResults = readFromLog()
    // ... (response handling elided)
  }
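isFollowerOutOfSync relies on lastCaughtUpTimeMs, which the leader advances as follower fetches arrive. The sketch below is a simplified, hypothetical model of that bookkeeping (modeled on how Kafka tracks follower fetch state, not a copy of it): a fetch at the leader's current log end offset counts as caught up now, and a fetch that reaches the leader's log end offset as of the previous fetch counts as caught up as of that previous fetch.

```scala
// Hypothetical model of the per-follower state a leader updates on each fetch.
final class FollowerFetchState {
  var logEndOffset: Long = 0L
  var lastCaughtUpTimeMs: Long = 0L
  private var lastFetchLeaderLogEndOffset: Long = 0L
  private var lastFetchTimeMs: Long = 0L

  def onFetch(fetchOffset: Long, fetchTimeMs: Long, leaderEndOffset: Long): Unit = {
    if (fetchOffset >= leaderEndOffset)
      // The follower asks for data past the leader's LEO: it has everything right now.
      lastCaughtUpTimeMs = math.max(lastCaughtUpTimeMs, fetchTimeMs)
    else if (fetchOffset >= lastFetchLeaderLogEndOffset)
      // It has what the leader had at the previous fetch: caught up as of that time.
      lastCaughtUpTimeMs = math.max(lastCaughtUpTimeMs, lastFetchTimeMs)
    logEndOffset = fetchOffset // a follower fetches from its own log end offset
    lastFetchLeaderLogEndOffset = leaderEndOffset
    lastFetchTimeMs = fetchTimeMs
  }
}
```

For example, a fetch at offset 100 when the previous fetch saw the leader at 100 sets lastCaughtUpTimeMs to the previous fetch time, even if the leader has since moved on to 120.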

Call chain: ReplicaManager.updateFollowerFetchState -> Partition.updateFollowerFetchState -> Partition.maybeExpandIsr

  private def maybeExpandIsr(followerReplica: Replica): Unit = {
    val needsIsrUpdate = !isrState.isInflight && canAddReplicaToIsr(followerReplica.brokerId) && inReadLock(leaderIsrUpdateLock) {
      needsExpandIsr(followerReplica)
    }
    if (needsIsrUpdate) {
      val alterIsrUpdateOpt = inWriteLock(leaderIsrUpdateLock) {
        // check if this replica needs to be added to the ISR
        if (!isrState.isInflight && needsExpandIsr(followerReplica)) {
          Some(prepareIsrExpand(followerReplica.brokerId))
        } else {
          None
        }
      }
      // Send the AlterIsr request outside of the LeaderAndIsr lock since the completion logic
      // may increment the high watermark (and consequently complete delayed operations).
      alterIsrUpdateOpt.foreach(submitAlterIsr)
    }
  }

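needsExpandIsr calls isFollowerAtHighwatermark, which checks whether the follower's log end offset is at least the leader's high watermark (HW) and at least the start offset of the leader's current epoch: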

  private def isFollowerAtHighwatermark(followerReplica: Replica): Boolean = {
    leaderLogIfLocal.exists { leaderLog =>
      val followerEndOffset = followerReplica.logEndOffset
      followerEndOffset >= leaderLog.highWatermark && leaderEpochStartOffsetOpt.exists(followerEndOffset >= _)
    }
  }
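Putting the pieces together, a follower outside the ISR rejoins once the expansion check holds. A standalone sketch with hypothetical names; the "not already in the ISR" condition is assumed from needsExpandIsr, whose body is not shown above. Requiring the leader's high watermark means the follower has every committed message, and the epoch start offset check presumably keeps out a replica that has not yet caught up with the current leader's epoch.

```scala
// Hypothetical standalone version of the ISR expansion test.
def canRejoinIsr(isr: Set[Int], followerId: Int, followerEndOffset: Long,
                 leaderHighWatermark: Long, leaderEpochStartOffset: Option[Long]): Boolean =
  !isr.contains(followerId) &&                      // assumed from needsExpandIsr
    followerEndOffset >= leaderHighWatermark &&     // has every committed message
    leaderEpochStartOffset.exists(followerEndOffset >= _) // reached the current epoch
```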
