Skip to content

Commit

Permalink
[#3517] Support searching for gateway devices
Browse files Browse the repository at this point in the history
Implement gateway filter for JDBC (H2 & Postgresql) device registry:
- add optional parameter "isGateway"
- functionality for listing only devices or gateways
- add documentation

Also-by: Matthias Feurer <m.feurer@sotec.eu>
Signed-off-by: Georgios Dimitropoulos <g.dimitropoulos@sotec.eu>
  • Loading branch information
gdimitropoulos-sotec committed Nov 22, 2023
1 parent 1be832d commit 7ceb17f
Show file tree
Hide file tree
Showing 17 changed files with 409 additions and 172 deletions.
Expand Up @@ -102,10 +102,7 @@ private Map<String, Object> getAttributes(final PubSubBasedCommand command) {
attributes.put(PubSubMessageHelper.PUBSUB_PROPERTY_PROJECT_ID, projectId);
attributes.put(PubSubMessageHelper.PUBSUB_PROPERTY_RESPONSE_REQUIRED, !command.isOneWay());
Optional.ofNullable(command.getGatewayId()).ifPresent(
id -> {
attributes.put(MessageHelper.APP_PROPERTY_GATEWAY_ID, id);
attributes.put(MessageHelper.APP_PROPERTY_CMD_VIA, id);
});
id -> attributes.put(MessageHelper.APP_PROPERTY_GATEWAY_ID, id));
return attributes;
}
}
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2019, 2021 Contributors to the Eclipse Foundation
* Copyright (c) 2019, 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -118,6 +118,11 @@ public final class RegistryManagementConstants extends RequestResponseApiConstan
*/
public static final String PARAM_SORT_JSON = "sortJson";

/**
* The name of the boolean filter query parameter for searching gateways or only devices.
*/
public static final String PARAM_IS_GATEWAY = "isGateway";


// DEVICES

Expand Down
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2016, 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -83,6 +83,15 @@ public abstract class AbstractHttpEndpoint<T extends ServiceConfigProperties> ex
}
};

/**
* A function that tries to parse a string into an Optional boolean.
* It takes a valid boolean string value (e.g "true", "false") and returns an Optional of boolean type,
* if input string can be parsed as boolean else Optional.empty.
*/
protected static final Function<String, Optional<Boolean>> CONVERTER_BOOLEAN = s -> {
return Strings.isNullOrEmpty(s) ? Optional.empty() : Optional.of(Boolean.valueOf(s));
};

