Skip to content

Commit

Permalink
Added multiple position delete in ManagedLedger (#1450)
Browse files Browse the repository at this point in the history
* Added multiple position delete in ManagedLedger

* Removed ref to old bug and readded checkNotNull
  • Loading branch information
merlimat committed Mar 30, 2018
1 parent dd29630 commit 73a214a
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 41 deletions.
Expand Up @@ -265,6 +265,43 @@ void markDelete(Position position, Map<String, Long> properties)
*/
void asyncDelete(Position position, DeleteCallback callback, Object ctx);


/**
* Delete a group of entries.
*
* <p/>
* Mark multiple single messages for deletion. When all the previous messages are all deleted, then markDelete()
* will be called internally to advance the persistent acknowledged position.
*
* <p/>
* The deletion of the message is not persisted into the durable storage and cannot be recovered upon the reopening
* of the ManagedLedger
*
* @param positions
* positions of the messages to be deleted
*/
void delete(Iterable<Position> positions) throws InterruptedException, ManagedLedgerException;

/**
* Delete a group of messages asynchronously
*
* <p/>
* Mark a group of messages for deletion. When all the previous messages are all deleted, then markDelete() will be
* called internally to advance the persistent acknowledged position.
*
* <p/>
* The deletion of the messages is not persisted into the durable storage and cannot be recovered upon the reopening
* of the ManagedLedger
*
* @param positions
* the positions of the messages to be deleted
* @param callback
* callback object
* @param ctx
* opaque context
*/
void asyncDelete(Iterable<Position> position, DeleteCallback callback, Object ctx);

/**
* Get the read position. This points to the next message to be read from the cursor.
*
Expand Down
Expand Up @@ -1447,8 +1447,17 @@ public void operationFailed(ManagedLedgerException exception) {

@Override
public void delete(final Position position) throws InterruptedException, ManagedLedgerException {
checkNotNull(position);
checkArgument(position instanceof PositionImpl);
delete(Collections.singletonList(position));
}

@Override
public void asyncDelete(Position pos, final AsyncCallbacks.DeleteCallback callback, Object ctx) {
asyncDelete(Collections.singletonList(pos), callback, ctx);
}

@Override
public void delete(Iterable<Position> positions) throws InterruptedException, ManagedLedgerException {
checkNotNull(positions);

class Result {
ManagedLedgerException exception = null;
Expand All @@ -1458,12 +1467,12 @@ class Result {
final CountDownLatch counter = new CountDownLatch(1);
final AtomicBoolean timeout = new AtomicBoolean(false);

asyncDelete(position, new AsyncCallbacks.DeleteCallback() {
asyncDelete(positions, new AsyncCallbacks.DeleteCallback() {
@Override
public void deleteComplete(Object ctx) {
if (timeout.get()) {
log.warn("[{}] [{}] Delete operation timeout. Callback deleteComplete at position {}",
ledger.getName(), name, position);
ledger.getName(), name, positions);
}

counter.countDown();
Expand All @@ -1475,7 +1484,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {

if (timeout.get()) {
log.warn("[{}] [{}] Delete operation timeout. Callback deleteFailed at position {}",
ledger.getName(), name, position);
ledger.getName(), name, positions);
}

counter.countDown();
Expand All @@ -1485,7 +1494,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
if (!counter.await(ManagedLedgerImpl.AsyncOperationTimeoutSeconds, TimeUnit.SECONDS)) {
timeout.set(true);
log.warn("[{}] [{}] Delete operation timeout. No callback was triggered at position {}", ledger.getName(),
name, position);
name, positions);
throw new ManagedLedgerException("Timeout during delete operation");
}

Expand All @@ -1494,71 +1503,66 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
}
}

@Override
public void asyncDelete(Position pos, final AsyncCallbacks.DeleteCallback callback, Object ctx) {
checkArgument(pos instanceof PositionImpl);

if (STATE_UPDATER.get(this) == State.Closed) {
@Override
public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallback callback, Object ctx) {
if (state == State.Closed) {
callback.deleteFailed(new ManagedLedgerException("Cursor was already closed"), ctx);
return;
}

PositionImpl position = (PositionImpl) pos;

PositionImpl previousPosition = ledger.getPreviousPosition(position);
PositionImpl newMarkDeletePosition = null;

lock.writeLock().lock();

try {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Deleting single message at {}. "
+ "Current status: {} - md-position: {} - previous-position: {}",
ledger.getName(), name, pos, individualDeletedMessages, markDeletePosition, previousPosition);
log.debug("[{}] [{}] Deleting individual messages at {}. Current status: {} - md-position: {}",
ledger.getName(), name, positions, individualDeletedMessages, markDeletePosition);
}

if (individualDeletedMessages.contains(position) || position.compareTo(markDeletePosition) <= 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Position was already deleted {}", ledger.getName(), name, position);
}
callback.deleteComplete(ctx);
return;
}
for (Position pos : positions) {
PositionImpl position = (PositionImpl) checkNotNull(pos);

if (previousPosition.compareTo(markDeletePosition) == 0 && individualDeletedMessages.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Immediately mark-delete to position {}", ledger.getName(), name, position);
if (individualDeletedMessages.contains(position) || position.compareTo(markDeletePosition) <= 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Position was already deleted {}", ledger.getName(), name, position);
}
continue;
}

newMarkDeletePosition = position;
} else {
// Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will make
// the RangeSet recognize the "continuity" between adjacent Positions
PositionImpl previousPosition = ledger.getPreviousPosition(position);
individualDeletedMessages.add(Range.openClosed(previousPosition, position));
++messagesConsumedCounter;

if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Individually deleted messages: {}", ledger.getName(), name,
individualDeletedMessages);
}
}

// If the lower bound of the range set is the current mark delete position, then we can trigger a new
// mark
// delete to the upper bound of the first range segment
Range<PositionImpl> range = individualDeletedMessages.asRanges().iterator().next();
if (individualDeletedMessages.isEmpty()) {
// No changes to individually deleted messages, so nothing to do at this point
callback.deleteComplete(ctx);
return;
}

// Bug:7062188 - markDeletePosition can sometimes be stuck at the beginning of an empty ledger.
// If the lowerBound is ahead of MarkDelete, verify if there are any entries in-between
if (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 || ledger
.getNumberOfEntries(Range.openClosed(markDeletePosition, range.lowerEndpoint())) <= 0) {
// If the lower bound of the range set is the current mark delete position, then we can trigger a new
// mark-delete to the upper bound of the first range segment
Range<PositionImpl> range = individualDeletedMessages.asRanges().iterator().next();

if (log.isDebugEnabled()) {
log.debug("[{}] Found a position range to mark delete for cursor {}: {} ", ledger.getName(),
name, range);
}
// If the lowerBound is ahead of MarkDelete, verify if there are any entries in-between
if (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 || ledger
.getNumberOfEntries(Range.openClosed(markDeletePosition, range.lowerEndpoint())) <= 0) {

newMarkDeletePosition = range.upperEndpoint();
if (log.isDebugEnabled()) {
log.debug("[{}] Found a position range to mark delete for cursor {}: {} ", ledger.getName(),
name, range);
}

newMarkDeletePosition = range.upperEndpoint();
}

if (newMarkDeletePosition != null) {
Expand Down
Expand Up @@ -155,6 +155,14 @@ public void delete(Position position) throws InterruptedException, ManagedLedger
public void asyncDelete(Position position, DeleteCallback callback, Object ctx) {
}

@Override
public void delete(Iterable<Position> positions) throws InterruptedException, ManagedLedgerException {
}

@Override
public void asyncDelete(Iterable<Position> position, DeleteCallback callback, Object ctx) {
}

@Override
public void clearBacklog() throws InterruptedException, ManagedLedgerException {
}
Expand Down
@@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;

import static org.testng.Assert.assertEquals;

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;

import java.nio.charset.Charset;

import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.testng.annotations.Test;

public class ManagedCursorListAckTest extends MockedBookKeeperTestCase {

private static final Charset Encoding = Charsets.UTF_8;

@Test(timeOut = 20000)
void testMultiPositionDelete() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));

ManagedCursor c1 = ledger.openCursor("c1");
Position p0 = c1.getMarkDeletedPosition();
Position p1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
Position p2 = ledger.addEntry("dummy-entry-2".getBytes(Encoding));
Position p3 = ledger.addEntry("dummy-entry-3".getBytes(Encoding));
Position p4 = ledger.addEntry("dummy-entry-4".getBytes(Encoding));
Position p5 = ledger.addEntry("dummy-entry-5".getBytes(Encoding));
Position p6 = ledger.addEntry("dummy-entry-6".getBytes(Encoding));
Position p7 = ledger.addEntry("dummy-entry-7".getBytes(Encoding));

assertEquals(c1.getNumberOfEntries(), 7);
assertEquals(c1.getNumberOfEntriesInBacklog(), 7);

c1.delete(Lists.newArrayList(p2, p3, p5, p7));

assertEquals(c1.getNumberOfEntries(), 3);
assertEquals(c1.getNumberOfEntriesInBacklog(), 3);
assertEquals(c1.getMarkDeletedPosition(), p0);

c1.delete(Lists.newArrayList(p1));

assertEquals(c1.getNumberOfEntries(), 2);
assertEquals(c1.getNumberOfEntriesInBacklog(), 2);
assertEquals(c1.getMarkDeletedPosition(), p3);

c1.delete(Lists.newArrayList(p4, p6, p7));

assertEquals(c1.getNumberOfEntries(), 0);
assertEquals(c1.getNumberOfEntriesInBacklog(), 0);
assertEquals(c1.getMarkDeletedPosition(), p7);
}

}

0 comments on commit 73a214a

Please sign in to comment.