Skip to content

Commit

Permalink
spline #831 + ExecutionEvent.durationNs property
Browse files Browse the repository at this point in the history
  • Loading branch information
wajda committed Mar 25, 2021
1 parent 8c35671 commit e8eef9b
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,17 @@ object ExecutionPlan {
*/
case class Progress(
timestamp: Long,
durationNs: Option[Progress.JobDurationInNanos],
error: Option[Any],
extra: Map[String, Any],
override val _key: String,
override val _key: ArangoDocument.Key,
execPlanDetails: ExecPlanDetails
) extends Vertex with RootEntity

object Progress {
type JobDurationInNanos = Long
}

/**
* These values are copied from other entities for performance optimization.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import za.co.absa.commons.graph.GraphImplicits._
import za.co.absa.commons.lang.CachingConverter
import za.co.absa.spline.producer.model.v1_1._
import za.co.absa.spline.producer.modelmapper.ModelMapper
import za.co.absa.spline.producer.modelmapper.v1.spark.Fields
import za.co.absa.spline.producer.{model => v1}

object ModelMapperV1 extends ModelMapper {
Expand Down Expand Up @@ -73,12 +74,26 @@ object ModelMapperV1 extends ModelMapper {
)
}

override def fromDTO(event: v1.ExecutionEvent): ExecutionEvent = ExecutionEvent(
planId = event.planId,
timestamp = event.timestamp,
error = event.error,
extra = event.extra
)
override def fromDTO(event: v1.ExecutionEvent): ExecutionEvent = {
// Strictly speaking I should not have been doing this, and instead resolve a linked exec plan,
// look at its `agentInfo` property, find a proper conversion rule and use that one for conversion.
// But realistically speaking I don't believe there are many enough non-Spline v1 agents out there,
// for the risk of misinterpreting the "durationNs" property in extras to be practically possible.
// So I'm going to make a shortcut here for sake of performance and simplicity.
val durationNs = event.extra.get(Fields.Extra.DurationNs).
flatMap(PartialFunction.condOpt(_) {
case num: Long => num
case str: String if str.forall(_.isDigit) => str.toLong
})

ExecutionEvent(
planId = event.planId,
timestamp = event.timestamp,
durationNs = durationNs,
error = event.error,
extra = event.extra
)
}

private def asOperationsObject(ops: Seq[OperationLike]) = {
val (Some(write), reads, others) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ object Fields {

object Extra {
val AppName = "appName"
val DurationNs = "durationNs"
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import java.util.UUID
case class ExecutionEvent(
planId: UUID,
timestamp: Long,
durationNs: Option[ExecutionEvent.JobDurationInNanos],
error: Option[Any] = None,
extra: Map[String, Any] = Map.empty
)

object ExecutionEvent {
type JobDurationInNanos = Long
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,6 @@ object ExecutionProducerRepositoryImpl {
planDetails(ExecutionPlanDetails.DataSourceType).asInstanceOf[String],
planDetails(ExecutionPlanDetails.Append).asInstanceOf[Boolean]
)
Progress(e.timestamp, e.error, e.extra, key, epd)
Progress(e.timestamp, e.durationNs, e.error, e.extra, key, epd)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class ExecutionEventKeyCreatorSpec extends AnyFlatSpec with Matchers {
val testEvent = ExecutionEvent(
planId = UUID.fromString("00000000-0000-0000-0000-000000000000"),
timestamp = 1234567890,
durationNs = None,
error = None,
extra = Map.empty)

Expand Down

0 comments on commit e8eef9b

Please sign in to comment.