Skip to content

Commit

Permalink
Merge pull request #417 from ScorpioBroker/fixsub
Browse files Browse the repository at this point in the history
Fixsub
  • Loading branch information
ScorpioBroker committed Jul 27, 2023
2 parents b30f6ae + 56b2605 commit d3fb3f6
Show file tree
Hide file tree
Showing 19 changed files with 598 additions and 61 deletions.
2 changes: 1 addition & 1 deletion AllInOneRunner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>eu.neclab.ngsildbroker</groupId>
<artifactId>broker-parent</artifactId>
<version>4.1.0-SNAPSHOT</version>
<version>4.1.1-SNAPSHOT</version>
<relativePath>../BrokerParent</relativePath>
</parent>
<artifactId>all-in-one-runner</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
update subscriptions set subscription=subscription-'https://uri.etsi.org/ngsi-ld/lastFailure '
2 changes: 1 addition & 1 deletion BrokerParent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<groupId>eu.neclab.ngsildbroker</groupId>
<artifactId>broker-parent</artifactId>
<name>BrokerParent</name>
<version>4.1.0-SNAPSHOT</version>
<version>4.1.1-SNAPSHOT</version>
<packaging>pom</packaging>
<properties>
<compiler-plugin.version>3.11.0</compiler-plugin.version>
Expand Down
2 changes: 1 addition & 1 deletion Commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>eu.neclab.ngsildbroker</groupId>
<artifactId>broker-parent</artifactId>
<version>4.1.0-SNAPSHOT</version>
<version>4.1.1-SNAPSHOT</version>
<relativePath>../BrokerParent</relativePath>
</parent>
<repositories>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,5 +433,8 @@ public interface NGSIConstants {
public static final String NGSI_LD_NULL = "urn:ngsi-ld:null";
public static final String HAS_VALUE_NULL = "[{https://uri.etsi.org/ngsi-ld/hasValue=[{@value=urn:ngsi-ld:null}]";
public static final String HAS_OBJECT_NULL = "[{https://uri.etsi.org/ngsi-ld/hasObject=[{@id=urn:ngsi-ld:null}]";
public static final String LANGUAGE_PROPERTY = "LanguageProperty";
public static final String LANGUAGE_MAP = "languageMap";

}

2 changes: 1 addition & 1 deletion EntityManager/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>eu.neclab.ngsildbroker</groupId>
<artifactId>broker-parent</artifactId>
<version>4.1.0-SNAPSHOT</version>
<version>4.1.1-SNAPSHOT</version>
<relativePath>../BrokerParent</relativePath>
</parent>
<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public Uni<RestResponse<Object>> createEntity(HttpServerRequest req, Map<String,
* Method(PATCH) for "/ngsi-ld/v1/entities/{entityId}/attrs" rest endpoint.
*
* @param entityId
* @param body json ld message
* @param body json ld message
* @return ResponseEntity object
*/

Expand All @@ -101,7 +101,7 @@ public Uni<RestResponse<Object>> updateEntity(HttpServerRequest req, @PathParam(
* Method(POST) for "/ngsi-ld/v1/entities/{entityId}/attrs" rest endpoint.
*
* @param entityId
* @param body jsonld message
* @param body jsonld message
* @return ResponseEntity object
*/

Expand Down Expand Up @@ -153,15 +153,15 @@ public Uni<RestResponse<Object>> partialUpdateAttribute(HttpServerRequest req,
logger.trace("update entry :: completed");
return HttpUtils.generateUpdateResultResponse(updateResult);
});
}).onFailure().recoverWithUni(t -> entityService.patchToEndPoint(entityId, req, body, attrib)
.onItem().transform(isEndPointExist -> {
if (isEndPointExist)
return RestResponse.noContent();
else {
return HttpUtils.handleControllerExceptions(t);
}
}).onFailure().recoverWithItem(HttpUtils::handleControllerExceptions));
}
}).onFailure().recoverWithUni(t -> entityService.patchToEndPoint(entityId, req, body, attrib).onItem()
.transform(isEndPointExist -> {
if (isEndPointExist)
return RestResponse.noContent();
else {
return HttpUtils.handleControllerExceptions(t);
}
}).onFailure().recoverWithItem(HttpUtils::handleControllerExceptions));
}

