/
AuditEventProducer.java
161 lines (147 loc) · 7.41 KB
/
AuditEventProducer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package org.folio.event.service;
import java.util.Date;
import java.util.Map;
import java.util.UUID;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.event.AuditEventType;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaTopicNameHelper;
import org.folio.kafka.SimpleKafkaProducerManager;
import org.folio.kafka.services.KafkaProducerRecordBuilder;
import org.folio.rest.jaxrs.model.Metadata;
import org.folio.rest.jaxrs.model.OrderAuditEvent;
import org.folio.rest.jaxrs.model.OrderLineAuditEvent;
import org.folio.rest.jaxrs.model.OutboxEventLog.EntityType;
import org.folio.rest.jaxrs.model.Piece;
import org.folio.rest.jaxrs.model.PieceAuditEvent;
import org.folio.rest.jaxrs.model.PoLine;
import org.folio.rest.jaxrs.model.PurchaseOrder;
import org.folio.rest.tools.utils.TenantTool;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public class AuditEventProducer {
private static final Logger log = LogManager.getLogger();
private final KafkaConfig kafkaConfig;
/**
* Sends event for order change(Create, Edit, Delete) to kafka.
* OrderId is used as partition key to send all events for particular order to the same partition.
*
* @param order the event payload
* @param eventAction the event action
* @param okapiHeaders the okapi headers
* @return future with true if sending was success or failed future in another case
*/
public Future<Boolean> sendOrderEvent(PurchaseOrder order,
OrderAuditEvent.Action eventAction,
Map<String, String> okapiHeaders) {
OrderAuditEvent event = getOrderEvent(order, eventAction);
log.info("Starting to send event with id: {} for Order to Kafka for orderId: {}", event.getId(), order.getId());
return sendToKafka(AuditEventType.ACQ_ORDER_CHANGED, event, okapiHeaders, event.getOrderId(), EntityType.ORDER)
.onFailure(t -> log.warn("sendOrderEvent failed, order id={}", order.getId(), t));
}
/**
* Sends change event for order line to kafka.
* OrderLineId is used as partition key to send all events for particular order to the same partition.
*
* @param poLine the event payload
* @param eventAction the event action
* @param okapiHeaders the okapi headers
* @return future with true if sending was success or failed future otherwise
*/
public Future<Boolean> sendOrderLineEvent(PoLine poLine,
OrderLineAuditEvent.Action eventAction,
Map<String, String> okapiHeaders) {
OrderLineAuditEvent event = getOrderLineEvent(poLine, eventAction);
log.info("Starting to send event with id: {} for Order Line to Kafka for orderLineId: {}", event.getId(),
poLine.getId());
return sendToKafka(AuditEventType.ACQ_ORDER_LINE_CHANGED, event, okapiHeaders, event.getOrderLineId(), EntityType.ORDER_LINE)
.onFailure(t -> log.warn("sendOrderLineEvent failed, poLine id={}", poLine.getId(), t));
}
/**
* Sends change event for piece to kafka.
* PieceId is used as partition key to send all events for particular piece to the same partition.
*
* @param piece the event payload
* @param eventAction the event action
* @param okapiHeaders the okapi headers
* @return future with true if sending was success or failed future otherwise
*/
public Future<Boolean> sendPieceEvent(Piece piece,
PieceAuditEvent.Action eventAction,
Map<String, String> okapiHeaders) {
PieceAuditEvent event = getPieceEvent(piece, eventAction);
log.info("Starting to send event with id: {} for Piece to Kafka for pieceId: {}", event.getId(),
piece.getId());
return sendToKafka(AuditEventType.ACQ_PIECE_CHANGED, event, okapiHeaders, event.getPieceId(), EntityType.PIECE)
.onFailure(t -> log.warn("sendPieceEvent failed, piece id={}", piece.getId(), t));
}
private OrderAuditEvent getOrderEvent(PurchaseOrder order, OrderAuditEvent.Action eventAction) {
Metadata metadata = order.getMetadata();
return new OrderAuditEvent()
.withId(UUID.randomUUID().toString())
.withAction(eventAction)
.withOrderId(order.getId())
.withEventDate(new Date())
.withActionDate(metadata.getUpdatedDate())
.withUserId(metadata.getUpdatedByUserId())
.withOrderSnapshot(order.withMetadata(null)); // not populate metadata to not include it in snapshot's comparation in UI
}
private OrderLineAuditEvent getOrderLineEvent(PoLine poLine, OrderLineAuditEvent.Action eventAction) {
Metadata metadata = poLine.getMetadata();
return new OrderLineAuditEvent()
.withId(UUID.randomUUID().toString())
.withAction(eventAction)
.withOrderId(poLine.getPurchaseOrderId())
.withOrderLineId(poLine.getId())
.withEventDate(new Date())
.withActionDate(metadata.getUpdatedDate())
.withUserId(metadata.getUpdatedByUserId())
.withOrderLineSnapshot(poLine.withMetadata(null)); // not populate metadata to not include it in snapshot's comparation in UI
}
private PieceAuditEvent getPieceEvent(Piece piece, PieceAuditEvent.Action eventAction) {
Metadata metadata = piece.getMetadata();
return new PieceAuditEvent()
.withId(UUID.randomUUID().toString())
.withAction(eventAction)
.withPieceId(piece.getId())
.withEventDate(new Date())
.withActionDate(metadata.getUpdatedDate())
.withUserId(metadata.getUpdatedByUserId())
.withPieceSnapshot(piece.withMetadata(null)); // not populate metadata to not include it in snapshot's comparation in UI
}
private Future<Boolean> sendToKafka(AuditEventType eventType,
Object eventPayload,
Map<String, String> okapiHeaders,
String key,
EntityType entityType) {
String tenantId = TenantTool.tenantId(okapiHeaders);
String topicName = buildTopicName(kafkaConfig.getEnvId(), tenantId, eventType.getTopicName());
KafkaProducerRecord<String, String> kafkaProducerRecord = new KafkaProducerRecordBuilder<String, Object>(tenantId)
.key(key)
.value(eventPayload)
.topic(topicName)
.propagateOkapiHeaders(okapiHeaders)
.build();
SimpleKafkaProducerManager producerManager = new SimpleKafkaProducerManager(Vertx.currentContext().owner(), kafkaConfig);
KafkaProducer<String, String> producer = producerManager.createShared(topicName);
return producer.send(kafkaProducerRecord)
.map(event -> true)
.onComplete(reply -> {
producer.end(ear -> producer.close());
if (reply.succeeded()) {
log.info("Event with type '{}' for {} id: '{}' was sent to kafka topic '{}'", eventType, entityType, key, topicName);
} else {
log.error("Producer write error for event '{}' for {} id: '{}' for kafka topic '{}'", eventType, entityType, key, topicName, reply.cause());
}
});
}
private String buildTopicName(String envId, String tenantId, String eventType) {
return KafkaTopicNameHelper.formatTopicName(envId, KafkaTopicNameHelper.getDefaultNameSpace(),
tenantId, eventType);
}
}