From 47d814ef95d67738d20ce5dc530ba7b05d418a96 Mon Sep 17 00:00:00 2001 From: cstella Date: Fri, 27 Jan 2017 18:15:44 -0500 Subject: [PATCH 1/9] Multithreading the SimpleEnrichmentFlatFileLoader --- .../common/utils/file/ReaderSpliterator.java | 218 ++++++++++++++++++ .../nonbulk/flatfile/ExtractorState.java | 46 ++++ .../SimpleEnrichmentFlatFileLoader.java | 117 ++++++++-- 3 files changed, 358 insertions(+), 23 deletions(-) create mode 100644 metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/file/ReaderSpliterator.java create mode 100644 metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/file/ReaderSpliterator.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/file/ReaderSpliterator.java new file mode 100644 index 0000000000..d10146ba65 --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/file/ReaderSpliterator.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.common.utils.file; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Spliterator; +import java.util.function.Consumer; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static java.util.Spliterators.spliterator; + +public class ReaderSpliterator implements Spliterator { + private static int characteristics = NONNULL | ORDERED | IMMUTABLE; + private int batchSize ; + private BufferedReader reader; + public ReaderSpliterator(BufferedReader reader) { + this(reader, 128); + } + + public ReaderSpliterator(BufferedReader reader, int batchSize) { + this.batchSize = batchSize; + this.reader = reader; + } + + @Override + public void forEachRemaining(Consumer action) { + if (action == null) { + throw new NullPointerException(); + } + try { + for (String line = null; (line = reader.readLine()) != null;) { + action.accept(line); + } + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + /** + * If a remaining element exists, performs the given action on it, + * returning {@code true}; else returns {@code false}. If this + * Spliterator is {@link #ORDERED} the action is performed on the + * next element in encounter order. Exceptions thrown by the + * action are relayed to the caller. + * + * @param action The action + * @return {@code false} if no remaining elements existed + * upon entry to this method, else {@code true}. + * @throws NullPointerException if the specified action is null + */ + @Override + public boolean tryAdvance(Consumer action) { + if (action == null) { + throw new NullPointerException(); + } + try { + final String line = reader.readLine(); + if (line == null) { + return false; + } + action.accept(line); + return true; + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + /** + * If this spliterator can be partitioned, returns a Spliterator + * covering elements, that will, upon return from this method, not + * be covered by this Spliterator. + *

+ *

If this Spliterator is {@link #ORDERED}, the returned Spliterator + * must cover a strict prefix of the elements. + *

+ *

Unless this Spliterator covers an infinite number of elements, + * repeated calls to {@code trySplit()} must eventually return {@code null}. + * Upon non-null return: + *

+ *

+ *

This method may return {@code null} for any reason, + * including emptiness, inability to split after traversal has + * commenced, data structure constraints, and efficiency + * considerations. + * + * @return a {@code Spliterator} covering some portion of the + * elements, or {@code null} if this spliterator cannot be split + * @apiNote An ideal {@code trySplit} method efficiently (without + * traversal) divides its elements exactly in half, allowing + * balanced parallel computation. Many departures from this ideal + * remain highly effective; for example, only approximately + * splitting an approximately balanced tree, or for a tree in + * which leaf nodes may contain either one or two elements, + * failing to further split these nodes. However, large + * deviations in balance and/or overly inefficient {@code + * trySplit} mechanics typically result in poor parallel + * performance. + */ + @Override + public Spliterator trySplit() { + final HoldingConsumer holder = new HoldingConsumer<>(); + if (!tryAdvance(holder)) { + return null; + } + final String[] batch = new String[batchSize]; + int j = 0; + do { + batch[j] = holder.value; + } + while (++j < batchSize && tryAdvance(holder)); + return spliterator(batch, 0, j, characteristics() | SIZED); + } + + /** + * Returns an estimate of the number of elements that would be + * encountered by a {@link #forEachRemaining} traversal, or returns {@link + * Long#MAX_VALUE} if infinite, unknown, or too expensive to compute. + *

+ *

If this Spliterator is {@link #SIZED} and has not yet been partially + * traversed or split, or this Spliterator is {@link #SUBSIZED} and has + * not yet been partially traversed, this estimate must be an accurate + * count of elements that would be encountered by a complete traversal. + * Otherwise, this estimate may be arbitrarily inaccurate, but must decrease + * as specified across invocations of {@link #trySplit}. + * + * @return the estimated size, or {@code Long.MAX_VALUE} if infinite, + * unknown, or too expensive to compute. + * @apiNote Even an inexact estimate is often useful and inexpensive to compute. + * For example, a sub-spliterator of an approximately balanced binary tree + * may return a value that estimates the number of elements to be half of + * that of its parent; if the root Spliterator does not maintain an + * accurate count, it could estimate size to be the power of two + * corresponding to its maximum depth. + */ + @Override + public long estimateSize() { + return Long.MAX_VALUE; + } + + /** + * Returns a set of characteristics of this Spliterator and its + * elements. The result is represented as ORed values from {@link + * #ORDERED}, {@link #DISTINCT}, {@link #SORTED}, {@link #SIZED}, + * {@link #NONNULL}, {@link #IMMUTABLE}, {@link #CONCURRENT}, + * {@link #SUBSIZED}. Repeated calls to {@code characteristics()} on + * a given spliterator, prior to or in-between calls to {@code trySplit}, + * should always return the same result. + *

+ *

If a Spliterator reports an inconsistent set of + * characteristics (either those returned from a single invocation + * or across multiple invocations), no guarantees can be made + * about any computation using this Spliterator. + * + * @return a representation of characteristics + * @apiNote The characteristics of a given spliterator before splitting + * may differ from the characteristics after splitting. For specific + * examples see the characteristic values {@link #SIZED}, {@link #SUBSIZED} + * and {@link #CONCURRENT}. + */ + @Override + public int characteristics() { + return characteristics; + } + + static class HoldingConsumer implements Consumer { + String value; + /** + * Performs this operation on the given argument. + * + * @param string the input argument + */ + @Override + public void accept(String string) { + this.value = string; + } + } + + public static Stream lineStream(BufferedReader in, int batchSize) { + return StreamSupport.stream(new ReaderSpliterator(in, batchSize), false) + .onClose(() -> { + try { + in.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + ); + } +} diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java new file mode 100644 index 0000000000..e44eb27175 --- /dev/null +++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.dataloads.nonbulk.flatfile; + +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.metron.dataloads.extractor.Extractor; +import org.apache.metron.enrichment.converter.HbaseConverter; + +public class ExtractorState { + private HTableInterface table; + private Extractor extractor; + private HbaseConverter converter; + + public ExtractorState(HTableInterface table, Extractor extractor, HbaseConverter converter) { + this.table = table; + this.extractor = extractor; + this.converter = converter; + } + + public HTableInterface getTable() { + return table; + } + + public Extractor getExtractor() { + return extractor; + } + + public HbaseConverter getConverter() { + return converter; + } +} diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java index 0c7501a9d7..c05d6fd89a 100644 --- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java +++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java @@ -20,6 +20,7 @@ import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import org.apache.commons.cli.*; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; @@ -28,6 +29,8 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.log4j.PropertyConfigurator; +import org.apache.metron.common.utils.ConversionUtils; +import org.apache.metron.common.utils.file.ReaderSpliterator; import org.apache.metron.dataloads.extractor.Extractor; import org.apache.metron.dataloads.extractor.ExtractorHandler; import org.apache.metron.dataloads.extractor.inputformat.WholeFileFormat; @@ -39,13 +42,13 @@ import org.apache.metron.common.utils.JSONUtils; import javax.annotation.Nullable; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; +import java.io.*; import java.util.ArrayList; import java.util.List; import java.util.Stack; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.stream.Stream; public class SimpleEnrichmentFlatFileLoader { private static abstract class OptionHandler implements Function {} @@ -111,6 +114,26 @@ public Option apply(@Nullable String s) { return o; } }) + ,NUM_THREADS("p", new OptionHandler() { + @Nullable + @Override + public Option apply(@Nullable String s) { + Option o = new Option(s, "threads", true, "The batch size to use for hbase puts"); + o.setArgName("NUM_THREADS"); + o.setRequired(false); + return o; + } + }) + ,BATCH_SIZE("b", new OptionHandler() { + @Nullable + @Override + public Option apply(@Nullable String s) { + Option o = new Option(s, "batchSize", true, "The batch size to use for hbase puts"); + o.setArgName("SIZE"); + o.setRequired(false); + return o; + } + }) ,INPUT("i", new OptionHandler() { @Nullable @Override @@ -207,25 +230,56 @@ public List extract( String line return ret; } - - public void loadFile( File inputFile - , Extractor extractor - , HTableInterface table - , String cf - , HbaseConverter converter - , boolean lineByLine - ) throws IOException + public void load( final Iterable> streams + , final ThreadLocal state + , final String cf + , int numThreads + ) { + System.out.println("Number of threads: " + numThreads); + for(Stream stream : streams) { + try { + ForkJoinPool forkJoinPool = new ForkJoinPool(numThreads); + forkJoinPool.submit(() -> + stream.parallel().forEach(input -> { + ExtractorState es = state.get(); + try { + es.getTable().put(extract(input, es.getExtractor(), cf, es.getConverter())); + } catch (IOException e) { + throw new IllegalStateException("Unable to continue: " + e.getMessage(), e); + } + } + ) + ).get(); + } catch (InterruptedException e) { + throw new IllegalStateException(e.getMessage(), e); + } catch (ExecutionException e) { + throw new IllegalStateException(e.getMessage(), e); + } finally { + stream.close(); + } + } + } + + private static Iterable> streamify(List files, int batchSize, boolean lineByLine) throws FileNotFoundException { + List> ret = new ArrayList<>(); if(!lineByLine) { - table.put(extract(FileUtils.readFileToString(inputFile), extractor, cf, converter)); + ret.add(files.stream().map(f -> { + try { + return FileUtils.readFileToString(f); + } catch (IOException e) { + throw new IllegalStateException("File " + f.getName() + " not found."); + } + })); } else { - BufferedReader br = new BufferedReader(new FileReader(inputFile)); - for(String line = null;(line = br.readLine()) != null;) { - table.put(extract(line, extractor, cf, converter)); + for(File f : files) { + ret.add(ReaderSpliterator.lineStream(new BufferedReader(new FileReader(f)), batchSize)); } } + return ret; } + public static void main(String... argv) throws Exception { Configuration conf = HBaseConfiguration.create(); String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs(); @@ -237,23 +291,40 @@ public static void main(String... argv) throws Exception { ExtractorHandler handler = ExtractorHandler.load( FileUtils.readFileToString(new File(LoadOptions.EXTRACTOR_CONFIG.get(cli))) ); + int batchSize = 128; + if(LoadOptions.BATCH_SIZE.has(cli)) { + batchSize = ConversionUtils.convert(LoadOptions.BATCH_SIZE.get(cli), Integer.class); + } + int numThreads = Runtime.getRuntime().availableProcessors(); + if(LoadOptions.NUM_THREADS.has(cli)) { + numThreads = ConversionUtils.convert(LoadOptions.NUM_THREADS.get(cli), Integer.class); + } boolean lineByLine = !handler.getInputFormatHandler().getClass().equals(WholeFileFormat.class); - Extractor e = handler.getExtractor(); SensorEnrichmentUpdateConfig sensorEnrichmentUpdateConfig = null; if(LoadOptions.ENRICHMENT_CONFIG.has(cli)) { sensorEnrichmentUpdateConfig = JSONUtils.INSTANCE.load( new File(LoadOptions.ENRICHMENT_CONFIG.get(cli)) , SensorEnrichmentUpdateConfig.class ); } - HbaseConverter converter = new EnrichmentConverter(); List inputFiles = getFiles(new File(LoadOptions.INPUT.get(cli))); SimpleEnrichmentFlatFileLoader loader = new SimpleEnrichmentFlatFileLoader(); - HTableInterface table = loader.getProvider() - .getTable(conf, LoadOptions.HBASE_TABLE.get(cli)); + ThreadLocal state = new ThreadLocal() { + @Override + protected ExtractorState initialValue() { + try { + ExtractorHandler handler = ExtractorHandler.load( + FileUtils.readFileToString(new File(LoadOptions.EXTRACTOR_CONFIG.get(cli))) + ); + HTableInterface table = loader.getProvider().getTable(conf, LoadOptions.HBASE_TABLE.get(cli)); + return new ExtractorState(table, handler.getExtractor(), new EnrichmentConverter()); + } catch (IOException e1) { + throw new IllegalStateException("Unable to get table: " + e1); + } + } + }; + + loader.load(streamify(inputFiles, batchSize, lineByLine), state, LoadOptions.HBASE_CF.get(cli), numThreads); - for (File f : inputFiles) { - loader.loadFile(f, e, table, LoadOptions.HBASE_CF.get(cli), converter, lineByLine); - } if(sensorEnrichmentUpdateConfig != null) { sensorEnrichmentUpdateConfig.updateSensorConfigs(); } From 918d4ce4aea5d7dfde992f32bf049c70f35dd182 Mon Sep 17 00:00:00 2001 From: cstella Date: Fri, 27 Jan 2017 18:23:19 -0500 Subject: [PATCH 2/9] doc changes. --- .../metron-data-management/README.md | 21 ++++++++++--------- .../SimpleEnrichmentFlatFileLoader.java | 4 ++-- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/metron-platform/metron-data-management/README.md b/metron-platform/metron-data-management/README.md index a0c0164686..7a9836cd05 100644 --- a/metron-platform/metron-data-management/README.md +++ b/metron-platform/metron-data-management/README.md @@ -240,16 +240,17 @@ each document to be considered as input to the Extractor. The parameters for the utility are as follows: -| Short Code | Long Code | Is Required? | Description | -|------------|---------------------|--------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| -h | | No | Generate the help screen/set of options | -| -e | --extractor_config | Yes | JSON Document describing the extractor for this input data source | -| -t | --hbase_table | Yes | The HBase table to import into | -| -c | --hbase_cf | Yes | The HBase table column family to import into | -| -i | --input | Yes | The input data location on local disk. If this is a file, then that file will be loaded. If this is a directory, then the files will be loaded recursively under that directory. | -| -l | --log4j | No | The log4j properties file to load | -| -n | --enrichment_config | No | The JSON document describing the enrichments to configure. Unlike other loaders, this is run first if specified. | - +| Short Code | Long Code | Is Required? | Description | | +|------------|---------------------|--------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---| +| -h | | No | Generate the help screen/set of options | | +| -e | --extractor_config | Yes | JSON Document describing the extractor for this input data source | | +| -t | --hbase_table | Yes | The HBase table to import into | | +| -c | --hbase_cf | Yes | The HBase table column family to import into | | +| -i | --input | Yes | The input data location on local disk. If this is a file, then that file will be loaded. If this is a directory, then the files will be loaded recursively under that directory. | | +| -l | --log4j | No | The log4j properties file to load | | +| -n | --enrichment_config | No | The JSON document describing the enrichments to configure. Unlike other loaders, this is run first if specified. | | +| -p | --threads | No | The number of threads to use when extracting data | | +| -b | --batchSize | No | The batch size to use for HBase puts | | ### GeoLite2 Loader The shell script `$METRON_HOME/bin/geo_enrichment_load.sh` will retrieve MaxMind GeoLite2 data and load data into HDFS, and update the configuration. diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java index c05d6fd89a..a84b5d7b4c 100644 --- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java +++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java @@ -118,7 +118,7 @@ public Option apply(@Nullable String s) { @Nullable @Override public Option apply(@Nullable String s) { - Option o = new Option(s, "threads", true, "The batch size to use for hbase puts"); + Option o = new Option(s, "threads", true, "The number of threads to use when extracting data"); o.setArgName("NUM_THREADS"); o.setRequired(false); return o; @@ -128,7 +128,7 @@ public Option apply(@Nullable String s) { @Nullable @Override public Option apply(@Nullable String s) { - Option o = new Option(s, "batchSize", true, "The batch size to use for hbase puts"); + Option o = new Option(s, "batchSize", true, "The batch size to use for HBase puts"); o.setArgName("SIZE"); o.setRequired(false); return o; From c6ca3a86881eb77bc9598a61e3c0cf8280ccb03f Mon Sep 17 00:00:00 2001 From: cstella Date: Fri, 27 Jan 2017 18:39:56 -0500 Subject: [PATCH 3/9] Updating docs. --- metron-platform/metron-data-management/README.md | 4 ++-- .../nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/metron-platform/metron-data-management/README.md b/metron-platform/metron-data-management/README.md index 7a9836cd05..26dd4721f2 100644 --- a/metron-platform/metron-data-management/README.md +++ b/metron-platform/metron-data-management/README.md @@ -249,8 +249,8 @@ The parameters for the utility are as follows: | -i | --input | Yes | The input data location on local disk. If this is a file, then that file will be loaded. If this is a directory, then the files will be loaded recursively under that directory. | | | -l | --log4j | No | The log4j properties file to load | | | -n | --enrichment_config | No | The JSON document describing the enrichments to configure. Unlike other loaders, this is run first if specified. | | -| -p | --threads | No | The number of threads to use when extracting data | | -| -b | --batchSize | No | The batch size to use for HBase puts | | +| -p | --threads | No | The number of threads to use when extracting data. The default is the number of cores. | | +| -b | --batchSize | No | The batch size to use for HBase puts | | ### GeoLite2 Loader The shell script `$METRON_HOME/bin/geo_enrichment_load.sh` will retrieve MaxMind GeoLite2 data and load data into HDFS, and update the configuration. diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java index a84b5d7b4c..24c16cc82d 100644 --- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java +++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java @@ -118,7 +118,7 @@ public Option apply(@Nullable String s) { @Nullable @Override public Option apply(@Nullable String s) { - Option o = new Option(s, "threads", true, "The number of threads to use when extracting data"); + Option o = new Option(s, "threads", true, "The number of threads to use when extracting data. The default is the number of cores of your machine."); o.setArgName("NUM_THREADS"); o.setRequired(false); return o; From 8c9a79cdfa38ea2fbd161095d5e346147558ec5f Mon Sep 17 00:00:00 2001 From: cstella Date: Fri, 27 Jan 2017 22:36:31 -0500 Subject: [PATCH 4/9] Investigating integration tests. --- .../integration/ElasticsearchIndexingIntegrationTest.java | 1 + .../metron/indexing/integration/IndexingIntegrationTest.java | 2 ++ 2 files changed, 3 insertions(+) diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java index 7e9f23129d..acc1565e5c 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java @@ -85,6 +85,7 @@ public ReadinessState process(ComponentRunner runner) { return ReadinessState.READY; } } else { + System.out.println("Missed index..."); return ReadinessState.NOT_READY; } } diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java index 03ae9ffd8f..a93c442ab0 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java @@ -205,6 +205,7 @@ public void test() throws Exception { private void waitForIndex(String zookeeperQuorum) throws Exception { try(CuratorFramework client = getClient(zookeeperQuorum)) { client.start(); + System.out.println("Waiting for zookeeper..."); byte[] bytes = null; do { try { @@ -216,6 +217,7 @@ private void waitForIndex(String zookeeperQuorum) throws Exception { } } while(bytes == null || bytes.length == 0); + System.out.println("Found index config in zookeeper..."); } } From 315bd181aa634290ab987441d81c28addb7952e2 Mon Sep 17 00:00:00 2001 From: cstella Date: Fri, 27 Jan 2017 23:09:28 -0500 Subject: [PATCH 5/9] Update integration test to be a proper integration test. --- .../SimpleEnrichmentFlatFileLoader.java | 1 - .../SimpleEnrichmentFlatFileLoaderTest.java | 180 ++++++++++-------- 2 files changed, 99 insertions(+), 82 deletions(-) diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java index 24c16cc82d..9992422db3 100644 --- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java +++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java @@ -236,7 +236,6 @@ public void load( final Iterable> streams , int numThreads ) { - System.out.println("Number of threads: " + numThreads); for(Stream stream : streams) { try { ForkJoinPool forkJoinPool = new ForkJoinPool(numThreads); diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java index b4891aa9e0..4ffb91a537 100644 --- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java +++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java @@ -17,6 +17,7 @@ */ package org.apache.metron.dataloads.nonbulk.flatfile; +import com.google.common.collect.ImmutableList; import org.adrianwalker.multilinestring.Multiline; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.PosixParser; @@ -56,91 +57,108 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Stream; public class SimpleEnrichmentFlatFileLoaderTest { - private HBaseTestingUtility testUtil; - - /** The test table. */ - private HTable testTable; - private String tableName = "enrichment"; - private String cf = "cf"; - private String csvFile="input.csv"; - private String extractorJson = "extractor.json"; - private String enrichmentJson = "enrichment_config.json"; - private String log4jProperty = "log4j"; - - Configuration config = null; - /** - { - "config" : { - "columns" : { - "host" : 0, - "meta" : 2 - }, - "indicator_column" : "host", - "separator" : ",", - "type" : "enrichment" - }, - "extractor" : "CSV" - } - */ - @Multiline - private static String extractorConfig; - - @Before - public void setup() throws Exception { - Map.Entry kv = HBaseUtil.INSTANCE.create(true); - config = kv.getValue(); - testUtil = kv.getKey(); - testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf)); + private HBaseTestingUtility testUtil; + + /** The test table. */ + private HTable testTable; + private String tableName = "enrichment"; + private String cf = "cf"; + private String csvFile="input.csv"; + private String extractorJson = "extractor.json"; + private String enrichmentJson = "enrichment_config.json"; + private String log4jProperty = "log4j"; + + Configuration config = null; + /** + { + "config" : { + "columns" : { + "host" : 0, + "meta" : 2 + }, + "indicator_column" : "host", + "separator" : ",", + "type" : "enrichment" + }, + "extractor" : "CSV" } - - @After - public void teardown() throws Exception { - HBaseUtil.INSTANCE.teardown(testUtil); - } - - @Test - public void testCommandLine() throws Exception { - Configuration conf = HBaseConfiguration.create(); - - String[] argv = {"-c cf", "-t enrichment", "-e extractor.json", "-n enrichment_config.json", "-l log4j", "-i input.csv"}; - String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs(); - - CommandLine cli = SimpleEnrichmentFlatFileLoader.LoadOptions.parse(new PosixParser(), otherArgs); - Assert.assertEquals(extractorJson,SimpleEnrichmentFlatFileLoader.LoadOptions.EXTRACTOR_CONFIG.get(cli).trim()); - Assert.assertEquals(cf, SimpleEnrichmentFlatFileLoader.LoadOptions.HBASE_CF.get(cli).trim()); - Assert.assertEquals(tableName,SimpleEnrichmentFlatFileLoader.LoadOptions.HBASE_TABLE.get(cli).trim()); - Assert.assertEquals(enrichmentJson,SimpleEnrichmentFlatFileLoader.LoadOptions.ENRICHMENT_CONFIG.get(cli).trim()); - Assert.assertEquals(csvFile,SimpleEnrichmentFlatFileLoader.LoadOptions.INPUT.get(cli).trim()); - Assert.assertEquals(log4jProperty, SimpleEnrichmentFlatFileLoader.LoadOptions.LOG4J_PROPERTIES.get(cli).trim()); - } - - @Test - public void test() throws Exception { - - Assert.assertNotNull(testTable); - String contents = "google.com,1,foo"; - - EnrichmentConverter converter = new EnrichmentConverter(); - ExtractorHandler handler = ExtractorHandler.load(extractorConfig); - Extractor e = handler.getExtractor(); - File file = new File (contents); - SimpleEnrichmentFlatFileLoader loader = new SimpleEnrichmentFlatFileLoader(); - testTable.put(loader.extract(contents, e, cf, converter)); - - ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf)); - List> results = new ArrayList<>(); - for(Result r : scanner) { - results.add(converter.fromResult(r, cf)); - } - Assert.assertEquals(1, results.size()); - Assert.assertEquals(results.get(0).getKey().indicator, "google.com"); - Assert.assertEquals(results.get(0).getKey().type, "enrichment"); - Assert.assertEquals(results.get(0).getValue().getMetadata().size(), 2); - Assert.assertEquals(results.get(0).getValue().getMetadata().get("meta"), "foo"); - Assert.assertEquals(results.get(0).getValue().getMetadata().get("host"), "google.com"); + */ + @Multiline + private static String extractorConfig; + + @Before + public void setup() throws Exception { + Map.Entry kv = HBaseUtil.INSTANCE.create(true); + config = kv.getValue(); + testUtil = kv.getKey(); + testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf)); + } + + @After + public void teardown() throws Exception { + HBaseUtil.INSTANCE.teardown(testUtil); + } + + @Test + public void testCommandLine() throws Exception { + Configuration conf = HBaseConfiguration.create(); + + String[] argv = { "-c cf", "-t enrichment" + , "-e extractor.json", "-n enrichment_config.json" + , "-l log4j", "-i input.csv" + , "-p 2", "-b 128" + }; + String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs(); + + CommandLine cli = SimpleEnrichmentFlatFileLoader.LoadOptions.parse(new PosixParser(), otherArgs); + Assert.assertEquals(extractorJson,SimpleEnrichmentFlatFileLoader.LoadOptions.EXTRACTOR_CONFIG.get(cli).trim()); + Assert.assertEquals(cf, SimpleEnrichmentFlatFileLoader.LoadOptions.HBASE_CF.get(cli).trim()); + Assert.assertEquals(tableName,SimpleEnrichmentFlatFileLoader.LoadOptions.HBASE_TABLE.get(cli).trim()); + Assert.assertEquals(enrichmentJson,SimpleEnrichmentFlatFileLoader.LoadOptions.ENRICHMENT_CONFIG.get(cli).trim()); + Assert.assertEquals(csvFile,SimpleEnrichmentFlatFileLoader.LoadOptions.INPUT.get(cli).trim()); + Assert.assertEquals(log4jProperty, SimpleEnrichmentFlatFileLoader.LoadOptions.LOG4J_PROPERTIES.get(cli).trim()); + Assert.assertEquals("2", SimpleEnrichmentFlatFileLoader.LoadOptions.NUM_THREADS.get(cli).trim()); + Assert.assertEquals("128", SimpleEnrichmentFlatFileLoader.LoadOptions.BATCH_SIZE.get(cli).trim()); + } + + @Test + public void test() throws Exception { + + Assert.assertNotNull(testTable); + String contents = "google.com,1,foo"; + + EnrichmentConverter converter = new EnrichmentConverter(); + ExtractorHandler handler = ExtractorHandler.load(extractorConfig); + Extractor e = handler.getExtractor(); + SimpleEnrichmentFlatFileLoader loader = new SimpleEnrichmentFlatFileLoader(); + Stream contentStreams = ImmutableList.of(contents).stream(); + ThreadLocal state = new ThreadLocal() { + @Override + protected ExtractorState initialValue() { + return new ExtractorState(testTable, e, converter); + } + }; + loader.load(ImmutableList.of(contentStreams) + , state + , cf + , 2 + ); + + ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf)); + List> results = new ArrayList<>(); + for(Result r : scanner) { + results.add(converter.fromResult(r, cf)); } + Assert.assertEquals(1, results.size()); + Assert.assertEquals(results.get(0).getKey().indicator, "google.com"); + Assert.assertEquals(results.get(0).getKey().type, "enrichment"); + Assert.assertEquals(results.get(0).getValue().getMetadata().size(), 2); + Assert.assertEquals(results.get(0).getValue().getMetadata().get("meta"), "foo"); + Assert.assertEquals(results.get(0).getValue().getMetadata().get("host"), "google.com"); + } } From 004c6f41b6c1cc3ecea70513e1a468501bd32e3c Mon Sep 17 00:00:00 2001 From: cstella Date: Fri, 27 Jan 2017 23:49:37 -0500 Subject: [PATCH 6/9] Adding spliterator unit test for completeness --- .../utils/file/ReaderSpliteratorTest.java | 128 ++++++++++++++++++ 1 file changed, 128 insertions(+) create mode 100644 metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java new file mode 100644 index 0000000000..b1ddede4eb --- /dev/null +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.common.utils.file; + +import org.adrianwalker.multilinestring.Multiline; +import org.junit.Assert; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.StringReader; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class ReaderSpliteratorTest { + /** + foo + bar + grok + foo + the + and + grok + foo + bar + */ + @Multiline + public static String data; + + @Test + public void testParallelStreamSmallBatch() { + Stream stream = ReaderSpliterator.lineStream(new BufferedReader(new StringReader(data)), 2); + Map count = + stream.parallel().map( s -> s.trim()) + .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); + Assert.assertEquals(5, count.size()); + Assert.assertEquals(3, (int)count.get("foo")); + Assert.assertEquals(2, (int)count.get("bar")); + Assert.assertEquals(1, (int)count.get("and")); + Assert.assertEquals(1, (int)count.get("the")); + } + + @Test + public void testParallelStreamLargeBatch() { + Stream stream = ReaderSpliterator.lineStream(new BufferedReader(new StringReader(data)), 100); + Map count = + stream.parallel().map( s -> s.trim()) + .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); + Assert.assertEquals(5, count.size()); + Assert.assertEquals(3, (int)count.get("foo")); + Assert.assertEquals(2, (int)count.get("bar")); + Assert.assertEquals(1, (int)count.get("and")); + Assert.assertEquals(1, (int)count.get("the")); + } + + @Test + public void testSequentialStreamLargeBatch() { + Stream stream = ReaderSpliterator.lineStream(new BufferedReader(new StringReader(data)), 100); + Map count = + stream.map( s -> s.trim()) + .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); + Assert.assertEquals(5, count.size()); + Assert.assertEquals(3, (int)count.get("foo")); + Assert.assertEquals(2, (int)count.get("bar")); + Assert.assertEquals(1, (int)count.get("and")); + Assert.assertEquals(1, (int)count.get("the")); + } + + @Test + public void testActuallyParallel() throws ExecutionException, InterruptedException { + //With 9 elements and a batch of 2, we should only ceil(9/2) = 5 batches, so at most min(5, 2) = 2 threads will be used + Stream stream = ReaderSpliterator.lineStream(new BufferedReader(new StringReader(data)), 2); + ForkJoinPool forkJoinPool = new ForkJoinPool(2); + forkJoinPool.submit(() -> { + Map threads= + stream.parallel().map(s -> Thread.currentThread().getName()) + .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); + Assert.assertTrue(threads.size() <= 2); + } + ).get(); + } + + @Test + public void testActuallyParallel_mediumBatch() throws ExecutionException, InterruptedException { + //With 9 elements and a batch of 2, we should only ceil(9/2) = 5 batches, so at most 5 threads of the pool of 10 will be used + Stream stream = ReaderSpliterator.lineStream(new BufferedReader(new StringReader(data)), 2); + ForkJoinPool forkJoinPool = new ForkJoinPool(10); + forkJoinPool.submit(() -> { + Map threads= + stream.parallel().map(s -> Thread.currentThread().getName()) + .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); + Assert.assertTrue(threads.size() <= (int)Math.ceil(9.0/2) && threads.size() > 1); + } + ).get(); + } + + @Test + public void testActuallyParallel_bigBatch() throws ExecutionException, InterruptedException { + //With 9 elements and a batch of 10, we should only have one batch, so only one thread will be used + //despite the thread pool size of 2. + Stream stream = ReaderSpliterator.lineStream(new BufferedReader(new StringReader(data)), 10); + ForkJoinPool forkJoinPool = new ForkJoinPool(2); + forkJoinPool.submit(() -> { + Map threads= + stream.parallel().map(s -> Thread.currentThread().getName()) + .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); + Assert.assertEquals(1, threads.size()); + } + ).get(); + } +} From f8dd48ef920c948e1fc5ff736e386f641e551b2b Mon Sep 17 00:00:00 2001 From: cstella Date: Sat, 28 Jan 2017 00:01:42 -0500 Subject: [PATCH 7/9] Updating test to use a proper file --- .../utils/file/ReaderSpliteratorTest.java | 150 ++++++++++-------- 1 file changed, 88 insertions(+), 62 deletions(-) diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java index b1ddede4eb..cf259d8fb2 100644 --- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java @@ -19,10 +19,14 @@ import org.adrianwalker.multilinestring.Multiline; import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; -import java.io.BufferedReader; -import java.io.StringReader; +import java.io.*; +import java.nio.file.Files; +import java.nio.file.OpenOption; +import java.nio.file.StandardOpenOption; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; @@ -43,86 +47,108 @@ public class ReaderSpliteratorTest { */ @Multiline public static String data; + public static final File dataFile = new File("target/readerspliteratortest.data"); + + @BeforeClass + public static void setup() throws IOException { + if(dataFile.exists()) { + dataFile.delete(); + } + Files.write(dataFile.toPath(), data.getBytes(), StandardOpenOption.CREATE_NEW, StandardOpenOption.TRUNCATE_EXISTING); + dataFile.deleteOnExit(); + } + + public static BufferedReader getReader() throws FileNotFoundException { + return new BufferedReader(new FileReader(dataFile)); + } @Test - public void testParallelStreamSmallBatch() { - Stream stream = ReaderSpliterator.lineStream(new BufferedReader(new StringReader(data)), 2); - Map count = - stream.parallel().map( s -> s.trim()) - .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); - Assert.assertEquals(5, count.size()); - Assert.assertEquals(3, (int)count.get("foo")); - Assert.assertEquals(2, (int)count.get("bar")); - Assert.assertEquals(1, (int)count.get("and")); - Assert.assertEquals(1, (int)count.get("the")); + public void testParallelStreamSmallBatch() throws FileNotFoundException { + try( Stream stream = ReaderSpliterator.lineStream(getReader(), 2)) { + + Map count = + stream.parallel().map( s -> s.trim()) + .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); + Assert.assertEquals(5, count.size()); + Assert.assertEquals(3, (int)count.get("foo")); + Assert.assertEquals(2, (int)count.get("bar")); + Assert.assertEquals(1, (int)count.get("and")); + Assert.assertEquals(1, (int)count.get("the")); + } } @Test - public void testParallelStreamLargeBatch() { - Stream stream = ReaderSpliterator.lineStream(new BufferedReader(new StringReader(data)), 100); - Map count = - stream.parallel().map( s -> s.trim()) - .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); - Assert.assertEquals(5, count.size()); - Assert.assertEquals(3, (int)count.get("foo")); - Assert.assertEquals(2, (int)count.get("bar")); - Assert.assertEquals(1, (int)count.get("and")); - Assert.assertEquals(1, (int)count.get("the")); + public void testParallelStreamLargeBatch() throws FileNotFoundException { + try( Stream stream = ReaderSpliterator.lineStream(getReader(), 100)) { + Map count = + stream.parallel().map(s -> s.trim()) + .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); + Assert.assertEquals(5, count.size()); + Assert.assertEquals(3, (int) count.get("foo")); + Assert.assertEquals(2, (int) count.get("bar")); + Assert.assertEquals(1, (int) count.get("and")); + Assert.assertEquals(1, (int) count.get("the")); + } } @Test - public void testSequentialStreamLargeBatch() { - Stream stream = ReaderSpliterator.lineStream(new BufferedReader(new StringReader(data)), 100); - Map count = - stream.map( s -> s.trim()) - .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); - Assert.assertEquals(5, count.size()); - Assert.assertEquals(3, (int)count.get("foo")); - Assert.assertEquals(2, (int)count.get("bar")); - Assert.assertEquals(1, (int)count.get("and")); - Assert.assertEquals(1, (int)count.get("the")); + public void testSequentialStreamLargeBatch() throws FileNotFoundException { + try( Stream stream = ReaderSpliterator.lineStream(getReader(), 100)) { + Map count = + stream.map(s -> s.trim()) + .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); + Assert.assertEquals(5, count.size()); + Assert.assertEquals(3, (int) count.get("foo")); + Assert.assertEquals(2, (int) count.get("bar")); + Assert.assertEquals(1, (int) count.get("and")); + Assert.assertEquals(1, (int) count.get("the")); + } } @Test - public void testActuallyParallel() throws ExecutionException, InterruptedException { + public void testActuallyParallel() throws ExecutionException, InterruptedException, FileNotFoundException { //With 9 elements and a batch of 2, we should only ceil(9/2) = 5 batches, so at most min(5, 2) = 2 threads will be used - Stream stream = ReaderSpliterator.lineStream(new BufferedReader(new StringReader(data)), 2); - ForkJoinPool forkJoinPool = new ForkJoinPool(2); - forkJoinPool.submit(() -> { - Map threads= - stream.parallel().map(s -> Thread.currentThread().getName()) - .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); - Assert.assertTrue(threads.size() <= 2); - } - ).get(); + try( Stream stream = ReaderSpliterator.lineStream(getReader(), 2)) { + ForkJoinPool forkJoinPool = new ForkJoinPool(2); + forkJoinPool.submit(() -> { + Map threads = + stream.parallel().map(s -> Thread.currentThread().getName()) + .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); + Assert.assertTrue(threads.size() <= 2); + } + ).get(); + } } @Test - public void testActuallyParallel_mediumBatch() throws ExecutionException, InterruptedException { + public void testActuallyParallel_mediumBatch() throws ExecutionException, InterruptedException, FileNotFoundException { //With 9 elements and a batch of 2, we should only ceil(9/2) = 5 batches, so at most 5 threads of the pool of 10 will be used - Stream stream = ReaderSpliterator.lineStream(new BufferedReader(new StringReader(data)), 2); - ForkJoinPool forkJoinPool = new ForkJoinPool(10); - forkJoinPool.submit(() -> { - Map threads= - stream.parallel().map(s -> Thread.currentThread().getName()) - .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); - Assert.assertTrue(threads.size() <= (int)Math.ceil(9.0/2) && threads.size() > 1); - } - ).get(); + try( Stream stream = ReaderSpliterator.lineStream(getReader(), 2)) { + ForkJoinPool forkJoinPool = new ForkJoinPool(10); + forkJoinPool.submit(() -> { + Map threads = + stream.parallel().map(s -> Thread.currentThread().getName()) + .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); + Assert.assertTrue(threads.size() <= (int) Math.ceil(9.0 / 2) && threads.size() > 1); + } + ).get(); + } } @Test - public void testActuallyParallel_bigBatch() throws ExecutionException, InterruptedException { + public void testActuallyParallel_bigBatch() throws ExecutionException, InterruptedException, FileNotFoundException { //With 9 elements and a batch of 10, we should only have one batch, so only one thread will be used //despite the thread pool size of 2. - Stream stream = ReaderSpliterator.lineStream(new BufferedReader(new StringReader(data)), 10); - ForkJoinPool forkJoinPool = new ForkJoinPool(2); - forkJoinPool.submit(() -> { - Map threads= - stream.parallel().map(s -> Thread.currentThread().getName()) - .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); - Assert.assertEquals(1, threads.size()); - } - ).get(); + try( Stream stream = ReaderSpliterator.lineStream(getReader(), 10)) { + ForkJoinPool forkJoinPool = new ForkJoinPool(2); + forkJoinPool.submit(() -> { + Map threads = + stream.parallel().map(s -> Thread.currentThread().getName()) + .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); + Assert.assertEquals(1, threads.size()); + } + ).get(); + } } + } From 9b04f9723d442c8f4fb7a8bcaa1d733fc1305dc4 Mon Sep 17 00:00:00 2001 From: cstella Date: Sat, 28 Jan 2017 00:17:12 -0500 Subject: [PATCH 8/9] Updating docs and renaming a few things. --- .../common/utils/file/ReaderSpliterator.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/file/ReaderSpliterator.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/file/ReaderSpliterator.java index d10146ba65..ebb4fad207 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/file/ReaderSpliterator.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/file/ReaderSpliterator.java @@ -27,6 +27,16 @@ import static java.util.Spliterators.spliterator; +/** + * A Spliterator which works well on sequential streams by constructing a + * fixed batch size split rather than inheriting the spliterator from BufferedReader.lines() + * which gives up and reports no size and has no strategy for batching. This is a bug + * in Java 8 and will be fixed in Java 9. + * + * The ideas have been informed by https://www.airpair.com/java/posts/parallel-processing-of-io-based-data-with-java-streams + * except more specific to strings and motivated by a JDK 8 bug as + * described at http://bytefish.de/blog/jdk8_files_lines_parallel_stream/ + */ public class ReaderSpliterator implements Spliterator { private static int characteristics = NONNULL | ORDERED | IMMUTABLE; private int batchSize ; @@ -127,7 +137,7 @@ public boolean tryAdvance(Consumer action) { */ @Override public Spliterator trySplit() { - final HoldingConsumer holder = new HoldingConsumer<>(); + final ConsumerWithLookback holder = new ConsumerWithLookback(); if (!tryAdvance(holder)) { return null; } @@ -191,7 +201,7 @@ public int characteristics() { return characteristics; } - static class HoldingConsumer implements Consumer { + static class ConsumerWithLookback implements Consumer { String value; /** * Performs this operation on the given argument. From eb5b82cc35bd767a169f548ea8144dd9ae165f84 Mon Sep 17 00:00:00 2001 From: cstella Date: Sat, 28 Jan 2017 00:23:25 -0500 Subject: [PATCH 9/9] Update one more test case. --- .../common/utils/file/ReaderSpliterator.java | 6 +++- .../utils/file/ReaderSpliteratorTest.java | 31 +++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/file/ReaderSpliterator.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/file/ReaderSpliterator.java index ebb4fad207..20a40fa76f 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/file/ReaderSpliterator.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/file/ReaderSpliterator.java @@ -215,7 +215,11 @@ public void accept(String string) { } public static Stream lineStream(BufferedReader in, int batchSize) { - return StreamSupport.stream(new ReaderSpliterator(in, batchSize), false) + return lineStream(in, batchSize, false); + } + + public static Stream lineStream(BufferedReader in, int batchSize, boolean isParallel) { + return StreamSupport.stream(new ReaderSpliterator(in, batchSize), isParallel) .onClose(() -> { try { in.close(); diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java index cf259d8fb2..965840f360 100644 --- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java @@ -135,6 +135,37 @@ public void testActuallyParallel_mediumBatch() throws ExecutionException, Interr } } + @Test + public void testActuallyParallel_mediumBatchNotImplicitlyParallel() throws ExecutionException, InterruptedException, FileNotFoundException { + //Since this is not parallel and we're not making the stream itself parallel, we should only use one thread from the thread pool. + try( Stream stream = ReaderSpliterator.lineStream(getReader(), 2, false)) { + ForkJoinPool forkJoinPool = new ForkJoinPool(10); + forkJoinPool.submit(() -> { + Map threads = + stream.map(s -> Thread.currentThread().getName()) + .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); + Assert.assertTrue(threads.size() == 1); + } + ).get(); + } + } + + @Test + public void testActuallyParallel_mediumBatchImplicitlyParallel() throws ExecutionException, InterruptedException, FileNotFoundException { + //With 9 elements and a batch of 2, we should only ceil(9/2) = 5 batches, so at most 5 threads of the pool of 10 will be used + //despite not calling .parallel() on the stream, we are constructing the stream to be implicitly parallel + try( Stream stream = ReaderSpliterator.lineStream(getReader(), 2, true)) { + ForkJoinPool forkJoinPool = new ForkJoinPool(10); + forkJoinPool.submit(() -> { + Map threads = + stream.map(s -> Thread.currentThread().getName()) + .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); + Assert.assertTrue(threads.size() <= (int) Math.ceil(9.0 / 2) && threads.size() > 1); + } + ).get(); + } + } + @Test public void testActuallyParallel_bigBatch() throws ExecutionException, InterruptedException, FileNotFoundException { //With 9 elements and a batch of 10, we should only have one batch, so only one thread will be used