Error while trying to use Schema Reference! #232

Closed
detoxfarm3 opened this issue Jul 19, 2021 · 2 comments

detoxfarm3 commented Jul 19, 2021

Hi,

I am trying to use multiple schemas for different events in a single topic.

#1 So far, I have explored TopicRecordNameStrategy, which lets me use multiple schemas in one topic, but this doesn't work with PySpark: it throws an exception when trying to deserialize multiple types of messages present in a single topic. This is because a stream is bound to a single schema only; I have seen a similar question in the Issues as well, where the conclusion was that it can't be used for this purpose.

Adding the error log for reference:

2021-07-19 11:24:21 ERROR TaskSetManager:73 - Task 0 in stage 0.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "/home/ronak/Desktop/POC_KAFKA/avro_abris_test.py", line 117, in <module>
    withColumn('value', to_json(col('value'))).show()
  File "/home/ronak/Desktop/POC_KAFKA/.venv/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 484, in show
  File "/home/ronak/Desktop/POC_KAFKA/.venv/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/home/ronak/Desktop/POC_KAFKA/.venv/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/home/ronak/Desktop/POC_KAFKA/.venv/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o94.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (10.0.2.15 executor driver): 
org.apache.spark.SparkException: Malformed records are detected in record parsing.
        at za.co.absa.abris.avro.sql.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:82)
        at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:476)
        at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:472)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:346)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.avro.AvroTypeException: Found com.test.High, expecting com.test.Test, missing required field test
        at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
        at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
        at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:130)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:215)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
        at za.co.absa.abris.avro.sql.AvroDataToCatalyst.decodeConfluentAvro(AvroDataToCatalyst.scala:145)
        at za.co.absa.abris.avro.sql.AvroDataToCatalyst.decode(AvroDataToCatalyst.scala:122)
        at za.co.absa.abris.avro.sql.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:74)
        ... 19 more

#2 I also tried schema references with TopicNameStrategy, where the top-level schema is a union of reference schemas, e.g. schema: ["<namespace>.<schema name>"]. But this fails with the error below, which to me looks as if schema references are not yet supported. I have explored some of the Abris code/examples and didn't see any mention of schema references.
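
For clarity, registering such a union-of-references subject looks roughly like this (a sketch against the Schema Registry 5.5+ REST API; the subject names and versions are placeholders, and the record names are taken from the log above):

    # Sketch only: register a top-level schema that is a union of two
    # referenced record schemas. Subject names and versions are placeholders.
    import json
    import requests

    payload = {
        "schemaType": "AVRO",
        # The top-level schema is just a union of the referenced names.
        "schema": json.dumps(["com.test.Test", "com.test.High"]),
        "references": [
            {"name": "com.test.Test", "subject": "test-value", "version": 1},
            {"name": "com.test.High", "subject": "high-value", "version": 1},
        ],
    }
    resp = requests.post(
        "http://localhost:8081/subjects/apic-value/versions",
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        data=json.dumps(payload),
    )
    print(resp.json())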

For from_avro, the following config code is used:

    # Build an Abris config: read Confluent Avro, downloading the latest
    # reader schema from Schema Registry under the topic name strategy.
    jvm_gateway = SparkContext._active_spark_context._gateway.jvm
    scala_map = jvm_gateway.PythonUtils.toScalaMap(config_map)
    return jvm_gateway.za.co.absa.abris.config \
        .AbrisConfig \
        .fromConfluentAvro() \
        .downloadReaderSchemaByLatestVersion() \
        .andTopicNameStrategy(topic, is_key) \
        .usingSchemaRegistry(scala_map)

from_avro code:

    # Wrap the JVM-side Abris from_avro call in a PySpark Column.
    jvm_gateway = SparkContext._active_spark_context._gateway.jvm
    abris_avro = jvm_gateway.za.co.absa.abris.avro
    return Column(abris_avro.functions.from_avro(_to_java_column(col), config))
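
For context, here is how the two snippets above fit together in a minimal driver (wrapping them as functions and the Kafka options are my assumptions; the topic apic and the registry URL are taken from the traceback below):

    # Sketch only: the two snippets above as functions, wired to a batch
    # Kafka read. The bootstrap server is a placeholder.
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    from pyspark.sql.column import Column, _to_java_column
    from pyspark.sql.functions import col

    def from_avro_abris_config(config_map, topic, is_key=False):
        jvm_gateway = SparkContext._active_spark_context._gateway.jvm
        scala_map = jvm_gateway.PythonUtils.toScalaMap(config_map)
        return jvm_gateway.za.co.absa.abris.config \
            .AbrisConfig \
            .fromConfluentAvro() \
            .downloadReaderSchemaByLatestVersion() \
            .andTopicNameStrategy(topic, is_key) \
            .usingSchemaRegistry(scala_map)

    def from_avro(data_col, config):
        jvm_gateway = SparkContext._active_spark_context._gateway.jvm
        abris_avro = jvm_gateway.za.co.absa.abris.avro
        return Column(abris_avro.functions.from_avro(_to_java_column(data_col), config))

    spark = SparkSession.builder.getOrCreate()
    raw_df = (spark.read.format("kafka")
              .option("kafka.bootstrap.servers", "localhost:9092")
              .option("subscribe", "apic")
              .load())

    config = from_avro_abris_config({'schema.registry.url': 'http://localhost:8081'}, 'apic')
    raw_df.withColumn('value', from_avro(col('value'), config)).show()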

Below is the error I got when trying to use a schema reference:

Traceback (most recent call last):
  File "/home/ronak/Desktop/POC_KAFKA/avro_topic_name.py", line 66, in <module>
    f_c = from_avro_abris_config({'schema.registry.url': 'http://localhost:8081'}, 'apic')
  File "/home/ronak/Desktop/POC_KAFKA/abris_topic_name.py", line 34, in from_avro_abris_config
    .usingSchemaRegistry(scala_map)
  File "/home/ronak/Desktop/POC_KAFKA/.venv/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/home/ronak/Desktop/POC_KAFKA/.venv/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/home/ronak/Desktop/POC_KAFKA/.venv/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o69.usingSchemaRegistry.
: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "references" (class io.confluent.kafka.schemaregistry.client.rest.entities.Schema), not marked as ignorable (4 known properties: "version", "schema", "id", "subject"])
 at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 142] (through reference chain: io.confluent.kafka.schemaregistry.client.rest.entities.Schema["references"])
        at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
        at com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:840)
        at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1179)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1592)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1542)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:438)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1287)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159)
        at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4202)
        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3250)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:221)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:265)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:524)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:516)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:275)
        at za.co.absa.abris.avro.read.confluent.SchemaManager.getSchemaMetadataBySubjectAndVersion(SchemaManager.scala:64)
        at za.co.absa.abris.avro.read.confluent.SchemaManager.getSchemaBySubjectAndVersion(SchemaManager.scala:53)
        at za.co.absa.abris.avro.read.confluent.SchemaManager.getSchema(SchemaManager.scala:44)
        at za.co.absa.abris.config.FromSchemaDownloadingConfigFragment.usingSchemaRegistry(Config.scala:249)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

cerveada (Collaborator) commented:

  1. Yes, the only way to do this is to separate the data into two dataframes, each with one schema, and then call Abris separately on each of them. If the key is the same for both record types, it would be possible to do the split by key, I think. But it depends on your use case (see the sketch after this list).

  2. Abris currently uses Confluent 5.3 to stay compatible with the Spark libraries. Schema references are available in Confluent 5.5 and higher. (For more details, look at Error java.lang.NoSuchMethodError on run #175.)
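
A rough PySpark sketch of option 1, reusing raw_df and from_avro from the driver sketch earlier in this thread and assuming Abris's andTopicRecordNameStrategy(topic, recordName, namespace) config variant; the key values and the namespace are placeholders, while the record names Test and High come from the log:

    # Sketch only: one reader config per record name, split the raw stream
    # (here by key), then decode each half with its matching config.
    from pyspark import SparkContext
    from pyspark.sql.functions import col

    def reader_config(topic, record_name, namespace):
        jvm_gateway = SparkContext._active_spark_context._gateway.jvm
        scala_map = jvm_gateway.PythonUtils.toScalaMap(
            {'schema.registry.url': 'http://localhost:8081'})
        return jvm_gateway.za.co.absa.abris.config \
            .AbrisConfig \
            .fromConfluentAvro() \
            .downloadReaderSchemaByLatestVersion() \
            .andTopicRecordNameStrategy(topic, record_name, namespace) \
            .usingSchemaRegistry(scala_map)

    test_config = reader_config('apic', 'Test', 'com.test')
    high_config = reader_config('apic', 'High', 'com.test')

    # The key values 'test' and 'high' are placeholders for whatever
    # distinguishes the two record types in your data.
    test_df = (raw_df.filter(col('key').cast('string') == 'test')
               .withColumn('value', from_avro(col('value'), test_config)))
    high_df = (raw_df.filter(col('key').cast('string') == 'high')
               .withColumn('value', from_avro(col('value'), high_config)))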

detoxfarm3 (Author) commented:

@cerveada Thanks for the info. We were able to create multiple streams by filtering the messages; we used Kafka headers to add metadata and filtered on those headers, as sketched below.
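
A sketch of that header-based split (the header name event_type and its values are placeholders; Spark's includeHeaders Kafka option requires Spark 3.0+):

    # Sketch only: read Kafka headers alongside each message, extract a
    # metadata header, and split the stream by its value.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, expr

    spark = SparkSession.builder.getOrCreate()
    raw_df = (spark.read.format("kafka")
              .option("kafka.bootstrap.servers", "localhost:9092")
              .option("subscribe", "apic")
              .option("includeHeaders", "true")
              .load())

    # headers is an array<struct<key:string, value:binary>>; pull one out.
    typed_df = raw_df.withColumn(
        "event_type",
        expr("element_at(filter(headers, h -> h.key = 'event_type'), 1).value")
            .cast("string"))

    # Each filtered dataframe can then be decoded with its own Abris config.
    test_rows = typed_df.filter(col("event_type") == "Test")
    high_rows = typed_df.filter(col("event_type") == "High")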
