-
Notifications
You must be signed in to change notification settings - Fork 1
/
kafka_producer.rs
70 lines (63 loc) · 2.13 KB
/
kafka_producer.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use schema_registry_converter::async_impl::proto_raw::ProtoRawEncoder;
use schema_registry_converter::async_impl::schema_registry::SrSettings;
use schema_registry_converter::schema_registry_common::SubjectNameStrategy;
/// Bundles a Kafka [`FutureProducer`] with a [`ProtoRawEncoder`] so that raw
/// protobuf bytes can be schema-registry-encoded and published in one call.
///
/// The lifetime `'a` is the one carried by the wrapped [`ProtoRawEncoder`].
pub struct RecordProducer<'a> {
    /// Kafka producer used to publish the encoded records.
    producer: FutureProducer,
    /// Encoder that turns raw protobuf bytes into the payload that is sent.
    proto_encoder: ProtoRawEncoder<'a>,
}
/// Kinds of record this module knows how to publish.
///
/// Each variant is mapped to a fully-qualified protobuf message name and to a
/// Kafka topic by the lookup helpers in this file.
///
/// The common traits are derived so that values can be logged, compared,
/// copied and used as map keys; the enum only has unit variants, so all of
/// these are free.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Name {
    /// A person record.
    Person,
    /// An address-update record.
    // NOTE(review): the variant name is misspelled ("Adress"); it is kept
    // as-is because renaming it would break existing callers.
    AdressUpdate,
}
/// Returns the fully-qualified protobuf message name for `name`.
fn get_full_name(name: &Name) -> &'static str {
    const PERSON: &str = "tech.gklijs.kgpoc.Person";
    const ADDRESS_UPDATE: &str = "tech.gklijs.kgpoc.AddressUpdate";

    // Exhaustive on purpose: adding a variant must force a decision here.
    match *name {
        Name::AdressUpdate => ADDRESS_UPDATE,
        Name::Person => PERSON,
    }
}
/// Returns the Kafka topic that records of kind `name` are published to.
fn get_topic(name: &Name) -> &'static str {
    const PERSONS_TOPIC: &str = "persons";
    const ADDRESS_UPDATES_TOPIC: &str = "address-updates";

    // Exhaustive on purpose: adding a variant must force a decision here.
    match *name {
        Name::AdressUpdate => ADDRESS_UPDATES_TOPIC,
        Name::Person => PERSONS_TOPIC,
    }
}
impl RecordProducer <'_>{
pub async fn send_proto(
&'_ mut self,
key_bytes: Vec<u8>,
value_bytes: Vec<u8>,
name: Name,
) {
let value_strategy= SubjectNameStrategy::TopicNameStrategy(String::from(get_topic(&name)), false);
let payload = match self.proto_encoder.encode(&*value_bytes, get_full_name(&name), value_strategy).await {
Ok(v) => v,
Err(e) => panic!("Error getting payload: {}", e),
};
let fr = FutureRecord {
topic: get_topic(&name),
partition: None,
payload: Some(&payload),
key: Some(&key_bytes),
timestamp: None,
headers: None,
};
self.producer.send_result(fr).unwrap().await.unwrap().unwrap();
}
}
pub fn get_producer(brokers: &str, schema_registry_url: String) -> RecordProducer {
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("produce.offset.report", "true")
.set("message.timeout.ms", "60000")
.set("queue.buffering.max.messages", "10")
.create()
.expect("Producer creation error");
let sr_settings = SrSettings::new(schema_registry_url);
let proto_encoder = ProtoRawEncoder::new(sr_settings);
RecordProducer {
producer,
proto_encoder,
}
}