/**
* The configuration properties for this endpoint.
*/
Expand Down
Expand Up @@ -97,9 +97,13 @@ public class TableManagementStore extends AbstractDeviceStore {
private final Statement updateDeviceVersionStatement;

private final Statement countDevicesOfTenantStatement;
private final Statement countDevicesWithFilter;
private final Statement countDevicesWithFilterStatement;
private final Statement countGatewaysOfTenantStatement;
private final Statement countOnlyDevicesOfTenantStatement;

private final Statement findDevicesStatement;
private final Statement findDevicesOfTenantStatement;
private final Statement findGatewaysOfTenantStatement;
private final Statement findOnlyDevicesOfTenantStatement;
private final Statement findDevicesOfTenantWithFilterStatement;

/**
Expand Down Expand Up @@ -204,20 +208,48 @@ public TableManagementStore(final JDBCClient client, final Tracer tracer, final
.validateParameters(
TENANT_ID);

this.countDevicesWithFilter = cfg
this.countGatewaysOfTenantStatement = cfg
.getRequiredStatement("countGatewaysOfTenant")
.validateParameters(
TENANT_ID,
DEVICE_ID);

this.countOnlyDevicesOfTenantStatement = cfg
.getRequiredStatement("countOnlyDevicesOfTenant")
.validateParameters(
TENANT_ID,
DEVICE_ID);

this.countDevicesWithFilterStatement = cfg
.getRequiredStatement("countDevicesOfTenantWithFilter")
.validateParameters(
TENANT_ID,
FIELD,
VALUE);

this.findDevicesStatement = cfg
this.findDevicesOfTenantStatement = cfg
.getRequiredStatement("findDevicesOfTenant")
.validateParameters(
TENANT_ID,
PAGE_SIZE,
PAGE_OFFSET);

this.findOnlyDevicesOfTenantStatement = cfg
.getRequiredStatement("findOnlyDevicesOfTenant")
.validateParameters(
TENANT_ID,
DEVICE_ID,
PAGE_SIZE,
PAGE_OFFSET);

this.findGatewaysOfTenantStatement = cfg
.getRequiredStatement("findGatewaysOfTenant")
.validateParameters(
TENANT_ID,
DEVICE_ID,
PAGE_SIZE,
PAGE_OFFSET);

this.findDevicesOfTenantWithFilterStatement = cfg
.getRequiredStatement("findDevicesOfTenantWithFilter")
.validateParameters(
Expand All @@ -228,46 +260,6 @@ public TableManagementStore(final JDBCClient client, final Tracer tracer, final
PAGE_OFFSET);
}

private static Future<Object> checkUpdateOutcome(final UpdateResult updateResult) {

if (updateResult.getUpdated() < 0) {
// conflict
log.debug("Optimistic lock broke");
return Future.failedFuture(new OptimisticLockingException());
}

return Future.succeededFuture();

}

private static Future<String> extractVersionForUpdate(final ResultSet device, final Optional<String> resourceVersion) {
final Optional<String> version = device.getRows(true).stream().map(o -> o.getString(VERSION)).findAny();

if (version.isEmpty()) {
log.debug("No version or no row found -> entity not found");
return Future.failedFuture(new EntityNotFoundException());
}

final var currentVersion = version.get();

return resourceVersion
// if we expect a certain version
.<Future<String>>map(expected -> {
// check ...
if (expected.equals(currentVersion)) {
// version matches, continue with current version
return Future.succeededFuture(currentVersion);
} else {
// version does not match, abort
return Future.failedFuture(new OptimisticLockingException());
}
}
)
// if we don't expect a version, continue with the current
.orElseGet(() -> Future.succeededFuture(currentVersion));

}

/**
* Read a device and lock it for updates.
* <p>
Expand Down Expand Up @@ -850,6 +842,46 @@ private <T> Future<T> recoverNotFound(final Span span, final Throwable err, fina
}
}

private static Future<Object> checkUpdateOutcome(final UpdateResult updateResult) {

if (updateResult.getUpdated() < 0) {
// conflict
log.debug("Optimistic lock broke");
return Future.failedFuture(new OptimisticLockingException());
}

return Future.succeededFuture();

}

private static Future<String> extractVersionForUpdate(final ResultSet device, final Optional<String> resourceVersion) {
final Optional<String> version = device.getRows(true).stream().map(o -> o.getString("version")).findAny();

if (version.isEmpty()) {
log.debug("No version or no row found -> entity not found");
return Future.failedFuture(new EntityNotFoundException());
}

final var currentVersion = version.get();

return resourceVersion
// if we expect a certain version
.<Future<String>>map(expected -> {
// check ...
if (expected.equals(currentVersion)) {
// version matches, continue with current version
return Future.succeededFuture(currentVersion);
} else {
// version does not match, abort
return Future.failedFuture(new OptimisticLockingException());
}
}
)
// if we don't expect a version, continue with the current
.orElseGet(() -> Future.succeededFuture(currentVersion));

}

/**
* Get all credentials for a device.
* <p>
Expand Down Expand Up @@ -925,25 +957,43 @@ private List<CommonCredential> parseCredentials(final ResultSet result) {
* @param pageSize The page size.
* @param pageOffset The page offset.
* @param filters The list of filters (currently only the first value of the list is used).
* Will be ignored if parameter isGateway is being used.
* @param isGateway Optional filter for searching only gateways or only devices.
* If given parameter is Optional.empty() result will contain both gateways and devices.
* @param spanContext The span to contribute to.
* @return A future containing devices.
*/
public Future<SearchResult<DeviceWithId>> findDevices(final String tenantId, final int pageSize, final int pageOffset, final List<Filter> filters,
public Future<SearchResult<DeviceWithId>> findDevices(final String tenantId, final int pageSize, final int pageOffset, final List<Filter> filters, final Optional<Boolean> isGateway,
final SpanContext spanContext) {


final var filter = filters.stream().findFirst();
final Statement findDeviceSqlStatement;
final Statement countStatement;
final String field;
final String value;

if (isGateway.isPresent()) {
field = "";
value = "";

final String field = filter.map(filter1 -> filter1.getField().toString().replace("/", "")).orElse("");
final var value = filter.map(filter1 ->
filter1.getValue().toString()
.replace("/", "")
.replace("*", "%")
.replace("?", "_")
).orElse("");
findDeviceSqlStatement = isGateway.get() ? this.findGatewaysOfTenantStatement : this.findOnlyDevicesOfTenantStatement;
countStatement = isGateway.get() ? this.countGatewaysOfTenantStatement : this.countOnlyDevicesOfTenantStatement;
} else {
final var filter = filters.stream().findFirst();

field = filter.map(filter1 -> filter1.getField().toString().replace("/", "")).orElse("");
value = filter.map(filter1 ->
filter1.getValue().toString()
.replace("/", "")
.replace("*", "%")
.replace("?", "_")
).orElse("");


findDeviceSqlStatement = (filter.isPresent()) ? findDevicesOfTenantWithFilterStatement : this.findDevicesOfTenantStatement;
countStatement = (filter.isPresent()) ? countDevicesWithFilterStatement : this.countDevicesOfTenantStatement;
}

final Statement findDeviceSqlStatement = (filter.isPresent()) ? findDevicesOfTenantWithFilterStatement : this.findDevicesStatement;
final Statement countStatement = (filter.isPresent()) ? countDevicesWithFilter : this.countDevicesOfTenantStatement;

final var expanded = findDeviceSqlStatement.expand(map -> {
map.put(TENANT_ID, tenantId);
Expand Down Expand Up @@ -985,5 +1035,3 @@ public Future<SearchResult<DeviceWithId>> findDevices(final String tenantId, fin
.onComplete(x -> span.finish());
}
}


@@ -1,18 +1,36 @@

countDevicesOfTenantWithFilter: |
SELECT COUNT(*) AS deviceCount FROM %1$s
WHERE
SELECT COUNT(*) AS deviceCount FROM %1$s
WHERE
tenant_id=:tenant_id
AND
LOCATE(CONCAT_WS(':', :field, REPLACE(:value, '"')), REPLACE(data, '"'))
OR
REPLACE(data, '"') LIKE CONCAT('%%', :field, ':', REPLACE(:value, '"'))
countGatewaysOfTenant: |
SELECT COUNT(*) AS deviceCount
FROM %1$s
WHERE
tenant_id=:tenant_id
AND
LOCATE(CONCAT(device_id, '|'),
(SELECT CONCAT(REPLACE(group_concat(DISTINCT ids separator '|'), ',', '|'), '|') FROM
(SELECT DISTINCT REGEXP_REPLACE(REGEXP_SUBSTR(DATA, '"via":\[.*?\]' ), '"via":\[|\]|"', '') as ids FROM %1$s WHERE tenant_id=:tenant_id ))) > 0
countOnlyDevicesOfTenant: |
SELECT COUNT(*) AS deviceCount
FROM %1$s
WHERE
tenant_id=:tenant_id
AND
LOCATE(CONCAT(device_id, '|'),
(SELECT CONCAT(REPLACE(group_concat(DISTINCT ids separator '|'), ',', '|'), '|') FROM
(SELECT DISTINCT REGEXP_REPLACE(REGEXP_SUBSTR(DATA, '"via":\[.*?\]' ), '"via":\[|\]|"', '') as ids FROM %1$s WHERE tenant_id=:tenant_id ))) = 0
findDevicesOfTenantWithFilter: |
SELECT *
FROM %s
WHERE
SELECT *
FROM %1$s
WHERE
tenant_id=:tenant_id
AND
LOCATE(CONCAT_WS(':', :field, REPLACE(:value, '"')), REPLACE(data, '"'))
Expand All @@ -21,3 +39,29 @@ findDevicesOfTenantWithFilter: |
ORDER BY device_id ASC
LIMIT :page_size
OFFSET :page_offset
findGatewaysOfTenant: |
SELECT *
FROM %1$s
WHERE
tenant_id=:tenant_id
AND
LOCATE(CONCAT(device_id, '|'),
(SELECT CONCAT(REPLACE(group_concat(DISTINCT ids separator '|'), ',', '|'), '|') FROM
(SELECT DISTINCT REPLACE(REGEXP_REPLACE(REGEXP_SUBSTR(DATA, '"via":\[.*?\]' ), '"via":\[|\]|"', ''), '\') as ids FROM %1$s WHERE tenant_id=:tenant_id ))) > 0
ORDER BY device_id ASC
LIMIT :page_size
OFFSET :page_offset
findOnlyDevicesOfTenant: |
SELECT *
FROM %1$s
WHERE
tenant_id=:tenant_id
AND
LOCATE(CONCAT(device_id, '|'),
(SELECT CONCAT(REPLACE(group_concat(DISTINCT ids separator '|'), ',', '|'), '|') FROM
(SELECT DISTINCT REGEXP_REPLACE(REGEXP_SUBSTR(DATA, '"via":\[.*?\]' ), '"via":\[|\]|"', '') as ids FROM %1$s WHERE tenant_id=:tenant_id ))) = 0
ORDER BY device_id ASC
LIMIT :page_size
OFFSET :page_offset

0 comments on commit 7ceb17f

Please sign in to comment.