Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-25242 Add Increment/Append support to RowMutations #2711

Merged
merged 1 commit into from
Nov 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,9 @@ default CompletableFuture<List<CheckAndMutateResult>> checkAndMutateAll(
* Performs multiple mutations atomically on a single row. Currently {@link Put} and
* {@link Delete} are supported.
* @param mutation object that specifies the set of mutations to perform atomically
* @return A {@link CompletableFuture} that always returns null when complete normally.
* @return A {@link CompletableFuture} that returns results of Increment/Append operations
*/
CompletableFuture<Void> mutateRow(RowMutations mutation);
CompletableFuture<Result> mutateRow(RowMutations mutation);

/**
* The scan API uses the observer pattern.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public List<CompletableFuture<CheckAndMutateResult>> checkAndMutate(
}

@Override
public CompletableFuture<Void> mutateRow(RowMutations mutation) {
public CompletableFuture<Result> mutateRow(RowMutations mutation) {
return wrap(rawTable.mutateRow(mutation));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,15 @@
*/
package org.apache.hadoop.hbase.client;

import java.util.Collections;
import java.util.List;
import java.util.NavigableMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import java.util.Arrays;
import java.util.Objects;

/**
* Used to perform CheckAndMutate operations.
Expand Down Expand Up @@ -58,7 +55,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class CheckAndMutate extends Mutation {
public final class CheckAndMutate implements Row {

/**
* A builder class for building a CheckAndMutate object.
Expand Down Expand Up @@ -200,15 +197,15 @@ public CheckAndMutate build(Append append) {
}

/**
* @param mutation mutations to perform if check succeeds
* @param mutations mutations to perform if check succeeds
* @return a CheckAndMutate object
*/
public CheckAndMutate build(RowMutations mutation) {
preCheck(mutation);
public CheckAndMutate build(RowMutations mutations) {
preCheck(mutations);
if (filter != null) {
return new CheckAndMutate(row, filter, timeRange, mutation);
return new CheckAndMutate(row, filter, timeRange, mutations);
} else {
return new CheckAndMutate(row, family, qualifier, op, value, timeRange, mutation);
return new CheckAndMutate(row, family, qualifier, op, value, timeRange, mutations);
}
}
}
Expand All @@ -223,6 +220,7 @@ public static Builder newBuilder(byte[] row) {
return new Builder(row);
}

private final byte[] row;
private final byte[] family;
private final byte[] qualifier;
private final CompareOperator op;
Expand All @@ -233,7 +231,7 @@ public static Builder newBuilder(byte[] row) {

private CheckAndMutate(byte[] row, byte[] family, byte[] qualifier,final CompareOperator op,
byte[] value, TimeRange timeRange, Row action) {
super(row, HConstants.LATEST_TIMESTAMP, Collections.emptyNavigableMap());
this.row = row;
this.family = family;
this.qualifier = qualifier;
this.op = op;
Expand All @@ -244,7 +242,7 @@ private CheckAndMutate(byte[] row, byte[] family, byte[] qualifier,final Compare
}

private CheckAndMutate(byte[] row, Filter filter, TimeRange timeRange, Row action) {
super(row, HConstants.LATEST_TIMESTAMP, Collections.emptyNavigableMap());
this.row = row;
this.family = null;
this.qualifier = null;
this.op = null;
Expand All @@ -254,6 +252,37 @@ private CheckAndMutate(byte[] row, Filter filter, TimeRange timeRange, Row actio
this.action = action;
}

/**
* @return the row
*/
@Override
public byte[] getRow() {
return row;
}

@Override
public int compareTo(Row row) {
return Bytes.compareTo(this.getRow(), row.getRow());
}

// Added to get rid of the stopbugs error
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
Row other = (Row) obj;
return compareTo(other) == 0;
}

@Override
public int hashCode() {
return Bytes.hashCode(this.getRow());
}

/**
* @return the family to check
*/
Expand Down Expand Up @@ -309,76 +338,4 @@ public TimeRange getTimeRange() {
public Row getAction() {
return action;
}

@Override
public NavigableMap<byte[], List<Cell>> getFamilyCellMap() {
if (action instanceof Mutation) {
return ((Mutation) action).getFamilyCellMap();
}
throw new UnsupportedOperationException();
}

@Override
public long getTimestamp() {
if (action instanceof Mutation) {
return ((Mutation) action).getTimestamp();
}
throw new UnsupportedOperationException();
}

@Override
public Mutation setTimestamp(long timestamp) {
if (action instanceof Mutation) {
return ((Mutation) action).setTimestamp(timestamp);
}
throw new UnsupportedOperationException();
}

@Override
public Durability getDurability() {
if (action instanceof Mutation) {
return ((Mutation) action).getDurability();
}
throw new UnsupportedOperationException();
}

@Override
public Mutation setDurability(Durability d) {
if (action instanceof Mutation) {
return ((Mutation) action).setDurability(d);
}
throw new UnsupportedOperationException();
}

@Override
public byte[] getAttribute(String name) {
if (action instanceof Mutation) {
return ((Mutation) action).getAttribute(name);
}
throw new UnsupportedOperationException();
}

@Override
public OperationWithAttributes setAttribute(String name, byte[] value) {
if (action instanceof Mutation) {
return ((Mutation) action).setAttribute(name, value);
}
throw new UnsupportedOperationException();
}

@Override
public int getPriority() {
if (action instanceof Mutation) {
return ((Mutation) action).getPriority();
}
return ((RowMutations) action).getMaxPriority();
}

@Override
public OperationWithAttributes setPriority(int priority) {
if (action instanceof Mutation) {
return ((Mutation) action).setPriority(priority);
}
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ public void put(final List<Put> puts) throws IOException {
}

@Override
public void mutateRow(final RowMutations rm) throws IOException {
public Result mutateRow(final RowMutations rm) throws IOException {
CancellableRegionServerCallable<MultiResponse> callable =
new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(),
rpcControllerFactory.newController(), writeRpcTimeoutMs,
Expand All @@ -578,20 +578,23 @@ protected MultiResponse rpcCall() throws Exception {
return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
}
};
Object[] results = new Object[rm.getMutations().size()];
AsyncProcessTask task = AsyncProcessTask.newBuilder()
.setPool(pool)
.setTableName(tableName)
.setRowAccess(rm.getMutations())
.setCallable(callable)
.setRpcTimeout(writeRpcTimeoutMs)
.setOperationTimeout(operationTimeoutMs)
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
.build();
.setPool(pool)
.setTableName(tableName)
.setRowAccess(rm.getMutations())
.setCallable(callable)
.setRpcTimeout(writeRpcTimeoutMs)
.setOperationTimeout(operationTimeoutMs)
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
.setResults(results)
.build();
AsyncRequestFuture ars = multiAp.submit(task);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
}
return (Result) results[0];
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,17 +551,17 @@ public void run(MultiResponse resp) {
}

@Override
public CompletableFuture<Void> mutateRow(RowMutations mutation) {
return this.<Void> newCaller(mutation.getRow(), mutation.getMaxPriority(), writeRpcTimeoutNs)
.action((controller, loc, stub) ->
this.<Result, Void> mutateRow(controller, loc, stub, mutation,
public CompletableFuture<Result> mutateRow(RowMutations mutations) {
return this.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(),
writeRpcTimeoutNs).action((controller, loc, stub) ->
this.<Result, Result> mutateRow(controller, loc, stub, mutations,
(rn, rm) -> {
RegionAction.Builder regionMutationBuilder = RequestConverter
.buildRegionAction(rn, rm);
regionMutationBuilder.setAtomic(true);
return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build())
.build();
}, resp -> null))
}, resp -> resp))
.call();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

/**
* Performs multiple mutations atomically on a single row.
* Currently {@link Put} and {@link Delete} are supported.
*
* The mutations are performed in the order in which they
* were added.
Expand Down Expand Up @@ -110,15 +109,13 @@ public RowMutations add(Mutation mutation) throws IOException {
}

/**
* Currently only supports {@link Put} and {@link Delete} mutations.
*
* @param mutations The data to send.
* @throws IOException if the row of added mutation doesn't match the original row
*/
public RowMutations add(List<? extends Mutation> mutations) throws IOException {
for (Mutation mutation : mutations) {
if (!Bytes.equals(row, mutation.getRow())) {
throw new WrongRowIOException("The row in the recently added Put/Delete <" +
throw new WrongRowIOException("The row in the recently added Mutation <" +
Bytes.toStringBinary(mutation.getRow()) + "> doesn't match the original one <" +
Bytes.toStringBinary(this.row) + ">");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -644,9 +644,10 @@ default List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndM
* {@link Put} and {@link Delete} are supported.
*
* @param rm object that specifies the set of mutations to perform atomically
* @throws IOException
* @return results of Increment/Append operations
* @throws IOException if a remote or network exception occurs.
*/
default void mutateRow(final RowMutations rm) throws IOException {
default Result mutateRow(final RowMutations rm) throws IOException {
throw new NotImplementedException("Add an implementation!");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3570,9 +3570,13 @@ public static CheckAndMutate toCheckAndMutate(ClientProtos.Condition condition,
return builder.build((Put) m);
} else if (m instanceof Delete) {
return builder.build((Delete) m);
} else if (m instanceof Increment) {
return builder.build((Increment) m);
} else if (m instanceof Append) {
return builder.build((Append) m);
} else {
throw new DoNotRetryIOException("Unsupported mutate type: " + mutations.get(0)
.getClass().getSimpleName().toUpperCase());
throw new DoNotRetryIOException("Unsupported mutate type: " + m.getClass()
.getSimpleName().toUpperCase());
}
} else {
return builder.build(new RowMutations(mutations.get(0).getRow()).add(mutations));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,17 +271,9 @@ public static ClientProtos.MultiRequest buildMutateRequest(final byte[] regionNa
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
for (Mutation mutation: rowMutations.getMutations()) {
MutationType mutateType;
if (mutation instanceof Put) {
mutateType = MutationType.PUT;
} else if (mutation instanceof Delete) {
mutateType = MutationType.DELETE;
} else {
throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
mutation.getClass().getName());
}
mutationBuilder.clear();
MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation, mutationBuilder);
MutationProto mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation,
mutationBuilder);
actionBuilder.clear();
actionBuilder.setMutation(mp);
builder.addAction(actionBuilder.build());
Expand Down Expand Up @@ -387,17 +379,9 @@ public static RegionAction.Builder buildRegionAction(final byte [] regionName,
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
for (Mutation mutation: rowMutations.getMutations()) {
MutationType mutateType = null;
if (mutation instanceof Put) {
mutateType = MutationType.PUT;
} else if (mutation instanceof Delete) {
mutateType = MutationType.DELETE;
} else {
throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
mutation.getClass().getName());
}
mutationBuilder.clear();
MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation, mutationBuilder);
MutationProto mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation,
mutationBuilder);
actionBuilder.clear();
actionBuilder.setMutation(mp);
builder.addAction(actionBuilder.build());
Expand Down Expand Up @@ -928,25 +912,16 @@ private static void buildNoDataRegionAction(final RowMutations rowMutations,
final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder)
throws IOException {
for (Mutation mutation: rowMutations.getMutations()) {
MutationType type;
if (mutation instanceof Put) {
type = MutationType.PUT;
} else if (mutation instanceof Delete) {
type = MutationType.DELETE;
} else {
throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
mutation.getClass().getName());
}
mutationBuilder.clear();
MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation, mutationBuilder);
MutationProto mp = ProtobufUtil.toMutationNoData(getMutationType(mutation), mutation,
mutationBuilder);
cells.add(mutation);
actionBuilder.clear();
regionActionBuilder.addAction(actionBuilder.setMutation(mp).build());
}
}

private static MutationType getMutationType(Mutation mutation) {
assert !(mutation instanceof CheckAndMutate);
if (mutation instanceof Put) {
return MutationType.PUT;
} else if (mutation instanceof Delete) {
Expand Down