Skip to content

Commit

Permalink
Merge branch 'development-quarkus' of github.com:ScorpioBroker/Scorpi…
Browse files Browse the repository at this point in the history
…oBroker into development-quarkus
  • Loading branch information
ScorpioBroker committed Aug 16, 2023
2 parents 8b982ec + 7a48152 commit 28003e1
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
CREATE OR REPLACE FUNCTION MERGE_JSON(a text, b JSONB)
RETURNS JSONB AS $$
DECLARE
merged_json JSONB;
key TEXT;
value JSONB;
value2 JSONB;
previous_entity JSONB;
BEGIN
merged_json := (Select entity from entity where id =a);
previous_entity := (Select entity from entity where id =a);
-- Iterate through keys in JSON B
FOR key, value IN SELECT * FROM JSONB_EACH(b)
LOOP
IF value::TEXT LIKE '%"https://uri.etsi.org/ngsi-ld/hasValue": [{"@value": "urn:ngsi-ld:null"}]%'::TEXT
OR value::TEXT LIKE '%"https://uri.etsi.org/ngsi-ld/hasObject": [{"@id": "urn:ngsi-ld:null"}]%'::TEXT
THEN
-- Delete the key
merged_json := merged_json - key;
ELSIF merged_json ? key THEN
-- Update the value
raise notice '%', 'update';
value2 := (value->0)::jsonb ;
IF jsonb_typeof(value2) = 'object' THEN
value2 :=value2 - 'https://uri.etsi.org/ngsi-ld/createdAt';
end if;
merged_json := jsonb_set(merged_json, ARRAY[key], jsonb_build_array(value2), true);
IF previous_entity->key->0 ? 'https://uri.etsi.org/ngsi-ld/createdAt' then
merged_json := jsonb_set(merged_json, ARRAY[key,'0','https://uri.etsi.org/ngsi-ld/createdAt'], (previous_entity->key->0->'https://uri.etsi.org/ngsi-ld/createdAt'), true);
end if;
ELSE
-- Add the key-value pair
raise notice '%', 'add';
merged_json := jsonb_set(merged_json, ARRAY[key], value, true);
END IF;

