diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index b4bd8bc0579..2e7780d3f3d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -528,11 +528,13 @@ public void preBatchMutateWithExceptions(ObserverContext entry : indexUpdates) { - edit.add(new IndexedKeyValue(entry.getSecond(), entry.getFirst())); + edit.add(IndexedKeyValue.newIndexedKeyValue(entry.getSecond(), + entry.getFirst())); + } } } } - } + } private void setBatchMutateContext(ObserverContext c, BatchMutateContext context) { @@ -622,21 +624,6 @@ private void doPostWithExceptions(ObserverContext } } - /** - * Search the {@link WALEdit} for the first {@link IndexedKeyValue} present - * @param edit {@link WALEdit} - * @return the first {@link IndexedKeyValue} in the {@link WALEdit} or null if not - * present - */ - private IndexedKeyValue getFirstIndexedKeyValue(WALEdit edit) { - for (Cell kv : edit.getCells()) { - if (kv instanceof IndexedKeyValue) { - return (IndexedKeyValue) kv; - } - } - return null; - } - /** * Extract the index updates from the WAL Edit * @param edit to search for index updates diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java index 2245f26d1a3..a973f02e1ec 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java @@ -18,11 +18,14 @@ package org.apache.phoenix.hbase.index.wal; -import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Arrays; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilder; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; @@ -34,8 +37,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; public class IndexedKeyValue extends KeyValue { - public static final byte [] COLUMN_QUALIFIER = Bytes.toBytes("INDEXEDKEYVALUE_FAKED_COLUMN"); - + private static int calcHashCode(ImmutableBytesPtr indexTableName, Mutation mutation) { final int prime = 31; int result = 1; @@ -50,13 +52,39 @@ private static int calcHashCode(ImmutableBytesPtr indexTableName, Mutation mutat private boolean batchFinished = false; private int hashCode; - public IndexedKeyValue() {} + public static IndexedKeyValue newIndexedKeyValue(byte[] bs, Mutation m){ + Cell indexWALCell = adaptFirstCellFromMutation(m); + return new IndexedKeyValue(indexWALCell, bs, m); + } + + private static Cell adaptFirstCellFromMutation(Mutation m) { + if (m != null && m.getFamilyCellMap() != null && + m.getFamilyCellMap().firstEntry() != null && + m.getFamilyCellMap().firstEntry().getValue() != null + && m.getFamilyCellMap().firstEntry().getValue().get(0) != null) { + //have to replace the column family with WALEdit.METAFAMILY to make sure + //that IndexedKeyValues don't get replicated. The superclass KeyValue fields + //like row, qualifier and value are placeholders to prevent NPEs + // when using the KeyValue APIs. See PHOENIX-5188 / 5455 + Cell mutationCell = m.getFamilyCellMap().firstEntry().getValue().get(0); + CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY); + return builder.setFamily(WALEdit.METAFAMILY). + setQualifier(mutationCell.getQualifierArray()). + setRow(m.getRow()). + setTimestamp(mutationCell.getTimestamp()). + setValue(mutationCell.getValueArray()).setType(Cell.Type.Put).build(); + } else { + throw new IllegalArgumentException("Tried to create an IndexedKeyValue with a " + + "Mutation with no Cells!"); + } - public IndexedKeyValue(byte[] bs, Mutation mutation) { - this.bytes = mutation.getRow(); - this.offset = 0; - this.length = mutation.getRow().length; + } + + //used for deserialization + public IndexedKeyValue() {} + private IndexedKeyValue(Cell c, byte[] bs, Mutation mutation){ + super(c); this.indexTableName = new ImmutableBytesPtr(bs); this.mutation = mutation; this.hashCode = calcHashCode(indexTableName, mutation); @@ -70,66 +98,6 @@ public Mutation getMutation() { return mutation; } - @Override - public byte[] getFamilyArray() { - return WALEdit.METAFAMILY; - } - - /** - * @return Family offset - */ - @Override - public int getFamilyOffset() { - return 0; - } - - /** - * @return Family length - */ - @Override - public byte getFamilyLength() { - return (byte) WALEdit.METAFAMILY.length; - } - - @Override - public byte[] getQualifierArray() { - return COLUMN_QUALIFIER; - } - - /** - * @return Qualifier offset - */ - @Override - public int getQualifierOffset() { - return 0; - } - - /** - * @return Qualifier length - */ - @Override - public int getQualifierLength() { - return COLUMN_QUALIFIER.length; - } - - @Override - public int getRowOffset() { - return this.offset; - } - - @Override - public short getRowLength() { - return (short) this.length; - } - - @Override - public int getKeyLength(){ - //normally the key is row key + other key fields such as timestamp, - // but those aren't defined here because a Mutation can contain multiple, - // so we just return the length of the row key - return this.length; - } - @Override public String toString() { return "IndexWrite:\n\ttable: " + indexTableName + "\n\tmutation:" + mutation; @@ -167,9 +135,9 @@ public int hashCode() { } /** - * Internal write the underlying data for the entry - this does not do any special prefixing. Writing should be done - * via {@link KeyValueCodec#write(DataOutput, KeyValue)} to ensure consistent reading/writing of - * {@link IndexedKeyValue}s. + * Internal write the underlying data for the entry - this does not do any special prefixing. + * Writing should be done via {@link KeyValueCodec#write(DataOutput, KeyValue)} to ensure + * consistent reading/writing of {@link IndexedKeyValue}s. * * @param out * to write data to. Does not close or flush the passed object. @@ -182,25 +150,6 @@ void writeData(DataOutput out) throws IOException { Bytes.writeByteArray(out, m.toByteArray()); } - /** - * This method shouldn't be used - you should use {@link KeyValueCodec#readKeyValue(DataInput)} instead. Its the - * complement to {@link #writeData(DataOutput)}. - */ - @SuppressWarnings("javadoc") - public void readFields(DataInput in) throws IOException { - this.indexTableName = new ImmutableBytesPtr(Bytes.readByteArray(in)); - byte[] mutationData = Bytes.readByteArray(in); - MutationProto mProto = MutationProto.parseFrom(mutationData); - this.mutation = org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(mProto); - this.hashCode = calcHashCode(indexTableName, mutation); - if (mutation != null){ - bytes = mutation.getRow(); - offset = 0; - length = bytes.length; - } - - } - public boolean getBatchFinished() { return this.batchFinished; } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/KeyValueCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/KeyValueCodec.java index d02d4315e24..0f8a7a05597 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/KeyValueCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/KeyValueCodec.java @@ -26,7 +26,10 @@ import java.util.List; import org.apache.hadoop.hbase.KeyValue; - +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; /** * Codec to encode/decode {@link KeyValue}s and {@link IndexedKeyValue}s within a {@link WALEdit} */ @@ -68,8 +71,16 @@ public static KeyValue readKeyValue(DataInput in) throws IOException { int length = in.readInt(); // its a special IndexedKeyValue if (length == INDEX_TYPE_LENGTH_MARKER) { - IndexedKeyValue kv = new IndexedKeyValue(); - kv.readFields(in); + ImmutableBytesPtr indexTableName = new ImmutableBytesPtr(Bytes.readByteArray(in)); + byte[] mutationData = Bytes.readByteArray(in); + ClientProtos.MutationProto mProto = ClientProtos.MutationProto.parseFrom(mutationData); + Mutation mutation = org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(mProto); + IndexedKeyValue kv = null; + if (mutation != null){ + kv = IndexedKeyValue.newIndexedKeyValue(indexTableName.copyBytesIfNecessary(), mutation); + } else { + kv = new IndexedKeyValue(); + } return kv; } else { return KeyValue.create(length, in); diff --git a/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/IndexedKeyValueTest.java b/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/IndexedKeyValueTest.java index 009c323b31f..060c57e3c7a 100644 --- a/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/IndexedKeyValueTest.java +++ b/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/IndexedKeyValueTest.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hbase.regionserver.wal; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; @@ -30,16 +31,44 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.IOException; public class IndexedKeyValueTest { + private static final byte[] ROW_KEY = Bytes.toBytes("foo"); + private static final byte[] FAMILY = Bytes.toBytes("family"); + private static final byte[] QUALIFIER = Bytes.toBytes("qualifier"); + private static final byte[] VALUE = Bytes.toBytes("value"); + private static final byte[] TABLE_NAME = Bytes.toBytes("MyTableName"); + + @Test + public void testIndexedKeyValueExceptionWhenMutationEmpty() throws IOException { + boolean caughtNullMutation = false, caughtNullEntry = false; + try { + IndexedKeyValue ikv = IndexedKeyValue.newIndexedKeyValue(TABLE_NAME, null); + } catch (IllegalArgumentException iae){ + caughtNullMutation = true; + } + try { + Mutation m = new Put(ROW_KEY); + IndexedKeyValue ikv = IndexedKeyValue.newIndexedKeyValue(TABLE_NAME, m); + } catch (IllegalArgumentException iae){ + caughtNullEntry = true; + } + //no need to test adding a mutation with a Cell with just a row key; HBase will put in + //a default cell with family byte[0], qualifier and value of "", and LATEST_TIMESTAMP + + Assert.assertTrue(caughtNullMutation & caughtNullEntry); + + } + @Test public void testIndexedKeyValuePopulatesKVFields() throws Exception { - byte[] row = Bytes.toBytes("foo"); - byte[] tableNameBytes = Bytes.toBytes("MyTableName"); - Mutation mutation = new Put(row); - IndexedKeyValue indexedKeyValue = new IndexedKeyValue(tableNameBytes, mutation); - testIndexedKeyValueHelper(indexedKeyValue, row, tableNameBytes, mutation); + byte[] row = (ROW_KEY); + Put mutation = new Put(row); + mutation.addColumn(FAMILY, QUALIFIER, VALUE); + IndexedKeyValue indexedKeyValue = IndexedKeyValue.newIndexedKeyValue(TABLE_NAME, mutation); + testIndexedKeyValueHelper(indexedKeyValue, row, TABLE_NAME, mutation); //now serialize the IndexedKeyValue and make sure the deserialized copy also //has all the right fields @@ -50,17 +79,16 @@ public void testIndexedKeyValuePopulatesKVFields() throws Exception { IndexedKeyValue deSerializedKV = (IndexedKeyValue) KeyValueCodec.readKeyValue(new DataInputStream( new ByteArrayInputStream(baos.toByteArray()))); - testIndexedKeyValueHelper(deSerializedKV, row, tableNameBytes, mutation); + testIndexedKeyValueHelper(deSerializedKV, row, TABLE_NAME, mutation); } - private void testIndexedKeyValueHelper(IndexedKeyValue indexedKeyValue, byte[] row, byte[] tableNameBytes, Mutation mutation) { - Assert.assertArrayEquals(row, indexedKeyValue.getRowArray()); - Assert.assertEquals(0, indexedKeyValue.getRowOffset()); - Assert.assertEquals(row.length, indexedKeyValue.getRowLength()); + private void testIndexedKeyValueHelper(IndexedKeyValue indexedKeyValue, byte[] row, + byte[] tableNameBytes, Mutation mutation) { + Assert.assertArrayEquals(row, CellUtil.cloneRow(indexedKeyValue)); Assert.assertArrayEquals(tableNameBytes, indexedKeyValue.getIndexTable()); Assert.assertEquals(mutation.toString(), indexedKeyValue.getMutation().toString()); - Assert.assertArrayEquals(WALEdit.METAFAMILY, indexedKeyValue.getFamilyArray()); + Assert.assertArrayEquals(WALEdit.METAFAMILY, CellUtil.cloneFamily(indexedKeyValue)); } } diff --git a/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ReadWriteKeyValuesWithCodecTest.java b/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ReadWriteKeyValuesWithCodecTest.java index ddbd4a34edc..bb88e1db575 100644 --- a/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ReadWriteKeyValuesWithCodecTest.java +++ b/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ReadWriteKeyValuesWithCodecTest.java @@ -111,7 +111,8 @@ private List getEdits() { WALEdit justIndexUpdates = new WALEdit(); byte[] table = Bytes.toBytes("targetTable"); - IndexedKeyValue ikv = new IndexedKeyValue(table, p); + + IndexedKeyValue ikv = IndexedKeyValue.newIndexedKeyValue(table, p); justIndexUpdates.add(ikv); edits.add(justIndexUpdates);