Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replicated subscriptions - Configuration and client changes #4299

Merged
merged 4 commits into from
May 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,7 @@ protected void internalResetCursor(String subName, long timestamp, boolean autho
}
}

protected void internalCreateSubscription(String subscriptionName, MessageIdImpl messageId, boolean authoritative) {
protected void internalCreateSubscription(String subscriptionName, MessageIdImpl messageId, boolean authoritative, boolean replicated) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
Expand Down Expand Up @@ -968,7 +968,7 @@ protected void internalCreateSubscription(String subscriptionName, MessageIdImpl
}

PersistentSubscription subscription = (PersistentSubscription) topic
.createSubscription(subscriptionName, InitialPosition.Latest).get();
.createSubscription(subscriptionName, InitialPosition.Latest, replicated).get();
// Mark the cursor as "inactive" as it was created without a real consumer connected
subscription.deactivateCursor();
subscription.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public PartitionedTopicInternalStats getPartitionedStatsInternal(@PathParam("pro
validateTopicName(property, cluster, namespace, encodedTopic);
return internalGetPartitionedStatsInternal(authoritative);
}

@DELETE
@Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}")
@ApiOperation(hidden = true, value = "Delete a subscription.", notes = "There should not be any active consumers on the subscription.")
Expand Down Expand Up @@ -397,9 +397,9 @@ public void resetCursorOnPosition(@PathParam("property") String property, @PathP
public void createSubscription(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String topic,
@PathParam("subscriptionName") String encodedSubName,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId, @QueryParam("replicated") boolean replicated) {
validateTopicName(property, cluster, namespace, topic);
internalCreateSubscription(decode(encodedSubName), messageId, authoritative);
internalCreateSubscription(decode(encodedSubName), messageId, authoritative, replicated);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public void createPartitionedTopic(@PathParam("tenant") String tenant, @PathPara
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
})
public void createNonPartitionedTopic(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateGlobalNamespaceOwnership(tenant,namespace);
validateTopicName(tenant, namespace, encodedTopic);
Expand Down Expand Up @@ -316,7 +316,7 @@ public PartitionedTopicInternalStats getPartitionedStatsInternal(@PathParam("ten
validateTopicName(tenant, namespace, encodedTopic);
return internalGetPartitionedStatsInternal(authoritative);
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}")
@ApiOperation(value = "Delete a subscription.", notes = "There should not be any active consumers on the subscription.")
Expand Down Expand Up @@ -390,9 +390,9 @@ public void expireMessagesForAllSubscriptions(@PathParam("tenant") String tenant
@ApiResponse(code = 405, message = "Not supported for partitioned topics") })
public void createSubscription(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String topic, @PathParam("subscriptionName") String encodedSubName,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId, @QueryParam("replicated") boolean replicated) {
validateTopicName(tenant, namespace, topic);
internalCreateSubscription(decode(encodedSubName), messageId, authoritative);
internalCreateSubscription(decode(encodedSubName), messageId, authoritative, replicated);
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
final InitialPosition initialPosition = subscribe.getInitialPosition();
final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null;
final boolean isReplicated = subscribe.hasReplicateSubscriptionState() && subscribe.getReplicateSubscriptionState();

CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
Expand Down Expand Up @@ -685,7 +686,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata,
readCompacted, initialPosition);
readCompacted, initialPosition, isReplicated);
} else {
return FutureUtil.failedFuture(
new IncompatibleSchemaException(
Expand All @@ -696,7 +697,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
} else {
return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata, readCompacted, initialPosition);
startMessageId, metadata, readCompacted, initialPosition,
isReplicated);
}
})
.thenAccept(consumer -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public interface Subscription {

String getTopicName();

boolean isReplicated();

Dispatcher getDispatcher();

long getNumberOfEntriesInBacklog();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,11 @@ default long getOriginalSequenceId() {

CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType,
int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition);
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
boolean replicateSubscriptionState);

CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition);
CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
boolean replicateSubscriptionState);

