Skip to content

Commit

Permalink
[Community "wps-remote"] - Correctly set "task" messages and Progress…
Browse files Browse the repository at this point in the history
… accordingly to "LogMessages"
  • Loading branch information
afabiani committed Nov 14, 2018
1 parent 9c4caee commit fe63c15
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ public Map<String, Object> execute(Map<String, Object> input, ProgressListener m

try {
// Generating a unique Process ID

LOGGER.info(
"Generating a unique Process ID for Remote Process ["
+ name
Expand Down Expand Up @@ -243,9 +242,18 @@ public void exceptionOccurred(final String pId, Exception cause, Map<String, Obj
}

@Override
public void setTask(String pId, String logMessage) {
public void setTask(final String pId, final String logMessage) {
if (pId != null && pId.equals(pid)) {
listener.setTask(new SimpleInternationalString(logMessage));
}
}

@Override
public double getProgress(final String pId) {
if (pId != null && pId.equals(pid)) {
return listener.getProgress();
}

return Double.NaN;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ public interface RemoteProcessClientListener {
*/
public void progress(final String pId, final Double progress);

/**
* Gets the progress of the {@link RemoteProcess} associated to the remote service with the
* unique @param pId
*
* @param pId
* @return progress
*/
double getProgress(String pId);

/**
* Completes of the {@link RemoteProcess} associated to the remote service with the
* unique @param pId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ public class XMPPClient extends RemoteProcessClient {
protected List<String> serviceChannels;

/*
* protected Map<String, List<String>> occupantsList = Collections .synchronizedMap(new HashMap<String, List<String>>());
* protected Map<String, List<String>> occupantsList = Collections
* .synchronizedMap(new HashMap<String, List<String>>());
*/

protected List<Name> registeredServices = Collections.synchronizedList(new ArrayList<Name>());
Expand Down Expand Up @@ -528,8 +529,9 @@ private void checkSecured(RemoteProcessFactoryConfiguration configuration) {
// if (url != null) {
this.certificateFile = loader.createFile(xmppServerEmbeddedCertFile.trim());
loader.copyFromClassPath(
xmppServerEmbeddedCertFile.trim(), this.certificateFile /*,
RemoteProcessFactoryConfigurationWatcher.class*/);
xmppServerEmbeddedCertFile.trim(), this.certificateFile /*
* , RemoteProcessFactoryConfigurationWatcher.class
*/);
// }
this.certificateFile = loader.find(xmppServerEmbeddedCertFile.trim());
this.certificatePassword = xmppServerEmbeddedCertPwd.trim();
Expand Down Expand Up @@ -627,12 +629,6 @@ public String execute(
*/
pendingRequests.add(
new RemoteRequestDescriptor(serviceName, input, metadata, pid, baseURL));

// NOTIFY LISTENERS
for (RemoteProcessClientListener listener : getRemoteClientListeners()) {
listener.progress(pid, 0.0);
listener.setTask(pid, "Blocked: no resources available for execution!");
}
}

return pid;
Expand Down Expand Up @@ -710,8 +706,8 @@ public void performLogin(String username, String password) throws Exception {
new MultiUserChat(connection, managementChannel + "@" + bus + "." + domain);
try {
mucManagementChannel.join(getJID(username), managementChannelPassword); /*
* , history, connection. getPacketReplyTimeout());
*/
* , history, connection. getPacketReplyTimeout());
*/
} catch (Exception e) {
mucManagementChannel.join(username, managementChannelPassword);
}
Expand All @@ -721,8 +717,8 @@ public void performLogin(String username, String password) throws Exception {
new MultiUserChat(connection, channel + "@" + bus + "." + domain);
try {
serviceChannel.join(getJID(username), managementChannelPassword); /*
* , history, connection. getPacketReplyTimeout());
*/
* , history, connection. getPacketReplyTimeout());
*/
} catch (Exception e) {
serviceChannel.join(username, managementChannelPassword);
}
Expand All @@ -744,7 +740,8 @@ public void performLogin(String username, String password) throws Exception {
* @param username
*/
private String getJID(String username) {
// final String id = md5Java(username + "@" + this.domain + "/" + System.nanoTime());
// final String id = md5Java(username + "@" + this.domain + "/" +
// System.nanoTime());
return username + "@" + this.domain;
}

Expand Down Expand Up @@ -944,15 +941,14 @@ protected void getEndpointsLoadAverages() throws Exception {
protected void checkPendingRequests() throws Exception {
synchronized (pendingRequests) {
for (RemoteRequestDescriptor request : pendingRequests) {

// Check if the request is still valid
final String pid = request.getPid();
boolean isRequestValid = false;
RemoteProcessClientListener blockedProcess = null;
for (RemoteProcessClientListener process : getRemoteClientListeners()) {
if (process.getPID().equals(pid)) {
process.progress(pid, 0.0);
process.setTask(pid, "Blocked: no resources available for execution!");
isRequestValid = true;
blockedProcess = process;
break;
}
}
Expand Down Expand Up @@ -1004,6 +1000,9 @@ protected void checkPendingRequests() throws Exception {
// Remove the request from the queue
pendingRequests.remove(request);
continue;
} else {
blockedProcess.setTask(pid, "Blocked: no resources available for execution!");
blockedProcess.progress(pid, blockedProcess.getProgress(pid));
}
}
}
Expand Down Expand Up @@ -1569,8 +1568,10 @@ public void processPacket(Packet packet) {
/** Manage the channel occupants list */
final String channel = p.getFrom().substring(0, p.getFrom().indexOf("@"));
/*
* if (xmppClient.occupantsList.get(channel) == null) { xmppClient.occupantsList.put(channel, new ArrayList<String>()); } if
* (xmppClient.occupantsList.get(channel) != null) { if (!xmppClient.occupantsList.get(channel).contains(p. getFrom()))
* if (xmppClient.occupantsList.get(channel) == null) {
* xmppClient.occupantsList.put(channel, new ArrayList<String>()); } if
* (xmppClient.occupantsList.get(channel) != null) { if
* (!xmppClient.occupantsList.get(channel).contains(p. getFrom()))
* xmppClient.occupantsList.get(channel).add(p.getFrom() ); }
*/

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public void handleSignal(
for (RemoteProcessClientListener listener : xmppClient.getRemoteClientListeners()) {
listener.setTask(pID, cause.getLocalizedMessage());
listener.exceptionOccurred(pID, cause, metadata);
listener.progress(pID, listener.getProgress(pID));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public void handleSignal(
// NOTIFY LISTENERS
for (RemoteProcessClientListener listener : xmppClient.getRemoteClientListeners()) {
listener.setTask(pID, logMessage);
listener.progress(pID, listener.getProgress(pID));
}
} catch (Exception e) {
LOGGER.log(
Expand Down

0 comments on commit fe63c15

Please sign in to comment.