Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reader API for C++ client #717

Merged
merged 7 commits into from
Sep 12, 2017
Merged

Reader API for C++ client #717

merged 7 commits into from
Sep 12, 2017

Conversation

merlimat
Copy link
Contributor

Motivation

Implement the Reader API also for C++ client library

@merlimat merlimat added the type/feature The PR added a new feature or issue requested a new feature label Aug 27, 2017
@merlimat merlimat added this to the 1.20.0-incubating milestone Aug 27, 2017
@merlimat merlimat self-assigned this Aug 27, 2017
required uint64 ledgerId = 1;
required uint64 entryId = 2;
required int64 ledgerId = 1;
required int64 entryId = 2;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is safe as it doesn't change the encoding on the wire, just the type used in C++. In Java it would anyway be a long with no difference.
This is required to pass -1 as special values.

@merlimat merlimat force-pushed the cpp-reader-api branch 2 times, most recently from 181d5fb to 4117703 Compare August 28, 2017 17:53
@merlimat merlimat mentioned this pull request Aug 28, 2017
@@ -36,7 +36,7 @@
PositionImpl startCursorPosition) {
super(bookkeeper, config, ledger, cursorName);

if (startCursorPosition == null || startCursorPosition.equals(PositionImpl.latest)) {
if (startCursorPosition == null || startCursorPosition.getLedgerId() == PositionImpl.latest.getLedgerId()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any specific reason to not compare the position(ledgerId and entryId)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem comes from C++ client lib. The entry id in MessageId is 48 bits only (which by itself it's more than enough), and for PositionImpl.latest we're using Long.max() for both ledgerId and entryId.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll be adding comments for these changes

Position startPosition = new PositionImpl(msgId.getLedgerId(), msgId.getEntryId());
long ledgerId = msgId.getLedgerId();
long entryId = msgId.getEntryId();
if (msgId instanceof BatchMessageIdImpl) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think broker is batch-msg agnostic as it treats it as a normal msg (client manages batch-msg business) and it would be better to avoid introducing BatchMessageIdImpl at broker-side. Can't we always assign entryId= entryId-1 while creating NonDurableSubscription ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we always assign entryId= entryId-1 while creating NonDurableSubscription?

The problem is that you can have these 2 scenarios:

  • Without batching : startMessageId = (ledgerId, entryId). eg. (3, 4)
  • With batching : startMessageId = (ledgerId, entryId, batchIndex). eg. (3,4,5)

In the first case, it will mean that (3,4) was already consumed. So next message to be sent will be (3,5).

In the case of batching, (3,4,5) is the start message. With the old logic, the broker would start delivering at (3,5,0) while there might still be messages to read at (3,4,6)..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without batching : startMessageId = (ledgerId, entryId). eg. (3, 4)

yes, I understand that batch-msg might not be consumed fully at client so, we need to go 1 msg back.
If we do the same for non-batch then (3,4) will be delivered again which will be duplicate. Is there an issue if broker delivers duplicate msg? because anyway for durable cursor, broker can deliver duplicate msg by reseting cursor-position at marDeletePosition. So, duplicate msg is the only conern to set entryId=entryId-1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, the purpose of reader is to position exactly on the correct message :) That enables to avoid dups in the consuming side.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In any case, that is an issue even with the current Java reader. I have another PR to fix that as well.

* @return ResultTimeout if the receive timeout was triggered
* @return ResultInvalidConfiguration if a message listener had been set in the configuration
*/
Result readNext(Message& msg, int timeoutMs);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we also add readNextAsync()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have yet merged the Consumer.receiveAsync() :) Once that's in, adding Reader.readNextAsync() will be trivial since it's just a wrapper.

Copy link
Contributor

@jai1 jai1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So far the changes LGTM (minor comments) - will give it another round of review tomorrow before approving.

@@ -36,7 +36,7 @@
PositionImpl startCursorPosition) {
super(bookkeeper, config, ledger, cursorName);

if (startCursorPosition == null || startCursorPosition.equals(PositionImpl.latest)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is not too much work - can we move all java file changes as a part of a separate

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'm going to break it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pending

inline bool operator<(const BatchMessageId& mID) const;
inline bool operator<=(const BatchMessageId& mID) const;
protected:
bool operator<(const BatchMessageId& other) const;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering why we removed the inline.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using std::tie to have 3 vars comparators so to not have any c++11 in headers. In general, we've kept all methods impl in .cc files.

@@ -301,14 +315,33 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
}

// Zero Queue size is not supported with Batch Messages
unsigned int ConsumerImpl::receiveIndividualMessagesFromBatch(Message& batchedMessage) {
uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At some point, I remember @msb-at-yahoo advising to refrain using uint32_t or any other data types from <stdint.h> unless we are serializing and passing the value over the wire.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either way, I generally prefer explicit sizing to make it clear and also it's shorter than unsigned int :D

}

// Regular path, append individual message to incoming messages queue
incomingMessages_.push(msg);
}
return batchSize;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't be returning the batchSize since not all messages made it to incomingMessages_
Just return the number of messages added to the incomingMessages_ queue

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I'll fix it.

const BatchMessageId& msgId = static_cast<const BatchMessageId&>(msg.getMessageId());

// Only acknowledge on the first message in the batch
if (msgId.batchIndex_ <= 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we acknowledge on the last message of the batch since you clear the Receiver Queue on reconnect

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't really makes a lot of difference (and it's easy to ack on the first rather than the last), because for the reader the acknowledge it's just an internal trick that enables us to track the "backlog" in the stats. Since the cursor is non-durable and the reader always specify where to start reading, there is no other meaning for the acks.

consumerConf.setConsumerName(readerConf_.getReaderName());
}

