Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the reader skipping compacted data whose original ledger has been removed #12522

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
Expand All @@ -34,7 +33,7 @@
import org.slf4j.LoggerFactory;

public class CompactorSubscription extends PersistentSubscription {
private CompactedTopic compactedTopic;
private final CompactedTopic compactedTopic;

public CompactorSubscription(PersistentTopic topic, CompactedTopic compactedTopic,
String subscriptionName, ManagedCursor cursor) {
Expand Down Expand Up @@ -65,35 +64,42 @@ public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<St
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Cumulative ack on compactor subscription {}", topicName, subName, position);
}
CompletableFuture<Void> future = new CompletableFuture<>();
cursor.asyncMarkDelete(position, properties, new MarkDeleteCallback() {

// The newCompactedLedger must be called as the first step because we need to ensure the reader can read
// complete data from the compacted ledger. Otherwise, if the original ledger has been deleted, the reader cursor
// might move to a subsequent original ledger while `compactionHorizon` has not yet been updated, which would
// cause the reader to skip compacted data at that time; after `compactionHorizon` is updated, the reader is
// able to read the complete compacted data again.
// And we can only delete the previous ledger after the mark delete succeeds; otherwise we would lose the
// compacted data if the mark delete failed.
compactedTopic.newCompactedLedger(position, compactedLedgerId).thenAccept(previousContext -> {
cursor.asyncMarkDelete(position, properties, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Mark deleted messages until position on compactor subscription {}",
topicName, subName, position);
topicName, subName, position);
}
if (previousContext != null) {
compactedTopic.deleteCompactedLedger(previousContext.getLedger().getId());
}
future.complete(null);
}

@Override
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
// TODO: cut consumer connection on markDeleteFailed
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Failed to mark delete for position on compactor subscription {}",
topicName, subName, ctx, exception);
topicName, subName, ctx, exception);
}
}
}, null);
});

if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog(false) == 0) {
// Notify all consumer that the end of topic was reached
dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic);
}

// Once properties have been persisted, we can notify the compacted topic to use
// the new ledger
future.thenAccept((v) -> compactedTopic.newCompactedLedger(position, compactedLedgerId));
}

private static final Logger log = LoggerFactory.getLogger(CompactorSubscription.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
import org.apache.pulsar.broker.service.Consumer;

public interface CompactedTopic {
CompletableFuture<?> newCompactedLedger(Position p, long compactedLedgerId);
CompletableFuture<CompactedTopicContext> newCompactedLedger(Position p, long compactedLedgerId);
CompletableFuture<Void> deleteCompactedLedger(long compactedLedgerId);
void asyncReadEntriesOrWait(ManagedCursor cursor,
int numberOfEntriesToRead,
boolean isFirstRead,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ComparisonChain;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
Expand Down Expand Up @@ -64,23 +65,24 @@ public CompactedTopicImpl(BookKeeper bk) {
}

@Override
public CompletableFuture<?> newCompactedLedger(Position p, long compactedLedgerId) {
public CompletableFuture<CompactedTopicContext> newCompactedLedger(Position p, long compactedLedgerId) {
synchronized (this) {
compactionHorizon = (PositionImpl) p;

CompletableFuture<CompactedTopicContext> previousContext = compactedTopicContext;
compactedTopicContext = openCompactedLedger(bk, compactedLedgerId);

// delete the ledger from the old context once the new one is open
if (previousContext != null) {
return compactedTopicContext.thenCompose((res) -> previousContext)
.thenCompose((res) -> tryDeleteCompactedLedger(bk, res.ledger.getId()));
} else {
return compactedTopicContext;
}
return compactedTopicContext.thenCompose(__ ->
previousContext != null ? previousContext : CompletableFuture.completedFuture(null));
}
}

@Override
public CompletableFuture<Void> deleteCompactedLedger(long compactedLedgerId) {
return tryDeleteCompactedLedger(bk, compactedLedgerId);
}

@Override
public void asyncReadEntriesOrWait(ManagedCursor cursor,
int numberOfEntriesToRead,
Expand Down Expand Up @@ -303,6 +305,11 @@ private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m)
.compare(p.getLedgerId(), m.getLedgerId())
.compare(p.getEntryId(), m.getEntryId()).result();
}

@VisibleForTesting
PositionImpl getCompactionHorizon() {
return this.compactionHorizon;
}
private static final Logger log = LoggerFactory.getLogger(CompactedTopicImpl.class);
}

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -123,12 +124,10 @@
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
Expand Down Expand Up @@ -1834,6 +1833,8 @@ public void testClosingReplicationProducerTwice() throws Exception {
public void testCompactorSubscription() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
CompactedTopic compactedTopic = mock(CompactedTopic.class);
when(compactedTopic.newCompactedLedger(any(Position.class), anyLong()))
.thenReturn(CompletableFuture.completedFuture(mock(CompactedTopicContext.class)));
PersistentSubscription sub = new CompactorSubscription(topic, compactedTopic,
Compactor.COMPACTION_SUBSCRIPTION,
cursorMock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,19 @@ public void testCleanupOldCompactedTopicLedger() throws Exception {
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();

// update the compacted topic ledger
compactedTopic.newCompactedLedger(new PositionImpl(1,2), newCompactedLedger.getId()).get();
PositionImpl newHorizon = new PositionImpl(1,3);
compactedTopic.newCompactedLedger(newHorizon, newCompactedLedger.getId()).get();

// Make sure the old compacted ledger still exists after the new compacted ledger is created.
bk.openLedger(oldCompactedLedger.getId(),
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();

Assert.assertTrue(compactedTopic.getCompactedTopicContext().isPresent());
Assert.assertEquals(compactedTopic.getCompactedTopicContext().get().getLedger().getId(),
newCompactedLedger.getId());
Assert.assertEquals(compactedTopic.getCompactionHorizon(), newHorizon);
compactedTopic.deleteCompactedLedger(oldCompactedLedger.getId()).join();

// old ledger should be deleted, new still there
try {
Expand Down