-
Notifications
You must be signed in to change notification settings - Fork 8
/
VehicleEventProducer.java
51 lines (37 loc) · 1.77 KB
/
VehicleEventProducer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.sn.challenge.producer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sn.challenge.model.Vehicle;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
/**
 * Publishes {@link Vehicle} events to the {@value #TOPIC} Kafka topic.
 *
 * <p>Each record is keyed by the vehicle's VIN and carries an
 * {@value #EVENT_SOURCE_HEADER} header whose value is the UTF-8 encoding of
 * {@value #EVENT_SOURCE_VALUE}.
 *
 * @author Ehsan Sh
 */
@Component
@Slf4j
public class VehicleEventProducer {

    /** Name of the Kafka topic every vehicle event is sent to. */
    private static final String TOPIC = "vehicle-position-events";

    /** Key of the record header that identifies where the event originated. */
    private static final String EVENT_SOURCE_HEADER = "event-source";

    /** Value written to the {@value #EVENT_SOURCE_HEADER} header. */
    private static final String EVENT_SOURCE_VALUE = "Rest Call";

    private final KafkaTemplate<String, Vehicle> kafkaTemplate;

    // Not used by any method in this class; kept so the public constructor
    // signature stays compatible with existing wiring.
    private final ObjectMapper objectMapper;

    /**
     * Creates the producer.
     *
     * @param kafkaTemplate template used to send the records
     * @param objectMapper  JSON mapper; stored but not used by this class
     */
    public VehicleEventProducer(KafkaTemplate<String, Vehicle> kafkaTemplate, ObjectMapper objectMapper) {
        this.kafkaTemplate = kafkaTemplate;
        this.objectMapper = objectMapper;
    }

    /**
     * Sends the given vehicle as an event, keyed by its VIN, and registers a
     * {@code VehicleEventListenableFutureCallback} on the resulting future.
     *
     * @param vehicle the vehicle to publish; must not be {@code null}
     * @return the future returned by {@link KafkaTemplate#send(ProducerRecord)},
     *         with the callback already attached
     * @throws NullPointerException if {@code vehicle} is {@code null}
     */
    public ListenableFuture<SendResult<String, Vehicle>> sendVehicleEvents(Vehicle vehicle) {
        // Fail fast with a descriptive message instead of an anonymous NPE on getVin().
        Objects.requireNonNull(vehicle, "vehicle must not be null");

        String key = vehicle.getVin();
        ProducerRecord<String, Vehicle> producerRecord = buildProducerRecord(key, vehicle, TOPIC);

        ListenableFuture<SendResult<String, Vehicle>> sendFuture = kafkaTemplate.send(producerRecord);
        sendFuture.addCallback(new VehicleEventListenableFutureCallback(key, vehicle));
        return sendFuture;
    }

    /**
     * Builds the record for one vehicle event, attaching the event-source header.
     *
     * @param key   record key (the vehicle's VIN)
     * @param value the vehicle payload
     * @param topic destination topic
     * @return a record with no explicit partition and a single header
     */
    private ProducerRecord<String, Vehicle> buildProducerRecord(String key, Vehicle value, String topic) {
        // Encode explicitly as UTF-8 so the header bytes do not depend on the
        // JVM's platform-default charset.
        Header eventSource =
                new RecordHeader(EVENT_SOURCE_HEADER, EVENT_SOURCE_VALUE.getBytes(StandardCharsets.UTF_8));
        List<Header> recordHeaders = List.of(eventSource);

        // The partition argument is null: no explicit partition is requested.
        return new ProducerRecord<>(topic, null, key, value, recordHeaders);
    }
}