Skip to content

Commit

Permalink
HBASE-16837 Implement checkAndPut and checkAndDelete
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed Oct 19, 2016
1 parent ef8c65e commit acc6065
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 2 deletions.
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;

Expand Down Expand Up @@ -204,4 +205,70 @@ default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family,
new Increment(row).addColumn(family, qualifier, amount).setDurability(durability))
.thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
}

/**
* Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it
* adds the put. If the passed value is null, the check is for the lack of column (ie:
* non-existence)
* @param row to check
* @param family column family to check
* @param qualifier column qualifier to check
* @param value the expected value
* @param put data to put if check succeeds
* @return true if the new put was executed, false otherwise. The return value will be wrapped by
* a {@link CompletableFuture}.
*/
default CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
byte[] value, Put put) {
return checkAndPut(row, family, qualifier, CompareOp.EQUAL, value, put);
}

/**
* Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
* adds the put. If the passed value is null, the check is for the lack of column (ie:
* non-existence)
* @param row to check
* @param family column family to check
* @param qualifier column qualifier to check
* @param compareOp comparison operator to use
* @param value the expected value
* @param put data to put if check succeeds
* @return true if the new put was executed, false otherwise. The return value will be wrapped by
* a {@link CompletableFuture}.
*/
CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, byte[] value, Put put);

/**
* Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it
* adds the delete. If the passed value is null, the check is for the lack of column (ie:
* non-existence)
* @param row to check
* @param family column family to check
* @param qualifier column qualifier to check
* @param value the expected value
* @param delete data to delete if check succeeds
* @return true if the new delete was executed, false otherwise. The return value will be wrapped
* by a {@link CompletableFuture}.
*/
default CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
byte[] value, Delete delete) {
return checkAndDelete(row, family, qualifier, CompareOp.EQUAL, value, delete);
}

/**
* Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
* adds the delete. If the passed value is null, the check is for the lack of column (ie:
* non-existence)
* @param row to check
* @param family column family to check
* @param qualifier column qualifier to check
* @param compareOp comparison operator to use
* @param value the expected value
* @param delete data to delete if check succeeds
* @return true if the new delete was executed, false otherwise. The return value will be wrapped
* by a {@link CompletableFuture}.
*/
CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, byte[] value, Delete delete);
}
Expand Up @@ -28,6 +28,8 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
Expand All @@ -37,6 +39,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;

/**
* The implementation of AsyncTable.
Expand Down Expand Up @@ -151,12 +154,16 @@ private <REQ, RESP> CompletableFuture<RESP> noncedMutate(HBaseRpcController cont
(info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
}

private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
return conn.callerFactory.<T> single().table(tableName).row(row.getRow())
private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, long rpcTimeoutNs) {
return conn.callerFactory.<T> single().table(tableName).row(row)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS);
}

private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
return newCaller(row.getRow(), rpcTimeoutNs);
}

@Override
public CompletableFuture<Result> get(Get get) {
return this.<Result> newCaller(get, readRpcTimeoutNs)
Expand Down Expand Up @@ -201,6 +208,30 @@ public CompletableFuture<Result> increment(Increment increment) {
.call();
}

@Override
public CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, byte[] value, Put put) {
return this.<Boolean> newCaller(row, writeRpcTimeoutNs)
.action((controller, loc, stub) -> AsyncTableImpl.<Put, Boolean> mutate(controller, loc,
stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
new BinaryComparator(value), CompareType.valueOf(compareOp.name()), p),
(c, r) -> r.getProcessed()))
.call();
}

@Override
public CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, byte[] value, Delete delete) {
return this.<Boolean> newCaller(row, writeRpcTimeoutNs)
.action((controller, loc, stub) -> AsyncTableImpl.<Delete, Boolean> mutate(controller, loc,
stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
new BinaryComparator(value), CompareType.valueOf(compareOp.name()), d),
(c, r) -> r.getProcessed()))
.call();
}

@Override
public void setReadRpcTimeout(long timeout, TimeUnit unit) {
this.readRpcTimeoutNs = unit.toNanos(timeout);
Expand Down Expand Up @@ -230,4 +261,5 @@ public void setOperationTimeout(long timeout, TimeUnit unit) {
public long getOperationTimeout(TimeUnit unit) {
return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
}

}
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -184,4 +185,61 @@ public void testAppend() throws InterruptedException, ExecutionException {
.sorted().toArray();
assertArrayEquals(IntStream.range(0, count).toArray(), actual);
}

@Test
public void testCheckAndPut() throws InterruptedException, ExecutionException {
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger successIndex = new AtomicInteger(-1);
int count = 10;
CountDownLatch latch = new CountDownLatch(count);
IntStream.range(0, count).forEach(i -> table.checkAndPut(row, FAMILY, QUALIFIER, null,
new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> {
if (x) {
successCount.incrementAndGet();
successIndex.set(i);
}
latch.countDown();
}));
latch.await();
assertEquals(1, successCount.get());
String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
}

@Test
public void testCheckAndDelete() throws InterruptedException, ExecutionException {
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
int count = 10;
CountDownLatch putLatch = new CountDownLatch(count + 1);
table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
IntStream.range(0, count)
.forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
.thenRun(() -> putLatch.countDown()));
putLatch.await();

AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger successIndex = new AtomicInteger(-1);
CountDownLatch deleteLatch = new CountDownLatch(count);
IntStream.range(0, count).forEach(i -> table
.checkAndDelete(row, FAMILY, QUALIFIER, VALUE,
new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
.thenAccept(x -> {
if (x) {
successCount.incrementAndGet();
successIndex.set(i);
}
deleteLatch.countDown();
}));
deleteLatch.await();
assertEquals(1, successCount.get());
Result result = table.get(new Get(row)).get();
IntStream.range(0, count).forEach(i -> {
if (i == successIndex.get()) {
assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
} else {
assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
}
});
}
}

0 comments on commit acc6065

Please sign in to comment.