spline: #605 MergeIntoNodeBuilder: java.lang.IllegalArgumentException (PR #606)

Merged
@@ -22,13 +22,10 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.{ExternalRDD, LeafExecNode, LogicalRDD, SparkPlan}
import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
import za.co.absa.commons.CollectionImplicits._
import za.co.absa.commons.graph.GraphImplicits._
import za.co.absa.commons.lang.CachingConverter
import za.co.absa.commons.reflect.ReflectionUtils
import za.co.absa.commons.reflect.ReflectionUtils.extractValue
import za.co.absa.commons.reflect.extractors.SafeTypeMatchingExtractor
import za.co.absa.spline.agent.SplineAgent.FuncName
import za.co.absa.spline.harvester.LineageHarvester._
import za.co.absa.spline.harvester.ModelConstants.{AppMetaInfo, ExecutionEventExtra, ExecutionPlanExtra}
@@ -38,7 +35,6 @@ import za.co.absa.spline.harvester.builder.write.{WriteCommand, WriteCommandExtr
import za.co.absa.spline.harvester.converter.DataTypeConverter
import za.co.absa.spline.harvester.iwd.IgnoredWriteDetectionStrategy
import za.co.absa.spline.harvester.logging.ObjectStructureLogging
import za.co.absa.spline.harvester.plugin.embedded.DeltaPlugin.{`_: MergeIntoCommandEdge`, `_: MergeIntoCommand`}
import za.co.absa.spline.harvester.postprocessing.PostProcessor
import za.co.absa.spline.producer.model._

@@ -174,7 +170,7 @@ class LineageHarvester(

if (maybeExistingBuilder.isEmpty) {

val newNodesToProcess = extractChildren(curOpNode)
val newNodesToProcess = opNodeBuilderFactory.nodeChildren(curOpNode)

traverseAndCollect(
curBuilder +: accBuilders,
@@ -196,38 +192,16 @@
.map(rc => opNodeBuilderFactory.readNodeBuilder(rc, por))
.getOrElse(opNodeBuilderFactory.genericNodeBuilder(por))

private def extractChildren(por: PlanOrRdd): Seq[PlanOrRdd] = por match {
case PlanWrap(plan) =>
plan match {
case AnalysisBarrierExtractor(_) =>
// special handling - spark 2.3 sometimes includes AnalysisBarrier in the plan
val child = ReflectionUtils.extractValue[LogicalPlan](plan, "child")
Seq(PlanWrap(child))
case erdd: ExternalRDD[_] =>
Seq(RddWrap(erdd.rdd))
case lrdd: LogicalRDD =>
Seq(RddWrap(lrdd.rdd))
case `_: MergeIntoCommand`(command) =>
val target = extractValue[LogicalPlan](command, "target")
val source = extractValue[LogicalPlan](command, "source")
Seq(PlanWrap(source), PlanWrap(target))
case `_: MergeIntoCommandEdge`(command) =>
val target = extractValue[LogicalPlan](command, "target")
val source = extractValue[LogicalPlan](command, "source")
Seq(PlanWrap(source), PlanWrap(target))
case _ => plan.children.map(PlanWrap)
}
case RddWrap(rdd) =>
rdd.dependencies.map(dep => RddWrap(dep.rdd))
}
}

object LineageHarvester {

import za.co.absa.commons.version.Version

trait PlanOrRdd

case class PlanWrap(plan: LogicalPlan) extends PlanOrRdd

case class RddWrap(rdd: RDD[_]) extends PlanOrRdd

val SparkVersionInfo: NameAndVersion = NameAndVersion(
@@ -268,7 +242,4 @@ object LineageHarvester {
(cumulatedReadMetrics, getNodeMetrics(executedPlan))
}

object AnalysisBarrierExtractor extends SafeTypeMatchingExtractor[LogicalPlan](
"org.apache.spark.sql.catalyst.plans.logical.AnalysisBarrier")

}
@@ -18,6 +18,7 @@ package za.co.absa.spline.harvester.builder

import org.apache.spark.sql.catalyst.{expressions => sparkExprssions}
import za.co.absa.spline.harvester.IdGeneratorsBundle
import za.co.absa.spline.harvester.builder.OperationNodeBuilder.Attributes
import za.co.absa.spline.harvester.builder.plan.PlanOperationNodeBuilder.OperationId
import za.co.absa.spline.producer.model.{Attribute, FunctionalExpression, Literal}

@@ -33,10 +34,10 @@ trait OperationNodeBuilder {
def addChild(childBuilder: OperationNodeBuilder): Unit = childBuilders :+= childBuilder
protected def resolveAttributeChild(attribute: sparkExprssions.Attribute): Option[sparkExprssions.Expression] = None

protected def inputAttributes: Seq[Seq[Attribute]] = childBuilders.map(_.outputAttributes)
protected def inputAttributes: Seq[Attributes] = childBuilders.map(_.outputAttributes)
protected def idGenerators: IdGeneratorsBundle

def outputAttributes: Seq[Attribute]
def outputAttributes: Attributes

def childIds: Seq[OperationId] = childBuilders.map(_.operationId)

@@ -46,3 +47,7 @@ trait OperationNodeBuilder {

def outputExprToAttMap: Map[sparkExprssions.ExprId, Attribute]
}

object OperationNodeBuilder {
type Attributes = Seq[Attribute]
Contributor: I feel like this will only decrease readability. I don't see any problems with using Seq[Attribute] as a type.

Contributor (author): It was primarily introduced to get rid of Seq[Seq[Attribute]] in other places.

}
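
For readers following the thread above: the nested sequence exists because each child builder contributes one attribute list, and builders such as UnionNodeBuilder then transpose it to pair up positionally matching attributes across children. A minimal, self-contained sketch of that shape (simplified stand-in types, not the project's real model classes):

object AttributesAliasSketch extends App {
  // Stand-in for za.co.absa.spline.producer.model.Attribute, reduced to a name for brevity.
  final case class Attribute(name: String)
  type Attributes = Seq[Attribute] // the alias introduced in this PR

  // One Attributes list per child operation, e.g. the two sides of a UNION.
  val inputAttributes: Seq[Attributes] = Seq(
    Seq(Attribute("id"), Attribute("name")), // output of child 1
    Seq(Attribute("id"), Attribute("name"))  // output of child 2
  )

  // Transposing groups the positionally matching attributes across children,
  // which is what UnionNodeBuilder does to compute its unionInputs.
  val unionInputs: Seq[Attributes] = inputAttributes.transpose
  println(unionInputs)
  // List(List(Attribute(id), Attribute(id)), List(Attribute(name), Attribute(name)))
}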
@@ -18,11 +18,15 @@ package za.co.absa.spline.harvester.builder

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.{ExternalRDD, LogicalRDD}
import za.co.absa.commons.reflect.ReflectionUtils
import za.co.absa.commons.reflect.extractors.SafeTypeMatchingExtractor
import za.co.absa.spline.harvester.IdGeneratorsBundle
import za.co.absa.spline.harvester.LineageHarvester.{PlanOrRdd, PlanWrap, RddWrap}
import za.co.absa.spline.harvester.builder.OperationNodeBuilderFactory.AnalysisBarrierExtractor
import za.co.absa.spline.harvester.builder.plan._
import za.co.absa.spline.harvester.builder.plan.read.ReadNodeBuilder
import za.co.absa.spline.harvester.builder.plan.write.WriteNodeBuilder
import za.co.absa.spline.harvester.builder.plan._
import za.co.absa.spline.harvester.builder.rdd.GenericRddNodeBuilder
import za.co.absa.spline.harvester.builder.rdd.read.RddReadNodeBuilder
import za.co.absa.spline.harvester.builder.read.ReadCommand
@@ -50,6 +54,27 @@ class OperationNodeBuilderFactory(
case RddWrap(rdd) => genericRddNodeBuilder(rdd)
}

def nodeChildren(por: PlanOrRdd): Seq[PlanOrRdd] = por match {
case PlanWrap(plan) =>
plan match {
case AnalysisBarrierExtractor(_) =>
// special handling - spark 2.3 sometimes includes AnalysisBarrier in the plan
val child = ReflectionUtils.extractValue[LogicalPlan](plan, "child")
Seq(PlanWrap(child))
case erdd: ExternalRDD[_] =>
Seq(RddWrap(erdd.rdd))
case lrdd: LogicalRDD =>
Seq(RddWrap(lrdd.rdd))
case `_: MergeIntoCommand`(command) =>
MergeIntoNodeBuilder.extractChildren(command).map(PlanWrap)
case `_: MergeIntoCommandEdge`(command) =>
MergeIntoNodeBuilder.extractChildren(command).map(PlanWrap)
case _ => plan.children.map(PlanWrap)
}
case RddWrap(rdd) =>
rdd.dependencies.map(dep => RddWrap(dep.rdd))
}

private def genericPlanNodeBuilder(lp: LogicalPlan): OperationNodeBuilder = lp match {
case p: Project => new ProjectNodeBuilder(p)(idGenerators, dataTypeConverter, dataConverter, postProcessor)
case u: Union => new UnionNodeBuilder(u)(idGenerators, dataTypeConverter, dataConverter, postProcessor)
@@ -66,3 +91,8 @@
case _ => new GenericRddNodeBuilder(rdd)(idGenerators, postProcessor)
}
}

object OperationNodeBuilderFactory {
object AnalysisBarrierExtractor extends SafeTypeMatchingExtractor[LogicalPlan](
"org.apache.spark.sql.catalyst.plans.logical.AnalysisBarrier")
}
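
A note on the companion object above: AnalysisBarrierExtractor, like the `_: MergeIntoCommand` and `_: MergeIntoCommandEdge` extractors used in nodeChildren, matches plan nodes by fully qualified class name, so the agent can handle AnalysisBarrier and Delta Lake commands without needing those classes on its compile-time classpath. A rough, self-contained sketch of that pattern (illustrative only, not the real za.co.absa.commons SafeTypeMatchingExtractor; the Delta class name below is an assumption):

// Matches an object by the runtime name of its class, so the target class
// is never a compile-time dependency of this code.
class ByClassNameExtractor[T](fqcn: String) {
  def unapply(obj: Any): Option[T] =
    if (obj != null && obj.getClass.getName == fqcn) Some(obj.asInstanceOf[T])
    else None
}

object ByClassNameExtractorDemo extends App {
  // Hypothetical stand-in for Delta's MergeIntoCommand class name.
  val MergeIntoCommandByName =
    new ByClassNameExtractor[AnyRef]("org.apache.spark.sql.delta.commands.MergeIntoCommand")

  def describe(node: AnyRef): String = node match {
    case MergeIntoCommandByName(cmd) => s"Delta MERGE command: ${cmd.getClass.getName}"
    case other => s"other node: ${other.getClass.getSimpleName}"
  }

  println(describe("not a merge command")) // falls through to the generic case
}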
@@ -23,8 +23,8 @@ import za.co.absa.spline.harvester.converter.{DataConverter, DataTypeConverter}
import za.co.absa.spline.harvester.postprocessing.PostProcessor

class GenerateNodeBuilder
(override val logicalPlan: Generate)
(override val idGenerators: IdGeneratorsBundle, dataTypeConverter: DataTypeConverter, dataConverter: DataConverter, postProcessor: PostProcessor)
(logicalPlan: Generate)
(idGenerators: IdGeneratorsBundle, dataTypeConverter: DataTypeConverter, dataConverter: DataConverter, postProcessor: PostProcessor)
extends GenericPlanNodeBuilder(logicalPlan)(idGenerators, dataTypeConverter, dataConverter, postProcessor) {

override def resolveAttributeChild(attribute: Attribute): Option[Expression] =
@@ -16,56 +16,94 @@

package za.co.absa.spline.harvester.builder.plan

import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import za.co.absa.commons.reflect.ReflectionUtils.extractValue
import za.co.absa.spline.harvester.IdGeneratorsBundle
import za.co.absa.spline.harvester.ModelConstants.CommonExtras
import za.co.absa.spline.harvester.builder.plan.MergeIntoNodeBuilder._
import za.co.absa.spline.harvester.converter.{DataConverter, DataTypeConverter}
import za.co.absa.spline.harvester.postprocessing.PostProcessor
import za.co.absa.spline.producer.model.{AttrRef, Attribute, DataOperation, FunctionalExpression}
import za.co.absa.spline.producer.model._

class MergeIntoNodeBuilder
(override val logicalPlan: LogicalPlan)
(logicalPlan: LogicalPlan)
(idGenerators: IdGeneratorsBundle, dataTypeConverter: DataTypeConverter, dataConverter: DataConverter, postProcessor: PostProcessor)
extends GenericPlanNodeBuilder(logicalPlan)(idGenerators, dataTypeConverter, dataConverter, postProcessor) {

private lazy val mergeInputs: Seq[Seq[Attribute]] = inputAttributes.transpose

override lazy val functionalExpressions: Seq[FunctionalExpression] = Seq.empty

override lazy val outputAttributes: Seq[Attribute] =
mergeInputs.map(constructMergeAttribute)

private def constructMergeAttribute(attributes: Seq[Attribute]) = {
val attr1 = attributes.head
val idRefs = attributes.map(a => AttrRef(a.id))
Attribute(
id = idGenerators.attributeIdGenerator.nextId(),
dataType = attr1.dataType,
childRefs = idRefs,
extra = Map(CommonExtras.Synthetic -> true),
name = attr1.name
)
override lazy val outputAttributes: Seq[Attribute] = {
Contributor: Will this method work when the input attributes are synthetic? I think in that case the proper way is to take inputAttributes and use their ids, because they may differ from the Spark ones.

val target: LogicalPlan = extractTarget(logicalPlan)
val trgAttrs: Seq[Attribute] = target.output.map(attributeConverter.convert)

val dependenciesByAttrName: Map[String, Seq[AttrOrExprRef]] =
(extractMatchedClauses(logicalPlan) ++ extractNonMatchedClauses(logicalPlan))
.flatMap(extractClauseActions)
.map((a: DeltaMergeAction) => {
val trgAttrName = extractActionTargetAttrName(a)
val srcExpr = extractActionSourceExpression(a)
trgAttrName -> exprToRefConverter.convert(srcExpr)
})
.groupBy { case (attrName, _) => attrName }
.mapValues(_.map({ case (_, ref) => ref }).distinct)

val outAttrs = trgAttrs.map(trgAttr => {
val targetRef = AttrRef(trgAttr.id)
val sourceRefs: Seq[AttrOrExprRef] = dependenciesByAttrName(trgAttr.name)
Attribute(
id = idGenerators.attributeIdGenerator.nextId(),
dataType = trgAttr.dataType,
childRefs = (targetRef +: sourceRefs).distinct,
extra = Map(CommonExtras.Synthetic -> true),
name = trgAttr.name
)
})

outAttrs
}

override def build(): DataOperation = {

val condition = extractValue[Any](logicalPlan, "condition").toString
val matchedClauses = extractValue[Seq[Any]](logicalPlan, "matchedClauses").map(_.toString)
val notMatchedClauses = extractValue[Seq[Any]](logicalPlan, "notMatchedClauses").map(_.toString)
val conditionStr = extractCondition(logicalPlan).toString
val matchedClausesStr = extractMatchedClauses(logicalPlan).map(_.toString)
val notMatchedClausesStr = extractNonMatchedClauses(logicalPlan).map(_.toString)

val dop = DataOperation(
id = operationId,
name = logicalPlan.nodeName,
childIds = childIds,
output = outputAttributes.map(_.id),
params = Map(
"condition" -> condition,
"matchedClauses" -> matchedClauses,
"notMatchedClauses" -> notMatchedClauses),
"condition" -> conditionStr,
"matchedClauses" -> matchedClausesStr,
"notMatchedClauses" -> notMatchedClausesStr),
extra = Map.empty
)

postProcessor.process(dop)
}
}

object MergeIntoNodeBuilder {

type DeltaMergeIntoClause = SparkExpression
type DeltaMergeAction = SparkExpression

def extractChildren(mergeNode: LogicalPlan): Seq[LogicalPlan] = Seq(extractSource(mergeNode), extractTarget(mergeNode))

private def extractSource(mergeNode: LogicalPlan): LogicalPlan = extractValue[LogicalPlan](mergeNode, "source")

private def extractTarget(mergeNode: LogicalPlan): LogicalPlan = extractValue[LogicalPlan](mergeNode, "target")

private def extractCondition(mergeNode: LogicalPlan): SparkExpression = extractValue[SparkExpression](mergeNode, "condition")

private def extractMatchedClauses(mergeNode: LogicalPlan): Seq[DeltaMergeIntoClause] = extractValue[Seq[DeltaMergeIntoClause]](mergeNode, "matchedClauses")

private def extractNonMatchedClauses(mergeNode: LogicalPlan): Seq[DeltaMergeIntoClause] = extractValue[Seq[DeltaMergeIntoClause]](mergeNode, "notMatchedClauses")

private def extractClauseActions(clause: DeltaMergeIntoClause): Seq[DeltaMergeAction] = extractValue[Seq[DeltaMergeAction]](clause, "actions")

private def extractActionTargetAttrName(action: DeltaMergeAction): String = extractValue[Seq[String]](action, "targetColNameParts").head

private def extractActionSourceExpression(action: DeltaMergeAction): SparkExpression = extractValue[SparkExpression](action, "expr")
}
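
To make the new outputAttributes logic concrete: for a merge like the one in DeltaMergeDSV2Job (WHEN MATCHED UPDATE SET name = src.name, WHEN NOT MATCHED INSERT (id, name) VALUES (src.id, src.name)), the clause actions collapse into a per-target-column dependency map, and each synthetic output attribute then references the target attribute plus those source references. A simplified sketch with plain strings standing in for the converted AttrRef/ExprRef values (assumed shapes, not the actual converter output):

object MergeDependencySketch extends App {
  // One (target column name, source reference) pair per DeltaMergeAction,
  // collected across all matched and not-matched clauses of the MERGE.
  val actionDeps: Seq[(String, String)] = Seq(
    "name" -> "ref(src.name)", // WHEN MATCHED ... UPDATE SET name = src.name
    "id"   -> "ref(src.id)",   // WHEN NOT MATCHED ... INSERT (id, name) VALUES (src.id, src.name)
    "name" -> "ref(src.name)"
  )

  // Mirrors the groupBy / mapValues / distinct step in MergeIntoNodeBuilder.outputAttributes.
  val dependenciesByAttrName: Map[String, Seq[String]] =
    actionDeps
      .groupBy { case (attrName, _) => attrName }
      .mapValues(_.map { case (_, ref) => ref }.distinct)
      .toMap

  // Each synthetic output attribute references its target attribute plus the collected sources.
  // getOrElse keeps the sketch total for target columns untouched by the MERGE.
  val targetColumns = Seq("id", "name")
  val childRefsByOutput = targetColumns.map { col =>
    col -> (s"ref(target.$col)" +: dependenciesByAttrName.getOrElse(col, Nil)).distinct
  }
  childRefsByOutput.foreach(println)
  // (id,List(ref(target.id), ref(src.id)))
  // (name,List(ref(target.name), ref(src.name)))
}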
@@ -21,17 +21,18 @@ import org.apache.spark.sql.catalyst.{expressions => sparkExprssions}
import za.co.absa.commons.lang.extensions.NonOptionExtension._
import za.co.absa.spline.harvester.IdGeneratorsBundle
import za.co.absa.spline.harvester.ModelConstants.CommonExtras
import za.co.absa.spline.harvester.builder.OperationNodeBuilder.Attributes
import za.co.absa.spline.harvester.builder.plan.UnionNodeBuilder.Names
import za.co.absa.spline.harvester.converter.{DataConverter, DataTypeConverter}
import za.co.absa.spline.harvester.postprocessing.PostProcessor
import za.co.absa.spline.producer.model.{AttrRef, Attribute, ExprRef, FunctionalExpression}

class UnionNodeBuilder
(override val logicalPlan: Union)
(logicalPlan: Union)
(idGenerators: IdGeneratorsBundle, dataTypeConverter: DataTypeConverter, dataConverter: DataConverter, postProcessor: PostProcessor)
extends GenericPlanNodeBuilder(logicalPlan)(idGenerators, dataTypeConverter, dataConverter, postProcessor) {

private lazy val unionInputs: Seq[Seq[Attribute]] = inputAttributes.transpose
private lazy val unionInputs: Seq[Attributes] = inputAttributes.transpose

override lazy val functionalExpressions: Seq[FunctionalExpression] =
unionInputs
@@ -35,18 +35,18 @@ object DeltaMergeDSV2Job extends SparkApp(
// Initializing library to hook up to Apache Spark
spark.enableLineageTracking()

spark.sql("CREATE DATABASE dsv2 LOCATION '$path'")
spark.sql(s"CREATE DATABASE dsv2 LOCATION '$path'")

spark.sql("CREATE TABLE dsv2.foo ( ID int, NAME string ) USING DELTA")
spark.sql("INSERT INTO dsv2.foo VALUES (1014, 'Warsaw'), (1002, 'Corte')")
spark.sql("CREATE TABLE dsv2.foo ( id INT, code STRING, name STRING ) USING DELTA")
spark.sql("INSERT INTO dsv2.foo VALUES (1014, 'PLN', 'Warsaw'), (1002, 'FRA', 'Corte')")

spark.sql("CREATE TABLE dsv2.fooUpdate ( ID Int, NAME String ) USING DELTA")
spark.sql("CREATE TABLE dsv2.fooUpdate ( id INT, name STRING ) USING DELTA")
spark.sql("INSERT INTO dsv2.fooUpdate VALUES (1014, 'Lodz'), (1003, 'Prague')")

spark.sql("CREATE TABLE dsv2.barUpdate ( ID Int, NAME String ) USING DELTA")
spark.sql("CREATE TABLE dsv2.barUpdate ( id INT, name STRING ) USING DELTA")
spark.sql("INSERT INTO dsv2.barUpdate VALUES (4242, 'Paris'), (3342, 'Bordeaux')")

spark.sql("UPDATE dsv2.foo SET NAME = 'Korok' WHERE ID == 1002")
spark.sql("UPDATE dsv2.foo SET name = 'Korok' WHERE id == 1002")

spark.sql(
"""
@@ -59,17 +59,19 @@

spark.sql(
"""
| MERGE INTO dsv2.foo
| USING tempview AS foobar
| ON dsv2.foo.ID = foobar.ID
| MERGE INTO dsv2.foo AS dst
| USING tempview AS src
| ON dst.id = src.id
| WHEN MATCHED THEN
| UPDATE SET
| NAME = foobar.NAME
| NAME = src.name
| WHEN NOT MATCHED
| THEN INSERT (ID, NAME)
| VALUES (foobar.ID, foobar.NAME)
| THEN INSERT (id, name)
| VALUES (src.id, src.name)
|""".stripMargin
)
).show

spark.read.table("dsv2.foo").show()

spark.sql("DELETE FROM dsv2.foo WHERE ID == 1014")
}