Skip to content

Commit

Permalink
NIFI-3704: Added retry paths for transient SQL errors
Browse files Browse the repository at this point in the history
  • Loading branch information
mattyb149 committed Apr 24, 2017
1 parent 2ecd130 commit f3f5e0e
Showing 1 changed file with 23 additions and 6 deletions.
Expand Up @@ -51,6 +51,7 @@
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.SQLNonTransientException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -415,10 +416,15 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile, jdbcURL);
} catch (final SQLException e) {
log.error("Unable to update database due to {}, flowfile {} will be penalized and routed to failure", new Object[]{e.getMessage(), flowFile}, e);
} catch (final SQLNonTransientException e) {
log.error("Failed to update database for {} due to {}; routing to failure", new Object[]{flowFile, e});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
} catch (final SQLException e) {
log.error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry",
new Object[]{flowFile, e});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_RETRY);
}
} else {
log.warn("Record schema does not contain Field Containing SQL: {}, flowfile {} will be penalized and routed to failure", new Object[]{sqlField, flowFile});
Expand Down Expand Up @@ -450,11 +456,17 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
try (final Connection conn = dbcpService.getConnection()) {
schema = TableSchema.from(conn, catalog, schemaName, tableName, translateFieldNames, includePrimaryKeys);
schemaCache.put(schemaKey, schema);
} catch (final SQLException e) {
log.error("Failed to convert {} into a SQL statement due to {}; penalizing and routing to failure", new Object[]{flowFile, e.toString()}, e);
} catch (final SQLNonTransientException e) {
log.error("Failed to update database for {} due to {}; routing to failure", new Object[]{flowFile, e});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
} catch (final SQLException e) {
log.error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry",
new Object[]{flowFile, e});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_RETRY);
return;
}
}
}
Expand Down Expand Up @@ -531,10 +543,15 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile, jdbcURL);

} catch (final SQLException e) {
log.error("Unable to update database due to {}, flowfile {} will be penalized", new Object[]{e.getMessage(), flowFile}, e);
} catch (final SQLNonTransientException e) {
log.error("Failed to update database for {} due to {}; routing to failure", new Object[]{flowFile, e});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
} catch (final SQLException e) {
log.error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry",
new Object[]{flowFile, e});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_RETRY);
}
}
} catch (final MalformedRecordException | IOException e) {
Expand Down

0 comments on commit f3f5e0e

Please sign in to comment.