writing to kafka topic #2

Closed
bclipp opened this issue May 17, 2018 · 22 comments

Comments

@bclipp

bclipp commented May 17, 2018

I have tested .toAvro when writing to a Kafka topic.
I believe toAvro is serializing, but when the action portion runs, Spark just hangs and the job does not complete. Kafka never sees the message.

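import org.apache.spark.sql.SparkSession
import za.co.absa.abris.avro.AvroSerDe._

object oppStreamingMatching {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder
      .appName("oppStreamingMatching")
      .getOrCreate()

    val brokers = "IP:9092"
    val schemaRegistryURL = "http://IP:8081" // defined but never passed to toAvro below
    val topic = "exampletopic"

    val path = ".. file.avro"
    // Infer the schema once from the Avro file, then reuse it for the actual read
    val schema = spark.read.format("com.databricks.spark.avro").load(path).schema

    val df = spark
      .read
      .format("com.databricks.spark.avro")
      .schema(schema)
      .load(path)

    // toAvro(schemaName, schemaNamespace) serializes each row to Avro binary;
    // the Kafka sink expects the serialized bytes in a column named "value"
    df
      .toAvro("test", "mytest")
      .write
      .format("kafka")
      .option("checkpointLocation", "/tmp") // relevant to streaming queries; a batch write does not checkpoint
      .option("kafka.bootstrap.servers", brokers)
      .option("topic", topic)
      .save()

    spark.stop()
  }
}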
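For context on what reaches Kafka here: as Felipe explains further down, toAvro writes the plain Avro binary encoding of each row, with no Confluent header in front of it. The sketch below hand-encodes a hypothetical record with a long field and a string field following the Avro specification (longs as zigzag varints, strings as a length prefix plus UTF-8 bytes). It only illustrates the encoding; it is not ABRiS's implementation, and the object and method names are made up.

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets

// Illustrative only: hand-rolled Avro binary encoding for two primitive types,
// following the Avro specification. Not how ABRiS serializes internally.
object PlainAvroSketch {
  // Avro writes int/long values as zigzag-encoded variable-length integers.
  def encodeLong(n: Long): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    var z = (n << 1) ^ (n >> 63) // zigzag: small magnitudes map to small unsigned values
    while ((z & ~0x7FL) != 0L) {
      out.write(((z & 0x7F) | 0x80).toInt) // 7 payload bits plus the continuation bit
      z >>>= 7
    }
    out.write(z.toInt)
    out.toByteArray
  }

  // A string is its UTF-8 byte length (encoded as an Avro long) followed by the bytes.
  def encodeString(s: String): Array[Byte] = {
    val bytes = s.getBytes(StandardCharsets.UTF_8)
    encodeLong(bytes.length.toLong) ++ bytes
  }

  // A record is just its fields encoded in schema order: no schema, no schema id.
  // Hypothetical schema: {"type":"record","name":"test","fields":[
  //   {"name":"id","type":"long"},{"name":"name","type":"string"}]}
  def encodeRecord(id: Long, name: String): Array[Byte] =
    encodeLong(id) ++ encodeString(name)
}
```

For example, encodeRecord(1L, "ab") produces the four bytes 02 04 61 62. Nothing in those bytes tells a consumer which schema to use; that information has to come from somewhere else.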

@felipemmelo
Collaborator

felipemmelo commented May 17, 2018

Could you give us more details about exceptions and logs? Also, if possible (in case it is small enough), would you send us the Avro file you're using along with the schema? Finally, would you also let us know what results you get when running df.show and matchDF.show?

Also, how is matchDF being created? Should it be df.toAvro(...)?

@bclipp
Author

bclipp commented May 17, 2018

Felipe,

I corrected the code; there is only one DataFrame, df.

  1. The DataFrame loaded from the Avro file shows its full contents with show and appears to load fine. I have also tested a few select and filter statements on it to verify that it works.

  2. I was mistaken: Kafka does receive the message. When I run the Confluent command-line consumer, I get:

./kafka-avro-console-consumer --topic exampletopic --zookeeper ip:2181 --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
Processed a total of 1 messages
[2018-05-17 11:42:10,044] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:107)
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 1214396213
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:191)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:394)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:387)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:138)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93)
at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:148)
at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:140)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:146)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)

Prior to running the producer code, there was no schema registered. My understanding is that this is fine and that the schema will be picked up once a message is produced. No exceptions are thrown in the REPL.

@bclipp
Author

bclipp commented May 17, 2018

I should mention that I am using Confluent Kafka (and the Schema Registry).

@felipemmelo
Collaborator

It seems that this is the cause of this exception: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 1214396213
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403

Would you be able to try the examples explained here: https://github.com/AbsaOSS/ABRiS? You'll find snippets showing how to use Spark to read from Kafka, so that you can check the results, as well as examples of how to use the library to register schemas.

@bclipp
Author

bclipp commented May 17, 2018

I deleted the topic, recreated it, and used the command line to produce a few test messages to the same topic. I then tried the same code again and got:
[2018-05-17 12:05:11,519] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:107)
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 1214396213
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:191)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:394)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:387)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:138)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93)
at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:148)
at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:140)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:146)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)

@felipemmelo
Collaborator

felipemmelo commented May 17, 2018

Is this exception being generated by the program or the command line?

@bclipp
Author

bclipp commented May 17, 2018

By the command line. I verified that the topic was created. I also generated a JSON file with the Avro file's schema and used that instead of schema inference. The first attempt seemed to have an issue talking to Kafka, but after restarting the Kafka cluster it runs normally. The exception is thrown when I use Confluent's command-line consumer tool on messages written with the library. When I use the command-line producer instead, the messages are consumed correctly. I have reread your examples. Can you point me to a simple example that loads an Avro file and sends it to a Kafka topic? I would love to use your library.

@felipemmelo
Collaborator

Sure thing. Here is an example of how to write a DataFrame into Kafka: https://github.com/AbsaOSS/ABRiS/blob/master/src/main/scala/za/co/absa/abris/examples/SampleKafkaDataframeWriterApp.scala

Here you'll find an example of how to read it: https://github.com/AbsaOSS/ABRiS/blob/master/src/main/scala/za/co/absa/abris/examples/SampleKafkaAvroFilterApp.scala

Since Confluent prefixes the payload with the schema ID, there's a separate API for this, which you'll also find in the library.
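The framing referred to here is Confluent's wire format: one magic byte (0), then the schema ID as a 4-byte big-endian integer, then the Avro binary payload. A minimal sketch of adding and stripping that header around an already-serialized payload follows; ConfluentFraming, frame and unframe are hypothetical names, not an ABRiS or Confluent API.

```scala
import java.nio.ByteBuffer

// Hypothetical helper illustrating Confluent's wire format:
// [magic byte 0][4-byte big-endian schema id][Avro binary payload]
object ConfluentFraming {
  val MagicByte: Byte = 0

  // Prepend the 5-byte header to a plain Avro payload.
  def frame(schemaId: Int, avroPayload: Array[Byte]): Array[Byte] =
    ByteBuffer.allocate(5 + avroPayload.length) // ByteBuffer is big-endian by default
      .put(MagicByte)
      .putInt(schemaId)
      .put(avroPayload)
      .array()

  // Split a framed message back into (schemaId, payload); rejects unframed data.
  def unframe(message: Array[Byte]): (Int, Array[Byte]) = {
    require(message.length >= 5 && message(0) == MagicByte, "not in Confluent wire format")
    val buf = ByteBuffer.wrap(message)
    buf.get() // skip the magic byte
    val id = buf.getInt()
    (id, message.drop(5))
  }
}
```

For example, frame(42, payload) yields the bytes 00 00 00 00 2a followed by the payload. A consumer that expects this layout will misread data that lacks the 5-byte header.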

@bclipp
Author

bclipp commented May 17, 2018

@bclipp
Author

bclipp commented May 17, 2018

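import java.io.{File, PrintWriter}

// SparkToAvroProcessor and SchemaManager come from ABRiS; `subject` is defined elsewhere
val avroSchema = new SparkToAvroProcessor(df.schema, "test", "test")
val schemaId = SchemaManager.register(avroSchema.getAvroSchema, subject)

// Write the same Avro schema that was registered out to a file
val pw = new PrintWriter(new File("/tmp/test.avsc"))
pw.write(avroSchema.getAvroSchema.toString)
pw.close()

df.toAvro("test", "test")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka01:9092")
  .option("topic", "testTopic")
  .save()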

So it registers the schema with the Schema Registry, writes that same schema out to a file, and then uses it. The same error is present.

@felipemmelo
Collaborator

felipemmelo commented May 17, 2018

This is probably related to the fact that Confluent adds the schema ID to the payload, and the command-line consumer expects it. The library does not add this ID, since it is a Confluent-specific construct. Would you please try reading from Kafka using Spark, as in the example, just to be sure?

If this is the case, I can add a .toConfluentAvro(...) to the API.
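This would also explain the odd schema ID in the consumer's error. Confluent's deserializer checks byte 0 for the magic byte and then takes bytes 1 to 4 as a big-endian schema ID, so if a plain Avro record happens to begin with a 0 byte, whatever follows gets looked up as an ID. The sketch below shows that 1214396213 is exactly the bytes 0x48 0x62 0x37 0x35, which read as ASCII "Hb75"; that these bytes came from ordinary record data is an inference, not something verified in this thread. The object and method names are hypothetical.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Illustration of how a consumer could arrive at schema id 1214396213.
object SchemaIdFromBytes {
  // Read bytes 1..4 as a big-endian int, the way a Confluent-style
  // deserializer interprets the header (byte 0 being the magic byte).
  def schemaIdOf(message: Array[Byte]): Int =
    ByteBuffer.wrap(message, 1, 4).getInt()

  // Turn a schema id back into the four bytes it came from, read as ASCII.
  def idAsAscii(id: Int): String =
    new String(ByteBuffer.allocate(4).putInt(id).array(), StandardCharsets.US_ASCII)
}
```

For example, SchemaIdFromBytes.idAsAscii(1214396213) returns "Hb75", and a message starting with the bytes 00 48 62 37 35 yields that ID from schemaIdOf.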

@bclipp
Author

bclipp commented May 17, 2018

OK, I will. Just to point out: when I run
curl -X GET http://localhost:8081/subjects/Kafka-value/versions/latest
I get the correct schema, but when I do a POST to just
http://localhost:8081/subjects/Kafka-value/
I get {"error_code":40403,"message":"Schema not found"}

@bclipp
Author

bclipp commented May 17, 2018

@felipemmelo
Collaborator

This is a Schema Registry-specific question, but the expected result is actually "Schema not found". The GET API always expects you to specify the version. Check the spec for the GET method: https://docs.confluent.io/current/schema-registry/docs/api.html

Also, take a look at my previous comment, please.
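For reference, a minimal sketch of the documented Schema Registry endpoints that come up in this thread, written as hypothetical URL builders (the base URL and subject are the ones used above). Per the API reference, the console consumer's lookup corresponds to GET /schemas/ids/{id}, which returns 40403 when no schema has that ID, and a POST to /subjects/{subject} checks whether the schema in the request body is registered under that subject, also answering 40403 when it is not.

```scala
import java.net.URLEncoder
import java.nio.charset.StandardCharsets

// Hypothetical URL builders for a few documented Schema Registry REST endpoints.
object SchemaRegistryPaths {
  private def enc(s: String): String = URLEncoder.encode(s, StandardCharsets.UTF_8.name())

  // GET: fetch a schema by its global id (the lookup the console consumer performs)
  def schemaById(base: String, id: Int): String = s"$base/schemas/ids/$id"

  // GET: list the registered versions of a subject
  def versions(base: String, subject: String): String =
    s"$base/subjects/${enc(subject)}/versions"

  // GET: fetch one version of a subject ("latest" is accepted as the version)
  def version(base: String, subject: String, v: String): String =
    s"${versions(base, subject)}/${enc(v)}"

  // POST with {"schema": "..."}: register a new version under the subject
  def register(base: String, subject: String): String = versions(base, subject)

  // POST with {"schema": "..."}: check whether that schema is already registered
  def lookup(base: String, subject: String): String = s"$base/subjects/${enc(subject)}"
}
```

For example, schemaById("http://localhost:8081", 1214396213) gives the exact URL the failing consumer asked for.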

@bclipp
Author

bclipp commented May 17, 2018

Yes, I am able to read from the topic, so it appears to be a Schema Registry issue.

@bclipp
Author

bclipp commented May 18, 2018

When you say that Confluent adds a schema ID to the payload, do you mean the schema ID that is sent to the Schema Registry, or are you saying that the schema ID is added to the Kafka message?

@felipemmelo
Collaborator

felipemmelo commented May 18, 2018

The schema ID is added to the Kafka message, in front of the Avro payload.

You can check line 84 here

and line 120 here

@bclipp
Author

bclipp commented May 18, 2018

I get the same error when I use .fromConfluentAvro as in the example. I do not get this error when I use the command-line producer with .fromConfluentAvro.

@bclipp
Author

bclipp commented May 18, 2018

How can I verify that the ID is being sent with the Kafka message?

@felipemmelo
Collaborator

In the context of the library, there is a fromConfluentAvro(...) that you can use to read from Confluent Kafka. To see how the ID is handled, you can inspect that method.
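Outside the library, one way to check is to look at the first five bytes of a raw message value, for example one collected from Spark's Kafka source (whose value column is binary) without any Avro decoding. The helper below is a hypothetical sketch: it hex-dumps a prefix and reports whether the bytes could carry a Confluent header. A leading 0 is necessary but not sufficient, since plain Avro data can also start with a 0 byte.

```scala
// Hypothetical inspection helper for raw Kafka message values.
object RawMessageInspector {
  // Hex-dump the first n bytes of a message, e.g. "00 00 00 00 2a".
  def hexPrefix(message: Array[Byte], n: Int = 5): String =
    message.take(n).map(b => f"${b & 0xff}%02x").mkString(" ")

  // True if the value could carry a Confluent header: magic byte 0 plus 4 id bytes.
  def mayHaveConfluentHeader(message: Array[Byte]): Boolean =
    message.length >= 5 && message(0) == 0
}
```

If the prefix of a message written with toAvro does not start with 00, or the following four bytes do not match an ID known to the registry, the header is not there.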

@bclipp
Author

bclipp commented May 18, 2018

I get the same error when it tries to retrieve the schema.

@felipemmelo
Collaborator

felipemmelo commented May 18, 2018

"it" what?

Also, have you been able to run the examples just changing the parameters?

Finally, can you share the Avro record you're trying to send, along with the schema?

@bclipp bclipp closed this as completed Jun 27, 2018
jhsb25 pushed a commit to jhsb25/ABRiS that referenced this issue Oct 29, 2019