Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
lb-actoron committed Mar 23, 2022
1 parent b794d12 commit 1c97553
Show file tree
Hide file tree
Showing 7 changed files with 267 additions and 74 deletions.
13 changes: 9 additions & 4 deletions platform/bridge/src/main/java/jadex/bridge/SFuture.java
Expand Up @@ -9,6 +9,7 @@
import jadex.bridge.service.component.interceptors.FutureFunctionality;
import jadex.bridge.service.search.ServiceEvent;
import jadex.bridge.service.types.registry.SlidingCuckooFilter;
import jadex.commons.ICommand;
import jadex.commons.IResultCommand;
import jadex.commons.SUtil;
import jadex.commons.future.Future;
Expand Down Expand Up @@ -380,7 +381,7 @@ public static <T> T getFirstResultAndTerminate(ITerminableIntermediateFuture<T>
*/
public static <T> ISubscriptionIntermediateFuture<T> combineSubscriptionFutures(IInternalAccess ia, ISubscriptionIntermediateFuture<T> f1, ISubscriptionIntermediateFuture<T> f2)
{
return combineSubscriptionFutures(ia, f1, f2, null);
return combineSubscriptionFutures(ia, f1, f2, null, null);
}

/**
Expand All @@ -390,7 +391,7 @@ public static <T> ISubscriptionIntermediateFuture<T> combineSubscriptionFutures(
* @param f2 Future 2.
* @return A future combining results of f1 and f2.
*/
public static <T, E> ISubscriptionIntermediateFuture<T> combineSubscriptionFutures(IInternalAccess ia, ISubscriptionIntermediateFuture<E> f1, ISubscriptionIntermediateFuture<E> f2, IResultCommand<T, E> cmd)
public static <T, E> ISubscriptionIntermediateFuture<T> combineSubscriptionFutures(IInternalAccess ia, ISubscriptionIntermediateFuture<E> f1, ISubscriptionIntermediateFuture<E> f2, IResultCommand<T, E> cmd, ICommand<Exception> termcom)
{
final SlidingCuckooFilter scf = new SlidingCuckooFilter();

Expand Down Expand Up @@ -456,9 +457,12 @@ protected Object removeValue(Object val)
public void handleTerminated(Exception reason)
{
// TODO: multi delegation future with multiple sources but one target?
if(f2!=null)
if(f2!=null && !f2.isDone())
f2.terminate(reason);

if(termcom!=null)
termcom.execute(reason);

super.handleTerminated(reason);
}
});
Expand All @@ -471,7 +475,8 @@ public void handleTerminated(Exception reason)
f2.next(result->
{
((IntermediateFuture)ret).addIntermediateResult((T)result);
}).catchEx(exception -> {}); // Ignore exception (printed when no listener supplied)
})
.catchEx(exception -> {}); // Ignore exception (printed when no listener supplied)
}

return ret;
Expand Down
Expand Up @@ -51,6 +51,7 @@
import jadex.bridge.service.types.serialization.ISerializationServices;
import jadex.commons.SReflect;
import jadex.commons.SUtil;
import jadex.commons.TimeoutException;
import jadex.commons.Tuple2;
import jadex.commons.collection.LeaseTimeMap;
import jadex.commons.collection.MultiCollection;
Expand Down Expand Up @@ -130,6 +131,9 @@ public abstract class AbstractRestPublishService implements IWebPublishService
/** Http header for the client side timeout of calls (req). */
public static final String HEADER_JADEX_CLIENTTIMEOUT = "x-jadex-clienttimeout";

/** Http header for the client side to indicate that conversation is still alive/ongoing. */
public static final String HEADER_JADEX_ALIVE = "x-jadex-alive";

/** Http header to terminate the call (req). */
public static final String HEADER_JADEX_TERMINATE = "x-jadex-terminate";

Expand Down Expand Up @@ -483,6 +487,8 @@ public IFuture<Void> execute(IInternalAccess ia)
return;
}

pruneObsoleteConversations();

