-
Notifications
You must be signed in to change notification settings - Fork 90
/
OpenLineageModelMapper.scala
115 lines (99 loc) · 4.24 KB
/
OpenLineageModelMapper.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package za.co.absa.spline.harvester.dispatcher.modelmapper
import za.co.absa.spline.commons.lang.extensions.NonOptionExtension._
import za.co.absa.spline.commons.lang.extensions.TraversableExtension._
import za.co.absa.spline.commons.version.Version
import za.co.absa.spline.harvester.LineageHarvester
import za.co.absa.spline.harvester.dispatcher.ProducerApiVersion.JsonSchemaURLs
import za.co.absa.spline.harvester.dispatcher.modelmapper.OpenLineageModelMapper._
import za.co.absa.spline.harvester.dispatcher.openlineage.model.facet.SplinePayloadFacet
import za.co.absa.spline.producer.model.openlineage.v0_3_1._
import za.co.absa.spline.producer.model.{ExecutionEvent, ExecutionPlan}
import java.time.{Duration, Instant}
import java.util.UUID
/**
 * Converts a Spline execution plan together with its execution event into OpenLineage run events.
 *
 * @param splineModelMapper mapper producing the plan/event DTOs that are embedded into the run facets
 * @param apiVersion        producer API version used to pick the JSON schema URLs of the embedded payloads
 * @param namespace         namespace assigned to the emitted OpenLineage job
 */
class OpenLineageModelMapper(splineModelMapper: ModelMapper[_, _], apiVersion: Version, namespace: String) {

  /**
   * Builds the pair of run events describing a single execution.
   *
   * Both events share one freshly generated random run ID and the same job.
   * The first event is a `START` stamped at the event timestamp minus the reported duration
   * (a missing duration counts as zero). The second one is stamped at the event timestamp and is
   * a `FAIL` when the event carries an error, otherwise a `COMPLETE`; it also carries the Spline
   * payload facets and the input/output datasets.
   *
   * @param plan  execution plan providing the job name, the read sources and the write target
   * @param event execution event providing the timestamp, the optional duration and the optional error
   * @return the opening event followed by the closing event
   */
  def toDtos(plan: ExecutionPlan, event: ExecutionEvent): Seq[RunEvent] = {
    val sharedRunId = UUID.randomUUID()
    val olJob = Job(namespace = namespace, name = plan.name, facets = None)

    // The Spline event only records when the execution finished; the start is derived from the duration.
    val finishedAt = Instant.ofEpochMilli(event.timestamp)
    val elapsed = Duration.ofNanos(event.durationNs.getOrElse(0))
    val startedAt = finishedAt.minus(elapsed)

    val openingEvent = RunEvent(
      eventType = EventType.Start.toOption,
      eventTime = java.util.Date.from(startedAt),
      run = Run(runId = sharedRunId, facets = None),
      job = olJob,
      inputs = None,
      outputs = None,
      producer = Producer,
      schemaURL = SchemaUrl
    )

    val closingEventType = event.error.fold(EventType.Complete.toOption)(_ => Some(EventType.Fail))

    val planFacet =
      createSplinePayloadFacet(splineModelMapper.toDTO(plan), JsonSchemaURLs.planSchemaForAPIVersion(apiVersion))
    val eventFacet =
      createSplinePayloadFacet(splineModelMapper.toDTO(event), JsonSchemaURLs.eventSchemaForAPIVersion(apiVersion))

    // One input dataset per source URI of every read operation.
    val inputDatasets =
      for {
        readOp <- plan.operations.reads
        sourceUri <- readOp.inputSources
      } yield createInputDataset(sourceUri)

    val outputDataset = createOutputDataset(plan.operations.write.outputSource)

    val closingEvent = RunEvent(
      eventType = closingEventType,
      eventTime = java.util.Date.from(finishedAt),
      run = Run(runId = sharedRunId, facets = Some(Map(
        SplinePlan -> planFacet,
        SplineEvent -> eventFacet
      ))),
      job = olJob,
      inputs = inputDatasets.toNonEmptyOption,
      outputs = Some(Seq(outputDataset)),
      producer = Producer,
      schemaURL = SchemaUrl
    )

    Seq(openingEvent, closingEvent)
  }

  /**
   * Wraps a payload into a Spline payload facet.
   *
   * @param payload          object to embed in the facet
   * @param payloadSchemaUrl URL of the JSON schema describing the payload
   */
  private def createSplinePayloadFacet(payload: AnyRef, payloadSchemaUrl: String): SplinePayloadFacet =
    new SplinePayloadFacet(
      _producer = Producer,
      _schemaURL = PayloadFacetSchemaUrl,
      payloadSchemaURL = payloadSchemaUrl,
      payload = payload
    )

  /** Creates a facet-less input dataset from the namespace and name derived from the source URI. */
  private def createInputDataset(source: String): InputDataset = {
    val (datasetNamespace, datasetName) = OpenLineageUriMapper.uriToNamespaceAndName(source)
    InputDataset(
      namespace = datasetNamespace,
      name = datasetName,
      facets = None,
      inputFacets = None
    )
  }

  /** Creates a facet-less output dataset from the namespace and name derived from the source URI. */
  private def createOutputDataset(source: String): OutputDataset = {
    val (datasetNamespace, datasetName) = OpenLineageUriMapper.uriToNamespaceAndName(source)
    OutputDataset(
      namespace = datasetNamespace,
      name = datasetName,
      facets = None,
      outputFacets = None
    )
  }
}
/**
 * Constants used by [[OpenLineageModelMapper]] when building OpenLineage run events.
 */
object OpenLineageModelMapper {
  // Value of the `producer` field of emitted events and facets; includes the agent version.
  private val Producer = s"https://github.com/AbsaOSS/spline-spark-agent/tree/release/${LineageHarvester.SplineVersionInfo.version}"
  // Value of the `schemaURL` field of emitted run events.
  private val SchemaUrl = "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/RunEvent"
  // Value of the `_schemaURL` field of the Spline payload facet.
  private val PayloadFacetSchemaUrl = "https://cdn.jsdelivr.net/gh/AbsaOSS/spline@api-doc/schemas/openlineage/spline-payload-facet-1.0.json"

  /** Event type names put into the `eventType` field of a run event. */
  object EventType {
    val Start = "START"
    val Complete = "COMPLETE"
    val Fail = "FAIL"
  }

  // Run facet keys. They are used as keys of one and the same Map, so they must be distinct:
  // with equal keys the later entry replaces the earlier one and the plan facet is lost.
  private val SplineEvent = "splineEvent"
  private val SplinePlan = "splinePlan"
}