-
Notifications
You must be signed in to change notification settings - Fork 5
/
Dispatcher.java
128 lines (117 loc) · 4.62 KB
/
Dispatcher.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package org.genericsystem.ir;
import java.lang.invoke.MethodHandles;
import org.genericsystem.common.Generic;
import org.genericsystem.ir.Model.Task;
import org.genericsystem.kernel.Cache;
import org.genericsystem.kernel.Engine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.json.JsonObject;
public class Dispatcher extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected final Engine engine = new Engine(System.getenv("HOME") + "/genericsystem/tasks/", Task.class);
protected Cache cache = engine.newCache();
protected final Generic taskType = engine.find(Task.class);
private static final String EMAIL_SETTINGS = System.getenv("HOME") + "/genericsystem/.gs-ir/conf/MailWatcherVerticle.json";
public static final String ADDRESS = "org.genericsystem.repartitor";
protected static final String TASK = "task";
protected static final String NEW_STATE = "newState";
protected static final String STATE = "state";
protected static final String TODO = "todo";
protected static final String RUNNING = "running";
protected static final String FINISHED = "finished";
protected static final String ABORTED = "aborted";
private static final long MESSAGE_SEND_PERIODICITY = 5000;
public static void main(String[] args) {
Tools.deployOnCluster(vertx -> {
vertx.deployVerticle(new Dispatcher(), res_ -> {
if (res_.failed())
throw new IllegalStateException(res_.cause());
});
});
}
@Override
public void start(Future<Void> startFuture) throws Exception {
cache.safeConsum(nothing -> {
for (Generic task : taskType.getInstances()) {
JsonObject json = new JsonObject((String) task.getValue());
if (RUNNING.equals(json.getString(STATE)))
updateTaskState(json, TODO);
}
});
watchMail();
vertx.deployVerticle(new HttpServerVerticle(), ar -> {
if (ar.failed())
throw new IllegalStateException("Unable to create HTTP server.", ar.cause());
else
logger.info("HTTP server started.");
});
vertx.eventBus().consumer(ADDRESS + ":watchMail", message -> {
logger.info("Restarting mail watcher thread…");
watchMail();
});
vertx.eventBus().consumer(ADDRESS + ":updateState", message -> {
JsonObject json = (JsonObject) message.body();
updateTaskState(json.getJsonObject(TASK), json.getString(NEW_STATE));
});
vertx.eventBus().consumer(ADDRESS + ":add", message -> {
cache.safeConsum(unused -> {
taskType.addInstance((String) message.body());
cache.flush();
});
});
vertx.setPeriodic(MESSAGE_SEND_PERIODICITY, h -> {
cache.safeConsum(unused -> {
for (Generic task : taskType.getInstances()) {
JsonObject json = new JsonObject((String) task.getValue());
if (TODO.equals(json.getString(STATE))) {
vertx.eventBus().send(json.getString(DistributedVerticle.TYPE), new JsonObject(json.encode()).put(STATE, RUNNING).encodePrettily(), reply -> {
if (reply.failed())
switch (((ReplyException) reply.cause()).failureType()) {
case NO_HANDLERS:
logger.warn("No handler for task: {}.", json.encodePrettily());
break;
case TIMEOUT:
logger.warn("Sending of task {} timed out.", reply.cause(), json.encodePrettily());
break;
case RECIPIENT_FAILURE:
logger.info("Task {} rejected by recipient.", reply.cause(), json.encodePrettily());
break;
}
else
updateTaskState(json, RUNNING);
});
}
}
cache.flush();
});
});
startFuture.complete();
}
private void updateTaskState(JsonObject oldValue, String newState) {
logger.debug("Updating: {}, newState: {}.", oldValue.encodePrettily(), newState);
cache.safeConsum(unused -> {
Generic task = taskType.getInstances().filter(g -> oldValue.equals(new JsonObject((String) g.getValue()))).first();
JsonObject newValue = new JsonObject(oldValue.encode()).put(STATE, newState);
task.update(newValue.encodePrettily());
cache.flush();
});
}
private void watchMail() {
vertx.fileSystem().readFile(EMAIL_SETTINGS, ar -> {
if (ar.failed())
throw new IllegalStateException("Impossible to load configuration for MailWatcherVerticle.", ar.cause());
else {
JsonObject config = new JsonObject(ar.result());
vertx.deployVerticle(new MailWatcherVerticle(), new DeploymentOptions().setConfig(config), res -> {
if (res.failed())
throw new IllegalStateException("Unable to deploy MailWatcherVerticle", res.cause());
});
}
});
}
}