Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding exception handlers #290

Merged
merged 12 commits into from
Apr 29, 2022
17 changes: 17 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,23 @@
</executions>
</plugin>

<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.9.2</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>src/main/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/target/generated-sources/avro</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

Expand Down
96 changes: 96 additions & 0 deletions src/main/avro/native-complete-schema.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

{
"namespace": "all_types.test",
"type": "record",
"name": "NativeComplete",
"fields": [
{
"name": "bytes",
"type": "bytes"
},
{
"name": "string",
"type": [
"string",
"null"
],
"default": "blue"
},
{
"name": "int",
"type": [
"int",
"null"
]
},
{
"name": "long",
"type": [
"long",
"null"
]
},
{
"name": "double",
"type": [
"double",
"null"
]
},
{
"name": "float",
"type": [
"float",
"null"
]
},
{
"name": "boolean",
"type": [
"boolean",
"null"
]
},
{
"name": "array",
"type": {
"type": "array",
"items": "string"
}
},
{
"name": "map",
"type": {
"type": "map",
"values": {
"type": "array",
"items": "long"
}
}
},
{
"name": "fixed",
"type": {
"type": "fixed",
"size": 40,
"name": "Fixed"
}
}
]
}
45 changes: 45 additions & 0 deletions src/main/avro/native-simple-outer-schema.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

{
"type": "record",
"name": "NativeSimpleOuter",
"namespace": "all_types.test",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "nested",
"type": {
"type": "record",
"name": "Nested",
"fields": [
{
"name": "int",
"type": "int"
},
{
"name": "long",
"type": "long"
}
]
}
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.abris.avro.errors

import org.apache.avro.Schema
import org.apache.spark.sql.avro.AbrisAvroDeserializer

trait DeserializationExceptionHandler extends Serializable {

def handle(exception: Throwable, deserializer: AbrisAvroDeserializer, readerSchema: Schema): Any

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.abris.avro.errors

import org.apache.avro.Schema
import org.apache.spark.SparkException
import org.apache.spark.sql.avro.AbrisAvroDeserializer

class FailFastExceptionHandler extends DeserializationExceptionHandler {

def handle(exception: Throwable, avroDeserializer: AbrisAvroDeserializer, readerSchema: Schema): Any = {
throw new SparkException("Malformed record detected.", exception)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.abris.avro.errors

import org.apache.avro.Schema
import org.apache.avro.specific.SpecificRecordBase
import org.apache.spark.internal.Logging
import org.apache.spark.sql.avro.AbrisAvroDeserializer

class SpecificRecordExceptionHandler(defaultRecord: SpecificRecordBase) extends DeserializationExceptionHandler with Logging {

def handle(exception: Throwable, deserializer: AbrisAvroDeserializer, readerSchema: Schema): Any = {
logWarning("Malformed record detected. Replacing with default record.", exception)
deserializer.deserialize(defaultRecord)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import org.apache.avro.Schema
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.io.{BinaryDecoder, DecoderFactory}
import org.apache.kafka.common.errors.SerializationException
import org.apache.spark.SparkException
import org.apache.spark.sql.avro.{AbrisAvroDeserializer, SchemaConverters}
import org.apache.spark.sql.avro.AbrisAvroDeserializer
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenerator, CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression}
import org.apache.spark.sql.types.{BinaryType, DataType}
import za.co.absa.abris.avro.errors.DeserializationExceptionHandler
import za.co.absa.abris.avro.read.confluent.{ConfluentConstants, SchemaManagerFactory}
import za.co.absa.abris.config.InternalFromAvroConfig

Expand All @@ -36,8 +36,8 @@ import scala.util.{Failure, Success, Try}

private[abris] case class AvroDataToCatalyst(
child: Expression,
abrisConfig: Map[String,Any],
schemaRegistryConf: Option[Map[String,String]]
abrisConfig: Map[String, Any],
schemaRegistryConf: Option[Map[String, String]]
) extends UnaryExpression with ExpectsInputTypes {

@transient private lazy val schemaConverter = loadSchemaConverter(config.schemaConverter)
Expand All @@ -58,6 +58,8 @@ private[abris] case class AvroDataToCatalyst(

@transient private lazy val writerSchemaOption = config.writerSchema

@transient private lazy val deserializationHandler: DeserializationExceptionHandler = config.deserializationHandler

@transient private lazy val vanillaReader: GenericDatumReader[Any] =
new GenericDatumReader[Any](writerSchemaOption.getOrElse(readerSchema), readerSchema)

Expand All @@ -82,7 +84,7 @@ private[abris] case class AvroDataToCatalyst(
// There could be multiple possible exceptions here, e.g. java.io.IOException,
// AvroRuntimeException, ArrayIndexOutOfBoundsException, etc.
// To make it simple, catch all the exceptions here.
case NonFatal(e) => throw new SparkException("Malformed records are detected in record parsing.", e)
case NonFatal(e) => deserializationHandler.handle(e, deserializer, readerSchema)
}
}

Expand Down
11 changes: 11 additions & 0 deletions src/main/scala/za/co/absa/abris/config/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package za.co.absa.abris.config

import za.co.absa.abris.avro.errors.DeserializationExceptionHandler
import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils
import za.co.absa.abris.avro.read.confluent.SchemaManagerFactory
import za.co.absa.abris.avro.registry._
Expand Down Expand Up @@ -336,6 +337,15 @@ class FromAvroConfig private(
schemaRegistryConf
)

/**
* @param exceptionHandler exception handler used for converting from avro
*/
def withExceptionHandler(exceptionHandler: DeserializationExceptionHandler): FromAvroConfig =
new FromAvroConfig(
abrisConfig + (Key.ExceptionHandler -> exceptionHandler),
schemaRegistryConf
)

def validate(): Unit = {
if(!abrisConfig.contains(Key.ReaderSchema)) {
throw new IllegalArgumentException(s"Missing mandatory config property ${Key.ReaderSchema}")
Expand All @@ -353,5 +363,6 @@ object FromAvroConfig {
val ReaderSchema = "readerSchema"
val WriterSchema = "writerSchema"
val SchemaConverter = "schemaConverter"
val ExceptionHandler = "exceptionHandler"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package za.co.absa.abris.config

import org.apache.avro.Schema
import za.co.absa.abris.avro.errors.{FailFastExceptionHandler, DeserializationExceptionHandler}
import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils
import za.co.absa.abris.config.FromAvroConfig.Key

Expand All @@ -31,4 +32,9 @@ private[abris] class InternalFromAvroConfig(map: Map[String, Any]) {
val schemaConverter: Option[String] = map
.get(Key.SchemaConverter)
.map(_.asInstanceOf[String])

val deserializationHandler: DeserializationExceptionHandler = map
.get(Key.ExceptionHandler)
.map(s => s.asInstanceOf[DeserializationExceptionHandler])
.getOrElse(new FailFastExceptionHandler)
}
Loading