Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timeout when reading offset of config.storage.topic #1370

Open
sabancihan opened this issue Nov 22, 2023 · 1 comment
Open

Timeout when reading offset of config.storage.topic #1370

sabancihan opened this issue Nov 22, 2023 · 1 comment

Comments

@sabancihan
Copy link

2023-11-14 05:11:42,432 WARN || Timeout while reading log to end for topic 'connect_configs'. Retrying automatically. This may occur when brokers are unavailable or unreachable. Reason: Timed out while waiting to get end offsets for topic 'connect_configs' on brokers at broker-1:9092 [org.apache.kafka.connect.util.KafkaBasedLog]

This is the error I get, but broker seems to be available and reachable at the time, after multiple tries of accessing offset of connect_configs it becomes accessible after around 3 minutes.

I traced where error message comes from and found that it is from endoffsets method of TopicAdmin.class

    public Map<TopicPartition, Long> endOffsets(Set<TopicPartition> partitions) {
        if (partitions == null || partitions.isEmpty()) {
            return Collections.emptyMap();
        }
        Map<TopicPartition, OffsetSpec> offsetSpecMap = partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()));
        ListOffsetsResult resultFuture = admin.listOffsets(offsetSpecMap, new ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED));
        // Get the individual result for each topic partition so we have better error messages
        Map<TopicPartition, Long> result = new HashMap<>();
        for (TopicPartition partition : partitions) {
            try {
                ListOffsetsResultInfo info = resultFuture.partitionResult(partition).get();
                result.put(partition, info.offset());
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                String topic = partition.topic();
                if (cause instanceof AuthorizationException) {
                    String msg = String.format("Not authorized to get the end offsets for topic '%s' on brokers at %s", topic, bootstrapServers);
                    throw new ConnectException(msg, e);
                } else if (cause instanceof UnsupportedVersionException) {
                    // Should theoretically never happen, because this method is the same as what the consumer uses and therefore
                    // should exist in the broker since before the admin client was added
                    String msg = String.format("API to get the get the end offsets for topic '%s' is unsupported on brokers at %s", topic, bootstrapServers);
                    throw new UnsupportedVersionException(msg, e);
                } else if (cause instanceof TimeoutException) {
                    String msg = String.format("Timed out while waiting to get end offsets for topic '%s' on brokers at %s", topic, bootstrapServers);
                    throw new TimeoutException(msg, e);
                } else if (cause instanceof LeaderNotAvailableException) {
                    String msg = String.format("Unable to get end offsets during leader election for topic '%s' on brokers at %s", topic, bootstrapServers);
                    throw new LeaderNotAvailableException(msg, e);
                } else if (cause instanceof org.apache.kafka.common.errors.RetriableException) {
                    throw (org.apache.kafka.common.errors.RetriableException) cause;
                } else {
                    String msg = String.format("Error while getting end offsets for topic '%s' on brokers at %s", topic, bootstrapServers);
                    throw new ConnectException(msg, e);
                }
            } catch (InterruptedException e) {
                Thread.interrupted();
                String msg = String.format("Interrupted while attempting to read end offsets for topic '%s' on brokers at %s", partition.topic(), bootstrapServers);
                throw new RetriableException(msg, e);
            }
        }
        return result;
    }

I did not get this timeout error when using a kafka cluster instead of single broker but I am not sure if it is really related to issue, broker seems to be up when offset read happens.

What may cause this timeout error?

@parvezDevIT
Copy link

Check the size of topic created by config.storage.topic . If size is big, delete and re-create topic and check if it solves the issue.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants