diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/AllNullableParquetStreamWriter.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/AllNullableParquetStreamWriter.scala deleted file mode 100644 index 4736edbc..00000000 --- a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/AllNullableParquetStreamWriter.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2018-2019 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.hyperdrive.ingestor.implementation.writer.parquet - -import org.apache.commons.configuration2.Configuration -import org.apache.logging.log4j.LogManager -import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.streaming.StreamingQuery -import za.co.absa.hyperdrive.ingestor.api.manager.OffsetManager -import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriter, StreamWriterFactory} -import za.co.absa.hyperdrive.shared.utils.SparkUtils - -/** - * Works as a decorator for [[ParquetStreamWriter]] by setting the nullability of all fields of the - * incoming DataFrame to null. - */ -private[parquet] class AllNullableParquetStreamWriter(val parquetStreamWriter: StreamWriter) - extends StreamWriter(parquetStreamWriter.getDestination) { - - override def getDestination: String = parquetStreamWriter.getDestination - - override def write(dataFrame: DataFrame, offsetManager: OffsetManager): StreamingQuery = { - val allNullableDataFrame = SparkUtils.setAllColumnsNullable(dataFrame) - parquetStreamWriter.write(allNullableDataFrame, offsetManager) - } -} - -/** - * Reuses factory for [[ParquetStreamWriter]]. - */ -object AllNullableParquetStreamWriter extends StreamWriterFactory { - - override def apply(config: Configuration): StreamWriter = { - LogManager.getLogger.info(s"Going to create ${classOf[AllNullableParquetStreamWriter].getSimpleName} instance. Injecting ${classOf[ParquetStreamWriter].getSimpleName} instance.") - val parquetWriter = ParquetStreamWriter(config) - new AllNullableParquetStreamWriter(parquetWriter) - } -} diff --git a/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/TestAllNullableParquetStreamWriter.scala b/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/TestAllNullableParquetStreamWriter.scala deleted file mode 100644 index 6fa2a9f8..00000000 --- a/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/TestAllNullableParquetStreamWriter.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright 2018-2019 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.hyperdrive.ingestor.implementation.writer.parquet - -import org.apache.spark.sql.streaming.StreamingQuery -import org.apache.spark.sql.{DataFrame, SparkSession} -import org.apache.spark.sql.types._ -import org.scalatest.mockito.MockitoSugar -import org.scalatest.{BeforeAndAfterEach, FunSuite} -import za.co.absa.hyperdrive.ingestor.api.manager.OffsetManager -import za.co.absa.hyperdrive.ingestor.api.writer.StreamWriter -import za.co.absa.hyperdrive.testutils.SparkTestUtils._ - -class TestAllNullableParquetStreamWriter extends FunSuite with BeforeAndAfterEach with MockitoSugar { - - private object dummyStreamWriter extends StreamWriter("/tmp/any") { - override def getDestination: String = "/tmp/any" - - var capturedDataframe: DataFrame = _ - var capturedOffsetManager: OffsetManager = _ - - override def write(dataFrame: DataFrame, offsetManager: OffsetManager): StreamingQuery = { - capturedDataframe = dataFrame - capturedOffsetManager = offsetManager - null - } - } - - private val spark = SparkSession.builder().appName(classOf[TestAllNullableParquetStreamWriter].getSimpleName).master("local").getOrCreate() - spark.sparkContext.setLogLevel("ERROR") - - private val stubOffsetManager = mock[OffsetManager] - private val fakeDataFrame = getTestDataframe - - test(testName = "Dataframe has all columns nullabilities set to true before being passed to injected writer") { - assert(!areAllFieldsNullable(fakeDataFrame.schema)) - - val writer = new AllNullableParquetStreamWriter(dummyStreamWriter) - writer.write(fakeDataFrame, stubOffsetManager) - - assert(dummyStreamWriter.capturedOffsetManager == stubOffsetManager) - assert(areAllFieldsNullable(dummyStreamWriter.capturedDataframe.schema)) - } - - private def getTestDataframe: DataFrame = { - val nestedType1 = new StructType() - .add(StructField("a", IntegerType, nullable = false)) - .add(StructField("b", LongType, nullable = false)) - - val nestedType2 = new StructType() - .add(StructField("c", ArrayType(nestedType1, containsNull = false), nullable = false)) - - val schema = new StructType() - .add(StructField("d", StringType, nullable = false)) - .add(StructField("e", ArrayType(nestedType2, containsNull = false), nullable = false)) - - spark.createDataFrame(spark.emptyDataFrame.rdd, schema) - } -} diff --git a/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/TestAllNullableParquetStreamWriterObject.scala b/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/TestAllNullableParquetStreamWriterObject.scala deleted file mode 100644 index da797958..00000000 --- a/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/parquet/TestAllNullableParquetStreamWriterObject.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright 2018-2019 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.hyperdrive.ingestor.implementation.writer.parquet - -import org.apache.commons.configuration2.{BaseConfiguration, Configuration} -import org.scalatest.{BeforeAndAfterEach, FlatSpec} -import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.ParquetStreamWriterKeys.{KEY_DESTINATION_DIRECTORY, KEY_EXTRA_CONFS_ROOT} - -class TestAllNullableParquetStreamWriterObject extends FlatSpec with BeforeAndAfterEach { - - private val configStub: Configuration = new BaseConfiguration() - - private val destinationDirectory = "/tmp/destination/parquet" - private val extraConfs = Map("key.1" -> "value.1", "key.2" -> "value.2") - - override def beforeEach(): Unit = configStub.clear() - - behavior of AllNullableParquetStreamWriter.getClass.getSimpleName - - it should "throw on blank destination directory" in { - val throwable = intercept[IllegalArgumentException](AllNullableParquetStreamWriter(configStub)) - assert(throwable.getMessage.toLowerCase.contains("destination")) - } - - it should "instantiate a AllNullableParquetStreamWriter instance from configurations" in { - stubDestinationDirectory() - stubExtraConfs() - - val writer = AllNullableParquetStreamWriter(configStub).asInstanceOf[AllNullableParquetStreamWriter] - assert(destinationDirectory == writer.getDestination) - val injectedWriter = writer.parquetStreamWriter.asInstanceOf[ParquetStreamWriter] - assert(extraConfs.toSet.diff(injectedWriter.extraConfOptions.get.toSet).isEmpty) - } - - it should "throw if an extra option is malformed" in { - stubDestinationDirectory() - - val writer = AllNullableParquetStreamWriter(configStub).asInstanceOf[AllNullableParquetStreamWriter] - assert(destinationDirectory == writer.getDestination) - val injectedWriter = writer.parquetStreamWriter.asInstanceOf[ParquetStreamWriter] - assert(injectedWriter.extraConfOptions.isEmpty) - } - - it should "not throw on absent extra configurations" in { - stubDestinationDirectory() - stubExtraConfs() - stubStringConfig(s"$KEY_EXTRA_CONFS_ROOT.wrong.conf","only.key=") - - assertThrows[IllegalArgumentException](AllNullableParquetStreamWriter(configStub)) - } - - private def stubStringConfig(key: String, value: String): Unit = configStub.addProperty(key, value) - - private def stubDestinationDirectory(): Unit = stubStringConfig(KEY_DESTINATION_DIRECTORY, destinationDirectory) - - private def stubExtraConfs(): Unit = { - extraConfs - .zipWithIndex - .foreach { - case((key,value),index) => stubStringConfig(s"$KEY_EXTRA_CONFS_ROOT.$index", s"$key=$value") - } - } -}