CompletableFuture<Void> unsubscribe(String subName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ public Topic getTopic() {
return topic;
}

@Override
public boolean isReplicated() {
return false;
}

@Override
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
if (IS_FENCED_UPDATER.get(this) == TRUE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public void addProducer(Producer producer) throws BrokerServiceException {
lock.readLock().lock();
try {
brokerService.checkTopicNsOwnership(getName());

if (isFenced) {
log.warn("[{}] Attempting to add producer to a fenced topic", topic);
throw new TopicFencedException("Topic is temporarily unavailable");
Expand Down Expand Up @@ -315,10 +315,11 @@ public void removeProducer(Producer producer) {
@Override
public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition) {
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
boolean replicateSubscriptionState) {

final CompletableFuture<Consumer> future = new CompletableFuture<>();

try {
brokerService.checkTopicNsOwnership(getName());
} catch (Exception e) {
Expand Down Expand Up @@ -396,7 +397,7 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri
}

@Override
public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition) {
public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition, boolean replicateSubscriptionState) {
return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class CompactorSubscription extends PersistentSubscription {

public CompactorSubscription(PersistentTopic topic, CompactedTopic compactedTopic,
String subscriptionName, ManagedCursor cursor) {
super(topic, subscriptionName, cursor);
super(topic, subscriptionName, cursor, false);
checkArgument(subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION));
this.compactedTopic = compactedTopic;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@

import com.google.common.base.MoreObjects;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
Expand All @@ -40,7 +41,6 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException;
Expand Down Expand Up @@ -76,12 +76,36 @@ public class PersistentSubscription implements Subscription {
// for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold
private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;

public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor) {
private static final String REPLICATED_SUBSCRIPTION_PROPERTY = "pulsar.replicated.subscription";

// Map of properties that is used to mark this subscription as "replicated".
// Since this is the only field at this point, we can just keep a static
// instance of the map.
private static final Map<String, Long> REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = new TreeMap<>();
private static final Map<String, Long> NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = Collections.emptyMap();

private volatile boolean isReplicated;

static {
REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
}

static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
}

static boolean isCursorFromReplicatedSubscription(ManagedCursor cursor) {
return cursor.getProperties().containsKey(REPLICATED_SUBSCRIPTION_PROPERTY);
}

public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor,
boolean replicated) {
this.topic = topic;
this.cursor = cursor;
this.topicName = topic.getName();
this.subName = subscriptionName;
this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor);
this.isReplicated = replicated;
IS_FENCED_UPDATER.set(this, FALSE);
}

Expand All @@ -95,6 +119,15 @@ public Topic getTopic() {
return topic;
}

@Override
public boolean isReplicated() {
return isReplicated;
}

void setReplicated(boolean replicated) {
this.isReplicated = replicated;
}

@Override
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
cursor.updateLastActive();
Expand Down Expand Up @@ -194,7 +227,7 @@ public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<St
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Cumulative ack on {}", topicName, subName, position);
}
cursor.asyncMarkDelete(position, properties, markDeleteCallback, position);
cursor.asyncMarkDelete(position, mergeCursorProperties(properties), markDeleteCallback, position);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Individual acks on {}", topicName, subName, positions);
Expand Down Expand Up @@ -647,7 +680,7 @@ public SubscriptionStats getStats() {
}
subStats.msgBacklog = getNumberOfEntriesInBacklog();
subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();

subStats.isReplicated = isReplicated;
return subStats;
}

Expand Down Expand Up @@ -681,5 +714,25 @@ void topicTerminated() {
}
}

/**
* Return a merged map that contains the cursor properties specified by used
* (eg. when using compaction subscription) and the subscription properties.
*/
protected Map<String, Long> mergeCursorProperties(Map<String, Long> userProperties) {
Map<String, Long> baseProperties = isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES
: NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;

if (userProperties.isEmpty()) {
// Use only the static instance in the common case
return baseProperties;
} else {
Map<String, Long> merged = new TreeMap<>();
merged.putAll(userProperties);
merged.putAll(baseProperties);
return merged;
}

}

private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
}
Loading