Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PHOENIX-5455 IndexedKeyValue creation fails after HBASE-22034 #579

Merged
merged 1 commit into from
Sep 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -528,11 +528,13 @@ public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnviro
if (durability != Durability.SKIP_WAL) {
// we have all the WAL durability, so we just update the WAL entry and move on
for (Pair<Mutation, byte[]> entry : indexUpdates) {
edit.add(new IndexedKeyValue(entry.getSecond(), entry.getFirst()));
edit.add(IndexedKeyValue.newIndexedKeyValue(entry.getSecond(),
entry.getFirst()));
}
}
}
}
}

}

private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) {
Expand Down Expand Up @@ -622,21 +624,6 @@ private void doPostWithExceptions(ObserverContext<RegionCoprocessorEnvironment>
}
}

/**
* Search the {@link WALEdit} for the first {@link IndexedKeyValue} present
* @param edit {@link WALEdit}
* @return the first {@link IndexedKeyValue} in the {@link WALEdit} or <tt>null</tt> if not
* present
*/
private IndexedKeyValue getFirstIndexedKeyValue(WALEdit edit) {
for (Cell kv : edit.getCells()) {
if (kv instanceof IndexedKeyValue) {
return (IndexedKeyValue) kv;
}
}
return null;
}

/**
* Extract the index updates from the WAL Edit
* @param edit to search for index updates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@

package org.apache.phoenix.hbase.index.wal;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilder;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
Expand All @@ -34,8 +37,7 @@
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;

public class IndexedKeyValue extends KeyValue {
public static final byte [] COLUMN_QUALIFIER = Bytes.toBytes("INDEXEDKEYVALUE_FAKED_COLUMN");


private static int calcHashCode(ImmutableBytesPtr indexTableName, Mutation mutation) {
final int prime = 31;
int result = 1;
Expand All @@ -50,13 +52,39 @@ private static int calcHashCode(ImmutableBytesPtr indexTableName, Mutation mutat
private boolean batchFinished = false;
private int hashCode;

public IndexedKeyValue() {}
public static IndexedKeyValue newIndexedKeyValue(byte[] bs, Mutation m){
Cell indexWALCell = adaptFirstCellFromMutation(m);
return new IndexedKeyValue(indexWALCell, bs, m);
}

private static Cell adaptFirstCellFromMutation(Mutation m) {
if (m != null && m.getFamilyCellMap() != null &&
m.getFamilyCellMap().firstEntry() != null &&
m.getFamilyCellMap().firstEntry().getValue() != null
&& m.getFamilyCellMap().firstEntry().getValue().get(0) != null) {
//have to replace the column family with WALEdit.METAFAMILY to make sure
//that IndexedKeyValues don't get replicated. The superclass KeyValue fields
//like row, qualifier and value are placeholders to prevent NPEs
// when using the KeyValue APIs. See PHOENIX-5188 / 5455
Cell mutationCell = m.getFamilyCellMap().firstEntry().getValue().get(0);
CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
return builder.setFamily(WALEdit.METAFAMILY).
setQualifier(mutationCell.getQualifierArray()).
setRow(m.getRow()).
setTimestamp(mutationCell.getTimestamp()).
setValue(mutationCell.getValueArray()).setType(Cell.Type.Put).build();
} else {
throw new IllegalArgumentException("Tried to create an IndexedKeyValue with a " +
"Mutation with no Cells!");
}

public IndexedKeyValue(byte[] bs, Mutation mutation) {
this.bytes = mutation.getRow();
this.offset = 0;
this.length = mutation.getRow().length;
}

//used for deserialization
public IndexedKeyValue() {}

private IndexedKeyValue(Cell c, byte[] bs, Mutation mutation){
super(c);
this.indexTableName = new ImmutableBytesPtr(bs);
this.mutation = mutation;
this.hashCode = calcHashCode(indexTableName, mutation);
Expand All @@ -70,66 +98,6 @@ public Mutation getMutation() {
return mutation;
}

@Override
public byte[] getFamilyArray() {
return WALEdit.METAFAMILY;
}

/**
* @return Family offset
*/
@Override
public int getFamilyOffset() {
return 0;
}

