Skip to content

Commit

Permalink
Persist individually deleted messages (#276)
Browse files Browse the repository at this point in the history
PR from @sschepens  + tests and few fixes from @merlimat
  • Loading branch information
merlimat authored Mar 9, 2017
1 parent fbdbddd commit 83605f6
Show file tree
Hide file tree
Showing 11 changed files with 2,156 additions and 327 deletions.
9 changes: 9 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,15 @@ managedLedgerCursorMaxEntriesPerLedger=50000
# Max time before triggering a rollover on a cursor ledger
managedLedgerCursorRolloverTimeInSeconds=14400

# Max number of "acknowledgment holes" that are going to be persistently stored.
# When acknowledging out of order, a consumer will leave holes that are supposed
# to be quickly filled by acking all the messages. The information of which
# messages are acknowledged is persisted by compressing in "ranges" of messages
# that were acknowledged. After the max number of ranges is reached, the information
# will only be tracked in memory and messages will be redelivered in case of
# crashes.
managedLedgerMaxUnackedRangesToPersist=1000



### --- Load balancer --- ###
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
@Beta
public class ManagedLedgerConfig {

private int maxUnackedRangesToPersist = 1000;
private int maxEntriesPerLedger = 50000;
private int maxSizePerLedgerMb = 100;
private int minimumRolloverTimeMs = 0;
Expand Down Expand Up @@ -347,4 +348,21 @@ public ManagedLedgerConfig setRetentionSizeInMB(long retentionSizeInMB) {
public long getRetentionSizeInMB() {
return retentionSizeInMB;
}

/**
* @return max unacked message ranges that will be persisted and recovered.
*
*/
public int getMaxUnackedRangesToPersist() {
return maxUnackedRangesToPersist;
}

/**
* @param maxUnackedRangesToPersist
* max unacked message ranges that will be persisted and receverd.
*/
public ManagedLedgerConfig setMaxUnackedRangesToPersist(int maxUnackedRangesToPersist) {
this.maxUnackedRangesToPersist = maxUnackedRangesToPersist;
return this;
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ public void operationComplete() {

@Override
public void operationFailed(ManagedLedgerException exception) {
log.warn("[{}] Recovery for cursor {} failed", name, cursorName);
log.warn("[{}] Recovery for cursor {} failed", name, cursorName, exception);
cursorCount.set(-1);
callback.initializeFailed(exception);
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.collect.RangeSet;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo;

import com.google.common.base.Objects;
import com.google.common.collect.ComparisonChain;
Expand All @@ -39,6 +42,12 @@ public PositionImpl(PositionInfo pi) {
this.recyclerHandle = null;
}

public PositionImpl(NestedPositionInfo npi) {
this.ledgerId = npi.getLedgerId();
this.entryId = npi.getEntryId();
this.recyclerHandle = null;
}

public PositionImpl(long ledgerId, long entryId) {
this.ledgerId = ledgerId;
this.entryId = entryId;
Expand Down
Loading

0 comments on commit 83605f6

Please sign in to comment.