diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/README.md b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/README.md index 023544d4e9ae..82d7b194c91f 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/README.md +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/README.md @@ -17,60 +17,32 @@ ## Integration Tests -The `nifi-elasticsearch-client-service` component build allows for optional Integration Tests to be executed to verify -additional functionality. +### Overview -The Integration Tests create an in-memory instance of Elasticsearch, populate it with known data, perform operations -upon the instance and verify the results. +The integration tests use [Testcontainers](https://www.testcontainers.org/) to provide a sane default for developers who have installed Docker. Testcontainers support can be disabled by setting the system property `elasticsearch.testcontainers.enabled` to something other than `true`. If Testcontainers are disabled, the endpoint will need to be configured. It can be set manually with the system property `elasticsearch.endpoint`. The default value is `http://localhost:9200`. -These can be activated by running the following build commands: +### Maven Profiles -### Elasticsearch 5 +* `integration-tests` +* `elasticsearch6` +* `elasticsearch7` -Test integration with Elasticsearch 5.x: +### Configurable System Properties -```bash -mvn -P integration-tests,elasticsearch-oss clean verify -``` +* `elasticsearch.endpoint` - Manually configure the endpoint root for a non-Docker version of Elasticsearch, +* `elasticsearch.testcontainers.enabled` - Set to anything other than `true` to disable Testcontainers and use a non-Docker version of Elasticsearch. +* `elasticsearch.elastic_user.password` - Set the Elasticsearch `elastic` user's password. When Testcontainers are enabled, this sets up the Docker container and the rest clients for accessing it within the tests. When Testcontainers are disabled, it needs to be set to whatever password is used on the external Elasticsearch node or cluster. -### Elasticsearch 6 +### Maven Run Examples -Test integration with Elasticsearch 6.x: +Elasticsearch 8.X is the current default version of Elasticsearch when Testcontainers are used. An example run of the integration tests with Elasticsearch 7 support would be like this: -```bash -mvn -P integration-tests,elasticsearch-oss,elasticsearch-6 clean verify -``` +`mvn clean install -Pintegration-tests,elasticsearch7` -### Elasticsearch 7 +An example using a non-Docker version of Elasticsearch: -[elasticsearch-oss](https://www.elastic.co/downloads/past-releases#elasticsearch-oss) was discontinued after `7.10.2`, -so the use of `elasticsearch-oss` is unnecessary for newer versions. +`mvn clean install -Pintegration-tests -Delasticsearch.testcontainers.enabled=false -Delasticsearch.elastic_user.password=s3cret1234` -For 7.x, we have two separate profiles: +### Misc -1. `elasticsearch-7` that can be used with `oss` (no X-Pack) and `default` (with X-Pack) flavours -2. `elasticsearch-7-no-oss` that can only be used with the `default` flavour (using a newer version of [elasticsearch](https://www.elastic.co/downloads/past-releases#elasticsearch)) - -#### With X-Pack - -Allows for testing of some X-Pack only features such as "Point in Time" querying: - -```bash -mvn -P integration-tests,elasticsearch-default,elasticsearch-7 clean verify -sleep 2 -mvn -P integration-tests,elasticsearch-default,elasticsearch-7-no-oss clean verify -``` - -#### Without X-Pack - -```bash -mvn -P integration-tests,elasticsearch-oss,elasticsearch-7 clean verify -``` - -### Elasticsearch 8 - -Test integration with Elasticsearch 8.x (with X-Pack): - -```bash -mvn -P integration-tests,elasticsearch-default,elasticsearch-8 clean verify -``` +The Testcontainers support currently only supports the x64 release of Dockerized Elasticsearch. ARM64 support may be added later. \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml index a5afdcdf0af2..eff494cb66fd 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml @@ -25,20 +25,6 @@ nifi-elasticsearch-client-service jar - - - 5.6.16 - 6.19 - setup-5.script - faketype - src/test/resources/conf-5/ - testCluster - 9500 - 9400 - 90 - ERROR - - org.apache.nifi @@ -196,6 +182,17 @@ ${nifi.groovy.version} test + + + org.testcontainers + elasticsearch + ${testcontainers.version} + + + org.testcontainers + junit-jupiter + ${testcontainers.version} + @@ -204,181 +201,40 @@ false + + 8.4.3 + s3cret + org.apache.maven.plugins maven-failsafe-plugin + + + http://localhost:9200 + true + docker.elastic.co/elasticsearch/elasticsearch:${elasticsearch_docker_image} + ${elasticsearch.elastic.password} + + - - - elasticsearch-6 - - false - + elasticsearch6 - 6.8.23 - _doc - setup-6.script - + 6.8.23 - - elasticsearch-7 - - false - + elasticsearch7 - 7.10.2 - setup-7.script - - + 7.17.6 - - - elasticsearch-7-no-oss - - false - - - 7.17.3 - setup-7.script - - - - - - - elasticsearch-8 - - false - - - 8.2.0 - - 6.22 - setup-8.script - - - - - - - - elasticsearch-oss - - false - - - - - org.apache.maven.plugins - maven-failsafe-plugin - - - ${es.int.type.name} - ${es.int.version} - oss - - - - - com.github.alexcojocaru - elasticsearch-maven-plugin - ${alexcojocaru.plugin.version} - - ${es.int.clusterName} - ${es.int.transportPort} - ${es.int.httpPort} - ${es.int.version} - ${es.int.timeout} - ${es.int.logLevel} - ${project.basedir}/src/test/resources/${es.int.script.name} - false - ${es.int.path.conf} - - - - start-elasticsearch - pre-integration-test - - runforked - - - - stop-elasticsearch - post-integration-test - - stop - - - - - - - - - - - elasticsearch-default - - false - - - - - org.apache.maven.plugins - maven-failsafe-plugin - - - ${es.int.type.name} - ${es.int.version} - default - - - - - com.github.alexcojocaru - elasticsearch-maven-plugin - ${alexcojocaru.plugin.version} - - default - ${es.int.clusterName} - ${es.int.transportPort} - ${es.int.httpPort} - ${es.int.version} - ${es.int.timeout} - ${es.int.logLevel} - ${project.basedir}/src/test/resources/${es.int.script.name} - false - - - - start-elasticsearch - pre-integration-test - - runforked - - - - stop-elasticsearch - post-integration-test - - stop - - - - - - - diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/SearchResponseTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/SearchResponseTest.groovy deleted file mode 100644 index 082a67f2941c..000000000000 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/SearchResponseTest.groovy +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.elasticsearch - -import org.junit.jupiter.api.Test -import static org.junit.jupiter.api.Assertions.assertEquals -import static org.junit.jupiter.api.Assertions.assertTrue - -class SearchResponseTest { - @Test - void test() { - def results = [] - def aggs = [:] - def pitId = "pitId" - def scrollId = "scrollId" - def searchAfter = "searchAfter" - def num = 10 - def took = 100 - def timeout = false - def warnings = ["auth"] - def response = new SearchResponse(results, aggs as Map, pitId, scrollId, searchAfter, num, took, timeout, warnings) - def str = response.toString() - - assertEquals(results, response.hits) - assertEquals(aggs, response.aggregations) - assertEquals(pitId, response.pitId) - assertEquals(scrollId, response.scrollId) - assertEquals(num, response.numberOfHits) - assertEquals(took, response.took) - assertEquals(timeout, response.timedOut) - assertEquals(warnings, response.warnings) - assertTrue(str.contains("aggregations")) - assertTrue(str.contains("hits")) - assertTrue(str.contains("pitId")) - assertTrue(str.contains("scrollId")) - assertTrue(str.contains("searchAfter")) - assertTrue(str.contains("numberOfHits")) - assertTrue(str.contains("took")) - assertTrue(str.contains("timedOut")) - assertTrue(str.contains("warnings")) - } -} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.groovy deleted file mode 100644 index 0997542af2d6..000000000000 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.groovy +++ /dev/null @@ -1,695 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License") you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.elasticsearch.integration - -import groovy.json.JsonSlurper -import org.apache.maven.artifact.versioning.ComparableVersion -import org.apache.nifi.elasticsearch.DeleteOperationResponse -import org.apache.nifi.elasticsearch.ElasticSearchClientService -import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl -import org.apache.nifi.elasticsearch.ElasticsearchException -import org.apache.nifi.elasticsearch.IndexOperationRequest -import org.apache.nifi.elasticsearch.IndexOperationResponse -import org.apache.nifi.elasticsearch.SearchResponse -import org.apache.nifi.elasticsearch.UpdateOperationResponse -import org.apache.nifi.security.util.StandardTlsConfiguration -import org.apache.nifi.security.util.TemporaryKeyStoreBuilder -import org.apache.nifi.security.util.TlsConfiguration -import org.apache.nifi.util.StringUtils -import org.apache.nifi.util.TestRunner -import org.apache.nifi.util.TestRunners -import org.junit.jupiter.api.AfterEach -import org.junit.jupiter.api.BeforeAll -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test - -import static groovy.json.JsonOutput.prettyPrint -import static groovy.json.JsonOutput.toJson -import static org.junit.jupiter.api.Assertions.assertNotEquals -import static org.junit.jupiter.api.Assertions.assertThrows -import static org.junit.jupiter.api.Assumptions.assumeTrue - -import static org.junit.jupiter.api.Assertions.assertEquals -import static org.junit.jupiter.api.Assertions.assertFalse -import static org.junit.jupiter.api.Assertions.assertNotNull -import static org.junit.jupiter.api.Assertions.assertNull -import static org.junit.jupiter.api.Assertions.assertTrue - -class ElasticSearchClientService_IT { - private TestRunner runner - private ElasticSearchClientServiceImpl service - - static final String INDEX = "messages" - static final String TYPE = StringUtils.isBlank(System.getProperty("type_name")) ? null : System.getProperty("type_name") - - static final ComparableVersion VERSION = new ComparableVersion(System.getProperty("es_version", "0.0.0")) - static final ComparableVersion ES_7_10 = new ComparableVersion("7.10") - static final ComparableVersion ES_8_0 = new ComparableVersion("8.0") - - static final String FLAVOUR = System.getProperty("es_flavour") - static final String DEFAULT = "default" - - private static TlsConfiguration generatedTlsConfiguration - private static TlsConfiguration truststoreTlsConfiguration - - static boolean isElasticsearchSetup() { - boolean setup = true - if (StringUtils.isBlank(System.getProperty("es_version"))) { - System.err.println("Cannot run Elasticsearch integration-tests: Elasticsearch version (5, 6, 7) not specified") - setup = false - } - - if (StringUtils.isBlank(System.getProperty("es_flavour"))) { - System.err.println("Cannot run Elasticsearch integration-tests: Elasticsearch flavour (oss, default) not specified") - setup = false - } - - return setup - } - - @BeforeAll - static void beforeAll() throws Exception { - assumeTrue(isElasticsearchSetup(), "Elasticsearch integration-tests not setup") - - System.out.println( - String.format("%n%n%n%n%n%n%n%n%n%n%n%n%n%n%nTYPE: %s%nVERSION: %s%nFLAVOUR %s%n%n%n%n%n%n%n%n%n%n%n%n%n%n%n", - TYPE, VERSION, FLAVOUR) - ) - - generatedTlsConfiguration = new TemporaryKeyStoreBuilder().build() - truststoreTlsConfiguration = new StandardTlsConfiguration( - null, - null, - null, - generatedTlsConfiguration.getTruststorePath(), - generatedTlsConfiguration.getTruststorePassword(), - generatedTlsConfiguration.getTruststoreType() - ) - } - - @BeforeEach - void before() throws Exception { - runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class) - service = new ElasticSearchClientServiceImpl() - runner.addControllerService("Client Service", service) - runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, "http://localhost:9400") - runner.setProperty(service, ElasticSearchClientService.CONNECT_TIMEOUT, "10000") - runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000") - runner.setProperty(service, ElasticSearchClientService.RETRY_TIMEOUT, "60000") - runner.setProperty(service, ElasticSearchClientService.SUPPRESS_NULLS, ElasticSearchClientService.ALWAYS_SUPPRESS.getValue()) - - try { - runner.enableControllerService(service) - } catch (Exception ex) { - ex.printStackTrace() - throw ex - } - - service.refresh(null, null) - } - - @AfterEach - void after() throws Exception { - service.onDisabled() - } - - @Test - void testBasicSearch() throws Exception { - String query = prettyPrint(toJson([ - size: 10, - query: [ - match_all: [:] - ], - aggs: [ - term_counts: [ - terms: [ - field: "msg", - size: 5 - ] - ] - ] - ])) - - - SearchResponse response = service.search(query, INDEX, TYPE, null) - assertNotNull(response, "Response was null") - - assertEquals(15, response.numberOfHits, "Wrong count") - assertFalse(response.isTimedOut(), "Timed out") - assertNotNull(response.getHits(), "Hits was null") - assertEquals(10, response.hits.size(), "Wrong number of hits") - assertNotNull(response.aggregations, "Aggregations are missing") - assertEquals(1, response.aggregations.size(), "Aggregation count is wrong") - assertNull(response.scrollId, "Unexpected ScrollId") - assertNull(response.searchAfter, "Unexpected Search_After") - assertNull(response.pitId, "Unexpected pitId") - - Map termCounts = response.aggregations.get("term_counts") as Map - assertNotNull(termCounts, "Term counts was missing") - def buckets = termCounts.get("buckets") - assertNotNull(buckets, "Buckets branch was empty") - def expected = [ - "one": 1, - "two": 2, - "three": 3, - "four": 4, - "five": 5 - ] - - buckets.each { aggRes -> - def key = aggRes["key"] - def docCount = aggRes["doc_count"] - assertEquals(expected[key as String], docCount, "${key} did not match.") - } - } - - @Test - void testBasicSearchRequestParameters() throws Exception { - String query = prettyPrint(toJson([ - size: 10, - query: [ - match_all: [:] - ], - aggs: [ - term_counts: [ - terms: [ - field: "msg", - size: 5 - ] - ] - ] - ])) - - - SearchResponse response = service.search(query, "messages", TYPE, [preference: "_local"]) - assertNotNull(response, "Response was null") - - assertEquals(15, response.numberOfHits, "Wrong count") - assertFalse(response.isTimedOut(), "Timed out") - assertNotNull(response.getHits(), "Hits was null") - assertEquals(10, response.hits.size(), "Wrong number of hits") - assertNotNull(response.aggregations, "Aggregations are missing") - assertEquals(1, response.aggregations.size(), "Aggregation count is wrong") - - Map termCounts = response.aggregations.get("term_counts") as Map - assertNotNull(termCounts, "Term counts was missing") - def buckets = termCounts.get("buckets") - assertNotNull(buckets, "Buckets branch was empty") - def expected = [ - "one": 1, - "two": 2, - "three": 3, - "four": 4, - "five": 5 - ] - - buckets.each { aggRes -> - String key = aggRes["key"] - def docCount = aggRes["doc_count"] - assertEquals(expected[key], docCount, "${key} did not match.") - } - } - - @Test - void testSearchWarnings() { - assumeTrue(VERSION < ES_8_0, "Requires version <8.0 (no search API deprecations yet for 8.x)") - - String query = null - String type = TYPE - if (VERSION.toString().startsWith("8.")) { - // TODO: something that's deprecated when the 8.x branch progresses to include search-API deprecations - } else if (VERSION.toString().startsWith("7.")) { - // querying with _type in ES 7.x is deprecated - query = prettyPrint(toJson([size: 1, query: [match_all: [:]]])) - type = "a-type" - } else if (VERSION.toString().startsWith("6.")) { - // "query_string" query option "all_fields" in ES 6.x is deprecated - query = prettyPrint(toJson([size: 1, query: [query_string: [query: 1, all_fields: true]]])) - } else { - // "mlt" query in ES 5.x is deprecated - query = prettyPrint(toJson([size: 1, query: [mlt: [fields: ["msg"], like: 1]]])) - } - final SearchResponse response = service.search(query, INDEX, type, null) - assertTrue(!response.warnings.isEmpty(), "Missing warnings") - } - - @Test - void testScroll() { - final String query = prettyPrint(toJson([ - size: 2, - query: [ match_all: [:] ], - aggs: [ term_counts: [ terms: [ field: "msg", size: 5 ] ] ] - ])) - - // initiate the scroll - final SearchResponse response = service.search(query, INDEX, TYPE, Collections.singletonMap("scroll", "10s")) - assertNotNull(response, "Response was null") - - assertEquals(15, response.numberOfHits, "Wrong count") - assertFalse(response.isTimedOut(), "Timed out") - assertNotNull(response.getHits(), "Hits was null") - assertEquals(2, response.hits.size(), "Wrong number of hits") - assertNotNull(response.aggregations, "Aggregations are missing") - assertEquals(1, response.aggregations.size(), "Aggregation count is wrong") - assertNotNull(response.scrollId, "ScrollId missing") - assertNull(response.searchAfter, "Unexpected Search_After") - assertNull(response.pitId, "Unexpected pitId") - - final Map termCounts = response.aggregations.get("term_counts") as Map - assertNotNull(termCounts, "Term counts was missing") - assertEquals(5, (termCounts.get("buckets") as List).size(), "Buckets count is wrong") - - // scroll the next page - final SearchResponse scrollResponse = service.scroll(prettyPrint((toJson([scroll_id: response.scrollId, scroll: "10s"])))) - assertNotNull(scrollResponse, "Scroll Response was null") - - assertEquals(15, scrollResponse.numberOfHits, "Wrong count") - assertFalse(scrollResponse.isTimedOut(), "Timed out") - assertNotNull(scrollResponse.getHits(), "Hits was null") - assertEquals(2, scrollResponse.hits.size(), "Wrong number of hits") - assertNotNull(scrollResponse.aggregations, "Aggregations missing") - assertEquals(0, scrollResponse.aggregations.size(), "Aggregation count is wrong") - assertNotNull(scrollResponse.scrollId, "ScrollId missing") - assertNull(scrollResponse.searchAfter, "Unexpected Search_After") - assertNull(scrollResponse.pitId, "Unexpected pitId") - - assertNotEquals(scrollResponse.hits, response.hits, () -> "Same results") - - // delete the scroll - DeleteOperationResponse deleteResponse = service.deleteScroll(scrollResponse.scrollId) - assertNotNull(deleteResponse, "Delete Response was null") - assertTrue(deleteResponse.took > 0) - - // delete scroll again (should now be unknown but the 404 caught and ignored) - deleteResponse = service.deleteScroll(scrollResponse.scrollId) - assertNotNull(deleteResponse, "Delete Response was null") - assertEquals(0L, deleteResponse.took) - } - - @Test - void testSearchAfter() { - final Map queryMap = [ - size: 2, - query: [ match_all: [:] ], - aggs: [ term_counts: [ terms: [ field: "msg", size: 5 ] ] ], - sort: [[ msg: "desc" ]] - ] - final String query = prettyPrint(toJson(queryMap)) - - // search first page - final SearchResponse response = service.search(query, INDEX, TYPE, null) - assertNotNull(response, "Response was null") - - assertEquals(15, response.numberOfHits, "Wrong count") - assertFalse(response.isTimedOut(), "Timed out") - assertNotNull(response.getHits(), "Hits was null") - assertEquals(2, response.hits.size(), "Wrong number of hits") - assertNotNull(response.aggregations, "Aggregations missing") - assertEquals(1, response.aggregations.size(), "Aggregation count is wrong") - assertNull(response.scrollId, "Unexpected ScrollId") - assertNotNull(response.searchAfter, "Search_After missing") - assertNull(response.pitId, "Unexpected pitId") - - final Map termCounts = response.aggregations.get("term_counts") as Map - assertNotNull(termCounts, "Term counts was missing") - assertEquals(5, (termCounts.get("buckets") as List).size(), "Buckets count is wrong") - - // search the next page - queryMap.search_after = new JsonSlurper().parseText(response.searchAfter) as Serializable - queryMap.remove("aggs") - final String secondPage = prettyPrint(toJson(queryMap)) - final SearchResponse secondResponse = service.search(secondPage, INDEX, TYPE, null) - assertNotNull(secondResponse, "Second Response was null") - - assertEquals(15, secondResponse.numberOfHits, "Wrong count") - assertFalse(secondResponse.isTimedOut(), "Timed out") - assertNotNull(secondResponse.getHits(), "Hits was null") - assertEquals(2, secondResponse.hits.size(), "Wrong number of hits") - assertNotNull(secondResponse.aggregations, "Aggregations missing") - assertEquals(0, secondResponse.aggregations.size(), "Aggregation count is wrong") - assertNull(secondResponse.scrollId, "Unexpected ScrollId") - assertNotNull(secondResponse.searchAfter, "Unexpected Search_After") - assertNull(secondResponse.pitId, "Unexpected pitId") - - assertNotEquals(secondResponse.hits, response.hits, "Same results") - } - - @Test - void testPointInTime() { - // Point in Time only available in 7.10+ with XPack enabled - assumeTrue(VERSION >= ES_7_10, "Requires version 7.10+") - assumeTrue(FLAVOUR == DEFAULT, "Requires XPack features") - - // initialise - final String pitId = service.initialisePointInTime(INDEX, "10s") - - final Map queryMap = [ - size: 2, - query: [ match_all: [:] ], - aggs: [ term_counts: [ terms: [ field: "msg", size: 5 ] ] ], - sort: [[ msg: "desc" ]], - pit: [ id: pitId, keep_alive: "10s" ] - ] - final String query = prettyPrint(toJson(queryMap)) - - // search first page - final SearchResponse response = service.search(query, null, TYPE, null) - assertNotNull(response, "Response was null") - - assertEquals(15, response.numberOfHits, "Wrong count") - assertFalse(response.isTimedOut(), "Timed out") - assertNotNull(response.getHits(), "Hits was null") - assertEquals(2, response.hits.size(), "Wrong number of hits") - assertNotNull(response.aggregations, "Aggregations missing") - assertEquals(1, response.aggregations.size(), "Aggregation count is wrong") - assertNull(response.scrollId, "Unexpected ScrollId") - assertNotNull(response.searchAfter, "Unexpected Search_After") - assertNotNull(response.pitId, "pitId missing") - - final Map termCounts = response.aggregations.get("term_counts") as Map - assertNotNull(termCounts, "Term counts was missing") - assertEquals(5, (termCounts.get("buckets") as List).size(), "Buckets count is wrong") - - // search the next page - queryMap.search_after = new JsonSlurper().parseText(response.searchAfter) as Serializable - queryMap.remove("aggs") - final String secondPage = prettyPrint(toJson(queryMap)) - final SearchResponse secondResponse = service.search(secondPage, null, TYPE, null) - assertNotNull(secondResponse, "Second Response was null") - - assertEquals(15, secondResponse.numberOfHits, "Wrong count") - assertFalse(secondResponse.isTimedOut(), "Timed out") - assertNotNull(secondResponse.getHits(), "Hits was null") - assertEquals(2, secondResponse.hits.size(), "Wrong number of hits") - assertNotNull(secondResponse.aggregations, "Aggregations missing") - assertEquals(0, secondResponse.aggregations.size(), "Aggregation count is wrong") - assertNull(secondResponse.scrollId, "Unexpected ScrollId") - assertNotNull(secondResponse.searchAfter, "Unexpected Search_After") - assertNotNull(secondResponse.pitId, "pitId missing") - - assertNotEquals(secondResponse.hits, response.hits, "Same results") - - // delete pitId - DeleteOperationResponse deleteResponse = service.deletePointInTime(pitId) - assertNotNull(deleteResponse, "Delete Response was null") - assertTrue(deleteResponse.took > 0) - - // delete pitId again (should now be unknown but the 404 caught and ignored) - deleteResponse = service.deletePointInTime(pitId) - assertNotNull(deleteResponse, "Delete Response was null") - assertEquals(0L, deleteResponse.took) - } - - @Test - void testDeleteByQuery() throws Exception { - String query = prettyPrint(toJson([ - query: [ - match: [ - msg: "five" - ] - ] - ])) - DeleteOperationResponse response = service.deleteByQuery(query, INDEX, TYPE, null) - assertNotNull(response) - assertTrue(response.getTook() > 0) - } - - @Test - void testDeleteByQueryRequestParameters() throws Exception { - String query = prettyPrint(toJson([ - query: [ - match: [ - msg: "six" - ] - ] - ])) - DeleteOperationResponse response = service.deleteByQuery(query, INDEX, TYPE, [refresh: "true"]) - assertNotNull(response) - assertTrue(response.getTook() > 0) - } - - @Test - void testUpdateByQuery() throws Exception { - String query = prettyPrint(toJson([ - query: [ - match: [ - msg: "four" - ] - ] - ])) - UpdateOperationResponse response = service.updateByQuery(query, INDEX, TYPE, null) - assertNotNull(response) - assertTrue(response.getTook() > 0) - } - - @Test - void testUpdateByQueryRequestParameters() throws Exception { - String query = prettyPrint(toJson([ - query: [ - match: [ - msg: "four" - ] - ] - ])) - UpdateOperationResponse response = service.updateByQuery(query, INDEX, TYPE, [refresh: "true", slices: "1"]) - assertNotNull(response) - assertTrue(response.getTook() > 0) - } - - @Test - void testDeleteById() throws Exception { - final String ID = "1" - final def originalDoc = service.get(INDEX, TYPE, ID, null) - try { - DeleteOperationResponse response = service.deleteById(INDEX, TYPE, ID, null) - assertNotNull(response) - assertTrue(response.getTook() > 0) - final ElasticsearchException ee = assertThrows(ElasticsearchException.class, { -> - service.get(INDEX, TYPE, ID, null) }) - assertTrue(ee.isNotFound()) - final def doc = service.get(INDEX, TYPE, "2", null) - assertNotNull(doc) - } finally { - // replace the deleted doc - service.add(new IndexOperationRequest(INDEX, TYPE, "1", originalDoc, IndexOperationRequest.Operation.Index), null) - waitForIndexRefresh() // (affects later tests using _search or _bulk) - } - } - - @Test - void testGet() { - Map old - 1.upto(15) { index -> - String id = String.valueOf(index) - def doc = service.get(INDEX, TYPE, id, null) - assertNotNull(doc, "Doc was null") - assertNotNull(doc.get("msg"), "${doc.toString()}\t${doc.keySet().toString()}") - old = doc - } - } - - @Test - void testGetNotFound() { - final ElasticsearchException ee = assertThrows(ElasticsearchException.class, { -> service.get(INDEX, TYPE, "not_found", null) }) - assertTrue(ee.isNotFound()) - } - - @Test - void testNullSuppression() { - Map doc = new HashMap(){{ - put("msg", "test") - put("is_null", null) - put("is_empty", "") - put("is_blank", " ") - put("empty_nested", Collections.emptyMap()) - put("empty_array", Collections.emptyList()) - }} - - // index with nulls - suppressNulls(false) - IndexOperationResponse response = service.bulk([new IndexOperationRequest("nulls", TYPE, "1", doc, IndexOperationRequest.Operation.Index)], null) - assertNotNull(response) - assertTrue(response.getTook() > 0) - waitForIndexRefresh() - - Map result = service.get("nulls", TYPE, "1", null) - assertEquals(doc, result) - - // suppress nulls - suppressNulls(true) - response = service.bulk([new IndexOperationRequest("nulls", TYPE, "2", doc, IndexOperationRequest.Operation.Index)], null) - assertNotNull(response) - assertTrue(response.getTook() > 0) - waitForIndexRefresh() - - result = service.get("nulls", TYPE, "2", null) - assertTrue(result.keySet().containsAll(["msg", "is_blank"]), "Non-nulls (present): " + result.toString()) - assertFalse(result.keySet().contains("is_null"), "is_null (should be omitted): " + result.toString()) - assertFalse(result.keySet().contains("is_empty"), "is_empty (should be omitted): " + result.toString()) - assertFalse(result.keySet().contains("empty_nested"), "empty_nested (should be omitted): " + result.toString()) - assertFalse(result.keySet().contains("empty_array"), "empty_array (should be omitted): " + result.toString()) - } - - private void suppressNulls(final boolean suppressNulls) { - runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, "Client Service") - runner.disableControllerService(service) - runner.setProperty(service, ElasticSearchClientService.SUPPRESS_NULLS, suppressNulls ? ElasticSearchClientService.ALWAYS_SUPPRESS.getValue() : ElasticSearchClientService.NEVER_SUPPRESS.getValue()) - runner.enableControllerService(service) - runner.assertValid() - } - - @Test - void testBulkAddTwoIndexes() throws Exception { - List payload = new ArrayList<>() - for (int x = 0; x < 20; x++) { - String index = x % 2 == 0 ? "bulk_a": "bulk_b" - payload.add(new IndexOperationRequest(index, TYPE, String.valueOf(x), new HashMap(){{ - put("msg", "test") - }}, IndexOperationRequest.Operation.Index)) - } - for (int x = 0; x < 5; x++) { - payload.add(new IndexOperationRequest("bulk_c", TYPE, String.valueOf(x), new HashMap(){{ - put("msg", "test") - }}, IndexOperationRequest.Operation.Index)) - } - IndexOperationResponse response = service.bulk(payload, [refresh: "true"]) - assertNotNull(response) - assertTrue(response.getTook() > 0) - waitForIndexRefresh() - - /* - * Now, check to ensure that both indexes got populated appropriately. - */ - String query = "{ \"query\": { \"match_all\": {}}}" - Long indexA = service.count(query, "bulk_a", TYPE, null) - Long indexB = service.count(query, "bulk_b", TYPE, null) - Long indexC = service.count(query, "bulk_c", TYPE, null) - - assertNotNull(indexA) - assertNotNull(indexB) - assertNotNull(indexC) - assertEquals(indexA, indexB) - assertEquals(10, indexA.intValue()) - assertEquals(10, indexB.intValue()) - assertEquals(5, indexC.intValue()) - - Long total = service.count(query, "bulk_*", TYPE, null) - assertNotNull(total) - assertEquals(25, total.intValue()) - } - - @Test - void testBulkRequestParameters() throws Exception { - List payload = new ArrayList<>() - for (int x = 0; x < 20; x++) { - String index = x % 2 == 0 ? "bulk_a": "bulk_b" - payload.add(new IndexOperationRequest(index, TYPE, String.valueOf(x), new HashMap(){{ - put("msg", "test") - }}, IndexOperationRequest.Operation.Index)) - } - for (int x = 0; x < 5; x++) { - payload.add(new IndexOperationRequest("bulk_c", TYPE, String.valueOf(x), new HashMap(){{ - put("msg", "test") - }}, IndexOperationRequest.Operation.Index)) - } - IndexOperationResponse response = service.bulk(payload, [refresh: "true"]) - assertNotNull(response) - assertTrue(response.getTook() > 0) - - /* - * Now, check to ensure that both indexes got populated and refreshed appropriately. - */ - String query = "{ \"query\": { \"match_all\": {}}}" - Long indexA = service.count(query, "bulk_a", TYPE, null) - Long indexB = service.count(query, "bulk_b", TYPE, null) - Long indexC = service.count(query, "bulk_c", TYPE, null) - - assertNotNull(indexA) - assertNotNull(indexB) - assertNotNull(indexC) - assertEquals(indexA, indexB) - assertEquals(10, indexA.intValue()) - assertEquals(10, indexB.intValue()) - assertEquals(5, indexC.intValue()) - - Long total = service.count(query, "bulk_*", TYPE, null) - assertNotNull(total) - assertEquals(25, total.intValue()) - } - - @Test - void testUpdateAndUpsert() { - final String TEST_ID = "update-test" - Map doc = new HashMap<>() - doc.put("msg", "Buongiorno, mondo") - service.add(new IndexOperationRequest(INDEX, TYPE, TEST_ID, doc, IndexOperationRequest.Operation.Index), [refresh: "true"]) - Map result = service.get(INDEX, TYPE, TEST_ID, null) - assertEquals(doc, result, "Not the same") - - Map updates = new HashMap<>() - updates.put("from", "john.smith") - Map merged = new HashMap<>() - merged.putAll(updates) - merged.putAll(doc) - IndexOperationRequest request = new IndexOperationRequest(INDEX, TYPE, TEST_ID, updates, IndexOperationRequest.Operation.Update) - service.add(request, [refresh: "true"]) - result = service.get(INDEX, TYPE, TEST_ID, null) - assertTrue(result.containsKey("from")) - assertTrue(result.containsKey("msg")) - assertEquals(merged, result, "Not the same after update.") - - final String UPSERTED_ID = "upsert-ftw" - Map upsertItems = new HashMap<>() - upsertItems.put("upsert_1", "hello") - upsertItems.put("upsert_2", 1) - upsertItems.put("upsert_3", true) - request = new IndexOperationRequest(INDEX, TYPE, UPSERTED_ID, upsertItems, IndexOperationRequest.Operation.Upsert) - service.add(request, [refresh: "true"]) - result = service.get(INDEX, TYPE, UPSERTED_ID, null) - assertEquals(upsertItems, result) - - List deletes = new ArrayList<>() - deletes.add(new IndexOperationRequest(INDEX, TYPE, TEST_ID, null, IndexOperationRequest.Operation.Delete)) - deletes.add(new IndexOperationRequest(INDEX, TYPE, UPSERTED_ID, null, IndexOperationRequest.Operation.Delete)) - assertFalse(service.bulk(deletes, [refresh: "true"]).hasErrors()) - waitForIndexRefresh() // wait 1s for index refresh (doesn't prevent GET but affects later tests using _search or _bulk) - ElasticsearchException ee = assertThrows(ElasticsearchException.class, { -> service.get(INDEX, TYPE, TEST_ID, null) }) - assertTrue(ee.isNotFound()) - ee = assertThrows(ElasticsearchException.class, { -> service.get(INDEX, TYPE, UPSERTED_ID, null) }) - assertTrue(ee.isNotFound()) - } - - @Test - void testGetBulkResponsesWithErrors() { - def ops = [ - new IndexOperationRequest(INDEX, TYPE, "1", [ "msg": "one", intField: 1], IndexOperationRequest.Operation.Index), // OK - new IndexOperationRequest(INDEX, TYPE, "2", [ "msg": "two", intField: 1], IndexOperationRequest.Operation.Create), // already exists - new IndexOperationRequest(INDEX, TYPE, "1", [ "msg": "one", intField: "notaninteger"], IndexOperationRequest.Operation.Index) // can't parse int field - ] - def response = service.bulk(ops, [refresh: "true"]) - assert response.hasErrors() - assert response.items.findAll { - def key = it.keySet().stream().findFirst().get() - (it[key] as Map).containsKey("error") - }.size() == 2 - } - - private static void waitForIndexRefresh() { - Thread.sleep(1000) - } -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupServiceTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupServiceTest.groovy deleted file mode 100644 index de3e6ccfa8e8..000000000000 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupServiceTest.groovy +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License") you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.elasticsearch.integration - -import org.apache.nifi.elasticsearch.ElasticSearchClientService -import org.apache.nifi.elasticsearch.ElasticSearchLookupService -import org.apache.nifi.schema.access.SchemaAccessUtils -import org.apache.nifi.serialization.record.MapRecord -import org.apache.nifi.util.TestRunner -import org.apache.nifi.util.TestRunners -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test - -import static org.junit.jupiter.api.Assertions.assertEquals -import static org.junit.jupiter.api.Assertions.assertNotNull -import static org.junit.jupiter.api.Assertions.assertTrue - -class ElasticSearchLookupServiceTest { - ElasticSearchClientService mockClientService - ElasticSearchLookupService lookupService - TestRunner runner - - @BeforeEach - void setup() throws Exception { - mockClientService = new TestElasticSearchClientService() - lookupService = new ElasticSearchLookupService() - def registry = new TestSchemaRegistry() - runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class) - runner.addControllerService("clientService", mockClientService) - runner.addControllerService("lookupService", lookupService) - runner.addControllerService("registry", registry) - runner.enableControllerService(mockClientService) - runner.enableControllerService(registry) - runner.setProperty(lookupService, ElasticSearchLookupService.CLIENT_SERVICE, "clientService") - runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "users") - runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, "clientService") - runner.setProperty(TestControllerServiceProcessor.LOOKUP_SERVICE, "lookupService") - runner.setProperty(lookupService, SchemaAccessUtils.SCHEMA_REGISTRY, "registry") - runner.setProperty(lookupService, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INFER_SCHEMA) - runner.enableControllerService(lookupService) - } - - @Test - void simpleLookupTest() throws Exception { - def coordinates = ["_id": "12345" ] - - Optional result = lookupService.lookup(coordinates) as Optional - - assertNotNull(result) - assertTrue(result.isPresent()) - MapRecord record = result.get() - assertEquals("john.smith", record.getAsString("username")) - assertEquals("testing1234", record.getAsString("password")) - assertEquals("john.smith@test.com", record.getAsString("email")) - assertEquals("Software Engineer", record.getAsString("position")) - } -} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.groovy deleted file mode 100644 index c2ea2e257a46..000000000000 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.groovy +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License") you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.elasticsearch.integration - -import org.apache.nifi.elasticsearch.ElasticSearchClientService -import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl -import org.apache.nifi.elasticsearch.ElasticSearchLookupService -import org.apache.nifi.lookup.LookupFailureException -import org.apache.nifi.record.path.RecordPath -import org.apache.nifi.schema.access.SchemaAccessUtils -import org.apache.nifi.schemaregistry.services.SchemaRegistry -import org.apache.nifi.serialization.record.MapRecord -import org.apache.nifi.serialization.record.Record -import org.apache.nifi.serialization.record.RecordSchema -import org.apache.nifi.serialization.record.type.RecordDataType -import org.apache.nifi.util.TestRunner -import org.apache.nifi.util.TestRunners -import org.junit.jupiter.api.BeforeAll -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test - -import static org.junit.jupiter.api.Assertions.assertEquals -import static org.junit.jupiter.api.Assertions.assertNotNull -import static org.junit.jupiter.api.Assertions.assertTrue -import static org.junit.jupiter.api.Assumptions.assumeTrue - -class ElasticSearchLookupService_IT { - private TestRunner runner - private ElasticSearchClientService service - private ElasticSearchLookupService lookupService - - @BeforeAll - static void beforeAll() throws Exception { - assumeTrue(ElasticSearchClientService_IT.isElasticsearchSetup(), "Elasticsearch integration-tests not setup") - - System.out.println( - String.format("%n%n%n%n%n%n%n%n%n%n%n%n%n%n%nTYPE: %s%nVERSION: %s%nFLAVOUR %s%n%n%n%n%n%n%n%n%n%n%n%n%n%n%n", - ElasticSearchClientService_IT.TYPE, ElasticSearchClientService_IT.VERSION, ElasticSearchClientService_IT.FLAVOUR) - ) - } - - @BeforeEach - void before() throws Exception { - runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class) - service = new ElasticSearchClientServiceImpl() - lookupService = new ElasticSearchLookupService() - runner.addControllerService("Client Service", service) - runner.addControllerService("Lookup Service", lookupService) - runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, "http://localhost:9400") - runner.setProperty(service, ElasticSearchClientService.CONNECT_TIMEOUT, "10000") - runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000") - runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, "Client Service") - runner.setProperty(TestControllerServiceProcessor.LOOKUP_SERVICE, "Lookup Service") - runner.setProperty(lookupService, ElasticSearchLookupService.CLIENT_SERVICE, "Client Service") - runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "user_details") - setTypeOnLookupService() - - try { - runner.enableControllerService(service) - runner.enableControllerService(lookupService) - } catch (Exception ex) { - ex.printStackTrace() - throw ex - } - } - - void setTypeOnLookupService() { - if (ElasticSearchClientService_IT.TYPE != null) { - runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, ElasticSearchClientService_IT.TYPE) - } else { - runner.removeProperty(lookupService, ElasticSearchLookupService.TYPE) - } - } - - @Test - void testValidity() throws Exception { - setDefaultSchema() - runner.assertValid() - } - - private void setDefaultSchema() throws Exception { - runner.disableControllerService(lookupService) - SchemaRegistry registry = new TestSchemaRegistry() - runner.addControllerService("registry", registry) - runner.setProperty(lookupService, SchemaAccessUtils.SCHEMA_REGISTRY, "registry") - runner.enableControllerService(registry) - runner.enableControllerService(lookupService) - } - - @Test - void lookupById() { - def coordinates = [ _id: "2" ] - Optional result = lookupService.lookup(coordinates) - - assertNotNull(result) - assertTrue(result.isPresent()) - def record = result.get() - assertEquals("jane.doe@company.com", record.getAsString("email")) - assertEquals("098-765-4321", record.getAsString("phone")) - assertEquals("GHIJK", record.getAsString("accessKey")) - } - - @Test - void testInvalidIdScenarios() { - def coordinates = [ - [ - _id: 1 - ], - [ - _id: "1", "email": "john.smith@company.com" - ] - ] - - coordinates.each { coordinate -> - def exception = null - - try { - lookupService.lookup(coordinate) - } catch (Exception ex) { - exception = ex - } - - assertNotNull(exception) - assertTrue(exception instanceof LookupFailureException) - } - } - - @Test - void lookupByQuery() { - def coordinates = [ "phone": "098-765-4321", "email": "jane.doe@company.com" ] - Optional result = lookupService.lookup(coordinates) - - assertNotNull(result) - assertTrue(result.isPresent()) - def record = result.get() - assertEquals("jane.doe@company.com", record.getAsString("email")) - assertEquals("098-765-4321", record.getAsString("phone")) - assertEquals("GHIJK", record.getAsString("accessKey")) - } - - @Test - void testNestedSchema() { - def coordinates = [ - "subField.deeper.deepest.super_secret": "The sky is blue", - "subField.deeper.secretz": "Buongiorno, mondo!!", - "msg": "Hello, world" - ] - - runner.disableControllerService(lookupService) - runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "nested") - setTypeOnLookupService() - runner.enableControllerService(lookupService) - - Optional response = lookupService.lookup(coordinates) - assertNotNull(response) - assertTrue(response.isPresent()) - def rec = response.get() - assertEquals("Hello, world", rec.getValue("msg")) - def subRec = getSubRecord(rec, "subField") - assertNotNull(subRec) - def deeper = getSubRecord(subRec, "deeper") - assertNotNull(deeper) - def deepest = getSubRecord(deeper, "deepest") - assertNotNull(deepest) - assertEquals("The sky is blue", deepest.getAsString("super_secret")) - } - - @Test - void testDetectedSchema() throws LookupFailureException { - runner.disableControllerService(lookupService) - runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "complex") - setTypeOnLookupService() - runner.enableControllerService(lookupService) - def coordinates = ["_id": "1" ] - - Optional response = lookupService.lookup(coordinates) - assertNotNull(response) - Record rec = response.get() - Record subRec = getSubRecord(rec, "subField") - - def r2 = new MapRecord(rec.schema, [:]) - def path = RecordPath.compile("/subField/longField") - def result = path.evaluate(r2) - result.selectedFields.findFirst().get().updateValue(1234567890L) - - assertNotNull(rec) - assertNotNull(subRec) - assertEquals("Hello, world", rec.getValue("msg")) - assertNotNull(rec.getValue("subField")) - assertEquals(new Long(100000), subRec.getValue("longField")) - assertEquals("2018-04-10T12:18:05Z", subRec.getValue("dateField")) - } - - static Record getSubRecord(Record rec, String fieldName) { - RecordSchema schema = rec.schema - RecordSchema subSchema = ((RecordDataType)schema.getField(fieldName).get().dataType).childSchema - rec.getAsRecord(fieldName, subSchema) - } - - @Test - void testMappings() { - runner.disableControllerService(lookupService) - runner.setProperty(lookupService, "\$.subField.longField", "/longField2") - runner.setProperty(lookupService, '$.subField.dateField', '/dateField2') - runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "nested") - setTypeOnLookupService() - runner.enableControllerService(lookupService) - - def coordinates = ["msg": "Hello, world"] - def result = lookupService.lookup(coordinates) - assertTrue(result.isPresent()) - def rec = result.get() - ["dateField2": "2018-08-14T10:08:00Z", "longField2": 150000L].each { field -> - def value = rec.getValue(field.key) - assertEquals(field.value, value) - } - } -} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchStringLookupServiceTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchStringLookupServiceTest.groovy deleted file mode 100644 index 318d1b462f0a..000000000000 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchStringLookupServiceTest.groovy +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License") you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.elasticsearch.integration - -import org.apache.nifi.elasticsearch.ElasticSearchClientService -import org.apache.nifi.elasticsearch.ElasticSearchStringLookupService -import org.apache.nifi.util.TestRunner -import org.apache.nifi.util.TestRunners -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test - -import static org.junit.jupiter.api.Assertions.assertEquals -import static org.junit.jupiter.api.Assertions.assertNotNull -import static org.junit.jupiter.api.Assertions.assertTrue - -class ElasticSearchStringLookupServiceTest { - ElasticSearchClientService mockClientService - ElasticSearchStringLookupService lookupService - TestRunner runner - - @BeforeEach - void setup() throws Exception { - mockClientService = new TestElasticSearchClientService() - lookupService = new ElasticSearchStringLookupService() - runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class) - runner.addControllerService("clientService", mockClientService) - runner.addControllerService("lookupService", lookupService) - runner.enableControllerService(mockClientService) - runner.setProperty(lookupService, ElasticSearchStringLookupService.CLIENT_SERVICE, "clientService") - runner.setProperty(lookupService, ElasticSearchStringLookupService.INDEX, "users") - runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, "clientService") - runner.setProperty(TestControllerServiceProcessor.LOOKUP_SERVICE, "lookupService") - runner.enableControllerService(lookupService) - } - - @Test - void simpleLookupTest() throws Exception { - def coordinates = [ (ElasticSearchStringLookupService.ID): "12345" ] - - Optional result = lookupService.lookup(coordinates) - - assertNotNull(result) - assertTrue(result.isPresent()) - String json = result.get() - assertEquals('{"username":"john.smith","password":"testing1234","email":"john.smith@test.com","position":"Software Engineer"}', json) - } -} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy deleted file mode 100644 index 4e0f42546b8e..000000000000 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License") you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.elasticsearch.integration - -import org.apache.nifi.controller.AbstractControllerService -import org.apache.nifi.elasticsearch.DeleteOperationResponse -import org.apache.nifi.elasticsearch.ElasticSearchClientService -import org.apache.nifi.elasticsearch.IndexOperationRequest -import org.apache.nifi.elasticsearch.IndexOperationResponse -import org.apache.nifi.elasticsearch.SearchResponse -import org.apache.nifi.elasticsearch.UpdateOperationResponse - -class TestElasticSearchClientService extends AbstractControllerService implements ElasticSearchClientService { - Map data = [ - "username": "john.smith", - "password": "testing1234", - "email": "john.smith@test.com", - "position": "Software Engineer" - ] - - @Override - IndexOperationResponse add(IndexOperationRequest operation, Map requestParameters) { - return null - } - - @Override - IndexOperationResponse bulk(List operations, Map requestParameters) { - return null - } - - @Override - Long count(String query, String index, String type, Map requestParameters) { - return null - } - - @Override - DeleteOperationResponse deleteById(String index, String type, String id, Map requestParameters) { - return null - } - - @Override - DeleteOperationResponse deleteById(String index, String type, List ids, Map requestParameters) { - return null - } - - @Override - DeleteOperationResponse deleteByQuery(String query, String index, String type, Map requestParameters) { - return null - } - - @Override - UpdateOperationResponse updateByQuery(String query, String index, String type, Map requestParameters) { - return null - } - - @Override - void refresh(final String index, final Map requestParameters) { - } - - @Override - Map get(String index, String type, String id, Map requestParameters) { - return data - } - - @Override - SearchResponse search(String query, String index, String type, Map requestParameters) { - List hits = [[ - "_source": data - ]] - return new SearchResponse(hits, null, null, null, null, 1, 100, false, null) - } - - @Override - SearchResponse scroll(String scroll) { - return search(null, null, null, null) - } - - @Override - String initialisePointInTime(String index, String keepAlive) { - return null - } - - @Override - DeleteOperationResponse deletePointInTime(String pitId) { - return null - } - - @Override - DeleteOperationResponse deleteScroll(String scrollId) { - return null - } - - @Override - String getTransitUrl(String index, String type) { - return "" - } -} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestSchemaRegistry.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestSchemaRegistry.groovy deleted file mode 100644 index 2a0bd8eca5b3..000000000000 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestSchemaRegistry.groovy +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License") you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.elasticsearch.integration - -import org.apache.nifi.controller.AbstractControllerService -import org.apache.nifi.schema.access.SchemaField -import org.apache.nifi.schemaregistry.services.SchemaRegistry -import org.apache.nifi.serialization.SimpleRecordSchema -import org.apache.nifi.serialization.record.RecordField -import org.apache.nifi.serialization.record.RecordFieldType -import org.apache.nifi.serialization.record.RecordSchema -import org.apache.nifi.serialization.record.SchemaIdentifier - -class TestSchemaRegistry extends AbstractControllerService implements SchemaRegistry { - @Override - RecordSchema retrieveSchema(SchemaIdentifier schemaIdentifier) { - new SimpleRecordSchema([ - new RecordField("msg", RecordFieldType.STRING.dataType) - ]) - } - - @Override - Set getSuppliedSchemaFields() { - [ SchemaField.SCHEMA_NAME ] - } -} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/.gitignore b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/.gitignore deleted file mode 100644 index 00aca0c29684..000000000000 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/.gitignore +++ /dev/null @@ -1 +0,0 @@ -# This is a placeholder to force Maven to compile the groovy code. \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/SearchResponseTest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/SearchResponseTest.java new file mode 100644 index 000000000000..9f99ec0da874 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/SearchResponseTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.elasticsearch; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class SearchResponseTest { + @Test + void test() { + List> results = new ArrayList<>(); + Map aggs = new HashMap<>(); + String pitId = "pitId"; + String scrollId = "scrollId"; + String searchAfter = "searchAfter"; + int num = 10; + int took = 100; + boolean timeout = false; + List warnings = Arrays.asList("auth"); + SearchResponse response = new SearchResponse(results, aggs, pitId, scrollId, searchAfter, num, took, timeout, warnings); + String str = response.toString(); + + assertEquals(results, response.getHits()); + assertEquals(aggs, response.getAggregations()); + assertEquals(pitId, response.getPitId()); + assertEquals(scrollId, response.getScrollId()); + assertEquals(num, response.getNumberOfHits()); + assertEquals(took, response.getTook()); + assertEquals(timeout, response.isTimedOut()); + assertEquals(warnings, response.getWarnings()); + assertTrue(str.contains("aggregations")); + assertTrue(str.contains("hits")); + assertTrue(str.contains("pitId")); + assertTrue(str.contains("scrollId")); + assertTrue(str.contains("searchAfter")); + assertTrue(str.contains("numberOfHits")); + assertTrue(str.contains("took")); + assertTrue(str.contains("timedOut")); + assertTrue(str.contains("warnings")); + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestControllerServiceProcessor.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestControllerServiceProcessor.java similarity index 68% rename from nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestControllerServiceProcessor.groovy rename to nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestControllerServiceProcessor.java index 75e9dbd1e06c..d4f166fd93a1 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestControllerServiceProcessor.groovy +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestControllerServiceProcessor.java @@ -15,37 +15,38 @@ * limitations under the License. */ -package org.apache.nifi.elasticsearch.integration +package org.apache.nifi.elasticsearch; -import org.apache.nifi.components.PropertyDescriptor -import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl -import org.apache.nifi.elasticsearch.ElasticSearchLookupService -import org.apache.nifi.processor.AbstractProcessor -import org.apache.nifi.processor.ProcessContext -import org.apache.nifi.processor.ProcessSession -import org.apache.nifi.processor.exception.ProcessException +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; -class TestControllerServiceProcessor extends AbstractProcessor { +import java.util.Arrays; +import java.util.List; + +public class TestControllerServiceProcessor extends AbstractProcessor { public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder() .name("Client Service") .description("ElasticSearchClientServiceImpl") .identifiesControllerService(ElasticSearchClientServiceImpl.class) .required(true) - .build() + .build(); public static final PropertyDescriptor LOOKUP_SERVICE = new PropertyDescriptor.Builder() .name("Lookup Service") .description("ElasticSearchClientServiceImpl") .identifiesControllerService(ElasticSearchLookupService.class) .required(false) - .build() + .build(); @Override - void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { } @Override protected List getSupportedPropertyDescriptors() { - [ CLIENT_SERVICE, LOOKUP_SERVICE ] + return Arrays.asList(CLIENT_SERVICE, LOOKUP_SERVICE); } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestElasticSearchClientService.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestElasticSearchClientService.java new file mode 100644 index 000000000000..01dc128ca3d0 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestElasticSearchClientService.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License") you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.elasticsearch; + +import org.apache.nifi.controller.AbstractControllerService; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TestElasticSearchClientService extends AbstractControllerService implements ElasticSearchClientService { + private Map data; + + public TestElasticSearchClientService() { + data = new HashMap<>(); + data.put("username", "john.smith"); + data.put("password", "testing1234"); + data.put("email", "john.smith@test.com"); + data.put("position", "Software Engineer"); + } + + @Override + public IndexOperationResponse add(IndexOperationRequest operation, Map requestParameters) { + return null; + } + + @Override + public IndexOperationResponse bulk(List operations, Map requestParameters) { + return null; + } + + @Override + public Long count(String query, String index, String type, Map requestParameters) { + return null; + } + + @Override + public DeleteOperationResponse deleteById(String index, String type, String id, Map requestParameters) { + return null; + } + + @Override + public DeleteOperationResponse deleteById(String index, String type, List ids, Map requestParameters) { + return null; + } + + @Override + public DeleteOperationResponse deleteByQuery(String query, String index, String type, Map requestParameters) { + return null; + } + + @Override + public UpdateOperationResponse updateByQuery(String query, String index, String type, Map requestParameters) { + return null; + } + + @Override + public void refresh(final String index, final Map requestParameters) { + } + + @Override + public Map get(String index, String type, String id, Map requestParameters) { + return data; + } + + @Override + public SearchResponse search(String query, String index, String type, Map requestParameters) { + List> hits = new ArrayList<>(); + Map source = new HashMap<>(); + source.put("_source", data); + hits.add(source); + return new SearchResponse(hits, null, null, null, null, + 1, 100, false, null); + } + + @Override + public SearchResponse scroll(String scroll) { + return search(null, null, null, null); + } + + @Override + public String initialisePointInTime(String index, String keepAlive) { + return null; + } + + @Override + public DeleteOperationResponse deletePointInTime(String pitId) { + return null; + } + + @Override + public DeleteOperationResponse deleteScroll(String scrollId) { + return null; + } + + @Override + public String getTransitUrl(String index, String type) { + return ""; + } + + public Map getData() { + return data; + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestSchemaRegistry.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestSchemaRegistry.java new file mode 100644 index 000000000000..813ce1c5d2a6 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestSchemaRegistry.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License") you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.elasticsearch; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.schema.access.SchemaField; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.apache.nifi.schema.access.SchemaField.SCHEMA_NAME; + +public class TestSchemaRegistry extends AbstractControllerService implements SchemaRegistry { + @Override + public RecordSchema retrieveSchema(SchemaIdentifier schemaIdentifier) { + List fields = Arrays.asList(new RecordField("msg", RecordFieldType.STRING.getDataType())); + return new SimpleRecordSchema(fields); + } + + @Override + public Set getSuppliedSchemaFields() { + return new HashSet<>(Arrays.asList(SCHEMA_NAME)); + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java new file mode 100644 index 000000000000..476f9441773a --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License") you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.elasticsearch.integration; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.nio.entity.NStringEntity; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.client.RestClient; +import org.testcontainers.elasticsearch.ElasticsearchContainer; +import org.testcontainers.utility.DockerImageName; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.http.auth.AuthScope.ANY; + +public abstract class AbstractElasticsearch_IT { + protected static final DockerImageName IMAGE = DockerImageName + .parse(System.getProperty("elasticsearch.docker.image")); + protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = new ElasticsearchContainer(IMAGE) + .withPassword(System.getProperty("elasticsearch.elastic_user.password")) + .withEnv("xpack.security.enabled", "true"); + + protected static final String ELASTIC_USER_PASSWORD = System.getProperty("elasticsearch.elastic_user.password"); + protected static final boolean ENABLE_TEST_CONTAINERS = System.getProperty("elasticsearch.testcontainers.enabled") + != null && System.getProperty("elasticsearch.testcontainers.enabled").equalsIgnoreCase("true"); + protected static String ELASTIC_HOST; + + protected static final ObjectMapper MAPPER = new ObjectMapper(); + + public static void startTestcontainer() { + if (ENABLE_TEST_CONTAINERS) { + ELASTICSEARCH_CONTAINER.start(); + ELASTIC_HOST = String.format("http://%s", ELASTICSEARCH_CONTAINER.getHttpHostAddress()); + } else { + ELASTIC_HOST = System.getProperty("elasticsearch.endpoint"); + } + } + + public static void stopTestcontainer() { + if (ENABLE_TEST_CONTAINERS) { + ELASTICSEARCH_CONTAINER.stop(); + } + } + + private static String[] getElasticVersion() { + String fullVersion = IMAGE.getVersionPart(); + String[] parts = fullVersion.split("\\."); + if (parts.length == 1) { + throw new RuntimeException("The elasticsearch version should have at least a major and minor version ex. 7.17"); + } + + return parts; + } + + protected static int getElasticMajorVersion() { + return Integer.valueOf(getElasticVersion()[0]); + } + + protected static int getElasticMinorVersion() { + return Integer.valueOf(getElasticVersion()[1]); + } + + private static RestClient testDataManagementClient; + + protected static void setupTestData() throws IOException { + int majorVersion = getElasticMajorVersion(); + URL url = new URL(ELASTIC_HOST); + testDataManagementClient = RestClient + .builder(new HttpHost(url.getHost(), url.getPort(), url.getProtocol())) + .setHttpClientConfigCallback(httpClientBuilder -> { + UsernamePasswordCredentials credentials = new UsernamePasswordCredentials("elastic", ELASTIC_USER_PASSWORD); + BasicCredentialsProvider provider = new BasicCredentialsProvider(); + provider.setCredentials(ANY, credentials); + httpClientBuilder.setDefaultCredentialsProvider(provider); + return httpClientBuilder; + }) + .build(); + String script = String.format("src/test/resources/setup-%s.script", majorVersion); + + List actions = readSetupActions(script); + + for (SetupAction action : actions) { + String endpoint = String.format("%s/%s", ELASTIC_HOST, action.path); + Request request = new Request(action.verb, endpoint); + HttpEntity jsonBody = new NStringEntity(action.json, ContentType.APPLICATION_JSON); + request.setEntity(jsonBody); + + try { + testDataManagementClient.performRequest(request); + } catch (ResponseException re) { + throw new RuntimeException(re); + } + } + } + + protected static void tearDownTestData() throws IOException { + deleteIndex("user_details"); + deleteIndex("complex"); + deleteIndex("nested"); + deleteIndex("bulk_a"); + deleteIndex("bulk_b"); + deleteIndex("bulk_c"); + deleteIndex("error_handler"); + deleteIndex("messages"); + } + + private static void deleteIndex(String name) throws IOException { + Request request = new Request("DELETE", String.format("%s/%s", ELASTIC_HOST, name)); + testDataManagementClient.performRequest(request); + } + + private static List readSetupActions(String scriptPath) throws IOException { + List actions = new ArrayList<>(); + BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(scriptPath))); + String line = reader.readLine(); + while (line != null) { + if (!line.trim().isEmpty() && !line.trim().startsWith("#")) { + String verb = line.substring(0, line.indexOf(":")); + String path = line.substring(verb.length() + 1, line.indexOf(":", verb.length() + 1)); + int loc = verb.length() + path.length() + 2; + String json = line.substring(loc); + + actions.add(new SetupAction(verb, path, json)); + } + line = reader.readLine(); + } + + return actions; + } + + private static final class SetupAction { + private String verb; + private String path; + private String json; + + public SetupAction(String verb, String path, String json) { + this.verb = verb; + this.path = path; + this.json = json; + } + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java new file mode 100644 index 000000000000..b136c46c5aa9 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java @@ -0,0 +1,750 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License") you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.elasticsearch.integration; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.nifi.elasticsearch.DeleteOperationResponse; +import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl; +import org.apache.nifi.elasticsearch.ElasticsearchException; +import org.apache.nifi.elasticsearch.IndexOperationRequest; +import org.apache.nifi.elasticsearch.IndexOperationResponse; +import org.apache.nifi.elasticsearch.SearchResponse; +import org.apache.nifi.elasticsearch.TestControllerServiceProcessor; +import org.apache.nifi.elasticsearch.UpdateOperationResponse; +import org.apache.nifi.security.util.StandardTlsConfiguration; +import org.apache.nifi.security.util.TemporaryKeyStoreBuilder; +import org.apache.nifi.security.util.TlsConfiguration; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +public class ElasticSearchClientService_IT extends AbstractElasticsearch_IT { + private TestRunner runner; + private ElasticSearchClientServiceImpl service; + + static final String INDEX = "messages"; + public static String TYPE; + + private static TlsConfiguration generatedTlsConfiguration; + private static TlsConfiguration truststoreTlsConfiguration; + + @BeforeAll + static void beforeAll() throws IOException { + startTestcontainer(); + TYPE = getElasticMajorVersion() == 6 ? "_doc" : ""; + System.out.println( + String.format("%n%n%n%n%n%n%n%n%n%n%n%n%n%n%nTYPE: %s%n%s%s%n%n%n%n%n%n%n%n%n%n%n%n%n%n%n", + TYPE, IMAGE.getRepository(), IMAGE.getVersionPart()) + ); + + generatedTlsConfiguration = new TemporaryKeyStoreBuilder().build(); + truststoreTlsConfiguration = new StandardTlsConfiguration( + null, + null, + null, + generatedTlsConfiguration.getTruststorePath(), + generatedTlsConfiguration.getTruststorePassword(), + generatedTlsConfiguration.getTruststoreType() + ); + + setupTestData(); + } + + @AfterAll + public static void afterAll() throws IOException { + tearDownTestData(); + stopTestcontainer(); + } + + @BeforeEach + void before() throws Exception { + runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class); + service = new ElasticSearchClientServiceImpl(); + runner.addControllerService("Client Service", service); + runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, ELASTIC_HOST); + runner.setProperty(service, ElasticSearchClientService.CONNECT_TIMEOUT, "10000"); + runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000"); + runner.setProperty(service, ElasticSearchClientService.RETRY_TIMEOUT, "60000"); + runner.setProperty(service, ElasticSearchClientService.SUPPRESS_NULLS, ElasticSearchClientService.ALWAYS_SUPPRESS.getValue()); + runner.setProperty(service, ElasticSearchClientService.USERNAME, "elastic"); + runner.setProperty(service, ElasticSearchClientService.PASSWORD, ELASTIC_USER_PASSWORD); + + try { + runner.enableControllerService(service); + } catch (Exception ex) { + throw ex; + } + + service.refresh(null, null); + } + + @AfterEach + void after() throws Exception { + service.onDisabled(); + } + + private String prettyJson(Object o) throws JsonProcessingException { + return MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(o); + } + + private class MapBuilder { + private Map toBuild; + + public MapBuilder() { + toBuild = new HashMap<>(); + } + + public MapBuilder of(String key, Object value) { + toBuild.put(key, value); + return this; + } + + public MapBuilder of(String key, Object value, String key2, Object value2) { + toBuild.put(key, value); + toBuild.put(key2, value2); + return this; + } + + public MapBuilder of(String key, Object value, String key2, Object value2, String key3, Object value3) { + toBuild.put(key, value); + toBuild.put(key2, value2); + toBuild.put(key3, value3); + return this; + } + + public MapBuilder of(String key, Object value, String key2, Object value2, String key3, Object value3, + String key4, Object value4) { + toBuild.put(key, value); + toBuild.put(key2, value2); + toBuild.put(key3, value3); + toBuild.put(key4, value4); + return this; + } + + public MapBuilder of(String key, Object value, String key2, Object value2, String key3, Object value3, + String key4, Object value4, String key5, Object value5) { + toBuild.put(key, value); + toBuild.put(key2, value2); + toBuild.put(key3, value3); + toBuild.put(key4, value4); + toBuild.put(key5, value5); + return this; + } + + public Map build() { + return toBuild; + } + } + + @Test + void testBasicSearch() throws Exception { + Map temp = new MapBuilder() + .of("size", 10, "query", new MapBuilder().of("match_all", new HashMap<>()).build(), + "aggs", new MapBuilder() + .of("term_counts", new MapBuilder() + .of("terms", new MapBuilder() + .of("field", "msg", "size", 5) + .build()) + .build()) + .build()) + .build(); + String query = prettyJson(temp); + + SearchResponse response = service.search(query, INDEX, TYPE, null); + assertNotNull(response, "Response was null"); + + assertEquals(15, response.getNumberOfHits(), "Wrong count"); + assertFalse(response.isTimedOut(), "Timed out"); + assertNotNull(response.getHits(), "Hits was null"); + assertEquals(10, response.getHits().size(), "Wrong number of hits"); + assertNotNull(response.getAggregations(), "Aggregations are missing"); + assertEquals(1, response.getAggregations().size(), "Aggregation count is wrong"); + assertNull(response.getScrollId(), "Unexpected ScrollId"); + assertNull(response.getSearchAfter(), "Unexpected Search_After"); + assertNull(response.getPitId(), "Unexpected pitId"); + + Map termCounts = (Map) response.getAggregations().get("term_counts"); + assertNotNull(termCounts, "Term counts was missing"); + List> buckets = (List>) termCounts.get("buckets"); + assertNotNull(buckets, "Buckets branch was empty"); + Map expected = new MapBuilder() + .of("one", 1, "two", 2, "three", 3, + "four", 4, "five", 5) + .build(); + + buckets.forEach( aggRes -> { + String key = (String) aggRes.get("key"); + Integer docCount = (Integer) aggRes.get("doc_count"); + assertEquals(expected.get(key), docCount, "${key} did not match."); + }); + } + + @Test + void testBasicSearchRequestParameters() throws Exception { + Map temp = new MapBuilder() + .of("size", 10, "query", new MapBuilder().of("match_all", new HashMap<>()).build(), + "aggs", new MapBuilder() + .of("term_counts", new MapBuilder() + .of("terms", new MapBuilder() + .of("field", "msg", "size", 5) + .build()) + .build()) + .build()) + .build(); + String query = prettyJson(temp); + + + SearchResponse response = service.search(query, "messages", TYPE, createParameters("preference", "_local")); + assertNotNull(response, "Response was null"); + + assertEquals(15, response.getNumberOfHits(), "Wrong count"); + assertFalse(response.isTimedOut(), "Timed out"); + assertNotNull(response.getHits(), "Hits was null"); + assertEquals(10, response.getHits().size(), "Wrong number of hits"); + assertNotNull(response.getAggregations(), "Aggregations are missing"); + assertEquals(1, response.getAggregations().size(), "Aggregation count is wrong"); + + Map termCounts = (Map) response.getAggregations().get("term_counts"); + assertNotNull(termCounts, "Term counts was missing"); + List> buckets = (List>) termCounts.get("buckets"); + assertNotNull(buckets, "Buckets branch was empty"); + Map expected = new MapBuilder() + .of("one", 1, "two", 2, "three", 3, + "four", 4, "five", 5) + .build(); + + buckets.forEach( (aggRes) -> { + String key = (String) aggRes.get("key"); + Integer docCount = (Integer) aggRes.get("doc_count"); + assertEquals(expected.get(key), docCount, String.format("%s did not match.", key)); + }); + } + + @Test + void testV5SearchWarnings() throws JsonProcessingException { + int version = getElasticMajorVersion(); + assumeTrue(version == 5, "Requires version 5 (no search API deprecations yet for 8.x)"); + + String query = prettyJson(new MapBuilder() + .of("size", 1, "query", new MapBuilder().of("query_string", new MapBuilder() + .of("query", 1, "all_fields", true).build()).build()) + .build()); + final SearchResponse response = service.search(query, INDEX, TYPE, null); + assertTrue(!response.getWarnings().isEmpty(), "Missing warnings"); + } + + @Test + public void testV7SearchWarnings() throws JsonProcessingException { + assumeTrue(getElasticMajorVersion() == 7, + "Requires Elasticsearch 7"); + String query = prettyJson(new MapBuilder() + .of("size", 1, "query", new MapBuilder().of("match_all", new HashMap<>()).build()) + .build()); + String type = "a-type"; + final SearchResponse response = service.search(query, INDEX, type, null); + assertTrue(!response.getWarnings().isEmpty(), "Missing warnings"); + } + + @Test + void testScroll() throws JsonProcessingException { + final String query = prettyJson(new MapBuilder() + .of("size", 2, "query", new MapBuilder().of("match_all", new HashMap<>()).build(), + "aggs", new MapBuilder() + .of("term_counts", new MapBuilder() + .of("terms", new MapBuilder() + .of("field", "msg", "size", 5) + .build()) + .build()) + .build()) + .build()); + + // initiate the scroll + final SearchResponse response = service.search(query, INDEX, TYPE, Collections.singletonMap("scroll", "10s")); + assertNotNull(response, "Response was null"); + + assertEquals(15, response.getNumberOfHits(), "Wrong count"); + assertFalse(response.isTimedOut(), "Timed out"); + assertNotNull(response.getHits(), "Hits was null"); + assertEquals(2, response.getHits().size(), "Wrong number of hits"); + assertNotNull(response.getAggregations(), "Aggregations are missing"); + assertEquals(1, response.getAggregations().size(), "Aggregation count is wrong"); + assertNotNull(response.getScrollId(), "ScrollId missing"); + assertNull(response.getSearchAfter(), "Unexpected Search_After"); + assertNull(response.getPitId(), "Unexpected pitId"); + + final Map termCounts = (Map) response.getAggregations().get("term_counts"); + assertNotNull(termCounts, "Term counts was missing"); + assertEquals(5, ((List)termCounts.get("buckets")).size(), "Buckets count is wrong"); + + // scroll the next page + Map parameters = createParameters("scroll_id", response.getScrollId(), "scroll", "10s"); + final SearchResponse scrollResponse = service.scroll(prettyJson(parameters)); + assertNotNull(scrollResponse, "Scroll Response was null"); + + assertEquals(15, scrollResponse.getNumberOfHits(), "Wrong count"); + assertFalse(scrollResponse.isTimedOut(), "Timed out"); + assertNotNull(scrollResponse.getHits(), "Hits was null"); + assertEquals(2, scrollResponse.getHits().size(), "Wrong number of hits"); + assertNotNull(scrollResponse.getAggregations(), "Aggregations missing"); + assertEquals(0, scrollResponse.getAggregations().size(), "Aggregation count is wrong"); + assertNotNull(scrollResponse.getScrollId(), "ScrollId missing"); + assertNull(scrollResponse.getSearchAfter(), "Unexpected Search_After"); + assertNull(scrollResponse.getPitId(), "Unexpected pitId"); + + assertNotEquals(scrollResponse.getHits(), response.getHits(), () -> "Same results"); + + // delete the scroll + DeleteOperationResponse deleteResponse = service.deleteScroll(scrollResponse.getScrollId()); + assertNotNull(deleteResponse, "Delete Response was null"); + assertTrue(deleteResponse.getTook() > 0); + + // delete scroll again (should now be unknown but the 404 caught and ignored) + deleteResponse = service.deleteScroll(scrollResponse.getScrollId()); + assertNotNull(deleteResponse, "Delete Response was null"); + assertEquals(0L, deleteResponse.getTook()); + } + + @Test + void testSearchAfter() throws JsonProcessingException { + final Map queryMap = new MapBuilder() + .of("size", 2, "query", new MapBuilder() + .of("match_all", new HashMap<>()).build(), "aggs", new MapBuilder() + .of("term_counts", new MapBuilder() + .of("terms", new MapBuilder() + .of("field", "msg", "size", 5) + .build()) + .build()).build()) + .of("sort", Arrays.asList( + new MapBuilder().of("msg", "desc").build() + )) + .build(); + final String query = prettyJson(queryMap); + + // search first page + final SearchResponse response = service.search(query, INDEX, TYPE, null); + assertNotNull(response, "Response was null"); + + assertEquals(15, response.getNumberOfHits(), "Wrong count"); + assertFalse(response.isTimedOut(), "Timed out"); + assertNotNull(response.getHits(), "Hits was null"); + assertEquals(2, response.getHits().size(), "Wrong number of hits"); + assertNotNull(response.getAggregations(), "Aggregations missing"); + assertEquals(1, response.getAggregations().size(), "Aggregation count is wrong"); + assertNull(response.getScrollId(), "Unexpected ScrollId"); + assertNotNull(response.getSearchAfter(), "Search_After missing"); + assertNull(response.getPitId(), "Unexpected pitId"); + + final Map termCounts = (Map)response.getAggregations().get("term_counts"); + assertNotNull(termCounts, "Term counts was missing"); + assertEquals(5, ((List)termCounts.get("buckets")).size(), "Buckets count is wrong"); + + // search the next page + queryMap.put("search_after", MAPPER.readValue(response.getSearchAfter(), List.class)); + queryMap.remove("aggs"); + final String secondPage = prettyJson(queryMap); + final SearchResponse secondResponse = service.search(secondPage, INDEX, TYPE, null); + assertNotNull(secondResponse, "Second Response was null"); + + assertEquals(15, secondResponse.getNumberOfHits(), "Wrong count"); + assertFalse(secondResponse.isTimedOut(), "Timed out"); + assertNotNull(secondResponse.getHits(), "Hits was null"); + assertEquals(2, secondResponse.getHits().size(), "Wrong number of hits"); + assertNotNull(secondResponse.getAggregations(), "Aggregations missing"); + assertEquals(0, secondResponse.getAggregations().size(), "Aggregation count is wrong"); + assertNull(secondResponse.getScrollId(), "Unexpected ScrollId"); + assertNotNull(secondResponse.getSearchAfter(), "Unexpected Search_After"); + assertNull(secondResponse.getPitId(), "Unexpected pitId"); + + assertNotEquals(secondResponse.getHits(), response.getHits(), "Same results"); + } + + @Test + void testPointInTime() throws JsonProcessingException { + // Point in Time only available in 7.10+ with XPack enabled + double majorVersion = getElasticMajorVersion(); + double minorVersion = getElasticMinorVersion(); + assumeTrue(majorVersion >= 8 || (majorVersion == 7 && minorVersion >= 10), "Requires version 7.10+"); + + // initialise + final String pitId = service.initialisePointInTime(INDEX, "10s"); + + final Map queryMap = new MapBuilder() + .of("size", 2, "query", new MapBuilder().of("match_all", new HashMap<>()).build()) + .of("aggs", new MapBuilder().of("term_counts", new MapBuilder() + .of("terms", new MapBuilder() + .of("field", "msg", "size", 5) + .build()) + .build()).build()) + .of("sort", Arrays.asList( + new MapBuilder().of("msg", "desc").build() + )) + .of("pit", new MapBuilder() + .of("id", pitId, "keep_alive", "10s") + .build()) + .build(); + final String query = prettyJson(queryMap); + + // search first page + final SearchResponse response = service.search(query, null, TYPE, null); + assertNotNull(response, "Response was null"); + + assertEquals(15, response.getNumberOfHits(), "Wrong count"); + assertFalse(response.isTimedOut(), "Timed out"); + assertNotNull(response.getHits(), "Hits was null"); + assertEquals(2, response.getHits().size(), "Wrong number of hits"); + assertNotNull(response.getAggregations(), "Aggregations missing"); + assertEquals(1, response.getAggregations().size(), "Aggregation count is wrong"); + assertNull(response.getScrollId(), "Unexpected ScrollId"); + assertNotNull(response.getSearchAfter(), "Unexpected Search_After"); + assertNotNull(response.getPitId(), "pitId missing"); + + final Map termCounts = (Map) response.getAggregations().get("term_counts"); + assertNotNull(termCounts, "Term counts was missing"); + assertEquals(5, ((List)termCounts.get("buckets")).size(), "Buckets count is wrong"); + + // search the next page + queryMap.put("search_after", MAPPER.readValue(response.getSearchAfter(), List.class)); + queryMap.remove("aggs"); + final String secondPage = prettyJson(queryMap); + final SearchResponse secondResponse = service.search(secondPage, null, TYPE, null); + assertNotNull(secondResponse, "Second Response was null"); + + assertEquals(15, secondResponse.getNumberOfHits(), "Wrong count"); + assertFalse(secondResponse.isTimedOut(), "Timed out"); + assertNotNull(secondResponse.getHits(), "Hits was null"); + assertEquals(2, secondResponse.getHits().size(), "Wrong number of hits"); + assertNotNull(secondResponse.getAggregations(), "Aggregations missing"); + assertEquals(0, secondResponse.getAggregations().size(), "Aggregation count is wrong"); + assertNull(secondResponse.getScrollId(), "Unexpected ScrollId"); + assertNotNull(secondResponse.getSearchAfter(), "Unexpected Search_After"); + assertNotNull(secondResponse.getPitId(), "pitId missing"); + + assertNotEquals(secondResponse.getHits(), response.getHits(), "Same results"); + + // delete pitId + DeleteOperationResponse deleteResponse = service.deletePointInTime(pitId); + assertNotNull(deleteResponse, "Delete Response was null"); + assertTrue(deleteResponse.getTook() > 0); + + // delete pitId again (should now be unknown but the 404 caught and ignored) + deleteResponse = service.deletePointInTime(pitId); + assertNotNull(deleteResponse, "Delete Response was null"); + assertEquals(0L, deleteResponse.getTook()); + } + + @Test + void testDeleteByQuery() throws Exception { + String query = prettyJson(new MapBuilder() + .of("query", new MapBuilder() + .of("match", new MapBuilder().of("msg", "five").build()) + .build()).build()); + DeleteOperationResponse response = service.deleteByQuery(query, INDEX, TYPE, null); + assertNotNull(response); + assertTrue(response.getTook() > 0); + } + + @Test + void testDeleteByQueryRequestParameters() throws Exception { + String query = prettyJson(new MapBuilder() + .of("query", new MapBuilder() + .of("match", new MapBuilder().of("msg", "six").build()) + .build()).build()); + Map parameters = new HashMap<>(); + parameters.put("refresh", "true"); + DeleteOperationResponse response = service.deleteByQuery(query, INDEX, TYPE, parameters); + assertNotNull(response); + assertTrue(response.getTook() > 0); + } + + @Test + void testUpdateByQuery() throws Exception { + String query = prettyJson(new MapBuilder() + .of("query", new MapBuilder() + .of("match", new MapBuilder().of("msg", "four").build()) + .build()).build()); + UpdateOperationResponse response = service.updateByQuery(query, INDEX, TYPE, null); + assertNotNull(response); + assertTrue(response.getTook() > 0); + } + + @Test + void testUpdateByQueryRequestParameters() throws Exception { + String query = prettyJson(new MapBuilder() + .of("query", new MapBuilder() + .of("match", new MapBuilder().of("msg", "four").build()) + .build()).build()); + Map parameters = new HashMap<>(); + parameters.put("refresh", "true"); + parameters.put("slices", "1"); + UpdateOperationResponse response = service.updateByQuery(query, INDEX, TYPE, parameters); + assertNotNull(response); + assertTrue(response.getTook() > 0); + } + + @Test + void testDeleteById() throws Exception { + final String ID = "1"; + final Map originalDoc = service.get(INDEX, TYPE, ID, null); + try { + DeleteOperationResponse response = service.deleteById(INDEX, TYPE, ID, null); + assertNotNull(response); + assertTrue(response.getTook() > 0); + final ElasticsearchException ee = assertThrows(ElasticsearchException.class, () -> + service.get(INDEX, TYPE, ID, null)); + assertTrue(ee.isNotFound()); + final Map doc = service.get(INDEX, TYPE, "2", null); + assertNotNull(doc); + } finally { + // replace the deleted doc + service.add(new IndexOperationRequest(INDEX, TYPE, "1", originalDoc, IndexOperationRequest.Operation.Index), null); + waitForIndexRefresh(); // (affects later tests using _search or _bulk) + } + } + + @Test + void testGet() { + for (int index = 1; index <= 15; index++) { + String id = String.valueOf(index); + Map doc = service.get(INDEX, TYPE, id, null); + assertNotNull(doc, "Doc was null"); + assertNotNull(doc.get("msg"), "${doc.toString()}\t${doc.keySet().toString()}"); + } + } + + @Test + void testGetNotFound() { + final ElasticsearchException ee = assertThrows(ElasticsearchException.class, () -> service.get(INDEX, TYPE, "not_found", null)); + assertTrue(ee.isNotFound()); + } + + @Test + void testNullSuppression() throws InterruptedException { + Map doc = new HashMap<>(); + doc.put("msg", "test"); + doc.put("is_null", null); + doc.put("is_empty", ""); + doc.put("is_blank", " "); + doc.put("empty_nested", Collections.emptyMap()); + doc.put("empty_array", Collections.emptyList()); + + // index with nulls + suppressNulls(false); + IndexOperationResponse response = service.bulk(Arrays.asList(new IndexOperationRequest("nulls", TYPE, "1", doc, IndexOperationRequest.Operation.Index)), null); + assertNotNull(response); + assertTrue(response.getTook() > 0); + waitForIndexRefresh(); + + Map result = service.get("nulls", TYPE, "1", null); + assertEquals(doc, result); + + // suppress nulls + suppressNulls(true); + response = service.bulk(Arrays.asList(new IndexOperationRequest("nulls", TYPE, "2", doc, IndexOperationRequest.Operation.Index)), null); + assertNotNull(response); + assertTrue(response.getTook() > 0); + waitForIndexRefresh(); + + result = service.get("nulls", TYPE, "2", null); + assertTrue(result.keySet().containsAll(Arrays.asList("msg", "is_blank")), "Non-nulls (present): " + result); + assertFalse(result.keySet().contains("is_null"), "is_null (should be omitted): " + result); + assertFalse(result.keySet().contains("is_empty"), "is_empty (should be omitted): " + result); + assertFalse(result.keySet().contains("empty_nested"), "empty_nested (should be omitted): " + result); + assertFalse(result.keySet().contains("empty_array"), "empty_array (should be omitted): " + result); + } + + private void suppressNulls(final boolean suppressNulls) { + runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, "Client Service"); + runner.disableControllerService(service); + runner.setProperty(service, ElasticSearchClientService.SUPPRESS_NULLS, suppressNulls + ? ElasticSearchClientService.ALWAYS_SUPPRESS.getValue() + : ElasticSearchClientService.NEVER_SUPPRESS.getValue()); + runner.enableControllerService(service); + runner.assertValid(); + } + + @Test + void testBulkAddTwoIndexes() throws Exception { + List payload = new ArrayList<>(); + for (int x = 0; x < 20; x++) { + String index = x % 2 == 0 ? "bulk_a": "bulk_b"; + payload.add(new IndexOperationRequest(index, TYPE, String.valueOf(x), new HashMap(){{ + put("msg", "test"); + }}, IndexOperationRequest.Operation.Index)); + } + for (int x = 0; x < 5; x++) { + payload.add(new IndexOperationRequest("bulk_c", TYPE, String.valueOf(x), new HashMap(){{ + put("msg", "test"); + }}, IndexOperationRequest.Operation.Index)); + } + IndexOperationResponse response = service.bulk(payload, createParameters("refresh", "true")); + assertNotNull(response); + assertTrue(response.getTook() > 0); + waitForIndexRefresh(); + + /* + * Now, check to ensure that both indexes got populated appropriately. + */ + String query = "{ \"query\": { \"match_all\": {}}}"; + Long indexA = service.count(query, "bulk_a", TYPE, null); + Long indexB = service.count(query, "bulk_b", TYPE, null); + Long indexC = service.count(query, "bulk_c", TYPE, null); + + assertNotNull(indexA); + assertNotNull(indexB); + assertNotNull(indexC); + assertEquals(indexA, indexB); + assertEquals(10, indexA.intValue()); + assertEquals(10, indexB.intValue()); + assertEquals(5, indexC.intValue()); + + Long total = service.count(query, "bulk_*", TYPE, null); + assertNotNull(total); + assertEquals(25, total.intValue()); + } + + @Test + void testBulkRequestParameters() throws Exception { + List payload = new ArrayList<>(); + for (int x = 0; x < 20; x++) { + String index = x % 2 == 0 ? "bulk_a": "bulk_b"; + payload.add(new IndexOperationRequest(index, TYPE, String.valueOf(x), new MapBuilder().of("msg", "test").build(), IndexOperationRequest.Operation.Index)); + } + for (int x = 0; x < 5; x++) { + payload.add(new IndexOperationRequest("bulk_c", TYPE, String.valueOf(x), new MapBuilder().of("msg", "test").build(), IndexOperationRequest.Operation.Index)); + } + IndexOperationResponse response = service.bulk(payload, createParameters("refresh", "true")); + assertNotNull(response); + assertTrue(response.getTook() > 0); + + /* + * Now, check to ensure that both indexes got populated and refreshed appropriately. + */ + String query = "{ \"query\": { \"match_all\": {}}}"; + Long indexA = service.count(query, "bulk_a", TYPE, null); + Long indexB = service.count(query, "bulk_b", TYPE, null); + Long indexC = service.count(query, "bulk_c", TYPE, null); + + assertNotNull(indexA); + assertNotNull(indexB); + assertNotNull(indexC); + assertEquals(indexA, indexB); + assertEquals(10, indexA.intValue()); + assertEquals(10, indexB.intValue()); + assertEquals(5, indexC.intValue()); + + Long total = service.count(query, "bulk_*", TYPE, null); + assertNotNull(total); + assertEquals(25, total.intValue()); + } + + @Test + void testUpdateAndUpsert() throws InterruptedException { + final String TEST_ID = "update-test"; + Map doc = new HashMap<>(); + doc.put("msg", "Buongiorno, mondo"); + service.add(new IndexOperationRequest(INDEX, TYPE, TEST_ID, doc, IndexOperationRequest.Operation.Index), createParameters("refresh", "true")); + Map result = service.get(INDEX, TYPE, TEST_ID, null); + assertEquals(doc, result, "Not the same"); + + Map updates = new HashMap<>(); + updates.put("from", "john.smith"); + Map merged = new HashMap<>(); + merged.putAll(updates); + merged.putAll(doc); + IndexOperationRequest request = new IndexOperationRequest(INDEX, TYPE, TEST_ID, updates, IndexOperationRequest.Operation.Update); + service.add(request, createParameters("refresh", "true")); + result = service.get(INDEX, TYPE, TEST_ID, null); + assertTrue(result.containsKey("from")); + assertTrue(result.containsKey("msg")); + assertEquals(merged, result, "Not the same after update."); + + final String UPSERTED_ID = "upsert-ftw"; + Map upsertItems = new HashMap<>(); + upsertItems.put("upsert_1", "hello"); + upsertItems.put("upsert_2", 1); + upsertItems.put("upsert_3", true); + request = new IndexOperationRequest(INDEX, TYPE, UPSERTED_ID, upsertItems, IndexOperationRequest.Operation.Upsert); + service.add(request, createParameters("refresh", "true")); + result = service.get(INDEX, TYPE, UPSERTED_ID, null); + assertEquals(upsertItems, result); + + List deletes = new ArrayList<>(); + deletes.add(new IndexOperationRequest(INDEX, TYPE, TEST_ID, null, IndexOperationRequest.Operation.Delete)); + deletes.add(new IndexOperationRequest(INDEX, TYPE, UPSERTED_ID, null, IndexOperationRequest.Operation.Delete)); + assertFalse(service.bulk(deletes, createParameters("refresh", "true")).hasErrors()); + waitForIndexRefresh(); // wait 1s for index refresh (doesn't prevent GET but affects later tests using _search or _bulk) + ElasticsearchException ee = assertThrows(ElasticsearchException.class, () -> service.get(INDEX, TYPE, TEST_ID, null) ); + assertTrue(ee.isNotFound()); + ee = assertThrows(ElasticsearchException.class, () -> service.get(INDEX, TYPE, UPSERTED_ID, null)); + assertTrue(ee.isNotFound()); + } + + @Test + void testGetBulkResponsesWithErrors() { + List ops = Arrays.asList( + new IndexOperationRequest(INDEX, TYPE, "1", new MapBuilder().of("msg", "one", "intField", 1).build(), IndexOperationRequest.Operation.Index), // OK + new IndexOperationRequest(INDEX, TYPE, "2", new MapBuilder().of("msg", "two", "intField", 1).build(), IndexOperationRequest.Operation.Create), // already exists + new IndexOperationRequest(INDEX, TYPE, "1", new MapBuilder().of("msg", "one", "intField", "notaninteger").build(), IndexOperationRequest.Operation.Index) // can't parse int field + ); + IndexOperationResponse response = service.bulk(ops, createParameters("refresh", "true")); + assertTrue(response.hasErrors()); + assertEquals(2, response.getItems().stream().filter(it -> { + String key = it.keySet().stream().findFirst().get(); + return ((Map)it.get(key)).containsKey("error"); + }).count()); + } + + private Map createParameters(String... extra) { + if (extra.length % 2 == 1) { //Putting this here to help maintainers catch stupid bugs before they happen + throw new RuntimeException("createParameters must have an even number of String parameters."); + } + + Map parameters = new HashMap<>(); + for (int index = 0; index < extra.length; index += 2) { + parameters.put(extra[index], extra[index + 1]); + } + + return parameters; + } + + private static void waitForIndexRefresh() throws InterruptedException { + Thread.sleep(1000); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.java new file mode 100644 index 000000000000..6cb061903f59 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License") you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.elasticsearch.integration; + +import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl; +import org.apache.nifi.elasticsearch.ElasticSearchLookupService; +import org.apache.nifi.elasticsearch.TestControllerServiceProcessor; +import org.apache.nifi.elasticsearch.TestSchemaRegistry; +import org.apache.nifi.lookup.LookupFailureException; +import org.apache.nifi.record.path.RecordPath; +import org.apache.nifi.record.path.RecordPathResult; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.RecordDataType; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ElasticSearchLookupService_IT extends AbstractElasticsearch_IT { + private TestRunner runner; + private ElasticSearchClientService service; + private ElasticSearchLookupService lookupService; + + @BeforeAll + public static void beforeAll() throws IOException { + startTestcontainer(); + setupTestData(); + } + + @AfterAll + public static void afterAll() throws IOException { + tearDownTestData(); + stopTestcontainer(); + } + + @BeforeEach + void before() throws Exception { + runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class); + service = new ElasticSearchClientServiceImpl(); + lookupService = new ElasticSearchLookupService(); + runner.addControllerService("Client Service", service); + runner.addControllerService("Lookup Service", lookupService); + runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, ELASTIC_HOST); + runner.setProperty(service, ElasticSearchClientService.CONNECT_TIMEOUT, "10000"); + runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000"); + runner.setProperty(service, ElasticSearchClientService.USERNAME, "elastic"); + runner.setProperty(service, ElasticSearchClientService.PASSWORD, ELASTIC_USER_PASSWORD); + runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, "Client Service"); + runner.setProperty(TestControllerServiceProcessor.LOOKUP_SERVICE, "Lookup Service"); + runner.setProperty(lookupService, ElasticSearchLookupService.CLIENT_SERVICE, "Client Service"); + runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "user_details"); + setTypeOnLookupService(); + + try { + runner.enableControllerService(service); + runner.enableControllerService(lookupService); + } catch (Exception ex) { + throw ex; + } + + service.refresh(null, null); + } + + void setTypeOnLookupService() { + if (getElasticMajorVersion() == 6) { + runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, ElasticSearchClientService_IT.TYPE); + } else { + runner.removeProperty(lookupService, ElasticSearchLookupService.TYPE); + } + } + + @Test + void testValidity() throws Exception { + setDefaultSchema(); + runner.assertValid(); + } + + private void setDefaultSchema() throws Exception { + runner.disableControllerService(lookupService); + SchemaRegistry registry = new TestSchemaRegistry(); + runner.addControllerService("registry", registry); + runner.setProperty(lookupService, SchemaAccessUtils.SCHEMA_REGISTRY, "registry"); + runner.enableControllerService(registry); + runner.enableControllerService(lookupService); + } + + @Test + void lookupById() throws LookupFailureException { + Map coordinates = new HashMap<>(); + coordinates.put("_id", "2"); + Optional result = lookupService.lookup(coordinates); + + assertNotNull(result); + assertTrue(result.isPresent()); + Record record = result.get(); + assertEquals("jane.doe@company.com", record.getAsString("email")); + assertEquals("098-765-4321", record.getAsString("phone")); + assertEquals("GHIJK", record.getAsString("accessKey")); + } + + @Test + void testInvalidIdScenarios() { + List> coordinates = new ArrayList<>(); + Map temp = new HashMap<>(); + temp.put("_id", 1); + coordinates.add(temp); + temp = new HashMap<>(); + temp.put("_id", 1); + temp.put("email", "john.smith@company.com"); + coordinates.add(temp); + + for (Map coordinate: coordinates) { + Exception exception = null; + + try { + lookupService.lookup(coordinate); + } catch (Exception ex) { + exception = ex; + } + + assertNotNull(exception); + assertTrue(exception instanceof LookupFailureException); + } + } + + @Test + void lookupByQuery() throws LookupFailureException { + Map coordinates = new HashMap<>(); + coordinates.put("phone", "098-765-4321"); + coordinates.put("email", "jane.doe@company.com"); + Optional result = lookupService.lookup(coordinates); + + assertNotNull(result); + assertTrue(result.isPresent()); + Record record = result.get(); + assertEquals("jane.doe@company.com", record.getAsString("email")); + assertEquals("098-765-4321", record.getAsString("phone")); + assertEquals("GHIJK", record.getAsString("accessKey")); + } + + @Test + void testNestedSchema() throws LookupFailureException { + Map coordinates = new HashMap<>(); + coordinates.put("subField.deeper.deepest.super_secret", "The sky is blue"); + coordinates.put("subField.deeper.secretz", "Buongiorno, mondo!!"); + coordinates.put("msg", "Hello, world"); + + runner.disableControllerService(lookupService); + runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "nested"); + setTypeOnLookupService(); + runner.enableControllerService(lookupService); + + Optional response = lookupService.lookup(coordinates); + assertNotNull(response); + assertTrue(response.isPresent()); + Record rec = response.get(); + assertEquals("Hello, world", rec.getValue("msg")); + Record subRec = getSubRecord(rec, "subField"); + assertNotNull(subRec); + Record deeper = getSubRecord(subRec, "deeper"); + assertNotNull(deeper); + Record deepest = getSubRecord(deeper, "deepest"); + assertNotNull(deepest); + assertEquals("The sky is blue", deepest.getAsString("super_secret")); + } + + @Test + void testDetectedSchema() throws LookupFailureException { + runner.disableControllerService(lookupService); + runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "complex"); + setTypeOnLookupService(); + runner.enableControllerService(lookupService); + Map coordinates = new HashMap<>(); + coordinates.put("_id", "1"); + + Optional response = lookupService.lookup(coordinates); + assertNotNull(response); + assertTrue(response.isPresent()); + Record rec = response.get(); + Record subRec = getSubRecord(rec, "subField"); + + Record r2 = new MapRecord(rec.getSchema(), new HashMap<>()); + RecordPath path = RecordPath.compile("/subField/longField"); + RecordPathResult result = path.evaluate(r2); + result.getSelectedFields().findFirst().get().updateValue(1234567890L); + + assertNotNull(rec); + assertNotNull(subRec); + assertEquals("Hello, world", rec.getValue("msg")); + assertNotNull(rec.getValue("subField")); + assertEquals(100000, subRec.getValue("longField")); + assertEquals("2018-04-10T12:18:05Z", subRec.getValue("dateField")); + } + + public static Record getSubRecord(Record rec, String fieldName) { + RecordSchema schema = rec.getSchema(); + RecordSchema subSchema = ((RecordDataType)schema.getField(fieldName).get().getDataType()).getChildSchema(); + return rec.getAsRecord(fieldName, subSchema); + } + + @Test + void testMappings() throws LookupFailureException { + runner.disableControllerService(lookupService); + runner.setProperty(lookupService, "$.subField.longField", "/longField2"); + runner.setProperty(lookupService, "$.subField.dateField", "/dateField2"); + runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "nested"); + setTypeOnLookupService(); + runner.enableControllerService(lookupService); + + Map coordinates = new HashMap<>(); + coordinates.put("msg", "Hello, world"); + Optional result = lookupService.lookup(coordinates); + assertTrue(result.isPresent()); + Record rec = result.get(); + Map entries = new HashMap<>(); + entries.put("dateField2", "2018-08-14T10:08:00Z"); + entries.put("longField2", 150000); + entries.entrySet().forEach( (field) -> { + Object value = rec.getValue(field.getKey()); + assertEquals(field.getValue(), value); + }); + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchLookupServiceTest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchLookupServiceTest.java new file mode 100644 index 000000000000..f536784f34a0 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchLookupServiceTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License") you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.elasticsearch.unit; + +import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.ElasticSearchLookupService; +import org.apache.nifi.elasticsearch.TestControllerServiceProcessor; +import org.apache.nifi.elasticsearch.TestElasticSearchClientService; +import org.apache.nifi.elasticsearch.TestSchemaRegistry; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class ElasticSearchLookupServiceTest { + ElasticSearchClientService mockClientService; + ElasticSearchLookupService lookupService; + TestRunner runner; + + @BeforeEach + void setup() throws Exception { + mockClientService = new TestElasticSearchClientService(); + lookupService = new ElasticSearchLookupService(); + TestSchemaRegistry registry = new TestSchemaRegistry(); + runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class); + runner.addControllerService("clientService", mockClientService); + runner.addControllerService("lookupService", lookupService); + runner.addControllerService("registry", registry); + runner.enableControllerService(mockClientService); + runner.enableControllerService(registry); + runner.setProperty(lookupService, ElasticSearchLookupService.CLIENT_SERVICE, "clientService"); + runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "users"); + runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, "clientService"); + runner.setProperty(TestControllerServiceProcessor.LOOKUP_SERVICE, "lookupService"); + runner.setProperty(lookupService, SchemaAccessUtils.SCHEMA_REGISTRY, "registry"); + runner.setProperty(lookupService, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INFER_SCHEMA); + runner.enableControllerService(lookupService); + } + + @Test + void simpleLookupTest() throws Exception { + Map coordinates = new HashMap<>(); + coordinates.put("_id", "12345"); + + Optional result = lookupService.lookup(coordinates); + + assertNotNull(result); + assertTrue(result.isPresent()); + Record record = result.get(); + assertEquals("john.smith", record.getAsString("username")); + assertEquals("testing1234", record.getAsString("password")); + assertEquals("john.smith@test.com", record.getAsString("email")); + assertEquals("Software Engineer", record.getAsString("position")); + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchStringLookupServiceTest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchStringLookupServiceTest.java new file mode 100644 index 000000000000..4327c26d7f4c --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchStringLookupServiceTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License") you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.elasticsearch.unit; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.ElasticSearchStringLookupService; +import org.apache.nifi.elasticsearch.TestControllerServiceProcessor; +import org.apache.nifi.elasticsearch.TestElasticSearchClientService; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ElasticSearchStringLookupServiceTest { + private ElasticSearchClientService mockClientService; + private ElasticSearchStringLookupService lookupService; + private TestRunner runner; + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + @BeforeEach + public void setup() throws Exception { + mockClientService = new TestElasticSearchClientService(); + lookupService = new ElasticSearchStringLookupService(); + runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class); + runner.addControllerService("clientService", mockClientService); + runner.addControllerService("lookupService", lookupService); + runner.enableControllerService(mockClientService); + runner.setProperty(lookupService, ElasticSearchStringLookupService.CLIENT_SERVICE, "clientService"); + runner.setProperty(lookupService, ElasticSearchStringLookupService.INDEX, "users"); + runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, "clientService"); + runner.setProperty(TestControllerServiceProcessor.LOOKUP_SERVICE, "lookupService"); + runner.enableControllerService(lookupService); + } + + @Test + public void simpleLookupTest() throws Exception { + Map coordinates = new HashMap<>(); + coordinates.put(ElasticSearchStringLookupService.ID, "12345"); + + Optional result = lookupService.lookup(coordinates); + + assertNotNull(result); + assertTrue(result.isPresent()); + String json = result.get(); + Map parsed = MAPPER.readValue(json, Map.class); + assertEquals(((TestElasticSearchClientService)mockClientService).getData(), parsed); + } +} diff --git a/pom.xml b/pom.xml index d8ee172dc8b1..713d335efafb 100644 --- a/pom.xml +++ b/pom.xml @@ -333,7 +333,7 @@ org.testcontainers testcontainers - 1.17.3 + ${testcontainers.version} test