Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get ConsumerStats using Binary Protocol #216

Merged
merged 1 commit into from
Feb 27, 2017
Merged

Conversation

jai1
Copy link
Contributor

@jai1 jai1 commented Feb 16, 2017

Motivation

Sometimes we need topic statics before consuming messages (Common use cases listed below), currently such information is obtained by making admin calls which use HTTP. Admin calls are expensive because each call spawns a new connection and may cause a redirect if the current broker doesn’t own the topic, we need a way to get this information without using HTTP.

Common use cases

  • When Pulsar is used as a write-ahead log we decide whether to allow updates to a topic based on the amount of backlog the topic has.
  • Consumer may need the number of pending acknowledgements.

Modifications

  • Created a new command to get consumer stats - Changed Protobuf and regenerated files.
  • Changed the C++ Client to get consumer stats.
  • Made changed to Broker to respond to consumer stats request
  • Added a caching mechanism to cache the consumer stats for 30 seconds.
  • Changed ClientConnection to handle operation timeout

Result

We will be able to get consumer stats without having to use HTTP

@rdhabalia rdhabalia added type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages type/feature The PR added a new feature or issue requested a new feature labels Feb 16, 2017
@rdhabalia rdhabalia added this to the 1.17 milestone Feb 16, 2017
@jai1 jai1 changed the title Cms 2214 Get ConsumerStats using Binary Protocol Feb 16, 2017
@@ -28,6 +29,7 @@ make

# Re-generate PulsarApi
cd pulsar-path/pulsar-common
PROTOC=~/protobuf/src/protoc ./generate_protobuf.sh
PROTOC=~/protobuf/src/protoc
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you break it in 2 lines, then you need to use export PROTOC=... otherwise the variable will not picked up by the next script

}
}

private Map<String, String> createConsumerStatsMap(Consumer consumer, PersistentSubscription subscription) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not reuse the already existing ConsumerStats class?

Copy link
Contributor Author

@jai1 jai1 Feb 21, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need some subscriber level details as well - backlog, subscription type etc
But I have added a toMap method to convert the Class to a map

ca16032

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdhabalia, @saandrews and @merlimat Would it be better if I add a class in the protobuf and client side instead of using a map?


ResultMessageTooBig /// Trying to send a messages exceeding the max size
ResultMessageTooBig, /// Trying to send a messages exceeding the max size
ResultConsumerStatsError /// Broker returned an error while trying to fetch consumer stats
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cannot we use the existing ResultError instead of ResultConsumerStatsError. If I am making a call to get consumer stats, there's no need to re-state the type of the request.

Copy link
Contributor

@rdhabalia rdhabalia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • all the formatting changes are with /pulsar-client-cpp/eclipse-formatter.xml ??