/**
* @return Family length
*/
@Override
public byte getFamilyLength() {
return (byte) WALEdit.METAFAMILY.length;
}

@Override
public byte[] getQualifierArray() {
return COLUMN_QUALIFIER;
}

/**
* @return Qualifier offset
*/
@Override
public int getQualifierOffset() {
return 0;
}

/**
* @return Qualifier length
*/
@Override
public int getQualifierLength() {
return COLUMN_QUALIFIER.length;
}

@Override
public int getRowOffset() {
return this.offset;
}

@Override
public short getRowLength() {
return (short) this.length;
}

@Override
public int getKeyLength(){
//normally the key is row key + other key fields such as timestamp,
// but those aren't defined here because a Mutation can contain multiple,
// so we just return the length of the row key
return this.length;
}

@Override
public String toString() {
return "IndexWrite:\n\ttable: " + indexTableName + "\n\tmutation:" + mutation;
Expand Down Expand Up @@ -167,9 +135,9 @@ public int hashCode() {
}

/**
* Internal write the underlying data for the entry - this does not do any special prefixing. Writing should be done
* via {@link KeyValueCodec#write(DataOutput, KeyValue)} to ensure consistent reading/writing of
* {@link IndexedKeyValue}s.
* Internal write the underlying data for the entry - this does not do any special prefixing.
* Writing should be done via {@link KeyValueCodec#write(DataOutput, KeyValue)} to ensure
* consistent reading/writing of {@link IndexedKeyValue}s.
*
* @param out
* to write data to. Does not close or flush the passed object.
Expand All @@ -182,25 +150,6 @@ void writeData(DataOutput out) throws IOException {
Bytes.writeByteArray(out, m.toByteArray());
}

/**
* This method shouldn't be used - you should use {@link KeyValueCodec#readKeyValue(DataInput)} instead. Its the
* complement to {@link #writeData(DataOutput)}.
*/
@SuppressWarnings("javadoc")
public void readFields(DataInput in) throws IOException {
this.indexTableName = new ImmutableBytesPtr(Bytes.readByteArray(in));
byte[] mutationData = Bytes.readByteArray(in);
MutationProto mProto = MutationProto.parseFrom(mutationData);
this.mutation = org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(mProto);
this.hashCode = calcHashCode(indexTableName, mutation);
if (mutation != null){
bytes = mutation.getRow();
offset = 0;
length = bytes.length;
}

}

public boolean getBatchFinished() {
return this.batchFinished;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
/**
* Codec to encode/decode {@link KeyValue}s and {@link IndexedKeyValue}s within a {@link WALEdit}
*/
Expand Down Expand Up @@ -68,8 +71,16 @@ public static KeyValue readKeyValue(DataInput in) throws IOException {
int length = in.readInt();
// its a special IndexedKeyValue
if (length == INDEX_TYPE_LENGTH_MARKER) {
IndexedKeyValue kv = new IndexedKeyValue();
kv.readFields(in);
ImmutableBytesPtr indexTableName = new ImmutableBytesPtr(Bytes.readByteArray(in));
byte[] mutationData = Bytes.readByteArray(in);
ClientProtos.MutationProto mProto = ClientProtos.MutationProto.parseFrom(mutationData);
Mutation mutation = org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(mProto);
IndexedKeyValue kv = null;
if (mutation != null){
kv = IndexedKeyValue.newIndexedKeyValue(indexTableName.copyBytesIfNecessary(), mutation);
} else {
kv = new IndexedKeyValue();
}
return kv;
} else {
return KeyValue.create(length, in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hadoop.hbase.regionserver.wal;

import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
Expand All @@ -30,16 +31,44 @@
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class IndexedKeyValueTest {

private static final byte[] ROW_KEY = Bytes.toBytes("foo");
private static final byte[] FAMILY = Bytes.toBytes("family");
private static final byte[] QUALIFIER = Bytes.toBytes("qualifier");
private static final byte[] VALUE = Bytes.toBytes("value");
private static final byte[] TABLE_NAME = Bytes.toBytes("MyTableName");

@Test
public void testIndexedKeyValueExceptionWhenMutationEmpty() throws IOException {
boolean caughtNullMutation = false, caughtNullEntry = false;
try {
IndexedKeyValue ikv = IndexedKeyValue.newIndexedKeyValue(TABLE_NAME, null);
} catch (IllegalArgumentException iae){
caughtNullMutation = true;
}
try {
Mutation m = new Put(ROW_KEY);
IndexedKeyValue ikv = IndexedKeyValue.newIndexedKeyValue(TABLE_NAME, m);
} catch (IllegalArgumentException iae){
caughtNullEntry = true;
}
//no need to test adding a mutation with a Cell with just a row key; HBase will put in
//a default cell with family byte[0], qualifier and value of "", and LATEST_TIMESTAMP

Assert.assertTrue(caughtNullMutation & caughtNullEntry);

}

@Test
public void testIndexedKeyValuePopulatesKVFields() throws Exception {
byte[] row = Bytes.toBytes("foo");
byte[] tableNameBytes = Bytes.toBytes("MyTableName");
Mutation mutation = new Put(row);
IndexedKeyValue indexedKeyValue = new IndexedKeyValue(tableNameBytes, mutation);
testIndexedKeyValueHelper(indexedKeyValue, row, tableNameBytes, mutation);
byte[] row = (ROW_KEY);
Put mutation = new Put(row);
mutation.addColumn(FAMILY, QUALIFIER, VALUE);
IndexedKeyValue indexedKeyValue = IndexedKeyValue.newIndexedKeyValue(TABLE_NAME, mutation);
testIndexedKeyValueHelper(indexedKeyValue, row, TABLE_NAME, mutation);

//now serialize the IndexedKeyValue and make sure the deserialized copy also
//has all the right fields
Expand All @@ -50,17 +79,16 @@ public void testIndexedKeyValuePopulatesKVFields() throws Exception {
IndexedKeyValue deSerializedKV = (IndexedKeyValue)
KeyValueCodec.readKeyValue(new DataInputStream(
new ByteArrayInputStream(baos.toByteArray())));
testIndexedKeyValueHelper(deSerializedKV, row, tableNameBytes, mutation);
testIndexedKeyValueHelper(deSerializedKV, row, TABLE_NAME, mutation);

}

private void testIndexedKeyValueHelper(IndexedKeyValue indexedKeyValue, byte[] row, byte[] tableNameBytes, Mutation mutation) {
Assert.assertArrayEquals(row, indexedKeyValue.getRowArray());
Assert.assertEquals(0, indexedKeyValue.getRowOffset());
Assert.assertEquals(row.length, indexedKeyValue.getRowLength());
private void testIndexedKeyValueHelper(IndexedKeyValue indexedKeyValue, byte[] row,
byte[] tableNameBytes, Mutation mutation) {
Assert.assertArrayEquals(row, CellUtil.cloneRow(indexedKeyValue));
Assert.assertArrayEquals(tableNameBytes, indexedKeyValue.getIndexTable());
Assert.assertEquals(mutation.toString(), indexedKeyValue.getMutation().toString());
Assert.assertArrayEquals(WALEdit.METAFAMILY, indexedKeyValue.getFamilyArray());
Assert.assertArrayEquals(WALEdit.METAFAMILY, CellUtil.cloneFamily(indexedKeyValue));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ private List<WALEdit> getEdits() {

WALEdit justIndexUpdates = new WALEdit();
byte[] table = Bytes.toBytes("targetTable");
IndexedKeyValue ikv = new IndexedKeyValue(table, p);

IndexedKeyValue ikv = IndexedKeyValue.newIndexedKeyValue(table, p);
justIndexUpdates.add(ikv);
edits.add(justIndexUpdates);

Expand Down