Skip to content

Commit

Permalink
Introduce topic reader in client API (#371)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed May 6, 2017
1 parent 65da593 commit 7badf1a
Show file tree
Hide file tree
Showing 24 changed files with 1,355 additions and 129 deletions.
19 changes: 11 additions & 8 deletions bin/pulsar-perf
@@ -1,19 +1,19 @@
#!/usr/bin/env bash
#
# Copyright 2016 Yahoo Inc.
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#

BINDIR=$(dirname "$0")
PULSAR_HOME=`cd $BINDIR/..;pwd`
Expand All @@ -38,14 +38,14 @@ else
fi

# exclude tests jar
RELEASE_JAR=`ls $PULSAR_HOME/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
RELEASE_JAR=`ls $PULSAR_HOME/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
if [ $? == 0 ]; then
PULSAR_JAR=$RELEASE_JAR
fi

# exclude tests jar
BUILT_JAR=`ls $PULSAR_HOME/pulsar-testclient/target/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then
if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then
echo "\nCouldn't find pulsar jar.";
echo "Make sure you've run 'mvn package'\n";
exit 1;
Expand All @@ -58,7 +58,7 @@ add_maven_deps_to_classpath() {
if [ "$MAVEN_HOME" != "" ]; then
MVN=${MAVEN_HOME}/bin/mvn
fi

# Need to generate classpath from maven pom. This is costly so generate it
# and cache it. Save the file into our target dir so a mvn clean will get
# clean it up and force us create a new one.
Expand All @@ -75,7 +75,8 @@ Usage: pulsar <command>
where command is one of:
produce Run a producer
consume Run a consumer
monitor-brokers Continuously receive broker data and/or load reports
read Run a topic reader
monitor-brokers Continuously receive broker data and/or load reports
simulation-client Run a simulation server acting as a Pulsar client
simulation-controller Run a simulation controller to give commands to servers
Expand Down Expand Up @@ -141,6 +142,8 @@ if [ "$COMMAND" == "produce" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.PerformanceProducer --conf-file $PULSAR_PERFTEST_CONF "$@"
elif [ "$COMMAND" == "consume" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.PerformanceConsumer --conf-file $PULSAR_PERFTEST_CONF "$@"
elif [ "$COMMAND" == "read" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.PerformanceReader --conf-file $PULSAR_PERFTEST_CONF "$@"
elif [ "$COMMAND" == "monitor-brokers" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.BrokerMonitor "$@"
elif [ "$COMMAND" == "simulation-client" ]; then
Expand Down
Expand Up @@ -35,10 +35,8 @@

import com.yahoo.pulsar.broker.authentication.AuthenticationDataCommand;
import com.yahoo.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import com.yahoo.pulsar.broker.service.persistent.PersistentSubscription;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.common.policies.data.ConsumerStats;
import com.yahoo.pulsar.client.impl.MessageIdImpl;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.api.PulsarHandler;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck;
Expand All @@ -61,6 +59,7 @@
import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.policies.data.BacklogQuota;
import com.yahoo.pulsar.common.policies.data.ConsumerStats;
import com.yahoo.pulsar.common.util.collections.ConcurrentLongHashMap;

import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -153,8 +152,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
// ////
// // Incoming commands handling
// ////



@Override
protected void handleLookup(CommandLookupTopic lookup) {
final long requestId = lookup.getRequestId();
Expand Down Expand Up @@ -226,7 +224,7 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
"Failed due to too many pending lookup requests", requestId));
}
}

@Override
protected void handleConsumerStats(CommandConsumerStats commandConsumerStats) {
if (log.isDebugEnabled()) {
Expand All @@ -247,8 +245,7 @@ protected void handleConsumerStats(CommandConsumerStats commandConsumerStats) {
"Consumer " + consumerId + " not found", requestId);
} else {
if (log.isDebugEnabled()) {
log.debug("CommandConsumerStats[requestId = {}, consumer = {}]",
requestId, consumer);
log.debug("CommandConsumerStats[requestId = {}, consumer = {}]", requestId, consumer);
}
msg = Commands.newConsumerStatsResponse(createConsumerStatsResponse(consumer, requestId));
}
Expand All @@ -270,7 +267,7 @@ CommandConsumerStatsResponse.Builder createConsumerStatsResponse(Consumer consum
commandConsumerStatsResponseBuilder.setBlockedConsumerOnUnackedMsgs(consumerStats.blockedConsumerOnUnackedMsgs);
commandConsumerStatsResponseBuilder.setAddress(consumerStats.address);
commandConsumerStatsResponseBuilder.setConnectedSince(consumerStats.connectedSince);

Subscription subscription = consumer.getSubscription();
commandConsumerStatsResponseBuilder.setMsgBacklog(subscription.getNumberOfEntriesInBacklog());
commandConsumerStatsResponseBuilder.setMsgRateExpired(subscription.getExpiredMessageRate());
Expand Down Expand Up @@ -338,6 +335,12 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
final long consumerId = subscribe.getConsumerId();
final SubType subType = subscribe.getSubType();
final String consumerName = subscribe.getConsumerName();
final boolean isDurable = subscribe.getDurable();
final MessageIdImpl startMessageId = subscribe.hasStartMessageId()
? new MessageIdImpl(subscribe.getStartMessageId().getLedgerId(),
subscribe.getStartMessageId().getEntryId(), subscribe.getStartMessageId().getPartition())
: null;

final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;

authorizationFuture.thenApply(isAuthorized -> {
Expand All @@ -363,15 +366,16 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
// creation request either complete or fails.
log.warn("[{}][{}][{}] Consumer is already present on the connection", remoteAddress, topicName,
subscriptionName);
ServerError error = !existingConsumerFuture.isDone() ? ServerError.ServiceNotReady : getErrorCode(existingConsumerFuture);;
ctx.writeAndFlush(Commands.newError(requestId, error,
"Consumer is already present on the connection"));
ServerError error = !existingConsumerFuture.isDone() ? ServerError.ServiceNotReady
: getErrorCode(existingConsumerFuture);
ctx.writeAndFlush(
Commands.newError(requestId, error, "Consumer is already present on the connection"));
return null;
}
}

service.getTopic(topicName).thenCompose(
topic -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId, subType, priorityLevel, consumerName))
service.getTopic(topicName).thenCompose(topic -> topic.subscribe(ServerCnx.this, subscriptionName,
consumerId, subType, priorityLevel, consumerName, isDurable, startMessageId))
.thenAccept(consumer -> {
if (consumerFuture.complete(consumer)) {
log.info("[{}] Created subscription on topic {} / {}", remoteAddress, topicName,
Expand All @@ -393,7 +397,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
}) //
.exceptionally(exception -> {
log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
subscriptionName, exception.getCause().getMessage());
subscriptionName, exception.getCause().getMessage(), exception);

// If client timed out, the future would have been completed by subsequent close. Send error
// back to client, only if not completed already.
Expand Down Expand Up @@ -455,10 +459,11 @@ protected void handleProducer(final CommandProducer cmdProducer) {
// until the previous producer creation
// request
// either complete or fails.
ServerError error = !existingProducerFuture.isDone() ? ServerError.ServiceNotReady : getErrorCode(existingProducerFuture);
ServerError error = !existingProducerFuture.isDone() ? ServerError.ServiceNotReady
: getErrorCode(existingProducerFuture);
log.warn("[{}][{}] Producer is already present on the connection", remoteAddress, topicName);
ctx.writeAndFlush(Commands.newError(requestId, error,
"Producer is already present on the connection"));
ctx.writeAndFlush(
Commands.newError(requestId, error, "Producer is already present on the connection"));
return null;
}
}
Expand Down Expand Up @@ -740,7 +745,7 @@ public void closeProducer(Producer producer) {
}
long producerId = producer.getProducerId();
producers.remove(producerId);
if(remoteEndpointProtocolVersion >= v5.getNumber()) {
if (remoteEndpointProtocolVersion >= v5.getNumber()) {
ctx.writeAndFlush(Commands.newCloseProducer(producerId, -1L));
} else {
close();
Expand All @@ -755,7 +760,7 @@ public void closeConsumer(Consumer consumer) {
}
long consumerId = consumer.consumerId();
consumers.remove(consumerId);
if(remoteEndpointProtocolVersion >= v5.getNumber()) {
if (remoteEndpointProtocolVersion >= v5.getNumber()) {
ctx.writeAndFlush(Commands.newCloseConsumer(consumerId, -1L));
} else {
close();
Expand Down Expand Up @@ -860,7 +865,7 @@ public String getRole() {
boolean hasConsumer(long consumerId) {
return consumers.containsKey(consumerId);
}

public boolean isBatchMessageCompatibleVersion() {
return remoteEndpointProtocolVersion >= ProtocolVersion.v4.getNumber();
}
Expand Down
Expand Up @@ -18,6 +18,7 @@
import java.util.concurrent.CompletableFuture;

import com.yahoo.pulsar.broker.service.persistent.PersistentSubscription;
import com.yahoo.pulsar.client.api.MessageId;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.policies.data.BacklogQuota;
import com.yahoo.pulsar.common.policies.data.Policies;
Expand All @@ -39,7 +40,7 @@ public interface PublishCallback {
void removeProducer(Producer producer);

CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType,
int priorityLevel, String consumerName);
int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId);

CompletableFuture<Void> unsubscribe(String subName);

Expand Down
Expand Up @@ -26,7 +26,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.yahoo.pulsar.common.util.Codec;
import com.yahoo.pulsar.client.impl.MessageImpl;

/**
Expand All @@ -44,10 +43,10 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
private static final AtomicIntegerFieldUpdater<PersistentMessageExpiryMonitor> expirationCheckInProgressUpdater = AtomicIntegerFieldUpdater
.newUpdater(PersistentMessageExpiryMonitor.class, "expirationCheckInProgress");

public PersistentMessageExpiryMonitor(String topicName, ManagedCursor cursor) {
public PersistentMessageExpiryMonitor(String topicName, String subscriptionName, ManagedCursor cursor) {
this.topicName = topicName;
this.cursor = cursor;
this.subName = Codec.decode(cursor.getName());
this.subName = subscriptionName;
this.msgExpired = new Rate();
}

Expand Down
Expand Up @@ -46,6 +46,7 @@
import com.yahoo.pulsar.client.impl.PulsarClientImpl;
import com.yahoo.pulsar.client.impl.SendCallback;
import com.yahoo.pulsar.common.policies.data.ReplicatorStats;
import com.yahoo.pulsar.common.util.Codec;

import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
Expand Down Expand Up @@ -107,7 +108,7 @@ public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String
this.remoteCluster = remoteCluster;
this.client = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster);
this.producer = null;
this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, cursor);
this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor);
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
PENDING_MESSAGES_UPDATER.set(this, 0);
STATE_UPDATER.set(this, State.Stopped);
Expand Down
Expand Up @@ -51,7 +51,6 @@
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.policies.data.ConsumerStats;
import com.yahoo.pulsar.common.policies.data.PersistentSubscriptionStats;
import com.yahoo.pulsar.common.util.Codec;
import com.yahoo.pulsar.utils.CopyOnWriteArrayList;

public class PersistentSubscription implements Subscription {
Expand All @@ -71,12 +70,12 @@ public class PersistentSubscription implements Subscription {
// for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold
private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;

public PersistentSubscription(PersistentTopic topic, ManagedCursor cursor) {
public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor) {
this.topic = topic;
this.cursor = cursor;
this.topicName = topic.getName();
this.subName = Codec.decode(cursor.getName());
this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, cursor);
this.subName = subscriptionName;
this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor);
IS_FENCED_UPDATER.set(this, FALSE);
}

Expand Down Expand Up @@ -131,6 +130,12 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE
}
if (dispatcher.getConsumers().isEmpty()) {
deactivateCursor();

if (!cursor.isDurable()) {
// If cursor is not durable, we need to clean up the subscription as well
close();
topic.removeSubscription(subName);
}
}

// invalid consumer remove will throw an exception
Expand Down Expand Up @@ -607,6 +612,6 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List
public void markTopicWithBatchMessagePublished() {
topic.markBatchMessagePublished();
}

private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
}

0 comments on commit 7badf1a

Please sign in to comment.