Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(entity-client): fix entity client cache and test #10149

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ project.ext.spec = [
'restliDocgen' : 'com.linkedin.pegasus:restli-docgen:' + pegasusVersion,
'restliServer' : 'com.linkedin.pegasus:restli-server:' + pegasusVersion,
'restliSpringBridge': 'com.linkedin.pegasus:restli-spring-bridge:' + pegasusVersion,
'restliTestUtils' : 'com.linkedin.pegasus:restli-client-testutils:' + pegasusVersion,
]
]
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class EntityHydrator {
private final DataJobHydrator _dataJobHydrator = new DataJobHydrator();
private final DatasetHydrator _datasetHydrator = new DatasetHydrator();

public Optional<ObjectNode> getHydratedEntity(String entityTypeName, String urn) {
public Optional<ObjectNode> getHydratedEntity(String urn) {
final ObjectNode document = JsonNodeFactory.instance.objectNode();
// Hydrate fields from urn
Urn urnObj;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import com.linkedin.metadata.datahubusage.DataHubUsageEventType;
import com.linkedin.metadata.kafka.hydrator.EntityHydrator;
import com.linkedin.metadata.kafka.hydrator.EntityType;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import lombok.Value;
Expand Down Expand Up @@ -45,18 +43,6 @@ public class DataHubUsageEventTransformer {

private final EntityHydrator _entityHydrator;

private static final Map<EntityType, String> ENTITY_TYPE_MAP;

static {
ENTITY_TYPE_MAP = new HashMap<>(6);
ENTITY_TYPE_MAP.put(EntityType.CHART, CHART_ENTITY_NAME);
ENTITY_TYPE_MAP.put(EntityType.CORP_USER, CORP_GROUP_ENTITY_NAME);
ENTITY_TYPE_MAP.put(EntityType.DASHBOARD, DASHBOARD_ENTITY_NAME);
ENTITY_TYPE_MAP.put(EntityType.DATA_FLOW, DATA_FLOW_ENTITY_NAME);
ENTITY_TYPE_MAP.put(EntityType.DATA_JOB, DATA_JOB_ENTITY_NAME);
ENTITY_TYPE_MAP.put(EntityType.DATASET, DATASET_ENTITY_NAME);
}

@Value
public static class TransformedDocument {
String id;
Expand Down Expand Up @@ -130,8 +116,7 @@ private void setFieldsForEntity(ObjectNode recordObject, ObjectNode searchObject
}

private void setFieldsForEntity(EntityType entityType, String urn, ObjectNode searchObject) {
String entityTypeName = ENTITY_TYPE_MAP.get(entityType);
Optional<ObjectNode> entityObject = _entityHydrator.getHydratedEntity(entityTypeName, urn);
Optional<ObjectNode> entityObject = _entityHydrator.getHydratedEntity(urn);
if (!entityObject.isPresent()) {
log.info("No matches for urn {}", urn);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import javax.annotation.Nonnull;
import lombok.Builder;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;

Expand All @@ -29,21 +29,21 @@
@Slf4j
@Builder
public class ClientCache<K, V, C extends ClientCacheConfig> {
@NonNull protected final C config;
@NonNull protected final LoadingCache<K, V> cache;
@NonNull private final Function<Iterable<? extends K>, Map<K, V>> loadFunction;
@NonNull private final Weigher<K, V> weigher;
@NonNull private final BiFunction<C, K, Integer> ttlSecondsFunction;
@Nonnull protected final C config;
@Nonnull protected final LoadingCache<K, V> cache;
@Nonnull private final Function<Iterable<? extends K>, Map<K, V>> loadFunction;
@Nonnull private final Weigher<K, V> weigher;
@Nonnull private final BiFunction<C, K, Integer> ttlSecondsFunction;

public @Nullable V get(@NonNull K key) {
public @Nullable V get(@Nonnull K key) {
return cache.get(key);
}

public @NonNull Map<@NonNull K, @NonNull V> getAll(@NonNull Iterable<? extends @NonNull K> keys) {
public @Nonnull Map<K, V> getAll(@Nonnull Iterable<? extends K> keys) {
return cache.getAll(keys);
}

public void refresh(@NonNull K key) {
public void refresh(@Nonnull K key) {
cache.refresh(key);
}

Expand All @@ -62,13 +62,13 @@ public ClientCache<K, V, C> build(Class<?> metricClazz) {
CacheLoader<K, V> loader =
new CacheLoader<K, V>() {
@Override
public V load(@NonNull K key) {
public V load(@Nonnull K key) {
return loadAll(Set.of(key)).get(key);
}

@Override
@NonNull
public Map<K, V> loadAll(@NonNull Set<? extends K> keys) {
@Nonnull
public Map<K, V> loadAll(@Nonnull Set<? extends K> keys) {
return loadFunction.apply(keys);
}
};
Expand All @@ -84,7 +84,7 @@ public Map<K, V> loadAll(@NonNull Set<? extends K> keys) {
.expireAfter(
new Expiry<K, V>() {
public long expireAfterCreate(
@NonNull K key, @NonNull V aspect, long currentTime) {
@Nonnull K key, @Nonnull V aspect, long currentTime) {
int ttlSeconds = ttlSecondsFunction.apply(config, key);
if (ttlSeconds < 0) {
ttlSeconds = Integer.MAX_VALUE;
Expand All @@ -93,12 +93,12 @@ public long expireAfterCreate(
}

public long expireAfterUpdate(
@NonNull K key, @NonNull V aspect, long currentTime, long currentDuration) {
@Nonnull K key, @Nonnull V aspect, long currentTime, long currentDuration) {
return currentDuration;
}

public long expireAfterRead(
@NonNull K key, @NonNull V aspect, long currentTime, long currentDuration) {
@Nonnull K key, @Nonnull V aspect, long currentTime, long currentDuration) {
return currentDuration;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
public class EntityClientCache {
@NonNull private EntityClientCacheConfig config;
@NonNull private final ClientCache<Key, EnvelopedAspect, EntityClientCacheConfig> cache;
@NonNull private Function<CollectionKey, Map<Urn, EntityResponse>> loadFunction;
@NonNull private final Function<CollectionKey, Map<Urn, EntityResponse>> loadFunction;

public EntityResponse getV2(
@Nonnull OperationContext opContext,
Expand Down Expand Up @@ -104,7 +104,14 @@ private EntityClientCacheBuilder cache(LoadingCache<Key, EnvelopedAspect> cache)
return this;
}

public EntityClientCache build(Class<?> metricClazz) {
private EntityClientCacheBuilder loadFunction(
Function<CollectionKey, Map<Urn, EntityResponse>> loadFunction) {
return this;
}

public EntityClientCache build(
@Nonnull final Function<CollectionKey, Map<Urn, EntityResponse>> fetchFunction,
Class<?> metricClazz) {
// estimate size
Weigher<Key, EnvelopedAspect> weighByEstimatedSize =
(key, value) -> value.getValue().data().toString().getBytes().length;
Expand All @@ -118,7 +125,7 @@ public EntityClientCache build(Class<?> metricClazz) {
return keysByContextEntity.entrySet().stream()
.flatMap(
entry ->
loadByEntity(entry.getKey(), entry.getValue(), loadFunction)
loadByEntity(entry.getKey(), entry.getValue(), fetchFunction)
.entrySet()
.stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Expand All @@ -133,15 +140,15 @@ public EntityClientCache build(Class<?> metricClazz) {
.getOrDefault(key.getEntityName(), Map.of())
.getOrDefault(key.getAspectName(), config.getDefaultTTLSeconds());

cache =
this.cache =
ClientCache.<Key, EnvelopedAspect, EntityClientCacheConfig>builder()
.weigher(weighByEstimatedSize)
.config(config)
.config(this.config)
.loadFunction(loader)
.ttlSecondsFunction(ttlSeconds)
.build(metricClazz);

return new EntityClientCache(config, cache, loadFunction);
return new EntityClientCache(this.config, this.cache, fetchFunction);
}
}

Expand Down Expand Up @@ -191,6 +198,7 @@ private static Map<Key, EnvelopedAspect> loadByEntity(
envAspect -> {
Key key =
Key.builder()
.contextId(contextId)
.urn(resp.getKey())
.aspectName(envAspect.getName())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ default EntityClientCache buildEntityClientCache(
Class<?> metricClazz, EntityClientCacheConfig cacheConfig) {
return EntityClientCache.builder()
.config(cacheConfig)
.loadFunction(
.build(
(EntityClientCache.CollectionKey collectionKey) -> {
try {
String entityName =
Expand All @@ -61,8 +61,8 @@ default EntityClientCache buildEntityClientCache(
} catch (RemoteInvocationException | URISyntaxException e) {
throw new RuntimeException(e);
}
})
.build(metricClazz);
},
metricClazz);
}

/**
Expand Down
2 changes: 2 additions & 0 deletions metadata-service/restli-client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ dependencies {

implementation spec.product.pegasus.restliClient


testImplementation externalDependency.mockito
testImplementation externalDependency.testng
testImplementation spec.product.pegasus.restliTestUtils
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package com.linkedin.entity.client;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;

import com.linkedin.common.Status;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.entity.Aspect;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.entity.EnvelopedAspectMap;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.config.cache.client.EntityClientCacheConfig;
import com.linkedin.parseq.retry.backoff.ConstantBackoff;
import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.restli.client.Client;
import com.linkedin.restli.client.Request;
import com.linkedin.restli.client.response.BatchKVResponse;
import com.linkedin.restli.client.testutils.MockBatchEntityResponseFactory;
import com.linkedin.restli.client.testutils.MockSuccessfulResponseFutureBuilder;
import com.linkedin.restli.common.EntityResponse;
import com.linkedin.restli.common.HttpStatus;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.Set;
import org.testng.annotations.Test;

public class SystemRestliEntityClientTest {
private static final Urn TEST_URN =
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)");

@Test
public void testCache() throws RemoteInvocationException, URISyntaxException {
Client mockRestliClient = mock(Client.class);

// Test No Cache Config

EntityClientCacheConfig noCacheConfig = new EntityClientCacheConfig();
noCacheConfig.setEnabled(true);

SystemRestliEntityClient noCacheTest =
new SystemRestliEntityClient(
TestOperationContexts.systemContextNoSearchAuthorization(),
mockRestliClient,
new ConstantBackoff(0),
0,
noCacheConfig);

com.linkedin.entity.EntityResponse responseStatusTrue = buildStatusResponse(true);
com.linkedin.entity.EntityResponse responseStatusFalse = buildStatusResponse(false);

mockResponse(mockRestliClient, responseStatusTrue);
assertEquals(
noCacheTest.getV2(TEST_URN, Set.of(Constants.STATUS_ASPECT_NAME)),
responseStatusTrue,
"Expected un-cached Status.removed=true result");

mockResponse(mockRestliClient, responseStatusFalse);
assertEquals(
noCacheTest.getV2(TEST_URN, Set.of(Constants.STATUS_ASPECT_NAME)),
responseStatusFalse,
"Expected un-cached Status.removed=false result");

verify(mockRestliClient, times(2)).sendRequest(any(Request.class));

// Test Cache Config
reset(mockRestliClient);

// Enable caching for entity/aspect
EntityClientCacheConfig cacheConfig = new EntityClientCacheConfig();
cacheConfig.setEnabled(true);
cacheConfig.setMaxBytes(100);
cacheConfig.setEntityAspectTTLSeconds(
Map.of(TEST_URN.getEntityType(), Map.of(Constants.STATUS_ASPECT_NAME, 60)));

SystemRestliEntityClient cacheTest =
new SystemRestliEntityClient(
TestOperationContexts.systemContextNoSearchAuthorization(),
mockRestliClient,
new ConstantBackoff(0),
0,
cacheConfig);

mockResponse(mockRestliClient, responseStatusTrue);
assertEquals(
cacheTest.getV2(TEST_URN, Set.of(Constants.STATUS_ASPECT_NAME)),
responseStatusTrue,
"Expected initial un-cached Status.removed=true result");

mockResponse(mockRestliClient, responseStatusFalse);
assertEquals(
cacheTest.getV2(TEST_URN, Set.of(Constants.STATUS_ASPECT_NAME)),
responseStatusTrue,
"Expected CACHED Status.removed=true result");

verify(mockRestliClient, times(1)).sendRequest(any(Request.class));
}

private static com.linkedin.entity.EntityResponse buildStatusResponse(boolean value) {
EnvelopedAspectMap aspects = new EnvelopedAspectMap();
aspects.put(
Constants.STATUS_ASPECT_NAME,
new EnvelopedAspect()
.setName(Constants.STATUS_ASPECT_NAME)
.setValue(new Aspect(new Status().setRemoved(value).data())));
return new com.linkedin.entity.EntityResponse()
.setUrn(TEST_URN)
.setEntityName(TEST_URN.getEntityType())
.setAspects(aspects);
}

private static void mockResponse(Client mock, com.linkedin.entity.EntityResponse response) {
final BatchKVResponse<String, EntityResponse<com.linkedin.entity.EntityResponse>> mockResponse =
MockBatchEntityResponseFactory.createWithPrimitiveKey(
String.class,
com.linkedin.entity.EntityResponse.class,
Map.of(TEST_URN.toString(), response),
Map.of(TEST_URN.toString(), HttpStatus.S_200_OK),
Map.of());

MockSuccessfulResponseFutureBuilder<
String, BatchKVResponse<String, EntityResponse<com.linkedin.entity.EntityResponse>>>
responseFutureBuilder =
new MockSuccessfulResponseFutureBuilder<
String,
BatchKVResponse<String, EntityResponse<com.linkedin.entity.EntityResponse>>>()
.setEntity(mockResponse);
when(mock.sendRequest(any(Request.class))).thenReturn(responseFutureBuilder.build());
}
}
Loading