From 4d6de9663087ebb30041341c8d55de01c4af3875 Mon Sep 17 00:00:00 2001 From: Toivo Adams Date: Sun, 16 Oct 2016 20:15:33 +0300 Subject: [PATCH] NIFI-2624 JdbcCommon treats BigDecimals now as Avro Logical type using bytes to hold data (not String as it was before). --- .../processors/standard/util/JdbcCommon.java | 38 +++++++++++++++++-- .../standard/util/TestJdbcCommon.java | 14 ++++++- pom.xml | 2 +- 3 files changed, 48 insertions(+), 6 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java index 0aa4c60e375e..c5c6b9a2b83d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java @@ -43,6 +43,7 @@ import static java.sql.Types.TINYINT; import static java.sql.Types.VARBINARY; import static java.sql.Types.VARCHAR; +import static java.util.Objects.requireNonNull; import java.io.IOException; import java.io.InputStream; @@ -55,8 +56,13 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.util.List; +import org.apache.avro.Conversions.DecimalConversion; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; import org.apache.avro.SchemaBuilder; import org.apache.avro.SchemaBuilder.FieldAssembler; import org.apache.avro.file.DataFileWriter; @@ -91,6 +97,7 @@ public static long convertToAvroStream(final ResultSet rs, final OutputStream ou throws SQLException, IOException { final Schema schema = createSchema(rs, recordName, convertNames); final GenericRecord rec = new 
GenericData.Record(schema); + final DecimalConversion decimalConversion = new DecimalConversion(); final DatumWriter datumWriter = new GenericDatumWriter<>(schema); try (final DataFileWriter dataFileWriter = new DataFileWriter<>(datumWriter)) { @@ -168,8 +175,13 @@ public static long convertToAvroStream(final ResultSet rs, final OutputStream ou rec.put(i - 1, ((Byte) value).intValue()); } else if (value instanceof BigDecimal) { - // Avro can't handle BigDecimal as a number - it will throw an AvroRuntimeException such as: "Unknown datum type: java.math.BigDecimal: 38" - rec.put(i - 1, value.toString()); + // try to avoid mysterious error: Unknown datum type java.math.BigDecimal: 38 + String columnName = convertNames ? normalizeNameForAvro(meta.getColumnName(i)) : meta.getColumnName(i); + Schema decimalSchema = getDecimalSchema(schema, columnName); + LogicalType logicalType = LogicalTypes.fromSchema(decimalSchema); + + ByteBuffer byteBuffer = decimalConversion.toBytes((BigDecimal) value, decimalSchema, logicalType); + rec.put(i - 1, byteBuffer); } else if (value instanceof BigInteger) { // Check the precision of the BIGINT. Some databases allow arbitrary precision (> 19), but Avro won't handle that. @@ -224,6 +236,21 @@ public static long convertToAvroStream(final ResultSet rs, final OutputStream ou } } + /** + * Because we want to support null values, Avro Union Schema is required. + * And handling is little bit complicated. 
getDecimalSchema() contains handling logic + */ + public static Schema getDecimalSchema(Schema recordSchema, String fieldName) { + + Field field = recordSchema.getField(fieldName); + requireNonNull(field, "schema does not contain field '" + fieldName + "'"); + + Schema unionSchema = field.schema(); + List supportedTypes = unionSchema.getTypes(); + Schema decimalSchema = supportedTypes.get(1); + return decimalSchema; + } + public static Schema createSchema(final ResultSet rs) throws SQLException { return createSchema(rs, null, false); } @@ -315,10 +342,13 @@ public static Schema createSchema(final ResultSet rs, String recordName, boolean builder.name(columnName).type().unionOf().nullBuilder().endNull().and().doubleType().endUnion().noDefault(); break; - // Did not find direct suitable type, need to be clarified!!!! + // Avro 1.8.1 support decimal type case DECIMAL: case NUMERIC: - builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault(); + int dprecision = meta.getPrecision(i); + int scale = meta.getScale(i); + Schema decimal = LogicalTypes.decimal(dprecision, scale).addToSchema(Schema.create(Schema.Type.BYTES)); + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().type(decimal).endUnion().noDefault(); break; // Did not find direct suitable type, need to be clarified!!!! 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java index dd375aa7f703..5e7be9cf4917 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.apache.nifi.processors.standard.util.JdbcCommon.getDecimalSchema; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -45,6 +46,9 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; +import org.apache.avro.Conversions.DecimalConversion; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; @@ -294,6 +298,7 @@ public void testConvertToAvroStreamForBigDecimal() throws SQLException, IOExcept when(metadata.getColumnType(1)).thenReturn(Types.NUMERIC); when(metadata.getColumnName(1)).thenReturn("The.Chairman"); when(metadata.getTableName(1)).thenReturn("1the::table"); + when(metadata.getPrecision(1)).thenReturn(10); final ResultSet rs = mock(ResultSet.class); when(rs.getMetaData()).thenReturn(metadata); @@ -323,7 +328,14 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable { while (dataFileReader.hasNext()) { record = dataFileReader.next(record); assertEquals("_1the__table", record.getSchema().getName()); - assertEquals(bigDecimal.toString(), record.get("The_Chairman").toString()); + 
DecimalConversion decimalConversion = new DecimalConversion(); + Schema schema = record.getSchema(); + Schema decimalSchema = getDecimalSchema(schema, "The_Chairman"); + LogicalType logicalType = LogicalTypes.fromSchema(decimalSchema); + ByteBuffer buffer = (ByteBuffer) record.get("The_Chairman"); + + BigDecimal resultBD = decimalConversion.fromBytes(buffer, schema, logicalType); + assertEquals(bigDecimal.toString(), resultBD.toString()); } } } diff --git a/pom.xml b/pom.xml index 735a9ec09225..c4b2f91756b3 100644 --- a/pom.xml +++ b/pom.xml @@ -669,7 +669,7 @@ language governing permissions and limitations under the License. --> org.apache.avro avro - 1.7.7 + 1.8.1 com.sun.jersey