Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

METRON-1467: Replace guava caches in places where the keyspace might be large #947

Closed
wants to merge 30 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
a4f618a
Adding parallel enrichment bolt.
cestella Feb 21, 2018
99fe0b8
Updating to include trace statements.
cestella Feb 22, 2018
79736c6
Updating with some cleanup
cestella Feb 22, 2018
cb4a527
Updating spec.
cestella Feb 22, 2018
fb4d438
Updating threadpool creation
cestella Feb 22, 2018
87ef6a7
better docs
cestella Feb 22, 2018
6ae9594
Updating readme.
cestella Feb 22, 2018
82ebc95
Better documentation.
cestella Feb 22, 2018
235046d
Updating bolt to avoid an error.
cestella Feb 22, 2018
2e09e6e
Updated shuffles to local or shuffle
cestella Feb 27, 2018
cc3162d
Updating bolt to not log so much.
cestella Feb 28, 2018
0601a9b
Bug in the stellar adapter that is preventing caching.
cestella Mar 1, 2018
6b7161d
Move the clone to the right place and make a test case for this.
cestella Mar 1, 2018
4706529
enricher test update
cestella Mar 1, 2018
09b3adf
Adding the ability to monitor the cache
cestella Mar 2, 2018
167260a
parallel strategy should use a concurrency level set to the threadpoo…
cestella Mar 2, 2018
934bdea
Moved to a different cache.
cestella Mar 2, 2018
571be7d
updating test
cestella Mar 2, 2018
d359746
dependencies for caffeine
cestella Mar 2, 2018
5786282
Replacing guava caches in places where the cache sizes may be large.
cestella Mar 2, 2018
832e109
Updating to avoid a rogue caffeine 2.3.3 transitive dependency
cestella Mar 5, 2018
4f56cdc
Refactored for documentation and to make the abstraction more clear.
cestella Mar 6, 2018
9237e25
Merge branch 'single_bolt_split_join_poc' into guava_cache_replacement
cestella Mar 6, 2018
ba5473b
Added diagram, documentation and explanation.
cestella Mar 6, 2018
852f599
try a svg
cestella Mar 6, 2018
d2b41af
Migrated to svg
cestella Mar 6, 2018
54666e7
Merge branch 'single_bolt_split_join_poc' into guava_cache_replacement
cestella Mar 6, 2018
1c62138
Added license to svg
cestella Mar 6, 2018
4fd1639
Merge branch 'single_bolt_split_join_poc' into guava_cache_replacement
cestella Mar 6, 2018
15518eb
Merge branch 'master' into guava_cache_replacement
cestella Mar 7, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions metron-interface/metron-rest/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
<eclipse.link.version>2.6.4</eclipse.link.version>
</properties>
<dependencies>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>${global_caffeine_version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion metron-platform/metron-enrichment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.6.2</version>
<version>${global_caffeine_version}</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@

package org.apache.metron.enrichment.bolt;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;

import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.metron.common.Constants;
import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
import org.apache.metron.common.configuration.ConfigurationType;
Expand Down Expand Up @@ -146,13 +146,8 @@ public void prepare(Map conf, TopologyContext topologyContext,
throw new IllegalStateException("MAX_TIME_RETAIN_MINUTES must be specified");
if (this.adapter == null)
throw new IllegalStateException("Adapter must be specified");
loader = new CacheLoader<CacheKey, JSONObject>() {
@Override
public JSONObject load(CacheKey key) throws Exception {
return adapter.enrich(key);
}
};
cache = CacheBuilder.newBuilder().maximumSize(maxCacheSize)
loader = key -> adapter.enrich(key);
cache = Caffeine.newBuilder().maximumSize(maxCacheSize)
.expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES)
.build(loader);
boolean success = adapter.initializeAdapter(getConfigurations().getGlobalConfig());
Expand Down Expand Up @@ -228,7 +223,7 @@ public void execute(Tuple tuple) {
subGroup = adapter.getStreamSubGroup(enrichmentType, field);

perfLog.mark("enrich");
enrichedField = cache.getUnchecked(cacheKey);
enrichedField = cache.get(cacheKey);
perfLog.log("enrich", "key={}, time to run enrichment type={}", key, enrichmentType);

if (enrichedField == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
*/
package org.apache.metron.enrichment.bolt;

import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.google.common.base.Joiner;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalCause;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Sets;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
Expand All @@ -46,6 +45,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt {

public static class Perf {} // used for performance logging
Expand Down Expand Up @@ -89,29 +91,25 @@ public void prepare(Map map, TopologyContext topologyContext, OutputCollector ou
if (this.maxTimeRetain == null) {
throw new IllegalStateException("maxTimeRetain must be specified");
}
loader = new CacheLoader<String, Map<String, Tuple>>() {
@Override
public Map<String, Tuple> load(String key) throws Exception {
return new HashMap<>();
}
};
cache = CacheBuilder.newBuilder().maximumSize(maxCacheSize)
.expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES).removalListener(new JoinRemoveListener())
.build(loader);
loader = s -> new HashMap<>();
cache = Caffeine.newBuilder().maximumSize(maxCacheSize)
.expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES)
.removalListener(new JoinRemoveListener())
Copy link
Contributor

@nickwallen nickwallen Mar 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we only want to be notified of a full cache when ERROR logging is set. Is that the case?

In the JoinRemoveListener we end up doing some work that we probably don't need to do unless ERROR logging is set. One easy fix would be to only add the removal listener if LOG.isErrorEnabled().

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, I believe this was done intentionally before this PR (I only migrated it to the new caching strategy). The idea is that if an entry is removed from the join cache under specific circumstances, we want to know about it, because a message may be getting dropped because the cache is overwhelmed. @merrimanr Can you chime in here on the rationale?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is pre-existing. We can address at a later time.

I remember now: maxing out this cache causes the Split/Join to fail, which is a major problem for the Split/Join topology. And this cache here is only for the Split/Join, not the Unified topology.

We should probably look at adding similar logging (only when ERROR is enabled) for the other places where we use the cache, or just add some mechanism to periodically log cache stats. Anywho, down the road.

.build(loader);
prepare(map, topologyContext);
}

class JoinRemoveListener implements RemovalListener<String, Map<String, Tuple>> {

@Override
public void onRemoval(RemovalNotification<String, Map<String, Tuple>> removalNotification) {
if (removalNotification.getCause() == RemovalCause.SIZE) {
public void onRemoval(@Nullable String s, @Nullable Map<String, Tuple> stringTupleMap, @Nonnull RemovalCause removalCause) {
if (removalCause == RemovalCause.SIZE) {
String errorMessage = "Join cache reached max size limit. Increase the maxCacheSize setting or add more tasks to enrichment/threatintel join bolt.";
Exception exception = new Exception(errorMessage);
LOG.error(errorMessage, exception);
collector.reportError(exception);
}
if (removalNotification.getCause() == RemovalCause.EXPIRED) {
if (removalCause == RemovalCause.EXPIRED) {
String errorMessage = "Message was in the join cache too long which may be caused by slow enrichments/threatintels. Increase the maxTimeRetain setting.";
Exception exception = new Exception(errorMessage);
LOG.error(errorMessage, exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ protected void initializeStellar() {
put("field2", "value2");
put("source.type", "test");
}})
.withThrowable(new CacheLoader.InvalidCacheLoadException("CacheLoader returned null for key CacheKey{field='field1', value='value1'}."));
.withThrowable(new Exception("[Metron] Could not enrich string: value1"));
verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.metron.enrichment.bolt;

import com.google.common.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.metron.common.Constants;
import org.apache.metron.common.error.MetronError;
Expand All @@ -41,6 +41,7 @@

import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyMap;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -176,10 +177,10 @@ public void testExecuteShouldReportError() throws ExecutionException {
when(tuple.getValueByField("key")).thenReturn(key);
when(tuple.getValueByField("message")).thenReturn(new JSONObject());
joinBolt.cache = mock(LoadingCache.class);
when(joinBolt.cache.get(key)).thenThrow(new ExecutionException(new Exception("join exception")));
when(joinBolt.cache.get(any())).thenThrow(new RuntimeException(new Exception("join exception")));

joinBolt.execute(tuple);
ExecutionException expectedExecutionException = new ExecutionException(new Exception("join exception"));
RuntimeException expectedExecutionException = new RuntimeException(new Exception("join exception"));
MetronError error = new MetronError()
.withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
.withMessage("Joining problem: {}")
Expand Down
5 changes: 5 additions & 0 deletions metron-stellar/stellar-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
<commons.config.version>1.10</commons.config.version>
</properties>
<dependencies>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>${global_caffeine_version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@

package org.apache.metron.stellar.common;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.antlr.v4.runtime.ANTLRInputStream;
import org.antlr.v4.runtime.CommonTokenStream;
import org.antlr.v4.runtime.TokenStream;

import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.metron.stellar.dsl.Context;
Expand Down Expand Up @@ -95,16 +93,11 @@ static Cache<String, StellarCompiler.Expression> createCache( int cacheSize
, int expiryTime
, TimeUnit expiryUnit
) {
CacheLoader<String, StellarCompiler.Expression> loader = new CacheLoader<String, StellarCompiler.Expression>() {
@Override
public StellarCompiler.Expression load(String key) throws Exception {
return compile(key);
}
};
return CacheBuilder.newBuilder()
.maximumSize(cacheSize)
.expireAfterAccess(expiryTime, expiryUnit)
.build(loader);
CacheLoader<String, StellarCompiler.Expression> loader = key -> compile(key);
return Caffeine.newBuilder()
.maximumSize(cacheSize)
.expireAfterAccess(expiryTime, expiryUnit)
.build(loader);
}

/**
Expand All @@ -119,8 +112,8 @@ public Set<String> variablesUsed(final String rule) {
}
StellarCompiler.Expression expression = null;
try {
expression = expressionCache.get(rule, () -> compile(rule));
} catch (ExecutionException e) {
expression = expressionCache.get(rule, r -> compile(r));
} catch (Throwable e) {
throw new ParseException("Unable to parse: " + rule + " due to: " + e.getMessage(), e);
}
return expression.variablesUsed;
Expand All @@ -143,8 +136,8 @@ public T parse(final String rule, final VariableResolver variableResolver, final
context.setActivityType(ActivityType.PARSE_ACTIVITY);
}
try {
expression = expressionCache.get(rule, () -> compile(rule));
} catch (ExecutionException|UncheckedExecutionException e) {
expression = expressionCache.get(rule, r -> compile(r));
} catch (Throwable e) {
throw new ParseException("Unable to parse: " + rule + " due to: " + e.getMessage(), e);
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

package org.apache.metron.stellar.dsl.functions;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.metron.stellar.dsl.BaseStellarFunction;
import org.apache.metron.stellar.dsl.Stellar;
import org.apache.metron.stellar.common.utils.ConversionUtils;
Expand Down Expand Up @@ -77,7 +77,7 @@ public int hashCode() {
}

private static LoadingCache<TimezonedFormat, ThreadLocal<SimpleDateFormat>> formatCache =
CacheBuilder.newBuilder().build(
Caffeine.newBuilder().build(
new CacheLoader<TimezonedFormat, ThreadLocal<SimpleDateFormat>>() {
@Override
public ThreadLocal<SimpleDateFormat> load(final TimezonedFormat format) throws Exception {
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
<base_flume_version>1.5.2</base_flume_version>
<!-- full dependency versions -->
<global_accumulo_version>1.8.0</global_accumulo_version>
<global_caffeine_version>2.6.2</global_caffeine_version>
<global_antlr_version>4.5</global_antlr_version>
<global_opencsv_version>3.7</global_opencsv_version>
<global_curator_version>2.7.1</global_curator_version>
Expand Down