Skip to content

Commit

Permalink
HBASE-22622 WALKey Extended Attributes (#352)
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Purtell <apurtell@apache.org>
  • Loading branch information
gjacoby126 authored and apurtell committed Jul 19, 2019
1 parent 48c623c commit 1ad48c1
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 30 deletions.
18 changes: 12 additions & 6 deletions hbase-protocol-shaded/src/main/protobuf/WAL.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,20 @@ message WALKey {
optional uint64 nonceGroup = 9;
optional uint64 nonce = 10;
optional uint64 orig_sequence_number = 11;
repeated Attribute extended_attributes = 12;

/*
optional CustomEntryType custom_entry_type = 9;
/*
optional CustomEntryType custom_entry_type = 9;
enum CustomEntryType {
COMPACTION = 0;
}
*/
enum CustomEntryType {
COMPACTION = 0;
}
*/
}

message Attribute {
required string key = 1;
required bytes value = 2;
}

enum ScopeType {
Expand Down
6 changes: 5 additions & 1 deletion hbase-protocol/src/main/protobuf/WAL.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ message WALKey {
optional uint64 nonceGroup = 9;
optional uint64 nonce = 10;
optional uint64 orig_sequence_number = 11;

repeated Attribute extended_attributes = 12;
/*
optional CustomEntryType custom_entry_type = 9;
Expand All @@ -71,6 +71,10 @@ message WALKey {
}
*/
}
message Attribute {
required string key = 1;
required bytes value = 2;
}

enum ScopeType {
REPLICATION_SCOPE_LOCAL = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.NavigableMap;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -71,7 +72,7 @@ public static WALKeyImpl writeCompactionMarker(WAL wal,
MultiVersionConcurrencyControl mvcc)
throws IOException {
WALKeyImpl walKey =
writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc);
writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
}
Expand All @@ -87,7 +88,7 @@ public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap<byte[], Integer>
RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
throws IOException {
WALKeyImpl walKey = doFullAppendTransaction(wal, replicationScope, hri,
WALEdit.createFlushWALEdit(hri, f), mvcc, sync);
WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
}
Expand All @@ -103,7 +104,7 @@ public static WALKeyImpl writeRegionEventMarker(WAL wal,
final RegionEventDescriptor r, final MultiVersionConcurrencyControl mvcc)
throws IOException {
WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
WALEdit.createRegionEventWALEdit(hri, r), mvcc);
WALEdit.createRegionEventWALEdit(hri, r), mvcc, null);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
}
Expand All @@ -125,19 +126,23 @@ public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal,
final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
throws IOException {
WALKeyImpl walKey =
writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc);
writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc, null);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
}
return walKey;
}

private static WALKeyImpl writeMarker(final WAL wal,
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
final WALEdit edit, final MultiVersionConcurrencyControl mvcc)
final NavigableMap<byte[], Integer> replicationScope,
final RegionInfo hri,
final WALEdit edit,
final MultiVersionConcurrencyControl mvcc,
final Map<String, byte[]> extendedAttributes)
throws IOException {
// If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT
return doFullAppendTransaction(wal, replicationScope, hri, edit, mvcc, true);
return doFullAppendTransaction(wal, replicationScope, hri,
edit, mvcc, extendedAttributes, true);
}

