From 76b82e23a16a3ca0b2556d5d3f54140f446c0d9d Mon Sep 17 00:00:00 2001 From: Andy LoPresto Date: Fri, 6 Jul 2018 22:07:46 -0700 Subject: [PATCH 01/13] NIFI-5370 removed custom hostname verifier implementation from OkHttpReplicationClient (default handles wildcard certs). This closes #2869. Signed-off-by: Mark Payne --- .../protocol/AbstractNodeProtocolSender.java | 4 +- .../okhttp/OkHttpReplicationClient.java | 39 +++++++------------ ...hreadPoolRequestReplicatorFactoryBean.java | 3 +- 3 files changed, 18 insertions(+), 28 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java index db3fc1d9f36c..2e507c73482f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java @@ -62,7 +62,7 @@ public ConnectionResponseMessage requestConnection(final ConnectionRequestMessag response = unmarshaller.unmarshal(socket.getInputStream()); } catch (final IOException ioe) { throw new ProtocolException("Failed unmarshalling '" + MessageType.CONNECTION_RESPONSE + "' protocol message from " - + socket.getRemoteSocketAddress() + " due to: " + ioe, ioe); + + socket.getRemoteSocketAddress() + " due to: " + ioe, ioe); } if (MessageType.CONNECTION_RESPONSE == response.getType()) { @@ -155,7 +155,7 @@ private ProtocolMessage sendProtocolMessage(final ProtocolMessage msg, final Str response = unmarshaller.unmarshal(socket.getInputStream()); } catch (final IOException ioe) { throw new ProtocolException("Failed unmarshalling '" + MessageType.CONNECTION_RESPONSE + "' protocol message from " - + socket.getRemoteSocketAddress() + " due to: " + ioe, ioe); + + socket.getRemoteSocketAddress() + " due to: " + ioe, ioe); } return response; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java index c4016de77cf0..a300fc22809d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java @@ -17,6 +17,10 @@ package org.apache.nifi.cluster.coordination.http.replication.okhttp; +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.annotation.JsonInclude.Value; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.FileInputStream; @@ -35,8 +39,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.zip.GZIPInputStream; - -import javax.net.ssl.HostnameVerifier; import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; @@ -48,7 +50,14 @@ import javax.ws.rs.core.MultivaluedHashMap; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; - +import okhttp3.Call; +import okhttp3.ConnectionPool; +import okhttp3.Headers; +import okhttp3.HttpUrl; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.cluster.coordination.http.replication.HttpReplicationClient; import org.apache.nifi.cluster.coordination.http.replication.PreparedRequest; @@ -62,20 +71,6 @@ import org.slf4j.LoggerFactory; import org.springframework.util.StreamUtils; -import com.fasterxml.jackson.annotation.JsonInclude.Include; -import com.fasterxml.jackson.annotation.JsonInclude.Value; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; - -import okhttp3.Call; -import okhttp3.ConnectionPool; -import okhttp3.Headers; -import okhttp3.HttpUrl; -import okhttp3.MediaType; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.RequestBody; - public class OkHttpReplicationClient implements HttpReplicationClient { private static final Logger logger = LoggerFactory.getLogger(OkHttpReplicationClient.class); private static final Set gzipEncodings = Stream.of("gzip", "x-gzip").collect(Collectors.toSet()); @@ -86,14 +81,14 @@ public class OkHttpReplicationClient implements HttpReplicationClient { private final ObjectMapper jsonCodec = new ObjectMapper(); private final OkHttpClient okHttpClient; - public OkHttpReplicationClient(final NiFiProperties properties, final HostnameVerifier hostnameVerifier) { + public OkHttpReplicationClient(final NiFiProperties properties) { jsonCodec.setDefaultPropertyInclusion(Value.construct(Include.NON_NULL, Include.ALWAYS)); jsonCodec.setAnnotationIntrospector(new JaxbAnnotationIntrospector(jsonCodec.getTypeFactory())); jsonSerializer = new JsonEntitySerializer(jsonCodec); xmlSerializer = new XmlEntitySerializer(); - okHttpClient = createOkHttpClient(properties, hostnameVerifier); + okHttpClient = createOkHttpClient(properties); } @Override @@ -280,7 +275,7 @@ private boolean isUseGzip(final Map headers) { } } - private OkHttpClient createOkHttpClient(final NiFiProperties properties, final HostnameVerifier hostnameVerifier) { + private OkHttpClient createOkHttpClient(final NiFiProperties properties) { final String connectionTimeout = properties.getClusterNodeConnectionTimeout(); final long connectionTimeoutMs = FormatUtils.getTimeDuration(connectionTimeout, TimeUnit.MILLISECONDS); final String readTimeout = properties.getClusterNodeReadTimeout(); @@ -298,10 +293,6 @@ private OkHttpClient createOkHttpClient(final NiFiProperties properties, final H okHttpClientBuilder.sslSocketFactory(tuple.getKey(), tuple.getValue()); } - if (hostnameVerifier != null) { - okHttpClientBuilder.hostnameVerifier(hostnameVerifier); - } - return okHttpClientBuilder.build(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java index e0477a7afccb..2bb3f608e007 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java @@ -23,7 +23,6 @@ import org.apache.nifi.cluster.coordination.http.replication.okhttp.OkHttpReplicationClient; import org.apache.nifi.events.EventReporter; import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.util.NiFiHostnameVerifier; import org.springframework.beans.BeansException; import org.springframework.beans.factory.FactoryBean; import org.springframework.context.ApplicationContext; @@ -46,7 +45,7 @@ public ThreadPoolRequestReplicator getObject() throws Exception { final int maxPoolSize = nifiProperties.getClusterNodeProtocolMaxPoolSize(); final int maxConcurrentRequests = nifiProperties.getClusterNodeMaxConcurrentRequests(); - final OkHttpReplicationClient replicationClient = new OkHttpReplicationClient(nifiProperties, new NiFiHostnameVerifier()); + final OkHttpReplicationClient replicationClient = new OkHttpReplicationClient(nifiProperties); replicator = new ThreadPoolRequestReplicator(corePoolSize, maxPoolSize, maxConcurrentRequests, replicationClient, clusterCoordinator, requestCompletionCallback, eventReporter, nifiProperties); From 333146b3fecef50cc16751cdf2fe7bcddcc7e03d Mon Sep 17 00:00:00 2001 From: Mark Bean Date: Thu, 5 Jul 2018 19:35:15 +0000 Subject: [PATCH 02/13] NIFI-5368 controller services validated prior to enabling; referenced controller services must be enabled for referencing component to be valid (mock framework) This closes #2873. Signed-off-by: Mark Payne --- .../util/MockControllerServiceLookup.java | 4 ++ .../apache/nifi/util/MockProcessContext.java | 10 +++++ .../util/StandardProcessorTestRunner.java | 14 +++++- .../util/TestStandardProcessorTestRunner.java | 44 +++++++++++++++++++ ...dentialsProviderControllerServiceTest.java | 15 ++----- .../service/ServiceStateTransition.java | 4 +- .../dbcp/TestDBCPConnectionPoolLookup.java | 4 -- .../hbase/TestHBase_1_1_2_ClientService.java | 9 ---- .../nifi/xml/TestXMLRecordSetWriter.java | 4 ++ .../ssl/StandardSSLContextServiceTest.groovy | 2 +- .../nifi/ssl/SSLContextServiceTest.java | 2 +- 11 files changed, 82 insertions(+), 30 deletions(-) diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java index 9893a96011e3..ec7b179e422f 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java @@ -29,6 +29,10 @@ public abstract class MockControllerServiceLookup implements ControllerServiceLo private final Map controllerServiceMap = new ConcurrentHashMap<>(); + public Map getControllerServices() { + return controllerServiceMap; + } + public ControllerServiceConfiguration addControllerService(final ControllerService service, final String identifier) { final ControllerServiceConfiguration config = new ControllerServiceConfiguration(service); controllerServiceMap.put(identifier, config); diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java index a16216dc7d09..f40de46c5365 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java @@ -249,6 +249,16 @@ public Collection validate() { final Collection serviceResults = validateReferencedControllerServices(validationContext); results.addAll(serviceResults); + + // verify all controller services are enabled + for (Map.Entry service : getControllerServices().entrySet()) { + if (!service.getValue().isEnabled()) { + results.add(new ValidationResult.Builder() + .explanation("Controller service " + service.getKey() + " for " + this.getName() + " is not enabled") + .valid(false) + .build()); + } + } return results; } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 8f4de965a998..3a9babb2127f 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -686,6 +686,16 @@ public void enableControllerService(final ControllerService service) { throw new IllegalStateException("Cannot enable Controller Service " + service + " because it is not disabled"); } + // ensure controller service is valid before enabling + final ValidationContext validationContext = new MockValidationContext(context).getControllerServiceValidationContext(service); + final Collection results = context.getControllerService(service.getIdentifier()).validate(validationContext); + + for (final ValidationResult result : results) { + if (!result.isValid()) { + throw new IllegalStateException("Cannot enable Controller Service " + service + " because it is in an invalid state: " + result.toString()); + } + } + try { final ConfigurationContext configContext = new MockConfigurationContext(service, configuration.getProperties(), context,variableRegistry); ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, service, configContext); @@ -712,7 +722,9 @@ public boolean isControllerServiceEnabled(final ControllerService service) { @Override public void removeControllerService(final ControllerService service) { - disableControllerService(service); + if (context.getControllerServiceLookup().isControllerServiceEnabled(service)) { + disableControllerService(service); + } try { ReflectionUtils.invokeMethodsWithAnnotation(OnRemoved.class, service); diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java b/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java index db8a74ef44f1..d2572fabc8ae 100644 --- a/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java +++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java @@ -207,6 +207,35 @@ public void testControllerServiceUpdateShouldCallOnSetProperty() { assertTrue("onPropertyModified has not been called", ((SimpleTestService) testService).isOpmCalled()); } + @Test + public void testProcessorInvalidWhenControllerServiceDisabled() { + final ControllerService testService = new RequiredPropertyTestService(); + final AddAttributeProcessor proc = new AddAttributeProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); + final String serviceIdentifier = "test"; + final String pdName = "name"; + final String pdValue = "exampleName"; + try { + runner.addControllerService(serviceIdentifier, testService); + } catch (InitializationException e) { + fail(e.getMessage()); + } + + // controller service invalid due to no value on required property; processor must also be invalid + runner.assertNotValid(testService); + runner.assertNotValid(); + + // add required property; controller service valid but not enabled; processor must be invalid + runner.setProperty(testService, RequiredPropertyTestService.namePropertyDescriptor, pdValue); + runner.assertValid(testService); + runner.assertNotValid(); + + // enable controller service; processor now valid + runner.enableControllerService(testService); + runner.assertValid(testService); + runner.assertValid(); + } + private static class ProcessorWithOnStop extends AbstractProcessor { private int callsWithContext = 0; @@ -330,4 +359,19 @@ public boolean isOpmCalled() { return opmCalled; } } + + private static class RequiredPropertyTestService extends AbstractControllerService { + private static final String PD_NAME = "name"; + protected static final PropertyDescriptor namePropertyDescriptor = new PropertyDescriptor.Builder() + .name(PD_NAME) + .displayName("Controller Service Name") + .required(true) + .sensitive(false) + .allowableValues("exampleName", "anotherExampleName") + .build(); + + protected List getSupportedPropertyDescriptors() { + return Arrays.asList(namePropertyDescriptor); + } + } } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderControllerServiceTest.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderControllerServiceTest.java index 86c415492c5c..c61240d41acd 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderControllerServiceTest.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderControllerServiceTest.java @@ -144,7 +144,7 @@ public void testKeysCredentialsProviderWithRoleAndNameAndSessionTimeout3600() th runner.assertValid(serviceImpl); } - @Test(expected = AssertionError.class) + @Test public void testKeysCredentialsProviderWithRoleAndNameAndSessionTimeoutLessThan900() throws Throwable { final TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class); final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); @@ -154,11 +154,10 @@ public void testKeysCredentialsProviderWithRoleAndNameAndSessionTimeoutLessThan9 runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_ARN, "Role"); runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME, "RoleName"); runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.MAX_SESSION_TIME, "899"); - runner.enableControllerService(serviceImpl); runner.assertNotValid(serviceImpl); } - @Test(expected = AssertionError.class) + @Test public void testKeysCredentialsProviderWithRoleAndNameAndSessionTimeoutGreaterThan3600() throws Throwable { final TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class); final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); @@ -168,7 +167,7 @@ public void testKeysCredentialsProviderWithRoleAndNameAndSessionTimeoutGreaterTh runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_ARN, "Role"); runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME, "RoleName"); runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.MAX_SESSION_TIME, "899"); - runner.enableControllerService(serviceImpl); + runner.assertNotValid(serviceImpl); } @Test @@ -179,7 +178,6 @@ public void testKeysCredentialsProviderWithRoleOnlyInvalid() throws Throwable { runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, "awsAccessKey"); runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, "awsSecretKey"); runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_ARN, "Role"); - runner.enableControllerService(serviceImpl); runner.assertNotValid(serviceImpl); } @@ -192,7 +190,6 @@ public void testKeysCredentialsProviderWithRoleNameOnlyInvalid() throws Throwabl runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, "awsAccessKey"); runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, "awsSecretKey"); runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME, "RoleName"); - runner.enableControllerService(serviceImpl); runner.assertNotValid(serviceImpl); } @@ -244,7 +241,6 @@ public void testFileCredentialsProviderBadFile() throws Throwable { runner.addControllerService("awsCredentialsProvider", serviceImpl); runner.setProperty(serviceImpl, CredentialPropertyDescriptors.CREDENTIALS_FILE, "src/test/resources/bad-mock-aws-credentials.properties"); - runner.enableControllerService(serviceImpl); runner.assertNotValid(serviceImpl); } @@ -258,7 +254,6 @@ public void testFileAndAccessSecretKeyInvalid() throws Throwable { "src/test/resources/mock-aws-credentials.properties"); runner.setProperty(serviceImpl, CredentialPropertyDescriptors.ACCESS_KEY, "awsAccessKey"); runner.setProperty(serviceImpl, CredentialPropertyDescriptors.SECRET_KEY, "awsSecretKey"); - runner.enableControllerService(serviceImpl); runner.assertNotValid(serviceImpl); } @@ -271,7 +266,6 @@ public void testFileAndAccessKeyInvalid() throws Throwable { runner.setProperty(serviceImpl, CredentialPropertyDescriptors.CREDENTIALS_FILE, "src/test/resources/mock-aws-credentials.properties"); runner.setProperty(serviceImpl, CredentialPropertyDescriptors.ACCESS_KEY, "awsAccessKey"); - runner.enableControllerService(serviceImpl); runner.assertNotValid(serviceImpl); } @@ -284,7 +278,6 @@ public void testFileAndSecretKeyInvalid() throws Throwable { runner.setProperty(serviceImpl, CredentialPropertyDescriptors.CREDENTIALS_FILE, "src/test/resources/mock-aws-credentials.properties"); runner.setProperty(serviceImpl, CredentialPropertyDescriptors.SECRET_KEY, "awsSecretKey"); - runner.enableControllerService(serviceImpl); runner.assertNotValid(serviceImpl); } @@ -295,7 +288,6 @@ public void testAccessKeyOnlyInvalid() throws Throwable { final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); runner.addControllerService("awsCredentialsProvider", serviceImpl); runner.setProperty(serviceImpl, CredentialPropertyDescriptors.ACCESS_KEY, "awsAccessKey"); - runner.enableControllerService(serviceImpl); runner.assertNotValid(serviceImpl); } @@ -306,7 +298,6 @@ public void testSecretKeyOnlyInvalid() throws Throwable { final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); runner.addControllerService("awsCredentialsProvider", serviceImpl); runner.setProperty(serviceImpl, CredentialPropertyDescriptors.SECRET_KEY, "awsSecretKey"); - runner.enableControllerService(serviceImpl); runner.assertNotValid(serviceImpl); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java index 319c18c9317b..d88c017874a3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java @@ -17,12 +17,12 @@ package org.apache.nifi.controller.service; +import org.apache.nifi.controller.ComponentNode; + import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; -import org.apache.nifi.controller.ComponentNode; - public class ServiceStateTransition { private ControllerServiceState state = ControllerServiceState.DISABLED; private final List> enabledFutures = new ArrayList<>(); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestDBCPConnectionPoolLookup.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestDBCPConnectionPoolLookup.java index 635af2e606da..d02437fbe443 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestDBCPConnectionPoolLookup.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestDBCPConnectionPoolLookup.java @@ -125,15 +125,12 @@ public void testCustomValidateAtLeaseOneServiceDefined() throws InitializationEx // enable lookup service with no services registered, verify not valid runner = TestRunners.newTestRunner(TestProcessor.class); runner.addControllerService("dbcp-lookup", dbcpLookupService); - runner.enableControllerService(dbcpLookupService); runner.assertNotValid(dbcpLookupService); final String dbcpServiceAIdentifier = "dbcp-a"; runner.addControllerService(dbcpServiceAIdentifier, dbcpServiceA); - runner.enableControllerService(dbcpServiceA); // register a service and now verify valid - runner.disableControllerService(dbcpLookupService); runner.setProperty(dbcpLookupService, "a", dbcpServiceAIdentifier); runner.enableControllerService(dbcpLookupService); runner.assertValid(dbcpLookupService); @@ -144,7 +141,6 @@ public void testCustomValidateSelfReferenceNotAllowed() throws InitializationExc runner = TestRunners.newTestRunner(TestProcessor.class); runner.addControllerService("dbcp-lookup", dbcpLookupService); runner.setProperty(dbcpLookupService, "dbcp-lookup", "dbcp-lookup"); - runner.enableControllerService(dbcpLookupService); runner.assertNotValid(dbcpLookupService); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java index 544b502e1807..75e9c1710c41 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java @@ -83,7 +83,6 @@ public void testCustomValidate() throws InitializationException, IOException { // no conf file or zk properties so should be invalid MockHBaseClientService service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile); runner.addControllerService("hbaseClientService", service); - runner.enableControllerService(service); runner.assertNotValid(service); runner.removeControllerService(service); @@ -106,7 +105,6 @@ public void testCustomValidate() throws InitializationException, IOException { service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile); runner.addControllerService("hbaseClientService", service); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "${zk-quorum}"); - runner.enableControllerService(service); runner.assertNotValid(service); runner.removeControllerService(service); @@ -116,7 +114,6 @@ public void testCustomValidate() throws InitializationException, IOException { runner.addControllerService("hbaseClientService", service); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "${zk-quorum}"); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "${zk-client-port}"); - runner.enableControllerService(service); runner.assertNotValid(service); runner.removeControllerService(service); @@ -155,11 +152,9 @@ public void testCustomValidate() throws InitializationException, IOException { runner.disableControllerService(service); runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site-security.xml, src/test/resources/core-site-security.xml"); - runner.enableControllerService(service); runner.assertNotValid(service); // Kerberos - add valid options - runner.disableControllerService(service); runner.setProperty(service, kerberosPropsWithFile.getKerberosKeytab(), "src/test/resources/fake.keytab"); runner.setProperty(service, kerberosPropsWithFile.getKerberosPrincipal(), "test@REALM"); runner.enableControllerService(service); @@ -168,14 +163,11 @@ public void testCustomValidate() throws InitializationException, IOException { // Kerberos - add invalid non-existent keytab file runner.disableControllerService(service); runner.setProperty(service, kerberosPropsWithFile.getKerberosKeytab(), "src/test/resources/missing.keytab"); - runner.enableControllerService(service); runner.assertNotValid(service); // Kerberos - add invalid principal - runner.disableControllerService(service); runner.setProperty(service, kerberosPropsWithFile.getKerberosKeytab(), "src/test/resources/fake.keytab"); runner.setProperty(service, kerberosPropsWithFile.getKerberosPrincipal(), ""); - runner.enableControllerService(service); runner.assertNotValid(service); // Kerberos - valid props but the KerberosProperties has a null Kerberos config file so be invalid @@ -185,7 +177,6 @@ public void testCustomValidate() throws InitializationException, IOException { "src/test/resources/hbase-site-security.xml, src/test/resources/core-site-security.xml"); runner.setProperty(service, kerberosPropsWithoutFile.getKerberosKeytab(), "src/test/resources/fake.keytab"); runner.setProperty(service, kerberosPropsWithoutFile.getKerberosPrincipal(), "test@REALM"); - runner.enableControllerService(service); runner.assertNotValid(service); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordSetWriter.java index 372e60db3ceb..8008f65be0da 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordSetWriter.java @@ -174,6 +174,10 @@ public void testValidation() throws IOException, InitializationException { runner.setProperty(writer, XMLRecordSetWriter.RECORD_TAG_NAME, "record"); runner.setProperty(writer, XMLRecordSetWriter.ARRAY_WRAPPING, XMLRecordSetWriter.USE_PROPERTY_AS_WRAPPER); + runner.assertNotValid(writer); + + runner.setProperty(writer, XMLRecordSetWriter.ARRAY_TAG_NAME, "array-tag-name"); + runner.assertValid(writer); runner.enableControllerService(writer); runner.enqueue(""); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/test/groovy/org/apache/nifi/ssl/StandardSSLContextServiceTest.groovy b/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/test/groovy/org/apache/nifi/ssl/StandardSSLContextServiceTest.groovy index 881eb02df176..6d2f7b20dee7 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/test/groovy/org/apache/nifi/ssl/StandardSSLContextServiceTest.groovy +++ b/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/test/groovy/org/apache/nifi/ssl/StandardSSLContextServiceTest.groovy @@ -117,7 +117,7 @@ class StandardSSLContextServiceTest { } // Assert - assert msg =~ "invalid because Cannot access file" + assert msg =~ "Cannot enable Controller Service SSLContextService.* because it is in an invalid state: 'Truststore Filename'.* is invalid because File.* does not exist or cannot be read"; runner.assertNotValid(sslContextService) } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/test/java/org/apache/nifi/ssl/SSLContextServiceTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/test/java/org/apache/nifi/ssl/SSLContextServiceTest.java index fd0df1f39df4..6cddc7df57f2 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/test/java/org/apache/nifi/ssl/SSLContextServiceTest.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/test/java/org/apache/nifi/ssl/SSLContextServiceTest.java @@ -224,7 +224,7 @@ public void testValidationResultsCacheShouldExpire() throws InitializationExcept // Assert // Have to exhaust the cached result by checking n-1 more times - for (int i = 2; i <= sslContextService.getValidationCacheExpiration(); i++) { + for (int i = 2; i < sslContextService.getValidationCacheExpiration(); i++) { validationResults = sslContextService.customValidate(validationContext); assertTrue("validation results is not empty", validationResults.isEmpty()); logger.info("(" + i + ") StandardSSLContextService#customValidate() returned true even though the keystore file is no longer available"); From 54bb511e579a00535c976159fa2903385af74f77 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 9 Jul 2018 11:50:06 -0400 Subject: [PATCH 03/13] NIFI-5377: Addressed issue of infinite recursion when enabling/disabling controller services if there is a recursive loop (i.e., Service A references Service B references Service A). This closes #2847 Signed-off-by: Matt Gilman --- .../service/ServiceStateTransition.java | 8 +-- .../StandardControllerServiceReference.java | 62 +++++++------------ .../nifi/web/StandardNiFiServiceFacade.java | 13 +--- 3 files changed, 27 insertions(+), 56 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java index d88c017874a3..e78f6c6bb8f5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; public class ServiceStateTransition { @@ -58,12 +59,9 @@ public synchronized boolean enable() { } private void validateReferences(final ControllerServiceNode service) { - for (final ComponentNode component : service.getReferences().getReferencingComponents()) { + final List referencingComponents = service.getReferences().findRecursiveReferences(ComponentNode.class); + for (final ComponentNode component : referencingComponents) { component.performValidation(); - - if (component instanceof ControllerServiceNode) { - validateReferences((ControllerServiceNode) component); - } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java index 4cb5239790e0..e18e38351490 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java @@ -31,8 +31,7 @@ public class StandardControllerServiceReference implements ControllerServiceRefe private final ControllerServiceNode referenced; private final Set components; - public StandardControllerServiceReference(final ControllerServiceNode referencedService, - final Set referencingComponents) { + public StandardControllerServiceReference(final ControllerServiceNode referencedService, final Set referencingComponents) { this.referenced = referencedService; this.components = new HashSet<>(referencingComponents); } @@ -56,52 +55,30 @@ private boolean isRunning(final ComponentNode component) { return ((ProcessorNode) component).isRunning(); } + if (component instanceof ControllerServiceNode) { + return ((ControllerServiceNode) component).isActive(); + } + return false; } @Override public Set getActiveReferences() { final Set activeReferences = new HashSet<>(); - final Set serviceNodes = new HashSet<>(); for (final ComponentNode component : components) { - if (component instanceof ControllerServiceNode) { - serviceNodes.add((ControllerServiceNode) component); - - if (((ControllerServiceNode) component).isActive()) { - activeReferences.add(component); - } - } else if (isRunning(component)) { + if (isRunning(component)) { activeReferences.add(component); } } - activeReferences.addAll(getActiveIndirectReferences(serviceNodes)); - return activeReferences; - } - - private Set getActiveIndirectReferences(final Set referencingServices) { - if (referencingServices.isEmpty()) { - return Collections.emptySet(); - } - - final Set references = new HashSet<>(); - for (final ControllerServiceNode referencingService : referencingServices) { - final Set serviceNodes = new HashSet<>(); - final ControllerServiceReference ref = referencingService.getReferences(); - - for (final ComponentNode component : ref.getReferencingComponents()) { - if (component instanceof ControllerServiceNode) { - serviceNodes.add((ControllerServiceNode) component); - } else if (isRunning(component)) { - references.add(component); - } + for (final ComponentNode component : findRecursiveReferences(ComponentNode.class)) { + if (isRunning(component)) { + activeReferences.add(component); } - - references.addAll(getActiveIndirectReferences(serviceNodes)); } - return references; + return activeReferences; } @@ -111,6 +88,10 @@ public List findRecursiveReferences(final Class componentType) { } private List findRecursiveReferences(final ControllerServiceNode referencedNode, final Class componentType) { + return findRecursiveReferences(referencedNode, componentType, new HashSet<>()); + } + + private List findRecursiveReferences(final ControllerServiceNode referencedNode, final Class componentType, final Set servicesVisited) { final List references = new ArrayList<>(); for (final ComponentNode referencingComponent : referencedNode.getReferences().getReferencingComponents()) { @@ -122,12 +103,15 @@ private List findRecursiveReferences(final ControllerServiceNode referenc final ControllerServiceNode referencingNode = (ControllerServiceNode) referencingComponent; // find components recursively that depend on referencingNode. - final List recursive = findRecursiveReferences(referencingNode, componentType); - - // For anything that depends on referencing node, we want to add it to the list, but we know - // that it must come after the referencing node, so we first remove any existing occurrence. - references.removeAll(recursive); - references.addAll(recursive); + final boolean added = servicesVisited.add(referencingNode); + if (added) { + final List recursive = findRecursiveReferences(referencingNode, componentType, servicesVisited); + + // For anything that depends on referencing node, we want to add it to the list, but we know + // that it must come after the referencing node, so we first remove any existing occurrence. + references.removeAll(recursive); + references.addAll(recursive); + } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index a7814429a565..d5b6f806ec04 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -2222,17 +2222,6 @@ public ControllerServiceEntity updateControllerService(final Revision revision, return entityFactory.createControllerServiceEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, bulletinEntities); } - private Set findAllReferencingComponents(final ControllerServiceReference reference) { - final Set referencingComponents = new HashSet<>(reference.getReferencingComponents()); - - for (final ComponentNode referencingComponent : reference.getReferencingComponents()) { - if (referencingComponent instanceof ControllerServiceNode) { - referencingComponents.addAll(findAllReferencingComponents(((ControllerServiceNode) referencingComponent).getReferences())); - } - } - - return referencingComponents; - } @Override public ControllerServiceReferencingComponentsEntity updateControllerServiceReferencingComponents( @@ -2257,7 +2246,7 @@ public RevisionUpdate update() { } // ensure the revision for all referencing components is included regardless of whether they were updated in this request - for (final ComponentNode component : findAllReferencingComponents(updatedReference)) { + for (final ComponentNode component : updatedReference.findRecursiveReferences(ComponentNode.class)) { updatedRevisions.putIfAbsent(component.getIdentifier(), revisionManager.getRevision(component.getIdentifier())); } From 260bc29e1014a2fd21c3775e27f434756e13e0ee Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Mon, 25 Jun 2018 10:36:55 -0400 Subject: [PATCH 04/13] NIFI-5316 Fixed array handling for Avro that comes from Parquet's Avro reader Signed-off-by: zenfenan --- .../org/apache/nifi/avro/AvroTypeUtil.java | 22 +++- .../nifi-parquet-processors/pom.xml | 2 + .../processors/parquet/FetchParquetTest.java | 110 ++++++++++++++++++ .../test/resources/avro/user-with-array.avsc | 9 ++ .../avro/user-with-nullable-array.avsc | 9 ++ 5 files changed, 146 insertions(+), 6 deletions(-) create mode 100644 nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user-with-array.avsc create mode 100644 nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user-with-nullable-array.avsc diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java index c21481999094..23f74b86d01e 100755 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java @@ -894,13 +894,23 @@ private static Object normalizeValue(final Object value, final Schema avroSchema case STRING: return value.toString(); case ARRAY: - final GenericData.Array array = (GenericData.Array) value; - final Object[] valueArray = new Object[array.size()]; - for (int i = 0; i < array.size(); i++) { - final Schema elementSchema = avroSchema.getElementType(); - valueArray[i] = normalizeValue(array.get(i), elementSchema, fieldName + "[" + i + "]"); + if (value instanceof List) { + final List list = (List) value; + final Object[] valueArray = new Object[list.size()]; + for (int i = 0; i < list.size(); i++) { + final Schema elementSchema = avroSchema.getElementType(); + valueArray[i] = normalizeValue(list.get(i), elementSchema, fieldName + "[" + i + "]"); + } + return valueArray; + } else { + final GenericData.Array array = (GenericData.Array) value; + final Object[] valueArray = new Object[array.size()]; + for (int i = 0; i < array.size(); i++) { + final Schema elementSchema = avroSchema.getElementType(); + valueArray[i] = normalizeValue(array.get(i), elementSchema, fieldName + "[" + i + "]"); + } + return valueArray; } - return valueArray; case MAP: final Map avroMap = (Map) value; final Map map = new HashMap<>(avroMap.size()); diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml index daefadf1100e..4b552d37a4c8 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml @@ -101,6 +101,8 @@ src/test/resources/avro/user.avsc + src/test/resources/avro/user-with-array.avsc + src/test/resources/avro/user-with-nullable-array.avsc diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java index 76d44aab5c8f..83b11f2ff69a 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java @@ -62,6 +62,8 @@ public class FetchParquetTest { static final String RECORD_HEADER = "name,favorite_number,favorite_color"; private Schema schema; + private Schema schemaWithArray; + private Schema schemaWithNullableArray; private Configuration testConf; private FetchParquet proc; private TestRunner testRunner; @@ -71,6 +73,12 @@ public void setup() throws IOException, InitializationException { final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/avro/user.avsc"), StandardCharsets.UTF_8); schema = new Schema.Parser().parse(avroSchema); + final String avroSchemaWithArray = IOUtils.toString(new FileInputStream("src/test/resources/avro/user-with-array.avsc"), StandardCharsets.UTF_8); + schemaWithArray = new Schema.Parser().parse(avroSchemaWithArray); + + final String avroSchemaWithNullableArray = IOUtils.toString(new FileInputStream("src/test/resources/avro/user-with-nullable-array.avsc"), StandardCharsets.UTF_8); + schemaWithNullableArray = new Schema.Parser().parse(avroSchemaWithNullableArray); + testConf = new Configuration(); testConf.addResource(new Path(TEST_CONF_PATH)); @@ -243,6 +251,42 @@ public void testIOExceptionWhileWritingShouldRouteToRetry() throws Initializatio flowFile.assertContentEquals("TRIGGER"); } + @Test + public void testFetchWithArray() throws InitializationException, IOException { + configure(proc); + + final File parquetDir = new File(DIRECTORY); + final File parquetFile = new File(parquetDir,"testFetchParquetWithArrayToCSV.parquet"); + final int numUsers = 10; + writeParquetUsersWithArray(parquetFile, numUsers); + + final Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath()); + attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName()); + + testRunner.enqueue("TRIGGER", attributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1); + } + + @Test + public void testFetchWithNullableArray() throws InitializationException, IOException { + configure(proc); + + final File parquetDir = new File(DIRECTORY); + final File parquetFile = new File(parquetDir,"testFetchParquetWithNullableArrayToCSV.parquet"); + final int numUsers = 10; + writeParquetUsersWithNullableArray(parquetFile, numUsers); + + final Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath()); + attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName()); + + testRunner.enqueue("TRIGGER", attributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1); + } + protected void verifyCSVRecords(int numUsers, String csvContent) { final String[] splits = csvContent.split("[\\n]"); Assert.assertEquals(numUsers, splits.length); @@ -278,4 +322,70 @@ private void writeParquetUsers(final File parquetFile, int numUsers) throws IOEx } + private void writeParquetUsersWithArray(final File parquetFile, int numUsers) throws IOException { + if (parquetFile.exists()) { + Assert.assertTrue(parquetFile.delete()); + } + + final Path parquetPath = new Path(parquetFile.getPath()); + + final AvroParquetWriter.Builder writerBuilder = AvroParquetWriter + .builder(parquetPath) + .withSchema(schemaWithArray) + .withConf(testConf); + + final Schema favoriteColorsSchema = schemaWithArray.getField("favorite_colors").schema(); + + try (final ParquetWriter writer = writerBuilder.build()) { + for (int i=0; i < numUsers; i++) { + final GenericRecord user = new GenericData.Record(schema); + user.put("name", "Bob" + i); + user.put("favorite_number", i); + + + final GenericData.Array colors = new GenericData.Array<>(1, favoriteColorsSchema); + colors.add("blue" + i); + + user.put("favorite_color", colors); + + writer.write(user); + } + } + + } + + private void writeParquetUsersWithNullableArray(final File parquetFile, int numUsers) throws IOException { + if (parquetFile.exists()) { + Assert.assertTrue(parquetFile.delete()); + } + + final Path parquetPath = new Path(parquetFile.getPath()); + + final AvroParquetWriter.Builder writerBuilder = AvroParquetWriter + .builder(parquetPath) + .withSchema(schemaWithNullableArray) + .withConf(testConf); + + // use the schemaWithArray here just to get the schema for the array part of the favorite_colors fields, the overall + // schemaWithNullableArray has a union of the array schema and null + final Schema favoriteColorsSchema = schemaWithArray.getField("favorite_colors").schema(); + + try (final ParquetWriter writer = writerBuilder.build()) { + for (int i=0; i < numUsers; i++) { + final GenericRecord user = new GenericData.Record(schema); + user.put("name", "Bob" + i); + user.put("favorite_number", i); + + + final GenericData.Array colors = new GenericData.Array<>(1, favoriteColorsSchema); + colors.add("blue" + i); + + user.put("favorite_color", colors); + + writer.write(user); + } + } + + } + } diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user-with-array.avsc b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user-with-array.avsc new file mode 100644 index 000000000000..67a0cca1c569 --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user-with-array.avsc @@ -0,0 +1,9 @@ +{"namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "favorite_colors", "type": { "type": "array", "items": ["string","null"] }, "default": null } + ] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user-with-nullable-array.avsc b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user-with-nullable-array.avsc new file mode 100644 index 000000000000..8986ebabf6b7 --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user-with-nullable-array.avsc @@ -0,0 +1,9 @@ +{"namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "favorite_colors", "type": [ "null", { "type": "array", "items": ["string","null"] } ], "default": null } + ] +} \ No newline at end of file From e09ab9b69a3bd21c09118dd2a2c9b8bd4e091bae Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 29 Jun 2018 09:25:57 -0400 Subject: [PATCH 05/13] NIFI-5361: When submitting many processors to start, calculate the 'timeout timestamp' immediately before calling @OnScheduled method, after the task has been scheduled to run, instead of before the task has a chance to run. This closes #2831 --- .../org/apache/nifi/controller/StandardProcessorNode.java | 8 ++++++-- .../controller/scheduling/StandardProcessScheduler.java | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 6f51f87ba05c..413f0974a2f0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -1481,12 +1481,16 @@ private void initiateStart(final ScheduledExecutorService taskScheduler, final l final Processor processor = getProcessor(); final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor); - final long completionTimestamp = System.currentTimeMillis() + onScheduleTimeoutMillis; + // Completion Timestamp is set to MAX_VALUE because we don't want to timeout until the task has a chance to run. + final AtomicLong completionTimestampRef = new AtomicLong(Long.MAX_VALUE); // Create a task to invoke the @OnScheduled annotation of the processor final Callable startupTask = () -> { LOG.debug("Invoking @OnScheduled methods of {}", processor); + // Now that the task has been scheduled, set the timeout + completionTimestampRef.set(System.currentTimeMillis() + onScheduleTimeoutMillis); + try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { try { activateThread(); @@ -1571,7 +1575,7 @@ public void run() { return; } - monitorAsyncTask(taskFuture, monitoringFuture, completionTimestamp); + monitorAsyncTask(taskFuture, monitoringFuture, completionTimestampRef.get()); } }; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 8f7eb2fdb057..b23e76356e4a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -319,7 +319,7 @@ public void trigger() { @Override public Future scheduleTask(final Callable task) { lifecycleState.incrementActiveThreadCount(null); - return componentMonitoringThreadPool.submit(task); + return componentLifeCycleThreadPool.submit(task); } @Override From cfdb0de8f83c3e7d732cac6a5d18b2f1631d20e5 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 2 Jul 2018 10:00:38 -0400 Subject: [PATCH 06/13] NIFI-5362: When a processor is terminated and has no more active threads, ensure that we set this.hasActiveThreads = false Signed-off-by: Pierre Villard This closes #2832. --- .../java/org/apache/nifi/controller/StandardProcessorNode.java | 1 + 1 file changed, 1 insertion(+) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 413f0974a2f0..c2b98e64584b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -1453,6 +1453,7 @@ public int terminate() { getLogger().terminate(); scheduledState.set(ScheduledState.STOPPED); + hasActiveThreads = false; return count; } From 7b28b914cd6ca4c5f0d566a8726ca02ebd38f3c2 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Thu, 7 Jun 2018 12:13:21 +0200 Subject: [PATCH 07/13] NIFI-5278: fixes JSON escaping of code Change-Id: I2cb0e6c658d4a0f2aad9c4aab9201a3334ee54df NIFI-5278: adds Apache Commons Text to NOTICE Change-Id: I8185239b0a888c16159b18f13d6682ba350cc766 NIFI-5278: adds tests Change-Id: I9286ac71bc7399e5bdc1e6602609b5e8829db27e NIFI-5278: fixes review findings Change-Id: I292c93dae877cf1cd146f3897b7e132b6afac801 Signed-off-by: Matthew Burgess This closes #2768 --- nifi-assembly/NOTICE | 5 ++ .../src/main/resources/META-INF/NOTICE | 5 ++ .../nifi-livy-processors/pom.xml | 5 ++ .../livy/ExecuteSparkInteractive.java | 4 +- .../livy/ExecuteSparkInteractiveTestBase.java | 86 ++++++++++++------- .../livy/TestExecuteSparkInteractive.java | 36 ++------ .../livy/TestExecuteSparkInteractiveSSL.java | 33 ++----- 7 files changed, 87 insertions(+), 87 deletions(-) diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE index 9f413d6fb505..b3d24e936da4 100644 --- a/nifi-assembly/NOTICE +++ b/nifi-assembly/NOTICE @@ -291,6 +291,11 @@ The following binary components are provided under the Apache Software License v This product includes software from the Spring Framework, under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + (ASLv2) Apache Commons Text + The following NOTICE information applies: + Apache Commons Text + Copyright 2001-2018 The Apache Software Foundation + (ASLv2) Apache Commons Configuration The following NOTICE information applies: Apache Commons Configuration diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/src/main/resources/META-INF/NOTICE index ffbe292b79df..787d2e492dc0 100644 --- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/src/main/resources/META-INF/NOTICE +++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/src/main/resources/META-INF/NOTICE @@ -53,6 +53,11 @@ The following binary components are provided under the Apache Software License v Apache Commons IO Copyright 2002-2016 The Apache Software Foundation + (ASLv2) Apache Commons Text + The following NOTICE information applies: + Apache Commons Text + Copyright 2001-2018 The Apache Software Foundation + (ASLv2) Jackson JSON processor The following NOTICE information applies: # Jackson JSON processor diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/pom.xml b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/pom.xml index 8be74d3bdca4..7d33e427ad56 100644 --- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/pom.xml +++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/pom.xml @@ -98,5 +98,10 @@ 1.7.0 test + + org.apache.commons + commons-text + 1.3 + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java index 4a878429fec2..d8ca9e10ce44 100644 --- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java +++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java @@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang.StringEscapeUtils; +import org.apache.commons.text.StringEscapeUtils; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; @@ -207,7 +207,7 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro } } - code = StringEscapeUtils.escapeJavaScript(code); + code = StringEscapeUtils.escapeJson(code); String payload = "{\"code\":\"" + code + "\"}"; try { final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/ExecuteSparkInteractiveTestBase.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/ExecuteSparkInteractiveTestBase.java index 3a2c67a0f9a2..f0076e785372 100644 --- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/ExecuteSparkInteractiveTestBase.java +++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/ExecuteSparkInteractiveTestBase.java @@ -16,37 +16,43 @@ */ package org.apache.nifi.processors.livy; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.web.util.TestServer; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.handler.AbstractHandler; -import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.io.PrintWriter; +import java.util.List; -public class ExecuteSparkInteractiveTestBase { +class ExecuteSparkInteractiveTestBase { public static class LivyAPIHandler extends AbstractHandler { int session1Requests = 0; @Override - public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException { baseRequest.setHandled(true); - response.setStatus(200); + int responseStatus = 404; + String responseContentType = "text/plain"; + String responseBody = "Not found"; if ("GET".equalsIgnoreCase(request.getMethod())) { - - String responseBody = "{}"; - response.setContentType("application/json"); - + responseStatus = 200; + responseBody = "{}"; + responseContentType = "application/json"; if ("/sessions".equalsIgnoreCase(target)) { responseBody = "{\"sessions\": [{\"id\": 1, \"kind\": \"spark\", \"state\": \"idle\"}]}"; } else if (target.startsWith("/sessions/") && !target.contains("statement")) { responseBody = "{\"id\": 1, \"kind\": \"spark\", \"state\": \"idle\"}"; - } else if ("/sessions/1/statements/7".equalsIgnoreCase(target)) { switch (session1Requests) { case 0: @@ -64,33 +70,55 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques } session1Requests++; } - - response.setContentLength(responseBody.length()); - - try (PrintWriter writer = response.getWriter()) { - writer.print(responseBody); - writer.flush(); + } else if ("POST".equalsIgnoreCase(request.getMethod())) { + String requestBody = IOUtils.toString(request.getReader()); + try { + // validate JSON payload + new ObjectMapper().readTree(requestBody); + + responseStatus = 200; + responseBody = "{}"; + responseContentType = "application/json"; + if ("/sessions".equalsIgnoreCase(target)) { + responseBody = "{\"id\": 1, \"kind\": \"spark\", \"state\": \"idle\"}"; + } else if ("/sessions/1/statements".equalsIgnoreCase(target)) { + responseBody = "{\"id\": 7}"; + } + } catch (JsonProcessingException e) { + responseStatus = 400; + responseContentType = "text/plain"; + responseBody = "Bad request"; } + } - } else if ("POST".equalsIgnoreCase(request.getMethod())) { + response.setStatus(responseStatus); + response.setContentType(responseContentType); + response.setContentLength(responseBody.length()); - String responseBody = "{}"; - response.setContentType("application/json"); + try (PrintWriter writer = response.getWriter()) { + writer.print(responseBody); + writer.flush(); + } - if ("/sessions".equalsIgnoreCase(target)) { - responseBody = "{\"id\": 1, \"kind\": \"spark\", \"state\": \"idle\"}"; - } else if ("/sessions/1/statements".equalsIgnoreCase(target)) { - responseBody = "{\"id\": 7}"; - } + } + } - response.setContentLength(responseBody.length()); + TestRunner runner; - try (PrintWriter writer = response.getWriter()) { - writer.print(responseBody); - writer.flush(); - } + void testCode(TestServer server, String code) throws Exception { + server.addHandler(new LivyAPIHandler()); - } + runner.enqueue(code); + runner.run(); + List waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT); + while (!waitingFlowfiles.isEmpty()) { + Thread.sleep(1000); + runner.clearTransferState(); + runner.enqueue(code); + runner.run(); + waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT); } + runner.assertTransferCount(ExecuteSparkInteractive.REL_SUCCESS, 1); } + } diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractive.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractive.java index 1be718ad8177..992e2e5c6ce7 100644 --- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractive.java +++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractive.java @@ -17,26 +17,18 @@ package org.apache.nifi.processors.livy; import org.apache.nifi.controller.livy.LivySessionController; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.apache.nifi.web.util.TestServer; -import org.eclipse.jetty.server.Handler; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import java.io.IOException; -import java.util.List; - public class TestExecuteSparkInteractive extends ExecuteSparkInteractiveTestBase { - public static TestServer server; - public static String url; - - public TestRunner runner; + private static TestServer server; + private static String url; @BeforeClass public static void beforeClass() throws Exception { @@ -52,10 +44,6 @@ public static void beforeClass() throws Exception { url = server.getUrl(); } - public void addHandler(Handler handler) { - server.addHandler(handler); - } - @AfterClass public static void afterClass() throws Exception { server.shutdownServer(); @@ -79,25 +67,17 @@ public void after() { runner.shutdown(); } - private static TestServer createServer() throws IOException { + private static TestServer createServer() { return new TestServer(); } @Test public void testSparkSession() throws Exception { + testCode(server, "print \"hello world\""); + } - addHandler(new LivyAPIHandler()); - - runner.enqueue("print \"hello world\""); - runner.run(); - List waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT); - while (!waitingFlowfiles.isEmpty()) { - Thread.sleep(1000); - runner.clearTransferState(); - runner.enqueue("print \"hello world\""); - runner.run(); - waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT); - } - runner.assertTransferCount(ExecuteSparkInteractive.REL_SUCCESS, 1); + @Test + public void testSparkSessionWithSpecialChars() throws Exception { + testCode(server, "print \"/'?!<>[]{}()$&*=%;.|_-\\\""); } } diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractiveSSL.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractiveSSL.java index 3e84cba64d0c..c5cec742ad2e 100644 --- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractiveSSL.java +++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractiveSSL.java @@ -18,30 +18,23 @@ import org.apache.nifi.controller.livy.LivySessionController; import org.apache.nifi.ssl.StandardSSLContextService; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.apache.nifi.web.util.TestServer; -import org.eclipse.jetty.server.Handler; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import java.io.IOException; import java.util.HashMap; -import java.util.List; import java.util.Map; public class TestExecuteSparkInteractiveSSL extends ExecuteSparkInteractiveTestBase { private static Map sslProperties; - public static TestServer server; - public static String url; - - public TestRunner runner; + private static TestServer server; + private static String url; @BeforeClass public static void beforeClass() throws Exception { @@ -64,10 +57,6 @@ public static void beforeClass() throws Exception { url = server.getSecureUrl(); } - public void addHandler(Handler handler) { - server.addHandler(handler); - } - @AfterClass public static void afterClass() throws Exception { server.shutdownServer(); @@ -101,25 +90,13 @@ public void after() { runner.shutdown(); } - private static TestServer createServer() throws IOException { + private static TestServer createServer() { return new TestServer(sslProperties); } @Test - public void testSslSparkSession() throws Exception { - addHandler(new LivyAPIHandler()); - - runner.enqueue("print \"hello world\""); - runner.run(); - List waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT); - while (!waitingFlowfiles.isEmpty()) { - Thread.sleep(1000); - runner.clearTransferState(); - runner.enqueue("print \"hello world\""); - runner.run(); - waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT); - } - runner.assertTransferCount(ExecuteSparkInteractive.REL_SUCCESS, 1); + public void testSparkSession() throws Exception { + testCode(server,"print \"hello world\""); } private static Map createSslProperties() { From f4b2aae48ae32d2d8c7dbe89d342a749bdc32575 Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Thu, 28 Jun 2018 15:02:58 -0400 Subject: [PATCH 08/13] NIFI-5349: Fixed EL handling in Initial Max Value props for DB Fetch processors Signed-off-by: Pierre Villard This closes #2822. --- .../AbstractDatabaseFetchProcessor.java | 30 +--- .../standard/GenerateTableFetch.java | 24 ++- .../standard/QueryDatabaseTable.java | 20 ++- .../standard/QueryDatabaseTableTest.java | 56 ++++++ .../standard/TestGenerateTableFetch.java | 161 ++++++++++++++++++ 5 files changed, 261 insertions(+), 30 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java index 924c7da18776..e13f9de8369a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java @@ -21,7 +21,6 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.dbcp.DBCPService; -import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractSessionFactoryProcessor; @@ -220,18 +219,6 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact .build(); } - @Override - protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { - return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .required(false) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) - .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .dynamic(true) - .build(); - } - // A common validation procedure for DB fetch processors, it stores whether the Table Name and/or Max Value Column properties have expression language protected Collection customValidate(ValidationContext validationContext) { // For backwards-compatibility, keep track of whether the table name and max-value column properties are dynamic (i.e. has expression language) @@ -540,19 +527,16 @@ protected static String getStateKey(String prefix, String columnName, DatabaseAd return sb.toString(); } - protected Map getDefaultMaxValueProperties(final Map properties){ - final Map defaultMaxValues = new HashMap<>(); + protected Map getDefaultMaxValueProperties(final ProcessContext context, final FlowFile flowFile) { + final Map defaultMaxValues = new HashMap<>(); - for (final Map.Entry entry : properties.entrySet()) { - final String key = entry.getKey().getName(); + context.getProperties().forEach((k, v) -> { + final String key = k.getName(); - if(!key.startsWith(INITIAL_MAX_VALUE_PROP_START)) { - continue; + if (key.startsWith(INITIAL_MAX_VALUE_PROP_START)) { + defaultMaxValues.put(key.substring(INITIAL_MAX_VALUE_PROP_START.length()), context.getProperty(k).evaluateAttributeExpressions(flowFile).getValue()); } - - defaultMaxValues.put(key.substring(INITIAL_MAX_VALUE_PROP_START.length()), entry.getValue()); - } - + }); return defaultMaxValues; } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java index 9842d6ce6fc4..dd001a67c463 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java @@ -37,6 +37,7 @@ import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateMap; import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; @@ -98,9 +99,11 @@ @WritesAttribute(attribute = "generatetablefetch.limit", description = "The number of result rows to be fetched by the SQL statement."), @WritesAttribute(attribute = "generatetablefetch.offset", description = "Offset to be used to retrieve the corresponding partition.") }) -@DynamicProperty(name = "Initial Max Value", value = "Attribute Expression Language", - expressionLanguageScope = ExpressionLanguageScope.NONE, description = "Specifies an initial " - + "max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.") +@DynamicProperty(name = "initial.maxvalue.", value = "Initial maximum value for the specified column", + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description = "Specifies an initial " + + "max value for max value columns. Properties should be added in the format `initial.maxvalue.`. This value is only used the first time " + + "the table is accessed (when a Maximum Value Column is specified). In the case of incoming connections, the value is only used the first time for each table " + + "specified in the flow files.") public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { public static final PropertyDescriptor PARTITION_SIZE = new PropertyDescriptor.Builder() @@ -164,6 +167,18 @@ protected List getSupportedPropertyDescriptors() { return propDescriptors; } + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .dynamic(true) + .build(); + } + @Override protected Collection customValidate(ValidationContext validationContext) { List results = new ArrayList<>(super.customValidate(validationContext)); @@ -180,7 +195,6 @@ protected Collection customValidate(ValidationContext validati @Override @OnScheduled public void setup(final ProcessContext context) { - maxValueProperties = getDefaultMaxValueProperties(context.getProperties()); if (context.hasIncomingConnection() && !context.hasNonLoopConnection()) { getLogger().error("The failure relationship can be used only if there is another incoming connection to this processor."); } @@ -209,6 +223,8 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory return; } } + maxValueProperties = getDefaultMaxValueProperties(context, fileToProcess); + final ComponentLog logger = getLogger(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index 1dfe64c131ec..b245f5750d89 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -34,6 +34,7 @@ import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateMap; import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -107,8 +108,9 @@ + "FlowFiles were produced"), @WritesAttribute(attribute = "maxvalue.*", description = "Each attribute contains the observed maximum value of a specified 'Maximum-value Column'. The " + "suffix of the attribute is the name of the column. If Output Batch Size is set, then this attribute will not be populated.")}) -@DynamicProperty(name = "Initial Max Value", value = "Attribute Expression Language", expressionLanguageScope = ExpressionLanguageScope.NONE, - description = "Specifies an initial max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.") +@DynamicProperty(name = "initial.maxvalue.", value = "Initial maximum value for the specified column", + expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY, description = "Specifies an initial max value for max value column(s). Properties should " + + "be added in the format `initial.maxvalue.`. This value is only used the first time the table is accessed (when a Maximum Value Column is specified).") public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { public static final String RESULT_TABLENAME = "tablename"; @@ -200,9 +202,21 @@ protected List getSupportedPropertyDescriptors() { return propDescriptors; } + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .dynamic(true) + .build(); + } + @OnScheduled public void setup(final ProcessContext context) { - maxValueProperties = getDefaultMaxValueProperties(context.getProperties()); + maxValueProperties = getDefaultMaxValueProperties(context, null); } @OnStopped diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java index 87bf2a2e3f1d..a27c529a5679 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java @@ -897,6 +897,62 @@ public void testInitialMaxValue() throws ClassNotFoundException, SQLException, I runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0); runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER); runner.clearTransferState(); + } + + @Test + public void testInitialMaxValueWithEL() throws ClassNotFoundException, SQLException, InitializationException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + InputStream in; + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + + Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + cal.setTimeInMillis(0); + + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + + int rowCount=0; + //create larger row set + for(int batch=0;batch<10;batch++){ + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '" + dateFormat.format(cal.getTime().getTime()) + "')"); + + rowCount++; + cal.add(Calendar.MINUTE, 1); + } + + runner.setProperty(QueryDatabaseTable.TABLE_NAME, "${" + TABLE_NAME_KEY + "}"); + runner.setVariable(TABLE_NAME_KEY, "TEST_QUERY_DB_TABLE"); + runner.setIncomingConnection(false); + runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "created_on"); + + cal.setTimeInMillis(0); + cal.add(Calendar.MINUTE, 5); + runner.setProperty("initial.maxvalue.CREATED_ON", "${created.on}"); + runner.setVariable("created.on", dateFormat.format(cal.getTime().getTime())); + // Initial run with no previous state. Should get only last 4 records + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + assertEquals(4, getNumberOfRecordsFromStream(in)); + runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER); + runner.clearTransferState(); + + // Run again, this time no flowfiles/rows should be transferred + // Validate Max Value doesn't change also + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0); + runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER); + runner.clearTransferState(); // Append a new row, expect 1 flowfile one row cal.setTimeInMillis(0); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java index f6c27fa62d6b..44dcadf2a747 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java @@ -963,6 +963,167 @@ public void testInitialMaxValue() throws ClassNotFoundException, SQLException, I runner.clearTransferState(); } + @Test + public void testInitialMaxValueWithEL() throws ClassNotFoundException, SQLException, InitializationException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')"); + + runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setIncomingConnection(false); + runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID"); + runner.setProperty("initial.maxvalue.ID", "${maxval.id}"); + runner.setVariable("maxval.id", "1"); + + runner.run(); + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + String query = new String(flowFile.toByteArray()); + assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 1 AND ID <= 2 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query); + ResultSet resultSet = stmt.executeQuery(query); + // Should be one record (the initial max value skips the first two) + assertTrue(resultSet.next()); + assertFalse(resultSet.next()); + runner.clearTransferState(); + + // Run again, this time no flowfiles/rows should be transferred + runner.run(); + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0); + runner.clearTransferState(); + } + + @Test + public void testInitialMaxValueWithELAndIncoming() throws ClassNotFoundException, SQLException, InitializationException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')"); + + runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID"); + runner.setProperty("initial.maxvalue.ID", "${maxval.id}"); + Map attrs = new HashMap() {{ + put("maxval.id", "1"); + }}; + runner.setIncomingConnection(true); + runner.enqueue(new byte[0], attrs); + runner.run(); + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + String query = new String(flowFile.toByteArray()); + assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 1 AND ID <= 2 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query); + ResultSet resultSet = stmt.executeQuery(query); + // Should be one record (the initial max value skips the first two) + assertTrue(resultSet.next()); + assertFalse(resultSet.next()); + runner.clearTransferState(); + + // Run again, this time no flowfiles/rows should be transferred + runner.enqueue(new byte[0], attrs); + runner.run(); + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0); + runner.clearTransferState(); + } + + @Test + public void testInitialMaxValueWithELAndMultipleTables() throws ClassNotFoundException, SQLException, InitializationException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')"); + + runner.setProperty(GenerateTableFetch.TABLE_NAME, "${table.name}"); + runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID"); + runner.setProperty("initial.maxvalue.ID", "${maxval.id}"); + Map attrs = new HashMap() {{ + put("maxval.id", "1"); + put("table.name", "TEST_QUERY_DB_TABLE"); + }}; + runner.setIncomingConnection(true); + runner.enqueue(new byte[0], attrs); + runner.run(); + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + String query = new String(flowFile.toByteArray()); + assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 1 AND ID <= 2 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query); + ResultSet resultSet = stmt.executeQuery(query); + // Should be one record (the initial max value skips the first two) + assertTrue(resultSet.next()); + assertFalse(resultSet.next()); + runner.clearTransferState(); + + // Run again, this time no flowfiles/rows should be transferred + runner.enqueue(new byte[0], attrs); + runner.run(); + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0); + runner.clearTransferState(); + + // Initial Max Value for second table + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE2"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE2 (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')"); + + attrs.put("table.name", "TEST_QUERY_DB_TABLE2"); + runner.enqueue(new byte[0], attrs); + runner.run(); + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1); + flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + query = new String(flowFile.toByteArray()); + assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE2 WHERE ID > 1 AND ID <= 2 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query); + resultSet = stmt.executeQuery(query); + // Should be one record (the initial max value skips the first two) + assertTrue(resultSet.next()); + assertFalse(resultSet.next()); + runner.clearTransferState(); + + // Run again, this time no flowfiles/rows should be transferred + runner.enqueue(new byte[0], attrs); + runner.run(); + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0); + runner.clearTransferState(); + } + @Test public void testNoDuplicateWithRightBounded() throws ClassNotFoundException, SQLException, InitializationException, IOException { From d326edb25765c02e66fb16f4b52c47c3bc444f00 Mon Sep 17 00:00:00 2001 From: thenatog Date: Mon, 25 Jun 2018 13:23:28 -0400 Subject: [PATCH 09/13] NIFI-5258 - Changed the way the servlets are created for the documentation webapp. Removed some unnecessary code. Fixed imports. This closes #2812. Signed-off-by: Andy LoPresto --- .../apache/nifi/web/server/JettyServer.java | 57 ++++++++++--------- 1 file changed, 30 insertions(+), 27 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java index bcada3514870..2f93ec87a964 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java @@ -49,15 +49,12 @@ import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.SslConnectionFactory; -import org.eclipse.jetty.server.handler.ContextHandler; -import org.eclipse.jetty.server.handler.ContextHandlerCollection; import org.eclipse.jetty.server.handler.HandlerCollection; import org.eclipse.jetty.server.handler.HandlerList; -import org.eclipse.jetty.server.handler.ResourceHandler; import org.eclipse.jetty.server.handler.gzip.GzipHandler; +import org.eclipse.jetty.servlet.DefaultServlet; import org.eclipse.jetty.servlet.FilterHolder; -import org.eclipse.jetty.util.resource.Resource; -import org.eclipse.jetty.util.resource.ResourceCollection; +import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.webapp.Configuration; @@ -337,11 +334,10 @@ private Handler loadWars(final Set bundles) { // load the documentation war webDocsContext = loadWar(webDocsWar, docsContextPath, frameworkClassLoader); - // overlay the actual documentation - final ContextHandlerCollection documentationHandlers = new ContextHandlerCollection(); - documentationHandlers.addHandler(createDocsWebApp(docsContextPath)); - documentationHandlers.addHandler(webDocsContext); - handlers.addHandler(documentationHandlers); + // add the servlets which serve the HTML documentation within the documentation web app + addDocsServlets(webDocsContext); + + handlers.addHandler(webDocsContext); // load the web error app final WebAppContext webErrorContext = loadWar(webErrorWar, "/", frameworkClassLoader); @@ -517,39 +513,46 @@ private WebAppContext loadWar(final File warFile, final String contextPath, fina return webappContext; } - private ContextHandler createDocsWebApp(final String contextPath) { + private void addDocsServlets(WebAppContext docsContext) { try { - final ResourceHandler resourceHandler = new ResourceHandler(); - resourceHandler.setDirectoriesListed(false); - + // Load the nifi/docs directory final File docsDir = getDocsDir("docs"); - final Resource docsResource = Resource.newResource(docsDir); // load the component documentation working directory final File componentDocsDirPath = props.getComponentDocumentationWorkingDirectory(); final File workingDocsDirectory = getWorkingDocsDirectory(componentDocsDirPath); - final Resource workingDocsResource = Resource.newResource(workingDocsDirectory); + // Load the API docs final File webApiDocsDir = getWebApiDocsDir(); - final Resource webApiDocsResource = Resource.newResource(webApiDocsDir); - // create resources for both docs locations - final ResourceCollection resources = new ResourceCollection(docsResource, workingDocsResource, webApiDocsResource); - resourceHandler.setBaseResource(resources); + // Create the servlet which will serve the static resources + ServletHolder defaultHolder = new ServletHolder("default", DefaultServlet.class); + defaultHolder.setInitParameter("dirAllowed", "false"); + + ServletHolder docs = new ServletHolder("docs", DefaultServlet.class); + docs.setInitParameter("resourceBase", docsDir.getPath()); + + ServletHolder components = new ServletHolder("components", DefaultServlet.class); + components.setInitParameter("resourceBase", workingDocsDirectory.getPath()); + + ServletHolder restApi = new ServletHolder("rest-api", DefaultServlet.class); + restApi.setInitParameter("resourceBase", webApiDocsDir.getPath()); - // create the context handler - final ContextHandler handler = new ContextHandler(contextPath); - handler.setHandler(resourceHandler); + docsContext.addServlet(docs, "/html/*"); + docsContext.addServlet(components, "/components/*"); + docsContext.addServlet(restApi, "/rest-api/*"); + + docsContext.addServlet(defaultHolder, "/"); + + logger.info("Loading documents web app with context path set to " + docsContext.getContextPath()); - logger.info("Loading documents web app with context path set to " + contextPath); - return handler; } catch (Exception ex) { logger.error("Unhandled Exception in createDocsWebApp: " + ex.getMessage()); startUpFailure(ex); - return null; // required by compiler, though never be executed. } } + /** * Returns a File object for the directory containing NIFI documentation. *

