-
Notifications
You must be signed in to change notification settings - Fork 215
/
StagedCommand.java
213 lines (186 loc) · 6.72 KB
/
StagedCommand.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
/*
* Copyright (c) 2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.persistence.stages;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommand;
import org.eclipse.ditto.connectivity.model.signals.events.ConnectivityEvent;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import akka.actor.ActorRef;
/**
* Non-serializable local-only command for multi-stage processing by
* {@link org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionPersistenceActor}.
* <p>
* It contains a sequence of actions. Some actions are asynchronous. The connection actor can thus schedule the next
* action as a staged command to self after an asynchronous action. Synchronous actions can be executed right away.
*/
public final class StagedCommand implements ConnectivityCommand<StagedCommand>, Iterator<StagedCommand> {

    private final ConnectivityCommand<?> command;
    @Nullable private final ConnectivityEvent<?> event;
    private final WithDittoHeaders response;
    private final ActorRef sender;
    // Remaining actions in iteration order; the first element is the one returned by nextAction().
    private final Collection<ConnectionAction> actions;

    private StagedCommand(final ConnectivityCommand<?> command,
            @Nullable final ConnectivityEvent<?> event,
            final WithDittoHeaders response,
            final ActorRef sender,
            final Collection<ConnectionAction> actions) {

        this.command = command;
        this.event = event;
        this.response = response;
        this.sender = sender;
        this.actions = actions;
    }

    /**
     * Create a staged command without a sender (i.e. with {@code ActorRef.noSender()}).
     * <p>
     * The given actions are copied, so later modifications of the passed list do not affect the created command.
     *
     * @param command the original command.
     * @param event the event to persist, or {@code null} if there is none.
     * @param response the response to send.
     * @param actions remaining actions.
     * @return the staged command.
     * @throws NullPointerException if {@code actions} is {@code null} or contains a {@code null} element.
     */
    public static StagedCommand of(final ConnectivityCommand<?> command, @Nullable final ConnectivityEvent<?> event,
            final WithDittoHeaders response, final List<ConnectionAction> actions) {

        // Snapshot the caller's list: a staged command must not change after creation.
        return new StagedCommand(command, event, response, ActorRef.noSender(), List.copyOf(actions));
    }

    /**
     * @return the wrapped command.
     */
    public ConnectivityCommand<?> getCommand() {
        return command;
    }

    /**
     * @return the event to persist, apply or publish, or an empty optional if this command carries no event.
     */
    public Optional<ConnectivityEvent<?>> getEvent() {
        return Optional.ofNullable(event);
    }

    /**
     * @return the response to send to the original sender, or the signal to forward to client actors.
     */
    public WithDittoHeaders getResponse() {
        return response;
    }

    /**
     * @return the original sender of a command that created this staged command.
     */
    public ActorRef getSender() {
        return sender;
    }

    /**
     * Enhance this command with a sender unless this command has a sender already.
     *
     * @param newSender the new sender.
     * @return either an enhanced command or this command.
     */
    public StagedCommand withSenderUnlessDefined(final ActorRef newSender) {
        // "No sender yet" means the sender still equals the placeholder assigned by of(...).
        if (Objects.equals(ActorRef.noSender(), sender)) {
            return new StagedCommand(command, event, response, newSender, actions);
        } else {
            return this;
        }
    }

    /**
     * Return a copy of this command with a new response.
     *
     * @param response the response.
     * @return the copy.
     */
    public StagedCommand withResponse(final WithDittoHeaders response) {
        return new StagedCommand(command, event, response, sender, actions);
    }

    /**
     * @return the category of the wrapped command.
     */
    @Override
    public Category getCategory() {
        return command.getCategory();
    }

    /**
     * @return the headers of the wrapped command.
     */
    @Override
    public DittoHeaders getDittoHeaders() {
        return command.getDittoHeaders();
    }

    /**
     * Return a copy of this command whose wrapped command carries the given headers.
     * Event, response, sender and remaining actions are taken over unchanged.
     *
     * @param dittoHeaders the headers to set on the wrapped command.
     * @return the copy.
     */
    @Override
    public StagedCommand setDittoHeaders(final DittoHeaders dittoHeaders) {
        return new StagedCommand(command.setDittoHeaders(dittoHeaders), event, response, sender, actions);
    }

    /**
     * @return the JSON representation of the wrapped command; staging information is not part of it.
     */
    @Override
    public JsonObject toJson(final JsonSchemaVersion schemaVersion, final Predicate<JsonField> predicate) {
        return command.toJson(schemaVersion, predicate);
    }

    /**
     * @return the manifest of the wrapped command.
     */
    @Override
    public String getManifest() {
        return command.getManifest();
    }

    /**
     * @return the type of the wrapped command.
     */
    @Override
    public String getType() {
        return command.getType();
    }

    @Override
    public int hashCode() {
        return Objects.hash(command, event, response, sender, actions);
    }

    @Override
    public boolean equals(final Object o) {
        if (o instanceof StagedCommand that) {
            return Objects.equals(command, that.command) &&
                    Objects.equals(event, that.event) &&
                    Objects.equals(response, that.response) &&
                    Objects.equals(sender, that.sender) &&
                    Objects.equals(actions, that.actions);
        } else {
            return false;
        }
    }

    @Override
    public String toString() {
        return String.format("%s[command=%s,event=%s,response=%s,sender=%s,actions=%s]",
                getClass().getSimpleName(), command, event, response, sender, actions);
    }

    /**
     * @return whether at least one action remains.
     */
    @Override
    public boolean hasNext() {
        return !actions.isEmpty();
    }

    /**
     * Return a copy of this command without its first remaining action. This instance is not modified.
     *
     * @return the staged command holding the actions after the first one.
     * @throws java.util.NoSuchElementException unless {@code this.hasNext()}.
     */
    @Override
    public StagedCommand next() {
        // Check up front instead of catching and re-wrapping the exception of Queue.remove().
        if (actions.isEmpty()) {
            throw new NoSuchElementException("Action queue did not contain more elements");
        }
        final Queue<ConnectionAction> remainingActions = getActionsAsQueue();
        remainingActions.remove();
        return new StagedCommand(command, event, response, sender, remainingActions);
    }

    /**
     * Get the next action without consuming it.
     *
     * @return the next action.
     * @throws java.util.NoSuchElementException unless {@code this.hasNext()}.
     */
    public ConnectionAction nextAction() {
        return actions.iterator().next();
    }

    // Fresh mutable copy of the remaining actions, so that next() never alters this instance.
    private Queue<ConnectionAction> getActionsAsQueue() {
        return new LinkedList<>(actions);
    }

}