This repository has been archived by the owner on Jun 9, 2022. It is now read-only.

KafkaCache: Could not read to end offsets #26

Closed
aldoborrero opened this issue May 4, 2020 · 1 comment
Labels
Type: Bug Something isn't working

Comments

@aldoborrero
Member

aldoborrero commented May 4, 2020

Expected Behavior

If I execute the following command with no previous volumes set:

docker-compose -f docker/exflo/docker-compose.kafka.yml up

It should start processing blocks and inserting them into Kafka.

Current Behavior


As soon as Kafka is initialized, Exflo outputs the following error:

besu_1       | 2020-05-04 11:37:39.170+00:00 | main | WARN  | ConsumerConfig | The configuration 'topic' was supplied but isn't a known config.
besu_1       | 2020-05-04 11:37:39.170+00:00 | main | INFO  | AppInfoParser | Kafka version: 2.4.0
besu_1       | 2020-05-04 11:37:39.170+00:00 | main | INFO  | AppInfoParser | Kafka commitId: 77a89fcf8d7fa018
besu_1       | 2020-05-04 11:37:39.170+00:00 | main | INFO  | AppInfoParser | Kafka startTimeMs: 1588592259170
besu_1       | 2020-05-04 11:37:39.182+00:00 | main | INFO  | KafkaConsumer | [Consumer clientId=kafka-cache-reader-_exflo-import-cache, groupId=kafkacache] Subscribed to partition(s): _exflo-import-cache-0
besu_1       | 2020-05-04 11:37:39.185+00:00 | main | INFO  | KafkaCache | Seeking to beginning for all partitions
besu_1       | 2020-05-04 11:37:39.186+00:00 | main | INFO  | SubscriptionState | [Consumer clientId=kafka-cache-reader-_exflo-import-cache, groupId=kafkacache] Seeking to EARLIEST offset of partition _exflo-import-cache-0
besu_1       | 2020-05-04 11:37:39.186+00:00 | main | INFO  | KafkaCache | Initialized last consumed offset to {0=-1}
besu_1       | 2020-05-04 11:37:39.187+00:00 | main | INFO  | KafkaCache | KafkaTopicReader thread started for kafka-cache-reader-_exflo-import-cache.
besu_1       | 2020-05-04 11:37:39.193+00:00 | main | INFO  | Metadata | [Consumer clientId=kafka-cache-reader-_exflo-import-cache, groupId=kafkacache] Cluster ID: 1o2lE4tqRC24CfgQlD3jnw
besu_1       | 2020-05-04 11:37:39.216+00:00 | main | INFO  | SubscriptionState | [Consumer clientId=kafka-cache-reader-_exflo-import-cache, groupId=kafkacache] Resetting offset for partition _exflo-import-cache-0 to offset 0.
besu_1       | 2020-05-04 11:37:39.226+00:00 | kafka-producer-network-thread | producer-1 | INFO  | Metadata | [Producer clientId=producer-1] Cluster ID: 1o2lE4tqRC24CfgQlD3jnw
besu_1       | 2020-05-04 11:37:39.284+00:00 | main | INFO  | KafkaCache | During init or sync, processed 1 records from topic _exflo-import-cache
besu_1       | 2020-05-04 11:37:39.285+00:00 | main | INFO  | BlockImportTask | Syncing import cache
besu_1       | 2020-05-04 11:37:39.285+00:00 | kafka-cache-reader-thread-_exflo-import-cache | INFO  | ShutdownableThread | Starting
besu_1       | 2020-05-04 11:37:39.286+00:00 | main | WARN  | KafkaCache | Could not read to end offsets
besu_1       | org.apache.kafka.common.errors.WakeupException: null
besu_1       |  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:511) ~[plugin-6bda013.jar:?]
besu_1       |  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275) ~[plugin-6bda013.jar:?]
besu_1       |  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) ~[plugin-6bda013.jar:?]
besu_1       |  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) ~[plugin-6bda013.jar:?]
besu_1       |  at org.apache.kafka.clients.consumer.internals.Fetcher.fetchOffsetsByTimes(Fetcher.java:537) ~[plugin-6bda013.jar:?]
besu_1       |  at org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:578) ~[plugin-6bda013.jar:?]
besu_1       |  at org.apache.kafka.clients.consumer.internals.Fetcher.endOffsets(Fetcher.java:567) ~[plugin-6bda013.jar:?]
besu_1       |  at org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2193) ~[plugin-6bda013.jar:?]
besu_1       |  at org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2165) ~[plugin-6bda013.jar:?]
besu_1       |  at io.kcache.KafkaCache$WorkerThread.readToEndOffsets(KafkaCache.java:634) ~[plugin-6bda013.jar:?]
besu_1       |  at io.kcache.KafkaCache$WorkerThread.waitUntilEndOffsets(KafkaCache.java:802) [plugin-6bda013.jar:?]
besu_1       |  at io.kcache.KafkaCache$WorkerThread.access$100(KafkaCache.java:565) [plugin-6bda013.jar:?]
besu_1       |  at io.kcache.KafkaCache.sync(KafkaCache.java:205) [plugin-6bda013.jar:?]
besu_1       |  at io.exflo.ingestion.kafka.tasks.BlockImportTask.<init>(KafkaImportTask.kt:113) [plugin-6bda013.jar:?]
besu_1       |  at io.exflo.ingestion.kafka.KafkaBlockWriter.<init>(KafkaBlockWriter.kt:36) [plugin-6bda013.jar:?]
besu_1       |  at io.exflo.ingestion.kafka.ExfloKafkaPlugin$implKoinModules$1$3.invoke(ExfloKafkaPlugin.kt:47) [plugin-6bda013.jar:?]
besu_1       |  at io.exflo.ingestion.kafka.ExfloKafkaPlugin$implKoinModules$1$3.invoke(ExfloKafkaPlugin.kt:37) [plugin-6bda013.jar:?]
besu_1       |  at org.koin.core.instance.DefinitionInstance.create(DefinitionInstance.kt:54) [plugin-6bda013.jar:?]
besu_1       |  at org.koin.core.instance.FactoryDefinitionInstance.get(FactoryDefinitionInstance.kt:37) [plugin-6bda013.jar:?]
besu_1       |  at org.koin.core.definition.BeanDefinition.resolveInstance(BeanDefinition.kt:70) [plugin-6bda013.jar:?]
besu_1       |  at org.koin.core.scope.Scope.resolveInstance(Scope.kt:165) [plugin-6bda013.jar:?]
besu_1       |  at org.koin.core.scope.Scope.get(Scope.kt:128) [plugin-6bda013.jar:?]
besu_1       |  at io.exflo.ingestion.ExfloPlugin.start(ExfloPlugin.kt:236) [plugin-6bda013.jar:?]
besu_1       |  at org.hyperledger.besu.services.BesuPluginContextImpl.startPlugins(BesuPluginContextImpl.java:140) [besu-1.4.4.jar:1.4.4]
besu_1       |  at org.hyperledger.besu.cli.BesuCommand.startPlugins(BesuCommand.java:1106) [besu-1.4.4.jar:1.4.4]
besu_1       |  at org.hyperledger.besu.cli.BesuCommand.run(BesuCommand.java:958) [besu-1.4.4.jar:1.4.4]
besu_1       |  at picocli.CommandLine.executeUserObject(CommandLine.java:1769) [picocli-4.1.4.jar:4.1.4]
besu_1       |  at picocli.CommandLine.access$900(CommandLine.java:145) [picocli-4.1.4.jar:4.1.4]
besu_1       |  at picocli.CommandLine$RunLast.handle(CommandLine.java:2141) [picocli-4.1.4.jar:4.1.4]
besu_1       |  at picocli.CommandLine$RunLast.handle(CommandLine.java:2108) [picocli-4.1.4.jar:4.1.4]
besu_1       |  at picocli.CommandLine$AbstractParseResultHandler.handleParseResult(CommandLine.java:1968) [picocli-4.1.4.jar:4.1.4]
besu_1       |  at picocli.CommandLine.parseWithHandlers(CommandLine.java:2322) [picocli-4.1.4.jar:4.1.4]
besu_1       |  at org.hyperledger.besu.cli.util.ConfigOptionSearchAndRunHandler.handle(ConfigOptionSearchAndRunHandler.java:61) [besu-1.4.4.jar:1.4.4]
besu_1       |  at org.hyperledger.besu.cli.util.ConfigOptionSearchAndRunHandler.handle(ConfigOptionSearchAndRunHandler.java:31) [besu-1.4.4.jar:1.4.4]
besu_1       |  at picocli.CommandLine$AbstractParseResultHandler.handleParseResult(CommandLine.java:1968) [picocli-4.1.4.jar:4.1.4]
besu_1       |  at picocli.CommandLine.parseWithHandlers(CommandLine.java:2322) [picocli-4.1.4.jar:4.1.4]
besu_1       |  at org.hyperledger.besu.cli.BesuCommand.parse(BesuCommand.java:1074) [besu-1.4.4.jar:1.4.4]
besu_1       |  at org.hyperledger.besu.cli.BesuCommand.parse(BesuCommand.java:942) [besu-1.4.4.jar:1.4.4]
besu_1       |  at org.hyperledger.besu.Besu.main(Besu.java:49) [besu-1.4.4.jar:1.4.4]
besu_1       | 2020-05-04 11:37:39.350+00:00 | main | INFO  | RunnerBuilder | Detecting NAT service.
besu_1       | 2020-05-04 11:37:39.424+00:00 | ExfloExecutorThread-%d | ERROR | BlockImportTask | Critical failure

It seems we should handle the WakeupException accordingly (as described here); this happens mostly because Besu has not started syncing blocks yet, as it is still finding peers.
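
A minimal sketch of what handling it could look like (purely illustrative: syncWithRetry is a hypothetical helper, not part of kcache or Exflo, and the retry policy is arbitrary):

    import io.kcache.KafkaCache
    import org.apache.kafka.common.errors.WakeupException

    // Hypothetical helper: retries KafkaCache.sync() when the consumer is
    // woken up before the end offsets could be read, instead of letting the
    // WakeupException surface as "Could not read to end offsets".
    fun <K, V> syncWithRetry(cache: KafkaCache<K, V>, maxAttempts: Int = 5, backoffMs: Long = 1_000L) {
        for (attempt in 1..maxAttempts) {
            try {
                cache.sync()
                return
            } catch (ex: WakeupException) {
                // Typically raised while Besu is still finding peers; back off and retry.
                if (attempt == maxAttempts) throw ex
                Thread.sleep(backoffMs)
            }
        }
    }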

We should probably also listen to the main Besu events in order to start processing, instead of just trying to do so right away.
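
Something along these lines could work, assuming the plugin API's BesuEvents service and its sync-status listener are available (the exact listener signature may differ between Besu versions; startImport is a hypothetical hook standing in for the work currently done eagerly in BlockImportTask's constructor):

    import org.hyperledger.besu.plugin.services.BesuEvents

    // Sketch: defer the Kafka import until Besu reports sync activity, instead
    // of calling KafkaCache.sync() during plugin start-up while the node is
    // still discovering peers.
    fun registerSyncTrigger(events: BesuEvents, startImport: () -> Unit) {
        events.addSyncStatusListener {
            // Fires when the node's sync status changes; start importing then.
            startImport()
        }
    }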

Steps to Reproduce

As described above:

docker-compose -f docker/exflo/docker-compose.kafka.yml up

Make sure the volumes are empty (to double-check, run docker-compose -f docker/exflo/docker-compose.kafka.yml down -v first)

@aldoborrero added the Type: Bug label on May 4, 2020
@aldoborrero
Member Author

Closing this issue as it is now deprecated.
