diff --git a/README.md b/README.md
index e30f837a..d3dde6e6 100644
--- a/README.md
+++ b/README.md
@@ -39,8 +39,8 @@ The data ingestion pipeline of Hyperdrive consists of four components: readers,
 - `KafkaStreamReader` - reads from a Kafka topic.
 - `ConfluentAvroDecodingTransformer` - decodes the payload as Confluent Avro (through [ABRiS](https://github.com/AbsaOSS/ABRiS)), retrieving the schema from the specified Schema Registry. This transformer is capable of seamlessly handling whatever schemas the payload messages are using.
 - `ColumnSelectorStreamTransformer` - selects all columns from the decoded DataFrame.
-- `ParquetStreamWriter` - writes the DataFrame as Parquet, in **append** mode, by invoking Spark's `processAllAvailable` method on the stream writer.
-- `ParquetPartitioningStreamWriter` - writes the DataFrame as Parquet, partitioned by the ingestion date and an auto-incremented version number.
+- `AddDateVersionTransformer` - adds columns for ingestion date and an auto-incremented version number, to be used for partitioning.
+- `ParquetStreamWriter` - writes the DataFrame as Parquet, in **append** mode.
 - `KafkaStreamWriter` - writes to a Kafka topic.
 
 ### Custom components
@@ -86,7 +86,7 @@ The configuration file may be created from the template located at `driver/src/r
 | `component.reader` | Yes | Fully qualified name of reader component, e.g.`za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader` |
 | `component.transformer.id.{order}` | No | An arbitrary but unique string, referenced in this documentation as `{transformer-id}` |
 | `component.transformer.class.{transformer-id}` | No | Fully qualified name of transformer component, e.g. `za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformer` |
-| `component.writer` | Yes | Fully qualified name of writer component, e.g. `za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetPartitioningStreamWriter` |
+| `component.writer` | Yes | Fully qualified name of writer component, e.g. `za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter` |
 
 Multiple transformers can be configured in the pipeline, including multiple instances of the same transformer. For each transformer instance, `component.transformer.id.{order}` and `component.transformer.class.{transformer-id}` have to be specified, where `{order}` and `{transformer-id}` need to be unique.
 
@@ -135,6 +135,15 @@ For detailed information on the subject name strategy, please take a look at the
 | :--- | :---: | :--- |
 | `transformer.{transformer-id}.columns.to.select` | Yes | Comma-separated list of columns to select. `*` can be used to select all columns. Only existing columns using column names may be selected (i.e. expressions cannot be constructed) |
 
+##### AddDateVersionTransformer
+The `AddDateVersionTransformer` adds the columns `hyperdrive_date` and `hyperdrive_version`. `hyperdrive_date` is the ingestion date (or a user-defined date), while `hyperdrive_version` is a number automatically incremented with every ingestion, starting at 1.
+For the auto-increment to work, `hyperdrive_date` and `hyperdrive_version` need to be defined as partition columns.
+Caution: This transformer requires a writer which defines `writer.parquet.destination.directory`.
+
+| Property Name | Required | Description |
+| :--- | :---: | :--- |
+| `transformer.{transformer-id}.report.date` | No | User-defined date for `hyperdrive_date` in format `yyyy-MM-dd`.
Default date is the date of the ingestion | + See [Pipeline settings](#pipeline-settings) for details about `{transformer-id}`. ##### ParquetStreamWriter | Property Name | Required | Description | @@ -147,18 +156,6 @@ See [Pipeline settings](#pipeline-settings) for details about `{transformer-id}` Any additional properties for the `DataStreamWriter` can be added with the prefix `writer.parquet.options`, e.g. `writer.parquet.options.key=value` -##### ParquetPartitioningStreamWriter -The `ParquetPartitioningStreamWriter` partitions every ingestion in the columns `hyperdrive_date` and `hyperdrive_version`. `hyperdrive_date` is the ingestion date (or a user-defined date), while `hyperdrive_version` is a number automatically incremented with every ingestion, starting at 1. - -| Property Name | Required | Description | -| :--- | :---: | :--- | -| `writer.parquet.destination.directory` | Yes | Destination path of the sink. Equivalent to Spark property `path` for the `DataStreamWriter` | -| `writer.parquet.partitioning.report.date` | No | User-defined date for `hyperdrive_date` in format `yyyy-MM-dd`. Default date is the date of the ingestion | -| `writer.common.trigger.type` | No | See [Combination writer properties](#common-writer-properties) | -| `writer.common.trigger.processing.time` | No | See [Combination writer properties](#common-writer-properties) | - -Any additional properties for the `DataStreamWriter` can be added with the prefix `writer.parquet.options`, e.g. `writer.parquet.options.key=value` - ##### KafkaStreamWriter | Property Name | Required | Description | diff --git a/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToParquetIncrementingVersionDockerTest.scala b/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToParquetIncrementingVersionDockerTest.scala new file mode 100644 index 00000000..4f91cc50 --- /dev/null +++ b/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToParquetIncrementingVersionDockerTest.scala @@ -0,0 +1,150 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.hyperdrive.driver.drivers + +import java.util.Properties + +import org.apache.avro.Schema +import org.apache.avro.Schema.Parser +import org.apache.avro.generic.{GenericData, GenericRecord} +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} +import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers} +import za.co.absa.abris.avro.read.confluent.SchemaManager +import za.co.absa.commons.io.TempDirectory +import za.co.absa.commons.spark.SparkTestBase + +/** + * This e2e test requires a Docker installation on the executing machine. 
+ */ +class KafkaToParquetIncrementingVersionDockerTest extends FlatSpec with Matchers with SparkTestBase with BeforeAndAfter { + + private val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration) + private val baseDir = TempDirectory("hyperdriveE2eTest").deleteOnExit() + private val baseDirPath = baseDir.path.toUri + private val checkpointDir = s"$baseDirPath/checkpoint" + private val destinationDir = s"$baseDirPath/destination" + + behavior of "CommandLineIngestionDriver" + + before { + fs.mkdirs(new Path(destinationDir)) + } + + it should "execute the whole kafka-to-parquet pipeline with the incrementing version transformer" in { + // given + val kafkaSchemaRegistryWrapper = new KafkaSchemaRegistryWrapper + val topic = "e2etest" + val ingestionSize = 50 + val schemaString = raw"""{"type": "record", "name": "$topic", "fields": [ + {"type": "string", "name": "field1"}, + {"type": "int", "name": "field2"} + ]}""" + val schema = new Parser().parse(schemaString) + val producer = createProducer(kafkaSchemaRegistryWrapper) + + val driverConfig = Map( + // Pipeline settings + "component.ingestor" -> "spark", + "component.reader" -> "za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader", + "component.transformer.id.0" -> "[avro.decoder]", + "component.transformer.class.[avro.decoder]" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformer", + "component.transformer.id.1" -> "[version.incrementer]", + "component.transformer.class.[version.incrementer]" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion.AddDateVersionTransformer", + "component.writer" -> "za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter", + + // Spark settings + "ingestor.spark.app.name" -> "ingestor-app", + "ingestor.spark.termination.method" -> "AwaitTermination", + + // Source(Kafka) settings + "reader.kafka.topic" -> topic, + "reader.kafka.brokers" -> kafkaSchemaRegistryWrapper.kafkaUrl, + + // Format(ABRiS) settings + "transformer.[avro.decoder].schema.registry.url" -> kafkaSchemaRegistryWrapper.schemaRegistryUrl, + "transformer.[avro.decoder].value.schema.id" -> "latest", + "transformer.[avro.decoder].value.schema.naming.strategy" -> "topic.name", + + // Transformations(Enceladus) settings + // comma separated list of columns to select + "transformer.[version.incrementer].report.date" -> "2020-03-31", + + // Sink(Parquet) settings + "writer.common.checkpoint.location" -> (checkpointDir + "/${reader.kafka.topic}"), + "writer.parquet.destination.directory" -> destinationDir, + "writer.parquet.partition.columns" -> "hyperdrive_date, hyperdrive_version", + "writer.parquet.metadata.check" -> "true" + ) + val driverConfigArray = driverConfig.map { case (key, value) => s"$key=$value" }.toArray + + // when (1) + produceMessage(ingestionSize, producer, schema, topic) + CommandLineIngestionDriver.main(driverConfigArray) + + // then (1) + fs.exists(new Path(s"$checkpointDir/$topic")) shouldBe true + + fs.exists(new Path(s"$destinationDir/hyperdrive_date=2020-03-31/hyperdrive_version=1")) shouldBe true + fs.exists(new Path(s"$destinationDir/hyperdrive_date=2020-03-31/hyperdrive_version=2")) shouldBe false + val df = spark.read.parquet(destinationDir) + df.count shouldBe ingestionSize + import spark.implicits._ + df.columns should contain theSameElementsAs List("field1", "field2", "hyperdrive_date", "hyperdrive_version") + df.select("hyperdrive_version").distinct().as[Int].collect() should contain 
theSameElementsAs List(1) + + // when (2) + produceMessage(ingestionSize, producer, schema, topic) + CommandLineIngestionDriver.main(driverConfigArray) + + // then (2) + fs.exists(new Path(s"$destinationDir/hyperdrive_date=2020-03-31/hyperdrive_version=1")) shouldBe true + fs.exists(new Path(s"$destinationDir/hyperdrive_date=2020-03-31/hyperdrive_version=2")) shouldBe true + val df2 = spark.read.parquet(destinationDir) + df2.count shouldBe 2 * ingestionSize + import spark.implicits._ + df2.columns should contain theSameElementsAs List("field1", "field2", "hyperdrive_date", "hyperdrive_version") + df2.select("hyperdrive_version").distinct().as[Int].collect() should contain theSameElementsAs List(1, 2) + df2.select("field1").distinct().as[String].collect() should contain theSameElementsAs List("hello") + df2.select("field2").as[Int].collect() should contain theSameElementsAs (0 until ingestionSize) ++ (0 until ingestionSize) + df2.select("hyperdrive_date").distinct() + .as[java.sql.Date].collect() should contain theSameElementsAs List(java.sql.Date.valueOf("2020-03-31")) + } + + after { + SchemaManager.reset() + } + + private def produceMessage(numberOfRecords: Int, producer: KafkaProducer[Int, GenericRecord], schema: Schema, topic: String) = { + for (i <- 0 until numberOfRecords) { + val record = new GenericData.Record(schema) + record.put("field1", "hello") + record.put("field2", i) + val producerRecord = new ProducerRecord[Int, GenericRecord](topic, 1, record) + producer.send(producerRecord) + } + } + + private def createProducer(kafkaSchemaRegistryWrapper: KafkaSchemaRegistryWrapper): KafkaProducer[Int, GenericRecord] = { + val props = new Properties() + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSchemaRegistryWrapper.kafka.getBootstrapServers) + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer") + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer") + props.put(ProducerConfig.CLIENT_ID_CONFIG, "AvroProducer") + kafkaSchemaRegistryWrapper.createProducer(props) + } + +} diff --git a/ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformerFactoryProvider b/ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformerFactoryProvider index 5166a422..2ab1a4c2 100644 --- a/ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformerFactoryProvider +++ b/ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformerFactoryProvider @@ -13,4 +13,5 @@ # limitations under the License. 
# za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformerLoader -za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformerLoader \ No newline at end of file +za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion.AddDateVersionTransformerLoader +za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformerLoader diff --git a/ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterFactoryProvider b/ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterFactoryProvider index 0efe7e2b..79322e9e 100644 --- a/ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterFactoryProvider +++ b/ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterFactoryProvider @@ -13,5 +13,4 @@ # limitations under the License. # za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriterLoader -za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetPartitioningStreamWriterLoader za.co.absa.hyperdrive.ingestor.implementation.writer.kafka.KafkaStreamWriterLoader diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformer.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformer.scala new file mode 100644 index 00000000..1b64ffe0 --- /dev/null +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformer.scala @@ -0,0 +1,96 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion + +import java.time.LocalDate +import java.time.format.DateTimeFormatter + +import org.apache.commons.configuration2.Configuration +import org.apache.hadoop.fs.Path +import org.apache.logging.log4j.LogManager +import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex} +import org.apache.spark.sql.functions.{lit, to_date} +import org.apache.spark.sql.{DataFrame, SparkSession} +import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory} +import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils.getOrThrow +import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.ParquetStreamWriterKeys + +private[transformer] class AddDateVersionTransformer(val reportDate: String, val destination: String) extends StreamTransformer { + + import AddDateVersionTransformer.{ColumnDate, ColumnVersion} + override def transform(dataFrame: DataFrame): DataFrame = { + val spark = dataFrame.sparkSession + val initialVersion = 1 + val nextVersion = findNextVersion(spark, initialVersion) + val dfWithDate = dataFrame + .withColumn(ColumnDate, to_date(lit(reportDate), AddDateVersionTransformer.reportDateFormat)) + .withColumn(ColumnVersion, lit(nextVersion)) + dfWithDate + } + + private def findNextVersion(spark: SparkSession, initialVersion: Int): Int = { + if (noCommittedParquetFilesExist(spark)) { + initialVersion + } else { + import spark.implicits._ + val df = spark.read.parquet(destination) + val versions = df.select(df(ColumnVersion)) + .filter(df(ColumnDate) === lit(reportDate)) + .distinct() + .as[Int] + .collect().toList + + if (versions.nonEmpty) versions.max + 1 else initialVersion + } + } + + private def noCommittedParquetFilesExist(spark: SparkSession): Boolean = { + val fileCatalog = new MetadataLogFileIndex(spark, new Path(destination), None) + !FileStreamSink.hasMetadata(Seq(destination), spark.sparkContext.hadoopConfiguration) || fileCatalog.allFiles().isEmpty + } +} + +object AddDateVersionTransformer extends StreamTransformerFactory with AddDateVersionTransformerAttributes { + val reportDateFormat: String = "yyyy-MM-dd" + val reportDateFormatter: DateTimeFormatter = DateTimeFormatter.ofPattern(reportDateFormat) + val ColumnDate = "hyperdrive_date" + val ColumnVersion = "hyperdrive_version" + + def apply(config: Configuration): StreamTransformer = { + val reportDate = getReportDateString(config) + val destinationDirectory = getDestinationDirectory(config) + + LogManager.getLogger.info(s"Going to create AddDateVersionTransformer instance") + + new AddDateVersionTransformer(reportDate, destinationDirectory) + } + + override def getMappingFromRetainedGlobalConfigToLocalConfig(globalConfig: Configuration): Map[String, String] = Map( + ParquetStreamWriterKeys.KEY_DESTINATION_DIRECTORY -> ParquetStreamWriterKeys.KEY_DESTINATION_DIRECTORY + ) + + private def getReportDateString(configuration: Configuration): String = { + configuration.getString(KeyReportDate) match { + case value: String => value + case _ => reportDateFormatter.format(LocalDate.now()) + } + } + + def getDestinationDirectory(configuration: Configuration): String = + getOrThrow(ParquetStreamWriterKeys.KEY_DESTINATION_DIRECTORY, configuration, + errorMessage = s"Destination directory not found. 
Is '${ParquetStreamWriterKeys.KEY_DESTINATION_DIRECTORY}' defined?")
+
+}
diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformerAttributes.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformerAttributes.scala
new file mode 100644
index 00000000..14e4a3ee
--- /dev/null
+++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformerAttributes.scala
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion
+
+import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata}
+
+trait AddDateVersionTransformerAttributes extends HasComponentAttributes {
+
+  val KeyReportDate = "report.date"
+
+  override def getName: String = "Add Date Version Transformer"
+
+  override def getDescription: String = "This transformer adds a date and an incrementing version number as columns hyperdrive_date and hyperdrive_version." +
+    " hyperdrive_date and hyperdrive_version need to be specified in that order as partition columns for the increment to work." +
+    " This transformer depends on a writer which defines writer.parquet.destination.directory"
+
+  override def getProperties: Map[String, PropertyMetadata] = Map(
+    KeyReportDate -> PropertyMetadata("Date", Some("Date to add as a column, e.g. 2018-01-23. By default, the current date will be taken."), required = false)
+  )
+}
diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetPartitioningStreamWriterLoader.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformerLoader.scala
similarity index 62%
rename from ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetPartitioningStreamWriterLoader.scala
rename to ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformerLoader.scala
index 1e051b9e..70d99986 100644
--- a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetPartitioningStreamWriterLoader.scala
+++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformerLoader.scala
@@ -13,10 +13,10 @@
  * limitations under the License.
*/ -package za.co.absa.hyperdrive.ingestor.implementation.writer.parquet +package za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion -import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriterFactory, StreamWriterFactoryProvider} +import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformerFactory, StreamTransformerFactoryProvider} -class ParquetPartitioningStreamWriterLoader extends StreamWriterFactoryProvider { - override def getComponentFactory: StreamWriterFactory = ParquetPartitioningStreamWriter +class AddDateVersionTransformerLoader extends StreamTransformerFactoryProvider { + override def getComponentFactory: StreamTransformerFactory = AddDateVersionTransformer } diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/AbstractParquetStreamWriter.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/AbstractParquetStreamWriter.scala deleted file mode 100644 index 5c8fe488..00000000 --- a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/AbstractParquetStreamWriter.scala +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright 2018 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.hyperdrive.ingestor.implementation.writer.parquet - -import org.apache.commons.configuration2.Configuration -import org.apache.commons.lang3.StringUtils -import org.apache.logging.log4j.LogManager -import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, StreamingQuery, Trigger} -import org.apache.spark.sql.{DataFrame, Row} -import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils -import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils.{getOrNone, getOrThrow} -import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriter, StreamWriterProperties} -import za.co.absa.hyperdrive.ingestor.implementation.utils.MetadataLogUtil -import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.ParquetStreamWriterKeys._ - -import scala.util.{Failure, Success} - -private[writer] abstract class AbstractParquetStreamWriter(destination: String, trigger: Trigger, - val checkpointLocation: String, - val partitionColumns: Option[Seq[String]], - val doMetadataCheck: Boolean, - val extraConfOptions: Map[String, String]) extends StreamWriter { - private val logger = LogManager.getLogger - if (StringUtils.isBlank(destination)) { - throw new IllegalArgumentException(s"Invalid PARQUET destination: '$destination'") - } - - override def write(dataFrame: DataFrame): StreamingQuery = { - - if (doMetadataCheck) { - MetadataLogUtil.getParquetFilesNotListedInMetadataLog(dataFrame.sparkSession, destination) match { - case Failure(exception) => throw exception - case Success(inconsistentFiles) if inconsistentFiles.nonEmpty => throw new IllegalStateException( - "Inconsistent Metadata Log. 
The following files are on the filesystem, but not in the metadata log," + - "most probably due to a previous partial write. If that is the case, they should be removed.\n " + - s"${inconsistentFiles.reduce(_ + "\n" + _)}") - case _ => // do nothing - } - } - - val outDataframe = transformDataframe(dataFrame) - logger.info(s"Writing to $destination") - getOutStream(outDataframe) - .options(extraConfOptions) - .option(StreamWriterProperties.CheckpointLocation, checkpointLocation) - .start(destination) - } - - def getDestination: String = destination - - /** - * This method has only been added to preserve existing functionality of ParquetPartitioningStreamWriter. - * This method will be removed when ParquetPartitioningStreamWriter has been refactored to a Transformer. - * @see https://github.com/AbsaOSS/hyperdrive/issues/118 - */ - @deprecated - protected def transformDataframe(dataFrame: DataFrame): DataFrame = dataFrame - - private def getOutStream(dataFrame: DataFrame): DataStreamWriter[Row] = { - val dataStreamWriter = dataFrame.writeStream - val dataStreamWriterWithPartition = partitionColumns match { - case Some(columns) => dataStreamWriter.partitionBy(columns: _*) - case None => dataStreamWriter - } - dataStreamWriterWithPartition - .trigger(trigger) - .format(source = "parquet") - .outputMode(OutputMode.Append()) - } -} - -object AbstractParquetStreamWriter { - - def getDestinationDirectory(configuration: Configuration): String = getOrThrow(KEY_DESTINATION_DIRECTORY, configuration, errorMessage = s"Destination directory not found. Is '$KEY_DESTINATION_DIRECTORY' defined?") - - def getMetadataCheck(configuration: Configuration): Boolean = getOrNone(KEY_METADATA_CHECK, configuration).isDefined - - def getExtraOptions(configuration: Configuration): Map[String, String] = ConfigUtils.getPropertySubset(configuration, KEY_EXTRA_CONFS_ROOT) -} - - diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetPartitioningStreamWriter.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetPartitioningStreamWriter.scala deleted file mode 100644 index 3d24cda8..00000000 --- a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetPartitioningStreamWriter.scala +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright 2018 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package za.co.absa.hyperdrive.ingestor.implementation.writer.parquet - -import java.time.LocalDate -import java.time.format.DateTimeFormatter - -import org.apache.commons.configuration2.Configuration -import org.apache.hadoop.fs.Path -import org.apache.logging.log4j.LogManager -import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex} -import org.apache.spark.sql.functions.{lit, to_date} -import org.apache.spark.sql.streaming.Trigger -import org.apache.spark.sql.{DataFrame, SparkSession} -import za.co.absa.hyperdrive.ingestor.api.utils.StreamWriterUtil -import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriter, StreamWriterFactory} -import za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.AbstractParquetStreamWriter._ -import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.ParquetPartitioningStreamWriterKeys._ -import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.ParquetStreamWriterKeys.KEY_EXTRA_CONFS_ROOT - - -private[writer] class ParquetPartitioningStreamWriter(destination: String, trigger: Trigger, - checkpointLocation: String, - partitionColumns: Option[Seq[String]], - doMetadataCheck: Boolean, - reportDate: String, extraConfOptions: Map[String, String]) - extends AbstractParquetStreamWriter(destination, trigger, checkpointLocation, partitionColumns, doMetadataCheck, extraConfOptions) { - - import ParquetPartitioningStreamWriter.{COL_DATE, COL_VERSION} - override protected def transformDataframe(dataFrame: DataFrame): DataFrame = { - val spark = dataFrame.sparkSession - val initialVersion = 1 - val nextVersion = findNextVersion(spark, initialVersion) - val dfWithDate = dataFrame - .withColumn(COL_DATE, to_date(lit(reportDate), ParquetPartitioningStreamWriter.reportDateFormat)) - .withColumn(COL_VERSION, lit(nextVersion)) - dfWithDate - } - - private def findNextVersion(spark: SparkSession, initialVersion: Int): Int = { - if (noCommittedParquetFilesExist(spark)) { - initialVersion - } else { - import spark.implicits._ - val df = spark.read.parquet(destination) - val versions = df.select(df(COL_VERSION)) - .filter(df(COL_DATE) === lit(reportDate)) - .distinct() - .as[Int] - .collect().toList - - if (versions.nonEmpty) versions.max + 1 else initialVersion - } - } - - private def noCommittedParquetFilesExist(spark: SparkSession): Boolean = { - val fileCatalog = new MetadataLogFileIndex(spark, new Path(destination), None) - !FileStreamSink.hasMetadata(Seq(destination), spark.sparkContext.hadoopConfiguration) || fileCatalog.allFiles().isEmpty - } -} - -object ParquetPartitioningStreamWriter extends StreamWriterFactory with ParquetPartitioningStreamWriterAttributes { - val reportDateFormat: String = "yyyy-MM-dd" - val reportDateFormatter: DateTimeFormatter = DateTimeFormatter.ofPattern(reportDateFormat) - val COL_DATE = "hyperdrive_date" - val COL_VERSION = "hyperdrive_version" - - def apply(config: Configuration): StreamWriter = { - val destinationDirectory = getDestinationDirectory(config) - val doMetadataCheck = getMetadataCheck(config) - val trigger = StreamWriterUtil.getTrigger(config) - val checkpointLocation = StreamWriterUtil.getCheckpointLocation(config) - val partitionColumns = Some(Seq(COL_DATE, COL_VERSION)) - val reportDateString = getReportDateString(config) - val extraOptions = getExtraOptions(config) - - LogManager.getLogger.info(s"Going to create ParquetPartitioningStreamWriter instance using: " + - s"destination directory='$destinationDirectory', trigger='$trigger', 
checkpointLocation='$checkpointLocation', extra options='$extraOptions'") - - new ParquetPartitioningStreamWriter(destinationDirectory, trigger, checkpointLocation, partitionColumns, doMetadataCheck, reportDateString, extraOptions) - } - - private def getReportDateString(configuration: Configuration): String = { - configuration.getString(KEY_REPORT_DATE) match { - case value: String => value - case _ => reportDateFormatter.format(LocalDate.now()) - } - } - - override def getExtraConfigurationPrefix: Option[String] = Some(KEY_EXTRA_CONFS_ROOT) -} diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetPartitioningStreamWriterAttributes.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetPartitioningStreamWriterAttributes.scala deleted file mode 100644 index 9fa70199..00000000 --- a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetPartitioningStreamWriterAttributes.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright 2018 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.hyperdrive.ingestor.implementation.writer.parquet - -import za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterCommonAttributes -import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata} -import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.ParquetStreamWriterKeys.{KEY_DESTINATION_DIRECTORY, KEY_METADATA_CHECK} - -trait ParquetPartitioningStreamWriterAttributes extends HasComponentAttributes { - - override def getName: String = "Parquet Partitioning Stream Writer" - - override def getDescription: String = "This writer saves the ingested data in parquet format, partitioned by ingestion date and version. 
" + - "The version is incremented automatically for each ingestion on the same day" - - override def getProperties: Map[String, PropertyMetadata] = Map( - KEY_DESTINATION_DIRECTORY -> PropertyMetadata("Destination directory", Some("A path to a directory"), required = true), - StreamWriterCommonAttributes.keyTriggerProcessingTime -> StreamWriterCommonAttributes.triggerProcessingTimeMetadata, - StreamWriterCommonAttributes.keyCheckpointBaseLocation -> StreamWriterCommonAttributes.checkpointBaseLocation - ) -} diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetStreamWriter.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetStreamWriter.scala index 9c341cd8..e17f9980 100644 --- a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetStreamWriter.scala +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/ParquetStreamWriter.scala @@ -16,20 +16,58 @@ package za.co.absa.hyperdrive.ingestor.implementation.writer.parquet import org.apache.commons.configuration2.Configuration +import org.apache.commons.lang3.StringUtils import org.apache.logging.log4j.LogManager -import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger} import za.co.absa.hyperdrive.ingestor.api.utils.{ConfigUtils, StreamWriterUtil} -import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriter, StreamWriterFactory} -import za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.AbstractParquetStreamWriter._ +import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriter, StreamWriterFactory, StreamWriterProperties} +import za.co.absa.hyperdrive.ingestor.implementation.utils.MetadataLogUtil import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.ParquetStreamWriterKeys -import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.ParquetStreamWriterKeys.KEY_EXTRA_CONFS_ROOT +import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.ParquetStreamWriterKeys._ + +import scala.util.{Failure, Success} private[writer] class ParquetStreamWriter(destination: String, trigger: Trigger, checkpointLocation: String, partitionColumns: Option[Seq[String]], doMetadataCheck: Boolean, - extraConfOptions: Map[String, String]) - extends AbstractParquetStreamWriter(destination, trigger, checkpointLocation, partitionColumns, doMetadataCheck, extraConfOptions) + val extraConfOptions: Map[String, String]) extends StreamWriter { + private val logger = LogManager.getLogger + if (StringUtils.isBlank(destination)) { + throw new IllegalArgumentException(s"Invalid PARQUET destination: '$destination'") + } + + override def write(dataFrame: DataFrame): StreamingQuery = { + + if (doMetadataCheck) { + MetadataLogUtil.getParquetFilesNotListedInMetadataLog(dataFrame.sparkSession, destination) match { + case Failure(exception) => throw exception + case Success(inconsistentFiles) if inconsistentFiles.nonEmpty => throw new IllegalStateException( + "Inconsistent Metadata Log. The following files are on the filesystem, but not in the metadata log," + + "most probably due to a previous partial write. 
If that is the case, they should be removed.\n " + + s"${inconsistentFiles.reduce(_ + "\n" + _)}") + case _ => // do nothing + } + } + + logger.info(s"Writing to $destination") + val dataStreamWriter = dataFrame.writeStream + val dataStreamWriterWithPartition = partitionColumns match { + case Some(columns) => dataStreamWriter.partitionBy(columns: _*) + case None => dataStreamWriter + } + dataStreamWriterWithPartition + .trigger(trigger) + .format(source = "parquet") + .outputMode(OutputMode.Append()) + .options(extraConfOptions) + .option(StreamWriterProperties.CheckpointLocation, checkpointLocation) + .start(destination) + } + + def getDestination: String = destination +} object ParquetStreamWriter extends StreamWriterFactory with ParquetStreamWriterAttributes { @@ -46,6 +84,11 @@ object ParquetStreamWriter extends StreamWriterFactory with ParquetStreamWriterA new ParquetStreamWriter(destinationDirectory, trigger, checkpointLocation, partitionColumns, doMetadataCheck, extraOptions) } + def getDestinationDirectory(configuration: Configuration): String = ConfigUtils.getOrThrow(KEY_DESTINATION_DIRECTORY, configuration, errorMessage = s"Destination directory not found. Is '$KEY_DESTINATION_DIRECTORY' defined?") + + def getMetadataCheck(configuration: Configuration): Boolean = ConfigUtils.getOrNone(KEY_METADATA_CHECK, configuration).isDefined + + def getExtraOptions(configuration: Configuration): Map[String, String] = ConfigUtils.getPropertySubset(configuration, KEY_EXTRA_CONFS_ROOT) override def getExtraConfigurationPrefix: Option[String] = Some(KEY_EXTRA_CONFS_ROOT) } diff --git a/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/TestServiceProviderConfiguration.scala b/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/TestServiceProviderConfiguration.scala index 2140c26b..5aeaaf36 100644 --- a/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/TestServiceProviderConfiguration.scala +++ b/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/TestServiceProviderConfiguration.scala @@ -25,8 +25,9 @@ import za.co.absa.hyperdrive.ingestor.api.{ComponentFactory, ComponentFactoryPro import za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformer import za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformer +import za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion.AddDateVersionTransformer import za.co.absa.hyperdrive.ingestor.implementation.writer.kafka.KafkaStreamWriter -import za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.{ParquetPartitioningStreamWriter, ParquetStreamWriter} +import za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter import scala.reflect.ClassTag @@ -41,12 +42,12 @@ class TestServiceProviderConfiguration extends FlatSpec with Matchers { it should "load StreamTransformers" in { val factoryProviders = loadServices[StreamTransformerFactoryProvider, StreamTransformerFactory]() - factoryProviders should contain theSameElementsAs Seq(ColumnSelectorStreamTransformer, ConfluentAvroDecodingTransformer) + factoryProviders should contain theSameElementsAs Seq(AddDateVersionTransformer, ColumnSelectorStreamTransformer, ConfluentAvroDecodingTransformer) } it should "load StreamWriters" in { val factoryProviders = 
loadServices[StreamWriterFactoryProvider, StreamWriterFactory]() - factoryProviders should contain theSameElementsAs Seq(ParquetPartitioningStreamWriter, ParquetStreamWriter, KafkaStreamWriter) + factoryProviders should contain theSameElementsAs Seq(ParquetStreamWriter, KafkaStreamWriter) } private def loadServices[P <: ComponentFactoryProvider[F], F <: ComponentFactory[_]]()(implicit classTag: ClassTag[P]): Iterable[F] = { diff --git a/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/TestAddDateVersionTransformer.scala b/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/TestAddDateVersionTransformer.scala new file mode 100644 index 00000000..d073d9e3 --- /dev/null +++ b/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/TestAddDateVersionTransformer.scala @@ -0,0 +1,111 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion + +import java.time.LocalDate + +import org.apache.commons.configuration2.BaseConfiguration +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.execution.streaming.MemoryStream +import org.apache.spark.sql.streaming.{OutputMode, Trigger} +import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers} +import za.co.absa.commons.io.TempDirectory +import za.co.absa.commons.spark.SparkTestBase +import za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterProperties +import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.ParquetStreamWriterKeys + +class TestAddDateVersionTransformer extends FlatSpec with SparkTestBase with Matchers with BeforeAndAfter { + + import spark.implicits._ + + private var baseDir: TempDirectory = _ + + private def baseDirPath = baseDir.path.toUri.toString + + private def destinationDir = s"$baseDirPath/destination" + + private def checkpointDir = s"$baseDirPath/checkpoint" + + private val random = scala.util.Random + + behavior of "AddDateVersionTransformer" + + before { + baseDir = TempDirectory("testAddDateVersion").deleteOnExit() + } + + after { + baseDir.delete() + } + + it should "write partitioned by date and version=1 where destination directory is empty" in { + val config = new BaseConfiguration() + config.addProperty(ParquetStreamWriterKeys.KEY_DESTINATION_DIRECTORY, destinationDir) + config.addProperty(AddDateVersionTransformer.KeyReportDate, "2020-02-29") + val underTest = AddDateVersionTransformer(config) + val df = getDummyReadStream().toDF() + executeQuery(underTest.transform(df)) + dfShouldContainDateAndVersion(spark.read.parquet(destinationDir), LocalDate.of(2020, 2, 29), Seq(1)) + } + + it should "write to partition version=2 when version=1 already exists for the same date" in { + // given + val config = new BaseConfiguration() + config.addProperty(ParquetStreamWriterKeys.KEY_DESTINATION_DIRECTORY, destinationDir) + 
config.addProperty(AddDateVersionTransformer.KeyReportDate, "2020-02-29") + val underTest = AddDateVersionTransformer(config) + + // when + val stream = getDummyReadStream() + stream.addData(List.range(1000, 1100)) + executeQuery(underTest.transform(stream.toDF())) + val df1 = spark.read.parquet(destinationDir) + dfShouldContainDateAndVersion(df1, LocalDate.of(2020, 2, 29), Seq(1)) + + stream.addData(List.range(2000, 2100)) + executeQuery(underTest.transform(stream.toDF())) + val df2 = spark.read.parquet(destinationDir) + dfShouldContainDateAndVersion(df2, LocalDate.of(2020, 2, 29), Seq(1, 2)) + } + + private def executeQuery(df: DataFrame) = { + val query = df + .writeStream + .option(StreamWriterProperties.CheckpointLocation, checkpointDir) + .partitionBy(AddDateVersionTransformer.ColumnDate, AddDateVersionTransformer.ColumnVersion) + .outputMode(OutputMode.Append) + .trigger(Trigger.Once) + .start(destinationDir) + query.awaitTermination() + } + + private def dfShouldContainDateAndVersion(df: DataFrame, date: LocalDate, versions: Seq[Int]): Unit = { + df.select(AddDateVersionTransformer.ColumnDate) + .distinct() + .as[java.sql.Date] + .collect().toList should contain theSameElementsAs Seq(java.sql.Date.valueOf(date)) + df.select(AddDateVersionTransformer.ColumnVersion) + .distinct() + .as[Int] + .collect().toList should contain theSameElementsAs versions + } + + private def getDummyReadStream() = { + val input = MemoryStream[Int](random.nextInt(), spark.sqlContext) + input.addData(List.range(0, 100)) + input + } +} diff --git a/shared/src/main/scala/za/co/absa/hyperdrive/shared/configurations/ConfigurationsKeys.scala b/shared/src/main/scala/za/co/absa/hyperdrive/shared/configurations/ConfigurationsKeys.scala index e5e38780..fdca9394 100644 --- a/shared/src/main/scala/za/co/absa/hyperdrive/shared/configurations/ConfigurationsKeys.scala +++ b/shared/src/main/scala/za/co/absa/hyperdrive/shared/configurations/ConfigurationsKeys.scala @@ -70,9 +70,4 @@ private[hyperdrive] object ConfigurationsKeys { val KEY_METADATA_CHECK = s"$rootFactoryConfKey.metadata.check" val KEY_EXTRA_CONFS_ROOT = s"$rootFactoryConfKey.options" } - - object ParquetPartitioningStreamWriterKeys { - val rootFactoryConfKey = s"${ParquetStreamWriterKeys.rootFactoryConfKey}.partitioning" - val KEY_REPORT_DATE = s"$rootFactoryConfKey.report.date" - } }
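For reference, a minimal properties-file sketch of a pipeline that uses the new AddDateVersionTransformer, assembled from the README table and the docker test in this change; the transformer ids, report date, and paths below are illustrative placeholders, not values introduced by this patch.

    # illustrative pipeline configuration (placeholder ids and paths)
    component.ingestor=spark
    component.reader=za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader
    component.transformer.id.0=avro.decoder
    component.transformer.class.avro.decoder=za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformer
    component.transformer.id.1=version.incrementer
    component.transformer.class.version.incrementer=za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion.AddDateVersionTransformer
    component.writer=za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter
    # optional; defaults to the ingestion date
    transformer.version.incrementer.report.date=2020-03-31
    # the transformer reads the destination directory from the writer configuration
    writer.parquet.destination.directory=/tmp/destination
    writer.parquet.partition.columns=hyperdrive_date,hyperdrive_version
    writer.common.checkpoint.location=/tmp/checkpoint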