@@ -1052,4 +1055,4 @@ public void destroy() { @FunctionalInterface interface ServerConnectorCreator { ServerConnector create(Server server, HttpConfiguration httpConfiguration); -} \ No newline at end of file +} From b46033be306b2ce66c3316b2f4fec92b941307ac Mon Sep 17 00:00:00 2001 From: thenatog Date: Tue, 3 Jul 2018 14:21:03 -0400 Subject: [PATCH 10/13] NIFI-5374 - Added ExceptionFilter which catches RequestRejectedException thrown in the nifi-api Jersey code. These exceptions were not caught by the Jetty error-page configuration because they're thrown before the endpoint/Jetty routing is hit. Added integration test for checking the ExceptionFilter catches malicious string exceptions. Made minor changes to PR 2840 for code style. This closes #2840. Co-authored-by: Andy LoPresto Signed-off-by: Andy LoPresto --- .../nifi/web/filter/ExceptionFilter.java | 72 +++++++++++++++++++ .../src/main/webapp/WEB-INF/web.xml | 9 ++- .../ITProcessGroupAccessControl.java | 36 ++++++++++ 3 files changed, 116 insertions(+), 1 deletion(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/filter/ExceptionFilter.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/filter/ExceptionFilter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/filter/ExceptionFilter.java new file mode 100644 index 000000000000..17d7dc5ed334 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/filter/ExceptionFilter.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.filter; + +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import javax.servlet.Filter; +import javax.servlet.FilterChain; +import javax.servlet.FilterConfig; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.security.web.firewall.RequestRejectedException; + +/** + * A filter to catch exceptions that aren't handled by the Jetty error-page. + * + */ +public class ExceptionFilter implements Filter { + + private static final Logger logger = LoggerFactory.getLogger(ExceptionFilter.class); + + @Override + public void doFilter(final ServletRequest req, final ServletResponse resp, final FilterChain filterChain) + throws IOException, ServletException { + + try { + filterChain.doFilter(req, resp); + } catch (RequestRejectedException e) { + if (logger.isDebugEnabled()) { + logger.debug("An exception was caught performing the HTTP request security filter check and the stacktrace has been suppressed from the response"); + } + + HttpServletResponse filteredResponse = (HttpServletResponse) resp; + filteredResponse.setStatus(500); + filteredResponse.getWriter().write(e.getMessage()); + + StringWriter sw = new StringWriter(); + sw.write("Exception caught by ExceptionFilter:\n"); + PrintWriter pw = new PrintWriter(sw); + e.printStackTrace(pw); + logger.error(sw.toString()); + } + } + + @Override + public void init(final FilterConfig config) { + } + + @Override + public void destroy() { + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/webapp/WEB-INF/web.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/webapp/WEB-INF/web.xml index 18877109f383..2c87331e1d77 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/webapp/WEB-INF/web.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/webapp/WEB-INF/web.xml @@ -42,7 +42,14 @@ jerseySpring /* - + + exceptionFilter + org.apache.nifi.web.filter.ExceptionFilter + + + exceptionFilter + /* + timer org.apache.nifi.web.filter.TimerFilter diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/integration/accesscontrol/ITProcessGroupAccessControl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/integration/accesscontrol/ITProcessGroupAccessControl.java index 2b3f42c3577a..48061a87dcac 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/integration/accesscontrol/ITProcessGroupAccessControl.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/integration/accesscontrol/ITProcessGroupAccessControl.java @@ -315,6 +315,35 @@ public void testNoneUserDeleteProcessGroup() throws Exception { verifyDelete(helper.getNoneUser(), NONE_CLIENT_ID, 403); } + /** + * Ensures malicious string inputs added to the end of a process group + * are handled safely and a stack trace is not returned. + * + * @throws Exception ex + */ + @Test + public void testProcessGroupRejectMaliciousString() throws Exception { + final ProcessGroupEntity entity = createProcessGroup(NiFiTestAuthorizer.NO_POLICY_COMPONENT_NAME); + + final String updatedName = "Updated name" + count++; + final String maliciousString = "z-->;"; + final String maliciousErrorMessage = "The request was rejected because the URL contained a potentially malicious String \";\""; + + // attempt to update the name + entity.getRevision().setClientId(READ_WRITE_CLIENT_ID); + entity.getComponent().setName(updatedName); + + // perform the request + final Response response = updateProcessGroup(helper.getReadWriteUser(), entity, maliciousString); + String maliciousStringResponse = response.readEntity(String.class); + + // ensure successful response + assertEquals(500, response.getStatus()); + + // verify + assertEquals(maliciousErrorMessage, maliciousStringResponse); + } + private ProcessGroupEntity getRandomProcessGroup(final NiFiTestUser user) throws Exception { final String url = helper.getBaseUrl() + "/flow/process-groups/root"; @@ -338,6 +367,13 @@ private ProcessGroupEntity getRandomProcessGroup(final NiFiTestUser user) throws return processGroupIter.next(); } + private Response updateProcessGroup(final NiFiTestUser user, final ProcessGroupEntity entity, final String urlParameter) throws Exception { + final String url = helper.getBaseUrl() + "/process-groups/" + entity.getId() + urlParameter; + + // perform the request + return user.testPut(url, entity); + } + private Response updateProcessGroup(final NiFiTestUser user, final ProcessGroupEntity entity) throws Exception { final String url = helper.getBaseUrl() + "/process-groups/" + entity.getId(); From 4f75a0b46cd98ad7642ad0cde98e0504881b7751 Mon Sep 17 00:00:00 2001 From: Mark Bean Date: Fri, 6 Jul 2018 12:29:00 +0000 Subject: [PATCH 11/13] NIFI-5377 prevent infinite loop if a controller service circular reference exists --- .../java/org/apache/nifi/web/StandardNiFiServiceFacade.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index d5b6f806ec04..700185c849e4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -2271,9 +2271,9 @@ private void findControllerServiceReferencingComponentIdentifiers(final Controll if (component instanceof ControllerServiceNode) { final ControllerServiceNode node = (ControllerServiceNode) component; if (!visited.contains(node)) { + visited.add(node); findControllerServiceReferencingComponentIdentifiers(node.getReferences(), visited); } - visited.add(node); } } } From a618ea55975094087023d02e27a9bd859f671702 Mon Sep 17 00:00:00 2001 From: Andy LoPresto Date: Wed, 11 Jul 2018 23:32:23 -0700 Subject: [PATCH 12/13] NIFI-5415 Renamed ListenSyslogGroovyTest to ITListenSyslogGroovy. --- .../nifi/processors/standard/ITListenSyslogGroovy.groovy | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ITListenSyslogGroovy.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ITListenSyslogGroovy.groovy index 1f442253c3b5..3f5d16e37ee6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ITListenSyslogGroovy.groovy +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ITListenSyslogGroovy.groovy @@ -33,8 +33,8 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory @RunWith(JUnit4.class) -class ListenSyslogGroovyTest extends GroovyTestCase { - private static final Logger logger = LoggerFactory.getLogger(ListenSyslogGroovyTest.class) +class ITListenSyslogGroovy extends GroovyTestCase { + private static final Logger logger = LoggerFactory.getLogger(ITListenSyslogGroovy.class) static final String ZERO_LENGTH_MESSAGE = " \n" From e959630c22c9a52ec717141f6cf9f018830a38bf Mon Sep 17 00:00:00 2001 From: Andy LoPresto Date: Thu, 12 Jul 2018 01:19:19 -0700 Subject: [PATCH 13/13] NIFI-5414 Fixed checkstyle error due to unused import. --- .../apache/nifi/controller/service/ServiceStateTransition.java | 1 - 1 file changed, 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java index e78f6c6bb8f5..26e3b82e2b89 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.Set; import java.util.concurrent.CompletableFuture; public class ServiceStateTransition {