From 1ad48c1ebc7afa3338e35cc3bca41f7537b0476e Mon Sep 17 00:00:00 2001 From: Geoffrey Jacoby Date: Tue, 2 Jul 2019 14:32:24 -0700 Subject: [PATCH] HBASE-22622 WALKey Extended Attributes (#352) Signed-off-by: Andrew Purtell --- .../src/main/protobuf/WAL.proto | 18 ++-- hbase-protocol/src/main/protobuf/WAL.proto | 6 +- .../hbase/regionserver/wal/WALUtil.java | 24 ++++-- .../org/apache/hadoop/hbase/wal/WALKey.java | 21 +++++ .../apache/hadoop/hbase/wal/WALKeyImpl.java | 85 ++++++++++++++++--- .../regionserver/TestWALEntryStream.java | 35 ++++++++ 6 files changed, 159 insertions(+), 30 deletions(-) diff --git a/hbase-protocol-shaded/src/main/protobuf/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/WAL.proto index 08d4741aa4bc..35a179ce9024 100644 --- a/hbase-protocol-shaded/src/main/protobuf/WAL.proto +++ b/hbase-protocol-shaded/src/main/protobuf/WAL.proto @@ -63,14 +63,20 @@ message WALKey { optional uint64 nonceGroup = 9; optional uint64 nonce = 10; optional uint64 orig_sequence_number = 11; + repeated Attribute extended_attributes = 12; -/* - optional CustomEntryType custom_entry_type = 9; + /* + optional CustomEntryType custom_entry_type = 9; - enum CustomEntryType { - COMPACTION = 0; - } -*/ + enum CustomEntryType { + COMPACTION = 0; + } + */ +} + +message Attribute { + required string key = 1; + required bytes value = 2; } enum ScopeType { diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto index c1d465ad952e..7272fa9cfca0 100644 --- a/hbase-protocol/src/main/protobuf/WAL.proto +++ b/hbase-protocol/src/main/protobuf/WAL.proto @@ -62,7 +62,7 @@ message WALKey { optional uint64 nonceGroup = 9; optional uint64 nonce = 10; optional uint64 orig_sequence_number = 11; - + repeated Attribute extended_attributes = 12; /* optional CustomEntryType custom_entry_type = 9; @@ -71,6 +71,10 @@ message WALKey { } */ } +message Attribute { + required string key = 1; + required bytes value = 2; +} enum ScopeType { REPLICATION_SCOPE_LOCAL = 0; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java index b76670e21bf5..1808cd67b1a1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Map; import java.util.NavigableMap; import java.util.function.Function; import org.apache.hadoop.conf.Configuration; @@ -71,7 +72,7 @@ public static WALKeyImpl writeCompactionMarker(WAL wal, MultiVersionConcurrencyControl mvcc) throws IOException { WALKeyImpl walKey = - writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc); + writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null); if (LOG.isTraceEnabled()) { LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c)); } @@ -87,7 +88,7 @@ public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc) throws IOException { WALKeyImpl walKey = doFullAppendTransaction(wal, replicationScope, hri, - WALEdit.createFlushWALEdit(hri, f), mvcc, sync); + WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync); if (LOG.isTraceEnabled()) { LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f)); } @@ -103,7 +104,7 @@ public static WALKeyImpl writeRegionEventMarker(WAL wal, final RegionEventDescriptor r, final MultiVersionConcurrencyControl mvcc) throws IOException { WALKeyImpl walKey = writeMarker(wal, replicationScope, hri, - WALEdit.createRegionEventWALEdit(hri, r), mvcc); + WALEdit.createRegionEventWALEdit(hri, r), mvcc, null); if (LOG.isTraceEnabled()) { LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r)); } @@ -125,7 +126,7 @@ public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal, final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc) throws IOException { WALKeyImpl walKey = - writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc); + writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc, null); if (LOG.isTraceEnabled()) { LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc)); } @@ -133,11 +134,15 @@ public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal, } private static WALKeyImpl writeMarker(final WAL wal, - final NavigableMap replicationScope, final RegionInfo hri, - final WALEdit edit, final MultiVersionConcurrencyControl mvcc) + final NavigableMap replicationScope, + final RegionInfo hri, + final WALEdit edit, + final MultiVersionConcurrencyControl mvcc, + final Map extendedAttributes) throws IOException { // If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT - return doFullAppendTransaction(wal, replicationScope, hri, edit, mvcc, true); + return doFullAppendTransaction(wal, replicationScope, hri, + edit, mvcc, extendedAttributes, true); } /** @@ -150,11 +155,12 @@ private static WALKeyImpl writeMarker(final WAL wal, */ public static WALKeyImpl doFullAppendTransaction(final WAL wal, final NavigableMap replicationScope, final RegionInfo hri, - final WALEdit edit, final MultiVersionConcurrencyControl mvcc, final boolean sync) + final WALEdit edit, final MultiVersionConcurrencyControl mvcc, + final Map extendedAttributes, final boolean sync) throws IOException { // TODO: Pass in current time to use? WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), - System.currentTimeMillis(), mvcc, replicationScope); + System.currentTimeMillis(), mvcc, replicationScope, extendedAttributes); long trx = MultiVersionConcurrencyControl.NONE; try { trx = wal.append(hri, walKey, edit, false); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java index af3cf7115eca..c541cc0a80d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java @@ -86,6 +86,21 @@ default long getNonce() { */ long getOrigLogSeqNum(); + /** + * Return a named String value injected into the WALKey during processing, such as by a + * coprocessor + * @param attributeKey The key of a key / value pair + */ + default byte[] getExtendedAttribute(String attributeKey){ + return null; + } + + /** + * Returns a map of all extended attributes injected into this WAL key. + */ + default Map getExtendedAttributes() { + return new HashMap<>(); + } /** * Produces a string map for this key. Useful for programmatic use and * manipulation of the data stored in an WALKeyImpl, for example, printing @@ -98,6 +113,12 @@ default Map toStringMap() { stringMap.put("table", getTableName()); stringMap.put("region", Bytes.toStringBinary(getEncodedRegionName())); stringMap.put("sequence", getSequenceId()); + Map extendedAttributes = getExtendedAttributes(); + if (extendedAttributes != null){ + for (Map.Entry entry : extendedAttributes.entrySet()){ + stringMap.put(entry.getKey(), Bytes.toStringBinary(entry.getValue())); + } + } return stringMap; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java index c6aa92ead200..fc84d8e24526 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java @@ -19,11 +19,13 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; import java.util.UUID; + import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; @@ -116,14 +118,16 @@ public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry) */ private MultiVersionConcurrencyControl.WriteEntry writeEntry; + private Map extendedAttributes; + public WALKeyImpl() { init(null, null, 0L, HConstants.LATEST_TIMESTAMP, - new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, null); + new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, null, null); } public WALKeyImpl(final NavigableMap replicationScope) { init(null, null, 0L, HConstants.LATEST_TIMESTAMP, - new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, replicationScope); + new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, replicationScope, null); } @VisibleForTesting @@ -132,7 +136,7 @@ public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, lon List clusterIds = new ArrayList<>(1); clusterIds.add(clusterId); init(encodedRegionName, tablename, logSeqNum, now, clusterIds, HConstants.NO_NONCE, - HConstants.NO_NONCE, null, null); + HConstants.NO_NONCE, null, null, null); } @VisibleForTesting @@ -141,7 +145,7 @@ public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, lon List clusterIds = new ArrayList<>(1); clusterIds.add(clusterId); init(encodedRegionName, tablename, logSeqNum, now, clusterIds, HConstants.NO_NONCE, - HConstants.NO_NONCE, mvcc, null); + HConstants.NO_NONCE, mvcc, null, null); } // TODO: Fix being able to pass in sequenceid. @@ -153,20 +157,28 @@ public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, fin EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, - null, null); + null, null, null); } // TODO: Fix being able to pass in sequenceid. public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now, final NavigableMap replicationScope) { init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE, - HConstants.NO_NONCE, null, replicationScope); + HConstants.NO_NONCE, null, replicationScope, null); } public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now, MultiVersionConcurrencyControl mvcc, final NavigableMap replicationScope) { init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE, - HConstants.NO_NONCE, mvcc, replicationScope); + HConstants.NO_NONCE, mvcc, replicationScope, null); + } + + public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now, + MultiVersionConcurrencyControl mvcc, + final NavigableMap replicationScope, + Map extendedAttributes) { + init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE, + HConstants.NO_NONCE, mvcc, replicationScope, extendedAttributes); } public WALKeyImpl(final byte[] encodedRegionName, @@ -180,7 +192,7 @@ public WALKeyImpl(final byte[] encodedRegionName, EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, - mvcc, null); + mvcc, null, null); } /** @@ -206,7 +218,7 @@ public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, lon final long now, List clusterIds, long nonceGroup, long nonce, MultiVersionConcurrencyControl mvcc, final NavigableMap replicationScope) { init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc, - replicationScope); + replicationScope, null); } /** @@ -231,7 +243,8 @@ public WALKeyImpl(final byte[] encodedRegionName, long nonceGroup, long nonce, MultiVersionConcurrencyControl mvcc) { - init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc, null); + init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, + nonce, mvcc, null, null); } /** @@ -252,7 +265,7 @@ public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now, List clusterIds, long nonceGroup, final long nonce, final MultiVersionConcurrencyControl mvcc) { init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc, - null); + null, null); } /** @@ -275,7 +288,7 @@ public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long nonce, final MultiVersionConcurrencyControl mvcc, NavigableMap replicationScope) { init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc, - replicationScope); + replicationScope, null); } /** @@ -304,7 +317,22 @@ public WALKeyImpl(final byte[] encodedRegionName, EMPTY_UUIDS, nonceGroup, nonce, - mvcc, null); + mvcc, null, null); + } + + public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, + final long now, List clusterIds, long nonceGroup, + final long nonce, final MultiVersionConcurrencyControl mvcc, + NavigableMap replicationScope, + Map extendedAttributes){ + init(encodedRegionName, + tablename, + NO_SEQUENCE_ID, + now, + clusterIds, + nonceGroup, + nonce, + mvcc, replicationScope, extendedAttributes); } @InterfaceAudience.Private @@ -316,7 +344,8 @@ protected void init(final byte[] encodedRegionName, long nonceGroup, long nonce, MultiVersionConcurrencyControl mvcc, - NavigableMap replicationScope) { + NavigableMap replicationScope, + Map extendedAttributes) { this.sequenceId = logSeqNum; this.writeTime = now; this.clusterIds = clusterIds; @@ -329,6 +358,7 @@ protected void init(final byte[] encodedRegionName, setSequenceId(logSeqNum); } this.replicationScope = replicationScope; + this.extendedAttributes = extendedAttributes; } // For deserialization. DO NOT USE. See setWriteEntry below. @@ -434,6 +464,17 @@ public UUID getOriginatingClusterId(){ return clusterIds.isEmpty()? HConstants.DEFAULT_CLUSTER_ID: clusterIds.get(0); } + @Override + public byte[] getExtendedAttribute(String attributeKey){ + return extendedAttributes != null ? extendedAttributes.get(attributeKey) : null; + } + + @Override + public Map getExtendedAttributes(){ + return extendedAttributes != null ? new HashMap(extendedAttributes) : + new HashMap(); + } + @Override public String toString() { return tablename + "/" + Bytes.toString(encodedRegionName) + "/" + sequenceId; @@ -539,6 +580,14 @@ public WALProtos.WALKey.Builder getBuilder(WALCellCodec.ByteStringCompressor com .setScopeType(ScopeType.forNumber(e.getValue()))); } } + if (extendedAttributes != null){ + for (Map.Entry e : extendedAttributes.entrySet()){ + WALProtos.Attribute attr = WALProtos.Attribute.newBuilder(). + setKey(e.getKey()).setValue(compressor.compress(e.getValue(), + CompressionContext.DictionaryIndex.TABLE)).build(); + builder.addExtendedAttributes(attr); + } + } return builder; } @@ -573,6 +622,14 @@ public void readFieldsFromPb(WALProtos.WALKey walKey, if (walKey.hasOrigSequenceNumber()) { this.origLogSeqNum = walKey.getOrigSequenceNumber(); } + if (walKey.getExtendedAttributesCount() > 0){ + this.extendedAttributes = new HashMap<>(walKey.getExtendedAttributesCount()); + for (WALProtos.Attribute attr : walKey.getExtendedAttributesList()){ + byte[] value = + uncompressor.uncompress(attr.getValue(), CompressionContext.DictionaryIndex.TABLE); + extendedAttributes.put(attr.getKey(), value); + } + } } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index f9f70cd323ca..a873320ef919 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -27,9 +27,13 @@ import static org.mockito.Mockito.when; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import java.util.NavigableMap; import java.util.OptionalLong; import java.util.TreeMap; +import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; @@ -52,6 +56,7 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; @@ -64,6 +69,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -72,6 +78,8 @@ import org.junit.experimental.categories.Category; import org.junit.rules.TestName; import org.mockito.Mockito; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; + @Category({ ReplicationTests.class, LargeTests.class }) public class TestWALEntryStream { @@ -333,6 +341,33 @@ public void testEmptyStream() throws Exception { } } + @Test + public void testWALKeySerialization() throws Exception { + Map attributes = new HashMap(); + attributes.put("foo", Bytes.toBytes("foo-value")); + attributes.put("bar", Bytes.toBytes("bar-value")); + WALKeyImpl key = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), new ArrayList(), 0L, 0L, + mvcc, scopes, attributes); + Assert.assertEquals(attributes, key.getExtendedAttributes()); + + WALProtos.WALKey.Builder builder = key.getBuilder(WALCellCodec.getNoneCompressor()); + WALProtos.WALKey serializedKey = builder.build(); + + WALKeyImpl deserializedKey = new WALKeyImpl(); + deserializedKey.readFieldsFromPb(serializedKey, WALCellCodec.getNoneUncompressor()); + + //equals() only checks region name, sequence id and write time + Assert.assertEquals(key, deserializedKey); + //can't use Map.equals() because byte arrays use reference equality + Assert.assertEquals(key.getExtendedAttributes().keySet(), + deserializedKey.getExtendedAttributes().keySet()); + for (Map.Entry entry : deserializedKey.getExtendedAttributes().entrySet()){ + Assert.assertArrayEquals(key.getExtendedAttribute(entry.getKey()), entry.getValue()); + } + Assert.assertEquals(key.getReplicationScopes(), deserializedKey.getReplicationScopes()); + } + private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) { ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));