Skip to content

Commit

Permalink
spline #831 fix operation schema inference for schema agnostic operat…
Browse files Browse the repository at this point in the history
…ion graphs
  • Loading branch information
wajda committed Apr 7, 2021
1 parent 418b792 commit 2849c37
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ case class DataOperation(
override val id: Id,
override val name: Option[OperationLike.Name] = None,
override val childIds: Seq[Id] = Nil,
override val output: Seq[Attribute.Id],
override val output: Seq[Attribute.Id] = Nil,
override val params: Map[String, Any] = Map.empty,
override val extra: Map[String, Any] = Map.empty
) extends OperationLike
Expand All @@ -56,7 +56,7 @@ case class ReadOperation(
inputSources: Seq[String],
override val id: Id,
override val name: Option[OperationLike.Name] = None,
override val output: Seq[Attribute.Id],
override val output: Seq[Attribute.Id] = Nil,
override val params: Map[String, Any] = Map.empty,
override val extra: Map[String, Any] = Map.empty
) extends OperationLike {
Expand All @@ -68,7 +68,7 @@ case class WriteOperation(
append: Boolean,
override val id: Id,
override val name: Option[OperationLike.Name] = None,
override val childIds: Seq[Id],
override val childIds: Seq[Id] = Nil,
override val params: Map[String, Any] = Map.empty,
override val extra: Map[String, Any] = Map.empty
) extends OperationLike {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import za.co.absa.commons.lang.OptionImplicits._
import za.co.absa.spline.persistence.DefaultJsonSerDe._
import za.co.absa.spline.persistence.model.{Edge, EdgeDef}
import za.co.absa.spline.persistence.{model => pm}
import za.co.absa.spline.producer.model.v1_1.AttrOrExprRef
import za.co.absa.spline.producer.model.v1_1.OperationLike.Id
import za.co.absa.spline.producer.model.v1_1.{AttrOrExprRef, OperationLike}
import za.co.absa.spline.producer.model.{v1_1 => am}
import za.co.absa.spline.producer.service.model.ExecutionPlanPersistentModelBuilder._
import za.co.absa.spline.producer.service.{InconsistentEntityException, model}
Expand Down Expand Up @@ -348,27 +348,52 @@ object ExecutionPlanPersistentModelBuilder {
.build()
}

private case class SchemaInfo(oid: am.OperationLike.Id, schema: am.OperationLike.Schema, diff: Set[am.Attribute.Id])

private def getSchemaInfos(operations: Seq[am.OperationLike]): Map[Id, SchemaInfo] = {
operations
.sortedTopologically(reverse = true)
.foldLeft(Map.empty[Id, SchemaInfo]) {
(schemaByOpId, op) =>
val inSchemaInfos = op.childIds.map(schemaByOpId)
val outSchema = op.output
inSchemaInfos match {
case (si@SchemaInfo(oid, inSchema, _)) +: sis
if sis.forall(_.oid == oid) && (outSchema.isEmpty || outSchema == inSchema) =>
schemaByOpId.updated(op.id, si)
case _ if outSchema.nonEmpty =>
val inputSchemas = inSchemaInfos.map(_.schema)
val diff = inputSchemas.foldLeft(outSchema.toSet)(_ -- _)
schemaByOpId.updated(op.id, SchemaInfo(op.id, outSchema, diff))
case _ =>
throw new InconsistentEntityException(s"Cannot infer schema for operation #${op.id}: the input is either empty or ambiguous")
}
/**
 * Output schema resolved for an operation by `getSchemaInfos`.
 *
 * @param oid    ID of the operation that explicitly defines the schema. An operation that merely
 *               passes its input schema through shares the `SchemaInfo` of the defining operation,
 *               so `oid` may differ from the ID of the operation the info is associated with.
 * @param schema the output attribute IDs, in the order given by the defining operation
 * @param diff   the attributes of `schema` that are not present in any input schema of the defining operation
 */
private[model] case class SchemaInfo(oid: am.OperationLike.Id, schema: am.OperationLike.Schema, diff: Set[am.Attribute.Id])

/**
 * Resolves the output schema of every operation in the given operation graph.
 *
 * The graph must be either entirely schema agnostic (no operation defines `output`),
 * or schema aware, in which case every terminal (childless) operation must define `output`.
 * In a schema aware graph an operation without its own `output` inherits the schema of its inputs,
 * provided all inputs resolve to the same defining operation.
 *
 * @param operations all operations of the execution plan; must not be empty
 * @return a map from operation ID to its resolved [[SchemaInfo]],
 *         or an empty map when the graph is schema agnostic
 * @throws IllegalArgumentException    if `operations` is empty
 * @throws InconsistentEntityException if the graph mixes schema aware operations with schema agnostic
 *                                     terminal operations, has no terminal operations, references
 *                                     a child operation that cannot be resolved, or contains an operation
 *                                     whose schema cannot be unambiguously inferred from its inputs
 */
private[model] def getSchemaInfos(operations: Seq[am.OperationLike]): Map[Id, SchemaInfo] = {
  require(operations.nonEmpty)

  // Check schema definition consistency on the terminal operations level:
  // the first childless operation without an output, and the first operation (of any kind) with an output.
  val maybeSchemaAgnosticTermOp = operations.find(op => op.output.isEmpty && op.childIds.isEmpty)
  val maybeSchemaAwareAnyOp = operations.find(_.output.nonEmpty)

  (maybeSchemaAgnosticTermOp, maybeSchemaAwareAnyOp) match {
    case (Some(schemaAgnosticTermOp), Some(schemaAwareAnyOp)) =>
      throw new InconsistentEntityException(s"" +
        s"At least one operation defines output schema [#${schemaAwareAnyOp.id}], " +
        s"while some terminal input operations lack of it [#${schemaAgnosticTermOp.id}]")

    case (None, None) =>
      // The graph is neither schema aware nor schema agnostic.
      // That is only possible when no operation is terminal, i.e. every operation has children.
      throw new InconsistentEntityException(s"Operation DAG must have terminal nodes")

    case (Some(_), None) =>
      // The operation graph is schema agnostic.
      Map.empty

    case (None, Some(_)) =>
      operations
        // The fold below looks up the children of each operation in the accumulator,
        // so the inputs are expected to come before their consumers.
        .sortedTopologically(reverse = true)
        .foldLeft(Map.empty[Id, SchemaInfo]) {
          (schemaByOpId, op) =>
            val inSchemaInfos = op.childIds.map { childId =>
              // Report a dangling/unresolved reference as an entity inconsistency
              // rather than letting a raw NoSuchElementException escape.
              schemaByOpId.getOrElse(childId, throw new InconsistentEntityException(
                s"Cannot infer schema for operation #${op.id}: the input operation #$childId is unknown or unresolved"))
            }
            val outSchema = op.output
            inSchemaInfos match {
              // All inputs share one defining operation and this operation does not alter the schema:
              // reuse the input schema info as is.
              case (si@SchemaInfo(oid, inSchema, _)) +: sis
                if sis.forall(_.oid == oid) && (outSchema.isEmpty || outSchema == inSchema) =>
                schemaByOpId.updated(op.id, si)
              // The operation defines its own schema: record which attributes it introduces.
              case _ if outSchema.nonEmpty =>
                val inputSchemas = inSchemaInfos.map(_.schema)
                val diff = inputSchemas.foldLeft(outSchema.toSet)(_ -- _)
                schemaByOpId.updated(op.id, SchemaInfo(op.id, outSchema, diff))
              case _ =>
                throw new InconsistentEntityException(s"Cannot infer schema for operation #${op.id}: the input schema is ambiguous")
            }
        }
  }
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spline.producer.service.model

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import za.co.absa.spline.producer.model.v1_1.DataOperation
import za.co.absa.spline.producer.service.InconsistentEntityException
import za.co.absa.spline.producer.service.model.ExecutionPlanPersistentModelBuilder.SchemaInfo

/**
 * Specification of `ExecutionPlanPersistentModelBuilder.getSchemaInfos`:
 * schema inference for schema aware graphs, support for schema agnostic graphs,
 * and rejection of ambiguous or malformed operation graphs.
 */
class ExecutionPlanPersistentModelBuilderSpec extends AnyFlatSpec with Matchers {

  /** Runs the schema inference under test on the given operations. */
  private def inferSchemas(plan: Seq[DataOperation]) =
    ExecutionPlanPersistentModelBuilder.getSchemaInfos(plan)

  behavior of "ExecutionPlanPersistentModelBuilder"
  behavior of "getSchemaInfos"

  it should "infer missing schemas" in {
    // 1[b, c] -> 2 -> 3 -> 4[a, b]
    val plan = Seq(
      DataOperation(id = "1", childIds = Seq("2"), output = Seq("b", "c")),
      DataOperation(id = "2", childIds = Seq("3")),
      DataOperation(id = "3", childIds = Seq("4")),
      DataOperation(id = "4", childIds = Nil, output = Seq("a", "b"))
    )

    // operations #2 and #3 define no output, so they take over the schema of op #4
    val schemaOfOp4 = SchemaInfo("4", Seq("a", "b"), Set("a", "b"))
    val expected = Map(
      "1" -> SchemaInfo("1", Seq("b", "c"), Set("c")),
      "2" -> schemaOfOp4,
      "3" -> schemaOfOp4,
      "4" -> schemaOfOp4
    )

    inferSchemas(plan) shouldEqual expected
  }

  it should "infer schema for union-like operations" in {
    //     /-> 2 -\
    // 1 -|        |-> 4[a, b]
    //     \-> 3 -/
    val plan = Seq(
      DataOperation(id = "1", childIds = Seq("2", "3")),
      DataOperation(id = "2", childIds = Seq("4")),
      DataOperation(id = "3", childIds = Seq("4")),
      DataOperation(id = "4", childIds = Nil, output = Seq("a", "b"))
    )

    // every operation ends up with the schema defined by op #4
    val schemaOfOp4 = SchemaInfo("4", Seq("a", "b"), Set("a", "b"))
    val expected = Map(
      "1" -> schemaOfOp4,
      "2" -> schemaOfOp4,
      "3" -> schemaOfOp4,
      "4" -> schemaOfOp4
    )

    inferSchemas(plan) shouldEqual expected
  }

  it should "fail with ambiguity error on join-like operations" in {
    //     /-> 2[a]
    // 1 -|
    //     \-> 3[b]
    val plan = Seq(
      DataOperation(id = "1", childIds = Seq("2", "3"), output = Nil),
      DataOperation(id = "2", childIds = Nil, output = Seq("a")),
      DataOperation(id = "3", childIds = Nil, output = Seq("b"))
    )

    val thrown = the[InconsistentEntityException] thrownBy {
      inferSchemas(plan)
    }
    thrown should have message "Inconsistent entity: Cannot infer schema for operation #1: the input schema is ambiguous"
  }

  it should "support schema/attribute agnostic operations" in {
    //     /-> 2 -\        /-> 5
    // 1 -|        |-> 4 -|
    //     \-> 3 -/        \-> 6
    val plan = Seq(
      DataOperation(id = "1", childIds = Seq("2", "3")),
      DataOperation(id = "2", childIds = Seq("4")),
      DataOperation(id = "3", childIds = Seq("4")),
      DataOperation(id = "4", childIds = Seq("5", "6")),
      DataOperation(id = "5", childIds = Nil),
      DataOperation(id = "6", childIds = Nil)
    )

    inferSchemas(plan) shouldBe empty
  }

  it should "complain about incorrect graphs" in {
    //     /-> 2 -> ?4?
    // 1 -|
    //     \-> 3 -> ?4?
    // operation #4 is referenced but not defined, so the graph has no terminal operation
    val plan = Seq(
      DataOperation(id = "1", childIds = Seq("2", "3")),
      DataOperation(id = "2", childIds = Seq("4")),
      DataOperation(id = "3", childIds = Seq("4"))
    )

    val thrown = the[InconsistentEntityException] thrownBy {
      inferSchemas(plan)
    }
    thrown should have message "Inconsistent entity: Operation DAG must have terminal nodes"
  }

  it should "complain about cycles" in {
    // 1 <-> 2
    val plan = Seq(
      DataOperation(id = "1", childIds = Seq("2")),
      DataOperation(id = "2", childIds = Seq("1"))
    )

    val thrown = the[InconsistentEntityException] thrownBy {
      inferSchemas(plan)
    }
    thrown should have message "Inconsistent entity: Operation DAG must have terminal nodes"
  }
}

0 comments on commit 2849c37

Please sign in to comment.