Skip to content

Commit

Permalink
PUBDEV-5673: Adds support for DecimalType to Parquet Parser
Browse files Browse the repository at this point in the history
  • Loading branch information
michalkurka committed Jul 9, 2018
1 parent 37a4f6c commit cb6e9e0
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 1 deletion.
Expand Up @@ -92,7 +92,10 @@ private PrimitiveConverter newConverter(int colIdx, byte vecType, PrimitiveType
return new StringConverter(_writer, colIdx, dictSupport, _maxStringSize);
}
case Vec.T_NUM:
return new NumberConverter(colIdx, _writer);
if (OriginalType.DECIMAL.equals(parquetType.getOriginalType()))
return new DecimalConverter(colIdx, parquetType.getDecimalMetadata().getScale(), _writer);
else
return new NumberConverter(colIdx, _writer);
default:
throw new UnsupportedOperationException("Unsupported type " + vecType);
}
Expand Down Expand Up @@ -201,6 +204,51 @@ public void addBinary(Binary value) {
}
}

/**
 * Converter for Parquet columns annotated with the DECIMAL original type whose
 * unscaled value is stored in a 32-bit or 64-bit integer.
 *
 * Each unscaled integer is forwarded to the writer together with a base-10
 * exponent equal to the negated decimal scale, i.e. the stored number is meant
 * to be read as {@code unscaled * 10^(-scale)}.
 * NOTE(review): this relies on WriterDelegate.addNumCol treating its last argument
 * as a decimal exponent - confirm against the writer implementation.
 *
 * Decimals backed by binary values are rejected, as are boolean, float and double
 * values, which are not valid physical representations for this converter.
 */
private static class DecimalConverter extends PrimitiveConverter {

  private final int _colIdx;
  private final WriterDelegate _writer;
  /** Base-10 exponent passed to the writer; the negated decimal scale. */
  private final int _exp;

  /**
   * @param colIdx index of the destination column handed to the writer
   * @param scale  decimal scale taken from the Parquet decimal metadata
   * @param writer delegate receiving the parsed numbers
   */
  DecimalConverter(int colIdx, int scale, WriterDelegate writer) {
    _colIdx = colIdx;
    _writer = writer;
    _exp = -scale;
  }

  @Override
  public void addBoolean(boolean value) {
    throw new UnsupportedOperationException("Boolean type is not supported by DecimalConverter");
  }

  @Override
  public void addDouble(double value) {
    throw new UnsupportedOperationException("Double type is not supported by DecimalConverter");
  }

  @Override
  public void addFloat(float value) {
    throw new UnsupportedOperationException("Float type is not supported by DecimalConverter");
  }

  /** Writes a decimal whose unscaled value is held in a 32-bit integer. */
  @Override
  public void addInt(int value) {
    _writer.addNumCol(_colIdx, value, _exp);
  }

  /** Writes a decimal whose unscaled value is held in a 64-bit integer. */
  @Override
  public void addLong(long value) {
    _writer.addNumCol(_colIdx, value, _exp);
  }

