Skip to content

Commit

Permalink
Replicated subscriptions - Advance remote cursor when local consumers…
Browse files Browse the repository at this point in the history
… moves ahead (#4396)

### Motivation

This is the 4th and last (implementation) change for pip-33. 

It includes reading and caching the last N snapshots and sending the updates to the other clusters.

Previous PRs: 
 1. #4299
 2. #4340
 3. #4354
  • Loading branch information
merlimat authored and sijie committed Jun 17, 2019
1 parent 1c51adc commit 7be1ee1
Show file tree
Hide file tree
Showing 10 changed files with 350 additions and 13 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ replicatedSubscriptionsSnapshotFrequencyMillis=1000
# Timeout for building a consistent snapshot for tracking replicated subscriptions state.
replicatedSubscriptionsSnapshotTimeoutSeconds=30

# Max number of snapshots to be cached per subscription.
replicatedSubscriptionsSnapshotMaxCachedPerSubscription=10

### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request with
#role as proxyRoles - it will demand to see a valid original principal.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Timeout for building a consistent snapshot for tracking replicated subscriptions state. ")
private int replicatedSubscriptionsSnapshotTimeoutSeconds = 30;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Max number of snapshot to be cached per subscription.")
private int replicatedSubscriptionsSnapshotMaxCachedPerSubscription = 10;

/***** --- TLS --- ****/
@FieldContext(
category = CATEGORY_TLS,
Expand Down Expand Up @@ -905,7 +910,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
private int managedLedgerMaxUnackedRangesToPersistInZooKeeper = 1000;
@FieldContext(
category = CATEGORY_STORAGE_OFFLOADING,
doc = "Use Open Range-Set to cache unacked messages (it is memory efficient but it can take more cpu)"
doc = "Use Open Range-Set to cache unacked messages (it is memory efficient but it can take more cpu)"
)
private boolean managedLedgerUnackedRangesOpenCacheSetEnabled = true;
@FieldContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@
import java.util.Collections;
import java.util.List;

import lombok.extern.slf4j.Slf4j;

import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;

@Slf4j
public abstract class AbstractBaseDispatcher implements Dispatcher {

protected final Subscription subscription;
Expand Down Expand Up @@ -61,20 +65,26 @@ protected AbstractBaseDispatcher(Subscription subscription) {
* @param subscription
* the subscription object
*/
public void filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo) {
public void filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchSizes,
SendMessageInfo sendMessageInfo) {
int totalMessages = 0;
long totalBytes = 0;

for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
Entry entry = entries.get(i);
ByteBuf metadataAndPayload = entry.getDataBuffer();
PositionImpl pos = (PositionImpl) entry.getPosition();

MessageMetadata msgMetadata = Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1);

try {
if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
PositionImpl pos = (PositionImpl) entry.getPosition();
// Message metadata was corrupted or the message was a server-only marker

if (Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) {
processReplicatedSubscriptionSnapshot(pos, metadataAndPayload);
}

entries.set(i, null);
entry.release();
subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual,
Expand All @@ -100,4 +110,17 @@ && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
sendMessageInfo.setTotalMessages(totalMessages);
sendMessageInfo.setTotalBytes(totalBytes);
}

/**
 * Parses a replicated-subscription snapshot out of a marker entry and forwards it to the subscription.
 *
 * <p>Failures raised while parsing the snapshot or while the subscription processes it are logged at
 * WARN level and swallowed.
 *
 * @param pos
 *            position of the marker entry; used only in the warning message
 * @param headersAndPayload
 *            buffer holding the message metadata followed by the marker payload
 */
private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) {
    // Skip past the protobuf headers before parsing the snapshot
    Commands.skipMessageMetadata(headersAndPayload);

    try {
        subscription.processReplicatedSubscriptionSnapshot(
                Markers.parseReplicatedSubscriptionsSnapshot(headersAndPayload));
    } catch (Throwable t) {
        log.warn("Failed to process replicated subscription snapshot at {} -- {}", pos, t.getMessage(), t);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;

public interface Subscription {

Expand Down Expand Up @@ -90,6 +91,10 @@ default long getNumberOfEntriesDelayed() {

void addUnAckedMessages(int unAckMessages);

/**
 * Hands a replicated-subscription snapshot to this subscription.
 *
 * <p>The default implementation ignores the snapshot; implementations that track replicated
 * subscription state override this method.
 *
 * @param snapshot
 *            the snapshot to process
 */
default void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
// Default is no-op
}

// Subscription utils
static boolean isCumulativeAckMode(SubType subType) {
return SubType.Exclusive.equals(subType) || SubType.Failover.equals(subType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
Expand Down Expand Up @@ -84,7 +85,7 @@ public class PersistentSubscription implements Subscription {
private static final Map<String, Long> REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = new TreeMap<>();
private static final Map<String, Long> NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = Collections.emptyMap();

private volatile boolean isReplicated;
private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;

static {
REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
Expand All @@ -105,7 +106,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma
this.topicName = topic.getName();
this.subName = subscriptionName;
this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor);
this.isReplicated = replicated;
this.setReplicated(replicated);
IS_FENCED_UPDATER.set(this, FALSE);
}

Expand All @@ -121,11 +122,15 @@ public Topic getTopic() {

@Override
public boolean isReplicated() {
return isReplicated;
return replicatedSubscriptionSnapshotCache != null;
}

void setReplicated(boolean replicated) {
this.isReplicated = replicated;
this.replicatedSubscriptionSnapshotCache = replicated
? new ReplicatedSubscriptionSnapshotCache(subName,
topic.getBrokerService().pulsar().getConfiguration()
.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription())
: null;
}

@Override
Expand Down Expand Up @@ -217,6 +222,8 @@ public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {

@Override
public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<String,Long> properties) {
Position previousMarkDeletePosition = cursor.getMarkDeletedPosition();

if (ackType == AckType.Cumulative) {
if (positions.size() != 1) {
log.warn("[{}][{}] Invalid cumulative ack received with multiple message ids", topicName, subName);
Expand All @@ -236,6 +243,19 @@ public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<St
dispatcher.getRedeliveryTracker().removeBatch(positions);
}

if (!cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) {
// Mark delete position advance
ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
if (snapshotCache != null) {
ReplicatedSubscriptionsSnapshot snapshot = snapshotCache
.advancedMarkDeletePosition((PositionImpl) cursor.getMarkDeletedPosition());
if (snapshot != null) {
topic.getReplicatedSubscriptionController()
.ifPresent(c -> c.localSubscriptionUpdated(subName, snapshot));
}
}
}

if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog() == 0) {
// Notify all consumer that the end of topic was reached
dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic);
Expand Down Expand Up @@ -680,7 +700,7 @@ public SubscriptionStats getStats() {
}
subStats.msgBacklog = getNumberOfEntriesInBacklog();
subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();
subStats.isReplicated = isReplicated;
subStats.isReplicated = isReplicated();
return subStats;
}

Expand Down Expand Up @@ -728,7 +748,7 @@ void topicTerminated() {
* (eg. when using compaction subscription) and the subscription properties.
*/
protected Map<String, Long> mergeCursorProperties(Map<String, Long> userProperties) {
Map<String, Long> baseProperties = isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES
Map<String, Long> baseProperties = isReplicated() ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES
: NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;

if (userProperties.isEmpty()) {
Expand All @@ -743,5 +763,13 @@ protected Map<String, Long> mergeCursorProperties(Map<String, Long> userProperti

}

/**
 * Adds the given snapshot to this subscription's snapshot cache. Does nothing when no cache is
 * currently set.
 *
 * @param snapshot
 *            the snapshot to cache
 */
@Override
public void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
    // Read the volatile field once so the null check and the call see the same instance
    final ReplicatedSubscriptionSnapshotCache cache = this.replicatedSubscriptionSnapshotCache;
    if (cache == null) {
        return;
    }

    cache.addNewSnapshot(snapshot);
}

private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1892,6 +1892,10 @@ void receivedReplicatedSubscriptionMarker(Position position, int markerType, Byt
}

ctrl.receivedReplicatedSubscriptionMarker(position, markerType, payload);;
}

/**
 * Returns the {@code replicatedSubscriptionsController} field of this topic.
 *
 * @return the controller wrapped in an {@link Optional}
 */
Optional<ReplicatedSubscriptionsController> getReplicatedSubscriptionController() {
return replicatedSubscriptionsController;
}

public CompactedTopic getCompactedTopic() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.persistent;

import java.util.NavigableMap;
import java.util.TreeMap;

import lombok.extern.slf4j.Slf4j;

import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;

/**
 * Stores the last N snapshots that were scanned by a particular subscription, ordered by the local
 * position each snapshot refers to.
 *
 * <p>Both mutating methods are {@code synchronized} on this instance.
 */
@Slf4j
public class ReplicatedSubscriptionSnapshotCache {
    private final String subscription;
    private final NavigableMap<PositionImpl, ReplicatedSubscriptionsSnapshot> snapshots;
    private final int maxSnapshotToCache;

    /**
     * Creates an empty cache.
     *
     * @param subscription
     *            name of the subscription; used only in log messages
     * @param maxSnapshotToCache
     *            maximum number of snapshots to retain; negative values are treated as zero
     */
    public ReplicatedSubscriptionSnapshotCache(String subscription, int maxSnapshotToCache) {
        this.subscription = subscription;
        this.snapshots = new TreeMap<>();
        // A negative limit would make the pruning loop in addNewSnapshot() spin forever: the map size can
        // never drop below zero and pollFirstEntry() on an empty map simply returns null. Clamp the value
        // so that a misconfiguration cannot hang the calling thread while it holds this monitor.
        this.maxSnapshotToCache = Math.max(0, maxSnapshotToCache);
    }

    /**
     * Adds a snapshot to the cache, keyed by its local message id. If the cache then holds more than the
     * configured maximum, the entries with the lowest positions are evicted.
     *
     * @param snapshot
     *            the snapshot to cache
     */
    public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
        MessageIdData msgId = snapshot.getLocalMessageId();
        PositionImpl position = new PositionImpl(msgId.getLedgerId(), msgId.getEntryId());

        if (log.isDebugEnabled()) {
            log.debug("[{}] Added new replicated-subscription snapshot at {} -- {}", subscription, position,
                    snapshot.getSnapshotId());
        }

        snapshots.put(position, snapshot);

        // Prune the cache, dropping the lowest positions first
        while (snapshots.size() > maxSnapshotToCache) {
            snapshots.pollFirstEntry();
        }
    }

    /**
     * Signals that the mark-delete position on the subscription has been advanced.
     *
     * <p>Every cached snapshot whose position is at or before {@code pos} is removed from the cache. Of
     * those, the one with the highest position is returned.
     *
     * @param pos
     *            the new mark-delete position
     * @return the snapshot with the highest position that is not after {@code pos}, or {@code null} if
     *         there is no such snapshot
     */
    public synchronized ReplicatedSubscriptionsSnapshot advancedMarkDeletePosition(PositionImpl pos) {
        ReplicatedSubscriptionsSnapshot snapshot = null;
        while (!snapshots.isEmpty()) {
            PositionImpl first = snapshots.firstKey();
            if (first.compareTo(pos) > 0) {
                // Snapshot is associated with a higher position, so it cannot be used now
                break;
            } else {
                // This snapshot is potentially good. Continue the search to see if there is a higher
                // snapshot we can use
                snapshot = snapshots.pollFirstEntry().getValue();
            }
        }

        if (log.isDebugEnabled()) {
            if (snapshot != null) {
                log.debug("[{}] Advanced mark-delete position to {} -- found snapshot {} at {}:{}", subscription, pos,
                        snapshot.getSnapshotId(),
                        snapshot.getLocalMessageId().getLedgerId(),
                        snapshot.getLocalMessageId().getEntryId());
            } else {
                log.debug("[{}] Advanced mark-delete position to {} -- snapshot not found", subscription, pos);
            }
        }
        return snapshot;
    }
}
Loading

0 comments on commit 7be1ee1

Please sign in to comment.