Skip to content

Commit

Permalink
fix status query subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
ap-actoron committed Jun 3, 2021
1 parent e6e9eb1 commit f030ad6
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.util.Set;
import java.util.logging.Level;

import jadex.base.Starter;
import jadex.bridge.IComponentIdentifier;
import jadex.bridge.IComponentStep;
import jadex.bridge.IInternalAccess;
Expand All @@ -22,6 +21,7 @@
import jadex.bridge.service.ServiceScope;
import jadex.bridge.service.annotation.Service;
import jadex.bridge.service.search.IServiceRegistry;
import jadex.bridge.service.search.QueryEvent;
import jadex.bridge.service.search.ServiceEvent;
import jadex.bridge.service.search.ServiceNotFoundException;
import jadex.bridge.service.search.ServiceQuery;
Expand All @@ -38,6 +38,7 @@
import jadex.commons.future.IFuture;
import jadex.commons.future.ISubscriptionIntermediateFuture;
import jadex.commons.future.IntermediateEmptyResultListener;
import jadex.commons.future.SubscriptionIntermediateDelegationFuture;
import jadex.commons.future.SubscriptionIntermediateFuture;
import jadex.commons.future.TerminableIntermediateFuture;
import jadex.commons.future.TerminationCommand;
Expand Down Expand Up @@ -496,4 +497,18 @@ public void terminated(Exception reason)
SFuture.avoidCallTimeouts(reglis, agent);
return reglis;
}

/**
* Get registered queries.
* @return A stream of events for added/removed queries.
*/
public ISubscriptionIntermediateFuture<QueryEvent> subscribeToQueries()
{
System.out.println(agent+" subscribeToQueries");

ISubscriptionIntermediateFuture<QueryEvent> fut = serviceregistry.subscribeToQueries();
SubscriptionIntermediateDelegationFuture<QueryEvent> ret = new SubscriptionIntermediateDelegationFuture<QueryEvent>(fut);
SFuture.avoidCallTimeouts(ret, agent);
return ret;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import jadex.bridge.service.IService;
import jadex.bridge.service.IServiceIdentifier;
import jadex.bridge.service.ServiceScope;
import jadex.bridge.service.types.factory.IComponentFactory;
import jadex.commons.SUtil;
import jadex.commons.Tuple2;
import jadex.commons.future.Future;
Expand Down Expand Up @@ -57,7 +56,7 @@ public class ServiceRegistry implements IServiceRegistry // extends AbstractServ
protected Map<IServiceIdentifier, IService> localserviceproxies;

/** The query change subscribers. */
protected List<ISubscriptionIntermediateFuture<QueryEvent>> querysubs;
protected List<SubscriptionIntermediateFuture<QueryEvent>> querysubs;

//-------- methods --------

Expand Down Expand Up @@ -1075,7 +1074,16 @@ public ISubscriptionIntermediateFuture<QueryEvent> subscribeToQueries()
{
public void terminated(Exception reason)
{
querysubs.remove(fut);
rwlock.writeLock().lock();
try
{
querysubs.remove(fut);

}
finally
{
rwlock.writeLock().unlock();
}
}
});