// https://stackoverflow.com/questions/14139753/httpservletrequest-getsessiontrue-thread-safe
// solution: always create on container thread and remember
//final Map<String, Object> session = getSession(request, true);
Expand All @@ -496,6 +502,8 @@ public IFuture<Void> execute(IInternalAccess ia)
String callid = request.getHeader(HEADER_JADEX_CALLID);
ri.setCallid(callid);

String alive = request.getHeader(HEADER_JADEX_ALIVE);

// check if it is a login request
String platformsecret = request.getHeader(HEADER_JADEX_LOGIN);
String logout = request.getHeader(HEADER_JADEX_LOGOUT);
Expand Down Expand Up @@ -535,6 +543,29 @@ else if(isloggedin!=null)
//writeResponse(ret, Response.Status.OK.getStatusCode(), callid, null, request, response, true, null);
writeResponse(ri.setResult(ret).setStatus(Response.Status.OK.getStatusCode()).setFinished(true));
}
else if(alive!=null)
{
ConversationInfo cinfo = conversationinfos.get(callid);
if(cinfo!=null)
{
boolean aliveb = Boolean.parseBoolean(alive);
if(aliveb)
{
System.out.println("timestamp updated: "+callid);
cinfo.updateTimestamp();
}
else
{
terminateConversation(cinfo, new RuntimeException("Terminated from client"));
}
writeResponse(ri.setStatus(Response.Status.OK.getStatusCode()).setFinished(true));
}
else
{
System.out.println("callid not found for alive: "+callid);
writeResponse(ri.setStatus(Response.Status.NOT_FOUND.getStatusCode()).setFinished(true));
}
}
else
{
// check if call is an intermediate result fetch
Expand All @@ -560,12 +591,14 @@ else if(isloggedin!=null)
//IAsyncContextInfo ctx = getAsyncContextInfo(request);
//saveRequestContext(callid, ctx);

if(!"true".equals(terminate))
terminateConversation(rinfo, null);

/*if(!"true".equals(terminate))
((ITerminableFuture)rinfo.getFuture()).terminate(new RuntimeException(terminate));
else
((ITerminableFuture)rinfo.getFuture()).terminate();
conversationinfos.remove(callid);
conversationinfos.remove(callid);*/

//if(callid.indexOf("subscribeToPlatforms")!=-1)
// System.out.println("Removed connection: "+callid);
Expand All @@ -582,7 +615,7 @@ else if(terminate!=null)
System.out.println("Future cannot be terminated: "+callid+" "+request);
writeResponse(ri.setStatus(Response.Status.NOT_FOUND.getStatusCode()).setFinished(true));
}

// Result already available?
/*else if(rinfo.checkForResult())
{
Expand Down Expand Up @@ -716,7 +749,8 @@ else if(mis!=null && mis.size()>0)
// System.out.println("added context: "+fcallid+""+ctx);

((IIntermediateFuture<Object>)ret)
.addResultListener(component.getFeature(IExecutionFeature.class).createResultListener(new IIntermediateFutureCommandResultListener<Object>()
.addResultListener(component.getFeature(IExecutionFeature.class)
.createResultListener(new IIntermediateFutureCommandResultListener<Object>()
{
public void resultAvailable(Collection<Object> result)
{
Expand Down Expand Up @@ -770,8 +804,9 @@ protected void handleResult(Object result, Throwable exception, Object command,
//if(max!=null)
//if(command==null)
// System.out.println("handleResult:"+result+", "+exception+", "+command+", sse:"+(session.getAttribute("sse")!=null));
if(command!=null)
return; // skipping commands (e.g. updatetimer)

//if(command!=null)
// return; // skipping commands (e.g. updatetimer)

/*if(exception instanceof FutureTerminatedException)
{
Expand All @@ -795,6 +830,13 @@ else if(exception!=null)

ri.setStatus(rescode);
}
else if(command!=null)
{
// Command is just a enum type for updatetimer in IForwardCommandFuture
//System.out.println("received command: "+command);
ri.setResult(command);
ri.setStatus(202);
}
else
{
ri.setResult(result);
Expand Down Expand Up @@ -1042,6 +1084,62 @@ public void exceptionOccurred(Exception exception)
}
}

/**
* Prune the timeouted conversations.
*/
protected void pruneObsoleteConversations()
{
for(Map.Entry<String, ConversationInfo> entry: conversationinfos.entrySet())
{
// TODO: which timeout? (client vs server).
if(System.currentTimeMillis() - entry.getValue().getTimestamp() > Starter.getDefaultTimeout(component.getId()))
{
// System.out.println("terminating due to timeout: "+exception);
//System.out.println("Conversation timed out: "+entry.getKey());

terminateConversation(entry.getValue(), null);

/*entry.getValue().setTerminated(true);
if(entry.getValue().getFuture() instanceof ITerminableFuture<?>)
{
System.out.println("Conversation timed out, terminated future: "+entry.getKey());
((ITerminableFuture<?>)entry.getValue().getFuture()).terminate(new TimeoutException());
}
else
{
// TODO: better handling of
// non-terminable futures?
//throw new TimeoutException();
System.out.println("WARNING: Conversation timed out and future cannot be terminated: "+entry.getKey());
}*/
}
}
}