if (readerConf_.hasReaderListener()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit confused since the ReaderAPI has no method for acknowledging the message shouldn't we be calling acknowledgeIfNecessaryourselves before sending the message to ReaderListener.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, I missed that

// We can safely cast to 'BatchMessageId' since all the messages queued will have that type of message id,
// irrespective of whether they were part of a batch or not.
const MessageId& nextMessageId = static_cast<const BatchMessageId&>(nextMessageInQueue.getMessageId());
BatchMessageId previousMessageId(nextMessageId.ledgerId_, nextMessageId.entryId_ - 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So here you cast the BatchMessageId to MessageId again (i.e ignore the batchIndex) hence stand a risk of redelivering the message once the batch is redelivered

On the other hand if you maintain the batchIndex then you can't clear the unAckedMessageTracter and bathIndexTracker on reconnection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's actually a bug. Need to fix this

const MessageId& nextMessageId = static_cast<const BatchMessageId&>(nextMessageInQueue.getMessageId());
BatchMessageId previousMessageId(nextMessageId.ledgerId_, nextMessageId.entryId_ - 1);
return Optional<BatchMessageId>::of(previousMessageId);
} else if (lastDequedMessage_.is_present()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return (lastDequedMessage_.is_present() ? lastDequedMessage_ : startMessageId_)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer longer form with comments :)

@@ -43,12 +52,59 @@ MessageId::MessageId(int64_t ledgerId, int64_t entryId)
// consumer's partition is assigned to this partition
}

