From 765e05264b378a0a7608092e5842a31239418ae5 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Tue, 29 May 2018 13:49:31 -0400 Subject: [PATCH 1/5] METRON-1584 Indexing Topology Crashes with Invalid Message --- .../bolt/BulkMessageWriterBoltTest.java | 52 +++++- .../metron/writer/BulkWriterComponent.java | 27 ++- .../writer/bolt/BulkMessageWriterBolt.java | 175 +++++++++++++----- 3 files changed, 199 insertions(+), 55 deletions(-) diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java index dedf5e6525..fa83f60a12 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java @@ -118,28 +118,32 @@ public void parseMessages() throws ParseException { private MessageGetStrategy messageGetStrategy; @Test - public void testSensorTypeMissing() throws Exception { + public void testSourceTypeMissing() throws Exception { + + // setup the bold BulkMessageWriterBolt bulkMessageWriterBolt = new BulkMessageWriterBolt("zookeeperUrl") - .withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name()) + .withBulkMessageWriter(bulkMessageWriter) + .withMessageGetter(MessageGetters.JSON_FROM_FIELD.name()) .withMessageGetterField("message"); bulkMessageWriterBolt.setCuratorFramework(client); bulkMessageWriterBolt.setZKCache(cache); bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType, new FileInputStream(sampleSensorIndexingConfigPath)); + // initialize the bolt bulkMessageWriterBolt.declareOutputFields(declarer); - verify(declarer, times(1)).declareStream(eq("error"), argThat( - new FieldsMatcher("message"))); Map stormConf = new HashMap(); bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector); - BulkWriterComponent component = mock(BulkWriterComponent.class); - bulkMessageWriterBolt.setWriterComponent(component); - verify(bulkMessageWriter, times(1)).init(eq(stormConf),any(TopologyContext.class), any(WriterConfiguration.class)); + + // create a message with no source type JSONObject message = (JSONObject) new JSONParser().parse(sampleMessageString); message.remove("source.type"); when(tuple.getValueByField("message")).thenReturn(message); + + // the tuple should be handled as an error and ack'd bulkMessageWriterBolt.execute(tuple); - verify(component, times(1)).error(eq("null"), any(), any(), any()); + verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), any()); + verify(outputCollector, times(1)).ack(tuple); } @Test @@ -295,4 +299,36 @@ public void testFlushOnTickTuple() throws Exception { assertEquals(3, tupleList.size()); verify(outputCollector, times(5)).ack(tuple); // 3 messages + 2nd tick } + + /** + * If an invalid message is sent to indexing, the message should be handled as an error + * and the topology should continue processing. + */ + @Test + public void testMessageInvalid() throws Exception { + FakeClock clock = new FakeClock(); + + // setup the bolt + BulkMessageWriterBolt bolt = new BulkMessageWriterBolt("zookeeperUrl") + .withBulkMessageWriter(bulkMessageWriter) + .withMessageGetter(MessageGetters.JSON_FROM_POSITION.name()) + .withMessageGetterField("message"); + bolt.setCuratorFramework(client); + bolt.setZKCache(cache); + bolt.getConfigurations().updateSensorIndexingConfig(sensorType, new FileInputStream(sampleSensorIndexingConfigPath)); + + // initialize the bolt + bolt.declareOutputFields(declarer); + Map stormConf = new HashMap(); + bolt.prepare(stormConf, topologyContext, outputCollector, clock); + + // execute a tuple that contains an invalid message + byte[] invalidJSON = "this is not valid JSON".getBytes(); + when(tuple.getBinary(0)).thenReturn(invalidJSON); + bolt.execute(tuple); + + // the tuple should be handled as an error and ack'd + verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), any()); + verify(outputCollector, times(1)).ack(tuple); + } } diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java index 37c624fca8..f71f47e6b6 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java @@ -19,6 +19,7 @@ package org.apache.metron.writer; import com.google.common.collect.Iterables; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.error.MetronError; @@ -115,15 +116,37 @@ public void commit(BulkWriterResponse response) { } public void error(String sensorType, Throwable e, Iterable tuples, MessageGetStrategy messageGetStrategy) { + + if(!Iterables.isEmpty(tuples)) { + LOG.error("Failing tuples; count={}, error={}", Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e)); + } tuples.forEach(t -> collector.ack(t)); MetronError error = new MetronError() .withSensorType(sensorType) .withErrorType(Constants.ErrorType.INDEXING_ERROR) .withThrowable(e); + tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t))); + ErrorUtils.handleError(collector, error); + } + + /** + * Error a set of tuples that may not contain a valid message. + * + *

Without a valid message, the source type is unknown. + *

Without a valid message, the JSON message cannot be added to the error. + * + * @param e The exception that occurred. + * @param tuples The tuples to error that may not contain valid messages. + */ + public void error(Throwable e, Iterable tuples) { + if(!Iterables.isEmpty(tuples)) { - LOG.error("Failing {} tuples", Iterables.size(tuples), e); + LOG.error("Failing tuples; count={}, error={}", Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e)); } - tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t))); + tuples.forEach(t -> collector.ack(t)); + MetronError error = new MetronError() + .withErrorType(Constants.ErrorType.INDEXING_ERROR) + .withThrowable(e); ErrorUtils.handleError(collector, error); } diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java index b5b97d8a6a..3d3e8fa316 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java @@ -18,6 +18,7 @@ package org.apache.metron.writer.bolt; import com.google.common.collect.ImmutableList; +import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; import org.apache.metron.common.Constants; import org.apache.metron.common.bolt.ConfiguredIndexingBolt; import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration; @@ -213,60 +214,144 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll @SuppressWarnings("unchecked") @Override public void execute(Tuple tuple) { + if (isTick(tuple)) { - try { - if (!(bulkMessageWriter instanceof WriterToBulkWriter)) { - //WriterToBulkWriter doesn't allow batching, so no need to flush on Tick. - LOG.debug("Flushing message queues older than their batchTimeouts"); - getWriterComponent().flushTimeouts(bulkMessageWriter, configurationTransformation.apply( - new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())) - , messageGetStrategy); - } - } - catch(Exception e) { - throw new RuntimeException("This should have been caught in the writerComponent. If you see this, file a JIRA", e); - } - finally { - collector.ack(tuple); - } - return; + handleTick(tuple); + + } else { + handleMessage(tuple); } + } + + /** + * Handle a tuple containing a message; anything other than a tick tuple. + * + * @param tuple The tuple containing a message. + */ + private void handleMessage(Tuple tuple) { + try { + + JSONObject message = getMessage(tuple); + if(message == null) { + handleMissingMessage(tuple); + return; + } - try - { - JSONObject message = (JSONObject) messageGetStrategy.get(tuple); String sensorType = MessageUtils.getSensorType(message); - LOG.trace("Writing enrichment message: {}", message); - WriterConfiguration writerConfiguration = configurationTransformation.apply( - new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())); if(sensorType == null) { - //sensor type somehow ended up being null. We want to error this message directly. - getWriterComponent().error("null" - , new Exception("Sensor type is not specified for message " - + message.toJSONString() - ) - , ImmutableList.of(tuple) - , messageGetStrategy - ); - } - else { - if (writerConfiguration.isDefault(sensorType)) { - //want to warn, but not fail the tuple - collector.reportError(new Exception("WARNING: Default and (likely) unoptimized writer config used for " + bulkMessageWriter.getName() + " writer and sensor " + sensorType)); - } - - getWriterComponent().write(sensorType - , tuple - , message - , bulkMessageWriter - , writerConfiguration - , messageGetStrategy - ); + handleMissingSensorType(tuple, message); + return; } + + writeMessage(tuple, message, sensorType); + + } catch (Exception e) { + throw new RuntimeException("This should have been caught in the writerComponent. If you see this, file a JIRA", e); } - catch(Exception e) { + } + + /** + * Handles a tick tuple. + * + * @param tickTuple The tick tuple. + */ + private void handleTick(Tuple tickTuple) { + + try { + if (!(bulkMessageWriter instanceof WriterToBulkWriter)) { + //WriterToBulkWriter doesn't allow batching, so no need to flush on Tick. + LOG.debug("Flushing message queues older than their batchTimeouts"); + getWriterComponent().flushTimeouts(bulkMessageWriter, configurationTransformation.apply( + new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())) + , messageGetStrategy); + } + + } catch(Exception e) { throw new RuntimeException("This should have been caught in the writerComponent. If you see this, file a JIRA", e); + + } finally { + collector.ack(tickTuple); + } + } + + /** + * Retrieves the JSON message contained in a tuple. + * + * @param tuple The tuple containing a JSON message. + * @return The JSON message contained in the tuple. If none, returns null. + */ + private JSONObject getMessage(Tuple tuple) { + + JSONObject message = null; + try { + message = (JSONObject) messageGetStrategy.get(tuple); + + } catch(Throwable e) { + LOG.error("Unable to retrieve message from tuple", e); + } + + return message; + } + + /** + * Write a message. + * + * @param tuple The tuple containing a message. + * @param message The message to write. + * @param sensorType The sensor type of the message. + * @throws Exception + */ + private void writeMessage(Tuple tuple, JSONObject message, String sensorType) throws Exception { + + LOG.trace("Writing message: sourceType={}, message={}", sensorType, message); + WriterConfiguration writerConfiguration = configurationTransformation.apply( + new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())); + + if (writerConfiguration.isDefault(sensorType)) { + //want to warn, but not fail the tuple + collector.reportError(new Exception("WARNING: Default and (likely) unoptimized writer config used for " + bulkMessageWriter.getName() + " writer and sensor " + sensorType)); } + + getWriterComponent().write(sensorType + , tuple + , message + , bulkMessageWriter + , writerConfiguration + , messageGetStrategy + ); + } + + /** + * Handles error processing when a message is missing a sensor type. + * + * @param tuple The tuple. + * @param message The message with no sensor type. + */ + private void handleMissingSensorType(Tuple tuple, JSONObject message) { + + LOG.debug("Message is missing sensor type"); + + // sensor type somehow ended up being null. We want to error this message directly. + getWriterComponent().error("null", + new Exception("Sensor type is not specified for message " + message.toJSONString()), + ImmutableList.of(tuple), + messageGetStrategy + ); + } + + /** + * Handles error processing when a tuple does not contain a valid message. + * + * @param tuple The tuple. + */ + private void handleMissingMessage(Tuple tuple) { + + LOG.debug("Unable to extract message from tuple; expected valid JSON"); + + getWriterComponent().error( + new Exception("Unable to extract message from tuple; expected valid JSON"), + ImmutableList.of(tuple) + ); } @Override From 357ff79cf84b86a729544c19da1570e3caec6085 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Thu, 31 May 2018 17:32:38 -0400 Subject: [PATCH 2/5] Undo refactor --- .../writer/bolt/BulkMessageWriterBolt.java | 107 ++++++------------ 1 file changed, 34 insertions(+), 73 deletions(-) diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java index 3d3e8fa316..1d8f0c68c5 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java @@ -18,7 +18,6 @@ package org.apache.metron.writer.bolt; import com.google.common.collect.ImmutableList; -import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; import org.apache.metron.common.Constants; import org.apache.metron.common.bolt.ConfiguredIndexingBolt; import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration; @@ -214,23 +213,27 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll @SuppressWarnings("unchecked") @Override public void execute(Tuple tuple) { - if (isTick(tuple)) { - handleTick(tuple); - - } else { - handleMessage(tuple); + try { + if (!(bulkMessageWriter instanceof WriterToBulkWriter)) { + //WriterToBulkWriter doesn't allow batching, so no need to flush on Tick. + LOG.debug("Flushing message queues older than their batchTimeouts"); + getWriterComponent().flushTimeouts(bulkMessageWriter, configurationTransformation.apply( + new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())) + , messageGetStrategy); + } + } + catch(Exception e) { + throw new RuntimeException("This should have been caught in the writerComponent. If you see this, file a JIRA", e); + } + finally { + collector.ack(tuple); + } + return; } - } - - /** - * Handle a tuple containing a message; anything other than a tick tuple. - * - * @param tuple The tuple containing a message. - */ - private void handleMessage(Tuple tuple) { - try { + try + { JSONObject message = getMessage(tuple); if(message == null) { handleMissingMessage(tuple); @@ -243,34 +246,25 @@ private void handleMessage(Tuple tuple) { return; } - writeMessage(tuple, message, sensorType); - - } catch (Exception e) { - throw new RuntimeException("This should have been caught in the writerComponent. If you see this, file a JIRA", e); - } - } - - /** - * Handles a tick tuple. - * - * @param tickTuple The tick tuple. - */ - private void handleTick(Tuple tickTuple) { + LOG.trace("Writing enrichment message: {}", message); + WriterConfiguration writerConfiguration = configurationTransformation.apply( + new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())); - try { - if (!(bulkMessageWriter instanceof WriterToBulkWriter)) { - //WriterToBulkWriter doesn't allow batching, so no need to flush on Tick. - LOG.debug("Flushing message queues older than their batchTimeouts"); - getWriterComponent().flushTimeouts(bulkMessageWriter, configurationTransformation.apply( - new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())) - , messageGetStrategy); + if (writerConfiguration.isDefault(sensorType)) { + //want to warn, but not fail the tuple + collector.reportError(new Exception("WARNING: Default and (likely) unoptimized writer config used for " + bulkMessageWriter.getName() + " writer and sensor " + sensorType)); } - } catch(Exception e) { + getWriterComponent().write(sensorType + , tuple + , message + , bulkMessageWriter + , writerConfiguration + , messageGetStrategy + ); + } + catch(Exception e) { throw new RuntimeException("This should have been caught in the writerComponent. If you see this, file a JIRA", e); - - } finally { - collector.ack(tickTuple); } } @@ -281,7 +275,6 @@ private void handleTick(Tuple tickTuple) { * @return The JSON message contained in the tuple. If none, returns null. */ private JSONObject getMessage(Tuple tuple) { - JSONObject message = null; try { message = (JSONObject) messageGetStrategy.get(tuple); @@ -293,34 +286,6 @@ private JSONObject getMessage(Tuple tuple) { return message; } - /** - * Write a message. - * - * @param tuple The tuple containing a message. - * @param message The message to write. - * @param sensorType The sensor type of the message. - * @throws Exception - */ - private void writeMessage(Tuple tuple, JSONObject message, String sensorType) throws Exception { - - LOG.trace("Writing message: sourceType={}, message={}", sensorType, message); - WriterConfiguration writerConfiguration = configurationTransformation.apply( - new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())); - - if (writerConfiguration.isDefault(sensorType)) { - //want to warn, but not fail the tuple - collector.reportError(new Exception("WARNING: Default and (likely) unoptimized writer config used for " + bulkMessageWriter.getName() + " writer and sensor " + sensorType)); - } - - getWriterComponent().write(sensorType - , tuple - , message - , bulkMessageWriter - , writerConfiguration - , messageGetStrategy - ); - } - /** * Handles error processing when a message is missing a sensor type. * @@ -328,10 +293,8 @@ private void writeMessage(Tuple tuple, JSONObject message, String sensorType) th * @param message The message with no sensor type. */ private void handleMissingSensorType(Tuple tuple, JSONObject message) { - - LOG.debug("Message is missing sensor type"); - // sensor type somehow ended up being null. We want to error this message directly. + LOG.debug("Message is missing sensor type"); getWriterComponent().error("null", new Exception("Sensor type is not specified for message " + message.toJSONString()), ImmutableList.of(tuple), @@ -345,9 +308,7 @@ private void handleMissingSensorType(Tuple tuple, JSONObject message) { * @param tuple The tuple. */ private void handleMissingMessage(Tuple tuple) { - LOG.debug("Unable to extract message from tuple; expected valid JSON"); - getWriterComponent().error( new Exception("Unable to extract message from tuple; expected valid JSON"), ImmutableList.of(tuple) From f6eae3ceec34c12981716eef57b94291fc9bd7f4 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Thu, 31 May 2018 17:42:03 -0400 Subject: [PATCH 3/5] Altered log to make sure stack trace hits the logs --- .../org/apache/metron/writer/BulkWriterComponent.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java index f71f47e6b6..e67179cb01 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java @@ -116,9 +116,9 @@ public void commit(BulkWriterResponse response) { } public void error(String sensorType, Throwable e, Iterable tuples, MessageGetStrategy messageGetStrategy) { - if(!Iterables.isEmpty(tuples)) { - LOG.error("Failing tuples; count={}, error={}", Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e)); + LOG.error(String.format("Failing tuples; count=%d, error=%s", + Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e)), e); } tuples.forEach(t -> collector.ack(t)); MetronError error = new MetronError() @@ -139,9 +139,9 @@ public void error(String sensorType, Throwable e, Iterable tuples, Messag * @param tuples The tuples to error that may not contain valid messages. */ public void error(Throwable e, Iterable tuples) { - if(!Iterables.isEmpty(tuples)) { - LOG.error("Failing tuples; count={}, error={}", Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e)); + LOG.error(String.format("Failing tuples; count=%d, error=%s", + Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e)), e); } tuples.forEach(t -> collector.ack(t)); MetronError error = new MetronError() From f4035fa2b56fe731831f45a3fd766cc76e861c51 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Thu, 31 May 2018 18:38:48 -0400 Subject: [PATCH 4/5] Factored out ack of error'd tuples --- .../metron/writer/BulkWriterComponent.java | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java index e67179cb01..dfa265d236 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java @@ -42,6 +42,8 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; +import static java.lang.String.format; + /** * This component implements message batching, with both flush on queue size, and flush on queue timeout. * There is a queue for each sensorType. @@ -116,17 +118,13 @@ public void commit(BulkWriterResponse response) { } public void error(String sensorType, Throwable e, Iterable tuples, MessageGetStrategy messageGetStrategy) { - if(!Iterables.isEmpty(tuples)) { - LOG.error(String.format("Failing tuples; count=%d, error=%s", - Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e)), e); - } - tuples.forEach(t -> collector.ack(t)); + LOG.error(format("Failing %d tuple(s); sensorType=%s", Iterables.size(tuples), sensorType), e); MetronError error = new MetronError() .withSensorType(sensorType) .withErrorType(Constants.ErrorType.INDEXING_ERROR) .withThrowable(e); tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t))); - ErrorUtils.handleError(collector, error); + handleError(tuples, error); } /** @@ -139,14 +137,21 @@ public void error(String sensorType, Throwable e, Iterable tuples, Messag * @param tuples The tuples to error that may not contain valid messages. */ public void error(Throwable e, Iterable tuples) { - if(!Iterables.isEmpty(tuples)) { - LOG.error(String.format("Failing tuples; count=%d, error=%s", - Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e)), e); - } - tuples.forEach(t -> collector.ack(t)); + LOG.error(format("Failing %d tuple(s)", Iterables.size(tuples)), e); MetronError error = new MetronError() .withErrorType(Constants.ErrorType.INDEXING_ERROR) .withThrowable(e); + handleError(tuples, error); + } + + /** + * Errors a set of tuples. + * + * @param tuples The tuples to error. + * @param error + */ + private void handleError(Iterable tuples, MetronError error) { + tuples.forEach(t -> collector.ack(t)); ErrorUtils.handleError(collector, error); } From 9b92f54633bccce2f26d4ac128784a6853850cb8 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Thu, 31 May 2018 18:41:02 -0400 Subject: [PATCH 5/5] Typo --- .../metron/enrichment/bolt/BulkMessageWriterBoltTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java index fa83f60a12..52516ac1d6 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java @@ -120,7 +120,7 @@ public void parseMessages() throws ParseException { @Test public void testSourceTypeMissing() throws Exception { - // setup the bold + // setup the bolt BulkMessageWriterBolt bulkMessageWriterBolt = new BulkMessageWriterBolt("zookeeperUrl") .withBulkMessageWriter(bulkMessageWriter) .withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())