README.md — 27 changes: 12 additions & 15 deletions
@@ -39,8 +39,8 @@ The data ingestion pipeline of Hyperdrive consists of four components: readers,
- `KafkaStreamReader` - reads from a Kafka topic.
- `ConfluentAvroDecodingTransformer` - decodes the payload as Confluent Avro (through [ABRiS](https://github.com/AbsaOSS/ABRiS)), retrieving the schema from the specified Schema Registry. This transformer is capable of seamlessly handling whatever schemas the payload messages are using.
- `ColumnSelectorStreamTransformer` - selects all columns from the decoded DataFrame.
- `ParquetStreamWriter` - writes the DataFrame as Parquet, in **append** mode, by invoking Spark's `processAllAvailable` method on the stream writer.
- `ParquetPartitioningStreamWriter` - writes the DataFrame as Parquet, partitioned by the ingestion date and an auto-incremented version number.
- `AddDateVersionTransformer` - adds columns for ingestion date and an auto-incremented version number, to be used for partitioning.
- `ParquetStreamWriter` - writes the DataFrame as Parquet, in **append** mode.
- `KafkaStreamWriter` - writes to a Kafka topic.

### Custom components
@@ -86,7 +86,7 @@ The configuration file may be created from the template located at `driver/src/r
| `component.reader` | Yes | Fully qualified name of reader component, e.g.`za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader` |
| `component.transformer.id.{order}` | No | An arbitrary but unique string, referenced in this documentation as `{transformer-id}` |
| `component.transformer.class.{transformer-id}` | No | Fully qualified name of transformer component, e.g. `za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformer` |
| `component.writer` | Yes | Fully qualified name of writer component, e.g. `za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetPartitioningStreamWriter` |
| `component.writer` | Yes | Fully qualified name of writer component, e.g. `za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter` |

Multiple transformers can be configured in the pipeline, including multiple instances of the same transformer.
For each transformer instance, `component.transformer.id.{order}` and `component.transformer.class.{transformer-id}` have to be specified, where `{order}` and `{transformer-id}` need to be unique, as sketched below.
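
For illustration, a minimal sketch of the component wiring with two transformer instances, taken from the configuration used by the e2e test added in this PR (the bracketed ids are arbitrary strings):

```
component.ingestor=spark
component.reader=za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader
component.transformer.id.0=[avro.decoder]
component.transformer.class.[avro.decoder]=za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformer
component.transformer.id.1=[version.incrementer]
component.transformer.class.[version.incrementer]=za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion.AddDateVersionTransformer
component.writer=za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter
```
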
@@ -135,6 +135,15 @@ For detailed information on the subject name strategy, please take a look at the
| :--- | :---: | :--- |
| `transformer.{transformer-id}.columns.to.select` | Yes | Comma-separated list of columns to select. `*` can be used to select all columns. Only existing columns may be selected by name, i.e. expressions cannot be constructed |
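
For example, assuming a transformer instance registered under the hypothetical id `[column.selector]`, selecting all columns would look like this:

```
transformer.[column.selector].columns.to.select=*
```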

##### AddDateVersionTransformer
The `AddDateVersionTransformer` adds the columns `hyperdrive_date` and `hyperdrive_version`. `hyperdrive_date` is the ingestion date (or a user-defined date), while `hyperdrive_version` is a number automatically incremented with every ingestion, starting at 1.
For the auto-increment to work, `hyperdrive_date` and `hyperdrive_version` need to be defined as partition columns, in that order.
Caution: This transformer requires a writer that defines `writer.parquet.destination.directory`.

| Property Name | Required | Description |
| :--- | :---: | :--- |
| `transformer.{transformer-id}.report.date` | No | User-defined date for `hyperdrive_date` in format `yyyy-MM-dd`. Default date is the date of the ingestion |

See [Pipeline settings](#pipeline-settings) for details about `{transformer-id}`.
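
A sketch of the transformer-side configuration, using the transformer id `[version.incrementer]` from the e2e test added in this PR (the id and the date are illustrative):

```
component.transformer.id.1=[version.incrementer]
component.transformer.class.[version.incrementer]=za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion.AddDateVersionTransformer
transformer.[version.incrementer].report.date=2020-03-31
```

Combined with a `ParquetStreamWriter` whose partition columns are `hyperdrive_date, hyperdrive_version`, successive ingestions for the same report date are written to paths such as `.../hyperdrive_date=2020-03-31/hyperdrive_version=1`, then `.../hyperdrive_version=2`, and so on.
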
##### ParquetStreamWriter
| Property Name | Required | Description |
@@ -147,18 +156,6 @@ See [Pipeline settings](#pipeline-settings) for details about `{transformer-id}`

Any additional properties for the `DataStreamWriter` can be added with the prefix `writer.parquet.options`, e.g. `writer.parquet.options.key=value`
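
A concrete sketch of the sink settings, adapted from the e2e test added in this PR (paths are illustrative; `writer.parquet.partition.columns` and `writer.parquet.metadata.check` are the properties the test uses, and `${reader.kafka.topic}` is a property reference as written in the test configuration):

```
writer.common.checkpoint.location=/data/checkpoint/${reader.kafka.topic}
writer.parquet.destination.directory=/data/destination
writer.parquet.partition.columns=hyperdrive_date, hyperdrive_version
writer.parquet.metadata.check=true
```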

##### ParquetPartitioningStreamWriter
The `ParquetPartitioningStreamWriter` partitions every ingestion in the columns `hyperdrive_date` and `hyperdrive_version`. `hyperdrive_date` is the ingestion date (or a user-defined date), while `hyperdrive_version` is a number automatically incremented with every ingestion, starting at 1.

| Property Name | Required | Description |
| :--- | :---: | :--- |
| `writer.parquet.destination.directory` | Yes | Destination path of the sink. Equivalent to Spark property `path` for the `DataStreamWriter` |
| `writer.parquet.partitioning.report.date` | No | User-defined date for `hyperdrive_date` in format `yyyy-MM-dd`. Default date is the date of the ingestion |
| `writer.common.trigger.type` | No | See [Combination writer properties](#common-writer-properties) |
| `writer.common.trigger.processing.time` | No | See [Combination writer properties](#common-writer-properties) |

Any additional properties for the `DataStreamWriter` can be added with the prefix `writer.parquet.options`, e.g. `writer.parquet.options.key=value`

##### KafkaStreamWriter

| Property Name | Required | Description |
@@ -0,0 +1,150 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.driver.drivers

import java.util.Properties

import org.apache.avro.Schema
import org.apache.avro.Schema.Parser
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
import za.co.absa.abris.avro.read.confluent.SchemaManager
import za.co.absa.commons.io.TempDirectory
import za.co.absa.commons.spark.SparkTestBase

/**
* This e2e test requires a Docker installation on the executing machine.
*/
class KafkaToParquetIncrementingVersionDockerTest extends FlatSpec with Matchers with SparkTestBase with BeforeAndAfter {

  private val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
  private val baseDir = TempDirectory("hyperdriveE2eTest").deleteOnExit()
  private val baseDirPath = baseDir.path.toUri
  private val checkpointDir = s"$baseDirPath/checkpoint"
  private val destinationDir = s"$baseDirPath/destination"

  behavior of "CommandLineIngestionDriver"

  before {
    fs.mkdirs(new Path(destinationDir))
  }

  it should "execute the whole kafka-to-parquet pipeline with the incrementing version transformer" in {
    // given
    val kafkaSchemaRegistryWrapper = new KafkaSchemaRegistryWrapper
    val topic = "e2etest"
    val ingestionSize = 50
    val schemaString = raw"""{"type": "record", "name": "$topic", "fields": [
      {"type": "string", "name": "field1"},
      {"type": "int", "name": "field2"}
      ]}"""
    val schema = new Parser().parse(schemaString)
    val producer = createProducer(kafkaSchemaRegistryWrapper)

    val driverConfig = Map(
      // Pipeline settings
      "component.ingestor" -> "spark",
      "component.reader" -> "za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader",
      "component.transformer.id.0" -> "[avro.decoder]",
      "component.transformer.class.[avro.decoder]" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformer",
      "component.transformer.id.1" -> "[version.incrementer]",
      "component.transformer.class.[version.incrementer]" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion.AddDateVersionTransformer",
      "component.writer" -> "za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter",

      // Spark settings
      "ingestor.spark.app.name" -> "ingestor-app",
      "ingestor.spark.termination.method" -> "AwaitTermination",

      // Source(Kafka) settings
      "reader.kafka.topic" -> topic,
      "reader.kafka.brokers" -> kafkaSchemaRegistryWrapper.kafkaUrl,

      // Format(ABRiS) settings
      "transformer.[avro.decoder].schema.registry.url" -> kafkaSchemaRegistryWrapper.schemaRegistryUrl,
      "transformer.[avro.decoder].value.schema.id" -> "latest",
      "transformer.[avro.decoder].value.schema.naming.strategy" -> "topic.name",

      // Transformations(Enceladus) settings
      // comma separated list of columns to select
      "transformer.[version.incrementer].report.date" -> "2020-03-31",

      // Sink(Parquet) settings
      "writer.common.checkpoint.location" -> (checkpointDir + "/${reader.kafka.topic}"),
      "writer.parquet.destination.directory" -> destinationDir,
      "writer.parquet.partition.columns" -> "hyperdrive_date, hyperdrive_version",
      "writer.parquet.metadata.check" -> "true"
    )
    val driverConfigArray = driverConfig.map { case (key, value) => s"$key=$value" }.toArray

    // when (1)
    produceMessage(ingestionSize, producer, schema, topic)
    CommandLineIngestionDriver.main(driverConfigArray)

    // then (1)
    fs.exists(new Path(s"$checkpointDir/$topic")) shouldBe true

    fs.exists(new Path(s"$destinationDir/hyperdrive_date=2020-03-31/hyperdrive_version=1")) shouldBe true
    fs.exists(new Path(s"$destinationDir/hyperdrive_date=2020-03-31/hyperdrive_version=2")) shouldBe false
    val df = spark.read.parquet(destinationDir)
    df.count shouldBe ingestionSize
    import spark.implicits._
    df.columns should contain theSameElementsAs List("field1", "field2", "hyperdrive_date", "hyperdrive_version")
    df.select("hyperdrive_version").distinct().as[Int].collect() should contain theSameElementsAs List(1)

    // when (2)
    produceMessage(ingestionSize, producer, schema, topic)
    CommandLineIngestionDriver.main(driverConfigArray)

    // then (2)
    fs.exists(new Path(s"$destinationDir/hyperdrive_date=2020-03-31/hyperdrive_version=1")) shouldBe true
    fs.exists(new Path(s"$destinationDir/hyperdrive_date=2020-03-31/hyperdrive_version=2")) shouldBe true
    val df2 = spark.read.parquet(destinationDir)
    df2.count shouldBe 2 * ingestionSize
    import spark.implicits._
    df2.columns should contain theSameElementsAs List("field1", "field2", "hyperdrive_date", "hyperdrive_version")
    df2.select("hyperdrive_version").distinct().as[Int].collect() should contain theSameElementsAs List(1, 2)
    df2.select("field1").distinct().as[String].collect() should contain theSameElementsAs List("hello")
    df2.select("field2").as[Int].collect() should contain theSameElementsAs (0 until ingestionSize) ++ (0 until ingestionSize)
    df2.select("hyperdrive_date").distinct()
      .as[java.sql.Date].collect() should contain theSameElementsAs List(java.sql.Date.valueOf("2020-03-31"))
  }

  after {
    SchemaManager.reset()
  }

  private def produceMessage(numberOfRecords: Int, producer: KafkaProducer[Int, GenericRecord], schema: Schema, topic: String) = {
    for (i <- 0 until numberOfRecords) {
      val record = new GenericData.Record(schema)
      record.put("field1", "hello")
      record.put("field2", i)
      val producerRecord = new ProducerRecord[Int, GenericRecord](topic, 1, record)
      producer.send(producerRecord)
    }
  }

  private def createProducer(kafkaSchemaRegistryWrapper: KafkaSchemaRegistryWrapper): KafkaProducer[Int, GenericRecord] = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSchemaRegistryWrapper.kafka.getBootstrapServers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer")
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "AvroProducer")
    kafkaSchemaRegistryWrapper.createProducer(props)
  }

}
@@ -13,4 +13,5 @@
# limitations under the License.
#
za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformerLoader
za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformerLoader
za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion.AddDateVersionTransformerLoader
za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformerLoader
@@ -13,5 +13,4 @@
# limitations under the License.
#
za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriterLoader
za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetPartitioningStreamWriterLoader
za.co.absa.hyperdrive.ingestor.implementation.writer.kafka.KafkaStreamWriterLoader
@@ -0,0 +1,96 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion

import java.time.LocalDate
import java.time.format.DateTimeFormatter

import org.apache.commons.configuration2.Configuration
import org.apache.hadoop.fs.Path
import org.apache.logging.log4j.LogManager
import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
import org.apache.spark.sql.functions.{lit, to_date}
import org.apache.spark.sql.{DataFrame, SparkSession}
import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory}
import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils.getOrThrow
import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.ParquetStreamWriterKeys

private[transformer] class AddDateVersionTransformer(val reportDate: String, val destination: String) extends StreamTransformer {

  import AddDateVersionTransformer.{ColumnDate, ColumnVersion}

  override def transform(dataFrame: DataFrame): DataFrame = {
    val spark = dataFrame.sparkSession
    val initialVersion = 1
    val nextVersion = findNextVersion(spark, initialVersion)
    val dfWithDate = dataFrame
      .withColumn(ColumnDate, to_date(lit(reportDate), AddDateVersionTransformer.reportDateFormat))
      .withColumn(ColumnVersion, lit(nextVersion))
    dfWithDate
  }

  private def findNextVersion(spark: SparkSession, initialVersion: Int): Int = {
    if (noCommittedParquetFilesExist(spark)) {
      initialVersion
    } else {
      // Read the previously written output and take the highest version for the given report date, plus one.
      import spark.implicits._
      val df = spark.read.parquet(destination)
      val versions = df.select(df(ColumnVersion))
        .filter(df(ColumnDate) === lit(reportDate))
        .distinct()
        .as[Int]
        .collect().toList

      if (versions.nonEmpty) versions.max + 1 else initialVersion
    }
  }

  private def noCommittedParquetFilesExist(spark: SparkSession): Boolean = {
    // Only files committed through the streaming metadata log count as existing output.
    val fileCatalog = new MetadataLogFileIndex(spark, new Path(destination), None)
    !FileStreamSink.hasMetadata(Seq(destination), spark.sparkContext.hadoopConfiguration) || fileCatalog.allFiles().isEmpty
  }
}

object AddDateVersionTransformer extends StreamTransformerFactory with AddDateVersionTransformerAttributes {
  val reportDateFormat: String = "yyyy-MM-dd"
  val reportDateFormatter: DateTimeFormatter = DateTimeFormatter.ofPattern(reportDateFormat)
  val ColumnDate = "hyperdrive_date"
  val ColumnVersion = "hyperdrive_version"

  def apply(config: Configuration): StreamTransformer = {
    val reportDate = getReportDateString(config)
    val destinationDirectory = getDestinationDirectory(config)

    LogManager.getLogger.info(s"Going to create AddDateVersionTransformer instance")

    new AddDateVersionTransformer(reportDate, destinationDirectory)
  }

  override def getMappingFromRetainedGlobalConfigToLocalConfig(globalConfig: Configuration): Map[String, String] = Map(
    ParquetStreamWriterKeys.KEY_DESTINATION_DIRECTORY -> ParquetStreamWriterKeys.KEY_DESTINATION_DIRECTORY
  )

  private def getReportDateString(configuration: Configuration): String = {
    configuration.getString(KeyReportDate) match {
      case value: String => value
      case _ => reportDateFormatter.format(LocalDate.now())
    }
  }

  def getDestinationDirectory(configuration: Configuration): String =
    getOrThrow(ParquetStreamWriterKeys.KEY_DESTINATION_DIRECTORY, configuration,
      errorMessage = s"Destination directory not found. Is '${ParquetStreamWriterKeys.KEY_DESTINATION_DIRECTORY}' defined?")

}
@@ -0,0 +1,34 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion

import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata}
import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.ColumnSelectorStreamTransformerKeys.KEY_COLUMNS_TO_SELECT

trait AddDateVersionTransformerAttributes extends HasComponentAttributes {

  val KeyReportDate = "report.date"

  override def getName: String = "Add Date Version Transformer"

  override def getDescription: String = "This transformer adds a date and an incrementing version number as columns hyperdrive_date and hyperdrive_version." +
    " hyperdrive_date and hyperdrive_version need to be specified in that order as partition columns for the increment to work." +
    " This transformer depends on a writer which defines writer.parquet.destination.directory"

  override def getProperties: Map[String, PropertyMetadata] = Map(
    KeyReportDate -> PropertyMetadata("Date", Some("Date to add as a column, e.g. 2018-01-23. By default, the current date will be taken."), required = false)
  )
}
@@ -13,10 +13,10 @@
* limitations under the License.
*/

package za.co.absa.hyperdrive.ingestor.implementation.writer.parquet
package za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion

import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriterFactory, StreamWriterFactoryProvider}
import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformerFactory, StreamTransformerFactoryProvider}

class ParquetPartitioningStreamWriterLoader extends StreamWriterFactoryProvider {
  override def getComponentFactory: StreamWriterFactory = ParquetPartitioningStreamWriter
class AddDateVersionTransformerLoader extends StreamTransformerFactoryProvider {
  override def getComponentFactory: StreamTransformerFactory = AddDateVersionTransformer
}