  /**
   * Binary-backed decimals are not handled by this converter.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public void addBinary(Binary value) {
    // A space after the first sentence keeps the two parts from running together as "H2O.Please".
    throw new UnsupportedOperationException("Arbitrary precision Decimal type is currently not supported by H2O. " +
        "Please use 64-bit decimal type instead (precision <= 18).");
  }
}

private static class TimestampConverter extends PrimitiveConverter {

private final int _colIdx;
Expand Down
Expand Up @@ -29,6 +29,7 @@
import water.parser.ParseDataset;
import water.parser.ParseSetup;
import water.util.IcedInt;
import water.util.PrettyPrint;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -329,6 +330,36 @@ public void testParseCategoricalsWithZeroCharacters() {
assertFrameAssertion(assertion);
}

/**
 * Generates a Parquet file with an int32-backed DECIMAL(9, 5) column and an
 * int64-backed DECIMAL(18, 10) column, parses it, and verifies that every cell
 * equals the generated unscaled value divided by 10^scale.
 */
@Test
public void testParseDecimals() {
  FrameAssertion decimalsAssertion = new GenFrameAssertion("decimals.parquet", TestUtil.ari(2, 18)) {
    @Override
    protected File prepareFile() throws IOException {
      File tmpDir = Files.createTempDir();
      return ParquetFileGenerator.generateParquetFileDecimals(tmpDir, file, nrows());
    }

    @Override
    public void check(Frame f) {
      assertArrayEquals("Column names need to match!", ar("decimal32", "decimal64"), f.names());
      assertArrayEquals("Column types need to match!", ar(Vec.T_NUM, Vec.T_NUM), f.types());
      for (int r = 0; r < nrows(); r++) {
        // Mirrors the generator: unscaled value is 1 + 10^(r mod 9), scale is 5.
        double want32 = (1 + PrettyPrint.pow10(1, r % 9)) / 1e5;
        double got32 = f.vec(0).at(r);
        assertEquals("Value in column decimal32", want32, got32, 0);

        // Mirrors the generator: unscaled value is 1 + 10^(r mod 18), scale is 10.
        double want64 = (1 + PrettyPrint.pow10(1, r % 18)) / 1e10;
        double got64 = f.vec(1).at(r);
        assertEquals("Value in column decimal64", want64, got64, 0);
      }
    }
  };
  assertFrameAssertion(decimalsAssertion);
}

/**
 * Regression test for PUBDEV-5673: parses the checked-in Parquet file and checks
 * the first cell of the first column against the expected decimal value.
 */
@Test
public void testPubdev5673() {
  Frame parsed = null;
  try {
    parsed = TestUtil.parse_test_file("smalldata/jira/pubdev-5673.parquet");
    assertEquals(98776543211.99876, parsed.vec(0).at(0), 0);
  } finally {
    // Always release the parsed frame, even when the assertion fails.
    if (parsed != null) {
      parsed.delete();
    }
  }
}

}

class ParquetFileGenerator {
Expand Down Expand Up @@ -492,4 +523,29 @@ static File generateParquetFileWithNullCharacters(File parentDir, String filenam
return f;
}

/**
 * Writes a Parquet file with two required decimal columns: {@code decimal32}
 * (int32, DECIMAL(9, 5)) and {@code decimal64} (int64, DECIMAL(18, 10)).
 * Row {@code i} stores the unscaled values {@code 1 + 10^(i % 9)} and
 * {@code 1 + 10^(i % 18)} respectively.
 *
 * @param parentDir directory in which the file is created
 * @param filename  name of the file to create
 * @param nrows     number of rows to write
 * @return the written file
 * @throws IOException if the Parquet writer fails
 */
static File generateParquetFileDecimals(File parentDir, String filename, int nrows) throws IOException {
  final File target = new File(parentDir, filename);

  final Configuration hadoopConf = new Configuration();
  final MessageType decimalSchema = parseMessageType(
      "message test { required int32 decimal32 (DECIMAL(9, 5)); required int64 decimal64 (DECIMAL(18, 10)); } ");
  GroupWriteSupport.setSchema(decimalSchema, hadoopConf);
  final SimpleGroupFactory groups = new SimpleGroupFactory(decimalSchema);
  final ParquetWriter<Group> out = new ParquetWriter<Group>(new Path(target.getPath()), new GroupWriteSupport(),
      UNCOMPRESSED, 1024, 1024, 512, true, false, ParquetProperties.WriterVersion.PARQUET_2_0, hadoopConf);
  try {
    int row = 0;
    while (row < nrows) {
      Group record = groups.newGroup();
      // Unscaled values cycle through the powers of ten that fit the declared precision.
      int unscaled32 = 1 + (int) PrettyPrint.pow10(1, row % 9);
      record.append("decimal32", unscaled32);
      long unscaled64 = 1 + (long) PrettyPrint.pow10(1, row % 18);
      record.append("decimal64", unscaled64);
      out.write(record);
      row++;
    }
  } finally {
    out.close();
  }
  return target;
}

}

0 comments on commit cb6e9e0

Please sign in to comment.