Skip to content

Commit

Permalink
PHOENIX-5455 - IndexedKeyValue creation fails after HBASE-22034 (Geof…
Browse files Browse the repository at this point in the history
…frey Jacoby)
  • Loading branch information
s.kadam committed Aug 29, 2019
1 parent 250bf57 commit df3cc54
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -534,11 +534,13 @@ public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnviro
if (durability != Durability.SKIP_WAL) {
// we have all the WAL durability, so we just update the WAL entry and move on
for (Pair<Mutation, byte[]> entry : indexUpdates) {
edit.add(new IndexedKeyValue(entry.getSecond(), entry.getFirst()));
edit.add(IndexedKeyValue.newIndexedKeyValue(entry.getSecond(),
entry.getFirst()));
}
}
}
}
}

}

private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) {
Expand Down Expand Up @@ -629,21 +631,6 @@ private void doPostWithExceptions(ObserverContext<RegionCoprocessorEnvironment>
}
}

/**
* Search the {@link WALEdit} for the first {@link IndexedKeyValue} present
* @param edit {@link WALEdit}
* @return the first {@link IndexedKeyValue} in the {@link WALEdit} or <tt>null</tt> if not
* present
*/
private IndexedKeyValue getFirstIndexedKeyValue(WALEdit edit) {
for (Cell kv : edit.getCells()) {
if (kv instanceof IndexedKeyValue) {
return (IndexedKeyValue) kv;
}
}
return null;
}

/**
* Extract the index updates from the WAL Edit
* @param edit to search for index updates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
Expand Down Expand Up @@ -50,13 +52,36 @@ private static int calcHashCode(ImmutableBytesPtr indexTableName, Mutation mutat
private boolean batchFinished = false;
private int hashCode;

public IndexedKeyValue() {}
public static IndexedKeyValue newIndexedKeyValue(byte[] bs, Mutation m){
Cell indexWALCell = adaptFirstCellFromMutation(m);
return new IndexedKeyValue(indexWALCell, bs, m);
}

private static Cell adaptFirstCellFromMutation(Mutation m) {
if (m != null && m.getFamilyCellMap() != null &&
m.getFamilyCellMap().firstEntry() != null &&
m.getFamilyCellMap().firstEntry().getValue() != null
&& m.getFamilyCellMap().firstEntry().getValue().get(0) != null) {
//have to replace the column family with WALEdit.METAFAMILY to make sure
//that IndexedKeyValues don't get replicated. The superclass KeyValue fields
//like row, qualifier and value are placeholders to prevent NPEs
// when using the KeyValue APIs. See PHOENIX-5188 / 5455
Cell mutationCell = m.getFamilyCellMap().firstEntry().getValue().get(0);
return CellUtil.createCell(m.getRow(), WALEdit.METAFAMILY,
mutationCell.getQualifierArray(), mutationCell.getTimestamp(),
mutationCell.getTypeByte(), mutationCell.getValueArray());
} else {
throw new IllegalArgumentException("Tried to create an IndexedKeyValue with a " +
"Mutation with no Cells!");
}

public IndexedKeyValue(byte[] bs, Mutation mutation) {
this.bytes = mutation.getRow();
this.offset = 0;
this.length = mutation.getRow().length;
}

//used for deserialization
public IndexedKeyValue() {}

private IndexedKeyValue(Cell c, byte[] bs, Mutation mutation){
super(c);
this.indexTableName = new ImmutableBytesPtr(bs);
this.mutation = mutation;
this.hashCode = calcHashCode(indexTableName, mutation);
Expand All @@ -70,74 +95,6 @@ public Mutation getMutation() {
return mutation;
}

/*
* Returns a faked column family for an IndexedKeyValue instance
*/
@Override
public byte [] getFamily() {
return WALEdit.METAFAMILY;
}

@Override
public byte[] getFamilyArray() {
return WALEdit.METAFAMILY;
}

/**
* @return Family offset
*/
@Override
public int getFamilyOffset() {
return 0;
}

/**
* @return Family length
*/
@Override
public byte getFamilyLength() {
return (byte) WALEdit.METAFAMILY.length;
}

