
to_avro specify the key #291

Closed
geoHeil opened this issue Mar 24, 2022 · 10 comments
Comments

@geoHeil

geoHeil commented Mar 24, 2022

The commercial (Databricks) edition allows specifying a key when writing Avro, see: https://docs.microsoft.com/en-us/azure/databricks/spark/latest/structured-streaming/avro-dataframe
Sadly, this was never added to open-source Spark (apache/spark#31771).

How can I specify a key using ABRiS?

val avroFrame = dataFrame.select(to_avro(allColumns, abrisConfig) as "value")
nicely shows how to write data to Kafka, but a key is missing.
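Roughly, this is what I am trying to achieve (a sketch; keyAbrisConfig is hypothetical and does not exist in my code yet):

import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.to_avro

// hypothetical: serialize the key column with its own config,
// next to the Avro value produced above
val avroFrame = dataFrame.select(
    to_avro(col("key"), keyAbrisConfig) as "key",
    to_avro(allColumns, abrisConfig) as "value")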

When I try to specify a key, I either get errors or no results (i.e. a nulled-out key):

For a dataframe df with the following schema:

root
 |-- key: string (nullable = true)
 |-- value: binary (nullable = false)

where both the key and the value schema are registered.

  • For the key:
val keySchema = AvroSchemaUtils.toAvroSchema(df, "key")
schemaManager.register(SchemaSubject.usingTopicNameStrategy("t", true), keySchema)

When now writing to Kafka (for a dataframe with a key column and an Avro value column):

val query = aggedAsAvro.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "t")
    .outputMode("update")
    .option("checkpointLocation", "my_query_checkpoint_dir")
    .start()

The key of the messages is set to null. How can I get the key actually written as well?

@cerveada
Collaborator

You need to use to_avro twice, once for the key and once for the value.

The key needs its own AbrisConfig that points to the correct key schema. For some schema naming strategies, you need to set isKey to true so that the proper schema name is created.

Examples of the config are here: https://github.com/AbsaOSS/ABRiS/blob/master/documentation/confluent-avro-documentation.md
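A minimal sketch (assuming keyConfig and valueConfig are two separate ToAvroConfig instances, the key one created with isKey = true):

import org.apache.spark.sql.functions.{col, struct}
import za.co.absa.abris.avro.functions.to_avro

// apply to_avro twice: the key column with the key config,
// the struct of value columns with the value config
val out = df.select(
    to_avro(col("key"), keyConfig) as "key",
    to_avro(struct(col("colA"), col("colB")), valueConfig) as "value")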

@geoHeil
Author

geoHeil commented Mar 24, 2022

Do you have a full and working example?

Indeed, I was following https://github.com/AbsaOSS/ABRiS/blob/master/documentation/confluent-avro-documentation.md and also for the schema of the key:

val keySchema = AvroSchemaUtils.toAvroSchema(df, "key")
schemaManager.register(SchemaSubject.usingTopicNameStrategy("t", true), keySchema)

but then it fails with:

Caused by: org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert SQL top-level record to Avro top-level record because schema is incompatible (sqlType = STRING, avroType = {"type":"record","name":"topLevelRecord","fields":[{"name":"brand","type":["string","null"]},{"name":"rating_mean","type":["double","null"]},{"name":"duration_mean","type":["double","null"]}]})

when trying to use the Avro string type for the string key column.
When I instead call to_avro only once (for the value), I get the nulled-out key field (but no exception).

@geoHeil
Author

geoHeil commented Mar 24, 2022

When using:

.andTopicNameStrategy("t")

for a topic t where the schema subjects are suffixed with either -key or -value, I would expect a single AbrisConfig to work.

@cerveada
Collaborator

If it were one config, how would the to_avro function know whether it is a key or a value?

@geoHeil
Author

geoHeil commented Mar 24, 2022

For example from https://github.com/AbsaOSS/ABRiS/blob/master/documentation/confluent-avro-documentation.md:

// Use latest version of already existing schema
val toAvroConfig4: ToAvroConfig = AbrisConfig
    .toConfluentAvro
    .downloadSchemaByLatestVersion
    .andTopicNameStrategy("fooTopic")
    .usingSchemaRegistry("http://registry-url")

If one follows the (default) naming convention for the topic fooTopic, there are two schemas:

  • fooTopic-key
  • fooTopic-value

And therefore I assume that .andTopicNameStrategy("fooTopic") would work in both cases.

Where would you specify here if it is a key or value?

@cerveada
Collaborator

As mentioned elsewhere in the documentation, the value schema is the default. If you look at the code, you will see that the method signature is:

 def andTopicNameStrategy(topicName: String, isKey: Boolean = false)

So if you want to use the key schema, you must set isKey = true.
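For example (a sketch with the fooTopic and registry-url placeholders from the documentation):

// default: resolves the subject "fooTopic-value"
val valueConfig = AbrisConfig
    .toConfluentAvro
    .downloadSchemaByLatestVersion
    .andTopicNameStrategy("fooTopic")
    .usingSchemaRegistry("http://registry-url")

// isKey = true: resolves the subject "fooTopic-key"
val keyConfig = AbrisConfig
    .toConfluentAvro
    .downloadSchemaByLatestVersion
    .andTopicNameStrategy("fooTopic", isKey = true)
    .usingSchemaRegistry("http://registry-url")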

@geoHeil
Author

geoHeil commented Mar 25, 2022

Thanks.

But when setting two different configurations:

Cannot convert SQL top-level record to Avro top-level record because schema is incompatible (sqlType = STRING, avroType = {"type":"record","name":"topLevelRecord","fields":[{"name":"brand","type":["string","null"]},{"name":"rating_mean","type":["double","null"]},{"name":"duration_mean","type":["double","null"]}]

is still the error, for an input dataframe of:

|-- key_brand: binary (nullable = true)
|-- value: binary (nullable = false)

As you can see, the original:

root
 |-- brand: string (nullable = true)
 |-- rating_mean: double (nullable = true)
 |-- duration_mean: double (nullable = true)

is transformed into a single key and value column.

@geoHeil
Author

geoHeil commented Mar 25, 2022

Here you go with a full and self-contained example.
I have tried to follow your suggestions; however, the key that is written to Kafka is still nulled out!

import spark.implicits._

val aggedDf = Seq(("foo", 1.0, 1.0), ("bar", 2.0, 2.0)).toDF("brand", "rating_mean", "duration_mean")
aggedDf.printSchema
aggedDf.show

+-----+-----------+-------------+
|brand|rating_mean|duration_mean|
+-----+-----------+-------------+
|  foo|        1.0|          1.0|
|  bar|        2.0|          2.0|
+-----+-----------+-------------+


import org.apache.avro.Schema
import org.apache.spark.sql._
import org.apache.spark.sql.functions.struct
import za.co.absa.abris.avro.functions.to_avro
import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils
import za.co.absa.abris.avro.read.confluent.{SchemaManager, SchemaManagerFactory}
import za.co.absa.abris.avro.registry.SchemaSubject
import za.co.absa.abris.config.{AbrisConfig, ToAvroConfig}


// generate schema for all columns in a dataframe
val valueSchema = AvroSchemaUtils.toAvroSchema(aggedDf)
val keySchema = AvroSchemaUtils.toAvroSchema(aggedDf.select($"brand".alias("key_brand")), "key_brand")
val schemaRegistryClientConfig = Map(AbrisConfig.SCHEMA_REGISTRY_URL -> "http://localhost:8081")
val t = "metrics_per_brand_spark222xx"

val schemaManager = SchemaManagerFactory.create(schemaRegistryClientConfig)

// register schema with topic name strategy
def registerSchema1(schemaKey: Schema, schemaValue: Schema, schemaManager: SchemaManager, schemaName: String): Int = {
  schemaManager.register(SchemaSubject.usingTopicNameStrategy(schemaName, true), schemaKey)
  schemaManager.register(SchemaSubject.usingTopicNameStrategy(schemaName, false), schemaValue)
}
registerSchema1(keySchema, valueSchema, schemaManager, t)

val toAvroConfig4 = AbrisConfig
    .toConfluentAvro
    .downloadSchemaByLatestVersion
    .andTopicNameStrategy(t)
    .usingSchemaRegistry("http://localhost:8081")

val toAvroConfig4Key = AbrisConfig
    .toConfluentAvro
    .downloadSchemaByLatestVersion
    .andTopicNameStrategy(t, isKey = true)
    .usingSchemaRegistry("http://localhost:8081")


def writeDfToAvro(keyAvroConfig: ToAvroConfig, toAvroConfig: ToAvroConfig)(dataFrame:DataFrame) = {
  // this is the key! need to keep the key to guarantee temporal ordering
  val availableCols = dataFrame.columns
  val allColumns = struct(availableCols.head, availableCols.tail: _*)
  dataFrame.select(to_avro($"brand", keyAvroConfig).alias("key_brand"), to_avro(allColumns, toAvroConfig) as 'value)
  // dataFrame.select($"brand".alias("key_brand"), to_avro(allColumns, toAvroConfig) as 'value)
}


val aggedAsAvro = aggedDf.transform(writeDfToAvro(toAvroConfig4Key, toAvroConfig4))
aggedAsAvro.printSchema

root
 |-- key_brand: binary (nullable = true)
 |-- value: binary (nullable = false)

aggedAsAvro.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", t).save()

@cerveada
Collaborator

Oh, OK, now I understand. Your problem is not ABRiS.

The value column is the only required option. If a key column is not specified then a null valued key column will be automatically added

from https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka

You need to rename key_brand to key.
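For example (a sketch based on your snippet):

// Spark's Kafka sink only reads columns named "key" and "value"
// (plus optional "topic", "partition", "headers"); other columns are ignored
val fixed = aggedAsAvro.withColumnRenamed("key_brand", "key")

fixed.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", t)
    .save()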

@geoHeil
Author

geoHeil commented Mar 25, 2022

Indeed. Many thanks.

@geoHeil geoHeil closed this as completed Mar 25, 2022