-
Notifications
You must be signed in to change notification settings - Fork 3
/
StartTransformation.java
215 lines (165 loc) · 7.88 KB
/
StartTransformation.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
package uk.gov.justice.event.tool;
import static java.lang.String.format;
import static java.nio.file.Files.delete;
import static org.wildfly.swarm.bootstrap.Main.MAIN_PROCESS_FILE;
import uk.gov.justice.event.tool.task.StreamTransformationTask;
import uk.gov.justice.services.eventsourcing.repository.jdbc.EventRepository;
import uk.gov.justice.tools.eventsourcing.transformation.service.EventStreamTransformationService;
import uk.gov.justice.tools.eventsourcing.transformation.service.PrePublishedQueueTruncatorService;
import uk.gov.justice.tools.eventsourcing.transformation.service.PublishedEventsRebuilderService;
import java.io.File;
import java.io.IOException;
import java.util.Deque;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.enterprise.concurrent.ManagedExecutorService;
import javax.enterprise.concurrent.ManagedTaskListener;
import javax.inject.Inject;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.wildfly.swarm.spi.runtime.annotations.ConfigurationValue;
/**
 * Startup singleton that submits one {@link StreamTransformationTask} per event stream to the
 * managed executor and, acting as the tasks' {@link ManagedTaskListener}, tracks their completion.
 *
 * <p>Flow as implemented here:
 * <ol>
 *     <li>{@link #go()} runs on {@code @PostConstruct} and creates the tasks for the pass value
 *     returned by {@code passesDeterminer.getPassValue()}.</li>
 *     <li>Each completed task is removed from {@link #outstandingTasks}; once every task of the
 *     pass has been created and none is outstanding, either the next pass is started or, on the
 *     last pass, the pre-publish queue is truncated, the published events are rebuilt and the
 *     process file is deleted.</li>
 * </ol>
 *
 * <p>Listener callbacks are invoked by the executor, so {@link #allTasksCreated} and
 * {@link #outstandingTasks} are read and written from more than one thread.
 */
@Singleton
@Startup
public class StartTransformation implements ManagedTaskListener {

    private static final String NO_PROCESS_FILE_WARNING = "!!!!! No Swarm Process File specific, application will not auto-shutdown on completion. Please use option '-Dorg.wildfly.swarm.mainProcessFile=/pathTo/aFile' to specify location of process file with read/write permissions !!!!!";

    /** Progress is logged every time the submitted-stream counter is a multiple of this value. */
    @Inject
    @ConfigurationValue("streamCountReportingInterval")
    private int streamCountReportingInterval;

    /** When true all stream ids are processed, otherwise only the active ones. */
    @Inject
    @ConfigurationValue("processAllStreams")
    private boolean processAllStreams;

    @Inject
    private Logger logger;

    @Resource(name = "event-tool")
    private ManagedExecutorService executorService;

    @Inject
    private EventRepository eventRepository;

    @Inject
    private EventStreamTransformationService eventStreamTransformationService;

    @Inject
    private PassesDeterminer passesDeterminer;

    @Inject
    private PublishedEventsRebuilderService publishedEventsRebuilderService;

    @Inject
    private PrePublishedQueueTruncatorService prePublishedQueueTruncatorService;

    /** Futures of the tasks that have been submitted and not yet reported done/aborted. */
    final Deque<Future<UUID>> outstandingTasks = new LinkedBlockingDeque<>();

    /**
     * True once every task of the current pass has been submitted. Volatile because it is written
     * by the thread creating the tasks and read by the executor threads in the listener callbacks.
     */
    volatile boolean allTasksCreated = false;

    private StopWatch stopWatch = new StopWatch();

    /** Number of streams submitted so far; never reset, so it accumulates across passes. */
    private AtomicInteger processedStreamsCount = new AtomicInteger(0);

    /** Guards {@link #stopTimerOnce()} so the stop watch is stopped at most once. */
    private boolean stopWatchStopped = false;

    /**
     * Entry point, invoked by the container after injection: starts the timer, warns if no
     * process file is configured and creates the tasks of the first pass.
     */
    @PostConstruct
    void go() {
        logger.info("-------------- Invoke Event Streams Transformation -------------");

        stopWatch.start();
        checkForMainProcessFile();
        createTransformationTasks(passesDeterminer.getPassValue());

        logger.info("-------------- Invocation of Event Streams Transformation Completed --------------");
    }

    /**
     * Submits one transformation task per stream id for the given pass.
     *
     * @param pass the pass value handed to every {@link StreamTransformationTask}
     * @throws IllegalArgumentException if {@code streamCountReportingInterval} is 0
     */
    private void createTransformationTasks(final int pass) {
        if (streamCountReportingInterval == 0) {
            throw new IllegalArgumentException("Invalid streamCountReportingInterval argument");
        }

        // A previous pass leaves this flag set to true; clear it so that a task finishing while
        // this pass is still being submitted cannot be mistaken for the end of the pass.
        allTasksCreated = false;

        // try-with-resources closes the stream even when submitting a task throws.
        try (final Stream<UUID> streamIds = processAllStreams
                ? eventRepository.getAllStreamIds()
                : eventRepository.getAllActiveStreamIds()) {

            logger.info(format("Processing %s streams", processAllStreams ? "all" : "active"));

            streamIds.forEach(streamId -> {
                final StreamTransformationTask transformationTask =
                        new StreamTransformationTask(streamId, eventStreamTransformationService, this, pass);

                // NOTE(review): a task that completes before its future is added here would be
                // "removed" by taskDone first and then stay in the deque - confirm whether the
                // executor can call back that early.
                outstandingTasks.add(executorService.submit(transformationTask));
                processedStreamsCount.getAndIncrement();
                reportTransformationProgress(pass);
            });
        }

        // Nothing outstanding after submission (for example no streams at all): finish straight away.
        // NOTE(review): this path shuts down without consulting passesDeterminer, so remaining
        // passes are not run - confirm that is intended.
        if (outstandingTasks.isEmpty()) {
            truncateQueueAndRebuildPublishedEvents();
            shutdown();
        }

        allTasksCreated = true;
    }

