This repository has been archived by the owner on Jul 19, 2022. It is now read-only.
/
Coordinator.java
executable file
·263 lines (220 loc) · 9.59 KB
/
Coordinator.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
/**
* This file is part of CloudML [ http://cloudml.org ]
*
* Copyright (C) 2012 - SINTEF ICT
* Contact: Franck Chauvel <franck.chauvel@sintef.no>
*
* Module: root
*
* CloudML is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* CloudML is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General
* Public License along with CloudML. If not, see
* <http://www.gnu.org/licenses/>.
*/
package org.cloudml.mrt;
import java.io.ByteArrayOutputStream;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.cloudml.codecs.JsonCodec;
import org.cloudml.core.*;
import org.cloudml.mrt.cmd.CmdWrapper;
import org.cloudml.mrt.cmd.abstracts.Change;
import org.cloudml.mrt.cmd.abstracts.Instruction;
import org.cloudml.mrt.cmd.abstracts.Listener;
import org.cloudml.mrt.cmd.gen.Ack;
import org.cloudml.mrt.cmd.gen.CloudMLCmds;
import org.cloudml.mrt.cmd.gen.Extended;
import org.cloudml.mrt.cmd.gen.Snapshot;
import org.cloudml.mrt.sample.SystemOutPeerStub;
import org.yaml.snakeyaml.Yaml;
/**
* @author Hui Song & Nicolas Ferry
*/
public class Coordinator {
/** Globally reachable coordinator reference; declared here but never assigned inside this class. */
public static Coordinator SINGLE_INSTANCE = null;
/** Prefix that marks a textual command as additional input for the last processed instruction. */
public static final String ADDITIONAL_PREFIX = "!additional";
/** Logger used by this class. */
private static final Logger journal = Logger.getLogger(Coordinator.class.getName());
/** Optional command reception; start() starts it when one has been set. */
CommandReception reception = null;
/** Executes instructions; created by the no-argument constructor. */
CommandExecutor executor = null;
/** Change list handed to the executor with every instruction; ack() also appends to it. */
List<Change> changeList = new ArrayList<Change>();
/** Receives the listeners added or removed through process(Listener, PeerStub) and removeListener. */
NodificationCentre notificationCentre = new NodificationCentre();
/** Most recent instruction given to process(Instruction, PeerStub); null until the first one arrives. */
Instruction lastInstruction = null;
public Coordinator() {
ModelRepo repo = new SimpleModelRepo();
executor = new CommandExecutor(repo);
}
/**
 * Sets the {@code status} property of the component instance called {@code name}.
 *
 * <p>The change is issued through a {@link CmdWrapper} on behalf of a
 * {@link SystemOutPeerStub} built from {@code identity}, after a fixed 1.5 second
 * pause. If the lookup raises a {@code JXPathNotFoundException}, that is logged
 * and the model is left untouched. If the thread is interrupted during the pause,
 * the update is abandoned and the interrupt flag is restored for the caller.</p>
 *
 * @param name     name of the component instance to update
 * @param newState textual value to store in the {@code status} property
 * @param identity identifier of the peer on whose behalf the update is made
 */
public void updateStatusInternalComponent(String name, String newState, String identity) {
    // A PeerStub identifies who launches the modifications
    PeerStub committer = new SystemOutPeerStub(identity);
    // A wrapper hides the complexity of invoking the coordinator
    CmdWrapper wrapper = new CmdWrapper(this, committer);
    try {
        Thread.sleep(1500);
        journal.log(Level.INFO, ">> Updating the model..");
        wrapper.eSet("/componentInstances[name='" + name + "']", wrapper.makePair("status", "" + newState));
        journal.log(Level.INFO, ">> Status of: " + name + " changed in: " + newState);
    } catch (org.apache.commons.jxpath.JXPathNotFoundException e) {
        journal.log(Level.INFO, "Machine: " + name + " not in this model");
    } catch (InterruptedException e) {
        // Restore the flag so that code further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        journal.log(Level.SEVERE, ">> Interrupted before the status of " + name + " could be updated", e);
    }
}
/**
 * Brings the {@code status} property of the component instance called {@code name}
 * to {@code newState}.
 *
 * <p>After a fixed 1.5 second pause the current status is read. If it already
 * equals {@code newState} (compared via {@code toString()}), nothing happens.
 * Otherwise the new status is written through a {@link CmdWrapper}, and, when the
 * instance is a {@link VMInstance}, also set directly on the object. When no
 * former status exists and the instance is an {@link ExternalComponentInstance},
 * it is first marked {@code PENDING}.</p>
 *
 * <p>If the lookup raises a {@code JXPathNotFoundException}, that is logged and
 * the model is left untouched. If the thread is interrupted during the pause, the
 * update is abandoned and the interrupt flag is restored for the caller.</p>
 *
 * @param name     name of the component instance to update
 * @param newState state to store
 * @param identity identifier of the peer on whose behalf the update is made
 */
public void updateStatus(String name, ComponentInstance.State newState, String identity) {
    // A PeerStub identifies who launches the modifications
    PeerStub committer = new SystemOutPeerStub(identity);
    // A wrapper hides the complexity of invoking the coordinator
    CmdWrapper wrapper = new CmdWrapper(this, committer);
    journal.log(Level.INFO, ">> Prepare to update status of: " + name);
    final String instancePath = "/componentInstances[name='" + name + "']";
    try {
        Thread.sleep(1500);
        Object res = wrapper.eGet(instancePath + "/status");
        ComponentInstance res2 = (ComponentInstance) wrapper.eGet(instancePath);
        if (res != null && res.toString().equals(newState.toString())) {
            // Already in the requested state: nothing to write.
            return;
        }
        if (res == null) {
            if (res2 instanceof ExternalComponentInstance) {
                ((ExternalComponentInstance) res2).setStatus(ComponentInstance.State.PENDING);
            }
            journal.log(Level.INFO, ">> No former status, updating the model..");
        } else {
            journal.log(Level.INFO, ">> Updating the model..");
        }
        wrapper.eSet(instancePath, wrapper.makePair("status", "" + newState));
        journal.log(Level.INFO, ">> Status of: " + name + " changed in: " + newState);
        if (res2 instanceof VMInstance) {
            ((VMInstance) res2).setStatus(newState);
        }
    } catch (org.apache.commons.jxpath.JXPathNotFoundException e) {
        journal.log(Level.INFO, "Machine: " + name + " not in this model");
    } catch (InterruptedException e) {
        // Restore the flag so that code further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        journal.log(Level.SEVERE, ">> Could not update status!");
        journal.log(Level.SEVERE, e.getLocalizedMessage(), e);
    }
}
/**
 * Appends an {@link Ack} carrying {@code status}, attributed to {@code identity},
 * to the change list.
 *
 * @param status   status text stored in the acknowledgement
 * @param identity peer identifier used both to build the Ack and as its origin
 */
public void ack(String status, String identity) {
    final Ack acknowledgement = new Ack(identity);
    acknowledgement.status = status;
    acknowledgement.fromPeer = identity;
    changeList.add(acknowledgement);
}
/**
 * Sets the {@code publicAddress} property of the component instance called
 * {@code name} to {@code ip}.
 *
 * <p>After a fixed 1.5 second pause the current address is read; the new value is
 * written only when that read returns a non-null result. If the lookup raises a
 * {@code JXPathNotFoundException}, that is logged and the model is left untouched.
 * If the thread is interrupted during the pause, the update is abandoned and the
 * interrupt flag is restored for the caller.</p>
 *
 * @param name     name of the component instance to update
 * @param ip       address to store
 * @param identity identifier of the peer on whose behalf the update is made
 */
public void updateIP(String name, String ip, String identity) {
    PeerStub committer = new SystemOutPeerStub(identity);
    CmdWrapper wrapper = new CmdWrapper(this, committer);
    final String instancePath = "/componentInstances[name='" + name + "']";
    try {
        Thread.sleep(1500);
        Object res = wrapper.eGet(instancePath + "/publicAddress");
        if (res != null) {
            journal.log(Level.INFO, ">> Updating the model..");
            wrapper.eSet(instancePath, wrapper.makePair("publicAddress", "" + ip));
            journal.log(Level.INFO, ">> IP of: " + name + " changed in: " + ip);
        }
    } catch (org.apache.commons.jxpath.JXPathNotFoundException e) {
        journal.log(Level.INFO, "Machine: " + name + " not in this model");
    } catch (InterruptedException e) {
        // Restore the flag so that code further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        journal.log(Level.SEVERE, ">> Interrupted before the IP of " + name + " could be updated", e);
    }
}
/**
 * Hands {@code repo} to the executor as its model repository.
 *
 * @param repo repository the executor should use from now on
 */
public void setModelRepo(ModelRepo repo) {
    executor.setModelRepo(repo);
}
/**
 * Creates a coordinator and immediately processes a {@code LoadDeployment}
 * extended command with {@code initModel} as its only parameter.
 *
 * <p>The command is issued by an anonymous peer with the ID {@code RootUser},
 * which prints every message it receives to standard output.</p>
 *
 * @param initModel sole parameter passed to the {@code LoadDeployment} command
 */
public Coordinator(String initModel) {
    this();

    final Extended loadCommand = new Extended();
    loadCommand.name = "LoadDeployment";
    loadCommand.params = Arrays.asList(initModel);

    final PeerStub rootUser = new PeerStub() {
        @Override
        public String getID() {
            return "RootUser";
        }

        @Override
        public void sendMessage(Object message) {
            System.out.println(String.format("RootUser:>> %s", message));
        }
    };

    this.process(loadCommand, rootUser);
}
/**
 * Starts the command reception, if one has been set, then attaches this
 * coordinator to the notification centre and starts its listening.
 */
public void start() {
    final CommandReception receiver = this.reception;
    if (receiver != null) {
        receiver.start();
    }
    this.notificationCentre.coordinator = this;
    this.notificationCentre.startListening();
}
/**
 * Asks the notification centre to remove the listener registration of {@code from}.
 *
 * @param from peer whose listener registration should be removed
 */
public void removeListener(PeerStub from) {
    this.notificationCentre.removeListener(from);
}
/**
 * Stores the command reception that start() will start.
 *
 * @param reception reception to use; start() skips it when it is null
 */
public void setReception(CommandReception reception) {
this.reception = reception;
}
/**
 * Records {@code inst} as the last instruction, stamps it with the ID of the
 * issuing peer and hands it to the executor together with the change list.
 *
 * @param inst instruction to execute
 * @param from peer that issued the instruction
 * @return whatever the executor returns for this instruction
 */
public Object process(Instruction inst, PeerStub from) {
    // Pre-processing hook: remember every instruction before it runs.
    lastInstruction = inst;
    inst.fromPeer = from.getID();
    final Object outcome = executor.execute(inst, changeList);
    return outcome;
}
/**
 * Registers or cancels a listener on behalf of a peer.
 *
 * <p>The peer's ID is appended to the listener's ID first. A listener flagged
 * {@code cancel} is removed from the notification centre; any other listener is
 * given the current root of the executor's repository and then added.</p>
 *
 * @param listener listener to register or cancel
 * @param from     peer that sent the listener
 * @return always {@code null}
 */
public Object process(Listener listener, PeerStub from) {
    listener.id = listener.id + from.getID();
    if (!listener.cancel) {
        listener.root = executor.repo.getRoot();
        notificationCentre.addListener(listener, from);
        return null;
    }
    notificationCentre.removeListener(listener);
    return null;
}
/**
 * Processes a textual command received from a peer.
 *
 * <p>A literal starting with {@link #ADDITIONAL_PREFIX} is treated as additional
 * input for the last processed instruction: the text after the prefix is added to
 * that instruction, which is then processed again. Any other literal is parsed as
 * a sequence of YAML documents; each document that is an {@link Instruction} or a
 * {@link Listener} is processed, and every non-null result is appended to the
 * returned text in encoded form.</p>
 *
 * @param cmdLiteral command text sent by the peer
 * @param from       peer that sent the command
 * @return the concatenated encoded results (empty when nothing returned a value),
 *         or the result of re-processing the last instruction for additional input
 * @throws IllegalStateException if additional input arrives before any
 *         instruction has been processed
 */
public String process(String cmdLiteral, PeerStub from) {
    if (cmdLiteral.startsWith(ADDITIONAL_PREFIX)) {
        if (lastInstruction == null) {
            // Without this guard the call below fails with an unexplained NullPointerException.
            throw new IllegalStateException(
                    "Received '" + ADDITIONAL_PREFIX + "' input but no instruction has been processed yet");
        }
        String additional = cmdLiteral.substring(ADDITIONAL_PREFIX.length());
        lastInstruction.addAdditional(additional);
        return (String) process(lastInstruction, from);
    }
    // NOTE(review): cmdLiteral originates from a peer; which types the parser may
    // instantiate depends on how CloudMLCmds configures this Yaml - confirm it is restricted.
    Yaml yaml = CloudMLCmds.INSTANCE.getYaml();
    StringBuilder ret = new StringBuilder();
    for (Object cmd : yaml.loadAll(cmdLiteral)) {
        Object obj = null;
        if (cmd instanceof Instruction) {
            obj = process((Instruction) cmd, from);
        } else if (cmd instanceof Listener) {
            obj = process((Listener) cmd, from);
        }
        if (obj != null) {
            ret.append(String.format("###return of %s###\n%s\n", cmd.getClass().getSimpleName(), codec(obj)));
        }
    }
    return ret.toString();
}
/**
 * Encodes a result object as text.
 *
 * <p>A {@link Deployment} is saved with a {@link JsonCodec} and returned as a
 * UTF-8 string. Any other object is placed in a {@link Snapshot} and dumped with
 * the YAML instance of {@link CloudMLCmds}.</p>
 *
 * @param object object to encode
 * @return the encoded text, or {@code null} if UTF-8 is reported as unsupported
 */
public String codec(Object object) {
    if (!(object instanceof Deployment)) {
        final Snapshot wrapped = new Snapshot();
        wrapped.content = object;
        return CloudMLCmds.INSTANCE.getYaml().dump(wrapped);
    }

    try {
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        new JsonCodec().save((Deployment) object, buffer);
        return buffer.toString("UTF-8");
    } catch (UnsupportedEncodingException ex) {
        Logger.getLogger(Coordinator.class.getName()).log(Level.SEVERE, null, ex);
        return null;
    }
}
}