@Override
public byte[] getQualifierArray() {
return COLUMN_QUALIFIER;
}

/**
* @return Qualifier offset
*/
@Override
public int getQualifierOffset() {
return 0;
}

/**
* @return Qualifier length
*/
@Override
public int getQualifierLength() {
return COLUMN_QUALIFIER.length;
}

@Override
public int getRowOffset() {
return this.offset;
}

@Override
public short getRowLength() {
return (short) this.length;
}

@Override
public int getKeyLength(){
//normally the key is row key + other key fields such as timestamp,
// but those aren't defined here because a Mutation can contain multiple,
// so we just return the length of the row key
return this.length;
}

@Override
public String toString() {
return "IndexWrite:\n\ttable: " + indexTableName + "\n\tmutation:" + mutation;
Expand Down Expand Up @@ -175,9 +132,9 @@ public int hashCode() {
}

/**
* Internal write the underlying data for the entry - this does not do any special prefixing. Writing should be done
* via {@link KeyValueCodec#write(DataOutput, KeyValue)} to ensure consistent reading/writing of
* {@link IndexedKeyValue}s.
* Internal write the underlying data for the entry - this does not do any special prefixing.
* Writing should be done via {@link KeyValueCodec#write(DataOutput, KeyValue)} to ensure
* consistent reading/writing of {@link IndexedKeyValue}s.
*
* @param out
* to write data to. Does not close or flush the passed object.
Expand All @@ -190,25 +147,6 @@ void writeData(DataOutput out) throws IOException {
Bytes.writeByteArray(out, m.toByteArray());
}

/**
* This method shouldn't be used - you should use {@link KeyValueCodec#readKeyValue(DataInput)} instead. Its the
* complement to {@link #writeData(DataOutput)}.
*/
@SuppressWarnings("javadoc")
public void readFields(DataInput in) throws IOException {
this.indexTableName = new ImmutableBytesPtr(Bytes.readByteArray(in));
byte[] mutationData = Bytes.readByteArray(in);
MutationProto mProto = MutationProto.parseFrom(mutationData);
this.mutation = org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(mProto);
this.hashCode = calcHashCode(indexTableName, mutation);
if (mutation != null){
bytes = mutation.getRow();
offset = 0;
length = bytes.length;
}

}

public boolean getBatchFinished() {
return this.batchFinished;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;

/**
* Codec to encode/decode {@link KeyValue}s and {@link IndexedKeyValue}s within a {@link WALEdit}
Expand Down Expand Up @@ -70,8 +73,16 @@ public static KeyValue readKeyValue(DataInput in) throws IOException {
int length = in.readInt();
// its a special IndexedKeyValue
if (length == INDEX_TYPE_LENGTH_MARKER) {
IndexedKeyValue kv = new IndexedKeyValue();
kv.readFields(in);
ImmutableBytesPtr indexTableName = new ImmutableBytesPtr(Bytes.readByteArray(in));
byte[] mutationData = Bytes.readByteArray(in);
ClientProtos.MutationProto mProto = ClientProtos.MutationProto.parseFrom(mutationData);
Mutation mutation = org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(mProto);
IndexedKeyValue kv = null;
if (mutation != null){
kv = IndexedKeyValue.newIndexedKeyValue(indexTableName.copyBytesIfNecessary(), mutation);
} else {
kv = new IndexedKeyValue();
}
return kv;
} else {
return KeyValue.create(length, in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

package org.apache.hadoop.hbase.regionserver.wal;

import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.phoenix.hbase.index.wal.IndexedKeyValue;
import org.apache.phoenix.hbase.index.wal.KeyValueCodec;
import org.junit.Assert;
Expand All @@ -30,17 +30,45 @@
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;


public class IndexedKeyValueTest {

private static final byte[] ROW_KEY = Bytes.toBytes("foo");
private static final byte[] FAMILY = Bytes.toBytes("family");
private static final byte[] QUALIFIER = Bytes.toBytes("qualifier");
private static final byte[] VALUE = Bytes.toBytes("value");
private static final byte[] TABLE_NAME = Bytes.toBytes("MyTableName");

@Test
public void testIndexedKeyValueExceptionWhenMutationEmpty() throws IOException {
boolean caughtNullMutation = false, caughtNullEntry = false;
try {
IndexedKeyValue ikv = IndexedKeyValue.newIndexedKeyValue(TABLE_NAME, null);
} catch (IllegalArgumentException iae){
caughtNullMutation = true;
}
try {
Mutation m = new Put(ROW_KEY);
IndexedKeyValue ikv = IndexedKeyValue.newIndexedKeyValue(TABLE_NAME, m);
} catch (IllegalArgumentException iae){
caughtNullEntry = true;
}
//no need to test adding a mutation with a Cell with just a row key; HBase will put in
//a default cell with family byte[0], qualifier and value of "", and LATEST_TIMESTAMP

Assert.assertTrue(caughtNullMutation & caughtNullEntry);

}

@Test
public void testIndexedKeyValuePopulatesKVFields() throws Exception {
byte[] row = Bytes.toBytes("foo");
byte[] tableNameBytes = Bytes.toBytes("MyTableName");
Mutation mutation = new Put(row);
IndexedKeyValue indexedKeyValue = new IndexedKeyValue(tableNameBytes, mutation);
testIndexedKeyValueHelper(indexedKeyValue, row, tableNameBytes, mutation);
byte[] row = (ROW_KEY);
Put mutation = new Put(row);
mutation.addColumn(FAMILY, QUALIFIER, VALUE);
IndexedKeyValue indexedKeyValue = IndexedKeyValue.newIndexedKeyValue(TABLE_NAME, mutation);
testIndexedKeyValueHelper(indexedKeyValue, row, TABLE_NAME, mutation);

//now serialize the IndexedKeyValue and make sure the deserialized copy also
//has all the right fields
Expand All @@ -51,17 +79,16 @@ public void testIndexedKeyValuePopulatesKVFields() throws Exception {
IndexedKeyValue deSerializedKV = (IndexedKeyValue)
KeyValueCodec.readKeyValue(new DataInputStream(
new ByteArrayInputStream(baos.toByteArray())));
testIndexedKeyValueHelper(deSerializedKV, row, tableNameBytes, mutation);
testIndexedKeyValueHelper(deSerializedKV, row, TABLE_NAME, mutation);

}

private void testIndexedKeyValueHelper(IndexedKeyValue indexedKeyValue, byte[] row, byte[] tableNameBytes, Mutation mutation) {
Assert.assertArrayEquals(row, indexedKeyValue.getRowArray());
Assert.assertEquals(0, indexedKeyValue.getRowOffset());
Assert.assertEquals(row.length, indexedKeyValue.getRowLength());
private void testIndexedKeyValueHelper(IndexedKeyValue indexedKeyValue, byte[] row,
byte[] tableNameBytes, Mutation mutation) {
Assert.assertArrayEquals(row, CellUtil.cloneRow(indexedKeyValue));
Assert.assertArrayEquals(tableNameBytes, indexedKeyValue.getIndexTable());
Assert.assertEquals(mutation.toString(), indexedKeyValue.getMutation().toString());
Assert.assertArrayEquals(WALEdit.METAFAMILY, indexedKeyValue.getFamilyArray());
Assert.assertArrayEquals(WALEdit.METAFAMILY, CellUtil.cloneFamily(indexedKeyValue));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
Expand Down Expand Up @@ -110,7 +111,8 @@ private List<WALEdit> getEdits() {

WALEdit justIndexUpdates = new WALEdit();
byte[] table = Bytes.toBytes("targetTable");
IndexedKeyValue ikv = new IndexedKeyValue(table, p);
Cell c = CellUtil.createCell(p.getRow());
IndexedKeyValue ikv = IndexedKeyValue.newIndexedKeyValue(table, p);
justIndexUpdates.add(ikv);
edits.add(justIndexUpdates);

Expand Down

0 comments on commit df3cc54

Please sign in to comment.