Skip to content

Commit

Permalink
spline #831 + ExecutionPlan.name property
Browse files Browse the repository at this point in the history
  • Loading branch information
wajda committed Mar 25, 2021
1 parent 7d8eb76 commit 44bbd12
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ function eventLineageOverviewGraph(startEvent, maxDepth) {
: {
"_id": vert._key,
"_class": "za.co.absa.spline.consumer.service.model.ExecutionNode",
"name": vert.extra.appName
"name": vert.name
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,17 @@ object DataSource {
* from the inputs to the output.
*/
case class ExecutionPlan(
name: Option[ExecutionPlan.Name],
systemInfo: Map[String, Any],
agentInfo: Map[String, Any],
extra: Map[String, Any],
override val _key: ArangoDocument.Key
) extends Vertex with RootEntity

object ExecutionPlan {
type Name = String
}

/**
* Represents a moment in time WHEN a particular execution plan is executed.
* It can also hold the result of the execution and related stats, and any other
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import za.co.absa.spline.producer.{model => v1}
import scala.PartialFunction.condOpt

trait ExecutionPlanComponentConverterFactory {
def planNameExtractor: v1.ExecutionPlan => Option[v1_1.ExecutionPlan.Name]
def attributeConverter: Option[CachingConverter {type To = v1_1.Attribute}]
def expressionConverter: Option[CachingConverter {type To = v1_1.ExpressionLike}]
def outputConverter: Option[OperationOutputConverter]
Expand All @@ -33,6 +34,8 @@ trait ExecutionPlanComponentConverterFactory {
object ExecutionPlanComponentConverterFactory {

object EmptyFactory extends ExecutionPlanComponentConverterFactory {
override def planNameExtractor: v1.ExecutionPlan => Option[v1_1.ExecutionPlan.Name] = _ => None

override def attributeConverter: Option[AttributeConverter with CachingConverter] = None

override def expressionConverter: Option[ExpressionConverter with CachingConverter] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ object ModelMapperV1 extends ModelMapper {
val maybeExpressionConverter = epccf.expressionConverter
val maybeOutputConverter = epccf.outputConverter
val objectConverter = epccf.objectConverter
val planNameExtractor = epccf.planNameExtractor

val operationConverter = new OperationConverter(objectConverter, maybeOutputConverter) with CachingConverter

Expand All @@ -62,6 +63,7 @@ object ModelMapperV1 extends ModelMapper {

ExecutionPlan(
id = plan1.id,
name = planNameExtractor(plan1),
operations = operations,
attributes = attributes,
expressions = maybeExpressions,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spline.producer.modelmapper.v1.spark

object Fields {

object Extra {
val AppName = "appName"
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import scala.util.Try

class SparkSplineExecutionPlanComponentConverterFactory(agentVersion: String, plan1: v1.ExecutionPlan) extends ExecutionPlanComponentConverterFactory {

override def planNameExtractor: v1.ExecutionPlan => Option[v1_1.ExecutionPlan.Name] = _.extraInfo.get(Fields.Extra.AppName).map(_.toString)

override def expressionConverter: Option[CachingConverter {type To = v1_1.ExpressionLike}] = Some(_expressionConverter)

override def attributeConverter: Option[CachingConverter {type To = v1_1.Attribute}] = Some(_attributeConverter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.util.UUID

case class ExecutionPlan(
id: UUID = UUID.randomUUID(),
name: Option[ExecutionPlan.Name],

operations: Operations,
attributes: Seq[Attribute] = Nil,
Expand All @@ -32,13 +33,18 @@ case class ExecutionPlan(
// User payload
extraInfo: Map[String, Any] = Map.empty
) {
def dataSources: Set[String] = {
def dataSources: Set[ExecutionPlan.DataSourceUri] = {
val readSources = operations.reads.flatMap(_.inputSources).toSet
val writeSource = operations.write.outputSource
readSources + writeSource
}
}

object ExecutionPlan {
type Name = String
type DataSourceUri = String
}

case class Operations(
write: WriteOperation,
reads: Seq[ReadOperation] = Nil,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ object ExecutionPlansControllerDeserFixAspectSpec {

ExecutionPlan(
id = planId,
name = None,
operations = Operations(
write = WriteOperation(
id = "1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,11 @@ class ExecutionPlanPersistentModelBuilder private(

def build(): ExecutionPlanPersistentModel = {
val pmExecutionPlan = pm.ExecutionPlan(
name = ep.name,
_key = ep.id.toString,
systemInfo = ep.systemInfo.toJsonAs[Map[String, Any]],
agentInfo = ep.agentInfo.map(_.toJsonAs[Map[String, Any]]).orNull,
extra = ep.extraInfo,
_key = ep.id.toString)
extra = ep.extraInfo)

val pmExecutes = EdgeDef.Executes.edge(ep.id, keyCreator.asOperationKey(ep.operations.write.id), epPKey)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class ExecutionProducerRepositoryImpl @Autowired()(db: ArangoDatabaseAsync) exte
| RETURN {
| "${ExecutionPlanDetails.ExecutionPlanId}" : ep._key,
| "${ExecutionPlanDetails.FrameworkName}" : CONCAT(ep.systemInfo.name, " ", ep.systemInfo.version),
| "${ExecutionPlanDetails.ApplicationName}" : ep.extra.appName,
| "${ExecutionPlanDetails.ApplicationName}" : ep.name,
| "${ExecutionPlanDetails.DataSourceUri}" : writeOp.outputSource,
| "${ExecutionPlanDetails.DataSourceType}" : writeOp.extra.destinationType,
| "${ExecutionPlanDetails.Append}" : writeOp.append
Expand Down

0 comments on commit 44bbd12

Please sign in to comment.