Skip to content

Commit

Permalink
Add MessageUpgrader to validator (#286)
Browse files Browse the repository at this point in the history
  • Loading branch information
grafnu committed Apr 5, 2022
1 parent d13816a commit d573d5f
Show file tree
Hide file tree
Showing 24 changed files with 266 additions and 203 deletions.
4 changes: 2 additions & 2 deletions .gencode_hash.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
477bfaa6fe5b2e55a2afaa3c3897e19a090c490f8a14344a84240420289e8ef6 gencode/docs/config.html
8221d216d08bc6b67f6f081132aa86f78934149bc5be23fe9132eb187d176a86 gencode/docs/envelope.html
6371d915647bc852edb7bc446637c2d43d3c3db49661f62d7c27d3d71c6671dd gencode/docs/envelope.html
559379abd267e9719a2bee98e56c21301853b6f69bc37fddc501acc57ce7cd63 gencode/docs/event_discovery.html
987503860562a3971314a98d75890b6c7615fee84bff6bede7010231f469c035 gencode/docs/event_pointset.html
e3fb2b1a96f6fcb06f5af6cff32829abc825065ed52de81c3d379c8c8070fe09 gencode/docs/event_system.html
Expand All @@ -23,7 +23,7 @@ b6ff9b8739a9c3bb6972f73db6fc54f451189c13b273e58bc11cb3d82c74ad40 gencode/java/u
eb3df3042d3c2008e51c35f35074741ba94a5a7fd590b5f1e59bd30ec19b4c2f gencode/java/udmi/schema/DiscoveryEvent.java
b9b1c6dc52c28630021c76d51305cb2fe634c557f3cf9b8e5c8c8abf456e6216 gencode/java/udmi/schema/DiscoveryState.java
090bbaf1508aa6ca8380af936af990673f300eb5a940c9e8ab94deb64efa2b7b gencode/java/udmi/schema/Entry.java
e22684e98a6dd9f213093c4e8293f353cd5faafb66264fc34a06167c6c3833a3 gencode/java/udmi/schema/Envelope.java
d5cd62caeb10e69d1c7099019a45f995c9b483061ef0832aac711a790b2023c8 gencode/java/udmi/schema/Envelope.java
e9f5c77be81486b6b8c6d88f70f2d50583d8c3fafa2ac09ead80f44b8d5e751e gencode/java/udmi/schema/Event.java
70ac42b1f93211420e8b40add27a4388dffcaaac60ead45852412aa815520605 gencode/java/udmi/schema/Families.java
aa0885ca43ab38c7597eacc38b7c512940a1a9fa061abd47d02c28e66b6fd93e gencode/java/udmi/schema/FamilyDiscoveryConfig.java
Expand Down
18 changes: 9 additions & 9 deletions .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,23 @@ jobs:
run: |
bin/setup_base
bin/clone_model
- name: telemetry validator
- name: sequence tests
env:
GCP_TARGET_PROJECT: ${{ secrets.GCP_TARGET_PROJECT }}
if: "${{ env.GCP_TARGET_PROJECT != '' }}"
run: |
bin/test_validator $GCP_TARGET_PROJECT
cat udmi_site_model/out/validation_report.json && echo
cat udmi_site_model/out/devices/AHU-1/state.json && echo
diff -u /tmp/validator.out etc/validator.out && echo No validator diff detected.
- name: sequence tests
bin/test_sequencer $GCP_TARGET_PROJECT
more /tmp/sequencer.out
diff -u /tmp/sequencer.out etc/sequencer.out && echo No output diff detected.
- name: telemetry validator
env:
GCP_TARGET_PROJECT: ${{ secrets.GCP_TARGET_PROJECT }}
if: "${{ env.GCP_TARGET_PROJECT != '' }}"
run: |
bin/test_sequencer $GCP_TARGET_PROJECT
cat /tmp/sequencer.out
diff -u /tmp/sequencer.out etc/sequencer.out && echo No output diff detected.
# Run after sequencer to device config starts in a known state
bin/test_validator $GCP_TARGET_PROJECT
more /tmp/validator.out
diff -u /tmp/validator.out etc/validator.out && echo No validator diff detected.
- name: code checks
if: ${{ always() }}
run: |
Expand Down
1 change: 0 additions & 1 deletion bin/loop_sequences
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ for test_name in $test_names; do
test_name=
fi
for test_class in ConfigValidator WritebackValidator; do
rm -f out/*.json
target=$test_class$test_prefix$test_name
CLASS=com.google.daq.mqtt.validator.validations.$target
echo $JAVA_CMD $CLASS
Expand Down
2 changes: 1 addition & 1 deletion dashboard/deploy_dashboard_gcloud
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ echo "const udmi_deploy_version = '$version';" > public/deploy_version.js
PUBSUB_FUNCTIONS="udmi_target udmi_state udmi_config udmi_reflect"
for func in $PUBSUB_FUNCTIONS; do
echo Deploying pubsub-trigger function $func...
gcloud functions deploy $func --trigger-topic=$func --runtime=$RUNTIME --project=$PROJECT --source=$SOURCE &
gcloud functions deploy $func --set-env-vars GCP_PROJECT=$PROJECT --trigger-topic=$func --runtime=$RUNTIME --project=$PROJECT --source=$SOURCE &
sleep 10
done

Expand Down
101 changes: 62 additions & 39 deletions dashboard/functions/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
const PROJECT_ID = process.env.GCP_PROJECT || process.env.GCLOUD_PROJECT;
const useFirestore = !!process.env.FIREBASE_CONFIG;
if (!process.env.GCLOUD_PROJECT) {
console.log("Setting GCLOUD_PROJECT to " + PROJECT_ID);
console.log('Setting GCLOUD_PROJECT to ' + PROJECT_ID);
process.env.GCLOUD_PROJECT = PROJECT_ID;
}

Expand All @@ -20,6 +20,8 @@ const UDMI_VERSION = '1.3.14';
const EVENT_TYPE = 'event';
const CONFIG_TYPE = 'config';
const STATE_TYPE = 'state';
const UPDATE_FOLDER = 'update';
const QUERY_FOLDER = 'query';

const ALL_REGIONS = ['us-central1', 'europe-west1', 'asia-east1'];
let registry_regions = null;
Expand Down Expand Up @@ -48,7 +50,6 @@ function recordMessage(attributes, message) {
const subFolder = attributes.subFolder || 'unknown';
const timestamp = (message && message.timestamp) || currentTimestamp();
const promises = [];
const collectionType = subType + 's';

if (message) {
message.timestamp = timestamp;
Expand All @@ -66,7 +67,7 @@ function recordMessage(attributes, message) {
promises.push(dev_doc.set({
'updated': timestamp
}, { merge: true }));
const config_doc = dev_doc.collection(collectionType).doc(subFolder);
const config_doc = dev_doc.collection(subType).doc(subFolder);
if (message) {
promises.push(config_doc.set(message));
} else {
Expand All @@ -81,7 +82,10 @@ function recordMessage(attributes, message) {
}

function sendCommand(registryId, deviceId, subFolder, message) {
const messageStr = JSON.stringify(message);
return sendCommandStr(registryId, deviceId, subFolder, JSON.stringify(message));
}

function sendCommandStr(registryId, deviceId, subFolder, messageStr) {
return registry_promise.then(() => {
return sendCommandSafe(registryId, deviceId, subFolder, messageStr);
});
Expand Down Expand Up @@ -159,24 +163,25 @@ exports.udmi_reflect = functions.pubsub.topic('udmi_reflect').onPublish((event)

attributes.deviceRegistryId = attributes.deviceId;
attributes.deviceId = parts[1];
const subType = parts[2];
attributes.subFolder = parts[3];
attributes.subFolder = parts[2];
attributes.subType = parts[3];
console.log('Reflect', attributes.deviceId, attributes.subType, attributes.subFolder);

return registry_promise.then(() => {
attributes.cloudRegion = registry_regions[attributes.deviceRegistryId];
if (subType == 'query') {
if (attributes.subFolder == QUERY_FOLDER) {
return udmi_query_event(attributes, msgObject);
}
target = 'udmi_' + subType;
target = 'udmi_' + attributes.subType;
return publishPubsubMessage(target, attributes, msgObject);
});
});

function udmi_query_event(attributes, msgObject) {
if (attributes.subFolder == 'states') {
if (attributes.subType == STATE_TYPE) {
return udmi_query_states(attributes);
}
throw 'Unknown query type ' + attributes.subFolder;
throw 'Unknown query type ' + attributes.subType;
}

function udmi_query_states(attributes) {
Expand All @@ -202,7 +207,7 @@ function udmi_query_states(attributes) {
const stateBinaryData = deviceData[0].state.binaryData;
const stateString = stateBinaryData.toString();
const msgObject = JSON.parse(stateString);
return process_states_update(attributes, msgObject);
return process_state_update(attributes, msgObject);
});
}

Expand All @@ -212,18 +217,21 @@ exports.udmi_state = functions.pubsub.topic('udmi_state').onPublish((event) => {
const msgString = Buffer.from(base64, 'base64').toString();
const msgObject = JSON.parse(msgString);

return process_states_update(attributes, msgObject);
return process_state_update(attributes, msgObject);
});

function process_states_update(attributes, msgObject) {
function process_state_update(attributes, msgObject) {
let promises = [];
const deviceId = attributes.deviceId;
const registryId = attributes.deviceRegistryId;

const commandFolder = `devices/${deviceId}/update/states`;
const commandFolder = `devices/${deviceId}/${STATE_TYPE}/${UPDATE_FOLDER}`;
promises.push(sendCommand(REFLECT_REGISTRY, registryId, commandFolder, msgObject));

attributes.subFolder = UPDATE_FOLDER;
attributes.subType = STATE_TYPE;
promises.push(publishPubsubMessage('udmi_target', attributes, msgObject));

for (var block in msgObject) {
let subMsg = msgObject[block];
if (typeof subMsg === 'object') {
Expand Down Expand Up @@ -263,37 +271,48 @@ exports.udmi_config = functions.pubsub.topic('udmi_config').onPublish((event) =>
if (useFirestore) {
console.info('Deferring to firestore trigger for IoT Core modification.');
} else {
promises.push(modify_device_config(registryId, deviceId, subFolder, msgObject));
promises.push(modify_device_config(registryId, deviceId, subFolder, currentTimestamp(), msgObject));
}

return Promise.all(promises);
});

async function modify_device_config(registryId, deviceId, subFolder, subContents) {
const [oldConfig, version] = await get_device_config(registryId, deviceId);
let newConfig = {};
function parse_old_config(oldConfig, resetConfig) {
if (!oldConfig || resetConfig) {
console.warn('Resetting config bock, explicit=' + resetConfig);
return {};
}

try {
const resetConfig = subFolder === "system" && subContents.extra_field === "reset_config";
if (!resetConfig && oldConfig) {
newConfig = JSON.parse(oldConfig);
} else {
console.log("Config reset explicit=" + resetConfig);
resetConfig && delete subContents.extra_field;
}
} catch (e) {
return JSON.parse(oldConfig);
} catch(e) {
console.warn('Previous config parse error, ignoring update');
return null;
}
}

async function modify_device_config(registryId, deviceId, subFolder, startTime, subContents) {
const [oldConfig, version] = await get_device_config(registryId, deviceId);

const resetConfig = subFolder == 'system' && subContents && subContents.extra_field == 'reset_config';
const newConfig = parse_old_config(oldConfig, resetConfig);
if (newConfig === null) {
return;
}

newConfig.version = UDMI_VERSION;
newConfig.timestamp = currentTimestamp();

console.log('Config modify version', version, subFolder);
console.log('Config modify version', subFolder, version, startTime);
if (subContents) {
delete subContents.version;
delete subContents.timestamp;
newConfig[subFolder] = subContents;
} else {
if (!newConfig[subFolder]) {
console.log('Config target already null', subFolder, version, startTime);
return;
}
delete newConfig[subFolder];
}
const attributes = {
Expand All @@ -303,13 +322,12 @@ async function modify_device_config(registryId, deviceId, subFolder, subContents
deviceRegistryId: registryId
};
return update_device_config(newConfig, attributes, version)
.catch(e => {
console.log('Config update rejected, retry', subFolder);
return modify_device_config(registryId, deviceId, subFolder, subContents);
})
.then(() => {
console.log('Config accepted version', version, subFolder);
});
console.log('Config accepted version', subFolder, version, startTime);
}).catch(e => {
console.log('Config update rejected', subFolder, version, startTime);
return modify_device_config(registryId, deviceId, subFolder, startTime, subContents);
})
}

async function get_device_config(registryId, deviceId) {
Expand Down Expand Up @@ -365,19 +383,23 @@ function update_device_config(message, attributes, preVersion) {
versionToUpdate: version,
binaryData: binaryData
};
const commandFolder = `devices/${deviceId}/update/configs`;
const commandFolder = `devices/${deviceId}/${CONFIG_TYPE}/${UPDATE_FOLDER}`;

return iotClient
.modifyCloudToDeviceConfig(request)
.then(() => sendCommand(REFLECT_REGISTRY, registryId, commandFolder, message));
.then(() => sendCommandStr(REFLECT_REGISTRY, registryId, commandFolder, msgString));
}

function consolidate_config(registryId, deviceId) {
function consolidate_config(registryId, deviceId, subFolder) {
const cloudRegion = registry_regions[registryId];
const reg_doc = firestore.collection('registries').doc(registryId);
const dev_doc = reg_doc.collection('devices').doc(deviceId);
const configs = dev_doc.collection('configs');

if (subFolder == UPDATE_FOLDER) {
return;
}

console.log('consolidating config for', registryId, deviceId);

const new_config = {
Expand Down Expand Up @@ -416,11 +438,12 @@ function consolidate_config(registryId, deviceId) {
}

exports.udmi_update = functions.firestore
.document('registries/{registryId}/devices/{deviceId}/configs/{subFolder}')
.document('registries/{registryId}/devices/{deviceId}/config/{subFolder}')
.onWrite((change, context) => {
const registryId = context.params.registryId;
const deviceId = constext.params.deviceId;
return registry_promise.then(consolidate_config(registryId, deviceId));
const deviceId = context.params.deviceId;
const subFolder = context.params.subFolder;
return registry_promise.then(consolidate_config(registryId, deviceId, subFolder));
});

function publishPubsubMessage(topicName, attributes, data) {
Expand Down
4 changes: 2 additions & 2 deletions etc/sequencer.out
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
RESULT pass broken_config Sequence complete
RESULT pass extra_config Sequence complete
RESULT pass provided_serial_no Sequence complete
RESULT pass provided_serial_no Sequence complete
RESULT pass system_last_update Sequence complete
RESULT pass valid_serial_no Sequence complete
RESULT pass valid_serial_no Sequence complete
RESULT skip writeback_states Missing 'invalid' target specification
17 changes: 0 additions & 17 deletions etc/validator.out
Original file line number Diff line number Diff line change
@@ -1,24 +1,7 @@
::::::::::::::
udmi_site_model/out/devices/AHU-1/state.out
::::::::::::::
While converting to json node: 2 schema violations found
2 schema violations found
instance value ("states") not found in enum (possible values: ["update","discovery","system","gateway","localnet","metadata","pointset","blobset"])
instance value ("update") not found in enum (possible values: ["event","command","state","config"])
While converting to json node: 2 schema violations found
2 schema violations found
object has missing required properties (["last_config"])
object has missing required properties (["version"])
::::::::::::::
udmi_site_model/out/devices/AHU-1/state_pointset.out
::::::::::::::
While converting to json node: 1 schema violations found
1 schema violations found
object instance has properties which are not allowed by the schema: ["version"]
::::::::::::::
udmi_site_model/out/devices/AHU-1/state_system.out
::::::::::::::
While converting to json node: 2 schema violations found
2 schema violations found
object has missing required properties (["last_config"])
object instance has properties which are not allowed by the schema: ["version"]
4 changes: 2 additions & 2 deletions gencode/docs/envelope.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions gencode/java/udmi/schema/Envelope.java

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions pubber/src/main/java/daq/pubber/MqttPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ private void publishCore(String deviceId, String topic, Object data) {
warn(String.format("Publish failed for %s: %s", deviceId, e));
if (configuration.gatewayId == null) {
closeMqttClient(deviceId);
if (mqttClients.isEmpty()) {
warn("Last client closed, shutting down publisher");
publisherExecutor.shutdown();
}
} else {
close();
}
Expand Down Expand Up @@ -137,14 +141,10 @@ void close() {
}
mqttClients.keySet().forEach(this::closeMqttClient);
} catch (Exception e) {
throw new RuntimeException("While closing publisher");
throw new RuntimeException("While closing publisher", e);
}
}

long clientCount() {
return mqttClients.size();
}

private void validateCloudIotOptions() {
try {
checkNotNull(configuration.bridgeHostname, "bridgeHostname");
Expand Down
Loading

0 comments on commit d573d5f

Please sign in to comment.