From f277264b5dae771ac4eda0b774c39f9fbcb27293 Mon Sep 17 00:00:00 2001 From: krisztina-zsihovszki Date: Tue, 28 Feb 2023 13:34:08 +0100 Subject: [PATCH 1/2] NIFI-4651 Add error message and code to PutSQL FlowFile attributes --- .../nifi/processors/standard/PutSQL.java | 94 +++++-- .../nifi/processors/standard/TestPutSQL.java | 232 +++++++++++------- 2 files changed, 207 insertions(+), 119 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java index 609c1aa440f8..d0ba92935ea9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.standard; +import java.util.Optional; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.ReadsAttribute; @@ -41,7 +42,6 @@ import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.pattern.ErrorTypes; import org.apache.nifi.processor.util.pattern.ExceptionHandler; @@ -54,8 +54,6 @@ import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.db.JdbcCommon; -import java.io.IOException; -import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.sql.BatchUpdateException; import java.sql.Connection; @@ -67,7 +65,6 @@ import java.util.ArrayList; import java.util.BitSet; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -76,9 +73,11 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; -import java.util.stream.Collectors; import static java.lang.String.format; +import static java.lang.String.valueOf; +import static java.util.Collections.emptyMap; +import static java.util.stream.Collectors.toList; import static org.apache.nifi.processor.util.pattern.ExceptionHandler.createOnError; @SupportsBatching @@ -132,7 +131,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor { .displayName("SQL Statement") .description("The SQL statement to execute. The statement can be empty, a constant value, or built from attributes " + "using Expression Language. If this property is specified, it will be used regardless of the content of " - + "incoming flowfiles. If this property is empty, the content of the incoming flow file is expected " + + "incoming FlowFiles. If this property is empty, the content of the incoming FlowFile is expected " + "to contain a valid SQL statement, to be issued by the processor to the database.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) @@ -200,6 +199,10 @@ public class PutSQL extends AbstractSessionFactoryProcessor { private static final String FRAGMENT_INDEX_ATTR = FragmentAttributes.FRAGMENT_INDEX.key(); private static final String FRAGMENT_COUNT_ATTR = FragmentAttributes.FRAGMENT_COUNT.key(); + private static final String ERROR_MESSAGE_ATTR = "error.message"; + private static final String ERROR_CODE_ATTR = "error.code"; + private static final String ERROR_SQL_STATE_ATTR = "error.sql.state"; + @Override protected List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(); @@ -282,7 +285,7 @@ private boolean isSupportBatching() { private final PartialFunctions.InitConnection initConnection = (c, s, fc, ffs) -> { final Connection connection = c.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class) - .getConnection(ffs == null || ffs.isEmpty() ? Collections.emptyMap() : ffs.get(0).getAttributes()); + .getConnection(ffs == null || ffs.isEmpty() ? emptyMap() : ffs.get(0).getAttributes()); try { fc.originalAutoCommit = connection.getAutoCommit(); final boolean autocommit = c.getProperty(AUTO_COMMIT).asBoolean(); @@ -458,23 +461,58 @@ void apply(final ProcessContext context, final ProcessSession session, final Fun private ExceptionHandler.OnError onFlowFileError(final ProcessContext context, final ProcessSession session, final RoutingResult result) { ExceptionHandler.OnError onFlowFileError = createOnError(context, session, result, REL_FAILURE, REL_RETRY); - onFlowFileError = onFlowFileError.andThen((c, i, r, e) -> { - switch (r.destination()) { + onFlowFileError = onFlowFileError.andThen((ctx, flowFile, errorTypesResult, exception) -> { + flowFile = addErrorAttributesToFlowFile(session, flowFile, exception); + + switch (errorTypesResult.destination()) { case Failure: - getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {i, e}, e); + getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, exception}, exception); break; case Retry: getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", - new Object[] {i, e}, e); + new Object[] {flowFile, exception}, exception); break; case Self: - getLogger().error("Failed to update database for {} due to {};", new Object[] {i, e}, e); + getLogger().error("Failed to update database for {} due to {};", new Object[] {flowFile, exception}, exception); break; } }); return RollbackOnFailure.createOnError(onFlowFileError); } + private ExceptionHandler.OnError onGroupError(final ProcessContext context, final ProcessSession session, final RoutingResult result) { + ExceptionHandler.OnError onGroupError + = ExceptionHandler.createOnGroupError(context, session, result, REL_FAILURE, REL_RETRY); + onGroupError = onGroupError.andThen((ctx, flowFileGroup, errorTypesResult, exception) -> { + + switch (errorTypesResult.destination()) { + case Failure: + List flowFilesToFailure = getFlowFilesOnRelationShip(result, REL_FAILURE); + Optional.ofNullable(flowFilesToFailure).map(flowFiles -> + result.getRoutedFlowFiles().put(REL_FAILURE, addErrorAttributesToFlowFilesInGroup(session, flowFiles, flowFileGroup.getFlowFiles(), exception))); + break; + case Retry: + List flowFilesToRetry = getFlowFilesOnRelationShip(result, REL_RETRY); + Optional.ofNullable(flowFilesToRetry).map(flowFiles -> + result.getRoutedFlowFiles().put(REL_RETRY, addErrorAttributesToFlowFilesInGroup(session, flowFiles, flowFileGroup.getFlowFiles(), exception))); + break; + } + }); + return onGroupError; + } + + private List addErrorAttributesToFlowFilesInGroup(ProcessSession session, List flowFilesOnRelationship, List flowFilesInGroup, Exception exception) { + return flowFilesOnRelationship.stream() + .map(ff -> flowFilesInGroup.contains(ff) ? addErrorAttributesToFlowFile(session, ff, exception) : ff) + .collect(toList()); + } + + private List getFlowFilesOnRelationShip(RoutingResult result, final Relationship relationship) { + return Optional.of(result.getRoutedFlowFiles()) + .orElse(emptyMap()) + .get(relationship); + } + private ExceptionHandler.OnError onBatchUpdateError( final ProcessContext context, final ProcessSession session, final RoutingResult result) { return RollbackOnFailure.createOnError((c, enclosure, r, e) -> { @@ -502,7 +540,7 @@ private ExceptionHandler.OnError on final int updateCount = updateCounts[i]; final FlowFile flowFile = batchFlowFiles.get(i); if (updateCount == Statement.EXECUTE_FAILED) { - result.routeTo(flowFile, REL_FAILURE); + result.routeTo(addErrorAttributesToFlowFile(session, flowFile, e), REL_FAILURE); failureCount++; } else { result.routeTo(flowFile, REL_SUCCESS); @@ -514,7 +552,7 @@ private ExceptionHandler.OnError on // if no failures found, the driver decided not to execute the statements after the // failure, so route the last one to failure. final FlowFile failedFlowFile = batchFlowFiles.get(updateCounts.length); - result.routeTo(failedFlowFile, REL_FAILURE); + result.routeTo(addErrorAttributesToFlowFile(session, failedFlowFile, e), REL_FAILURE); failureCount++; } @@ -534,8 +572,7 @@ private ExceptionHandler.OnError on } // Apply default error handling and logging for other Exceptions. - ExceptionHandler.OnError onGroupError - = ExceptionHandler.createOnGroupError(context, session, result, REL_FAILURE, REL_RETRY); + ExceptionHandler.OnError onGroupError = onGroupError(context, session, result); onGroupError = onGroupError.andThen((cl, il, rl, el) -> { switch (r.destination()) { case Failure: @@ -603,7 +640,8 @@ public void constructProcess() { if (c.getProperty(SUPPORT_TRANSACTIONS).asBoolean()){ if (r.contains(REL_RETRY) || r.contains(REL_FAILURE)) { final List transferredFlowFiles = r.getRoutedFlowFiles().values().stream() - .flatMap(List::stream).collect(Collectors.toList()); + .flatMap(List::stream).collect(toList()); + Relationship rerouteShip = r.contains(REL_RETRY) ? REL_RETRY : REL_FAILURE; r.getRoutedFlowFiles().clear(); r.routeTo(transferredFlowFiles, rerouteShip); @@ -686,8 +724,9 @@ private FlowFilePoll pollFlowFiles(final ProcessContext context, final ProcessSe } catch (IllegalArgumentException e) { // Map relationship based on context, and then let default handler to handle. final ErrorTypes.Result adjustedRoute = adjustError.apply(functionContext, ErrorTypes.InvalidInput); - ExceptionHandler.createOnGroupError(context, session, result, REL_FAILURE, REL_RETRY) + onGroupError(context, session, result) .apply(functionContext, () -> flowFiles, adjustedRoute, e); + return null; } @@ -705,7 +744,7 @@ private FlowFilePoll pollFlowFiles(final ProcessContext context, final ProcessSe * * @param stmt the statement that generated a key * @return the key that was generated from the given statement, or null if no key - * was generated or it could not be determined. + * was generated, or it could not be determined. */ private String determineGeneratedKey(final PreparedStatement stmt) { try { @@ -731,12 +770,7 @@ private String determineGeneratedKey(final PreparedStatement stmt) { private String getSQL(final ProcessSession session, final FlowFile flowFile) { // Read the SQL from the FlowFile's content final byte[] buffer = new byte[(int) flowFile.getSize()]; - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.fillBuffer(in, buffer); - } - }); + session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer)); // Create the PreparedStatement to use for this FlowFile. final String sql = new String(buffer, StandardCharsets.UTF_8); @@ -836,7 +870,17 @@ boolean isFragmentedTransactionReady(final List flowFiles, final Long return false; // not enough FlowFiles for this transaction. Return them all to queue. } + private FlowFile addErrorAttributesToFlowFile(final ProcessSession session, FlowFile flowFile, final Exception exception) { + final Map attributes = new HashMap<>(); + attributes.put(ERROR_MESSAGE_ATTR, exception.getMessage()); + if (exception instanceof SQLException) { + attributes.put(ERROR_CODE_ATTR, valueOf(((SQLException) exception).getErrorCode())); + attributes.put(ERROR_SQL_STATE_ATTR, valueOf(((SQLException) exception).getSQLState())); + } + + return session.putAllAttributes(flowFile, attributes); + } /** * A FlowFileFilter that is responsible for ensuring that the FlowFiles returned either belong diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java index 640c5c638cf8..758e18a04d4b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java @@ -20,6 +20,7 @@ import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.processor.FlowFileFilter; +import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.pattern.RollbackOnFailure; import org.apache.nifi.provenance.ProvenanceEventRecord; @@ -35,7 +36,6 @@ import javax.xml.bind.DatatypeConverter; import java.io.File; -import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Paths; import java.sql.Connection; @@ -60,6 +60,7 @@ import java.util.UUID; import java.util.function.Function; +import static java.nio.charset.StandardCharsets.US_ASCII; import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE; import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE; import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE; @@ -69,6 +70,8 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TestPutSQL { private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)"; @@ -163,7 +166,7 @@ public void testCommitOnCleanup() throws InitializationException, ProcessExcepti } @Test - public void testInsertWithGeneratedKeys() throws InitializationException, ProcessException, SQLException, IOException { + public void testInsertWithGeneratedKeys() throws InitializationException, ProcessException, SQLException { final TestRunner runner = initTestRunner(); runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "true"); @@ -303,17 +306,18 @@ private void testKeepFlowFileOrdering(final TestRunner runner) throws ProcessExc @Test - public void testFailInMiddleWithBadStatementAndSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException { + public void testFailInMiddleWithBadStatementAndSupportTransaction() throws InitializationException, ProcessException { final TestRunner runner = initTestRunner(); testFailInMiddleWithBadStatement(runner); runner.run(); runner.assertTransferCount(PutSQL.REL_FAILURE, 4); runner.assertTransferCount(PutSQL.REL_SUCCESS, 0); + assertErrorAttributesInTransaction(runner, PutSQL.REL_FAILURE); } @Test - public void testFailInMiddleWithBadStatementAndNotSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException { + public void testFailInMiddleWithBadStatementAndNotSupportTransaction() throws InitializationException, ProcessException { final TestRunner runner = initTestRunner(); runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false"); testFailInMiddleWithBadStatement(runner); @@ -321,9 +325,11 @@ public void testFailInMiddleWithBadStatementAndNotSupportTransaction() throws In runner.assertTransferCount(PutSQL.REL_FAILURE, 1); runner.assertTransferCount(PutSQL.REL_SUCCESS, 3); + + assertSQLExceptionRelatedAttributes(runner, PutSQL.REL_FAILURE); } - private void testFailInMiddleWithBadStatement(final TestRunner runner) throws InitializationException { + private void testFailInMiddleWithBadStatement(final TestRunner runner) { runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', 84)".getBytes()); runner.enqueue("INSERT INTO PERSONS_AI".getBytes()); // intentionally wrong syntax @@ -331,10 +337,8 @@ private void testFailInMiddleWithBadStatement(final TestRunner runner) throws In runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Harry', 44)".getBytes()); } - - @Test - public void testFailInMiddleWithBadStatementRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException { + public void testFailInMiddleWithBadStatementRollbackOnFailure() throws InitializationException, ProcessException { final TestRunner runner = initTestRunner(); runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true"); @@ -343,16 +347,14 @@ public void testFailInMiddleWithBadStatementRollbackOnFailure() throws Initializ runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Tom', 3)".getBytes()); runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Harry', 44)".getBytes()); - final AssertionError e = assertThrows(AssertionError.class, () -> { - runner.run(); - }); + final AssertionError e = assertThrows(AssertionError.class, runner::run); assertInstanceOf(ProcessException.class, e.getCause()); runner.assertTransferCount(PutSQL.REL_FAILURE, 0); runner.assertTransferCount(PutSQL.REL_SUCCESS, 0); } @Test - public void testFailInMiddleWithBadParameterTypeAndNotSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException { + public void testFailInMiddleWithBadParameterTypeAndNotSupportTransaction() throws InitializationException, ProcessException { final TestRunner runner = initTestRunner(); runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false"); testFailInMiddleWithBadParameterType(runner); @@ -360,34 +362,21 @@ public void testFailInMiddleWithBadParameterTypeAndNotSupportTransaction() throw runner.assertTransferCount(PutSQL.REL_FAILURE, 1); runner.assertTransferCount(PutSQL.REL_SUCCESS, 3); + + assertErrorAttributesNotSet(runner, PutSQL.REL_SUCCESS); + assertSQLExceptionRelatedAttributes(runner, PutSQL.REL_FAILURE); } @Test - public void testFailInMiddleWithBadParameterTypeAndSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException { + public void testFailInMiddleWithBadParameterTypeAndSupportTransaction() throws InitializationException, ProcessException { final TestRunner runner = initTestRunner(); testFailInMiddleWithBadParameterType(runner); runner.run(); runner.assertTransferCount(PutSQL.REL_FAILURE, 4); runner.assertTransferCount(PutSQL.REL_SUCCESS, 0); - } - - private void testFailInMiddleWithBadParameterType(final TestRunner runner) throws InitializationException, ProcessException { - runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); - final Map goodAttributes = new HashMap<>(); - goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); - goodAttributes.put("sql.args.1.value", "84"); - - final Map badAttributes = new HashMap<>(); - badAttributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR)); - badAttributes.put("sql.args.1.value", "hello"); - - final byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', ?)".getBytes(); - runner.enqueue(data, goodAttributes); - runner.enqueue(data, badAttributes); - runner.enqueue(data, goodAttributes); - runner.enqueue(data, goodAttributes); + assertErrorAttributesInTransaction(runner, PutSQL.REL_FAILURE); } @Test @@ -410,16 +399,14 @@ public void testFailInMiddleWithBadParameterTypeRollbackOnFailure() throws Initi runner.enqueue(data, goodAttributes); runner.enqueue(data, goodAttributes); - final AssertionError e = assertThrows(AssertionError.class, () -> { - runner.run(); - }); + final AssertionError e = assertThrows(AssertionError.class, runner::run); assertInstanceOf(ProcessException.class, e.getCause()); runner.assertTransferCount(PutSQL.REL_FAILURE, 0); runner.assertTransferCount(PutSQL.REL_SUCCESS, 0); } @Test - public void testFailInMiddleWithBadParameterValueAndSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException { + public void testFailInMiddleWithBadParameterValueAndSupportTransaction() throws InitializationException, ProcessException, SQLException { final TestRunner runner = initTestRunner(); testFailInMiddleWithBadParameterValue(runner); runner.run(); @@ -434,6 +421,8 @@ public void testFailInMiddleWithBadParameterValueAndSupportTransaction() throws assertFalse(rs.next()); } } + + assertErrorAttributesInTransaction(runner, PutSQL.REL_RETRY); } @Test @@ -447,6 +436,10 @@ public void testFailInMiddleWithBadParameterValueAndNotSupportTransaction() thro runner.assertTransferCount(PutSQL.REL_FAILURE, 1); runner.assertTransferCount(PutSQL.REL_RETRY, 2); + assertSQLExceptionRelatedAttributes(runner, PutSQL.REL_FAILURE); + assertErrorAttributesNotSet(runner, PutSQL.REL_SUCCESS); + assertErrorAttributesNotSet(runner, PutSQL.REL_RETRY); + try (final Connection conn = service.getConnection()) { try (final Statement stmt = conn.createStatement()) { final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS_AI"); @@ -459,24 +452,6 @@ public void testFailInMiddleWithBadParameterValueAndNotSupportTransaction() thro } } - private void testFailInMiddleWithBadParameterValue(final TestRunner runner) throws ProcessException, SQLException { - runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); - recreateTable("PERSONS_AI",createPersonsAutoId); - final Map goodAttributes = new HashMap<>(); - goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); - goodAttributes.put("sql.args.1.value", "84"); - - final Map badAttributes = new HashMap<>(); - badAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); - badAttributes.put("sql.args.1.value", "9999"); - - final byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', ?)".getBytes(); - runner.enqueue(data, goodAttributes); - runner.enqueue(data, badAttributes); - runner.enqueue(data, goodAttributes); - runner.enqueue(data, goodAttributes); - } - @Test public void testFailInMiddleWithBadParameterValueRollbackOnFailure() throws InitializationException, ProcessException, SQLException { final TestRunner runner = initTestRunner(); @@ -499,9 +474,7 @@ public void testFailInMiddleWithBadParameterValueRollbackOnFailure() throws Init runner.enqueue(data, goodAttributes); runner.enqueue(data, goodAttributes); - final AssertionError e = assertThrows(AssertionError.class, () -> { - runner.run(); - }); + final AssertionError e = assertThrows(AssertionError.class, runner::run); assertInstanceOf(ProcessException.class, e.getCause()); runner.assertTransferCount(PutSQL.REL_FAILURE, 0); runner.assertTransferCount(PutSQL.REL_SUCCESS, 0); @@ -910,7 +883,7 @@ public void testUsingDateValuesEpochAndString() throws InitializationException, } @Test - public void testBinaryColumnTypes() throws InitializationException, ProcessException, SQLException, IOException { + public void testBinaryColumnTypes() throws InitializationException, ProcessException, SQLException { final TestRunner runner = initTestRunner(); try (final Connection conn = service.getConnection()) { try (final Statement stmt = conn.createStatement()) { @@ -1005,16 +978,16 @@ public void testBinaryColumnTypes() throws InitializationException, ProcessExcep //First Batch assertTrue(rs.next()); assertEquals(1, rs.getInt(1)); - assertArrayEquals(arg2BIN.getBytes("ASCII"), rs.getBytes(2)); - assertArrayEquals(art3VARBIN.getBytes("ASCII"), rs.getBytes(3)); - assertArrayEquals(art4LongBin.getBytes("ASCII"), rs.getBytes(4)); + assertArrayEquals(arg2BIN.getBytes(US_ASCII), rs.getBytes(2)); + assertArrayEquals(art3VARBIN.getBytes(US_ASCII), rs.getBytes(3)); + assertArrayEquals(art4LongBin.getBytes(US_ASCII), rs.getBytes(4)); //Second batch assertTrue(rs.next()); assertEquals(2, rs.getInt(1)); - assertArrayEquals(arg2BIN.getBytes("ASCII"), rs.getBytes(2)); - assertArrayEquals(art3VARBIN.getBytes("ASCII"), rs.getBytes(3)); - assertArrayEquals(art4LongBin.getBytes("ASCII"), rs.getBytes(4)); + assertArrayEquals(arg2BIN.getBytes(US_ASCII), rs.getBytes(2)); + assertArrayEquals(art3VARBIN.getBytes(US_ASCII), rs.getBytes(3)); + assertArrayEquals(art4LongBin.getBytes(US_ASCII), rs.getBytes(4)); //Third Batch (Hex) assertTrue(rs.next()); @@ -1094,7 +1067,7 @@ public void testStatementsWithPreparedParameters() throws InitializationExceptio @Test - public void testMultipleStatementsWithinFlowFile() throws InitializationException, ProcessException, SQLException, IOException { + public void testMultipleStatementsWithinFlowFile() throws InitializationException, ProcessException, SQLException { final TestRunner runner = initTestRunner(); recreateTable("PERSONS", createPersons); @@ -1119,6 +1092,7 @@ public void testMultipleStatementsWithinFlowFile() throws InitializationExceptio // should fail because of the semicolon runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1); + assertSQLExceptionRelatedAttributes(runner, PutSQL.REL_FAILURE); try (final Connection conn = service.getConnection()) { try (final Statement stmt = conn.createStatement()) { @@ -1129,7 +1103,7 @@ public void testMultipleStatementsWithinFlowFile() throws InitializationExceptio } @Test - public void testMultipleStatementsWithinFlowFileRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException { + public void testMultipleStatementsWithinFlowFileRollbackOnFailure() throws InitializationException, ProcessException, SQLException { final TestRunner runner = initTestRunner(); runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true"); @@ -1151,9 +1125,7 @@ public void testMultipleStatementsWithinFlowFileRollbackOnFailure() throws Initi attributes.put("sql.args.4.value", "1"); runner.enqueue(sql.getBytes(), attributes); - final AssertionError e = assertThrows(AssertionError.class, () -> { - runner.run(); - }); + final AssertionError e = assertThrows(AssertionError.class, runner::run); assertInstanceOf(ProcessException.class, e.getCause()); try (final Connection conn = service.getConnection()) { @@ -1220,6 +1192,7 @@ public void testInvalidStatement() throws InitializationException, ProcessExcept // should fail because of the semicolon runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1); + assertSQLExceptionRelatedAttributes(runner, PutSQL.REL_FAILURE); try (final Connection conn = service.getConnection()) { try (final Statement stmt = conn.createStatement()) { @@ -1252,9 +1225,7 @@ public void testInvalidStatementRollbackOnFailure() throws InitializationExcepti attributes.put("sql.args.4.value", "1"); runner.enqueue(sql.getBytes(), attributes); - final AssertionError e = assertThrows(AssertionError.class, () -> { - runner.run(); - }); + final AssertionError e = assertThrows(AssertionError.class, runner::run); assertInstanceOf(ProcessException.class, e.getCause()); try (final Connection conn = service.getConnection()) { @@ -1295,6 +1266,7 @@ public void testRetryableFailure() throws InitializationException, ProcessExcept // should fail because of the semicolon runner.assertAllFlowFilesTransferred(PutSQL.REL_RETRY, 1); + assertSQLExceptionRelatedAttributes(runner, PutSQL.REL_RETRY); } @Test @@ -1323,9 +1295,7 @@ public void testRetryableFailureRollbackOnFailure() throws InitializationExcepti attributes.put("sql.args.4.value", "1"); runner.enqueue(sql.getBytes(), attributes); - final AssertionError e = assertThrows(AssertionError.class, () -> { - runner.run(); - }); + final AssertionError e = assertThrows(AssertionError.class, runner::run); assertInstanceOf(ProcessException.class, e.getCause()); runner.assertAllFlowFilesTransferred(PutSQL.REL_RETRY, 0); } @@ -1353,7 +1323,7 @@ public void testMultipleFlowFilesSuccessfulInTransaction() throws Initialization runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes); runner.run(); - // No FlowFiles should be transferred because there were not enough flowfiles with the same fragment identifier + // No FlowFiles should be transferred because there were not enough FlowFiles with the same fragment identifier runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 0); attributes.clear(); @@ -1410,7 +1380,7 @@ public void testMultipleFlowFilesSuccessfulInTransactionRollBackOnFailure() thro // ProcessException should not be thrown in this case, because the input FlowFiles are simply differed. runner.run(); - // No FlowFiles should be transferred because there were not enough flowfiles with the same fragment identifier + // No FlowFiles should be transferred because there were not enough FlowFiles with the same fragment identifier runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 0); } @@ -1445,8 +1415,9 @@ public String getAttribute(final String attrName) { runner.enqueue(mff); runner.run(); - // No FlowFiles should be transferred because there were not enough flowfiles with the same fragment identifier + // No FlowFiles should be transferred because there were not enough FlowFiles with the same fragment identifier runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1); + assertNonSQLExceptionRelatedAttribute(runner); } @Test @@ -1475,12 +1446,11 @@ public Map getAttributes() { public String getAttribute(final String attrName) { return attributes.get(attrName); } + }; runner.enqueue(mff); - final AssertionError e = assertThrows(AssertionError.class, () -> { - runner.run(); - }); + final AssertionError e = assertThrows(AssertionError.class, runner::run); assertInstanceOf(ProcessException.class, e.getCause()); runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 0); @@ -1505,9 +1475,7 @@ public void testNullFragmentCountRollbackOnFailure() throws InitializationExcept runner.enqueue(new byte[]{}, attribute1); runner.enqueue(new byte[]{}, attribute2); - final AssertionError e = assertThrows(AssertionError.class, () -> { - runner.run(); - }); + final AssertionError e = assertThrows(AssertionError.class, runner::run); assertInstanceOf(ProcessException.class, e.getCause()); runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 0); @@ -1520,7 +1488,7 @@ public void testStatementsFromProperty() throws InitializationException, Process recreateTable("PERSONS", createPersons); - runner.enqueue("This statement should be ignored".getBytes(), new HashMap() {{ + runner.enqueue("This statement should be ignored".getBytes(), new HashMap<>() {{ put("row.id", "1"); }}); runner.run(); @@ -1539,7 +1507,7 @@ public void testStatementsFromProperty() throws InitializationException, Process } runner.setProperty(PutSQL.SQL_STATEMENT, "UPDATE PERSONS SET NAME='George' WHERE ID=${row.id}"); - runner.enqueue("This statement should be ignored".getBytes(), new HashMap() {{ + runner.enqueue("This statement should be ignored".getBytes(), new HashMap<>() {{ put("row.id", "1"); }}); runner.run(); @@ -1556,13 +1524,6 @@ public void testStatementsFromProperty() throws InitializationException, Process } } - private Map createFragmentedTransactionAttributes(String id, int count, int index) { - final Map attributes = new HashMap<>(); - attributes.put("fragment.identifier", id); - attributes.put("fragment.count", String.valueOf(count)); - attributes.put("fragment.index", String.valueOf(index)); - return attributes; - } @Test public void testTransactionalFlowFileFilter() { @@ -1622,6 +1583,51 @@ public void testTransactionalFlowFileFilter() { assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff4)); } + private void testFailInMiddleWithBadParameterType(final TestRunner runner) throws ProcessException { + runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); + + final Map goodAttributes = new HashMap<>(); + goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + goodAttributes.put("sql.args.1.value", "84"); + + final Map badAttributes = new HashMap<>(); + badAttributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR)); + badAttributes.put("sql.args.1.value", "hello"); + + final byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', ?)".getBytes(); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, badAttributes); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, goodAttributes); + } + + + private void testFailInMiddleWithBadParameterValue(final TestRunner runner) throws ProcessException, SQLException { + runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); + recreateTable("PERSONS_AI",createPersonsAutoId); + final Map goodAttributes = new HashMap<>(); + goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + goodAttributes.put("sql.args.1.value", "84"); + + final Map badAttributes = new HashMap<>(); + badAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + badAttributes.put("sql.args.1.value", "9999"); + + final byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', ?)".getBytes(); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, badAttributes); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, goodAttributes); + } + + private Map createFragmentedTransactionAttributes(String id, int count, int index) { + final Map attributes = new HashMap<>(); + attributes.put("fragment.identifier", id); + attributes.put("fragment.count", String.valueOf(count)); + attributes.put("fragment.index", String.valueOf(index)); + return attributes; + } + /** * Simple implementation only for testing purposes */ @@ -1671,8 +1677,8 @@ public String getIdentifier() { public Connection getConnection() throws ProcessException { try { if (++successful > allowedBeforeFailure) { - final Connection conn = Mockito.mock(Connection.class); - Mockito.when(conn.prepareStatement(Mockito.any(String.class))).thenThrow(new SQLException("Unit Test Generated SQLException")); + final Connection conn = mock(Connection.class); + when(conn.prepareStatement(Mockito.any(String.class))).thenThrow(new SQLException("Unit Test Generated SQLException")); return conn; } else { return service.getConnection(); @@ -1684,6 +1690,7 @@ public Connection getConnection() throws ProcessException { } } + private void recreateTable(String tableName, String createSQL) throws ProcessException, SQLException { try (final Connection conn = service.getConnection()) { try (final Statement stmt = conn.createStatement()) { @@ -1696,7 +1703,7 @@ private void recreateTable(String tableName, String createSQL) throws ProcessExc private String fixedSizeByteArrayAsASCIIString(int length){ byte[] bBinary = RandomUtils.nextBytes(length); ByteBuffer bytes = ByteBuffer.wrap(bBinary); - StringBuffer sbBytes = new StringBuffer(); + StringBuilder sbBytes = new StringBuilder(); for (int i = bytes.position(); i < bytes.limit(); i++) sbBytes.append((char)bytes.get(i)); @@ -1728,6 +1735,43 @@ private static File getEmptyDirectory() { return Paths.get(getSystemTemporaryDirectory(), randomDirectory).toFile(); } + private static void assertSQLExceptionRelatedAttributes(final TestRunner runner, Relationship relationship) { + List flowFiles = runner.getFlowFilesForRelationship(relationship); + flowFiles.forEach(ff -> { + ff.assertAttributeExists("error.message"); + ff.assertAttributeExists("error.code"); + ff.assertAttributeExists("error.sql.state"); + }); + } + + private static void assertNonSQLExceptionRelatedAttribute(final TestRunner runner) { + List flowFiles = runner.getFlowFilesForRelationship(PutSQL.REL_FAILURE); + flowFiles.forEach(ff -> { + ff.assertAttributeExists("error.message"); + }); + } + + private static void assertErrorAttributesInTransaction(final TestRunner runner, Relationship relationship) { + List flowFiles = runner.getFlowFilesForRelationship(relationship); + assertEquals(1, flowFiles.stream() + .filter(TestPutSQL::errorAttributesAreSet) + .count(), + "Only one FlowFile should have the error attributes when transaction is used."); + } + + private static void assertErrorAttributesNotSet(final TestRunner runner, Relationship relationship) { + List flowFiles = runner.getFlowFilesForRelationship(relationship); + flowFiles.forEach(ff -> { + ff.assertAttributeNotExists("error.message"); + }); + } + + private static boolean errorAttributesAreSet(MockFlowFile ff) { + return ff.getAttribute("error.message") != null + && ff.getAttribute("error.code") != null + && ff.getAttribute("error.sql.state") != null; + } + private static String getSystemTemporaryDirectory() { return System.getProperty("java.io.tmpdir"); } From f7437084436db03670d7f37f46f2102068a440fe Mon Sep 17 00:00:00 2001 From: krisztina-zsihovszki Date: Fri, 10 Mar 2023 15:11:51 +0100 Subject: [PATCH 2/2] NIFI-4651 Review comments --- .../nifi/processors/standard/PutSQL.java | 48 +++++++------------ .../nifi/processors/standard/TestPutSQL.java | 26 ++++++---- 2 files changed, 33 insertions(+), 41 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java index d0ba92935ea9..a8d9a95e3ce7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.processors.standard; -import java.util.Optional; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.ReadsAttribute; @@ -78,6 +77,7 @@ import static java.lang.String.valueOf; import static java.util.Collections.emptyMap; import static java.util.stream.Collectors.toList; +import static org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Failure; import static org.apache.nifi.processor.util.pattern.ExceptionHandler.createOnError; @SupportsBatching @@ -466,14 +466,14 @@ private ExceptionHandler.OnError onFlowFileError(fina switch (errorTypesResult.destination()) { case Failure: - getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, exception}, exception); + getLogger().error("Failed to update database for {} due to {}; routing to failure", flowFile, exception, exception); break; case Retry: getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", - new Object[] {flowFile, exception}, exception); + flowFile, exception, exception); break; case Self: - getLogger().error("Failed to update database for {} due to {};", new Object[] {flowFile, exception}, exception); + getLogger().error("Failed to update database for {} due to {};", flowFile, exception, exception); break; } }); @@ -481,23 +481,15 @@ private ExceptionHandler.OnError onFlowFileError(fina } private ExceptionHandler.OnError onGroupError(final ProcessContext context, final ProcessSession session, final RoutingResult result) { - ExceptionHandler.OnError onGroupError - = ExceptionHandler.createOnGroupError(context, session, result, REL_FAILURE, REL_RETRY); - onGroupError = onGroupError.andThen((ctx, flowFileGroup, errorTypesResult, exception) -> { + ExceptionHandler.OnError onGroupError = + ExceptionHandler.createOnGroupError(context, session, result, REL_FAILURE, REL_RETRY); - switch (errorTypesResult.destination()) { - case Failure: - List flowFilesToFailure = getFlowFilesOnRelationShip(result, REL_FAILURE); - Optional.ofNullable(flowFilesToFailure).map(flowFiles -> - result.getRoutedFlowFiles().put(REL_FAILURE, addErrorAttributesToFlowFilesInGroup(session, flowFiles, flowFileGroup.getFlowFiles(), exception))); - break; - case Retry: - List flowFilesToRetry = getFlowFilesOnRelationShip(result, REL_RETRY); - Optional.ofNullable(flowFilesToRetry).map(flowFiles -> - result.getRoutedFlowFiles().put(REL_RETRY, addErrorAttributesToFlowFilesInGroup(session, flowFiles, flowFileGroup.getFlowFiles(), exception))); - break; - } + onGroupError = onGroupError.andThen((ctx, flowFileGroup, errorTypesResult, exception) -> { + Relationship relationship = errorTypesResult.destination() == Failure ? REL_FAILURE : REL_RETRY; + List flowFilesToRelationship = result.getRoutedFlowFiles().get(relationship); + result.getRoutedFlowFiles().put(relationship, addErrorAttributesToFlowFilesInGroup(session, flowFilesToRelationship, flowFileGroup.getFlowFiles(), exception)); }); + return onGroupError; } @@ -507,12 +499,6 @@ private List addErrorAttributesToFlowFilesInGroup(ProcessSession sessi .collect(toList()); } - private List getFlowFilesOnRelationShip(RoutingResult result, final Relationship relationship) { - return Optional.of(result.getRoutedFlowFiles()) - .orElse(emptyMap()) - .get(relationship); - } - private ExceptionHandler.OnError onBatchUpdateError( final ProcessContext context, final ProcessSession session, final RoutingResult result) { return RollbackOnFailure.createOnError((c, enclosure, r, e) -> { @@ -565,7 +551,7 @@ private ExceptionHandler.OnError on } getLogger().error("Failed to update database due to a failed batch update, {}. There were a total of {} FlowFiles that failed, {} that succeeded, " - + "and {} that were not execute and will be routed to retry; ", new Object[]{e, failureCount, successCount, retryCount}, e); + + "and {} that were not execute and will be routed to retry; ", e, failureCount, successCount, retryCount, e); return; @@ -576,11 +562,11 @@ private ExceptionHandler.OnError on onGroupError = onGroupError.andThen((cl, il, rl, el) -> { switch (r.destination()) { case Failure: - getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {il.getFlowFiles(), e}, e); + getLogger().error("Failed to update database for {} due to {}; routing to failure", il.getFlowFiles(), e, e); break; case Retry: getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", - new Object[] {il.getFlowFiles(), e}, e); + il.getFlowFiles(), e, e); break; } }); @@ -620,7 +606,7 @@ public void constructProcess() { } catch (SQLException re) { // Just log the fact that rollback failed. // ProcessSession will be rollback by the thrown Exception so don't have to do anything here. - getLogger().warn("Failed to rollback database connection due to %s", new Object[]{re}, re); + getLogger().warn("Failed to rollback database connection due to {}",re, re); } }); @@ -631,7 +617,7 @@ public void constructProcess() { try { conn.setAutoCommit(fc.originalAutoCommit); } catch (final SQLException se) { - getLogger().warn("Failed to reset autocommit due to {}", new Object[]{se}); + getLogger().warn("Failed to reset autocommit due to {}", se); } } }); @@ -684,7 +670,7 @@ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFacto * transaction will be sorted in the order that they should be evaluated. * * @param context the process context for determining properties - * @param session the process session for pulling flowfiles + * @param session the process session for pulling FlowFiles * @return a FlowFilePoll containing a List of FlowFiles to process, or null if there are no FlowFiles to process */ private FlowFilePoll pollFlowFiles(final ProcessContext context, final ProcessSession session, diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java index 758e18a04d4b..3030d7f6ed7f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java @@ -329,14 +329,6 @@ public void testFailInMiddleWithBadStatementAndNotSupportTransaction() throws In assertSQLExceptionRelatedAttributes(runner, PutSQL.REL_FAILURE); } - private void testFailInMiddleWithBadStatement(final TestRunner runner) { - runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); - runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', 84)".getBytes()); - runner.enqueue("INSERT INTO PERSONS_AI".getBytes()); // intentionally wrong syntax - runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Tom', 3)".getBytes()); - runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Harry', 44)".getBytes()); - } - @Test public void testFailInMiddleWithBadStatementRollbackOnFailure() throws InitializationException, ProcessException { final TestRunner runner = initTestRunner(); @@ -423,6 +415,7 @@ public void testFailInMiddleWithBadParameterValueAndSupportTransaction() throws } assertErrorAttributesInTransaction(runner, PutSQL.REL_RETRY); + assertOriginalAttributesAreKept(runner); } @Test @@ -437,6 +430,7 @@ public void testFailInMiddleWithBadParameterValueAndNotSupportTransaction() thro runner.assertTransferCount(PutSQL.REL_RETRY, 2); assertSQLExceptionRelatedAttributes(runner, PutSQL.REL_FAILURE); + assertOriginalAttributesAreKept(runner); assertErrorAttributesNotSet(runner, PutSQL.REL_SUCCESS); assertErrorAttributesNotSet(runner, PutSQL.REL_RETRY); @@ -1524,7 +1518,6 @@ public void testStatementsFromProperty() throws InitializationException, Process } } - @Test public void testTransactionalFlowFileFilter() { final MockFlowFile ff0 = new MockFlowFile(0); @@ -1601,7 +1594,6 @@ private void testFailInMiddleWithBadParameterType(final TestRunner runner) throw runner.enqueue(data, goodAttributes); } - private void testFailInMiddleWithBadParameterValue(final TestRunner runner) throws ProcessException, SQLException { runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); recreateTable("PERSONS_AI",createPersonsAutoId); @@ -1620,6 +1612,14 @@ private void testFailInMiddleWithBadParameterValue(final TestRunner runner) thro runner.enqueue(data, goodAttributes); } + private void testFailInMiddleWithBadStatement(final TestRunner runner) { + runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); + runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', 84)".getBytes()); + runner.enqueue("INSERT INTO PERSONS_AI".getBytes()); // intentionally wrong syntax + runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Tom', 3)".getBytes()); + runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Harry', 44)".getBytes()); + } + private Map createFragmentedTransactionAttributes(String id, int count, int index) { final Map attributes = new HashMap<>(); attributes.put("fragment.identifier", id); @@ -1751,6 +1751,12 @@ private static void assertNonSQLExceptionRelatedAttribute(final TestRunner runne }); } + private static void assertOriginalAttributesAreKept(final TestRunner runner) { + runner.assertAllFlowFilesContainAttribute("sql.args.1.type"); + runner.assertAllFlowFilesContainAttribute("sql.args.1.value"); + } + + private static void assertErrorAttributesInTransaction(final TestRunner runner, Relationship relationship) { List flowFiles = runner.getFlowFilesForRelationship(relationship); assertEquals(1, flowFiles.stream()