Kafka
是用 Scala
语言开发的一个多分区、多副本且基于 zookeeper
协调的分布式消息系统。它可以作为消息系统、存储系统和流式处理平台。
典型的Kafka
体系中包含三种角色,Broker作为服务节点,Producer 作为生产者发送消息,Consumer 作为消费者从 Broker 拉取消息进行消费
每个主题可以有多个分区,多个分区可以分布在不同的 Broker 上,而每个分区又引入了多副本机制,通过增加副本数量来提高容灾能力。 Leader副本负责处理读写请求,Follower副本只负责与Leader副本进行消息同步。图示中 Broker 节点数为4,某主题分区为3,副本因子也为3。
主题、分区、副本和 Log(日志) 的关系如上图所示,主题和分区是逻辑上的存在,副本日志文件才是实际物理上的存在, 同一个分区中的多个副本必须分布在不同的 broker 中,也就是说同一个分区在同一个 broker 节点中不可能出现它的多个副本,这样才能提供有效的数据冗余和可靠性保证。 主题的分区数支持增加,但不支持减少,增加分区数时需要注意带有key值的消息会被重新分区,因此可能会产生消息顺序错乱和重复消费的问题,所以需要尽量避免增加分区数。
在创建主题的时候,该主题的分区及副本会尽可能均匀地分布到 Kafka 集群的各个 broker 节点上,随着时间的更替,broker节点可能会出现宕机, 它的follower节点就会成为新的leader节点,这样就可能会造成集群的负载不均衡。
Kafka使用优先副本来治理分区负载失衡,优先副本是AR集合中的第一个副本。如果优先副本在Kafka集群中是均匀分布的,那么所有分区的Leader也是均衡分布的, 但是优先副本选举治理的是分区平衡,它并不是真正意义上的负载均衡,因为可能有些leader副本负载很高而有的leader负载很低
默认是开启自动分区平衡,Kafka会通过执行定时任务去检查broker节点的分区不平衡率,超过规定值的话,就会执行优先副本选举以达到分区平衡的目的。
但是在生产环境需要把这个参数改成关闭,且执行优先副本选举时需要配合 --path-to-json-file
参数指定主题分区来分批操作(eg: election.json
),
避免大规模主题分区的优先副本选举。
执行命令: bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181 --path-to-json-file election.json
集群中新增 broker 节点时,将之前已经完成分配的分区进行重分配以均衡负载或者计划下线节点时,用于保证分区及副本的合理分配。 它的原理是先从leader分区副本复制,增加的新副本到要被分配到的broker节点,复制完成后,将旧副本从副本清单内移除。
将 topic-reassign
主题在0,2节点上进行分区重分配
- 创建
reassign.json
文件
{
"topics":[
{
"topic":"topic-reassign"
}
],
"version":1
}
- 执行脚本:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --generate --topics-to-move-json-file reassign.json --broker-list 0,2
执行结果为重分区方案,将其保存到 project.json
文件中
- 执行脚本:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --execute --reassignment-json-file project.json
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。在主线程中由 KafkaProducer 创建消息, 然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)。
在消息累加器中,发往各个不同分区的消息以 ProducerBatch
批量消息的形式保存在双向队列中,Sender 线程负责从消息累加器中获取消息并将其发送到 Kafka。
因为消息在网络上都是以字节(Byte)的形式传输的,所以在线程中会对消息进行封装,封装成 <Broker, Request>
的形式,完成了应用层到I/O层的转换。
这些Request请求在从 Sender 线程发往 Kafka 之前还会保存到 InFlightRequests
,它的主要作用是缓存了已经发出去但还没有收到响应的请求,
能够通过参数配置最大的请求缓存数,以此来判断对应的 Broker 节点是否堆积了未响应的消息。
消费位移的自动提交是在 poll() 方法里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交(提交位移为当前消费消息的offset + 1),它非常简便,
但是与此同时的是重复消费和消息丢失的问题。提交后的位移参数会在Kafka内部主题 __consumer_offsets
中记录,供之后拉取消息时判断,避免拉到重复的消息。
再均衡是指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障, 使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。不过在再均衡发生期间,消费组内的消费者是无法读取消息的。
消费者拦截器对消息处理的时机:poll()方法拉取消息返回之前和提交完消费位移之后
在远程Linux上配置
listeners=PLAINTEXT://内网IP:9092
advertised.listeners=PLAINTEXT://外网IP:9092
- broker.id: 在集群情况下该值需要不同
- zookeeper.connect: 该参数指明 broker 要连接的 ZooKeeper 集群的服务地址(包含端口号),没有默认值,且此参数为必填项
- listeners: 该参数指明 broker 监听客户端连接的地址列表,即为客户端要连接 broker 的入口地址列表, protocol1://hostname1:port1,protocol2://hostname2:port2(协议://主机名:port,多个以逗号隔开)
- advertised.listeners: 公有云上的机器通常配备有多块网卡,即包含私网网卡和公网网卡,对于这种情况而言, 可以设置 advertised.listeners 参数绑定公网IP供外部客户端使用,而配置 listeners 参数来绑定私网IP地址供 broker 间通信使用。
- log.dir和log.dirs: Kafka 把所有的消息都保存在磁盘上,而这两个参数用来配置 Kafka 日志文件存放的根目录。一般情况下,log.dir 用来配置单个根目录, 而 log.dirs 用来配置多个根目录(以逗号分隔),但是 Kafka 并没有对此做强制性限制,也就是说,log.dir 和 log.dirs 都可以用来配置单个或多个根目录。 log.dirs 的优先级比 log.dir 高,但是如果没有配置 log.dirs,则会以 log.dir 配置为准。默认情况下只配置了 log.dir 参数,其默认值为 /tmp/kafka-logs。
- message.max.bytes: 该参数用来指定 broker 所能接收消息的最大值,默认值为1000012(B),约等于976.6KB。 如果 Producer 发送的消息大于这个参数所设置的值,那么(Producer)就会报出 RecordTooLargeException 的异常。 如果需要修改这个参数,那么还要考虑 max.request.size(客户端参数)、max.message.bytes(topic端参数)等参数的影响。 为了避免修改此参数而引起级联的影响,建议在修改此参数之前考虑分拆消息的可行性。
- auto.create.topics.enable: 默认值为true,生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为num.partitions(默认1), 副本因子为default.replication.factor(默认1)的主题;当消费者从未知主题中读取消息时,也会自动创建主题,所以该参数不建议为true
- delete.topic.enable: 默认为true且为true时才能删除主题
- auto.leader.rebalance.enable: 默认为true,开启执行周期为5min的定时任务来检查分区平衡情况,生产环境不建议开启,可能会造成客户端阻塞
- acks 这个参数指定分区中必须要有多少个副本收到这条消息,生产者才认为这条消息是成功写入的。
默认值为1,即生产者发送消息后,只要分区leader副本成功写入消息,那么它就会收到来自服务器的成功响应。如果消息写入leader副本成功,但是在被follower 副本拉取之前leader崩溃,那么这条消息还是会丢失。它是消息可靠性和吞吐量之间的折中方案。
当acks = 0时,生产者发送消息之后不需要等待任何服务器的响应就认定发送成功。这是吞吐量最大的方案。
当acks = -1或acks = all时,生产者发送消息之后,需要等待ISR中所有副本写入才能收到来自服务器的成功响应。它是可靠性最高的方案, 但是也并不意味着消息就一定可靠,因为如果ISR中只有leader一个那么它其实和配置ack = 1的效果是一样的。
- max.request.size: 这个参数用来限制生产者客户端能发送消息的最大值,默认为1048576B,即1MB。这个参数会涉及与 message.max.bytes 参数的联动,如果将 message.max.bytes配置为10而max.request.size配置为20,这时候发送一条15b的消息,就会出现异常。
org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
-
max.in.flight.requests.per.connection: 限制每个连接(也就是客户端与 Node 之间的连接)最多缓存的请求数,在需要保证消息顺序的场景下 建议把这个参数配置为1,如果这个参数大于1且失败重试次数非零时,会出现错序的现象:第一批次写入失败,第二批次写入成功,第一批次重试写入成功后则会发生错序。
-
retries和retry.backoff.ms: 前者用来配置生产者发送消息失败后重试的次数,后者用来配置两次重试间的时间间隔,尽可能避免无效重试,也可以估算 一下异常恢复的时间,合理配置时间间隔来避免生产者过早的放弃重试。像网络波动和leader副本选举发生时,通过重试是能将消息发送成功的。
-
compression.type: 这个参数用来指定消息的压缩方式,默认值为“none”,即默认情况下,消息不会被压缩。 该参数还可以配置为“gzip”“snappy”和“lz4”。对消息进行压缩可以极大地减少网络传输量、降低网络I/O,从而提高整体的性能。 消息压缩是一种使用时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩。
-
connections.max.idle.ms: 这个参数用来指定在多久之后关闭闲置的连接,默认值是540000(ms),即9分钟。
-
linger.ms: 这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息(ProducerRecord)加入 ProducerBatch 的时间,默认值为0。 生产者客户端会在 ProducerBatch 被填满或等待时间超过 linger.ms 值时发送出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。
-
receive.buffer.bytes: 这个参数用来设置 Socket 接收消息缓冲区(SO_RECBUF)的大小,默认值为32768(B),即32KB。 如果设置为-1,则使用操作系统的默认值。如果 Producer 与 Kafka 处于不同的机房,则可以适地调大这个参数值。
-
send.buffer.bytes: 这个参数用来设置 Socket 发送消息缓冲区(SO_SNDBUF)的大小,默认值为131072(B),即128KB。 与 receive.buffer.bytes 参数一样,如果设置为-1,则使用操作系统的默认值。
-
request.timeout.ms: 这个参数用来配置 Producer 等待请求响应的最长时间,默认值为30000(ms)。请求超时之后可以选择进行重试。 注意这个参数需要比 broker 端参数 replica.lag.time.max.ms 的值要大,这样可以减少因客户端重试而引起的消息重复的概率。
-
enable.idempotence: 是否开启幂等性功能
-
buffer.memory: 指
RecordAccumulator
用来缓存生产者发送消息的最大大小 -
max.block.ms: 如果生产者发送消息过快,当消息缓存大小超过 buffer.memory 的大小时,会进入阻塞状态,该参数表示阻塞的最长时间,超过该时间后抛出异常
-
batch.size:
RecordAccumulator
中ProducerBatch
的标准大小,当消息小于该大小时,以该大小创建;否则以实际消息大小创建 -
max.in.flight.requests.per.connection: 默认值为5,即在
InFlightRequests
中每个连接最多只能缓存5个未响应的请求 -
metadata.max.age.ms: 超过该配置的时间时则会对元数据进行更新,默认300000ms,5分钟。 (原数据指Kafka集群的元数据,记录了集群中的哪些主题、主题有哪些分区,每个分区的Leader副本在哪个节点上,Follower副本在哪个节点上,哪些副本在AR、ISR等集合中)
-
fetch.min.bytes: 配置 Consumer 在调用
poll()方法
中能从broker
中拉取的最小数据量,默认值为1(B)。broker
在收到 Consumer 的拉取请求时,如果返回给 Consumer 的数据量小于这个参数所配置的值,那么它就需要进行等待, 直到数据量满足这个参数的配置大小。可以适当调大这个参数的值以提高一定的吞吐量,不过也会造成额外的延迟(latency), 对于延迟敏感的应用可能就不可取了。 -
fetch.max.bytes: 用来配置 Consumer 在一次拉取请求中从Kafka中拉取的最大数据量,默认值为52428800(B),也就是50MB。 如果这个参数设置的值比任何一条写入 Kafka 中的消息要小且它是第一个非空分区中拉取的第一条消息的话,它是不会被认为无法消费,这条消息仍然会正常返回。
-
fetch.max.wait.ms: 与fetch.min.bytes参数有关,为了满足拉取的数据量的最小值,但是也不能无限时长等待,默认500ms, 超过这个时间之后,数据量不满足最小值也会响应
-
max.partition.fetch.bytes: 这个参数和fetch.max.bytes相似,它用来限制的是一次拉取每个分区的消息大小, 同样为了保证Kafka的正常,也并不会严格限制大小造成无法消费的情况
-
max.poll.records: 配置 Consumer 在一次拉取请求中拉取的最大消息数,默认值为500(条)
-
connections.max.idle.ms: 用来指定在多久之后关闭闲置的连接,默认值是540000(ms),即9分钟
-
exclude.internal.topics: 指定内部主题(
__consumer_offsets
和__transaction_state
)的消费方式, 默认为true,这种情况必须使用subscribe(Collection)
的方式消费, 改成false时,内部主题就像变成了公开主题一样,怎么订阅都行了 -
receive.buffer.bytes: 配置Socket接收消息缓冲区大小,默认为65536(B),如果设置为-1,则为系统默认值
-
request.timeout.ms: 配置 Consumer 等待请求响应的最长时间,默认值为30000ms
-
metadata.max.age.ms: 配置元数据的过期时间,默认值为300000ms,过期后强制更新
-
reconnect.backoff.ms: 配置尝试重新连接指定主机之前的等待时间,避免频繁地连接主机,默认值为50ms
-
retry.backoff.ms: 配置尝试重新发送失败的请求到指定的主题分区之前的等待,默认100ms
-
isolation.level: 配置消费者的事务隔离级别,
read_committed
消费者就会忽略事务未提交的消息,即只能消费到LSO, 默认情况下为read_uncommitted
,即可以消费到HW处的位置 -
group.id: 消费组名称,一般这个名称有业务意义
-
client.id: 用来设定消费者的ID名称
-
enable.auto.commit: 消费者位移是否自动提交,默认为 true
-
auto.commit.interval.ms: 消费者位移自动提交的周期,默认5000ms
-
auto.offset.reset: 消费者找不到消费位移时,默认为
latest
,从分区末尾开始消费;earliest
从分区开始进行消费;none
则会抛出异常
创建分区为3副本因子为3的主题
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic fang-yuan --replication-factor 3 --partitions 3
展示主题的更多具体信息
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-demo
通过 kafka-console-consumer.sh 脚本来订阅主题 topic-demo 其中--bootstrap-server 指定了连接的 Kafka 集群地址
bin/kafka-console-consumer.sh --bootstrap-server 10.0.24.15:9092 --topic topic-demo
使用 kafka-console-producer.sh 脚本发送一条消息“Hello, Kafka!”至主题 topic-demo
bin/kafka-console-producer.sh --broker-list 10.0.24.15:9092 --topic topic-demo
> hello Kafka!
Kafka
是用 Scala
语言开发的一个多分区、多副本且基于 zookeeper
协调的分布式消息系统。它可以作为消息系统、存储系统和流式处理平台。
消息系统用来实现解耦、流量削峰、异步通信等;存储可用于数据的持久化;流式处理可以为一些开源的流式处理框架提供数据源。
AR(Assigned Replicas) 是Kafka分区中所有的副本,ISR(In-Sync Replicas) 是所有与 leader
副本保持一定程度同步的副本,
!! 什么参数控制着这个一定程度的要求呢 !!
还有一个OSR(Out-of-Sync Replicas)是与 leader
副本滞后过多的副本。它们的滞后状态由 leader 副本进行维护,当 follower 副本落后太多时,
会被 leader 副本从ISR集合中剔除,反之,赶上来的副本会从OSR集合中加入到ISR集合中。
!! ISR 的伸缩又指什么 !!
LEO(Log End Offset) 它表示的是当前分区日志中下一条待写入消息的 offset
;HW(High Watermark)高水位,它表示的是消费者只能拉取到这个 offset
之前的消息,它是该主题下所有ISR分区日志文件中最小的LEO。
LSO、LW
Kafka 借助 ISR 实现主从复制: 当其中副本有复制完成消息时更新HW来控制消费者能消费的消息, 有效地权衡了数据可靠性和性能之间的关系,如果是完全同步复制的话,性能太差;如果是异步复制的话,数据可靠性又比较难保证;
拦截器、分区器、序列化器都是生产者在向 Broker 节点发送消息之前对消息进行处理的,拦截器可以用来在消息发送前做一些业务准备工作; 序列化器是将消息对象转换成字节数组;而分区器则是确定这个消息发送到该主题的具体分区。
它们的处理顺序是:拦截器、序列化器、分区器。
使用了两个线程来进行处理,分别是主线程和Sender线程。在主线程中由 KafkaProducer
创建消息,然后通过可能存在的拦截器、序列化器和分区器的作用之后,
缓存到消息累加器中;而Sender线程负责从消息累加器获取消息并将其发送到Kafka中。
消费组是逻辑上的概念,每个消费组可以包含多个消费者,每个消费者只能隶属于一个消费组。因此 Kafka
能够支持基于队列的点对点模式和发布订阅模式,
点对点是对同一个消费组而言,消息会被均衡的投递给每一个消费者,即每条消息只会被一个消费者处理;
发布订阅模式则对于多个消费组,每条消息都会被不同消费组的不同消费者消费。
是正确的,在同一个消费组中,如果消费者数量大于分区数量,那么就会出现消费者闲置。可以通过将该闲置消费者加入新的消费组订阅这个主题进行消费。
是 offset + 1
在自动提交消费位移的情况下,如果拉取到一批消息开始消费后,在下一次消费位移提交前,出现消费者宕机的情况就会造成重复消费
如果消息的消费不是拉取到就及时消费的话,比如某消费者使用线程A拉取消息并提交消费位移,线程B来对消息进行消费,但是在消息没消费完之前B线程因异常被终止, 那么这就会出现消息丢失的问题