Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.pattern.ErrorTypes;
import org.apache.nifi.processor.util.pattern.ExceptionHandler;
Expand All @@ -54,8 +53,6 @@
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.db.JdbcCommon;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.BatchUpdateException;
import java.sql.Connection;
Expand All @@ -67,7 +64,6 @@
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -76,9 +72,12 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

import static java.lang.String.format;
import static java.lang.String.valueOf;
import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toList;
import static org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Failure;
import static org.apache.nifi.processor.util.pattern.ExceptionHandler.createOnError;

@SupportsBatching
Expand Down Expand Up @@ -132,7 +131,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
.displayName("SQL Statement")
.description("The SQL statement to execute. The statement can be empty, a constant value, or built from attributes "
+ "using Expression Language. If this property is specified, it will be used regardless of the content of "
+ "incoming flowfiles. If this property is empty, the content of the incoming flow file is expected "
+ "incoming FlowFiles. If this property is empty, the content of the incoming FlowFile is expected "
+ "to contain a valid SQL statement, to be issued by the processor to the database.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
Expand Down Expand Up @@ -200,6 +199,10 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
private static final String FRAGMENT_INDEX_ATTR = FragmentAttributes.FRAGMENT_INDEX.key();
private static final String FRAGMENT_COUNT_ATTR = FragmentAttributes.FRAGMENT_COUNT.key();

private static final String ERROR_MESSAGE_ATTR = "error.message";
private static final String ERROR_CODE_ATTR = "error.code";
private static final String ERROR_SQL_STATE_ATTR = "error.sql.state";

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
Expand Down Expand Up @@ -282,7 +285,7 @@ private boolean isSupportBatching() {

private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc, ffs) -> {
final Connection connection = c.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class)
.getConnection(ffs == null || ffs.isEmpty() ? Collections.emptyMap() : ffs.get(0).getAttributes());
.getConnection(ffs == null || ffs.isEmpty() ? emptyMap() : ffs.get(0).getAttributes());
try {
fc.originalAutoCommit = connection.getAutoCommit();
final boolean autocommit = c.getProperty(AUTO_COMMIT).asBoolean();
Expand Down Expand Up @@ -458,23 +461,44 @@ void apply(final ProcessContext context, final ProcessSession session, final Fun

private ExceptionHandler.OnError<FunctionContext, FlowFile> onFlowFileError(final ProcessContext context, final ProcessSession session, final RoutingResult result) {
ExceptionHandler.OnError<FunctionContext, FlowFile> onFlowFileError = createOnError(context, session, result, REL_FAILURE, REL_RETRY);
onFlowFileError = onFlowFileError.andThen((c, i, r, e) -> {
switch (r.destination()) {
onFlowFileError = onFlowFileError.andThen((ctx, flowFile, errorTypesResult, exception) -> {
flowFile = addErrorAttributesToFlowFile(session, flowFile, exception);

switch (errorTypesResult.destination()) {
case Failure:
getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {i, e}, e);
getLogger().error("Failed to update database for {} due to {}; routing to failure", flowFile, exception, exception);
break;
case Retry:
getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry",
new Object[] {i, e}, e);
flowFile, exception, exception);
break;
case Self:
getLogger().error("Failed to update database for {} due to {};", new Object[] {i, e}, e);
getLogger().error("Failed to update database for {} due to {};", flowFile, exception, exception);
break;
}
});
return RollbackOnFailure.createOnError(onFlowFileError);
}

private ExceptionHandler.OnError<RollbackOnFailure, FlowFileGroup> onGroupError(final ProcessContext context, final ProcessSession session, final RoutingResult result) {
ExceptionHandler.OnError<RollbackOnFailure, FlowFileGroup> onGroupError =
ExceptionHandler.createOnGroupError(context, session, result, REL_FAILURE, REL_RETRY);

onGroupError = onGroupError.andThen((ctx, flowFileGroup, errorTypesResult, exception) -> {
Relationship relationship = errorTypesResult.destination() == Failure ? REL_FAILURE : REL_RETRY;
List<FlowFile> flowFilesToRelationship = result.getRoutedFlowFiles().get(relationship);
result.getRoutedFlowFiles().put(relationship, addErrorAttributesToFlowFilesInGroup(session, flowFilesToRelationship, flowFileGroup.getFlowFiles(), exception));
});

return onGroupError;
}

private List<FlowFile> addErrorAttributesToFlowFilesInGroup(ProcessSession session, List<FlowFile> flowFilesOnRelationship, List<FlowFile> flowFilesInGroup, Exception exception) {
return flowFilesOnRelationship.stream()
.map(ff -> flowFilesInGroup.contains(ff) ? addErrorAttributesToFlowFile(session, ff, exception) : ff)
.collect(toList());
}

private ExceptionHandler.OnError<FunctionContext, StatementFlowFileEnclosure> onBatchUpdateError(
final ProcessContext context, final ProcessSession session, final RoutingResult result) {
return RollbackOnFailure.createOnError((c, enclosure, r, e) -> {
Expand Down Expand Up @@ -502,7 +526,7 @@ private ExceptionHandler.OnError<FunctionContext, StatementFlowFileEnclosure> on
final int updateCount = updateCounts[i];
final FlowFile flowFile = batchFlowFiles.get(i);
if (updateCount == Statement.EXECUTE_FAILED) {
result.routeTo(flowFile, REL_FAILURE);
result.routeTo(addErrorAttributesToFlowFile(session, flowFile, e), REL_FAILURE);
failureCount++;
} else {
result.routeTo(flowFile, REL_SUCCESS);
Expand All @@ -514,7 +538,7 @@ private ExceptionHandler.OnError<FunctionContext, StatementFlowFileEnclosure> on
// if no failures found, the driver decided not to execute the statements after the
// failure, so route the last one to failure.
final FlowFile failedFlowFile = batchFlowFiles.get(updateCounts.length);
result.routeTo(failedFlowFile, REL_FAILURE);
result.routeTo(addErrorAttributesToFlowFile(session, failedFlowFile, e), REL_FAILURE);
failureCount++;
}

Expand All @@ -527,23 +551,22 @@ private ExceptionHandler.OnError<FunctionContext, StatementFlowFileEnclosure> on
}

getLogger().error("Failed to update database due to a failed batch update, {}. There were a total of {} FlowFiles that failed, {} that succeeded, "
+ "and {} that were not execute and will be routed to retry; ", new Object[]{e, failureCount, successCount, retryCount}, e);
+ "and {} that were not execute and will be routed to retry; ", e, failureCount, successCount, retryCount, e);

return;

}

// Apply default error handling and logging for other Exceptions.
ExceptionHandler.OnError<RollbackOnFailure, FlowFileGroup> onGroupError
= ExceptionHandler.createOnGroupError(context, session, result, REL_FAILURE, REL_RETRY);
ExceptionHandler.OnError<RollbackOnFailure, FlowFileGroup> onGroupError = onGroupError(context, session, result);
onGroupError = onGroupError.andThen((cl, il, rl, el) -> {
switch (r.destination()) {
case Failure:
getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {il.getFlowFiles(), e}, e);
getLogger().error("Failed to update database for {} due to {}; routing to failure", il.getFlowFiles(), e, e);
break;
case Retry:
getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry",
new Object[] {il.getFlowFiles(), e}, e);
il.getFlowFiles(), e, e);
break;
}
});
Expand Down Expand Up @@ -583,7 +606,7 @@ public void constructProcess() {
} catch (SQLException re) {
// Just log the fact that rollback failed.
// ProcessSession will be rollback by the thrown Exception so don't have to do anything here.
getLogger().warn("Failed to rollback database connection due to %s", new Object[]{re}, re);
getLogger().warn("Failed to rollback database connection due to {}",re, re);
}
});