/**
Expand All @@ -150,11 +155,12 @@ private static WALKeyImpl writeMarker(final WAL wal,
*/
public static WALKeyImpl doFullAppendTransaction(final WAL wal,
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
final WALEdit edit, final MultiVersionConcurrencyControl mvcc, final boolean sync)
final WALEdit edit, final MultiVersionConcurrencyControl mvcc,
final Map<String, byte[]> extendedAttributes, final boolean sync)
throws IOException {
// TODO: Pass in current time to use?
WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
System.currentTimeMillis(), mvcc, replicationScope);
System.currentTimeMillis(), mvcc, replicationScope, extendedAttributes);
long trx = MultiVersionConcurrencyControl.NONE;
try {
trx = wal.append(hri, walKey, edit, false);
Expand Down
21 changes: 21 additions & 0 deletions hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,21 @@ default long getNonce() {
*/
long getOrigLogSeqNum();

/**
* Return a named String value injected into the WALKey during processing, such as by a
* coprocessor
* @param attributeKey The key of a key / value pair
*/
default byte[] getExtendedAttribute(String attributeKey){
return null;
}

/**
* Returns a map of all extended attributes injected into this WAL key.
*/
default Map<String, byte[]> getExtendedAttributes() {
return new HashMap<>();
}
/**
* Produces a string map for this key. Useful for programmatic use and
* manipulation of the data stored in an WALKeyImpl, for example, printing
Expand All @@ -98,6 +113,12 @@ default Map<String, Object> toStringMap() {
stringMap.put("table", getTableName());
stringMap.put("region", Bytes.toStringBinary(getEncodedRegionName()));
stringMap.put("sequence", getSequenceId());
Map<String, byte[]> extendedAttributes = getExtendedAttributes();
if (extendedAttributes != null){
for (Map.Entry<String, byte[]> entry : extendedAttributes.entrySet()){
stringMap.put(entry.getKey(), Bytes.toStringBinary(entry.getValue()));
}
}
return stringMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;

import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
Expand Down Expand Up @@ -116,14 +118,16 @@ public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry)
*/
private MultiVersionConcurrencyControl.WriteEntry writeEntry;

private Map<String, byte[]> extendedAttributes;

public WALKeyImpl() {
init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, null);
new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, null, null);
}

public WALKeyImpl(final NavigableMap<byte[], Integer> replicationScope) {
init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, replicationScope);
new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, replicationScope, null);
}

@VisibleForTesting
Expand All @@ -132,7 +136,7 @@ public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, lon
List<UUID> clusterIds = new ArrayList<>(1);
clusterIds.add(clusterId);
init(encodedRegionName, tablename, logSeqNum, now, clusterIds, HConstants.NO_NONCE,
HConstants.NO_NONCE, null, null);
HConstants.NO_NONCE, null, null, null);
}

@VisibleForTesting
Expand All @@ -141,7 +145,7 @@ public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, lon
List<UUID> clusterIds = new ArrayList<>(1);
clusterIds.add(clusterId);
init(encodedRegionName, tablename, logSeqNum, now, clusterIds, HConstants.NO_NONCE,
HConstants.NO_NONCE, mvcc, null);
HConstants.NO_NONCE, mvcc, null, null);
}

// TODO: Fix being able to pass in sequenceid.
Expand All @@ -153,20 +157,28 @@ public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, fin
EMPTY_UUIDS,
HConstants.NO_NONCE,
HConstants.NO_NONCE,
null, null);
null, null, null);
}

// TODO: Fix being able to pass in sequenceid.
public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now,
final NavigableMap<byte[], Integer> replicationScope) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
HConstants.NO_NONCE, null, replicationScope);
HConstants.NO_NONCE, null, replicationScope, null);
}

public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now,
MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
HConstants.NO_NONCE, mvcc, replicationScope);
HConstants.NO_NONCE, mvcc, replicationScope, null);
}

public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now,
MultiVersionConcurrencyControl mvcc,
final NavigableMap<byte[], Integer> replicationScope,
Map<String, byte[]> extendedAttributes) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
HConstants.NO_NONCE, mvcc, replicationScope, extendedAttributes);
}

public WALKeyImpl(final byte[] encodedRegionName,
Expand All @@ -180,7 +192,7 @@ public WALKeyImpl(final byte[] encodedRegionName,
EMPTY_UUIDS,
HConstants.NO_NONCE,
HConstants.NO_NONCE,
mvcc, null);
mvcc, null, null);
}

