diff --git a/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/ChunkConverter.java b/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/ChunkConverter.java
index 62f3d8afccc3..10810dfdef76 100644
--- a/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/ChunkConverter.java
+++ b/h2o-parsers/h2o-parquet-parser/src/main/java/water/parser/parquet/ChunkConverter.java
@@ -92,7 +92,10 @@ private PrimitiveConverter newConverter(int colIdx, byte vecType, PrimitiveType
         return new StringConverter(_writer, colIdx, dictSupport, _maxStringSize);
       }
       case Vec.T_NUM:
-        return new NumberConverter(colIdx, _writer);
+        if (OriginalType.DECIMAL.equals(parquetType.getOriginalType()))
+          return new DecimalConverter(colIdx, parquetType.getDecimalMetadata().getScale(), _writer);
+        else
+          return new NumberConverter(colIdx, _writer);
       default:
         throw new UnsupportedOperationException("Unsupported type " + vecType);
     }
@@ -201,6 +204,51 @@ public void addBinary(Binary value) {
     }
   }
 
+  private static class DecimalConverter extends PrimitiveConverter {
+
+    private final int _colIdx;
+    private final WriterDelegate _writer;
+    private final BufferedString _bs = new BufferedString();
+    private final int _exp;
+
+    DecimalConverter(int colIdx, int scale, WriterDelegate writer) {
+      _colIdx = colIdx;
+      _writer = writer;
+      _exp = -scale;
+    }
+
+    @Override
+    public void addBoolean(boolean value) {
+      throw new UnsupportedOperationException("Boolean type is not supported by DecimalConverter");
+    }
+
+    @Override
+    public void addDouble(double value) {
+      throw new UnsupportedOperationException("Double type is not supported by DecimalConverter");
+    }
+
+    @Override
+    public void addFloat(float value) {
+      throw new UnsupportedOperationException("Float type is not supported by DecimalConverter");
+    }
+
+    @Override
+    public void addInt(int value) {
+      _writer.addNumCol(_colIdx, value, _exp);
+    }
+
+    @Override
+    public void addLong(long value) {
+      _writer.addNumCol(_colIdx, value, _exp);
+    }
+
+    @Override
+    public void addBinary(Binary value) {
+      throw new UnsupportedOperationException("Arbitrary precision Decimal type is currently not supported by H2O. " +
+          "Please use 64-bit decimal type instead (precision <= 18).");
+    }
+  }
+
   private static class TimestampConverter extends PrimitiveConverter {
 
     private final int _colIdx;
diff --git a/h2o-parsers/h2o-parquet-parser/src/test/java/water/parser/parquet/ParseTestParquet.java b/h2o-parsers/h2o-parquet-parser/src/test/java/water/parser/parquet/ParseTestParquet.java
index 240a7e0b3291..389f0d4563d1 100644
--- a/h2o-parsers/h2o-parquet-parser/src/test/java/water/parser/parquet/ParseTestParquet.java
+++ b/h2o-parsers/h2o-parquet-parser/src/test/java/water/parser/parquet/ParseTestParquet.java
@@ -29,6 +29,7 @@
 import water.parser.ParseDataset;
 import water.parser.ParseSetup;
 import water.util.IcedInt;
+import water.util.PrettyPrint;
 
 import java.io.File;
 import java.io.IOException;
@@ -329,6 +330,36 @@ public void testParseCategoricalsWithZeroCharacters() {
     assertFrameAssertion(assertion);
   }
 
+  @Test
+  public void testParseDecimals() {
+    FrameAssertion assertion = new GenFrameAssertion("decimals.parquet", TestUtil.ari(2, 18)) {
+      @Override protected File prepareFile() throws IOException { return ParquetFileGenerator.generateParquetFileDecimals(Files.createTempDir(), file, nrows()); }
+      @Override public void check(Frame f) {
+        assertArrayEquals("Column names need to match!", ar("decimal32", "decimal64"), f.names());
+        assertArrayEquals("Column types need to match!", ar(Vec.T_NUM, Vec.T_NUM), f.types());
+        for (int row = 0; row < nrows(); row++) {
+          double expected32 = (1 + PrettyPrint.pow10(1, row % 9)) / 1e5;
+          assertEquals("Value in column decimal32", expected32, f.vec(0).at(row), 0);
+          double expected64 = (1 + PrettyPrint.pow10(1, row % 18)) / 1e10;
+          assertEquals("Value in column decimal64", expected64, f.vec(1).at(row), 0);
+        }
+      }
+    };
+    assertFrameAssertion(assertion);
+  }
+
+  @Test
+  public void testPubdev5673() {
+    Frame actual = null;
+    try {
+      actual = TestUtil.parse_test_file("smalldata/jira/pubdev-5673.parquet");
+      double actualVal = actual.vec(0).at(0);
+      assertEquals(98776543211.99876, actualVal, 0);
+    } finally {
+      if (actual != null) actual.delete();
+    }
+  }
+
 }
 
 class ParquetFileGenerator {
@@ -492,4 +523,29 @@ static File generateParquetFileWithNullCharacters(File parentDir, String filenam
     return f;
   }
 
+  static File generateParquetFileDecimals(File parentDir, String filename, int nrows) throws IOException {
+    File f = new File(parentDir, filename);
+
+    Configuration conf = new Configuration();
+    MessageType schema = parseMessageType(
+        "message test { required int32 decimal32 (DECIMAL(9, 5)); required int64 decimal64 (DECIMAL(18, 10)); } ");
+    GroupWriteSupport.setSchema(schema, conf);
+    SimpleGroupFactory fact = new SimpleGroupFactory(schema);
+    ParquetWriter writer = new ParquetWriter(new Path(f.getPath()), new GroupWriteSupport(),
+        UNCOMPRESSED, 1024, 1024, 512, true, false, ParquetProperties.WriterVersion.PARQUET_2_0, conf);
+    try {
+      for (int i = 0; i < nrows; i++) {
+        Group g = fact.newGroup();
+        int dec32 = 1 + (int) PrettyPrint.pow10(1, i % 9);
+        g.append("decimal32", dec32);
+        long dec64 = 1 + (long) PrettyPrint.pow10(1, i % 18);
+        g.append("decimal64", dec64);
+        writer.write(g);
+      }
+    } finally {
+      writer.close();
+    }
+    return f;
+  }
+
 }
\ No newline at end of file