Skip to content

Commit

Permalink
Make schema converter configurable (#268)
Browse files Browse the repository at this point in the history
* Make schema converter configurable

* Use SPI to load custom schema converter

* Add Readme

* Optimize imports
  • Loading branch information
kevinwallimann committed Feb 1, 2022
1 parent 5e068d9 commit c5301ee
Show file tree
Hide file tree
Showing 11 changed files with 238 additions and 14 deletions.
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,40 @@ val sqlSchema = new StructType(new StructField ....
val avroSchema = SparkAvroConversions.toAvroSchema(sqlSchema, avro_schema_name, avro_schema_namespace)
```

#### Custom data conversions
If you would like to use custom logic to convert from Avro to Spark, you can implement the `SchemaConverter` trait.
The custom class is loaded in ABRiS using the service provider interface (SPI), so you need to register your class in your
`META-INF/services` resource directory. You can then configure the custom class with its short name or the fully qualified name.

**Example**

Custom schema converter implementation
```scala
package za.co.absa.abris.avro.sql
import org.apache.avro.Schema
import org.apache.spark.sql.types.DataType

class CustomSchemaConverter extends SchemaConverter {
override val shortName: String = "custom"
override def toSqlType(avroSchema: Schema): DataType = ???
}
```

Provider configuration file `META-INF/services/za.co.absa.abris.avro.sql.SchemaConverter`:
```
za.co.absa.abris.avro.sql.CustomSchemaConverter
```

Abris configuration
```scala
val abrisConfig = AbrisConfig
.fromConfluentAvro
.downloadReaderSchemaByLatestVersion
.andTopicNameStrategy("topic123")
.usingSchemaRegistry(registryConfig)
.withSchemaConverter("custom")
```

## Multiple schemas in one topic
The naming strategies RecordName and TopicRecordName allow for a one topic to receive different payloads,
i.e. payloads containing different schemas that do not have to be compatible,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#
# Copyright 2022 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
za.co.absa.abris.avro.sql.DefaultSchemaConverter
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import za.co.absa.abris.avro.read.confluent.{ConfluentConstants, SchemaManagerFa
import za.co.absa.abris.config.InternalFromAvroConfig

import java.nio.ByteBuffer
import java.util.ServiceLoader
import scala.collection.mutable
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
Expand All @@ -39,9 +40,11 @@ private[abris] case class AvroDataToCatalyst(
schemaRegistryConf: Option[Map[String,String]]
) extends UnaryExpression with ExpectsInputTypes {

private val schemaConverter = loadSchemaConverter(config.schemaConverter)

override def inputTypes: Seq[BinaryType.type] = Seq(BinaryType)

override lazy val dataType: DataType = SchemaConverters.toSqlType(readerSchema).dataType
override lazy val dataType: DataType = schemaConverter.toSqlType(readerSchema)

override def nullable: Boolean = true

Expand Down Expand Up @@ -168,4 +171,13 @@ private[abris] case class AvroDataToCatalyst(
override protected def withNewChildInternal(newChild: Expression): Expression =
copy(child = newChild)

private def loadSchemaConverter(nameOpt: Option[String]) = {
import scala.collection.JavaConverters._
nameOpt match {
case Some(name) => ServiceLoader.load(classOf[SchemaConverter]).asScala
.find(c => c.shortName == name || c.getClass.getName == name)
.getOrElse(throw new ClassNotFoundException(s"Could not find schema converter $name"))
case None => new DefaultSchemaConverter()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.abris.avro.sql
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.DataType

class DefaultSchemaConverter extends SchemaConverter {
override val shortName: String = "default"
override def toSqlType(avroSchema: Schema): DataType = SchemaConverters.toSqlType(avroSchema).dataType
}
25 changes: 25 additions & 0 deletions src/main/scala/za/co/absa/abris/avro/sql/SchemaConverter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.abris.avro.sql

import org.apache.avro.Schema
import org.apache.spark.sql.types.DataType

trait SchemaConverter {
val shortName: String
def toSqlType(avroSchema: Schema): DataType
}
7 changes: 7 additions & 0 deletions src/main/scala/za/co/absa/abris/config/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,12 @@ class FromAvroConfig private(
schemaRegistryConf
)

def withSchemaConverter(schemaConverter: String): FromAvroConfig =
new FromAvroConfig(
abrisConfig + (Key.SchemaConverter -> schemaConverter),
schemaRegistryConf
)

def validate(): Unit = {
if(!abrisConfig.contains(Key.ReaderSchema)) {
throw new IllegalArgumentException(s"Missing mandatory config property ${Key.ReaderSchema}")
Expand All @@ -346,5 +352,6 @@ object FromAvroConfig {
private[abris] object Key {
val ReaderSchema = "readerSchema"
val WriterSchema = "writerSchema"
val SchemaConverter = "schemaConverter"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,8 @@ private[abris] class InternalFromAvroConfig(map: Map[String, Any]) {
val writerSchema: Option[Schema] = map
.get(Key.WriterSchema)
.map(s => AvroSchemaUtils.parse(s.asInstanceOf[String]))

val schemaConverter: Option[String] = map
.get(Key.SchemaConverter)
.map(_.asInstanceOf[String])
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#
# Copyright 2022 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
za.co.absa.abris.avro.sql.DummySchemaConverter
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,16 @@

package za.co.absa.abris.avro.sql

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.scalatest.flatspec.AnyFlatSpec
import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}
import org.scalatest.BeforeAndAfterEach
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import za.co.absa.abris.avro.functions._
import za.co.absa.abris.config.{AbrisConfig, FromAvroConfig}
import za.co.absa.abris.examples.data.generation.TestSchemas

class AvroDataToCatalystSpec extends AnyFlatSpec with Matchers with BeforeAndAfterEach {
private val spark = SparkSession
.builder()
.appName("unitTest")
.master("local[2]")
.config("spark.driver.bindAddress", "localhost")
.config("spark.ui.enabled", "false")
.getOrCreate()

import spark.implicits._


it should "not print schema registry configs in the spark plan" in {
val sensitiveData = "username:password"
val schemaString = TestSchemas.NATIVE_SIMPLE_NESTED_SCHEMA
Expand All @@ -53,4 +42,66 @@ class AvroDataToCatalystSpec extends AnyFlatSpec with Matchers with BeforeAndAft
column.expr.toString() should not include sensitiveData
}

it should "use the default schema converter by default" in {
val schemaString = TestSchemas.NATIVE_SIMPLE_NESTED_SCHEMA
val dummyUrl = "dummyUrl"
val expectedDataType = StructType(Seq(
StructField("int", IntegerType, nullable = false),
StructField("long", LongType, nullable = false)
))

val fromAvroConfig = FromAvroConfig()
.withReaderSchema(schemaString)
.withSchemaRegistryConfig(Map(
AbrisConfig.SCHEMA_REGISTRY_URL -> dummyUrl
))

val column = from_avro(col("avroBytes"), fromAvroConfig)
column.expr.dataType shouldBe expectedDataType
}

it should "use a custom schema converter identified by the short name" in {
val schemaString = TestSchemas.NATIVE_SIMPLE_NESTED_SCHEMA
val dummyUrl = "dummyUrl"

val fromAvroConfig = FromAvroConfig()
.withReaderSchema(schemaString)
.withSchemaRegistryConfig(Map(
AbrisConfig.SCHEMA_REGISTRY_URL -> dummyUrl
))
.withSchemaConverter(DummySchemaConverter.name)

val column = from_avro(col("avroBytes"), fromAvroConfig)
column.expr.dataType shouldBe DummySchemaConverter.dataType
}

it should "use a custom schema converter identified by the fully qualified name" in {
val schemaString = TestSchemas.NATIVE_SIMPLE_NESTED_SCHEMA
val dummyUrl = "dummyUrl"

val fromAvroConfig = FromAvroConfig()
.withReaderSchema(schemaString)
.withSchemaRegistryConfig(Map(
AbrisConfig.SCHEMA_REGISTRY_URL -> dummyUrl
))
.withSchemaConverter("za.co.absa.abris.avro.sql.DummySchemaConverter")

val column = from_avro(col("avroBytes"), fromAvroConfig)
column.expr.dataType shouldBe DummySchemaConverter.dataType
}

it should "throw an error if the specified custom schema converter does not exist" in {
val schemaString = TestSchemas.NATIVE_SIMPLE_NESTED_SCHEMA
val dummyUrl = "dummyUrl"

val fromAvroConfig = FromAvroConfig()
.withReaderSchema(schemaString)
.withSchemaRegistryConfig(Map(
AbrisConfig.SCHEMA_REGISTRY_URL -> dummyUrl
))
.withSchemaConverter("nonexistent")

val ex = intercept[ClassNotFoundException](from_avro(col("avroBytes"), fromAvroConfig))
ex.getMessage should include ("nonexistent")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.abris.avro.sql

import org.apache.avro.Schema
import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}
import za.co.absa.abris.avro.sql.DummySchemaConverter._

class DummySchemaConverter extends SchemaConverter {
override val shortName: String = name
override def toSqlType(avroSchema: Schema): DataType = dataType
}

object DummySchemaConverter {
val name: String = "dummy"
val dataType: DataType = StructType(Seq(StructField("long", LongType)))
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@ class FromAvroConfigSpec extends AnyFlatSpec with Matchers {
behavior of "FromAvroConfig"

it should "provide all set configurations" in {
val dummySchemaConverter = "dummy"
val config = FromAvroConfig()
.withWriterSchema("foo")
.withReaderSchema("bar")
.withSchemaConverter(dummySchemaConverter)
.withSchemaRegistryConfig(Map(AbrisConfig.SCHEMA_REGISTRY_URL -> "url"))

val map = config.abrisConfig()
map(Key.WriterSchema) shouldBe "foo"
map(Key.ReaderSchema) shouldBe "bar"
map(Key.SchemaConverter) shouldBe dummySchemaConverter

config.schemaRegistryConf().get(AbrisConfig.SCHEMA_REGISTRY_URL) shouldBe "url"
}
Expand Down

0 comments on commit c5301ee

Please sign in to comment.