MetadataChangeProposalsProcessor.java
package com.linkedin.metadata.kafka;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.gms.factory.entityclient.RestliEntityClientFactory;
import com.linkedin.metadata.EventUtils;
import com.linkedin.metadata.kafka.config.MetadataChangeProposalProcessorCondition;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.mxe.FailedMetadataChangeProposal;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.Topics;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
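
/**
 * Kafka consumer that processes MetadataChangeProposal (MCP) events from the MCP topic. Each Avro
 * record is converted into a Pegasus {@link MetadataChangeProposal} and ingested through the
 * {@link SystemEntityClient}; events that fail to process are wrapped in a
 * {@link FailedMetadataChangeProposal} and forwarded to the failed-MCP topic so they are not lost.
 */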
@Slf4j
@Component
@Import({RestliEntityClientFactory.class})
@Conditional(MetadataChangeProposalProcessorCondition.class)
@EnableKafka
@RequiredArgsConstructor
public class MetadataChangeProposalsProcessor {
private final OperationContext systemOperationContext;
private final SystemEntityClient entityClient;
  private final Producer<String, IndexedRecord> kafkaProducer;

private final Histogram kafkaLagStats =
MetricUtils.get().histogram(MetricRegistry.name(this.getClass(), "kafkaLag"));
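
  /** Topic that receives {@link FailedMetadataChangeProposal} events when ingestion fails. */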
@Value(
"${FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME:"
+ Topics.FAILED_METADATA_CHANGE_PROPOSAL
+ "}")
private String fmcpTopicName;
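
  /**
   * Consumes a single MCP record from Kafka, converts the Avro payload into a Pegasus
   * {@link MetadataChangeProposal}, and ingests it through the entity client. Any failure is
   * routed to the failed-MCP topic via {@link #sendFailedMCP(MetadataChangeProposal, Throwable)}.
   */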
@KafkaListener(
id = "${METADATA_CHANGE_PROPOSAL_KAFKA_CONSUMER_GROUP_ID:generic-mce-consumer-job-client}",
topics = "${METADATA_CHANGE_PROPOSAL_TOPIC_NAME:" + Topics.METADATA_CHANGE_PROPOSAL + "}",
containerFactory = "kafkaEventConsumer")
public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord) {
try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "consume").time()) {
kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp());
final GenericRecord record = consumerRecord.value();
log.info(
"Got MCP event key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}",
consumerRecord.key(),
consumerRecord.topic(),
consumerRecord.partition(),
consumerRecord.offset(),
consumerRecord.serializedValueSize(),
consumerRecord.timestamp());
log.debug("Record {}", record);
MetadataChangeProposal event = new MetadataChangeProposal();
try {
event = EventUtils.avroToPegasusMCP(record);
log.debug("MetadataChangeProposal {}", event);
// TODO: Get this from the event itself.
String urn = entityClient.ingestProposal(systemOperationContext, event, false);
log.info("Successfully processed MCP event urn: {}", urn);
} catch (Throwable throwable) {
log.error("MCP Processor Error", throwable);
log.error("Message: {}", record);
sendFailedMCP(event, throwable);
}
}
}
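
  /**
   * Wraps the failed proposal and its error in a {@link FailedMetadataChangeProposal} and
   * produces it to the configured failed-MCP topic so the event can be inspected or replayed.
   */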
private void sendFailedMCP(@Nonnull MetadataChangeProposal event, @Nonnull Throwable throwable) {
final FailedMetadataChangeProposal failedMetadataChangeProposal =
createFailedMCPEvent(event, throwable);
try {
final GenericRecord genericFailedMCERecord =
EventUtils.pegasusToAvroFailedMCP(failedMetadataChangeProposal);
log.debug("Sending FailedMessages to topic - {}", fmcpTopicName);
log.info(
"Error while processing FMCP: FailedMetadataChangeProposal - {}",
failedMetadataChangeProposal);
kafkaProducer.send(new ProducerRecord<>(fmcpTopicName, genericFailedMCERecord));
} catch (IOException e) {
      // Pass the exception as the last argument so the full stack trace is logged, instead of
      // logging the StackTraceElement[] array reference.
      log.error(
          "Error while sending FailedMetadataChangeProposal: FailedMetadataChangeProposal - {}",
          failedMetadataChangeProposal,
          e);
}
}
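
  /**
   * Builds a {@link FailedMetadataChangeProposal} carrying the original event and the stack trace
   * of the error that caused processing to fail.
   */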
@Nonnull
private FailedMetadataChangeProposal createFailedMCPEvent(
@Nonnull MetadataChangeProposal event, @Nonnull Throwable throwable) {
final FailedMetadataChangeProposal fmcp = new FailedMetadataChangeProposal();
fmcp.setError(ExceptionUtils.getStackTrace(throwable));
fmcp.setMetadataChangeProposal(event);
return fmcp;
}
}