lack of namespace when update consumer offset by persist api in DefaultPullConsumer. #4584

@ShannonDing

BUG REPORT

  1. Please describe the issue you observed
  • Update the consume offset after pulling messages from the remote queue. The MessageQueue is wrapped with the namespace:
    @Override
    public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException {
        this.defaultMQPullConsumerImpl.updateConsumeOffset(queueWithNamespace(mq), offset);
    }
  • Get the offset store:
    @Deprecated
    public OffsetStore getOffsetStore() {
        return offsetStore;
    }
  • Persist the offset table manually:
    @Override
    public void persist(MessageQueue mq) {
        AtomicLong offset = this.offsetTable.get(mq);
        if (offset != null) {
            try {
                this.updateConsumeOffsetToBroker(mq, offset.get());
                log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
                    this.groupName,
                    this.mQClientFactory.getClientId(),
                    mq,
                    offset.get());
            } catch (Exception e) {
                log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
            }
        }
    }

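The offset is never sent to the broker. updateConsumeOffset stores it in the offsetTable under the namespace-wrapped MessageQueue, but persist(mq) is called with the original, unwrapped queue, so the lookup returns null and the update is silently skipped.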
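The key mismatch can be reproduced in isolation. The sketch below is not RocketMQ code: `Queue`, `queueWithNamespace`, `updateConsumeOffset` and `persist` are hypothetical stand-ins that only mimic the map lookup, and the `namespace%topic` wrapping format is an assumption made for illustration.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class NamespaceOffsetDemo {
    // Hypothetical stand-in for MessageQueue: equality depends on the topic string.
    static final class Queue {
        final String topic;
        final int queueId;

        Queue(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Queue)) {
                return false;
            }
            Queue q = (Queue) o;
            return queueId == q.queueId && topic.equals(q.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, queueId);
        }
    }

    // Stand-in for the offset store's offsetTable.
    static final Map<Queue, AtomicLong> offsetTable = new ConcurrentHashMap<>();

    // Assumed wrapping scheme for illustration: "namespace%topic".
    static Queue queueWithNamespace(String namespace, Queue mq) {
        return new Queue(namespace + "%" + mq.topic, mq.queueId);
    }

    // Mirrors the consumer-side call: the key is wrapped before it is stored.
    static void updateConsumeOffset(String namespace, Queue mq, long offset) {
        offsetTable.put(queueWithNamespace(namespace, mq), new AtomicLong(offset));
    }

    // Mirrors persist(mq): returns true only if an offset is found for this exact key.
    static boolean persist(Queue mq) {
        return offsetTable.get(mq) != null;
    }

    public static void main(String[] args) {
        Queue mq = new Queue("TopicTest", 0);
        updateConsumeOffset("ns", mq, 100L);

        // Lookup with the unwrapped queue misses the entry.
        System.out.println(persist(mq)); // prints "false"
        // Lookup with the wrapped queue finds it.
        System.out.println(persist(queueWithNamespace("ns", mq))); // prints "true"
    }
}
```

Under these assumptions, the lookup succeeds only when both the write and the read use the same (wrapped or unwrapped) key, which suggests the persist path needs the same namespace handling as updateConsumeOffset.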
