Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonParser.Feature;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -143,6 +144,8 @@ protected AbstractJsonRowRecordReader(final InputStream in,
try {
final StreamReadConstraints configuredStreamReadConstraints = streamReadConstraints == null ? DEFAULT_STREAM_READ_CONSTRAINTS : streamReadConstraints;
jsonParser = tokenParserFactory.getJsonParser(in, configuredStreamReadConstraints, allowComments);
jsonParser.enable(Feature.USE_FAST_DOUBLE_PARSER);
jsonParser.enable(Feature.USE_FAST_BIG_NUMBER_PARSER);

if (strategy == StartingFieldStrategy.NESTED_FIELD) {
while (jsonParser.nextToken() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonGenerator.Feature;
import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.NullSuppression;
Expand Down Expand Up @@ -50,7 +51,12 @@
import java.util.Set;
import java.util.function.Supplier;


import java.util.regex.Pattern;

public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSetWriter, RawRecordWriter {
private static final Pattern SCIENTIFIC_NOTATION_PATTERN = Pattern.compile("[0-9]([eE][-+]?)[0-9]");

private final ComponentLog logger;
private final SchemaAccessWriter schemaAccess;
private final RecordSchema recordSchema;
Expand All @@ -62,17 +68,18 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
private String mimeType = "application/json";
private final boolean prettyPrint;
private final boolean allowScientificNotation;

private static final ObjectMapper objectMapper = new ObjectMapper();

public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, final boolean prettyPrint,
final NullSuppression nullSuppression, final OutputGrouping outputGrouping, final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
this(logger, recordSchema, schemaAccess, out, prettyPrint, nullSuppression, outputGrouping, dateFormat, timeFormat, timestampFormat, "application/json");
this(logger, recordSchema, schemaAccess, out, prettyPrint, nullSuppression, outputGrouping, dateFormat, timeFormat, timestampFormat, "application/json", false);
}

public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, final boolean prettyPrint,
final NullSuppression nullSuppression, final OutputGrouping outputGrouping, final String dateFormat, final String timeFormat, final String timestampFormat,
final String mimeType) throws IOException {
final String mimeType, final boolean allowScientificNotation) throws IOException {

super(out);
this.logger = logger;
Expand All @@ -81,6 +88,7 @@ public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchem
this.nullSuppression = nullSuppression;
this.outputGrouping = outputGrouping;
this.mimeType = mimeType;
this.allowScientificNotation = allowScientificNotation;

// Use DateFormat with default TimeZone to avoid unexpected conversion of year-month-day
final DateFormat df = dateFormat == null ? null : new SimpleDateFormat(dateFormat);
Expand All @@ -95,6 +103,10 @@ public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchem
factory.setCodec(objectMapper);

this.generator = factory.createGenerator(out);
if (!allowScientificNotation) {
generator.enable(Feature.WRITE_BIGDECIMAL_AS_PLAIN);
}

this.prettyPrint = prettyPrint;
if (prettyPrint) {
generator.useDefaultPrettyPrinter();
Expand Down Expand Up @@ -167,23 +179,49 @@ public WriteResult writeRawRecord(final Record record) throws IOException {
return WriteResult.of(incrementRecordCount(), attributes);
}

private boolean isUseSerializeForm(final Record record, final RecordSchema writeSchema) {
final Optional<SerializedForm> serializedForm = record.getSerializedForm();
if (!serializedForm.isPresent()) {
return false;
}

final SerializedForm form = serializedForm.get();
if (!form.getMimeType().equals(getMimeType()) || !record.getSchema().equals(writeSchema)) {
return false;
}

final Object serialized = form.getSerialized();
String serializedString;

if (!(serialized instanceof String)) {
return false;
} else {
serializedString = (String) serialized;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel it's risky to remove the check for pretty formatting form the backport. A later backport of https://issues.apache.org/jira/browse/NIFI-12480 is not straightforward, it's very possible this wouldn't be added back. I see no drawback in leaving it here.

Suggested change
final boolean serializedPretty = serializedString.contains("\n");
if (serializedPretty != this.prettyPrint) {
return false;
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tpalfy Thank you for the comment, I agree, we need to avoid possible merge issues.
I think it would be the best if I backported NIFI-12480 first and when it is merged to support/nifi-1.x, we can continue with the actual ticket.
I created another ticket for backporting NIFI-12480, will create the PR for it soon: https://issues.apache.org/jira/browse/NIFI-13090

final boolean serializedPretty = serializedString.contains("\n");
if (serializedPretty != this.prettyPrint) {
return false;
}

if (!allowScientificNotation && hasScientificNotation(serializedString)) {
return false;
}

return true;
}

private boolean hasScientificNotation(final String value) {
return SCIENTIFIC_NOTATION_PATTERN.matcher(value).find();
}

private void writeRecord(final Record record, final RecordSchema writeSchema, final JsonGenerator generator,
final GeneratorTask startTask, final GeneratorTask endTask, final boolean schemaAware) throws IOException {

final Optional<SerializedForm> serializedForm = record.getSerializedForm();
if (serializedForm.isPresent()) {
final SerializedForm form = serializedForm.get();
if (form.getMimeType().equals(getMimeType()) && record.getSchema().equals(writeSchema)) {
final Object serialized = form.getSerialized();
if (serialized instanceof String) {
String serializedString = (String) serialized;
final boolean serializedPretty = serializedString.contains("\n");
if (serializedPretty == this.prettyPrint) {
generator.writeRawValue((String) serialized);
return;
}
}
}
if (isUseSerializeForm(record, writeSchema)) {
final String serialized = (String) record.getSerializedForm().get().getSerialized();
generator.writeRawValue(serialized);
return;
}

try {
Expand Down Expand Up @@ -298,6 +336,12 @@ private void writeRawValue(final JsonGenerator generator, final Object value, fi
generator.writeObject(formatted);
return;
}
if (!allowScientificNotation) {
if (value instanceof Double || value instanceof Float) {
generator.writeNumber(DataTypeUtils.toBigDecimal(value, fieldName));
return;
}
}

generator.writeObject(value);
}
Expand Down Expand Up @@ -367,10 +411,18 @@ private void writeValue(final JsonGenerator generator, final Object value, final
break;
}
case DOUBLE:
generator.writeNumber(DataTypeUtils.toDouble(coercedValue, fieldName));
if (allowScientificNotation) {
generator.writeNumber(DataTypeUtils.toDouble(coercedValue, fieldName));
} else {
generator.writeNumber(DataTypeUtils.toBigDecimal(coercedValue, fieldName));
}
break;
case FLOAT:
generator.writeNumber(DataTypeUtils.toFloat(coercedValue, fieldName));
if (allowScientificNotation) {
generator.writeNumber(DataTypeUtils.toFloat(coercedValue, fieldName));
} else {
generator.writeNumber(DataTypeUtils.toBigDecimal(coercedValue, fieldName));
}
break;
case LONG:
generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
.defaultValue("false")
.required(true)
.build();
public static final PropertyDescriptor ALLOW_SCIENTIFIC_NOTATION = new PropertyDescriptor.Builder()
.name("Allow Scientific Notation")
.description("Specifies whether or not scientific notation should be used when writing numbers")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
public static final PropertyDescriptor OUTPUT_GROUPING = new PropertyDescriptor.Builder()
.name("output-grouping")
.displayName("Output Grouping")
Expand Down Expand Up @@ -120,6 +128,7 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
.build();

private volatile boolean prettyPrint;
private volatile boolean allowScientificNotation;
private volatile NullSuppression nullSuppression;
private volatile OutputGrouping outputGrouping;
private volatile String compressionFormat;
Expand All @@ -130,6 +139,7 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.add(PRETTY_PRINT_JSON);
properties.add(SUPPRESS_NULLS);
properties.add(ALLOW_SCIENTIFIC_NOTATION);
properties.add(OUTPUT_GROUPING);
properties.add(COMPRESSION_FORMAT);
properties.add(COMPRESSION_LEVEL);
Expand All @@ -150,6 +160,7 @@ protected Collection<ValidationResult> customValidate(ValidationContext context)
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
prettyPrint = context.getProperty(PRETTY_PRINT_JSON).asBoolean();
allowScientificNotation = context.getProperty(ALLOW_SCIENTIFIC_NOTATION).asBoolean();

final NullSuppression suppression;
final String suppressNullValue = context.getProperty(SUPPRESS_NULLS).getValue();
Expand Down Expand Up @@ -180,44 +191,44 @@ public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchem

final OutputStream bufferedOut = new BufferedOutputStream(out, 65536);
final OutputStream compressionOut;
String mimeTypeRef;
String mimeType;

try {
switch (compressionFormat.toLowerCase()) {
case COMPRESSION_FORMAT_GZIP:
compressionOut = new GZIPOutputStream(bufferedOut, compressionLevel);
mimeTypeRef = "application/gzip";
mimeType = "application/gzip";
break;
case COMPRESSION_FORMAT_XZ_LZMA2:
compressionOut = new XZOutputStream(bufferedOut, new LZMA2Options());
mimeTypeRef = "application/x-xz";
mimeType = "application/x-xz";
break;
case COMPRESSION_FORMAT_SNAPPY:
compressionOut = new SnappyOutputStream(bufferedOut);
mimeTypeRef = "application/x-snappy";
mimeType = "application/x-snappy";
break;
case COMPRESSION_FORMAT_SNAPPY_FRAMED:
compressionOut = new SnappyFramedOutputStream(bufferedOut);
mimeTypeRef = "application/x-snappy-framed";
mimeType = "application/x-snappy-framed";
break;
case COMPRESSION_FORMAT_BZIP2:
mimeTypeRef = "application/x-bzip2";
mimeType = "application/x-bzip2";
compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(), bufferedOut);
break;
case COMPRESSION_FORMAT_ZSTD:
mimeTypeRef = "application/zstd";
mimeType = "application/zstd";
compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(), bufferedOut);
break;
default:
mimeTypeRef = "application/json";
mimeType = "application/json";
compressionOut = out;
}
} catch (CompressorException e) {
throw new IOException(e);
}

return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema, variables), compressionOut, prettyPrint, nullSuppression, outputGrouping,
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), mimeTypeRef);
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), mimeType, allowScientificNotation);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,82 @@
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

class TestWriteJsonResult {

@Test
public void testScientificNotationUsage() throws IOException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("float", RecordFieldType.FLOAT.getDataType()));
fields.add(new RecordField("double", RecordFieldType.DOUBLE.getDataType()));
fields.add(new RecordField("decimal", RecordFieldType.DECIMAL.getDecimalDataType(5, 10)));
final RecordSchema schema = new SimpleRecordSchema(fields);

final String expectedWithScientificNotation = "{\"float\":-4.2910323,\"double\":4.51E-7,\"decimal\":8.0E-8}"
.trim();
final String expectedWithScientificNotationArray = "[" + expectedWithScientificNotation + "]";
final String expectedWithoutScientificNotation = "{\"float\":-4.2910323,\"double\":0.000000451,\"decimal\":0.000000080}".trim();
final String expectedWithoutScientificNotationArray = "[" + expectedWithoutScientificNotation + "]";

final Map<String, Object> values = new HashMap<>();
values.put("float", -4.291032244F);
values.put("double", 0.000000451D);
values.put("decimal", new BigDecimal("0.000000080"));
final Record record = new MapRecord(schema, values);

final String withScientificNotation = writeRecord(record, true, false);
assertEquals(expectedWithScientificNotationArray, withScientificNotation);

// We cannot be sure of the ordering when writing the raw record
final String rawWithScientificNotation = writeRecord(record, true, true);
assertTrue(rawWithScientificNotation.contains("\"float\":-4.2910323"));
assertTrue(rawWithScientificNotation.contains("\"double\":4.51E-7"));
assertTrue(rawWithScientificNotation.contains("\"decimal\":8.0E-8"));

final String withoutScientificNotation = writeRecord(record, false, false);
assertEquals(expectedWithoutScientificNotationArray, withoutScientificNotation);

// We cannot be sure of the ordering when writing the raw record
final String rawWithoutScientificNotation = writeRecord(record, false, true);
assertTrue(rawWithoutScientificNotation.contains("\"float\":-4.2910323"));
assertTrue(rawWithoutScientificNotation.contains("\"double\":0.000000451"));
assertTrue(rawWithoutScientificNotation.contains("\"decimal\":0.000000080"));

final Record recordWithSerializedForm = new MapRecord(schema, values, SerializedForm.of(expectedWithScientificNotation, "application/json"));
final String writtenWith = writeRecord(recordWithSerializedForm, true, false);
assertEquals(expectedWithScientificNotationArray, writtenWith);

final String writtenWithout = writeRecord(recordWithSerializedForm, false, false);
assertEquals(expectedWithoutScientificNotationArray, writtenWithout);

// We cannot be sure of the ordering when writing the raw record
final String writtenWithoutRaw = writeRecord(recordWithSerializedForm, false, true);
assertTrue(writtenWithoutRaw.contains("\"float\":-4.2910323"));
assertTrue(writtenWithoutRaw.contains("\"double\":0.000000451"));
assertTrue(writtenWithoutRaw.contains("\"decimal\":0.000000080"));
}

private String writeRecord(final Record record, final boolean allowScientificNotation, final boolean writeRawRecord) throws IOException {
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), record.getSchema(), new SchemaNameAsAttribute(), baos, false,
NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, RecordFieldType.DATE.getDefaultFormat(),
RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "application/json", allowScientificNotation)) {

writer.beginRecordSet();
if (writeRawRecord) {
writer.writeRawRecord(record);
} else {
writer.write(record);
}

writer.finishRecordSet();
writer.flush();

return baos.toString(StandardCharsets.UTF_8.name());
}
}

@Test
void testDataTypes() throws IOException, ParseException {
final List<RecordField> fields = new ArrayList<>();
Expand Down Expand Up @@ -116,7 +189,7 @@ void testDataTypes() throws IOException, ParseException {
writer.write(rs);
}

final String output = baos.toString("UTF-8");
final String output = baos.toString(StandardCharsets.UTF_8.name());

final String expected = new String(Files.readAllBytes(Paths.get("src/test/resources/json/output/dataTypes.json")));
assertEquals(StringUtils.deleteWhitespace(expected), StringUtils.deleteWhitespace(output));
Expand Down Expand Up @@ -555,7 +628,7 @@ void testChoiceArrayOfStringsOrArrayOfRecords() throws IOException {

final byte[] data = baos.toByteArray();

final String output = new String(data, StandardCharsets.UTF_8);
final String output = new String(data, StandardCharsets.UTF_8.name());
assertEquals(json, output);
}
}