From e4aaba5f711cca4fe72b566531486fb3d3b1fdd0 Mon Sep 17 00:00:00 2001 From: Arun Manivannan Date: Thu, 17 Aug 2017 01:30:05 +0800 Subject: [PATCH] NIFI-4198 *ElasticsearchHttp processors do not expose Proxy settings --- .../AbstractElasticsearchHttpProcessor.java | 55 +++++++++++++++++ .../elasticsearch/FetchElasticsearchHttp.java | 4 +- .../elasticsearch/PutElasticsearchHttp.java | 4 +- .../elasticsearch/QueryElasticsearchHttp.java | 4 +- .../ScrollElasticsearchHttp.java | 4 +- .../TestFetchElasticsearchHttp.java | 52 ++++++++++++++++ .../TestPutElasticsearchHttp.java | 54 +++++++++++++++++ .../TestQueryElasticsearchHttp.java | 60 +++++++++++++++++++ 8 files changed, 233 insertions(+), 4 deletions(-) diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java index 49ab682c91f8..80a33e9c09df 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java @@ -18,11 +18,13 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import okhttp3.Authenticator; import okhttp3.Credentials; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.RequestBody; import okhttp3.Response; +import okhttp3.Route; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; @@ -41,6 +43,7 @@ import java.net.URL; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -81,6 +84,22 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic .addValidator(StandardValidators.PORT_VALIDATOR) .build(); + public static final PropertyDescriptor PROXY_USERNAME = new PropertyDescriptor.Builder() + .name("proxy-username") + .displayName("Proxy Username") + .description("Proxy Username") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); + public static final PropertyDescriptor PROXY_PASSWORD = new PropertyDescriptor.Builder() + .name("proxy-password") + .displayName("Proxy Password") + .description("Proxy Password") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .sensitive(true) + .build(); + public static final PropertyDescriptor CONNECT_TIMEOUT = new PropertyDescriptor.Builder() .name("elasticsearch-http-connect-timeout") .displayName("Connection Timeout") @@ -114,6 +133,27 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String proper .build(); } + private static final List propertyDescriptors; + + static { + final List properties = new ArrayList<>(); + properties.add(ES_URL); + properties.add(PROXY_HOST); + properties.add(PROXY_PORT); + properties.add(PROXY_USERNAME); + properties.add(PROXY_PASSWORD); + properties.add(RESPONSE_TIMEOUT); + + propertyDescriptors = Collections.unmodifiableList(properties); + } + + @Override + public List getSupportedPropertyDescriptors() { + final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.addAll(propertyDescriptors); + return properties; + } + @Override protected void createElasticsearchClient(ProcessContext context) throws ProcessException { okHttpClientAtomicReference.set(null); @@ -128,6 +168,21 @@ protected void createElasticsearchClient(ProcessContext context) throws ProcessE okHttpClient.proxy(proxy); } + final String proxyUsername = context.getProperty(PROXY_USERNAME).getValue(); + final String proxyPassword = context.getProperty(PROXY_PASSWORD).getValue(); + + if (proxyUsername != null && proxyPassword != null){ + okHttpClient.proxyAuthenticator(new Authenticator() { + @Override + public Request authenticate(Route route, Response response) throws IOException { + final String credential=Credentials.basic(proxyUsername, proxyPassword); + return response.request().newBuilder() + .header("Proxy-Authorization", credential) + .build(); + } + }); + } + // Set timeouts okHttpClient.connectTimeout((context.getProperty(CONNECT_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS); okHttpClient.readTimeout(context.getProperty(RESPONSE_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java index 63163e479a85..fb94395b036b 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java @@ -168,7 +168,9 @@ public Set getRelationships() { @Override public final List getSupportedPropertyDescriptors() { - return propertyDescriptors; + final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.addAll(propertyDescriptors); + return properties; } @OnScheduled diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java index 9102bfb1a8c4..05f8fe19fe6a 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java @@ -173,7 +173,9 @@ public Set getRelationships() { @Override public final List getSupportedPropertyDescriptors() { - return propertyDescriptors; + final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.addAll(propertyDescriptors); + return properties; } @Override diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java index 4b7312f517cc..7ff5fa19add0 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java @@ -211,7 +211,9 @@ public Set getRelationships() { @Override public final List getSupportedPropertyDescriptors() { - return propertyDescriptors; + final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.addAll(propertyDescriptors); + return properties; } @OnScheduled diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java index d49e71f535ce..5a187c092794 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java @@ -194,7 +194,9 @@ public Set getRelationships() { @Override public final List getSupportedPropertyDescriptors() { - return propertyDescriptors; + final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.addAll(propertyDescriptors); + return properties; } @OnScheduled diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java index de56b496866f..36570de8be08 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java @@ -456,4 +456,56 @@ public void testFetchElasticsearchBatch() throws IOException { runner.run(100); runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 100); } + + @Test + @Ignore("Un-authenticated proxy : Comment this out if you want to run against local proxied ES.") + public void testFetchElasticsearchBasicBehindProxy() { + final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearchHttp()); + runner.setValidateExpressionUsage(true); + + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + + runner.setProperty(FetchElasticsearchHttp.PROXY_HOST, "localhost"); + runner.setProperty(FetchElasticsearchHttp.PROXY_PORT, "3228"); + runner.setProperty(FetchElasticsearchHttp.ES_URL, "http://172.18.0.2:9200"); + + runner.assertValid(); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + + runner.enqueue(docExample); + runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1); + } + + @Test + @Ignore("Authenticated Proxy : Comment this out if you want to run against local proxied ES.") + public void testFetchElasticsearchBasicBehindAuthenticatedProxy() { + final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearchHttp()); + runner.setValidateExpressionUsage(true); + + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + + runner.setProperty(FetchElasticsearchHttp.PROXY_HOST, "localhost"); + runner.setProperty(FetchElasticsearchHttp.PROXY_PORT, "3328"); + runner.setProperty(FetchElasticsearchHttp.PROXY_USERNAME, "squid"); + runner.setProperty(FetchElasticsearchHttp.PROXY_PASSWORD, "changeme"); + runner.setProperty(FetchElasticsearchHttp.ES_URL, "http://172.18.0.2:9200"); + + runner.assertValid(); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + + runner.enqueue(docExample); + runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1); + } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java index 5ca38149c0fb..073ca16c540b 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java @@ -512,4 +512,58 @@ public void testPutElasticSearchBatch() throws IOException { runner.run(); runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 100); } + + @Test + @Ignore("Un-authenticated proxy : Comment this out if you want to run against local proxied ES.") + public void testPutElasticSearchBasicBehindProxy() { + final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearchHttp()); + runner.setValidateExpressionUsage(false); + + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + + runner.setProperty(PutElasticsearchHttp.PROXY_HOST, "localhost"); + runner.setProperty(PutElasticsearchHttp.PROXY_PORT, "3228"); + runner.setProperty(PutElasticsearchHttp.ES_URL, "http://172.18.0.2:9200"); + runner.assertValid(); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + + runner.enqueue(docExample); + runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1); + } + + @Test + @Ignore("Authenticated Proxy : Comment this out if you want to run against local proxied ES.") + public void testPutElasticSearchBasicBehindAuthenticatedProxy() { + final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearchHttp()); + runner.setValidateExpressionUsage(false); + + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + + runner.setProperty(PutElasticsearchHttp.PROXY_HOST, "localhost"); + runner.setProperty(PutElasticsearchHttp.PROXY_PORT, "3328"); + runner.setProperty(PutElasticsearchHttp.PROXY_USERNAME, "squid"); + runner.setProperty(PutElasticsearchHttp.PROXY_PASSWORD, "changeme"); + runner.setProperty(PutElasticsearchHttp.ES_URL, "http://172.18.0.2:9200"); + + + runner.assertValid(); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + + runner.enqueue(docExample); + runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1); + } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java index 478949623eaf..f109e2faed54 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java @@ -37,6 +37,7 @@ import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.After; +import org.junit.Ignore; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -362,6 +363,65 @@ public void testSetupSecureClient() throws Exception { runner.run(1, true, true); } + ///////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Integration test section below + // + // The tests below are meant to run on real ES instances, and are thus @Ignored during normal test execution. + // However if you wish to execute them as part of a test phase, comment out the @Ignored line for each + // desired test. + ///////////////////////////////////////////////////////////////////////////////////////////////////////////// + + @Test + @Ignore("Un-authenticated proxy : Comment this out if you want to run against local proxied ES.") + public void testQueryElasticsearchBasicBehindProxy() { + System.out.println("Starting test " + new Object() { + }.getClass().getEnclosingMethod().getName()); + final TestRunner runner = TestRunners.newTestRunner(new QueryElasticsearchHttp()); + + runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); + runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); + runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}"); + runner.setProperty(QueryElasticsearchHttp.FIELDS, "id,, userinfo.location"); + + runner.setProperty(QueryElasticsearchHttp.PROXY_HOST, "localhost"); + runner.setProperty(QueryElasticsearchHttp.PROXY_PORT, "3228"); + runner.setProperty(QueryElasticsearchHttp.ES_URL, "http://172.18.0.2:9200"); + + runner.enqueue("".getBytes(), new HashMap() {{ + put("doc_id", "28039652140"); + }}); + + runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_SUCCESS, 1); + } + + @Test + @Ignore("Authenticated Proxy : Comment this out if you want to run against local proxied ES.") + public void testQueryElasticsearchBasicBehindAuthenticatedProxy() { + System.out.println("Starting test " + new Object() { + }.getClass().getEnclosingMethod().getName()); + final TestRunner runner = TestRunners.newTestRunner(new QueryElasticsearchHttp()); + runner.setValidateExpressionUsage(true); + + runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); + runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); + runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}"); + runner.setProperty(QueryElasticsearchHttp.FIELDS, "id,, userinfo.location"); + + runner.setProperty(QueryElasticsearchHttp.PROXY_HOST, "localhost"); + runner.setProperty(QueryElasticsearchHttp.PROXY_PORT, "3328"); + runner.setProperty(QueryElasticsearchHttp.PROXY_USERNAME, "squid"); + runner.setProperty(QueryElasticsearchHttp.PROXY_PASSWORD, "changeme"); + runner.setProperty(QueryElasticsearchHttp.ES_URL, "http://172.18.0.2:9200"); + + runner.enqueue("".getBytes(), new HashMap() {{ + put("doc_id", "28039652140"); + }}); + + runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_SUCCESS, 1); + } + @Test public void testQueryElasticsearchOnTrigger_withQueryParameters() throws IOException { QueryElasticsearchHttpTestProcessor p = new QueryElasticsearchHttpTestProcessor();