Skip to content

Commit

Permalink
some more fixes and more debug logging
Browse files Browse the repository at this point in the history
  • Loading branch information
ScorpioBroker committed Aug 16, 2023
1 parent f903919 commit 1bf1de0
Showing 1 changed file with 44 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public Uni<Boolean> createHistoryEntity(CreateHistoryEntityRequest request) {
if (scope == null) {
sql += ") VALUES($1, $2, $3::text::timestamp, $4::text::timestamp) ";
} else {
sql += ", scopes) VALUES($1, $2, $3::text::timestamp, $4::text::timestamp, getScopes($5)) ";
sql += ", scopes) VALUES($1, $2, $3::text::timestamp, $4::text::timestamp, getScopes($5::jsonb)) ";
}

sql += "ON CONFLICT(id) DO UPDATE SET e_types = ARRAY(SELECT DISTINCT UNNEST("
Expand All @@ -82,7 +82,7 @@ public Uni<Boolean> createHistoryEntity(CreateHistoryEntityRequest request) {
if (scope == null) {
sql += ", scopes = null ";
} else {
sql += ", scopes = getScopes($5) ";
sql += ", scopes = getScopes($5::jsonb) ";
}

sql += "RETURNING (" + DBConstants.DBTABLE_TEMPORALENTITY + ".modifiedat = "
Expand Down Expand Up @@ -144,18 +144,29 @@ public Uni<Void> batchUpsertHistoryEntity(BatchRequest request) {
geoLocation = new JsonObject(tmp.get(0).get(NGSIConstants.NGSI_LD_HAS_VALUE).get(0));
}
if (payload.containsKey(NGSIConstants.JSON_LD_TYPE)) {
Tuple tuple = Tuple.of(entityId,
((List<String>) payload.remove(NGSIConstants.JSON_LD_TYPE)).toArray(new String[0]),
((List<Map<String, String>>) payload.remove(NGSIConstants.NGSI_LD_CREATED_AT)).get(0)
.get(NGSIConstants.JSON_LD_VALUE),
((List<Map<String, String>>) payload.remove(NGSIConstants.NGSI_LD_MODIFIED_AT)).get(0)
.get(NGSIConstants.JSON_LD_VALUE));
Tuple tuple = Tuple.of(entityId);

tuple.addArrayOfString(
((List<String>) payload.remove(NGSIConstants.JSON_LD_TYPE)).toArray(new String[0]));
if (payload.containsKey(NGSIConstants.NGSI_LD_CREATED_AT)) {
tuple.addString(((List<Map<String, String>>) payload.remove(NGSIConstants.NGSI_LD_CREATED_AT))
.get(0).get(NGSIConstants.JSON_LD_VALUE));
} else {
tuple.addString(null);
}
if (payload.containsKey(NGSIConstants.NGSI_LD_MODIFIED_AT)) {
tuple.addString(((List<Map<String, String>>) payload.remove(NGSIConstants.NGSI_LD_MODIFIED_AT))
.get(0).get(NGSIConstants.JSON_LD_VALUE));
} else {
tuple.addString(null);
}
if (payload.containsKey(NGSIConstants.NGSI_LD_SCOPE)) {
tuple.addJsonArray(
new JsonArray((List<Map<String, String>>) payload.remove(NGSIConstants.NGSI_LD_SCOPE)));
} else {
tuple.addJsonArray(null);
}
logger.debug("batch no type" + tuple.deepToString());
batchType.add(tuple);

} else {
Expand All @@ -170,6 +181,7 @@ public Uni<Void> batchUpsertHistoryEntity(BatchRequest request) {
tuple.addJsonArray(null);
}
tuple.addString(entityId);
logger.debug("batch no type" + tuple.deepToString());
batchNoType.add(tuple);
}
List<Tuple> attribsToFill;
Expand All @@ -184,18 +196,21 @@ public Uni<Void> batchUpsertHistoryEntity(BatchRequest request) {
for (Map<String, Object> attribEntry : entries) {
attribEntry.put(NGSIConstants.NGSI_LD_INSTANCE_ID, List
.of(Map.of(NGSIConstants.JSON_LD_ID, "instanceid:" + UUID.randomUUID().toString())));
Tuple tuple;
if (location != null) {
attribsToFill
.add(Tuple.of(entityId, entry.getKey(), new JsonObject(attribEntry), geoLocation));
tuple = Tuple.of(entityId, entry.getKey(), new JsonObject(attribEntry), geoLocation);
attribsToFill.add(tuple);
} else {
attribsToFill.add(Tuple.of(entityId, entry.getKey(), new JsonObject(attribEntry)));
tuple = Tuple.of(entityId, entry.getKey(), new JsonObject(attribEntry));
attribsToFill.add(tuple);
}
logger.debug("attrib tuple " + tuple.deepToString());

}
}
}
String typeSql = "INSERT INTO " + DBConstants.DBTABLE_TEMPORALENTITY
+ " (id, e_types, createdat, modifiedat, scopes) VALUES($1, $2, $3::text::timestamp, $4::text::timestamp, getScopes($5)) "
+ " (id, e_types, createdat, modifiedat, scopes) VALUES($1, $2, $3::text::timestamp, $4::text::timestamp, getScopes($5::jsonb)) "
+ "ON CONFLICT(id) DO UPDATE SET e_types = ARRAY(SELECT DISTINCT UNNEST("
+ DBConstants.DBTABLE_TEMPORALENTITY
+ ".e_types || EXCLUDED.e_types)), modifiedat = EXCLUDED.modifiedat, ";
Expand All @@ -208,26 +223,29 @@ public Uni<Void> batchUpsertHistoryEntity(BatchRequest request) {
typeSql += "RETURNING (modifiedat = createdat)";
List<Uni<RowSet<Row>>> tmpList = new ArrayList<>(2);
if (!batchType.isEmpty()) {
logger.debug("batch type" + typeSql);
tmpList.add(client.preparedQuery(typeSql).executeBatch(batchType));
}
if (!batchNoType.isEmpty()) {
tmpList.add(client.preparedQuery("UPDATE " + DBConstants.DBTABLE_TEMPORALENTITY
+ " SET modifiedat = $1, scopes = CASE WHEN $2 IS NULL THEN scopes ELSE getScopes($2) END WHERE id=$3")
.executeBatch(batchNoType));
String noTypeSql = "UPDATE " + DBConstants.DBTABLE_TEMPORALENTITY
+ " SET modifiedat = $1::text::timestamp, scopes = CASE WHEN $2 IS NULL THEN scopes ELSE getScopes($2::jsonb) END WHERE id=$3";
logger.debug(noTypeSql);
tmpList.add(client.preparedQuery(noTypeSql).executeBatch(batchNoType));
}
return Uni.combine().all().unis(tmpList).combinedWith(l -> l).onItem().transformToUni(l -> {
List<Uni<RowSet<Row>>> attribList = Lists.newArrayList();
if (batchAttribs.isEmpty()) {
attribList.add(client
.preparedQuery("INSERT INTO " + DBConstants.DBTABLE_TEMPORALENTITY_ATTRIBUTEINSTANCE
+ " (temporalentity_id, attributeid, data) VALUES ($1, $2, $3::jsonb)")
.executeBatch(batchAttribs));
String sql;
if (!batchAttribs.isEmpty()) {
sql = "INSERT INTO " + DBConstants.DBTABLE_TEMPORALENTITY_ATTRIBUTEINSTANCE
+ " (temporalentity_id, attributeid, data) VALUES ($1, $2, $3::jsonb)";
logger.debug("batch " + sql);
attribList.add(client.preparedQuery(sql).executeBatch(batchAttribs));
}
if (batchAttribsWtihLocation.isEmpty()) {
attribList.add(client.preparedQuery("INSERT INTO "
+ DBConstants.DBTABLE_TEMPORALENTITY_ATTRIBUTEINSTANCE
+ " (temporalentity_id, attributeid, data) VALUES ($1, $2, $3::jsonb, ST_SetSRID(ST_GeomFromGeoJSON(getGeoJson($4))))")
.executeBatch(batchAttribs));
if (!batchAttribsWtihLocation.isEmpty()) {
sql = "INSERT INTO " + DBConstants.DBTABLE_TEMPORALENTITY_ATTRIBUTEINSTANCE
+ " (temporalentity_id, attributeid, data) VALUES ($1, $2, $3::jsonb, ST_SetSRID(ST_GeomFromGeoJSON(getGeoJson($4)), 4326))";
logger.debug("batch location " + sql);
attribList.add(client.preparedQuery(sql).executeBatch(batchAttribsWtihLocation));
}
return Uni.combine().all().unis(attribList).combinedWith(l2 -> l2).onItem().transform(l1 -> l1);

Expand Down Expand Up @@ -282,7 +300,7 @@ public Uni<Void> appendToHistoryEntity(AppendHistoryEntityRequest request) {
dollarCount++;
}
if (payload.containsKey(NGSIConstants.NGSI_LD_SCOPE)) {
sql += ", SET scopes = getScopes($" + dollarCount + ")";
sql += ", SET scopes = getScopes($" + dollarCount + "::jsonb)";
dollarCount++;
tuple.addJsonArray(
new JsonArray((List<Map<String, String>>) payload.remove(NGSIConstants.NGSI_LD_SCOPE)));
Expand Down

0 comments on commit 1bf1de0

Please sign in to comment.