Skip to content

Commit

Permalink
NIFI-3704: Updated to use renamed RecordReaderFactory and catch Schem…
Browse files Browse the repository at this point in the history
…aNotFoundException
  • Loading branch information
mattyb149 committed Apr 24, 2017
1 parent f3f5e0e commit 6ef1d31
Showing 1 changed file with 6 additions and 5 deletions.
Expand Up @@ -36,9 +36,10 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RowRecordReaderFactory;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
Expand Down Expand Up @@ -125,7 +126,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
.name("put-db-record-record-reader")
.displayName("Record Reader")
.description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
.identifiesControllerService(RowRecordReaderFactory.class)
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.build();

Expand Down Expand Up @@ -326,8 +327,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session

final ComponentLog log = getLogger();

final RowRecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
.asControllerService(RowRecordReaderFactory.class);
final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
.asControllerService(RecordReaderFactory.class);
final String statementTypeProperty = context.getProperty(STATEMENT_TYPE).getValue();
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
final boolean translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
Expand Down Expand Up @@ -554,7 +555,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
session.transfer(flowFile, REL_RETRY);
}
}
} catch (final MalformedRecordException | IOException e) {
} catch (final MalformedRecordException | SchemaNotFoundException | IOException e) {
throw new ProcessException("Failed to determine schema of data records for " + flowFile, e);
}
}
Expand Down

0 comments on commit 6ef1d31

Please sign in to comment.