From a61e05cfe90fa09141888018c5e5d60c9e08ee5b Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Tue, 10 Jul 2018 10:08:48 -0400 Subject: [PATCH] Made core Profiler components Java serializable for running the Profiler in the spark-shell --- .../profiler/DefaultMessageDistributor.java | 28 ++++++++++--------- .../metron/profiler/DefaultMessageRouter.java | 3 +- .../apache/metron/profiler/MessageRoute.java | 4 ++- .../profiler/clock/DefaultClockFactory.java | 4 ++- .../metron/profiler/clock/EventTimeClock.java | 3 +- 5 files changed, 25 insertions(+), 17 deletions(-) diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java index 82f7174fbe..c926a708c6 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java @@ -20,19 +20,11 @@ package org.apache.metron.profiler; -import static java.lang.String.format; - import com.google.common.base.Ticker; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; -import java.lang.invoke.MethodHandles; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.metron.common.configuration.profiler.ProfileConfig; import org.apache.metron.stellar.dsl.Context; @@ -40,6 +32,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static java.lang.String.format; + /** * The default implementation of a {@link MessageDistributor}. * @@ -57,7 +59,7 @@ * lost. * */ -public class DefaultMessageDistributor implements MessageDistributor { +public class DefaultMessageDistributor implements MessageDistributor, Serializable { protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -73,7 +75,7 @@ public class DefaultMessageDistributor implements MessageDistributor { * messages. Once it has not received messages for a period of time, it is * moved to the expired cache. */ - private transient Cache activeCache; + private Cache activeCache; /** * A cache of expired profiles. @@ -84,7 +86,7 @@ public class DefaultMessageDistributor implements MessageDistributor { * can flush the state of the expired profile. If the client does not flush * the expired profiles, this state will be lost forever. */ - private transient Cache expiredCache; + private Cache expiredCache; /** * Create a new message distributor. @@ -287,7 +289,7 @@ public DefaultMessageDistributor withPeriodDuration(int duration, TimeUnit units /** * A listener that is notified when profiles expire from the active cache. */ - private class ActiveCacheRemovalListener implements RemovalListener { + private class ActiveCacheRemovalListener implements RemovalListener, Serializable { @Override public void onRemoval(RemovalNotification notification) { @@ -305,7 +307,7 @@ public void onRemoval(RemovalNotification notification) /** * A listener that is notified when profiles expire from the active cache. */ - private class ExpiredCacheRemovalListener implements RemovalListener { + private class ExpiredCacheRemovalListener implements RemovalListener, Serializable { @Override public void onRemoval(RemovalNotification notification) { diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java index d1a1a3b597..19bfa8ce9f 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java @@ -30,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.List; @@ -44,7 +45,7 @@ * A single telemetry message may need to take multiple routes. This is the case * when a message is needed by more than one profile. */ -public class DefaultMessageRouter implements MessageRouter { +public class DefaultMessageRouter implements MessageRouter, Serializable { protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java index 7288f03468..680e4e84a1 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java @@ -22,6 +22,8 @@ import org.apache.metron.common.configuration.profiler.ProfileConfig; +import java.io.Serializable; + /** * Defines the 'route' a message must take through the Profiler. * @@ -33,7 +35,7 @@ * * @see MessageRouter */ -public class MessageRoute { +public class MessageRoute implements Serializable { /** * The definition of the profile on this route. diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/DefaultClockFactory.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/DefaultClockFactory.java index d62e62bd49..8a574f6c4e 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/DefaultClockFactory.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/DefaultClockFactory.java @@ -21,6 +21,8 @@ import org.apache.metron.common.configuration.profiler.ProfilerConfig; +import java.io.Serializable; + /** * Creates a {@link Clock} based on the profiler configuration. * @@ -29,7 +31,7 @@ * *

The default implementation of a {@link ClockFactory}. */ -public class DefaultClockFactory implements ClockFactory { +public class DefaultClockFactory implements ClockFactory, Serializable { /** * @param config The profiler configuration. diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/EventTimeClock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/EventTimeClock.java index 5cd574eff4..c094b7d4b7 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/EventTimeClock.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/EventTimeClock.java @@ -26,6 +26,7 @@ import java.lang.invoke.MethodHandles; import java.util.Optional; +import java.io.Serializable; /** * A {@link Clock} that advances based on event time. @@ -33,7 +34,7 @@ * Event time is advanced by the timestamps contained within telemetry messages, rather * than the system clock. */ -public class EventTimeClock implements Clock { +public class EventTimeClock implements Clock, Serializable { protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());