From 1860b064d6da7d97148507e8235ea825903ac57c Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Sat, 17 Nov 2018 16:20:55 -0500 Subject: [PATCH 1/4] METRON-1880 Use Caffeine for Profiler Caching --- .../metron-profiler-common/pom.xml | 5 +++ .../profiler/DefaultMessageDistributor.java | 45 +++++++++---------- .../DefaultMessageDistributorTest.java | 6 +-- 3 files changed, 29 insertions(+), 27 deletions(-) diff --git a/metron-analytics/metron-profiler-common/pom.xml b/metron-analytics/metron-profiler-common/pom.xml index 630709e87b..eff70c46ee 100644 --- a/metron-analytics/metron-profiler-common/pom.xml +++ b/metron-analytics/metron-profiler-common/pom.xml @@ -27,6 +27,11 @@ UTF-8 + + com.github.ben-manes.caffeine + caffeine + ${global_caffeine_version} + org.apache.hadoop hadoop-auth diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java index b0cd63bc97..f1ae559d31 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java @@ -20,17 +20,19 @@ package org.apache.metron.profiler; -import com.google.common.base.Ticker; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.RemovalListener; +import com.github.benmanes.caffeine.cache.Ticker; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.metron.common.configuration.profiler.ProfileConfig; import 
org.apache.metron.stellar.dsl.Context; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.Serializable; import java.lang.invoke.MethodHandles; import java.util.ArrayList; @@ -38,6 +40,7 @@ import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import static java.lang.String.format; @@ -126,7 +129,7 @@ public DefaultMessageDistributor( this.periodDurationMillis = periodDurationMillis; // build the cache of active profiles - this.activeCache = CacheBuilder + this.activeCache = Caffeine .newBuilder() .maximumSize(maxNumberOfRoutes) .expireAfterAccess(profileTimeToLiveMillis, TimeUnit.MILLISECONDS) @@ -135,7 +138,7 @@ public DefaultMessageDistributor( .build(); // build the cache of expired profiles - this.expiredCache = CacheBuilder + this.expiredCache = Caffeine .newBuilder() .maximumSize(maxNumberOfRoutes) .expireAfterWrite(profileTimeToLiveMillis, TimeUnit.MILLISECONDS) @@ -219,8 +222,9 @@ public List flushExpired() { private void cacheMaintenance() { activeCache.cleanUp(); expiredCache.cleanUp(); - - LOG.debug("Cache maintenance complete: activeCacheSize={}, expiredCacheSize={}", activeCache.size(), expiredCache.size()); + LOG.debug("Cache maintenance triggered: activeCacheStats={}, expiredCacheStats={}", + activeCache.stats().toString(), + expiredCache.stats().toString()); } /** @@ -256,14 +260,14 @@ private List flushCache(Cache cache public ProfileBuilder getBuilder(MessageRoute route, Context context) throws ExecutionException { ProfileConfig profile = route.getProfileDefinition(); String entity = route.getEntity(); - return activeCache.get( - cacheKey(profile, entity), - () -> new DefaultProfileBuilder.Builder() + Function profileCreator = (k) -> + new DefaultProfileBuilder.Builder() .withDefinition(profile) .withEntity(entity) .withPeriodDurationMillis(periodDurationMillis) 
.withContext(context) - .build()); + .build(); + return activeCache.get(cacheKey(profile, entity), profileCreator); } /** @@ -299,15 +303,13 @@ public DefaultMessageDistributor withPeriodDuration(int duration, TimeUnit units private class ActiveCacheRemovalListener implements RemovalListener, Serializable { @Override - public void onRemoval(RemovalNotification notification) { - - ProfileBuilder expired = notification.getValue(); + public void onRemoval(@Nullable Integer key, @Nullable ProfileBuilder expired, @Nonnull RemovalCause cause) { LOG.warn("Profile expired from active cache; profile={}, entity={}", expired.getDefinition().getProfile(), expired.getEntity()); // add the profile to the expired cache - expiredCache.put(notification.getKey(), expired); + expiredCache.put(key, expired); } } @@ -317,20 +319,15 @@ public void onRemoval(RemovalNotification notification) private class ExpiredCacheRemovalListener implements RemovalListener, Serializable { @Override - public void onRemoval(RemovalNotification notification) { - - if(notification.wasEvicted()) { - + public void onRemoval(@Nullable Integer key, @Nullable ProfileBuilder expired, @Nonnull RemovalCause cause) { + if(cause.wasEvicted()) { // the expired profile was NOT flushed in time - ProfileBuilder expired = notification.getValue(); LOG.warn("Expired profile NOT flushed before removal, some state lost; profile={}, entity={}", expired.getDefinition().getProfile(), expired.getEntity()); } else { - // the expired profile was flushed successfully - ProfileBuilder expired = notification.getValue(); LOG.debug("Expired profile successfully flushed; profile={}, entity={}", expired.getDefinition().getProfile(), expired.getEntity()); diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java index 48161e25a1..56f0695fae 100644 --- 
a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java +++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java @@ -20,7 +20,7 @@ package org.apache.metron.profiler; -import com.google.common.base.Ticker; +import com.github.benmanes.caffeine.cache.Ticker; import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.common.configuration.profiler.ProfileConfig; import org.apache.metron.common.utils.JSONUtils; @@ -278,7 +278,7 @@ public void testExpiredProfilesShouldBeRemoved() throws Exception { * An implementation of Ticker that can be used to drive time * when testing the Guava caches. */ - private class FixedTicker extends Ticker { + private class FixedTicker implements Ticker { /** * The time that will be reported. @@ -298,7 +298,7 @@ public FixedTicker advanceTime(long time, TimeUnit units) { this.timestampNanos += units.toNanos(time); return this; } - + @Override public long read() { return this.timestampNanos; From 70f233ad5c0ec02674e0af1a7d91290cfef2c0f7 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Tue, 20 Nov 2018 07:56:59 -0500 Subject: [PATCH 2/4] Fix to example profile in README --- metron-analytics/metron-profiler-common/README.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/metron-analytics/metron-profiler-common/README.md b/metron-analytics/metron-profiler-common/README.md index 8f26aaf9a7..fe4c2ed839 100644 --- a/metron-analytics/metron-profiler-common/README.md +++ b/metron-analytics/metron-profiler-common/README.md @@ -28,7 +28,7 @@ The Profiler is a feature extraction mechanism that can generate a profile descr This is achieved by summarizing the telemetry data consumed by Metron over tumbling windows. A summary statistic is applied to the data received within a given window. 
Collecting these values across many windows result in a time series that is useful for analysis. -Any field contained within a message can be used to generate a profile. A profile can even be produced by combining fields that originate in different data sources. A user has considerable power to transform the data used in a profile by leveraging the Stellar language. +Any field contained within a message can be used to generate a profile. A profile can even be produced by combining fields that originate in different data sources. A user has considerable power to transform the data used in a profile by leveraging the Stellar language. There are three separate ports of the Profiler that share this common code base. * The [Storm Profiler](../metron-profiler-storm/README.md) builds low-latency profiles over streaming data sets. @@ -58,12 +58,12 @@ Let's start with a simple example. The following profile maintains a count of th "profile": "hello-world", "foreach": "ip_src_addr", "init": { - "count": 0 + "count": "0" }, "update": { "count": "count + 1" }, - "result": "count", + "result": "count" } ] } @@ -321,7 +321,7 @@ It is important to note that the Profiler can persist any serializable Object, n ``` $ source /etc/default/metron $ bin/stellar -z $ZOOKEEPER - + [Stellar]>>> stats := PROFILE_GET( "example4", "10.0.0.1", PROFILE_FIXED(30, "MINUTES")) [org.apache.metron.common.math.stats.OnlineStatisticsProvider@79fe4ab9, ...] ``` @@ -330,10 +330,10 @@ It is important to note that the Profiler can persist any serializable Object, n ``` [Stellar]>>> aStat := GET_FIRST(stats) org.apache.metron.common.math.stats.OnlineStatisticsProvider@79fe4ab9 - + [Stellar]>>> STATS_MEAN(aStat) 15979.0625 - + [Stellar]>>> STATS_PERCENTILE(aStat, 90) 30310.958 ``` @@ -341,7 +341,7 @@ It is important to note that the Profiler can persist any serializable Object, n 1. Merge all of the profile measurements over the past 30 minutes into a single sketch and calculate the 90th percentile. 
``` [Stellar]>>> merged := STATS_MERGE( stats) - + [Stellar]>>> STATS_PERCENTILE(merged, 90) 29810.992 ``` From 2e83017d5d4c71bde47da791e41eb6f59613e055 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Tue, 20 Nov 2018 11:46:50 -0500 Subject: [PATCH 3/4] Need to enable recording of stats in the cache, otherwise it's always 0s --- .../profiler/DefaultMessageDistributor.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java index f1ae559d31..4e8095703a 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java @@ -129,22 +129,28 @@ public DefaultMessageDistributor( this.periodDurationMillis = periodDurationMillis; // build the cache of active profiles - this.activeCache = Caffeine + Caffeine activeCacheBuilder = Caffeine .newBuilder() .maximumSize(maxNumberOfRoutes) .expireAfterAccess(profileTimeToLiveMillis, TimeUnit.MILLISECONDS) .removalListener(new ActiveCacheRemovalListener()) - .ticker(ticker) - .build(); + .ticker(ticker); + if(LOG.isDebugEnabled()) { + activeCacheBuilder.recordStats(); + } + this.activeCache = activeCacheBuilder.build(); // build the cache of expired profiles - this.expiredCache = Caffeine + Caffeine expiredCacheBuilder = Caffeine .newBuilder() .maximumSize(maxNumberOfRoutes) .expireAfterWrite(profileTimeToLiveMillis, TimeUnit.MILLISECONDS) .removalListener(new ExpiredCacheRemovalListener()) - .ticker(ticker) - .build(); + .ticker(ticker); + if(LOG.isDebugEnabled()) { + expiredCacheBuilder.recordStats(); + } + this.expiredCache = expiredCacheBuilder.build(); } /** From
585fb1200e63ebc0eabb2b2fccaa5faf3db7247c Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Tue, 20 Nov 2018 12:33:25 -0500 Subject: [PATCH 4/4] Allow tests to define Executor for cache maintenance to ensure cache maintenance is performed synchronously during tests --- .../profiler/DefaultMessageDistributor.java | 31 +++++++++++++------ .../DefaultMessageDistributorTest.java | 21 ++++++++----- 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java index 4e8095703a..0e504671fe 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java @@ -25,6 +25,7 @@ import com.github.benmanes.caffeine.cache.RemovalCause; import com.github.benmanes.caffeine.cache.RemovalListener; import com.github.benmanes.caffeine.cache.Ticker; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.metron.common.configuration.profiler.ProfileConfig; import org.apache.metron.stellar.dsl.Context; @@ -39,6 +40,8 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -102,7 +105,7 @@ public DefaultMessageDistributor( long periodDurationMillis, long profileTimeToLiveMillis, long maxNumberOfRoutes) { - this(periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, Ticker.systemTicker()); + this(periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, Ticker.systemTicker(), Optional.empty()); } /** 
@@ -113,12 +116,14 @@ public DefaultMessageDistributor( * @param maxNumberOfRoutes The max number of unique routes to maintain. After this is exceeded, lesser * used routes will be evicted from the internal cache. * @param ticker The ticker used to drive time for the caches. Only needs set for testing. + * @param cacheMaintenanceExecutor The executor responsible for running cache maintenance tasks. Only needed for testing. */ public DefaultMessageDistributor( long periodDurationMillis, long profileTimeToLiveMillis, long maxNumberOfRoutes, - Ticker ticker) { + Ticker ticker, + Optional cacheMaintenanceExecutor) { if(profileTimeToLiveMillis < periodDurationMillis) { throw new IllegalStateException(format( @@ -135,10 +140,13 @@ public DefaultMessageDistributor( .expireAfterAccess(profileTimeToLiveMillis, TimeUnit.MILLISECONDS) .removalListener(new ActiveCacheRemovalListener()) .ticker(ticker); - if(LOG.isDebugEnabled()) { + if (cacheMaintenanceExecutor.isPresent()) { + activeCacheBuilder.executor(cacheMaintenanceExecutor.get()); + } + if (LOG.isDebugEnabled()) { activeCacheBuilder.recordStats(); } - this.activeCache = activeCacheBuilder.build(); + this.activeCache = activeCacheBuilder.build(); // build the cache of expired profiles Caffeine expiredCacheBuilder = Caffeine @@ -147,7 +155,10 @@ public DefaultMessageDistributor( .expireAfterWrite(profileTimeToLiveMillis, TimeUnit.MILLISECONDS) .removalListener(new ExpiredCacheRemovalListener()) .ticker(ticker); - if(LOG.isDebugEnabled()) { + if (cacheMaintenanceExecutor.isPresent()) { + expiredCacheBuilder.executor(cacheMaintenanceExecutor.get()); + } + if (LOG.isDebugEnabled()) { expiredCacheBuilder.recordStats(); } this.expiredCache = expiredCacheBuilder.build(); @@ -328,15 +339,17 @@ private class ExpiredCacheRemovalListener implements RemovalListener