From e6702e0275dd304d3e26e8c838f20d7edc849bcb Mon Sep 17 00:00:00 2001 From: Oleksandr Vayda Date: Fri, 12 Nov 2021 02:39:55 +0100 Subject: [PATCH 1/4] spline #669 Add a new Producer protocol version 1.2 --- .../scala/za/co/absa/spline/producer/rest/ProducerAPI.scala | 5 +++-- .../producer/rest/controller/ExecutionPlansController.scala | 5 ++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/producer-rest-core/src/main/scala/za/co/absa/spline/producer/rest/ProducerAPI.scala b/producer-rest-core/src/main/scala/za/co/absa/spline/producer/rest/ProducerAPI.scala index a04a80f83..3e6a21615 100644 --- a/producer-rest-core/src/main/scala/za/co/absa/spline/producer/rest/ProducerAPI.scala +++ b/producer-rest-core/src/main/scala/za/co/absa/spline/producer/rest/ProducerAPI.scala @@ -20,10 +20,11 @@ import za.co.absa.commons.version.Version import za.co.absa.commons.version.Version._ object ProducerAPI { - val CurrentVersion: Version = ver"1.1" + val CurrentVersion: Version = ver"1.2" val DeprecatedVersions: Seq[Version] = Seq(ver"1" /*, ...*/) - val LTSVersions: Seq[Version] = Seq(CurrentVersion /*, ...*/) + val LTSVersions: Seq[Version] = Seq(CurrentVersion, ver"1.1" /*, ...*/) val SupportedVersions: Seq[Version] = LTSVersions ++ DeprecatedVersions final val MimeTypeV1_1 = "application/vnd.absa.spline.producer.v1.1+json" + final val MimeTypeV1_2 = "application/vnd.absa.spline.producer.v1.2+json" } diff --git a/producer-rest-core/src/main/scala/za/co/absa/spline/producer/rest/controller/ExecutionPlansController.scala b/producer-rest-core/src/main/scala/za/co/absa/spline/producer/rest/controller/ExecutionPlansController.scala index 2b7ea7491..e036de276 100644 --- a/producer-rest-core/src/main/scala/za/co/absa/spline/producer/rest/controller/ExecutionPlansController.scala +++ b/producer-rest-core/src/main/scala/za/co/absa/spline/producer/rest/controller/ExecutionPlansController.scala @@ -29,7 +29,10 @@ import za.co.absa.spline.producer.service.repo.ExecutionProducerRepository import scala.concurrent.{ExecutionContext, Future} @RestController -@RequestMapping(consumes = Array(ProducerAPI.MimeTypeV1_1)) +@RequestMapping(consumes = Array( + ProducerAPI.MimeTypeV1_1, + ProducerAPI.MimeTypeV1_2, +)) @Api(tags = Array("execution")) class ExecutionPlansController @Autowired()( val repo: ExecutionProducerRepository) { From a7421107becf2f9ab63fe40dc8c4bb228b9a779e Mon Sep 17 00:00:00 2001 From: Oleksandr Vayda Date: Fri, 12 Nov 2021 23:36:41 +0100 Subject: [PATCH 2/4] spline #669 Add a new Producer protocol version 1.2 --- .../producer/rest/controller/ExecutionEventsController.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/producer-rest-core/src/main/scala/za/co/absa/spline/producer/rest/controller/ExecutionEventsController.scala b/producer-rest-core/src/main/scala/za/co/absa/spline/producer/rest/controller/ExecutionEventsController.scala index 6334423d8..086d32da6 100644 --- a/producer-rest-core/src/main/scala/za/co/absa/spline/producer/rest/controller/ExecutionEventsController.scala +++ b/producer-rest-core/src/main/scala/za/co/absa/spline/producer/rest/controller/ExecutionEventsController.scala @@ -28,7 +28,10 @@ import za.co.absa.spline.producer.service.repo.ExecutionProducerRepository import scala.concurrent.{ExecutionContext, Future} @Controller -@RequestMapping(consumes = Array(ProducerAPI.MimeTypeV1_1)) +@RequestMapping(consumes = Array( + ProducerAPI.MimeTypeV1_1, + ProducerAPI.MimeTypeV1_2, +)) @Api(tags = Array("execution")) class ExecutionEventsController @Autowired()( val repo: ExecutionProducerRepository) { From 140964de33a6ee34b2c48f491704c12dbc0e2684 Mon Sep 17 00:00:00 2001 From: Oleksandr Vayda Date: Sat, 13 Nov 2021 03:02:30 +0100 Subject: [PATCH 3/4] spline #669 Filter out failed execution events in lineage queries --- .../consumer/service/model/WriteEventInfo.scala | 11 +++++++---- .../service/repo/DataSourceRepositoryImpl.scala | 3 ++- .../service/repo/ExecutionEventRepositoryImpl.scala | 3 ++- .../foxx/spline/services/observed-writes-by-read.js | 2 ++ 4 files changed, 13 insertions(+), 6 deletions(-) diff --git a/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/model/WriteEventInfo.scala b/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/model/WriteEventInfo.scala index 8d6f4fd8a..835f3db33 100644 --- a/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/model/WriteEventInfo.scala +++ b/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/model/WriteEventInfo.scala @@ -32,8 +32,10 @@ case class WriteEventInfo applicationId: String, @ApiModelProperty(value = "When the execution was triggered") timestamp: WriteEventInfo.Timestamp, - @ApiModelProperty(value = "When the execution was triggered") - durationNs: WriteEventInfo.DurationNs, + @ApiModelProperty(value = "Duration of execution in nanoseconds (for successful executions)") + durationNs: Option[WriteEventInfo.DurationNs], + @ApiModelProperty(value = "Error (for failed executions)") + error: Option[WriteEventInfo.Error], @ApiModelProperty(value = "Output data source name") dataSourceName: String, @ApiModelProperty(value = "Output data source URI") @@ -43,13 +45,14 @@ case class WriteEventInfo @ApiModelProperty(value = "Write mode - (true=Append; false=Override)") append: WriteEventInfo.Append ) { - def this() = this(null, null, null, null, null, null, null, null, null, null, null) + def this() = this(null, null, null, null, null, null, null, null, null, null, null, null) } object WriteEventInfo { type Id = String type Timestamp = java.lang.Long - type DurationNs = Option[Progress.JobDurationInNanos] + type DurationNs = Progress.JobDurationInNanos + type Error = Any type Append = java.lang.Boolean } diff --git a/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/DataSourceRepositoryImpl.scala b/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/DataSourceRepositoryImpl.scala index 8aafd8880..7610b542f 100644 --- a/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/DataSourceRepositoryImpl.scala +++ b/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/DataSourceRepositoryImpl.scala @@ -156,7 +156,8 @@ class DataSourceRepositoryImpl @Autowired()(db: ArangoDatabaseAsync) extends Dat | "dataSourceUri" : ds.uri, | "dataSourceType" : lwe.execPlanDetails.dataSourceType, | "append" : lwe.execPlanDetails.append, - | "durationNs" : lwe.durationNs + | "durationNs" : lwe.durationNs, + | "error" : lwe.error | } | | SORT resItem.@sortField @sortOrder diff --git a/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/ExecutionEventRepositoryImpl.scala b/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/ExecutionEventRepositoryImpl.scala index 63414ab3d..087d9331f 100644 --- a/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/ExecutionEventRepositoryImpl.scala +++ b/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/ExecutionEventRepositoryImpl.scala @@ -115,7 +115,8 @@ class ExecutionEventRepositoryImpl @Autowired()(db: ArangoDatabaseAsync) extends | "dataSourceUri" : ee.execPlanDetails.dataSourceUri, | "dataSourceType" : ee.execPlanDetails.dataSourceType, | "append" : ee.execPlanDetails.append, - | "durationNs" : ee.durationNs + | "durationNs" : ee.durationNs, + | "error" : ee.error | } | | SORT resItem.@sortField @sortOrder diff --git a/persistence/src/main/resources/foxx/spline/services/observed-writes-by-read.js b/persistence/src/main/resources/foxx/spline/services/observed-writes-by-read.js index 24ae31ac9..911682477 100644 --- a/persistence/src/main/resources/foxx/spline/services/observed-writes-by-read.js +++ b/persistence/src/main/resources/foxx/spline/services/observed-writes-by-read.js @@ -33,6 +33,7 @@ function observedWritesByRead(readEvent) { FILTER !wo.append FOR e IN 2 INBOUND wo executes, progressOf FILTER e.timestamp < readTime + AND e.error == null SORT e.timestamp DESC LIMIT 1 RETURN e @@ -43,6 +44,7 @@ function observedWritesByRead(readEvent) { FOR e IN 2 INBOUND wo executes, progressOf FILTER e.timestamp > maybeObservedOverwrite[0].timestamp AND e.timestamp < readTime + AND e.error == null SORT e.timestamp ASC RETURN e ) From dee4a5b1539a05824eb00d75a6b35992ecf030b6 Mon Sep 17 00:00:00 2001 From: Oleksandr Vayda Date: Sat, 13 Nov 2021 03:04:00 +0100 Subject: [PATCH 4/4] spline #669 Add DB migration script 0.7.0 -> 1.0.0. Add missing index on "progress.durationNs" --- .../migration-scripts/0.7.0-1.0.0.js | 26 +++++++++++++++++++ .../persistence/model/persistentDefs.scala | 1 + 2 files changed, 27 insertions(+) create mode 100644 persistence/src/main/resources/migration-scripts/0.7.0-1.0.0.js diff --git a/persistence/src/main/resources/migration-scripts/0.7.0-1.0.0.js b/persistence/src/main/resources/migration-scripts/0.7.0-1.0.0.js new file mode 100644 index 000000000..355c21e87 --- /dev/null +++ b/persistence/src/main/resources/migration-scripts/0.7.0-1.0.0.js @@ -0,0 +1,26 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +const VER = "1.0.0" + +const {db, aql} = require("@arangodb"); + +console.log(`[Spline] Start migration to ${VER}`); + +console.log("[Spline] Create index 'progress.durationNs'"); +db.progress.ensureIndex({type: "persistent", fields: ["durationNs"]}); + +console.log(`[Spline] Migration done. Version ${VER}`); diff --git a/persistence/src/main/scala/za/co/absa/spline/persistence/model/persistentDefs.scala b/persistence/src/main/scala/za/co/absa/spline/persistence/model/persistentDefs.scala index 44cf6d3de..195f13dd7 100644 --- a/persistence/src/main/scala/za/co/absa/spline/persistence/model/persistentDefs.scala +++ b/persistence/src/main/scala/za/co/absa/spline/persistence/model/persistentDefs.scala @@ -177,6 +177,7 @@ object NodeDef { object Progress extends NodeDef("progress") with CollectionDef { override def indexDefs: Seq[IndexDef] = Seq( IndexDef(Seq("timestamp"), new PersistentIndexOptions), + IndexDef(Seq("durationNs"), new PersistentIndexOptions), IndexDef(Seq("_created"), new PersistentIndexOptions), IndexDef(Seq("extra.appId"), new PersistentIndexOptions().sparse(true)), IndexDef(Seq("execPlanDetails.executionPlanKey"), new PersistentIndexOptions),