-
Notifications
You must be signed in to change notification settings - Fork 789
/
AggregateAnnotationCommandHandler.java
318 lines (287 loc) · 14.9 KB
/
AggregateAnnotationCommandHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
/*
* Copyright (c) 2010-2014. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.commandhandling.annotation;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.CommandTargetResolver;
import org.axonframework.commandhandling.VersionedAggregateIdentifier;
import org.axonframework.common.Assert;
import org.axonframework.common.Subscribable;
import org.axonframework.common.annotation.AbstractMessageHandler;
import org.axonframework.common.annotation.ClasspathParameterResolverFactory;
import org.axonframework.common.annotation.ParameterResolverFactory;
import org.axonframework.domain.AggregateRoot;
import org.axonframework.repository.Repository;
import org.axonframework.unitofwork.UnitOfWork;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import static org.axonframework.commandhandling.annotation.CommandMessageHandlerUtils.resolveAcceptedCommandName;
/**
* Command handler that handles commands based on {@link org.axonframework.commandhandling.annotation.CommandHandler}
* annotations on an aggregate. Those annotations may appear on methods, in which case a specific aggregate instance
* needs to be targeted by the command, or on the constructor. The latter will create a new Aggregate instance, which
* is then stored in the repository.
*
* @param <T> the type of aggregate this handler handles commands for
* @author Allard Buijze
* @since 1.2
*/
public class AggregateAnnotationCommandHandler<T extends AggregateRoot>
implements Subscribable, CommandHandler<Object> {
private final CommandBus commandBus;
private final Repository<T> repository;
private final CommandTargetResolver commandTargetResolver;
private final Map<String, CommandHandler<Object>> handlers;
/**
* Initializes an AnnotationCommandHandler based on the annotations on given <code>aggregateType</code>, using the
* given <code>repository</code> to add and load aggregate instances.
*
* @param aggregateType The type of aggregate
* @param repository The repository providing access to aggregate instances
*/
public AggregateAnnotationCommandHandler(Class<T> aggregateType, Repository<T> repository) {
this(aggregateType, repository, new AnnotationCommandTargetResolver());
}
/**
* Initializes an AnnotationCommandHandler based on the annotations on given <code>aggregateType</code>, using the
* given <code>repository</code> to add and load aggregate instances and the default ParameterResolverFactory.
*
* @param aggregateType The type of aggregate
* @param repository The repository providing access to aggregate instances
* @param commandTargetResolver The target resolution strategy
*/
public AggregateAnnotationCommandHandler(Class<T> aggregateType, Repository<T> repository,
CommandTargetResolver commandTargetResolver) {
this(aggregateType, repository, commandTargetResolver,
ClasspathParameterResolverFactory.forClass(aggregateType));
}
/**
* Initializes an AnnotationCommandHandler based on the annotations on given <code>aggregateType</code>, using the
* given <code>repository</code> to add and load aggregate instances and the given
* <code>parameterResolverFactory</code>.
*
* @param aggregateType The type of aggregate
* @param repository The repository providing access to aggregate instances
* @param commandTargetResolver The target resolution strategy
* @param parameterResolverFactory The strategy for resolving parameter values for handler methods
*/
public AggregateAnnotationCommandHandler(Class<T> aggregateType, Repository<T> repository,
CommandTargetResolver commandTargetResolver,
ParameterResolverFactory parameterResolverFactory) {
Assert.notNull(aggregateType, "aggregateType may not be null");
Assert.notNull(repository, "repository may not be null");
Assert.notNull(commandTargetResolver, "commandTargetResolver may not be null");
this.repository = repository;
this.commandBus = null;
this.commandTargetResolver = commandTargetResolver;
this.handlers = initializeHandlers(new AggregateCommandHandlerInspector<T>(aggregateType,
parameterResolverFactory));
}
/**
* Initializes an AnnotationCommandHandler based on the annotations on given <code>aggregateType</code>, to be
* registered on the given <code>commandBus</code>.
*
* @param aggregateType The type of aggregate
* @param repository The repository providing access to aggregate instances
* @param commandBus The command bus to register command handlers to
* @deprecated Use {@link #AggregateAnnotationCommandHandler(Class, org.axonframework.repository.Repository)}
* and subscribe the adapter to the command bus using
* {@link org.axonframework.commandhandling.CommandBus#subscribe(String,
* org.axonframework.commandhandling.CommandHandler)}. Alternatively, use
* {@link #subscribe(Class, org.axonframework.repository.Repository, org.axonframework.commandhandling.CommandBus)}.
*/
@Deprecated
public AggregateAnnotationCommandHandler(Class<T> aggregateType, Repository<T> repository,
CommandBus commandBus) {
this(aggregateType, repository, commandBus, new AnnotationCommandTargetResolver());
}
/**
* Initializes an AnnotationCommandHandler based on the annotations on given <code>aggregateType</code>, to be
* registered on the given <code>commandBus</code>.
*
* @param aggregateType The type of aggregate
* @param repository The repository providing access to aggregate instances
* @param commandBus The command bus to register command handlers to
* @param commandTargetResolver The target resolution strategy
* @deprecated Use {@link #AggregateAnnotationCommandHandler(Class, org.axonframework.repository.Repository,
* org.axonframework.commandhandling.CommandTargetResolver)} and subscribe the handler to the command
* bus using {@link org.axonframework.commandhandling.CommandBus#subscribe(String,
* org.axonframework.commandhandling.CommandHandler)}. Alternatively, use
* {@link #subscribe(Class, org.axonframework.repository.Repository, org.axonframework.commandhandling.CommandBus,
* org.axonframework.commandhandling.CommandTargetResolver)}.
*/
@Deprecated
public AggregateAnnotationCommandHandler(Class<T> aggregateType, Repository<T> repository,
CommandBus commandBus, CommandTargetResolver commandTargetResolver) {
Assert.notNull(aggregateType, "aggregateType may not be null");
Assert.notNull(repository, "repository may not be null");
Assert.notNull(commandTargetResolver, "commandTargetResolver may not be null");
this.repository = repository;
this.commandBus = commandBus;
this.commandTargetResolver = commandTargetResolver;
this.handlers = initializeHandlers(new AggregateCommandHandlerInspector<T>(
aggregateType, ClasspathParameterResolverFactory.forClass(aggregateType)));
}
/**
* Subscribe a handler for the given aggregate type to the given command bus.
*
* @param aggregateType The type of aggregate
* @param repository The repository providing access to aggregate instances
* @param commandBus The command bus to register command handlers to
* @param <T> The type of aggregate this handler handles commands for
* @return the Adapter created for the command handler target. Can be used to unsubscribe.
*/
public static <T extends AggregateRoot> AggregateAnnotationCommandHandler subscribe(
Class<T> aggregateType, Repository<T> repository, CommandBus commandBus) {
AggregateAnnotationCommandHandler<T> adapter = new AggregateAnnotationCommandHandler<T>(aggregateType,
repository);
for (String supportedCommand : adapter.supportedCommands()) {
commandBus.subscribe(supportedCommand, adapter);
}
return adapter;
}
/**
* Subscribe a handler for the given aggregate type to the given command bus.
*
* @param aggregateType The type of aggregate
* @param repository The repository providing access to aggregate instances
* @param commandBus The command bus to register command handlers to
* @param commandTargetResolver The target resolution strategy
* @param <T> The type of aggregate this handler handles commands for
* @return the Adapter created for the command handler target. Can be used to unsubscribe.
*/
public static <T extends AggregateRoot> AggregateAnnotationCommandHandler subscribe(
Class<T> aggregateType, Repository<T> repository, CommandBus commandBus,
CommandTargetResolver commandTargetResolver) {
AggregateAnnotationCommandHandler<T> adapter = new AggregateAnnotationCommandHandler<T>(
aggregateType, repository, commandTargetResolver);
for (String supportedCommand : adapter.supportedCommands()) {
commandBus.subscribe(supportedCommand, adapter);
}
return adapter;
}
private Map<String, CommandHandler<Object>> initializeHandlers(AggregateCommandHandlerInspector<T> inspector) {
Map<String, CommandHandler<Object>> handlersFound = new HashMap<String, CommandHandler<Object>>();
for (final AbstractMessageHandler commandHandler : inspector.getHandlers()) {
handlersFound.put(resolveAcceptedCommandName(commandHandler),
new AggregateCommandHandler(commandHandler));
}
for (final ConstructorCommandMessageHandler<T> handler : inspector.getConstructorHandlers()) {
handlersFound.put(resolveAcceptedCommandName(handler), new AggregateConstructorCommandHandler(handler));
}
return handlersFound;
}
/**
* {@inheritDoc}
*
* @deprecated unsubscribing this handler should be done using {@link CommandBus#unsubscribe(String,
* org.axonframework.commandhandling.CommandHandler)}. Retrieve the supported commands with {@link
* #supportedCommands()}.
*/
@Override
@PreDestroy
@Deprecated
public synchronized void unsubscribe() {
if (commandBus != null) {
for (String commandType : handlers.keySet()) {
commandBus.unsubscribe(commandType, this);
}
}
}
/**
* {@inheritDoc}
*
* @deprecated subscribing this handler should be done using {@link CommandBus#subscribe(String,
* org.axonframework.commandhandling.CommandHandler)}. Retrieve the supported commands with {@link
* #supportedCommands()}.
*/
@PostConstruct
@Override
@Deprecated
public synchronized void subscribe() {
if (commandBus != null) {
for (String commandType : handlers.keySet()) {
commandBus.subscribe(commandType, this);
}
}
}
/**
* Returns the set of commands supported by the annotated command handler managed by this adapter. This may be used
* to (un)subscribe the adapter from the command bus.
*
* @return the set of commands supported by the annotated command handler
*/
public Set<String> supportedCommands() {
return handlers.keySet();
}
@Override
public Object handle(CommandMessage<Object> commandMessage, UnitOfWork unitOfWork) throws Throwable {
return handlers.get(commandMessage.getCommandName()).handle(commandMessage, unitOfWork);
}
private T loadAggregate(CommandMessage<?> command) {
VersionedAggregateIdentifier iv = commandTargetResolver.resolveTarget(command);
return repository.load(iv.getIdentifier(), iv.getVersion());
}
/**
* Resolves the value to return when the given <code>command</code> has created the given <code>aggregate</code>.
* This implementation returns the identifier of the created aggregate.
* <p/>
* This method may be overridden to change the return value of this Command Handler
*
* @param command The command being executed
* @param createdAggregate The aggregate that has been created as a result of the command
* @return The value to report as result of the command
*/
protected Object resolveReturnValue(CommandMessage<?> command, T createdAggregate) {
return createdAggregate.getIdentifier();
}
private class AggregateConstructorCommandHandler implements CommandHandler<Object> {
private final ConstructorCommandMessageHandler<T> handler;
public AggregateConstructorCommandHandler(ConstructorCommandMessageHandler<T> handler) {
this.handler = handler;
}
@Override
public Object handle(CommandMessage<Object> command, UnitOfWork unitOfWork) throws Throwable {
try {
final T createdAggregate = handler.invoke(null, command);
repository.add(createdAggregate);
return resolveReturnValue(command, createdAggregate);
} catch (InvocationTargetException e) {
throw e.getCause();
}
}
}
private class AggregateCommandHandler implements CommandHandler<Object> {
private final AbstractMessageHandler commandHandler;
public AggregateCommandHandler(AbstractMessageHandler commandHandler) {
this.commandHandler = commandHandler;
}
@Override
public Object handle(CommandMessage<Object> command, UnitOfWork unitOfWork) throws Throwable {
T aggregate = loadAggregate(command);
try {
return commandHandler.invoke(aggregate, command);
} catch (InvocationTargetException e) {
throw e.getCause();
}
}
}
}