
Exception while trying to use avro serdes: "No implementation of method: :clj->avro of protocol:" #169

Open · lurodrigo opened this issue Jul 10, 2019 · 5 comments

lurodrigo commented Jul 10, 2019

Hi! I'm trying to use various jackdaw serdes, but unfortunately I can't get the Avro serde to work. The following exception is thrown:

Execution error (IllegalArgumentException) at jackdaw.serdes.avro/eval4495$fn$G (avro.clj:123).
No implementation of method: :clj->avro of protocol: #'jackdaw.serdes.avro/SchemaCoercion found for class: nil

I'm using Clojure 1.10.0 and jackdaw 0.6.6. The Kafka cluster and Schema Registry are managed by Confluent Cloud, running Confluent Platform 5.2.1. The key and value schemas are already in the registry. I'd really appreciate any help or suggestions! This code is mostly the same as examples/serdes; the string and JSON serdes work fine.

(ns trying-jackdaw.core
  (:require [clojure.algo.generic.functor :refer [fmap]]
            [clojure.java.shell :refer [sh]]
            [jackdaw.client :as jc]
            [jackdaw.client.log :as jcl]
            [jackdaw.admin :as ja]
            [jackdaw.serdes.edn :as jse]
            [jackdaw.streams :as j]
            [jackdaw.serdes]
            [jackdaw.serdes.json :as json]
            [jackdaw.serdes.avro.confluent :as jsac]
            [jackdaw.serdes.resolver :as resolver]))

;; of course not pasting my keys here
#_(defn basic-config
  []
  {"ssl.endpoint.identification.algorithm" "https"
   "sasl.mechanism"                        "PLAIN"
   "request.timeout.ms"                    (int 20000)
   "bootstrap.servers"                     "xxxxx.confluent.cloud:9092"
   "retry.backoff.ms"                      (int 500)
   "sasl.jaas.config"                      (str "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                                                ccloud-key "\" password=\"" ccloud-secret "\";")
   "security.protocol"                     "SASL_SSL"

   ; Schema Registry specific settings
   "basic.auth.credentials.source"         "USER_INFO"
   "schema.registry.basic.auth.user.info"  (str sr-key ":" sr-secret)
   "schema.registry.url"                   "https://xxxxxxx.confluent.cloud"

   ; Enable Avro serializer with Schema Registry
   "key.serializer"                        "io.confluent.kafka.serializers.KafkaAvroSerializer"
   "value.serializer"                      "io.confluent.kafka.serializers.KafkaAvroSerializer"
   })
(defn basic-config
  "Returns the application config."
  []
  {"application.id" "serdes"
   "bootstrap.servers" "localhost:9092"
   "cache.max.bytes.buffering" "0"})

(defn producer-config [] (basic-config))
(defn admin-config [] (basic-config))

;;; ------------------------------------------------------------
;;;
;;; Configure topics
;;;

(def +topic-metadata+
  {"example_avro"
   {:topic-name  "example_avro"
    :key-serde   {:serde-keyword   :jackdaw.serdes.avro.confluent/serde
                  :schema-filename "key_schema.json"
                  :key?            true}
    :value-serde {:serde-keyword   :jackdaw.serdes.avro.confluent/serde
                  :schema-filename "value_schema.json"
                  :key?            false}}

   "example_json"
   {:topic-name  "example_json"
    :key-serde   (json/serde)
    :value-serde (json/serde)}

   "example_str"
   {:topic-name  "example_str"
    :key-serde   {:serde-keyword :jackdaw.serdes/string-serde}
    :value-serde {:serde-keyword :jackdaw.serdes/string-serde}}})

#_(def serde-resolver
  (partial resolver/serde-resolver :schema-registry-url (get (basic-config) "schema.registry.url")))
(def serde-resolver
  (partial resolver/serde-resolver :schema-registry-url "http://localhost:8081"))

(def topic-metadata
  (memoize (fn []
             (fmap #(if (= (:topic-name %) "example_json")
                      %
                      (assoc % :key-serde ((serde-resolver) (:key-serde %))
                               :value-serde ((serde-resolver) (:value-serde %))))
                   +topic-metadata+))))

(defn publish
  "Takes a topic config and record value, and (optionally) a key and
  partition number, and produces to a Kafka topic."
  ([topic-config value]
   (with-open [client (jc/producer (producer-config) topic-config)]
     @(jc/produce! client topic-config value))
   nil)

  ([topic-config key value]
   (with-open [client (jc/producer (producer-config) topic-config)]
     @(jc/produce! client topic-config key value))
   nil)

  ([topic-config partition key value]
   (with-open [client (jc/producer (producer-config) topic-config)]
     @(jc/produce! client topic-config partition key value))
   nil))

(def example-key {"timestamp" 1562266979037
                  "name"      "example-name"})

(def example-value {"int_field"    1
                    "string_field" ""})

(defn -main
  [& args]
  (publish (get (topic-metadata) "example_str") "example-key" "example-value")
  (publish (get (topic-metadata) "example_json") example-key example-value)
  (publish (get (topic-metadata) "example_avro") example-key example-value))

resources/key_schema.json

{
  "type" : "record",
  "name" : "key_example",
  "namespace" : "avro",
  "fields" : [
    {
      "name" : "timestamp",
      "type" : "long"
    },
    {
      "name" : "name",
      "type" : "string"
    }
  ]
}

resources/value_schema.json

{
  "type" : "record",
  "name" : "value_example",
  "namespace" : "avro",
  "fields" : [
    {
      "name" : "int_field",
      "type" : "int"
    },
    {
      "name" : "string_field",
      "type" : "string"
    }
  ]
}

Update: now using Confluent Platform locally, since the issue doesn't depend on Confluent Cloud.

cddr (Contributor) commented Jul 10, 2019

The included Confluent serde currently expects to find a local copy of the schema on the classpath when serializing a message, but not when deserializing one. This behavior is inherited from the upstream serializer.

If it can't find one, that might be why it's throwing an error (admittedly not the most informative one). In earlier versions it would have thrown an error at resolution time, but this PR relaxed that a bit so that consumers are not required to provide a schema.

Try putting those Avro files on the resource path (i.e., assuming a standard lein project layout, in the files resources/avro_key.json and resources/avro_value.json).
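
A quick way to confirm the files are actually visible on the classpath (not from the thread; just a sketch using the filenames from the original post) is clojure.java.io/resource, which returns nil for anything the classloader can't see:

(require '[clojure.java.io :as io])

;; Each call should return a java.net.URL.
;; nil means the schema file is not on the classpath.
(io/resource "key_schema.json")
(io/resource "value_schema.json")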

lurodrigo (Author) commented

Thank you for your reply! Actually, the schema files are already under the resources dir. When they're not, the friendlier exception below is thrown:

Execution error (ExceptionInfo) at jackdaw.serdes.resolver/load-schema (resolver.clj:18).
Could not find schema key_schema.json

lurodrigo (Author) commented

Updated the OP to use Confluent Platform for better reproducibility.

cddr (Contributor) commented Jul 11, 2019

Ah, OK, sorry. Our Confluent serde expects the keys in the message key and value maps to be keywordized, hyphenated versions of the corresponding Avro field names.

There is #126, which seeks to provide an option not to mangle the names in this way.
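
Applied to the schemas in the original post, that means the example maps would look like this (a sketch based on the convention cddr describes, where underscores in Avro field names become hyphens in the keywords):

;; "timestamp" -> :timestamp, "name" -> :name,
;; "int_field" -> :int-field, "string_field" -> :string-field
(def example-key {:timestamp 1562266979037
                  :name      "example-name"})

(def example-value {:int-field    1
                    :string-field ""})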

lurodrigo (Author) commented

Thank you! Using keywords will do for now, until the option not to mangle names is released. I tried it and it worked with a local Confluent Platform install. I had some problems with Confluent Cloud, but I'll open a separate issue for those.
