Skip to content

Commit

Permalink
Spline 831 Producer API enhancement (#860)
Browse files Browse the repository at this point in the history
* spline #831 move methods from `AttrOrExprRef` case class to an implicit extension class to prevent Swagger from generating unwanted properties out of them.

* spline #831 + `ExecutionPlan.name` property

* spline #831 + `ExecutionEvent.durationNs` property

* spline #831 refactoring: use a case class instead of a map

* spline #831 + `OperationLike.name` property

* spline #831 clean up: remove redundant `OperationID` type alias

* spline #831 expose new properties on the Consumer API

* spline #831 rename `(Expression | Attribute).childIds` to `(Expression | Attribute).childRefs`

* spline #831 Fix ser/de

* spline #831 Consumer API: rename property for backward compat with UI 0.5

* spline #831 update Swagger doc

* spline #831 fix operation schema inference for schema agnostic operation graphs

* spline #831 fix persistent model builder for schema agnostic operation graphs

* spline #831 Producer model: refine operation case class structure as well as Swagger def

* spline #831: fix refactoring leftover

* spline #831 Consumer model: fix query for compatibility with Spline Web Client 0.5

* spline #846 Kafka Gateway: rename package

* spline #831 Update Swagger doc
  • Loading branch information
wajda committed Apr 12, 2021
1 parent 3c9c57c commit 02cd81c
Show file tree
Hide file tree
Showing 40 changed files with 457 additions and 222 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class JsonPath(tokens: Seq[Token]) {

object JsonPath {

// fixme: too many Any's. Need to do something with it... one day... probably
// fixme: too many Any's. Need to do something with it (perhaps proper algebraic types in Scala 3 will solve it)
private type Token = Any // String (property) or Int (index)
private type Target = Any // Map[String, Any] or Seq[Any]
private type Value = Any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ class OperationDetailsControllerTest extends AsyncFunSuite with MockitoSugar wit

private val operation: Operation = new Operation(
_id = "2141834d-abd6-4be4-80b9-01661b842ab9",
`type` = "Transformation",
name = "Project",
_type = "Transformation",
name = Some("Project"),
properties = null
)
private val dataTypes = Array[DataType](
Expand Down Expand Up @@ -297,7 +297,7 @@ class OperationDetailsControllerTest extends AsyncFunSuite with MockitoSugar wit
operation,
dataTypes,
schemas,
Array(new Integer(0)),
Array(0),
1
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package za.co.absa.spline.consumer.service.model

import io.swagger.annotations.{ApiModel, ApiModelProperty}
import za.co.absa.spline.consumer.service.model.LineageDetailed.OperationID

@ApiModel(description = "Attribute Node")
case class AttributeNode
Expand All @@ -29,10 +28,10 @@ case class AttributeNode
name: String,

@ApiModelProperty(value = "Operation Id in which the attribute was created")
originOpId: OperationID,
originOpId: Operation.Id,

@ApiModelProperty(value = "Operation Ids which the attribute passes through")
transOpIds: Seq[OperationID]
transOpIds: Seq[Operation.Id]

) extends Graph.Node {
def this() = this(null, null, null, null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ case class ExecutionPlanInfo
(
@ApiModelProperty(value = "Execution plan Id")
_id: Id,
@ApiModelProperty(value = "Name of the execution plan (script / application / job)")
name: Option[String],
@ApiModelProperty(value = "Name and version of the system or framework that created this execution plan")
systemInfo: Map[String, Any],
@ApiModelProperty(value = "Name and version of the Spline agent that collected this execution plan")
Expand All @@ -37,7 +39,7 @@ case class ExecutionPlanInfo
@ApiModelProperty(value = "Write destination")
output: DataSourceInfo
) {
def this() = this(null, null, null, null, null, null)
def this() = this(null, null, null, null, null, null, null)
}

object ExecutionPlanInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,3 @@ case class LineageDetailed(
) {
def this() = this(null, null)
}

object LineageDetailed {
type OperationID = String
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,25 @@
package za.co.absa.spline.consumer.service.model

import io.swagger.annotations.{ApiModel, ApiModelProperty}
import za.co.absa.spline.consumer.service.model.LineageDetailed.OperationID

@ApiModel(description = "Operation")
case class Operation
(
@ApiModelProperty(value = "Operation Id")
_id: OperationID,
_id: Operation.Id,
@ApiModelProperty(value = "Type of the operation", example = "Read / Transformation / Write")
`type`: String,
_type: Operation.Type,
@ApiModelProperty(value = "Name of the operation")
name: String,
name: Option[String],
@ApiModelProperty(value = "Properties of the operation")
properties: Map[String, Any]
) extends Graph.Node {
override type Id = OperationID
override type Id = Operation.Id

def this() = this(null, null, null, null)
}

object Operation {
type Id = String
type Type = String // Read / Transformation / Write
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,16 @@
package za.co.absa.spline.consumer.service.model

import io.swagger.annotations.{ApiModel, ApiModelProperty}
import za.co.absa.spline.consumer.service.model.LineageDetailed.OperationID

@ApiModel(description="Link between operations")
case class Transition
(
@ApiModelProperty(value = "Source Operation")
source: OperationID,
source: Operation.Id,
@ApiModelProperty(value = "Target Operation")
target: OperationID
target: Operation.Id
) extends Graph.Edge {
def this() = this(null, null)

override type JointId = OperationID
override type JointId = Operation.Id
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,19 @@ import io.swagger.annotations.ApiModelProperty
case class WriteEventInfo
(
@ApiModelProperty(value = "Id of the execution event")
executionEventId: String,
executionEventId: WriteEventInfo.Id,
@ApiModelProperty(value = "Id of the execution plan")
executionPlanId: String,
executionPlanId: ExecutionPlanInfo.Id,
@ApiModelProperty(value = "Name of the framework that triggered this execution event")
frameworkName: String,
@ApiModelProperty(value = "Name of the application/job")
applicationName: String,
@ApiModelProperty(value = "Id of the application/job")
applicationId: String,
@ApiModelProperty(value = "When the execution was triggered")
timestamp: Long,
timestamp: WriteEventInfo.Timestamp,
@ApiModelProperty(value = "When the execution was triggered")
durationNs: WriteEventInfo.DurationNs,
@ApiModelProperty(value = "Output data source name")
dataSourceName: String,
@ApiModelProperty(value = "Output data source URI")
Expand All @@ -40,10 +42,12 @@ case class WriteEventInfo
@ApiModelProperty(value = "Write mode - (true=Append; false=Override)")
append: Boolean
) {
def this() = this(null, null, null, null, null, 0, null, null, null, false)
def this() = this(null, null, null, null, null, 0, 0, null, null, null, false)
}

object WriteEventInfo {
type Id = String
type Timestamp = Long
type DurationNs = Long
}

Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class DataSourceRepositoryImpl @Autowired()(db: ArangoDatabaseAsync) extends Dat
|
| LET resItem = {
| "executionEventId" : lwe._key,
| "executionPlanId" : lwe.execPlanDetails.executionPlanId,
| "executionPlanId" : lwe.execPlanDetails.executionPlanKey,
| "frameworkName" : lwe.execPlanDetails.frameworkName,
| "applicationName" : lwe.execPlanDetails.applicationName,
| "applicationId" : lwe.extra.appId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class ExecutionEventRepositoryImpl @Autowired()(db: ArangoDatabaseAsync) extends
|
| LET resItem = {
| "executionEventId" : ee._key,
| "executionPlanId" : ee.execPlanDetails.executionPlanId,
| "executionPlanId" : ee.execPlanDetails.executionPlanKey,
| "frameworkName" : ee.execPlanDetails.frameworkName,
| "applicationName" : ee.execPlanDetails.applicationName,
| "applicationId" : ee.extra.appId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ class ExecutionPlanRepositoryImpl @Autowired()(db: ArangoDatabaseAsync) extends
| "graph": {
| "nodes": ops[* RETURN {
| "_id" : CURRENT._key,
| "type": CURRENT.type,
| "name" : CURRENT.extra.name
| "_type": CURRENT.type,
| "name" : CURRENT.name || CURRENT.type
| }],
| "edges": edges[* RETURN {
| "source": PARSE_IDENTIFIER(CURRENT._to).key,
Expand All @@ -91,7 +91,11 @@ class ExecutionPlanRepositoryImpl @Autowired()(db: ArangoDatabaseAsync) extends
| "_id" : execPlan._key,
| "systemInfo": execPlan.systemInfo,
| "agentInfo" : execPlan.agentInfo,
| "extra" : MERGE(execPlan.extra, { attributes }),
| "extra" : MERGE(
| execPlan.extra,
| { attributes },
| { "appName" : execPlan.name || execPlan._key }
| ),
| "inputs" : inputs,
| "output" : output
| }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class OperationRepositoryImpl @Autowired()(db: ArangoDatabaseAsync) extends Oper
|
| LET dataTypesFormatted = (
| LET execPlan = DOCUMENT(ope._belongsTo)
| FOR d IN execPlan.extra.dataTypes
| FOR d IN execPlan.extra.dataTypes || []
| RETURN MERGE(
| KEEP(d, "id", "name", "fields", "nullable", "elementDataTypeId"),
| {
Expand All @@ -72,8 +72,8 @@ class OperationRepositoryImpl @Autowired()(db: ArangoDatabaseAsync) extends Oper
| RETURN {
| "operation": {
| "_id" : ope._key,
| "type" : ope.type,
| "name" : ope.extra.name,
| "_type" : ope.type,
| "name" : ope.name || ope.type,
| "properties": MERGE(
| {
| "inputSources": ope.inputSources,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package za.co.absa.spline.producer.kafka
package za.co.absa.spline.gateway.kafka

import org.springframework.web.WebApplicationInitializer
import org.springframework.web.context.ContextLoaderListener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package za.co.absa.spline.producer.kafka
package za.co.absa.spline.gateway.kafka

import com.fasterxml.jackson.databind.{ObjectMapper, PropertyNamingStrategies, PropertyNamingStrategy}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package za.co.absa.spline.producer.kafka.listener
package za.co.absa.spline.gateway.kafka.listener

import org.slf4s.Logging
import org.springframework.beans.factory.annotation.Autowired
Expand All @@ -23,7 +23,7 @@ import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.handler.annotation.Header
import org.springframework.stereotype.Component
import za.co.absa.commons.annotation.Unstable
import za.co.absa.spline.producer.kafka.KafkaGatewayConfig
import za.co.absa.spline.gateway.kafka.KafkaGatewayConfig
import za.co.absa.spline.producer.model.v1_1.{ExecutionEvent, ExecutionPlan}
import za.co.absa.spline.producer.service.repo.ExecutionProducerRepository

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package za.co.absa.spline.producer.kafka
package za.co.absa.spline.gateway.kafka

package object listener {
trait _package
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ function eventLineageOverviewGraph(startEvent, maxDepth) {
: MERGE(KEEP(vert, ["systemInfo", "agentInfo"]), {
"_id": vert._key,
"_class": "za.co.absa.spline.consumer.service.model.ExecutionNode",
"name": vert.extra.appName
"name": vert.name || ""
})
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,33 +100,45 @@ object DataSource {
* from the inputs to the output.
*/
case class ExecutionPlan(
name: Option[ExecutionPlan.Name],
systemInfo: Map[String, Any],
agentInfo: Map[String, Any],
extra: Map[String, Any],
override val _key: ArangoDocument.Key
) extends Vertex with RootEntity

object ExecutionPlan {
type Name = String
}

/**
* Represents a moment in time WHEN a particular execution plan is executed.
* It can also hold the result of the execution and related stats, and any other
* custom data logically connected to the event.
*/
case class Progress(
timestamp: Long,
durationNs: Option[Progress.JobDurationInNanos],
error: Option[Any],
extra: Map[String, Any],
override val _key: String,
override val _key: ArangoDocument.Key,
execPlanDetails: ExecPlanDetails
) extends Vertex with RootEntity

object Progress {
type JobDurationInNanos = Long
}

/**
* These values are copied from other entities for performance optimization.
*/
case class ExecPlanDetails(
executionPlanId: ArangoDocument.Key,
executionPlanKey: ArangoDocument.Key,
frameworkName: String,
applicationName: String,
dataSourceUri: DataSource.Uri,
dataSourceType: String,
append: Boolean
)
) {
def this() = this(null, null, null, null, null, false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,43 +17,52 @@
package za.co.absa.spline.persistence.model

sealed trait Operation extends Vertex {
def name: Option[Operation.Name]
def params: Map[String, Any]
def extra: Map[String, Any]
def `type`: String
def `type`: Operation.Type
}

object Operation {
type Name = String
type Type = String
}

case class Read(
inputSources: Seq[String],
override val name: Option[Operation.Name],
override val params: Map[String, Any],
override val extra: Map[String, Any],
override val _key: ArangoDocument.Key,
override val _belongsTo: Option[ArangoDocument.Id]
) extends Operation {
def this() = this(null, null, null, null, null)
def this() = this(null, null, null, null, null, null)

override val `type`: String = "Read"
override val `type`: Operation.Type = "Read"
}

case class Write(
outputSource: String,
append: Boolean,
override val name: Option[Operation.Name],
override val params: Map[String, Any],
override val extra: Map[String, Any],
override val _key: ArangoDocument.Key,
override val _belongsTo: Option[ArangoDocument.Id]
) extends Operation {
def this() = this(null, false, null, null, null, null)
def this() = this(null, false, null, null, null, null, null)

override val `type`: String = "Write"
override val `type`: Operation.Type = "Write"
}

case class Transformation(
override val name: Option[Operation.Name],
override val params: Map[String, Any],
override val extra: Map[String, Any],
override val _key: ArangoDocument.Key,
override val _belongsTo: Option[ArangoDocument.Id]
) extends Operation {
def this() = this(null, null, null, null)
def this() = this(null, null, null, null, null)

override val `type`: String = "Transformation"
override val `type`: Operation.Type = "Transformation"
}
Loading

0 comments on commit 02cd81c

Please sign in to comment.