-
Notifications
You must be signed in to change notification settings - Fork 214
/
ActorAskCacheLoader.java
140 lines (121 loc) · 6.21 KB
/
ActorAskCacheLoader.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
/*
* Copyright (c) 2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.cacheloaders;
import static java.util.Objects.requireNonNull;
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import javax.annotation.concurrent.Immutable;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.type.EntityType;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLogger;
import org.eclipse.ditto.internal.utils.cache.entry.Entry;
import org.eclipse.ditto.internal.utils.cacheloaders.config.AskWithRetryConfig;
import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.Scheduler;
/**
* Asynchronous cache loader that loads a value by asking an actor provided by a "Entity-Region-Provider".
*
* @param <V> type of values in the cache entry.
* @param <T> type of messages sent when loading entries by entity id.
*/
@Immutable
public final class ActorAskCacheLoader<V, T, K extends EntityId>
implements AsyncCacheLoader<K, Entry<V>> {
private static final ThreadSafeDittoLogger LOGGER =
DittoLoggerFactory.getThreadSafeLogger(ActorAskCacheLoader.class);
private final AskWithRetryConfig askWithRetryConfig;
private final Scheduler scheduler;
private final Function<EntityType, ActorRef> entityRegionProvider;
private final Map<EntityType, Function<K, T>> commandCreatorMap;
private final Map<EntityType, Function<Object, Entry<V>>> responseTransformerMap;
private ActorAskCacheLoader(final AskWithRetryConfig askWithRetryConfig,
final Scheduler scheduler,
final Function<EntityType, ActorRef> entityRegionProvider,
final Map<EntityType, Function<K, T>> commandCreatorMap,
final Map<EntityType, Function<Object, Entry<V>>> responseTransformerMap) {
this.askWithRetryConfig = requireNonNull(askWithRetryConfig);
this.scheduler = scheduler;
this.entityRegionProvider = requireNonNull(entityRegionProvider);
this.commandCreatorMap = Map.copyOf(requireNonNull(commandCreatorMap));
this.responseTransformerMap = Map.copyOf(requireNonNull(responseTransformerMap));
}
/**
* Constructs an {@link ActorAskCacheLoader} with a sharded entity region which supports a single resource type.
*
* @param askWithRetryConfig the configuration for the "ask with retry" pattern applied for the cache loader.
* @param scheduler the scheduler to use for the "ask with retry" for retries.
* @param entityType the entity type.
* @param entityRegion the entity region.
* @param commandCreator function for creating a load-command by an entity id (without resource type).
* @param responseTransformer function for mapping a load-response to an {@link Entry}.
* @param <V> type of values in the cache entry.
* @return the built ActorAskCacheLoader.
*/
public static <V, K extends EntityId> ActorAskCacheLoader<V, Command<?>, K> forShard(
final AskWithRetryConfig askWithRetryConfig,
final Scheduler scheduler,
final EntityType entityType,
final ActorRef entityRegion,
final Function<K, Command<?>> commandCreator,
final Function<Object, Entry<V>> responseTransformer) {
requireNonNull(askWithRetryConfig);
requireNonNull(entityType);
requireNonNull(entityRegion);
requireNonNull(commandCreator);
requireNonNull(responseTransformer);
return new ActorAskCacheLoader<>(askWithRetryConfig, scheduler,
EntityRegionMap.singleton(entityType, entityRegion),
Collections.singletonMap(entityType, commandCreator),
Collections.singletonMap(entityType, responseTransformer));
}
@Override
public CompletableFuture<Entry<V>> asyncLoad(final K entityId, final Executor executor) {
final var entityType = entityId.getEntityType();
return CompletableFuture.supplyAsync(() -> getCommand(entityType, entityId), executor)
.thenCompose(command -> {
final ActorRef entityRegion = getEntityRegion(entityType);
LOGGER.debug("Going to retrieve cache entry for key <{}> with command <{}>: ", entityId, command);
return AskWithRetry.askWithRetry(entityRegion, command, askWithRetryConfig, scheduler, executor,
response -> transformResponse(entityType, response)
);
});
}
private ActorRef getEntityRegion(final EntityType entityType) {
final ActorRef entityRegion = entityRegionProvider.apply(entityType);
if (entityRegion == null) {
throw new IllegalStateException("null entity region returned for resource type " + entityType);
}
return entityRegion;
}
private T getCommand(final EntityType entityType, final K id) {
final Function<K, T> commandCreator = commandCreatorMap.get(entityType);
if (commandCreator == null) {
final var message =
String.format("Don't know how to create retrieve command for resource type <%s> and id <%s>",
entityType, id);
throw new NullPointerException(message);
} else {
return commandCreator.apply(id);
}
}
private Entry<V> transformResponse(final EntityType entityType, final Object response) {
return checkNotNull(responseTransformerMap.get(entityType), "entityType").apply(response);
}
}