diff --git a/persistence/src/main/resources/foxx/spline/services/lineage-overview.js b/persistence/src/main/resources/foxx/spline/services/lineage-overview.js index 978614fb4..4dac7c5e6 100644 --- a/persistence/src/main/resources/foxx/spline/services/lineage-overview.js +++ b/persistence/src/main/resources/foxx/spline/services/lineage-overview.js @@ -98,7 +98,7 @@ function eventLineageOverviewGraph(startEvent, maxDepth) { : { "_id": vert._key, "_class": "za.co.absa.spline.consumer.service.model.ExecutionNode", - "name": vert.extra.appName + "name": vert.name } ) diff --git a/persistence/src/main/scala/za/co/absa/spline/persistence/model/entities.scala b/persistence/src/main/scala/za/co/absa/spline/persistence/model/entities.scala index e9ced5c09..a42b044ae 100644 --- a/persistence/src/main/scala/za/co/absa/spline/persistence/model/entities.scala +++ b/persistence/src/main/scala/za/co/absa/spline/persistence/model/entities.scala @@ -100,12 +100,17 @@ object DataSource { * from the inputs to the output. */ case class ExecutionPlan( + name: Option[ExecutionPlan.Name], systemInfo: Map[String, Any], agentInfo: Map[String, Any], extra: Map[String, Any], override val _key: ArangoDocument.Key ) extends Vertex with RootEntity +object ExecutionPlan { + type Name = String +} + /** * Represents a moment in time WHEN a particular execution plan is executed. * It can also hold the result of the execution and related stats, and any other diff --git a/producer-model-mapper/src/main/scala/za/co/absa/spline/producer/modelmapper/v1/ExecutionPlanComponentConverterFactory.scala b/producer-model-mapper/src/main/scala/za/co/absa/spline/producer/modelmapper/v1/ExecutionPlanComponentConverterFactory.scala index f404772ec..6387e1ed3 100644 --- a/producer-model-mapper/src/main/scala/za/co/absa/spline/producer/modelmapper/v1/ExecutionPlanComponentConverterFactory.scala +++ b/producer-model-mapper/src/main/scala/za/co/absa/spline/producer/modelmapper/v1/ExecutionPlanComponentConverterFactory.scala @@ -24,6 +24,7 @@ import za.co.absa.spline.producer.{model => v1} import scala.PartialFunction.condOpt trait ExecutionPlanComponentConverterFactory { + def planNameExtractor: v1.ExecutionPlan => Option[v1_1.ExecutionPlan.Name] def attributeConverter: Option[CachingConverter {type To = v1_1.Attribute}] def expressionConverter: Option[CachingConverter {type To = v1_1.ExpressionLike}] def outputConverter: Option[OperationOutputConverter] @@ -33,6 +34,8 @@ trait ExecutionPlanComponentConverterFactory { object ExecutionPlanComponentConverterFactory { object EmptyFactory extends ExecutionPlanComponentConverterFactory { + override def planNameExtractor: v1.ExecutionPlan => Option[v1_1.ExecutionPlan.Name] = _ => None + override def attributeConverter: Option[AttributeConverter with CachingConverter] = None override def expressionConverter: Option[ExpressionConverter with CachingConverter] = None diff --git a/producer-model-mapper/src/main/scala/za/co/absa/spline/producer/modelmapper/v1/ModelMapperV1.scala b/producer-model-mapper/src/main/scala/za/co/absa/spline/producer/modelmapper/v1/ModelMapperV1.scala index 0ad3a5398..1de92c48d 100644 --- a/producer-model-mapper/src/main/scala/za/co/absa/spline/producer/modelmapper/v1/ModelMapperV1.scala +++ b/producer-model-mapper/src/main/scala/za/co/absa/spline/producer/modelmapper/v1/ModelMapperV1.scala @@ -43,6 +43,7 @@ object ModelMapperV1 extends ModelMapper { val maybeExpressionConverter = epccf.expressionConverter val maybeOutputConverter = epccf.outputConverter val objectConverter = epccf.objectConverter + val planNameExtractor = epccf.planNameExtractor val operationConverter = new OperationConverter(objectConverter, maybeOutputConverter) with CachingConverter @@ -62,6 +63,7 @@ object ModelMapperV1 extends ModelMapper { ExecutionPlan( id = plan1.id, + name = planNameExtractor(plan1), operations = operations, attributes = attributes, expressions = maybeExpressions, diff --git a/producer-model-mapper/src/main/scala/za/co/absa/spline/producer/modelmapper/v1/spark/Fields.scala b/producer-model-mapper/src/main/scala/za/co/absa/spline/producer/modelmapper/v1/spark/Fields.scala new file mode 100644 index 000000000..a92f3fe34 --- /dev/null +++ b/producer-model-mapper/src/main/scala/za/co/absa/spline/producer/modelmapper/v1/spark/Fields.scala @@ -0,0 +1,25 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.spline.producer.modelmapper.v1.spark + +object Fields { + + object Extra { + val AppName = "appName" + } + +} diff --git a/producer-model-mapper/src/main/scala/za/co/absa/spline/producer/modelmapper/v1/spark/SparkSplineExecutionPlanComponentConverterFactory.scala b/producer-model-mapper/src/main/scala/za/co/absa/spline/producer/modelmapper/v1/spark/SparkSplineExecutionPlanComponentConverterFactory.scala index 5f7664bdf..c413f5923 100644 --- a/producer-model-mapper/src/main/scala/za/co/absa/spline/producer/modelmapper/v1/spark/SparkSplineExecutionPlanComponentConverterFactory.scala +++ b/producer-model-mapper/src/main/scala/za/co/absa/spline/producer/modelmapper/v1/spark/SparkSplineExecutionPlanComponentConverterFactory.scala @@ -27,6 +27,8 @@ import scala.util.Try class SparkSplineExecutionPlanComponentConverterFactory(agentVersion: String, plan1: v1.ExecutionPlan) extends ExecutionPlanComponentConverterFactory { + override def planNameExtractor: v1.ExecutionPlan => Option[v1_1.ExecutionPlan.Name] = _.extraInfo.get(Fields.Extra.AppName).map(_.toString) + override def expressionConverter: Option[CachingConverter {type To = v1_1.ExpressionLike}] = Some(_expressionConverter) override def attributeConverter: Option[CachingConverter {type To = v1_1.Attribute}] = Some(_attributeConverter) diff --git a/producer-model/src/main/scala/za/co/absa/spline/producer/model/v1_1/executionPlan.scala b/producer-model/src/main/scala/za/co/absa/spline/producer/model/v1_1/executionPlan.scala index d2614f741..bc417965b 100644 --- a/producer-model/src/main/scala/za/co/absa/spline/producer/model/v1_1/executionPlan.scala +++ b/producer-model/src/main/scala/za/co/absa/spline/producer/model/v1_1/executionPlan.scala @@ -20,6 +20,7 @@ import java.util.UUID case class ExecutionPlan( id: UUID = UUID.randomUUID(), + name: Option[ExecutionPlan.Name], operations: Operations, attributes: Seq[Attribute] = Nil, @@ -32,13 +33,18 @@ case class ExecutionPlan( // User payload extraInfo: Map[String, Any] = Map.empty ) { - def dataSources: Set[String] = { + def dataSources: Set[ExecutionPlan.DataSourceUri] = { val readSources = operations.reads.flatMap(_.inputSources).toSet val writeSource = operations.write.outputSource readSources + writeSource } } +object ExecutionPlan { + type Name = String + type DataSourceUri = String +} + case class Operations( write: WriteOperation, reads: Seq[ReadOperation] = Nil, diff --git a/producer-rest-core/src/test/scala/za/co/absa/spline/producer/rest/controller/ExecutionPlansControllerDeserFixAspectSpec.scala b/producer-rest-core/src/test/scala/za/co/absa/spline/producer/rest/controller/ExecutionPlansControllerDeserFixAspectSpec.scala index bb9e5baa2..d1f97b4f3 100644 --- a/producer-rest-core/src/test/scala/za/co/absa/spline/producer/rest/controller/ExecutionPlansControllerDeserFixAspectSpec.scala +++ b/producer-rest-core/src/test/scala/za/co/absa/spline/producer/rest/controller/ExecutionPlansControllerDeserFixAspectSpec.scala @@ -118,6 +118,7 @@ object ExecutionPlansControllerDeserFixAspectSpec { ExecutionPlan( id = planId, + name = None, operations = Operations( write = WriteOperation( id = "1", diff --git a/producer-services/src/main/scala/za/co/absa/spline/producer/service/model/ExecutionPlanPersistentModelBuilder.scala b/producer-services/src/main/scala/za/co/absa/spline/producer/service/model/ExecutionPlanPersistentModelBuilder.scala index 2265aef6e..d8b667519 100644 --- a/producer-services/src/main/scala/za/co/absa/spline/producer/service/model/ExecutionPlanPersistentModelBuilder.scala +++ b/producer-services/src/main/scala/za/co/absa/spline/producer/service/model/ExecutionPlanPersistentModelBuilder.scala @@ -75,10 +75,11 @@ class ExecutionPlanPersistentModelBuilder private( def build(): ExecutionPlanPersistentModel = { val pmExecutionPlan = pm.ExecutionPlan( + name = ep.name, + _key = ep.id.toString, systemInfo = ep.systemInfo.toJsonAs[Map[String, Any]], agentInfo = ep.agentInfo.map(_.toJsonAs[Map[String, Any]]).orNull, - extra = ep.extraInfo, - _key = ep.id.toString) + extra = ep.extraInfo) val pmExecutes = EdgeDef.Executes.edge(ep.id, keyCreator.asOperationKey(ep.operations.write.id), epPKey) diff --git a/producer-services/src/main/scala/za/co/absa/spline/producer/service/repo/ExecutionProducerRepositoryImpl.scala b/producer-services/src/main/scala/za/co/absa/spline/producer/service/repo/ExecutionProducerRepositoryImpl.scala index a535f3ac1..694b3f1c8 100644 --- a/producer-services/src/main/scala/za/co/absa/spline/producer/service/repo/ExecutionProducerRepositoryImpl.scala +++ b/producer-services/src/main/scala/za/co/absa/spline/producer/service/repo/ExecutionProducerRepositoryImpl.scala @@ -83,7 +83,7 @@ class ExecutionProducerRepositoryImpl @Autowired()(db: ArangoDatabaseAsync) exte | RETURN { | "${ExecutionPlanDetails.ExecutionPlanId}" : ep._key, | "${ExecutionPlanDetails.FrameworkName}" : CONCAT(ep.systemInfo.name, " ", ep.systemInfo.version), - | "${ExecutionPlanDetails.ApplicationName}" : ep.extra.appName, + | "${ExecutionPlanDetails.ApplicationName}" : ep.name, | "${ExecutionPlanDetails.DataSourceUri}" : writeOp.outputSource, | "${ExecutionPlanDetails.DataSourceType}" : writeOp.extra.destinationType, | "${ExecutionPlanDetails.Append}" : writeOp.append