diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
index 2ed3c26c3e4d..6019bdcfb48c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 
@@ -204,4 +205,70 @@ default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family,
       new Increment(row).addColumn(family, qualifier, amount).setDurability(durability))
         .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
   }
+
+  /**
+   * Atomically checks if a row/family/qualifier value equals the expected value. If it does, it
+   * adds the put. If the passed value is null, the check is for the lack of column (ie:
+   * non-existence)
+   * @param row to check
+   * @param family column family to check
+   * @param qualifier column qualifier to check
+   * @param value the expected value
+   * @param put data to put if check succeeds
+   * @return true if the new put was executed, false otherwise. The return value will be wrapped by
+   *         a {@link CompletableFuture}.
+   */
+  default CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+      byte[] value, Put put) {
+    return checkAndPut(row, family, qualifier, CompareOp.EQUAL, value, put);
+  }
+
+  /**
+   * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
+   * adds the put. If the passed value is null, the check is for the lack of column (ie:
+   * non-existence)
+   * @param row to check
+   * @param family column family to check
+   * @param qualifier column qualifier to check
+   * @param compareOp comparison operator to use
+   * @param value the expected value
+   * @param put data to put if check succeeds
+   * @return true if the new put was executed, false otherwise. The return value will be wrapped by
+   *         a {@link CompletableFuture}.
+   */
+  CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+      CompareOp compareOp, byte[] value, Put put);
+
+  /**
+   * Atomically checks if a row/family/qualifier value equals the expected value. If it does, it
+   * adds the delete. If the passed value is null, the check is for the lack of column (ie:
+   * non-existence)
+   * @param row to check
+   * @param family column family to check
+   * @param qualifier column qualifier to check
+   * @param value the expected value
+   * @param delete data to delete if check succeeds
+   * @return true if the new delete was executed, false otherwise. The return value will be wrapped
+   *         by a {@link CompletableFuture}.
+   */
+  default CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+      byte[] value, Delete delete) {
+    return checkAndDelete(row, family, qualifier, CompareOp.EQUAL, value, delete);
+  }
+
+  /**
+   * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
+   * adds the delete. If the passed value is null, the check is for the lack of column (ie:
+   * non-existence)
+   * @param row to check
+   * @param family column family to check
+   * @param qualifier column qualifier to check
+   * @param compareOp comparison operator to use
+   * @param value the expected value
+   * @param delete data to delete if check succeeds
+   * @return true if the new delete was executed, false otherwise. The return value will be wrapped
+   *         by a {@link CompletableFuture}.
+   */
+  CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+      CompareOp compareOp, byte[] value, Delete delete);
 }
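Editor's note: a hedged, caller-side sketch of the two new entry points above. The connection handling, table name, column constants, and class/method names here are illustrative assumptions, not part of the patch; only the checkAndPut/checkAndDelete calls come from the interface additions.

import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckAndMutateUsage {
  // Illustrative table/column names; any real caller supplies its own.
  private static final TableName TABLE = TableName.valueOf("example");
  private static final byte[] CF = Bytes.toBytes("cf");
  private static final byte[] Q = Bytes.toBytes("q");

  /** Compare-and-set style update: write newValue only if the stored value equals expected. */
  static CompletableFuture<Boolean> casPut(AsyncConnection conn, byte[] row, byte[] expected,
      byte[] newValue) {
    AsyncTable table = conn.getTable(TABLE);
    // EQUAL-based default overload added by this patch.
    return table.checkAndPut(row, CF, Q, expected, new Put(row).addColumn(CF, Q, newValue));
  }

  /** Conditional delete using the explicit CompareOp overload (delete if stored != expected). */
  static CompletableFuture<Boolean> deleteUnlessEqual(AsyncConnection conn, byte[] row,
      byte[] expected) {
    AsyncTable table = conn.getTable(TABLE);
    return table.checkAndDelete(row, CF, Q, CompareOp.NOT_EQUAL, expected,
      new Delete(row).addColumn(CF, Q));
  }
}

Both calls complete the returned CompletableFuture with whether the mutation was applied, so callers can chain thenAccept/thenCompose instead of blocking.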
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index 89f798c3edb5..b7dc38877c14 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -28,6 +28,8 @@
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -37,6 +39,7 @@
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
 
 /**
  * The implementation of AsyncTable.
@@ -151,12 +154,16 @@ private <REQ, RESP> CompletableFuture<RESP> noncedMutate(HBaseRpcController cont
       (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
   }
 
-  private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
-    return conn.callerFactory.<T> single().table(tableName).row(row.getRow())
+  private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, long rpcTimeoutNs) {
+    return conn.callerFactory.<T> single().table(tableName).row(row)
         .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
         .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS);
   }
 
+  private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
+    return newCaller(row.getRow(), rpcTimeoutNs);
+  }
+
   @Override
   public CompletableFuture<Result> get(Get get) {
     return this.<Result> newCaller(get, readRpcTimeoutNs)
@@ -201,6 +208,30 @@ public CompletableFuture<Result> increment(Increment increment) {
         .call();
   }
 
+  @Override
+  public CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+      CompareOp compareOp, byte[] value, Put put) {
+    return this.<Boolean> newCaller(row, writeRpcTimeoutNs)
+        .action((controller, loc, stub) -> AsyncTableImpl.<Put, Boolean> mutate(controller, loc,
+          stub, put,
+          (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
+            new BinaryComparator(value), CompareType.valueOf(compareOp.name()), p),
+          (c, r) -> r.getProcessed()))
+        .call();
+  }
+
+  @Override
+  public CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+      CompareOp compareOp, byte[] value, Delete delete) {
+    return this.<Boolean> newCaller(row, writeRpcTimeoutNs)
+        .action((controller, loc, stub) -> AsyncTableImpl.<Delete, Boolean> mutate(controller, loc,
+          stub, delete,
+          (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
+            new BinaryComparator(value), CompareType.valueOf(compareOp.name()), d),
+          (c, r) -> r.getProcessed()))
+        .call();
+  }
+
   @Override
   public void setReadRpcTimeout(long timeout, TimeUnit unit) {
     this.readRpcTimeoutNs = unit.toNanos(timeout);
@@ -230,4 +261,5 @@ public void setOperationTimeout(long timeout, TimeUnit unit) {
   public long getOperationTimeout(TimeUnit unit) {
     return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
   }
+
 }
\ No newline at end of file
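Editor's note: the implementation maps the client-side CompareOp to the protobuf CompareType purely by enum name via CompareType.valueOf(compareOp.name()). A standalone sanity check of that name-for-name assumption (not part of the patch, class name hypothetical) could look like:

import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;

public class CompareOpMappingCheck {
  public static void main(String[] args) {
    // CompareType.valueOf(name) throws IllegalArgumentException if any client-side
    // operator lacks a protobuf counterpart, so this loop fails fast on a mismatch.
    for (CompareOp op : CompareOp.values()) {
      System.out.println(op.name() + " -> " + CompareType.valueOf(op.name()));
    }
  }
}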
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
index 41002cb595bd..8b3ab622d13a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
@@ -28,6 +28,7 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.IntStream;
 
@@ -184,4 +185,61 @@ public void testAppend() throws InterruptedException, ExecutionException {
         .sorted().toArray();
     assertArrayEquals(IntStream.range(0, count).toArray(), actual);
   }
+
+  @Test
+  public void testCheckAndPut() throws InterruptedException, ExecutionException {
+    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    AtomicInteger successCount = new AtomicInteger(0);
+    AtomicInteger successIndex = new AtomicInteger(-1);
+    int count = 10;
+    CountDownLatch latch = new CountDownLatch(count);
+    IntStream.range(0, count).forEach(i -> table.checkAndPut(row, FAMILY, QUALIFIER, null,
+      new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> {
+        if (x) {
+          successCount.incrementAndGet();
+          successIndex.set(i);
+        }
+        latch.countDown();
+      }));
+    latch.await();
+    assertEquals(1, successCount.get());
+    String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
+    assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
+  }
+
+  @Test
+  public void testCheckAndDelete() throws InterruptedException, ExecutionException {
+    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    int count = 10;
+    CountDownLatch putLatch = new CountDownLatch(count + 1);
+    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
+    IntStream.range(0, count)
+        .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
+            .thenRun(() -> putLatch.countDown()));
+    putLatch.await();
+
+    AtomicInteger successCount = new AtomicInteger(0);
+    AtomicInteger successIndex = new AtomicInteger(-1);
+    CountDownLatch deleteLatch = new CountDownLatch(count);
+    IntStream.range(0, count).forEach(i -> table
+        .checkAndDelete(row, FAMILY, QUALIFIER, VALUE,
+          new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
+        .thenAccept(x -> {
+          if (x) {
+            successCount.incrementAndGet();
+            successIndex.set(i);
+          }
+          deleteLatch.countDown();
+        }));
+    deleteLatch.await();
+    assertEquals(1, successCount.get());
+    Result result = table.get(new Get(row)).get();
+    IntStream.range(0, count).forEach(i -> {
+      if (i == successIndex.get()) {
+        assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
+      } else {
+        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
+      }
+    });
+  }
 }
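Editor's note: the tests rely on exactly one of the N concurrent checkAndPut calls succeeding when the expected value is null (the column must be absent). A minimal sketch of how application code might lean on the same one-winner semantics, with hypothetical names and an AsyncTable obtained as in the test:

import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class TaskClaimExample {
  private static final byte[] FAMILY = Bytes.toBytes("f");
  private static final byte[] OWNER = Bytes.toBytes("owner");

  /** Completes with true for at most one of any number of concurrent callers. */
  static CompletableFuture<Boolean> tryClaim(AsyncTable table, byte[] taskRow, String workerId) {
    // A null expected value means "the column must not exist yet", so only the first
    // successful writer observes true; every other caller gets false, as the test asserts.
    return table.checkAndPut(taskRow, FAMILY, OWNER, null,
      new Put(taskRow).addColumn(FAMILY, OWNER, Bytes.toBytes(workerId)));
  }
}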