const MessageId& MessageId::earliest() {
static const BatchMessageId _earliest(-1, -1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious what does _varconvention mean I know that var_ is for private members.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't implying any convention. these are just locally defined static variables.

/**
* Serialize the message id into a binary string for storing
*/
virtual void serialize(std::string& result) const;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When would we want to use the serialize and deserialize API??

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When using reader, you need to be able to store the message id somewhere (since you don't have a durable cursor, like in case of subscriptions/consumers).

Then you can deserialize that message id and use it to create a new Reader starting from a particular message.

@@ -187,6 +191,16 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
subscribe->set_consumer_id(consumerId);
subscribe->set_request_id(requestId);
subscribe->set_consumer_name(consumerName);
subscribe->set_durable(subscriptionMode == SubscriptionModeDurable);
if (startMessageId.is_present()) {
MessageIdData& messageIdData = *subscribe->mutable_start_message_id();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure, but subscribe is already pointer so, do we need pointer *subscribe-> again?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subscribe->mutable_start_message_id() returns a MessageIdData* pointer. So the * applies to that, and not to subscribe

LOG_DEBUG(
getName() << "Ignoring message from before the startMessageId" << msg.getMessageId());
increaseAvailablePermits(cnx);
continue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

batchSize--?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, Jai also pointed out, will fix that

&& msgId.batchIndex_ <= startMessageId_.value().batchIndex_) {
LOG_DEBUG(
getName() << "Ignoring message from before the startMessageId" << msg.getMessageId());
increaseAvailablePermits(cnx);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we call increaseAvailablePermits(cnx, numberOfPermits); outside the loop as increaseAvailablePermits sends command to broker if availablePermits > ReceiverQueueSize/2) so, we can save multiple send command.?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you planning to address this comment???

@merlimat
Copy link
Contributor Author

@jai1 @rdhabalia I've pulled out all Java changes + the fix for Java client reader to position within a batch into #720. I'll push the stripped version here with just the C++ changes.

@merlimat
Copy link
Contributor Author

@jai1 @rdhabalia Removed Java changes, rebased on #720 and addressed comments.

}

bool BatchMessageId::operator<(const BatchMessageId& other) const {
return std::tie(ledgerId_, entryId_, batchIndex_)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am getting this error - when I try to compile it on our machines.
BatchMessageId.cc:49: error: cannot bind bitfield ‘((const pulsar::BatchMessageId*)other)->pulsar::BatchMessageId::<anonymous>.pulsar::MessageId::entryId_’ to ‘int64_t&’
Same with MessageId.cc

I think the only way out is to compare field by field.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uhm.. ok :/ I hoped tuple would have been supported in gcc-4.4 as well

std::string serialized;
msgId.serialize(serialized);

std::shared_ptr<MessageId> deserialized = MessageId::deserialize(serialized);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be
boost::shared_ptr deserialized = MessageId::deserialize(serialized);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I was initially using std::shared_ptr and later converted into boost::shared_ptr

return s;
}

}
Copy link
Contributor

@jai1 jai1 Aug 29, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gtest-internal.h:97: undefined reference to pulsar::operator<<(std::basic_ostream<char, std::char_traits<char> >&, pulsar::BatchMessageId const&)

This operator needs to be visible outside if you want to use it in tests - like MessageId

#pragma GCC visibility push(default)
std::ostream& operator<<(std::ostream& s, const pulsar::MessageId& messageId) {
    s << '(' << messageId.ledgerId_ << ',' << messageId.entryId_ << ',' << messageId.partition_ << ')';
    return s;
}
#pragma GCC visibility pop

@merlimat merlimat force-pushed the cpp-reader-api branch 2 times, most recently from c3e3a50 to d014e25 Compare August 29, 2017 23:57
@merlimat
Copy link
Contributor Author

@jai1 @rdhabalia Updated with fixes for comments plus some error in MessageId to BatchMessageId casting that was getting a test to be flaky.

Copy link
Contributor

@jai1 jai1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two small changes remain - rest is fine


bool MessageId::operator<(const MessageId& other) const {
return std::tie(ledgerId_, entryId_, partition_)
< std::tie(other.ledgerId_, other.entryId_, other.partition_);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MessageId.cc:110: error: cannot bind bitfield ‘other->pulsar::MessageId::entryId_’ to ‘int64_t&’

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the std::tie() to support older g++

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to remove std::tie()

@@ -36,7 +36,7 @@
PositionImpl startCursorPosition) {
super(bookkeeper, config, ledger, cursorName);

if (startCursorPosition == null || startCursorPosition.equals(PositionImpl.latest)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pending

@merlimat
Copy link
Contributor Author

merlimat commented Sep 6, 2017

@rdhabalia @jai1 Rebased on last master with required commit merged. All comments should have been addressed already.

@jai1
Copy link
Contributor

jai1 commented Sep 8, 2017

retest this please

1 similar comment
@jai1
Copy link
Contributor

jai1 commented Sep 8, 2017

retest this please


bool MessageId::operator<(const MessageId& other) const {
return std::tie(ledgerId_, entryId_, partition_)
< std::tie(other.ledgerId_, other.entryId_, other.partition_);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to remove std::tie()

@merlimat
Copy link
Contributor Author

merlimat commented Sep 8, 2017

Need to remove std::tie()

I thought I already did. let me check again

* subscription. Reader can only work on non-partitioned topics.
* <p>
* The initial reader positioning is done by specifying a message id. The options are:
* <ul>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does CPP support format of comment like java?

}

std::ostream& operator<<(std::ostream& s, const BatchMessageId& messageId) {
s << '(' << messageId.ledgerId_ << ':' << messageId.entryId_ << ':' << messageId.batchIndex_
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't it update value of input param std::ostream& s. So, should we make return type of this function to void so, user would know that it updates input value only.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jai1
Copy link
Contributor

jai1 commented Sep 8, 2017

retest this please

@jai1
Copy link
Contributor

jai1 commented Sep 8, 2017

I thought I already did. let me check again

You did it from batchMessageId - forgot to do it for MessageId

@merlimat
Copy link
Contributor Author

@jai1 Updated and also fixed a problem in comparison

@jai1
Copy link
Contributor

jai1 commented Sep 11, 2017

@merlimat - Reviewed the PR - LGTM - once you address Rajan's comment about increaseAvailablePermits we can merge it.

@merlimat
Copy link
Contributor Author

@jai1 @rdhabalia Fixed the increaseAvailablePermits() comment

Copy link
Contributor

@jai1 jai1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM
Merge it after the tests pass.

@jai1
Copy link
Contributor

jai1 commented Sep 11, 2017

@saandrews @msb-at-yahoo @rdhabalia - Do you want to review this PR, in case I missed something.?

@jai1
Copy link
Contributor

jai1 commented Sep 11, 2017

retest this please

@merlimat
Copy link
Contributor Author

Merging this to rebase dependent changes on top of it. Will address comments if they arise

@merlimat merlimat merged commit ee1ba68 into apache:master Sep 12, 2017
@merlimat merlimat moved this from PR available to Merged in Message deduplication Sep 12, 2017
sijie pushed a commit that referenced this pull request Feb 9, 2021
…artup (#9499)

### Motivation

Standalone pulsar broker and function service can have deadlock which also sometime causes failures in function unit-test case. Pulsar function tries to create a subscription which blocks the zk thread and can be cause of possible deadlock. Below is the thread-dump when the deadlock happens and function service start up fails. 

So, remove blocking call while creating subscription using admin-api.



```

"pulsar-load-manager-139-1" #717 prio=5 os_prio=31 tid=0x00007f8a68698000 nid=0x3610f waiting on condition [0x00007000156e9000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000792a07020> (a java.util.concurrent.CompletableFuture$Signaller)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1709)
        at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
        at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1788)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
        at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97)
        at org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.getDynamicConfigurationFromZK(SimpleLoadManagerImpl.java:391)
        at org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.getDynamicConfigurationDouble(SimpleLoadManagerImpl.java:401)
        at org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.getLoadBalancerBrokerOverloadedThresholdPercentage(SimpleLoadManagerImpl.java:450)
        at org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.generateLoadReportForcefully(SimpleLoadManagerImpl.java:1139)
        - locked <0x000000078e53f2d0> (a java.util.HashSet)
        at org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.writeLoadReportOnZookeeper(SimpleLoadManagerImpl.java:1295)
        at org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask.run(LoadReportUpdaterTask.java:39)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)



"pulsar-ordered-OrderedExecutor-1-0-EventThread" #621 daemon prio=5 os_prio=31 tid=0x00007f89fd8e1800 nid=0x2fe07 waiting on condition [0x000070000f7ce000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000792ecb8b0> (a java.util.concurrent.CompletableFuture$Signaller)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
        at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
        at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalCreateSubscriptionForNonPartitionedTopic(PersistentTopicsBase.java:2060)
        at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.lambda$69(PersistentTopicsBase.java:2034)
        at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase$$Lambda$676/1709042625.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$5(ZooKeeperCache.java:249)
        at org.apache.pulsar.zookeeper.ZooKeeperCache$$Lambda$154/5998675.processResult(Unknown Source)
        at org.apache.bookkeeper.zookeeper.ZooKeeperClient$15$1.processResult(ZooKeeperClient.java:879)
        at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:583)
        at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:510)


main" #1 prio=5 os_prio=31 tid=0x00007f8aa0013000 nid=0x1603 waiting on condition [0x0000700004160000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000007930efe68> (a java.util.concurrent.CompletableFuture$Signaller)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
        at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
        at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.apply(AsyncHttpConnector.java:178)
        at org.glassfish.jersey.client.ClientRuntime.invoke(ClientRuntime.java:297)
        at org.glassfish.jersey.client.JerseyInvocation.lambda$invoke$1(JerseyInvocation.java:632)
        at org.glassfish.jersey.client.JerseyInvocation$$Lambda$672/237755480.call(Unknown Source)
        at org.glassfish.jersey.client.JerseyInvocation.call(JerseyInvocation.java:654)
        at org.glassfish.jersey.client.JerseyInvocation.lambda$runInScope$3(JerseyInvocation.java:648)
        at org.glassfish.jersey.client.JerseyInvocation$$Lambda$673/197134325.call(Unknown Source)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:205)
        at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:390)
        at org.glassfish.jersey.client.JerseyInvocation.runInScope(JerseyInvocation.java:648)
        at org.glassfish.jersey.client.JerseyInvocation.invoke(JerseyInvocation.java:631)
        at org.glassfish.jersey.client.JerseyInvocation$Builder.method(JerseyInvocation.java:434)
        at org.glassfish.jersey.client.JerseyInvocation$Builder.put(JerseyInvocation.java:318)
        at org.apache.pulsar.client.admin.internal.TopicsImpl.createSubscription(TopicsImpl.java:1143)
        at org.apache.pulsar.functions.worker.PulsarWorkerService.start(PulsarWorkerService.java:454)
        at org.apache.pulsar.broker.PulsarService.startWorkerService(PulsarService.java:1343)
        at org.apache.pulsar.broker.PulsarService.start(PulsarService.java:671)
        at org.apache.pulsar.io.PulsarFunctionE2ETest.setup(PulsarFunctionE2ETest.java:209)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
```
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/client type/feature The PR added a new feature or issue requested a new feature
Projects
No open projects
Development

Successfully merging this pull request may close these issues.

None yet

3 participants