Expand Down Expand Up @@ -1112,7 +1120,7 @@ protected void notifyQueryListeners(QueryEvent event)
try
{
qis = new ArrayList<>();
querysubs.addAll(qis);
qis.addAll(querysubs);
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import jadex.bridge.IComponentIdentifier;
import jadex.bridge.service.annotation.Service;
import jadex.bridge.service.search.QueryEvent;
import jadex.commons.future.ISubscriptionIntermediateFuture;

/**
Expand All @@ -14,4 +15,10 @@ public interface ISuperpeerStatusService
* Get the clients that are currently registered to super peer.
*/
public ISubscriptionIntermediateFuture<IComponentIdentifier> getRegisteredClients();

/**
* Get registered queries.
* @return A stream of events for added/removed queries.
*/
public ISubscriptionIntermediateFuture<QueryEvent> subscribeToQueries();
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import jadex.bridge.service.annotation.Service;
import jadex.bridge.service.search.QueryEvent;
import jadex.bridge.service.search.ServiceEvent;
import jadex.bridge.service.search.ServiceQuery;
import jadex.bridge.service.types.transport.PlatformData;
import jadex.commons.future.IFuture;
import jadex.commons.future.ISubscriptionIntermediateFuture;
Expand All @@ -32,10 +31,11 @@ public interface IStatusService
* Get registered queries of a given (set of) scope(s) or no scope for all queries.
* @return A list of queries.
*/
// No intermediate for easier REST?
// TODO: subscription in registry to get notified about new queries? -> please no polling!
public IFuture<Collection<ServiceQuery<?>>> getQueries(String... scope);
public ISubscriptionIntermediateFuture<QueryEvent> subscribeToQueries(String... scope);

// // No intermediate for easier REST?
// // TODO: subscription in registry to get notified about new queries? -> please no polling!
// public IFuture<Collection<ServiceQuery<?>>> getQueries(String... scope);

// /**
// * Get provided services of a given (set of) scope(s) or no scope for all services.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,19 @@
import jadex.base.Starter;
import jadex.bridge.IInternalAccess;
import jadex.bridge.SFuture;
import jadex.bridge.VersionInfo;
import jadex.bridge.service.IService;
import jadex.bridge.service.IServiceIdentifier;
import jadex.bridge.service.PublishInfo;
import jadex.bridge.service.ServiceScope;
import jadex.bridge.service.component.IRequiredServicesFeature;
import jadex.bridge.service.search.IServiceRegistry;
import jadex.bridge.service.search.QueryEvent;
import jadex.bridge.service.search.ServiceEvent;
import jadex.bridge.service.search.ServiceQuery;
import jadex.bridge.service.search.ServiceQueryInfo;
import jadex.bridge.service.search.ServiceRegistry;
import jadex.bridge.service.types.memstat.IMemstatService;
import jadex.bridge.service.types.publish.IPublishService;
import jadex.bridge.service.types.publish.IWebPublishService;
import jadex.bridge.service.types.registry.ISuperpeerService;
import jadex.bridge.service.types.registry.ISuperpeerStatusService;
import jadex.bridge.service.types.transport.ITransportInfoService;
import jadex.bridge.service.types.transport.PlatformData;
import jadex.commons.Boolean3;
Expand Down Expand Up @@ -132,8 +129,13 @@ public void resultAvailable(Void result)
*/
public ISubscriptionIntermediateFuture<QueryEvent> subscribeToQueries(String... scope)
{
// Delegate request to super peer registry, if any, or fail with SNFE otherwise
ISubscriptionIntermediateFuture<QueryEvent> fut = agent.getLocalService(
new ServiceQuery<ISuperpeerStatusService>(ISuperpeerStatusService.class).setScope(ServiceScope.PLATFORM))
.subscribeToQueries();

Set<String> scopes = scope==null ? null: new HashSet<String>(Arrays.asList(scope));
ISubscriptionIntermediateFuture<QueryEvent> ret = new SubscriptionIntermediateDelegationFuture<QueryEvent>(ServiceRegistry.getRegistry(agent.getId()).subscribeToQueries())
SubscriptionIntermediateDelegationFuture<QueryEvent> ret = new SubscriptionIntermediateDelegationFuture<QueryEvent>(fut)
{
@Override
public boolean doAddIntermediateResult(QueryEvent event, boolean undone)
Expand All @@ -149,6 +151,7 @@ public boolean doAddIntermediateResult(QueryEvent event, boolean undone)
return false;
}
};
SFuture.avoidCallTimeouts(ret, agent);
return ret;
}

Expand Down Expand Up @@ -218,28 +221,28 @@ public void resultAvailable(Collection<PlatformData> result)
return ret;
}

/**
* Get registered queries of a given (set of) scope(s) or no scope for all queries.
* @return A list of queries.
*/
// No intermediate for easier REST?
// TODO: subscription in registry to get notified about new queries? -> please no polling!
public IFuture<Collection<ServiceQuery<?>>> getQueries(String... scope)
{
Set<String> scopes = scope==null ? null: new HashSet<String>(Arrays.asList(scope));
IntermediateFuture<ServiceQuery<?>> ret = new IntermediateFuture<ServiceQuery<?>>();
IServiceRegistry reg = ServiceRegistry.getRegistry(agent.getId());
for(ServiceQueryInfo<?> sqi: reg.getAllQueries())
{
if(scopes==null || scopes.contains(sqi.getQuery().getScope().name().toLowerCase()))
{
ret.addIntermediateResult(sqi.getQuery());
}
}
ret.setFinished();

return ret;
}
// /**
// * Get registered queries of a given (set of) scope(s) or no scope for all queries.
// * @return A list of queries.
// */
// // No intermediate for easier REST?
// // TODO: subscription in registry to get notified about new queries? -> please no polling!
// public IFuture<Collection<ServiceQuery<?>>> getQueries(String... scope)
// {
// Set<String> scopes = scope==null ? null: new HashSet<String>(Arrays.asList(scope));
// IntermediateFuture<ServiceQuery<?>> ret = new IntermediateFuture<ServiceQuery<?>>();
// IServiceRegistry reg = ServiceRegistry.getRegistry(agent.getId());
// for(ServiceQueryInfo<?> sqi: reg.getAllQueries())
// {
// if(scopes==null || scopes.contains(sqi.getQuery().getScope().name().toLowerCase()))
// {
// ret.addIntermediateResult(sqi.getQuery());
// }
// }
// ret.setFinished();
//
// return ret;
// }

// /**
// * Get provided services of a given (set of) scope(s) or no scope for all services.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ app.controller('Queries', [ '$scope', '$http',
});
}
]);
app.controller('Queries', [ '$scope', '$http',
/*app.controller('Queries', [ '$scope', '$http',
function($scope, $http) {
$http.get('status/getQueries',
{params: {'scope': JSON.stringify(["global","network"])}, // Stringify otherwise angular adds multiple singlevalued parameter occurrences, grrr.
Expand All @@ -39,7 +39,7 @@ app.controller('Queries', [ '$scope', '$http',
$scope.queries = response.data;
});
}
]);
]);*/

/**
* Beautify cid representation for readability and sorting: platform (agent@platform).
Expand Down

0 comments on commit f030ad6

Please sign in to comment.