Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
} else {
action = new Action(rawAction, i);
}
if (rawAction instanceof Append || rawAction instanceof Increment) {
if (hasIncrementOrAppend(rawAction)) {
action.setNonce(conn.getNonceGenerator().newNonce());
}
this.actions.add(action);
Expand All @@ -184,6 +184,26 @@ public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
this.startNs = System.nanoTime();
}

private static boolean hasIncrementOrAppend(Row action) {
if (action instanceof Append || action instanceof Increment) {
return true;
} else if (action instanceof RowMutations) {
return hasIncrementOrAppend((RowMutations) action);
} else if (action instanceof CheckAndMutate) {
return hasIncrementOrAppend(((CheckAndMutate) action).getAction());
}
return false;
}

private static boolean hasIncrementOrAppend(RowMutations mutations) {
for (Mutation mutation : mutations.getMutations()) {
if (mutation instanceof Append || mutation instanceof Increment) {
return true;
}
}
return false;
}

private long remainingTimeNs() {
return operationTimeoutNs - (System.nanoTime() - startNs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;

/**
* The implementation of RawAsyncTable.
Expand Down Expand Up @@ -324,7 +323,7 @@ public CompletableFuture<Boolean> thenPut(Put put) {
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
null, timeRange, p),
null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call();
}
Expand All @@ -336,7 +335,7 @@ public CompletableFuture<Boolean> thenDelete(Delete delete) {
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
null, timeRange, d),
null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call();
}
Expand All @@ -349,8 +348,9 @@ public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
loc, stub, mutation,
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
null, timeRange, rm), CheckAndMutateResult::isSuccess))
(rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value,
null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
CheckAndMutateResult::isSuccess))
.call();
}
}
Expand Down Expand Up @@ -387,7 +387,7 @@ public CompletableFuture<Boolean> thenPut(Put put) {
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
filter, timeRange, p),
filter, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call();
}
Expand All @@ -398,7 +398,7 @@ public CompletableFuture<Boolean> thenDelete(Delete delete) {
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
filter, timeRange, d),
filter, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call();
}
Expand All @@ -410,8 +410,9 @@ public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
loc, stub, mutation,
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
filter, timeRange, rm), CheckAndMutateResult::isSuccess))
(rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null,
filter, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
CheckAndMutateResult::isSuccess))
.call();
}
}
Expand All @@ -430,28 +431,32 @@ public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate che
if (mutation instanceof Put) {
validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize());
}
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
return RawAsyncTableImpl.this.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(),
mutation.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
loc, stub, mutation,
(rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
checkAndMutate.getTimeRange(), m),
checkAndMutate.getTimeRange(), m, nonceGroup, nonce),
(c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner())))
.call();
} else if (checkAndMutate.getAction() instanceof RowMutations) {
RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize());
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
return RawAsyncTableImpl.this.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(),
rowMutations.getMaxPriority(), rpcTimeoutNs)
.action((controller, loc, stub) ->
RawAsyncTableImpl.this.<CheckAndMutateResult, CheckAndMutateResult> mutateRow(
controller, loc, stub, rowMutations,
(rn, rm) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
(rn, rm) -> RequestConverter.buildMultiRequest(rn, checkAndMutate.getRow(),
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
checkAndMutate.getTimeRange(), rm),
checkAndMutate.getTimeRange(), rm, nonceGroup, nonce),
resp -> resp))
.call();
} else {
Expand Down Expand Up @@ -516,16 +521,13 @@ public void run(MultiResponse resp) {
@Override
public CompletableFuture<Result> mutateRow(RowMutations mutations) {
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
return this.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(),
writeRpcTimeoutNs).action((controller, loc, stub) ->
this.<Result, Result> mutateRow(controller, loc, stub, mutations,
(rn, rm) -> {
RegionAction.Builder regionMutationBuilder = RequestConverter
.buildRegionAction(rn, rm);
regionMutationBuilder.setAtomic(true);
return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build())
.build();
}, resp -> resp))
(rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce),
resp -> resp))
.call();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,39 +203,78 @@ public static GetRequest buildGetRequest(final byte[] regionName,
*/
public static MutateRequest buildMutateRequest(final byte[] regionName, final byte[] row,
final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value,
final Filter filter, final TimeRange timeRange, final Mutation mutation) throws IOException {
return MutateRequest.newBuilder()
.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName))
.setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation))
final Filter filter, final TimeRange timeRange, final Mutation mutation, long nonceGroup,
long nonce) throws IOException {
MutateRequest.Builder builder = MutateRequest.newBuilder();
if (mutation instanceof Increment || mutation instanceof Append) {
builder.setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation, nonce))
.setNonceGroup(nonceGroup);
} else {
builder.setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation));
}
return builder.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName))
.setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange))
.build();
}