/**
Expand All @@ -206,7 +218,7 @@ public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, lon
final long now, List<UUID> clusterIds, long nonceGroup, long nonce,
MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope) {
init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc,
replicationScope);
replicationScope, null);
}

/**
Expand All @@ -231,7 +243,8 @@ public WALKeyImpl(final byte[] encodedRegionName,
long nonceGroup,
long nonce,
MultiVersionConcurrencyControl mvcc) {
init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc, null);
init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup,
nonce, mvcc, null, null);
}

/**
Expand All @@ -252,7 +265,7 @@ public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename,
final long now, List<UUID> clusterIds, long nonceGroup,
final long nonce, final MultiVersionConcurrencyControl mvcc) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc,
null);
null, null);
}

/**
Expand All @@ -275,7 +288,7 @@ public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename,
final long nonce, final MultiVersionConcurrencyControl mvcc,
NavigableMap<byte[], Integer> replicationScope) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc,
replicationScope);
replicationScope, null);
}

/**
Expand Down Expand Up @@ -304,7 +317,22 @@ public WALKeyImpl(final byte[] encodedRegionName,
EMPTY_UUIDS,
nonceGroup,
nonce,
mvcc, null);
mvcc, null, null);
}

public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename,
final long now, List<UUID> clusterIds, long nonceGroup,
final long nonce, final MultiVersionConcurrencyControl mvcc,
NavigableMap<byte[], Integer> replicationScope,
Map<String, byte[]> extendedAttributes){
init(encodedRegionName,
tablename,
NO_SEQUENCE_ID,
now,
clusterIds,
nonceGroup,
nonce,
mvcc, replicationScope, extendedAttributes);
}

@InterfaceAudience.Private
Expand All @@ -316,7 +344,8 @@ protected void init(final byte[] encodedRegionName,
long nonceGroup,
long nonce,
MultiVersionConcurrencyControl mvcc,
NavigableMap<byte[], Integer> replicationScope) {
NavigableMap<byte[], Integer> replicationScope,
Map<String, byte[]> extendedAttributes) {
this.sequenceId = logSeqNum;
this.writeTime = now;
this.clusterIds = clusterIds;
Expand All @@ -329,6 +358,7 @@ protected void init(final byte[] encodedRegionName,
setSequenceId(logSeqNum);
}
this.replicationScope = replicationScope;
this.extendedAttributes = extendedAttributes;
}

// For deserialization. DO NOT USE. See setWriteEntry below.
Expand Down Expand Up @@ -434,6 +464,17 @@ public UUID getOriginatingClusterId(){
return clusterIds.isEmpty()? HConstants.DEFAULT_CLUSTER_ID: clusterIds.get(0);
}

@Override
public byte[] getExtendedAttribute(String attributeKey){
return extendedAttributes != null ? extendedAttributes.get(attributeKey) : null;
}

@Override
public Map<String, byte[]> getExtendedAttributes(){
return extendedAttributes != null ? new HashMap<String, byte[]>(extendedAttributes) :
new HashMap<String, byte[]>();
}

@Override
public String toString() {
return tablename + "/" + Bytes.toString(encodedRegionName) + "/" + sequenceId;
Expand Down Expand Up @@ -539,6 +580,14 @@ public WALProtos.WALKey.Builder getBuilder(WALCellCodec.ByteStringCompressor com
.setScopeType(ScopeType.forNumber(e.getValue())));
}
}
if (extendedAttributes != null){
for (Map.Entry<String, byte[]> e : extendedAttributes.entrySet()){
WALProtos.Attribute attr = WALProtos.Attribute.newBuilder().
setKey(e.getKey()).setValue(compressor.compress(e.getValue(),
CompressionContext.DictionaryIndex.TABLE)).build();
builder.addExtendedAttributes(attr);
}
}
return builder;
}

Expand Down Expand Up @@ -573,6 +622,14 @@ public void readFieldsFromPb(WALProtos.WALKey walKey,
if (walKey.hasOrigSequenceNumber()) {
this.origLogSeqNum = walKey.getOrigSequenceNumber();
}
if (walKey.getExtendedAttributesCount() > 0){
this.extendedAttributes = new HashMap<>(walKey.getExtendedAttributesCount());
for (WALProtos.Attribute attr : walKey.getExtendedAttributesList()){
byte[] value =
uncompressor.uncompress(attr.getValue(), CompressionContext.DictionaryIndex.TABLE);
extendedAttributes.put(attr.getKey(), value);
}
}
}

@Override
Expand Down
Loading

0 comments on commit 1ad48c1

Please sign in to comment.