From 387cd2de46b6a2392d1fe7a537fc8107b3bf4a59 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Fri, 11 May 2018 15:22:00 -0400 Subject: [PATCH 1/5] METRON-1551 Profiler Should Not Use Java Serialization --- metron-analytics/metron-profiler/README.md | 62 +++++++++----- .../src/main/config/profiler.properties | 13 +++ .../src/main/flux/profiler/remote.yaml | 3 + .../profile-with-stats/profiler.json | 12 +++ .../integration/ProfilerIntegrationTest.java | 83 ++++++++++++++++++- .../package/templates/profiler.properties.j2 | 13 +++ 6 files changed, 163 insertions(+), 23 deletions(-) create mode 100644 metron-analytics/metron-profiler/src/test/config/zookeeper/profile-with-stats/profiler.json diff --git a/metron-analytics/metron-profiler/README.md b/metron-analytics/metron-profiler/README.md index 01918aaddb..1289d9ed24 100644 --- a/metron-analytics/metron-profiler/README.md +++ b/metron-analytics/metron-profiler/README.md @@ -123,7 +123,7 @@ Creating and refining profiles is an iterative process. Iterating against a liv [Stellar]>>> %functions PROFILER PROFILER_APPLY, PROFILER_FLUSH, PROFILER_INIT ``` - + 1. Create a simple `hello-world` profile that will count the number of messages for each `ip_src_addr`. The `SHELL_EDIT` function will open an editor in which you can copy/paste the following Profiler configuration. ``` [Stellar]>>> conf := SHELL_EDIT() @@ -142,12 +142,12 @@ Creating and refining profiles is an iterative process. Iterating against a liv } ``` -1. Create a Profile execution environment; the Profile Debugger. +1. Create a Profile execution environment; the Profile Debugger. The Profiler will output the number of profiles that have been defined, the number of messages that have been applied and the number of routes that have been followed. A route is defined when a message is applied to a specific profile. - * If a message is not needed by any profile, then there are no routes. + * If a message is not needed by any profile, then there are no routes. 
* If a message is needed by one profile, then one route has been followed. * If a message is needed by two profiles, then two routes have been followed. @@ -157,7 +157,7 @@ Creating and refining profiles is an iterative process. Iterating against a liv Profiler{1 profile(s), 0 messages(s), 0 route(s)} ``` -1. Create a message that mimics the telemetry that your profile will consume. +1. Create a message that mimics the telemetry that your profile will consume. This message can be as simple or complex as you like. For the `hello-world` profile, all you need is a message containing an `ip_src_addr` field. @@ -181,11 +181,11 @@ Creating and refining profiles is an iterative process. Iterating against a liv ``` 1. Flush the Profiler. - - A flush is what occurs at the end of each 15 minute period in the Profiler. The result is a list of Profile Measurements. Each measurement is a map containing detailed information about the profile data that has been generated. The `value` field is what is written to HBase when running this profile in the Profiler topology. - + + A flush is what occurs at the end of each 15 minute period in the Profiler. The result is a list of Profile Measurements. Each measurement is a map containing detailed information about the profile data that has been generated. The `value` field is what is written to HBase when running this profile in the Profiler topology. + There will always be one measurement for each [profile, entity] pair. This profile simply counts the number of messages by IP source address. Notice that the value is '3' for the entity '10.0.0.1' as we applied 3 messages with an 'ip_src_addr' of ’10.0.0.1'. - + ``` [Stellar]>>> values := PROFILER_FLUSH(profiler) [Stellar]>>> values @@ -197,9 +197,9 @@ Creating and refining profiles is an iterative process. Iterating against a liv Once you are happy with your profile against a controlled data set, it can be useful to introduce more complex, live data. 
This example extracts 10 messages of live, enriched telemetry to test your profile(s). ``` - [Stellar]>>> %define bootstrap.servers := "node1:6667" + [Stellar]>>> %define bootstrap.servers := "node1:6667" node1:6667 - [Stellar]>>> msgs := KAFKA_GET("indexing", 10) + [Stellar]>>> msgs := KAFKA_GET("indexing", 10) [Stellar]>>> LENGTH(msgs) 10 ``` @@ -228,7 +228,7 @@ Continuing the previous running example, at this point, you have seen how your p [Stellar]>>> [Stellar]>>> %functions CONFIG CONFIG_GET, CONFIG_PUT ``` - + 1. If you haven't already, define your profile. ``` [Stellar]>>> conf := SHELL_EDIT() @@ -250,7 +250,7 @@ Continuing the previous running example, at this point, you have seen how your p 1. Check what is already deployed. Pushing a new profile configuration is destructive. It will overwrite any existing configuration. Check what you have out there. Manually merge the existing configuration with your new profile definition. - + ``` [Stellar]>>> existing := CONFIG_GET("PROFILER") ``` @@ -260,7 +260,7 @@ Continuing the previous running example, at this point, you have seen how your p [Stellar]>>> CONFIG_PUT("PROFILER", conf) ``` -### Deploying Profiles from the Command Line +### Deploying Profiles from the Command Line 1. Create the profile definition in a file located at `$METRON_HOME/config/zookeeper/profiler.json`. This file will likely not exist, if you have never created Profiles before. @@ -364,7 +364,7 @@ Indicates whether processing time or event time is used. By default, processing By default, no `timestampField` is defined. In this case, the Profiler uses system time when generating profiles. This means that the profiles are generated based on when the data has been processed by the Profiler. This is also known as 'processing time'. -This is the simplest mode of operation, but has some draw backs. If the Profiler is consuming live data and all is well, the processing and event times will likely remain similar and consistent. 
If processing time diverges from event time, then the Profiler will generate skewed profiles. +This is the simplest mode of operation, but has some draw backs. If the Profiler is consuming live data and all is well, the processing and event times will likely remain similar and consistent. If processing time diverges from event time, then the Profiler will generate skewed profiles. There are a few scenarios that might cause skewed profiles when using processing time. For example when a system has undergone a scheduled maintenance window and is restarted, a high volume of messages will need to be processed by the Profiler. The output of the Profiler might indicate an increase in activity during this time, although no change in activity actually occurred on the target network. The same situation could occur if an upstream system which provides telemetry undergoes an outage. @@ -380,7 +380,7 @@ Alternatively, a `timestampField` can be defined. This must be the name of a fi * The Profiler will use the same field across all telemetry sources and for all profiles. -* Be aware of clock skew across telemetry sources. If your profile is processing telemetry from multiple sources where the clock differs significantly, the Profiler may assume that some of those messages are late and will be ignored. Adjusting the [`profiler.window.duration`](#profilerwindowduration) and [`profiler.window.lag`](#profilerwindowlag) can help accommodate skewed clocks. +* Be aware of clock skew across telemetry sources. If your profile is processing telemetry from multiple sources where the clock differs significantly, the Profiler may assume that some of those messages are late and will be ignored. Adjusting the [`profiler.window.duration`](#profilerwindowduration) and [`profiler.window.lag`](#profilerwindowlag) can help accommodate skewed clocks. ### Profiles @@ -516,14 +516,12 @@ The REPL can be a powerful for developing profiles. 
Read all about [Developing P ## Configuring the Profiler -The Profiler runs as an independent Storm topology. The configuration for the Profiler topology is stored in local filesystem at `$METRON_HOME/config/profiler.properties`. -The values can be changed on disk and then the Profiler topology must be restarted. - +The Profiler runs as an independent Storm topology. The configuration for the Profiler topology is stored in local filesystem at `$METRON_HOME/config/profiler.properties`. After changing these values, the Profiler topology must be restarted for the changes to take effect. | Setting | Description |--- |--- | [`profiler.input.topic`](#profilerinputtopic) | The name of the input Kafka topic. -| [`profiler.output.topic`](#profileroutputtopic) | The name of the output Kafka topic. +| [`profiler.output.topic`](#profileroutputtopic) | The name of the output Kafka topic. | [`profiler.period.duration`](#profilerperiodduration) | The duration of each profile period. | [`profiler.period.duration.units`](#profilerperioddurationunits) | The units used to specify the [`profiler.period.duration`](#profilerperiodduration). | [`profiler.window.duration`](#profilerwindowduration) | The duration of each profile window. @@ -539,6 +537,8 @@ The values can be changed on disk and then the Profiler topology must be restart | [`profiler.hbase.column.family`](#profilerhbasecolumnfamily) | The column family used to store profiles. | [`profiler.hbase.batch`](#profilerhbasebatch) | The number of puts that are written to HBase in a single batch. | [`profiler.hbase.flush.interval.seconds`](#profilerhbaseflushintervalseconds) | The maximum number of seconds between batch writes to HBase. +| [`topology.kryo.register`](#topologykryoregister) | Storm will use Kryo serialization for these classes. + ### `profiler.input.topic` @@ -654,6 +654,30 @@ The number of puts that are written to HBase in a single batch. The maximum number of seconds between batch writes to HBase. 
+### `topology.kryo.register` + +*Default*: +``` +[ org.apache.metron.profiler.ProfileMeasurement, \ + org.apache.metron.profiler.ProfilePeriod, \ + org.apache.metron.common.configuration.profiler.ProfileResult, \ + org.apache.metron.common.configuration.profiler.ProfileResultExpressions, \ + org.apache.metron.common.configuration.profiler.ProfileTriageExpressions, \ + org.apache.metron.common.configuration.profiler.ProfilerConfig, \ + org.apache.metron.common.configuration.profiler.ProfileConfig, \ + org.json.simple.JSONObject, \ + java.util.LinkedHashMap, \ + org.apache.metron.statistics.OnlineStatisticsProvider ] +``` + +Storm will use Kryo serialization for these classes. Kryo serialization is more performant than Java serialization, in most cases. + +For these classes, Storm will uses Kryo's `FieldSerializer` as defined in the [Storm Serialization docs]((http://storm.apache.org/releases/1.1.2/Serialization.html). For all other classes not in this list, Storm defaults to using Java serialization which is slower and not recommended for a production topology. + +This value should only need altered if you have defined a profile that results in a non-primitive, user-defined type that is not in this list. If the class is not defined in this list, Java serialization will be used and the class must adhere to Java's serialization requirements. + +The performance of the entire Profiler topology can be negatively impacted if any profile produces results that undergo Java serialization. + ## Examples The following examples are intended to highlight the functionality provided by the Profiler. Try out these examples easily in the Stellar Shell as described in the [Creating Profiles](#creating-profiles) section. 
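The README hunk above notes that any result class missing from `topology.kryo.register` falls back to Java serialization and must meet Java's serialization requirements. A minimal sketch of how that requirement could be checked, using only the JDK: `SerializationCheck`, `GoodResult`, and `BadResult` are hypothetical names made up for illustration and are not part of Metron or Storm.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical helper, not part of Metron: checks whether a candidate profile
// result type survives a Java serialization round trip.
final class SerializationCheck {

  // Hypothetical result type that satisfies Java serialization.
  static final class GoodResult implements Serializable {
    private static final long serialVersionUID = 1L;
    final long count;

    GoodResult(long count) {
      this.count = count;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof GoodResult && ((GoodResult) o).count == count;
    }

    @Override
    public int hashCode() {
      return Long.hashCode(count);
    }
  }

  // Hypothetical result type that does NOT implement Serializable.
  static final class BadResult {
    final long count;

    BadResult(long count) {
      this.count = count;
    }
  }

  // Returns true if the value can be written and read back with Java
  // serialization and the copy equals the original; false if the type
  // is not serializable.
  static boolean roundTrips(Object value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
               new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return value.equals(in.readObject());
      }
    } catch (NotSerializableException e) {
      // the type does not adhere to Java's serialization requirements
      return false;
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(roundTrips(new GoodResult(3))); // prints "true"
    System.out.println(roundTrips(new BadResult(3)));  // prints "false"
  }
}
```

A check of this kind only covers the Java fallback path. Classes listed in `topology.kryo.register` go through Kryo instead, which the tests in this patch series exercise through `SerDeUtils`.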
diff --git a/metron-analytics/metron-profiler/src/main/config/profiler.properties b/metron-analytics/metron-profiler/src/main/config/profiler.properties index fe3c475162..dc30838628 100644 --- a/metron-analytics/metron-profiler/src/main/config/profiler.properties +++ b/metron-analytics/metron-profiler/src/main/config/profiler.properties @@ -26,6 +26,19 @@ profiler.workers=1 profiler.executors=0 topology.message.timeout.secs=30 topology.max.spout.pending=100000 +topology.fall.back.on.java.serialization=true +topology.testing.always.try.serialize=false +topology.kryo.register=[ org.apache.metron.profiler.ProfileMeasurement, \ + org.apache.metron.profiler.ProfilePeriod, \ + org.apache.metron.common.configuration.profiler.ProfileResult, \ + org.apache.metron.common.configuration.profiler.ProfileResultExpressions, \ + org.apache.metron.common.configuration.profiler.ProfileTriageExpressions, \ + org.apache.metron.common.configuration.profiler.ProfilerConfig, \ + org.apache.metron.common.configuration.profiler.ProfileConfig, \ + org.json.simple.JSONObject, \ + org.json.simple.JSONArray, \ + java.util.LinkedHashMap, \ + org.apache.metron.statistics.OnlineStatisticsProvider ] ##### Profiler ##### diff --git a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml index 6ad007bf8b..5e92c62d5c 100644 --- a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml +++ b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml @@ -23,6 +23,9 @@ config: topology.auto-credentials: ${topology.auto-credentials} topology.message.timeout.secs: ${topology.message.timeout.secs} topology.max.spout.pending: ${topology.max.spout.pending} + topology.testing.always.try.serialize: ${topology.testing.always.try.serialize} + topology.fall.back.on.java.serialization: ${topology.fall.back.on.java.serialization} + topology.kryo.register: ${topology.kryo.register} components: diff --git 
a/metron-analytics/metron-profiler/src/test/config/zookeeper/profile-with-stats/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/profile-with-stats/profiler.json new file mode 100644 index 0000000000..083e73fbd3 --- /dev/null +++ b/metron-analytics/metron-profiler/src/test/config/zookeeper/profile-with-stats/profiler.json @@ -0,0 +1,12 @@ +{ + "profiles": [ + { + "profile": "profile-with-stats", + "foreach": "'global'", + "init": { "stats": "STATS_INIT()" }, + "update": { "stats": "STATS_ADD(stats, 1)" }, + "result": "stats" + } + ], + "timestampField": "timestamp" +} \ No newline at end of file diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java index 8f5ced3a88..c02c469ab8 100644 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java @@ -20,6 +20,7 @@ package org.apache.metron.profiler.integration; +import org.adrianwalker.multilinestring.Multiline; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; @@ -45,6 +46,7 @@ import org.junit.Test; import java.io.File; +import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -93,6 +95,23 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { private static String message2; private static String message3; + /** + * [ + * org.apache.metron.profiler.ProfileMeasurement, + * org.apache.metron.profiler.ProfilePeriod, + * org.apache.metron.common.configuration.profiler.ProfileResult, + * org.apache.metron.common.configuration.profiler.ProfileResultExpressions, + * 
org.apache.metron.common.configuration.profiler.ProfileTriageExpressions, + * org.apache.metron.common.configuration.profiler.ProfilerConfig, + * org.apache.metron.common.configuration.profiler.ProfileConfig, + * org.json.simple.JSONObject, + * java.util.LinkedHashMap, + * org.apache.metron.statistics.OnlineStatisticsProvider + * ] + */ + @Multiline + private static String kryoSerializers; + /** * The Profiler can generate profiles based on processing time. With processing time, * the Profiler builds profiles based on when the telemetry was processed. @@ -164,10 +183,60 @@ public void testEventTime() throws Exception { // embedded in the row key should match those in the source telemetry byte[] expectedRowKey = generateExpectedRowKey("event-time-test", entity, startAt); byte[] actualRowKey = puts.get(0).getRow(); - String msg = String.format("expected '%s', got '%s'", - new String(expectedRowKey, "UTF-8"), - new String(actualRowKey, "UTF-8")); - assertArrayEquals(msg, expectedRowKey, actualRowKey); + assertArrayEquals(failMessage(expectedRowKey, actualRowKey), expectedRowKey, actualRowKey); + } + + /** + * The result produced by a Profile has to be serializable within Storm. If the result is not + * serializable the topology will crash and burn. + * + * This test ensures that if a profile returns a STATS object created using the STATS_INIT and + * STATS_ADD functions, that it can be correctly serialized and persisted. 
+ */ + @Test + public void testProfileWithStatsObject() throws Exception { + + // upload the profiler config to zookeeper + uploadConfig(TEST_RESOURCES + "/config/zookeeper/profile-with-stats"); + + // start the topology and write test messages to kafka + fluxComponent.submitTopology(); + kafkaComponent.writeMessages(inputTopic, message1); + kafkaComponent.writeMessages(inputTopic, message2); + kafkaComponent.writeMessages(inputTopic, message3); + + // wait until the profile is flushed + waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, timeout(seconds(90))); + + // ensure that a value was persisted in HBase + List<Put> puts = profilerTable.getPutLog(); + assertEquals(1, puts.size()); + + // generate the expected row key. only the profile name, entity, and period are used to generate the row key + ProfileMeasurement measurement = new ProfileMeasurement() + .withProfileName("profile-with-stats") + .withEntity("global") + .withPeriod(startAt, periodDurationMillis, TimeUnit.MILLISECONDS); + RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDurationMillis, TimeUnit.MILLISECONDS); + byte[] expectedRowKey = rowKeyBuilder.rowKey(measurement); + + // ensure the correct row key was generated + byte[] actualRowKey = puts.get(0).getRow(); + assertArrayEquals(failMessage(expectedRowKey, actualRowKey), expectedRowKey, actualRowKey); + } + + /** + * Generates an error message for when the byte comparison fails. + * + * @param expected The expected value. + * @param actual The actual value. 
+ * @return + * @throws UnsupportedEncodingException + */ + private String failMessage(byte[] expected, byte[] actual) throws UnsupportedEncodingException { + return String.format("expected '%s', got '%s'", + new String(expected, "UTF-8"), + new String(actual, "UTF-8")); } /** @@ -249,6 +318,12 @@ public static void setupBeforeClass() throws UnableToStartException { setProperty("topology.message.timeout.secs", "60"); setProperty("topology.max.spout.pending", "100000"); + // ensure tuples are serialized during the test, otherwise serialization problems + // will not be found until the topology is run on a cluster with multiple workers + setProperty("topology.testing.always.try.serialize", "true"); + setProperty("topology.fall.back.on.java.serialization", "false"); + setProperty("topology.kryo.register", kryoSerializers); + // kafka settings setProperty("profiler.input.topic", inputTopic); setProperty("profiler.output.topic", outputTopic); diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2 b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2 index fabdaa7778..d8bc13dbcc 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2 +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2 @@ -26,6 +26,19 @@ profiler.workers={{profiler_topology_workers}} profiler.executors={{profiler_acker_executors}} topology.message.timeout.secs={{profiler_topology_message_timeout_secs}} topology.max.spout.pending={{profiler_topology_max_spout_pending}} +topology.fall.back.on.java.serialization=true +topology.testing.always.try.serialize=false +topology.kryo.register=[ org.apache.metron.profiler.ProfileMeasurement, \ + 
org.apache.metron.profiler.ProfilePeriod, \ + org.apache.metron.common.configuration.profiler.ProfileResult, \ + org.apache.metron.common.configuration.profiler.ProfileResultExpressions, \ + org.apache.metron.common.configuration.profiler.ProfileTriageExpressions, \ + org.apache.metron.common.configuration.profiler.ProfilerConfig, \ + org.apache.metron.common.configuration.profiler.ProfileConfig, \ + org.json.simple.JSONObject, \ + org.json.simple.JSONArray, \ + java.util.LinkedHashMap, \ + org.apache.metron.statistics.OnlineStatisticsProvider ] ##### Profiler ##### From 2898e30dc5537bdfe850462fcb98a32f176acd47 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Sat, 12 May 2018 14:31:44 -0400 Subject: [PATCH 2/5] Fixed a broken hyperlink in the README --- metron-analytics/metron-profiler/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metron-analytics/metron-profiler/README.md b/metron-analytics/metron-profiler/README.md index 1289d9ed24..79cdd44b8a 100644 --- a/metron-analytics/metron-profiler/README.md +++ b/metron-analytics/metron-profiler/README.md @@ -672,7 +672,7 @@ The maximum number of seconds between batch writes to HBase. Storm will use Kryo serialization for these classes. Kryo serialization is more performant than Java serialization, in most cases. -For these classes, Storm will uses Kryo's `FieldSerializer` as defined in the [Storm Serialization docs]((http://storm.apache.org/releases/1.1.2/Serialization.html). For all other classes not in this list, Storm defaults to using Java serialization which is slower and not recommended for a production topology. +For these classes, Storm will use Kryo's `FieldSerializer` as defined in the [Storm Serialization docs](http://storm.apache.org/releases/1.1.2/Serialization.html). For all other classes not in this list, Storm defaults to using Java serialization, which is slower and not recommended for a production topology. 
This value should only need altered if you have defined a profile that results in a non-primitive, user-defined type that is not in this list. If the class is not defined in this list, Java serialization will be used and the class must adhere to Java's serialization requirements. From 788b86166e867d99c3e499b04a3f2480a05b3ddf Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Mon, 14 May 2018 12:28:39 -0400 Subject: [PATCH 3/5] Made all Profiler classes serializable in case a user chooses Java serialization in their topology. Test cases added to validate --- .../metron/profiler/ProfileMeasurement.java | 3 +- .../apache/metron/profiler/ProfilePeriod.java | 3 +- .../profiler/ProfileMeasurementTest.java | 108 ++++++++++++++++++ .../metron/profiler/ProfilePeriodTest.java | 49 ++++++++ .../configuration/profiler/ProfileResult.java | 4 +- .../profiler/ProfileResultExpressions.java | 4 +- .../profiler/ProfileTriageExpressions.java | 3 +- .../profiler/ProfilerConfig.java | 12 +- .../profiler/ProfilerConfigTest.java | 85 +++++++++++++- 9 files changed, 259 insertions(+), 12 deletions(-) create mode 100644 metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileMeasurementTest.java diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java index f6cc286e13..4737c3d9dc 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java @@ -22,6 +22,7 @@ import org.apache.metron.common.configuration.profiler.ProfileConfig; +import java.io.Serializable; import java.util.Collections; import java.util.List; import java.util.Map; @@ -33,7 +34,7 @@ *

A profile contains many individual {@link ProfileMeasurement} values captured over a * period of time. These values in aggregate form a time series. */ -public class ProfileMeasurement { +public class ProfileMeasurement implements Serializable { /** * The name of the profile that this measurement is associated with. diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java index c2d8b21c1d..cbb827506c 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java @@ -20,6 +20,7 @@ package org.apache.metron.profiler; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -33,7 +34,7 @@ * The Profiler captures a ProfileMeasurement once every ProfilePeriod. There can be * multiple ProfilePeriods every hour. */ -public class ProfilePeriod { +public class ProfilePeriod implements Serializable { /** * A monotonically increasing number identifying the period. The first period is 0 diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileMeasurementTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileMeasurementTest.java new file mode 100644 index 0000000000..3a8d650acb --- /dev/null +++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileMeasurementTest.java @@ -0,0 +1,108 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.profiler; + +import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.configuration.profiler.ProfileConfig; +import org.apache.metron.common.utils.SerDeUtils; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class ProfileMeasurementTest { + + /** + * { + * "profile": "test", + * "foreach": "ip_src_addr", + * "update": {}, + * "result": { + * "profile": "2 + 2", + * "triage": { + * "eight": "4 + 4", + * "sixteen": "8 + 8" + * } + * } + * } + */ + @Multiline + private String profile; + private ProfileConfig definition; + private ProfileMeasurement measurement; + + public void setup() throws Exception { + definition = ProfileConfig.fromJSON(profile); + + measurement = new ProfileMeasurement() + .withProfileName("profile") + .withEntity("entity") + .withDefinition(definition) + .withPeriod(System.currentTimeMillis(), 10, TimeUnit.MINUTES) + .withProfileValue(22) + .withTriageValues(Collections.singletonMap("max", 200)); + } + + /** + * Ensure that the {@link ProfileMeasurement} can undergo Kryo serialization which + * occurs when the 
Profiler is running in Storm. + */ + @Test + public void testKryoSerialization() throws Exception { + + // round-trip serialization + byte[] raw = SerDeUtils.toBytes(measurement); + Object actual = SerDeUtils.fromBytes(raw, Object.class); + assertEquals(measurement, actual); + } + + /** + * Ensure that the {@link ProfileMeasurement} can undergo Java serialization, should a user + * prefer that over Kryo serialization, which can occur when the Profiler is running + * in Storm. + */ + @Test + public void testJavaSerialization() throws Exception { + + // serialize using java + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(bytes); + out.writeObject(measurement); + + // the serialized bits + byte[] raw = bytes.toByteArray(); + assertTrue(raw.length > 0); + + // deserialize using java + ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw)); + Object actual = in.readObject(); + + // ensure that the round-trip was successful + assertEquals(measurement, actual); + } +} diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java index 1a72111368..f52bd09941 100644 --- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java +++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java @@ -20,12 +20,18 @@ package org.apache.metron.profiler; +import org.apache.metron.common.utils.SerDeUtils; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * Tests the 
ProfilePeriod class. @@ -124,4 +130,47 @@ public void testPeriodDurationOfZero() { TimeUnit units = TimeUnit.HOURS; new ProfilePeriod(0, duration, units); } + + /** + * Ensure that the ProfilePeriod can undergo Kryo serialization which + * occurs when the Profiler is running in Storm. + */ + @Test + public void testKryoSerialization() throws Exception { + + ProfilePeriod expected = new ProfilePeriod(AUG2016, 1, TimeUnit.HOURS); + + // round-trip kryo serialization + byte[] raw = SerDeUtils.toBytes(expected); + Object actual = SerDeUtils.fromBytes(raw, Object.class); + + assertEquals(expected, actual); + } + + /** + * Ensure that the ProfilePeriod can undergo Java serialization, should a user + * prefer that over Kryo serialization, which can occur when the Profiler is running + * in Storm. + */ + @Test + public void testJavaSerialization() throws Exception { + + ProfilePeriod expected = new ProfilePeriod(AUG2016, 1, TimeUnit.HOURS); + + // serialize using java + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(bytes); + out.writeObject(expected); + + // the serialized bits + byte[] raw = bytes.toByteArray(); + assertTrue(raw.length > 0); + + // deserialize using java + ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw)); + Object actual = in.readObject(); + + // ensure that the round-trip was successful + assertEquals(expected, actual); + } } diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java index 55642a938a..e2aa54d909 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java @@ -20,10 +20,12 @@ import 
com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import java.io.Serializable; + /** * Defines the 'result' field of a Profile definition. */ -public class ProfileResult { +public class ProfileResult implements Serializable { /** * A Stellar expression that is executed to produce diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java index 5bcec7227d..2cada019f2 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java @@ -20,11 +20,13 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonValue; +import java.io.Serializable; + /** * A Stellar expression that is executed to produce a single * measurement that is persisted within the profile store. 
*/ -public class ProfileResultExpressions { +public class ProfileResultExpressions implements Serializable { private String expression; diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java index da02cb2d8c..b1b71757bf 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; +import java.io.Serializable; import java.util.HashMap; import java.util.Map; @@ -32,7 +33,7 @@ * The result of evaluating each expression are made available, keyed * by the given name, to the threat triage process. */ -public class ProfileTriageExpressions { +public class ProfileTriageExpressions implements Serializable { /** * A set of named Stellar expressions. The name of the expression diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java index e4fa99a85a..068a4c8155 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java @@ -56,7 +56,7 @@ public class ProfilerConfig implements Serializable { *

If a message does NOT contain this field, it will be dropped * and not included in any profiles. */ - private Optional timestampField = Optional.empty(); + private String timestampField = null; public List getProfiles() { return profiles; @@ -73,24 +73,24 @@ public ProfilerConfig withProfile(ProfileConfig profileConfig) { @JsonGetter("timestampField") public String getTimestampFieldForJson() { - return timestampField.orElse(null); + return timestampField; } public Optional getTimestampField() { - return timestampField; + return Optional.ofNullable(timestampField); } @JsonSetter("timestampField") public void setTimestampField(String timestampField) { - this.timestampField = Optional.of(timestampField); + this.timestampField = timestampField; } public void setTimestampField(Optional timestampField) { - this.timestampField = timestampField; + this.timestampField = timestampField.orElse(null); } public ProfilerConfig withTimestampField(Optional timestampField) { - this.timestampField = timestampField; + this.timestampField = timestampField.orElse(null); return this; } diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java index 1a1181187e..2cbdfb960e 100644 --- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java @@ -20,9 +20,14 @@ package org.apache.metron.common.configuration.profiler; import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.utils.SerDeUtils; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import static 
org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -108,7 +113,6 @@ public void testFromJSONWithNoTimestampField() throws IOException { @Test public void testFromJSONWithTimestampField() throws IOException { ProfilerConfig conf = ProfilerConfig.fromJSON(timestampField); - assertTrue(conf.getTimestampField().isPresent()); } @@ -206,4 +210,83 @@ public void testToJSONWithTwoProfiles() throws Exception { ProfilerConfig actual = ProfilerConfig.fromJSON(asJson); assertEquals(expected, actual); } + + /** + * { + * "profiles": [ + * { + * "profile": "profile1", + * "foreach": "ip_src_addr", + * "init": { "count": "0" }, + * "update": { "count": "count + 1" }, + * "result": "count" + * }, + * { + * "profile": "profile2", + * "foreach": "ip_dst_addr", + * "init": { "count": "0" }, + * "update": { "count": "count + 1" }, + * "result": "count" + * }, + * { + * "profile": "profile3", + * "foreach": "ip_src_addr", + * "init": { "count": "0" }, + * "update": { "count": "count + 1" }, + * "result": { + * "profile": "count", + * "triage" : { "count": "count" } + * } + * } + * ] + * } + */ + @Multiline + private String profilesToSerialize; + + /** + * Ensure that the Profiler configuration can undergo Kryo serialization which + * occurs when the Profiler is running in Storm. + */ + @Test + public void testKryoSerialization() throws Exception { + + // setup a profiler config to serialize + ProfilerConfig expected = ProfilerConfig.fromJSON(profilesToSerialize); + + // round-trip kryo serialization + byte[] raw = SerDeUtils.toBytes(expected); + Object actual = SerDeUtils.fromBytes(raw, Object.class); + + assertEquals(expected, actual); + } + + /** + * Ensure that the Profiler configuration can undergo Java serialization, should a user + * prefer that over Kryo serialization, which can occur when the Profiler is running + * in Storm. 
+ */ + @Test + public void testJavaSerialization() throws Exception { + + // setup a profiler config to serialize + ProfilerConfig expected = ProfilerConfig.fromJSON(profilesToSerialize); + + // serialize using java + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(bytes); + out.writeObject(expected); + + // the serialized bits + byte[] raw = bytes.toByteArray(); + assertTrue(raw.length > 0); + + // deserialize using java + ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw)); + Object actual = in.readObject(); + + // ensure that the round-trip was successful + assertEquals(expected, actual); + } + } From db9446e1cbf73cd1e853c2d7af65169882ca9044 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Mon, 14 May 2018 13:43:47 -0400 Subject: [PATCH 4/5] Added Java serializable check to test classes and also added 'Serializable' where applicable --- .../statistics/sampling/UniformSampler.java | 5 +- .../metron/common/utils/SerDeUtils.java | 7 +- .../stellar/common/utils/BloomFilter.java | 8 +- .../stellar/common/utils/SerDeUtils.java | 16 ++-- .../common/utils/StellarProcessorUtils.java | 81 ++++++++++++++++--- 5 files changed, 95 insertions(+), 22 deletions(-) diff --git a/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/UniformSampler.java b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/UniformSampler.java index 11460e0c24..96ab2d42c8 100644 --- a/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/UniformSampler.java +++ b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/UniformSampler.java @@ -17,6 +17,7 @@ */ package org.apache.metron.statistics.sampling; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Random; @@ -25,7 +26,8 @@ * This is a reservoir sampler without replacement where each 
element sampled will be included * with equal probability in the reservoir. */ -public class UniformSampler implements Sampler { +public class UniformSampler implements Sampler, Serializable { + private List reservoir; private int seen = 0; private int size; @@ -83,7 +85,6 @@ public boolean equals(Object o) { if (getSize() != that.getSize()) return false; return reservoir != null ? reservoir.equals(that.reservoir) : that.reservoir == null; - } @Override diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerDeUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerDeUtils.java index 4a89f9727f..5e2ceb9f41 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerDeUtils.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerDeUtils.java @@ -47,6 +47,7 @@ import de.javakaffee.kryoserializers.jodatime.JodaLocalDateSerializer; import de.javakaffee.kryoserializers.jodatime.JodaLocalDateTimeSerializer; import java.io.ByteArrayInputStream; +import java.io.Serializable; import java.lang.invoke.MethodHandles; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationHandler; @@ -188,7 +189,7 @@ public Object newInstance () { public static Serializer SERIALIZER = new Serializer(); - private static class Serializer implements Function { + private static class Serializer implements Function, Serializable { /** * Serializes the given Object into bytes. 
* @@ -199,9 +200,10 @@ public byte[] apply(Object o) { } } - public static class Deserializer implements Function { + public static class Deserializer implements Function, Serializable { private Class clazz; + public Deserializer(Class clazz) { this.clazz = clazz; } @@ -217,7 +219,6 @@ public T apply(byte[] bytes) { } } - private SerDeUtils() { // do not instantiate } diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/BloomFilter.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/BloomFilter.java index 445dca56d8..f20577099d 100644 --- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/BloomFilter.java +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/BloomFilter.java @@ -26,10 +26,13 @@ public class BloomFilter implements Serializable { private static class BloomFunnel implements Funnel, Serializable { + Function serializer; + public BloomFunnel(Function serializer) { this.serializer = serializer; } + @Override public void funnel(T obj, PrimitiveSink primitiveSink) { primitiveSink.putBytes(serializer.apply(obj)); @@ -46,12 +49,13 @@ public int hashCode() { } } - public static class DefaultSerializer implements Function { + public static class DefaultSerializer implements Function, Serializable { @Override public byte[] apply(T t) { return SerDeUtils.toBytes(t); } } + private com.google.common.hash.BloomFilter filter; public BloomFilter(Function serializer, int expectedInsertions, double falsePositiveRate) { @@ -61,9 +65,11 @@ public BloomFilter(Function serializer, int expectedInsertions, doubl public boolean mightContain(T key) { return filter.mightContain(key); } + public void add(T key) { filter.put(key); } + public void merge(BloomFilter filter2) { filter.putAll(filter2.filter); } diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/SerDeUtils.java 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/SerDeUtils.java index 90038519b5..eff4f88eb6 100644 --- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/SerDeUtils.java +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/SerDeUtils.java @@ -45,6 +45,7 @@ import de.javakaffee.kryoserializers.jodatime.JodaLocalDateSerializer; import de.javakaffee.kryoserializers.jodatime.JodaLocalDateTimeSerializer; import java.io.ByteArrayInputStream; +import java.io.Serializable; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Modifier; @@ -88,14 +89,16 @@ protected Kryo initialValue() { UnmodifiableCollectionsSerializer.registerSerializers(ret); SynchronizedCollectionsSerializer.registerSerializers(ret); -// custom serializers for non-jdk libs + // custom serializers for non-jdk libs -// register CGLibProxySerializer, works in combination with the appropriate action in handleUnregisteredClass (see below) + // register CGLibProxySerializer, works in combination with the appropriate action in handleUnregisteredClass (see below) ret.register(CGLibProxySerializer.CGLibProxyMarker.class, new CGLibProxySerializer()); -// joda DateTime, LocalDate and LocalDateTime + + // joda DateTime, LocalDate and LocalDateTime ret.register(LocalDate.class, new JodaLocalDateSerializer()); ret.register(LocalDateTime.class, new JodaLocalDateTimeSerializer()); -// guava ImmutableList, ImmutableSet, ImmutableMap, ImmutableMultimap, UnmodifiableNavigableSet + + // guava ImmutableList, ImmutableSet, ImmutableMap, ImmutableMultimap, UnmodifiableNavigableSet ImmutableListSerializer.registerSerializers(ret); ImmutableSetSerializer.registerSerializers(ret); ImmutableMapSerializer.registerSerializers(ret); @@ -187,7 +190,7 @@ public Object newInstance () { public static Serializer SERIALIZER = new Serializer(); - private static class Serializer 
implements Function { + private static class Serializer implements Function, Serializable { /** * Serializes the given Object into bytes. * @@ -198,9 +201,10 @@ public byte[] apply(Object o) { } } - public static class Deserializer implements Function { + public static class Deserializer implements Function, Serializable { private Class clazz; + public Deserializer(Class clazz) { this.clazz = clazz; } diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java index d5f267e0ef..55bb46128d 100644 --- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java @@ -19,6 +19,8 @@ package org.apache.metron.stellar.common.utils; import com.google.common.collect.ImmutableList; +import org.apache.commons.lang3.ClassUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.metron.stellar.common.StellarPredicateProcessor; import org.apache.metron.stellar.common.StellarProcessor; import org.apache.metron.stellar.dsl.Context; @@ -28,6 +30,11 @@ import org.apache.metron.stellar.dsl.VariableResolver; import org.junit.Assert; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.util.AbstractMap; import java.util.Collections; import java.util.List; @@ -40,6 +47,10 @@ import java.util.stream.Stream; import java.util.stream.StreamSupport; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + /** * Utilities for executing and validating Stellar expressions. 
*/ @@ -58,24 +69,74 @@ public class StellarProcessorUtils { */ public static Object run(String expression, Map variables, Context context) { - // validate the expression - StellarProcessor processor = new StellarProcessor(); - Assert.assertTrue("Invalid expression; expr=" + expression, - processor.validate(expression, context)); + validate(expression, context); + Object result = execute(expression, variables, context); + ensureKryoSerializable(result); + ensureJavaSerializable(result); + + return result; + } + + /** + * Execute a Stellar expression. + * + * @param expression The expression to execute. + * @param variables The variables available to the expression. + * @param context The execution context. + * @return The result of executing the expression. + */ + private static Object execute(String expression, Map variables, Context context) { - // execute the expression - Object ret = processor.parse( + StellarProcessor processor = new StellarProcessor(); + Object result = processor.parse( expression, new DefaultVariableResolver(x -> variables.get(x), x -> variables.containsKey(x)), StellarFunctions.FUNCTION_RESOLVER(), context); - // ensure the result can be serialized/deserialized - byte[] raw = SerDeUtils.toBytes(ret); + return result; + } + + /** + * Ensure that a value can be serialized and deserialized using Kryo. + * + * @param value The value to validate. + */ + private static void ensureKryoSerializable(Object value) { + + byte[] raw = SerDeUtils.toBytes(value); Object actual = SerDeUtils.fromBytes(raw, Object.class); - Assert.assertEquals(ret, actual); + Assert.assertEquals(value, actual); + } - return ret; + /** + * Ensure a value can be serialized and deserialized using Java serialization. 
+ * + * @param value The value to serialize + */ + private static void ensureJavaSerializable(Object value) { + + try { + // serialize using java + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(bytes); + out.writeObject(value); + + // the serialized bits + byte[] raw = bytes.toByteArray(); + assertTrue(raw.length > 0); + + // deserialize using java + ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw)); + Object actual = in.readObject(); + + // ensure that the round-trip was successful + assertEquals(value, actual); + + } catch(IOException | ClassNotFoundException e) { + + fail("Unable to serialize value using Java serialization; error=" + ExceptionUtils.getRootCauseMessage(e)); + } } /** From 13f9a2dad60b7e9b6cd8a31302d926312cc8c66f Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Mon, 14 May 2018 14:39:30 -0400 Subject: [PATCH 5/5] Added extra details in error message --- .../common/utils/StellarProcessorUtils.java | 36 ++++++++++++++----- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java index 55bb46128d..4ad5a40fbf 100644 --- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java @@ -19,7 +19,6 @@ package org.apache.metron.stellar.common.utils; import com.google.common.collect.ImmutableList; -import org.apache.commons.lang3.ClassUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.metron.stellar.common.StellarPredicateProcessor; import org.apache.metron.stellar.common.StellarProcessor; @@ -71,8 +70,8 @@ public static Object run(String expression, 
Map variables, Conte validate(expression, context); Object result = execute(expression, variables, context); - ensureKryoSerializable(result); - ensureJavaSerializable(result); + ensureKryoSerializable(result, expression); + ensureJavaSerializable(result, expression); return result; } @@ -100,21 +99,39 @@ private static Object execute(String expression, Map variables, /** * Ensure that a value can be serialized and deserialized using Kryo. * + *

When a Stellar function is used in a Storm topology there are cases when the result + * needs to be serializable, like when using the Profiler. Storm can use either Kryo or + * basic Java serialization. It is highly recommended that all Stellar functions return a + * result that is Kryo serializable to allow for the broadest possible use of the function. + * * @param value The value to validate. */ - private static void ensureKryoSerializable(Object value) { + private static void ensureKryoSerializable(Object value, String expression) { + + String msg = String.format("Expression result is not Kryo serializable. It is highly recommended for all " + + "functions to return a result that is Kryo serializable to allow for their broadest possible use. " + + "expr=%s, value=%s", expression, value); byte[] raw = SerDeUtils.toBytes(value); Object actual = SerDeUtils.fromBytes(raw, Object.class); - Assert.assertEquals(value, actual); + Assert.assertEquals(msg, value, actual); } /** * Ensure a value can be serialized and deserialized using Java serialization. * + *

When a Stellar function is used in a Storm topology there are cases when the result + * needs to be serializable, like when using the Profiler. Storm can use either Kryo or + * basic Java serialization. It is highly recommended that all Stellar functions return a + * result that is Java serializable to allow for the broadest possible use of the function. + * * @param value The value to serialize */ - private static void ensureJavaSerializable(Object value) { + private static void ensureJavaSerializable(Object value, String expression) { + + String msg = String.format("Expression result is not Java serializable. It is highly recommended for all " + + "functions to return a result that is Java serializable to allow for their broadest possible use. " + + "expr=%s, value=%s", expression, value); try { // serialize using java @@ -131,11 +148,14 @@ private static void ensureJavaSerializable(Object value) { Object actual = in.readObject(); // ensure that the round-trip was successful - assertEquals(value, actual); + assertEquals(msg, value, actual); } catch(IOException | ClassNotFoundException e) { - fail("Unable to serialize value using Java serialization; error=" + ExceptionUtils.getRootCauseMessage(e)); + String error = String.format("Expression result is not Java serializable. It is highly recommended for all " + + "functions to return a result that is Java serializable to allow for their broadest possible use. " + + "expr=%s, value=%s, error=%s", expression, value, ExceptionUtils.getRootCauseMessage(e)); + fail(error); } }
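For reviewers, the Java serialization round-trip that these patches add to the tests can be reproduced outside of Metron. The following is only a minimal sketch under stated assumptions, not the project's code: `SerializationSketch`, `OptionalFieldConfig`, `NullableFieldConfig`, `javaRoundTrip`, and `isJavaSerializable` are hypothetical names invented for illustration. It relies on the fact that `java.util.Optional` does not implement `Serializable`, which is one plausible reading of why `ProfilerConfig` now stores `timestampField` as a nullable `String` and only exposes an `Optional` through its getter.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;

public class SerializationSketch {

  /** Hypothetical class that keeps an Optional as a field; Optional is not Serializable. */
  static class OptionalFieldConfig implements Serializable {
    private Optional<String> timestampField = Optional.of("timestamp");
  }

  /** Hypothetical class that stores a nullable String and wraps it only in the getter. */
  static class NullableFieldConfig implements Serializable {
    private String timestampField;

    NullableFieldConfig(String timestampField) {
      this.timestampField = timestampField;
    }

    Optional<String> getTimestampField() {
      return Optional.ofNullable(timestampField);
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof NullableFieldConfig
          && Objects.equals(timestampField, ((NullableFieldConfig) o).timestampField);
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(timestampField);
    }
  }

  /** Serializes the value with Java serialization and reads it back. */
  static Object javaRoundTrip(Object value) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    try (ObjectInputStream in =
             new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return in.readObject();
    }
  }

  /** Returns true only if the value survives the round-trip and equals the original. */
  static boolean isJavaSerializable(Object value) {
    try {
      return Objects.equals(value, javaRoundTrip(value));
    } catch (IOException | ClassNotFoundException e) {
      // NotSerializableException is an IOException, so it is handled here
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(isJavaSerializable(new NullableFieldConfig("timestamp"))); // true
    System.out.println(isJavaSerializable(new OptionalFieldConfig()));            // false
  }
}
```

The sketch returns a boolean instead of failing a JUnit assertion so that it stays free of test dependencies; the helper in the patch reports the failure through `fail(...)` with the expression and value included in the message.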