From ad8724eed0af784fcd6a822a11842d86aefc8832 Mon Sep 17 00:00:00 2001 From: cstella Date: Tue, 31 Jan 2017 15:19:55 -0500 Subject: [PATCH] METRON-678: Multithread the flat file loader closes apache/incubator-metron#428 --- .../common/utils/file/ReaderSpliterator.java | 232 ++++++++++++++++++ .../utils/file/ReaderSpliteratorTest.java | 185 ++++++++++++++ .../metron-data-management/README.md | 21 +- .../nonbulk/flatfile/ExtractorState.java | 46 ++++ .../SimpleEnrichmentFlatFileLoader.java | 116 +++++++-- .../SimpleEnrichmentFlatFileLoaderTest.java | 180 ++++++++------ .../ElasticsearchIndexingIntegrationTest.java | 1 + .../integration/IndexingIntegrationTest.java | 2 + 8 files changed, 669 insertions(+), 114 deletions(-) create mode 100644 metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/file/ReaderSpliterator.java create mode 100644 metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java create mode 100644 metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/file/ReaderSpliterator.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/file/ReaderSpliterator.java new file mode 100644 index 0000000000..20a40fa76f --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/file/ReaderSpliterator.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.common.utils.file; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Spliterator; +import java.util.function.Consumer; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static java.util.Spliterators.spliterator; + +/** + * A Spliterator which works well on sequential streams by constructing a + * fixed batch size split rather than inheriting the spliterator from BufferedReader.lines() + * which gives up and reports no size and has no strategy for batching. This is a bug + * in Java 8 and will be fixed in Java 9. + * + * The ideas have been informed by https://www.airpair.com/java/posts/parallel-processing-of-io-based-data-with-java-streams + * except more specific to strings and motivated by a JDK 8 bug as + * described at http://bytefish.de/blog/jdk8_files_lines_parallel_stream/ + */ +public class ReaderSpliterator implements Spliterator { + private static int characteristics = NONNULL | ORDERED | IMMUTABLE; + private int batchSize ; + private BufferedReader reader; + public ReaderSpliterator(BufferedReader reader) { + this(reader, 128); + } + + public ReaderSpliterator(BufferedReader reader, int batchSize) { + this.batchSize = batchSize; + this.reader = reader; + } + + @Override + public void forEachRemaining(Consumer action) { + if (action == null) { + throw new NullPointerException(); + } + try { + for (String line = null; (line = reader.readLine()) != null;) { + action.accept(line); + } + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + /** + * If a remaining element exists, performs the given action on it, + * returning {@code true}; else returns {@code false}. If this + * Spliterator is {@link #ORDERED} the action is performed on the + * next element in encounter order. Exceptions thrown by the + * action are relayed to the caller. + * + * @param action The action + * @return {@code false} if no remaining elements existed + * upon entry to this method, else {@code true}. + * @throws NullPointerException if the specified action is null + */ + @Override + public boolean tryAdvance(Consumer action) { + if (action == null) { + throw new NullPointerException(); + } + try { + final String line = reader.readLine(); + if (line == null) { + return false; + } + action.accept(line); + return true; + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + /** + * If this spliterator can be partitioned, returns a Spliterator + * covering elements, that will, upon return from this method, not + * be covered by this Spliterator. + *

+ *

