Skip to content

Commit

Permalink
PHOENIX-2477 ClassCastException in IndexedWALEditCodec after HBASE-14…
Browse files Browse the repository at this point in the history
…501 (possible dataloss)
  • Loading branch information
enis committed Dec 4, 2015
1 parent 1c2b9b0 commit 9478d1f
Showing 1 changed file with 46 additions and 10 deletions.
Expand Up @@ -19,7 +19,9 @@
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -79,10 +81,36 @@ public Encoder getEncoder(OutputStream os) {
return new CompressedIndexKeyValueEncoder(os, encoder);
}

/**
* Returns a DataInput given an InputStream
*/
private static DataInput getDataInput(InputStream is) {
return is instanceof DataInput
? (DataInput) is
: new DataInputStream(is);
}

/**
* Returns a DataOutput given an OutputStream
*/
private static DataOutput getDataOutput(OutputStream os) {
return os instanceof DataOutput
? (DataOutput) os
: new DataOutputStream(os);
}

private static abstract class PhoenixBaseDecoder extends BaseDecoder {
protected DataInput dataInput;
public PhoenixBaseDecoder(InputStream in) {
super(in);
dataInput = getDataInput(this.in);
}
}

/**
* Custom Decoder that can handle a stream of regular and indexed {@link KeyValue}s.
*/
public class IndexKeyValueDecoder extends BaseDecoder {
public static class IndexKeyValueDecoder extends PhoenixBaseDecoder {

/**
* Create a Decoder on the given input stream with the given Decoder to parse
Expand All @@ -95,11 +123,11 @@ public IndexKeyValueDecoder(InputStream is){

@Override
protected KeyValue parseCell() throws IOException{
return KeyValueCodec.readKeyValue((DataInput) this.in);
return KeyValueCodec.readKeyValue(this.dataInput);
}
}

public class CompressedIndexKeyValueDecoder extends BaseDecoder {
public static class CompressedIndexKeyValueDecoder extends PhoenixBaseDecoder {

private Decoder decoder;

Expand Down Expand Up @@ -133,15 +161,23 @@ protected Cell parseCell() throws IOException {
}

// its an indexedKeyValue, so parse it out specially
return KeyValueCodec.readKeyValue((DataInput) this.in);
return KeyValueCodec.readKeyValue(this.dataInput);
}
}

private static abstract class PhoenixBaseEncoder extends BaseEncoder {
protected DataOutput dataOutput;
public PhoenixBaseEncoder(OutputStream out) {
super(out);
dataOutput = getDataOutput(this.out);
}
}

/**
* Encode {@link IndexedKeyValue}s via the {@link KeyValueCodec}. Does <b>not</b> support
* compression.
*/
private static class IndexKeyValueEncoder extends BaseEncoder {
private static class IndexKeyValueEncoder extends PhoenixBaseEncoder {
public IndexKeyValueEncoder(OutputStream os) {
super(os);
}
Expand All @@ -157,7 +193,7 @@ public void write(Cell cell) throws IOException {
checkFlushed();

// use the standard encoding mechanism
KeyValueCodec.write((DataOutput) this.out, KeyValueUtil.ensureKeyValue(cell));
KeyValueCodec.write(this.dataOutput, KeyValueUtil.ensureKeyValue(cell));
}
}

Expand All @@ -166,7 +202,7 @@ public void write(Cell cell) throws IOException {
* <b>not</b> compatible with the {@link IndexKeyValueDecoder} - one cannot intermingle compressed
* and uncompressed WALs that contain index entries.
*/
private static class CompressedIndexKeyValueEncoder extends BaseEncoder {
private static class CompressedIndexKeyValueEncoder extends PhoenixBaseEncoder {
private Encoder compressedKvEncoder;

public CompressedIndexKeyValueEncoder(OutputStream os, Encoder compressedKvEncoder) {
Expand All @@ -184,20 +220,20 @@ public void flush() throws IOException {
public void write(Cell cell) throws IOException {
//make sure we are open
checkFlushed();

//write the special marker so we can figure out which kind of kv is it
int marker = IndexedWALEditCodec.REGULAR_KEY_VALUE_MARKER;
if (cell instanceof IndexedKeyValue) {
marker = KeyValueCodec.INDEX_TYPE_LENGTH_MARKER;
}
out.write(marker);

//then serialize based on the marker
if (marker == IndexedWALEditCodec.REGULAR_KEY_VALUE_MARKER) {
this.compressedKvEncoder.write(cell);
}
else{
KeyValueCodec.write((DataOutput) out, KeyValueUtil.ensureKeyValue(cell));
KeyValueCodec.write(this.dataOutput, KeyValueUtil.ensureKeyValue(cell));
}
}
}
Expand Down

0 comments on commit 9478d1f

Please sign in to comment.