Skip to content

Commit

Permalink
Add misc utility to replay validated messages from validator out dire…
Browse files Browse the repository at this point in the history
…ctory (#510)
  • Loading branch information
noursaidi committed Nov 17, 2022
1 parent d25fb53 commit 4d6fe8e
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 0 deletions.
75 changes: 75 additions & 0 deletions misc/replay_validated/replay
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#!/usr/bin/env python3
"""
Replays messages validated by validator back to a given Pub/Sub topic
Usage:
misc/replay_validated/replay PATH_TO_VALIDATOR_OUT PROJECT_ID TOPIC
WARNING - validator deletes it's out directory on invocation therefore a backup
needs to be made for replaying messages with the validator running, e.g. by
renaming the `out` directory
"""
import glob
import re
import os
import sys
import argparse
import json
import time

from google import auth
from google.cloud import pubsub_v1

def parse_command_line_args():
parser = argparse.ArgumentParser()
parser.add_argument('validation_dir', type=str)
parser.add_argument('project_id', type=str)
parser.add_argument('topic', type=str)
return parser.parse_args()

def main():
args = parse_command_line_args()

try:
credentials, project_id = auth.default()
except Exception as e:
print(e)
sys.exit(1)

# initaite publisher here
publisher = pubsub_v1.PublisherClient(credentials=credentials)
topic_path = publisher.topic_path(args.project_id, args.topic)

validation_dir = os.path.realpath(os.path.join(args.validation_dir, 'devices'))
messages_path = f'{validation_dir}/*/*.json'
file_paths = glob.glob(messages_path, recursive=True)
total = len(file_paths)

i = 0
for message_path in file_paths:
i += 1
root, ext = os.path.splitext(message_path)
attributes_path = f'{root}.attr'

try:
with open(message_path, 'r', encoding='utf-8') as f:
payload = f.read().encode('utf-8')
with open(attributes_path, 'r', encoding='utf-8') as f:
attributes = json.load(f)
except Exception:
continue

# Ignore validation messages
if attributes.get('subFolder') == 'validation':
continue

attributes.pop('wasBase64', None)

message_descriptor = '/'.join(message_path.rsplit('/', 2)[1:])
future = publisher.publish(topic_path, payload, **attributes)
print(f'published {i}/{total} {message_descriptor} ({future.result()})')
time.sleep(0.1) # avoid a message burst which can cause validator issues

if __name__ == "__main__":
main()
6 changes: 6 additions & 0 deletions misc/replay_validated/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
google-api-core
google-api-python-client
google-auth
google-cloud-bigquery
google-cloud-pubsub

0 comments on commit 4d6fe8e

Please sign in to comment.