diff --git a/misc/create_reflect_credentials b/misc/gcloud_deployment/create_reflect_credentials
similarity index 100%
rename from misc/create_reflect_credentials
rename to misc/gcloud_deployment/create_reflect_credentials
diff --git a/misc/setup_gcp_project b/misc/gcloud_deployment/setup_gcp_project
similarity index 95%
rename from misc/setup_gcp_project
rename to misc/gcloud_deployment/setup_gcp_project
index b9b3f186a5..0bd1888c15 100644
--- a/misc/setup_gcp_project
+++ b/misc/gcloud_deployment/setup_gcp_project
@@ -28,7 +28,7 @@ gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME --display-name="UDMI Cl
 gcloud projects add-iam-policy-binding $PROJECT_ID --member="serviceAccount:$SERVICE_ACCOUNT" --role="roles/pubsub.publisher"
 gcloud projects add-iam-policy-binding $PROJECT_ID --member="serviceAccount:$SERVICE_ACCOUNT" --role="roles/cloudiot.provisioner"
 
-$ROOT_DIR/dashboard/deploy_dashboard_gcloud $PROJECT_ID --service-account=$SERVICE_ACCOUNT
+$ROOT_DIR/udmis/deploy_udmis_gcloud $PROJECT_ID --service-account=$SERVICE_ACCOUNT
 
 $ROOT_DIR/bin/clone_model
 registry_id=$(jq -r .registry_id $ROOT_DIR/sites/udmi_site_model/cloud_iot_config.json)
diff --git a/misc/gcloud_iot_config/README.md b/misc/gcloud_iot_config/README.md
new file mode 100644
index 0000000000..11c22340af
--- /dev/null
+++ b/misc/gcloud_iot_config/README.md
@@ -0,0 +1,6 @@
+# GCloud IoT Config Store
+
+Stores a complete history of all config messages sent to devices in a GCP project into GCS
+
+## Installation
+`./deploy PROJECT_ID`
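+
+## Example
+
+Configs are stored as `REGISTRY_ID/DEVICE_ID/VERSION.json` objects in the `PROJECT_ID-iot-configs` bucket created by `deploy`, so a specific stored version can be retrieved with something like the following (registry, device and version values are illustrative):
+
+```
+gcloud storage cat gs://PROJECT_ID-iot-configs/ZZ-TRI-FECTA/AHU-1/1234.json
+```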
diff --git a/misc/gcloud_iot_config/deploy b/misc/gcloud_iot_config/deploy
new file mode 100755
index 0000000000..40bc9d754e
--- /dev/null
+++ b/misc/gcloud_iot_config/deploy
@@ -0,0 +1,38 @@
+#!/bin/bash
+
+if (( $# < 1 )); then
+    echo $0 PROJECT_ID
+    exit
+fi
+
+PROJECT_ID=$1
+shift 1
+
+FUNCTION_NAME=gcloud_iot_config
+ENTRY_POINT=storeConfig
+TOPIC=config_updates
+BUCKET=${PROJECT_ID}-iot-configs
+
+gcloud storage buckets create gs://$BUCKET --project=$PROJECT_ID --default-storage-class=STANDARD --location=us-central1
+
+# Create pub/sub topic
+gcloud pubsub topics create $TOPIC --project=$PROJECT_ID
+
+# Create logs router
+gcloud logging sinks create config_updates_logsink \
+  pubsub.googleapis.com/projects/$PROJECT_ID/topics/$TOPIC \
+  --log-filter='protoPayload.methodName="google.cloud.iot.v1.DeviceManager.ModifyCloudToDeviceConfig" AND severity=NOTICE' \
+  --project=$PROJECT_ID
+
+LOGSINK_SERVICE_ACCOUNT=$(gcloud logging sinks describe config_updates_logsink --project=$PROJECT_ID | grep writerIdentity | sed "s/writerIdentity: //")
+gcloud pubsub topics add-iam-policy-binding $TOPIC --project=$PROJECT_ID --member=$LOGSINK_SERVICE_ACCOUNT --role='roles/pubsub.publisher'
+
+# Deploy Cloud Function
+gcloud functions deploy $FUNCTION_NAME \
+  --trigger-topic=$TOPIC \
+  --entry-point=$ENTRY_POINT \
+  --runtime nodejs16 \
+  --project=$PROJECT_ID \
+  --source=functions/ \
+  --set-env-vars PROJECT_ID=$PROJECT_ID,BUCKET=$BUCKET
diff --git a/misc/gcloud_iot_config/functions/index.js b/misc/gcloud_iot_config/functions/index.js
new file mode 100644
index 0000000000..b978def0e4
--- /dev/null
+++ b/misc/gcloud_iot_config/functions/index.js
@@ -0,0 +1,58 @@
+const PROJECT_ID = process.env.PROJECT_ID
+const BUCKET = process.env.BUCKET
+
+const iot = require('@google-cloud/iot');
+const {Storage} = require('@google-cloud/storage');
+const storage = new Storage();
+const iotClient = new iot.v1.DeviceManagerClient({});
+const bucket = storage.bucket(BUCKET);
+const fs = require('fs');
+
+// Example resourceName:
+// projects/daq1-273309/locations/us-central1/registries/ZZ-TRI-FECTA/devices/AHU-1
+
+exports.storeConfig = async (event, context) => {
+  const pubsubMessage = event.data;
+  const objStr = Buffer.from(pubsubMessage, 'base64').toString();
+  const logData = JSON.parse(objStr);
+
+  const devicePath = logData['protoPayload']['resourceName'];
+  const versionToUpdate = logData['protoPayload']['request']['versionToUpdate'];
+
+  const [response] = await iotClient.listDeviceConfigVersions({
+    name: devicePath,
+  });
+
+  const splitDevice = devicePath.split('/');
+  const registryId = splitDevice[5];
+  const deviceId = splitDevice[7];
+
+  const configs = response.deviceConfigs;
+
+  if (configs.length === 0) {
+    return null;
+  }
+
+  const uploads = [];
+
+  configs.forEach((config) => {
+    if (config.version == versionToUpdate) {
+      const configPayload = config.binaryData.toString('utf8');
+      const localFileName = `/tmp/${registryId}_${deviceId}_${versionToUpdate}.txt`;
+
+      // Write synchronously so the file is complete before the upload starts
+      fs.writeFileSync(localFileName, configPayload);
+
+      const options = {
+        destination: `${registryId}/${deviceId}/${versionToUpdate}.json`
+      };
+
+      // upload() returns a promise; collect them so they finish before the function exits
+      uploads.push(bucket.upload(localFileName, options));
+    }
+  });
+
+  return await Promise.all(uploads);
+};
+
diff --git a/misc/gcloud_iot_config/functions/package.json b/misc/gcloud_iot_config/functions/package.json
new file mode 100644
index 0000000000..737526a2fc
--- /dev/null
+++ b/misc/gcloud_iot_config/functions/package.json
@@ -0,0 +1,9 @@
+{
+  "name": "config_to_gcs",
+  "version": "0.0.1",
+  "dependencies": {
+    "@google-cloud/pubsub": "^0.18.0",
+    "@google-cloud/iot": "2.3.4",
+    "@google-cloud/storage": "6.5.2"
+  }
+}
\ No newline at end of file
diff --git a/misc/gcloud_iot_connection_log/README.md b/misc/gcloud_iot_connection_log/README.md
new file mode 100644
index 0000000000..bde15db67d
--- /dev/null
+++ b/misc/gcloud_iot_connection_log/README.md
@@ -0,0 +1,47 @@
+# GCloud IoT Connection Logs
+
+Stores all device connection and disconnection events from IoT Core into BigQuery
+
+**NOTE**
+- Requires **Cloud Logging** on the devices or registry to be set to `INFO` or higher.
+- IoT Core has a default limit of 2000 log entries per second. If a registry has `DEBUG` level logging, this limit may very quickly be exceeded, resulting in missing connection or disconnection log events
+
+## Installation
+
+`./deploy PROJECT_ID DATASET_ID LOCATION TRIGGER_TOPIC [--drop]`
+
+**NOTE** The cloud function is deployed with the default App Engine service account. This may need to change if this account does not have the required permissions
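+
+The registry log level can be raised with something like the following (region value is illustrative):
+
+```
+gcloud iot registries update REGISTRY_ID --region=us-central1 --project=PROJECT_ID --log-level=info
+```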
+
+## Example Queries
+
+### List devices which were once connected that are now offline
+
+```sql
+SELECT *,
+  Row_number() OVER(partition BY device_id, registry_id ORDER BY timestamp DESC) AS rank
+FROM `PROJECT_ID.udmi.iot_connects` qualify rank = 1
+AND event = 0
+ORDER BY timestamp DESC
+```
+
+### List device outages exceeding X minutes
+
+```sql
+SELECT device_id,
+  qtimestamp disconnect_time,
+  timestamp reconnect_time,
+  outage_minutes
+FROM (
+  SELECT device_id,
+    qtimestamp,
+    timestamp,
+    Datetime_diff(timestamp, qtimestamp, minute) AS outage_minutes
+  FROM (
+    SELECT *,
+      Lag(timestamp) OVER (partition BY device_id, registry_id ORDER BY timestamp, event) qtimestamp
+    FROM `PROJECT_ID.udmi.iot_connects`
+    WHERE logentry = 1
+    AND event = 1 )
+  WHERE timestamp IS NOT NULL )
+WHERE outage_minutes > 10
+```
diff --git a/misc/gcloud_iot_connection_log/deploy b/misc/gcloud_iot_connection_log/deploy
new file mode 100755
index 0000000000..d9e7b9ea2d
--- /dev/null
+++ b/misc/gcloud_iot_connection_log/deploy
@@ -0,0 +1,50 @@
+#!/bin/bash
+
+if (( $# < 4 )); then
+    echo $0 PROJECT_ID DATASET_ID LOCATION TRIGGER_TOPIC [--drop]
+    exit
+fi
+
+PROJECT_ID=$1
+DATASET_ID=$2
+LOCATION=$3
+TRIGGER_TOPIC=$4
+shift 4
+
+if [[ $1 == "--drop" ]]; then
+    shift 1
+    #echo "WARNING: Dropping tables in 5 seconds. Data will be permanently lost"
+    #sleep 5 && echo "Deleting tables ..." && sleep 3
+    bq rm -t -f $PROJECT_ID:$DATASET_ID.iot_connects
+fi
+
+FUNCTION_NAME=gcloud_iot_connection_log
+TOPIC=iot_connections
+
+# Connection log table
+bq mk \
+  --table \
+  $PROJECT_ID:$DATASET_ID.iot_connects \
+  schema_iot_logs.json
+
+# Create pub/sub topic
+gcloud pubsub topics create $TOPIC --project=$PROJECT_ID
+
+# Create logs router for IoT Core device connect/disconnect events
+gcloud logging sinks create iot_connections_logsink \
+  pubsub.googleapis.com/projects/$PROJECT_ID/topics/$TOPIC \
+  --log-filter='resource.type="cloudiot_device" AND (jsonPayload.eventType="CONNECT" OR jsonPayload.eventType="DISCONNECT")' \
+  --project=$PROJECT_ID
+
+LOGSINK_SERVICE_ACCOUNT=$(gcloud logging sinks describe iot_connections_logsink --project=$PROJECT_ID | grep writerIdentity | sed "s/writerIdentity: //")
+gcloud pubsub topics add-iam-policy-binding $TOPIC --project=$PROJECT_ID --member=$LOGSINK_SERVICE_ACCOUNT --role='roles/pubsub.publisher'
+
+# Deploy Cloud Function
+gcloud functions deploy $FUNCTION_NAME \
+  --trigger-topic=$TOPIC \
+  --entry-point=logConnectionEvents \
+  --runtime nodejs16 \
+  --project=$PROJECT_ID \
+  --source=functions/ \
+  --set-env-vars PROJECT_ID=$PROJECT_ID,DATASET_ID=$DATASET_ID
diff --git a/misc/gcloud_iot_connection_log/functions/index.js b/misc/gcloud_iot_connection_log/functions/index.js
new file mode 100644
index 0000000000..e9f0f73ed8
--- /dev/null
+++ b/misc/gcloud_iot_connection_log/functions/index.js
@@ -0,0 +1,76 @@
+const {BigQuery} = require('@google-cloud/bigquery');
+const bigquery = new BigQuery();
+
+const PROJECT_ID = process.env.PROJECT_ID
+const DATASET_ID = process.env.DATASET_ID
+
+exports.logConnectionEvents = async (event, context) => {
+  const pubsubMessage = event.data;
+  const objStr = Buffer.from(pubsubMessage, 'base64').toString();
+  const logData = JSON.parse(objStr);
+
+  let oldState, newState;
+  if (logData['jsonPayload']['eventType'] == "CONNECT") {
+    oldState = 0;
+    newState = 1;
+  } else if (logData['jsonPayload']['eventType'] == "DISCONNECT") {
+    newState = 0;
+    oldState = 1;
+  } else {
+    return;
+  }
+
+  let description, message;
+  if (logData['jsonPayload']['status']['description'] == "OK") {
+    description = null;
+    message = null;
+  } else {
+    description = logData['jsonPayload']['status']['description'];
+    message = logData['jsonPayload']['status']['message'];
+  }
+
+  // Reduce nanoseconds to microseconds
+  const ts_ts = logData['timestamp'].substring(0, 19);
+  const ts_ns = logData['timestamp'].substring(20, 29);
+  const ms = Math.floor(parseInt(ts_ns) / 1000);
+  const ts = ts_ts + '.' + String(ms).padStart(6, '0') + 'Z';
+
+  // Order events by timestamp
+  // We don't need microsecond accuracy, so either add or subtract 1 microsecond
+  // to avoid dealing with dates, and set such that t1 < t2
+  let t1, t2;
+  if (ms == 0) {
+    t1 = ts;
+    t2 = ts_ts + '.000001Z';
+  } else {
+    t2 = ts;
+    t1 = ts_ts + '.' + String(ms - 1).padStart(6, '0') + 'Z';
+  }
+
+  // The log event is always the newer entry
+  const log_entry = {
+    timestamp: t2,
+    device_id: logData['labels']['device_id'],
+    device_num_id: logData['resource']['labels']['device_num_id'],
+    registry_id: logData['resource']['labels']['device_registry_id'],
+    event: newState,
+    logentry: 1,
+    logentry_description: description,
+    logentry_message: message
+  }
+
+  // The "derivative" is always the preceding entry
+  const log_derivative = {
+    timestamp: t1,
+    device_id: logData['labels']['device_id'],
+    device_num_id: logData['resource']['labels']['device_num_id'],
+    registry_id: logData['resource']['labels']['device_registry_id'],
+    event: oldState,
+    logentry: 0
+  }
+
+  return await Promise.all([
+    bigquery.dataset(DATASET_ID).table('iot_connects').insert([log_derivative, log_entry]),
+  ]);
+};
+
diff --git a/misc/gcloud_iot_connection_log/functions/package.json b/misc/gcloud_iot_connection_log/functions/package.json
new file mode 100644
index 0000000000..ae61d4ec44
--- /dev/null
+++ b/misc/gcloud_iot_connection_log/functions/package.json
@@ -0,0 +1,8 @@
+{
+  "name": "iotlogs-to-bq",
+  "version": "0.0.1",
+  "dependencies": {
+    "@google-cloud/pubsub": "^0.18.0",
+    "@google-cloud/bigquery": "^3.0.0"
+  }
+}
\ No newline at end of file
diff --git a/misc/gcloud_iot_connection_log/schema_iot_logs.json b/misc/gcloud_iot_connection_log/schema_iot_logs.json
new file mode 100644
index 0000000000..6fdd4f95d2
--- /dev/null
+++ b/misc/gcloud_iot_connection_log/schema_iot_logs.json
@@ -0,0 +1,42 @@
+[
+  {
+    "mode": "NULLABLE",
+    "name": "timestamp",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "registry_id",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "device_id",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "device_num_id",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "event",
+    "type": "INTEGER"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "logentry",
+    "type": "INTEGER"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "logentry_description",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "logentry_message",
+    "type": "STRING"
+  }
+]
\ No newline at end of file
diff --git a/misc/gcloud_messages_telemetry/README.md b/misc/gcloud_messages_telemetry/README.md
new file mode 100644
index 0000000000..22b90a6c8f
--- /dev/null
+++ b/misc/gcloud_messages_telemetry/README.md
@@ -0,0 +1,14 @@
+# GCloud Messages and Telemetry to BigQuery
+
+Saves a record of all messages into BigQuery, as well as telemetry into a narrow data structure
+
+## Installation
+
+`./deploy PROJECT_ID DATASET_ID LOCATION TRIGGER_TOPIC [--drop]`
+
+**NOTE** The cloud function is deployed with the default App Engine service account. This may need to change if this account does not have the required permissions
+
+## BigQuery Tables
+
+- `messages` - record of all messages
+- `points` - individual telemetry points (see the example query below)
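+
+For example, the most recent telemetry for a single device can be queried with something like the following (dataset and device values are illustrative):
+
+```
+bq query --use_legacy_sql=false \
+  'SELECT timestamp, point_name, present_value
+   FROM `PROJECT_ID.udmi.points`
+   WHERE device_id = "AHU-1"
+   ORDER BY timestamp DESC
+   LIMIT 10'
+```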
diff --git a/misc/gcloud_messages_telemetry/deploy b/misc/gcloud_messages_telemetry/deploy
new file mode 100755
index 0000000000..b219a2751e
--- /dev/null
+++ b/misc/gcloud_messages_telemetry/deploy
@@ -0,0 +1,74 @@
+#!/bin/bash
+
+if (( $# < 4 )); then
+    echo $0 PROJECT_ID DATASET_ID LOCATION TRIGGER_TOPIC [--drop]
+    exit
+fi
+
+PROJECT_ID=$1
+DATASET_ID=$2
+LOCATION=$3
+TRIGGER_TOPIC=$4
+shift 4
+
+MESSAGES_TABLE=messages
+TELEMETRY_TABLE=points
+
+BUCKET=$PROJECT_ID-iot-messages
+
+if [[ $1 == "--drop" ]]; then
+    shift 1
+    echo "WARNING: Dropping tables in 8 seconds. Data will be permanently lost"
+    echo $PROJECT_ID:$DATASET_ID.$MESSAGES_TABLE
+    echo $PROJECT_ID:$DATASET_ID.$TELEMETRY_TABLE
+    sleep 8 && echo "Deleting tables " && sleep 4
+    bq rm -t -f $PROJECT_ID:$DATASET_ID.$MESSAGES_TABLE
+    bq rm -t -f $PROJECT_ID:$DATASET_ID.$TELEMETRY_TABLE
+fi
+
+gcloud storage buckets create gs://$BUCKET --project=$PROJECT_ID --default-storage-class=STANDARD --location=us-central1
+
+# Create dataset
+bq --location=$LOCATION mk \
+  --dataset \
+  $PROJECT_ID:$DATASET_ID
+
+# Create messages table
+bq mk \
+  --table \
+  --time_partitioning_field publish_timestamp \
+  --time_partitioning_type DAY \
+  --clustering_fields registry_id,device_id \
+  $PROJECT_ID:$DATASET_ID.$MESSAGES_TABLE \
+  schema_messages.json
+
+# Create telemetry table
+bq mk \
+  --table \
+  --time_partitioning_field publish_timestamp \
+  --time_partitioning_type DAY \
+  --clustering_fields registry_id,device_id \
+  $PROJECT_ID:$DATASET_ID.$TELEMETRY_TABLE \
+  schema_telemetry.json
+
+# Deploy Cloud Function
+gcloud functions deploy messages_to_bq \
+  --trigger-topic=$TRIGGER_TOPIC \
+  --entry-point=processMessage \
+  --runtime nodejs16 \
+  --project=$PROJECT_ID \
+  --source=functions/ \
+  --set-env-vars PROJECT_ID=$PROJECT_ID,DATASET_ID=$DATASET_ID,MESSAGES_TABLE=$MESSAGES_TABLE,TELEMETRY_TABLE=$TELEMETRY_TABLE,BUCKET=$BUCKET &
+
+# Deploy Cloud Function
+gcloud functions deploy dump_raw_messages \
+  --trigger-topic=$TRIGGER_TOPIC \
+  --entry-point=storeMessage \
+  --runtime nodejs16 \
+  --project=$PROJECT_ID \
+  --source=functions/ \
+  --set-env-vars PROJECT_ID=$PROJECT_ID,DATASET_ID=$DATASET_ID,MESSAGES_TABLE=$MESSAGES_TABLE,TELEMETRY_TABLE=$TELEMETRY_TABLE,BUCKET=$BUCKET &
+
+wait
+echo Done
\ No newline at end of file
diff --git a/misc/gcloud_messages_telemetry/functions/index.js b/misc/gcloud_messages_telemetry/functions/index.js
new file mode 100644
index 0000000000..78f5f11113
--- /dev/null
+++ b/misc/gcloud_messages_telemetry/functions/index.js
@@ -0,0 +1,88 @@
+const {BigQuery} = require('@google-cloud/bigquery');
+const bigquery = new BigQuery();
+
+const PROJECT_ID = process.env.PROJECT_ID
+const DATASET_ID = process.env.DATASET_ID
+const MESSAGES_TABLE = process.env.MESSAGES_TABLE
+const TELEMETRY_TABLE = process.env.TELEMETRY_TABLE
+
+const STATE = 1
+const EVENT_POINTSET = 2
+const EVENT_SYSTEM = 3
+const EVENT_DISCOVERY = 4
+const EVENT_OTHER = 9
+
+exports.processMessage = async (event, context) => {
+  // Only process messages from the device and ignore all message fragments
+  let messageType;
+  if (event.attributes.subType == 'state' && event.attributes.subFolder == 'update') {
+    messageType = STATE;
+  } else if (!event.attributes.hasOwnProperty('subType')) {
+    if (event.attributes.subFolder == 'pointset') {
+      messageType = EVENT_POINTSET;
+    } else if (event.attributes.subFolder == 'system') {
+      messageType = EVENT_SYSTEM;
+    } else {
+      messageType = EVENT_OTHER;
+    }
+  } else {
+    return;
+  }
+
+  const pubsubMessage = event.data;
+  const objStr = Buffer.from(pubsubMessage, 'base64').toString('utf8');
+  const msgObj = JSON.parse(objStr);
+
+  // This is a Pub/Sub message ID - messages copied in the stack
+  // will have different message IDs
+  const messageId = context.eventId;
+
+  const publishTimestamp = BigQuery.timestamp(context.timestamp);
+  const deviceId = event.attributes.deviceId;
+  const gatewayId = ("gatewayId" in event.attributes) && event.attributes.gatewayId || null;
+  const deviceRegistryId = event.attributes.deviceRegistryId;
+
+  const payloadSize = Buffer.byteLength(objStr, 'utf8');
+
+  const promises = [];
+
+  // Add message to messages table
+  const messageRow = {
+    publish_timestamp: publishTimestamp,
+    device_num_id: parseInt(event.attributes.deviceNumId),
+    device_id: deviceId,
+    gateway_id: gatewayId,
+    registry_id: deviceRegistryId,
+    message_id: messageId,
+    payload_size_bytes: payloadSize,
+    message_type: messageType
+  };
+
+  promises.push(bigquery.dataset(DATASET_ID).table(MESSAGES_TABLE).insert([messageRow]));
+
+  // Insert telemetry
+  if ('points' in msgObj) {
+    const rows = [];
+    const payloadTimestamp = BigQuery.timestamp(msgObj.timestamp);
+    Object.keys(msgObj.points).forEach(function(key) {
+      let row = {
+        device_id: deviceId,
+        device_num_id: parseInt(event.attributes.deviceNumId),
+        message_id: messageId,
+        gateway_id: gatewayId,
+        registry_id: deviceRegistryId,
+        publish_timestamp: publishTimestamp,
+        timestamp: payloadTimestamp,
+        point_name: key,
+        present_value: (isNaN(parseFloat(msgObj.points[key].present_value)) ? null : parseFloat(msgObj.points[key].present_value)),
+        present_value_string: String(msgObj.points[key].present_value)
+      };
+      rows.push(row)
+    });
+
+    if (rows.length > 0) {
+      promises.push(bigquery.dataset(DATASET_ID).table(TELEMETRY_TABLE).insert(rows))
+    }
+  }
+
+  return await Promise.all(promises);
+};
diff --git a/misc/gcloud_messages_telemetry/functions/package.json b/misc/gcloud_messages_telemetry/functions/package.json
new file mode 100644
index 0000000000..965f50ab40
--- /dev/null
+++ b/misc/gcloud_messages_telemetry/functions/package.json
@@ -0,0 +1,9 @@
+{
+  "name": "message-store",
+  "version": "0.0.1",
+  "dependencies": {
+    "@google-cloud/pubsub": "^0.18.0",
+    "@google-cloud/bigquery": "^3.0.0",
+    "@google-cloud/storage": "6.5.2"
+  }
+}
\ No newline at end of file
diff --git a/misc/gcloud_messages_telemetry/schema_messages.json b/misc/gcloud_messages_telemetry/schema_messages.json
new file mode 100644
index 0000000000..24e82777a8
--- /dev/null
+++ b/misc/gcloud_messages_telemetry/schema_messages.json
@@ -0,0 +1,42 @@
+[
+  {
+    "mode": "NULLABLE",
+    "name": "publish_timestamp",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "registry_id",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "device_num_id",
+    "type": "INTEGER"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "device_id",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "gateway_id",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "message_type",
+    "type": "INTEGER"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "message_id",
+    "type": "INTEGER"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "payload_size_bytes",
+    "type": "INTEGER"
+  }
+]
\ No newline at end of file
diff --git a/misc/gcloud_messages_telemetry/schema_telemetry.json b/misc/gcloud_messages_telemetry/schema_telemetry.json
new file mode 100644
index 0000000000..25829c5098
--- /dev/null
+++ b/misc/gcloud_messages_telemetry/schema_telemetry.json
@@ -0,0 +1,52 @@
+[
+  {
+    "mode": "NULLABLE",
+    "name": "publish_timestamp",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "timestamp",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "registry_id",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "device_num_id",
+    "type": "INTEGER"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "device_id",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "gateway_id",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "message_id",
+    "type": "INTEGER"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "point_name",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "present_value",
+    "type": "NUMERIC"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "present_value_string",
+    "type": "STRING"
+  }
+]
\ No newline at end of file
diff --git a/misc/gcloud_pubsub_mirror/README.md b/misc/gcloud_pubsub_mirror/README.md
new file mode 100644
index 0000000000..d2ad31c8fa
--- /dev/null
+++ b/misc/gcloud_pubsub_mirror/README.md
@@ -0,0 +1,10 @@
+# GCloud PubSub Mirror
+
+Republishes GCP Pub/Sub messages from one subscription into another project verbatim
+
+**NOTES**
+- `messageId` of republished messages **will** differ from the original message
+- no error/connection loss handling
+
+## Usage
+`python3 mirror.py source_project source_subscription target_project target_topic`
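+
+The source subscription must already exist, e.g. (topic and subscription names are illustrative):
+
+```
+gcloud pubsub subscriptions create udmi_mirror --topic=udmi_target --project=source_project
+python3 mirror.py source_project udmi_mirror target_project target_topic
+```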
diff --git a/misc/gcloud_pubsub_mirror/mirror.py b/misc/gcloud_pubsub_mirror/mirror.py
new file mode 100644
index 0000000000..1eb6e9f90e
--- /dev/null
+++ b/misc/gcloud_pubsub_mirror/mirror.py
@@ -0,0 +1,52 @@
+import sys
+import argparse
+
+from concurrent import futures
+from google import auth
+from google.cloud import pubsub_v1
+
+messages_processed = 0
+
+def subscribe_callback(message: pubsub_v1.subscriber.message.Message) -> None:
+    global messages_processed
+    publisher.publish(topic_path, message.data, **message.attributes)
+    messages_processed += 1
+    if messages_processed % 100 == 0:
+        print(f'{messages_processed} messages processed')
+    message.ack()
+
+def parse_command_line_args():
+    parser = argparse.ArgumentParser()
+    parser.add_argument('source_project', type=str)
+    parser.add_argument('source_subscription', type=str)
+    parser.add_argument('target_project', type=str)
+    parser.add_argument('target_topic', type=str)
+    return parser.parse_args()
+
+args = parse_command_line_args()
+
+try:
+    credentials, project_id = auth.default()
+except Exception as e:
+    print(e)
+    sys.exit()
+
+sub_client = pubsub_v1.SubscriberClient(credentials=credentials)
+publisher = pubsub_v1.PublisherClient(credentials=credentials)
+
+topic_path = publisher.topic_path(args.target_project, args.target_topic)
+future = sub_client.subscribe(f"projects/{args.source_project}/subscriptions/{args.source_subscription}", subscribe_callback)
+print("Listening to pubsub, please wait ...")
+
+while True:
+    try:
+        future.result(timeout=5)
+    except futures.TimeoutError:
+        # Still streaming; keep waiting
+        continue
+    except (futures.CancelledError, KeyboardInterrupt):
+        future.cancel()
+        future.result()
+        break
+    except Exception as ex:
+        print(f"PubSub subscription failed with error: {ex}")
+        future.cancel()
+        future.result()
+        break
diff --git a/misc/gcloud_pubsub_mirror/requirements.txt b/misc/gcloud_pubsub_mirror/requirements.txt
new file mode 100644
index 0000000000..4632f7b10a
--- /dev/null
+++ b/misc/gcloud_pubsub_mirror/requirements.txt
@@ -0,0 +1,7 @@
+google-api-core
+google-api-python-client
+google-auth
+google-auth-httplib2
+google-auth-oauthlib
+google-cloud-pubsub
+googleapis-common-protos
\ No newline at end of file
diff --git a/misc/gcloud_state_hardware_software/README.md b/misc/gcloud_state_hardware_software/README.md
new file mode 100644
index 0000000000..c357b7ed69
--- /dev/null
+++ b/misc/gcloud_state_hardware_software/README.md
@@ -0,0 +1,8 @@
+# State Hardware & Software into BigQuery
+
+Saves the `hardware` and `software` fields from state messages into BigQuery
+
+## Installation
+`./deploy PROJECT_ID DATASET_ID LOCATION TRIGGER_TOPIC [--drop]`
+
+**NOTE** The cloud function is deployed with the default App Engine service account. This may need to change if this account does not have the required permissions
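+
+For example, the recorded hardware and software versions can be listed with something like the following (dataset name is illustrative):
+
+```
+bq query --use_legacy_sql=false \
+  'SELECT device_id, make, model, s.id, s.version
+   FROM `PROJECT_ID.udmi.state`, UNNEST(software) AS s
+   ORDER BY timestamp DESC
+   LIMIT 20'
+```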
"version": "0.0.1", + "dependencies": { + "@google-cloud/pubsub": "^0.18.0", + "@google-cloud/bigquery": "^3.0.0" + } + } \ No newline at end of file diff --git a/misc/gcloud_state_hardware_software/schema_state.json b/misc/gcloud_state_hardware_software/schema_state.json new file mode 100644 index 0000000000..f571dbac22 --- /dev/null +++ b/misc/gcloud_state_hardware_software/schema_state.json @@ -0,0 +1,59 @@ +[ + { + "mode": "NULLABLE", + "name": "timestamp", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "registry_id", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "device_id", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "make", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "serial_no", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "model", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "sku", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "rev", + "type": "STRING" + }, + { + "name": "software", + "type": "RECORD", + "mode": "REPEATED", + "fields": [ + { + "name": "id", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "version", + "type": "STRING", + "mode": "NULLABLE" + } + ] + } +] \ No newline at end of file diff --git a/misc/gcloud_store_message/README.md b/misc/gcloud_store_message/README.md new file mode 100644 index 0000000000..5bae80ad4f --- /dev/null +++ b/misc/gcloud_store_message/README.md @@ -0,0 +1,10 @@ +# GCloud Store Messages + +Saves a copy of all messages published by a device into GCS + +Default bucket is `PROJECTID-iot-messages` + +## Installation +`./deploy PROJECT_ID DATASET_ID LOCATION TRIGGER_TOPIC [--drop]` + +**NOTE** cloud function deployed with default app-engine service account. This may need to change if this account does not have required permissions \ No newline at end of file diff --git a/misc/gcloud_store_message/deploy b/misc/gcloud_store_message/deploy new file mode 100755 index 0000000000..0aa40883a4 --- /dev/null +++ b/misc/gcloud_store_message/deploy @@ -0,0 +1,30 @@ +#!/bin/bash + +if (( $# < 3 )); then + echo $0 PROJECT_ID LOCATION TRIGGER_TOPIC + exit +fi + +PROJECT_ID=$1 +LOCATION=$2 +TRIGGER_TOPIC=$3 +shift 3 + +FUNCTION_NAME=dump_raw_messages +MESSAGES_TABLE=messages +TELEMETRY_TABLE=points + +BUCKET=$(PROJECT_ID)-iot-messages + +gcloud storage buckets create gs://$BUCKET --project=$PROJECT_ID --default-storage-class=STANDARD --location=us-central1 + +# Deploy Cloud Function +gcloud functions deploy $FUNCTION_NAME \ + --trigger-topic=$TRIGGER_TOPIC \ + --entry-point=storeMessage \ + --runtime nodejs16 \ + --project=$PROJECT_ID \ + --source=functions/ \ + --set-env-vars PROJECT_ID=$PROJECT_ID,BUCKET=$BUCKET + +echo Done diff --git a/misc/gcloud_store_message/functions/index.js b/misc/gcloud_store_message/functions/index.js new file mode 100644 index 0000000000..0f3cfe5813 --- /dev/null +++ b/misc/gcloud_store_message/functions/index.js @@ -0,0 +1,47 @@ +const BUCKET = process.env.BUCKET + + +const {Storage} = require('@google-cloud/storage'); +const storage = new Storage(); +const bucket = storage.bucket(BUCKET); +var fs = require('fs'); + +exports.storeMessage = async (event, context) => { + const pubsubMessage = event.data; + + if (event.attributes.subType == 'state' && event.attributes.subFolder == 'update') { + messageType = "state"; + } else if (!event.attributes.hasOwnProperty('subType')) { + subFolder = event.attributes.subFolder || 'unknown'; + messageType = `event_${subFolder}`; + } else { + return; + } + + const payloadStr = 
diff --git a/misc/gcloud_store_message/deploy b/misc/gcloud_store_message/deploy
new file mode 100755
index 0000000000..0aa40883a4
--- /dev/null
+++ b/misc/gcloud_store_message/deploy
@@ -0,0 +1,30 @@
+#!/bin/bash
+
+if (( $# < 3 )); then
+    echo $0 PROJECT_ID LOCATION TRIGGER_TOPIC
+    exit
+fi
+
+PROJECT_ID=$1
+LOCATION=$2
+TRIGGER_TOPIC=$3
+shift 3
+
+FUNCTION_NAME=dump_raw_messages
+
+BUCKET=${PROJECT_ID}-iot-messages
+
+gcloud storage buckets create gs://$BUCKET --project=$PROJECT_ID --default-storage-class=STANDARD --location=us-central1
+
+# Deploy Cloud Function
+gcloud functions deploy $FUNCTION_NAME \
+  --trigger-topic=$TRIGGER_TOPIC \
+  --entry-point=storeMessage \
+  --runtime nodejs16 \
+  --project=$PROJECT_ID \
+  --source=functions/ \
+  --set-env-vars PROJECT_ID=$PROJECT_ID,BUCKET=$BUCKET
+
+echo Done
diff --git a/misc/gcloud_store_message/functions/index.js b/misc/gcloud_store_message/functions/index.js
new file mode 100644
index 0000000000..0f3cfe5813
--- /dev/null
+++ b/misc/gcloud_store_message/functions/index.js
@@ -0,0 +1,47 @@
+const BUCKET = process.env.BUCKET
+
+const {Storage} = require('@google-cloud/storage');
+const storage = new Storage();
+const bucket = storage.bucket(BUCKET);
+const fs = require('fs');
+
+exports.storeMessage = async (event, context) => {
+  const pubsubMessage = event.data;
+
+  let messageType;
+  if (event.attributes.subType == 'state' && event.attributes.subFolder == 'update') {
+    messageType = "state";
+  } else if (!event.attributes.hasOwnProperty('subType')) {
+    const subFolder = event.attributes.subFolder || 'unknown';
+    messageType = `event_${subFolder}`;
+  } else {
+    return;
+  }
+
+  const payloadStr = Buffer.from(pubsubMessage, 'base64').toString('utf8');
+
+  const messageId = context.eventId;
+  const publishTimestamp = context.timestamp;
+  const deviceId = event.attributes.deviceId;
+  const registryId = event.attributes.deviceRegistryId;
+
+  const localFileName = `/tmp/${registryId}_${deviceId}_${publishTimestamp}.json`;
+
+  // Write synchronously so the file is complete before the upload starts
+  fs.writeFileSync(localFileName, payloadStr);
+
+  const options = {
+    destination: `${registryId}/${deviceId}/${publishTimestamp}_${messageType}_${messageId}.json`
+  };
+
+  // Await the upload so the function does not exit before it completes
+  return await bucket.upload(localFileName, options);
+};
+
diff --git a/misc/gcloud_store_message/functions/package.json b/misc/gcloud_store_message/functions/package.json
new file mode 100644
index 0000000000..965f50ab40
--- /dev/null
+++ b/misc/gcloud_store_message/functions/package.json
@@ -0,0 +1,9 @@
+{
+  "name": "message-store",
+  "version": "0.0.1",
+  "dependencies": {
+    "@google-cloud/pubsub": "^0.18.0",
+    "@google-cloud/bigquery": "^3.0.0",
+    "@google-cloud/storage": "6.5.2"
+  }
+}
\ No newline at end of file