If this Spliterator is {@link #ORDERED}, the returned Spliterator + * must cover a strict prefix of the elements. + *

+ *

Unless this Spliterator covers an infinite number of elements, + * repeated calls to {@code trySplit()} must eventually return {@code null}. + * Upon non-null return: + *

+ *

+ *

This method may return {@code null} for any reason, + * including emptiness, inability to split after traversal has + * commenced, data structure constraints, and efficiency + * considerations. + * + * @return a {@code Spliterator} covering some portion of the + * elements, or {@code null} if this spliterator cannot be split + * @apiNote An ideal {@code trySplit} method efficiently (without + * traversal) divides its elements exactly in half, allowing + * balanced parallel computation. Many departures from this ideal + * remain highly effective; for example, only approximately + * splitting an approximately balanced tree, or for a tree in + * which leaf nodes may contain either one or two elements, + * failing to further split these nodes. However, large + * deviations in balance and/or overly inefficient {@code + * trySplit} mechanics typically result in poor parallel + * performance. + */ + @Override + public Spliterator trySplit() { + final ConsumerWithLookback holder = new ConsumerWithLookback(); + if (!tryAdvance(holder)) { + return null; + } + final String[] batch = new String[batchSize]; + int j = 0; + do { + batch[j] = holder.value; + } + while (++j < batchSize && tryAdvance(holder)); + return spliterator(batch, 0, j, characteristics() | SIZED); + } + + /** + * Returns an estimate of the number of elements that would be + * encountered by a {@link #forEachRemaining} traversal, or returns {@link + * Long#MAX_VALUE} if infinite, unknown, or too expensive to compute. + *

+ *

If this Spliterator is {@link #SIZED} and has not yet been partially + * traversed or split, or this Spliterator is {@link #SUBSIZED} and has + * not yet been partially traversed, this estimate must be an accurate + * count of elements that would be encountered by a complete traversal. + * Otherwise, this estimate may be arbitrarily inaccurate, but must decrease + * as specified across invocations of {@link #trySplit}. + * + * @return the estimated size, or {@code Long.MAX_VALUE} if infinite, + * unknown, or too expensive to compute. + * @apiNote Even an inexact estimate is often useful and inexpensive to compute. + * For example, a sub-spliterator of an approximately balanced binary tree + * may return a value that estimates the number of elements to be half of + * that of its parent; if the root Spliterator does not maintain an + * accurate count, it could estimate size to be the power of two + * corresponding to its maximum depth. + */ + @Override + public long estimateSize() { + return Long.MAX_VALUE; + } + + /** + * Returns a set of characteristics of this Spliterator and its + * elements. The result is represented as ORed values from {@link + * #ORDERED}, {@link #DISTINCT}, {@link #SORTED}, {@link #SIZED}, + * {@link #NONNULL}, {@link #IMMUTABLE}, {@link #CONCURRENT}, + * {@link #SUBSIZED}. Repeated calls to {@code characteristics()} on + * a given spliterator, prior to or in-between calls to {@code trySplit}, + * should always return the same result. + *

+ *

If a Spliterator reports an inconsistent set of + * characteristics (either those returned from a single invocation + * or across multiple invocations), no guarantees can be made + * about any computation using this Spliterator. + * + * @return a representation of characteristics + * @apiNote The characteristics of a given spliterator before splitting + * may differ from the characteristics after splitting. For specific + * examples see the characteristic values {@link #SIZED}, {@link #SUBSIZED} + * and {@link #CONCURRENT}. + */ + @Override + public int characteristics() { + return characteristics; + } + + static class ConsumerWithLookback implements Consumer { + String value; + /** + * Performs this operation on the given argument. + * + * @param string the input argument + */ + @Override + public void accept(String string) { + this.value = string; + } + } + + public static Stream lineStream(BufferedReader in, int batchSize) { + return lineStream(in, batchSize, false); + } + + public static Stream lineStream(BufferedReader in, int batchSize, boolean isParallel) { + return StreamSupport.stream(new ReaderSpliterator(in, batchSize), isParallel) + .onClose(() -> { + try { + in.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + ); + } +} diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java new file mode 100644 index 0000000000..965840f360 --- /dev/null +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.common.utils.file; + +import org.adrianwalker.multilinestring.Multiline; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.*; +import java.nio.file.Files; +import java.nio.file.OpenOption; +import java.nio.file.StandardOpenOption; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class ReaderSpliteratorTest { + /** + foo + bar + grok + foo + the + and + grok + foo + bar + */ + @Multiline + public static String data; + public static final File dataFile = new File("target/readerspliteratortest.data"); + + @BeforeClass + public static void setup() throws IOException { + if(dataFile.exists()) { + dataFile.delete(); + } + Files.write(dataFile.toPath(), data.getBytes(), StandardOpenOption.CREATE_NEW, StandardOpenOption.TRUNCATE_EXISTING); + dataFile.deleteOnExit(); + } + + public static BufferedReader getReader() throws FileNotFoundException { + return new BufferedReader(new FileReader(dataFile)); + } + + @Test + public void testParallelStreamSmallBatch() throws FileNotFoundException { + try( Stream stream = ReaderSpliterator.lineStream(getReader(), 2)) { + + Map count = + stream.parallel().map( s -> s.trim()) + .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); + Assert.assertEquals(5, count.size()); + Assert.assertEquals(3, (int)count.get("foo")); + Assert.assertEquals(2, (int)count.get("bar")); + Assert.assertEquals(1, (int)count.get("and")); + Assert.assertEquals(1, (int)count.get("the")); + } + } + + @Test + public void testParallelStreamLargeBatch() throws FileNotFoundException { + try( Stream stream = ReaderSpliterator.lineStream(getReader(), 100)) { + Map count = + stream.parallel().map(s -> s.trim()) + .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); + Assert.assertEquals(5, count.size()); + Assert.assertEquals(3, (int) count.get("foo")); + Assert.assertEquals(2, (int) count.get("bar")); + Assert.assertEquals(1, (int) count.get("and")); + Assert.assertEquals(1, (int) count.get("the")); + } + } + + @Test + public void testSequentialStreamLargeBatch() throws FileNotFoundException { + try( Stream stream = ReaderSpliterator.lineStream(getReader(), 100)) { + Map count = + stream.map(s -> s.trim()) + .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); + Assert.assertEquals(5, count.size()); + Assert.assertEquals(3, (int) count.get("foo")); + Assert.assertEquals(2, (int) count.get("bar")); + Assert.assertEquals(1, (int) count.get("and")); + Assert.assertEquals(1, (int) count.get("the")); + } + } + + @Test + public void testActuallyParallel() throws ExecutionException, InterruptedException, FileNotFoundException { + //With 9 elements and a batch of 2, we should only ceil(9/2) = 5 batches, so at most min(5, 2) = 2 threads will be used + try( Stream stream = ReaderSpliterator.lineStream(getReader(), 2)) { + ForkJoinPool forkJoinPool = new ForkJoinPool(2); + forkJoinPool.submit(() -> { + Map threads = + stream.parallel().map(s -> Thread.currentThread().getName()) + .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); + Assert.assertTrue(threads.size() <= 2); + } + ).get(); + } + } + + @Test + public void testActuallyParallel_mediumBatch() throws ExecutionException, InterruptedException, FileNotFoundException { + //With 9 elements and a batch of 2, we should only ceil(9/2) = 5 batches, so at most 5 threads of the pool of 10 will be used + try( Stream stream = ReaderSpliterator.lineStream(getReader(), 2)) { + ForkJoinPool forkJoinPool = new ForkJoinPool(10); + forkJoinPool.submit(() -> { + Map threads = + stream.parallel().map(s -> Thread.currentThread().getName()) + .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); + Assert.assertTrue(threads.size() <= (int) Math.ceil(9.0 / 2) && threads.size() > 1); + } + ).get(); + } + } + + @Test + public void testActuallyParallel_mediumBatchNotImplicitlyParallel() throws ExecutionException, InterruptedException, FileNotFoundException { + //Since this is not parallel and we're not making the stream itself parallel, we should only use one thread from the thread pool. + try( Stream stream = ReaderSpliterator.lineStream(getReader(), 2, false)) { + ForkJoinPool forkJoinPool = new ForkJoinPool(10); + forkJoinPool.submit(() -> { + Map threads = + stream.map(s -> Thread.currentThread().getName()) + .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); + Assert.assertTrue(threads.size() == 1); + } + ).get(); + } + } + + @Test + public void testActuallyParallel_mediumBatchImplicitlyParallel() throws ExecutionException, InterruptedException, FileNotFoundException { + //With 9 elements and a batch of 2, we should only ceil(9/2) = 5 batches, so at most 5 threads of the pool of 10 will be used + //despite not calling .parallel() on the stream, we are constructing the stream to be implicitly parallel + try( Stream stream = ReaderSpliterator.lineStream(getReader(), 2, true)) { + ForkJoinPool forkJoinPool = new ForkJoinPool(10); + forkJoinPool.submit(() -> { + Map threads = + stream.map(s -> Thread.currentThread().getName()) + .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); + Assert.assertTrue(threads.size() <= (int) Math.ceil(9.0 / 2) && threads.size() > 1); + } + ).get(); + } + } + + @Test + public void testActuallyParallel_bigBatch() throws ExecutionException, InterruptedException, FileNotFoundException { + //With 9 elements and a batch of 10, we should only have one batch, so only one thread will be used + //despite the thread pool size of 2. + try( Stream stream = ReaderSpliterator.lineStream(getReader(), 10)) { + ForkJoinPool forkJoinPool = new ForkJoinPool(2); + forkJoinPool.submit(() -> { + Map threads = + stream.parallel().map(s -> Thread.currentThread().getName()) + .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); + Assert.assertEquals(1, threads.size()); + } + ).get(); + } + } + +} diff --git a/metron-platform/metron-data-management/README.md b/metron-platform/metron-data-management/README.md index a0c0164686..26dd4721f2 100644 --- a/metron-platform/metron-data-management/README.md +++ b/metron-platform/metron-data-management/README.md @@ -240,16 +240,17 @@ each document to be considered as input to the Extractor. The parameters for the utility are as follows: -| Short Code | Long Code | Is Required? | Description | -|------------|---------------------|--------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| -h | | No | Generate the help screen/set of options | -| -e | --extractor_config | Yes | JSON Document describing the extractor for this input data source | -| -t | --hbase_table | Yes | The HBase table to import into | -| -c | --hbase_cf | Yes | The HBase table column family to import into | -| -i | --input | Yes | The input data location on local disk. If this is a file, then that file will be loaded. If this is a directory, then the files will be loaded recursively under that directory. | -| -l | --log4j | No | The log4j properties file to load | -| -n | --enrichment_config | No | The JSON document describing the enrichments to configure. Unlike other loaders, this is run first if specified. | - +| Short Code | Long Code | Is Required? | Description | | +|------------|---------------------|--------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---| +| -h | | No | Generate the help screen/set of options | | +| -e | --extractor_config | Yes | JSON Document describing the extractor for this input data source | | +| -t | --hbase_table | Yes | The HBase table to import into | | +| -c | --hbase_cf | Yes | The HBase table column family to import into | | +| -i | --input | Yes | The input data location on local disk. If this is a file, then that file will be loaded. If this is a directory, then the files will be loaded recursively under that directory. | | +| -l | --log4j | No | The log4j properties file to load | | +| -n | --enrichment_config | No | The JSON document describing the enrichments to configure. Unlike other loaders, this is run first if specified. | | +| -p | --threads | No | The number of threads to use when extracting data. The default is the number of cores. | | +| -b | --batchSize | No | The batch size to use for HBase puts | | ### GeoLite2 Loader The shell script `$METRON_HOME/bin/geo_enrichment_load.sh` will retrieve MaxMind GeoLite2 data and load data into HDFS, and update the configuration. diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java new file mode 100644 index 0000000000..e44eb27175 --- /dev/null +++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.dataloads.nonbulk.flatfile; + +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.metron.dataloads.extractor.Extractor; +import org.apache.metron.enrichment.converter.HbaseConverter; + +public class ExtractorState { + private HTableInterface table; + private Extractor extractor; + private HbaseConverter converter; + + public ExtractorState(HTableInterface table, Extractor extractor, HbaseConverter converter) { + this.table = table; + this.extractor = extractor; + this.converter = converter; + } + + public HTableInterface getTable() { + return table; + } + + public Extractor getExtractor() { + return extractor; + } + + public HbaseConverter getConverter() { + return converter; + } +} diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java index 0c7501a9d7..9992422db3 100644 --- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java +++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java @@ -20,6 +20,7 @@ import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import org.apache.commons.cli.*; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; @@ -28,6 +29,8 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.log4j.PropertyConfigurator; +import org.apache.metron.common.utils.ConversionUtils; +import org.apache.metron.common.utils.file.ReaderSpliterator; import org.apache.metron.dataloads.extractor.Extractor; import org.apache.metron.dataloads.extractor.ExtractorHandler; import org.apache.metron.dataloads.extractor.inputformat.WholeFileFormat; @@ -39,13 +42,13 @@ import org.apache.metron.common.utils.JSONUtils; import javax.annotation.Nullable; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; +import java.io.*; import java.util.ArrayList; import java.util.List; import java.util.Stack; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.stream.Stream; public class SimpleEnrichmentFlatFileLoader { private static abstract class OptionHandler implements Function {} @@ -111,6 +114,26 @@ public Option apply(@Nullable String s) { return o; } }) + ,NUM_THREADS("p", new OptionHandler() { + @Nullable + @Override + public Option apply(@Nullable String s) { + Option o = new Option(s, "threads", true, "The number of threads to use when extracting data. The default is the number of cores of your machine."); + o.setArgName("NUM_THREADS"); + o.setRequired(false); + return o; + } + }) + ,BATCH_SIZE("b", new OptionHandler() { + @Nullable + @Override + public Option apply(@Nullable String s) { + Option o = new Option(s, "batchSize", true, "The batch size to use for HBase puts"); + o.setArgName("SIZE"); + o.setRequired(false); + return o; + } + }) ,INPUT("i", new OptionHandler() { @Nullable @Override @@ -207,25 +230,55 @@ public List extract( String line return ret; } - - public void loadFile( File inputFile - , Extractor extractor - , HTableInterface table - , String cf - , HbaseConverter converter - , boolean lineByLine - ) throws IOException + public void load( final Iterable> streams + , final ThreadLocal state + , final String cf + , int numThreads + ) { + for(Stream stream : streams) { + try { + ForkJoinPool forkJoinPool = new ForkJoinPool(numThreads); + forkJoinPool.submit(() -> + stream.parallel().forEach(input -> { + ExtractorState es = state.get(); + try { + es.getTable().put(extract(input, es.getExtractor(), cf, es.getConverter())); + } catch (IOException e) { + throw new IllegalStateException("Unable to continue: " + e.getMessage(), e); + } + } + ) + ).get(); + } catch (InterruptedException e) { + throw new IllegalStateException(e.getMessage(), e); + } catch (ExecutionException e) { + throw new IllegalStateException(e.getMessage(), e); + } finally { + stream.close(); + } + } + } + + private static Iterable> streamify(List files, int batchSize, boolean lineByLine) throws FileNotFoundException { + List> ret = new ArrayList<>(); if(!lineByLine) { - table.put(extract(FileUtils.readFileToString(inputFile), extractor, cf, converter)); + ret.add(files.stream().map(f -> { + try { + return FileUtils.readFileToString(f); + } catch (IOException e) { + throw new IllegalStateException("File " + f.getName() + " not found."); + } + })); } else { - BufferedReader br = new BufferedReader(new FileReader(inputFile)); - for(String line = null;(line = br.readLine()) != null;) { - table.put(extract(line, extractor, cf, converter)); + for(File f : files) { + ret.add(ReaderSpliterator.lineStream(new BufferedReader(new FileReader(f)), batchSize)); } } + return ret; } + public static void main(String... argv) throws Exception { Configuration conf = HBaseConfiguration.create(); String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs(); @@ -237,23 +290,40 @@ public static void main(String... argv) throws Exception { ExtractorHandler handler = ExtractorHandler.load( FileUtils.readFileToString(new File(LoadOptions.EXTRACTOR_CONFIG.get(cli))) ); + int batchSize = 128; + if(LoadOptions.BATCH_SIZE.has(cli)) { + batchSize = ConversionUtils.convert(LoadOptions.BATCH_SIZE.get(cli), Integer.class); + } + int numThreads = Runtime.getRuntime().availableProcessors(); + if(LoadOptions.NUM_THREADS.has(cli)) { + numThreads = ConversionUtils.convert(LoadOptions.NUM_THREADS.get(cli), Integer.class); + } boolean lineByLine = !handler.getInputFormatHandler().getClass().equals(WholeFileFormat.class); - Extractor e = handler.getExtractor(); SensorEnrichmentUpdateConfig sensorEnrichmentUpdateConfig = null; if(LoadOptions.ENRICHMENT_CONFIG.has(cli)) { sensorEnrichmentUpdateConfig = JSONUtils.INSTANCE.load( new File(LoadOptions.ENRICHMENT_CONFIG.get(cli)) , SensorEnrichmentUpdateConfig.class ); } - HbaseConverter converter = new EnrichmentConverter(); List inputFiles = getFiles(new File(LoadOptions.INPUT.get(cli))); SimpleEnrichmentFlatFileLoader loader = new SimpleEnrichmentFlatFileLoader(); - HTableInterface table = loader.getProvider() - .getTable(conf, LoadOptions.HBASE_TABLE.get(cli)); + ThreadLocal state = new ThreadLocal() { + @Override + protected ExtractorState initialValue() { + try { + ExtractorHandler handler = ExtractorHandler.load( + FileUtils.readFileToString(new File(LoadOptions.EXTRACTOR_CONFIG.get(cli))) + ); + HTableInterface table = loader.getProvider().getTable(conf, LoadOptions.HBASE_TABLE.get(cli)); + return new ExtractorState(table, handler.getExtractor(), new EnrichmentConverter()); + } catch (IOException e1) { + throw new IllegalStateException("Unable to get table: " + e1); + } + } + }; + + loader.load(streamify(inputFiles, batchSize, lineByLine), state, LoadOptions.HBASE_CF.get(cli), numThreads); - for (File f : inputFiles) { - loader.loadFile(f, e, table, LoadOptions.HBASE_CF.get(cli), converter, lineByLine); - } if(sensorEnrichmentUpdateConfig != null) { sensorEnrichmentUpdateConfig.updateSensorConfigs(); } diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java index b4891aa9e0..4ffb91a537 100644 --- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java +++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java @@ -17,6 +17,7 @@ */ package org.apache.metron.dataloads.nonbulk.flatfile; +import com.google.common.collect.ImmutableList; import org.adrianwalker.multilinestring.Multiline; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.PosixParser; @@ -56,91 +57,108 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Stream; public class SimpleEnrichmentFlatFileLoaderTest { - private HBaseTestingUtility testUtil; - - /** The test table. */ - private HTable testTable; - private String tableName = "enrichment"; - private String cf = "cf"; - private String csvFile="input.csv"; - private String extractorJson = "extractor.json"; - private String enrichmentJson = "enrichment_config.json"; - private String log4jProperty = "log4j"; - - Configuration config = null; - /** - { - "config" : { - "columns" : { - "host" : 0, - "meta" : 2 - }, - "indicator_column" : "host", - "separator" : ",", - "type" : "enrichment" - }, - "extractor" : "CSV" - } - */ - @Multiline - private static String extractorConfig; - - @Before - public void setup() throws Exception { - Map.Entry kv = HBaseUtil.INSTANCE.create(true); - config = kv.getValue(); - testUtil = kv.getKey(); - testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf)); + private HBaseTestingUtility testUtil; + + /** The test table. */ + private HTable testTable; + private String tableName = "enrichment"; + private String cf = "cf"; + private String csvFile="input.csv"; + private String extractorJson = "extractor.json"; + private String enrichmentJson = "enrichment_config.json"; + private String log4jProperty = "log4j"; + + Configuration config = null; + /** + { + "config" : { + "columns" : { + "host" : 0, + "meta" : 2 + }, + "indicator_column" : "host", + "separator" : ",", + "type" : "enrichment" + }, + "extractor" : "CSV" } - - @After - public void teardown() throws Exception { - HBaseUtil.INSTANCE.teardown(testUtil); - } - - @Test - public void testCommandLine() throws Exception { - Configuration conf = HBaseConfiguration.create(); - - String[] argv = {"-c cf", "-t enrichment", "-e extractor.json", "-n enrichment_config.json", "-l log4j", "-i input.csv"}; - String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs(); - - CommandLine cli = SimpleEnrichmentFlatFileLoader.LoadOptions.parse(new PosixParser(), otherArgs); - Assert.assertEquals(extractorJson,SimpleEnrichmentFlatFileLoader.LoadOptions.EXTRACTOR_CONFIG.get(cli).trim()); - Assert.assertEquals(cf, SimpleEnrichmentFlatFileLoader.LoadOptions.HBASE_CF.get(cli).trim()); - Assert.assertEquals(tableName,SimpleEnrichmentFlatFileLoader.LoadOptions.HBASE_TABLE.get(cli).trim()); - Assert.assertEquals(enrichmentJson,SimpleEnrichmentFlatFileLoader.LoadOptions.ENRICHMENT_CONFIG.get(cli).trim()); - Assert.assertEquals(csvFile,SimpleEnrichmentFlatFileLoader.LoadOptions.INPUT.get(cli).trim()); - Assert.assertEquals(log4jProperty, SimpleEnrichmentFlatFileLoader.LoadOptions.LOG4J_PROPERTIES.get(cli).trim()); - } - - @Test - public void test() throws Exception { - - Assert.assertNotNull(testTable); - String contents = "google.com,1,foo"; - - EnrichmentConverter converter = new EnrichmentConverter(); - ExtractorHandler handler = ExtractorHandler.load(extractorConfig); - Extractor e = handler.getExtractor(); - File file = new File (contents); - SimpleEnrichmentFlatFileLoader loader = new SimpleEnrichmentFlatFileLoader(); - testTable.put(loader.extract(contents, e, cf, converter)); - - ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf)); - List> results = new ArrayList<>(); - for(Result r : scanner) { - results.add(converter.fromResult(r, cf)); - } - Assert.assertEquals(1, results.size()); - Assert.assertEquals(results.get(0).getKey().indicator, "google.com"); - Assert.assertEquals(results.get(0).getKey().type, "enrichment"); - Assert.assertEquals(results.get(0).getValue().getMetadata().size(), 2); - Assert.assertEquals(results.get(0).getValue().getMetadata().get("meta"), "foo"); - Assert.assertEquals(results.get(0).getValue().getMetadata().get("host"), "google.com"); + */ + @Multiline + private static String extractorConfig; + + @Before + public void setup() throws Exception { + Map.Entry kv = HBaseUtil.INSTANCE.create(true); + config = kv.getValue(); + testUtil = kv.getKey(); + testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf)); + } + + @After + public void teardown() throws Exception { + HBaseUtil.INSTANCE.teardown(testUtil); + } + + @Test + public void testCommandLine() throws Exception { + Configuration conf = HBaseConfiguration.create(); + + String[] argv = { "-c cf", "-t enrichment" + , "-e extractor.json", "-n enrichment_config.json" + , "-l log4j", "-i input.csv" + , "-p 2", "-b 128" + }; + String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs(); + + CommandLine cli = SimpleEnrichmentFlatFileLoader.LoadOptions.parse(new PosixParser(), otherArgs); + Assert.assertEquals(extractorJson,SimpleEnrichmentFlatFileLoader.LoadOptions.EXTRACTOR_CONFIG.get(cli).trim()); + Assert.assertEquals(cf, SimpleEnrichmentFlatFileLoader.LoadOptions.HBASE_CF.get(cli).trim()); + Assert.assertEquals(tableName,SimpleEnrichmentFlatFileLoader.LoadOptions.HBASE_TABLE.get(cli).trim()); + Assert.assertEquals(enrichmentJson,SimpleEnrichmentFlatFileLoader.LoadOptions.ENRICHMENT_CONFIG.get(cli).trim()); + Assert.assertEquals(csvFile,SimpleEnrichmentFlatFileLoader.LoadOptions.INPUT.get(cli).trim()); + Assert.assertEquals(log4jProperty, SimpleEnrichmentFlatFileLoader.LoadOptions.LOG4J_PROPERTIES.get(cli).trim()); + Assert.assertEquals("2", SimpleEnrichmentFlatFileLoader.LoadOptions.NUM_THREADS.get(cli).trim()); + Assert.assertEquals("128", SimpleEnrichmentFlatFileLoader.LoadOptions.BATCH_SIZE.get(cli).trim()); + } + + @Test + public void test() throws Exception { + + Assert.assertNotNull(testTable); + String contents = "google.com,1,foo"; + + EnrichmentConverter converter = new EnrichmentConverter(); + ExtractorHandler handler = ExtractorHandler.load(extractorConfig); + Extractor e = handler.getExtractor(); + SimpleEnrichmentFlatFileLoader loader = new SimpleEnrichmentFlatFileLoader(); + Stream contentStreams = ImmutableList.of(contents).stream(); + ThreadLocal state = new ThreadLocal() { + @Override + protected ExtractorState initialValue() { + return new ExtractorState(testTable, e, converter); + } + }; + loader.load(ImmutableList.of(contentStreams) + , state + , cf + , 2 + ); + + ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf)); + List> results = new ArrayList<>(); + for(Result r : scanner) { + results.add(converter.fromResult(r, cf)); } + Assert.assertEquals(1, results.size()); + Assert.assertEquals(results.get(0).getKey().indicator, "google.com"); + Assert.assertEquals(results.get(0).getKey().type, "enrichment"); + Assert.assertEquals(results.get(0).getValue().getMetadata().size(), 2); + Assert.assertEquals(results.get(0).getValue().getMetadata().get("meta"), "foo"); + Assert.assertEquals(results.get(0).getValue().getMetadata().get("host"), "google.com"); + } } diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java index 7e9f23129d..acc1565e5c 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java @@ -85,6 +85,7 @@ public ReadinessState process(ComponentRunner runner) { return ReadinessState.READY; } } else { + System.out.println("Missed index..."); return ReadinessState.NOT_READY; } } diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java index 03ae9ffd8f..a93c442ab0 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java @@ -205,6 +205,7 @@ public void test() throws Exception { private void waitForIndex(String zookeeperQuorum) throws Exception { try(CuratorFramework client = getClient(zookeeperQuorum)) { client.start(); + System.out.println("Waiting for zookeeper..."); byte[] bytes = null; do { try { @@ -216,6 +217,7 @@ private void waitForIndex(String zookeeperQuorum) throws Exception { } } while(bytes == null || bytes.length == 0); + System.out.println("Found index config in zookeeper..."); } }