/**
*
* @param cinfo
* @param ex
*/
protected void terminateConversation(ConversationInfo cinfo, Exception ex)
{
// Terminate the future if requested
cinfo.setTerminated(true);
if(cinfo.getFuture() instanceof ITerminableFuture)
{
if(ex!=null)
((ITerminableFuture)cinfo.getFuture()).terminate(ex);
else
((ITerminableFuture)cinfo.getFuture()).terminate();

conversationinfos.remove(cinfo);
}
else
{
System.out.println("WARNING: future cannot be terminated: "+cinfo);
}
}

/**
* Set the cors header in the response.
* @param response The response.
Expand All @@ -1055,7 +1153,6 @@ protected void setCORSHeader(HttpServletResponse response)
response.addHeader("Access-Control-Allow-Headers", "Content-Type, Depth, User-Agent, X-File-Size, X-Requested-With, If-Modified-Since, X-File-Name, Cache-Control");
}


/**
* Set the cache header in the response.
* @param response The response.
Expand Down Expand Up @@ -2145,6 +2242,9 @@ protected SSEEvent createSSEEvent(ResponseInfo ri)
*/
protected SSEEvent createSSEEvent(Object result, boolean finished, String callid, Integer max, String exceptiontype)
{
if(result!=null && result.toString().indexOf("Mes")!=-1)
System.out.println("here");

SSEEvent event = new SSEEvent();
// Wrap content in SSE event class to add Jadex meta info
event.setData(result).setFinished(finished).setCallId(callid).setMax(max).setExecptionType(exceptiontype);
Expand Down Expand Up @@ -3230,7 +3330,7 @@ public static class ConversationInfo

// to check time gap between last request from browser and current result
// if gap>timeout -> abort future as probably no browser listening any more
//protected long lastcheck;
protected long lastcheck;

protected IFuture<?> future;

Expand All @@ -3246,7 +3346,7 @@ public ConversationInfo(String sessionid)
this.sessionid = sessionid;
//this.mappingInfo = mappingInfo;
//this.future = future;
//this.lastcheck = System.currentTimeMillis();
this.lastcheck = updateTimestamp();
}

/**
Expand Down Expand Up @@ -3337,12 +3437,20 @@ public Object getResults()
}*/

/**
* Get the timestamp of the last check (i.e. last request from browser).
* /
* Renew the timestamp.
*/
public long updateTimestamp()
{
return lastcheck = System.currentTimeMillis();
}

/**
* Get the timestamp of the last check (i.e. last request from browser).
*/
public long getTimestamp()
{
return lastcheck;
}*/
}

/**
* Get the future.
Expand Down

0 comments on commit 1c97553

Please sign in to comment.