Skip to content

Commit

Permalink
HBASE-24996 Support CheckAndMutate in Region.batchMutate()
Browse files Browse the repository at this point in the history
  • Loading branch information
brfrn169 committed Oct 28, 2020
1 parent aff8bbf commit 8099246
Show file tree
Hide file tree
Showing 15 changed files with 1,384 additions and 588 deletions.
Expand Up @@ -17,17 +17,25 @@
*/
package org.apache.hadoop.hbase.client;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilder;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompositeFamilyCellMap;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -202,15 +210,23 @@ public CheckAndMutate build(Append append) {
}

/**
* @param mutation mutations to perform if check succeeds
* @param mutations mutations to perform if check succeeds. It must not have any
* CheckAndMutate objects
* @return a CheckAndMutate object
*/
public CheckAndMutate build(RowMutations mutation) {
preCheck(mutation);
public CheckAndMutate build(RowMutations mutations) {
preCheck(mutations);

boolean hasCheckAndMutate = mutations.getMutations().stream()
.anyMatch(m -> m instanceof CheckAndMutate);
if (hasCheckAndMutate) {
throw new IllegalArgumentException("mutations must not have any CheckAndMutate objects");
}

if (filter != null) {
return new CheckAndMutate(row, filter, timeRange, mutation);
return new CheckAndMutate(row, filter, timeRange, mutations);
} else {
return new CheckAndMutate(row, family, qualifier, op, value, timeRange, mutation);
return new CheckAndMutate(row, family, qualifier, op, value, timeRange, mutations);
}
}
}
Expand All @@ -233,6 +249,9 @@ public static Builder newBuilder(byte[] row) {
private final TimeRange timeRange;
private final Row action;

private boolean initFamilyMap;
private boolean initDurability;

private CheckAndMutate(byte[] row, byte[] family, byte[] qualifier,final CompareOperator op,
byte[] value, TimeRange timeRange, Row action) {
super(row, HConstants.LATEST_TIMESTAMP, Collections.emptyNavigableMap());
Expand Down Expand Up @@ -312,83 +331,178 @@ public Row getAction() {
return action;
}

/**
* @return mutations executed if the condition matches
*/
public List<Mutation> getMutations() {
if (action instanceof Mutation) {
return Collections.singletonList((Mutation) action);
}
return ((RowMutations) action).getMutations();
}

/**
* @return a composite read-only familyCellMap for all the mutations
*/
@Override
public NavigableMap<byte[], List<Cell>> getFamilyCellMap() {
if (action instanceof Mutation) {
return ((Mutation) action).getFamilyCellMap();
if (initFamilyMap) {
return super.getFamilyCellMap();
}
throw new UnsupportedOperationException();
initFamilyMap = true;
familyMap = new CompositeFamilyCellMap(getMutations().stream()
.map(Mutation::getFamilyCellMap)
.collect(Collectors.toList()));
return super.getFamilyCellMap();
}

@Override
public CellBuilder getCellBuilder(CellBuilderType cellBuilderType) {
if (action instanceof Mutation) {
return ((Mutation) action).getCellBuilder();
}
throw new UnsupportedOperationException();
throw new UnsupportedOperationException("Please call this method of the individual mutations");
}

@Override
public long getTimestamp() {
if (action instanceof Mutation) {
return ((Mutation) action).getTimestamp();
}
throw new UnsupportedOperationException();
throw new UnsupportedOperationException("Please call this method of the individual mutations");
}

@Override
public Mutation setTimestamp(long timestamp) {
if (action instanceof Mutation) {
return ((Mutation) action).setTimestamp(timestamp);
}
throw new UnsupportedOperationException();
public CheckAndMutate setTimestamp(long timestamp) {
throw new UnsupportedOperationException("Please call this method of the individual mutations");
}

/**
* @return the highest durability of all the mutations
*/
@Override
public Durability getDurability() {
if (action instanceof Mutation) {
return ((Mutation) action).getDurability();
if (initDurability) {
return super.getDurability();
}
throw new UnsupportedOperationException();
initDurability = true;
for (Mutation mutation : getMutations()) {
Durability tmpDur = mutation.getDurability();
if (tmpDur.ordinal() > super.getDurability().ordinal()) {
super.setDurability(tmpDur);
}
}
return super.getDurability();
}

@Override
public Mutation setDurability(Durability d) {
if (action instanceof Mutation) {
return ((Mutation) action).setDurability(d);
}
throw new UnsupportedOperationException();
public CheckAndMutate setDurability(Durability d) {
throw new UnsupportedOperationException("Please call this method of the individual mutations");
}

/**
* @return the highest priority of all the mutations
*/
@Override
public byte[] getAttribute(String name) {
public int getPriority() {
if (action instanceof Mutation) {
return ((Mutation) action).getAttribute(name);
return ((Mutation) action).getPriority();
}
throw new UnsupportedOperationException();
return ((RowMutations) action).getMaxPriority();
}

@Override
public OperationWithAttributes setAttribute(String name, byte[] value) {
if (action instanceof Mutation) {
return ((Mutation) action).setAttribute(name, value);
}
throw new UnsupportedOperationException();
public CheckAndMutate setPriority(int priority) {
throw new UnsupportedOperationException("Please call this method of the individual mutations");
}

@Override
public int getPriority() {
if (action instanceof Mutation) {
return ((Mutation) action).getPriority();
public CheckAndMutate setAttribute(String name, byte[] value) {
return (CheckAndMutate) super.setAttribute(name, value);
}

@Override
public String getId() {
throw new UnsupportedOperationException("Please call this method of the individual mutations");
}

@Override
public CheckAndMutate setId(String id) {
throw new UnsupportedOperationException("Please call this method of the individual mutations");
}

@Override
public List<UUID> getClusterIds() {
throw new UnsupportedOperationException("Please call this method of the individual mutations");
}

@Override
public CheckAndMutate setClusterIds(List<UUID> clusterIds) {
throw new UnsupportedOperationException("Please call this method of the individual mutations");
}

@Override
public CellVisibility getCellVisibility() {
throw new UnsupportedOperationException("Please call this method of the individual mutations");
}

@Override
public CheckAndMutate setCellVisibility(CellVisibility expression) {
throw new UnsupportedOperationException("Please call this method of the individual mutations");
}

@Override
public byte[] getACL() {
throw new UnsupportedOperationException("Please call this method of the individual mutations");
}

@Override
public CheckAndMutate setACL(String user, Permission perms) {
throw new UnsupportedOperationException("Please call this method of the individual mutations");
}

@Override
public CheckAndMutate setACL(Map<String, Permission> perms) {
throw new UnsupportedOperationException("Please call this method of the individual mutations");
}

@Override
public long getTTL() {
throw new UnsupportedOperationException("Please call this method of the individual mutations");
}

@Override
public CheckAndMutate setTTL(long ttl) {
throw new UnsupportedOperationException("Please call this method of the individual mutations");
}

@Override
public long heapSize() {
return getMutations().stream().mapToLong(Mutation::heapSize).sum();
}

@Override
public Map<String, Object> getFingerprint() {
Map<String, Object> ret = new HashMap<>();
List<Object> mutations = new ArrayList<>();
ret.put("mutations", mutations);
for (Mutation mutation : getMutations()) {
mutations.add(mutation.getFingerprint());
}
return ((RowMutations) action).getMaxPriority();
return ret;
}

@Override
public OperationWithAttributes setPriority(int priority) {
if (action instanceof Mutation) {
return ((Mutation) action).setPriority(priority);
public Map<String, Object> toMap(int maxCols) {
Map<String, Object> ret = new HashMap<>();
List<Object> mutations = new ArrayList<>();
ret.put("row", Bytes.toStringBinary(row));
if (filter != null) {
ret.put("filter", filter.toString());
} else {
ret.put("family", Bytes.toStringBinary(family));
ret.put("qualifier", Bytes.toStringBinary(qualifier));
ret.put("op", op);
ret.put("value", Bytes.toStringBinary(value));
}
ret.put("mutations", mutations);
for (Mutation mutation : getMutations()) {
mutations.add(mutation.toMap(maxCols));
}
throw new UnsupportedOperationException();
return ret;
}
}
Expand Up @@ -29,7 +29,6 @@

/**
* Performs multiple mutations atomically on a single row.
* Currently {@link Put} and {@link Delete} are supported.
*
* The mutations are performed in the order in which they
* were added.
Expand Down Expand Up @@ -75,8 +74,6 @@ public RowMutations(byte [] row, int initialCapacity) {
}

/**
* Currently only supports {@link Put} and {@link Delete} mutations.
*
* @param mutation The data to send.
* @throws IOException if the row of added mutation doesn't match the original row
*/
Expand All @@ -85,15 +82,13 @@ public RowMutations add(Mutation mutation) throws IOException {
}

/**
* Currently only supports {@link Put} and {@link Delete} mutations.
*
* @param mutations The data to send.
* @throws IOException if the row of added mutation doesn't match the original row
*/
public RowMutations add(List<? extends Mutation> mutations) throws IOException {
for (Mutation mutation : mutations) {
if (!Bytes.equals(row, mutation.getRow())) {
throw new WrongRowIOException("The row in the recently added Put/Delete <" +
throw new WrongRowIOException("The row in the recently added Mutation <" +
Bytes.toStringBinary(mutation.getRow()) + "> doesn't match the original one <" +
Bytes.toStringBinary(this.row) + ">");
}
Expand Down

0 comments on commit 8099246

Please sign in to comment.