
Schema registry aborts after startup #135

Closed · the-ram opened this issue May 21, 2020 · 7 comments
the-ram commented May 21, 2020

Hi!

I tried to use this for an integration test I have been working on. I am using Scala 2.13, and the Kafka libraries are version 5.5.0-ccs, so I ended up using version 5.5.0.1 of embedded-kafka. It seems to boot up fine, and initially netstat shows the ports open, but the Schema Registry dies midway through:

        bearer.auth.token = [hidden]
        proxy.port = -1
        schema.reflection = false
        auto.register.schemas = true
        max.schemas.per.subject = 1000
        basic.auth.credentials.source = URL
        specific.avro.reader = false
        value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
        schema.registry.url = [http://localhost:8081]
        basic.auth.user.info = [hidden]
        proxy.host = 
        schema.registry.basic.auth.user.info = [hidden]
        bearer.auth.credentials.source = STATIC_TOKEN
        key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.kafka.serializers.KafkaAvroDeserializerConfig:179)
[2020-05-21 23:23:25,072] INFO KafkaAvroDeserializerConfig values: 
        bearer.auth.token = [hidden]
        proxy.port = -1
        schema.reflection = false
        auto.register.schemas = true
        max.schemas.per.subject = 1000
        basic.auth.credentials.source = URL
        specific.avro.reader = false
        value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
        schema.registry.url = [http://localhost:8081]
        basic.auth.user.info = [hidden]
        proxy.host = 
        schema.registry.basic.auth.user.info = [hidden]
        bearer.auth.credentials.source = STATIC_TOKEN
        key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.kafka.serializers.KafkaAvroDeserializerConfig:179)
**Getting connected to localhost:6001 
[2020-05-21 23:23:42,134] INFO SchemaRegistryConfig values: 
        access.control.allow.headers = 
        access.control.allow.methods = 
        access.control.allow.origin = 
        authentication.method = NONE
        authentication.realm = 
        authentication.roles = [*]
        authentication.skip.paths = []
        avro.compatibility.level = 
        compression.enable = true
        debug = false
        host.name = 192.168.0.152
        idle.timeout.ms = 30000
        inter.instance.headers.whitelist = []
        inter.instance.protocol = http
        kafkastore.bootstrap.servers = [localhost:6001]
        kafkastore.connection.url = 
        kafkastore.group.id = 
        kafkastore.init.timeout.ms = 60000
        kafkastore.sasl.kerberos.kinit.cmd = /usr/bin/kinit
        kafkastore.sasl.kerberos.min.time.before.relogin = 60000
        kafkastore.sasl.kerberos.service.name = 
        kafkastore.sasl.kerberos.ticket.renew.jitter = 0.05
        kafkastore.sasl.kerberos.ticket.renew.window.factor = 0.8
        kafkastore.sasl.mechanism = GSSAPI
        kafkastore.security.protocol = PLAINTEXT
        kafkastore.ssl.cipher.suites = 
        kafkastore.ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1
        kafkastore.ssl.endpoint.identification.algorithm = 
        kafkastore.ssl.key.password = [hidden]
        kafkastore.ssl.keymanager.algorithm = SunX509
        kafkastore.ssl.keystore.location = 
        kafkastore.ssl.keystore.password = [hidden]
        kafkastore.ssl.keystore.type = JKS
        kafkastore.ssl.protocol = TLS
        kafkastore.ssl.provider = 
        kafkastore.ssl.trustmanager.algorithm = PKIX
        kafkastore.ssl.truststore.location = 
        kafkastore.ssl.truststore.password = [hidden]
        kafkastore.ssl.truststore.type = JKS
        kafkastore.timeout.ms = 500
        kafkastore.topic = _schemas
        kafkastore.topic.replication.factor = 3
        kafkastore.write.max.retries = 5
        kafkastore.zk.session.timeout.ms = 30000
        listeners = [http://localhost:6002]
        master.eligibility = true
        metric.reporters = []
        metrics.jmx.prefix = kafka.schema.registry
        metrics.num.samples = 2
        metrics.sample.window.ms = 30000
        metrics.tag.map = []
        mode.mutability = false
        port = 8081
        request.logger.name = io.confluent.rest-utils.requests
        resource.extension.class = []
        resource.extension.classes = []
        resource.static.locations = []
        response.mediatype.default = application/vnd.schemaregistry.v1+json
        response.mediatype.preferred = [application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json]
        rest.servlet.initializor.classes = []
        schema.compatibility.level = backward
        schema.providers = []
        schema.registry.group.id = schema-registry
        schema.registry.inter.instance.protocol = 
        schema.registry.resource.extension.class = []
        schema.registry.zk.namespace = schema_registry
        shutdown.graceful.ms = 1000
        ssl.cipher.suites = []
        ssl.client.auth = false
        ssl.client.authentication = NONE
        ssl.enabled.protocols = []
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = [hidden]
        ssl.keymanager.algorithm = 
        ssl.keystore.location = 
        ssl.keystore.password = [hidden]
        ssl.keystore.reload = false
        ssl.keystore.type = JKS
        ssl.keystore.watch.location = 
        ssl.protocol = TLS
        ssl.provider = 
        ssl.trustmanager.algorithm = 
        ssl.truststore.location = 
        ssl.truststore.password = [hidden]
        ssl.truststore.type = JKS
        websocket.path.prefix = /ws
        websocket.servlet.initializor.classes = []
        zookeeper.set.acl = false
 (io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig:347)
[2020-05-21 23:23:42,153] INFO Logging initialized @44049ms to org.eclipse.jetty.util.log.Slf4jLog (org.eclipse.jetty.util.log:169)
[2020-05-21 23:23:42,232] INFO Adding listener: http://localhost:6002 (io.confluent.rest.ApplicationServer:344)
[2020-05-21 23:23:42,308] INFO Initializing KafkaStore with broker endpoints: PLAINTEXT://localhost:6001 (io.confluent.kafka.schemaregistry.storage.KafkaStore:108)
[2020-05-21 23:23:42,309] INFO Registering schema provider for AVRO: io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry:210)
[2020-05-21 23:23:42,309] INFO Registering schema provider for JSON: io.confluent.kafka.schemaregistry.json.JsonSchemaProvider (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry:210)
[2020-05-21 23:23:42,310] INFO Registering schema provider for PROTOBUF: io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry:210)
[2020-05-21 23:23:42,345] INFO Creating schemas topic _schemas (io.confluent.kafka.schemaregistry.storage.KafkaStore:193)
[2020-05-21 23:23:42,346] WARN Creating the schema topic _schemas using a replication factor of 1, which is less than the desired one of 3. If this is a production environment, it's crucial to add more brokers and increase the replication factor of the topic. (io.confluent.kafka.schemaregistry.storage.KafkaStore:203)
[2020-05-21 23:23:42,513] INFO Kafka store reader thread starting consumer (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:110)
[2020-05-21 23:23:42,527] INFO Initialized last consumed offset to -1 (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:144)
[2020-05-21 23:23:42,528] INFO [kafka-store-reader-thread-_schemas]: Starting (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:66)
[2020-05-21 23:23:42,623] INFO Wait to catch up until the offset at 0 (io.confluent.kafka.schemaregistry.storage.KafkaStore:304)
[2020-05-21 23:23:42,640] INFO Joining schema registry with Kafka-based coordination (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry:297)
[2020-05-21 23:23:45,817] INFO Finished rebalance with master election result: Assignment{version=1, error=0, master='sr-1-5e328d55-d8bd-41dc-b711-cfb93e166265', masterIdentity=version=1,host=192.168.0.152,port=6002,scheme=http,masterEligibility=true} (io.confluent.kafka.schemaregistry.masterelector.kafka.KafkaGroupMasterElector:228)
[2020-05-21 23:23:45,821] INFO Wait to catch up until the offset at 1 (io.confluent.kafka.schemaregistry.storage.KafkaStore:304)
[2020-05-21 23:23:45,991] INFO jetty-9.4.24.v20191120; built: 2019-11-20T21:37:49.771Z; git: 363d5f2df3a8a28de40604320230664b9c793c16; jvm 1.8.0_242-b08 (org.eclipse.jetty.server.Server:359)
[2020-05-21 23:23:46,036] INFO DefaultSessionIdManager workerName=node0 (org.eclipse.jetty.server.session:333)
[2020-05-21 23:23:46,037] INFO No SessionScavenger set, using defaults (org.eclipse.jetty.server.session:338)
[2020-05-21 23:23:46,038] INFO node0 Scavenging every 660000ms (org.eclipse.jetty.server.session:140)
A provider io.confluent.kafka.schemaregistry.rest.resources.ConfigResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider io.confluent.kafka.schemaregistry.rest.resources.ConfigResource will be ignored.  
A provider io.confluent.kafka.schemaregistry.rest.resources.CompatibilityResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider io.confluent.kafka.schemaregistry.rest.resources.CompatibilityResource will be ignored.  
A provider io.confluent.kafka.schemaregistry.rest.resources.SchemasResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider io.confluent.kafka.schemaregistry.rest.resources.SchemasResource will be ignored.  
A provider io.confluent.kafka.schemaregistry.rest.resources.ServerMetadataResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider io.confluent.kafka.schemaregistry.rest.resources.ServerMetadataResource will be ignored.  
A provider io.confluent.kafka.schemaregistry.rest.resources.SubjectsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider io.confluent.kafka.schemaregistry.rest.resources.SubjectsResource will be ignored.  
A provider io.confluent.kafka.schemaregistry.rest.resources.ModeResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider io.confluent.kafka.schemaregistry.rest.resources.ModeResource will be ignored.  
A provider io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource will be ignored.  
[2020-05-21 23:23:46,587] INFO HV000001: Hibernate Validator 6.0.17.Final (org.hibernate.validator.internal.util.Version:21)
[2020-05-21 23:23:46,841] INFO JVM Runtime does not support Modules (org.eclipse.jetty.util.TypeUtil:201)
[2020-05-21 23:23:46,842] INFO Started o.e.j.s.ServletContextHandler@745c35cf{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:825)
[2020-05-21 23:23:46,867] INFO Started o.e.j.s.ServletContextHandler@7079c42e{/ws,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:825)
[2020-05-21 23:23:46,881] INFO Started NetworkTrafficServerConnector@4e1c56f1{HTTP/1.1,[http/1.1]}{localhost:6002} (org.eclipse.jetty.server.AbstractConnector:330)
[2020-05-21 23:23:46,881] INFO Started @48777ms (org.eclipse.jetty.server.Server:399)
[2020-05-21 23:23:46,891] INFO KafkaAvroSerializerConfig values: 
        bearer.auth.token = [hidden]
        proxy.port = -1
        schema.reflection = false
        auto.register.schemas = true
        max.schemas.per.subject = 1000
        basic.auth.credentials.source = URL
        value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
        schema.registry.url = [http://localhost:6002]
        basic.auth.user.info = [hidden]
        proxy.host = 
        schema.registry.basic.auth.user.info = [hidden]
        bearer.auth.credentials.source = STATIC_TOKEN
        key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.kafka.serializers.KafkaAvroSerializerConfig:179)
[2020-05-21 23:23:46,891] INFO KafkaAvroSerializerConfig values: 
        bearer.auth.token = [hidden]
        proxy.port = -1
        schema.reflection = false
        auto.register.schemas = true
        max.schemas.per.subject = 1000
        basic.auth.credentials.source = URL
        value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
        schema.registry.url = [http://localhost:6002]
        basic.auth.user.info = [hidden]
        proxy.host = 
        schema.registry.basic.auth.user.info = [hidden]
        bearer.auth.credentials.source = STATIC_TOKEN
        key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.kafka.serializers.KafkaAvroSerializerConfig:179)
[2020-05-21 23:23:46,952] INFO Stopped NetworkTrafficServerConnector@4e1c56f1{HTTP/1.1,[http/1.1]}{localhost:6002} (org.eclipse.jetty.server.AbstractConnector:380)
[2020-05-21 23:23:46,952] INFO node0 Stopped scavenging (org.eclipse.jetty.server.session:158)
[2020-05-21 23:23:46,956] INFO Stopped o.e.j.s.ServletContextHandler@7079c42e{/ws,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:1016)
[2020-05-21 23:23:46,960] INFO Stopped o.e.j.s.ServletContextHandler@745c35cf{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:1016)
[2020-05-21 23:23:46,962] INFO Shutting down schema registry (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry:1084)
[2020-05-21 23:23:46,962] INFO [kafka-store-reader-thread-_schemas]: Shutting down (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:66)
[2020-05-21 23:23:46,963] INFO [kafka-store-reader-thread-_schemas]: Stopped (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:66)
[2020-05-21 23:23:46,963] INFO [kafka-store-reader-thread-_schemas]: Shutdown completed (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:66)
[2020-05-21 23:23:46,965] INFO KafkaStoreReaderThread shutdown complete. (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:253)
[2020-05-21 23:23:46,967] ERROR Unexpected exception in schema registry group processing thread (io.confluent.kafka.schemaregistry.masterelector.kafka.KafkaGroupMasterElector:200)
org.apache.kafka.common.errors.WakeupException
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:511)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
        at io.confluent.kafka.schemaregistry.masterelector.kafka.SchemaRegistryCoordinator.poll(SchemaRegistryCoordinator.java:120)
        at io.confluent.kafka.schemaregistry.masterelector.kafka.KafkaGroupMasterElector$1.run(KafkaGroupMasterElector.java:197)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
org.apache.kafka.common.errors.TimeoutException: Topic topic_test not present in metadata after 60000 ms.
The End {0} 

The test is quite simple: send a message and finish after the asserts.

  "RefDataKafkaVerticle" should "start kafka and sink a message" in {
    implicit val kafkaEmbeddedConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(zooKeeperPort = 6000, kafkaPort = 6001 , schemaRegistryPort = 6002)
    kafkaEmbeddedConfig.customSchemaRegistryProperties

I have since tried changing to the broadcast IP (0.0.0.0) and tweaking Kafka versions, to no avail.

Any pointers deeply appreciated.

francescopellegrini (Member) commented:

Hi @the-ram,
could you provide a failing test?


the-ram commented May 22, 2020

Hello @francescopellegrini

Here you go : https://github.com/the-ram/kafka-refdata-sink-vertx-it

sbt clean test

This should reproduce the failure. I am able to get a local cp-schema-registry to run and the test to pass.


francescopellegrini commented May 22, 2020

When I run the tests, I get a different error:

org.apache.avro.AvroRuntimeException: Not a valid schema field: unit

At L83 you're trying to set a value for a field which doesn't exist (latency and bandwidth are smol's only fields).

Also, smol implements SpecificRecord, while in the test you're creating a GenericRecord...

EDIT: please refrain from importing SNAPSHOT versions of this library.
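To illustrate the distinction above, here is a hedged sketch of building a GenericRecord by hand; the field names latency and bandwidth are taken from this comment, and the exact schema shape is an assumption for illustration:

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}

object SmolExample {
  // Assumed schema matching the two fields mentioned above.
  val schema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"smol","fields":[
      |  {"name":"latency","type":"long"},
      |  {"name":"bandwidth","type":"long"}
      |]}""".stripMargin)

  def build(): GenericRecord = {
    val record = new GenericData.Record(schema)
    record.put("latency", 5L)
    record.put("bandwidth", 100L)
    // record.put("unit", "ms") would throw
    // org.apache.avro.AvroRuntimeException: Not a valid schema field: unit
    record
  }
}
```

A SpecificRecord, by contrast, is a generated class with typed setters, so an invalid field would fail at compile time rather than at runtime.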


the-ram commented May 22, 2020

Hello @francescopellegrini, sorry for that mistake. I had been trying to strip out as many specifics from my code as possible and in the process picked up a random schema from the internet :). Since your comment I have:
a) Removed the SNAPSHOT version
b) Attached the full trace that I obtained (logs.txt)

For now I don't think the Generic/Specific record distinction is the issue; I just want to get the message through Kafka to the other parts, and will then probably move to a generic record anyway.

francescopellegrini (Member) commented:

Hi again,
I found a series of issues with the test, but the main one is that the withRunningKafka closure returns a Future which requires the Schema Registry to be up and running in order to succeed.

You either have to wait for its result before exiting the closure (using Await.result) or start/stop EmbeddedKafka via BeforeAndAfterAll methods.

Other issues that I found were:

  • the built Avro records are missing some fields, so the serializers throw NPEs.
  • the expected result is wrong, since the promise handler returns the topic being written to.
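The closure pitfall described above can be sketched without any Kafka dependency; withResource is a hypothetical stand-in for withRunningKafka, tearing its resource down as soon as the body returns. If the Future escaped the closure instead of being awaited, the resource would already be gone by the time the asynchronous work ran:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitExample {
  // Loan-pattern closure: the resource lives only while the body runs,
  // analogous to withRunningKafka stopping Kafka and the Schema Registry.
  def withResource[T](body: => T): T =
    try body
    finally println("resource stopped")

  // Blocking on the Future inside the closure keeps the resource alive
  // until the asynchronous work has completed.
  def compute(): Int = withResource {
    Await.result(Future { 21 * 2 }, 5.seconds)
  }

  def main(args: Array[String]): Unit =
    println(compute()) // prints "resource stopped" then 42
}
```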


the-ram commented May 22, 2020

Hello @francescopellegrini, the closure was the issue: withRunningKafka had to be waited on, just as you mentioned. As for the tests, those were mistakes I made while trying to produce a reproducible test. I have added the tests with BeforeAndAfterAll and now control the lifecycle manually!

the-ram closed this as completed May 22, 2020

the-ram commented May 22, 2020

The block needed to be waited on: the withRunningKafka closure returns a Future which must be awaited, and Await.result worked fine. Even better, to avoid timing variability, is to control the lifecycle using Before/After methods!
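The BeforeAndAfterAll alternative mentioned above could look roughly like this; it assumes the embedded-kafka-schema-registry EmbeddedKafka.start()/stop() API and borrows the KafkaRedisSyncIT name and ports from the logs in this thread:

```scala
import io.github.embeddedkafka.schemaregistry.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpec

class KafkaRedisSyncIT extends AnyFlatSpec with BeforeAndAfterAll {
  implicit val kafkaEmbeddedConfig: EmbeddedKafkaConfig =
    EmbeddedKafkaConfig(zooKeeperPort = 6000, kafkaPort = 6001, schemaRegistryPort = 6002)

  // Kafka, ZooKeeper, and the Schema Registry stay up for the whole suite,
  // so in-flight Futures cannot outlive them.
  override def beforeAll(): Unit = EmbeddedKafka.start()
  override def afterAll(): Unit  = EmbeddedKafka.stop()

  "RefDataKafkaVerticle" should "start kafka and sink a message" in {
    // produce, consume, and assert here; no closure to race against
  }
}
```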