Expand All @@ -594,7 +617,7 @@ public void constructProcess() {
try {
conn.setAutoCommit(fc.originalAutoCommit);
} catch (final SQLException se) {
getLogger().warn("Failed to reset autocommit due to {}", new Object[]{se});
getLogger().warn("Failed to reset autocommit due to {}", se);
}
}
});
Expand All @@ -603,7 +626,8 @@ public void constructProcess() {
if (c.getProperty(SUPPORT_TRANSACTIONS).asBoolean()){
if (r.contains(REL_RETRY) || r.contains(REL_FAILURE)) {
final List<FlowFile> transferredFlowFiles = r.getRoutedFlowFiles().values().stream()
.flatMap(List::stream).collect(Collectors.toList());
.flatMap(List::stream).collect(toList());

Relationship rerouteShip = r.contains(REL_RETRY) ? REL_RETRY : REL_FAILURE;
r.getRoutedFlowFiles().clear();
r.routeTo(transferredFlowFiles, rerouteShip);
Expand Down Expand Up @@ -646,7 +670,7 @@ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFacto
* transaction will be sorted in the order that they should be evaluated.
*
* @param context the process context for determining properties
* @param session the process session for pulling flowfiles
* @param session the process session for pulling FlowFiles
* @return a FlowFilePoll containing a List of FlowFiles to process, or <code>null</code> if there are no FlowFiles to process
*/
private FlowFilePoll pollFlowFiles(final ProcessContext context, final ProcessSession session,
Expand Down Expand Up @@ -686,8 +710,9 @@ private FlowFilePoll pollFlowFiles(final ProcessContext context, final ProcessSe
} catch (IllegalArgumentException e) {
// Map relationship based on context, and then let default handler to handle.
final ErrorTypes.Result adjustedRoute = adjustError.apply(functionContext, ErrorTypes.InvalidInput);
ExceptionHandler.createOnGroupError(context, session, result, REL_FAILURE, REL_RETRY)
onGroupError(context, session, result)
.apply(functionContext, () -> flowFiles, adjustedRoute, e);

return null;
}

Expand All @@ -705,7 +730,7 @@ private FlowFilePoll pollFlowFiles(final ProcessContext context, final ProcessSe
*
* @param stmt the statement that generated a key
* @return the key that was generated from the given statement, or <code>null</code> if no key
* was generated or it could not be determined.
* was generated, or it could not be determined.
*/
private String determineGeneratedKey(final PreparedStatement stmt) {
try {
Expand All @@ -731,12 +756,7 @@ private String determineGeneratedKey(final PreparedStatement stmt) {
private String getSQL(final ProcessSession session, final FlowFile flowFile) {
// Read the SQL from the FlowFile's content
final byte[] buffer = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, buffer);
}
});
session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));

// Create the PreparedStatement to use for this FlowFile.
final String sql = new String(buffer, StandardCharsets.UTF_8);
Expand Down Expand Up @@ -836,7 +856,17 @@ boolean isFragmentedTransactionReady(final List<FlowFile> flowFiles, final Long
return false; // not enough FlowFiles for this transaction. Return them all to queue.
}

private FlowFile addErrorAttributesToFlowFile(final ProcessSession session, FlowFile flowFile, final Exception exception) {
final Map<String, String> attributes = new HashMap<>();
attributes.put(ERROR_MESSAGE_ATTR, exception.getMessage());

if (exception instanceof SQLException) {
attributes.put(ERROR_CODE_ATTR, valueOf(((SQLException) exception).getErrorCode()));
attributes.put(ERROR_SQL_STATE_ATTR, valueOf(((SQLException) exception).getSQLState()));
}

return session.putAllAttributes(flowFile, attributes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it guaranteed that the flowfile doesn't have original attributes before being updated with the error ones or does the method only update and not replace the flowfile attributes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method adds extra attributes to the FlowFile, it does not remove any existing attribute. (Added a check in unit tests to verify this.)

}

/**
* A FlowFileFilter that is responsible for ensuring that the FlowFiles returned either belong
Expand Down
Loading