Merged
24 changes: 16 additions & 8 deletions README.md
@@ -38,6 +38,7 @@ The data ingestion pipeline of Hyperdrive consists of four components: readers,
### Built-in components
- `KafkaStreamReader` - reads from a Kafka topic.
- `ConfluentAvroDecodingTransformer` - decodes the payload as Confluent Avro (through [ABRiS](https://github.com/AbsaOSS/ABRiS)), retrieving the schema from the specified Schema Registry. It seamlessly handles whichever schemas the payload messages use.
- `ConfluentAvroEncodingTransformer` - encodes the payload as Confluent Avro (through [ABRiS](https://github.com/AbsaOSS/ABRiS)), registering the schema with the specified Schema Registry. It seamlessly handles whichever schema the DataFrame uses.
- `ColumnSelectorStreamTransformer` - selects all columns from the decoded DataFrame.
- `AddDateVersionTransformerStreamWriter` - adds columns for ingestion date and an auto-incremented version number, to be used for partitioning.
- `ParquetStreamWriter` - writes the DataFrame as Parquet, in **append** mode.
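
As a sketch, the built-in components above could be chained into a Kafka-to-Kafka pipeline with a configuration along these lines (the transformer ids `avro.decoder`, `column.selector` and `avro.encoder` are arbitrary labels chosen here):

```properties
# Pipeline wiring: transformers run in the order of their numeric id
component.transformer.id.0=avro.decoder
component.transformer.class.avro.decoder=za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformer
component.transformer.id.1=column.selector
component.transformer.class.column.selector=za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformer
component.transformer.id.2=avro.encoder
component.transformer.class.avro.encoder=za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroEncodingTransformer
component.writer=za.co.absa.hyperdrive.ingestor.implementation.writer.kafka.KafkaStreamWriter
```

Each transformer's own properties are then configured under the `transformer.{transformer-id}.` prefix, as described in the sections below.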
@@ -130,6 +131,21 @@ The `ConfluentAvroDecodingTransformer` is built on [ABRiS](https://github.com/AbsaOSS/ABRiS). More details about the configuration properties can be found there.

For detailed information on the subject name strategy, please take a look at the [Schema Registry Documentation](https://docs.confluent.io/current/schema-registry/).

##### ConfluentAvroEncodingTransformer
The `ConfluentAvroEncodingTransformer` is built on [ABRiS](https://github.com/AbsaOSS/ABRiS). More details about the configuration properties can be found there.
**Caution**: The `ConfluentAvroEncodingTransformer` requires the property `writer.kafka.topic` to be set, since the destination topic determines the schema subject name.

| Property Name | Required | Description |
| :--- | :---: | :--- |
| `transformer.{transformer-id}.schema.registry.url` | Yes | URL of Schema Registry, e.g. http://localhost:8081. Equivalent to ABRiS property `SchemaManager.PARAM_SCHEMA_REGISTRY_URL` |
| `transformer.{transformer-id}.value.schema.naming.strategy` | Yes | Subject name strategy of Schema Registry. Possible values are `topic.name`, `record.name` or `topic.record.name`. Equivalent to ABRiS property `SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY` |
| `transformer.{transformer-id}.value.schema.record.name` | Yes for naming strategies `record.name` and `topic.record.name` | Name of the record. Equivalent to ABRiS property `SchemaManager.PARAM_SCHEMA_NAME_FOR_RECORD_STRATEGY` |
| `transformer.{transformer-id}.value.schema.record.namespace` | Yes for naming strategies `record.name` and `topic.record.name` | Namespace of the record. Equivalent to ABRiS property `SchemaManager.PARAM_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY` |
| `transformer.{transformer-id}.produce.keys` | No | If set to `true`, keys will be produced according to the properties `key.column.prefix` and `key.column.names` of the [Hyperdrive Context](#hyperdrive-context) |
| `transformer.{transformer-id}.key.schema.naming.strategy` | Yes if `produce.keys` is `true` | Subject name strategy for the key. Equivalent to ABRiS property `SchemaManager.PARAM_KEY_SCHEMA_NAMING_STRATEGY` |
| `transformer.{transformer-id}.key.schema.record.name` | Yes for key naming strategies `record.name` and `topic.record.name` | Name of the key record. |
| `transformer.{transformer-id}.key.schema.record.namespace` | Yes for key naming strategies `record.name` and `topic.record.name` | Namespace of the key record. |
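
Putting the table together, a minimal encoder configuration might look as follows (the transformer id `avro.encoder`, the registry URL and the topic name are placeholders):

```properties
# Hypothetical encoder configuration for a transformer with id "avro.encoder"
transformer.avro.encoder.schema.registry.url=http://localhost:8081
transformer.avro.encoder.value.schema.naming.strategy=topic.name
transformer.avro.encoder.produce.keys=true
transformer.avro.encoder.key.schema.naming.strategy=topic.name
# required by the encoder, see the caution above
writer.kafka.topic=my-destination-topic
```

With the `topic.name` strategy, no record name or namespace properties are needed; they become mandatory only for `record.name` and `topic.record.name`.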

##### ColumnSelectorStreamTransformer
| Property Name | Required | Description |
| :--- | :---: | :--- |
@@ -162,14 +178,6 @@ Any additional properties for the `DataStreamWriter` can be added with the prefix
| :--- | :---: | :--- |
| `writer.kafka.topic` | Yes | The name of the Kafka topic to write data to. Equivalent to Spark property `topic` |
| `writer.kafka.brokers` | Yes | List of Kafka broker URLs. Equivalent to Spark property `kafka.bootstrap.servers` |
| `writer.kafka.schema.registry.url` | Yes | URL of Schema Registry, e.g. http://localhost:8081. Equivalent to ABRiS property `SchemaManager.PARAM_SCHEMA_REGISTRY_URL` |
| `writer.kafka.value.schema.naming.strategy` | Yes | Subject name strategy of Schema Registry. Possible values are `topic.name`, `record.name` or `topic.record.name`. Equivalent to ABRiS property `SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY` |
| `writer.kafka.value.schema.record.name` | Yes for naming strategies `record.name` and `topic.record.name` | Name of the record. Equivalent to ABRiS property `SchemaManager.PARAM_SCHEMA_NAME_FOR_RECORD_STRATEGY` |
| `writer.kafka.value.schema.record.namespace` | Yes for naming strategies `record.name` and `topic.record.name` | Namespace of the record. Equivalent to ABRiS property `SchemaManager.PARAM_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY` |
| `writer.kafka.produce.keys` | No | If set to `true`, keys will be produced according to the properties `key.column.prefix` and `key.column.names` of the [Hyperdrive Context](#hyperdrive-context) |
| `writer.kafka.key.schema.naming.strategy` | Yes if `writer.kafka.produce.keys` is true | Subject name strategy for the key |
| `writer.kafka.key.schema.record.name` | Yes if key naming strategy is either `record.name` or `topic.record.name` | Name of the record. |
| `writer.kafka.key.schema.record.namespace` | Yes if key naming strategy is either `record.name` or `topic.record.name` | Namespace of the record. |
| `writer.common.trigger.type` | No | See [Common writer properties](#common-writer-properties) |
| `writer.common.trigger.processing.time` | No | See [Common writer properties](#common-writer-properties) |
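
For illustration, a writer configuration could look like this (topic, broker and checkpoint values are placeholders):

```properties
# Hypothetical KafkaStreamWriter configuration
writer.kafka.topic=my-destination-topic
writer.kafka.brokers=localhost:9092
writer.common.checkpoint.location=/tmp/checkpoints/my-destination-topic
writer.common.trigger.type=ProcessingTime
writer.common.trigger.processing.time=1000
```

Schema Registry properties no longer appear here: with this change they move from the writer to the `ConfluentAvroEncodingTransformer`.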

@@ -86,6 +86,8 @@ class KafkaToKafkaDockerTest extends FlatSpec with Matchers with SparkTestBase w
"component.transformer.class.[avro.decoder]" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformer",
"component.transformer.id.1" -> "column.selector",
"component.transformer.class.column.selector" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformer",
"component.transformer.id.2" -> "[avro.encoder]",
"component.transformer.class.[avro.encoder]" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroEncodingTransformer",
"component.writer" -> "za.co.absa.hyperdrive.ingestor.implementation.writer.kafka.KafkaStreamWriter",

// Spark settings
@@ -96,28 +98,29 @@
"reader.kafka.topic" -> sourceTopic,
"reader.kafka.brokers" -> kafkaSchemaRegistryWrapper.kafkaUrl,

// Format(ABRiS) settings
// Avro Decoder (ABRiS) settings
"transformer.[avro.decoder].schema.registry.url" -> kafkaSchemaRegistryWrapper.schemaRegistryUrl,
"transformer.[avro.decoder].value.schema.id" -> "latest",
"transformer.[avro.decoder].value.schema.naming.strategy" -> "topic.name",
"transformer.[avro.decoder].consume.keys" -> "true",
"transformer.[avro.decoder].key.schema.id" -> "latest",
"transformer.[avro.decoder].key.schema.naming.strategy" -> "topic.name",

// Transformations (column selector) settings
// comma separated list of columns to select
"transformer.column.selector.columns.to.select" -> "*",

// Avro Encoder (ABRiS) settings
"transformer.[avro.encoder].schema.registry.url" -> "${decoder.avro.schema.registry.url}",
"transformer.[avro.encoder].value.schema.naming.strategy" -> "topic.name",
"transformer.[avro.encoder].produce.keys" -> "true",
"transformer.[avro.encoder].key.schema.naming.strategy" -> "topic.name",

// Sink(Kafka) settings
"writer.common.checkpoint.location" -> (checkpointDir + "/${reader.kafka.topic}"),
"writer.common.trigger.type" -> "ProcessingTime",
"writer.common.trigger.processing.time" -> "1000",
"writer.kafka.topic" -> destinationTopic,
"writer.kafka.brokers" -> "${reader.kafka.brokers}",
"writer.kafka.schema.registry.url" -> "${decoder.avro.schema.registry.url}",
"writer.kafka.value.schema.naming.strategy" -> "topic.name",
"writer.kafka.produce.keys" -> "true",
"writer.kafka.key.schema.naming.strategy" -> "topic.name"
"writer.kafka.brokers" -> "${reader.kafka.brokers}"
)
val driverConfigArray = driverConfig.map { case (key, value) => s"$key=$value" }.toArray

@@ -0,0 +1,110 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent

import org.apache.commons.configuration2.Configuration
import org.apache.logging.log4j.LogManager
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, struct}
import za.co.absa.abris.avro.functions.to_confluent_avro
import za.co.absa.abris.avro.read.confluent.SchemaManager.{PARAM_KEY_SCHEMA_ID, PARAM_KEY_SCHEMA_NAMING_STRATEGY, PARAM_VALUE_SCHEMA_ID, PARAM_VALUE_SCHEMA_NAMING_STRATEGY}
import za.co.absa.hyperdrive.ingestor.api.context.HyperdriveContext
import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory}
import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils
import za.co.absa.hyperdrive.ingestor.implementation.HyperdriveContextKeys
import za.co.absa.hyperdrive.ingestor.implementation.utils.{SchemaRegistryProducerConfigKeys, SchemaRegistrySettingsUtil}
import za.co.absa.hyperdrive.ingestor.implementation.writer.kafka.KafkaStreamWriter.KEY_TOPIC

private[transformer] class ConfluentAvroEncodingTransformer(
val valueSchemaRegistrySettings: Map[String, String],
val keySchemaRegistrySettings: Option[Map[String, String]])
extends StreamTransformer {

if (valueSchemaRegistrySettings.isEmpty) {
throw new IllegalArgumentException(
"Empty Schema Registry settings received.")
}

private val logger = LogManager.getLogger

override def transform(dataFrame: DataFrame): DataFrame = {
logger.info(s"SchemaRegistry settings: $valueSchemaRegistrySettings")

keySchemaRegistrySettings match {
case Some(keySettings) => getKeyValueDataFrame(dataFrame, keySettings)
case None => getValueDataFrame(dataFrame)
}
}

private def getKeyValueDataFrame(dataFrame: DataFrame, keySchemaRegistrySettings: Map[String, String]): DataFrame = {
val keyColumnPrefix = HyperdriveContext.get[String](HyperdriveContextKeys.keyColumnPrefix).get
val keyColumnNames = HyperdriveContext.get[Seq[String]](HyperdriveContextKeys.keyColumnNames).get
val prefixedKeyColumnNames = keyColumnNames.map(c => s"$keyColumnPrefix$c")

val valueColumns = dataFrame.columns.toSeq
.filterNot(columnName => prefixedKeyColumnNames.contains(columnName))
.map(c => dataFrame(c))
val unprefixedKeyColumns = keyColumnNames.map(c => dataFrame(s"$keyColumnPrefix$c").as(c))
val unprefixedDataFrame = dataFrame.select(struct(unprefixedKeyColumns: _*) as 'key, struct(valueColumns: _*) as 'value)
unprefixedDataFrame.select(
to_confluent_avro(col("key"), keySchemaRegistrySettings) as 'key,
to_confluent_avro(col("value"), valueSchemaRegistrySettings) as 'value)
}

private def getValueDataFrame(dataFrame: DataFrame): DataFrame = {
val allColumns = struct(dataFrame.columns.head, dataFrame.columns.tail: _*)
dataFrame.select(to_confluent_avro(allColumns, valueSchemaRegistrySettings) as 'value)
}
}
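
The key/value split in `getKeyValueDataFrame` relies on the key-column prefix and key-column names taken from the Hyperdrive Context. Reduced to plain collections (the prefix `hyperdrive_key_` and all column names below are made-up values, not the actual context contents), the selection logic amounts to:

```scala
object KeyValueSplitSketch extends App {
  // Assumed values, normally retrieved from HyperdriveContext
  val keyColumnPrefix = "hyperdrive_key_"
  val keyColumnNames = Seq("id")

  // Hypothetical DataFrame columns: prefixed key columns plus value columns
  val allColumns = Seq("hyperdrive_key_id", "name", "amount")

  val prefixedKeyColumnNames = keyColumnNames.map(c => s"$keyColumnPrefix$c")

  // Value columns are all columns that are not prefixed key columns
  val valueColumns = allColumns.filterNot(prefixedKeyColumnNames.contains)

  // Key columns are selected by their prefixed name and re-aliased to the bare name
  val unprefixedKeyColumns = keyColumnNames.map(c => s"$keyColumnPrefix$c" -> c)

  println(valueColumns)         // List(name, amount)
  println(unprefixedKeyColumns) // List((hyperdrive_key_id,id))
}
```

In the transformer itself, the two resulting column sets are wrapped in `struct`s and passed through ABRiS's `to_confluent_avro` as the Kafka `key` and `value` columns.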

object ConfluentAvroEncodingTransformer extends StreamTransformerFactory with ConfluentAvroEncodingTransformerAttributes {

object ValueSchemaConfigKeys extends SchemaRegistryProducerConfigKeys {
override val schemaRegistryUrl: String = KEY_SCHEMA_REGISTRY_URL
override val namingStrategy: String = KEY_SCHEMA_REGISTRY_VALUE_NAMING_STRATEGY
override val recordName: String = KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAME
override val recordNamespace: String = KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAMESPACE
override val paramSchemaId: String = PARAM_VALUE_SCHEMA_ID
override val paramSchemaNamingStrategy: String = PARAM_VALUE_SCHEMA_NAMING_STRATEGY
}

object KeySchemaConfigKeys extends SchemaRegistryProducerConfigKeys {
override val schemaRegistryUrl: String = KEY_SCHEMA_REGISTRY_URL
override val namingStrategy: String = KEY_SCHEMA_REGISTRY_KEY_NAMING_STRATEGY
override val recordName: String = KEY_SCHEMA_REGISTRY_KEY_RECORD_NAME
override val recordNamespace: String = KEY_SCHEMA_REGISTRY_KEY_RECORD_NAMESPACE
override val paramSchemaId: String = PARAM_KEY_SCHEMA_ID
override val paramSchemaNamingStrategy: String = PARAM_KEY_SCHEMA_NAMING_STRATEGY
}

override def apply(config: Configuration): StreamTransformer = {
val topic = config.getString(KEY_TOPIC)

val valueSchemaRegistrySettings = SchemaRegistrySettingsUtil.getProducerSettings(config, topic, ValueSchemaConfigKeys)
val keySchemaRegistrySettingsOpt = ConfigUtils.getOrNone(KEY_PRODUCE_KEYS, config)
.map(_ => SchemaRegistrySettingsUtil.getProducerSettings(config, topic, KeySchemaConfigKeys))

new ConfluentAvroEncodingTransformer(valueSchemaRegistrySettings, keySchemaRegistrySettingsOpt)
}

override def getMappingFromRetainedGlobalConfigToLocalConfig(globalConfig: Configuration): Map[String, String] = Map(
KEY_TOPIC -> KEY_TOPIC
)

}



@@ -0,0 +1,51 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent

import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata}
import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.AvroKafkaStreamDecoderKeys._

trait ConfluentAvroEncodingTransformerAttributes extends HasComponentAttributes {
val KEY_SCHEMA_REGISTRY_URL = "schema.registry.url"
val KEY_SCHEMA_REGISTRY_VALUE_NAMING_STRATEGY = "value.schema.naming.strategy"
val KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAME = "value.schema.record.name"
val KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAMESPACE = "value.schema.record.namespace"

val KEY_PRODUCE_KEYS = "produce.keys"
val KEY_SCHEMA_REGISTRY_KEY_NAMING_STRATEGY = "key.schema.naming.strategy"
val KEY_SCHEMA_REGISTRY_KEY_RECORD_NAME = "key.schema.record.name"
val KEY_SCHEMA_REGISTRY_KEY_RECORD_NAMESPACE = "key.schema.record.namespace"

override def getName: String = "Confluent Avro Stream Encoder"

override def getDescription: String = "Encoder for records in Avro format. The encoder connects to a Schema Registry instance to register the schema."

override def getProperties: Map[String, PropertyMetadata] = Map(
KEY_SCHEMA_REGISTRY_URL -> PropertyMetadata("Schema Registry URL", None, required = true),
KEY_SCHEMA_REGISTRY_VALUE_NAMING_STRATEGY -> PropertyMetadata("Value-Schema naming strategy",
Some("Subject name strategy of Schema Registry. Must be one of \"topic.name\", \"record.name\" or \"topic.record.name\""),
required = true),
KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAME -> PropertyMetadata("Value-Record name",
Some("Record name for naming strategies record.name or topic.record.name"), required = false),
KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAMESPACE -> PropertyMetadata("Value-Record namespace",
Some("Record namespace for naming strategies record.name or topic.record.name"), required = false),

KEY_SCHEMA_REGISTRY_KEY_NAMING_STRATEGY -> PropertyMetadata("Key-Schema naming strategy",
Some("Subject name strategy of Schema Registry. Must be one of \"topic.name\", \"record.name\" or \"topic.record.name\""), required = false),
KEY_SCHEMA_REGISTRY_KEY_RECORD_NAME -> PropertyMetadata("Key-Record name", Some("Key-Record name for naming strategies record.name or topic.record.name"), required = false),
KEY_SCHEMA_REGISTRY_KEY_RECORD_NAMESPACE -> PropertyMetadata("Key-Record namespace", Some("Key-Record namespace for naming strategies record.name or topic.record.name"), required = false)
)
}
@@ -0,0 +1,22 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent

import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformerFactory, StreamTransformerFactoryProvider}

class ConfluentAvroEncodingTransformerLoader extends StreamTransformerFactoryProvider {
override def getComponentFactory: StreamTransformerFactory = ConfluentAvroEncodingTransformer
}