Skip to content

Commit

Permalink
[pulsar-client-cpp] Expose getLastMessageId in the Reader API (#11723)
Browse files Browse the repository at this point in the history
### Motivation

The changes are trivial. getLastMessageIdAsync is already implemented in the ConsumerImpl class but it is only used internally for checking if there are any available messages in the topic. It is really helpful to have it exposed in the Reader API e.g., to be able to read all messages currently available in the topic. I.e., to get last message id and then to read all messages till this id. (hasMessageAvailable is not helpful because it potentially might always return 'false' if new messages are produced)

### Modifications

Trivial changes of ReaderImpl and Reader classes to expose getLastMessageId.
  • Loading branch information
VadimMolodyh committed Aug 24, 2021
1 parent ac5fce5 commit 640e63b
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 0 deletions.
12 changes: 12 additions & 0 deletions pulsar-client-cpp/include/pulsar/Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class PulsarFriend;
class ReaderImpl;

typedef std::function<void(Result result, bool hasMessageAvailable)> HasMessageAvailableCallback;
typedef std::function<void(Result result, MessageId messageId)> GetLastMessageIdCallback;

/**
* A Reader can be used to scan through all the messages currently available in a topic.
Expand Down Expand Up @@ -137,6 +138,17 @@ class PULSAR_PUBLIC Reader {
*/
bool isConnected() const;

/**
* Asynchronously get an ID of the last available message or a message ID with -1 as an entryId if the
* topic is empty.
*/
void getLastMessageIdAsync(GetLastMessageIdCallback callback);

/**
* Get an ID of the last available message or a message ID with -1 as an entryId if the topic is empty.
*/
Result getLastMessageId(MessageId& messageId);

private:
typedef std::shared_ptr<ReaderImpl> ReaderImplPtr;
ReaderImplPtr impl_;
Expand Down
15 changes: 15 additions & 0 deletions pulsar-client-cpp/lib/Reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,19 @@ Result Reader::seek(uint64_t timestamp) {

bool Reader::isConnected() const { return impl_ && impl_->isConnected(); }

void Reader::getLastMessageIdAsync(GetLastMessageIdCallback callback) {
if (!impl_) {
callback(ResultConsumerNotInitialized, MessageId());
return;
}
impl_->getLastMessageIdAsync(callback);
}

Result Reader::getLastMessageId(MessageId& messageId) {
Promise<Result, MessageId> promise;

getLastMessageIdAsync(WaitForCallbackValue<MessageId>(promise));
return promise.getFuture().get(messageId);
}

} // namespace pulsar
4 changes: 4 additions & 0 deletions pulsar-client-cpp/lib/ReaderImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ void ReaderImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
consumer_->seekAsync(timestamp, callback);
}

void ReaderImpl::getLastMessageIdAsync(GetLastMessageIdCallback callback) {
consumer_->getLastMessageIdAsync(callback);
}

ReaderImplWeakPtr ReaderImpl::getReaderImplWeakPtr() { return readerImplWeakPtr_; }

bool ReaderImpl::isConnected() const { return consumer_->isConnected(); }
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/ReaderImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
void seekAsync(const MessageId& msgId, ResultCallback callback);
void seekAsync(uint64_t timestamp, ResultCallback callback);

void getLastMessageIdAsync(GetLastMessageIdCallback callback);

ReaderImplWeakPtr getReaderImplWeakPtr();

bool isConnected() const;
Expand Down

0 comments on commit 640e63b

Please sign in to comment.