diff --git a/platform/bridge/src/main/java/jadex/bridge/SFuture.java b/platform/bridge/src/main/java/jadex/bridge/SFuture.java index e7a0cb17ed..1001d9f177 100644 --- a/platform/bridge/src/main/java/jadex/bridge/SFuture.java +++ b/platform/bridge/src/main/java/jadex/bridge/SFuture.java @@ -9,6 +9,7 @@ import jadex.bridge.service.component.interceptors.FutureFunctionality; import jadex.bridge.service.search.ServiceEvent; import jadex.bridge.service.types.registry.SlidingCuckooFilter; +import jadex.commons.ICommand; import jadex.commons.IResultCommand; import jadex.commons.SUtil; import jadex.commons.future.Future; @@ -380,7 +381,7 @@ public static T getFirstResultAndTerminate(ITerminableIntermediateFuture */ public static ISubscriptionIntermediateFuture combineSubscriptionFutures(IInternalAccess ia, ISubscriptionIntermediateFuture f1, ISubscriptionIntermediateFuture f2) { - return combineSubscriptionFutures(ia, f1, f2, null); + return combineSubscriptionFutures(ia, f1, f2, null, null); } /** @@ -390,7 +391,7 @@ public static ISubscriptionIntermediateFuture combineSubscriptionFutures( * @param f2 Future 2. * @return A future combining results of f1 and f2. */ - public static ISubscriptionIntermediateFuture combineSubscriptionFutures(IInternalAccess ia, ISubscriptionIntermediateFuture f1, ISubscriptionIntermediateFuture f2, IResultCommand cmd) + public static ISubscriptionIntermediateFuture combineSubscriptionFutures(IInternalAccess ia, ISubscriptionIntermediateFuture f1, ISubscriptionIntermediateFuture f2, IResultCommand cmd, ICommand termcom) { final SlidingCuckooFilter scf = new SlidingCuckooFilter(); @@ -456,9 +457,12 @@ protected Object removeValue(Object val) public void handleTerminated(Exception reason) { // TODO: multi delegation future with multiple sources but one target? - if(f2!=null) + if(f2!=null && !f2.isDone()) f2.terminate(reason); + if(termcom!=null) + termcom.execute(reason); + super.handleTerminated(reason); } }); @@ -471,7 +475,8 @@ public void handleTerminated(Exception reason) f2.next(result-> { ((IntermediateFuture)ret).addIntermediateResult((T)result); - }).catchEx(exception -> {}); // Ignore exception (printed when no listener supplied) + }) + .catchEx(exception -> {}); // Ignore exception (printed when no listener supplied) } return ret; diff --git a/platform/webservice/src/main/java/jadex/extension/rs/publish/AbstractRestPublishService.java b/platform/webservice/src/main/java/jadex/extension/rs/publish/AbstractRestPublishService.java index 3a79063d29..7722af569d 100644 --- a/platform/webservice/src/main/java/jadex/extension/rs/publish/AbstractRestPublishService.java +++ b/platform/webservice/src/main/java/jadex/extension/rs/publish/AbstractRestPublishService.java @@ -51,6 +51,7 @@ import jadex.bridge.service.types.serialization.ISerializationServices; import jadex.commons.SReflect; import jadex.commons.SUtil; +import jadex.commons.TimeoutException; import jadex.commons.Tuple2; import jadex.commons.collection.LeaseTimeMap; import jadex.commons.collection.MultiCollection; @@ -130,6 +131,9 @@ public abstract class AbstractRestPublishService implements IWebPublishService /** Http header for the client side timeout of calls (req). */ public static final String HEADER_JADEX_CLIENTTIMEOUT = "x-jadex-clienttimeout"; + /** Http header for the client side to indicate that conversation is still alive/ongoing. */ + public static final String HEADER_JADEX_ALIVE = "x-jadex-alive"; + /** Http header to terminate the call (req). */ public static final String HEADER_JADEX_TERMINATE = "x-jadex-terminate"; @@ -483,6 +487,8 @@ public IFuture execute(IInternalAccess ia) return; } + pruneObsoleteConversations(); + // https://stackoverflow.com/questions/14139753/httpservletrequest-getsessiontrue-thread-safe // solution: always create on container thread and remember //final Map session = getSession(request, true); @@ -496,6 +502,8 @@ public IFuture execute(IInternalAccess ia) String callid = request.getHeader(HEADER_JADEX_CALLID); ri.setCallid(callid); + String alive = request.getHeader(HEADER_JADEX_ALIVE); + // check if it is a login request String platformsecret = request.getHeader(HEADER_JADEX_LOGIN); String logout = request.getHeader(HEADER_JADEX_LOGOUT); @@ -535,6 +543,29 @@ else if(isloggedin!=null) //writeResponse(ret, Response.Status.OK.getStatusCode(), callid, null, request, response, true, null); writeResponse(ri.setResult(ret).setStatus(Response.Status.OK.getStatusCode()).setFinished(true)); } + else if(alive!=null) + { + ConversationInfo cinfo = conversationinfos.get(callid); + if(cinfo!=null) + { + boolean aliveb = Boolean.parseBoolean(alive); + if(aliveb) + { + System.out.println("timestamp updated: "+callid); + cinfo.updateTimestamp(); + } + else + { + terminateConversation(cinfo, new RuntimeException("Terminated from client")); + } + writeResponse(ri.setStatus(Response.Status.OK.getStatusCode()).setFinished(true)); + } + else + { + System.out.println("callid not found for alive: "+callid); + writeResponse(ri.setStatus(Response.Status.NOT_FOUND.getStatusCode()).setFinished(true)); + } + } else { // check if call is an intermediate result fetch @@ -560,12 +591,14 @@ else if(isloggedin!=null) //IAsyncContextInfo ctx = getAsyncContextInfo(request); //saveRequestContext(callid, ctx); - if(!"true".equals(terminate)) + terminateConversation(rinfo, null); + + /*if(!"true".equals(terminate)) ((ITerminableFuture)rinfo.getFuture()).terminate(new RuntimeException(terminate)); else ((ITerminableFuture)rinfo.getFuture()).terminate(); - conversationinfos.remove(callid); + conversationinfos.remove(callid);*/ //if(callid.indexOf("subscribeToPlatforms")!=-1) // System.out.println("Removed connection: "+callid); @@ -582,7 +615,7 @@ else if(terminate!=null) System.out.println("Future cannot be terminated: "+callid+" "+request); writeResponse(ri.setStatus(Response.Status.NOT_FOUND.getStatusCode()).setFinished(true)); } - + // Result already available? /*else if(rinfo.checkForResult()) { @@ -716,7 +749,8 @@ else if(mis!=null && mis.size()>0) // System.out.println("added context: "+fcallid+""+ctx); ((IIntermediateFuture)ret) - .addResultListener(component.getFeature(IExecutionFeature.class).createResultListener(new IIntermediateFutureCommandResultListener() + .addResultListener(component.getFeature(IExecutionFeature.class) + .createResultListener(new IIntermediateFutureCommandResultListener() { public void resultAvailable(Collection result) { @@ -770,8 +804,9 @@ protected void handleResult(Object result, Throwable exception, Object command, //if(max!=null) //if(command==null) // System.out.println("handleResult:"+result+", "+exception+", "+command+", sse:"+(session.getAttribute("sse")!=null)); - if(command!=null) - return; // skipping commands (e.g. updatetimer) + + //if(command!=null) + // return; // skipping commands (e.g. updatetimer) /*if(exception instanceof FutureTerminatedException) { @@ -795,6 +830,13 @@ else if(exception!=null) ri.setStatus(rescode); } + else if(command!=null) + { + // Command is just a enum type for updatetimer in IForwardCommandFuture + //System.out.println("received command: "+command); + ri.setResult(command); + ri.setStatus(202); + } else { ri.setResult(result); @@ -1042,6 +1084,62 @@ public void exceptionOccurred(Exception exception) } } + /** + * Prune the timeouted conversations. + */ + protected void pruneObsoleteConversations() + { + for(Map.Entry entry: conversationinfos.entrySet()) + { + // TODO: which timeout? (client vs server). + if(System.currentTimeMillis() - entry.getValue().getTimestamp() > Starter.getDefaultTimeout(component.getId())) + { + // System.out.println("terminating due to timeout: "+exception); + //System.out.println("Conversation timed out: "+entry.getKey()); + + terminateConversation(entry.getValue(), null); + + /*entry.getValue().setTerminated(true); + if(entry.getValue().getFuture() instanceof ITerminableFuture) + { + System.out.println("Conversation timed out, terminated future: "+entry.getKey()); + ((ITerminableFuture)entry.getValue().getFuture()).terminate(new TimeoutException()); + } + else + { + // TODO: better handling of + // non-terminable futures? + //throw new TimeoutException(); + System.out.println("WARNING: Conversation timed out and future cannot be terminated: "+entry.getKey()); + }*/ + } + } + } + + /** + * + * @param cinfo + * @param ex + */ + protected void terminateConversation(ConversationInfo cinfo, Exception ex) + { + // Terminate the future if requested + cinfo.setTerminated(true); + if(cinfo.getFuture() instanceof ITerminableFuture) + { + if(ex!=null) + ((ITerminableFuture)cinfo.getFuture()).terminate(ex); + else + ((ITerminableFuture)cinfo.getFuture()).terminate(); + + conversationinfos.remove(cinfo); + } + else + { + System.out.println("WARNING: future cannot be terminated: "+cinfo); + } + } + /** * Set the cors header in the response. * @param response The response. @@ -1055,7 +1153,6 @@ protected void setCORSHeader(HttpServletResponse response) response.addHeader("Access-Control-Allow-Headers", "Content-Type, Depth, User-Agent, X-File-Size, X-Requested-With, If-Modified-Since, X-File-Name, Cache-Control"); } - /** * Set the cache header in the response. * @param response The response. @@ -2145,6 +2242,9 @@ protected SSEEvent createSSEEvent(ResponseInfo ri) */ protected SSEEvent createSSEEvent(Object result, boolean finished, String callid, Integer max, String exceptiontype) { + if(result!=null && result.toString().indexOf("Mes")!=-1) + System.out.println("here"); + SSEEvent event = new SSEEvent(); // Wrap content in SSE event class to add Jadex meta info event.setData(result).setFinished(finished).setCallId(callid).setMax(max).setExecptionType(exceptiontype); @@ -3230,7 +3330,7 @@ public static class ConversationInfo // to check time gap between last request from browser and current result // if gap>timeout -> abort future as probably no browser listening any more - //protected long lastcheck; + protected long lastcheck; protected IFuture future; @@ -3246,7 +3346,7 @@ public ConversationInfo(String sessionid) this.sessionid = sessionid; //this.mappingInfo = mappingInfo; //this.future = future; - //this.lastcheck = System.currentTimeMillis(); + this.lastcheck = updateTimestamp(); } /** @@ -3337,12 +3437,20 @@ public Object getResults() }*/ /** - * Get the timestamp of the last check (i.e. last request from browser). - * / + * Renew the timestamp. + */ + public long updateTimestamp() + { + return lastcheck = System.currentTimeMillis(); + } + + /** + * Get the timestamp of the last check (i.e. last request from browser). + */ public long getTimestamp() { return lastcheck; - }*/ + } /** * Get the future. diff --git a/platform/webservice/src/main/java/jadex/extension/rs/publish/jadex.js b/platform/webservice/src/main/java/jadex/extension/rs/publish/jadex.js index e0ce77bc52..849cf77209 100644 --- a/platform/webservice/src/main/java/jadex/extension/rs/publish/jadex.js +++ b/platform/webservice/src/main/java/jadex/extension/rs/publish/jadex.js @@ -15,6 +15,7 @@ { var Jadex = { + baseurl: 'webjcc', source: null, conversations: {}, @@ -33,7 +34,7 @@ console.log("created Jadex cookie: "+id); } - this.source = new EventSource('webjcc'); + this.source = new EventSource(this.baseurl); this.source.addEventListener('open', function(e) { //console.log('con established'); @@ -76,35 +77,73 @@ //console.log("message received: "+JSON.stringify(event.data)); var self = this; - // [callback, errhandler] - var cb = self.conversations[event.lastEventId]; - if(cb!=null) - { - var event = JSON.parse(event.data); + var sseevent = event.data!=null? JSON.parse(event.data): null; + var callid = sseevent.callId; - if(event?.data?.stackTrace!=null) - cb[1](event); - else if(event?.max!=null) - cb[2](event.max); + // check if updatetimer command was received + // send alive when callid is still used, otherwise ignore + if("updatetimer"===sseevent?.data?.value?.toLowerCase()) + { + var cinfo = self.conversations[callid]; + if(cinfo==null) + { + console.log("updatetimer, conversation not found: "+callid); + + // todo: which path??? + axios.get(self.baseurl, {headers: {'x-jadex-callid': callid, 'x-jadex-terminate': "true", + 'cache-control': 'no-cache, no-store'}}, this.transform) + .then(x => + { + console.log("terminate success: "+callid); + }).catch(err => + { + console.log("terminate err: "+callid+" "+err); + }); + } else - cb[0](event); + { + //console.log("terminating request sent: "+path); + axios.get(cinfo[3], {headers: {'x-jadex-callid': callid, 'x-jadex-alive': "true", + 'cache-control': 'no-cache, no-store'}}, this.transform) + .then(x => + { + console.log("alive success: "+callid); + }).catch(err => + { + console.log("alive err: "+callid+" "+err); + }); + } } else { - // done! todo: refactor id handling and create id on client - // problem: order of http and sse answer is undertermined - // but currently http answer contains conversation id and is needed before sse event - // that must use the id to lookup the handlers - - /*if(cnt<3) + // [callback, errhandler] + var cb = self.conversations[event.lastEventId]; + if(cb!=null) { - console.log("retry event: "+JSON.stringify(event)+" "+cnt); - setTimeout(() => self.processEvent(event, ++cnt), 1000); + if(sseevent?.data?.stackTrace!=null) + cb[1](sseevent); + else if(sseevent?.max!=null) + cb[2](sseevent.max); + else + cb[0](sseevent); } else - {*/ - console.log("cannot handle event: "+JSON.stringify(event)+" "+cnt); - //} + { + // done! todo: refactor id handling and create id on client + // problem: order of http and sse answer is undertermined + // but currently http answer contains conversation id and is needed before sse event + // that must use the id to lookup the handlers + + /*if(cnt<3) + { + console.log("retry event: "+JSON.stringify(event)+" "+cnt); + setTimeout(() => self.processEvent(event, ++cnt), 1000); + } + else + {*/ + console.log("cannot handle event: "+JSON.stringify(event)+" "+cnt); + //} + } } }, @@ -188,13 +227,40 @@ if(!sse) { - if(resp.status!=202) // ignore updatetimer commands + if(resp.status!=202) { if(max!=null) maxhandler(max); else handler(resp); } + else // updatetimer (and other) commands + { + if(resp.data.toLowerCase()==="updatetimer") + { + if(self.conversations[callid]==null) + { + console.log("ignoring updatetimer, conversation not found: "+callid); + } + else + { + //console.log("terminating request sent: "+path); + axios.get(path, {headers: {'x-jadex-callid': callid, 'x-jadex-alive': "alive", + 'cache-control': 'no-cache, no-store'}}, this.transform) + .then(x => + { + console.log("alive success: "+callid); + }).catch(err => + { + console.log("alive err: "+callid+" "+err); + }); + } + } + else + { + console.log("received unknown command: "+resp.data); + } + } } //call = axios.CancelToken.source(); @@ -329,11 +395,12 @@ createProxy: function(cid, servicetype) { + var self = this; let ret = new Proxy({cid: cid, type:servicetype, transform:self.transform}, { get: function(service, prop) { - let callstrprefix = 'webjcc/invokeServiceMethod?cid='+service.cid+'&servicetype='+service.type+'&methodname='+prop; + let callstrprefix = self.baseurl+'/invokeServiceMethod?cid='+service.cid+'&servicetype='+service.type+'&methodname='+prop; return function(...args) { let callstr = callstrprefix; diff --git a/tools/runtimetools-web/src/main/java/jadex/tools/web/jcc/JCCWebAgent.java b/tools/runtimetools-web/src/main/java/jadex/tools/web/jcc/JCCWebAgent.java index 2df5383942..29449b15a9 100644 --- a/tools/runtimetools-web/src/main/java/jadex/tools/web/jcc/JCCWebAgent.java +++ b/tools/runtimetools-web/src/main/java/jadex/tools/web/jcc/JCCWebAgent.java @@ -30,6 +30,7 @@ import jadex.bridge.service.types.publish.IWebPublishService; import jadex.bridge.service.types.security.ISecurityService; import jadex.commons.Boolean3; +import jadex.commons.ICommand; import jadex.commons.IResultCommand; import jadex.commons.MethodInfo; import jadex.commons.SUtil; @@ -169,13 +170,16 @@ public void customResultAvailable(Void result) throws Exception * Get events about known platforms. * @return Events for platforms. */ + protected int cnt = 0; public ISubscriptionIntermediateFuture> subscribeToPlatforms() { - //System.out.println("subscribeToPlatforms called"); + int fcnt = cnt++; + System.out.println("subscribeToPlatforms called: "+fcnt); ISubscriptionIntermediateFuture> net = agent.addQuery(new ServiceQuery<>(IExternalAccess.class, ServiceScope.NETWORK).setEventMode().setServiceTags(IExternalAccess.PLATFORM)); ISubscriptionIntermediateFuture> glo = agent.addQuery(new ServiceQuery<>(IExternalAccess.class, ServiceScope.GLOBAL).setEventMode().setServiceTags(IExternalAccess.PLATFORM)); + // creates no timeout future so that subscription will never be closed! ISubscriptionIntermediateFuture> ret = SFuture.combineSubscriptionFutures(agent, net, glo, new IResultCommand, ServiceEvent>() { @Override @@ -185,6 +189,13 @@ public ServiceEvent execute(ServiceEvent //System.out.println("subscribeToPlatforms: "+se); return se; } + }, new ICommand() + { + @Override + public void execute(Exception ex) + { + System.out.println("terminated subscription: "+fcnt); + } }); return ret; diff --git a/tools/runtimetools-web/src/main/java/jadex/tools/web/registryview/registryview.js b/tools/runtimetools-web/src/main/java/jadex/tools/web/registryview/registryview.js index 7ca8c65203..da0d2fac2a 100644 --- a/tools/runtimetools-web/src/main/java/jadex/tools/web/registryview/registryview.js +++ b/tools/runtimetools-web/src/main/java/jadex/tools/web/registryview/registryview.js @@ -231,6 +231,37 @@ class RegistryViewElement extends CidElement } } + getHeaderServiceTexts() + { + return [ + this.app.lang.t('Service Type'), + this.app.lang.t('Provided By'), + this.app.lang.t('Publication Scope'), + this.app.lang.t('Networks'), + this.app.lang.t('Security') + ]; + } + + getHeaderPlatformTexts() + { + return [ + this.app.lang.t('Platform'), + this.app.lang.t('Connected'), + this.app.lang.t('Protocol') + ]; + } + + getHeaderQueryTexts() + { + return [ + this.app.lang.t('Service Type'), + this.app.lang.t('Query Owner'), + this.app.lang.t('Search Scope'), + this.app.lang.t('Networks'), + this.app.lang.t('Tags') + ]; + } + createServiceTableDesc(x) { return [ @@ -257,7 +288,8 @@ class RegistryViewElement extends CidElement x.serviceType!=null? x.serviceType.value: '', this.beautifyCid(x.owner.name), x.scope.value, - x.networkNames!=null? x.networkNames.join(): '' + x.networkNames!=null? x.networkNames.join(): '', + x.serviceTags!=null? x.serviceTags.join(): '' ]; } @@ -439,36 +471,6 @@ class RegistryViewElement extends CidElement } } - getHeaderServiceTexts() - { - return [ - this.app.lang.t('Service Type'), - this.app.lang.t('Provided By'), - this.app.lang.t('Publication Scope'), - this.app.lang.t('Networks'), - this.app.lang.t('Security') - ]; - } - - getHeaderPlatformTexts() - { - return [ - this.app.lang.t('Platform'), - this.app.lang.t('Connected'), - this.app.lang.t('Protocol') - ]; - } - - getHeaderQueryTexts() - { - return [ - this.app.lang.t('Service Type'), - this.app.lang.t('Query Owner'), - this.app.lang.t('Search Scope'), - this.app.lang.t('Networks') - ]; - } - /*getFilteredElements() { var ret = this.getSubscription("Services").elements.filter(x => this.global? x.scope.toLowerCase()==="global": true) diff --git a/util/concurrent/src/main/java/jadex/commons/future/IBackwardCommandFuture.java b/util/concurrent/src/main/java/jadex/commons/future/IBackwardCommandFuture.java index de6cc93835..19ace9bc9a 100644 --- a/util/concurrent/src/main/java/jadex/commons/future/IBackwardCommandFuture.java +++ b/util/concurrent/src/main/java/jadex/commons/future/IBackwardCommandFuture.java @@ -1,7 +1,7 @@ package jadex.commons.future; /** - * + * Send a backward command in direction of the source, i.e. the original future emitting value. */ public interface IBackwardCommandFuture { diff --git a/util/concurrent/src/main/java/jadex/commons/future/IForwardCommandFuture.java b/util/concurrent/src/main/java/jadex/commons/future/IForwardCommandFuture.java index 3dbad43940..ee770b50c6 100644 --- a/util/concurrent/src/main/java/jadex/commons/future/IForwardCommandFuture.java +++ b/util/concurrent/src/main/java/jadex/commons/future/IForwardCommandFuture.java @@ -1,7 +1,7 @@ package jadex.commons.future; /** - * + * Send a command from source to listeners (same flow as results). */ public interface IForwardCommandFuture {