spline #331 Kafka source type support
wajda committed Sep 25, 2019
1 parent 2fd8f1d commit ed4154c
Showing 14 changed files with 212 additions and 221 deletions.
@@ -17,6 +17,8 @@
package za.co.absa.spline.common.extractors

class SafeTypeMatchingExtractor[T](lazyClass: => Class[T]) {
def this(className: String) = this(Class.forName(className).asInstanceOf[Class[T]])

private val classIfAvailable: Option[Class[_]] =
try Some(lazyClass)
catch {
@@ -35,8 +35,8 @@ class SafeTypeMatchingExtractorSpec extends FlatSpec with Matchers {
}

it should "unapply for missing types" in {
object A extends SafeTypeMatchingExtractor(throw new ClassNotFoundException("simulate missing type"))
object B extends SafeTypeMatchingExtractor(throw new NoClassDefFoundError("simulate missing type"))
object A extends SafeTypeMatchingExtractor("definitely.missing.type")
object B extends SafeTypeMatchingExtractor((throw new NoClassDefFoundError("simulate missing type")): Class[String])
A.unapply(new Object) should be(None)
B.unapply(new Object) should be(None)
}
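
Taken together, the two hunks above show the intended use of the new String-based constructor: an extractor can be declared against a fully qualified class name that may be missing at runtime, and unapply simply yields None when that class cannot be loaded. A minimal sketch under that assumption (the object and method names are illustrative, not part of the commit):

object SafeTypeMatchingExtractorExample {
  import za.co.absa.spline.common.extractors.SafeTypeMatchingExtractor

  // Declared against a class that may be absent at runtime
  // (e.g. when spark-sql-kafka-0-10 is not on the classpath).
  object MaybeKafkaRelation
    extends SafeTypeMatchingExtractor[AnyRef]("org.apache.spark.sql.kafka010.KafkaRelation")

  def describe(relation: AnyRef): String = relation match {
    case MaybeKafkaRelation(kr) => s"Kafka relation: $kr"             // class loaded and value is an instance of it
    case other => s"unsupported relation: ${other.getClass.getName}"  // class missing or type mismatch: no match
  }
}
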
23 changes: 23 additions & 0 deletions integration-tests/pom.xml
@@ -58,6 +58,29 @@
<artifactId>spark-hive_${scala.compat.version}</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.compat.version}</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.github.embeddedkafka</groupId>
<artifactId>embedded-kafka_${scala.compat.version}</artifactId>
<version>2.3.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.5</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
@@ -59,7 +59,7 @@ class JDBCWriteSpec extends FlatSpec
plan1.operations.write.outputSource shouldBe s"$jdbcConnectionString:$tableName"

plan2.operations.reads.head.inputSources.head shouldBe plan1.operations.write.outputSource
plan2.operations.reads.head.params("sourceType") shouldBe Some("JDBC")
plan2.operations.reads.head.params("sourceType") shouldBe Some("jdbc")
})
})
}
@@ -0,0 +1,94 @@
/*
* Copyright 2019 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spline

import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.spark.sql.SaveMode.Overwrite
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row}
import org.scalatest.{FlatSpec, Matchers}
import za.co.absa.spline.test.fixture.SparkFixture
import za.co.absa.spline.test.fixture.spline.SplineFixture

class KafkaSinkSpec
extends FlatSpec
with Matchers
with SparkFixture
with SplineFixture
with EmbeddedKafka {

it should "support Kafka as a write source" in {
withRunningKafkaOnFoundPort(EmbeddedKafkaConfig(0, 0)) { kafkaConfig =>
val topicName = "bananas"
val kafkaUrl = s"localhost:${kafkaConfig.kafkaPort}"

withNewSparkSession(spark => {
withLineageTracking(spark)(lineageCaptor => {
val testData: DataFrame = {
val schema = StructType(StructField("ID", IntegerType, nullable = false) :: StructField("NAME", StringType, nullable = false) :: Nil)
val rdd = spark.sparkContext.parallelize(Row(1014, "Warsaw") :: Row(1002, "Corte") :: Nil)
spark.sqlContext.createDataFrame(rdd, schema)
}

val (plan1, _) = lineageCaptor.lineageOf(testData
.selectExpr("CAST (NAME as STRING) as value")
.write
.format("kafka")
.option("kafka.bootstrap.servers", kafkaUrl)
.option("topic", topicName)
.save())

def reader = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", kafkaUrl)

val (plan2, _) = lineageCaptor.lineageOf(reader
.option("subscribe", s"$topicName,anotherTopic")
.load()
.write.mode(Overwrite).saveAsTable("somewhere"))

val (plan3, _) = lineageCaptor.lineageOf(reader
.option("subscribePattern", ".*")
.load()
.write.mode(Overwrite).saveAsTable("somewhere"))

val (plan4, _) = lineageCaptor.lineageOf(reader
.option("assign", s"""{"$topicName":[0]}""")
.load()
.write.mode(Overwrite).saveAsTable("somewhere"))

plan1.operations.write.append shouldBe false
plan1.operations.write.params("destinationType") shouldBe Some("kafka")
plan1.operations.write.outputSource shouldBe s"kafka:$topicName"

plan2.operations.reads.head.params("sourceType") shouldBe Some("kafka")
plan2.operations.reads.head.inputSources should contain(s"kafka:$topicName")
plan2.operations.reads.head.params should contain key "subscribe"

plan3.operations.reads.head.params("sourceType") shouldBe Some("kafka")
plan3.operations.reads.head.inputSources should contain(s"kafka:$topicName")
plan3.operations.reads.head.params should contain key "subscribepattern"

plan4.operations.reads.head.params("sourceType") shouldBe Some("kafka")
plan4.operations.reads.head.inputSources should contain(s"kafka:$topicName")
plan4.operations.reads.head.params should contain key "assign"
})
})
}
}
}
5 changes: 5 additions & 0 deletions parent/pom.xml
@@ -413,6 +413,11 @@
<artifactId>spark-hive_${scala.compat.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.compat.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-xml_${scala.compat.version}</artifactId>
5 changes: 5 additions & 0 deletions spark/agent/pom.xml
@@ -63,6 +63,11 @@
<artifactId>spark-hive_${scala.compat.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.compat.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-xml_${scala.compat.version}</artifactId>

This file was deleted.

@@ -19,4 +19,4 @@ package za.co.absa.spline.harvester.builder.read
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import za.co.absa.spline.harvester.builder.SourceIdentifier

case class ReadCommand(sourceIdentifier: SourceIdentifier, operation: LogicalPlan)
case class ReadCommand(sourceIdentifier: SourceIdentifier, operation: LogicalPlan, params: Map[String, Any] = Map.empty)
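
The new params field lets an extractor attach datasource-specific options (for Kafka, the raw reader options) to the command; the ReadNodeBuilder change further down merges them into the read operation's parameters. A hypothetical instance for the spec's subscribe scenario might look like this (all values are illustrative):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import za.co.absa.spline.harvester.builder.SourceIdentifier
import za.co.absa.spline.harvester.builder.read.ReadCommand

// Illustrative only: the shape of a command for a batch Kafka read with subscribe = "bananas".
def exampleKafkaReadCommand(plan: LogicalPlan): ReadCommand =
  ReadCommand(
    SourceIdentifier(Some("kafka"), Seq("kafka:bananas")),
    plan,
    Map(
      "kafka.bootstrap.servers" -> "localhost:6001",
      "subscribe" -> "bananas"))
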
@@ -16,47 +16,86 @@

package za.co.absa.spline.harvester.builder.read

import java.util.Properties

import com.databricks.spark.xml.XmlRelation
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.{JDBCOptionsExtractor, SparkSession}
import za.co.absa.spline.harvester.builder.{CatalogTableUtils, SourceIdentifier, read}
import org.apache.spark.sql.kafka010.{AssignStrategy, ConsumerStrategy, SubscribePatternStrategy, SubscribeStrategy}
import org.apache.spark.sql.sources.BaseRelation
import za.co.absa.spline.common.ReflectionUtils.extractFieldValue
import za.co.absa.spline.common.extractors.{AccessorMethodValueExtractor, SafeTypeMatchingExtractor}
import za.co.absa.spline.harvester.builder.read.ReadCommandExtractor._
import za.co.absa.spline.harvester.builder.{CatalogTableUtils, SourceIdentifier}
import za.co.absa.spline.harvester.qualifier.PathQualifier

import scala.PartialFunction.condOpt
import scala.language.implicitConversions
import scala.collection.JavaConverters._


class ReadCommandExtractor(pathQualifier: PathQualifier, session: SparkSession) {
def asReadCommand(operation: LogicalPlan): Option[ReadCommand] =
condOpt(operation) {
case lr: LogicalRelation =>
read.ReadCommand(toSourceIdentifier(lr), operation)
case lr: LogicalRelation => lr.relation match {
case hr: HadoopFsRelation =>
val uris = hr.location.rootPaths.map(path => pathQualifier.qualify(path.toString))
val format = hr.fileFormat.toString
ReadCommand(SourceIdentifier(Some(format), uris), operation, hr.options)

case xr: XmlRelation =>
val uris = xr.location.toSeq.map(pathQualifier.qualify)
ReadCommand(SourceIdentifier(Some("XML"), uris), operation, xr.parameters)

case `_: JDBCRelation`(jr) =>
val jdbcOptions = extractFieldValue[JDBCOptions](jr, "jdbcOptions")
val url = extractFieldValue[String](jdbcOptions, "url")
val params = extractFieldValue[Map[String, String]](jdbcOptions, "parameters")
val TableOrQueryFromJDBCOptionsExtractor(toq) = jdbcOptions
ReadCommand(SourceIdentifier(Some("jdbc"), Seq(s"$url:$toq")), operation, params)

case `_: KafkaRelation`(kr) =>
val options = extractFieldValue[Map[String, String]](kr, "sourceOptions")
val topics: Seq[String] = extractFieldValue[ConsumerStrategy](kr, "strategy") match {
case AssignStrategy(partitions) => partitions.map(_.topic)
case SubscribeStrategy(topics) => topics
case SubscribePatternStrategy(pattern) => kafkaTopics(options("kafka.bootstrap.servers")).filter(_.matches(pattern))
}
ReadCommand(SourceIdentifier(Some("kafka"), topics.map(topic => s"kafka:$topic")), operation, options ++ Map(
"startingOffsets" -> extractFieldValue[AnyRef](kr, "startingOffsets"),
"endingOffsets" -> extractFieldValue[AnyRef](kr, "endingOffsets")
))

case br: BaseRelation =>
sys.error(s"Relation is not supported: $br")
}

case htr: HiveTableRelation =>
val catalogTable = htr.tableMeta
read.ReadCommand(CatalogTableUtils.toSourceIdentifier(catalogTable)(pathQualifier, session), operation)
ReadCommand(CatalogTableUtils.toSourceIdentifier(catalogTable)(pathQualifier, session), operation)
}

private def toSourceIdentifier(lr: LogicalRelation) = {
val (sourceType, paths) = lr.relation match {
case HadoopFsRelation(loc, _, _, _, fileFormat, _) => (
fileFormat.toString,
loc.rootPaths.map(path => pathQualifier.qualify(path.toString))
)
case XmlRelation(_, loc, _, _) => (
"XML",
loc.toSeq map pathQualifier.qualify
)
case JDBCOptionsExtractor(jdbcOpts) => (
"JDBC",
Seq(s"${jdbcOpts.url}:${jdbcOpts.table}")
)
case _ => // unrecognized relation type
(s"???: ${lr.relation.getClass.getName}", Nil)
}
SourceIdentifier(Some(sourceType), paths)
}
}

object ReadCommandExtractor {

object `_: JDBCRelation` extends SafeTypeMatchingExtractor[AnyRef]("org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation")

object `_: KafkaRelation` extends SafeTypeMatchingExtractor[AnyRef]("org.apache.spark.sql.kafka010.KafkaRelation")

object TableOrQueryFromJDBCOptionsExtractor extends AccessorMethodValueExtractor[String]("table", "tableOrQuery")

private def kafkaTopics(bootstrapServers: String): Seq[String] = {
val kc = new KafkaConsumer(new Properties {
put("bootstrap.servers", bootstrapServers)
put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
})
try kc.listTopics.keySet.asScala.toSeq
finally kc.close()
}
}
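
For reference, the three consumer strategies resolve to lineage source URIs roughly as follows. The helper below is a hypothetical, simplified restatement of the strategy handling above: it works on the raw reader options instead of Spark's internal ConsumerStrategy classes, and the sample values come from KafkaSinkSpec.

// Hypothetical simplification: derive "kafka:<topic>" URIs from the reader options.
// Spark lower-cases option keys, hence "subscribepattern".
def kafkaSourceUris(options: Map[String, String], topicsOnBroker: => Seq[String]): Seq[String] = {
  val topics: Seq[String] =
    options.get("subscribe").map(_.split(",").map(_.trim).toSeq)
      .orElse(options.get("assign").map(json => "\"([^\"]+)\"".r.findAllMatchIn(json).map(_.group(1)).toSeq))
      .orElse(options.get("subscribepattern").map(pattern => topicsOnBroker.filter(_.matches(pattern))))
      .getOrElse(Nil)
  topics.map(topic => s"kafka:$topic")
}

// kafkaSourceUris(Map("subscribe" -> "bananas,anotherTopic"), Nil)  == Seq("kafka:bananas", "kafka:anotherTopic")
// kafkaSourceUris(Map("assign" -> """{"bananas":[0]}"""), Nil)      == Seq("kafka:bananas")
// kafkaSourceUris(Map("subscribepattern" -> ".*"), Seq("bananas"))  == Seq("kafka:bananas")
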

@@ -34,7 +34,7 @@ class ReadNodeBuilder
inputSources = command.sourceIdentifier.uris,
id = id,
schema = Some(outputSchema),
params = Map(
params = command.params ++ Map(
OperationParams.Name -> operation.nodeName,
OperationParams.SourceType -> command.sourceIdentifier.format
))
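
In the ReadNodeBuilder change above, command.params ++ Map(...) gives the right-hand map precedence, so the generic name/sourceType entries would win even if a datasource option reused one of those keys. A tiny illustration (the colliding key is hypothetical):

// Right-hand map wins on key collision, so "sourceType" keeps the generic value.
val dataSourceOptions: Map[String, Any] = Map("subscribe" -> "bananas", "sourceType" -> "raw-option")
val merged = dataSourceOptions ++ Map("sourceType" -> Some("kafka"))
// merged("subscribe")  == "bananas"
// merged("sourceType") == Some("kafka")
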
