From 80992464248fa3a42d358c8e3570805ddf8ab601 Mon Sep 17 00:00:00 2001 From: Toshihiro Suzuki Date: Wed, 28 Oct 2020 17:29:35 +0900 Subject: [PATCH] HBASE-24996 Support CheckAndMutate in Region.batchMutate() --- .../hadoop/hbase/client/CheckAndMutate.java | 204 +++++-- .../hadoop/hbase/client/RowMutations.java | 7 +- .../hbase/util/CompositeFamilyCellMap.java | 314 +++++++++++ .../hbase/coprocessor/RegionObserver.java | 19 +- .../hadoop/hbase/regionserver/HRegion.java | 523 ++++++++++-------- .../MiniBatchOperationInProgress.java | 1 + .../hbase/regionserver/OperationStatus.java | 28 +- .../hbase/regionserver/RSRpcServices.java | 24 +- .../hadoop/hbase/regionserver/Region.java | 10 +- .../security/access/AccessController.java | 272 ++++----- .../visibility/VisibilityController.java | 145 ++--- .../hbase/util/CollectionBackedScanner.java | 1 + .../coprocessor/SimpleRegionObserver.java | 12 + .../TestRegionObserverInterface.java | 164 ++++-- .../hbase/regionserver/TestHRegion.java | 248 +++++++++ 15 files changed, 1384 insertions(+), 588 deletions(-) create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/util/CompositeFamilyCellMap.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java index f7d846b44c7a..3051afee9467 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java @@ -17,9 +17,14 @@ */ package org.apache.hadoop.hbase.client; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.NavigableMap; +import java.util.UUID; +import java.util.stream.Collectors; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellBuilder; import org.apache.hadoop.hbase.CellBuilderType; @@ -27,7 +32,10 @@ import 
org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; +import org.apache.hadoop.hbase.security.access.Permission; +import org.apache.hadoop.hbase.security.visibility.CellVisibility; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CompositeFamilyCellMap; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; @@ -202,15 +210,23 @@ public CheckAndMutate build(Append append) { } /** - * @param mutation mutations to perform if check succeeds + * @param mutations mutations to perform if check succeeds. It must not have any + * CheckAndMutate objects * @return a CheckAndMutate object */ - public CheckAndMutate build(RowMutations mutation) { - preCheck(mutation); + public CheckAndMutate build(RowMutations mutations) { + preCheck(mutations); + + boolean hasCheckAndMutate = mutations.getMutations().stream() + .anyMatch(m -> m instanceof CheckAndMutate); + if (hasCheckAndMutate) { + throw new IllegalArgumentException("mutations must not have any CheckAndMutate objects"); + } + if (filter != null) { - return new CheckAndMutate(row, filter, timeRange, mutation); + return new CheckAndMutate(row, filter, timeRange, mutations); } else { - return new CheckAndMutate(row, family, qualifier, op, value, timeRange, mutation); + return new CheckAndMutate(row, family, qualifier, op, value, timeRange, mutations); } } } @@ -233,6 +249,9 @@ public static Builder newBuilder(byte[] row) { private final TimeRange timeRange; private final Row action; + private boolean initFamilyMap; + private boolean initDurability; + private CheckAndMutate(byte[] row, byte[] family, byte[] qualifier,final CompareOperator op, byte[] value, TimeRange timeRange, Row action) { super(row, HConstants.LATEST_TIMESTAMP, Collections.emptyNavigableMap()); @@ -312,83 +331,178 @@ public Row 
getAction() { return action; } + /** + * @return mutations executed if the condition matches + */ + public List getMutations() { + if (action instanceof Mutation) { + return Collections.singletonList((Mutation) action); + } + return ((RowMutations) action).getMutations(); + } + + /** + * @return a composite read-only familyCellMap for all the mutations + */ @Override public NavigableMap> getFamilyCellMap() { - if (action instanceof Mutation) { - return ((Mutation) action).getFamilyCellMap(); + if (initFamilyMap) { + return super.getFamilyCellMap(); } - throw new UnsupportedOperationException(); + initFamilyMap = true; + familyMap = new CompositeFamilyCellMap(getMutations().stream() + .map(Mutation::getFamilyCellMap) + .collect(Collectors.toList())); + return super.getFamilyCellMap(); } @Override public CellBuilder getCellBuilder(CellBuilderType cellBuilderType) { - if (action instanceof Mutation) { - return ((Mutation) action).getCellBuilder(); - } - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("Please call this method of the individual mutations"); } @Override public long getTimestamp() { - if (action instanceof Mutation) { - return ((Mutation) action).getTimestamp(); - } - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("Please call this method of the individual mutations"); } @Override - public Mutation setTimestamp(long timestamp) { - if (action instanceof Mutation) { - return ((Mutation) action).setTimestamp(timestamp); - } - throw new UnsupportedOperationException(); + public CheckAndMutate setTimestamp(long timestamp) { + throw new UnsupportedOperationException("Please call this method of the individual mutations"); } + /** + * @return the highest durability of all the mutations + */ @Override public Durability getDurability() { - if (action instanceof Mutation) { - return ((Mutation) action).getDurability(); + if (initDurability) { + return super.getDurability(); } - throw new 
UnsupportedOperationException(); + initDurability = true; + for (Mutation mutation : getMutations()) { + Durability tmpDur = mutation.getDurability(); + if (tmpDur.ordinal() > super.getDurability().ordinal()) { + super.setDurability(tmpDur); + } + } + return super.getDurability(); } @Override - public Mutation setDurability(Durability d) { - if (action instanceof Mutation) { - return ((Mutation) action).setDurability(d); - } - throw new UnsupportedOperationException(); + public CheckAndMutate setDurability(Durability d) { + throw new UnsupportedOperationException("Please call this method of the individual mutations"); } + /** + * @return the highest priority of all the mutations + */ @Override - public byte[] getAttribute(String name) { + public int getPriority() { if (action instanceof Mutation) { - return ((Mutation) action).getAttribute(name); + return ((Mutation) action).getPriority(); } - throw new UnsupportedOperationException(); + return ((RowMutations) action).getMaxPriority(); } @Override - public OperationWithAttributes setAttribute(String name, byte[] value) { - if (action instanceof Mutation) { - return ((Mutation) action).setAttribute(name, value); - } - throw new UnsupportedOperationException(); + public CheckAndMutate setPriority(int priority) { + throw new UnsupportedOperationException("Please call this method of the individual mutations"); } @Override - public int getPriority() { - if (action instanceof Mutation) { - return ((Mutation) action).getPriority(); + public CheckAndMutate setAttribute(String name, byte[] value) { + return (CheckAndMutate) super.setAttribute(name, value); + } + + @Override + public String getId() { + throw new UnsupportedOperationException("Please call this method of the individual mutations"); + } + + @Override + public CheckAndMutate setId(String id) { + throw new UnsupportedOperationException("Please call this method of the individual mutations"); + } + + @Override + public List getClusterIds() { + throw new 
UnsupportedOperationException("Please call this method of the individual mutations"); + } + + @Override + public CheckAndMutate setClusterIds(List clusterIds) { + throw new UnsupportedOperationException("Please call this method of the individual mutations"); + } + + @Override + public CellVisibility getCellVisibility() { + throw new UnsupportedOperationException("Please call this method of the individual mutations"); + } + + @Override + public CheckAndMutate setCellVisibility(CellVisibility expression) { + throw new UnsupportedOperationException("Please call this method of the individual mutations"); + } + + @Override + public byte[] getACL() { + throw new UnsupportedOperationException("Please call this method of the individual mutations"); + } + + @Override + public CheckAndMutate setACL(String user, Permission perms) { + throw new UnsupportedOperationException("Please call this method of the individual mutations"); + } + + @Override + public CheckAndMutate setACL(Map perms) { + throw new UnsupportedOperationException("Please call this method of the individual mutations"); + } + + @Override + public long getTTL() { + throw new UnsupportedOperationException("Please call this method of the individual mutations"); + } + + @Override + public CheckAndMutate setTTL(long ttl) { + throw new UnsupportedOperationException("Please call this method of the individual mutations"); + } + + @Override + public long heapSize() { + return getMutations().stream().mapToLong(Mutation::heapSize).sum(); + } + + @Override + public Map getFingerprint() { + Map ret = new HashMap<>(); + List mutations = new ArrayList<>(); + ret.put("mutations", mutations); + for (Mutation mutation : getMutations()) { + mutations.add(mutation.getFingerprint()); } - return ((RowMutations) action).getMaxPriority(); + return ret; } @Override - public OperationWithAttributes setPriority(int priority) { - if (action instanceof Mutation) { - return ((Mutation) action).setPriority(priority); + public Map toMap(int 
maxCols) { + Map ret = new HashMap<>(); + List mutations = new ArrayList<>(); + ret.put("row", Bytes.toStringBinary(row)); + if (filter != null) { + ret.put("filter", filter.toString()); + } else { + ret.put("family", Bytes.toStringBinary(family)); + ret.put("qualifier", Bytes.toStringBinary(qualifier)); + ret.put("op", op); + ret.put("value", Bytes.toStringBinary(value)); + } + ret.put("mutations", mutations); + for (Mutation mutation : getMutations()) { + mutations.add(mutation.toMap(maxCols)); } - throw new UnsupportedOperationException(); + return ret; } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java index 3b0f94b9dbce..0f8b429959de 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java @@ -29,7 +29,6 @@ /** * Performs multiple mutations atomically on a single row. - * Currently {@link Put} and {@link Delete} are supported. * * The mutations are performed in the order in which they * were added. @@ -75,8 +74,6 @@ public RowMutations(byte [] row, int initialCapacity) { } /** - * Currently only supports {@link Put} and {@link Delete} mutations. - * * @param mutation The data to send. * @throws IOException if the row of added mutation doesn't match the original row */ @@ -85,15 +82,13 @@ public RowMutations add(Mutation mutation) throws IOException { } /** - * Currently only supports {@link Put} and {@link Delete} mutations. - * * @param mutations The data to send. 
* @throws IOException if the row of added mutation doesn't match the original row */ public RowMutations add(List mutations) throws IOException { for (Mutation mutation : mutations) { if (!Bytes.equals(row, mutation.getRow())) { - throw new WrongRowIOException("The row in the recently added Put/Delete <" + + throw new WrongRowIOException("The row in the recently added Mutation <" + Bytes.toStringBinary(mutation.getRow()) + "> doesn't match the original one <" + Bytes.toStringBinary(this.row) + ">"); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CompositeFamilyCellMap.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CompositeFamilyCellMap.java new file mode 100644 index 000000000000..1e074f187445 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CompositeFamilyCellMap.java @@ -0,0 +1,314 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.stream.Collectors; + +import org.apache.hadoop.hbase.Cell; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Composite read-only familyCellMap + */ +@InterfaceAudience.Private +public class CompositeFamilyCellMap implements NavigableMap> { + + private static class CompositeFamilyCellMapEntry implements Entry> { + private final byte[] key; + private final List value; + + public CompositeFamilyCellMapEntry(byte[] key, List value) { + this.key = key; + this.value = value; + } + + @Override + public byte[] getKey() { + return key; + } + + @Override + public List getValue() { + return value; + } + + @Override + public List setValue(List value) { + throw new UnsupportedOperationException("read-only"); + } + } + + private final List>> familyCellMapList = new ArrayList<>(); + + public CompositeFamilyCellMap() { + } + + public CompositeFamilyCellMap(List>> familyCellMapList) { + for (NavigableMap> familyCellMap : familyCellMapList) { + addFamilyCellMap(familyCellMap); + } + } + + public void addFamilyCellMap(NavigableMap> familyCellMap) { + familyCellMapList.add(familyCellMap); + } + + @Override + public Entry> lowerEntry(byte[] key) { + byte[] lowerKey = lowerKey(key); + return new CompositeFamilyCellMapEntry(lowerKey, get(lowerKey)); + } + + @Override + public byte[] lowerKey(byte[] key) { + return navigableKeySet().lower(key); + } + + @Override + public Entry> floorEntry(byte[] key) { + byte[] floorKey = floorKey(key); + return new CompositeFamilyCellMapEntry(floorKey, get(floorKey)); + } + + @Override + public byte[] floorKey(byte[] key) { + return navigableKeySet().floor(key); + } + + 
@Override + public Entry> ceilingEntry(byte[] key) { + byte[] ceilingKey = ceilingKey(key); + return new CompositeFamilyCellMapEntry(ceilingKey, get(ceilingKey)); + } + + @Override + public byte[] ceilingKey(byte[] key) { + return navigableKeySet().ceiling(key); + } + + @Override + public Entry> higherEntry(byte[] key) { + byte[] higherKey = higherKey(key); + return new CompositeFamilyCellMapEntry(higherKey, get(higherKey)); + } + + @Override + public byte[] higherKey(byte[] key) { + return navigableKeySet().higher(key); + } + + @Override + public Entry> firstEntry() { + byte[] firstKey = firstKey(); + return new CompositeFamilyCellMapEntry(firstKey, get(firstKey)); + } + + @Override + public Entry> lastEntry() { + byte[] lastKey = lastKey(); + return new CompositeFamilyCellMapEntry(lastKey, get(lastKey)); + } + + @Override + public Entry> pollFirstEntry() { + throw new UnsupportedOperationException("read-only"); + } + + @Override + public Entry> pollLastEntry() { + throw new UnsupportedOperationException("read-only"); + } + + @Override + public NavigableMap> descendingMap() { + NavigableMap> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (byte[] key : descendingKeySet()) { + ret.put(key, get(key)); + } + return ret; + } + + @Override + public NavigableSet navigableKeySet() { + NavigableSet keySet = new TreeSet<>(Bytes.BYTES_COMPARATOR); + for (NavigableMap> familyMap : familyCellMapList) { + keySet.addAll(familyMap.keySet()); + } + return keySet; + } + + @Override + public NavigableSet descendingKeySet() { + return navigableKeySet().descendingSet(); + } + + @Override + public NavigableMap> subMap(byte[] fromKey, boolean fromInclusive, + byte[] toKey, boolean toInclusive) { + NavigableMap> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (byte[] key : navigableKeySet().subSet(fromKey, fromInclusive, toKey, toInclusive)) { + ret.put(key, get(key)); + } + return ret; + } + + @Override + public NavigableMap> headMap(byte[] toKey, boolean inclusive) { + 
NavigableMap> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (byte[] key : navigableKeySet().headSet(toKey, inclusive)) { + ret.put(key, get(key)); + } + return ret; + } + + @Override + public NavigableMap> tailMap(byte[] fromKey, boolean inclusive) { + NavigableMap> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (byte[] key : navigableKeySet().tailSet(fromKey, inclusive)) { + ret.put(key, get(key)); + } + return ret; + } + + @Override + public Comparator comparator() { + return Bytes.BYTES_COMPARATOR; + } + + @Override + public NavigableMap> subMap(byte[] fromKey, byte[] toKey) { + NavigableMap> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (byte[] key : navigableKeySet().subSet(fromKey, toKey)) { + ret.put(key, get(key)); + } + return ret; + } + + @Override + public NavigableMap> headMap(byte[] toKey) { + NavigableMap> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (byte[] key : navigableKeySet().headSet(toKey)) { + ret.put(key, get(key)); + } + return ret; + } + + @Override + public NavigableMap> tailMap(byte[] fromKey) { + NavigableMap> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (byte[] key : navigableKeySet().tailSet(fromKey)) { + ret.put(key, get(key)); + } + return ret; + } + + @Override + public byte[] firstKey() { + return navigableKeySet().first(); + } + + @Override + public byte[] lastKey() { + return navigableKeySet().last(); + } + + @Override + public int size() { + return familyCellMapList.stream().mapToInt(Map::size).sum(); + } + + @Override + public boolean isEmpty() { + return familyCellMapList.stream().allMatch(Map::isEmpty); + } + + @Override + public boolean containsKey(Object key) { + return familyCellMapList.stream().anyMatch(familyMap -> familyMap.containsKey(key)); + } + + @Override + public boolean containsValue(Object value) { + return familyCellMapList.stream().anyMatch(familyMap -> familyMap.containsValue(value)); + } + + @Override + public List get(Object key) { + return familyCellMapList.stream() + 
.map(familyMap -> familyMap.getOrDefault(key, Collections.emptyList())) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } + + @Override + public List put(byte[] key, List value) { + throw new UnsupportedOperationException("read-only"); + } + + @Override + public List remove(Object key) { + throw new UnsupportedOperationException("read-only"); + } + + @Override + public void putAll(Map> m) { + throw new UnsupportedOperationException("read-only"); + } + + @Override + public void clear() { + throw new UnsupportedOperationException("read-only"); + } + + @Override + public Set keySet() { + return navigableKeySet(); + } + + @Override + public Collection> values() { + Set keySet = keySet(); + List> ret = new ArrayList<>(keySet.size()); + for (byte[] key : keySet) { + ret.add(get(key)); + } + return ret; + } + + @Override + public Set>> entrySet() { + Set>> ret = new TreeSet<>( + (o1, o2) -> Bytes.BYTES_COMPARATOR.compare(o1.getKey(), o2.getKey())); + for (byte[] key : keySet()) { + ret.add(new CompositeFamilyCellMapEntry(key, get(key))); + } + return ret; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 3f1c6dc7fcee..1f352bc08bff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -441,8 +441,8 @@ default void postDelete(ObserverContext c, Delete /** * This will be called for every batch mutation operation happening at the server. This will be * called after acquiring the locks on the mutating rows and after applying the proper timestamp - * for each Mutation at the server. The batch may contain Put/Delete/Increment/Append. By - * setting OperationStatus of Mutations + * for each Mutation at the server. The batch may contain + * Put/Delete/Increment/Append/CheckAndMutate. 
By setting OperationStatus of Mutations * ({@link MiniBatchOperationInProgress#setOperationStatus(int, OperationStatus)}), * {@link RegionObserver} can make Region to skip these Mutations. *

@@ -460,7 +460,8 @@ default void preBatchMutate(ObserverContext c, * {@link #postPut(ObserverContext, Put, WALEdit, Durability)} * and {@link #postDelete(ObserverContext, Delete, WALEdit, Durability)} * and {@link #postIncrement(ObserverContext, Increment, Result)} - * and {@link #postAppend(ObserverContext, Append, Result)} is + * and {@link #postAppend(ObserverContext, Append, Result)} + * and {@link #postCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} is * this hook will be executed before the mvcc transaction completion. *

* Note: Do not retain references to any Cells in Mutations beyond the life of this invocation. @@ -491,8 +492,8 @@ default void postCloseRegionOperation(ObserverContext * Note: Do not retain references to any Cells in Mutations beyond the life of this invocation. * If need a Cell reference for later use, copy the cell and use that. @@ -919,7 +920,7 @@ default CheckAndMutateResult preCheckAndMutateAfterRowLock( /** * Called after checkAndMutate *

- * Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation. + * Note: Do not retain references to any Cells in actions beyond the life of this invocation. * If need a Cell reference for later use, copy the cell and use that. * @param c the environment provided by the region server * @param checkAndMutate the CheckAndMutate object @@ -1358,7 +1359,8 @@ default Cell postMutationBeforeWAL(ObserverContext /** * Called after a list of new cells has been created during an increment operation, but before - * they are committed to the WAL or memstore. + * they are committed to the WAL or memstore. This is also called in case of an increment + * operation in checkAndMutate. * * @param ctx the environment provided by the region server * @param mutation the current mutation @@ -1380,7 +1382,8 @@ default List> postIncrementBeforeWAL( /** * Called after a list of new cells has been created during an append operation, but before - * they are committed to the WAL or memstore. + * they are committed to the WAL or memstore. This is also called in case of an append + * operation in checkAndMutate. 
* * @param ctx the environment provided by the region server * @param mutation the current mutation diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index a208d9330042..a8fb3e3a4092 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -168,6 +168,7 @@ import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.CompositeFamilyCellMap; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HashedBytes; @@ -3243,6 +3244,8 @@ private abstract static class BatchOperation { protected final Map>[] familyCellMaps; // For Increment/Append operations protected final Result[] results; + // For CheckAndMutate operations + protected final CheckAndMutateResult[] checkAndMutateResults; // For nonce operations protected final boolean[] canProceed; @@ -3260,6 +3263,7 @@ public BatchOperation(final HRegion region, T[] operations) { this.walEditsFromCoprocessors = new WALEdit[operations.length]; familyCellMaps = new Map[operations.length]; this.results = new Result[operations.length]; + this.checkAndMutateResults = new CheckAndMutateResult[operations.length]; this.canProceed = new boolean[operations.length]; this.region = region; @@ -3376,7 +3380,12 @@ public boolean isOperationPending(int index) { public List getClusterIds() { assert size() != 0; - return getMutation(0).getClusterIds(); + Mutation mutation = getMutation(0); + if (mutation instanceof CheckAndMutate) { + return ((CheckAndMutate) mutation).getMutations().get(0).getClusterIds(); + } else { + return mutation.getClusterIds(); + } } boolean isAtomic() { @@ -3386,9 
+3395,10 @@ boolean isAtomic() { /** * Helper method that checks and prepares only one mutation. This can be used to implement * {@link #checkAndPrepare()} for entire Batch. - * NOTE: As CP prePut()/preDelete()/preIncrement()/preAppend() hooks may modify mutations, - * this method should be called after prePut()/preDelete()/preIncrement()/preAppend() CP hooks - * are run for the mutation + * NOTE: As CP prePut()/preDelete()/preIncrement()/preAppend()/preCheckAndMutate() hooks may + * modify mutations, this method should be called after + * prePut()/preDelete()/preIncrement()/preAppend()/preCheckAndMutate() CP hooks are run for + * the mutation */ protected void checkAndPrepareMutation(Mutation mutation, final long timestamp) throws IOException { @@ -3401,6 +3411,22 @@ protected void checkAndPrepareMutation(Mutation mutation, final long timestamp) region.prepareDelete((Delete) mutation); } else if (mutation instanceof Increment || mutation instanceof Append) { region.checkFamilies(mutation.getFamilyCellMap().keySet(), mutation.getDurability()); + } else if (mutation instanceof CheckAndMutate) { + CheckAndMutate checkAndMutate = (CheckAndMutate) mutation; + checkRow(checkAndMutate.getAction(), mutation.getRow()); + if (!checkAndMutate.hasFilter()) { + region.checkFamily(checkAndMutate.getFamily()); + } + for (Mutation m : checkAndMutate.getMutations()) { + checkAndPrepareMutation(m, timestamp); + } + } + } + + private void checkRow(final Row action, final byte[] row) + throws DoNotRetryIOException { + if (!Bytes.equals(row, action.getRow())) { + throw new DoNotRetryIOException("Action's getRow must match"); } } @@ -3409,8 +3435,11 @@ protected void checkAndPrepareMutation(int index, long timestamp) throws IOExcep try { this.checkAndPrepareMutation(mutation, timestamp); - // store the family map reference to allow for mutations - familyCellMaps[index] = mutation.getFamilyCellMap(); + if (mutation instanceof Put || mutation instanceof Delete) { + // store the family 
map reference to allow for mutations + familyCellMaps[index] = mutation.getFamilyCellMap(); + } + // store durability for the batch (highest durability of all operations in the batch) Durability tmpDur = region.getEffectiveDurability(mutation.getDurability()); if (tmpDur.ordinal() > durability.ordinal()) { @@ -3796,41 +3825,97 @@ public void prepareMiniBatchOperations(MiniBatchOperationInProgress mi if (mutation instanceof Put) { HRegion.updateCellTimestamps(familyCellMaps[index].values(), Bytes.toBytes(timestamp)); miniBatchOp.incrementNumOfPuts(); + region.rewriteCellTags(familyCellMaps[index], mutation); } else if (mutation instanceof Delete) { region.prepareDeleteTimestamps(mutation, familyCellMaps[index], Bytes.toBytes(timestamp)); miniBatchOp.incrementNumOfDeletes(); + region.rewriteCellTags(familyCellMaps[index], mutation); } else if (mutation instanceof Increment || mutation instanceof Append) { + boolean returnResults; + if (mutation instanceof Increment) { + returnResults = ((Increment) mutation).isReturnResults(); + } else { + returnResults = ((Append) mutation).isReturnResults(); + } + // For nonce operations canProceed[index] = startNonceOperation(nonceGroup, nonce); if (!canProceed[index]) { - // convert duplicate increment/append to get - List results = region.get(toGet(mutation), false, nonceGroup, nonce); + Result result; + if (returnResults) { + // convert duplicate increment/append to get + List results = region.get(toGet(mutation), false, nonceGroup, nonce); + result = Result.create(results); + } else { + result = Result.EMPTY_RESULT; + } + retCodeDetails[index] = new OperationStatus(OperationStatusCode.SUCCESS, result); + return true; + } + + Result result = null; + if (region.coprocessorHost != null) { + if (mutation instanceof Increment) { + result = region.coprocessorHost.preIncrementAfterRowLock((Increment) mutation); + } else { + result = region.coprocessorHost.preAppendAfterRowLock((Append) mutation); + } + } + if (result != null) { 
retCodeDetails[index] = new OperationStatus(OperationStatusCode.SUCCESS, - Result.create(results)); + returnResults ? result : Result.EMPTY_RESULT); return true; } - boolean returnResults; + List results = returnResults ? new ArrayList<>(mutation.size()) : null; + familyCellMaps[index] = reckonDeltas(mutation, results, timestamp); + this.results[index] = results != null ? Result.create(results) : Result.EMPTY_RESULT; + region.rewriteCellTags(familyCellMaps[index], mutation); + if (mutation instanceof Increment) { - returnResults = ((Increment) mutation).isReturnResults(); miniBatchOp.incrementNumOfIncrements(); } else { - returnResults = ((Append) mutation).isReturnResults(); miniBatchOp.incrementNumOfAppends(); } - Result result = doCoprocessorPreCallAfterRowLock(mutation); + } else if (mutation instanceof CheckAndMutate) { + CheckAndMutateResult result = null; + if (region.coprocessorHost != null) { + result = region.coprocessorHost.preCheckAndMutateAfterRowLock( + (CheckAndMutate) mutation); + } if (result != null) { - retCodeDetails[index] = new OperationStatus(OperationStatusCode.SUCCESS, - returnResults ? result : Result.EMPTY_RESULT); + retCodeDetails[index] = new OperationStatus(OperationStatusCode.SUCCESS, result); return true; } - List results = returnResults ? new ArrayList<>(mutation.size()) : null; - familyCellMaps[index] = reckonDeltas(mutation, results, timestamp); - this.results[index] = results != null ? 
Result.create(results): Result.EMPTY_RESULT; - } - region.rewriteCellTags(familyCellMaps[index], mutation); + List> resultsList = new ArrayList<>(); + CompositeFamilyCellMap compositeFamilyCellMap = new CompositeFamilyCellMap(); + boolean success = checkAndMutate((CheckAndMutate) mutation, timestamp, + compositeFamilyCellMap, resultsList, miniBatchOp); + familyCellMaps[index] = compositeFamilyCellMap; + if (success) { + if (resultsList.isEmpty()) { + checkAndMutateResults[index] = new CheckAndMutateResult(true, null); + } else if (resultsList.size() == 1) { + checkAndMutateResults[index] = new CheckAndMutateResult(true, + !resultsList.get(0).isEmpty() ? Result.create(resultsList.get(0)) : + Result.EMPTY_RESULT); + } else { + List results = new ArrayList<>(); + for (List cells : resultsList) { + if (cells != null) { + results.addAll(cells); + } + } + results.sort(CellComparator.getInstance()); + checkAndMutateResults[index] = new CheckAndMutateResult(true, + !results.isEmpty() ? Result.create(results) : Result.EMPTY_RESULT); + } + } else { + checkAndMutateResults[index] = new CheckAndMutateResult(false, null); + } + } // update cell count if (region.getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) { for (List cells : mutation.getFamilyCellMap().values()) { @@ -3909,28 +3994,10 @@ private static Get toGet(final Mutation mutation) throws IOException { return get; } - /** - * Do coprocessor pre-increment or pre-append after row lock call. - * @return Result returned out of the coprocessor, which means bypass all further processing - * and return the preferred Result instead, or null which means proceed. 
- */ - private Result doCoprocessorPreCallAfterRowLock(Mutation mutation) throws IOException { - assert mutation instanceof Increment || mutation instanceof Append; - Result result = null; - if (region.coprocessorHost != null) { - if (mutation instanceof Increment) { - result = region.coprocessorHost.preIncrementAfterRowLock((Increment) mutation); - } else { - result = region.coprocessorHost.preAppendAfterRowLock((Append) mutation); - } - } - return result; - } - - private Map> reckonDeltas(Mutation mutation, List results, + private NavigableMap> reckonDeltas(Mutation mutation, List results, long now) throws IOException { assert mutation instanceof Increment || mutation instanceof Append; - Map> ret = new HashMap<>(); + NavigableMap> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR); // Process a Store/family at a time. for (Map.Entry> entry: mutation.getFamilyCellMap().entrySet()) { final byte[] columnFamilyName = entry.getKey(); @@ -3973,14 +4040,25 @@ private List reckonDeltasByStore(HStore store, Mutation mutation, long now byte[] columnFamily = store.getColumnFamilyDescriptor().getName(); List> cellPairs = new ArrayList<>(deltas.size()); + // Sort the cells so that they match the order that they appear in the Get results. + // Otherwise, we won't be able to find the existing values if the cells are not specified + // in order by the client since cells are in an array list. + sort(deltas, store.getComparator()); + // Get previous values for all columns in this family. 
+ Get get = new Get(mutation.getRow()); + for (Cell cell: deltas) { + get.addColumn(columnFamily, CellUtil.cloneQualifier(cell)); + } TimeRange tr; if (mutation instanceof Increment) { tr = ((Increment) mutation).getTimeRange(); } else { tr = ((Append) mutation).getTimeRange(); + } if (tr != null) { + get.setTimeRange(tr.getMin(), tr.getMax()); } - List currentValues = get(mutation, store, deltas, tr); + List currentValues = region.get(get, false); // Iterate the input columns and update existing values if they were found, otherwise // add new column initialized to the delta amount @@ -4074,29 +4152,113 @@ private static long getLongValue(final Cell cell) throws DoNotRetryIOException { return PrivateCellUtil.getValueAsLong(cell); } - /** - * Do a specific Get on passed columnFamily and column qualifiers. - * @param mutation Mutation we are doing this Get for. - * @param store Which column family on row (TODO: Go all Gets in one go) - * @param coordinates Cells from mutation used as coordinates applied to Get. - * @return Return list of Cells found. - */ - private List get(Mutation mutation, HStore store, List coordinates, - TimeRange tr) throws IOException { - // Sort the cells so that they match the order that they appear in the Get results. - // Otherwise, we won't be able to find the existing values if the cells are not specified - // in order by the client since cells are in an array list. - // TODO: I don't get why we are sorting. 
St.Ack 20150107 - sort(coordinates, store.getComparator()); - Get get = new Get(mutation.getRow()); - for (Cell cell: coordinates) { - get.addColumn(store.getColumnFamilyDescriptor().getName(), CellUtil.cloneQualifier(cell)); + public boolean checkAndMutate(CheckAndMutate checkAndMutate, long now, + CompositeFamilyCellMap compositeFamilyCellMap, List> resultsList, + MiniBatchOperationInProgress miniBatchOp) throws IOException { + Get get = new Get(checkAndMutate.getRow()); + if (checkAndMutate.hasFilter()) { + get.setFilter(checkAndMutate.getFilter()); + } else { + get.addColumn(checkAndMutate.getFamily(), checkAndMutate.getQualifier()); + } + get.setTimeRange(checkAndMutate.getTimeRange().getMin(), + checkAndMutate.getTimeRange().getMax()); + + // NOTE: We used to wait here until mvcc caught up: mvcc.await(); + // Supposition is that now all changes are done under row locks, then when we go to read, + // we'll get the latest on this row. + List result = region.get(get, false); + boolean matches = false; + long cellTs = 0; + if (checkAndMutate.hasFilter()) { + if (!result.isEmpty()) { + matches = true; + cellTs = result.get(0).getTimestamp(); + } + } else { + boolean valueIsNull = checkAndMutate.getValue() == null || + checkAndMutate.getValue().length == 0; + if (result.isEmpty() && valueIsNull) { + matches = true; + } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) { + matches = true; + cellTs = result.get(0).getTimestamp(); + } else if (result.size() == 1 && !valueIsNull) { + Cell kv = result.get(0); + cellTs = kv.getTimestamp(); + int compareResult = PrivateCellUtil.compareValue(kv, + new BinaryComparator(checkAndMutate.getValue())); + matches = matches(checkAndMutate.getCompareOp(), compareResult); + } + } + + // If matches, perform the action + if (matches) { + // We have acquired the row lock already. 
If the system clock is NOT monotonically + // non-decreasing (see HBASE-14070) we should make sure that the mutation has a + // larger timestamp than what was observed via Get. doBatchMutate already does this, but + // there is no way to pass the cellTs. See HBASE-14054. + long ts = Math.max(now, cellTs); // ensure write is not eclipsed + + for (Mutation mutation : checkAndMutate.getMutations()) { + if (mutation instanceof Put) { + NavigableMap> familyMap = mutation.getFamilyCellMap(); + HRegion.updateCellTimestamps(familyMap.values(), Bytes.toBytes(ts)); + region.rewriteCellTags(familyMap, mutation); + compositeFamilyCellMap.addFamilyCellMap(familyMap); + miniBatchOp.incrementNumOfPuts(); + } else if (mutation instanceof Delete) { + NavigableMap> familyMap = mutation.getFamilyCellMap(); + region.prepareDeleteTimestamps(mutation, familyMap, Bytes.toBytes(now)); + region.rewriteCellTags(familyMap, mutation); + compositeFamilyCellMap.addFamilyCellMap(familyMap); + miniBatchOp.incrementNumOfDeletes(); + } else if (mutation instanceof Increment || mutation instanceof Append) { + boolean returnResults; + if (mutation instanceof Increment) { + returnResults = ((Increment) mutation).isReturnResults(); + miniBatchOp.incrementNumOfIncrements(); + } else { + returnResults = ((Append) mutation).isReturnResults(); + miniBatchOp.incrementNumOfAppends(); + } + List results = returnResults ? new ArrayList<>(mutation.size()) : null; + NavigableMap> familyMap = reckonDeltas(mutation, results, ts); + resultsList.add(results); + region.rewriteCellTags(familyMap, mutation); + compositeFamilyCellMap.addFamilyCellMap(familyMap); + } + } + return true; } - // Increments carry time range. If an Increment instance, put it on the Get. 
- if (tr != null) { - get.setTimeRange(tr.getMin(), tr.getMax()); + return false; + } + + private boolean matches(final CompareOperator op, final int compareResult) { + boolean matches; + switch (op) { + case LESS: + matches = compareResult < 0; + break; + case LESS_OR_EQUAL: + matches = compareResult <= 0; + break; + case EQUAL: + matches = compareResult == 0; + break; + case NOT_EQUAL: + matches = compareResult != 0; + break; + case GREATER_OR_EQUAL: + matches = compareResult >= 0; + break; + case GREATER: + matches = compareResult > 0; + break; + default: + throw new RuntimeException("Unknown Compare op " + op.name()); } - return region.get(get, false); + return matches; } @Override @@ -4169,6 +4331,13 @@ public void doPostOpCleanupForMiniBatch(MiniBatchOperationInProgress m retCodeDetails[i] = new OperationStatus(retCodeDetails[i].getOperationStatusCode(), result); } + } else if (m instanceof CheckAndMutate) { + CheckAndMutateResult result = region.getCoprocessorHost() + .postCheckAndMutate((CheckAndMutate) m, checkAndMutateResults[i]); + if (result != checkAndMutateResults[i]) { + retCodeDetails[i] = + new OperationStatus(retCodeDetails[i].getOperationStatusCode(), result); + } } } return true; @@ -4218,7 +4387,8 @@ public void doPostOpCleanupForMiniBatch(MiniBatchOperationInProgress m } /** - * Runs prePut/preDelete/preIncrement/preAppend coprocessor hook for input mutation in a batch + * Runs prePut/preDelete/preIncrement/preAppend/preCheckAndMutate coprocessor hook for input + * mutation in a batch * @param metrics Array of 2 ints. 
index 0: count of puts, index 1: count of deletes, index 2: * count of increments and 3: count of appends */ @@ -4264,15 +4434,29 @@ private void callPreMutateCPHook(int index, final WALEdit walEdit, final int[] m metrics[3]++; retCodeDetails[index] = new OperationStatus(OperationStatusCode.SUCCESS, result); } - } else { - String msg = "Put/Delete/Increment/Append mutations only supported in a batch"; - retCodeDetails[index] = new OperationStatus(OperationStatusCode.FAILURE, msg); - if (isAtomic()) { // fail, atomic means all or none - throw new IOException(msg); + } else if (m instanceof CheckAndMutate) { + CheckAndMutate checkAndMutate = (CheckAndMutate) m; + CheckAndMutateResult result = region.coprocessorHost.preCheckAndMutate(checkAndMutate); + if (result != null) { + // pre hook says skip this CheckAndMutate + // mark as success and skip in doMiniBatchMutation + for (Mutation mutation : checkAndMutate.getMutations()) { + if (mutation instanceof Put) { + metrics[0]++; + } else if (mutation instanceof Delete) { + metrics[1]++; + } else if (mutation instanceof Increment) { + metrics[2]++; + } else if (mutation instanceof Append) { + metrics[3]++; + } + } + retCodeDetails[index] = new OperationStatus(OperationStatusCode.SUCCESS, result); } } } + // TODO Support Increment/Append/CheckAndMutate operations private void checkAndMergeCPMutations(final MiniBatchOperationInProgress miniBatchOp, final List acquiredRowLocks, final long timestamp) throws IOException { visitBatchOperations(true, nextIndexToProcess + miniBatchOp.size(), (int i) -> { @@ -4437,21 +4621,23 @@ public void completeMiniBatchOperations( } } + public OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic) throws IOException { + return batchMutate(new MutationBatchOperation(this, mutations, atomic, HConstants.NO_NONCE, + HConstants.NO_NONCE)); + } + public OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, long nonceGroup, long nonce) throws IOException { - // As it 
stands, this is used for 3 things - // * batchMutate with single mutation - put/delete/increment/append, separate or from - // checkAndMutate. - // * coprocessor calls (see ex. BulkDeleteEndpoint). - // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd... return batchMutate(new MutationBatchOperation(this, mutations, atomic, nonceGroup, nonce)); } @Override public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException { - // If the mutations has any Increment/Append operations, we need to do batchMutate atomically + // If the mutations has any Increment/Append/CheckAndMutate operations, we need to do + // batchMutate atomically boolean atomic = Arrays.stream(mutations) - .anyMatch(m -> m instanceof Increment || m instanceof Append); + .anyMatch(m -> + m instanceof Increment || m instanceof Append || m instanceof CheckAndMutate); return batchMutate(mutations, atomic, HConstants.NO_NONCE, HConstants.NO_NONCE); } @@ -4482,7 +4668,6 @@ public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqI /** * Perform a batch of mutations. * - * It supports Put, Delete, Increment, Append mutations and will ignore other types passed. * Operations in a batch are stored with highest durability specified of for all operations in a * batch, except for {@link Durability#SKIP_WAL}. * @@ -4512,7 +4697,7 @@ OperationStatus[] batchMutate(BatchOperation batchOp) throws IOException { if (!initialized) { this.writeRequestsCount.add(batchOp.size()); // validate and prepare batch for write, for MutationBatchOperation it also calls CP - // prePut()/preDelete()/preIncrement()/preAppend() hooks + // prePut()/preDelete()/preIncrement()/preAppend()/preCheckAndMutate() hooks batchOp.checkAndPrepare(); initialized = true; } @@ -4551,8 +4736,8 @@ private void doMiniBatchMutate(BatchOperation batchOp) throws IOException { // We've now grabbed as many mutations off the list as we can // Ensure we acquire at least one. 
 if (miniBatchOp.getReadyToWriteCount() <= 0) { - // Nothing to put/delete/increment/append -- an exception in the above such as - // NoSuchColumnFamily? + // Nothing to put/delete/increment/append/checkAndMutate -- an exception in the above + // such as NoSuchColumnFamily? return; } @@ -4561,8 +4746,8 @@ private void doMiniBatchMutate(BatchOperation batchOp) throws IOException { // STEP 2. Update mini batch of all operations in progress with LATEST_TIMESTAMP timestamp // We should record the timestamp only after we have acquired the rowLock, - // otherwise, newer puts/deletes/increment/append are not guaranteed to have a newer - // timestamp + // otherwise, newer puts/deletes/increment/append/checkAndMutate are not guaranteed to have a + // newer timestamp long now = EnvironmentEdgeManager.currentTime(); batchOp.prepareMiniBatchOperations(miniBatchOp, now, acquiredRowLocks); @@ -4611,16 +4796,18 @@ private void doMiniBatchMutate(BatchOperation batchOp) throws IOException { batchOp.visitBatchOperations(true, finalLastIndexExclusive, (int i) -> { Mutation mutation = batchOp.getMutation(i); - if (mutation instanceof Increment || mutation instanceof Append) { - if (finalSuccess) { + if (finalSuccess) { + if (mutation instanceof Increment || mutation instanceof Append) { batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.SUCCESS, batchOp.results[i]); + } else if (mutation instanceof CheckAndMutate) { + batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.SUCCESS, + batchOp.checkAndMutateResults[i]); } else { - batchOp.retCodeDetails[i] = OperationStatus.FAILURE; + batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; } } else { - batchOp.retCodeDetails[i] = - finalSuccess ? 
OperationStatus.SUCCESS : OperationStatus.FAILURE; + batchOp.retCodeDetails[i] = OperationStatus.FAILURE; } return true; }); @@ -4713,177 +4900,23 @@ public boolean checkAndRowMutate(byte[] row, Filter filter, TimeRange timeRange, @Override public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException { - byte[] row = checkAndMutate.getRow(); - Filter filter = null; - byte[] family = null; - byte[] qualifier = null; - CompareOperator op = null; - ByteArrayComparable comparator = null; - if (checkAndMutate.hasFilter()) { - filter = checkAndMutate.getFilter(); - } else { - family = checkAndMutate.getFamily(); - qualifier = checkAndMutate.getQualifier(); - op = checkAndMutate.getCompareOp(); - comparator = new BinaryComparator(checkAndMutate.getValue()); - } - TimeRange timeRange = checkAndMutate.getTimeRange(); - - Mutation mutation = null; - RowMutations rowMutations = null; - if (checkAndMutate.getAction() instanceof Mutation) { - mutation = (Mutation) checkAndMutate.getAction(); - } else { - rowMutations = (RowMutations) checkAndMutate.getAction(); - } - - if (mutation != null) { - checkMutationType(mutation); - checkRow(mutation, row); - } else { - checkRow(rowMutations, row); - } checkReadOnly(); // TODO, add check for value length also move this check to the client checkResources(); startRegionOperation(); try { - Get get = new Get(row); - if (family != null) { - checkFamily(family); - get.addColumn(family, qualifier); - } - if (filter != null) { - get.setFilter(filter); - } - if (timeRange != null) { - get.setTimeRange(timeRange.getMin(), timeRange.getMax()); - } - // Lock row - note that doBatchMutate will relock this row if called - checkRow(row, "doCheckAndRowMutate"); - RowLock rowLock = getRowLockInternal(get.getRow(), false, null); - try { - if (this.getCoprocessorHost() != null) { - CheckAndMutateResult result = - getCoprocessorHost().preCheckAndMutateAfterRowLock(checkAndMutate); - if (result != null) { - return result; - 
} - } - - // NOTE: We used to wait here until mvcc caught up: mvcc.await(); - // Supposition is that now all changes are done under row locks, then when we go to read, - // we'll get the latest on this row. - List result = get(get, false); - boolean matches = false; - long cellTs = 0; - if (filter != null) { - if (!result.isEmpty()) { - matches = true; - cellTs = result.get(0).getTimestamp(); - } - } else { - boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0; - if (result.isEmpty() && valueIsNull) { - matches = true; - } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) { - matches = true; - cellTs = result.get(0).getTimestamp(); - } else if (result.size() == 1 && !valueIsNull) { - Cell kv = result.get(0); - cellTs = kv.getTimestamp(); - int compareResult = PrivateCellUtil.compareValue(kv, comparator); - matches = matches(op, compareResult); - } - } - - // If matches, perform the mutation or the rowMutations - if (matches) { - // We have acquired the row lock already. If the system clock is NOT monotonically - // non-decreasing (see HBASE-14070) we should make sure that the mutation has a - // larger timestamp than what was observed via Get. doBatchMutate already does this, but - // there is no way to pass the cellTs. See HBASE-14054. - long now = EnvironmentEdgeManager.currentTime(); - long ts = Math.max(now, cellTs); // ensure write is not eclipsed - byte[] byteTs = Bytes.toBytes(ts); - if (mutation != null) { - if (mutation instanceof Put) { - updateCellTimestamps(mutation.getFamilyCellMap().values(), byteTs); - } - // And else 'delete' is not needed since it already does a second get, and sets the - // timestamp from get (see prepareDeleteTimestamps). 
- } else { - for (Mutation m: rowMutations.getMutations()) { - if (m instanceof Put) { - updateCellTimestamps(m.getFamilyCellMap().values(), byteTs); - } - } - // And else 'delete' is not needed since it already does a second get, and sets the - // timestamp from get (see prepareDeleteTimestamps). - } - // All edits for the given row (across all column families) must happen atomically. - Result r = null; - if (mutation != null) { - r = doBatchMutate(mutation, true).getResult(); - } else { - mutateRow(rowMutations); - } - this.checkAndMutateChecksPassed.increment(); - return new CheckAndMutateResult(true, r); - } + CheckAndMutateResult result = doBatchMutate(checkAndMutate, true).getCheckAndMutateResult(); + if (result.isSuccess()) { + this.checkAndMutateChecksPassed.increment(); + } else { this.checkAndMutateChecksFailed.increment(); - return new CheckAndMutateResult(false, null); - } finally { - rowLock.release(); } + return result; } finally { closeRegionOperation(); } } - private void checkMutationType(final Mutation mutation) - throws DoNotRetryIOException { - if (!(mutation instanceof Put) && !(mutation instanceof Delete) && - !(mutation instanceof Increment) && !(mutation instanceof Append)) { - throw new org.apache.hadoop.hbase.DoNotRetryIOException( - "Action must be Put or Delete or Increment or Delete"); - } - } - - private void checkRow(final Row action, final byte[] row) - throws DoNotRetryIOException { - if (!Bytes.equals(row, action.getRow())) { - throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's getRow must match"); - } - } - - private boolean matches(final CompareOperator op, final int compareResult) { - boolean matches = false; - switch (op) { - case LESS: - matches = compareResult < 0; - break; - case LESS_OR_EQUAL: - matches = compareResult <= 0; - break; - case EQUAL: - matches = compareResult == 0; - break; - case NOT_EQUAL: - matches = compareResult != 0; - break; - case GREATER_OR_EQUAL: - matches = compareResult >= 0; - 
break; - case GREATER: - matches = compareResult > 0; - break; - default: - throw new RuntimeException("Unknown Compare op " + op.name()); - } - return matches; - } - private OperationStatus doBatchMutate(Mutation mutation) throws IOException { return doBatchMutate(mutation, false); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java index ae5b6ec3c8ea..ac6b5dc382e3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java @@ -124,6 +124,7 @@ private int getAbsoluteIndex(int index) { * in the same batch. These mutations are applied to the WAL and applied to the memstore as well. * The timestamp of the cells in the given Mutations MUST be obtained from the original mutation. * Note: The durability from CP will be replaced by the durability of corresponding mutation. + * Note: Currently only supports Put and Delete operations. 
* @param index the index that corresponds to the original mutation index in the batch * @param newOperations the Mutations to add */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationStatus.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationStatus.java index 6beb7c78e2ca..33b2a6793a02 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationStatus.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationStatus.java @@ -20,6 +20,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; +import org.apache.hadoop.hbase.client.CheckAndMutateResult; import org.apache.hadoop.hbase.client.Result; import org.apache.yetus.audience.InterfaceAudience; @@ -44,28 +45,38 @@ public class OperationStatus { public static final OperationStatus NOT_RUN = new OperationStatus(OperationStatusCode.NOT_RUN); private final OperationStatusCode code; + // For Increment/Append operations private final Result result; + // For CheckAndMutate operations + private final CheckAndMutateResult checkAndMutateResult; + private final String exceptionMsg; public OperationStatus(OperationStatusCode code) { - this(code, null, ""); + this(code, null, null, ""); } public OperationStatus(OperationStatusCode code, Result result) { - this(code, result, ""); + this(code, result, null, ""); + } + + public OperationStatus(OperationStatusCode code, CheckAndMutateResult checkAndMutateResult) { + this(code, null, checkAndMutateResult, ""); } public OperationStatus(OperationStatusCode code, String exceptionMsg) { - this(code, null, exceptionMsg); + this(code, null, null, exceptionMsg); } public OperationStatus(OperationStatusCode code, Exception e) { - this(code, null, (e == null) ? "" : e.getClass().getName() + ": " + e.getMessage()); + this(code, null, null, (e == null) ? 
"" : e.getClass().getName() + ": " + e.getMessage()); } - private OperationStatus(OperationStatusCode code, Result result, String exceptionMsg) { + private OperationStatus(OperationStatusCode code, Result result, + CheckAndMutateResult checkAndMutateResult, String exceptionMsg) { this.code = code; this.result = result; + this.checkAndMutateResult = checkAndMutateResult; this.exceptionMsg = exceptionMsg; } @@ -83,6 +94,13 @@ public Result getResult() { return result; } + /** + * @return checkAndMutateResult + */ + public CheckAndMutateResult getCheckAndMutateResult() { + return checkAndMutateResult; + } + /** * @return ExceptionMessge */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index a59f5e609b17..91b71bfdd54e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -629,17 +629,7 @@ private CheckAndMutateResult checkAndMutate(HRegion region, List - * Note this supports only Put, Delete, Increment and Append mutations and will ignore other - * types passed. + * + * Please do not operate on a same column of a single row in a batch, we will not consider the + * previous operation in the same batch when performing the operations in the batch. + * * @param mutations the list of mutations * @return an array of OperationStatus which internally contains the * OperationStatusCode and the exceptionMessage if any. 
* @throws IOException */ - OperationStatus[] batchMutate(Mutation[] mutations) - throws IOException; + OperationStatus[] batchMutate(Mutation[] mutations) throws IOException; /** * Atomically checks if a row/family/qualifier value matches the expected value and if it does, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index 3779903f869a..1a3cf9b67c3f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -53,6 +52,8 @@ import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.CheckAndMutateResult; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Delete; @@ -86,7 +87,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionServerObserver; -import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.io.hfile.HFile; @@ -127,7 +127,6 @@ import org.slf4j.LoggerFactory; import 
org.apache.hbase.thirdparty.com.google.common.base.Preconditions; -import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; @@ -427,8 +426,7 @@ private enum OpType { SCAN("scan"), PUT("put"), DELETE("delete"), - CHECK_AND_PUT("checkAndPut"), - CHECK_AND_DELETE("checkAndDelete"), + CHECK_AND_MUTATE("checkAndMutate"), APPEND("append"), INCREMENT("increment"); @@ -1497,48 +1495,82 @@ public void preBatchMutate(ObserverContext c, TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable(); User user = getActiveUser(c); for (int i = 0; i < miniBatchOp.size(); i++) { - Mutation m = miniBatchOp.getOperation(i); - if (m.getAttribute(CHECK_COVERING_PERM) != null) { - // We have a failure with table, cf and q perm checks and now giving a chance for cell - // perm check - OpType opType; - long timestamp; - if (m instanceof Put) { - checkForReservedTagPresence(user, m); - opType = OpType.PUT; - timestamp = m.getTimestamp(); - } else if (m instanceof Delete) { - opType = OpType.DELETE; - timestamp = m.getTimestamp(); - } else if (m instanceof Increment) { - opType = OpType.INCREMENT; - timestamp = ((Increment) m).getTimeRange().getMax(); - } else if (m instanceof Append) { - opType = OpType.APPEND; - timestamp = ((Append) m).getTimeRange().getMax(); - } else { - // If the operation type is not Put/Delete/Increment/Append, do nothing - continue; - } - AuthResult authResult = null; - if (checkCoveringPermission(user, opType, c.getEnvironment(), m.getRow(), - m.getFamilyCellMap(), timestamp, Action.WRITE)) { - authResult = AuthResult.allow(opType.toString(), "Covering cell set", - user, Action.WRITE, table, m.getFamilyCellMap()); + Mutation mutation = miniBatchOp.getOperation(i); + if (mutation.getAttribute(CHECK_COVERING_PERM) 
!= null) { + if (mutation instanceof CheckAndMutate) { + CheckAndMutate checkAndMutate = (CheckAndMutate) mutation; + // We had failure with table, cf and q perm checks and now giving a chance for cell + // perm check on the condition + Map> families = makeFamilyMap( + checkAndMutate.getFamily(), checkAndMutate.getQualifier()); + AuthResult authResult; + if (checkCoveringPermission(user, OpType.CHECK_AND_MUTATE, c.getEnvironment(), + checkAndMutate.getRow(), families, HConstants.LATEST_TIMESTAMP, Action.READ)) { + authResult = AuthResult.allow(OpType.CHECK_AND_MUTATE.toString(), + "Covering cell set", user, Action.READ, table, families); + } else { + authResult = AuthResult.deny(OpType.CHECK_AND_MUTATE.toString(), + "Covering cell set", user, Action.READ, table, families); + } + AccessChecker.logResult(authResult); + if (authorizationEnabled && !authResult.isAllowed()) { + throw new AccessDeniedException("Insufficient permissions " + + authResult.toContextString()); + } + + // For the mutations in CheckAndMutate + for (Mutation m : checkAndMutate.getMutations()) { + if (m.getAttribute(CHECK_COVERING_PERM) != null) { + // We have a failure with table, cf and q perm checks and now giving a chance for + // cell perm check + checkCoveringPermission(c.getEnvironment(), user, table, m); + } + } } else { - authResult = AuthResult.deny(opType.toString(), "Covering cell set", - user, Action.WRITE, table, m.getFamilyCellMap()); - } - AccessChecker.logResult(authResult); - if (authorizationEnabled && !authResult.isAllowed()) { - throw new AccessDeniedException("Insufficient permissions " - + authResult.toContextString()); + // We have a failure with table, cf and q perm checks and now giving a chance for cell + // perm check + checkCoveringPermission(c.getEnvironment(), user, table, mutation); } } } } } + private void checkCoveringPermission(RegionCoprocessorEnvironment env, User user, + TableName table, Mutation mutation) throws IOException { + OpType opType; + long 
timestamp; + if (mutation instanceof Put) { + opType = OpType.PUT; + timestamp = mutation.getTimestamp(); + } else if (mutation instanceof Delete) { + opType = OpType.DELETE; + timestamp = mutation.getTimestamp(); + } else if (mutation instanceof Increment) { + opType = OpType.INCREMENT; + timestamp = ((Increment) mutation).getTimeRange().getMax(); + } else if (mutation instanceof Append) { + opType = OpType.APPEND; + timestamp = ((Append) mutation).getTimeRange().getMax(); + } else { + throw new AssertionError("Unexpected mutation type"); + } + AuthResult authResult; + if (checkCoveringPermission(user, opType, env, mutation.getRow(), mutation.getFamilyCellMap(), + timestamp, Action.WRITE)) { + authResult = AuthResult.allow(opType.toString(), "Covering cell set", user, + Action.WRITE, table, mutation.getFamilyCellMap()); + } else { + authResult = AuthResult.deny(opType.toString(), "Covering cell set", user, + Action.WRITE, table, mutation.getFamilyCellMap()); + } + AccessChecker.logResult(authResult); + if (authorizationEnabled && !authResult.isAllowed()) { + throw new AccessDeniedException("Insufficient permissions " + + authResult.toContextString()); + } + } + @Override public void postDelete(final ObserverContext c, final Delete delete, final WALEdit edit, final Durability durability) @@ -1549,125 +1581,74 @@ public void postDelete(final ObserverContext c, } @Override - public boolean preCheckAndPut(final ObserverContext c, - final byte [] row, final byte [] family, final byte [] qualifier, - final CompareOperator op, - final ByteArrayComparable comparator, final Put put, - final boolean result) throws IOException { + public CheckAndMutateResult preCheckAndMutate(ObserverContext c, + CheckAndMutate checkAndMutate, CheckAndMutateResult result) throws IOException { User user = getActiveUser(c); - checkForReservedTagPresence(user, put); - - // Require READ and WRITE permissions on the table, CF, and KV to update RegionCoprocessorEnvironment env = 
c.getEnvironment(); - Map> families = makeFamilyMap(family, qualifier); - AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, - user, env, families, Action.READ, Action.WRITE); + + // Require READ permissions on the table, CF, and qualifier of the condition + Map> families = makeFamilyMap(checkAndMutate.getFamily(), + checkAndMutate.getQualifier()); + AuthResult authResult = permissionGranted(OpType.CHECK_AND_MUTATE, user, env, families, + Action.READ); AccessChecker.logResult(authResult); if (!authResult.isAllowed()) { if (cellFeaturesEnabled && !compatibleEarlyTermination) { - put.setAttribute(CHECK_COVERING_PERM, TRUE); + checkAndMutate.setAttribute(CHECK_COVERING_PERM, TRUE); } else if (authorizationEnabled) { throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString()); } } - byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL); - if (bytes != null) { - if (cellFeaturesEnabled) { - addCellPermissions(bytes, put.getFamilyCellMap()); + // For the mutations in CheckAndMutate + for (Mutation mutation : checkAndMutate.getMutations()) { + OpType opType; + if (mutation instanceof Delete) { + opType = OpType.DELETE; + Delete delete = (Delete) mutation; + // An ACL on a delete is useless, we shouldn't allow it + if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) { + throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " + + delete.toString()); + } } else { - throw new DoNotRetryIOException("Cell ACLs cannot be persisted"); - } - } - return result; - } + if (mutation instanceof Put) { + opType = OpType.PUT; + } else if (mutation instanceof Increment) { + opType = OpType.INCREMENT; + } else if (mutation instanceof Append) { + opType = OpType.APPEND; + } else { + throw new AssertionError("Unexpected mutation type"); + } - @Override - public boolean preCheckAndPutAfterRowLock(final ObserverContext c, - final byte[] row, final byte[] family, final byte[] qualifier, - final 
CompareOperator opp, final ByteArrayComparable comparator, final Put put, - final boolean result) throws IOException { - if (put.getAttribute(CHECK_COVERING_PERM) != null) { - // We had failure with table, cf and q perm checks and now giving a chance for cell - // perm check - TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable(); - Map> families = makeFamilyMap(family, qualifier); - AuthResult authResult = null; - User user = getActiveUser(c); - if (checkCoveringPermission(user, OpType.CHECK_AND_PUT, c.getEnvironment(), row, families, - HConstants.LATEST_TIMESTAMP, Action.READ)) { - authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), - "Covering cell set", user, Action.READ, table, families); - } else { - authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), - "Covering cell set", user, Action.READ, table, families); - } - AccessChecker.logResult(authResult); - if (authorizationEnabled && !authResult.isAllowed()) { - throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString()); - } - } - return result; - } + checkForReservedTagPresence(user, mutation); - @Override - public boolean preCheckAndDelete(final ObserverContext c, - final byte [] row, final byte [] family, final byte [] qualifier, - final CompareOperator op, - final ByteArrayComparable comparator, final Delete delete, - final boolean result) throws IOException { - // An ACL on a delete is useless, we shouldn't allow it - if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) { - throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " + - delete.toString()); - } - // Require READ and WRITE permissions on the table, CF, and the KV covered - // by the delete - RegionCoprocessorEnvironment env = c.getEnvironment(); - Map> families = makeFamilyMap(family, qualifier); - User user = getActiveUser(c); - AuthResult authResult = permissionGranted( - OpType.CHECK_AND_DELETE, user, env, families, Action.READ, 
Action.WRITE); - AccessChecker.logResult(authResult); - if (!authResult.isAllowed()) { - if (cellFeaturesEnabled && !compatibleEarlyTermination) { - delete.setAttribute(CHECK_COVERING_PERM, TRUE); - } else if (authorizationEnabled) { - throw new AccessDeniedException("Insufficient permissions " + - authResult.toContextString()); + byte[] bytes = mutation.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL); + if (bytes != null) { + if (cellFeaturesEnabled) { + addCellPermissions(bytes, mutation.getFamilyCellMap()); + } else { + throw new DoNotRetryIOException("Cell ACLs cannot be persisted"); + } + } } - } - return result; - } - @Override - public boolean preCheckAndDeleteAfterRowLock( - final ObserverContext c, final byte[] row, - final byte[] family, final byte[] qualifier, final CompareOperator op, - final ByteArrayComparable comparator, final Delete delete, final boolean result) - throws IOException { - if (delete.getAttribute(CHECK_COVERING_PERM) != null) { - // We had failure with table, cf and q perm checks and now giving a chance for cell - // perm check - TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable(); - Map> families = makeFamilyMap(family, qualifier); - AuthResult authResult = null; - User user = getActiveUser(c); - if (checkCoveringPermission(user, OpType.CHECK_AND_DELETE, c.getEnvironment(), - row, families, HConstants.LATEST_TIMESTAMP, Action.READ)) { - authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), - "Covering cell set", user, Action.READ, table, families); - } else { - authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), - "Covering cell set", user, Action.READ, table, families); - } + // Require WRITE permissions on the table, CF, and qualifier of the mutation + authResult = permissionGranted(opType, user, env, mutation.getFamilyCellMap(), Action.WRITE); AccessChecker.logResult(authResult); - if (authorizationEnabled && !authResult.isAllowed()) { - throw new 
AccessDeniedException("Insufficient permissions " + authResult.toContextString()); + if (!authResult.isAllowed()) { + if (cellFeaturesEnabled && !compatibleEarlyTermination) { + mutation.setAttribute(CHECK_COVERING_PERM, TRUE); + } else if (authorizationEnabled) { + throw new AccessDeniedException("Insufficient permissions " + + authResult.toContextString()); + } } } - return result; + + return null; } @Override @@ -1770,7 +1751,6 @@ private Cell createNewCellWithTags(Mutation mutation, Cell oldCell, Cell newCell // Collect any ACLs from the old cell List tags = Lists.newArrayList(); List aclTags = Lists.newArrayList(); - ListMultimap perms = ArrayListMultimap.create(); if (oldCell != null) { Iterator tagIterator = PrivateCellUtil.tagsIterator(oldCell); while (tagIterator.hasNext()) { @@ -1795,15 +1775,7 @@ private Cell createNewCellWithTags(Mutation mutation, Cell oldCell, Cell newCell tags.add(new ArrayBackedTag(PermissionStorage.ACL_TAG_TYPE, aclBytes)); } else { // No, use what we carried forward - if (perms != null) { - // TODO: If we collected ACLs from more than one tag we may have a - // List of size > 1, this can be collapsed into a single - // Permission - if (LOG.isTraceEnabled()) { - LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms); - } - tags.addAll(aclTags); - } + tags.addAll(aclTags); } // If we have no tags to add, just return diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java index 37f25a83ea72..cbbd1e5c9979 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.client.Admin; +import 
org.apache.hadoop.hbase.client.CheckAndMutate; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; @@ -288,79 +289,91 @@ public void preBatchMutate(ObserverContext c, // TODO this can be made as a global LRU cache at HRS level? Map> labelCache = new HashMap<>(); for (int i = 0; i < miniBatchOp.size(); i++) { - Mutation m = miniBatchOp.getOperation(i); - CellVisibility cellVisibility = null; - try { - cellVisibility = m.getCellVisibility(); - } catch (DeserializationException de) { - miniBatchOp.setOperationStatus(i, - new OperationStatus(SANITY_CHECK_FAILURE, de.getMessage())); - continue; + Mutation mutation = miniBatchOp.getOperation(i); + if (mutation instanceof CheckAndMutate) { + for (Mutation m : ((CheckAndMutate) mutation).getMutations()) { + sanityCheck(miniBatchOp, i, m, labelCache); + } + } else { + sanityCheck(miniBatchOp, i, mutation, labelCache); + } + } + } + + private void sanityCheck(MiniBatchOperationInProgress miniBatchOp, + int i, Mutation mutation, Map> labelCache) throws IOException { + CellVisibility cellVisibility; + try { + cellVisibility = mutation.getCellVisibility(); + } catch (DeserializationException e) { + miniBatchOp.setOperationStatus(i, new OperationStatus(SANITY_CHECK_FAILURE, e.getMessage())); + return; + } + boolean sanityFailure = false; + boolean modifiedTagFound = false; + Pair pair = new Pair<>(false, null); + for (CellScanner cellScanner = mutation.cellScanner(); cellScanner.advance();) { + pair = checkForReservedVisibilityTagPresence(cellScanner.current(), pair); + if (!pair.getFirst()) { + // Don't disallow reserved tags if authorization is disabled + if (authorizationEnabled) { + miniBatchOp.setOperationStatus(i, new OperationStatus(SANITY_CHECK_FAILURE, + "Mutation contains cell with reserved type tag")); + sanityFailure = true; + } + break; + } else { + // Indicates that the cell has a the tag which was modified in 
the src replication cluster + Tag tag = pair.getSecond(); + if (cellVisibility == null && tag != null) { + // May need to store only the first one + cellVisibility = new CellVisibility(Tag.getValueAsString(tag)); + modifiedTagFound = true; + } } - boolean sanityFailure = false; - boolean modifiedTagFound = false; - Pair pair = new Pair<>(false, null); - for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) { - pair = checkForReservedVisibilityTagPresence(cellScanner.current(), pair); - if (!pair.getFirst()) { - // Don't disallow reserved tags if authorization is disabled - if (authorizationEnabled) { + } + // Add the cell visibility tags to the cells for Put and Delete. For Increment/Append, we do + // this in postIncrementBeforeWAL/postAppendBeforeWAL + if (!sanityFailure && (mutation instanceof Put || mutation instanceof Delete)) { + if (cellVisibility != null) { + String labelsExp = cellVisibility.getExpression(); + List visibilityTags = labelCache.get(labelsExp); + if (visibilityTags == null) { + // Don't check user auths for labels with Mutations when the user is super user + boolean authCheck = authorizationEnabled && checkAuths && !(isSystemOrSuperUser()); + try { + visibilityTags = this.visibilityLabelService.createVisibilityExpTags(labelsExp, true, + authCheck); + } catch (InvalidLabelException e) { miniBatchOp.setOperationStatus(i, new OperationStatus(SANITY_CHECK_FAILURE, - "Mutation contains cell with reserved type tag")); - sanityFailure = true; + e.getMessage())); } - break; - } else { - // Indicates that the cell has a the tag which was modified in the src replication cluster - Tag tag = pair.getSecond(); - if (cellVisibility == null && tag != null) { - // May need to store only the first one - cellVisibility = new CellVisibility(Tag.getValueAsString(tag)); - modifiedTagFound = true; + if (visibilityTags != null) { + labelCache.put(labelsExp, visibilityTags); } } - } - if (!sanityFailure && (m instanceof Put || m instanceof Delete)) 
{ - if (cellVisibility != null) { - String labelsExp = cellVisibility.getExpression(); - List visibilityTags = labelCache.get(labelsExp); - if (visibilityTags == null) { - // Don't check user auths for labels with Mutations when the user is super user - boolean authCheck = authorizationEnabled && checkAuths && !(isSystemOrSuperUser()); - try { - visibilityTags = this.visibilityLabelService.createVisibilityExpTags(labelsExp, true, - authCheck); - } catch (InvalidLabelException e) { - miniBatchOp.setOperationStatus(i, - new OperationStatus(SANITY_CHECK_FAILURE, e.getMessage())); - } - if (visibilityTags != null) { - labelCache.put(labelsExp, visibilityTags); + if (visibilityTags != null) { + List updatedCells = new ArrayList<>(); + for (CellScanner cellScanner = mutation.cellScanner(); cellScanner.advance();) { + Cell cell = cellScanner.current(); + List tags = PrivateCellUtil.getTags(cell); + if (modifiedTagFound) { + // Rewrite the tags by removing the modified tags. + removeReplicationVisibilityTag(tags); } + tags.addAll(visibilityTags); + Cell updatedCell = PrivateCellUtil.createCell(cell, tags); + updatedCells.add(updatedCell); } - if (visibilityTags != null) { - List updatedCells = new ArrayList<>(); - for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) { - Cell cell = cellScanner.current(); - List tags = PrivateCellUtil.getTags(cell); - if (modifiedTagFound) { - // Rewrite the tags by removing the modified tags. - removeReplicationVisibilityTag(tags); - } - tags.addAll(visibilityTags); - Cell updatedCell = PrivateCellUtil.createCell(cell, tags); - updatedCells.add(updatedCell); - } - m.getFamilyCellMap().clear(); - // Clear and add new Cells to the Mutation. - for (Cell cell : updatedCells) { - if (m instanceof Put) { - Put p = (Put) m; - p.add(cell); - } else { - Delete d = (Delete) m; - d.add(cell); - } + mutation.getFamilyCellMap().clear(); + // Clear and add new Cells to the Mutation. 
+ for (Cell cell : updatedCells) { + if (mutation instanceof Put) { + Put p = (Put) mutation; + p.add(cell); + } else { + Delete d = (Delete) mutation; + d.add(cell); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java index d69a8c7483c7..cc6305d12c17 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java @@ -113,6 +113,7 @@ public boolean reseek(Cell seekCell) { return true; } } + current = null; return false; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java index 07273853f82e..e432acdda1d8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java @@ -777,14 +777,26 @@ public boolean hadPreBatchMutate() { return ctPreBatchMutate.get() > 0; } + public int getPreBatchMutate() { + return ctPreBatchMutate.get(); + } + public boolean hadPostBatchMutate() { return ctPostBatchMutate.get() > 0; } + public int getPostBatchMutate() { + return ctPostBatchMutate.get(); + } + public boolean hadPostBatchMutateIndispensably() { return ctPostBatchMutateIndispensably.get() > 0; } + public int getPostBatchMutateIndispensably() { + return ctPostBatchMutateIndispensably.get(); + } + public boolean hadPostStartRegionOperation() { return ctPostStartRegionOperation.get() > 0; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java index b679c323fbf6..7a772cc84fc1 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.ServerName; @@ -241,14 +242,16 @@ public void testIncrementHook() throws IOException { inc.addColumn(A, A, 1); verifyMethodResult(SimpleRegionObserver.class, - new String[] { "hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock" }, - tableName, new Boolean[] { false, false, false }); + new String[] { "hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock", + "hadPreBatchMutate", "hadPostBatchMutate", "hadPostBatchMutateIndispensably" }, + tableName, new Boolean[] { false, false, false, false, false, false }); table.increment(inc); verifyMethodResult(SimpleRegionObserver.class, - new String[] { "hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock" }, - tableName, new Boolean[] { true, true, true }); + new String[] { "hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock", + "hadPreBatchMutate", "hadPostBatchMutate", "hadPostBatchMutateIndispensably" }, + tableName, new Boolean[] { true, true, true, true, true, true }); } finally { util.deleteTable(tableName); table.close(); @@ -262,32 +265,37 @@ public void testCheckAndPutHooks() throws IOException { Put p = new Put(Bytes.toBytes(0)); p.addColumn(A, A, A); table.put(p); + p = new Put(Bytes.toBytes(0)); p.addColumn(A, A, A); + verifyMethodResult(SimpleRegionObserver.class, new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut", "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock", 
"getPostCheckAndPutWithFilter", "getPreCheckAndMutate", - "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" }, - tableName, new Integer[] { 0, 0, 0, 0, 0, 0, 0, 0, 0 }); + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate", "getPreBatchMutate", + "getPostBatchMutate", "getPostBatchMutateIndispensably" }, + tableName, new Integer[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1 }); - table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenPut(p); + table.checkAndMutate(CheckAndMutate + .newBuilder(Bytes.toBytes(0)).ifEquals(A, A, A).build(p)); verifyMethodResult(SimpleRegionObserver.class, new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut", "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock", "getPostCheckAndPutWithFilter", "getPreCheckAndMutate", - "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" }, - tableName, new Integer[] { 1, 1, 1, 0, 0, 0, 1, 1, 1 }); + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate", "getPreBatchMutate", + "getPostBatchMutate", "getPostBatchMutateIndispensably" }, + tableName, new Integer[] { 1, 1, 1, 0, 0, 0, 1, 1, 1, 2, 2, 2 }); - table.checkAndMutate(Bytes.toBytes(0), - new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A)) - .thenPut(p); + table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0)) + .ifMatches(new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A)).build(p)); verifyMethodResult(SimpleRegionObserver.class, new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut", "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock", "getPostCheckAndPutWithFilter", "getPreCheckAndMutate", - "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" }, - tableName, new Integer[] { 1, 1, 1, 1, 1, 1, 2, 2, 2 }); + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate", "getPreBatchMutate", + "getPostBatchMutate", "getPostBatchMutateIndispensably" }, + tableName, 
new Integer[] { 1, 1, 1, 1, 1, 1, 2, 2, 2, 3, 3, 3 }); } finally { util.deleteTable(tableName); } @@ -301,35 +309,76 @@ public void testCheckAndDeleteHooks() throws IOException { Put p = new Put(Bytes.toBytes(0)); p.addColumn(A, A, A); table.put(p); + Delete d = new Delete(Bytes.toBytes(0)); - table.delete(d); + verifyMethodResult( SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete", "getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete", "getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock", "getPostCheckAndDeleteWithFilter", "getPreCheckAndMutate", - "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" }, - tableName, new Integer[] { 0, 0, 0, 0, 0, 0, 0, 0, 0 }); + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate", "getPreBatchMutate", + "getPostBatchMutate", "getPostBatchMutateIndispensably" }, + tableName, new Integer[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1 }); - table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenDelete(d); + table.checkAndMutate(CheckAndMutate + .newBuilder(Bytes.toBytes(0)).ifEquals(A, A, A).build(d)); verifyMethodResult( SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete", "getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete", "getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock", "getPostCheckAndDeleteWithFilter", "getPreCheckAndMutate", - "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" }, - tableName, new Integer[] { 1, 1, 1, 0, 0, 0, 1, 1, 1 }); + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate", "getPreBatchMutate", + "getPostBatchMutate", "getPostBatchMutateIndispensably" }, + tableName, new Integer[] { 1, 1, 1, 0, 0, 0, 1, 1, 1, 2, 2, 2 }); - table.checkAndMutate(Bytes.toBytes(0), - new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A)) - .thenDelete(d); + table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0)). 
+ ifMatches(new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A)).build(d)); verifyMethodResult( SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete", "getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete", "getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock", "getPostCheckAndDeleteWithFilter", "getPreCheckAndMutate", - "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" }, - tableName, new Integer[] { 1, 1, 1, 1, 1, 1, 2, 2, 2 }); + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate", "getPreBatchMutate", + "getPostBatchMutate", "getPostBatchMutateIndispensably" }, + tableName, new Integer[] { 1, 1, 1, 1, 1, 1, 2, 2, 2, 3, 3, 3 }); + } finally { + util.deleteTable(tableName); + table.close(); + } + } + + @Test + public void testCheckAndIncrementHooks() throws Exception { + final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + + name.getMethodName()); + Table table = util.createTable(tableName, new byte[][] { A, B, C }); + try { + byte[] row = Bytes.toBytes(0); + + verifyMethodResult( + SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate", + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate", "getPreBatchMutate", + "getPostBatchMutate", "getPostBatchMutateIndispensably" }, + tableName, new Integer[] { 0, 0, 0, 0, 0, 0 }); + + table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifEquals(A, A, Bytes.toBytes(0L)) + .build(new Increment(row).addColumn(A, A, 1))); + verifyMethodResult( + SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate", + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate", "getPreBatchMutate", + "getPostBatchMutate", "getPostBatchMutateIndispensably" }, + tableName, new Integer[] { 1, 1, 1, 1, 1, 1 }); + + table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifEquals(A, A, Bytes.toBytes(0L)) + .build(new Increment(row).addColumn(A, A, 1))); + verifyMethodResult( + SimpleRegionObserver.class, new String[] { 
"getPreCheckAndMutate", + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate", "getPreBatchMutate", + "getPostBatchMutate", "getPostBatchMutateIndispensably" }, + tableName, new Integer[] { 2, 2, 2, 2, 2, 2 }); } finally { util.deleteTable(tableName); table.close(); @@ -337,19 +386,56 @@ public void testCheckAndDeleteHooks() throws IOException { } @Test - public void testCheckAndMutateWithRowMutationsHooks() throws Exception { + public void testCheckAndAppendHooks() throws Exception { final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName()); Table table = util.createTable(tableName, new byte[][] { A, B, C }); try { byte[] row = Bytes.toBytes(0); + verifyMethodResult( + SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate", + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate", "getPreBatchMutate", + "getPostBatchMutate", "getPostBatchMutateIndispensably" }, + tableName, new Integer[] { 0, 0, 0, 0, 0, 0 }); + + table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifEquals(A, A, HConstants.EMPTY_BYTE_ARRAY) + .build(new Append(row).addColumn(A, A, A))); + verifyMethodResult( + SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate", + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate", "getPreBatchMutate", + "getPostBatchMutate", "getPostBatchMutateIndispensably" }, + tableName, new Integer[] { 1, 1, 1, 1, 1, 1 }); + + table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifEquals(A, A, HConstants.EMPTY_BYTE_ARRAY) + .build(new Append(row).addColumn(A, A, A))); + verifyMethodResult( + SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate", + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate", "getPreBatchMutate", + "getPostBatchMutate", "getPostBatchMutateIndispensably" }, + tableName, new Integer[] { 2, 2, 2, 2, 2, 2 }); + } finally { + util.deleteTable(tableName); + table.close(); + } } + + @Test + public void testCheckAndRowMutationsHooks() throws 
Exception { + final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + + name.getMethodName()); + Table table = util.createTable(tableName, new byte[][] { A, B, C }); + try { + byte[] row = Bytes.toBytes(0); Put p = new Put(row).addColumn(A, A, A); table.put(p); + verifyMethodResult( SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate", - "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" }, - tableName, new Integer[] { 0, 0, 0 }); + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate", "getPreBatchMutate", + "getPostBatchMutate", "getPostBatchMutateIndispensably" }, + tableName, new Integer[] { 0, 0, 0, 1, 1, 1 }); table.checkAndMutate(CheckAndMutate.newBuilder(row) .ifEquals(A, A, A) @@ -358,8 +444,9 @@ public void testCheckAndMutateWithRowMutationsHooks() throws Exception { .add((Mutation) new Delete(row)))); verifyMethodResult( SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate", - "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" }, - tableName, new Integer[] { 1, 1, 1 }); + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate", "getPreBatchMutate", + "getPostBatchMutate", "getPostBatchMutateIndispensably" }, + tableName, new Integer[] { 1, 1, 1, 2, 2, 2 }); Object[] result = new Object[2]; table.batch(Arrays.asList(p, CheckAndMutate.newBuilder(row) @@ -369,8 +456,9 @@ public void testCheckAndMutateWithRowMutationsHooks() throws Exception { .add((Mutation) new Delete(row)))), result); verifyMethodResult( SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate", - "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" }, - tableName, new Integer[] { 2, 2, 2 }); + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate", "getPreBatchMutate", + "getPostBatchMutate", "getPostBatchMutateIndispensably" }, + tableName, new Integer[] { 2, 2, 2, 4, 4, 4 }); } finally { util.deleteTable(tableName); table.close(); @@ -386,14 +474,18 @@ public void testAppendHook() throws 
IOException { app.addColumn(A, A, A); verifyMethodResult(SimpleRegionObserver.class, - new String[] { "hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock" }, tableName, - new Boolean[] { false, false, false }); + new String[] { "hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock", + "hadPreBatchMutate", "hadPostBatchMutate", "hadPostBatchMutateIndispensably" }, + tableName, + new Boolean[] { false, false, false, false, false, false }); table.append(app); verifyMethodResult(SimpleRegionObserver.class, - new String[] { "hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock" }, tableName, - new Boolean[] { true, true, true }); + new String[] { "hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock", + "hadPreBatchMutate", "hadPostBatchMutate", "hadPostBatchMutateIndispensably" }, + tableName, + new Boolean[] { true, true, true, true, true, true }); } finally { util.deleteTable(tableName); table.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index bbc73e3bda59..79055275678f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -59,6 +59,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.conf.Configuration; @@ -2382,6 +2383,7 @@ public void testCheckAndMutateWithEmptyRowValue() throws IOException { CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); // Putting data in 
key put = new Put(row1); @@ -2391,17 +2393,20 @@ public void testCheckAndMutateWithEmptyRowValue() throws IOException { res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); // not empty anymore res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put)); assertFalse(res.isSuccess()); + assertNull(res.getResult()); Delete delete = new Delete(row1); delete.addColumn(fam1, qf1); res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(delete)); assertFalse(res.isSuccess()); + assertNull(res.getResult()); put = new Put(row1); put.addColumn(fam1, qf1, val2); @@ -2409,6 +2414,7 @@ public void testCheckAndMutateWithEmptyRowValue() throws IOException { res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(put)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); // checkAndDelete with correct value delete = new Delete(row1); @@ -2417,11 +2423,13 @@ public void testCheckAndMutateWithEmptyRowValue() throws IOException { res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(delete)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); delete = new Delete(row1); res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(delete)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); // checkAndPut looking for a null value put = new Put(row1); @@ -2430,6 +2438,7 @@ public void testCheckAndMutateWithEmptyRowValue() throws IOException { res = region.checkAndMutate(CheckAndMutate.newBuilder(row1).ifNotExists(fam1, qf1) .build(put)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); } @Test @@ -2453,6 
+2462,7 @@ public void testCheckAndMutateWithWrongValue() throws IOException { CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(put)); assertFalse(res.isSuccess()); + assertNull(res.getResult()); // checkAndDelete with wrong value Delete delete = new Delete(row1); @@ -2460,6 +2470,7 @@ public void testCheckAndMutateWithWrongValue() throws IOException { res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(put)); assertFalse(res.isSuccess()); + assertNull(res.getResult()); // Putting data in key put = new Put(row1); @@ -2470,6 +2481,7 @@ public void testCheckAndMutateWithWrongValue() throws IOException { res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd2)).build(put)); assertFalse(res.isSuccess()); + assertNull(res.getResult()); // checkAndDelete with wrong value delete = new Delete(row1); @@ -2477,6 +2489,7 @@ public void testCheckAndMutateWithWrongValue() throws IOException { res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd2)).build(delete)); assertFalse(res.isSuccess()); + assertNull(res.getResult()); } @Test @@ -2546,11 +2559,13 @@ public void testCheckAndMutateWithNonEqualCompareOp() throws IOException { CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.LESS, val3).build(put)); assertFalse(res.isSuccess()); + assertNull(res.getResult()); // Test CompareOp.LESS: original = val3, compare with val4, fail res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.LESS, val4).build(put)); assertFalse(res.isSuccess()); + assertNull(res.getResult()); // Test CompareOp.LESS: original = val3, compare with val2, // succeed (now value = val2) @@ -2559,17 +2574,20 @@ public 
void testCheckAndMutateWithNonEqualCompareOp() throws IOException { res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.LESS, val2).build(put)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val3, fail res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val3).build(put)); assertFalse(res.isSuccess()); + assertNull(res.getResult()); // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val2, // succeed (value still = val2) res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val2).build(put)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val1, // succeed (now value = val3) @@ -2578,16 +2596,19 @@ public void testCheckAndMutateWithNonEqualCompareOp() throws IOException { res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val1).build(put)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); // Test CompareOp.GREATER: original = val3, compare with val3, fail res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.GREATER, val3).build(put)); assertFalse(res.isSuccess()); + assertNull(res.getResult()); // Test CompareOp.GREATER: original = val3, compare with val2, fail res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.GREATER, val2).build(put)); assertFalse(res.isSuccess()); + assertNull(res.getResult()); // Test CompareOp.GREATER: original = val3, compare with val4, // succeed (now value = val2) @@ -2596,22 +2617,26 @@ public void testCheckAndMutateWithNonEqualCompareOp() throws IOException { res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, 
CompareOperator.GREATER, val4).build(put)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val1, fail res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, val1).build(put)); assertFalse(res.isSuccess()); + assertNull(res.getResult()); // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val2, // succeed (value still = val2) res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, val2).build(put)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val3, succeed res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, val3).build(put)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); } @Test @@ -2751,6 +2776,7 @@ public void testCheckAndMutateWithFilters() throws Throwable { Bytes.toBytes("b")))) .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))); assertTrue(res.isSuccess()); + assertNull(res.getResult()); Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))); assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); @@ -2764,6 +2790,7 @@ public void testCheckAndMutateWithFilters() throws Throwable { Bytes.toBytes("c")))) .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))); assertFalse(res.isSuccess()); + assertNull(res.getResult()); assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).isEmpty()); @@ -2776,6 +2803,7 @@ public void testCheckAndMutateWithFilters() throws Throwable { Bytes.toBytes("b")))) .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))); assertTrue(res.isSuccess()); + assertNull(res.getResult()); assertTrue(region.get(new Get(row).addColumn(FAMILY, 
Bytes.toBytes("D"))).isEmpty()); @@ -2791,6 +2819,7 @@ public void testCheckAndMutateWithFilters() throws Throwable { .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))) .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))); assertTrue(res.isSuccess()); + assertNull(res.getResult()); result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))); assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E")))); @@ -2815,6 +2844,7 @@ public void testCheckAndMutateWithFiltersAndTimeRange() throws Throwable { .timeRange(TimeRange.between(0, 101)) .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))); assertTrue(res.isSuccess()); + assertNull(res.getResult()); Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))); assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); @@ -2826,6 +2856,7 @@ public void testCheckAndMutateWithFiltersAndTimeRange() throws Throwable { .timeRange(TimeRange.between(0, 100)) .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))); assertFalse(res.isSuccess()); + assertNull(res.getResult()); assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).isEmpty()); @@ -2839,6 +2870,7 @@ public void testCheckAndMutateWithFiltersAndTimeRange() throws Throwable { .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))); assertTrue(res.isSuccess()); + assertNull(res.getResult()); result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))); assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); @@ -2967,6 +2999,64 @@ public void testCheckAndAppend() throws Throwable { assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); } + @Test + public void testCheckAndIncrementAndAppend() throws Throwable { + // Setting up region + this.region = 
initHRegion(tableName, method, CONF, fam1); + + // CheckAndMutate with Increment and Append + CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row) + .ifNotExists(fam1, qual) + .build(new RowMutations(row) + .add((Mutation) new Increment(row).addColumn(fam1, qual1, 1L)) + .add((Mutation) new Append(row).addColumn(fam1, qual2, Bytes.toBytes("a"))) + ); + + CheckAndMutateResult result = region.checkAndMutate(checkAndMutate); + assertTrue(result.isSuccess()); + assertEquals(1L, Bytes.toLong(result.getResult().getValue(fam1, qual1))); + assertEquals("a", Bytes.toString(result.getResult().getValue(fam1, qual2))); + + Result r = region.get(new Get(row)); + assertEquals(1L, Bytes.toLong(r.getValue(fam1, qual1))); + assertEquals("a", Bytes.toString(r.getValue(fam1, qual2))); + + // Set return results to false + checkAndMutate = CheckAndMutate.newBuilder(row) + .ifNotExists(fam1, qual) + .build(new RowMutations(row) + .add((Mutation) new Increment(row).addColumn(fam1, qual1, 1L).setReturnResults(false)) + .add((Mutation) new Append(row).addColumn(fam1, qual2, Bytes.toBytes("a")) + .setReturnResults(false)) + ); + + result = region.checkAndMutate(checkAndMutate); + assertTrue(result.isSuccess()); + assertNull(result.getResult().getValue(fam1, qual1)); + assertNull(result.getResult().getValue(fam1, qual2)); + + r = region.get(new Get(row)); + assertEquals(2L, Bytes.toLong(r.getValue(fam1, qual1))); + assertEquals("aa", Bytes.toString(r.getValue(fam1, qual2))); + + checkAndMutate = CheckAndMutate.newBuilder(row) + .ifNotExists(fam1, qual) + .build(new RowMutations(row) + .add((Mutation) new Increment(row).addColumn(fam1, qual1, 1L)) + .add((Mutation) new Append(row).addColumn(fam1, qual2, Bytes.toBytes("a")) + .setReturnResults(false)) + ); + + result = region.checkAndMutate(checkAndMutate); + assertTrue(result.isSuccess()); + assertEquals(3L, Bytes.toLong(result.getResult().getValue(fam1, qual1))); + assertNull(result.getResult().getValue(fam1, qual2)); + + r = 
region.get(new Get(row)); + assertEquals(3L, Bytes.toLong(r.getValue(fam1, qual1))); + assertEquals("aaa", Bytes.toString(r.getValue(fam1, qual2))); + } + // //////////////////////////////////////////////////////////////////////////// // Delete tests // //////////////////////////////////////////////////////////////////////////// @@ -6966,6 +7056,164 @@ public void testCheckAndMutateTimestampsAreMonotonic() throws IOException { qual2, 0, qual2.length)); } + @Test + public void testBatchMutate() throws Exception { + final byte[] row = Bytes.toBytes("row"); + final byte[] q1 = Bytes.toBytes("q1"); + final byte[] q2 = Bytes.toBytes("q2"); + final byte[] q3 = Bytes.toBytes("q3"); + final byte[] q4 = Bytes.toBytes("q4"); + final byte[] q5 = Bytes.toBytes("q5"); + final String v1 = "v1"; + final String v2 = "v2"; + + region = initHRegion(tableName, method, CONF, fam1); + + // Initial values + region.batchMutate(new Mutation[] { + new Put(row).addColumn(fam1, q2, Bytes.toBytes("toBeDeleted")), + new Put(row).addColumn(fam1, q3, Bytes.toBytes(5L)), + new Put(row).addColumn(fam1, q4, Bytes.toBytes("a")), + }); + + // Do batch mutate + OperationStatus[] statuses = region.batchMutate(new Mutation[] { + new Put(row).addColumn(fam1, q1, Bytes.toBytes(v1)), + new Delete(row).addColumns(fam1, q2), + new Increment(row).addColumn(fam1, q3, 1), + new Append(row).addColumn(fam1, q4, Bytes.toBytes("b")), + CheckAndMutate.newBuilder(row).ifNotExists(fam1, q5) + .build(new Put(row).addColumn(fam1, q5, Bytes.toBytes(v2))) + }); + assertEquals(6L, Bytes.toLong(statuses[2].getResult().getValue(fam1, q3))); + assertEquals("ab", Bytes.toString(statuses[3].getResult().getValue(fam1, q4))); + assertTrue(statuses[4].getCheckAndMutateResult().isSuccess()); + + // Verify the value + Result result = region.get(new Get(row)); + assertEquals(v1, Bytes.toString(result.getValue(fam1, q1))); + assertNull(result.getValue(fam1, q2)); + assertEquals(6L, Bytes.toLong(result.getValue(fam1, q3))); + 
assertEquals("ab", Bytes.toString(result.getValue(fam1, q4))); + assertEquals(v2, Bytes.toString(result.getValue(fam1, q5))); + } + + @Test + public void testBatchMutateInParallel() throws Exception { + final int numReaderThreads = 100; + final CountDownLatch latch = new CountDownLatch(numReaderThreads); + + final byte[] row = Bytes.toBytes("row"); + final byte[] q1 = Bytes.toBytes("q1"); + final byte[] q2 = Bytes.toBytes("q2"); + final byte[] q3 = Bytes.toBytes("q3"); + final byte[] q4 = Bytes.toBytes("q4"); + final byte[] q5 = Bytes.toBytes("q5"); + final String v1 = "v1"; + final String v2 = "v2"; + final String v3 = "v3"; + + // We need to ensure the timestamp of the delete operation is more than the previous one + final AtomicLong deleteTimestamp = new AtomicLong(); + + region = initHRegion(tableName, method, CONF, fam1); + + // Initial values + region.batchMutate(new Mutation[] { + new Put(row).addColumn(fam1, q1, Bytes.toBytes(v1)) + .addColumn(fam1, q2, deleteTimestamp.getAndIncrement(), Bytes.toBytes(v2)) + .addColumn(fam1, q3, Bytes.toBytes(1L)) + .addColumn(fam1, q4, Bytes.toBytes("a")) + .addColumn(fam1, q5, Bytes.toBytes(v3)) + }); + + final AtomicReference assertionError = new AtomicReference<>(); + + // Writer thread + Thread writerThread = new Thread(() -> { + try { + while (true) { + // If all the reader threads finish, then stop the writer thread + if (latch.await(0, TimeUnit.MILLISECONDS)) { + return; + } + + // Execute the mutations. 
This should be done atomically + region.batchMutate(new Mutation[] { + new Put(row).addColumn(fam1, q1, Bytes.toBytes(v2)), + new Delete(row).addColumns(fam1, q2, deleteTimestamp.getAndIncrement()), + new Increment(row).addColumn(fam1, q3, 1L), + new Append(row).addColumn(fam1, q4, Bytes.toBytes("b")), + CheckAndMutate.newBuilder(row).ifEquals(fam1, q5, Bytes.toBytes(v3)).build( + new Put(row).addColumn(fam1, q5, Bytes.toBytes(v1))) }); + + // We need to ensure the timestamps of the Increment/Append/CheckAndPut operations are + // more than the previous ones + Result result = region.get(new Get(row) + .addColumn(fam1, q3).addColumn(fam1, q4).addColumn(fam1, q5)); + long tsIncrement = result.getColumnLatestCell(fam1, q3).getTimestamp(); + long tsAppend = result.getColumnLatestCell(fam1, q4).getTimestamp(); + long tsCheckAndPut = result.getColumnLatestCell(fam1, q5).getTimestamp(); + + // Put the initial values + region.batchMutate(new Mutation[] { + new Put(row).addColumn(fam1, q1, Bytes.toBytes(v1)) + .addColumn(fam1, q2, deleteTimestamp.getAndIncrement(), Bytes.toBytes(v2)) + .addColumn(fam1, q3, tsIncrement + 1, Bytes.toBytes(1L)) + .addColumn(fam1, q4, tsAppend + 1, Bytes.toBytes("a")) + .addColumn(fam1, q5, tsCheckAndPut + 1, Bytes.toBytes(v3)) + }); + } + } catch (Exception e) { + e.printStackTrace(); + assertionError.set(new AssertionError(e.getMessage())); + } + }); + writerThread.start(); + + // Reader threads + for (int i = 0; i < numReaderThreads; i++) { + new Thread(() -> { + try { + for (int j = 0; j < 10000; j++) { + // Verify the values + Result result = region.get(new Get(row)); + + // The values should be equals to either the initial values or the values after + // executing the mutations + String q1Value = Bytes.toString(result.getValue(fam1, q1)); + if (v1.equals(q1Value)) { + assertEquals(v2, Bytes.toString(result.getValue(fam1, q2))); + assertEquals(1L, Bytes.toLong(result.getValue(fam1, q3))); + assertEquals("a", 
Bytes.toString(result.getValue(fam1, q4))); + assertEquals(v3, Bytes.toString(result.getValue(fam1, q5))); + } else if (v2.equals(q1Value)) { + assertNull(Bytes.toString(result.getValue(fam1, q2))); + assertEquals(2L, Bytes.toLong(result.getValue(fam1, q3))); + assertEquals("ab", Bytes.toString(result.getValue(fam1, q4))); + assertEquals(v1, Bytes.toString(result.getValue(fam1, q5))); + } else { + fail("the qualifier " + q1 + " should be " + v1 + " or " + v2 + ", but " + q1Value); + } + } + } catch (Exception e) { + e.printStackTrace(); + assertionError.set(new AssertionError(e.getMessage())); + } catch (AssertionError e) { + assertionError.set(e); + } + + latch.countDown(); + }).start(); + } + + writerThread.join(); + + if (assertionError.get() != null) { + throw assertionError.get(); + } + } + @Test public void testBatchMutateWithWrongRegionException() throws Exception { final byte[] a = Bytes.toBytes("a");