From cb492222dbd12686646ab052a1f6dfdf5c2d7744 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Mon, 24 Sep 2018 18:51:36 -0400 Subject: [PATCH 1/4] METRON-1791 Add GUID to Messages Produced by Profiler --- .../java/org/apache/metron/profiler/bolt/KafkaEmitter.java | 3 +++ .../org/apache/metron/profiler/bolt/KafkaEmitterTest.java | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java index 87920da400..9b26310165 100644 --- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java +++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java @@ -21,6 +21,7 @@ import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.ClassUtils; +import org.apache.metron.common.Constants; import org.apache.metron.profiler.ProfileMeasurement; import org.apache.storm.task.OutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -33,6 +34,7 @@ import java.io.Serializable; import java.lang.invoke.MethodHandles; import java.util.Map; +import java.util.UUID; /** * Responsible for emitting a {@link ProfileMeasurement} to an output stream that will @@ -134,6 +136,7 @@ private JSONObject createMessage(ProfileMeasurement measurement) { message.put("timestamp", System.currentTimeMillis()); message.put("source.type", sourceType); message.put("is_alert", "true"); + message.put(Constants.GUID, UUID.randomUUID().toString()); return message; } diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java index 95a2d29475..62593016b2 100644 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap; import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.profiler.ProfileConfig; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.profiler.ProfileMeasurement; @@ -148,6 +149,7 @@ public void testTriageValueInMessage() throws Exception { assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end")); assertEquals("profiler", actual.get("source.type")); assertNotNull(actual.get("timestamp")); + assertNotNull(actual.get(Constants.GUID)); // validate that the triage value has been added assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key")); @@ -214,6 +216,8 @@ public void testInvalidType() throws Exception { assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start")); assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end")); assertEquals("profiler", actual.get("source.type")); + assertNotNull(actual.get("timestamp")); + assertNotNull(actual.get(Constants.GUID)); // the invalid expression should be skipped and not included in the message assertFalse(actual.containsKey("invalid")); From 2cc5ab577633d6f55a90a3afa9e92a3a4c4028d9 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Tue, 25 Sep 2018 07:43:59 -0400 Subject: [PATCH 2/4] Added integration test to validate triage messages --- .../metron/profiler/bolt/KafkaEmitter.java | 28 +++++++--- .../zookeeper/triage-result/profiler.json | 20 ++++++++ .../integration/ProfilerIntegrationTest.java | 51 ++++++++++++++----- 3 files changed, 78 insertions(+), 21 deletions(-) create mode 100644 metron-analytics/metron-profiler/src/test/config/zookeeper/triage-result/profiler.json diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java index 9b26310165..d139e448cb 100644 --- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java +++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java @@ -44,6 +44,14 @@ public class KafkaEmitter implements ProfileMeasurementEmitter, Serializable { protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public static final String PROFILE_FIELD = "profile"; + public static final String ENTITY_FIELD = "entity"; + public static final String PERIOD_ID_FIELD = "period"; + public static final String PERIOD_START_FIELD = "period.start"; + public static final String PERIOD_END_FIELD = "period.end"; + public static final String TIMESTAMP_FIELD = "timestamp"; + public static final String ALERT_FIELD = "is_alert"; + /** * The stream identifier used for this destination; */ @@ -128,14 +136,14 @@ private void appendTriageValues(ProfileMeasurement measurement, JSONObject messa private JSONObject createMessage(ProfileMeasurement measurement) { JSONObject message = new JSONObject(); - message.put("profile", measurement.getDefinition().getProfile()); - message.put("entity", measurement.getEntity()); - message.put("period", measurement.getPeriod().getPeriod()); - message.put("period.start", measurement.getPeriod().getStartTimeMillis()); - message.put("period.end", measurement.getPeriod().getEndTimeMillis()); - message.put("timestamp", System.currentTimeMillis()); - message.put("source.type", sourceType); - message.put("is_alert", "true"); + message.put(PROFILE_FIELD, measurement.getDefinition().getProfile()); + message.put(ENTITY_FIELD, measurement.getEntity()); + message.put(PERIOD_ID_FIELD, measurement.getPeriod().getPeriod()); + message.put(PERIOD_START_FIELD, measurement.getPeriod().getStartTimeMillis()); + message.put(PERIOD_END_FIELD, measurement.getPeriod().getEndTimeMillis()); + message.put(TIMESTAMP_FIELD, System.currentTimeMillis()); + message.put(Constants.SENSOR_TYPE, sourceType); + message.put(ALERT_FIELD, "true"); message.put(Constants.GUID, UUID.randomUUID().toString()); return message; } @@ -161,6 +169,10 @@ public void setStreamId(String streamId) { this.streamId = streamId; } + public String getSourceType() { + return sourceType; + } + public void setSourceType(String sourceType) { this.sourceType = sourceType; } diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/triage-result/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/triage-result/profiler.json new file mode 100644 index 0000000000..7d63da7073 --- /dev/null +++ b/metron-analytics/metron-profiler/src/test/config/zookeeper/triage-result/profiler.json @@ -0,0 +1,20 @@ +{ + "profiles": [ + { + "profile": "profile-with-triage", + "foreach": "'global'", + "update": { + "stats": "STATS_ADD(stats, 1)" + }, + "result": { + "profile": "stats", + "triage": { + "min": "STATS_MIN(stats)", + "max": "STATS_MAX(stats)", + "mean": "STATS_MEAN(stats)" + } + } + } + ], + "timestampField": "timestamp" +} diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java index 322ba130e1..e1bc1365b4 100644 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java @@ -40,6 +40,8 @@ import org.apache.metron.stellar.dsl.Context; import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver; import org.apache.storm.Config; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -62,6 +64,13 @@ import static com.google.code.tempusfugit.temporal.Duration.seconds; import static com.google.code.tempusfugit.temporal.Timeout.timeout; import static com.google.code.tempusfugit.temporal.WaitFor.waitOrTimeout; +import static org.apache.metron.profiler.bolt.KafkaEmitter.ALERT_FIELD; +import static org.apache.metron.profiler.bolt.KafkaEmitter.ENTITY_FIELD; +import static org.apache.metron.profiler.bolt.KafkaEmitter.PERIOD_END_FIELD; +import static org.apache.metron.profiler.bolt.KafkaEmitter.PERIOD_ID_FIELD; +import static org.apache.metron.profiler.bolt.KafkaEmitter.PERIOD_START_FIELD; +import static org.apache.metron.profiler.bolt.KafkaEmitter.PROFILE_FIELD; +import static org.apache.metron.profiler.bolt.KafkaEmitter.TIMESTAMP_FIELD; import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY; import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE; import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER; @@ -285,18 +294,35 @@ public void testProfileWithStatsObject() throws Exception { assertTrue(results.get(0) instanceof OnlineStatisticsProvider); } - /** - * Generates an error message for if the byte comparison fails. - * - * @param expected The expected value. - * @param actual The actual value. - * @return - * @throws UnsupportedEncodingException - */ - private String failMessage(byte[] expected, byte[] actual) throws UnsupportedEncodingException { - return String.format("expected '%s', got '%s'", - new String(expected, "UTF-8"), - new String(actual, "UTF-8")); + @Test + public void testProfileWithTriageResult() throws Exception { + uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/triage-result"); + + // start the topology and write test messages to kafka + fluxComponent.submitTopology(); + List telemetry = FileUtils.readLines(new File("src/test/resources/telemetry.json")); + kafkaComponent.writeMessages(inputTopic, telemetry); + + // wait until the triage message is output to kafka + waitOrTimeout(() -> kafkaComponent.readMessages(outputTopic).size() > 0, timeout(seconds(90))); + + List outputMessages = kafkaComponent.readMessages(outputTopic); + assertEquals(1, outputMessages.size()); + + // validate the triage message + JSONObject message = (JSONObject) new JSONParser().parse(new String(outputMessages.get(0), "UTF-8")); + assertEquals("profile-with-triage", message.get(PROFILE_FIELD)); + assertEquals("global", message.get(ENTITY_FIELD)); + assertEquals(76548935L, message.get(PERIOD_ID_FIELD)); + assertEquals(1530978700000L, message.get(PERIOD_START_FIELD)); + assertEquals(1530978720000L, message.get(PERIOD_END_FIELD)); + assertEquals("profiler", message.get(Constants.SENSOR_TYPE)); + assertEquals("true", message.get(ALERT_FIELD)); + assertEquals(1.0, message.get("min")); + assertEquals(1.0, message.get("max")); + assertEquals(1.0, message.get("mean")); + assertTrue(message.containsKey(TIMESTAMP_FIELD)); + assertTrue(message.containsKey(Constants.GUID)); } private static String getMessage(String ipSource, long timestamp) { @@ -471,7 +497,6 @@ private void assign(String var, String expression) { */ private T execute(String expression, Class clazz) { T results = executor.execute(expression, Collections.emptyMap(), clazz); - LOG.debug("{} = {}", expression, results); return results; } From 48df90b671c59b643625f58e5e3f7791e1d16320 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Tue, 25 Sep 2018 08:53:24 -0400 Subject: [PATCH 3/4] METRON-1792 Simplify Profile Definitions in Integration Tests --- .../zookeeper/event-time-test/profiler.json | 19 --- .../processing-time-test/profiler.json | 11 -- .../profile-with-stats/profiler.json | 12 -- .../zookeeper/triage-result/profiler.json | 20 ---- .../integration/ConfigUploadComponent.java | 33 ++++-- .../integration/ProfilerIntegrationTest.java | 110 +++++++++++++++--- 6 files changed, 117 insertions(+), 88 deletions(-) delete mode 100644 metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json delete mode 100644 metron-analytics/metron-profiler/src/test/config/zookeeper/processing-time-test/profiler.json delete mode 100644 metron-analytics/metron-profiler/src/test/config/zookeeper/profile-with-stats/profiler.json delete mode 100644 metron-analytics/metron-profiler/src/test/config/zookeeper/triage-result/profiler.json diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json deleted file mode 100644 index 534b7c6969..0000000000 --- a/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "timestampField": "timestamp", - "profiles": [ - { - "profile": "count-by-ip", - "foreach": "ip_src_addr", - "init": { "count": 0 }, - "update": { "count" : "count + 1" }, - "result": "count" - }, - { - "profile": "total-count", - "foreach": "'total'", - "init": { "count": 0 }, - "update": { "count": "count + 1" }, - "result": "count" - } - ] -} \ No newline at end of file diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/processing-time-test/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/processing-time-test/profiler.json deleted file mode 100644 index e75ec0fede..0000000000 --- a/metron-analytics/metron-profiler/src/test/config/zookeeper/processing-time-test/profiler.json +++ /dev/null @@ -1,11 +0,0 @@ -{ - "profiles": [ - { - "profile": "processing-time-test", - "foreach": "ip_src_addr", - "init": { "counter": "0" }, - "update": { "counter": "counter + 1" }, - "result": "counter" - } - ] -} \ No newline at end of file diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/profile-with-stats/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/profile-with-stats/profiler.json deleted file mode 100644 index 083e73fbd3..0000000000 --- a/metron-analytics/metron-profiler/src/test/config/zookeeper/profile-with-stats/profiler.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "profiles": [ - { - "profile": "profile-with-stats", - "foreach": "'global'", - "init": { "stats": "STATS_INIT()" }, - "update": { "stats": "STATS_ADD(stats, 1)" }, - "result": "stats" - } - ], - "timestampField": "timestamp" -} \ No newline at end of file diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/triage-result/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/triage-result/profiler.json deleted file mode 100644 index 7d63da7073..0000000000 --- a/metron-analytics/metron-profiler/src/test/config/zookeeper/triage-result/profiler.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "profiles": [ - { - "profile": "profile-with-triage", - "foreach": "'global'", - "update": { - "stats": "STATS_ADD(stats, 1)" - }, - "result": { - "profile": "stats", - "triage": { - "min": "STATS_MIN(stats)", - "max": "STATS_MAX(stats)", - "mean": "STATS_MEAN(stats)" - } - } - } - ], - "timestampField": "timestamp" -} diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ConfigUploadComponent.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ConfigUploadComponent.java index b59d0b5e30..c825a10309 100644 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ConfigUploadComponent.java +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ConfigUploadComponent.java @@ -19,12 +19,15 @@ */ package org.apache.metron.profiler.integration; +import org.apache.commons.lang3.ArrayUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.metron.common.configuration.profiler.ProfilerConfig; import org.apache.metron.integration.InMemoryComponent; import org.apache.metron.integration.UnableToStartException; import org.apache.metron.integration.components.ZKServerComponent; +import java.util.Arrays; import java.util.Properties; import static org.apache.metron.common.configuration.ConfigurationsUtils.getClient; @@ -41,7 +44,8 @@ public class ConfigUploadComponent implements InMemoryComponent { private Properties topologyProperties; private String globalConfiguration; - private String profilerConfiguration; + private String profilerConfigurationPath; + private ProfilerConfig profilerConfig; @Override public void start() throws UnableToStartException { @@ -86,11 +90,17 @@ private void upload() throws Exception { * @param client The zookeeper client. */ private void uploadProfilerConfig(CuratorFramework client) throws Exception { - if (profilerConfiguration != null) { - byte[] globalConfig = readProfilerConfigFromFile(profilerConfiguration); - if (globalConfig.length > 0) { - writeProfilerConfigToZookeeper(readProfilerConfigFromFile(profilerConfiguration), client); - } + byte[] configBytes = null; + + if (profilerConfigurationPath != null) { + configBytes = readProfilerConfigFromFile(profilerConfigurationPath); + + } else if(profilerConfig != null) { + configBytes = profilerConfig.toJSON().getBytes(); + } + + if (ArrayUtils.getLength(configBytes) > 0) { + writeProfilerConfigToZookeeper(configBytes, client); } } @@ -117,8 +127,13 @@ public ConfigUploadComponent withGlobalConfiguration(String path) { return this; } - public ConfigUploadComponent withProfilerConfiguration(String path) { - this.profilerConfiguration = path; + public ConfigUploadComponent withProfilerConfigurationPath(String path) { + this.profilerConfigurationPath = path; + return this; + } + + public ConfigUploadComponent withProfilerConfiguration(ProfilerConfig profilerConfig) { + this.profilerConfig = profilerConfig; return this; } -} \ No newline at end of file +} diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java index e1bc1365b4..c7f19b2d33 100644 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java @@ -23,6 +23,7 @@ import org.adrianwalker.multilinestring.Multiline; import org.apache.commons.io.FileUtils; import org.apache.metron.common.Constants; +import org.apache.metron.common.configuration.profiler.ProfilerConfig; import org.apache.metron.hbase.mock.MockHBaseTableProvider; import org.apache.metron.hbase.mock.MockHTable; import org.apache.metron.integration.BaseIntegrationTest; @@ -51,7 +52,6 @@ import org.slf4j.LoggerFactory; import java.io.File; -import java.io.UnsupportedEncodingException; import java.lang.invoke.MethodHandles; import java.util.Arrays; import java.util.Collections; @@ -91,18 +91,15 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { public static final long startAt = 10; public static final String entity = "10.0.0.1"; - private static final String tableName = "profiler"; private static final String columnFamily = "P"; private static final String inputTopic = Constants.INDEXING_TOPIC; private static final String outputTopic = "profiles"; private static final int saltDivisor = 10; - private static final long periodDurationMillis = TimeUnit.SECONDS.toMillis(20); private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(10); private static final long windowDurationMillis = TimeUnit.SECONDS.toMillis(10); private static final long profileTimeToLiveMillis = TimeUnit.SECONDS.toMillis(20); - private static final long maxRoutesPerBolt = 100000; private static ZKServerComponent zkComponent; @@ -111,11 +108,9 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { private static ConfigUploadComponent configUploadComponent; private static ComponentRunner runner; private static MockHTable profilerTable; - private static String message1; private static String message2; private static String message3; - private StellarStatefulExecutor executor; /** @@ -136,9 +131,25 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { @Multiline private static String kryoSerializers; + /** + * { + * "profiles": [ + * { + * "profile": "processing-time-test", + * "foreach": "ip_src_addr", + * "init": { "counter": "0" }, + * "update": { "counter": "counter + 1" }, + * "result": "counter" + * } + * ] + * } + */ + @Multiline + private static String processingTimeProfile; + @Test public void testProcessingTime() throws Exception { - uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/processing-time-test"); + uploadConfigToZookeeper(ProfilerConfig.fromJSON(processingTimeProfile)); // start the topology and write 3 test messages to kafka fluxComponent.submitTopology(); @@ -147,7 +158,7 @@ public void testProcessingTime() throws Exception { kafkaComponent.writeMessages(inputTopic, message3); // retrieve the profile measurement using PROFILE_GET - String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))"; + String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('15', 'MINUTES'))"; List measurements = execute(profileGetExpression, List.class); // need to keep checking for measurements until the profiler has flushed one out @@ -179,7 +190,7 @@ public void testProcessingTime() throws Exception { @Test public void testProcessingTimeWithTimeToLiveFlush() throws Exception { - uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/processing-time-test"); + uploadConfigToZookeeper(ProfilerConfig.fromJSON(processingTimeProfile)); // start the topology and write 3 test messages to kafka fluxComponent.submitTopology(); @@ -195,7 +206,7 @@ public void testProcessingTimeWithTimeToLiveFlush() throws Exception { kafkaComponent.writeMessages(inputTopic, message3); // retrieve the profile measurement using PROFILE_GET - String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))"; + String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('15', 'MINUTES'))"; List measurements = execute(profileGetExpression, List.class); // need to keep checking for measurements until the profiler has flushed one out @@ -222,9 +233,33 @@ public void testProcessingTimeWithTimeToLiveFlush() throws Exception { assertEquals(3, measurements.get(0).intValue()); } + /** + * { + * "timestampField": "timestamp", + * "profiles": [ + * { + * "profile": "count-by-ip", + * "foreach": "ip_src_addr", + * "init": { "count": 0 }, + * "update": { "count" : "count + 1" }, + * "result": "count" + * }, + * { + * "profile": "total-count", + * "foreach": "'total'", + * "init": { "count": 0 }, + * "update": { "count": "count + 1" }, + * "result": "count" + * } + * ] + * } + */ + @Multiline + private static String eventTimeProfile; + @Test public void testEventTime() throws Exception { - uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/event-time-test"); + uploadConfigToZookeeper(ProfilerConfig.fromJSON(eventTimeProfile)); // start the topology and write test messages to kafka fluxComponent.submitTopology(); @@ -264,6 +299,23 @@ public void testEventTime() throws Exception { } } + /** + * { + * "profiles": [ + * { + * "profile": "profile-with-stats", + * "foreach": "'global'", + * "init": { "stats": "STATS_INIT()" }, + * "update": { "stats": "STATS_ADD(stats, 1)" }, + * "result": "stats" + * } + * ], + * "timestampField": "timestamp" + * } + */ + @Multiline + private static String profileWithStats; + /** * The result produced by a Profile has to be serializable within Storm. If the result is not * serializable the topology will crash and burn. @@ -273,7 +325,7 @@ public void testEventTime() throws Exception { */ @Test public void testProfileWithStatsObject() throws Exception { - uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/profile-with-stats"); + uploadConfigToZookeeper(ProfilerConfig.fromJSON(profileWithStats)); // start the topology and write test messages to kafka fluxComponent.submitTopology(); @@ -294,9 +346,34 @@ public void testProfileWithStatsObject() throws Exception { assertTrue(results.get(0) instanceof OnlineStatisticsProvider); } + /** + * { + * "profiles": [ + * { + * "profile": "profile-with-triage", + * "foreach": "'global'", + * "update": { + * "stats": "STATS_ADD(stats, 1)" + * }, + * "result": { + * "profile": "stats", + * "triage": { + * "min": "STATS_MIN(stats)", + * "max": "STATS_MAX(stats)", + * "mean": "STATS_MEAN(stats)" + * } + * } + * } + * ], + * "timestampField": "timestamp" + * } + */ + @Multiline + private static String profileWithTriageResult; + @Test public void testProfileWithTriageResult() throws Exception { - uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/triage-result"); + uploadConfigToZookeeper(ProfilerConfig.fromJSON(profileWithTriageResult)); // start the topology and write test messages to kafka fluxComponent.submitTopology(); @@ -467,13 +544,12 @@ public void tearDown() throws Exception { /** * Uploads config values to Zookeeper. - * @param path The path on the local filesystem to the config values. + * @param profilerConfig The Profiler configuration. * @throws Exception */ - public void uploadConfigToZookeeper(String path) throws Exception { + public void uploadConfigToZookeeper(ProfilerConfig profilerConfig) throws Exception { configUploadComponent - .withGlobalConfiguration(path) - .withProfilerConfiguration(path) + .withProfilerConfiguration(profilerConfig) .update(); } From d4eb1b9dc5d74afcaf0430d3ff6981dbc5894092 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Tue, 25 Sep 2018 10:03:37 -0400 Subject: [PATCH 4/4] The ZkConfigurationCacheIntegrationTest needs a profile definition to use --- .../ZKConfigurationsCacheIntegrationTest.java | 5 ++--- .../src/test/resources/profiler/profiler.json | 19 +++++++++++++++++++ 2 files changed, 21 insertions(+), 3 deletions(-) create mode 100644 metron-platform/metron-common/src/test/resources/profiler/profiler.json diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java index 5240d7ac18..ce898d3462 100644 --- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java @@ -119,7 +119,6 @@ public class ZKConfigurationsCacheIntegrationTest { @Multiline public static String globalConfig; - public static File profilerDir = new File("../../metron-analytics/metron-profiler/src/test/config/zookeeper"); public ConfigurationsCache cache; public ZKServerComponent zkComponent; @@ -154,7 +153,7 @@ public void setup() throws Exception { } { //profiler - byte[] config = IOUtils.toByteArray(new FileInputStream(new File(profilerDir, "/event-time-test/profiler.json"))); + byte[] config = IOUtils.toByteArray(new FileInputStream(new File("src/test/resources/profiler/profiler.json"))); ConfigurationsUtils.writeProfilerConfigToZookeeper( config, client); } { @@ -284,7 +283,7 @@ public void validateBaseWrite() throws Exception { } //profiler { - File inFile = new File(profilerDir, "/event-time-test/profiler.json"); + File inFile = new File("src/test/resources/profiler/profiler.json"); ProfilerConfig expectedConfig = JSONUtils.INSTANCE.load(inFile, ProfilerConfig.class); ProfilerConfigurations config = cache.get( ProfilerConfigurations.class); assertEventually(() -> Assert.assertEquals(expectedConfig, config.getProfilerConfig())); diff --git a/metron-platform/metron-common/src/test/resources/profiler/profiler.json b/metron-platform/metron-common/src/test/resources/profiler/profiler.json new file mode 100644 index 0000000000..32be68dfe5 --- /dev/null +++ b/metron-platform/metron-common/src/test/resources/profiler/profiler.json @@ -0,0 +1,19 @@ +{ + "timestampField": "timestamp", + "profiles": [ + { + "profile": "count-by-ip", + "foreach": "ip_src_addr", + "init": { "count": 0 }, + "update": { "count" : "count + 1" }, + "result": "count" + }, + { + "profile": "total-count", + "foreach": "'total'", + "init": { "count": 0 }, + "update": { "count": "count + 1" }, + "result": "count" + } + ] +}