-
Notifications
You must be signed in to change notification settings - Fork 2.8k
/
CustomDataQualityRulesMCLSideEffect.java
82 lines (72 loc) · 2.87 KB
/
CustomDataQualityRulesMCLSideEffect.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package com.linkedin.metadata.aspect.plugins.hooks;
import com.linkedin.metadata.aspect.RetrieverContext;
import com.linkedin.metadata.aspect.batch.MCLItem;
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
import com.linkedin.metadata.entity.ebean.batch.MCLItemImpl;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeLog;
import com.mycompany.dq.DataQualityRuleEvent;
import java.util.Collection;
import java.util.Optional;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
/**
 * MCL side effect that derives a {@code customDataQualityRuleEvent} change log from every incoming
 * {@link MetadataChangeLog} that carries an aspect payload.
 *
 * <p>For each such change log a {@link DataQualityRuleEvent} is built, serialized, and placed on a
 * clone of the originating change log. Change logs without an aspect produce no output.
 */
public class CustomDataQualityRulesMCLSideEffect extends MCLSideEffect {

  /** Aspect name stamped on every generated event change log. */
  private static final String EVENT_ASPECT_NAME = "customDataQualityRuleEvent";

  private AspectPluginConfig config;

  /**
   * Produces one event item per input item whose change log has an aspect.
   *
   * @param mclItems the change log items to react to
   * @param retrieverContext supplies the aspect retriever used to build the generated items
   * @return a stream of generated event items; input items without an aspect contribute nothing
   */
  @Override
  protected Stream<MCLItem> applyMCLSideEffect(
      @Nonnull Collection<MCLItem> mclItems, @Nonnull RetrieverContext retrieverContext) {
    return mclItems.stream()
        .map(item -> toEventItem(item.getMetadataChangeLog(), retrieverContext))
        .filter(Optional::isPresent)
        .map(Optional::get);
  }

  /**
   * Builds the event item for a single originating change log.
   *
   * @param originMCL the change log that triggered the side effect
   * @param retrieverContext supplies the aspect retriever used to build the item
   * @return the generated item, or empty when {@code originMCL} has no aspect
   */
  private Optional<MCLItem> toEventItem(
      MetadataChangeLog originMCL, RetrieverContext retrieverContext) {
    return buildEvent(originMCL)
        .map(event -> toEventChangeLog(originMCL, event))
        .map(
            eventMCL ->
                MCLItemImpl.builder()
                    .metadataChangeLog(eventMCL)
                    .build(retrieverContext.getAspectRetriever()));
  }

  /**
   * Clones the originating change log and swaps in the serialized event and its aspect name. All
   * other fields of the clone are left as they were on {@code originMCL}.
   *
   * @param originMCL the change log to clone
   * @param event the event to serialize into the clone
   * @return the cloned change log carrying the event aspect
   * @throws RuntimeException wrapping {@link CloneNotSupportedException} if the clone fails
   */
  private static MetadataChangeLog toEventChangeLog(
      MetadataChangeLog originMCL, DataQualityRuleEvent event) {
    try {
      MetadataChangeLog eventMCL = originMCL.clone();
      eventMCL.setAspect(GenericRecordUtils.serializeAspect(event));
      eventMCL.setAspectName(EVENT_ASPECT_NAME);
      return eventMCL;
    } catch (CloneNotSupportedException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Builds the data quality rule event describing the change in {@code originMCL}.
   *
   * <p>The event type is {@code "RuleCreated"} when the change log has no previous aspect value
   * and {@code "RuleUpdated"} otherwise. Both timestamps are taken from the system metadata's
   * last-observed value; when that is unavailable the current wall-clock time is used instead of
   * failing with a {@link NullPointerException}.
   *
   * @param originMCL the change log that triggered the side effect
   * @return the event, or empty when {@code originMCL} has no aspect
   */
  private Optional<DataQualityRuleEvent> buildEvent(MetadataChangeLog originMCL) {
    if (originMCL.getAspect() == null) {
      return Optional.empty();
    }

    DataQualityRuleEvent event = new DataQualityRuleEvent();

    // NOTE(review): this reads the actor from the event that was just constructed, so it cannot
    // pick up anything from the change log. Presumably the actor was meant to come from
    // originMCL; confirm the intended source and the field's type before changing it.
    if (event.getActor() != null) {
      event.setActor(event.getActor());
    }

    // System metadata may be missing on a change log; read it once and null-safely so a single
    // such item cannot abort the whole batch.
    Long lastObserved =
        originMCL.getSystemMetadata() == null
            ? null
            : originMCL.getSystemMetadata().getLastObserved();
    long observedAt = lastObserved != null ? lastObserved : System.currentTimeMillis();
    event.setEventTimestamp(observedAt);
    event.setTimestampMillis(observedAt);

    // No previous value means the aspect is being written for the first time.
    event.setEventType(originMCL.getPreviousAspectValue() == null ? "RuleCreated" : "RuleUpdated");
    event.setAffectedDataset(originMCL.getEntityUrn());
    return Optional.of(event);
  }

  /**
   * Returns the plugin configuration previously supplied through {@link #setConfig}.
   *
   * @return the stored configuration
   */
  @Nonnull
  @Override
  public AspectPluginConfig getConfig() {
    return config;
  }

  /**
   * Stores the plugin configuration.
   *
   * @param config the configuration to store
   * @return this instance, to allow call chaining
   */
  @Override
  public CustomDataQualityRulesMCLSideEffect setConfig(@Nonnull AspectPluginConfig config) {
    this.config = config;
    return this;
  }
}