@@ -484,6 +484,7 @@ flexible messaging model and an intuitive client API.</description>
<exclude>pulsar-client-cpp/lib/checksum/int_types.h</exclude>
<exclude>pulsar-client-cpp/lib/checksum/crc32c*</exclude>
<exclude>pulsar-client-cpp/lib/lz4/lz4.*</exclude>
<exclude>pulsar-client-cpp/.idea/*</exclude>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think cpp/.gitignore already contains: .idea so, we may not need this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Git ignore will prevent the idea folder to be checked in and this change will prevent the local build to fail

Object msg = null;
try {
PersistentTopic topic;
topic = (PersistentTopic) getBrokerService().getTopicReference(topicName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need of multiple lines :
PersistentTopic topic = (PersistentTopic) getBrokerService().getTopicReference(topicName);

@@ -5,7 +5,7 @@ Pulsar uses protocol buffer messages for the client/broker wire protocol.

The protocol definition is located at `pulsar-common/src/main/proto/PulsarApi.proto` and the pre-generated Java code is at `pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java`.

When making a change to the `PulsarApi.proto` definition, we have regenerate the `PulsarApi.java` and include that in the same commit.
When making a change to the `PulsarApi.proto` definition, we have to regenerate the `PulsarApi.java` and include that in the same commit.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now, cpp library is added, we can also mention : PulsarApi.pb.cc

log.debug("CommandConsumerStats[requestId = {}, topicName = {}, subscriptionName = {}, consumerId = {}]", requestId, topicName, subscriptionName, consumerId);
}

Object msg = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ByteBuf msg

requestId, topicName, subscriptionName, consumerId);
}
} catch (Exception e) {
log.error("Exception: {}", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should write appropriate message such as topic not found

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have written appropriate message in line 244, 240 and 236 (same file)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log remaining info as well in all the above error logs? topic name, requested, surname & consumerid?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean error log such as: Failed to get consumer-stats response so, it can help while reading log file where exactly it occurred.

}
} catch (Exception e) {
log.error("Exception: {}", e);
msg = Commands.newConsumerStatsResponse("Exception: " + e, requestId, topicName, subscriptionName, consumerId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here: error-msg

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have written appropriate message in line 244, 240 and 236 (same file)
This case is for handling some exception

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this exception is sent to the client, shouldn't it be the one client knows about?


#pragma GCC visibility push(default)

class PulsarFriend;

namespace pulsar {

typedef std::map<std::string, std::string> StatsMap;

class BrokerSideConsumerStats {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we rename it as BrokerConsumerStats or ConsumerStats?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already a ConsumerStats class in Java client which captures the message rate, acks etc as sent by client - created by Sangming and Sid.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer BrokerConsumerStats or ConsumerStatsInfo.

@@ -117,7 +117,8 @@ SharedBuffer BatchMessageContainer::getBatchedPayload() {
void BatchMessageContainer::clear() {
LOG_DEBUG(*this << " BatchMessageContainer::clear() called");
timer_->cancel();
averageBatchSize_ = (messagesContainerListPtr_->size() + (averageBatchSize_ * numberOfBatchesSent_))/++numberOfBatchesSent_;
averageBatchSize_ = (messagesContainerListPtr_->size() + (averageBatchSize_ * numberOfBatchesSent_))/(numberOfBatchesSent_ + 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any difference here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, the compiler doesn't give ordering guarantees for ++ if used in complex calculations
Try compiling in your mac m/c and you will see the warning.

case CONSUMER_STATS:
checkArgument(cmd.hasConsumerStats());
handleConsumerStats(cmd.getConsumerStats());
cmd.getConsumerStats().recycle();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

formatting

case CONSUMER_STATS_RESPONSE:
checkArgument(cmd.hasConsumerStatsResponse());
handleConsumerStatsResponse(cmd.getConsumerStatsResponse());
cmd.getConsumerStatsResponse().recycle();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

formatting

@rdhabalia
Copy link
Contributor

just a thought:

  • if client requires subscription level stats then : if consumerId is not provided then should broker return subscription level stats in the map?


# Apply patch
patch -p1 < ../pulsar-path/protobuf/protobuf.patch
## For C++
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for C++ -> git checkout v2.6.0


message CommandConsumerStatsResponse {
required uint64 request_id = 1;
required string topic_name = 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need topic_name, subscription_name, consumer_id in response? can't we just return request_id as client knows information attached with it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we can but a little more info might help in debug if we run into some trouble and since this call can be made at most once in 30 seconds - it won't harm the Network BW much.

Still if you feel strongly about it let me know - i will remove the fields

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we should not send extra field in response except request_id

requestId, topicName, subscriptionName, consumerId);
}
} catch (Exception e) {
log.error("Exception: {}", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean error log such as: Failed to get consumer-stats response so, it can help while reading log file where exactly it occurred.

@@ -266,7 +274,12 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection>
// Signals whether we're waiting for a response from broker
bool havePendingPingRequest_;
DeadlineTimerPtr keepAliveTimer_;
DeadlineTimerPtr consumerStatsRequestTimer_;

void handleConsumerStatsTimeout(const boost::system::error_code &ec,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really need timer to expiry the request. we already have keep alive time which will break the connection if broker is not responding and it should clear-out pending requests.

@@ -60,6 +60,7 @@ namespace pulsar {
virtual void redeliverUnacknowledgedMessages();
virtual const std::string& getName() const;
virtual int getNumOfPrefetchedMessages() const ;
virtual Result getConsumerStats(BrokerSideConsumerStats& brokerSideConsumerStats, int partitionIndex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also provide getConsumerStats(BrokerSideConsumerStats& brokerSideConsumerStats) ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed yesterday - We can the code is not difficult but the interface will become quite ugly since now we need to return a list of stats always.

@@ -23,6 +23,17 @@ namespace pulsar {

const std::string EMPTY_STRING;

BrokerSideConsumerStats::BrokerSideConsumerStats():validTill_(now()) {};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we initialize validTill_+X so it can be valid for X time.??

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do that in ClientConnection

                        boost::posix_time::ptime currentTime = now() + milliseconds(consumerStatsTTLMs_);
                        consumerStatsPromise.setValue(BrokerSideConsumerStats(consumerStats, currentTime));

" Sending getConsumerStats command for Consumer - " << getConsumerId() << ", requestId - "<<requestId);

BrokerSideConsumerStats consumerStats;
Result res = cnx->newConsumerStats(topic_, subscription_, consumerId_, requestId).get(consumerStats);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it a blocking call on future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then should we make it non blocking call by attaching the listener?

Copy link
Contributor Author

@jai1 jai1 Feb 23, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will block the user thread not out io thread, similar logic is used to producer.send and consumer.receive

Result Producer::send(const Message& msg) {
    Promise<Result, Message> promise;
    sendAsync(msg, WaitForCallbackValue<Message>(promise));

    Message m;
    Result result = promise.getFuture().get(m);
    return result;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think Is it worth it to provide callback option also in case of client doesn't want to block such as: sendAsync(msg, callBack)

}
return res;
} else {
LOG_ERROR(getName() << " In getConsumerStats call - Operation not supported since server protobuf version is older than proto::v6");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

v7?

@jai1 jai1 added type/feature The PR added a new feature or issue requested a new feature and removed type/feature The PR added a new feature or issue requested a new feature labels Feb 23, 2017
}
} catch (Exception e) {
log.error("Exception: {}", e);
msg = Commands.newConsumerStatsResponse("Exception: " + e, requestId, topicName, subscriptionName, consumerId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this exception is sent to the client, shouldn't it be the one client knows about?

requestId, topicName, subscriptionName, consumerId);
}
} catch (Exception e) {
log.error("Exception: {}", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log remaining info as well in all the above error logs? topic name, requested, surname & consumerid?


#pragma GCC visibility push(default)

class PulsarFriend;

namespace pulsar {

typedef std::map<std::string, std::string> StatsMap;

class BrokerSideConsumerStats {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer BrokerConsumerStats or ConsumerStatsInfo.

consumerStats[kv.key()] = kv.value();
LOG_DEBUG(kv.key() << " = " << kv.value());
}
boost::posix_time::ptime currentTime = now() + milliseconds(consumerStatsTTLMs_);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its not really currentTime


for (PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap_.begin();
it != pendingConsumerStatsMap_.end(); ++it) {
LOG_ERROR(cnxString_ << " In getConsumerStats call - Closing Client Connection, please try again later");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy pasted message? Also we shouldn't have to explicitly say "In getConsumerStats call"

@@ -686,4 +687,44 @@ int ConsumerImpl::getNumOfPrefetchedMessages() const {
return incomingMessages_.size();
}

Result ConsumerImpl::getConsumerStats(BrokerSideConsumerStats& brokerSideConsumerStats, int partitionIndex) {
if (partitionIndex != -1) {
LOG_WARN(getName() << " In getConsumerStats call - Ignoring the partitionIndex since the topic is not partitioned")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove "In getConsumerStats call -"

}

if (brokerSideConsumerStats_.isValid()) {
LOG_INFO(getName() << " In getConsumerStats call - Serving data from cache");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why INFO log?

@@ -300,7 +392,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
}) //
.exceptionally(exception -> {
log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
subscriptionName, exception.getCause().getMessage());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getCause().getMessage() is not very informative

@@ -45,7 +45,9 @@ struct ClientConfiguration::Impl {
operationTimeoutSeconds(30),
messageListenerThreads(1),
concurrentLookupRequest(5000),
logConfFilePath() {}
logConfFilePath(),
useTls(false),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a few test runs I found that tls was automatically enabled - hence initialized the params

@@ -113,7 +114,7 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
lock.unlock();

ClientImplPtr client = client_.lock();
int requestId = client->newRequestId();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

datatype compatible with what we send over the wire

@@ -483,13 +484,13 @@ inline proto::CommandSubscribe_SubType ConsumerImpl::getSubType() {
ConsumerType type = config_.getConsumerType();
switch (type) {
case ConsumerExclusive:
return proto::CommandSubscribe_SubType_Exclusive;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

both mean the same - changed since underscores were ugly

static const SubType Exclusive = CommandSubscribe_SubType_Exclusive;
static const SubType Shared = CommandSubscribe_SubType_Shared;
static const SubType Failover = CommandSubscribe_SubType_Failover;

@@ -300,7 +392,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
}) //
.exceptionally(exception -> {
log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
subscriptionName, exception.getCause().getMessage());
subscriptionName, exception);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will be CompletionException and it will not be helpful, we have to log exception.getCause()

}
if (!consumerFound) {
log.error(
"Failed to get consumer-stats response - Consumer {} not found for CommandConsumerStats request id {}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log topic and subscription id as well?

LOG_ERROR(cnxString_ << " In getConsumerStats call - Closing Client Connection, please try again later");
it->second.setFailed(ResultConnectError);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unlock here.

// Complex logic since promises need to be fulfilled outside the lock
for (int i = 0; i < consumerStatsPromises.size(); i++) {
consumerStatsPromises[i].setFailed(ResultTimeout);
LOG_ERROR(cnxString_ << " In getConsumerStats call - Operation timedout, didn't get response from broker");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix log msg.


if (consumerStatsResponse.has_error_code()) {
if (consumerStatsResponse.has_error_message()) {
LOG_ERROR(cnxString_ << " In getConsumerStats call - "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix msg

@jai1 jai1 force-pushed the CMS-2214 branch 2 times, most recently from 83c4319 to 20675ca Compare February 25, 2017 01:30
}

break;
}

case BaseCommand::SEND_ERROR: {
const CommandSendError& error = incomingCmd_.send_error();
const CommandSendError &error = incomingCmd_.send_error();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix the formatting caused by the editor w.r.t. space and &.

@jai1 jai1 force-pushed the CMS-2214 branch 4 times, most recently from 97827ef to 12cf4d1 Compare February 25, 2017 06:24
@jai1 jai1 removed the type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages label Feb 25, 2017
@jai1 jai1 added the type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages label Feb 25, 2017
@jai1
Copy link
Contributor Author

jai1 commented Feb 26, 2017

@merlimat - if you find time, can you review this PR.

@jai1 jai1 merged commit ae381b4 into apache:master Feb 27, 2017
@merlimat
Copy link
Contributor

merlimat commented Feb 27, 2017 via email

@jai1
Copy link
Contributor Author

jai1 commented Feb 28, 2017 via email

@jai1 jai1 deleted the CMS-2214 branch March 6, 2017 17:43
sijie pushed a commit to sijie/pulsar that referenced this pull request Mar 4, 2018
* Rename FunctionConfig names to make them more consistent

* Added generated python file
hrsakai pushed a commit to hrsakai/pulsar that referenced this pull request Dec 10, 2020
hangc0276 pushed a commit to hangc0276/pulsar that referenced this pull request May 26, 2021
speed up entriesToRecords for consuming multi entries
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages type/feature The PR added a new feature or issue requested a new feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants