diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index 9442e3356246f..adf398010e7c7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -51,6 +51,7 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.SQLNonTransientException; import java.sql.Statement; import java.util.ArrayList; import java.util.Collections; @@ -415,10 +416,15 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } session.transfer(flowFile, REL_SUCCESS); session.getProvenanceReporter().send(flowFile, jdbcURL); - } catch (final SQLException e) { - log.error("Unable to update database due to {}, flowfile {} will be penalized and routed to failure", new Object[]{e.getMessage(), flowFile}, e); + } catch (final SQLNonTransientException e) { + log.error("Failed to update database for {} due to {}; routing to failure", new Object[]{flowFile, e}); flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); + } catch (final SQLException e) { + log.error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", + new Object[]{flowFile, e}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_RETRY); } } else { log.warn("Record schema does not contain Field Containing SQL: {}, flowfile {} will be penalized and routed to failure", new Object[]{sqlField, flowFile}); @@ -450,11 +456,17 @@ public void onTrigger(final ProcessContext context, final ProcessSession session try (final Connection conn = dbcpService.getConnection()) { schema = TableSchema.from(conn, catalog, schemaName, tableName, translateFieldNames, includePrimaryKeys); schemaCache.put(schemaKey, schema); - } catch (final SQLException e) { - log.error("Failed to convert {} into a SQL statement due to {}; penalizing and routing to failure", new Object[]{flowFile, e.toString()}, e); + } catch (final SQLNonTransientException e) { + log.error("Failed to update database for {} due to {}; routing to failure", new Object[]{flowFile, e}); flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); return; + } catch (final SQLException e) { + log.error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", + new Object[]{flowFile, e}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_RETRY); + return; } } } @@ -531,10 +543,15 @@ public void onTrigger(final ProcessContext context, final ProcessSession session session.transfer(flowFile, REL_SUCCESS); session.getProvenanceReporter().send(flowFile, jdbcURL); - } catch (final SQLException e) { - log.error("Unable to update database due to {}, flowfile {} will be penalized", new Object[]{e.getMessage(), flowFile}, e); + } catch (final SQLNonTransientException e) { + log.error("Failed to update database for {} due to {}; routing to failure", new Object[]{flowFile, e}); flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); + } catch (final SQLException e) { + log.error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", + new Object[]{flowFile, e}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_RETRY); } } } catch (final MalformedRecordException | IOException e) {