# Basic Theory

### Apache Kafka is a distributed streaming platform

* A streaming platform has three key capabilities:
    * Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
    * Store streams of records in a fault-tolerant durable way.
    * Process streams of records as they occur.
* Kafka is generally used for two broad classes of applications:
    * Building <b>real-time streaming data pipelines</b> that reliably get data between systems or applications
    * Building <b>real-time streaming applications</b> that transform or react to the streams of data
* <b>Kafka is run as a cluster on one or more servers that can span multiple datacenters. The Kafka cluster stores streams of records in categories called topics. Each record consists of a key, a value, and a timestamp.</b>
    * For each topic, the Kafka cluster maintains a partitioned log
* Kafka has four core APIs:
    * The <b>Producer API</b> allows an application to publish a stream of records to one or more Kafka topics.
        * Producers publish data to the topics of their choice. The producer is responsible for choosing which record to assign to which partition within the topic. This can be done in a round-robin fashion simply to balance load or it can be done according to some semantic partition function.
    * The <b>Consumer API</b> allows an application to subscribe to one or more topics and process the stream of records produced to them.
        * Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.
        * If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances.
        * If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.
    * The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
    * The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
<table>
    <tr>
        <td><img src="../../images/javaee/kafka-apis.png" width="300px"></td>
        <td><img src="../../images/javaee/log_anatomy.png" width="300px"></td>
        <td><img src="../../images/javaee/log_consumer.png" width="300px"></td>
    </tr>
</table>
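
The producer-side partition choice and the consumer-group delivery rule described above can be sketched in plain Python, without a Kafka client. Everything here is a hypothetical illustration: `choose_partition`, `deliver`, and the CRC32 hash are assumptions made for the example, not the functions or hash used by Kafka's own clients.

```python
import itertools
import zlib

_round_robin = itertools.count()  # shared counter for unkeyed records


def choose_partition(key, num_partitions):
    """Pick a partition for a record.

    Keyed records (key is bytes) hash to a fixed partition, which is a
    simple semantic partition function. Unkeyed records (key is None)
    rotate over the partitions to balance load.
    """
    if key is None:
        return next(_round_robin) % num_partitions
    return zlib.crc32(key) % num_partitions


def deliver(record, groups):
    """Deliver one record to exactly one consumer instance in every group.

    `groups` maps a consumer group name to its list of instance names.
    The instance is picked from the record's partition, so all records of
    one partition reach the same instance within a group.
    """
    return {
        group: members[record["partition"] % len(members)]
        for group, members in groups.items()
    }
```

If every instance shares one group name, `deliver` behaves like a load-balanced queue; if every instance has its own group name, each record is effectively broadcast to all of them.
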
* <b>Distribution</b>
    * The partitions of the log are distributed over the servers in the Kafka cluster with each server handling data and requests for a share of the partitions. Each partition is replicated across a configurable number of servers for fault tolerance.
    * Each partition has one server which acts as the "leader" and zero or more servers which act as "followers". The leader handles all read and write requests for the partition while the followers passively replicate the leader. If the leader fails, one of the followers will automatically become the new leader. Each server acts as a leader for some of its partitions and a follower for others so load is well balanced within the cluster.
* Differences between the message queue and publish-subscribe models: https://kafka.apache.org/documentation/#kafka_mq
* <b>Kafka communication model</b>: https://kafka.apache.org/protocol
<img src="../../images/javaee/kafka-communication.png" width="500px">
    * Because the placement of partitions on brokers is dynamic, clients cannot simply be configured with a static mapping file. Instead, all Kafka brokers can answer a metadata request that describes the current state of the cluster: what topics there are, which partitions those topics have, which broker is the leader for those partitions, and the host and port information for these brokers.
    * In other words, the client needs to somehow find one broker and that broker will tell the client about all the other brokers that exist and what partitions they host. This first broker may itself go down so the best practice for a client implementation is to take a list of two or three URLs to bootstrap from. The user can then choose to use a load balancer or just statically configure two or three of their Kafka hosts in the clients.
    * <b>The client does not need to keep polling to see if the cluster has changed; it can fetch metadata once when it is instantiated and cache that metadata until it receives an error indicating that the metadata is out of date.</b> This error can come in two forms: (1) a socket error indicating the client cannot communicate with a particular broker, (2) an error code in the response to a request indicating that this broker no longer hosts the partition for which data was requested. The client therefore works in a cycle:
        * Cycle through a list of "bootstrap" Kafka URLs until we find one we can connect to. Fetch cluster metadata.
        * Process fetch or produce requests, directing them to the appropriate broker based on the topic/partitions they send to or fetch from.
        * If we get an appropriate error, refresh the metadata and try again.
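
The bootstrap, route, and refresh cycle above can be sketched against an in-memory stand-in for the cluster. This is a minimal illustration under stated assumptions: `FakeCluster`, `Client`, and `StaleMetadata` are hypothetical names, and the real protocol exchanges binary requests over TCP rather than Python method calls.

```python
class StaleMetadata(Exception):
    """Raised by a broker that no longer leads the requested partition."""


class FakeCluster:
    """In-memory stand-in for a set of brokers (hypothetical)."""

    def __init__(self, leaders, down=()):
        self.leaders = dict(leaders)  # partition -> name of the leader broker
        self.down = set(down)         # brokers that refuse connections

    def fetch_metadata(self, broker):
        if broker in self.down:
            raise ConnectionError(broker)
        return dict(self.leaders)

    def produce(self, broker, partition, value):
        if broker in self.down:
            raise ConnectionError(broker)
        if self.leaders[partition] != broker:
            raise StaleMetadata(partition)
        return f"{broker} stored {value!r} in partition {partition}"


class Client:
    def __init__(self, cluster, bootstrap):
        self.cluster = cluster
        self.bootstrap = list(bootstrap)
        self.metadata = None          # cached partition -> leader map
        self.metadata_fetches = 0

    def refresh_metadata(self):
        # Step 1: cycle through the bootstrap list until one broker answers.
        for broker in self.bootstrap:
            try:
                self.metadata = self.cluster.fetch_metadata(broker)
                self.metadata_fetches += 1
                return
            except ConnectionError:
                continue
        raise ConnectionError("no bootstrap broker reachable")

    def send(self, partition, value, max_attempts=3):
        for _ in range(max_attempts):
            if self.metadata is None:
                self.refresh_metadata()
            leader = self.metadata[partition]
            try:
                # Step 2: route the request to the cached leader.
                return self.cluster.produce(leader, partition, value)
            except (ConnectionError, StaleMetadata):
                # Step 3: the cache is out of date; drop it and retry.
                self.metadata = None
        raise RuntimeError("gave up after repeated metadata refreshes")
```

The cache is only discarded on an error, so a healthy cluster costs one metadata fetch per client, no matter how many requests follow.
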

****

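# Practical Applications
* Main use cases (core value: real-time stream data processing over long-lived TCP connections): https://kafka.apache.org/documentation/#uses
* Configuration property reference: https://kafka.apache.org/documentation/#configuration
* Wire and on-disk message formats: https://kafka.apache.org/documentation/#messageformat
* Hands-on operations examples: https://kafka.apache.org/documentation/#operations
* Security configuration in practice: https://kafka.apache.org/documentation/#security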
* Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems: https://kafka.apache.org/documentation/#connect
* Kafka Streams is a client library for processing and analyzing data stored in Kafka (a higher-level wrapper around Kafka's producer and consumer that makes programming against Kafka simpler): https://kafka.apache.org/documentation/streams/
    * Kafka Streams is a client library for building mission-critical real-time applications and microservices, where the input and/or output data is stored in Kafka clusters. Kafka Streams combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka's server-side cluster technology to make these applications highly scalable, elastic, fault-tolerant, distributed, and much more.
    * End-to-end exactly-once processing semantics: 
        * Transactions guarantee that each message is consumed only once; they do not guarantee that each message is sent only once
        * https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
        * https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics
        * Kafka has added support to allow its producers to send messages to different topic partitions in a transactional and idempotent manner, and Kafka Streams has hence added end-to-end exactly-once processing semantics by leveraging these features. More specifically, it guarantees that for any record read from the source Kafka topics, its processing results will be reflected exactly once in the output Kafka topic as well as in the state stores for stateful operations. Note that the key difference between Kafka Streams' end-to-end exactly-once guarantee and other stream processing frameworks' claimed guarantees is that Kafka Streams tightly integrates with the underlying Kafka storage system and ensures that commits on the input topic offsets, updates on the state stores, and writes to the output topics will be completed atomically, instead of treating Kafka as an external system that may have side effects.
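
To see why committing the input offset, the state store, and the output together matters, here is a toy simulation of that atomic unit. It is only a sketch under assumptions: `Store` and `process` are hypothetical, and copying the whole store stands in for a transaction; it does not use or mirror the Kafka Streams API.

```python
import copy


class Store:
    """Everything a stream task commits: input offset, state, and output."""

    def __init__(self):
        self.offset = 0   # index of the next input record to read
        self.state = {}   # word -> running count (a state store)
        self.output = []  # records written to the output topic


def process(store, records, crash_at=None):
    """Count words, committing offset, state, and output as one unit.

    `crash_at` simulates a failure while handling that input index: the
    staged changes are discarded, as an aborted transaction would be.
    """
    while store.offset < len(records):
        staged = copy.deepcopy(store)  # begin "transaction"
        word = records[staged.offset]
        staged.state[word] = staged.state.get(word, 0) + 1
        staged.output.append((word, staged.state[word]))
        if crash_at == staged.offset:
            raise RuntimeError("crash before commit")  # staged work is lost
        staged.offset += 1
        store.__dict__.update(staged.__dict__)  # commit all three together
```

After a crash, restarting `process` on the same `store` resumes from the last committed offset, and because the state and output were rolled back with it, no result is counted or emitted twice.
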

****

# Design and Implementation Principles
* <b>Use case motivation</b>
    * It would have to have high-throughput to support high volume event streams such as real-time log aggregation.
    * It would need to deal gracefully with large data backlogs to be able to support periodic data loads from offline systems.
    * <b>It also meant the system would have to handle low-latency delivery to handle more traditional messaging use-cases.</b>
* <b>Persistence design</b>
    * Existing method, the traditional way:
        * A modern OS will happily divert all free memory to disk caching with little performance penalty when the memory is reclaimed. All disk reads and writes will go through this unified cache. `This feature cannot easily be turned off without using direct I/O`, so even if a process maintains an in-process cache of the data, this data will likely be duplicated in OS pagecache, effectively storing everything twice.
        * The persistent data structure used in messaging systems is often a per-consumer queue with an associated BTree or other general-purpose random access data structure to maintain metadata about messages. BTree operations are $O(\log N)$. Normally $O(\log N)$ is considered essentially equivalent to constant time, but this is not true for disk operations. Disk seeks come at 10 ms a pop, and each disk can do only one seek at a time, so parallelism is limited. 
    * <b>Pagecache-centric design solutions:</b>
        * Using the filesystem and relying on pagecache is superior to maintaining an in-memory cache or other structure. All data is immediately written to a persistent log on the filesystem without necessarily flushing to disk. In effect this just means that it is transferred into the kernel's pagecache.
        * Intuitively a persistent queue could be built on simple reads and appends to files as is commonly the case with logging solutions. This structure has the advantage that all operations are O(1) and reads do not block writes or each other. 
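
A persistent queue built on simple appends and reads might look like the following. This is a minimal sketch, assuming a fresh file and an in-memory index; `AppendOnlyLog` and its length-prefixed record layout are hypothetical and are not Kafka's on-disk format.

```python
import struct


class AppendOnlyLog:
    """Length-prefixed records appended to one file; offsets index into it."""

    def __init__(self, path):
        self.path = path
        self.positions = []       # record offset -> byte position in the file
        self.end = 0              # byte position where the next record starts
        open(path, "wb").close()  # start from an empty file (sketch only)

    def append(self, payload: bytes) -> int:
        """Append one record in O(1) and return its offset."""
        with open(self.path, "ab") as f:
            f.write(struct.pack(">I", len(payload)) + payload)
            # No fsync: the write lands in the OS pagecache and the kernel
            # flushes it to disk later.
        self.positions.append(self.end)
        self.end += 4 + len(payload)
        return len(self.positions) - 1

    def read(self, offset: int) -> bytes:
        """Read one record in O(1): a single seek, then a sequential read."""
        with open(self.path, "rb") as f:
            f.seek(self.positions[offset])
            (size,) = struct.unpack(">I", f.read(4))
            return f.read(size)
```

Readers open the file independently and never modify it, which is why reads do not block writes or each other in this structure.
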
* <b>Efficiency design</b>
    * Issues that hurt efficiency:
        * Too many small I/O operations
        * Excessive byte copying
        * Network bandwidth
    * <b>Built around a "message set" abstraction that naturally groups messages together.</b> This allows network requests to group messages together and amortize the overhead of the network roundtrip rather than sending a single message at a time. The server in turn appends chunks of messages to its log in one go, and the consumer fetches large linear chunks at a time.
    * The message log maintained by the broker is itself just a directory of files, each populated by a sequence of message sets that have been written to disk in the same format used by the producer and consumer. Maintaining this common format allows optimization of the most important operation: network transfer of persistent log chunks. Modern unix operating systems offer a highly optimized code path for transferring data out of pagecache to a socket.
        * The common data path for transfer of data from file to socket
            * The operating system reads data from the disk into pagecache in kernel space
            * The application reads the data from kernel space into a user-space buffer
            * The application writes the data back into kernel space into a socket buffer
            * The operating system copies the data from the socket buffer to the NIC buffer where it is sent over the network
        * <b>The zero-copy optimization</b>: data is copied into pagecache exactly once and reused on each consumption, instead of being stored in memory and copied out to user space every time it is read. This allows messages to be consumed at a rate that approaches the limit of the network connection. <i>(See the transferFrom and transferTo methods: https://docs.oracle.com/javase/8/docs/api/index.html?java/nio/channels/FileChannel.html)</i>
    * <b>A batch of messages</b> can be clumped together, compressed, and sent to the server in this form. The batch is written in compressed form, remains compressed in the log, and is only decompressed by the consumer.
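
The two ideas above, a batch that stays compressed end to end and a file-to-socket transfer that skips user space, can be combined in a small sketch. The function names and the gzip/JSON encoding are assumptions for illustration; Python's `socket.sendfile` uses `os.sendfile` where the platform supports it and otherwise falls back to ordinary `send()` calls.

```python
import gzip
import json
import socket


def write_compressed_batch(path, messages):
    """Compress a whole batch once and append it to the log file as-is."""
    blob = gzip.compress(json.dumps(messages).encode("utf-8"))
    with open(path, "ab") as f:
        f.write(blob)
    return len(blob)


def send_log_chunk(path, sock, offset, count):
    """Ask the kernel to move a file range to the socket.

    The broker never decompresses or inspects the bytes; it only names the
    byte range to transfer.
    """
    with open(path, "rb") as f:
        return sock.sendfile(f, offset=offset, count=count)


def receive_batch(sock, size):
    """Consumer side: read `size` bytes, then decompress the batch."""
    data = b""
    while len(data) < size:
        chunk = sock.recv(size - len(data))
        if not chunk:
            break
        data += chunk
    return json.loads(gzip.decompress(data).decode("utf-8"))
```

Compressing the batch as a whole also tends to give a better ratio than compressing each message alone, because the repetition between messages of the same type becomes visible to the compressor.
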
* <b>Producer design</b>
    * <b>Publishing messages with load balancing</b>
        * The producer sends data directly to the broker that is the leader for the partition without any intervening routing tier. To help the producer do this, all Kafka nodes can answer a request for metadata about which servers are alive and where the leaders for the partitions of a topic are at any given time, so that the producer can direct its requests appropriately. The client also exposes an interface for semantic partitioning: the user specifies a key to partition by, and the key is hashed to a partition.
    * Asynchronous publishing with batching: 
        * The producer can accumulate data in memory and send out larger batches in a single request. The batching can be configured to accumulate no more than a fixed number of messages and to wait no longer than some fixed latency bound (say 64k or 10 ms). 
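
The size-or-latency rule for batching can be sketched as a small accumulator. `BatchAccumulator` and its parameter names are hypothetical, and the clock is passed in explicitly so the behaviour is easy to follow; the defaults simply reuse the 64k and 10 ms figures quoted above.

```python
class BatchAccumulator:
    """Buffer records and release a batch on a size bound or a time bound."""

    def __init__(self, max_bytes=64 * 1024, max_wait_ms=10):
        self.max_bytes = max_bytes
        self.max_wait_ms = max_wait_ms
        self.buffer = []
        self.buffered_bytes = 0
        self.first_append_ms = None  # when the oldest buffered record arrived

    def append(self, record: bytes, now_ms: int):
        """Add a record; return a batch if a bound is reached, else None."""
        if not self.buffer:
            self.first_append_ms = now_ms
        self.buffer.append(record)
        self.buffered_bytes += len(record)
        return self.poll(now_ms)

    def poll(self, now_ms: int):
        """Return the pending batch if either bound is met, else None."""
        if not self.buffer:
            return None
        too_big = self.buffered_bytes >= self.max_bytes
        too_old = now_ms - self.first_append_ms >= self.max_wait_ms
        if too_big or too_old:
            batch, self.buffer, self.buffered_bytes = self.buffer, [], 0
            return batch
        return None
```

Raising either bound trades a little extra latency for fewer, larger requests, which is the trade-off the producer configuration exposes.
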
* <b>Consumer design</b>
    * Push and Pull design
        * Push-based system 
            * Has difficulty dealing with diverse consumers, because the broker controls the rate at which data is transferred
            * Must choose between sending a message immediately and accumulating more data to send later, without knowing whether the downstream consumer will be able to process it immediately
        * Pull-based system 
            * The consumer simply falls behind and catches up when it can. A push-based system would instead need some kind of backoff protocol by which the consumer can indicate it is overwhelmed, and getting the rate of transfer to fully utilize (but never over-utilize) the consumer is trickier than it seems
            * The consumer always pulls all available messages after its current position in the log (or up to some configurable max size)
            * Drawback: if the broker has no data, the consumer may end up polling in a tight loop, effectively busy-waiting for data to arrive
        * <b>Kafka's choice: data is pushed to the broker by the producer and pulled from the broker by the consumer.</b> 
    * <b>Consumer position storage design: https://kafka.apache.org/documentation/#impl_offsettracking</b>
        * Drawbacks of keeping metadata on the broker about which messages have been consumed
            * <i>`Message loss issue`</i>: if the broker records a message as consumed immediately every time it is handed out over the network, then if the consumer fails to process the message, that message will be lost.
            * <i>`Solving message loss with an acknowledgement feature`</i>: messaging systems add an acknowledgement feature, which means that messages are only marked as sent, not consumed, when they are sent; the broker waits for a specific acknowledgement from the consumer to record the message as consumed. But this creates new problems: 
                * First of all, if the consumer processes the message but fails before it can send an acknowledgement then the message will be consumed twice. 
                * The second problem is around performance, now the broker must keep multiple states about every single message (first to lock it so it is not given out a second time, and then to mark it as permanently consumed so that it can be removed).
        * <b>Topic is divided into a set of totally ordered partitions, each of which is consumed by exactly one consumer within each subscribing consumer group at any given time. This means that the position of a consumer in each partition is just a single integer, the offset of the next message to consume.</b> 
            * https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html
            * Committing an offset means producing a message to a special Kafka topic, `__consumer_offsets`, with the committed offset for each partition. The commit styles are `Automatic Commit, Commit Current Offset, or Asynchronous Commit`
            * Offsets can also be stored in an external system (such as a database) instead of Kafka; the consumer then seeks to a specific stored offset.
            * Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes. This is achieved by assigning the partitions in the topic to the consumers in the consumer group so that each partition is consumed by exactly one consumer in the group. By doing this we ensure that the consumer is the only reader of that partition and consumes the data in order. Since there are many partitions this still balances the load over many consumer instances. <b>Note however that a consumer group cannot usefully have more consumer instances than partitions: the extra instances are assigned no partition and sit idle.</b>
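
The partition-to-consumer assignment and the "position is a single integer" idea can be sketched as follows. `assign_partitions` and `GroupOffsets` are hypothetical helpers, and round-robin is just one possible assignment strategy chosen for the example.

```python
def assign_partitions(partitions, consumers):
    """Give every partition to exactly one consumer in the group."""
    assignment = {consumer: [] for consumer in consumers}
    for i, partition in enumerate(sorted(partitions)):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment


class GroupOffsets:
    """Committed position per partition: the next offset to consume."""

    def __init__(self):
        self.committed = {}

    def commit(self, partition, next_offset):
        self.committed[partition] = next_offset

    def position(self, partition):
        # A partition with no commit yet starts from the beginning here.
        return self.committed.get(partition, 0)
```

With two partitions and three consumers, the third consumer receives an empty list, which is the idle-instance case noted above. The per-partition state is one integer, regardless of how many messages have been consumed.
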
* <b>Message delivery semantics design</b>: https://kafka.apache.org/documentation/#semantics
    * Message delivery guarantees level: 
        * At most once, messages may be lost but are never redelivered.
        * At least once, messages are never lost but may be redelivered.
        * Exactly once, each message is delivered once and only once.
            * <i>The transactional producer/consumer can be used generally to provide exactly-once delivery when transferring and processing data between Kafka topics. (The consumer stores its offset in the same place as its output: an external database system, or a Kafka topic)</i>
    * The durability guarantees for publishing a message (since 0.11.0.0)
        * <i>`at-least-once without duplicates (idempotent delivery):`</i> the Kafka producer supports an idempotent delivery option which guarantees that resending will not result in duplicate entries in the log. To achieve this, the broker assigns each producer an ID and deduplicates messages using a sequence number that is sent by the producer along with every message. 
        * <i>`exactly-once across partitions (transactions):`</i> the producer supports the ability to send messages to multiple topic partitions using transaction-like semantics: i.e. either all messages are successfully written or none of them are.
    * The guarantees when consuming a message
        * <i>`at-most-once: read the messages, then save its position, and finally process the messages.`</i> In this case there is a possibility that the consumer process crashes after saving its position but before saving the output of its message processing. In this case the process that took over processing would start at the saved position even though a few messages prior to that position had not been processed. 
        * <i>`at-least-once: read the messages, process the messages, and finally save its position.`</i> In this case there is a possibility that the consumer process crashes after processing messages but before saving its position. In this case when the new process takes over the first few messages it receives will already have been processed.
    * <b>Kafka guarantees at-least-once delivery by default, and allows the user to implement at-most-once delivery by disabling retries on the producer and committing offsets in the consumer prior to processing a batch of messages. Kafka Streams supports exactly-once processing semantics even when there is a failure on either Streams clients or Kafka brokers in the middle of processing.</b>
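
The difference between the two consumer orderings comes down to which step a crash can separate. The following toy simulation makes that visible; `consume` and its `crash_between_steps` flag are hypothetical, and the log is just a Python list.

```python
def consume(log, committed, order, crash_between_steps=False):
    """Run one consumer pass over `log`, starting at the committed offset.

    order="commit_first"  -> at-most-once  (save position, then process)
    order="process_first" -> at-least-once (process, then save position)
    With crash_between_steps=True the consumer dies between the two steps
    of the first record it handles.
    Returns (processed_values, committed_offset).
    """
    processed = []
    offset = committed
    while offset < len(log):
        if order == "commit_first":
            committed = offset + 1           # save position ...
            if crash_between_steps:
                return processed, committed  # ... crash before processing
            processed.append(log[offset])
        else:
            processed.append(log[offset])    # process ...
            if crash_between_steps:
                return processed, committed  # ... crash before saving position
            committed = offset + 1
        offset += 1
    return processed, committed
```

Restarting from the returned offset after a crash skips the first message in the `commit_first` case and processes it a second time in the `process_first` case.
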
* <b>Replication design</b>: https://kafka.apache.org/documentation/#replication
    * <b>The unit of replication is the topic partition. Under non-failure conditions, each partition in Kafka has a single leader and zero or more followers. The total number of replicas including the leader constitute the replication factor. All reads and writes go to the leader of the partition. Typically, there are many more partitions than brokers and the leaders are evenly distributed among brokers. The logs on the followers are identical to the leader's log—all have the same offsets and messages in the same order.</b>
    * <i>`Handle a "fail/recover" model of failures in distributed systems terminology`</i>
        * `Node in sync status`: 
            * A node must be able to maintain its session with ZooKeeper
            * If it is a follower it must replicate the writes happening on the leader and not fall "too far" behind (the leader uses `replica.lag.time.max.ms` to detect stuck and lagging replicas)
        * `Notion of the message being "committed" to the log.`: A message is considered committed when all in-sync replicas for that partition have applied it to their log. Only committed messages are ever given out to the consumer, and once a published message is committed it will not be lost as long as one broker that replicates the partition to which this message was written remains "alive". Producers have the option of either waiting for the message to be committed or not, depending on their preference for tradeoff between latency and durability.
        * `Notion of Quorum`: If you choose the number of acknowledgements required and the number of logs that must be compared to elect a leader such that there is guaranteed to be an overlap, then this is called a Quorum.
        * <b>The leader election algorithms</b>
            * `Majority vote for both the commit decision and the leader election:` let's say we have 2f+1 replicas. If f+1 replicas must receive a message prior to a commit being declared by the leader, and if we elect a new leader by electing the follower with the most complete log from at least f+1 replicas, then, with no more than f failures, the leader is guaranteed to have all committed messages. This is because among any f+1 replicas, there must be at least one replica that contains all committed messages. That replica's log will be the most complete and therefore will be selected as the new leader. Hence, if the leader waits for more followers to acknowledge a message before declaring it committed then there will be more potentially electable leaders.
                * There are a rich variety of algorithms in this family including `ZooKeeper's Zab, Raft, and Viewstamped Replication`. The most similar academic publication we are aware of to Kafka's actual implementation is `PacificA from Microsoft`.
                * This majority vote approach has a very nice property: the latency is dependent on only the fastest servers. That is, if the replication factor is three, the latency is determined by the faster follower, not the slower one.
                * The downside of majority vote is that it doesn't take many failures to leave you with no electable leaders. To tolerate one failure requires three copies of the data, and to tolerate two failures requires five copies of the data. In our experience having only enough redundancy to tolerate a single failure is not enough for a practical system, but doing every write five times, with 5x the disk space requirements and 1/5th the throughput, is not very practical for large volume data problems. This is likely why quorum algorithms more commonly appear for shared cluster configuration such as ZooKeeper but are less common for primary data storage. 
            * `The ISR approach`: Kafka dynamically maintains a set of in-sync replicas (ISR) that are caught-up to the leader. Only members of this set are eligible for election as leader. A write to a Kafka partition is not considered committed until all in-sync replicas have received the write. This ISR set is persisted to ZooKeeper whenever it changes. Because of this, any replica in the ISR is eligible to be elected leader. With this ISR model and f+1 replicas, a Kafka topic can tolerate f failures without losing committed messages.
        * <i>Unclean leader election: What if they all die</i>
            * Default solution: wait for a replica in the ISR to come back to life and choose this replica as the leader (hopefully it still has all its data).
            * `unclean.leader.election.enable` configuration: choose the first replica (not necessarily in the ISR) that comes back to life as the leader.
    * <b>Availability and Durability Guarantees</b>
        * Two topic-level configurations that can be used to prefer message durability over availability:
            * Disable unclean leader election - if all replicas become unavailable, then the partition will remain unavailable until the most recent leader becomes available again. This effectively prefers unavailability over the risk of message loss.
            * Specify a minimum ISR size - the partition will only accept writes if the size of the ISR is above a certain minimum, in order to prevent the loss of messages that were written to just a single replica, which subsequently becomes unavailable. This setting only takes effect if the producer uses acks=all and guarantees that the message will be acknowledged by at least this many in-sync replicas. This setting offers a trade-off between consistency and availability. A higher setting for minimum ISR size guarantees better consistency since the message is guaranteed to be written to more replicas which reduces the probability that it will be lost. However, it reduces availability since the partition will be unavailable for writes if the number of in-sync replicas drops below the minimum threshold.
    * <b>Replica Management</b>
        * Balancing partitions within a cluster in a round-robin fashion to avoid clustering all partitions for high-volume topics on a small number of nodes.
        * Electing one of the brokers as the "controller". This controller detects failures at the broker level and is responsible for changing the leader of all affected partitions in a failed broker. The result is that we are able to batch together many of the required leadership change notifications which makes the election process far cheaper and faster for a large number of partitions. If the controller fails, one of the surviving brokers will become the new controller.
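
The ISR rules in this section (commit when all in-sync replicas have the message, elect only from the ISR, reject `acks=all` writes below the minimum ISR size) can be modelled in a few lines. This is a simplified sketch: `Partition`, `replicas_needed`, and the choice of the alphabetically first ISR member as new leader are assumptions for illustration, not Kafka's controller logic.

```python
def replicas_needed(failures, scheme):
    """Replicas required to tolerate `failures` without losing committed data."""
    return 2 * failures + 1 if scheme == "majority" else failures + 1


class Partition:
    """Replicated log: `log_end` maps replica -> number of messages it holds."""

    def __init__(self, replicas, min_insync=1):
        self.leader = replicas[0]
        self.isr = set(replicas)
        self.log_end = {r: 0 for r in replicas}
        self.min_insync = min_insync

    def committed_offset(self):
        """A message is committed once every in-sync replica has it."""
        return min(self.log_end[r] for r in self.isr)

    def append(self, acks_all=True):
        """Leader appends one message; acks=all needs a large enough ISR."""
        if acks_all and len(self.isr) < self.min_insync:
            raise RuntimeError("not enough in-sync replicas")
        self.log_end[self.leader] += 1

    def replicate(self, follower):
        """Follower catches up to the leader's log end."""
        self.log_end[follower] = self.log_end[self.leader]

    def shrink_isr(self, replica):
        """Drop a lagging or dead replica from the ISR."""
        self.isr.discard(replica)

    def fail_leader(self):
        """Remove the leader and elect a surviving ISR member, if any."""
        self.isr.discard(self.leader)
        if not self.isr:
            self.leader = None  # unavailable unless unclean election is allowed
            return None
        self.leader = sorted(self.isr)[0]
        return self.leader
```

Because a committed message is on every ISR member by definition, whichever member is elected already holds it; that is why f+1 replicas suffice here, whereas a majority vote needs 2f+1.
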
* <b>Log compaction design</b>
    * <i>`Log compaction gives us a more granular retention mechanism so that we are guaranteed to retain at least the last update for each primary key. By doing this we guarantee that the log contains a full snapshot of the final value for every key not just keys that changed recently.`</i>
    * Log compaction guarantees the following:
        * Any consumer that stays caught-up to within the head of the log will see every message that is written; these messages will have sequential offsets. The topic's `min.compaction.lag.ms` can be used to guarantee the minimum length of time that must pass after a message is written before it can be compacted. I.e. it provides a lower bound on how long each message will remain in the (uncompacted) head.
        * Ordering of messages is always maintained. Compaction will never re-order messages, just remove some.
        * The offset for a message never changes. It is the permanent identifier for a position in the log.
        * Any consumer progressing from the start of the log will see at least the final state of all records in the order they were written. Additionally, all delete markers for deleted records will be seen, provided the consumer reaches the head of the log in a time period less than the topic's `delete.retention.ms` setting (the default is 24 hours). In other words: since the removal of delete markers happens concurrently with reads, it is possible for a consumer to miss delete markers if it lags by more than `delete.retention.ms`.
    * <b>Each compactor thread works as follows:</b>
        * It chooses the log that has the highest ratio of log head to log tail
        * It creates a succinct summary of the last offset for each key in the head of the log
        * It recopies the log from beginning to end, removing keys which have a later occurrence in the log. New, clean segments are swapped into the log immediately, so the additional disk space required is just one additional log segment (not a full copy of the log).
        * The summary of the log head is essentially just a space-compact hash table. It uses exactly 24 bytes per entry. As a result with 8GB of cleaner buffer one cleaner iteration can clean around 366GB of log head (assuming 1k messages).
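
The compaction pass and the cleaner-buffer arithmetic above can be checked with a short sketch. `compact` and `cleanable_head_bytes` are hypothetical helpers; the sketch summarizes the whole log rather than only its head, and it ignores delete markers and segment swapping.

```python
def compact(log):
    """Keep only the latest record per key; offsets and order are unchanged.

    `log` is a list of (offset, key, value) tuples in offset order.
    """
    last_offset = {}  # the summary: key -> offset of its last occurrence
    for offset, key, _ in log:
        last_offset[key] = offset
    return [record for record in log if last_offset[record[1]] == record[0]]


def cleanable_head_bytes(buffer_bytes, message_bytes, entry_bytes=24):
    """How much log head one pass can cover with a given summary buffer."""
    entries = buffer_bytes // entry_bytes
    return entries * message_bytes
```

With an 8 GiB buffer and 1 KiB messages, `cleanable_head_bytes(8 * 1024**3, 1024)` is about 366.5 billion bytes, which matches the "around 366GB" figure quoted above.
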
* <b>Quotas design</b>: https://kafka.apache.org/documentation/#design_quotas
    * Two types of client quotas can be enforced by Kafka brokers for each group of clients sharing a quota:
        * Network bandwidth quotas define byte-rate thresholds (since 0.9)
        * Request rate quotas define CPU utilization thresholds as a percentage of network and I/O threads (since 0.11)
    * <i>Why are quotas necessary:</i> it is possible for producers and consumers to produce/consume very high volumes of data or generate requests at a very high rate and thus monopolize broker resources, cause network saturation and generally DOS other clients and the brokers themselves. Having quotas protects against these issues and is all the more important in large multi-tenant clusters where a small set of badly behaved clients can degrade user experience for the well behaved ones.
    * <i>How does a broker react when it detects a quota violation:</i> the broker first computes the amount of delay needed to bring the violating client under its quota and returns a response with the delay immediately. In the case of a fetch request, the response will not contain any data. Then the broker mutes the channel to the client and stops processing its requests until the delay is over. Upon receiving a response with a non-zero delay duration, the Kafka client will also refrain from sending further requests to the broker during the delay. Therefore, requests from a throttled client are effectively blocked from both sides.
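
One way to compute such a delay is to ask how long the client must stay silent for its average rate over the measurement window plus the delay to fall back to the quota. The sketch below assumes that model: if the client moved `observed_rate * window` bytes, then `observed_rate * window / (window + delay) = quota`, which gives `delay = window * (observed_rate - quota) / quota`. The names `throttle_delay_ms` and `ThrottledChannel` are hypothetical and this is not presented as the broker's actual code.

```python
def throttle_delay_ms(observed_rate, quota, window_ms):
    """Delay that brings the average rate over (window + delay) down to quota."""
    if observed_rate <= quota:
        return 0.0
    return (observed_rate - quota) / quota * window_ms


class ThrottledChannel:
    """Broker-side view of one client connection (hypothetical)."""

    def __init__(self):
        self.muted_until_ms = 0

    def on_request(self, now_ms, observed_rate, quota, window_ms):
        if now_ms < self.muted_until_ms:
            return None  # channel is muted: the request is not processed
        delay = throttle_delay_ms(observed_rate, quota, window_ms)
        self.muted_until_ms = now_ms + delay
        return {"throttle_time_ms": delay}  # response is returned immediately
```

A well-behaved client reads `throttle_time_ms` from the response and pauses for that long, so the muted channel on the broker side only matters for clients that ignore the hint.
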