From 23dc7bee8be918fa3fbb46e2152209d67560a3c1 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Thu, 21 Jun 2018 06:55:09 -0400 Subject: [PATCH] NIFI-5326 Wrote RandomLookupService. --- .../nifi/lookup/RandomLookupService.java | 21 ++ .../lookup/RandomJsonMapLookupService.java | 142 +++++++++++++ .../TestRandomJsonMapLookupService.groovy | 199 ++++++++++++++++++ 3 files changed, 362 insertions(+) create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RandomLookupService.java create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/RandomJsonMapLookupService.java create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/TestRandomJsonMapLookupService.groovy diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RandomLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RandomLookupService.java new file mode 100644 index 000000000000..4589a2f58ede --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RandomLookupService.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.lookup; + +public interface RandomLookupService extends LookupService { +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/RandomJsonMapLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/RandomJsonMapLookupService.java new file mode 100644 index 000000000000..2f1e8fe73f16 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/RandomJsonMapLookupService.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.lookup; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; + +public class RandomJsonMapLookupService extends AbstractControllerService implements RandomLookupService> { + static final String NON_MAP_ID_KEY = "selectedValue"; + + static final PropertyDescriptor FILE_PATH = new PropertyDescriptor.Builder() + .displayName("File Path") + .name("random-lookup-file-path") + .description("Input file that acts as a data source. Caution: the entire file will be loaded into memory.") + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + @Override + protected List getSupportedPropertyDescriptors() { + List _temp = new ArrayList<>(); + _temp.add(FILE_PATH); + return Collections.unmodifiableList(_temp); + } + + private volatile Map dataSetHash; + private volatile List flatKeyList; + private volatile List> dataSetList; + private volatile boolean isList; + private Random random; + + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws IOException { + final String path = context.getProperty(FILE_PATH).evaluateAttributeExpressions().getValue(); + ObjectMapper mapper = new ObjectMapper(); + String firstLine = peekAtFile(path); + if (firstLine.startsWith("[")) { + dataSetList = mapper.readValue(new File(path), List.class); + isList = true; + } else { + dataSetHash = mapper.readValue(new File(path), Map.class); + flatKeyList = dataSetHash.keySet().stream() + .collect(Collectors.toList()); + isList = false; + } + random = new Random(); + } + + @OnDisabled + public void onDisabled() { + dataSetHash = null; + dataSetList = null; + flatKeyList = null; + } + + private String peekAtFile(String path) throws IOException { + BufferedReader reader = new BufferedReader(new FileReader(path)); + String line = reader.readLine(); + reader.close(); + return line; + } + + private int getRandomIndex() { + int ceiling = isList ? dataSetList.size() : flatKeyList.size(); + int pick = random.nextInt(ceiling); + + return pick; + } + + private Map pickElement(int index) throws LookupFailureException { + Map obj; + if (isList) { + Object o = dataSetList.get(index); + if (o instanceof Map) { + obj = (Map)o; + } else { + obj = new HashMap<>(); + obj.put(NON_MAP_ID_KEY, o); + } + } else { + String key = flatKeyList.get(index); + Object o = dataSetHash.get(key); + + obj = new HashMap<>(); + obj.put(key, o); + } + + return obj; + } + + @Override + public Optional> lookup(Map coordinates) throws LookupFailureException { + int index = getRandomIndex(); + return Optional.ofNullable(pickElement(index)); + } + + @Override + public Class getValueType() { + return Map.class; + } + + @Override + public Set getRequiredKeys() { + return null; + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/TestRandomJsonMapLookupService.groovy b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/TestRandomJsonMapLookupService.groovy new file mode 100644 index 000000000000..e950606ff430 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/TestRandomJsonMapLookupService.groovy @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.lookup + +import org.apache.nifi.components.PropertyDescriptor +import org.apache.nifi.processor.AbstractProcessor +import org.apache.nifi.processor.ProcessContext +import org.apache.nifi.processor.ProcessSession +import org.apache.nifi.processor.exception.ProcessException +import org.apache.nifi.util.TestRunners +import org.junit.Assert +import org.junit.Test + +import static groovy.json.JsonOutput.prettyPrint +import static groovy.json.JsonOutput.toJson + +class TestRandomJsonMapLookupService { + static File writeTestFile(input) { + def temp = File.createTempFile(String.valueOf(System.currentTimeMillis()), String.valueOf(System.currentTimeMillis())) + temp.write(prettyPrint(toJson(input))) + temp.deleteOnExit() + temp + } + + static File buildHashFile(int ceiling) { + def o = [:] + 0.upto(ceiling) { + o["test-${it}"] = UUID.randomUUID().toString() + } + + writeTestFile(o) + } + + static File buildListFile(int ceiling) { + def o = [] + 0.upto(ceiling - 1) { + o << UUID.randomUUID().toString() + } + + writeTestFile(o) + } + + private Map buildRunner(int ceiling, String type) { + buildRunner(ceiling, type, null) + } + + private Map buildRunner(int ceiling, Closure builder) { + buildRunner(ceiling, null, builder) + } + + private Map buildRunner(int ceiling, String type, Closure closure) { + def testData = type ? "build${type}File"(ceiling) : closure.call(ceiling) + def service = new RandomJsonMapLookupService() + def runner = TestRunners.newTestRunner(RandomJsonProcessor.class) + runner.addControllerService("service", service) + runner.setProperty(service, RandomJsonMapLookupService.FILE_PATH, testData.absolutePath) + runner.enableControllerService(service) + runner.setProperty("service", "service") + [ + runner: runner, + service: service + ] + } + + static final String HASH = "Hash" + static final String LIST = "List" + + void testHash(int ceiling, int maxSeconds) { + def setup = buildRunner(ceiling, HASH) + setup["runner"].assertValid() + def start = System.currentTimeSeconds() + 0.upto(5000) { + Optional result = setup["service"].lookup([:]) + Assert.assertNotNull(result) + Assert.assertTrue(result.isPresent()) + def r = result.get() + Assert.assertTrue(r instanceof Map) + Assert.assertTrue(r.keySet().iterator().next() instanceof String) + } + def end = System.currentTimeSeconds() + Assert.assertTrue(end - start < maxSeconds) + } + + void testList(int ceiling, int maxSeconds) { + def setup = buildRunner(ceiling, LIST) + setup["runner"].assertValid() + def start = System.currentTimeSeconds() + 0.upto(5000) { + Optional result = setup["service"].lookup([:]) + Assert.assertNotNull(result) + Assert.assertTrue(result.isPresent()) + def r = result.get() + Assert.assertNotNull(r) + Assert.assertTrue(r instanceof Map) + Assert.assertTrue(r.keySet().iterator().next() instanceof String) + } + def end = System.currentTimeSeconds() + Assert.assertTrue(end - start < maxSeconds) + } + + @Test + void testNestedHashes() { + def setup = buildRunner(20000) { ceiling -> + def o = [:] + 0.upto(ceiling - 1) { + o << [ + msg: UUID.randomUUID().toString(), + currentTime: System.currentTimeMillis() + ] + } + + writeTestFile(o) + } + setup["runner"].assertValid() + def start = System.currentTimeSeconds() + 0.upto(5000) { + Optional result = setup["service"].lookup([:]) + Assert.assertNotNull(result) + Assert.assertTrue(result.isPresent()) + Assert.assertNotNull(result.get()) + } + def end = System.currentTimeSeconds() + Assert.assertTrue(end - start < 1) + } + + @Test + void testSimpleList() { + testList(5000, 2) + } + + @Test + void testMediumList() { + testList(25000, 2) + } + + @Test + void testLargerList() { + testList(50000, 2) + } + + @Test + void testMuchLargerList() { + testList(100000, 2) + } + + @Test + void testSimpleHash() { + testHash(5000, 2) + } + + @Test + void testMediumHash() { + testHash(25000, 2) + } + + @Test + void testLargerHash() { + testHash(50000, 2) + } + + @Test + void testMuchLargerHash() { + testHash(100000, 2) + } +} + +class RandomJsonProcessor extends AbstractProcessor { + @Override + List getSupportedPropertyDescriptors() { + [ + new PropertyDescriptor.Builder() + .name("service") + .displayName("Service") + .identifiesControllerService(RandomLookupService.class) + .required(true) + .build() + ] + } + + @Override + void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + + } +} \ No newline at end of file