Skip to content

Commit

Permalink
ORC-344. Support the new Decimal64ColumnVector.
Browse files Browse the repository at this point in the history
Fixes #250

Signed-off-by: Owen O'Malley <omalley@apache.org>
  • Loading branch information
omalley committed Apr 23, 2018
1 parent 4aa8c61 commit c0f3bcf
Show file tree
Hide file tree
Showing 6 changed files with 335 additions and 47 deletions.
65 changes: 52 additions & 13 deletions java/core/src/java/org/apache/orc/TypeDescription.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
Expand All @@ -45,6 +46,7 @@ public class TypeDescription
private static final int MAX_SCALE = 38;
private static final int DEFAULT_PRECISION = 38;
private static final int DEFAULT_SCALE = 10;
public static final int MAX_DECIMAL64_PRECISION = 18;
private static final int DEFAULT_LENGTH = 256;
static final Pattern UNQUOTED_NAMES = Pattern.compile("^[a-zA-Z0-9_]+$");

Expand Down Expand Up @@ -622,7 +624,7 @@ public int getMaximumId() {
return maxId;
}

private ColumnVector createColumn(int maxSize) {
private ColumnVector createColumn(RowBatchVersion version, int maxSize) {
switch (category) {
case BOOLEAN:
case BYTE:
Expand All @@ -637,7 +639,12 @@ private ColumnVector createColumn(int maxSize) {
case DOUBLE:
return new DoubleColumnVector(maxSize);
case DECIMAL:
return new DecimalColumnVector(maxSize, precision, scale);
if (version == RowBatchVersion.ORIGINAL ||
precision > MAX_DECIMAL64_PRECISION) {
return new DecimalColumnVector(maxSize, precision, scale);
} else {
return new Decimal64ColumnVector(maxSize, precision, scale);
}
case STRING:
case BINARY:
case CHAR:
Expand All @@ -646,48 +653,80 @@ private ColumnVector createColumn(int maxSize) {
case STRUCT: {
ColumnVector[] fieldVector = new ColumnVector[children.size()];
for(int i=0; i < fieldVector.length; ++i) {
fieldVector[i] = children.get(i).createColumn(maxSize);
fieldVector[i] = children.get(i).createColumn(version, maxSize);
}
return new StructColumnVector(maxSize,
fieldVector);
}
case UNION: {
ColumnVector[] fieldVector = new ColumnVector[children.size()];
for(int i=0; i < fieldVector.length; ++i) {
fieldVector[i] = children.get(i).createColumn(maxSize);
fieldVector[i] = children.get(i).createColumn(version, maxSize);
}
return new UnionColumnVector(maxSize,
fieldVector);
}
case LIST:
return new ListColumnVector(maxSize,
children.get(0).createColumn(maxSize));
children.get(0).createColumn(version, maxSize));
case MAP:
return new MapColumnVector(maxSize,
children.get(0).createColumn(maxSize),
children.get(1).createColumn(maxSize));
children.get(0).createColumn(version, maxSize),
children.get(1).createColumn(version, maxSize));
default:
throw new IllegalArgumentException("Unknown type " + category);
}
}

public VectorizedRowBatch createRowBatch(int maxSize) {
/**
 * Specify the version of the VectorizedRowBatch that the user desires.
 * This selects which ColumnVector subclass is created for DECIMAL columns
 * when building a row batch.
 */
enum RowBatchVersion {
  // Legacy behavior: every DECIMAL column is a DecimalColumnVector,
  // regardless of precision.
  ORIGINAL,
  // Use Decimal64ColumnVector for decimals with
  // precision <= MAX_DECIMAL64_PRECISION (18); wider decimals still
  // fall back to DecimalColumnVector.
  USE_DECIMAL64;
}

VectorizedRowBatch createRowBatch(RowBatchVersion version, int size) {
VectorizedRowBatch result;
if (category == Category.STRUCT) {
result = new VectorizedRowBatch(children.size(), maxSize);
result = new VectorizedRowBatch(children.size(), size);
for(int i=0; i < result.cols.length; ++i) {
result.cols[i] = children.get(i).createColumn(maxSize);
result.cols[i] = children.get(i).createColumn(version, size);
}
} else {
result = new VectorizedRowBatch(1, maxSize);
result.cols[0] = createColumn(maxSize);
result = new VectorizedRowBatch(1, size);
result.cols[0] = createColumn(version, size);
}
result.reset();
return result;
}

/**
 * Create a VectorizedRowBatch that uses Decimal64ColumnVector for
 * short (precision &lt;= 18) decimals. Decimals with a larger precision
 * still use DecimalColumnVector.
 * @return a new VectorizedRowBatch of the default size
 */
public VectorizedRowBatch createRowBatchV2() {
  return createRowBatch(RowBatchVersion.USE_DECIMAL64,
      VectorizedRowBatch.DEFAULT_SIZE);
}

/**
 * Create a VectorizedRowBatch with the original ColumnVector types,
 * i.e. DECIMAL columns always use DecimalColumnVector.
 * @param maxSize the maximum size of the batch
 * @return a new VectorizedRowBatch
 */
public VectorizedRowBatch createRowBatch(int maxSize) {
  return createRowBatch(RowBatchVersion.ORIGINAL, maxSize);
}

/**
* Create a VectorizedRowBatch with the original ColumnVector types
* @return a new VectorizedRowBatch
*/
public VectorizedRowBatch createRowBatch() {
return createRowBatch(VectorizedRowBatch.DEFAULT_SIZE);
return createRowBatch(RowBatchVersion.ORIGINAL,
VectorizedRowBatch.DEFAULT_SIZE);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ public static class AnyIntegerFromDecimalTreeReader extends ConvertTreeReader {
this.precision = fileType.getPrecision();
this.scale = fileType.getScale();
this.readerType = readerType;
decimalTreeReader = new DecimalTreeReader(columnId, context);
decimalTreeReader = new DecimalTreeReader(columnId, precision, scale, context);
setConvertTreeReader(decimalTreeReader);
}

Expand Down Expand Up @@ -848,7 +848,7 @@ public static class FloatFromDecimalTreeReader extends ConvertTreeReader {
super(columnId);
this.precision = fileType.getPrecision();
this.scale = fileType.getScale();
decimalTreeReader = new DecimalTreeReader(columnId, context);
decimalTreeReader = new DecimalTreeReader(columnId, precision, scale, context);
setConvertTreeReader(decimalTreeReader);
}

Expand Down Expand Up @@ -1054,7 +1054,7 @@ public static class DoubleFromDecimalTreeReader extends ConvertTreeReader {
super(columnId);
this.precision = fileType.getPrecision();
this.scale = fileType.getScale();
decimalTreeReader = new DecimalTreeReader(columnId, context);
decimalTreeReader = new DecimalTreeReader(columnId, precision, scale, context);
setConvertTreeReader(decimalTreeReader);
}

Expand Down Expand Up @@ -1387,7 +1387,7 @@ public static class DecimalFromDecimalTreeReader extends ConvertTreeReader {
super(columnId);
filePrecision = fileType.getPrecision();
fileScale = fileType.getScale();
decimalTreeReader = new DecimalTreeReader(columnId, context);
decimalTreeReader = new DecimalTreeReader(columnId, filePrecision, fileScale, context);
setConvertTreeReader(decimalTreeReader);
}

Expand Down Expand Up @@ -1565,7 +1565,7 @@ public static class StringGroupFromDecimalTreeReader extends ConvertTreeReader {
this.precision = fileType.getPrecision();
this.scale = fileType.getScale();
this.readerType = readerType;
decimalTreeReader = new DecimalTreeReader(columnId, context);
decimalTreeReader = new DecimalTreeReader(columnId, precision, scale, context);
setConvertTreeReader(decimalTreeReader);
scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
}
Expand Down Expand Up @@ -1904,7 +1904,7 @@ public static class TimestampFromDecimalTreeReader extends ConvertTreeReader {
super(columnId);
this.precision = fileType.getPrecision();
this.scale = fileType.getScale();
decimalTreeReader = new DecimalTreeReader(columnId, context);
decimalTreeReader = new DecimalTreeReader(columnId, precision, scale, context);
setConvertTreeReader(decimalTreeReader);
}

Expand Down
12 changes: 7 additions & 5 deletions java/core/src/java/org/apache/orc/impl/SerializationUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,17 @@ public SerializationUtils() {

public void writeVulong(OutputStream output,
long value) throws IOException {
int posn = 0;
while (true) {
if ((value & ~0x7f) == 0) {
output.write((byte) value);
return;
writeBuffer[posn++] = (byte) value;
break;
} else {
output.write((byte) (0x80 | (value & 0x7f)));
writeBuffer[posn++] = (byte)(0x80 | (value & 0x7f));
value >>>= 7;
}
}
output.write(writeBuffer, 0, posn);
}

public void writeVslong(OutputStream output,
Expand All @@ -55,7 +57,7 @@ public void writeVslong(OutputStream output,
}


public long readVulong(InputStream in) throws IOException {
public static long readVulong(InputStream in) throws IOException {
long result = 0;
long b;
int offset = 0;
Expand All @@ -70,7 +72,7 @@ public long readVulong(InputStream in) throws IOException {
return result;
}

public long readVslong(InputStream in) throws IOException {
public static long readVslong(InputStream in) throws IOException {
long result = readVulong(in);
return (result >>> 1) ^ -(result & 1);
}
Expand Down
88 changes: 72 additions & 16 deletions java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
Expand Down Expand Up @@ -1096,19 +1097,31 @@ void skipRows(long items) throws IOException {
}

public static class DecimalTreeReader extends TreeReader {
protected final int precision;
protected final int scale;
protected InStream valueStream;
protected IntegerReader scaleReader = null;
private int[] scratchScaleVector;
private byte[] scratchBytes;

DecimalTreeReader(int columnId, Context context) throws IOException {
this(columnId, null, null, null, null, context);
}

protected DecimalTreeReader(int columnId, InStream present,
InStream valueStream, InStream scaleStream, OrcProto.ColumnEncoding encoding, Context context)
throws IOException {
DecimalTreeReader(int columnId,
int precision,
int scale,
Context context) throws IOException {
this(columnId, null, null, null, null, precision, scale, context);
}

protected DecimalTreeReader(int columnId,
InStream present,
InStream valueStream,
InStream scaleStream,
OrcProto.ColumnEncoding encoding,
int precision,
int scale,
Context context) throws IOException {
super(columnId, present, context);
this.precision = precision;
this.scale = scale;
this.scratchScaleVector = new int[VectorizedRowBatch.DEFAULT_SIZE];
this.valueStream = valueStream;
this.scratchBytes = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_SERIALIZATION_UTILS_READ];
Expand Down Expand Up @@ -1150,14 +1163,9 @@ public void seek(PositionProvider index) throws IOException {
scaleReader.seek(index);
}

@Override
public void nextVector(ColumnVector previousVector,
boolean[] isNull,
final int batchSize) throws IOException {
final DecimalColumnVector result = (DecimalColumnVector) previousVector;
// Read present/isNull stream
super.nextVector(result, isNull, batchSize);

private void nextVector(DecimalColumnVector result,
boolean[] isNull,
final int batchSize) throws IOException {
if (batchSize > scratchScaleVector.length) {
scratchScaleVector = new int[(int) batchSize];
}
Expand Down Expand Up @@ -1193,6 +1201,53 @@ public void nextVector(ColumnVector previousVector,
}
}

/**
 * Read the next batch of decimal values into a Decimal64ColumnVector,
 * which stores each value as a scaled long. The caller (the public
 * nextVector override) has already populated the null flags.
 */
private void nextVector(Decimal64ColumnVector result,
                        boolean[] isNull,
                        final int batchSize) throws IOException {
  // A long can only represent decimals up to 18 digits of precision;
  // wider types must be read into a DecimalColumnVector instead.
  if (precision > TypeDescription.MAX_DECIMAL64_PRECISION) {
    throw new IllegalArgumentException("Reading large precision type into" +
        " Decimal64ColumnVector.");
  }

  // Grow the scratch buffer if this batch is larger than any seen so far.
  if (batchSize > scratchScaleVector.length) {
    scratchScaleVector = new int[(int) batchSize];
  }
  // read the scales
  scaleReader.nextVector(result, scratchScaleVector, batchSize);
  if (result.noNulls) {
    for (int r=0; r < batchSize; ++r) {
      result.vector[r] = SerializationUtils.readVslong(valueStream);
      // Rescale the raw value up to the column's declared scale so every
      // entry in result.vector shares the same fixed scale.
      for(int s=scratchScaleVector[r]; s < scale; ++s) {
        result.vector[r] *= 10;
      }
    }
  } else if (!result.isRepeating || !result.isNull[0]) {
    // Some rows are null: only non-null rows have values in the stream.
    // (Skipped entirely when the batch is a repeated null.)
    for (int r=0; r < batchSize; ++r) {
      if (!result.isNull[r]) {
        result.vector[r] = SerializationUtils.readVslong(valueStream);
        for(int s=scratchScaleVector[r]; s < scale; ++s) {
          result.vector[r] *= 10;
        }
      }
    }
  }
  // Publish the column's fixed precision/scale on the vector.
  result.precision = (short) precision;
  result.scale = (short) scale;
}

/**
 * Read the next batch of decimals, dispatching on the concrete vector
 * type the caller supplied: Decimal64ColumnVector for short decimals
 * (created by createRowBatchV2), DecimalColumnVector otherwise.
 */
@Override
public void nextVector(ColumnVector result,
                       boolean[] isNull,
                       final int batchSize) throws IOException {
  // Read present/isNull stream
  super.nextVector(result, isNull, batchSize);
  // NOTE(review): this assumes Decimal64ColumnVector is not a subclass of
  // DecimalColumnVector (it extends LongColumnVector in Hive), so the
  // instanceof check cleanly separates the two cases — confirm against the
  // Hive storage-api version in use.
  if (result instanceof Decimal64ColumnVector) {
    nextVector((Decimal64ColumnVector) result, isNull, batchSize);
  } else {
    nextVector((DecimalColumnVector) result, isNull, batchSize);
  }
}

@Override
void skipRows(long items) throws IOException {
items = countNonNulls(items);
Expand Down Expand Up @@ -2186,7 +2241,8 @@ public static TreeReader createTreeReader(TypeDescription readerType,
case DATE:
return new DateTreeReader(fileType.getId(), context);
case DECIMAL:
return new DecimalTreeReader(fileType.getId(), context);
return new DecimalTreeReader(fileType.getId(), fileType.getPrecision(),
fileType.getScale(), context);
case STRUCT:
return new StructTreeReader(fileType.getId(), readerType, context);
case LIST:
Expand Down

0 comments on commit c0f3bcf

Please sign in to comment.