From 6fdf67c962a4bcfeb43da9c1aa1318ace6b194c3 Mon Sep 17 00:00:00 2001 From: mattf-horton Date: Wed, 1 Feb 2017 16:54:17 -0800 Subject: [PATCH 1/4] METRON-322 The first easy changes: add batchTimeout most places batchSize is currently used. --- .../configuration/IndexingConfigurations.java | 29 +++++++++++++++---- .../writer/IndexingWriterConfiguration.java | 5 ++++ .../writer/ParserWriterConfiguration.java | 18 ++++++++++-- .../SingleBatchConfigurationFacade.java | 5 ++++ .../writer/WriterConfiguration.java | 1 + .../management/IndexingConfigFunctions.java | 10 +++++-- .../SimpleHBaseEnrichmentWriterTest.java | 6 ++++ 7 files changed, 64 insertions(+), 10 deletions(-) diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java index 5f7998b70f..093aca0b14 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java @@ -29,6 +29,7 @@ public class IndexingConfigurations extends Configurations { public static final String BATCH_SIZE_CONF = "batchSize"; + public static final String BATCH_TIMEOUT_CONF = "batchTimeout"; public static final String ENABLED_CONF = "enabled"; public static final String INDEX_CONF = "index"; @@ -73,7 +74,11 @@ public boolean isDefault(String sensorName, String writerName) { } public int getBatchSize(String sensorName, String writerName ) { - return getBatchSize(getSensorIndexingConfig(sensorName, writerName)); + return getBatchSize(getSensorIndexingConfig(sensorName, writerName)); + } + + public int getBatchTimeout(String sensorName, String writerName ) { + return getBatchTimeout(getSensorIndexingConfig(sensorName, writerName)); } public String getIndex(String sensorName, String writerName) { @@ -94,10 +99,18 @@ public static boolean isEnabled(Map conf) { public static int getBatchSize(Map conf) { return getAs( BATCH_SIZE_CONF - ,conf - , 1 - , Integer.class - ); + ,conf + , 1 + , Integer.class + ); + } + + public static int getBatchTimeout(Map conf) { + return getAs( BATCH_TIMEOUT_CONF + ,conf + , 0 + , Integer.class + ); } public static String getIndex(Map conf, String sensorName) { @@ -119,6 +132,12 @@ public static Map setBatchSize(Map conf, int bat return ret; } + public static Map setBatchTimeout(Map conf, int batchTimeout) { + Map ret = conf == null?new HashMap<>():conf; + ret.put(BATCH_TIMEOUT_CONF, batchTimeout); + return ret; + } + public static Map setIndex(Map conf, String index) { Map ret = conf == null?new HashMap<>():conf; ret.put(INDEX_CONF, index); diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java index 7fca9c2b83..b713754a84 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java @@ -39,6 +39,11 @@ public int getBatchSize(String sensorName) { return config.orElse(new IndexingConfigurations()).getBatchSize(sensorName, writerName); } + @Override + public int getBatchTimeout(String sensorName) { + return config.orElse(new IndexingConfigurations()).getBatchTimeout(sensorName, writerName); + } + @Override public String getIndex(String sensorName) { return config.orElse(new IndexingConfigurations()).getIndex(sensorName, writerName); diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java index b9595dbf64..c37e939aa5 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java @@ -32,15 +32,27 @@ public ParserWriterConfiguration(ParserConfigurations config) { @Override public int getBatchSize(String sensorName) { if(config != null - && config.getSensorParserConfig(sensorName) != null - && config.getSensorParserConfig(sensorName).getParserConfig() != null - ) { + && config.getSensorParserConfig(sensorName) != null + && config.getSensorParserConfig(sensorName).getParserConfig() != null + ) { Object batchObj = config.getSensorParserConfig(sensorName).getParserConfig().get(IndexingConfigurations.BATCH_SIZE_CONF); return batchObj == null ? 1 : ConversionUtils.convert(batchObj, Integer.class); } return 1; } + @Override + public int getBatchTimeout(String sensorName) { + if(config != null + && config.getSensorParserConfig(sensorName) != null + && config.getSensorParserConfig(sensorName).getParserConfig() != null + ) { + Object batchObj = config.getSensorParserConfig(sensorName).getParserConfig().get(IndexingConfigurations.BATCH_TIMEOUT_CONF); + return batchObj == null ? 0 : ConversionUtils.convert(batchObj, Integer.class); + } + return 0; + } + @Override public String getIndex(String sensorName) { if(config != null && config.getSensorParserConfig(sensorName) != null diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java index 69e5541aef..4ecb9545d5 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java @@ -31,6 +31,11 @@ public int getBatchSize(String sensorName) { return 1; } + @Override + public int getBatchTimeout(String sensorName) { + return 0; + } + @Override public String getIndex(String sensorName) { return config.getIndex(sensorName); diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java index 45271e8187..3482378fa4 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java @@ -23,6 +23,7 @@ public interface WriterConfiguration extends Serializable { int getBatchSize(String sensorName); + int getBatchTimeout(String sensorName); String getIndex(String sensorName); boolean isEnabled(String sensorName); Map getSensorConfig(String sensorName); diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/IndexingConfigFunctions.java b/metron-platform/metron-management/src/main/java/org/apache/metron/management/IndexingConfigFunctions.java index 2755dd0e7d..ef03f185bb 100644 --- a/metron-platform/metron-management/src/main/java/org/apache/metron/management/IndexingConfigFunctions.java +++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/IndexingConfigFunctions.java @@ -39,10 +39,11 @@ public class IndexingConfigFunctions { @Stellar( namespace = "INDEXING" ,name = "SET_BATCH" - ,description = "Set batch size" + ,description = "Set batch size and timeout" ,params = {"sensorConfig - Sensor config to add transformation to." ,"writer - The writer to update (e.g. elasticsearch, solr or hdfs)" - ,"size - batch size (integer)" + ,"size - batch size (integer), defaults to 5" + ,"timeout - (optional) batch timeout in seconds (integer), defaults to 0, meaning system default" } ,returns = "The String representation of the config in zookeeper" ) @@ -74,6 +75,11 @@ public Object apply(List args, Context context) throws ParseException { batchSize = ConversionUtils.convert(args.get(i++), Integer.class); } configObj.put(writer, IndexingConfigurations.setBatchSize((Map) configObj.get(writer), batchSize)); + int batchTimeout = 0; + if(args.size() > 3) { + batchTimeout = ConversionUtils.convert(args.get(i++), Integer.class); + } + configObj.put(writer, IndexingConfigurations.setBatchTimeout((Map) configObj.get(writer), batchTimeout)); try { return JSONUtils.INSTANCE.toJSON(configObj, true); } catch (JsonProcessingException e) { diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java index b9b3246850..44b067d6a0 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java @@ -157,6 +157,12 @@ public int getBatchSize(String sensorName) { return batchSize; } + @Override + public int getBatchTimeout(String sensorName) { + //TODO - enable unit testing + return 0; + } + @Override public String getIndex(String sensorName) { return SENSOR_TYPE; From b3eeb3dbbc0834c4c9f3d2fc24d099f2f8e5a3ea Mon Sep 17 00:00:00 2001 From: mattf-horton Date: Thu, 2 Feb 2017 00:57:41 -0800 Subject: [PATCH 2/4] mods for flush and timeout in BulkWriterComponent. Some work in BulkMessageWriterBolt, but no getComponentConfiguration() yet. --- .../writer/WriterConfiguration.java | 2 +- .../metron/writer/BulkWriterComponent.java | 102 ++++++++++++------ .../writer/bolt/BulkMessageWriterBolt.java | 20 +++- 3 files changed, 86 insertions(+), 38 deletions(-) diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java index 3482378fa4..446fe92814 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java @@ -23,7 +23,7 @@ public interface WriterConfiguration extends Serializable { int getBatchSize(String sensorName); - int getBatchTimeout(String sensorName); + default int getBatchTimeout(String sensorName) {return 0;} String getIndex(String sensorName); boolean isEnabled(String sensorName); Map getSensorConfig(String sensorName); diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java index 124ffd3b36..8ec2dca3ce 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java @@ -36,9 +36,11 @@ public class BulkWriterComponent { .getLogger(BulkWriterComponent.class); private Map> sensorTupleMap = new HashMap<>(); private Map> sensorMessageMap = new HashMap<>(); + private Map batchLastCreateTimeNs = new HashMap<>(); private OutputCollector collector; private boolean handleCommit = true; private boolean handleError = true; + public BulkWriterComponent(OutputCollector collector) { this.collector = collector; } @@ -79,7 +81,6 @@ protected Collection createTupleCollection() { return new ArrayList<>(); } - public void errorAll(Throwable e) { for(Map.Entry> kv : sensorTupleMap.entrySet()) { error(e, kv.getValue()); @@ -93,6 +94,7 @@ public void errorAll(String sensorType, Throwable e) { sensorTupleMap.remove(sensorType); sensorMessageMap.remove(sensorType); } + public void write( String sensorType , Tuple tuple , MESSAGE_T message @@ -100,54 +102,88 @@ public void write( String sensorType , WriterConfiguration configurations ) throws Exception { - if(!configurations.isEnabled(sensorType)) { + if (!configurations.isEnabled(sensorType)) { return; } int batchSize = configurations.getBatchSize(sensorType); Collection tupleList = sensorTupleMap.get(sensorType); if (tupleList == null) { tupleList = createTupleCollection(); + if (batchSize > 1) { + sensorTupleMap.put(sensorType, tupleList); + batchLastCreateTimeNs.put(sensorType, System.nanoTime()); + } } tupleList.add(tuple); List messageList = sensorMessageMap.get(sensorType); if (messageList == null) { messageList = new ArrayList<>(); + if (batchSize > 1) { + sensorMessageMap.put(sensorType, messageList); + } } messageList.add(message); - if (tupleList.size() < batchSize) { - sensorTupleMap.put(sensorType, tupleList); - sensorMessageMap.put(sensorType, messageList); - } else { - long startTime = System.nanoTime(); - try { - BulkWriterResponse response = bulkMessageWriter.write(sensorType, configurations, tupleList, messageList); - - // Commit or error piecemeal. - if(handleCommit) { - commit(response); - } - - if(handleError) { - error(response); - } else if (response.hasErrors()) { - throw new IllegalStateException("Unhandled bulk errors in response: " + response.getErrors()); - } - } catch (Throwable e) { - if(handleError) { - error(e, tupleList); - } - else { - throw e; - } + if (tupleList.size() >= batchSize) { + flush(sensorType, bulkMessageWriter, configurations, tupleList, messageList); + } + } + + private void flush( String sensorType + , BulkMessageWriter bulkMessageWriter + , WriterConfiguration configurations + , Collection tupleList + , List messageList + ) throws Exception + { + long startTime = System.nanoTime(); + try { + BulkWriterResponse response = bulkMessageWriter.write(sensorType, configurations, tupleList, messageList); + + // Commit or error piecemeal. + if(handleCommit) { + commit(response); } - finally { - sensorTupleMap.remove(sensorType); - sensorMessageMap.remove(sensorType); + + if(handleError) { + error(response); + } else if (response.hasErrors()) { + throw new IllegalStateException("Unhandled bulk errors in response: " + response.getErrors()); + } + } catch (Throwable e) { + if(handleError) { + error(e, tupleList); + } + else { + throw e; + } + } + finally { + sensorTupleMap.remove(sensorType); + sensorMessageMap.remove(sensorType); + } + long endTime = System.nanoTime(); + long elapsed = endTime - startTime; + LOG.debug("Bulk batch for sensor " + sensorType + " completed in ~" + elapsed + " ns"); + } + + public void flushTimeouts( + BulkMessageWriter bulkMessageWriter + , WriterConfiguration configurations + ) throws Exception + { + // Flushes all queues older than their batchTimeouts. + // Note queues with batchSize == 1 don't get batched, so they never persist in the sensorTupleMap. + for (String sensorType : sensorTupleMap.keySet()) { + int batchTimeout = configurations.getBatchTimeout(sensorType); + Long batchCreateTime = batchLastCreateTimeNs.get(sensorType); + if (batchCreateTime == null) batchCreateTime = 0L; //shouldn't happen + long ageNs = System.nanoTime() - batchCreateTime; + if (ageNs >= batchTimeout*(1e9)) { + LOG.debug("Flushing " + sensorType + " batch queue due to age " + ageNs + "ns > " + batchTimeout + " secs"); + flush(sensorType, bulkMessageWriter, configurations, + sensorTupleMap.get(sensorType), sensorMessageMap.get(sensorType)); } - long endTime = System.nanoTime(); - long elapsed = endTime - startTime; - LOG.debug("Bulk batch completed in ~" + elapsed + " ns"); } } } diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java index 66c4c73d43..b18618c881 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java @@ -23,6 +23,7 @@ import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; +import static org.apache.storm.utils.TupleUtils.isTick; import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration; import org.apache.metron.common.configuration.writer.WriterConfiguration; @@ -93,11 +94,23 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll @SuppressWarnings("unchecked") @Override public void execute(Tuple tuple) { - JSONObject message = messageGetter.getMessage(tuple); - String sensorType = MessageUtils.getSensorType(message); try { - WriterConfiguration writerConfiguration = configurationTransformation.apply(new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())); + if (isTick(tuple)) { + if (!(bulkMessageWriter instanceof WriterToBulkWriter)) { + //WriterToBulkWriter doesn't allow batching, so no need to flush on Tick. + LOG.debug("Flushing message queues older than their batchTimeouts"); + writerComponent.flushTimeouts(bulkMessageWriter, configurationTransformation.apply( + new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))); + } + return; + } + + JSONObject message = messageGetter.getMessage(tuple); + String sensorType = MessageUtils.getSensorType(message); + LOG.trace("Writing enrichment message: {}", message); + WriterConfiguration writerConfiguration = configurationTransformation.apply( + new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())); if(writerConfiguration.isDefault(sensorType)) { //want to warn, but not fail the tuple collector.reportError(new Exception("WARNING: Default and (likely) unoptimized writer config used for " + bulkMessageWriter.getName() + " writer and sensor " + sensorType)); @@ -108,7 +121,6 @@ public void execute(Tuple tuple) { , bulkMessageWriter , writerConfiguration ); - LOG.trace("Writing enrichment message: {}", message); } catch(Exception e) { throw new RuntimeException("This should have been caught in the writerComponent. If you see this, file a JIRA", e); From 666f14a0ffe0eefc7de12e226d6953fd3e32d2d5 Mon Sep 17 00:00:00 2001 From: mattf-horton Date: Sun, 5 Feb 2017 22:06:07 -0800 Subject: [PATCH 3/4] full implementation of getComponentConfiguration() method in BulkMessageWriterBolt, with implementation details in IndexingConfigurations and new BatchTimeoutHelper class. --- .../configuration/IndexingConfigurations.java | 21 ++- .../writer/IndexingWriterConfiguration.java | 7 + .../writer/ParserWriterConfiguration.java | 8 + .../SingleBatchConfigurationFacade.java | 8 + .../writer/WriterConfiguration.java | 5 +- metron-platform/metron-indexing/README.md | 8 +- .../metron/writer/BulkWriterComponent.java | 3 +- .../writer/bolt/BatchTimeoutHelper.java | 169 ++++++++++++++++++ .../writer/bolt/BulkMessageWriterBolt.java | 59 +++++- 9 files changed, 280 insertions(+), 8 deletions(-) create mode 100644 metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java index 093aca0b14..f41102bc5a 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java @@ -24,14 +24,15 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; -import java.util.HashMap; -import java.util.Map; +import java.util.*; +import java.util.function.Supplier; public class IndexingConfigurations extends Configurations { public static final String BATCH_SIZE_CONF = "batchSize"; public static final String BATCH_TIMEOUT_CONF = "batchTimeout"; public static final String ENABLED_CONF = "enabled"; public static final String INDEX_CONF = "index"; + private static final String SENSOR_MARKER_STRING = "zzzxxxFOOzzzxxx"; public Map getSensorIndexingConfig(String sensorType, String writerName) { Map ret = (Map) configurations.get(getKey(sensorType)); @@ -44,6 +45,7 @@ public Map getSensorIndexingConfig(String sensorType, String wri } } + public void updateSensorIndexingConfig(String sensorType, byte[] data) throws IOException { updateSensorIndexingConfig(sensorType, new ByteArrayInputStream(data)); } @@ -81,6 +83,21 @@ public int getBatchTimeout(String sensorName, String writerName ) { return getBatchTimeout(getSensorIndexingConfig(sensorName, writerName)); } + public List getAllConfiguredTimeouts(String writerName) { + //The configuration infrastructure was not designed to enumerate sensors, so we synthesize. + String markerKeyString = getKey(SENSOR_MARKER_STRING); + int prefixStringLength = markerKeyString.indexOf(SENSOR_MARKER_STRING); + String keyPrefixString = markerKeyString.substring(0, prefixStringLength); + List configuredBatchTimeouts = new ArrayList<>(); + for (String sensorKeyString : configurations.keySet()) { + if (sensorKeyString.startsWith(keyPrefixString)) { + String configuredSensorName = sensorKeyString.substring(prefixStringLength); + configuredBatchTimeouts.add(getBatchTimeout(configuredSensorName, writerName)); + } + } + return configuredBatchTimeouts; + } + public String getIndex(String sensorName, String writerName) { return getIndex(getSensorIndexingConfig(sensorName, writerName), sensorName); } diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java index b713754a84..38a427e5d5 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java @@ -22,8 +22,10 @@ import org.apache.metron.common.configuration.IndexingConfigurations; import org.apache.metron.common.utils.ConversionUtils; +import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Supplier; public class IndexingWriterConfiguration implements WriterConfiguration{ private Optional config; @@ -44,6 +46,11 @@ public int getBatchTimeout(String sensorName) { return config.orElse(new IndexingConfigurations()).getBatchTimeout(sensorName, writerName); } + @Override + public List getAllConfiguredTimeouts() { + return config.orElse(new IndexingConfigurations()).getAllConfiguredTimeouts(writerName); + } + @Override public String getIndex(String sensorName) { return config.orElse(new IndexingConfigurations()).getIndex(sensorName, writerName); diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java index c37e939aa5..40a2992bfc 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java @@ -22,6 +22,8 @@ import org.apache.metron.common.configuration.ParserConfigurations; import org.apache.metron.common.utils.ConversionUtils; +import java.util.ArrayList; +import java.util.List; import java.util.Map; public class ParserWriterConfiguration implements WriterConfiguration { @@ -53,6 +55,12 @@ public int getBatchTimeout(String sensorName) { return 0; } + @Override + public List getAllConfiguredTimeouts() { + // TODO - stub implementation + return new ArrayList(); + } + @Override public String getIndex(String sensorName) { if(config != null && config.getSensorParserConfig(sensorName) != null diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java index 4ecb9545d5..e50bd2bb33 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java @@ -18,6 +18,8 @@ package org.apache.metron.common.configuration.writer; +import java.util.ArrayList; +import java.util.List; import java.util.Map; public class SingleBatchConfigurationFacade implements WriterConfiguration { @@ -36,6 +38,12 @@ public int getBatchTimeout(String sensorName) { return 0; } + @Override + public List getAllConfiguredTimeouts() { + // null implementation since batching is disabled + return new ArrayList(); + } + @Override public String getIndex(String sensorName) { return config.getIndex(sensorName); diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java index 446fe92814..7f44bc9715 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java @@ -19,11 +19,14 @@ package org.apache.metron.common.configuration.writer; import java.io.Serializable; +import java.util.List; import java.util.Map; +import java.util.function.Supplier; public interface WriterConfiguration extends Serializable { int getBatchSize(String sensorName); - default int getBatchTimeout(String sensorName) {return 0;} + int getBatchTimeout(String sensorName); + List getAllConfiguredTimeouts(); String getIndex(String sensorName); boolean isEnabled(String sensorName); Map getSensorConfig(String sensorName); diff --git a/metron-platform/metron-indexing/README.md b/metron-platform/metron-indexing/README.md index cbe7a7692e..b76ff67beb 100644 --- a/metron-platform/metron-indexing/README.md +++ b/metron-platform/metron-indexing/README.md @@ -40,7 +40,10 @@ elasticsearch or solr and hdfs writers running. The configuration for an individual writer-specific configuration is a JSON map with the following fields: * `index` : The name of the index to write to (defaulted to the name of the sensor). -* `batchSize` : The size of the batch that is written to the indices at once (defaulted to `1`). +* `batchSize` : The size of the batch that is written to the indices at once. Defaults to `1` (no batching). +* `batchTimeout` : The timeout after which a batch will be flushed even if batchSize has not been met. Optional. +If unspecified, or set to `0`, it defaults to a system-determined duration which is a fraction of the Storm +parameter `topology.message.timeout.secs`. Ignored if batchSize is `1`, since this disables batching. * `enabled` : Whether the writer is enabled (default `true`). ### Indexing Configuration Examples @@ -72,11 +75,13 @@ Storm console. e.g.: "elasticsearch": { "index": "foo", "batchSize" : 100, + "batchTimeout" : 0, "enabled" : true }, "hdfs": { "index": "foo", "batchSize": 1, + "batchTimeout" : 0, "enabled" : true } } @@ -100,6 +105,7 @@ Storm console. e.g.: "hdfs": { "index": "foo", "batchSize": 100, + "batchTimeout" : 0, "enabled" : false } } diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java index 8ec2dca3ce..2adb0f9a93 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; import java.util.*; +import java.util.concurrent.TimeUnit; public class BulkWriterComponent { public static final Logger LOG = LoggerFactory @@ -179,7 +180,7 @@ public void flushTimeouts( Long batchCreateTime = batchLastCreateTimeNs.get(sensorType); if (batchCreateTime == null) batchCreateTime = 0L; //shouldn't happen long ageNs = System.nanoTime() - batchCreateTime; - if (ageNs >= batchTimeout*(1e9)) { + if (ageNs >= TimeUnit.SECONDS.toNanos(batchTimeout)) { LOG.debug("Flushing " + sensorType + " batch queue due to age " + ageNs + "ns > " + batchTimeout + " secs"); flush(sensorType, bulkMessageWriter, configurations, sensorTupleMap.get(sensorType), sensorMessageMap.get(sensorType)); diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java new file mode 100644 index 0000000000..b44dc2b6ce --- /dev/null +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.writer.bolt; + +import org.apache.metron.common.configuration.Configurations; +import org.apache.metron.common.configuration.writer.WriterConfiguration; +import org.apache.storm.Config; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; + +/** + * Routines to help figure out the effective batchTimeout(s), using information from + * multiple configuration sources, topology.message.timeout.secs, and batchTimeoutDivisor, + * and use it to calculate defaultBatchTimeout and appropriate topology.tick.tuple.freq.secs. + * + * These methods cause no side effects outside of setting the internal member variables. + * "base" config are from defaults and storm.yaml (subordinate to Component config settings) + * "cli" config are from CLI arguments (superior to Component config settings) + * 0 or Integer.MAX_VALUE means disabled, except in batchTimeout values, where + * 0 means use the default. + * + * These lookups are fairly expensive, and changing the result values currently require + * a restart of the Storm topology anyway, so we just calculate at first use, and cache + * the results in the BatchTimeoutHelper instance. If you want different results, + * you'll need a new instance or a restart. + */ +public class BatchTimeoutHelper { + + private static final Logger LOG = LoggerFactory + .getLogger(BulkMessageWriterBolt.class); + private boolean initialized = false; + private Supplier> listAllConfiguredTimeouts; + protected int batchTimeoutDivisor; + protected int baseMessageTimeoutSecs; + protected int cliMessageTimeoutSecs; + protected int baseTickTupleFreqSecs; + protected int cliTickTupleFreqSecs; + protected int effectiveMessageTimeoutSecs; + protected int maxBatchTimeoutAllowedSecs; //derived from MessageTimeoutSecs value + protected int minBatchTimeoutRequestedSecs; //min of all sensorType configured batchTimeout requests + protected int recommendedTickIntervalSecs; //the answer + + BatchTimeoutHelper( Supplier> listAllConfiguredTimeouts + , int batchTimeoutDivisor + ) + { + // The two arguments to the constructor are information only available at the Bolt level (batchTimeoutDivisor) + // and WriterConfiguration level (listAllConfiguredTimeouts). + this.batchTimeoutDivisor = batchTimeoutDivisor; + this.listAllConfiguredTimeouts = listAllConfiguredTimeouts; + // Reads and calculations are deferred until first call, then frozen for the duration of this BatchTimeoutHelper instance. + } + + private synchronized void init() { + if (initialized) return; + readGlobalTimeoutConfigs(); + calcMaxBatchTimeoutAllowed(); + readMinBatchTimeoutRequested(); + calcRecommendedTickInterval(); + initialized = true; + } + + //modified from Utils.readStormConfig() + private Map readStormConfigWithoutCLI() { + Map ret = Utils.readDefaultConfig(); + String confFile = System.getProperty("storm.conf.file"); + Map storm; + if (confFile == null || confFile.equals("")) { + storm = Utils.findAndReadConfigFile("storm.yaml", false); + } else { + storm = Utils.findAndReadConfigFile(confFile, true); + } + ret.putAll(storm); + return ret; + } + + private void readGlobalTimeoutConfigs() { + Map stormConf = readStormConfigWithoutCLI(); + Map cliConf = Utils.readCommandLineOpts(); + baseMessageTimeoutSecs = Integer.parseInt( + (String)stormConf.getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, "0")); + cliMessageTimeoutSecs = Integer.parseInt( + (String) cliConf.getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, "0")); + baseTickTupleFreqSecs = Integer.parseInt( + (String)stormConf.getOrDefault(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, "0")); + cliTickTupleFreqSecs = Integer.parseInt( + (String) cliConf.getOrDefault(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, "0")); + } + + private void calcMaxBatchTimeoutAllowed() { + // The max batchTimeout allowed also becomes the default batchTimeout. + effectiveMessageTimeoutSecs = (cliMessageTimeoutSecs == 0 ? baseMessageTimeoutSecs : cliMessageTimeoutSecs); + if (effectiveMessageTimeoutSecs == 0) { + LOG.info("topology.message.timeout.secs is disabled in both Storm config and CLI. Allowing unlimited batchTimeouts."); + maxBatchTimeoutAllowedSecs = Integer.MAX_VALUE; + } + else { + //Recommended value for safe batchTimeout is 1/2 * TOPOLOGY_MESSAGE_TIMEOUT_SECS. + //We further divide this by batchTimeoutDivisor for the particular Writer Bolt we are in, + //and subtract a delta of 1 second for surety (as well as rounding down) + maxBatchTimeoutAllowedSecs = effectiveMessageTimeoutSecs / (2 * batchTimeoutDivisor) - 1; + } + } + + /** + * @return the max batchTimeout allowed, in seconds + */ + protected int getDefaultBatchTimeout() { + if (!initialized) {this.init();} + return maxBatchTimeoutAllowedSecs; + } + + private void readMinBatchTimeoutRequested() { + // The knowledge of how to list the currently configured batchTimeouts + // is delegated to the WriterConfiguration for the bolt that called us. + List configuredTimeouts = listAllConfiguredTimeouts.get(); + + // Discard zero values, which mean "use default" + int minval = Integer.MAX_VALUE; + for (int k : configuredTimeouts) { + if (k < minval && k > 0) minval = k; + } + minBatchTimeoutRequestedSecs = minval; + } + + private void calcRecommendedTickInterval() { + recommendedTickIntervalSecs = Integer.min(minBatchTimeoutRequestedSecs, maxBatchTimeoutAllowedSecs); + //If needed, we can +=1 to assure triggering on each cycle, but this shouldn't be necessary. + //Note that this strategy means that sensors with batchTimeout requested less frequently + //may now have latency of "their requested batchTimeout" + "this recommended tick interval", + //in the worst case. + } + + protected int getRecommendedTickInterval() { + if (!initialized) {this.init();} + // Remember that parameter settings in the CLI override parameter settings set by the Storm component. + // We shouldn't have to deal with this in the Metron environment, but just in case, + // warn if our recommended value will be overridden by cliTickTupleFreqSecs. + if (cliTickTupleFreqSecs > 0 && cliTickTupleFreqSecs < recommendedTickIntervalSecs) { + LOG.warn("Parameter '" + Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS + "' has been forced to value '" + + Integer.toString(cliTickTupleFreqSecs) + "' via CLI configuration. This will override the desired " + + "setting of '" + Integer.toString(recommendedTickIntervalSecs) + + "' and may lead to delayed batch flushing."); + } + return recommendedTickIntervalSecs; + } +} diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java index b18618c881..b6068c0b21 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java @@ -18,6 +18,7 @@ package org.apache.metron.writer.bolt; import org.apache.metron.common.bolt.ConfiguredIndexingBolt; +import org.apache.storm.Config; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -50,7 +51,10 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { private String messageGetterStr = MessageGetters.NAMED.name(); private transient MessageGetter messageGetter = null; private transient OutputCollector collector; - private transient Function configurationTransformation; + private transient Function configurationTransformation = null; + private BatchTimeoutHelper timeoutHelper = null; + private int batchTimeoutDivisor = 1; + public BulkMessageWriterBolt(String zookeeperUrl) { super(zookeeperUrl); } @@ -70,6 +74,55 @@ public BulkMessageWriterBolt withMessageGetter(String messageGetter) { return this; } + /** + * If this BulkMessageWriterBolt is in a topology where it is daisy-chained with + * other queuing Writers, then the max amount of time it takes for a tuple + * to clear the whole topology is the sum of all the batchTimeouts for all the + * daisy-chained Writers. In the common case where each Writer is using the default + * batchTimeout, it is then necessary to divide that batchTimeout by the number of + * daisy-chained Writers. For example, the Enrichment and Threat Intel features + * both use a BulkMessageWriterBolt, but are in a single topology, so one would + * initialize those Bolts withBatchTimeoutDivisor(2). Default value, if not set, is 1. + * + * If non-default batchTimeouts are configured for some components, the administrator + * will want to take this behavior into account. + * + * @param batchTimeoutDivisor + * @return + */ + public BulkMessageWriterBolt withBatchTimeoutDivisor(int batchTimeoutDivisor) { + this.batchTimeoutDivisor = batchTimeoutDivisor; + return this; + } + + @Override + public Map getComponentConfiguration() { + // configure how often a tick tuple will be sent to our bolt + if (timeoutHelper == null) { + // Not sure if called before or after prepare(), so do some of the same stuff as prepare() does, + // to get the valid WriterConfiguration: + if(bulkMessageWriter instanceof WriterToBulkWriter) { + configurationTransformation = WriterToBulkWriter.TRANSFORMATION; + } + else { + configurationTransformation = x -> x; + } + WriterConfiguration wrconf = configurationTransformation.apply( + new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())); + + timeoutHelper = new BatchTimeoutHelper(wrconf::getAllConfiguredTimeouts, batchTimeoutDivisor); + } + int requestedTickFreqSecs = timeoutHelper.getRecommendedTickInterval(); + + Map conf = super.getComponentConfiguration(); + if (conf == null) { + conf = new HashMap(); + } + conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, requestedTickFreqSecs); + LOG.info("Requesting " + Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS + " set to " + Integer.toString(requestedTickFreqSecs)); + return conf; + } + @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.writerComponent = new BulkWriterComponent<>(collector); @@ -84,8 +137,8 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll } try { bulkMessageWriter.init(stormConf - , configurationTransformation.apply(new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())) - ); + , configurationTransformation.apply( + new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))); } catch (Exception e) { throw new RuntimeException(e); } From 03565c695eef1ceef7e4226e0f9d443f20363efd Mon Sep 17 00:00:00 2001 From: mattf-horton Date: Sun, 5 Feb 2017 23:17:10 -0800 Subject: [PATCH 4/4] minor cleanups --- .../configuration/IndexingConfigurations.java | 18 ++++++++---------- .../writer/IndexingWriterConfiguration.java | 1 - .../writer/WriterConfiguration.java | 1 - .../metron/writer/BulkWriterComponent.java | 2 ++ .../metron/writer/bolt/BatchTimeoutHelper.java | 4 ---- 5 files changed, 10 insertions(+), 16 deletions(-) diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java index f41102bc5a..84d68e9d7b 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.io.InputStream; import java.util.*; -import java.util.function.Supplier; public class IndexingConfigurations extends Configurations { public static final String BATCH_SIZE_CONF = "batchSize"; @@ -45,7 +44,6 @@ public Map getSensorIndexingConfig(String sensorType, String wri } } - public void updateSensorIndexingConfig(String sensorType, byte[] data) throws IOException { updateSensorIndexingConfig(sensorType, new ByteArrayInputStream(data)); } @@ -116,18 +114,18 @@ public static boolean isEnabled(Map conf) { public static int getBatchSize(Map conf) { return getAs( BATCH_SIZE_CONF - ,conf - , 1 - , Integer.class - ); + ,conf + , 1 + , Integer.class + ); } public static int getBatchTimeout(Map conf) { return getAs( BATCH_TIMEOUT_CONF - ,conf - , 0 - , Integer.class - ); + ,conf + , 0 + , Integer.class + ); } public static String getIndex(Map conf, String sensorName) { diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java index 38a427e5d5..ffeaf00a39 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.function.Supplier; public class IndexingWriterConfiguration implements WriterConfiguration{ private Optional config; diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java index 7f44bc9715..2354f95602 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java @@ -21,7 +21,6 @@ import java.io.Serializable; import java.util.List; import java.util.Map; -import java.util.function.Supplier; public interface WriterConfiguration extends Serializable { int getBatchSize(String sensorName); diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java index 2adb0f9a93..85d7fb81c9 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java @@ -174,6 +174,8 @@ public void flushTimeouts( ) throws Exception { // Flushes all queues older than their batchTimeouts. + // This strategy does not guarantee each queue to be flushed at the exact batchTimeout. Rather, + // the latency will be between the sensorType's batchTimeout and batchTimeout + tick.tuple.freq.secs. // Note queues with batchSize == 1 don't get batched, so they never persist in the sensorTupleMap. for (String sensorType : sensorTupleMap.keySet()) { int batchTimeout = configurations.getBatchTimeout(sensorType); diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java index b44dc2b6ce..164df5f54a 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java @@ -17,17 +17,13 @@ */ package org.apache.metron.writer.bolt; -import org.apache.metron.common.configuration.Configurations; -import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.storm.Config; import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.function.Supplier; /**