spline-132 JDBC write support
lukasnalezenec committed Apr 16, 2019
1 parent e15b466 commit 7a3a549
Showing 14 changed files with 415 additions and 143 deletions.
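This commit teaches Spline's harvester to recognise DataFrame writes that go through the JDBC writer (SaveJDBCCommand) and to emit a BatchWrite operation for them. A minimal sketch of the kind of job this targets, assuming a Spline-enabled session; the master, URL and table name below are illustrative, not taken from the commit:

import java.util.Properties

import org.apache.spark.sql.{SaveMode, SparkSession}
import za.co.absa.spline.harvester.SparkLineageInitializer._

object JdbcWriteLineageExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("jdbc-write-lineage").master("local[*]").getOrCreate()
    spark.enableLineageTracking()   // Spline starts listening for executed write commands

    val df = spark.range(3).toDF("id")

    // With this change, the SaveJDBCCommand produced by this write is parsed
    // and a BatchWrite operation ends up in the captured lineage graph.
    df.write
      .mode(SaveMode.Overwrite)
      .jdbc("jdbc:derby:memory:exampleDb;create=true", "exampleTable", new Properties())

    spark.stop()
  }
}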
@@ -36,6 +36,7 @@ class DataLineageBuilder(logicalPlan: LogicalPlan, executedPlanOpt: Option[Spark
private val writeCommandParser = writeCommandParserFactory.writeParser()
private val clusterUrl: Option[String] = sparkContext.getConf.getOption("spark.master")
private val tableCommandParser = writeCommandParserFactory.saveAsTableParser(clusterUrl)
private val jdbcCommandParser = writeCommandParserFactory.jdbcParser()

def buildLineage(): Option[DataLineage] = {
val builders = getOperations(logicalPlan)
@@ -82,19 +83,19 @@ class DataLineageBuilder(logicalPlan: LogicalPlan, executedPlanOpt: Option[Spark

if (maybeExistingBuilder.isEmpty) {

// Try to find all possible write commands to traverse: save to filesystem, saveAsTable, JDBC
val writes = writeCommandParser.
asWriteCommandIfPossible(curOpNode).
map(wc => Seq(wc.query)).
getOrElse(Nil)
val parsers = Array(jdbcCommandParser, writeCommandParser, tableCommandParser)

val tables = tableCommandParser.
asWriteCommandIfPossible(curOpNode).
map(wc => Seq(wc.query)).
getOrElse(Nil)
val maybePlan: Option[LogicalPlan] = parsers.
map(_.asWriteCommandIfPossible(curOpNode)).
collectFirst {
case Some(wc) => wc.query
}

var newNodesToProcess: Seq[LogicalPlan] = writes ++ tables
if (newNodesToProcess.isEmpty) newNodesToProcess = curOpNode.children
val newNodesToProcess: Seq[LogicalPlan] =
maybePlan match {
case Some(q) => Seq(q)
case None => curOpNode.children
}

traverseAndCollect(
curBuilder +: accBuilders,
@@ -122,19 +123,23 @@ class DataLineageBuilder(logicalPlan: LogicalPlan, executedPlanOpt: Option[Spark
case a: SubqueryAlias => new AliasNodeBuilder(a)
case lr: LogicalRelation => new BatchReadNodeBuilder(lr) with HDFSAwareBuilder
case StreamingRelationVersionAgnostic(dataSourceInfo) => new StreamReadNodeBuilder(op)
case wc if jdbcCommandParser.matches(op) =>
val (readMetrics: Metrics, writeMetrics: Metrics) = getMetrics()
val tableCmd = jdbcCommandParser.asWriteCommand(wc).asInstanceOf[SaveJDBCCommand]
new SaveJDBCCommandNodeBuilder(tableCmd, writeMetrics, readMetrics)
case wc if writeCommandParser.matches(op) =>
val (readMetrics: Metrics, writeMetrics: Metrics) = makeMetrics()
val (readMetrics: Metrics, writeMetrics: Metrics) = getMetrics()
val writeCmd = writeCommandParser.asWriteCommand(wc).asInstanceOf[WriteCommand]
new BatchWriteNodeBuilder(writeCmd, writeMetrics, readMetrics) with HDFSAwareBuilder
case wc if tableCommandParser.matches(op) =>
val (readMetrics: Metrics, writeMetrics: Metrics) = makeMetrics()
val (readMetrics: Metrics, writeMetrics: Metrics) = getMetrics()
val tableCmd = tableCommandParser.asWriteCommand(wc).asInstanceOf[SaveAsTableCommand]
new SaveAsTableNodeBuilder(tableCmd, writeMetrics, readMetrics)
case x => new GenericNodeBuilder(x)
}
}

private def makeMetrics(): (Metrics, Metrics) = {
private def getMetrics(): (Metrics, Metrics) = {
executedPlanOpt.
map(getExecutedReadWriteMetrics).
getOrElse((Map.empty, Map.empty))
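The traversal change above replaces the separate per-parser lookups (writes/tables) with a single chain: all three parsers are tried against the current node and the first recognised write command wins, otherwise traversal falls back to the node's children. A simplified, self-contained sketch of that collectFirst pattern; the Parser trait and string "nodes" below stand in for the real WriteCommandParser[LogicalPlan]:

// Stand-in types: the real code uses WriteCommandParser[LogicalPlan] and returns the command's query plan.
trait Parser {
  def asWriteCommandIfPossible(node: String): Option[String]
}

object ParserChainSketch extends App {
  val jdbcParser  = new Parser { def asWriteCommandIfPossible(n: String) = if (n.startsWith("jdbc:")) Some(n) else None }
  val fsParser    = new Parser { def asWriteCommandIfPossible(n: String) = if (n.startsWith("fs:")) Some(n) else None }
  val tableParser = new Parser { def asWriteCommandIfPossible(n: String) = if (n.startsWith("table:")) Some(n) else None }

  val parsers = Array(jdbcParser, fsParser, tableParser)

  // Note: map on an Array is strict, so every parser is asked; collectFirst then picks the first defined result.
  def firstWriteCommand(node: String): Option[String] =
    parsers.map(_.asWriteCommandIfPossible(node)).collectFirst { case Some(cmd) => cmd }

  println(firstWriteCommand("jdbc:insert into t"))  // Some(jdbc:insert into t)
  println(firstWriteCommand("project"))             // None -> caller falls back to the node's children
}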
@@ -24,13 +24,11 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.{JDBCRelation, SaveMode}
import za.co.absa.spline.sparkadapterapi.WriteCommand
import za.co.absa.spline.sparkadapterapi.{DataSourceInfo, SaveAsTableCommand, SaveJDBCCommand, WriteCommand}
import za.co.absa.spline.model.endpoint._
import za.co.absa.spline.model.{op, _}
import za.co.absa.spline.sparkadapterapi.{DataSourceInfo, SaveAsTableCommand}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import za.co.absa.spline.sparkadapterapi.StreamingRelationAdapter.instance.extractDataSourceInfo
import za.co.absa.spline.sparkadapterapi.{DataSourceInfo, WriteCommand}

sealed trait OperationNodeBuilder {

@@ -186,6 +184,27 @@ class SaveAsTableNodeBuilder
}
}

class SaveJDBCCommandNodeBuilder
(val operation: SaveJDBCCommand, val writeMetrics: Map[String, Long], val readMetrics: Map[String, Long])
(implicit val componentCreatorFactory: ComponentCreatorFactory)
extends OperationNodeBuilder with RootNode {

override val output: AttrGroup = new AttrGroup(operation.query.output)

override def build() = op.BatchWrite(
operationProps,
operation.format,
operation.tableName,
append = operation.mode == SaveMode.Append,
writeMetrics = writeMetrics,
readMetrics = readMetrics
)

override def ignoreLineageWrite:Boolean = {
false
}
}

trait RootNode {
def ignoreLineageWrite:Boolean
}
@@ -58,12 +58,16 @@ object DataLineageBuilderTest extends MockitoSugar {
private def lineageBuilderFor(df: DataFrame)(implicit sparkContext: SparkContext): DataLineageBuilder = {
val plan = df.queryExecution.analyzed
val mockWriteCommandParser = mock[WriteCommandParser[LogicalPlan]]
val mockJdbcCommandParser = mock[WriteCommandParser[LogicalPlan]]

val factory = mock[WriteCommandParserFactory]

when(mockWriteCommandParser asWriteCommandIfPossible any()) thenReturn None
when(mockJdbcCommandParser asWriteCommandIfPossible any()) thenReturn None

when(factory writeParser()) thenReturn mockWriteCommandParser
when(factory saveAsTableParser(any())) thenReturn mockWriteCommandParser
when(factory jdbcParser()) thenReturn mockJdbcCommandParser

new DataLineageBuilder(plan, None, sparkContext)(mock[Configuration], factory)
}
17 changes: 17 additions & 0 deletions integration-tests/pom.xml
@@ -64,6 +64,16 @@
<artifactId>migrator-tool</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>za.co.absa.spline.shadow</groupId>
<artifactId>persistence</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>za.co.absa.spline.shadow</groupId>
<artifactId>migrator-tool</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.compat.version}</artifactId>
@@ -72,6 +82,13 @@
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.derby/derbyclient -->
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.14.2.0</version>
</dependency>

</dependencies>

<build>
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spline.fixture

import java.sql.{Connection, DriverManager, ResultSet}

import org.scalactic.source.Position
import org.scalatest.{BeforeAndAfter, Suite}
import za.co.absa.spline.common.TempDirectory

/**
* Runs and wraps an embedded Apache Derby DB.
*/
trait DerbyDatabaseFixture extends BeforeAndAfter{

this: Suite =>

private val dbName = "splineTestDb"
val connectionString = s"jdbc:derby:memory:$dbName ;create=true"

var connection : Connection = null

private def execute(sql: String): ResultSet = {
val statement = connection.createStatement
statement.execute(sql)
statement.getResultSet
}

private def closeDatabase() : Unit = {
def closeCommand(cmd: String) = util.Try({DriverManager.getConnection(cmd)})

val connectionString = "jdbc:derby:memory:" + dbName
closeCommand(connectionString + ";drop=true")
closeCommand(connectionString + ";shutdown=true")
}

private def createTable(table: String): ResultSet = {
execute("Create table " + table + " (id int, name varchar(30))")
}

private def dropTable(table: String): ResultSet = {
execute("drop table " + table)
}

override protected def after(fun: => Any)(implicit pos: Position): Unit = try super.after(fun) finally closeDatabase()

override protected def before(fun: => Any)(implicit pos: Position): Unit = {
val tempPath = TempDirectory("derbyUnitTest", "database").deleteOnExit().path
System.setProperty("derby.system.home", tempPath.toString)
DriverManager.registerDriver(new org.apache.derby.jdbc.EmbeddedDriver)
Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
connection = DriverManager.getConnection(connectionString)
}
}
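For reference, a standalone sketch of the embedded-Derby lifecycle this fixture wraps; the database name is illustrative, and the org.apache.derby:derby dependency added to the integration-tests pom provides the embedded driver:

import java.sql.DriverManager
import scala.util.Try

object DerbyInMemorySketch extends App {
  DriverManager.registerDriver(new org.apache.derby.jdbc.EmbeddedDriver)

  val url = "jdbc:derby:memory:sketchDb"
  val connection = DriverManager.getConnection(url + ";create=true")   // roughly what before() does

  val statement = connection.createStatement()
  statement.execute("create table demo (id int, name varchar(30))")
  statement.execute("insert into demo values (1, 'spline')")

  val rs = statement.executeQuery("select name from demo where id = 1")
  rs.next()
  println(rs.getString(1))                                             // prints: spline

  // Roughly what closeDatabase() does: Derby signals a successful drop/shutdown with an SQLException, hence the Try.
  Try(DriverManager.getConnection(url + ";drop=true"))
  Try(DriverManager.getConnection(url + ";shutdown=true"))
}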



@@ -16,12 +16,13 @@

package za.co.absa.spline.fixture

import java.util.Properties
import java.{util => ju}

import com.mongodb.casbah.MongoDB
import com.mongodb.{DBCollection, DBObject}
import org.apache.commons.configuration.Configuration
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.bson.BSON
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
@@ -51,6 +52,10 @@ trait AbstractSplineFixture

AbstractSplineFixture.touch()

// def withNewSession[T >: AnyRef](testBody: SparkSession => T): T = {
// testBody(spark.newSession)
// }

abstract override protected def beforeAll(): Unit = {
import za.co.absa.spline.harvester.SparkLineageInitializer._
spark.enableLineageTracking()
@@ -79,13 +84,19 @@ trait AsyncSplineFixture extends AbstractSplineFixture with AsyncTestSuiteMixin
abstract override def withFixture(test: NoArgAsyncTest): FutureOutcome = exec {
super.withFixture(test)
}

def withSplineEnabled[T](session:SparkSession)(testBody: => T) = {
import za.co.absa.spline.harvester.SparkLineageInitializer._
session.enableLineageTracking()
testBody
}
}

object AbstractSplineFixture {

import scala.concurrent.{ExecutionContext, Future}

// System.getProperties.setProperty(PERSISTENCE_FACTORY, classOf[TestPersistenceFactory].getName)
//System.getProperties.setProperty(PERSISTENCE_FACTORY, classOf[TestPersistenceFactory].getName)

private var justCapturedLineage: DataLineage = _

@@ -121,6 +132,15 @@ object AbstractSplineFixture {
df.write.mode(mode).saveAsTable(tableName)
AbstractSplineFixture.justCapturedLineage
}

/** Writes the dataframe to a table via JDBC and returns the captured lineage. */
def jdbcLineage(connectionString:String,
tableName:String,
properties:Properties = new Properties(),
mode: SaveMode = SaveMode.ErrorIfExists): DataLineage = {
df.write.mode(mode).jdbc(connectionString, tableName, properties)
AbstractSplineFixture.justCapturedLineage
}
}

implicit class LineageComponentSizeVerifier(lineage: DataLineage)(implicit ec: ExecutionContext)
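A hedged usage sketch for the jdbcLineage helper added above: it assumes the implicit DataFrame wrapper defined in this fixture object is in scope, and that a Derby connection string (for instance the one exposed by DerbyDatabaseFixture) is available; the table name is illustrative.

import org.apache.spark.sql.{DataFrame, SaveMode}
import za.co.absa.spline.model.DataLineage

// Writes df to the (illustrative) table "lineageTest" over JDBC and returns the lineage Spline captured for that write.
def lineageOfJdbcWrite(df: DataFrame, connectionString: String): DataLineage =
  df.jdbcLineage(connectionString, "lineageTest", mode = SaveMode.Overwrite)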

This file was deleted.
