-
Notifications
You must be signed in to change notification settings - Fork 5
/
AbstractMultitonVerticle.java
153 lines (140 loc) · 4.91 KB
/
AbstractMultitonVerticle.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package org.genericsystem.ir;
import java.lang.invoke.MethodHandles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.shareddata.Counter;
import io.vertx.core.shareddata.SharedData;
/**
* This class is used to deploy only a specific number of instance of a Verticle on a cluster. If a second instance of the Verticle is deployed, the deployment will be rolled back.
*
* @author Pierrik Lassalas
*/
public abstract class AbstractMultitonVerticle extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected final String className = this.getClass().getSimpleName();
protected final String startingErrorMsg = "An error has occured while deploying" + className;
protected final String stoppingErrorMsg = "An error has occured while stopping" + className;
protected boolean isDeployed;
/**
* This method returns the name of the cluster-wide counter. This name should be unique.
*
* @return a String
*/
protected abstract String getCounter();
/**
* This method returns the expected maximum value of the cluster-wide counter. If represents the maximum number of instances for the Verticle extending this class.
*
* @return a String
*/
protected abstract int getCounterOk();
/**
* Try to deploy on cluster. If it succeed, the method {@link #deployVerticle(Vertx)} will be called, and {@link #undeployVerticle(Vertx)} otherwise.
*/
public void doDeploy() {
Tools.deployOnCluster(vertx -> {
SharedData sd = vertx.sharedData();
sd.getCounter(getCounter(), res -> {
if (!res.succeeded()) {
throw new IllegalStateException(startingErrorMsg, res.cause());
} else {
Counter counter = res.result();
logger.debug("Couter {} successfully acquired", getCounter());
counter.incrementAndGet(ar -> {
if (!ar.succeeded())
throw new IllegalStateException(startingErrorMsg, res.cause());
else {
long value = ar.result();
logger.debug("Counter {} incremented to {} (previously {})", getCounter(), value, (value - 1));
if (value == getCounterOk()) {
logger.debug("Deploying verticle...");
deployVerticle(vertx);
this.isDeployed = true;
logger.debug("Verticle deployed!");
} else {
logger.warn("An instance of {} is already deployed on the cluster. Aborting...", className);
counter.decrementAndGet(ar2 -> {
if (!ar2.succeeded())
throw new IllegalStateException(startingErrorMsg, res.cause());
else {
long newValue = ar2.result();
logger.debug("Counter {} decremented to {} (previously {})", getCounter(), newValue, (newValue + 1));
undeployVerticle(vertx);
}
});
}
}
});
}
});
});
}
/**
* This method defines the action to run once the Verticle is deployed (e.g., vertx.deployVerticle(...)).
*
* @param vertx - the vertx
*/
protected abstract void deployVerticle(Vertx vertx);
/**
* This method defines the action to run in case the Verticle can not be deployed.
*
* @param vertx - the vertx
*/
protected void undeployVerticle(Vertx vertx) {
logger.info("Undeploying verticle...");
vertx.close(res -> {
if (res.succeeded())
logger.info("Undeploy successful");
else
throw new IllegalStateException("An error has occured while shutting down the verticle", res.cause());
});
}
@Override
public void stop(Future<Void> stopFuture) throws Exception {
if (isDeployed) {
SharedData sd = vertx.sharedData();
sd.getCounter(getCounter(), res -> {
if (!res.succeeded()) {
throw new IllegalStateException(stoppingErrorMsg, res.cause());
} else {
Counter counter = res.result();
logger.debug("Couter {} successfully acquired", getCounter());
counter.get(ar -> {
if (!ar.succeeded())
throw new IllegalStateException(stoppingErrorMsg, res.cause());
else {
long value = ar.result();
logger.debug("Counter {} has value: {})", getCounter(), value);
if (value == getCounterOk())
throw new IllegalStateException(String.format("Unexpected value (%d) for counter %s", value, getCounter()));
else {
counter.decrementAndGet(ar2 -> {
if (!ar2.succeeded())
throw new IllegalStateException(stoppingErrorMsg, res.cause());
else {
long newValue = ar2.result();
logger.debug("Counter {} decremented to {} (previously {})", getCounter(), newValue, (newValue + 1));
}
});
}
}
});
}
});
}
super.stop(stopFuture);
}
/**
* Deploys only a single instance of a Verticle.
*
* @author Pierrik Lassalas
*/
public static abstract class AbstractSingletonVerticle extends AbstractMultitonVerticle {
@Override
protected int getCounterOk() {
return 1;
}
}
}