feat(cache): add hazelcast distributed cache option #6645

Merged: 9 commits, Jan 19, 2023
Changes from 2 commits
3 changes: 3 additions & 0 deletions build.gradle
@@ -87,6 +87,9 @@ project.ext.externalDependency = [
'hadoopCommon':'org.apache.hadoop:hadoop-common:2.7.2',
'hadoopMapreduceClient':'org.apache.hadoop:hadoop-mapreduce-client-core:2.7.2',
'hadoopCommon3':'org.apache.hadoop:hadoop-common:3.3.4',
'hazelcast':'com.hazelcast:hazelcast:5.2.1',
'hazelcastSpring':'com.hazelcast:hazelcast-spring:5.2.1',
'hazelcastTest':'com.hazelcast:hazelcast:5.2.1:tests',
'hibernateCore': 'org.hibernate:hibernate-core:5.2.16.Final',
'httpClient': 'org.apache.httpcomponents:httpclient:4.5.9',
'httpAsyncClient': 'org.apache.httpcomponents:httpasyncclient:4.1.5',
@@ -40,6 +40,8 @@
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.cache.Cache;

import static com.datahub.util.RecordUtils.*;


@RequiredArgsConstructor
@Slf4j
@@ -88,10 +90,10 @@ public LineageSearchResult searchAcrossLineage(@Nonnull Urn sourceUrn, @Nonnull
maxHops = maxHops != null ? maxHops : 1000;
lineageResult = _graphService.getLineage(sourceUrn, direction, 0, MAX_RELATIONSHIPS, maxHops);
if (cacheEnabled) {
cache.put(Pair.of(sourceUrn, direction), new CachedEntityLineageResult(lineageResult, System.currentTimeMillis()));
cache.put(Pair.of(sourceUrn, direction), new CachedEntityLineageResult(toJsonString(lineageResult), System.currentTimeMillis()));
}
} else {
lineageResult = cachedLineageResult.getEntityLineageResult();
lineageResult = toRecordTemplate(EntityLineageResult.class, cachedLineageResult.getEntityLineageResult());
if (System.currentTimeMillis() - cachedLineageResult.getTimestamp() > DAY_IN_MS) {
log.warn("Cached lineage entry for: {} is older than one day.", sourceUrn);
}
@@ -6,6 +6,7 @@
import com.linkedin.metadata.search.SearchEntityArray;
import com.linkedin.metadata.search.SearchResult;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
@@ -15,6 +16,8 @@
import lombok.Value;
import org.springframework.cache.Cache;

import static com.datahub.util.RecordUtils.*;


/**
* Wrapper class to allow searching in batches and caching the results.
@@ -33,7 +36,7 @@ public class CacheableSearcher<K> {
private final boolean enableCache;

@Value
public static class QueryPagination {
public static class QueryPagination implements Serializable {
int from;
int size;
}
@@ -88,16 +91,19 @@ private SearchResult getBatch(int batchId) {
QueryPagination batch = getBatchQuerySize(batchId);
SearchResult result;
if (enableCache()) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "getBatch_cache_access").time();
K cacheKey = cacheKeyGenerator.apply(batch);
result = cache.get(cacheKey, SearchResult.class);
cacheAccess.stop();
if (result == null) {
Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "getBatch_cache_miss").time();
result = searcher.apply(batch);
cache.put(cacheKey, result);
cacheMiss.stop();
MetricUtils.counter(this.getClass(), "getBatch_cache_miss_count").inc();
try (Timer.Context ignored2 = MetricUtils.timer(this.getClass(), "getBatch_cache").time()) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "getBatch_cache_access").time();
K cacheKey = cacheKeyGenerator.apply(batch);
String json = cache.get(cacheKey, String.class);
result = json != null ? toRecordTemplate(SearchResult.class, json) : null;
Collaborator: Hazelcast requires that the cached object is a string? Or serializable? Can't the record template be serialized by itself?

Collaborator (author): It requires that the object be serializable. RecordTemplate does not implement Serializable, and any RecordTemplate used in the key or value was throwing errors. I looked into injecting a custom deserializer into the Hazelcast deserialization config, but this was much easier.
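For context, a minimal sketch of the JSON round-trip this change adopts, using the RecordUtils helpers imported above (the class and method names here are illustrative, not part of the PR):

```java
import com.linkedin.metadata.search.SearchResult;

import static com.datahub.util.RecordUtils.toJsonString;
import static com.datahub.util.RecordUtils.toRecordTemplate;

// Illustrative codec: the cache only ever sees plain Strings, which are Serializable.
final class JsonCacheCodec {
  // RecordTemplate -> JSON string, written on a cache miss
  static String encode(SearchResult result) {
    return toJsonString(result);
  }

  // JSON string -> RecordTemplate, rebuilt on a cache hit
  static SearchResult decode(String json) {
    return json != null ? toRecordTemplate(SearchResult.class, json) : null;
  }
}
```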

cacheAccess.stop();
if (result == null) {
Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "getBatch_cache_miss").time();
result = searcher.apply(batch);
cache.put(cacheKey, toJsonString(result));
cacheMiss.stop();
MetricUtils.counter(this.getClass(), "getBatch_cache_miss_count").inc();
}
}
} else {
result = searcher.apply(batch);
@@ -1,11 +1,10 @@
package com.linkedin.metadata.search.cache;

import com.linkedin.metadata.graph.EntityLineageResult;
import lombok.Data;


@Data
public class CachedEntityLineageResult {
private final EntityLineageResult entityLineageResult;
private final String entityLineageResult;
Collaborator: We are serializing this result? Interesting.

Collaborator (author): Yeah, this is essentially a DTO wrapper around the result so we can attach the timestamp (and anything else we want to add later) to the cached value, which means any RecordTemplate used within it has to be serialized.

private final long timestamp;
}
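As a hedged illustration of the wrapper in use (mirroring the LineageSearchService change earlier in this diff, and assuming the Urn and LineageDirection types used by that service):

```java
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.graph.EntityLineageResult;
import com.linkedin.metadata.graph.LineageDirection;
import com.linkedin.metadata.search.cache.CachedEntityLineageResult;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.cache.Cache;

import static com.datahub.util.RecordUtils.*;

class LineageCacheSketch {
  EntityLineageResult roundTrip(Cache cache, Urn sourceUrn, LineageDirection direction,
      EntityLineageResult lineageResult) {
    // Write path: the RecordTemplate is serialized to JSON before entering the cache.
    cache.put(Pair.of(sourceUrn, direction),
        new CachedEntityLineageResult(toJsonString(lineageResult), System.currentTimeMillis()));

    // Read path: the JSON payload is rehydrated back into a RecordTemplate.
    CachedEntityLineageResult cached =
        cache.get(Pair.of(sourceUrn, direction), CachedEntityLineageResult.class);
    return toRecordTemplate(EntityLineageResult.class, cached.getEntityLineageResult());
  }
}
```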
@@ -12,6 +12,8 @@
import org.javatuples.Quintet;
import org.springframework.cache.CacheManager;

import static com.datahub.util.RecordUtils.*;


@RequiredArgsConstructor
public class CachingAllEntitiesSearchAggregator {
@@ -27,6 +29,8 @@ public SearchResult getSearchResults(List<String> entities, @Nonnull String inpu
return new CacheableSearcher<>(cacheManager.getCache(ALL_ENTITIES_SEARCH_AGGREGATOR_CACHE_NAME), batchSize,
querySize -> aggregator.search(entities, input, postFilters, sortCriterion, querySize.getFrom(),
querySize.getSize(), searchFlags),
querySize -> Quintet.with(entities, input, postFilters, sortCriterion, querySize), searchFlags, enableCache).getSearchResults(from, size);
querySize -> Quintet.with(entities, input, postFilters != null ? toJsonString(postFilters) : null,
sortCriterion != null ? toJsonString(sortCriterion) : null, querySize), searchFlags, enableCache)
.getSearchResults(from, size);
}
}
@@ -17,6 +17,8 @@
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;

import static com.datahub.util.RecordUtils.*;


@RequiredArgsConstructor
public class CachingEntitySearchService {
@@ -115,7 +117,8 @@ public SearchResult getCachedSearchResults(
cacheManager.getCache(ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME),
batchSize,
querySize -> getRawSearchResults(entityName, query, filters, sortCriterion, querySize.getFrom(), querySize.getSize()),
querySize -> Quintet.with(entityName, query, filters, sortCriterion, querySize), flags, enableCache).getSearchResults(from, size);
querySize -> Quintet.with(entityName, query, filters != null ? toJsonString(filters) : null,
sortCriterion != null ? toJsonString(sortCriterion) : null, querySize), flags, enableCache).getSearchResults(from, size);
}


@@ -133,16 +136,19 @@ public AutoCompleteResult getCachedAutoCompleteResults(
Cache cache = cacheManager.getCache(ENTITY_SEARCH_SERVICE_AUTOCOMPLETE_CACHE_NAME);
AutoCompleteResult result;
if (enableCache(flags)) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "autocomplete_cache_access").time();
Object cacheKey = Quintet.with(entityName, input, field, filters, limit);
result = cache.get(cacheKey, AutoCompleteResult.class);
cacheAccess.stop();
if (result == null) {
Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "autocomplete_cache_miss").time();
result = getRawAutoCompleteResults(entityName, input, field, filters, limit);
cache.put(cacheKey, result);
cacheMiss.stop();
MetricUtils.counter(this.getClass(), "autocomplete_cache_miss_count").inc();
try (Timer.Context ignored2 = MetricUtils.timer(this.getClass(), "getCachedAutoCompleteResults_cache").time()) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "autocomplete_cache_access").time();
Object cacheKey = Quintet.with(entityName, input, field, filters != null ? toJsonString(filters) : null, limit);
String json = cache.get(cacheKey, String.class);
result = json != null ? toRecordTemplate(AutoCompleteResult.class, json) : null;
cacheAccess.stop();
if (result == null) {
Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "autocomplete_cache_miss").time();
result = getRawAutoCompleteResults(entityName, input, field, filters, limit);
cache.put(cacheKey, toJsonString(result));
cacheMiss.stop();
MetricUtils.counter(this.getClass(), "autocomplete_cache_miss_count").inc();
}
}
} else {
result = getRawAutoCompleteResults(entityName, input, field, filters, limit);
@@ -165,16 +171,19 @@ public BrowseResult getCachedBrowseResults(
Cache cache = cacheManager.getCache(ENTITY_SEARCH_SERVICE_BROWSE_CACHE_NAME);
BrowseResult result;
if (enableCache(flags)) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "browse_cache_access").time();
Object cacheKey = Quintet.with(entityName, path, filters, from, size);
result = cache.get(cacheKey, BrowseResult.class);
cacheAccess.stop();
if (result == null) {
Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "browse_cache_miss").time();
result = getRawBrowseResults(entityName, path, filters, from, size);
cache.put(cacheKey, result);
cacheMiss.stop();
MetricUtils.counter(this.getClass(), "browse_cache_miss_count").inc();
try (Timer.Context ignored2 = MetricUtils.timer(this.getClass(), "getCachedBrowseResults_cache").time()) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "browse_cache_access").time();
Object cacheKey = Quintet.with(entityName, path, filters != null ? toJsonString(filters) : null, from, size);
String json = cache.get(cacheKey, String.class);
result = json != null ? toRecordTemplate(BrowseResult.class, json) : null;
cacheAccess.stop();
if (result == null) {
Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "browse_cache_miss").time();
result = getRawBrowseResults(entityName, path, filters, from, size);
cache.put(cacheKey, toJsonString(result));
cacheMiss.stop();
MetricUtils.counter(this.getClass(), "browse_cache_miss_count").inc();
}
}
} else {
result = getRawBrowseResults(entityName, path, filters, from, size);
3 changes: 3 additions & 0 deletions metadata-service/factories/build.gradle
@@ -13,6 +13,8 @@ dependencies {
compile externalDependency.elasticSearchRest
compile externalDependency.httpClient
compile externalDependency.gson
compile externalDependency.hazelcast
compile externalDependency.hazelcastSpring
compile externalDependency.kafkaClients
compile externalDependency.kafkaAvroSerde
compileOnly externalDependency.lombok
@@ -34,6 +36,7 @@

testCompile externalDependency.mockito
testCompile externalDependency.testng
testCompile externalDependency.hazelcastTest

}

@@ -0,0 +1,74 @@
package com.linkedin.gms.factory.common;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.hazelcast.config.Config;
import com.hazelcast.config.EvictionConfig;
import com.hazelcast.config.EvictionPolicy;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MaxSizePolicy;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.spring.cache.HazelcastCacheManager;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.CacheManager;
import org.springframework.cache.caffeine.CaffeineCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class CacheConfig {

@Value("${CACHE_TTL_SECONDS:600}")
private int cacheTtlSeconds;

@Value("${CACHE_MAX_SIZE:10000}")
private int cacheMaxSize;

@Value("${searchService.cache.hazelcast.serviceName:hazelcast-service}")
private String hazelcastServiceName;
Collaborator: Why do we have hazelcast in the names of the variables here? Isn't the idea that we can swap in different impls for the Caffeine cache? Does this seem overly specific?

Collaborator: I see, so we are only accounting for two cases, Caffeine and Hazelcast. I think I get it.

Collaborator (author): Spring's CacheManager is intended for very simple use cases of switching out cache implementations. Unfortunately, our use cases are complex enough to require the provider-level interfaces, so we would need to reimplement any other implementation we support anyway. This is unchanged from how it previously worked; it just exposes the implementation name in the config. It's possible that another provider would also require a headless K8s service for its distributed cache implementation, but that's unlikely (if we even add another supported cache), so I think it's okay to be specific here.


@Bean
@ConditionalOnProperty(name = "searchService.cacheImplementation", havingValue = "caffeine")
public CacheManager caffeineCacheManager() {
CaffeineCacheManager cacheManager = new CaffeineCacheManager();
cacheManager.setCaffeine(caffeineCacheBuilder());
return cacheManager;
}

private Caffeine<Object, Object> caffeineCacheBuilder() {
return Caffeine.newBuilder()
.initialCapacity(100)
.maximumSize(cacheMaxSize)
.expireAfterAccess(cacheTtlSeconds, TimeUnit.SECONDS)
.recordStats();
}

@Bean
@ConditionalOnProperty(name = "searchService.cacheImplementation", havingValue = "hazelcast")
public CacheManager hazelcastCacheManager() {
Config config = new Config();
// TODO: This setting is equivalent to expireAfterAccess, refreshes timer after a get, put, containsKey etc.
// is this behavior what we actually desire? Should we change it now?
MapConfig mapConfig = new MapConfig().setMaxIdleSeconds(cacheTtlSeconds);

EvictionConfig evictionConfig = new EvictionConfig()
.setMaxSizePolicy(MaxSizePolicy.PER_NODE)
.setSize(cacheMaxSize)
.setEvictionPolicy(EvictionPolicy.LFU);
mapConfig.setEvictionConfig(evictionConfig);
mapConfig.setName("default");
config.addMapConfig(mapConfig);

config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false);
config.getNetworkConfig().getJoin().getKubernetesConfig().setEnabled(true)
.setProperty("service-dns", hazelcastServiceName);


HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance(config);

return new HazelcastCacheManager(hazelcastInstance);
}
}
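Two hedged notes on the configuration above. First, Hazelcast's Kubernetes join resolves members through the service-dns property, which conventionally points at a headless Service; a hypothetical manifest matching the default hazelcast-service name (not part of this PR, labels are assumptions) might look like:

```yaml
# Hypothetical headless Service for Hazelcast member discovery; names are assumptions.
apiVersion: v1
kind: Service
metadata:
  name: hazelcast-service
spec:
  clusterIP: None        # headless: DNS resolves directly to member pod IPs
  selector:
    app: datahub-gms     # assumed pod label; adjust to the actual deployment
  ports:
    - name: hazelcast
      port: 5701         # Hazelcast's default cluster port
```

Second, on the TODO above: setMaxIdleSeconds resets the expiry timer on every access (get, put, containsKey, etc.). If strict expire-after-write semantics were wanted instead, a sketch of the alternative would be:

```java
// Alternative sketch: entries expire a fixed time after write, regardless of access.
MapConfig mapConfig = new MapConfig().setTimeToLiveSeconds(cacheTtlSeconds);
```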

This file was deleted.

4 changes: 4 additions & 0 deletions metadata-service/factories/src/main/resources/application.yml
@@ -82,6 +82,10 @@ graphService:
searchService:
  resultBatchSize: ${SEARCH_SERVICE_BATCH_SIZE:100}
  enableCache: ${SEARCH_SERVICE_ENABLE_CACHE:false}
  cacheImplementation: ${SEARCH_SERVICE_CACHE_IMPLEMENTATION:caffeine}
Collaborator: Minor: in terms of config layout, what if we did this?

```yaml
cache:
  type: "hazelcast"
  config:
    hazelcast:
      serviceName: ... same
```

This may read a bit easier. Thoughts?

Collaborator (author): Agreed. I had it at the upper level to be in line with the enableCache variable, but I think it's fine to start a new pattern here.

  cache:
    hazelcast:
      serviceName: ${SEARCH_SERVICE_HAZELCAST_SERVICE_NAME:hazelcast-service}

configEntityRegistry:
# TODO: Change to read from resources on classpath.
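For reference, the nested keys above bind to the same dot-separated property path consumed in CacheConfig earlier in this diff; a minimal sketch of the binding (the class name is illustrative):

```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

@Configuration
class CacheSettingsSketch {
  // Spring flattens nested YAML keys into a dot-separated property path.
  @Value("${searchService.cache.hazelcast.serviceName:hazelcast-service}")
  private String hazelcastServiceName;
}
```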