From e03600f0b167cb5554dbb9c685f498395d2bf29b Mon Sep 17 00:00:00 2001 From: yjhyjhyjh0 Date: Wed, 24 Oct 2018 21:22:06 +0800 Subject: [PATCH] NIFI-5744: Put exception message to attribute while ExecuteSQL fail --- .../standard/AbstractExecuteSQL.java | 2 + .../nifi/processors/standard/ExecuteSQL.java | 2 + .../processors/standard/ExecuteSQLRecord.java | 2 + .../processors/standard/TestExecuteSQL.java | 7 ++- .../standard/TestExecuteSQLRecord.java | 43 +++++++++++++++++++ 5 files changed, 55 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java index d1fabef9ba68..76e36fd930c9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java @@ -61,6 +61,7 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { public static final String RESULT_QUERY_EXECUTION_TIME = "executesql.query.executiontime"; public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime"; public static final String RESULTSET_INDEX = "executesql.resultset.index"; + public static final String RESULT_ERROR_MESSAGE = "executesql.error.message"; public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key(); public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key(); @@ -402,6 +403,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session new Object[]{selectQuery, e}); context.yield(); } + session.putAttribute(fileToProcess,RESULT_ERROR_MESSAGE,e.getMessage()); session.transfer(fileToProcess, REL_FAILURE); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java index 9c617932d1ab..1128ee75a506 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java @@ -83,6 +83,8 @@ @WritesAttribute(attribute = "executesql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds"), @WritesAttribute(attribute = "executesql.resultset.index", description = "Assuming multiple result sets are returned, " + "the zero based index of this result set."), + @WritesAttribute(attribute = "executesql.error.message", description = "If processing an incoming flow file causes " + + "an Exception, the Flow File is routed to failure and this attribute is set to the exception message."), @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set " + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of " diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java index 5a844585ac4f..80d33c03d923 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java @@ -79,6 +79,8 @@ @WritesAttribute(attribute = "executesql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds"), @WritesAttribute(attribute = "executesql.resultset.index", description = "Assuming multiple result sets are returned, " + "the zero based index of this result set."), + @WritesAttribute(attribute = "executesql.error.message", description = "If processing an incoming flow file causes " + + "an Exception, the Flow File is routed to failure and this attribute is set to the exception message."), @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set " + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of " diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java index 199bd94eb47a..35dfe766ce57 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java @@ -52,6 +52,7 @@ import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -463,7 +464,7 @@ public void testWithSqlExceptionErrorProcessingResultSet() throws Exception { ResultSet rs = mock(ResultSet.class); when(statement.getResultSet()).thenReturn(rs); // Throw an exception the first time you access the ResultSet, this is after the flow file to hold the results has been created. - when(rs.getMetaData()).thenThrow(SQLException.class); + when(rs.getMetaData()).thenThrow(new SQLException("test execute statement failed")); runner.addControllerService("mockdbcp", dbcp, new HashMap<>()); runner.enableControllerService(dbcp); @@ -475,6 +476,10 @@ public void testWithSqlExceptionErrorProcessingResultSet() throws Exception { runner.assertTransferCount(ExecuteSQL.REL_FAILURE, 1); runner.assertTransferCount(ExecuteSQL.REL_SUCCESS, 0); + + // Assert exception message has been put to flow file attribute + MockFlowFile failedFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_FAILURE).get(0); + Assert.assertEquals("java.sql.SQLException: test execute statement failed",failedFlowFile.getAttribute(ExecuteSQL.RESULT_ERROR_MESSAGE)); } public void invokeOnTrigger(final Integer queryTimeout, final String query, final boolean incomingFlowFile, final Map attrs, final boolean setQueryProperty) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java index 03cdbfcf07ce..b0f5cda21793 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java @@ -28,6 +28,7 @@ import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -38,6 +39,8 @@ import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.HashMap; @@ -45,6 +48,10 @@ import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TestExecuteSQLRecord { @@ -350,6 +357,42 @@ public void invokeOnTriggerRecords(final Integer queryTimeout, final String quer assertEquals(durationTime, fetchTime + executionTime); } + @SuppressWarnings("unchecked") + @Test + public void testWithSqlExceptionErrorProcessingResultSet() throws Exception { + DBCPService dbcp = mock(DBCPService.class); + Connection conn = mock(Connection.class); + when(dbcp.getConnection(any(Map.class))).thenReturn(conn); + when(dbcp.getIdentifier()).thenReturn("mockdbcp"); + PreparedStatement statement = mock(PreparedStatement.class); + when(conn.prepareStatement(anyString())).thenReturn(statement); + when(statement.execute()).thenReturn(true); + ResultSet rs = mock(ResultSet.class); + when(statement.getResultSet()).thenReturn(rs); + // Throw an exception the first time you access the ResultSet, this is after the flow file to hold the results has been created. + when(rs.getMetaData()).thenThrow(new SQLException("test execute statement failed")); + + runner.addControllerService("mockdbcp", dbcp, new HashMap<>()); + runner.enableControllerService(dbcp); + runner.setProperty(AbstractExecuteSQL.DBCP_SERVICE, "mockdbcp"); + + MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1); + runner.addControllerService("writer", recordWriter); + runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer"); + runner.enableControllerService(recordWriter); + + runner.setIncomingConnection(true); + runner.enqueue("SELECT 1"); + runner.run(); + + runner.assertTransferCount(AbstractExecuteSQL.REL_FAILURE, 1); + runner.assertTransferCount(AbstractExecuteSQL.REL_SUCCESS, 0); + + // Assert exception message has been put to flow file attribute + MockFlowFile failedFlowFile = runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_FAILURE).get(0); + Assert.assertEquals("java.sql.SQLException: test execute statement failed", failedFlowFile.getAttribute(AbstractExecuteSQL.RESULT_ERROR_MESSAGE)); + } + @Test public void testPreQuery() throws Exception { // remove previous test database, if any