Skip to content

Commit

Permalink
HBASE-25242 Add Increment/Append support to RowMutations
Browse files Browse the repository at this point in the history
  • Loading branch information
brfrn169 committed Nov 7, 2020
1 parent 671129d commit d086fea
Show file tree
Hide file tree
Showing 27 changed files with 1,005 additions and 315 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,9 @@ default CompletableFuture<List<CheckAndMutateResult>> checkAndMutateAll(
* Performs multiple mutations atomically on a single row. Currently {@link Put} and
* {@link Delete} are supported.
* @param mutation object that specifies the set of mutations to perform atomically
* @return A {@link CompletableFuture} that always returns null when complete normally.
* @return A {@link CompletableFuture} that returns results of Increment/Append operations
*/
CompletableFuture<Void> mutateRow(RowMutations mutation);
CompletableFuture<Result> mutateRow(RowMutations mutation);

/**
* The scan API uses the observer pattern.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public List<CompletableFuture<CheckAndMutateResult>> checkAndMutate(
}

@Override
public CompletableFuture<Void> mutateRow(RowMutations mutation) {
public CompletableFuture<Result> mutateRow(RowMutations mutation) {
return wrap(rawTable.mutateRow(mutation));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,7 @@
*/
package org.apache.hadoop.hbase.client;

import java.util.Collections;
import java.util.List;
import java.util.NavigableMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilder;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -60,7 +53,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class CheckAndMutate extends Mutation {
public final class CheckAndMutate implements Row {

/**
* A builder class for building a CheckAndMutate object.
Expand Down Expand Up @@ -202,15 +195,15 @@ public CheckAndMutate build(Append append) {
}

/**
* @param mutation mutations to perform if check succeeds
* @param mutations mutations to perform if check succeeds
* @return a CheckAndMutate object
*/
public CheckAndMutate build(RowMutations mutation) {
preCheck(mutation);
public CheckAndMutate build(RowMutations mutations) {
preCheck(mutations);
if (filter != null) {
return new CheckAndMutate(row, filter, timeRange, mutation);
return new CheckAndMutate(row, filter, timeRange, mutations);
} else {
return new CheckAndMutate(row, family, qualifier, op, value, timeRange, mutation);
return new CheckAndMutate(row, family, qualifier, op, value, timeRange, mutations);
}
}
}
Expand All @@ -225,6 +218,7 @@ public static Builder newBuilder(byte[] row) {
return new Builder(row);
}

private final byte[] row;
private final byte[] family;
private final byte[] qualifier;
private final CompareOperator op;
Expand All @@ -235,7 +229,7 @@ public static Builder newBuilder(byte[] row) {

private CheckAndMutate(byte[] row, byte[] family, byte[] qualifier,final CompareOperator op,
byte[] value, TimeRange timeRange, Row action) {
super(row, HConstants.LATEST_TIMESTAMP, Collections.emptyNavigableMap());
this.row = row;
this.family = family;
this.qualifier = qualifier;
this.op = op;
Expand All @@ -246,7 +240,7 @@ private CheckAndMutate(byte[] row, byte[] family, byte[] qualifier,final Compare
}

private CheckAndMutate(byte[] row, Filter filter, TimeRange timeRange, Row action) {
super(row, HConstants.LATEST_TIMESTAMP, Collections.emptyNavigableMap());
this.row = row;
this.family = null;
this.qualifier = null;
this.op = null;
Expand All @@ -256,6 +250,14 @@ private CheckAndMutate(byte[] row, Filter filter, TimeRange timeRange, Row actio
this.action = action;
}

/**
* @return the row
*/
@Override
public byte[] getRow() {
return row;
}

/**
* @return the family to check
*/
Expand Down Expand Up @@ -311,84 +313,4 @@ public TimeRange getTimeRange() {
public Row getAction() {
return action;
}

@Override
public NavigableMap<byte[], List<Cell>> getFamilyCellMap() {
if (action instanceof Mutation) {
return ((Mutation) action).getFamilyCellMap();
}
throw new UnsupportedOperationException();
}

@Override
public CellBuilder getCellBuilder(CellBuilderType cellBuilderType) {
if (action instanceof Mutation) {
return ((Mutation) action).getCellBuilder();
}
throw new UnsupportedOperationException();
}

@Override
public long getTimestamp() {
if (action instanceof Mutation) {
return ((Mutation) action).getTimestamp();
}
throw new UnsupportedOperationException();
}

@Override
public Mutation setTimestamp(long timestamp) {
if (action instanceof Mutation) {
return ((Mutation) action).setTimestamp(timestamp);
}
throw new UnsupportedOperationException();
}

@Override
public Durability getDurability() {
if (action instanceof Mutation) {
return ((Mutation) action).getDurability();
}
throw new UnsupportedOperationException();
}

@Override
public Mutation setDurability(Durability d) {
if (action instanceof Mutation) {
return ((Mutation) action).setDurability(d);
}
throw new UnsupportedOperationException();
}

@Override
public byte[] getAttribute(String name) {
if (action instanceof Mutation) {
return ((Mutation) action).getAttribute(name);
}
throw new UnsupportedOperationException();
}

@Override
public OperationWithAttributes setAttribute(String name, byte[] value) {
if (action instanceof Mutation) {
return ((Mutation) action).setAttribute(name, value);
}
throw new UnsupportedOperationException();
}

@Override
public int getPriority() {
if (action instanceof Mutation) {
return ((Mutation) action).getPriority();
}
return ((RowMutations) action).getMaxPriority();
}

@Override
public OperationWithAttributes setPriority(int priority) {
if (action instanceof Mutation) {
return ((Mutation) action).setPriority(priority);
}
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -513,17 +513,17 @@ public void run(MultiResponse resp) {
}

@Override
public CompletableFuture<Void> mutateRow(RowMutations mutation) {
return this.<Void> newCaller(mutation.getRow(), mutation.getMaxPriority(), writeRpcTimeoutNs)
.action((controller, loc, stub) ->
this.<Result, Void> mutateRow(controller, loc, stub, mutation,
public CompletableFuture<Result> mutateRow(RowMutations mutations) {
return this.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(),
writeRpcTimeoutNs).action((controller, loc, stub) ->
this.<Result, Result> mutateRow(controller, loc, stub, mutations,
(rn, rm) -> {
RegionAction.Builder regionMutationBuilder = RequestConverter
.buildRegionAction(rn, rm);
regionMutationBuilder.setAtomic(true);
return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build())
.build();
}, resp -> null))
}, resp -> resp))
.call();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

/**
* Performs multiple mutations atomically on a single row.
* Currently {@link Put} and {@link Delete} are supported.
*
* The mutations are performed in the order in which they
* were added.
Expand Down Expand Up @@ -75,8 +74,6 @@ public RowMutations(byte [] row, int initialCapacity) {
}

/**
* Currently only supports {@link Put} and {@link Delete} mutations.
*
* @param mutation The data to send.
* @throws IOException if the row of added mutation doesn't match the original row
*/
Expand All @@ -85,15 +82,13 @@ public RowMutations add(Mutation mutation) throws IOException {
}

/**
* Currently only supports {@link Put} and {@link Delete} mutations.
*
* @param mutations The data to send.
* @throws IOException if the row of added mutation doesn't match the original row
*/
public RowMutations add(List<? extends Mutation> mutations) throws IOException {
for (Mutation mutation : mutations) {
if (!Bytes.equals(row, mutation.getRow())) {
throw new WrongRowIOException("The row in the recently added Put/Delete <" +
throw new WrongRowIOException("The row in the recently added Mutation <" +
Bytes.toStringBinary(mutation.getRow()) + "> doesn't match the original one <" +
Bytes.toStringBinary(this.row) + ">");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,9 +459,10 @@ default List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndM
* {@link Put} and {@link Delete} are supported.
*
* @param rm object that specifies the set of mutations to perform atomically
* @return results of Increment/Append operations
* @throws IOException
*/
default void mutateRow(final RowMutations rm) throws IOException {
default Result mutateRow(final RowMutations rm) throws IOException {
throw new NotImplementedException("Add an implementation!");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMu
}

@Override
public void mutateRow(RowMutations rm) throws IOException {
FutureUtils.get(table.mutateRow(rm));
public Result mutateRow(RowMutations rm) throws IOException {
return FutureUtils.get(table.mutateRow(rm));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3657,9 +3657,10 @@ public static CheckAndMutate toCheckAndMutate(ClientProtos.Condition condition,
return builder.build((Put) m);
} else if (m instanceof Delete) {
return builder.build((Delete) m);
} else if (m instanceof Increment) {
return builder.build((Increment) m);
} else {
throw new DoNotRetryIOException("Unsupported mutate type: " + mutations.get(0)
.getClass().getSimpleName().toUpperCase());
return builder.build((Append) m);
}
} else {
return builder.build(new RowMutations(mutations.get(0).getRow()).add(mutations));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,17 +227,9 @@ public static ClientProtos.MultiRequest buildMutateRequest(final byte[] regionNa
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
for (Mutation mutation: rowMutations.getMutations()) {
MutationType mutateType;
if (mutation instanceof Put) {
mutateType = MutationType.PUT;
} else if (mutation instanceof Delete) {
mutateType = MutationType.DELETE;
} else {
throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
mutation.getClass().getName());
}
mutationBuilder.clear();
MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation, mutationBuilder);
MutationProto mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation,
mutationBuilder);
actionBuilder.clear();
actionBuilder.setMutation(mp);
builder.addAction(actionBuilder.build());
Expand Down Expand Up @@ -343,17 +335,9 @@ public static RegionAction.Builder buildRegionAction(final byte [] regionName,
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
for (Mutation mutation: rowMutations.getMutations()) {
MutationType mutateType = null;
if (mutation instanceof Put) {
mutateType = MutationType.PUT;
} else if (mutation instanceof Delete) {
mutateType = MutationType.DELETE;
} else {
throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
mutation.getClass().getName());
}
mutationBuilder.clear();
MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation, mutationBuilder);
MutationProto mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation,
mutationBuilder);
actionBuilder.clear();
actionBuilder.setMutation(mp);
builder.addAction(actionBuilder.build());
Expand Down Expand Up @@ -705,25 +689,16 @@ private static void buildNoDataRegionAction(final RowMutations rowMutations,
final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder)
throws IOException {
for (Mutation mutation: rowMutations.getMutations()) {
MutationType type;
if (mutation instanceof Put) {
type = MutationType.PUT;
} else if (mutation instanceof Delete) {
type = MutationType.DELETE;
} else {
throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
mutation.getClass().getName());
}
mutationBuilder.clear();
MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation, mutationBuilder);
MutationProto mp = ProtobufUtil.toMutationNoData(getMutationType(mutation), mutation,
mutationBuilder);
cells.add(mutation);
actionBuilder.clear();
regionActionBuilder.addAction(actionBuilder.setMutation(mp).build());
}
}

private static MutationType getMutationType(Mutation mutation) {
assert !(mutation instanceof CheckAndMutate);
if (mutation instanceof Put) {
return MutationType.PUT;
} else if (mutation instanceof Delete) {
Expand Down

0 comments on commit d086fea

Please sign in to comment.