Skip to content

Commit

Permalink
Fix and enhance increment coalescing.
Browse files Browse the repository at this point in the history
This closes OpenTSDB#41, whereby a too large number of increments that pile up
for a given counter could cause a Deferred chain to blow up.  We now
guarantee that this will not happen by keeping track of how many times
each counter gets incremented, and pro-actively flushing those that
reach the limit on the length of the Deferred chain.  This has a nice
side effect that the *really* busy counters will get flushed more often.
This tracking is done without any extra locking or synchronization and
only costs an extra 32 bits of memory per coalesced counter.

This enhances increment coalescing to support negative increment values
as well as values greater than Short.MAX_VALUE (32767).  There is still
a limit on how big the values can be, but the limit is much higher now
(2^48 - 1) and values greater than that are still gracefully handled.

The code is also more robust now in the face of overflows / underflows, as
the previous implementation had one potential race condition which,
although it would have been incredibly rare, was still possible.
We now explicitly check for overflows / underflows before we CAS the
value, so that when an overflow / underflow condition is detected we
abort the update gracefully and let the caller know so they can flush
the counter and create a new one to start coalescing again.

This change also adds extra integration tests for various corner cases.
  • Loading branch information
tsuna committed Nov 4, 2012
1 parent 83ae5b7 commit baed796
Show file tree
Hide file tree
Showing 3 changed files with 259 additions and 58 deletions.
174 changes: 137 additions & 37 deletions src/BufferedIncrement.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
package org.hbase.async;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
Expand All @@ -36,8 +36,6 @@
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

import org.slf4j.LoggerFactory;

import com.stumbleupon.async.Deferred;

