diff --git a/pom.xml b/pom.xml index 15cbaeef..f3606c47 100644 --- a/pom.xml +++ b/pom.xml @@ -317,6 +317,23 @@ + + org.apache.avro + avro-maven-plugin + 1.9.2 + + + generate-sources + + schema + + + src/main/avro/ + ${project.basedir}/target/generated-sources/avro + + + + diff --git a/src/main/avro/native-complete-schema.avsc b/src/main/avro/native-complete-schema.avsc new file mode 100644 index 00000000..0eedf5ac --- /dev/null +++ b/src/main/avro/native-complete-schema.avsc @@ -0,0 +1,96 @@ +/* + * + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "all_types.test", + "type": "record", + "name": "NativeComplete", + "fields": [ + { + "name": "bytes", + "type": "bytes" + }, + { + "name": "string", + "type": [ + "string", + "null" + ], + "default": "blue" + }, + { + "name": "int", + "type": [ + "int", + "null" + ] + }, + { + "name": "long", + "type": [ + "long", + "null" + ] + }, + { + "name": "double", + "type": [ + "double", + "null" + ] + }, + { + "name": "float", + "type": [ + "float", + "null" + ] + }, + { + "name": "boolean", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "array", + "type": { + "type": "array", + "items": "string" + } + }, + { + "name": "map", + "type": { + "type": "map", + "values": { + "type": "array", + "items": "long" + } + } + }, + { + "name": "fixed", + "type": { + "type": "fixed", + "size": 40, + "name": "Fixed" + } + } + ] +} \ No newline at end of file diff --git a/src/main/avro/native-simple-outer-schema.avsc b/src/main/avro/native-simple-outer-schema.avsc new file mode 100644 index 00000000..f0c7482c --- /dev/null +++ b/src/main/avro/native-simple-outer-schema.avsc @@ -0,0 +1,45 @@ +/* + * + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "type": "record", + "name": "NativeSimpleOuter", + "namespace": "all_types.test", + "fields": [ + { + "name": "name", + "type": "string" + }, + { + "name": "nested", + "type": { + "type": "record", + "name": "Nested", + "fields": [ + { + "name": "int", + "type": "int" + }, + { + "name": "long", + "type": "long" + } + ] + } + } + ] +} \ No newline at end of file diff --git a/src/main/scala/za/co/absa/abris/avro/errors/DeserializationExceptionHandler.scala b/src/main/scala/za/co/absa/abris/avro/errors/DeserializationExceptionHandler.scala new file mode 100644 index 00000000..8aaa8e6d --- /dev/null +++ b/src/main/scala/za/co/absa/abris/avro/errors/DeserializationExceptionHandler.scala @@ -0,0 +1,26 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.abris.avro.errors + +import org.apache.avro.Schema +import org.apache.spark.sql.avro.AbrisAvroDeserializer + +trait DeserializationExceptionHandler extends Serializable { + + def handle(exception: Throwable, deserializer: AbrisAvroDeserializer, readerSchema: Schema): Any + +} diff --git a/src/main/scala/za/co/absa/abris/avro/errors/FailFastExceptionHandler.scala b/src/main/scala/za/co/absa/abris/avro/errors/FailFastExceptionHandler.scala new file mode 100644 index 00000000..9930631b --- /dev/null +++ b/src/main/scala/za/co/absa/abris/avro/errors/FailFastExceptionHandler.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.abris.avro.errors + +import org.apache.avro.Schema +import org.apache.spark.SparkException +import org.apache.spark.sql.avro.AbrisAvroDeserializer + +class FailFastExceptionHandler extends DeserializationExceptionHandler { + + def handle(exception: Throwable, avroDeserializer: AbrisAvroDeserializer, readerSchema: Schema): Any = { + throw new SparkException("Malformed record detected.", exception) + } +} diff --git a/src/main/scala/za/co/absa/abris/avro/errors/SpecificRecordExceptionHandler.scala b/src/main/scala/za/co/absa/abris/avro/errors/SpecificRecordExceptionHandler.scala new file mode 100644 index 00000000..3d905770 --- /dev/null +++ b/src/main/scala/za/co/absa/abris/avro/errors/SpecificRecordExceptionHandler.scala @@ -0,0 +1,30 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.abris.avro.errors + +import org.apache.avro.Schema +import org.apache.avro.specific.SpecificRecordBase +import org.apache.spark.internal.Logging +import org.apache.spark.sql.avro.AbrisAvroDeserializer + +class SpecificRecordExceptionHandler(defaultRecord: SpecificRecordBase) extends DeserializationExceptionHandler with Logging { + + def handle(exception: Throwable, deserializer: AbrisAvroDeserializer, readerSchema: Schema): Any = { + logWarning("Malformed record detected. Replacing with default record.", exception) + deserializer.deserialize(defaultRecord) + } +} diff --git a/src/main/scala/za/co/absa/abris/avro/sql/AvroDataToCatalyst.scala b/src/main/scala/za/co/absa/abris/avro/sql/AvroDataToCatalyst.scala index bd463174..3ec243db 100644 --- a/src/main/scala/za/co/absa/abris/avro/sql/AvroDataToCatalyst.scala +++ b/src/main/scala/za/co/absa/abris/avro/sql/AvroDataToCatalyst.scala @@ -20,11 +20,11 @@ import org.apache.avro.Schema import org.apache.avro.generic.GenericDatumReader import org.apache.avro.io.{BinaryDecoder, DecoderFactory} import org.apache.kafka.common.errors.SerializationException -import org.apache.spark.SparkException -import org.apache.spark.sql.avro.{AbrisAvroDeserializer, SchemaConverters} +import org.apache.spark.sql.avro.AbrisAvroDeserializer import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenerator, CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression} import org.apache.spark.sql.types.{BinaryType, DataType} +import za.co.absa.abris.avro.errors.DeserializationExceptionHandler import za.co.absa.abris.avro.read.confluent.{ConfluentConstants, SchemaManagerFactory} import za.co.absa.abris.config.InternalFromAvroConfig @@ -36,8 +36,8 @@ import scala.util.{Failure, Success, Try} private[abris] case class AvroDataToCatalyst( child: Expression, - abrisConfig: Map[String,Any], - schemaRegistryConf: Option[Map[String,String]] + abrisConfig: Map[String, Any], + schemaRegistryConf: Option[Map[String, String]] ) extends UnaryExpression with ExpectsInputTypes { @transient private lazy val schemaConverter = loadSchemaConverter(config.schemaConverter) @@ -58,6 +58,8 @@ private[abris] case class AvroDataToCatalyst( @transient private lazy val writerSchemaOption = config.writerSchema + @transient private lazy val deserializationHandler: DeserializationExceptionHandler = config.deserializationHandler + @transient private lazy val vanillaReader: GenericDatumReader[Any] = new GenericDatumReader[Any](writerSchemaOption.getOrElse(readerSchema), readerSchema) @@ -82,7 +84,7 @@ private[abris] case class AvroDataToCatalyst( // There could be multiple possible exceptions here, e.g. java.io.IOException, // AvroRuntimeException, ArrayIndexOutOfBoundsException, etc. // To make it simple, catch all the exceptions here. - case NonFatal(e) => throw new SparkException("Malformed records are detected in record parsing.", e) + case NonFatal(e) => deserializationHandler.handle(e, deserializer, readerSchema) } } diff --git a/src/main/scala/za/co/absa/abris/config/Config.scala b/src/main/scala/za/co/absa/abris/config/Config.scala index 14e99735..e7ca9b45 100644 --- a/src/main/scala/za/co/absa/abris/config/Config.scala +++ b/src/main/scala/za/co/absa/abris/config/Config.scala @@ -16,6 +16,7 @@ package za.co.absa.abris.config +import za.co.absa.abris.avro.errors.DeserializationExceptionHandler import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils import za.co.absa.abris.avro.read.confluent.SchemaManagerFactory import za.co.absa.abris.avro.registry._ @@ -336,6 +337,15 @@ class FromAvroConfig private( schemaRegistryConf ) + /** + * @param exceptionHandler exception handler used for converting from avro + */ + def withExceptionHandler(exceptionHandler: DeserializationExceptionHandler): FromAvroConfig = + new FromAvroConfig( + abrisConfig + (Key.ExceptionHandler -> exceptionHandler), + schemaRegistryConf + ) + def validate(): Unit = { if(!abrisConfig.contains(Key.ReaderSchema)) { throw new IllegalArgumentException(s"Missing mandatory config property ${Key.ReaderSchema}") @@ -353,5 +363,6 @@ object FromAvroConfig { val ReaderSchema = "readerSchema" val WriterSchema = "writerSchema" val SchemaConverter = "schemaConverter" + val ExceptionHandler = "exceptionHandler" } } diff --git a/src/main/scala/za/co/absa/abris/config/InternalFromAvroConfig.scala b/src/main/scala/za/co/absa/abris/config/InternalFromAvroConfig.scala index ce450fb8..d31f28dd 100644 --- a/src/main/scala/za/co/absa/abris/config/InternalFromAvroConfig.scala +++ b/src/main/scala/za/co/absa/abris/config/InternalFromAvroConfig.scala @@ -17,6 +17,7 @@ package za.co.absa.abris.config import org.apache.avro.Schema +import za.co.absa.abris.avro.errors.{FailFastExceptionHandler, DeserializationExceptionHandler} import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils import za.co.absa.abris.config.FromAvroConfig.Key @@ -31,4 +32,9 @@ private[abris] class InternalFromAvroConfig(map: Map[String, Any]) { val schemaConverter: Option[String] = map .get(Key.SchemaConverter) .map(_.asInstanceOf[String]) + + val deserializationHandler: DeserializationExceptionHandler = map + .get(Key.ExceptionHandler) + .map(s => s.asInstanceOf[DeserializationExceptionHandler]) + .getOrElse(new FailFastExceptionHandler) } diff --git a/src/main/scala/za/co/absa/abris/examples/data/generation/TestSchemas.scala b/src/main/scala/za/co/absa/abris/examples/data/generation/TestSchemas.scala index d0e89732..f8b0ed39 100644 --- a/src/main/scala/za/co/absa/abris/examples/data/generation/TestSchemas.scala +++ b/src/main/scala/za/co/absa/abris/examples/data/generation/TestSchemas.scala @@ -16,6 +16,7 @@ package za.co.absa.abris.examples.data.generation +import all_types.test.{NativeComplete, NativeSimpleOuter} import za.co.absa.commons.annotation.DeveloperApi /** @@ -31,24 +32,7 @@ object TestSchemas { case class Mapping(mappingTableColumn: String, mappedDatasetColumn: String) - val NATIVE_SIMPLE_OUTER_SCHEMA = """{ - "type":"record", - "name":"outer_nested", - "namespace":"all-types.test", - "fields": - [ - {"name":"name", "type":"string"}, - {"name":"nested","type": - { - "type":"record","name":"nested","fields": - [ - {"name":"int", "type":"int"}, - {"name":"long","type":"long"} - ] - } - } - ] - }""" + val NATIVE_SIMPLE_OUTER_SCHEMA = NativeSimpleOuter.SCHEMA$.toString() val NATIVE_SIMPLE_NESTED_SCHEMA = """{ "namespace": "all-types.test", @@ -61,23 +45,7 @@ object TestSchemas { ] }""" - val NATIVE_COMPLETE_SCHEMA = """{ - "namespace": "all-types.test", - "type": "record", - "name": "native_complete", - "fields":[ - {"name": "bytes", "type": "bytes" }, - { "name": "string", "type": ["string", "null"], "default":"blue" }, - { "name": "int", "type": ["int", "null"] }, - { "name": "long", "type": ["long", "null"] }, - { "name": "double", "type": ["double", "null"] }, - { "name": "float", "type": ["float", "null"] }, - { "name": "boolean", "type": ["boolean","null"] }, - { "name": "array", "type": {"type": "array", "items": "string"} }, - {"name": "map", "type": { "type": "map", "values": {"type": "array", "items": "long"}}}, - {"name": "fixed", "type": {"type": "fixed", "size": 40, "name": "fixed"}} - ] - }""" + val NATIVE_COMPLETE_SCHEMA = NativeComplete.SCHEMA$.toString() val NATIVE_COMPLETE_SCHEMA_WITHOUT_FIXED = """{ "namespace": "all-types.test", diff --git a/src/test/scala/za/co/absa/abris/avro/errors/FailFastExceptionHandlerSpec.scala b/src/test/scala/za/co/absa/abris/avro/errors/FailFastExceptionHandlerSpec.scala new file mode 100644 index 00000000..687a52e0 --- /dev/null +++ b/src/test/scala/za/co/absa/abris/avro/errors/FailFastExceptionHandlerSpec.scala @@ -0,0 +1,41 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.abris.avro.errors + +import org.apache.spark.SparkException +import org.apache.spark.sql.avro.{AbrisAvroDeserializer, SchemaConverters} +import org.apache.spark.sql.types.DataType +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils +import za.co.absa.abris.examples.data.generation.TestSchemas + + +class FailFastExceptionHandlerSpec extends AnyFlatSpec with Matchers { + + it should "should throw spark exception on error" in { + + val deserializationExceptionHandler = new FailFastExceptionHandler + val schema = AvroSchemaUtils.parse(TestSchemas.COMPLEX_SCHEMA_SPEC) + val dataType: DataType = SchemaConverters.toSqlType(schema).dataType + val deserializer = new AbrisAvroDeserializer(schema, dataType) + + an[SparkException] should be thrownBy (deserializationExceptionHandler.handle(new Exception, deserializer, schema)) + val exceptionThrown = the[SparkException] thrownBy (deserializationExceptionHandler.handle(new Exception, deserializer, schema)) + exceptionThrown.getMessage should equal("Malformed record detected.") + } +} diff --git a/src/test/scala/za/co/absa/abris/avro/errors/SpecificRecordExceptionHandlerSpec.scala b/src/test/scala/za/co/absa/abris/avro/errors/SpecificRecordExceptionHandlerSpec.scala new file mode 100644 index 00000000..b0ca9df7 --- /dev/null +++ b/src/test/scala/za/co/absa/abris/avro/errors/SpecificRecordExceptionHandlerSpec.scala @@ -0,0 +1,75 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.abris.avro.errors + +import all_types.test.{NativeSimpleOuter, Nested} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.avro.{AbrisAvroDeserializer, SchemaConverters} +import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow +import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.unsafe.types.UTF8String +import org.scalatest.flatspec.AnyFlatSpec +import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils +import za.co.absa.abris.examples.data.generation.TestSchemas + +class SpecificRecordExceptionHandlerSpec extends AnyFlatSpec { + + private val spark = SparkSession + .builder() + .appName("unitTest") + .master("local[1]") + .config("spark.driver.bindAddress", "localhost") + .config("spark.ui.enabled", "false") + .getOrCreate() + + it should "receive empty dataframe row back" in { + // provided + val providedDefaultRecord = NativeSimpleOuter.newBuilder() + .setName("name") + .setNested(Nested.newBuilder() + .setInt$(1) + .setLong$(1) + .build()) + .build() + + // expected + val expectedNestedFieldSchema = new StructType() + .add("int", "int") + .add("long", "long") + val expectedNestedStructSchema = new StructType() + .add("name", "string") + .add("nested", expectedNestedFieldSchema) + + val expectedNestedFieldInternalRow = new SpecificInternalRow(expectedNestedFieldSchema) + expectedNestedFieldInternalRow.setInt(0, 1) + expectedNestedFieldInternalRow.setLong(1, 1L) + + val expectedNestedStructInternalRow = new SpecificInternalRow(expectedNestedStructSchema) + expectedNestedStructInternalRow.update(0, UTF8String.fromString("name")) + expectedNestedStructInternalRow.update(1, expectedNestedFieldInternalRow) + + //actual + val deserializationExceptionHandler = new SpecificRecordExceptionHandler(providedDefaultRecord) + val schema = AvroSchemaUtils.parse(TestSchemas.NATIVE_SIMPLE_OUTER_SCHEMA) + val dataType: DataType = SchemaConverters.toSqlType(schema).dataType + + val actualResult = deserializationExceptionHandler + .handle(new Exception, new AbrisAvroDeserializer(schema, dataType), schema) + + assert(actualResult == expectedNestedStructInternalRow) + } +} diff --git a/src/test/scala/za/co/absa/abris/avro/sql/AvroDataToCatalystSpec.scala b/src/test/scala/za/co/absa/abris/avro/sql/AvroDataToCatalystSpec.scala index 2f47d36b..ffedd866 100644 --- a/src/test/scala/za/co/absa/abris/avro/sql/AvroDataToCatalystSpec.scala +++ b/src/test/scala/za/co/absa/abris/avro/sql/AvroDataToCatalystSpec.scala @@ -16,18 +16,43 @@ package za.co.absa.abris.avro.sql +import all_types.test.{Fixed, NativeComplete} +import org.apache.spark.SparkException import org.apache.spark.SparkConf import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} +import org.apache.spark.sql.{DataFrame, Encoder, Row, SparkSession} import org.apache.spark.sql.functions.col import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType} import org.scalatest.BeforeAndAfterEach import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +import za.co.absa.abris.avro.errors.{FailFastExceptionHandler, SpecificRecordExceptionHandler} +import za.co.absa.abris.avro.format.SparkAvroConversions import za.co.absa.abris.avro.functions._ +import za.co.absa.abris.avro.utils.AvroSchemaEncoder import za.co.absa.abris.config.{AbrisConfig, FromAvroConfig} import za.co.absa.abris.examples.data.generation.TestSchemas +import java.util.Collections +import java.nio.ByteBuffer +import java.util +import scala.collection.JavaConverters._ + class AvroDataToCatalystSpec extends AnyFlatSpec with Matchers with BeforeAndAfterEach { + + private val spark = SparkSession + .builder() + .appName("unitTest") + .master("local[2]") + .config("spark.driver.bindAddress", "localhost") + .config("spark.ui.enabled", "false") + .getOrCreate() + + import spark.implicits._ + + private val avroSchemaEncoder = new AvroSchemaEncoder + implicit private val encoder: Encoder[Row] = avroSchemaEncoder.getEncoder + it should "not print schema registry configs in the spark plan" in { val sensitiveData = "username:password" val schemaString = TestSchemas.NATIVE_SIMPLE_NESTED_SCHEMA @@ -120,4 +145,74 @@ class AvroDataToCatalystSpec extends AnyFlatSpec with Matchers with BeforeAndAft // test successful if no exception is thrown } + + it should "throw a Spark exception when unable to deserialize " in { + + val providedData = Seq(Row("$£%^".getBytes())) + val providedDataFrame: DataFrame = spark.sparkContext.parallelize(providedData, 2).toDF() as "bytes" + + val dummyUrl = "dummyUrl" + val fromConfig = AbrisConfig + .fromConfluentAvro + .provideReaderSchema(TestSchemas.NATIVE_SIMPLE_NESTED_SCHEMA) + .usingSchemaRegistry(dummyUrl) + .withExceptionHandler(new FailFastExceptionHandler) + + the[SparkException] thrownBy providedDataFrame.select(from_avro(col("bytes"), fromConfig )).collect() + } + + it should "replace undeserializable record with default SpecificRecord" in { + // provided + val providedData = Seq( + Row("$£%^".getBytes()) + ) + val providedDataFrame: DataFrame = spark.sparkContext.parallelize(providedData, 2).toDF() as "bytes" + + val providedDefaultRecord = NativeComplete.newBuilder() + .setBytes(ByteBuffer.wrap(Array[Byte](1,2,3))) + .setString("default-record") + .setInt$(1) + .setLong$(2L) + .setDouble$(3.0) + .setFloat$(4.0F) + .setBoolean$(true) + .setArray(Collections.singletonList("arrayItem1")) + .setMap(Collections.singletonMap[CharSequence, util.List[java.lang.Long]]( + "key1", + Collections.singletonList[java.lang.Long](1L))) + .setFixed(new Fixed(Array.fill[Byte](40){1})) + .build() + + // expected + val expectedData = Seq( + Row(Array[Byte](1,2,3), + "default-record", + 1, + 2L, + 3.0, + 4F, + true, + Collections.singletonList("arrayItem1"), + Collections.singletonMap[CharSequence, util.List[java.lang.Long]]( + "key1", + Collections.singletonList[java.lang.Long](1L)), + Array.fill[Byte](40){1} + )).asJava + + val expectedDataFrame: DataFrame = spark.createDataFrame(expectedData, SparkAvroConversions.toSqlType(NativeComplete.SCHEMA$)) + + // actual + val dummyUrl = "dummyUrl" + val fromConfig = AbrisConfig + .fromConfluentAvro + .provideReaderSchema(NativeComplete.SCHEMA$.toString()) + .usingSchemaRegistry(dummyUrl) + .withExceptionHandler(new SpecificRecordExceptionHandler(providedDefaultRecord)) + + val actualDataFrame = providedDataFrame + .select(from_avro(col("bytes"), fromConfig).as("actual")) + .select(col("actual.*")) + + shouldEqualByData(expectedDataFrame, actualDataFrame) + } } diff --git a/src/test/scala/za/co/absa/abris/avro/sql/CatalystAvroConversionSpec.scala b/src/test/scala/za/co/absa/abris/avro/sql/CatalystAvroConversionSpec.scala index 6728ef22..7fa8aa7b 100644 --- a/src/test/scala/za/co/absa/abris/avro/sql/CatalystAvroConversionSpec.scala +++ b/src/test/scala/za/co/absa/abris/avro/sql/CatalystAvroConversionSpec.scala @@ -27,6 +27,7 @@ import za.co.absa.abris.avro.functions._ import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils import za.co.absa.abris.avro.read.confluent.SchemaManagerFactory import za.co.absa.abris.avro.registry.{ConfluentMockRegistryClient, SchemaSubject} +import za.co.absa.abris.avro.utils.AvroSchemaEncoder import za.co.absa.abris.config.AbrisConfig import za.co.absa.abris.examples.data.generation.{ComplexRecordsGenerator, TestSchemas} @@ -42,7 +43,8 @@ class CatalystAvroConversionSpec extends AnyFlatSpec with Matchers with BeforeAn import spark.implicits._ - implicit val encoder: Encoder[Row] = getEncoder + private val avroSchemaEncoder = new AvroSchemaEncoder + implicit val encoder: Encoder[Row] = avroSchemaEncoder.getEncoder private val dummyUrl = "dummyUrl" private val schemaRegistryConfig = Map(AbrisConfig.SCHEMA_REGISTRY_URL -> dummyUrl) @@ -330,7 +332,7 @@ class CatalystAvroConversionSpec extends AnyFlatSpec with Matchers with BeforeAn val fromAvroConfig = AbrisConfig .fromSimpleAvro .downloadSchemaByLatestVersion - .andTopicRecordNameStrategy("fooTopic", "native_complete", "all-types.test") + .andTopicRecordNameStrategy("fooTopic", "NativeComplete", "all_types.test") .usingSchemaRegistry(dummyUrl) val result = avroBytes @@ -506,11 +508,4 @@ class CatalystAvroConversionSpec extends AnyFlatSpec with Matchers with BeforeAn shouldEqualByData(dataFrame, result) } - - private def getEncoder: Encoder[Row] = { - val avroSchema = AvroSchemaUtils.parse(ComplexRecordsGenerator.usedAvroSchema) - val sparkSchema = SparkAvroConversions.toSqlType(avroSchema) - RowEncoder.apply(sparkSchema) - } - } diff --git a/src/test/scala/za/co/absa/abris/avro/utils/AvroSchemaEncoder.scala b/src/test/scala/za/co/absa/abris/avro/utils/AvroSchemaEncoder.scala new file mode 100644 index 00000000..5c5e3afc --- /dev/null +++ b/src/test/scala/za/co/absa/abris/avro/utils/AvroSchemaEncoder.scala @@ -0,0 +1,33 @@ +/* + * Copyright 2019 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.abris.avro.utils + +import org.apache.spark.sql.{Encoder, Row} +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import za.co.absa.abris.avro.format.SparkAvroConversions +import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils +import za.co.absa.abris.examples.data.generation.ComplexRecordsGenerator + +class AvroSchemaEncoder { + + def getEncoder: Encoder[Row] = { + val avroSchema = AvroSchemaUtils.parse(ComplexRecordsGenerator.usedAvroSchema) + val sparkSchema = SparkAvroConversions.toSqlType(avroSchema) + RowEncoder.apply(sparkSchema) + } + +}