/**
* Method(DELETE) for "/ngsi-ld/v1/entities/{entityId}/attrs/{attrId}" rest
Expand Down Expand Up @@ -249,6 +249,10 @@ private void noConcise(Object object, Map<String, Object> parentMap, String keyO
parentMap.put(keyOfObject, newMap);

}
if (NGSIConstants.LANGUAGE_MAP.equals(keyOfObject) && parentMap != null
&& NGSIConstants.LANGUAGE_PROPERTY.equals(parentMap.get(NGSIConstants.TYPE))) {
return;
}

// Iterate through every element of Map
Object[] mapKeys = map.keySet().toArray();
Expand Down Expand Up @@ -305,37 +309,39 @@ public Uni<RestResponse<Object>> mergePatch(HttpServerRequest request, @PathPara
}).onFailure().recoverWithItem(HttpUtils::handleControllerExceptions);

}

@Path("/entities/{entityId}")
@PUT
public Uni<RestResponse<Object>> replaceEntity(@PathParam("entityId") String entityId, HttpServerRequest request, Map<String, Object> body) {
public Uni<RestResponse<Object>> replaceEntity(@PathParam("entityId") String entityId, HttpServerRequest request,
Map<String, Object> body) {
logger.debug("replacing entity");
try {
HttpUtils.validateUri(entityId);
} catch (Exception e) {
return Uni.createFrom().item(HttpUtils.handleControllerExceptions(e));
}
noConcise(body);
body.put(NGSIConstants.ID,entityId);
if(!body.containsKey(NGSIConstants.TYPE)){
return Uni.createFrom().item(HttpUtils.handleControllerExceptions(
body.put(NGSIConstants.ID, entityId);
if (!body.containsKey(NGSIConstants.TYPE)) {
return Uni.createFrom().item(HttpUtils.handleControllerExceptions(
new ResponseException(ErrorType.BadRequestData, "Type can not be null")));
}
return HttpUtils.expandBody(request, body, AppConstants.REPLACE_ENTITY_PAYLOAD, ldService).onItem()
.transformToUni(tuple -> {

return entityService.replaceEntity(HttpUtils.getTenant(request), tuple.getItem2(), tuple.getItem1()).onItem()
.transform(opResult -> {
return entityService.replaceEntity(HttpUtils.getTenant(request), tuple.getItem2(), tuple.getItem1())
.onItem().transform(opResult -> {

logger.debug("Done replacing entity");
return HttpUtils.generateUpdateResultResponse(opResult);
}).onFailure().recoverWithItem(HttpUtils::handleControllerExceptions);
});
}


@Path("/entities/{entityId}/attrs/{attrId}")
@PUT
public Uni<RestResponse<Object>> replaceAttribute(@PathParam("attrId") String attrId,@PathParam("entityId") String entityId, HttpServerRequest request, Map<String, Object> body) {
public Uni<RestResponse<Object>> replaceAttribute(@PathParam("attrId") String attrId,
@PathParam("entityId") String entityId, HttpServerRequest request, Map<String, Object> body) {
logger.debug("replacing Attrs");
try {
HttpUtils.validateUri(entityId);
Expand All @@ -346,8 +352,8 @@ public Uni<RestResponse<Object>> replaceAttribute(@PathParam("attrId") String at
return HttpUtils.expandBody(request, body, AppConstants.PARTIAL_UPDATE_REQUEST, ldService).onItem()
.transformToUni(tuple -> {
String finalAttrId = tuple.getItem1().expandIri(attrId, false, true, null, null);
return entityService.replaceAttribute(HttpUtils.getTenant(request), tuple.getItem2(), tuple.getItem1(),entityId,finalAttrId).onItem()
.transform(opResult -> {
return entityService.replaceAttribute(HttpUtils.getTenant(request), tuple.getItem2(),
tuple.getItem1(), entityId, finalAttrId).onItem().transform(opResult -> {
logger.debug("Done replacing attribute");
return HttpUtils.generateUpdateResultResponse(opResult);
}).onFailure().recoverWithItem(HttpUtils::handleControllerExceptions);
Expand Down
2 changes: 1 addition & 1 deletion HistoryEntityManager/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>eu.neclab.ngsildbroker</groupId>
<artifactId>broker-parent</artifactId>
<version>4.1.0-SNAPSHOT</version>
<version>4.1.1-SNAPSHOT</version>
<relativePath>../BrokerParent</relativePath>
</parent>
<profiles>
Expand Down
2 changes: 1 addition & 1 deletion HistoryQueryManager/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>eu.neclab.ngsildbroker</groupId>
<artifactId>broker-parent</artifactId>
<version>4.1.0-SNAPSHOT</version>
<version>4.1.1-SNAPSHOT</version>
<relativePath>../BrokerParent</relativePath>
</parent>
<profiles>
Expand Down
2 changes: 1 addition & 1 deletion QueryManager/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>eu.neclab.ngsildbroker</groupId>
<artifactId>broker-parent</artifactId>
<version>4.1.0-SNAPSHOT</version>
<version>4.1.1-SNAPSHOT</version>
<relativePath>../BrokerParent</relativePath>
</parent>
<profiles>
Expand Down
7 changes: 7 additions & 0 deletions RELEASENOTES
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
RELEASE NOTES SCORPIO 4.1.1

This release

- adds support for the NGSI-LD replace functionality
- fixes an issue with the lastFailure and lastSuccess fields in subscriptions

RELEASE NOTES SCORPIO 4.1.0

With this release we are reintroducing native releases for Scorpio.
Expand Down
2 changes: 1 addition & 1 deletion RegistryManager/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>eu.neclab.ngsildbroker</groupId>
<artifactId>broker-parent</artifactId>
<version>4.1.0-SNAPSHOT</version>
<version>4.1.1-SNAPSHOT</version>
<relativePath>../BrokerParent</relativePath>
</parent>
<dependencies>
Expand Down
2 changes: 1 addition & 1 deletion RegistrySubscriptionManager/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>eu.neclab.ngsildbroker</groupId>
<artifactId>broker-parent</artifactId>
<version>4.1.0-SNAPSHOT</version>
<version>4.1.1-SNAPSHOT</version>
<relativePath>../BrokerParent</relativePath>
</parent>
<profiles>
Expand Down
2 changes: 1 addition & 1 deletion SubscriptionManager/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>eu.neclab.ngsildbroker</groupId>
<artifactId>broker-parent</artifactId>
<version>4.1.0-SNAPSHOT</version>
<version>4.1.1-SNAPSHOT</version>
<relativePath>../BrokerParent</relativePath>
</parent>
<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,34 +70,33 @@ public Uni<RowSet<Row>> getSubscription(String tenant, String subscriptionId) {

public Uni<Void> updateNotificationSuccess(String tenant, String id, String date) {
return clientManager.getClient(tenant, false).onItem().transformToUni(client -> {
return client
.preparedQuery("UPDATE subscriptions SET subscription = subscription || ('{\""
+ NGSIConstants.NGSI_LD_TIMES_SENT + "\": [{\"" + NGSIConstants.JSON_LD_VALUE
+ "\": '|| (subscription#>>'{" + NGSIConstants.NGSI_LD_TIMES_SENT + ",0, "
+ NGSIConstants.JSON_LD_VALUE + "}')::integer + 1 ||'}],\""
+ NGSIConstants.NGSI_LD_LAST_SUCCESS + "\": [{\"" + NGSIConstants.JSON_LD_TYPE + "\": \""
+ NGSIConstants.NGSI_LD_DATE_TIME + "\", \"" + NGSIConstants.JSON_LD_VALUE
+ "\": \"$1\"}],\"" + NGSIConstants.NGSI_LD_LAST_NOTIFICATION + "\": [{\""
+ NGSIConstants.JSON_LD_TYPE + "\": \"" + NGSIConstants.NGSI_LD_DATE_TIME + "\", \""
+ NGSIConstants.JSON_LD_VALUE + "\": \"$1\"}]}')::jsonb WHERE subscription_id=$2")
.execute(Tuple.of(date, id)).onFailure().retry().atMost(3).onItem()
String sql = "UPDATE subscriptions SET subscription = jsonb_set(jsonb_set(jsonb_set(subscription, '{"
+ NGSIConstants.NGSI_LD_TIMES_SENT + "}', jsonb_build_array(jsonb_build_object('"
+ NGSIConstants.JSON_LD_VALUE + "', (subscription #>> '{" + NGSIConstants.NGSI_LD_TIMES_SENT + ",0,"
+ NGSIConstants.JSON_LD_VALUE + "}')::integer + 1)), true), '{" + NGSIConstants.NGSI_LD_LAST_SUCCESS
+ "}', jsonb_build_array(jsonb_build_object('" + NGSIConstants.JSON_LD_TYPE + "', '"
+ NGSIConstants.NGSI_LD_DATE_TIME + "', '" + NGSIConstants.JSON_LD_VALUE + "', $1)), true),'{"
+ NGSIConstants.NGSI_LD_LAST_NOTIFICATION + "}', jsonb_build_array(jsonb_build_object('"
+ NGSIConstants.JSON_LD_TYPE + "', '" + NGSIConstants.NGSI_LD_DATE_TIME + "', '"
+ NGSIConstants.JSON_LD_VALUE + "', $1)), true) WHERE subscription_id=$2";
return client.preparedQuery(sql).execute(Tuple.of(date, id)).onFailure().retry().atMost(3).onItem()
.transformToUni(t -> Uni.createFrom().voidItem());
});
}

public Uni<Void> updateNotificationFailure(String tenant, String id, String date) {
return clientManager.getClient(tenant, false).onItem().transformToUni(client -> client
.preparedQuery("UPDATE subscriptions SET subscription = subscription || ('{\""
+ NGSIConstants.NGSI_LD_TIMES_FAILED + "\": [{\"" + NGSIConstants.JSON_LD_VALUE
+ "\": '|| (subscription#>>'{" + NGSIConstants.NGSI_LD_TIMES_FAILED + ",0, "
+ NGSIConstants.JSON_LD_VALUE + "}')::integer + 1 ||'}],\"" + NGSIConstants.NGSI_LD_LAST_FAILURE
+ "\": [{\"" + NGSIConstants.JSON_LD_TYPE + "\": \"" + NGSIConstants.NGSI_LD_DATE_TIME
+ "\", \"" + NGSIConstants.JSON_LD_VALUE + "\": \"$1\"}],\""
+ NGSIConstants.NGSI_LD_LAST_NOTIFICATION + "\": [{\"" + NGSIConstants.JSON_LD_TYPE + "\": \""
+ NGSIConstants.NGSI_LD_DATE_TIME + "\", \"" + NGSIConstants.JSON_LD_VALUE
+ "\": \"$1\"}]}')::jsonb WHERE subscription_id=$2")
.execute(Tuple.of(date, id)).onFailure().retry().atMost(3).onItem()
.transformToUni(t -> Uni.createFrom().voidItem()));
String sql = "UPDATE subscriptions SET subscription = jsonb_set(jsonb_set(jsonb_set(subscription, '{"
+ NGSIConstants.NGSI_LD_TIMES_FAILED + "}', jsonb_build_array(jsonb_build_object('"
+ NGSIConstants.JSON_LD_VALUE + "', (subscription #>> '{" + NGSIConstants.NGSI_LD_TIMES_FAILED + ",0,"
+ NGSIConstants.JSON_LD_VALUE + "}')::integer + 1)), true), '{" + NGSIConstants.NGSI_LD_LAST_FAILURE
+ "}', jsonb_build_array(jsonb_build_object('" + NGSIConstants.JSON_LD_TYPE + "', '"
+ NGSIConstants.NGSI_LD_DATE_TIME + "', '" + NGSIConstants.JSON_LD_VALUE + "', $1)), true),'{"
+ NGSIConstants.NGSI_LD_LAST_NOTIFICATION + "}', jsonb_build_array(jsonb_build_object('"
+ NGSIConstants.JSON_LD_TYPE + "', '" + NGSIConstants.NGSI_LD_DATE_TIME + "', '"
+ NGSIConstants.JSON_LD_VALUE + "', $1)), true) WHERE subscription_id=$2";
return clientManager.getClient(tenant, false).onItem()
.transformToUni(client -> client.preparedQuery(sql).execute(Tuple.of(date, id)).onFailure().retry()
.atMost(3).onItem().transformToUni(t -> Uni.createFrom().voidItem()));
}

public Uni<List<Tuple3<String, Map<String, Object>, Map<String, Object>>>> loadSubscriptions() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
update subscriptions set subscription=subscription-'https://uri.etsi.org/ngsi-ld/lastFailure '

0 comments on commit d3fb3f6

Please sign in to comment.