Skip to content

Commit

Permalink
NIFI-978: Support parameterized statements in ExecuteSQL
Browse files Browse the repository at this point in the history
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2433.
  • Loading branch information
mattyb149 authored and pvillard31 committed Feb 12, 2018
1 parent b4a9f52 commit b5ca7ad
Show file tree
Hide file tree
Showing 4 changed files with 319 additions and 289 deletions.
Expand Up @@ -16,13 +16,11 @@
*/
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -35,6 +33,8 @@
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
Expand All @@ -50,8 +50,6 @@
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.util.StopWatch;
Expand All @@ -64,11 +62,32 @@
@EventDriven
@InputRequirement(Requirement.INPUT_ALLOWED)
@Tags({"sql", "select", "jdbc", "query", "database"})
@CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format."
@CapabilityDescription("Executes provided SQL select query. Query result will be converted to Avro format."
+ " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on "
+ "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. "
+ "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the "
+ "select query. FlowFile attribute 'executesql.row.count' indicates how many rows were selected.")
+ "select query, and the query may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes "
+ "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be "
+ "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format. "
+ "FlowFile attribute 'executesql.row.count' indicates how many rows were selected.")
@ReadsAttributes({
@ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming FlowFiles are expected to be parametrized SQL statements. The type of each Parameter is specified as an integer "
+ "that represents the JDBC Type of the parameter."),
@ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming FlowFiles are expected to be parametrized SQL statements. The value of the Parameters are specified as "
+ "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute."),
@ReadsAttribute(attribute = "sql.args.N.format", description = "This attribute is always optional, but default options may not always work for your data. "
+ "Incoming FlowFiles are expected to be parametrized SQL statements. In some cases "
+ "a format option needs to be specified, currently this is only applicable for binary data types, dates, times and timestamps. Binary Data Types (defaults to 'ascii') - "
+ "ascii: each string character in your attribute value represents a single byte. This is the format provided by Avro Processors. "
+ "base64: the string is a Base64 encoded string that can be decoded to bytes. "
+ "hex: the string is hex encoded with all letters in upper case and no '0x' at the beginning. "
+ "Dates/Times/Timestamps - "
+ "Date, Time and Timestamp formats all support both custom formats or named format ('yyyy-MM-dd','ISO_OFFSET_DATE_TIME') "
+ "as specified according to java.time.format.DateTimeFormatter. "
+ "If not specified, a long value input is expected to be an unix epoch (milli seconds from 1970/1/1), or a string value in "
+ "'yyyy-MM-dd' format for Date, 'HH:mm:ss.SSS' for Time (some database engines e.g. Derby or MySQL do not support milliseconds and will truncate milliseconds), "
+ "'yyyy-MM-dd HH:mm:ss.SSS' for Timestamp is used.")
})
@WritesAttributes({
@WritesAttribute(attribute="executesql.row.count", description = "Contains the number of rows returned in the select query"),
@WritesAttribute(attribute="executesql.query.duration", description = "Duration of the query in milliseconds")
Expand Down Expand Up @@ -187,22 +206,20 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
// If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query.
// If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled.
final StringBuilder queryContents = new StringBuilder();
session.read(fileToProcess, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
queryContents.append(IOUtils.toString(in));
}
});
session.read(fileToProcess, in -> queryContents.append(IOUtils.toString(in, Charset.defaultCharset())));
selectQuery = queryContents.toString();
}

int resultCount=0;
try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) {
final PreparedStatement st = con.prepareStatement(selectQuery)) {
st.setQueryTimeout(queryTimeout); // timeout in seconds

if (fileToProcess != null) {
JdbcCommon.setParameters(st, fileToProcess.getAttributes());
}
logger.debug("Executing query {}", new Object[]{selectQuery});
boolean results = st.execute(selectQuery);
boolean results = st.execute();


while(results){
Expand All @@ -215,22 +232,19 @@ public void process(InputStream in) throws IOException {
}

final AtomicLong nrOfRows = new AtomicLong(0L);
resultSetFF = session.write(resultSetFF, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
try {
resultSetFF = session.write(resultSetFF, out -> {
try {

final ResultSet resultSet = st.getResultSet();
final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
.convertNames(convertNamesForAvro)
.useLogicalTypes(useAvroLogicalTypes)
.defaultPrecision(defaultPrecision)
.defaultScale(defaultScale)
.build();
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
} catch (final SQLException e) {
throw new ProcessException(e);
}
final ResultSet resultSet = st.getResultSet();
final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
.convertNames(convertNamesForAvro)
.useLogicalTypes(useAvroLogicalTypes)
.defaultPrecision(defaultPrecision)
.defaultScale(defaultScale)
.build();
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
} catch (final SQLException e) {
throw new ProcessException(e);
}
});

Expand Down Expand Up @@ -261,12 +275,7 @@ public void process(final OutputStream out) throws IOException {
if(resultCount > 0){
session.remove(fileToProcess);
} else {
fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
JdbcCommon.createEmptyAvroStream(out);
}
});
fileToProcess = session.write(fileToProcess, JdbcCommon::createEmptyAvroStream);

session.transfer(fileToProcess, REL_SUCCESS);
}
Expand Down

0 comments on commit b5ca7ad

Please sign in to comment.