END LOOP;
merged_json := jsonb_strip_nulls(replace(merged_json::text,'"urn:ngsi-ld:null"','null')::jsonb);
merged_json := regexp_replace(merged_json::text, '{"@language": "[^"]*"}', 'null', 'g')::jsonb;
while merged_json::text like '%[]%'
or merged_json::text like '%{}%'
or merged_json::text like '%null%' loop
merged_json := jsonb_strip_nulls(replace(merged_json::text,'null,','')::jsonb);
merged_json := jsonb_strip_nulls(replace(merged_json::text,', null','')::jsonb);
merged_json := jsonb_strip_nulls(replace(merged_json::text,'[null]','null')::jsonb);
merged_json := jsonb_strip_nulls(replace(merged_json::text,'[]','null')::jsonb);
merged_json := jsonb_strip_nulls(replace(merged_json::text,'{}','null')::jsonb);
end loop;
update entity set entity = merged_json, e_types = ARRAY(SELECT jsonb_array_elements_text(merged_json->'@type')) where id = a;
RETURN previous_entity;
END;
$$ LANGUAGE plpgsql;
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ private static void validateSub(Subscription subscription, boolean update) throw
public static NotificationParam getNotificationParam(Map<String, Object> map, Context context) throws Exception {
// Default accept
String accept = AppConstants.NGB_APPLICATION_JSONLD;
Format format = Format.normalized;
Format format = Format.normalized;
Set<String> watchedAttribs = Sets.newHashSet();
String mqttVersion = null;
Integer qos = null;
Expand Down Expand Up @@ -390,8 +390,10 @@ public static NotificationParam getNotificationParam(Map<String, Object> map, Co
String formatString = (String) ((List<Map<String, Object>>) entry.getValue()).get(0)
.get(NGSIConstants.JSON_LD_VALUE);
if (formatString.equalsIgnoreCase("keyvalues")) {
format = Format.keyValues;
}
format = Format.keyValues;
}if(formatString.equalsIgnoreCase("concise")){
format =Format.concise;
}
break;
case NGSIConstants.NGSI_LD_SHOWCHANGES:
notifyParam.setShowChanges(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class BaseRequest {
private long sendTimestamp = System.currentTimeMillis();

private String id;
private Map<String, Object> previousEntity;
protected Map<String, Object> previousEntity;
protected String attribName;
protected String datasetId;
protected boolean deleteAll;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class CreateCSourceRequest extends CSourceRequest {
public CreateCSourceRequest(String tenant, Map<String, Object> resolved) {
super(tenant, (String) resolved.get(NGSIConstants.JSON_LD_ID), resolved,
AppConstants.CREATE_REQUEST);
EntityTools.addSysAttrs(getPayload(), getSendTimestamp(),getRequestType());
EntityTools.addSysAttrs(getPayload(), getSendTimestamp());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@
* @created 11-Jun-2018 11:13:22
*/
public enum Format {
keyValues, normalized
keyValues, normalized,concise
}
Original file line number Diff line number Diff line change
Expand Up @@ -348,14 +348,9 @@ public static void removeAttrs(Map<String, Object> localEntity, Set<String> attr

}

public static Map<String, Object> addSysAttrs(Map<String, Object> resolved, long timeStamp, int payloadType) {
public static Map<String, Object> addSysAttrs(Map<String, Object> resolved, long timeStamp) {
String now = SerializationTools.formatter.format(Instant.ofEpochMilli(timeStamp));
if (List.of(AppConstants.CREATE_REQUEST, AppConstants.UPSERT_REQUEST, AppConstants.CREATE_TEMPORAL_REQUEST,
AppConstants.CREATE_SUBSCRIPTION_REQUEST, AppConstants.CSOURCE_REG_CREATE_PAYLOAD)
.contains(payloadType)) {
setTemporalProperties(resolved, now, now, false);
} else
setTemporalProperties(resolved, "", now, false);
return resolved;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ public static Uni<RestResponse<Object>> generateEntityResult(List<Object> contex
});
}

private static void makeConcise(Object compacted) {
public static void makeConcise(Object compacted) {
makeConcise(compacted, null, null);
}

Expand All @@ -490,12 +490,12 @@ private static void makeConcise(Object compacted, Map<?, ?> parent, String key)
((Map<String, Object>) parent).put(key, map.get(NGSIConstants.VALUE));
}
}
map.forEach((str, nestedObj) -> {
if (!str.equals(NGSIConstants.JSON_LD_CONTEXT) && nestedObj instanceof Map<?, ?>
|| nestedObj instanceof ArrayList<?>) {
makeConcise(nestedObj, map, str.toString());
}
});
for(Object k : map.keySet()){
if (!k.equals(NGSIConstants.JSON_LD_CONTEXT) && map.get(k) instanceof Map<?, ?>
|| map.get(k) instanceof ArrayList<?>) {
makeConcise(map.get(k) , map, k.toString());
}
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import eu.neclab.ngsildbroker.commons.datatypes.requests.subscription.SubscriptionRequest;
import eu.neclab.ngsildbroker.commons.datatypes.terms.GeoQueryTerm;
import eu.neclab.ngsildbroker.commons.enums.ErrorType;
import eu.neclab.ngsildbroker.commons.enums.Format;
import eu.neclab.ngsildbroker.commons.exceptions.ResponseException;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Tuple4;
Expand Down Expand Up @@ -217,7 +218,10 @@ public static Uni<Map<String, Object>> generateNotification(SubscriptionRequest
.getOrDefault(JsonLdConsts.GRAPH, List.of(compacted));
int acceptHeader = HttpUtils.parseAcceptHeader(
List.of(potentialSub.getSubscription().getNotification().getEndPoint().getAccept()));
switch (acceptHeader) {
if(potentialSub.getSubscription().getNotification().getFormat()==Format.concise){
HttpUtils.makeConcise(compacted);
}
switch (acceptHeader) {
case 1:
data.forEach(entry -> entry.remove(NGSIConstants.JSON_LD_CONTEXT));
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,7 @@ private NGSILDOperationResult getResult(List<?> list) {

private Uni<NGSILDOperationResult> updateLocalEntity(UpdateEntityRequest request, Context context) {
return entityDAO.updateEntity(request).onItem().transform(notAppended -> {
request.setPreviousEntity(notAppended);
entityEmitter.sendAndForget(request);
NGSILDOperationResult localResult = new NGSILDOperationResult(AppConstants.CREATE_REQUEST, request.getId());
localResult.addSuccess(new CRUDSuccess(null, null, null, request.getPayload(), context));
Expand Down Expand Up @@ -814,7 +815,7 @@ private Tuple2<Map<String, Object>, Collection<Tuple2<RemoteHost, Map<String, Ob
if (originalScopes != null) {
toStore.put(NGSIConstants.NGSI_LD_SCOPE, originalScopes);
}
EntityTools.addSysAttrs(toStore, request.getSendTimestamp(), request.getRequestType());
EntityTools.addSysAttrs(toStore, request.getSendTimestamp());
}
return Tuple2.of(toStore, cId2RemoteHostEntity.values());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ private Tuple2<Map<String, Object>, Collection<Tuple2<RemoteHost, Map<String, Ob
if (originalScopes != null) {
toStore.put(NGSIConstants.NGSI_LD_SCOPE, originalScopes);
}
EntityTools.addSysAttrs(toStore, request.getSendTimestamp(), request.getRequestType());
EntityTools.addSysAttrs(toStore, request.getSendTimestamp());
}
return Tuple2.of(toStore, cId2RemoteHostEntity.values());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ public Uni<NGSILDOperationResult> createSubscription(String tenant, Map<String,
} catch (ResponseException e) {
return Uni.createFrom().failure(e);
}
if(request.getId()==null){
String id = "urn:"+UUID.randomUUID().toString();
request.setId(id);
}
SubscriptionTools.setInitTimesSentAndFailed(request);
Map<String, Object> tmp = request.getContext().serialize();
return localContextService.createImplicitly(tenant, tmp).onItem().transformToUni(contextId -> {
Expand Down Expand Up @@ -319,8 +323,9 @@ public Uni<Void> checkSubscriptions(BaseRequest message) {
if (potentialSub.getSubscription().getNotification().getShowChanges()) {
payload.put(JsonLdConsts.GRAPH,
List.of(compareMaps(message.getPreviousEntity(), entityList.get(0))));
} else
} else {
payload.put(JsonLdConsts.GRAPH, entityList);
}
return sendNotification(potentialSub, payload, message.getRequestType());
}));
}
Expand Down

0 comments on commit 28003e1

Please sign in to comment.