/**
Expand Down Expand Up @@ -98,12 +96,124 @@ public String toString() {
return buf.toString();
}

/** Increment amount. */
static final class Amount extends AtomicInteger {
/**
* Atomic increment amount.
* <p>
* This behaves like a signed 49 bit atomic integer that
* can only be incremented/decremented a specific number
* of times. The {@link #update} method must be used
* for all increment/decrement operations, as the underlying
* raw value of the {@code long} from {@link AtomicLong} is
* not the value of the counter, as some of its bits are
* reserved to keep track of how many times the value
* got changed.
* <p>
* Implementation details:<br/>
* The first 49 most significant bits are the value of
* the amount (including 1 bit for the sign), the last
* 15 least significant bits are the number of times
* left that {@link #update} can be called.
* <p>
* Again, this class opportunistically inherits from
* {@link AtomicLong}, but <strong>don't call methods
* from the parent class directly</strong>.
*/
static final class Amount extends AtomicLong {

/** Number of least-significant bits (LSB) we reserve to track updates. */
private final static int UPDATE_BITS = 15;
/** Mask used to retrieve number of updates left (0x0000000000007FFFL). */
private final static long UPDATE_MASK = (1L << UPDATE_BITS) - 1;
/**
* Mask used to get value bits we can't store (0xFFFF000000000000L).
* <p>
* Shifting {@code UPDATE_MASK} left by {@code 64 - UPDATE_BITS} sets
* bits 49 to 63. The {@code >>} that follows is an arithmetic shift:
* it keeps bit 63 set while moving the other bits down by one, so that
* bits 48 to 63 end up set.
*/
private final static long OVERFLOW_MASK = (UPDATE_MASK << (64 - UPDATE_BITS)
>> 1); // Reserve the sign bit.

/** Everyone waiting for this increment is queued up here. */
final Deferred<Long> deferred = new Deferred<Long>();

/**
* Creates a new atomic amount whose value starts at zero.
* <p>
* The raw {@code long} handed to {@link AtomicLong} is {@code max_updates}
* itself. A positive {@code short} only occupies the low bits read by
* {@link #numUpdatesLeft}, so the bits read by {@link #amount} are all 0.
* @param max_updates The maximum number of times {@link #update}
* can be called. Beyond this number of calls, the method will return
* {@code false}. Must be strictly positive; this is only verified when
* assertions are enabled.
*/
Amount(final short max_updates) {
super(max_updates);
// The sanity check can only come after super(): the call to the parent
// constructor has to be the first statement.
assert max_updates > 0 : "WTF: max_updates=" + max_updates;
}

/**
 * Atomically applies a delta to this amount, consuming one of the
 * remaining updates.
 * <p>
 * This is a lock-free read / compute / compare-and-set loop: if another
 * thread changes the raw value in between, everything is recomputed from
 * a fresh read.
 * @param delta The quantity to add to the amount.  A negative delta
 * decrements the amount.
 * @return {@code true} if the delta was applied, {@code false} if nothing
 * was changed, either because no updates are left or because the
 * resulting amount wouldn't fit on 49 bits (overflow/underflow).
 */
final boolean update(final long delta) {
  for (;;) {
    final long snapshot = super.get();
    final int remaining = numUpdatesLeft(snapshot);
    final long sum = amount(snapshot) + delta;
    // Refuse when the update budget is exhausted, or when the new amount
    // no longer fits in the bits reserved for it.
    if (remaining == 0 || !checkOverflow(sum)) {
      return false;
    }
    // Re-pack: amount in the high bits, decremented budget in the low bits.
    final long repacked = (sum << UPDATE_BITS) | (remaining - 1);
    if (super.compareAndSet(snapshot, repacked)) {
      return true;
    }
    // Lost the race against a concurrent change: retry from a fresh read.
  }
}

/**
 * Atomically sets the number of updates left to 0 and returns the raw
 * value that was stored just before doing so.
 * <p>
 * Once this returns, {@link #update} will always return {@code false}
 * for this instance.
 * @return The raw value, not the amount by which to increment.
 * To get the amount, use {@link #amount} on the value returned; to get
 * the number of updates that were still left, use {@link #numUpdatesLeft}.
 */
final long getRawAndInvalidate() {
  long raw;
  long invalidated;
  do {
    raw = super.get();
    // Zeroing the whole value would work too, but keeping the amount
    // bits makes toString() more helpful when debugging.
    invalidated = amount(raw) << UPDATE_BITS;  // 0 updates left.
  } while (!super.compareAndSet(raw, invalidated));
  return raw;
}

/**
 * Extracts the amount by which we're going to increment the value in
 * HBase from a raw value.
 * <p>
 * The shift is arithmetic, so the sign of the amount is preserved.
 * @param n A raw value, as stored in the underlying {@code long}.
 * @return The amount encoded in the most significant bits of {@code n}.
 */
static final long amount(final long n) {
  final long value = n >> UPDATE_BITS;
  return value;
}

/**
 * Extracts the number of times left that an amount can be updated from
 * a raw value.
 * @param n A raw value, as stored in the underlying {@code long}.
 * @return The counter encoded in the least significant bits of {@code n}.
 */
static final int numUpdatesLeft(final long n) {
  final long low_bits = n & UPDATE_MASK;
  return (int) low_bits;
}

/**
 * Tells whether the given value can be stored as an amount.
 * <p>
 * Note that, despite the name of this method, {@code true} means that
 * there is <em>no</em> overflow.
 * @param value The candidate amount.
 * @return {@code true} if the given value can fit without overflowing,
 * {@code false} otherwise.
 */
static final boolean checkOverflow(final long value) {
  final long msbs = value & OVERFLOW_MASK;
  if (msbs == 0) {
    // All the MSBs are 0: a non-negative value that is small enough.
    return true;
  }
  // Otherwise the value only fits if it's a negative one whose MSBs are
  // all 1.  Any mix of 0s and 1s means the value is too big, too small,
  // or that it wrapped around.
  return msbs == OVERFLOW_MASK;
}

public String toString() {
return "Amount(" + super.get() + ", " + deferred + ")";
final long n = super.get();
return "Amount(" + amount(n) + ", "
+ numUpdatesLeft(n) + " updates left, " + deferred + ')';
}

private static final long serialVersionUID = 1333868942;
Expand Down Expand Up @@ -141,9 +251,15 @@ public String toString() {
/** Creates new zero-Amount for new BufferedIncrements. */
static final class Loader extends CacheLoader<BufferedIncrement, Amount> {

/**
* Max number of increments/decrements per counter before we force-flush.
* This limit comes from the max callback chain length on a Deferred.
*/
private static final short MAX_UPDATES = 16383;

@Override
public Amount load(final BufferedIncrement key) {
return new Amount();
return new Amount(MAX_UPDATES);
}

}
Expand All @@ -164,37 +280,21 @@ private static final class EvictionHandler
@Override
public void onRemoval(final RemovalNotification<BufferedIncrement, Amount> entry) {
final Amount amount = entry.getValue();
// This trick with MIN_VALUE is what makes this whole increment
// coalescing work completely without locks. The difficulty here is
// that another thread might be trying to increment the value at the
// same time we're trying to flush it. By making the value hugely
// negative, we allow the other thread to notice that after adding its
// own positive delta, the value has become negative, which is otherwise
// impossible. This makes the other thread realize that it lost the
// race with us and that it needs to retry. Upon retrying it will cause
// the insertion of a new entry in the cache.
// The consequences of this trick are twofold:
// - We can't support negative increments, since we give a special
// meaning to negative values.
// - We can't allow large positive increments that could cause the
// integer to wrap around. Since we the maximum increment amount
// we allow per call is Short.MAX_VALUE, it would take
// (2^31 - 1) / (2^15 - 1) = 65538 increments of Short.MAX_VALUE
// before a flush happens to cause an overflow. Even if this was to
// happen, when the increment code detects a negative value, it
// undoes its increment (which would cause an underflow back to a
// positive value).
final int delta = amount.getAndSet(Integer.MIN_VALUE);
final BufferedIncrement incr = entry.getKey();
if (delta >= 0) {
final AtomicIncrementRequest req =
new AtomicIncrementRequest(incr.table, incr.key, incr.family,
incr.qualifier, delta);
client.atomicIncrement(req).chain(amount.deferred);
} else {
LoggerFactory.getLogger(EvictionHandler.class).error("WTF? Should"
+ " never happen: negative delta " + delta + " in " + incr);
final long raw = amount.getRawAndInvalidate();
final long delta = Amount.amount(raw);
if (Amount.numUpdatesLeft(raw) == Loader.MAX_UPDATES) {
// This amount was never incremented, because the number of updates
// left is still the original number. Therefore this is an Amount
// that has been evicted before anyone could attach any update to
// it, so the delta must be 0, and we don't need to send this RPC.
assert delta == 0 : "WTF? Pristine Amount with non-0 delta: " + amount;
return;
}
final BufferedIncrement incr = entry.getKey();
final AtomicIncrementRequest req =
new AtomicIncrementRequest(incr.table, incr.key, incr.family,
incr.qualifier, delta);
client.atomicIncrement(req).chain(amount.deferred);
}

}
Expand Down
28 changes: 8 additions & 20 deletions src/HBaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1086,38 +1086,30 @@ public Deferred<Long> atomicIncrement(final AtomicIncrementRequest request) {
* by {@link #getFlushInterval} in order to allow the client to coalesce
* increments.
* <p>
* <strong>Node: This method only works with positive increments.</strong>
* Also note that if the amount is greater than or equal to
* {@link Short#MAX_VALUE}, the increment cannot be buffered and will be
* sent directly.
* <p>
* Increment coalescing can dramatically reduce the number of RPCs and write
* load on HBase if you tend to increment multiple times the same working
* set of counters. This is very common in user-facing serving systems that
* use HBase counters to keep track of user actions.
* <p>
* If client-side buffering is disabled ({@link #getFlushInterval} returns
* 0) then this function has the same effect as calling
* {@link #atomicIncrement(AtomicIncrementRequest)}.
* {@link #atomicIncrement(AtomicIncrementRequest)} directly.
* @param request The increment request.
* @return The deferred {@code long} value that results from the increment.
* @throws IllegalArgumentException if {@code request.getAmount() < 0}
* @since 1.3
* @since 1.4 This method works with negative increment values.
*/
public Deferred<Long> bufferAtomicIncrement(final AtomicIncrementRequest request) {
final long value = request.getAmount();
if (value < 0) {
throw new IllegalArgumentException("Cannot buffer atomic increment with"
+ " negative amount: " + request);
} else if (value >= Short.MAX_VALUE // Value to large to safely coalesce.
|| flush_interval == 0) { // Client-side buffer disabled.
if (!BufferedIncrement.Amount.checkOverflow(value) // Value too large.
|| flush_interval == 0) { // Client-side buffer disabled.
return atomicIncrement(request);
}

final BufferedIncrement incr =
new BufferedIncrement(request.table(), request.key(), request.family(),
request.qualifier());
final short delta = (short) value;

do {
BufferedIncrement.Amount amount;
// Semi-evil: the very first time we get here, `increment_buffer' will
Expand All @@ -1130,17 +1122,13 @@ public Deferred<Long> bufferAtomicIncrement(final AtomicIncrementRequest request
setupIncrementCoalescing();
amount = increment_buffer.getUnchecked(incr);
}
if (amount.addAndGet(delta) < 0) {
// Race condition. We got something out of the buffer, but in the mean
// time another thread picked it up and decided to send it to HBase. So
// we need to retry, which will create a new entry in the buffer.
amount.addAndGet(-delta); // Undo our previous addAndGet.
// Loop again to retry.
} else {
if (amount.update(value)) {
final Deferred<Long> deferred = new Deferred<Long>();
amount.deferred.chain(deferred);
return deferred;
}
// else: Loop again to retry.
increment_buffer.refresh(incr);
} while (true);
}

Expand Down
Loading

0 comments on commit baed796

Please sign in to comment.