Skip to content

Commit

Permalink
Fixed dependency issue by providing a local JSON reader
Browse files Browse the repository at this point in the history
  • Loading branch information
pvillard31 committed Mar 31, 2018
1 parent ad52f55 commit a6d28ec
Show file tree
Hide file tree
Showing 2 changed files with 324 additions and 8 deletions.
Expand Up @@ -60,8 +60,7 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-services</artifactId>
<version>1.6.0-SNAPSHOT</version>
<artifactId>nifi-schema-registry-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
Expand Down
Expand Up @@ -19,10 +19,15 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import javax.json.JsonObjectBuilder;
import javax.json.JsonValue;
Expand All @@ -37,7 +42,6 @@
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.json.JsonTreeRowRecordReader;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
Expand All @@ -47,15 +51,32 @@
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SerializedForm;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StringUtils;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonToken;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ArrayNode;

/**
* Base class for ReportingTasks that send data over site-to-site.
Expand All @@ -66,10 +87,6 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
protected static final String DESTINATION_URL_PATH = "/nifi";
protected static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";

private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
private final String timeFormat = RecordFieldType.TIME.getDefaultFormat();
private final String timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();

static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder()
.name("Destination URL")
.displayName("Destination URL")
Expand Down Expand Up @@ -247,7 +264,7 @@ protected SiteToSiteClient getClient() {
}

protected byte[] getData(final ReportingContext context, InputStream in, Map<String, String> attributes) {
try (final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, getLogger(), recordSchema, dateFormat, timeFormat, timestampFormat)) {
try (final JsonRecordReader reader = new JsonRecordReader(in, recordSchema)) {

final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final RecordSchema writeSchema = writerFactory.getSchema(null, recordSchema);
Expand Down Expand Up @@ -324,4 +341,304 @@ protected void addField(final JsonObjectBuilder builder, final String key, final
builder.add(key, value);
}
}

private class JsonRecordReader implements RecordReader {

private RecordSchema recordSchema;
private final JsonParser jsonParser;
private final boolean array;
private final JsonNode firstJsonNode;
private boolean firstObjectConsumed = false;

private final Supplier<DateFormat> dateFormat = () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat());
private final Supplier<DateFormat> timeFormat = () -> DataTypeUtils.getDateFormat(RecordFieldType.TIME.getDefaultFormat());
private final Supplier<DateFormat> timestampFormat = () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat());

public JsonRecordReader(final InputStream in, RecordSchema recordSchema) throws IOException, MalformedRecordException {
this.recordSchema = recordSchema;
try {
jsonParser = new JsonFactory().createJsonParser(in);
jsonParser.setCodec(new ObjectMapper());
JsonToken token = jsonParser.nextToken();
if (token == JsonToken.START_ARRAY) {
array = true;
token = jsonParser.nextToken();
} else {
array = false;
}
if (token == JsonToken.START_OBJECT) {
firstJsonNode = jsonParser.readValueAsTree();
} else {
firstJsonNode = null;
}
} catch (final JsonParseException e) {
throw new MalformedRecordException("Could not parse data as JSON", e);
}
}

@Override
public void close() throws IOException {
jsonParser.close();
}

@Override
public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException {
if (firstObjectConsumed && !array) {
return null;
}
try {
return convertJsonNodeToRecord(getNextJsonNode(), getSchema(), null, coerceTypes, dropUnknownFields);
} catch (final MalformedRecordException mre) {
throw mre;
} catch (final IOException ioe) {
throw ioe;
} catch (final Exception e) {
throw new MalformedRecordException("Failed to convert data into a Record object with the given schema", e);
}
}

@Override
public RecordSchema getSchema() throws MalformedRecordException {
return recordSchema;
}

private JsonNode getNextJsonNode() throws JsonParseException, IOException, MalformedRecordException {
if (!firstObjectConsumed) {
firstObjectConsumed = true;
return firstJsonNode;
}
while (true) {
final JsonToken token = jsonParser.nextToken();
if (token == null) {
return null;
}
switch (token) {
case END_OBJECT:
continue;
case START_OBJECT:
return jsonParser.readValueAsTree();
case END_ARRAY:
case START_ARRAY:
return null;
default:
throw new MalformedRecordException("Expected to get a JSON Object but got a token of type " + token.name());
}
}
}

private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final String fieldNamePrefix,
final boolean coerceTypes, final boolean dropUnknown) throws IOException, MalformedRecordException {

final Map<String, Object> values = new HashMap<>(schema.getFieldCount() * 2);

if (dropUnknown) {
for (final RecordField recordField : schema.getFields()) {
final JsonNode childNode = getChildNode(jsonNode, recordField);
if (childNode == null) {
continue;
}

final String fieldName = recordField.getFieldName();
final Object value;

if (coerceTypes) {
final DataType desiredType = recordField.getDataType();
final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName;
value = convertField(childNode, fullFieldName, desiredType, dropUnknown);
} else {
value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType());
}

values.put(fieldName, value);
}
} else {
final Iterator<String> fieldNames = jsonNode.getFieldNames();
while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next();
final JsonNode childNode = jsonNode.get(fieldName);
final RecordField recordField = schema.getField(fieldName).orElse(null);
final Object value;

if (coerceTypes && recordField != null) {
final DataType desiredType = recordField.getDataType();
final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName;
value = convertField(childNode, fullFieldName, desiredType, dropUnknown);
} else {
value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType());
}

values.put(fieldName, value);
}
}

final Supplier<String> supplier = () -> jsonNode.toString();
return new MapRecord(schema, values, SerializedForm.of(supplier, "application/json"), false, dropUnknown);
}

private JsonNode getChildNode(final JsonNode jsonNode, final RecordField field) {
if (jsonNode.has(field.getFieldName())) {
return jsonNode.get(field.getFieldName());
}
for (final String alias : field.getAliases()) {
if (jsonNode.has(alias)) {
return jsonNode.get(alias);
}
}
return null;
}

protected Object convertField(final JsonNode fieldNode, final String fieldName, final DataType desiredType, final boolean dropUnknown) throws IOException, MalformedRecordException {
if (fieldNode == null || fieldNode.isNull()) {
return null;
}

switch (desiredType.getFieldType()) {
case BOOLEAN:
case BYTE:
case CHAR:
case DOUBLE:
case FLOAT:
case INT:
case BIGINT:
case LONG:
case SHORT:
case STRING:
case DATE:
case TIME:
case TIMESTAMP: {
final Object rawValue = getRawNodeValue(fieldNode, null);
final Object converted = DataTypeUtils.convertType(rawValue, desiredType, dateFormat, timeFormat, timestampFormat, fieldName);
return converted;
}
case MAP: {
final DataType valueType = ((MapDataType) desiredType).getValueType();

final Map<String, Object> map = new HashMap<>();
final Iterator<String> fieldNameItr = fieldNode.getFieldNames();
while (fieldNameItr.hasNext()) {
final String childName = fieldNameItr.next();
final JsonNode childNode = fieldNode.get(childName);
final Object childValue = convertField(childNode, fieldName, valueType, dropUnknown);
map.put(childName, childValue);
}

return map;
}
case ARRAY: {
final ArrayNode arrayNode = (ArrayNode) fieldNode;
final int numElements = arrayNode.size();
final Object[] arrayElements = new Object[numElements];
int count = 0;
for (final JsonNode node : arrayNode) {
final DataType elementType = ((ArrayDataType) desiredType).getElementType();
final Object converted = convertField(node, fieldName, elementType, dropUnknown);
arrayElements[count++] = converted;
}

return arrayElements;
}
case RECORD: {
if (fieldNode.isObject()) {
RecordSchema childSchema;
if (desiredType instanceof RecordDataType) {
childSchema = ((RecordDataType) desiredType).getChildSchema();
} else {
return null;
}

if (childSchema == null) {
final List<RecordField> fields = new ArrayList<>();
final Iterator<String> fieldNameItr = fieldNode.getFieldNames();
while (fieldNameItr.hasNext()) {
fields.add(new RecordField(fieldNameItr.next(), RecordFieldType.STRING.getDataType()));
}

childSchema = new SimpleRecordSchema(fields);
}

return convertJsonNodeToRecord(fieldNode, childSchema, fieldName + ".", true, dropUnknown);
} else {
return null;
}
}
case CHOICE: {
return DataTypeUtils.convertType(getRawNodeValue(fieldNode, null), desiredType, fieldName);
}
}

return null;
}

protected Object getRawNodeValue(final JsonNode fieldNode, final DataType dataType) throws IOException {
if (fieldNode == null || fieldNode.isNull()) {
return null;
}

if (fieldNode.isNumber()) {
return fieldNode.getNumberValue();
}

if (fieldNode.isBinary()) {
return fieldNode.getBinaryValue();
}

if (fieldNode.isBoolean()) {
return fieldNode.getBooleanValue();
}

if (fieldNode.isTextual()) {
return fieldNode.getTextValue();
}

if (fieldNode.isArray()) {
final ArrayNode arrayNode = (ArrayNode) fieldNode;
final int numElements = arrayNode.size();
final Object[] arrayElements = new Object[numElements];
int count = 0;

final DataType elementDataType;
if (dataType != null && dataType.getFieldType() == RecordFieldType.ARRAY) {
final ArrayDataType arrayDataType = (ArrayDataType) dataType;
elementDataType = arrayDataType.getElementType();
} else {
elementDataType = null;
}

for (final JsonNode node : arrayNode) {
final Object value = getRawNodeValue(node, elementDataType);
arrayElements[count++] = value;
}

return arrayElements;
}

if (fieldNode.isObject()) {
RecordSchema childSchema;
if (dataType != null && RecordFieldType.RECORD == dataType.getFieldType()) {
final RecordDataType recordDataType = (RecordDataType) dataType;
childSchema = recordDataType.getChildSchema();
} else {
childSchema = null;
}

if (childSchema == null) {
childSchema = new SimpleRecordSchema(Collections.emptyList());
}

final Iterator<String> fieldNames = fieldNode.getFieldNames();
final Map<String, Object> childValues = new HashMap<>();
while (fieldNames.hasNext()) {
final String childFieldName = fieldNames.next();
final Object childValue = getRawNodeValue(fieldNode.get(childFieldName), dataType);
childValues.put(childFieldName, childValue);
}

final MapRecord record = new MapRecord(childSchema, childValues);
return record;
}

return null;
}

}
}

0 comments on commit a6d28ec

Please sign in to comment.