Skip to content
This repository has been archived by the owner on Oct 20, 2022. It is now read-only.

Commit

Permalink
broker: add option to disable updateContext forwarding
Browse files Browse the repository at this point in the history
  • Loading branch information
marcc-orange committed Oct 19, 2015
1 parent 1f93bd9 commit 1c4cfc4
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ public class Configuration {
@Value("${remote.authToken:}")
private String remoteAuthToken;

/**
* Disable forwarding updateContext requests to the remote broker
*
* All NGSI requests are forwarded by the Cepheus-Broker to the remote broker.
* Except:
* - subscribeContext/updateContextSubscription/unsubscribeContext (handled by the app)
* - updateContext (if
*/
@Value("${remote.forward.updateContext:true}")
private boolean remoteForwardUpdateContext = true;

public Configuration() {
}

Expand Down Expand Up @@ -91,6 +102,14 @@ public void setRemoteAuthToken(String remoteAuthToken) {
this.remoteAuthToken = remoteAuthToken;
}

public boolean isRemoteForwardUpdateContext() {
return remoteForwardUpdateContext;
}

public void setRemoteForwardUpdateContext(boolean remoteForwardUpdateContext) {
this.remoteForwardUpdateContext = remoteForwardUpdateContext;
}

/*
* Inject Orion-specific headers into the given HttpHeaders list
* @param httpHeaders
Expand All @@ -115,6 +134,7 @@ public String toString() {
", remoteServiceName='" + remoteServiceName + '\'' +
", remoteServicePath='" + remoteServicePath + '\'' +
", remoteAuthToken='" + remoteAuthToken + '\'' +
", remoteForwardUpdateContext=" + remoteForwardUpdateContext +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public UpdateContextResponse updateContext(final UpdateContext update) throws Ex

logger.debug("<= updateContext with entityId: {} and attributes: {} ", contextElement.getEntityId().toString(), attributesName);

/*
* If a registration matches the updateContext, the updateContext is forwarded to the corresponding providingURL.
* Else, the update is forwarded to the remote broker and the subscribers are notified.
*/

// Search registrations to forward updateContext
Iterator<URI> providingApplication = localRegistrations.findProvidingApplication(contextElement.getEntityId(), attributesName);
if (providingApplication.hasNext()) {
Expand All @@ -84,15 +89,17 @@ public UpdateContextResponse updateContext(final UpdateContext update) throws Ex
}

// Forward the update to the remote broker
final String brokerUrl = configuration.getRemoteUrl();
if (brokerUrl == null || brokerUrl.isEmpty()) {
logger.warn("No remote.url parameter defined to forward updateContext");
} else {
HttpHeaders httpHeaders = getRemoteBrokerHeaders(brokerUrl);
logger.debug("=> updateContext forwarded to remote broker {} with Content-Type {}", brokerUrl, httpHeaders.getContentType());
ngsiClient.updateContext(brokerUrl, httpHeaders, update).addCallback(
updateContextResponse -> logger.debug("UpdateContext completed for {} ", brokerUrl),
throwable -> logger.warn("UpdateContext failed for {}: {}", brokerUrl, throwable.toString()));
if (configuration.isRemoteForwardUpdateContext()) {
final String brokerUrl = configuration.getRemoteUrl();
if (brokerUrl == null || brokerUrl.isEmpty()) {
logger.warn("No remote.url parameter defined to forward updateContext");
} else {
HttpHeaders httpHeaders = getRemoteBrokerHeaders(brokerUrl);
logger.debug("=> updateContext forwarded to remote broker {} with Content-Type {}", brokerUrl, httpHeaders.getContentType());
ngsiClient.updateContext(brokerUrl, httpHeaders, update)
.addCallback(updateContextResponse -> logger.debug("UpdateContext completed for {} ", brokerUrl),
throwable -> logger.warn("UpdateContext failed for {}: {}", brokerUrl, throwable.toString()));
}
}

List<ContextElementResponse> contextElementResponseList = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ public void checkPropertiesValues() throws MissingRemoteBrokerException {
assertEquals("gateway", configuration.getRemoteServiceName());
assertEquals("gateway1", configuration.getRemoteServicePath());
assertEquals("XXXXXXXXX", configuration.getRemoteAuthToken());
assertEquals(true, configuration.isRemoteForwardUpdateContext());
}

@Test
public void getHeadersForBroker(){
configuration.setRemoteServiceName("SERVICE");
configuration.setRemoteServicePath("PATH");
configuration.setRemoteAuthToken("TOKEN");
configuration.setRemoteForwardUpdateContext(false);

HttpHeaders httpHeaders = new HttpHeaders();
configuration.addRemoteHeaders(httpHeaders);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public void setup() throws Exception {
when(configuration.getRemoteServiceName()).thenReturn(null);
when(configuration.getRemoteServicePath()).thenReturn(null);
when(configuration.getRemoteAuthToken()).thenReturn(null);
when(configuration.isRemoteForwardUpdateContext()).thenReturn(true);
when(updateContextResponseListenableFuture.get()).thenReturn(createUpdateContextResponseTempSensorAndPressure());
doNothing().when(updateContextResponseListenableFuture).addCallback(any(), any());
when(queryContextResponseListenableFuture.get()).thenReturn(createQueryContextResponseTemperature());
Expand Down Expand Up @@ -242,6 +243,7 @@ public void postUpdateContextWithoutProvidingApplication() throws Exception {
.andExpect(MockMvcResultMatchers.jsonPath("$.contextElementResponses[0].contextElement.attributes[1].value").value("1015"))
.andExpect(MockMvcResultMatchers.jsonPath("$.contextElementResponses[0].statusCode.code").value("200"));

verify(configuration, atLeastOnce()).isRemoteForwardUpdateContext();

//Capture attributes (Set<String> searchAttributes) when findProvidingApplication is called on localRegistrations Set<String> searchAttributes
verify(localRegistrations).findProvidingApplication(entityIdArgumentCaptor.capture(), attributeArgumentCaptor.capture());
Expand Down Expand Up @@ -280,6 +282,31 @@ public void postUpdateContextWithoutProvidingApplication() throws Exception {
verify(ngsiClient, never()).notifyContext(any(), any(), any());
}

@Test
public void checkRemoteBrokerNotCalledWhenForwardIsDisabled() throws Exception {
// Disable updateContext forwarding to remote broker
when(configuration.isRemoteForwardUpdateContext()).thenReturn(false);

//localRegistrations mock return always without providingApplication
when(providingApplication.hasNext()).thenReturn(false);
when(localRegistrations.findProvidingApplication(any(), any())).thenReturn(providingApplication);
//subscriptions mock return always without matched subscriptions
when(matchedSubscriptions.hasNext()).thenReturn(false);
when(subscriptions.findSubscriptions(any(), any())).thenReturn(matchedSubscriptions);

//ngsiclient mock return always createUpdateContextREsponseTemperature when call updateContext
when(ngsiClient.updateContext(any(), any(), any())).thenReturn(updateContextResponseListenableFuture);

mockMvc.perform(post("/v1/updateContext")
.content(json(mapper, createUpdateContextTempSensorAndPressure()))
.contentType(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$.contextElementResponses[0].statusCode.code").value("200"));

verify(configuration, atLeastOnce()).isRemoteForwardUpdateContext();
verify(ngsiClient, never()).updateContext(eq("http://orionhost:9999"), any(), any());
}

@Test
public void postUpdateContextWithoutProvidingApplicationAndWithoutRemoteBrokerButWithNotify() throws Exception {

Expand Down
5 changes: 4 additions & 1 deletion cepheus-broker/src/test/resources/test.properties
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,7 @@ remote.servicePath=gateway1
# remote broker OAuth token for secured brokers
remote.authToken=XXXXXXXXX

spring.datasource.url=jdbc:sqlite::memory:
# forwading updateContext to remote broker
remote.forward.updateContext=true

spring.datasource.url=jdbc:sqlite::memory:

0 comments on commit 1c4cfc4

Please sign in to comment.