/**
* Create a protocol buffer MutateRequest for conditioned row mutations
* Create a protocol buffer MultiRequest for conditioned row mutations
*
* @return a mutate request
* @return a multi request
* @throws IOException
*/
public static ClientProtos.MultiRequest buildMutateRequest(final byte[] regionName,
public static ClientProtos.MultiRequest buildMultiRequest(final byte[] regionName,
final byte[] row, final byte[] family, final byte[] qualifier,
final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange,
final RowMutations rowMutations) throws IOException {
final RowMutations rowMutations, long nonceGroup, long nonce) throws IOException {
return buildMultiRequest(regionName, rowMutations, buildCondition(row, family, qualifier, op,
value, filter, timeRange), nonceGroup, nonce);
}

/**
* Create a protocol buffer MultiRequest for row mutations
*
* @return a multi request
*/
public static ClientProtos.MultiRequest buildMultiRequest(final byte[] regionName,
final RowMutations rowMutations, long nonceGroup, long nonce) throws IOException {
return buildMultiRequest(regionName, rowMutations, null, nonceGroup, nonce);
}

private static ClientProtos.MultiRequest buildMultiRequest(final byte[] regionName,
final RowMutations rowMutations, final Condition condition, long nonceGroup, long nonce)
throws IOException {
RegionAction.Builder builder =
getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
builder.setAtomic(true);

boolean hasNonce = false;
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
for (Mutation mutation: rowMutations.getMutations()) {
mutationBuilder.clear();
MutationProto mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation,
mutationBuilder);
MutationProto mp;
if (mutation instanceof Increment || mutation instanceof Append) {
mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation, mutationBuilder, nonce);
hasNonce = true;
} else {
mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation, mutationBuilder);
}
actionBuilder.clear();
actionBuilder.setMutation(mp);
builder.addAction(actionBuilder.build());
}
return ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.setCondition(
buildCondition(row, family, qualifier, op, value, filter, timeRange)).build()).build();

if (condition != null) {
builder.setCondition(condition);
}

MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();
if (hasNonce) {
multiRequestBuilder.setNonceGroup(nonceGroup);
}

return multiRequestBuilder.addRegionAction(builder.build()).build();
}

/**
Expand Down Expand Up @@ -318,33 +357,6 @@ public static MutateRequest buildMutateRequest(
return builder.build();
}

/**
* Create a protocol buffer MultiRequest for row mutations.
* Does not propagate Action absolute position. Does not set atomic action on the created
* RegionAtomic. Caller should do that if wanted.
* @param regionName
* @param rowMutations
* @return a data-laden RegionMutation.Builder
* @throws IOException
*/
public static RegionAction.Builder buildRegionAction(final byte [] regionName,
final RowMutations rowMutations)
throws IOException {
RegionAction.Builder builder =
getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
for (Mutation mutation: rowMutations.getMutations()) {
mutationBuilder.clear();
MutationProto mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation,
mutationBuilder);
actionBuilder.clear();
actionBuilder.setMutation(mp);
builder.addAction(actionBuilder.build());
}
return builder;
}