    /**
     * Logs the running stream count and elapsed time whenever the count is a non-zero multiple
     * of {@code streamCountReportingInterval}.
     *
     * @param pass the pass value included in the log line
     */
    private void reportTransformationProgress(final int pass) {
        // Read the counter once so the value tested is the value logged.
        final int submittedStreams = processedStreamsCount.get();

        if (submittedStreams == 0 || streamCountReportingInterval == 0) {
            return;
        }

        if (submittedStreams % streamCountReportingInterval == 0) {
            final long time = stopWatch.getTime();
            logger.info(format("Pass %s - Streams count: %s - time(ms): %s", pass, submittedStreams, time));
        }
    }

    /** Listener callback: only logs at debug level. */
    public void taskStarting(final Future<?> futureTask, final ManagedExecutorService managedExecutorService, final Object task) {
        logger.debug("Starting Transformation task");
    }

    /** Listener callback: only logs at debug level. */
    public void taskSubmitted(final Future<?> futureTask, final ManagedExecutorService managedExecutorService, final Object task) {
        logger.debug("Submitted Transformation task");
    }

    /**
     * Listener callback: drops the finished task from {@link #outstandingTasks} and, if the pass
     * is complete, moves on to the next pass or shuts down.
     */
    public void taskDone(final Future<?> futureTask, final ManagedExecutorService managedExecutorService, final Object task, final Throwable throwable) {
        logger.debug("Completed Transformation task");
        removeOutstandingTask(futureTask);
        nextPassIfFinished();
    }

    /**
     * Listener callback: logs the failure, drops the task from {@link #outstandingTasks},
     * truncates the queue and rebuilds the published events, then shuts down if nothing is left.
     */
    public void taskAborted(final Future<?> futureTask, final ManagedExecutorService managedExecutorService, final Object task, final Throwable throwable) {
        logger.error("Aborted Transformation task", throwable);
        removeOutstandingTask(futureTask);
        truncateQueueAndRebuildPublishedEvents();
        shutDownIfFinished();
    }

    /** Removes the given future from the outstanding set; a no-op if it is not present. */
    private void removeOutstandingTask(final Future<?> futureTask) {
        outstandingTasks.remove(futureTask);
    }

    /**
     * When the current pass is complete: on the last pass truncate/rebuild and shut down,
     * otherwise create the tasks for the next pass value.
     */
    private void nextPassIfFinished() {
        if (!isTaskFinished()) {
            return;
        }

        if (passesDeterminer.isLastElementInPasses()) {
            truncateQueueAndRebuildPublishedEvents();
            shutdown();
        } else {
            createTransformationTasks(passesDeterminer.getNextPassValue());
        }
    }

    /** Shuts down when the current pass is complete. */
    private void shutDownIfFinished() {
        if (isTaskFinished()) {
            shutdown();
        }
    }

    /** @return true when every task of the pass has been created and none is outstanding */
    private boolean isTaskFinished() {
        return allTasksCreated && outstandingTasks.isEmpty();
    }

    /**
     * Stops the timer, logs the elapsed time and deletes the file named by the
     * {@code MAIN_PROCESS_FILE} system property, if that property is set.
     */
    private void shutdown() {
        stopTimerOnce();
        logger.info("Shutdown Time taken in secs: " + stopWatch.getTime() / 1000);
        logger.info("========== ALL TASKS HAVE BEEN DISPATCHED -- ATTEMPTING SHUTDOWN =================");

        final String processFile = System.getProperty(MAIN_PROCESS_FILE);
        if (processFile == null) {
            return;
        }

        final File uuidFile = new File(processFile);
        if (!uuidFile.exists()) {
            if (logger.isWarnEnabled()) {
                logger.warn(format("Failed to delete process file '%s', file does not exist", processFile));
            }
            return;
        }

        try {
            delete(uuidFile.toPath());
        } catch (IOException e) {
            // The file exists at this point, so report the real failure rather than "does not exist".
            if (logger.isWarnEnabled()) {
                logger.warn(format("Failed to delete process file '%s'", processFile), e);
            }
        }
    }

    /**
     * Stops the stop watch the first time it is called and does nothing afterwards.
     * shutdown() can be reached from both taskAborted and taskDone, and stopping a StopWatch
     * that is no longer running throws IllegalStateException.
     */
    private synchronized void stopTimerOnce() {
        if (!stopWatchStopped) {
            stopWatch.stop();
            stopWatchStopped = true;
        }
    }

    /** Truncates the pre-publish queue, then rebuilds the published events, logging each step. */
    private void truncateQueueAndRebuildPublishedEvents() {
        logger.info("-------------- Truncating pre_publish_queue --------------");
        prePublishedQueueTruncatorService.truncate();
        logger.info("-------------- Truncate of pre_publish_queue complete --------------");

        logger.info("-------------- Rebuilding the Published Events after complete transformation --------------");
        publishedEventsRebuilderService.rebuild();
        logger.info("-------------- Rebuild of the Published Events complete --------------");
    }

    /** Warns at startup when the {@code MAIN_PROCESS_FILE} system property is not set. */
    private void checkForMainProcessFile() {
        if (System.getProperty(MAIN_PROCESS_FILE) == null) {
            logger.warn(NO_PROCESS_FILE_WARNING);
        }
    }
}