Skip to content

Commit

Permalink
📦整理 Kafka 生产者
Browse files Browse the repository at this point in the history
  • Loading branch information
0xcaffebabe committed Apr 6, 2021
1 parent 2ae1464 commit 5799f4c
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 101 deletions.
1 change: 1 addition & 0 deletions SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@
- [RocketMQ](./中间件/消息队列/RocketMQ.md)
- [Kafka](./中间件/消息队列/Kafka/Kafka.md)
- [消费者](./中间件/消息队列/Kafka/消费者.md)
- [生产者](./中间件/消息队列/Kafka/生产者.md)
- [web容器/服务器](./中间件/web中间件/web中间件.md)
- [Tomcat](./中间件/web中间件/Tomcat.md)
- [Nginx](./中间件/web中间件/Nginx.md)
Expand Down
101 changes: 0 additions & 101 deletions 中间件/消息队列/Kafka/Kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,107 +172,6 @@ index与log文件的作用:

![屏幕截图 2020-08-05 155619](/assets/屏幕截图%202020-08-05%20155619.png)

## 生产者

![屏幕截图 2020-08-20 150154](/assets/屏幕截图%202020-08-20%20150154.png)

发送消息:

```java
Properties props = new Properties();
//kafka 集群,broker-list
props.put("bootstrap.servers", "172.24.211.140:9092");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new
KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
var record =
new ProducerRecord<>("test", "Precision Products",
"France");
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println(metadata);
}
});

}
producer.close();
```

### 配置

- acks
- 定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的
- acks=0 ,生产者在成功写入消息之前不会等待任何来自服务器的响应 当 broker 故障时有可能 丢失数据
- acks=1 ,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应 如果在 follower同步成功之前 leader 故障,那么将会丢失数据
- acks=all ,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应 如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成 数据重复
- buffer.memory
- 设置生产者内存缓冲区的大小
- compression.type
- 设置消息压缩算法
- retries
- 决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误
- batch.size
- 指定了一个批次可以使用的内存大小
- linger.ms
- KafkaProducer 会在批次填满或 linger.ms 达到上限时把批次发送出去。默认情况下,只要有可用的线程,生产者就会把消息发送出去
- client.id
- max.in.flight.requests.per.connection
- 指定了生产者在收到服务器响应之前可以发送多少个消息
- timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms
- max.block.ms
- 调用send时最长的阻塞时间
- max.request.siz
- receive.buffer.bytes 和 send.buffer.bytes
- 分别指定了 TCP socket 接收和发送数据包的缓冲区大小

**顺序保证**

- 将max.in.flight.requests.per.connection设置为1

![屏幕截图 2020-08-24 085111](/assets/屏幕截图%202020-08-24%20085111.png)

保证顺序的方法就是:

1. 每个主题只分为一个区
2. 每次发送的消息发送到同一个分区

### 序列化器

- 自定义序列化器:实现`Serializer`接口
- 不推荐使用
- 其他序列化
- avro:一种将shcema嵌入在数据里的序列化方式

### 分区策略

分区的原因:

- 方便扩展
- 提高并发

分区原则:

- 指明 partition 的情况下,直接将指明的值直接作为 partiton 值
- 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition数进行取余得到 partition 值
- 否则就是随机取一个值 然后再这个值的基础上进行轮询

自定义分区器:

实现`Partitioner`接口

### 数据可靠性保证

- Ecactly Once

将服务器的 ACK 级别设置为-1,可以保证 Producer 到 Server 之间不会丢失数据,即 AtLeast Once 语义

At Least Once + 幂等性 = Exactly Once

## 深入

### 集群成员关系
Expand Down
100 changes: 100 additions & 0 deletions 中间件/消息队列/Kafka/生产者.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# 生产者

![屏幕截图 2020-08-20 150154](/assets/屏幕截图%202020-08-20%20150154.png)

发送消息:

```java
Properties props = new Properties();
//kafka 集群,broker-list
props.put("bootstrap.servers", "172.24.211.140:9092");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new
KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
var record =
new ProducerRecord<>("test", "Precision Products",
"France");
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println(metadata);
}
});

}
producer.close();
```

## 配置

- acks
- 定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的
- acks=0 ,生产者在成功写入消息之前不会等待任何来自服务器的响应 当 broker 故障时有可能 丢失数据
- acks=1 ,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应 如果在 follower同步成功之前 leader 故障,那么将会丢失数据
- acks=all ,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应 如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成 数据重复
- buffer.memory
- 设置生产者内存缓冲区的大小
- compression.type
- 设置消息压缩算法
- retries
- 决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误
- batch.size
- 指定了一个批次可以使用的内存大小
- linger.ms
- KafkaProducer 会在批次填满或 linger.ms 达到上限时把批次发送出去。默认情况下,只要有可用的线程,生产者就会把消息发送出去
- client.id
- max.in.flight.requests.per.connection
- 指定了生产者在收到服务器响应之前可以发送多少个消息
- timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms
- max.block.ms
- 调用send时最长的阻塞时间
- max.request.siz
- receive.buffer.bytes 和 send.buffer.bytes
- 分别指定了 TCP socket 接收和发送数据包的缓冲区大小

**顺序保证**

- 将max.in.flight.requests.per.connection设置为1

![屏幕截图 2020-08-24 085111](/assets/屏幕截图%202020-08-24%20085111.png)

保证顺序的方法就是:

1. 每个主题只分为一个区
2. 每次发送的消息发送到同一个分区

## 序列化器

- 自定义序列化器:实现`Serializer`接口
- 不推荐使用
- 其他序列化
- avro:一种将shcema嵌入在数据里的序列化方式

## 分区策略

分区的原因:

- 方便扩展
- 提高并发

分区原则:

- 指明 partition 的情况下,直接将指明的值直接作为 partiton 值
- 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition数进行取余得到 partition 值
- 否则就是随机取一个值 然后再这个值的基础上进行轮询

自定义分区器:

实现`Partitioner`接口

## 数据可靠性保证

- Exactly Once

将服务器的 ACK 级别设置为-1,可以保证 Producer 到 Server 之间不会丢失数据,即 AtLeast Once 语义

At Least Once + 幂等性 = Exactly Once

0 comments on commit 5799f4c

Please sign in to comment.