-
Notifications
You must be signed in to change notification settings - Fork 214
/
TopLevelPolicyActionCommandStrategy.java
209 lines (187 loc) · 9.84 KB
/
TopLevelPolicyActionCommandStrategy.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.policies.service.persistence.actors.strategies.commands;
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.eclipse.ditto.base.model.entity.metadata.Metadata;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.headers.entitytag.EntityTag;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.internal.utils.persistentactors.results.Result;
import org.eclipse.ditto.internal.utils.persistentactors.results.ResultFactory;
import org.eclipse.ditto.internal.utils.persistentactors.results.ResultVisitor;
import org.eclipse.ditto.policies.model.Policy;
import org.eclipse.ditto.policies.model.PolicyEntry;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.policies.model.signals.commands.actions.ActivateTokenIntegration;
import org.eclipse.ditto.policies.model.signals.commands.actions.DeactivateTokenIntegration;
import org.eclipse.ditto.policies.model.signals.commands.actions.PolicyActionCommand;
import org.eclipse.ditto.policies.model.signals.commands.actions.TopLevelPolicyActionCommand;
import org.eclipse.ditto.policies.model.signals.commands.actions.TopLevelPolicyActionCommandResponse;
import org.eclipse.ditto.policies.model.signals.commands.exceptions.PolicyActionFailedException;
import org.eclipse.ditto.policies.model.signals.events.PolicyActionEvent;
import org.eclipse.ditto.policies.model.signals.events.PolicyEvent;
import org.eclipse.ditto.policies.service.common.config.PolicyConfig;
import akka.actor.ActorSystem;
/**
* This strategy handles the {@link org.eclipse.ditto.policies.model.signals.commands.actions.TopLevelPolicyActionCommand} command.
*/
final class TopLevelPolicyActionCommandStrategy
extends AbstractPolicyCommandStrategy<TopLevelPolicyActionCommand, PolicyEvent<?>> {
private final Map<String, AbstractPolicyActionCommandStrategy<?>> policyActionCommandStrategyMap;
TopLevelPolicyActionCommandStrategy(final PolicyConfig policyConfig, final ActorSystem system) {
super(TopLevelPolicyActionCommand.class, policyConfig);
policyActionCommandStrategyMap = instantiatePolicyActionCommandStrategies(policyConfig, system);
}
@Override
protected Result<PolicyEvent<?>> doApply(final Context<PolicyId> context,
@Nullable final Policy policy,
final long nextRevision,
final TopLevelPolicyActionCommand command,
@Nullable final Metadata metadata) {
final Policy nonNullPolicy = checkNotNull(policy, "policy");
final PolicyActionCommand<?> actionCommand = command.getPolicyActionCommand();
final DittoHeaders dittoHeaders = command.getDittoHeaders();
final List<PolicyEntry> entries = command.getAuthorizedLabels()
.stream()
.map(nonNullPolicy::getEntryFor)
.flatMap(Optional::stream)
.filter(policyEntry -> actionCommand.isApplicable(policyEntry, dittoHeaders.getAuthorizationContext()))
.toList();
final AbstractPolicyActionCommandStrategy<?> strategy =
policyActionCommandStrategyMap.get(actionCommand.getName());
if (strategy == null) {
// builds an internal server error, 500
final PolicyActionFailedException exception = PolicyActionFailedException.newBuilder()
.action(actionCommand.getName())
.dittoHeaders(dittoHeaders)
.build();
context.getLog()
.withCorrelationId(command)
.error(exception, "Strategy not found for top-level action <{}>", actionCommand.getName());
return ResultFactory.newErrorResult(exception, command);
} else if (entries.isEmpty()) {
return ResultFactory.newErrorResult(command.getNotApplicableException(dittoHeaders), command);
} else {
final List<PolicyActionCommand<?>> commands = entries.stream()
.map(PolicyEntry::getLabel)
.map(actionCommand::setLabel)
.collect(Collectors.toList());
final ResultCollectionVisitor visitor =
collectResults(strategy, context, policy, nextRevision, commands);
if (visitor.error != null) {
return ResultFactory.newErrorResult(visitor.error, command);
} else {
final Optional<CompletionStage<PolicyEvent<?>>> event = visitor.aggregateEvents();
if (event.isPresent()) {
final TopLevelPolicyActionCommandResponse response =
TopLevelPolicyActionCommandResponse.of(context.getState(), dittoHeaders);
return ResultFactory.newMutationResult(command, event.get(),
CompletableFuture.completedFuture(response));
} else {
// builds an internal server error, 500
final PolicyActionFailedException exception = PolicyActionFailedException.newBuilder()
.action(actionCommand.getName())
.dittoHeaders(dittoHeaders)
.build();
context.getLog()
.withCorrelationId(command)
.error(exception, "Visitor could not aggregate events for action <{}>", actionCommand.getName());
return ResultFactory.newErrorResult(exception, command);
}
}
}
}
@Override
public Optional<EntityTag> previousEntityTag(final TopLevelPolicyActionCommand command,
@Nullable final Policy previousEntity) {
// top level policy action commands do not support entity tag
return Optional.empty();
}
@Override
public Optional<EntityTag> nextEntityTag(final TopLevelPolicyActionCommand command,
@Nullable final Policy newEntity) {
// top level policy action commands do not support entity tag
return Optional.empty();
}
private static Map<String, AbstractPolicyActionCommandStrategy<?>> instantiatePolicyActionCommandStrategies(
final PolicyConfig policyConfig, final ActorSystem system) {
return Map.of(
ActivateTokenIntegration.NAME, new ActivateTokenIntegrationStrategy(policyConfig, system),
DeactivateTokenIntegration.NAME, new DeactivateTokenIntegrationStrategy(policyConfig, system)
);
}
private static ResultCollectionVisitor collectResults(
final AbstractPolicyActionCommandStrategy<?> strategy,
final Context<PolicyId> context,
@Nullable final Policy policy,
final long nextRevision,
final Collection<PolicyActionCommand<?>> commands) {
final ResultCollectionVisitor visitor = new ResultCollectionVisitor();
for (final PolicyActionCommand<?> command : commands) {
strategy.typeCheckAndApply(context, policy, nextRevision, command)
.ifPresent(result -> result.accept(visitor));
}
return visitor;
}
private static final class ResultCollectionVisitor implements ResultVisitor<PolicyActionEvent<?>> {
@Nullable private DittoRuntimeException error;
@Nullable private CompletionStage<PolicyActionEvent<?>> firstEvent;
private final List<CompletableFuture<PolicyActionEvent>> otherEvents = new ArrayList<>();
private Optional<CompletionStage<PolicyEvent<?>>> aggregateEvents() {
if (firstEvent == null) {
return Optional.empty();
} else {
return Optional.of(
firstEvent.thenCompose(fe ->
CompletableFuture.allOf(otherEvents.toArray(new CompletableFuture[0]))
.thenApply(aVoid -> fe.aggregateWith(otherEvents.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.toList()
))
)
);
}
}
@Override
public void onMutation(final Command<?> command, final CompletionStage<PolicyActionEvent<?>> event,
final CompletionStage<WithDittoHeaders> response, final boolean becomeCreated,
final boolean becomeDeleted) {
if (firstEvent == null) {
firstEvent = event;
} else {
otherEvents.add(event.thenApply(x -> (PolicyActionEvent) x).toCompletableFuture());
}
}
@Override
public void onQuery(final Command<?> command, final CompletionStage<WithDittoHeaders> response) {
// do nothing
}
@Override
public void onError(final DittoRuntimeException error, final Command<?> errorCausingCommand) {
this.error = error;
}
}
}