public static RegionAction.Builder getRegionActionBuilderWithRegion(
final RegionAction.Builder regionActionBuilder, final byte [] regionName) {
RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
Expand Down Expand Up @@ -562,9 +574,6 @@ public static void buildNoDataRegionActions(final byte[] regionName,
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
}
}
if (!multiRequestBuilder.hasNonceGroup() && hasNonce) {
multiRequestBuilder.setNonceGroup(nonceGroup);
}
if (builder.getActionCount() > 0) {
multiRequestBuilder.addRegionAction(builder.build());
}
Expand All @@ -578,8 +587,11 @@ public static void buildNoDataRegionActions(final byte[] regionName,
builder.clear();
getRegionActionBuilderWithRegion(builder, regionName);

buildNoDataRegionAction((RowMutations) action.getAction(), cells, builder, actionBuilder,
mutationBuilder);
boolean hasIncrementOrAppend = buildNoDataRegionAction((RowMutations) action.getAction(),
cells, action.getNonce(), builder, actionBuilder, mutationBuilder);
if (hasIncrementOrAppend) {
hasNonce = true;
}
builder.setAtomic(true);

multiRequestBuilder.addRegionAction(builder.build());
Expand Down Expand Up @@ -613,16 +625,21 @@ public static void buildNoDataRegionActions(final byte[] regionName,
} else if (cam.getAction() instanceof Increment) {
actionBuilder.clear();
mutationBuilder.clear();
buildNoDataRegionAction((Increment) cam.getAction(), cells, HConstants.NO_NONCE, builder,
buildNoDataRegionAction((Increment) cam.getAction(), cells, action.getNonce(), builder,
actionBuilder, mutationBuilder);
hasNonce = true;
} else if (cam.getAction() instanceof Append) {
actionBuilder.clear();
mutationBuilder.clear();
buildNoDataRegionAction((Append) cam.getAction(), cells, HConstants.NO_NONCE, builder,
buildNoDataRegionAction((Append) cam.getAction(), cells, action.getNonce(), builder,
actionBuilder, mutationBuilder);
hasNonce = true;
} else if (cam.getAction() instanceof RowMutations) {
buildNoDataRegionAction((RowMutations) cam.getAction(), cells, builder, actionBuilder,
mutationBuilder);
boolean hasIncrementOrAppend = buildNoDataRegionAction((RowMutations) cam.getAction(),
cells, action.getNonce(), builder, actionBuilder, mutationBuilder);
if (hasIncrementOrAppend) {
hasNonce = true;
}
builder.setAtomic(true);
} else {
throw new DoNotRetryIOException("CheckAndMutate doesn't support " +
Expand All @@ -635,6 +652,10 @@ public static void buildNoDataRegionActions(final byte[] regionName,
// in the overall multiRequest.
indexMap.put(multiRequestBuilder.getRegionActionCount() - 1, action.getOriginalIndex());
}

if (!multiRequestBuilder.hasNonceGroup() && hasNonce) {
multiRequestBuilder.setNonceGroup(nonceGroup);
}
}

private static void buildNoDataRegionAction(final Put put, final List<CellScannable> cells,
Expand Down Expand Up @@ -684,18 +705,29 @@ private static void buildNoDataRegionAction(final Append append,
MutationType.APPEND, append, mutationBuilder, nonce)));
}

private static void buildNoDataRegionAction(final RowMutations rowMutations,
final List<CellScannable> cells, final RegionAction.Builder regionActionBuilder,
/**
* @return whether or not the rowMutations has a Increment or Append
*/
private static boolean buildNoDataRegionAction(final RowMutations rowMutations,
final List<CellScannable> cells, long nonce, final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder)
throws IOException {
boolean ret = false;
for (Mutation mutation: rowMutations.getMutations()) {
mutationBuilder.clear();
MutationProto mp = ProtobufUtil.toMutationNoData(getMutationType(mutation), mutation,
mutationBuilder);
MutationProto mp;
if (mutation instanceof Increment || mutation instanceof Append) {
mp = ProtobufUtil.toMutationNoData(getMutationType(mutation), mutation, mutationBuilder,
nonce);
ret = true;
} else {
mp = ProtobufUtil.toMutationNoData(getMutationType(mutation), mutation, mutationBuilder);
}
cells.add(mutation);
actionBuilder.clear();
regionActionBuilder.addAction(actionBuilder.setMutation(mp).build());
}
return ret;
}

private static MutationType getMutationType(Mutation mutation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1551,7 +1551,7 @@ default List<Pair<Cell, Cell>> postAppendBeforeWAL(
List<Pair<Cell, Cell>> resultPairs = new ArrayList<>(cellPairs.size());
for (Pair<Cell, Cell> pair : cellPairs) {
resultPairs.add(new Pair<>(pair.getFirst(),
postMutationBeforeWAL(ctx, MutationType.INCREMENT, mutation, pair.getFirst(),
postMutationBeforeWAL(ctx, MutationType.APPEND, mutation, pair.getFirst(),
pair.getSecond())));
}
return resultPairs;
Expand Down
Loading