diff --git a/data_gateway/__init__.py b/data_gateway/__init__.py index 5636c833..1cccc635 100644 --- a/data_gateway/__init__.py +++ b/data_gateway/__init__.py @@ -3,3 +3,14 @@ __all__ = ("exceptions",) MICROPHONE_SENSOR_NAME = "Mics" + + +def stop_gateway(logger, stop_signal): + """Stop the gateway's multiple processes by sending the stop signal. + + :param logging.Logger logger: a logger to log that the stop signal has been sent + :param multiprocessing.Value stop_signal: a value of 0 means don't stop; a value of 1 means stop + :return None: + """ + logger.info("Sending stop signal.") + stop_signal.value = 1 diff --git a/data_gateway/cli.py b/data_gateway/cli.py index 4d26224f..7e1efdd0 100644 --- a/data_gateway/cli.py +++ b/data_gateway/cli.py @@ -1,25 +1,23 @@ import json -import logging +import multiprocessing import os -import time import click import pkg_resources import requests -import serial from requests import HTTPError from slugify import slugify -from data_gateway.configuration import Configuration -from data_gateway.dummy_serial import DummySerial -from data_gateway.exceptions import DataMustBeSavedError, WrongNumberOfSensorCoordinatesError -from data_gateway.routine import Routine +from data_gateway.data_gateway import DataGateway +from data_gateway.exceptions import WrongNumberOfSensorCoordinatesError SUPERVISORD_PROGRAM_NAME = "AerosenseGateway" CREATE_INSTALLATION_CLOUD_FUNCTION_URL = "https://europe-west6-aerosense-twined.cloudfunctions.net/create-installation" -logger = logging.getLogger(__name__) +global_cli_context = {} + +logger = multiprocessing.get_logger() @click.group(context_settings={"help_option_names": ["-h", "--help"]}) @@ -44,15 +42,16 @@ def gateway_cli(logger_uri, log_level): """ from octue.log_handlers import apply_log_handler, get_remote_handler - # Apply log handler locally. - apply_log_handler(log_level=log_level.upper(), include_thread_name=True) + # Store log level to apply to multi-processed logger in `DataGateway` in the `start` command. + global_cli_context["log_level"] = log_level.upper() # Stream logs to remote handler if required. if logger_uri is not None: apply_log_handler( + logger=logger, handler=get_remote_handler(logger_uri=logger_uri), - log_level=log_level.upper(), - include_thread_name=True, + log_level=global_cli_context["log_level"], + include_process_name=True, ) @@ -165,87 +164,24 @@ def start( nodes/sensors via the serial port by typing them into stdin and pressing enter. These commands are: [startBaros, startMics, startIMU, getBattery, stop]. """ - import sys - import threading - - from data_gateway.packet_reader import PacketReader - - if not save_locally and no_upload_to_cloud: - raise DataMustBeSavedError( - "Data from the gateway must either be saved locally or uploaded to the cloud. Please adjust the CLI " - "options provided." - ) - - config = _load_configuration(configuration_path=config_file) - config.session_data["label"] = label - - serial_port = _get_serial_port(serial_port, configuration=config, use_dummy_serial_port=use_dummy_serial_port) - routine = _load_routine(routine_path=routine_file, interactive=interactive, serial_port=serial_port) - output_directory = _update_and_create_output_directory(output_directory_path=output_dir) - - # Start a new thread to parse the serial data while the main thread stays ready to take in commands from stdin. 
- packet_reader = PacketReader( + data_gateway = DataGateway( + serial_port=serial_port, + configuration_path=config_file, + routine_path=routine_file, save_locally=save_locally, upload_to_cloud=not no_upload_to_cloud, - output_directory=output_directory, + interactive=interactive, + output_directory=output_dir, window_size=window_size, project_name=gcp_project_name, bucket_name=gcp_bucket_name, - configuration=config, + label=label, save_csv_files=save_csv_files, + use_dummy_serial_port=use_dummy_serial_port, + log_level=global_cli_context["log_level"], ) - logger.info("Starting packet reader.") - - if not no_upload_to_cloud: - logger.info("Files will be uploaded to cloud storage at intervals of %s seconds.", window_size) - - if save_locally: - logger.info( - "Files will be saved locally to disk at %r at intervals of %s seconds.", - os.path.join(packet_reader.output_directory, packet_reader.session_subdirectory), - window_size, - ) - - # Start packet reader in a separate thread so commands can be sent to it in real time in interactive mode or by a - # routine. - reader_thread = threading.Thread(target=packet_reader.read_packets, args=(serial_port,), daemon=True) - reader_thread.setName("ReaderThread") - reader_thread.start() - - try: - if interactive: - # Keep a record of the commands given. - commands_record_file = os.path.join( - packet_reader.output_directory, packet_reader.session_subdirectory, "commands.txt" - ) - - os.makedirs(os.path.join(packet_reader.output_directory, packet_reader.session_subdirectory), exist_ok=True) - - while not packet_reader.stop: - for line in sys.stdin: - - with open(commands_record_file, "a") as f: - f.write(line) - - if line.startswith("sleep") and line.endswith("\n"): - time.sleep(int(line.split(" ")[-1].strip())) - elif line == "stop\n": - packet_reader.stop = True - break - - # Send the command to the node - serial_port.write(line.encode("utf_8")) - - else: - if routine is not None: - routine.run() - - except KeyboardInterrupt: - packet_reader.stop = True - - logger.info("Stopping gateway.") - packet_reader.writer.force_persist() + data_gateway.start() @gateway_cli.command() @@ -331,92 +267,5 @@ def supervisord_conf(config_file): return 0 -def _load_configuration(configuration_path): - """Load a configuration from the path if it exists, otherwise load the default configuration. - - :param str configuration_path: - :return data_gateway.configuration.Configuration: - """ - if os.path.exists(configuration_path): - with open(configuration_path) as f: - configuration = Configuration.from_dict(json.load(f)) - - logger.info("Loaded configuration file from %r.", configuration_path) - return configuration - - configuration = Configuration() - logger.info("No configuration file provided - using default configuration.") - return configuration - - -def _get_serial_port(serial_port, configuration, use_dummy_serial_port): - """Get the serial port or a dummy serial port if specified. - - :param str serial_port: - :param data_gateway.configuration.Configuration configuration: - :param bool use_dummy_serial_port: - :return serial.Serial: - """ - if not use_dummy_serial_port: - serial_port = serial.Serial(port=serial_port, baudrate=configuration.baudrate) - else: - serial_port = DummySerial(port=serial_port, baudrate=configuration.baudrate) - - # The buffer size can only be set on Windows. 
- if os.name == "nt": - serial_port.set_buffer_size( - rx_size=configuration.serial_buffer_rx_size, - tx_size=configuration.serial_buffer_tx_size, - ) - else: - logger.warning("Serial port buffer size can only be set on Windows.") - - return serial_port - - -def _load_routine(routine_path, interactive, serial_port): - """Load a sensor commands routine from the path if exists, otherwise return no routine. If in interactive mode, the - routine file is ignored. Note that "\n" has to be added to the end of each command sent to the serial port for it to - be executed - this is done automatically in this method. - - :param str routine_path: - :param bool interactive: - :param serial.Serial serial_port: - :return data_gateway.routine.Routine|None: - """ - if os.path.exists(routine_path): - if interactive: - logger.warning("Sensor command routine files are ignored in interactive mode.") - return - else: - with open(routine_path) as f: - routine = Routine( - **json.load(f), - action=lambda command: serial_port.write((command + "\n").encode("utf_8")), - ) - - logger.info("Loaded routine file from %r.", routine_path) - return routine - - logger.info( - "No routine file found at %r - no commands will be sent to the sensors unless given in interactive mode.", - routine_path, - ) - - -def _update_and_create_output_directory(output_directory_path): - """Set the output directory to a path relative to the current directory if the path does not start with "/" and - create it if it does not already exist. - - :param str output_directory_path: - :return str: - """ - if not output_directory_path.startswith("/"): - output_directory_path = os.path.join(".", output_directory_path) - - os.makedirs(output_directory_path, exist_ok=True) - return output_directory_path - - if __name__ == "__main__": gateway_cli() diff --git a/data_gateway/data_gateway.py b/data_gateway/data_gateway.py new file mode 100644 index 00000000..32a957be --- /dev/null +++ b/data_gateway/data_gateway.py @@ -0,0 +1,269 @@ +import json +import logging +import multiprocessing +import os +import sys +import threading +import time + +import serial +from octue.log_handlers import apply_log_handler + +from data_gateway import stop_gateway +from data_gateway.configuration import Configuration +from data_gateway.dummy_serial import DummySerial +from data_gateway.exceptions import DataMustBeSavedError +from data_gateway.packet_reader import PacketReader +from data_gateway.routine import Routine + + +logger = multiprocessing.get_logger() +apply_log_handler(logger=logger, include_process_name=True) + +# Ignore logs from the dummy serial port. +logging.getLogger("data_gateway.dummy_serial.dummy_serial").setLevel(logging.WARNING) + + +class DataGateway: + """A class for running the data gateway to collect wind turbine sensor data. The gateway is run as three processes: + 1. The `MainProcess` process, which starts the other two processes and sends commands to the serial port (via a + separate thread) interactively or through a routine + 2. The `Reader` process, which reads packets from the serial port and puts them on a queue + 3. The `Parser` process, which takes packets off the queue, parses them, and persists them + + All processes and threads are stopped and any data in the current window is persisted if: + - A "stop" signal is sent as a command interactively or in a routine + - An error is raised in any process or thread + - A `KeyboardInterrupt` is raised (i.e. 
the user presses `Ctrl + C`) + - No more data is received by the `Parser` process after `stop_when_no_more_data_after` seconds (if it is set in the + `DataGateway.run` method) + + :param str|serial.Serial serial_port: the name of the serial port or a `serial.Serial` instance to read from + :param str configuration_path: the path to a JSON configuration file for the packet reader + :param str routine_path: the path to a JSON routine file containing sensor commands to be run automatically + :param bool save_locally: if `True`, save data windows to disk locally + :param bool upload_to_cloud: if `True`, upload data windows to Google Cloud Storage + :param bool interactive: if `True`, allow commands entered into `stdin` to be sent to the sensors in real time + :param str output_directory: the name of the directory in which to save data in the cloud bucket or local file system + :param float window_size: the period in seconds at which data is persisted + :param str|None project_name: the name of the Google Cloud project to upload to + :param str|None bucket_name: the name of the Google Cloud bucket to upload to + :param str|None label: a label to be associated with the data collected in this run of the data gateway + :param bool save_csv_files: if `True`, also save windows locally as CSV files for debugging + :param bool use_dummy_serial_port: if `True` use a dummy serial port for testing + :return None: + """ + + def __init__( + self, + serial_port, + configuration_path="config.json", + routine_path="routine.json", + save_locally=False, + upload_to_cloud=True, + interactive=False, + output_directory="data_gateway", + window_size=600, + project_name=None, + bucket_name=None, + label=None, + save_csv_files=False, + use_dummy_serial_port=False, + log_level=logging.INFO, + ): + # Set multiprocessed logger level. + logger.setLevel(log_level) + for handler in logger.handlers: + handler.setLevel(log_level) + + if not save_locally and not upload_to_cloud: + raise DataMustBeSavedError( + "Data from the gateway must either be saved locally or uploaded to the cloud. Please adjust the " + "parameters provided." + ) + + self.interactive = interactive + + packet_reader_configuration = self._load_configuration(configuration_path=configuration_path) + packet_reader_configuration.session_data["label"] = label + + self.serial_port = self._get_serial_port( + serial_port, + configuration=packet_reader_configuration, + use_dummy_serial_port=use_dummy_serial_port, + ) + + self.packet_reader = PacketReader( + save_locally=save_locally, + upload_to_cloud=upload_to_cloud, + output_directory=output_directory, + window_size=window_size, + project_name=project_name, + bucket_name=bucket_name, + configuration=packet_reader_configuration, + save_csv_files=save_csv_files, + ) + + self.routine = self._load_routine(routine_path=routine_path) + + def start(self, stop_when_no_more_data_after=False): + """Begin reading and persisting data from the serial port for the sensors at the installation defined in + the configuration. In interactive mode, commands can be sent to the nodes/sensors via the serial port by typing + them into `stdin` and pressing enter. These commands are: [startBaros, startMics, startIMU, getBattery, stop]. 
+ + :param float|bool stop_when_no_more_data_after: the number of seconds after receiving no data to stop the gateway (mainly for testing); if `False`, no limit is applied + :return None: + """ + packet_queue = multiprocessing.Queue() + stop_signal = multiprocessing.Value("i", 0) + + reader_process = multiprocessing.Process( + name="Reader", + target=self.packet_reader.read_packets, + kwargs={ + "serial_port": self.serial_port, + "packet_queue": packet_queue, + "stop_signal": stop_signal, + }, + daemon=True, + ) + + parser_process = multiprocessing.Process( + name="Parser", + target=self.packet_reader.parse_packets, + kwargs={ + "packet_queue": packet_queue, + "stop_signal": stop_signal, + "stop_when_no_more_data_after": stop_when_no_more_data_after, + }, + daemon=True, + ) + + reader_process.start() + parser_process.start() + + if self.interactive: + interactive_commands_thread = threading.Thread( + name="InteractiveCommandsThread", + target=self._send_commands_from_stdin_to_sensors, + kwargs={"stop_signal": stop_signal}, + daemon=True, + ) + + interactive_commands_thread.start() + + elif self.routine is not None: + routine_thread = threading.Thread( + name="RoutineCommandsThread", + target=self.routine.run, + kwargs={"stop_signal": stop_signal}, + daemon=True, + ) + routine_thread.start() + + # Wait for the stop signal before exiting. + while stop_signal.value == 0: + time.sleep(5) + + def _load_configuration(self, configuration_path): + """Load a configuration from the path if it exists; otherwise load the default configuration. + + :param str configuration_path: path to the configuration JSON file + :return data_gateway.configuration.Configuration: + """ + if os.path.exists(configuration_path): + with open(configuration_path) as f: + configuration = Configuration.from_dict(json.load(f)) + + logger.info("Loaded configuration file from %r.", configuration_path) + return configuration + + configuration = Configuration() + logger.info("No configuration file provided - using default configuration.") + return configuration + + def _get_serial_port(self, serial_port, configuration, use_dummy_serial_port): + """Get the serial port or a dummy serial port if specified. If a serial port instance is provided, return that + as the serial port to use. + + :param str|serial.Serial serial_port: the name of a serial port or a `serial.Serial` instance + :param data_gateway.configuration.Configuration configuration: the packet reader configuration + :param bool use_dummy_serial_port: if `True`, use a dummy serial port instead + :return serial.Serial|data_gateway.dummy_serial.DummySerial: + """ + if isinstance(serial_port, str): + if not use_dummy_serial_port: + serial_port = serial.Serial(port=serial_port, baudrate=configuration.baudrate) + else: + serial_port = DummySerial(port=serial_port, baudrate=configuration.baudrate) + + # The buffer size can only be set on Windows. + if os.name == "nt": + serial_port.set_buffer_size( + rx_size=configuration.serial_buffer_rx_size, + tx_size=configuration.serial_buffer_tx_size, + ) + else: + logger.debug("Serial port buffer size can only be set on Windows.") + + return serial_port + + def _load_routine(self, routine_path): + """Load a sensor commands routine from the path if it exists, otherwise return no routine. If in interactive + mode, the routine file is ignored. Note that "\n" has to be added to the end of each command sent to the serial + port for it to be executed - this is done automatically in this method. 
+ + :param str routine_path: the path to the JSON routine file + :return data_gateway.routine.Routine|None: a sensor routine instance + """ + if os.path.exists(routine_path): + if self.interactive: + logger.warning("Sensor command routine files are ignored in interactive mode.") + return + + with open(routine_path) as f: + routine = Routine( + **json.load(f), + action=lambda command: self.serial_port.write((command + "\n").encode("utf_8")), + ) + + logger.info("Loaded routine file from %r.", routine_path) + return routine + + if not self.interactive: + logger.warning( + "No routine was provided and interactive mode is off - no commands will be sent to the sensors in this " + "session." + ) + + def _send_commands_from_stdin_to_sensors(self, stop_signal): + """Send commands from `stdin` to the sensors until the "stop" command is received or the packet reader is + otherwise stopped. A record is kept of the commands sent to the sensors as a text file in the session + subdirectory. Available commands: [startBaros, startMics, startIMU, getBattery, stop]. + + :return None: + """ + commands_record_file = os.path.join(self.packet_reader.local_output_directory, "commands.txt") + + try: + while stop_signal.value == 0: + for line in sys.stdin: + with open(commands_record_file, "a") as f: + f.write(line) + + # The `sleep` command is mainly for facilitating testing. + if line.startswith("sleep") and line.endswith("\n"): + time.sleep(int(line.split(" ")[-1].strip())) + continue + + if line == "stop\n": + self.serial_port.write(line.encode("utf_8")) + stop_gateway(logger, stop_signal) + break + + # Send the command to the node. + self.serial_port.write(line.encode("utf_8")) + + except Exception as e: + stop_gateway(logger, stop_signal) + raise e diff --git a/data_gateway/packet_reader.py b/data_gateway/packet_reader.py index e0dc3334..aa0f2c0b 100644 --- a/data_gateway/packet_reader.py +++ b/data_gateway/packet_reader.py @@ -1,31 +1,36 @@ import datetime import json -import logging +import multiprocessing import os import queue import struct -import threading from octue.cloud import storage -from data_gateway import MICROPHONE_SENSOR_NAME, exceptions +from data_gateway import MICROPHONE_SENSOR_NAME, exceptions, stop_gateway from data_gateway.configuration import Configuration -from data_gateway.persistence import BatchingFileWriter, BatchingUploader, NoOperationContextManager +from data_gateway.persistence import ( + DEFAULT_OUTPUT_DIRECTORY, + BatchingFileWriter, + BatchingUploader, + NoOperationContextManager, +) -logger = logging.getLogger(__name__) +logger = multiprocessing.get_logger() class PacketReader: - """A serial port packet reader. + """A serial port packet reader. Note that timestamp synchronisation is unavailable with the current sensor hardware + so the system clock is used instead. :param bool save_locally: save data windows locally :param bool upload_to_cloud: upload data windows to Google cloud - :param str|None output_directory: - :param float window_size: length of time window in seconds + :param str|None output_directory: the directory in which to save data in the cloud bucket or local file system + :param float window_size: the period in seconds at which data is persisted. 
:param str|None project_name: name of Google Cloud project to upload to - :param str|None bucket_name: name of Google Cloud project to upload to - :param data_gateway.configuration.Configuration|None configuration: + :param str|None bucket_name: name of Google Cloud bucket to upload to + :param data_gateway.configuration.Configuration|None configuration: the configuration for reading and parsing data :param bool save_csv_files: save sensor data to .csv when in interactive mode :return None: """ @@ -34,7 +39,7 @@ def __init__( self, save_locally, upload_to_cloud, - output_directory=None, + output_directory=DEFAULT_OUTPUT_DIRECTORY, window_size=600, project_name=None, bucket_name=None, @@ -43,48 +48,98 @@ def __init__( ): self.save_locally = save_locally self.upload_to_cloud = upload_to_cloud - self.output_directory = output_directory + self.session_subdirectory = str(hash(datetime.datetime.now()))[1:7] + + self.cloud_output_directory = storage.path.join(output_directory, self.session_subdirectory) + self.local_output_directory = os.path.abspath(os.path.join(output_directory, self.session_subdirectory)) + os.makedirs(self.local_output_directory, exist_ok=True) + + self.window_size = window_size + self.project_name = project_name + self.bucket_name = bucket_name self.config = configuration or Configuration() + self.save_csv_files = save_csv_files + + self.uploader = None + self.writer = None self.handles = self.config.default_handles self.sleep = False - self.stop = False self.sensor_time_offset = None - self.session_subdirectory = str(hash(datetime.datetime.now()))[1:7] - logger.warning("Timestamp synchronisation unavailable with current hardware; defaulting to using system clock.") + def read_packets(self, serial_port, packet_queue, stop_signal): + """Read packets from a serial port and send them to the parser thread for processing and persistence. + + :param serial.Serial serial_port: name of serial port to read from + :param queue.Queue packet_queue: a thread-safe queue to put packets on to for the parser thread to pick up + :return None: + """ + try: + logger.info("Packet reader process started.") + + while stop_signal.value == 0: + serial_data = serial_port.read() + + if len(serial_data) == 0: + continue + + if serial_data[0] != self.config.packet_key: + continue + + packet_type = str(int.from_bytes(serial_port.read(), self.config.endian)) + length = int.from_bytes(serial_port.read(), self.config.endian) + packet = serial_port.read(length) - if upload_to_cloud: + if packet_type == str(self.config.type_handle_def): + self.update_handles(packet) + continue + + # Check for bytes in serial input buffer. A full buffer results in overflow. + if serial_port.in_waiting == self.config.serial_buffer_rx_size: + logger.warning("Serial port buffer is full - buffer overflow may occur, resulting in data loss.") + continue + + packet_queue.put({"packet_type": packet_type, "packet": packet}) + + except KeyboardInterrupt: + pass + + finally: + stop_gateway(logger, stop_signal) + + def parse_packets(self, packet_queue, stop_signal, stop_when_no_more_data_after=False): + """Get packets from a thread-safe packet queue, check if a full payload has been received (i.e. correct length) + with the correct packet type handle, then parse the payload. After parsing/processing, upload them to Google + Cloud storage and/or write them to disk. If any errors are raised, put them on the error queue for the main + thread to handle. 
+ + :param queue.Queue packet_queue: a thread-safe queue of packets provided by a reader thread + :param float|bool stop_when_no_more_data_after: the number of seconds after receiving no data to stop the gateway (mainly for testing); if `False`, no limit is applied + :return None: + """ + logger.info("Packet parser process started.") + + if self.upload_to_cloud: self.uploader = BatchingUploader( sensor_names=self.config.sensor_names, - project_name=project_name, - bucket_name=bucket_name, - window_size=window_size, - session_subdirectory=self.session_subdirectory, - output_directory=output_directory, + project_name=self.project_name, + bucket_name=self.bucket_name, + window_size=self.window_size, + output_directory=self.cloud_output_directory, metadata={"data_gateway__configuration": self.config.to_dict()}, ) else: self.uploader = NoOperationContextManager() - if save_locally: + if self.save_locally: self.writer = BatchingFileWriter( sensor_names=self.config.sensor_names, - window_size=window_size, - session_subdirectory=self.session_subdirectory, - output_directory=output_directory, - save_csv_files=save_csv_files, + window_size=self.window_size, + output_directory=self.local_output_directory, + save_csv_files=self.save_csv_files, ) else: self.writer = NoOperationContextManager() - def read_packets(self, serial_port, stop_when_no_more_data=False): - """Read packets from a serial port and send them to a separate thread that will parse and upload them to Google - Cloud storage and/or write them to disk. - - :param serial.Serial serial_port: name of serial port to read from - :param bool stop_when_no_more_data: stop reading when no more data is received from the port (for testing) - :return None: - """ self._persist_configuration() previous_timestamp = {} @@ -97,60 +152,58 @@ def read_packets(self, serial_port, stop_when_no_more_data=False): for _ in range(self.config.number_of_sensors[sensor_name]) ] - with self.uploader: - with self.writer: - packet_queue = queue.Queue() - error_queue = queue.Queue() + if stop_when_no_more_data_after is False: + timeout = 5 + else: + timeout = stop_when_no_more_data_after - parser_thread = threading.Thread( - target=self._parse_payload, - kwargs={"packet_queue": packet_queue, "error_queue": error_queue}, - daemon=True, - ) + try: + with self.uploader: + with self.writer: + while stop_signal.value == 0: + try: + packet_type, packet = packet_queue.get(timeout=timeout).values() + except queue.Empty: + if stop_when_no_more_data_after is not False: + break + continue + + if packet_type not in self.handles: + logger.error("Received packet with unknown type: %s", packet_type) + continue + + if len(packet) == 244: # If the full data payload is received, proceed parsing it + timestamp = int.from_bytes(packet[240:244], self.config.endian, signed=False) / (2 ** 16) + + data, sensor_names = self._parse_sensor_packet_data( + packet_type=self.handles[packet_type], + payload=packet, + data=data, + ) - parser_thread.setName("ParserThread") - parser_thread.start() - - while not self.stop: - if not error_queue.empty(): - raise error_queue.get() - - serial_data = serial_port.read() - - if len(serial_data) == 0: - if stop_when_no_more_data: - break - continue - - if serial_data[0] != self.config.packet_key: - continue - - packet_type = str(int.from_bytes(serial_port.read(), self.config.endian)) - length = int.from_bytes(serial_port.read(), self.config.endian) - payload = serial_port.read(length) - - if packet_type == str(self.config.type_handle_def): - 
self.update_handles(payload) - continue - - # Check for bytes in serial input buffer. A full buffer results in overflow. - if serial_port.in_waiting == self.config.serial_buffer_rx_size: - logger.warning( - "Buffer is full: %d bytes waiting. Re-opening serial port, to avoid overflow", - serial_port.in_waiting, - ) - serial_port.close() - serial_port.open() - continue - - packet_queue.put( - { - "packet_type": packet_type, - "payload": payload, - "data": data, - "previous_timestamp": previous_timestamp, - } - ) + for sensor_name in sensor_names: + self._check_for_packet_loss(sensor_name, timestamp, previous_timestamp) + + self._timestamp_and_persist_data( + data=data, + sensor_name=sensor_name, + timestamp=timestamp, + period=self.config.period[sensor_name], + ) + + elif len(packet) >= 1 and self.handles[packet_type] in [ + "Mic 1", + "Cmd Decline", + "Sleep State", + "Info Message", + ]: + self._parse_info_packet(self.handles[packet_type], packet) + + except KeyboardInterrupt: + pass + + finally: + stop_gateway(logger, stop_signal) def update_handles(self, payload): """Update the Bluetooth handles object. Handles are updated every time a new Bluetooth connection is @@ -182,7 +235,7 @@ def update_handles(self, payload): logger.info("Successfully updated handles.") return - logger.error("Handle error: %s %s", start_handle, end_handle) + logger.error("Handle error: start handle is %s, end handle is %s.", start_handle, end_handle) def _persist_configuration(self): """Persist the configuration to disk and/or cloud storage. @@ -192,59 +245,16 @@ def _persist_configuration(self): configuration_dictionary = self.config.to_dict() if self.save_locally: - with open( - os.path.abspath(os.path.join(self.output_directory, self.session_subdirectory, "configuration.json")), - "w", - ) as f: + with open(os.path.join(self.local_output_directory, "configuration.json"), "w") as f: json.dump(configuration_dictionary, f) if self.upload_to_cloud: self.uploader.client.upload_from_string( string=json.dumps(configuration_dictionary), bucket_name=self.uploader.bucket_name, - path_in_bucket=storage.path.join( - self.output_directory, self.session_subdirectory, "configuration.json" - ), + path_in_bucket=storage.path.join(self.cloud_output_directory, "configuration.json"), ) - def _parse_payload(self, packet_queue, error_queue): - """Get packets from a thread-safe packet queue, check if a full payload has been received (i.e. correct length) - with the correct packet type handle, then parse the payload. After parsing/processing, upload them to Google - Cloud storage and/or write them to disk. If any errors are raised, put them on the error queue for the reader - thread to handle. 
- - :param queue.Queue packet_queue: a thread-safe queue of packets provided by the reader thread - :param queue.Queue error_queue: a thread-safe queue to put any exceptions on to for the reader thread to handle - :return None: - """ - try: - while not self.stop: - packet_type, payload, data, previous_timestamp = packet_queue.get().values() - - if packet_type not in self.handles: - logger.error("Received packet with unknown type: %s", packet_type) - raise exceptions.UnknownPacketTypeError("Received packet with unknown type: {}".format(packet_type)) - - if len(payload) == 244: # If the full data payload is received, proceed parsing it - timestamp = int.from_bytes(payload[240:244], self.config.endian, signed=False) / (2 ** 16) - - data, sensor_names = self._parse_sensor_packet_data(self.handles[packet_type], payload, data) - - for sensor_name in sensor_names: - self._check_for_packet_loss(sensor_name, timestamp, previous_timestamp) - self._timestamp_and_persist_data(data, sensor_name, timestamp, self.config.period[sensor_name]) - - elif len(payload) >= 1 and self.handles[packet_type] in [ - "Mic 1", - "Cmd Decline", - "Sleep State", - "Info Message", - ]: - self._parse_info_packet(self.handles[packet_type], payload) - - except Exception as e: - error_queue.put(e) - def _parse_sensor_packet_data(self, packet_type, payload, data): """Parse sensor data type payloads. @@ -451,7 +461,7 @@ def _check_for_packet_loss(self, sensor_name, timestamp, previous_timestamp): return if previous_timestamp[sensor_name] == -1: - logger.info("Received first %s packet" % sensor_name) + logger.info("Received first %s packet." % sensor_name) else: expected_current_timestamp = ( previous_timestamp[sensor_name] diff --git a/data_gateway/persistence.py b/data_gateway/persistence.py index 58f966f7..0409e1ea 100644 --- a/data_gateway/persistence.py +++ b/data_gateway/persistence.py @@ -2,7 +2,7 @@ import copy import csv import json -import logging +import multiprocessing import os import time @@ -11,8 +11,7 @@ from octue.utils.persistence import calculate_disk_usage, get_oldest_file_in_directory -logger = logging.getLogger(__name__) - +logger = multiprocessing.get_logger() DEFAULT_OUTPUT_DIRECTORY = "data_gateway" @@ -30,32 +29,32 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): pass + def force_persist(self): + """Do nothing. + + :return None: + """ + pass + class TimeBatcher: """A batcher that groups the given data into time windows. 
:param iter(str) sensor_names: names of sensors to group data for :param float window_size: length of time window in seconds - :param str session_subdirectory: directory within output directory to persist into :param str output_directory: directory to write windows to :return None: """ - def __init__( - self, - sensor_names, - window_size, - session_subdirectory, - output_directory=DEFAULT_OUTPUT_DIRECTORY, - ): + _file_prefix = "window" + + def __init__(self, sensor_names, window_size, output_directory=DEFAULT_OUTPUT_DIRECTORY): self.current_window = {"sensor_time_offset": None, "sensor_data": {name: [] for name in sensor_names}} self.window_size = window_size self.output_directory = output_directory self.ready_window = {"sensor_time_offset": None, "sensor_data": {}} - self._session_subdirectory = session_subdirectory self._start_time = time.perf_counter() self._window_number = 0 - self._file_prefix = "window" def __enter__(self): return self @@ -64,7 +63,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.force_persist() def add_to_current_window(self, sensor_name, data): - """Add serialised data (a string) to the current window for the given sensor name. + """Add data to the current window for the given sensor name. :param str sensor_name: name of sensor :param iter data: data to add to window @@ -130,8 +129,7 @@ class BatchingFileWriter(TimeBatcher): """A file writer that groups the given into time windows, saving each window to disk. :param iter(str) sensor_names: names of sensors to make windows for - :param float window_size: :param float window_size: length of time window in seconds - :param str session_subdirectory: directory within output directory to persist into + :param float window_size: length of time window in seconds :param str output_directory: directory to write windows to :param int storage_limit: storage limit in bytes (default is 1 GB) :return None: @@ -141,15 +139,15 @@ def __init__( self, sensor_names, window_size, - session_subdirectory, save_csv_files=False, output_directory=DEFAULT_OUTPUT_DIRECTORY, storage_limit=1024 ** 3, ): self._save_csv_files = save_csv_files self.storage_limit = storage_limit - super().__init__(sensor_names, window_size, session_subdirectory, output_directory) - os.makedirs(os.path.join(self.output_directory, self._session_subdirectory), exist_ok=True) + super().__init__(sensor_names, window_size, output_directory) + os.makedirs(self.output_directory, exist_ok=True) + logger.info("Windows will be saved to %r at intervals of %s seconds.", self.output_directory, self.window_size) def _persist_window(self, window=None): """Write a window of serialised data to disk, deleting the oldest window first if the storage limit has been @@ -160,17 +158,17 @@ def _persist_window(self, window=None): """ self._manage_storage() window = window or self.ready_window - window_path = os.path.abspath(os.path.join(".", self._generate_window_path())) + window_path = self._generate_window_path() with open(window_path, "w") as f: json.dump(window, f) - logger.info(f"{self._file_prefix.capitalize()} {self._window_number} written to disk.") + logger.info("%s %d written to disk.", self._file_prefix.capitalize(), self._window_number) if self._save_csv_files: for sensor in window["sensor_data"]: csv_path = os.path.join(os.path.dirname(window_path), f"{sensor}.csv") - logger.info(f"Saving {sensor} data to csv file.") + logger.info("Saving %s data to csv file.", sensor) with open(csv_path, "w", newline="") as f: writer = csv.writer(f, delimiter=",") @@ -182,13 +180,11 
@@ def _manage_storage(self): :return None: """ - session_directory = os.path.join(self.output_directory, self._session_subdirectory) - filter = lambda path: os.path.split(path)[-1].startswith("window") # noqa storage_limit_in_mb = self.storage_limit / 1024 ** 2 - if calculate_disk_usage(session_directory, filter) >= self.storage_limit: - oldest_window = get_oldest_file_in_directory(session_directory, filter) + if calculate_disk_usage(self.output_directory, filter) >= self.storage_limit: + oldest_window = get_oldest_file_in_directory(self.output_directory, filter) logger.warning( "Storage limit reached (%s MB) - deleting oldest window (%r).", @@ -198,7 +194,7 @@ def _manage_storage(self): os.remove(oldest_window) - elif calculate_disk_usage(session_directory, filter) >= 0.9 * self.storage_limit: + elif calculate_disk_usage(self.output_directory, filter) >= 0.9 * self.storage_limit: logger.warning("90% of storage limit reached - %s MB remaining.", 0.1 * storage_limit_in_mb) def _generate_window_path(self): @@ -207,7 +203,7 @@ def _generate_window_path(self): :return str: """ filename = f"{self._file_prefix}-{self._window_number}.json" - return os.path.join(self.output_directory, self._session_subdirectory, filename) + return os.path.join(self.output_directory, filename) class BatchingUploader(TimeBatcher): @@ -219,7 +215,6 @@ class BatchingUploader(TimeBatcher): :param str project_name: name of Google Cloud project to upload to :param str bucket_name: name of Google Cloud bucket to upload to :param float window_size: length of time window in seconds - :param str session_subdirectory: directory within output directory to persist into :param str output_directory: directory to write windows to :param float upload_timeout: time after which to give up trying to upload to the cloud :param bool upload_backup_files: attempt to upload backed-up windows on next window upload @@ -232,7 +227,6 @@ def __init__( project_name, bucket_name, window_size, - session_subdirectory, output_directory=DEFAULT_OUTPUT_DIRECTORY, metadata=None, upload_timeout=60, @@ -244,10 +238,13 @@ def __init__( self.metadata = metadata or {} self.upload_timeout = upload_timeout self.upload_backup_files = upload_backup_files - super().__init__(sensor_names, window_size, session_subdirectory, output_directory) + super().__init__(sensor_names, window_size, output_directory) + self._backup_directory = os.path.join(self.output_directory, ".backup") - self._backup_writer = BatchingFileWriter( - sensor_names, window_size, session_subdirectory, output_directory=self._backup_directory + self._backup_writer = BatchingFileWriter(sensor_names, window_size, output_directory=self._backup_directory) + + logger.info( + "Windows will be uploaded to %r at intervals of %s seconds.", self.output_directory, self.window_size ) def _persist_window(self): @@ -275,7 +272,7 @@ def _persist_window(self): self._backup_writer._persist_window(window=self.ready_window) return - logger.info(f"{self._file_prefix.capitalize()} {self._window_number} uploaded to cloud.") + logger.info("%s %d uploaded to cloud.", self._file_prefix.capitalize(), self._window_number) if self.upload_backup_files: self._attempt_to_upload_backup_files() @@ -286,25 +283,20 @@ def _generate_window_path(self): :return str: """ filename = f"{self._file_prefix}-{self._window_number}.json" - return storage.path.join(self.output_directory, self._session_subdirectory, filename) + return storage.path.join(self.output_directory, filename) def _attempt_to_upload_backup_files(self): """Check for 
backup files and attempt to upload them to cloud storage again. :return None: """ - backup_filenames = os.listdir(os.path.join(self._backup_directory, self._session_subdirectory)) - - if not backup_filenames: - return - - for filename in backup_filenames: + for filename in os.listdir(self._backup_directory): if not filename.startswith(self._file_prefix): continue - local_path = os.path.join(self._backup_directory, self._session_subdirectory, filename) - path_in_bucket = storage.path.join(self.output_directory, self._session_subdirectory, filename) + local_path = os.path.join(self._backup_directory, filename) + path_in_bucket = storage.path.join(self.output_directory, filename) try: self.client.upload_file( diff --git a/data_gateway/routine.py b/data_gateway/routine.py index 90606500..1d874ed1 100644 --- a/data_gateway/routine.py +++ b/data_gateway/routine.py @@ -1,9 +1,11 @@ -import logging +import multiprocessing import sched import time +from data_gateway import stop_gateway -logger = logging.getLogger(__name__) + +logger = multiprocessing.get_logger() class Routine: @@ -38,31 +40,46 @@ def __init__(self, commands, action, period=None, stop_after=None): if self.stop_after: logger.warning("The `stop_after` parameter is ignored unless `period` is also given.") - def run(self): - """Send the commands to the action after the given delays, repeating if a period was given. + def run(self, stop_signal): + """Send the commands to the action after the given delays, repeating if a period was given. The routine will + stop before the next run if the stop signal is received (i.e. if the `stop_signal.value` is set to 1 in another + process). + :param multiprocessing.Value stop_signal: a value of 0 means don't stop; a value of 1 means stop :return None: """ - scheduler = sched.scheduler(time.perf_counter) - start_time = time.perf_counter() + try: + scheduler = sched.scheduler(time.perf_counter) + start_time = time.perf_counter() - while True: - cycle_start_time = time.perf_counter() + while stop_signal.value == 0: + cycle_start_time = time.perf_counter() - for command, delay in self.commands: - scheduler.enter(delay=delay, priority=1, action=self.action, argument=(command,)) + for command, delay in self.commands: + scheduler.enter(delay=delay, priority=1, action=self.action, argument=(command,)) - scheduler.run(blocking=True) + # If a command is "stop", schedule stopping the gateway and then schedule no further commands. + if command == "stop": + scheduler.enter(delay=delay, priority=1, action=stop_gateway, argument=(logger, stop_signal)) + break - if self.period is None: - break + scheduler.run(blocking=True) - elapsed_time = time.perf_counter() - cycle_start_time - time.sleep(self.period - elapsed_time) + if self.period is None: + logger.info("Non-periodic routine finished.") + return - if self.stop_after: - if time.perf_counter() - start_time >= self.stop_after: - break + elapsed_time = time.perf_counter() - cycle_start_time + time.sleep(self.period - elapsed_time) + + if self.stop_after: + if time.perf_counter() - start_time >= self.stop_after: + logger.info("Periodic routine stopped after given timeout of %ss.", self.stop_after) + return + + except Exception as e: + stop_gateway(logger, stop_signal) + raise e def _wrap_action_with_logger(self, action): """Wrap the given action so that when it's run on a command, the command is logged. 
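# --- Illustrative sketch (not part of this patch) ---
# The gateway, packet reader, parser, and routine changes above all coordinate through a
# shared `multiprocessing.Value` stop signal plus the `stop_gateway` helper added in
# `data_gateway/__init__.py`. A minimal, self-contained sketch of that pattern, assuming
# only what this diff shows (the `worker` function below is hypothetical and stands in
# for the real reader/parser processes):

import multiprocessing
import time

from data_gateway import stop_gateway

logger = multiprocessing.get_logger()


def worker(stop_signal):
    # Each process polls the shared flag and exits cleanly once it is set to 1.
    while stop_signal.value == 0:
        time.sleep(0.1)


if __name__ == "__main__":
    stop_signal = multiprocessing.Value("i", 0)  # 0 = keep running, 1 = stop.
    process = multiprocessing.Process(target=worker, args=(stop_signal,), daemon=True)
    process.start()

    time.sleep(1)
    stop_gateway(logger, stop_signal)  # Logs "Sending stop signal." and sets the value to 1.
    process.join()

# --- End of illustrative sketch ---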
diff --git a/setup.py b/setup.py index 40bb6332..d6248247 100644 --- a/setup.py +++ b/setup.py @@ -16,12 +16,12 @@ setup( name="data_gateway", - version="0.9.0", + version="0.10.0", install_requires=[ "click>=7.1.2", "pyserial==3.5", "python-slugify==5.0.2", - "octue==0.10.0", + "octue==0.10.5", ], url="https://gitlab.com/windenergie-hsr/aerosense/digital-twin/data-gateway", license="MIT", diff --git a/tests/__init__.py b/tests/__init__.py index b6c30085..ecb96109 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -3,7 +3,7 @@ from data_gateway.configuration import Configuration -apply_log_handler(include_thread_name=True) +apply_log_handler(include_process_name=True) TEST_PROJECT_NAME = "a-project-name" diff --git a/tests/test_cloud_functions/base.py b/tests/test_cloud_functions/base.py deleted file mode 100644 index 3673509e..00000000 --- a/tests/test_cloud_functions/base.py +++ /dev/null @@ -1,40 +0,0 @@ -import json -import os - -from octue.cloud.credentials import GCPCredentialsManager - - -class CredentialsEnvironmentVariableAsFile: - """Temporarily store JSON credentials from the `GOOGLE_APPLICATION_CREDENTIALS` environment variable in a file for - use during the test class's test run. This is useful on GitHub where a file cannot be created for a secret but - tests that require credentials to be present as a file are run. - """ - - credentials_path = "temporary_file.json" - current_google_application_credentials_variable_value = None - - @classmethod - def setUpClass(cls): - """Temporarily write the credentials to a file so that the tests can run on GitHub where the credentials are - only provided as JSON in an environment variable. Set the credentials environment variable to point to this - file instead of the credentials JSON. - - :return None: - """ - cls.current_google_application_credentials_variable_value = os.environ["GOOGLE_APPLICATION_CREDENTIALS"] - - credentials = GCPCredentialsManager().get_credentials(as_dict=True) - - with open(cls.credentials_path, "w") as f: - json.dump(credentials, f) - - os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = cls.credentials_path - - @classmethod - def tearDownClass(cls): - """Remove the temporary credentials file and restore the credentials environment variable to its original value. - - :return None: - """ - os.remove(cls.credentials_path) - os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = cls.current_google_application_credentials_variable_value diff --git a/tests/test_cloud_functions/mocks.py b/tests/test_cloud_functions/mocks.py new file mode 100644 index 00000000..ab4a9d4e --- /dev/null +++ b/tests/test_cloud_functions/mocks.py @@ -0,0 +1,42 @@ +class MockBigQueryClient: + def __init__(self, expected_query_result=None): + self.expected_query_result = expected_query_result + self.rows = None + + def get_table(self, name): + """Do nothing. + + :param str name: + :return None: + """ + pass + + def insert_rows(self, table, rows): + """Store the given rows in the `self.rows` attribute. + + :param str table: + :param list(dict) rows: + :return None: + """ + self.rows = rows + + def query(self, query): + """Return the `self.expected_query_result` attribute in a `MockQueryResult` instance. + + :param str query: + :return MockQueryResult: + """ + self.query = query + return MockQueryResult(result=self.expected_query_result) + + +class MockQueryResult: + def __init__(self, result): + self._result = result + + def result(self): + """Return the `self._result` attribute. 
+ + :return any: + """ + return self._result diff --git a/tests/test_cloud_functions/test_big_query.py b/tests/test_cloud_functions/test_big_query.py index 5ffb92fe..1116e5f1 100644 --- a/tests/test_cloud_functions/test_big_query.py +++ b/tests/test_cloud_functions/test_big_query.py @@ -5,7 +5,7 @@ from tests.base import BaseTestCase from tests.test_cloud_functions import REPOSITORY_ROOT -from tests.test_cloud_functions.base import CredentialsEnvironmentVariableAsFile +from tests.test_cloud_functions.mocks import MockBigQueryClient # Manually add the cloud_functions package to the path (its imports have to be done in a certain way for Google Cloud @@ -18,7 +18,7 @@ ) -class TestBigQueryDataset(CredentialsEnvironmentVariableAsFile, BaseTestCase): +class TestBigQueryDataset(BaseTestCase): def test_insert_sensor_data(self): """Test that sensor data can be sent to BigQuery for insertion.""" data = { @@ -32,18 +32,17 @@ def test_insert_sensor_data(self): "Constat": [[1636559720.639327, 36, 37, 38, 39]], } - with patch("big_query.bigquery.Client.get_table"): - with patch("big_query.bigquery.Client.insert_rows", return_value=None) as mock_insert_rows: + mock_big_query_client = MockBigQueryClient() - BigQueryDataset(project_name="my-project", dataset_name="my-dataset").add_sensor_data( - data=data, - configuration_id="dbfed555-1b70-4191-96cb-c22071464b90", - installation_reference="turbine-1", - label="my-test", - ) + with patch("big_query.bigquery.Client", return_value=mock_big_query_client): + BigQueryDataset(project_name="my-project", dataset_name="my-dataset").add_sensor_data( + data=data, + configuration_id="dbfed555-1b70-4191-96cb-c22071464b90", + installation_reference="turbine-1", + label="my-test", + ) - new_rows = mock_insert_rows.call_args.kwargs["rows"] - self.assertEqual(len(new_rows), 8) + self.assertEqual(len(mock_big_query_client.rows), 8) expected_rows = [ { @@ -112,19 +111,17 @@ def test_insert_sensor_data(self): }, ] - self.assertEqual(new_rows, expected_rows) + self.assertEqual(mock_big_query_client.rows, expected_rows) def test_add_new_sensor_type(self): """Test that new sensor types can be added and that their references are their names slugified.""" - with patch("big_query.bigquery.Client.get_table"): - with patch("big_query.bigquery.Client.insert_rows", return_value=None) as mock_insert_rows: + mock_big_query_client = MockBigQueryClient() - BigQueryDataset(project_name="my-project", dataset_name="my-dataset").add_sensor_type( - name="My sensor_Name" - ) + with patch("big_query.bigquery.Client", return_value=mock_big_query_client): + BigQueryDataset(project_name="my-project", dataset_name="my-dataset").add_sensor_type(name="My sensor_Name") self.assertEqual( - mock_insert_rows.call_args.kwargs["rows"][0], + mock_big_query_client.rows[0], { "reference": "my-sensor-name", "name": "My sensor_Name", @@ -136,20 +133,19 @@ def test_add_new_sensor_type(self): def test_add_installation(self): """Test that installations can be added.""" - with patch("big_query.bigquery.Client.get_table"): - with patch("big_query.bigquery.Client.insert_rows", return_value=None) as mock_insert_rows: - with patch("big_query.bigquery.Client.query", return_value=Mock(result=lambda: [])): - - BigQueryDataset(project_name="my-project", dataset_name="my-dataset").add_installation( - reference="my-installation", - turbine_id="my-turbine", - blade_id="my-blade", - hardware_version="1.0.0", - sensor_coordinates={"my-sensor": [[0, 1, 2], [3, 8, 7]]}, - ) + mock_big_query_client = 
MockBigQueryClient(expected_query_result=[]) + + with patch("big_query.bigquery.Client", return_value=mock_big_query_client): + BigQueryDataset(project_name="my-project", dataset_name="my-dataset").add_installation( + reference="my-installation", + turbine_id="my-turbine", + blade_id="my-blade", + hardware_version="1.0.0", + sensor_coordinates={"my-sensor": [[0, 1, 2], [3, 8, 7]]}, + ) self.assertEqual( - mock_insert_rows.call_args.kwargs["rows"][0], + mock_big_query_client.rows[0], { "reference": "my-installation", "turbine_id": "my-turbine", @@ -162,9 +158,11 @@ def test_add_installation(self): def test_add_installation_raises_error_if_installation_already_exists(self): """Test that an error is raised if attempting to add an installation that already exists.""" - dataset = BigQueryDataset(project_name="my-project", dataset_name="my-dataset") + mock_big_query_client = MockBigQueryClient(expected_query_result=[1]) + + with patch("big_query.bigquery.Client", return_value=mock_big_query_client): + dataset = BigQueryDataset(project_name="my-project", dataset_name="my-dataset") - with patch("big_query.bigquery.Client.query", return_value=Mock(result=lambda: [1])): with self.assertRaises(InstallationWithSameNameAlreadyExists): dataset.add_installation( reference="my-installation", @@ -176,18 +174,17 @@ def test_add_installation_raises_error_if_installation_already_exists(self): def test_add_configuration(self): """Test that a configuration can be added.""" - with patch("big_query.bigquery.Client.get_table"): - with patch("big_query.bigquery.Client.insert_rows", return_value=None) as mock_insert_rows: - with patch("big_query.bigquery.Client.query", return_value=Mock(result=lambda: [])): + mock_big_query_client = MockBigQueryClient(expected_query_result=[]) - BigQueryDataset(project_name="my-project", dataset_name="my-dataset").add_configuration( - configuration={"blah": "blah", "installation_data": {"stuff": "data"}} - ) + with patch("big_query.bigquery.Client", return_value=mock_big_query_client): + BigQueryDataset(project_name="my-project", dataset_name="my-dataset").add_configuration( + configuration={"blah": "blah", "installation_data": {"stuff": "data"}} + ) - del mock_insert_rows.call_args.kwargs["rows"][0]["id"] + del mock_big_query_client.rows[0]["id"] self.assertEqual( - mock_insert_rows.call_args.kwargs["rows"][0], + mock_big_query_client.rows[0], { "software_configuration": '{"blah": "blah"}', "software_configuration_hash": "a9a553b17102e3f08a1ca32486086cdb8699f8f50c358b0fed8071b1d4c11bb2", @@ -201,12 +198,11 @@ def test_add_configuration_raises_error_if_installation_already_exists(self): existing configuration is returned. 
""" existing_configuration_id = "0846401a-89fb-424e-89e6-039063e0ee6d" - dataset = BigQueryDataset(project_name="my-project", dataset_name="my-dataset") + mock_big_query_client = MockBigQueryClient(expected_query_result=[Mock(id=existing_configuration_id)]) + + with patch("big_query.bigquery.Client", return_value=mock_big_query_client): + dataset = BigQueryDataset(project_name="my-project", dataset_name="my-dataset") - with patch( - "big_query.bigquery.Client.query", - return_value=Mock(result=lambda: [Mock(id=existing_configuration_id)]), - ): with self.assertRaises(ConfigurationAlreadyExists): configuration_id = dataset.add_configuration( configuration={"blah": "blah", "installation_data": {"stuff": "data"}} diff --git a/tests/test_cloud_functions/test_deployment.py b/tests/test_cloud_functions/test_deployment.py index 95f3ff5c..7e2ed041 100644 --- a/tests/test_cloud_functions/test_deployment.py +++ b/tests/test_cloud_functions/test_deployment.py @@ -13,11 +13,13 @@ @unittest.skipUnless( - condition=os.getenv("RUN_DEPLOYMENT_TESTS", "").lower() == "true", + condition=os.getenv("RUN_DEPLOYMENT_TESTS", "0") == "1", reason="'RUN_DEPLOYMENT_TESTS' environment variable is False or not present.", ) class TestDeployment(unittest.TestCase, DatasetMixin): - storage_client = GoogleCloudStorageClient(os.environ["TEST_PROJECT_NAME"]) + if os.getenv("RUN_DEPLOYMENT_TESTS", "0") == "1": + # The client must be instantiated here to avoid the storage emulator. + storage_client = GoogleCloudStorageClient(os.environ["TEST_PROJECT_NAME"]) def test_clean_and_upload_window(self): """Test that a window can be uploaded to a cloud bucket, its data processed by the test cloud function, and the diff --git a/tests/test_cloud_functions/test_main.py b/tests/test_cloud_functions/test_main.py index 47041b62..92b639d8 100644 --- a/tests/test_cloud_functions/test_main.py +++ b/tests/test_cloud_functions/test_main.py @@ -1,3 +1,4 @@ +import copy import json import os import sys @@ -10,7 +11,6 @@ from tests import TEST_BUCKET_NAME # noqa from tests.base import BaseTestCase # noqa from tests.test_cloud_functions import REPOSITORY_ROOT -from tests.test_cloud_functions.base import CredentialsEnvironmentVariableAsFile # Manually add the cloud_functions package to the path (its imports have to be done in a certain way for Google Cloud @@ -22,7 +22,7 @@ from cloud_functions.window_handler import ConfigurationAlreadyExists # noqa -class TestCleanAndUploadWindow(CredentialsEnvironmentVariableAsFile, BaseTestCase): +class TestCleanAndUploadWindow(BaseTestCase): SOURCE_PROJECT_NAME = "source-project" SOURCE_BUCKET_NAME = TEST_BUCKET_NAME WINDOW = BaseTestCase().random_window(sensors=["Constat"], window_duration=1) @@ -55,11 +55,12 @@ def test_clean_and_upload_window(self): "BIG_QUERY_DATASET_NAME": "blah", }, ): - with patch("window_handler.BigQueryDataset") as mock_dataset: - main.clean_and_upload_window(event=self.MOCK_EVENT, context=self._make_mock_context()) + with patch("big_query.bigquery.Client"): + with patch("window_handler.BigQueryDataset") as mock_dataset: + main.clean_and_upload_window(event=self.MOCK_EVENT, context=self._make_mock_context()) # Check configuration without user data was added. 
- expected_configuration = self.VALID_CONFIGURATION.copy() + expected_configuration = copy.deepcopy(self.VALID_CONFIGURATION) del expected_configuration["session_data"] self.assertIn("add_configuration", mock_dataset.mock_calls[1][0]) self.assertEqual(mock_dataset.mock_calls[1].args[0], expected_configuration) @@ -93,7 +94,8 @@ def test_clean_and_upload_window_for_existing_configuration(self): side_effect=ConfigurationAlreadyExists("blah", "8b9337d8-40b1-4872-b2f5-b1bfe82b241e"), ): with patch("window_handler.BigQueryDataset.add_sensor_data", return_value=None): - main.clean_and_upload_window(event=self.MOCK_EVENT, context=self._make_mock_context()) + with patch("big_query.bigquery.Client"): + main.clean_and_upload_window(event=self.MOCK_EVENT, context=self._make_mock_context()) @staticmethod def _make_mock_context(): diff --git a/tests/test_data_gateway/__init__.py b/tests/test_data_gateway/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/test_cli.py b/tests/test_data_gateway/test_cli.py similarity index 82% rename from tests/test_cli.py rename to tests/test_data_gateway/test_cli.py index d0085ce1..170f210f 100644 --- a/tests/test_cli.py +++ b/tests/test_data_gateway/test_cli.py @@ -1,6 +1,7 @@ import json import os import tempfile +import time from unittest import mock from unittest.mock import call @@ -8,13 +9,14 @@ from click.testing import CliRunner from data_gateway.cli import CREATE_INSTALLATION_CLOUD_FUNCTION_URL, gateway_cli +from data_gateway.configuration import Configuration from data_gateway.dummy_serial import DummySerial from data_gateway.exceptions import DataMustBeSavedError from tests import LENGTH, PACKET_KEY, RANDOM_BYTES from tests.base import BaseTestCase -CONFIGURATION_PATH = os.path.join(os.path.dirname(__file__), "valid_configuration.json") +CONFIGURATION_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "valid_configuration.json") class EnvironmentVariableRemover: @@ -74,6 +76,7 @@ def test_start(self): "start", "--interactive", "--save-locally", + "--no-upload-to-cloud", "--use-dummy-serial-port", f"--output-dir={temporary_directory}", ], @@ -83,28 +86,6 @@ def test_start(self): self.assertIsNone(result.exception) self.assertEqual(result.exit_code, 0) - def test_start_with_default_output_directory(self): - """Ensure the gateway can be started via the CLI with a default output directory.""" - initial_directory = os.getcwd() - - with tempfile.TemporaryDirectory() as temporary_directory: - os.chdir(temporary_directory) - - result = CliRunner().invoke( - gateway_cli, - [ - "start", - "--interactive", - "--save-locally", - "--use-dummy-serial-port", - ], - input="sleep 2\nstop\n", - ) - - self.assertIsNone(result.exception) - self.assertEqual(result.exit_code, 0) - os.chdir(initial_directory) - def test_commands_are_recorded_in_interactive_mode(self): """Ensure commands given in interactive mode are recorded.""" with EnvironmentVariableRemover("GOOGLE_APPLICATION_CREDENTIALS"): @@ -160,7 +141,7 @@ def test_with_routine(self): def test_log_level_can_be_set(self): """Test that the log level can be set.""" with tempfile.TemporaryDirectory() as temporary_directory: - with self.assertLogs(level="DEBUG") as mock_logger: + with mock.patch("octue.log_handlers.logging.StreamHandler.emit") as mock_emit: result = CliRunner().invoke( gateway_cli, [ @@ -180,8 +161,8 @@ def test_log_level_can_be_set(self): debug_message_found = False - for message in mock_logger.output: - if "DEBUG" in message: + for record in mock_emit.call_args_list: + if 
record.args[0].levelname == "DEBUG": debug_message_found = True break @@ -191,26 +172,21 @@ def test_start_and_stop_in_interactive_mode(self): """Ensure the gateway can be started and stopped via the CLI in interactive mode.""" with tempfile.TemporaryDirectory() as temporary_directory: with EnvironmentVariableRemover("GOOGLE_APPLICATION_CREDENTIALS"): - with mock.patch("logging.StreamHandler.emit") as mock_local_logger_emit: - result = CliRunner().invoke( - gateway_cli, - [ - "start", - "--interactive", - "--save-locally", - "--no-upload-to-cloud", - "--use-dummy-serial-port", - f"--output-dir={temporary_directory}", - ], - input="stop\n", - ) - - self.assertIsNone(result.exception) - self.assertEqual(result.exit_code, 0) + result = CliRunner().invoke( + gateway_cli, + [ + "start", + "--interactive", + "--save-locally", + "--no-upload-to-cloud", + "--use-dummy-serial-port", + f"--output-dir={temporary_directory}", + ], + input="stop\n", + ) - self.assertTrue( - any(call_arg[0][0].msg == "Stopping gateway." for call_arg in mock_local_logger_emit.call_args_list) - ) + self.assertIsNone(result.exception) + self.assertEqual(result.exit_code, 0) def test_save_locally(self): """Ensure `--save-locally` mode writes data to disk.""" @@ -236,6 +212,8 @@ def test_save_locally(self): session_subdirectory = [item for item in os.scandir(temporary_directory) if item.is_dir()][0].name + # Wait for the parser process to receive stop signal and persist the window it has open. + time.sleep(2) with open(os.path.join(temporary_directory, session_subdirectory, "window-0.json")) as f: data = json.loads(f.read()) @@ -248,26 +226,27 @@ def test_save_locally(self): def test_start_with_config_file(self): """Ensure a configuration file can be provided via the CLI.""" - with EnvironmentVariableRemover("GOOGLE_APPLICATION_CREDENTIALS"): - with mock.patch("logging.StreamHandler.emit") as mock_local_logger_emit: - with tempfile.TemporaryDirectory() as temporary_directory: - result = CliRunner().invoke( - gateway_cli, - [ - "start", - "--interactive", - "--save-locally", - "--no-upload-to-cloud", - "--use-dummy-serial-port", - f"--config-file={CONFIGURATION_PATH}", - f"--output-dir={temporary_directory}", - ], - input="stop\n", - ) + with tempfile.TemporaryDirectory() as temporary_directory: + with mock.patch( + "data_gateway.data_gateway.Configuration.from_dict", return_value=Configuration() + ) as mock_configuration_from_dict: + result = CliRunner().invoke( + gateway_cli, + [ + "start", + "--interactive", + "--save-locally", + "--no-upload-to-cloud", + "--use-dummy-serial-port", + f"--config-file={CONFIGURATION_PATH}", + f"--output-dir={temporary_directory}", + ], + input="stop\n", + ) - self.assertIsNone(result.exception) - self.assertEqual(result.exit_code, 0) - self.assertIn("Loaded configuration file", mock_local_logger_emit.call_args_list[0][0][0].msg) + self.assertIsNone(result.exception) + self.assertEqual(result.exit_code, 0) + mock_configuration_from_dict.assert_called() class TestCreateInstallation(BaseTestCase): diff --git a/tests/test_configuration.py b/tests/test_data_gateway/test_configuration.py similarity index 91% rename from tests/test_configuration.py rename to tests/test_data_gateway/test_configuration.py index 4283cdfd..b9a6f397 100644 --- a/tests/test_configuration.py +++ b/tests/test_data_gateway/test_configuration.py @@ -7,7 +7,7 @@ class TestConfiguration(BaseTestCase): - configuration_path = os.path.join(os.path.dirname(__file__), "valid_configuration.json") + configuration_path = 
os.path.join(os.path.dirname(os.path.dirname(__file__)), "valid_configuration.json")
 
     with open(configuration_path) as f:
         VALID_CONFIGURATION = json.load(f)
diff --git a/tests/test_data_gateway/test_data_gateway.py b/tests/test_data_gateway/test_data_gateway.py
new file mode 100644
index 00000000..ac6676bd
--- /dev/null
+++ b/tests/test_data_gateway/test_data_gateway.py
@@ -0,0 +1,381 @@
+import json
+import os
+import shutil
+from unittest.mock import patch
+
+import coolname
+from octue.cloud import storage
+from octue.cloud.storage.client import GoogleCloudStorageClient
+
+from data_gateway.configuration import Configuration
+from data_gateway.data_gateway import DataGateway
+from data_gateway.dummy_serial import DummySerial
+from data_gateway.persistence import TimeBatcher
+from tests import LENGTH, PACKET_KEY, RANDOM_BYTES, TEST_BUCKET_NAME, TEST_PROJECT_NAME
+from tests.base import BaseTestCase
+
+
+class TestDataGateway(BaseTestCase):
+    """Test `DataGateway` with different sensors. NOTE: The payloads are generated randomly. Consequently, two
+    consecutive packets are extremely unlikely to have consecutive timestamps. This will trigger lost packet warnings
+    during tests.
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        """Set up the class with a window size and a Google Cloud Storage client.
+
+        :return None:
+        """
+        cls.WINDOW_SIZE = 10
+        cls.storage_client = GoogleCloudStorageClient(project_name=TEST_PROJECT_NAME)
+
+    def setUp(self):
+        """Create a uniquely-named output directory."""
+        self.output_directory = coolname.generate_slug(2)
+
+    def tearDown(self):
+        """Delete the output directory created in `setUp`."""
+        try:
+            shutil.rmtree(self.output_directory)
+        except FileNotFoundError:
+            pass
+
+    def test_configuration_file_is_persisted(self):
+        """Test that the configuration file is persisted."""
+        serial_port = DummySerial(port="test")
+        packet_type = bytes([34])
+
+        serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[0])))
+        serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[1])))
+
+        data_gateway = DataGateway(
+            serial_port=serial_port,
+            save_locally=True,
+            output_directory=self.output_directory,
+            window_size=self.WINDOW_SIZE,
+            project_name=TEST_PROJECT_NAME,
+            bucket_name=TEST_BUCKET_NAME,
+        )
+
+        data_gateway.start(stop_when_no_more_data_after=0.1)
+
+        # Check configuration file is present and valid locally.
+        with open(os.path.join(data_gateway.packet_reader.local_output_directory, "configuration.json")) as f:
+            Configuration.from_dict(json.load(f))
+
+        # Check configuration file is present and valid on the cloud.
+        configuration = self.storage_client.download_as_string(
+            bucket_name=TEST_BUCKET_NAME,
+            path_in_bucket=storage.path.join(data_gateway.packet_reader.cloud_output_directory, "configuration.json"),
+        )
+
+        Configuration.from_dict(json.loads(configuration))
+
+    def test_data_gateway_with_baros_p_sensor(self):
+        """Test that the packet reader works with the "Baros_P" sensor."""
+        serial_port = DummySerial(port="test")
+        packet_type = bytes([34])
+
+        serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[0])))
+        serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[1])))
+
+        data_gateway = DataGateway(
+            serial_port,
+            save_locally=True,
+            output_directory=coolname.generate_slug(2),
+            window_size=self.WINDOW_SIZE,
+            project_name=TEST_PROJECT_NAME,
+            bucket_name=TEST_BUCKET_NAME,
+        )
+
+        data_gateway.start(stop_when_no_more_data_after=0.1)
+        self._check_data_is_written_to_files(
+            data_gateway.packet_reader.local_output_directory, sensor_names=["Baros_P"]
+        )
+
+        self._check_windows_are_uploaded_to_cloud(
+            data_gateway.packet_reader.cloud_output_directory,
+            sensor_names=["Baros_P"],
+            number_of_windows_to_check=1,
+        )
+
+    def test_data_gateway_with_baros_t_sensor(self):
+        """Test that the packet reader works with the Baros_T sensor."""
+        serial_port = DummySerial(port="test")
+        packet_type = bytes([34])
+
+        serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[0])))
+        serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[1])))
+
+        data_gateway = DataGateway(
+            serial_port,
+            save_locally=True,
+            output_directory=self.output_directory,
+            window_size=self.WINDOW_SIZE,
+            project_name=TEST_PROJECT_NAME,
+            bucket_name=TEST_BUCKET_NAME,
+        )
+        data_gateway.start(stop_when_no_more_data_after=0.1)
+        self._check_data_is_written_to_files(
+            data_gateway.packet_reader.local_output_directory, sensor_names=["Baros_T"]
+        )
+
+        self._check_windows_are_uploaded_to_cloud(
+            data_gateway.packet_reader.cloud_output_directory,
+            sensor_names=["Baros_T"],
+            number_of_windows_to_check=1,
+        )
+
+    def test_data_gateway_with_diff_baros_sensor(self):
+        """Test that the packet reader works with the Diff_Baros sensor."""
+        serial_port = DummySerial(port="test")
+        packet_type = bytes([36])
+
+        serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[0])))
+        serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[1])))
+
+        data_gateway = DataGateway(
+            serial_port,
+            save_locally=True,
+            output_directory=self.output_directory,
+            window_size=self.WINDOW_SIZE,
+            project_name=TEST_PROJECT_NAME,
+            bucket_name=TEST_BUCKET_NAME,
+        )
+        data_gateway.start(stop_when_no_more_data_after=0.1)
+
+        self._check_data_is_written_to_files(
+            data_gateway.packet_reader.local_output_directory,
+            sensor_names=["Diff_Baros"],
+        )
+
+        self._check_windows_are_uploaded_to_cloud(
+            data_gateway.packet_reader.cloud_output_directory,
+            sensor_names=["Diff_Baros"],
+            number_of_windows_to_check=1,
+        )
+
+    def test_data_gateway_with_mic_sensor(self):
+        """Test that the packet reader works with the mic sensor."""
+        serial_port = DummySerial(port="test")
+        packet_type = bytes([38])
+
+        serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[0])))
+        serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[1])))
+
+        data_gateway = DataGateway(
+            serial_port,
+            save_locally=True,
+            output_directory=self.output_directory,
+            window_size=self.WINDOW_SIZE,
+
project_name=TEST_PROJECT_NAME, + bucket_name=TEST_BUCKET_NAME, + ) + data_gateway.start(stop_when_no_more_data_after=0.1) + self._check_data_is_written_to_files(data_gateway.packet_reader.local_output_directory, sensor_names=["Mics"]) + + self._check_windows_are_uploaded_to_cloud( + data_gateway.packet_reader.cloud_output_directory, + sensor_names=["Mics"], + number_of_windows_to_check=1, + ) + + def test_data_gateway_with_acc_sensor(self): + """Test that the packet reader works with the acc sensor.""" + serial_port = DummySerial(port="test") + packet_type = bytes([42]) + + serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[0]))) + serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[1]))) + + data_gateway = DataGateway( + serial_port, + save_locally=True, + output_directory=self.output_directory, + window_size=self.WINDOW_SIZE, + project_name=TEST_PROJECT_NAME, + bucket_name=TEST_BUCKET_NAME, + ) + data_gateway.start(stop_when_no_more_data_after=0.1) + + self._check_data_is_written_to_files(data_gateway.packet_reader.local_output_directory, sensor_names=["Acc"]) + + self._check_windows_are_uploaded_to_cloud( + data_gateway.packet_reader.cloud_output_directory, + sensor_names=["Acc"], + number_of_windows_to_check=1, + ) + + def test_data_gateway_with_gyro_sensor(self): + """Test that the packet reader works with the gyro sensor.""" + serial_port = DummySerial(port="test") + packet_type = bytes([44]) + + serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[0]))) + serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[1]))) + + data_gateway = DataGateway( + serial_port, + save_locally=True, + output_directory=self.output_directory, + window_size=self.WINDOW_SIZE, + project_name=TEST_PROJECT_NAME, + bucket_name=TEST_BUCKET_NAME, + ) + data_gateway.start(stop_when_no_more_data_after=0.1) + + self._check_data_is_written_to_files(data_gateway.packet_reader.local_output_directory, sensor_names=["Gyro"]) + + self._check_windows_are_uploaded_to_cloud( + data_gateway.packet_reader.cloud_output_directory, + sensor_names=["Gyro"], + number_of_windows_to_check=1, + ) + + def test_data_gateway_with_mag_sensor(self): + """Test that the packet reader works with the mag sensor.""" + serial_port = DummySerial(port="test") + packet_type = bytes([46]) + + serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[0]))) + serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[1]))) + + data_gateway = DataGateway( + serial_port, + save_locally=True, + output_directory=self.output_directory, + window_size=self.WINDOW_SIZE, + project_name=TEST_PROJECT_NAME, + bucket_name=TEST_BUCKET_NAME, + ) + data_gateway.start(stop_when_no_more_data_after=0.1) + self._check_data_is_written_to_files(data_gateway.packet_reader.local_output_directory, sensor_names=["Mag"]) + + self._check_windows_are_uploaded_to_cloud( + data_gateway.packet_reader.cloud_output_directory, + sensor_names=["Mag"], + number_of_windows_to_check=1, + ) + + def test_data_gateway_with_connections_statistics(self): + """Test that the packet reader works with the connection statistics "sensor".""" + serial_port = DummySerial(port="test") + packet_type = bytes([52]) + + serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[0]))) + serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[1]))) + + data_gateway = DataGateway( + serial_port, + save_locally=True, + 
output_directory=self.output_directory, + window_size=self.WINDOW_SIZE, + project_name=TEST_PROJECT_NAME, + bucket_name=TEST_BUCKET_NAME, + ) + data_gateway.start(stop_when_no_more_data_after=0.1) + + self._check_data_is_written_to_files( + data_gateway.packet_reader.local_output_directory, sensor_names=["Constat"] + ) + + self._check_windows_are_uploaded_to_cloud( + data_gateway.packet_reader.cloud_output_directory, + sensor_names=["Constat"], + number_of_windows_to_check=1, + ) + + def test_data_gateway_with_connections_statistics_in_sleep_mode(self): + """Test that the packet reader works with the connection statistics "sensor" in sleep state. Normally, + randomly generated payloads would trigger packet loss warning in logger. Check that this warning is suppressed + in sleep mode. + """ + serial_port = DummySerial(port="test") + # Enter sleep state + serial_port.write(data=b"".join((PACKET_KEY, bytes([56]), bytes([1]), bytes([1])))) + + packet_type = bytes([52]) + + serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[0]))) + serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[1]))) + + data_gateway = DataGateway( + serial_port, + save_locally=True, + upload_to_cloud=False, + output_directory=self.output_directory, + window_size=self.WINDOW_SIZE, + project_name=TEST_PROJECT_NAME, + bucket_name=TEST_BUCKET_NAME, + ) + + with patch("data_gateway.packet_reader.logger") as mock_logger: + data_gateway.start(stop_when_no_more_data_after=0.1) + + self._check_data_is_written_to_files( + data_gateway.packet_reader.local_output_directory, sensor_names=["Constat"] + ) + self.assertEqual(0, mock_logger.warning.call_count) + + def test_all_sensors_together(self): + """Test that the packet reader works with all sensors together.""" + serial_port = DummySerial(port="test") + packet_types = (bytes([34]), bytes([36]), bytes([38]), bytes([42]), bytes([44]), bytes([46])) + sensor_names = ("Baros_P", "Baros_T", "Diff_Baros", "Mics", "Acc", "Gyro", "Mag") + + for packet_type in packet_types: + serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[0]))) + serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[1]))) + + data_gateway = DataGateway( + serial_port, + save_locally=True, + output_directory=self.output_directory, + window_size=self.WINDOW_SIZE, + project_name=TEST_PROJECT_NAME, + bucket_name=TEST_BUCKET_NAME, + ) + data_gateway.start(stop_when_no_more_data_after=0.1) + + self._check_data_is_written_to_files( + data_gateway.packet_reader.local_output_directory, sensor_names=sensor_names + ) + + self._check_windows_are_uploaded_to_cloud( + data_gateway.packet_reader.cloud_output_directory, + sensor_names=sensor_names, + number_of_windows_to_check=1, + ) + + def _check_windows_are_uploaded_to_cloud(self, output_directory, sensor_names, number_of_windows_to_check=5): + """Check that non-trivial windows from a packet reader for a particular sensor are uploaded to cloud storage.""" + window_paths = [ + blob.name + for blob in self.storage_client.scandir( + cloud_path=storage.path.generate_gs_path(TEST_BUCKET_NAME, output_directory) + ) + if not blob.name.endswith("configuration.json") + ] + + self.assertTrue(len(window_paths) >= number_of_windows_to_check) + + for path in window_paths: + data = json.loads(self.storage_client.download_as_string(bucket_name=TEST_BUCKET_NAME, path_in_bucket=path)) + + for name in sensor_names: + lines = data["sensor_data"][name] + self.assertTrue(len(lines[0]) > 1) + + def 
_check_data_is_written_to_files(self, output_directory, sensor_names): + """Check that non-trivial data is written to the given file.""" + windows = [file for file in os.listdir(output_directory) if file.startswith(TimeBatcher._file_prefix)] + self.assertTrue(len(windows) > 0) + + for window in windows: + with open(os.path.join(output_directory, window)) as f: + data = json.load(f) + + for name in sensor_names: + lines = data["sensor_data"][name] + self.assertTrue(len(lines[0]) > 1) diff --git a/tests/test_dummy_serial.py b/tests/test_data_gateway/test_dummy_serial.py similarity index 100% rename from tests/test_dummy_serial.py rename to tests/test_data_gateway/test_dummy_serial.py diff --git a/tests/test_data_gateway/test_packet_reader.py b/tests/test_data_gateway/test_packet_reader.py new file mode 100644 index 00000000..e6844d42 --- /dev/null +++ b/tests/test_data_gateway/test_packet_reader.py @@ -0,0 +1,110 @@ +import multiprocessing +import tempfile +from unittest.mock import patch + +from data_gateway.packet_reader import PacketReader +from tests import LENGTH, PACKET_KEY, RANDOM_BYTES +from tests.base import BaseTestCase + + +class TestPacketReader(BaseTestCase): + def test_error_is_logged_if_unknown_sensor_type_packet_is_received(self): + """Test that an error is logged if an unknown sensor type packet is received.""" + queue = multiprocessing.Queue() + queue.put({"packet_type": bytes([0]), "packet": b"".join((PACKET_KEY, bytes([0]), LENGTH, RANDOM_BYTES[0]))}) + + packet_reader = PacketReader( + save_locally=False, + upload_to_cloud=False, + output_directory=tempfile.TemporaryDirectory().name, + ) + + with patch("data_gateway.packet_reader.logger") as mock_logger: + packet_reader.parse_packets( + packet_queue=queue, + stop_signal=multiprocessing.Value("i", 0), + stop_when_no_more_data_after=0.1, + ) + + self.assertIn("Received packet with unknown type: ", mock_logger.method_calls[1].args[0]) + + def test_update_handles_fails_if_start_and_end_handles_are_incorrect(self): + """Test that an error is raised if the start and end handles are incorrect when trying to update handles.""" + packet = bytearray(RANDOM_BYTES[0]) + packet[0:1] = int(0).to_bytes(1, "little") + packet[2:3] = int(255).to_bytes(1, "little") + + packet_reader = PacketReader( + save_locally=False, + upload_to_cloud=False, + output_directory=tempfile.TemporaryDirectory().name, + ) + + with patch("data_gateway.packet_reader.logger") as mock_logger: + packet_reader.update_handles(packet) + + self.assertIn("Handle error", mock_logger.method_calls[0].args[0]) + + def test_update_handles(self): + """Test that the handles can be updated.""" + packet = bytearray(RANDOM_BYTES[0]) + packet[0:1] = int(0).to_bytes(1, "little") + packet[2:3] = int(26).to_bytes(1, "little") + packet_reader = PacketReader( + save_locally=False, + upload_to_cloud=False, + output_directory=tempfile.TemporaryDirectory().name, + ) + + with patch("data_gateway.packet_reader.logger") as mock_logger: + packet_reader.update_handles(packet) + + self.assertIn("Successfully updated handles", mock_logger.method_calls[0].args[0]) + + def test_packet_reader_with_info_packets(self): + """Test that the packet reader works with info packets.""" + packet_types = [bytes([40]), bytes([54]), bytes([56]), bytes([58])] + + packets = [ + [bytes([1]), bytes([2]), bytes([3])], + [bytes([0]), bytes([1]), bytes([2]), bytes([3])], + [bytes([0]), bytes([1])], + [bytes([0])], + ] + + queue = multiprocessing.Queue() + + for index, packet_type in enumerate(packet_types): + for 
packet in packets[index]: + queue.put({"packet_type": str(int.from_bytes(packet_type, "little")), "packet": packet}) + + with tempfile.TemporaryDirectory() as temporary_directory: + packet_reader = PacketReader( + save_locally=True, + upload_to_cloud=False, + output_directory=temporary_directory, + ) + + with patch("data_gateway.packet_reader.logger") as mock_logger: + packet_reader.parse_packets( + packet_queue=queue, + stop_signal=multiprocessing.Value("i", 0), + stop_when_no_more_data_after=0.1, + ) + + log_messages = [call_arg.args for call_arg in mock_logger.info.call_args_list] + + for message in [ + ("Microphone data reading done",), + ("Microphone data erasing done",), + ("Microphones started ",), + ("Command declined, %s", "Bad block detection ongoing"), + ("Command declined, %s", "Task already registered, cannot register again"), + ("Command declined, %s", "Task is not registered, cannot de-register"), + ("Command declined, %s", "Connection Parameter update unfinished"), + ("\n%s\n", "Exiting sleep"), + ("\n%s\n", "Entering sleep"), + ("Battery info",), + ("Voltage : %fV\n Cycle count: %f\nState of charge: %f%%", 0.0, 0.0, 0.0), + ]: + self.assertIn(message, log_messages) diff --git a/tests/test_persistence.py b/tests/test_data_gateway/test_persistence.py similarity index 76% rename from tests/test_persistence.py rename to tests/test_data_gateway/test_persistence.py index cddd1efd..057cfc0c 100644 --- a/tests/test_persistence.py +++ b/tests/test_data_gateway/test_persistence.py @@ -25,8 +25,7 @@ def test_data_is_batched(self): with tempfile.TemporaryDirectory() as temporary_directory: writer = BatchingFileWriter( sensor_names=["test"], - session_subdirectory="this-session", - output_directory=temporary_directory, + output_directory=os.path.join(temporary_directory, "this-session"), window_size=600, ) @@ -38,8 +37,7 @@ def test_data_is_written_to_disk_in_windows(self): with tempfile.TemporaryDirectory() as temporary_directory: writer = BatchingFileWriter( sensor_names=["test"], - session_subdirectory="this-session", - output_directory=temporary_directory, + output_directory=os.path.join(temporary_directory, "this-session"), window_size=0.01, ) @@ -55,19 +53,19 @@ def test_data_is_written_to_disk_in_windows(self): self.assertEqual(len(writer.current_window["sensor_data"]["test"]), 0) - with open(os.path.join(temporary_directory, writer._session_subdirectory, "window-0.json")) as f: + with open(os.path.join(writer.output_directory, "window-0.json")) as f: self.assertEqual(json.load(f)["sensor_data"], {"test": ["ping", "pong"]}) - with open(os.path.join(temporary_directory, writer._session_subdirectory, "window-1.json")) as f: + with open(os.path.join(writer.output_directory, "window-1.json")) as f: self.assertEqual(json.load(f)["sensor_data"], {"test": ["ding", "dong"]}) def test_oldest_window_is_deleted_when_storage_limit_reached(self): """Check that (only) the oldest window is deleted when the storage limit is reached.""" with tempfile.TemporaryDirectory() as temporary_directory: + writer = BatchingFileWriter( sensor_names=["test"], - session_subdirectory="this-session", - output_directory=temporary_directory, + output_directory=os.path.join(temporary_directory, "this-session"), window_size=0.01, storage_limit=1, ) @@ -75,7 +73,7 @@ def test_oldest_window_is_deleted_when_storage_limit_reached(self): with writer: writer.add_to_current_window(sensor_name="test", data="ping,") - first_window_path = os.path.join(temporary_directory, writer._session_subdirectory, "window-0.json") + 
first_window_path = os.path.join(writer.output_directory, "window-0.json") # Check first file is written to disk. self.assertTrue(os.path.exists(first_window_path)) @@ -87,17 +85,15 @@ def test_oldest_window_is_deleted_when_storage_limit_reached(self): self.assertFalse(os.path.exists(first_window_path)) # Check the second file has not been deleted. - self.assertTrue( - os.path.exists(os.path.join(temporary_directory, writer._session_subdirectory, "window-1.json")) - ) + self.assertTrue(os.path.exists(os.path.join(writer.output_directory, "window-1.json"))) def test_that_csv_files_are_written(self): """Test that data is written to disk as CSV-files if the `save_csv_files` option is `True`.""" with tempfile.TemporaryDirectory() as temporary_directory: + writer = BatchingFileWriter( sensor_names=["sensor1", "sensor2"], - session_subdirectory="this-session", - output_directory=temporary_directory, + output_directory=os.path.join(temporary_directory, "this-session"), save_csv_files=True, window_size=0.01, ) @@ -108,11 +104,11 @@ def test_that_csv_files_are_written(self): writer.add_to_current_window(sensor_name="sensor1", data=[4, 5, 6]) writer.add_to_current_window(sensor_name="sensor2", data=[4, 5, 6]) - with open(os.path.join(temporary_directory, writer._session_subdirectory, "sensor1.csv")) as f: + with open(os.path.join(writer.output_directory, "sensor1.csv")) as f: reader = csv.reader(f) self.assertEqual([row for row in reader], [["1", "2", "3"], ["4", "5", "6"]]) - with open(os.path.join(temporary_directory, writer._session_subdirectory, "sensor2.csv")) as f: + with open(os.path.join(writer.output_directory, "sensor2.csv")) as f: reader = csv.reader(f) self.assertEqual([row for row in reader], [["1", "2", "3"], ["4", "5", "6"]]) @@ -133,8 +129,7 @@ def test_data_is_batched(self): project_name=TEST_PROJECT_NAME, bucket_name=TEST_BUCKET_NAME, window_size=600, - session_subdirectory="this-session", - output_directory=tempfile.TemporaryDirectory().name, + output_directory=storage.path.join(tempfile.TemporaryDirectory().name, "this-session"), ) uploader.add_to_current_window(sensor_name="test", data="blah,") @@ -147,8 +142,7 @@ def test_data_is_uploaded_in_windows_and_can_be_retrieved_from_cloud_storage(sel project_name=TEST_PROJECT_NAME, bucket_name=TEST_BUCKET_NAME, window_size=0.01, - session_subdirectory="this-session", - output_directory=tempfile.TemporaryDirectory().name, + output_directory=storage.path.join(tempfile.TemporaryDirectory().name, "this-session"), ) with uploader: @@ -170,9 +164,7 @@ def test_data_is_uploaded_in_windows_and_can_be_retrieved_from_cloud_storage(sel json.loads( self.storage_client.download_as_string( bucket_name=TEST_BUCKET_NAME, - path_in_bucket=storage.path.join( - uploader.output_directory, uploader._session_subdirectory, "window-0.json" - ), + path_in_bucket=storage.path.join(uploader.output_directory, "window-0.json"), ) )["sensor_data"], {"test": ["ping", "pong"]}, @@ -182,9 +174,7 @@ def test_data_is_uploaded_in_windows_and_can_be_retrieved_from_cloud_storage(sel json.loads( self.storage_client.download_as_string( bucket_name=TEST_BUCKET_NAME, - path_in_bucket=storage.path.join( - uploader.output_directory, uploader._session_subdirectory, "window-1.json" - ), + path_in_bucket=storage.path.join(uploader.output_directory, "window-1.json"), ) )["sensor_data"], {"test": ["ding", "dong"]}, @@ -204,8 +194,7 @@ def test_window_is_written_to_disk_if_upload_fails(self): project_name=TEST_PROJECT_NAME, bucket_name=TEST_BUCKET_NAME, window_size=0.01, - 
session_subdirectory="this-session", - output_directory=temporary_directory, + output_directory=storage.path.join(temporary_directory, "this-session"), upload_backup_files=False, ) @@ -217,15 +206,11 @@ def test_window_is_written_to_disk_if_upload_fails(self): with self.assertRaises(google.api_core.exceptions.NotFound): self.storage_client.download_as_string( bucket_name=TEST_BUCKET_NAME, - path_in_bucket=storage.path.join( - uploader.output_directory, uploader._session_subdirectory, "window-0.json" - ), + path_in_bucket=storage.path.join(uploader.output_directory, "window-0.json"), ) # Check that a backup file has been written. - with open( - os.path.join(temporary_directory, ".backup", uploader._session_subdirectory, "window-0.json") - ) as f: + with open(os.path.join(uploader.output_directory, ".backup", "window-0.json")) as f: self.assertEqual(json.load(f)["sensor_data"], {"test": ["ping", "pong"]}) def test_backup_files_are_uploaded_on_next_upload_attempt(self): @@ -242,8 +227,7 @@ def test_backup_files_are_uploaded_on_next_upload_attempt(self): project_name=TEST_PROJECT_NAME, bucket_name=TEST_BUCKET_NAME, window_size=10, - session_subdirectory="this-session", - output_directory=temporary_directory, + output_directory=storage.path.join(temporary_directory, "this-session"), upload_backup_files=True, ) @@ -255,12 +239,10 @@ def test_backup_files_are_uploaded_on_next_upload_attempt(self): with self.assertRaises(google.api_core.exceptions.NotFound): self.storage_client.download_as_string( bucket_name=TEST_BUCKET_NAME, - path_in_bucket=storage.path.join( - uploader.output_directory, uploader._session_subdirectory, "window-0.json" - ), + path_in_bucket=storage.path.join(uploader.output_directory, "window-0.json"), ) - backup_path = os.path.join(temporary_directory, ".backup", uploader._session_subdirectory, "window-0.json") + backup_path = os.path.join(uploader._backup_directory, "window-0.json") # Check that a backup file has been written. 
with open(backup_path) as f: @@ -274,9 +256,7 @@ def test_backup_files_are_uploaded_on_next_upload_attempt(self): json.loads( self.storage_client.download_as_string( bucket_name=TEST_BUCKET_NAME, - path_in_bucket=storage.path.join( - uploader.output_directory, uploader._session_subdirectory, "window-0.json" - ), + path_in_bucket=storage.path.join(uploader.output_directory, "window-0.json"), ) )["sensor_data"], {"test": ["ping", "pong"]}, @@ -286,9 +266,7 @@ def test_backup_files_are_uploaded_on_next_upload_attempt(self): json.loads( self.storage_client.download_as_string( bucket_name=TEST_BUCKET_NAME, - path_in_bucket=storage.path.join( - uploader.output_directory, uploader._session_subdirectory, "window-1.json" - ), + path_in_bucket=storage.path.join(uploader.output_directory, "window-1.json"), ) )["sensor_data"], {"test": [["ding", "dong"]]}, @@ -304,8 +282,7 @@ def test_metadata_is_added_to_uploaded_files(self): project_name=TEST_PROJECT_NAME, bucket_name=TEST_BUCKET_NAME, window_size=0.01, - session_subdirectory="this-session", - output_directory=tempfile.TemporaryDirectory().name, + output_directory=storage.path.join(tempfile.TemporaryDirectory().name, "this-session"), metadata={"big": "rock"}, ) @@ -314,9 +291,7 @@ def test_metadata_is_added_to_uploaded_files(self): metadata = self.storage_client.get_metadata( bucket_name=TEST_BUCKET_NAME, - path_in_bucket=storage.path.join( - uploader.output_directory, uploader._session_subdirectory, "window-0.json" - ), + path_in_bucket=storage.path.join(uploader.output_directory, "window-0.json"), ) self.assertEqual(metadata["custom_metadata"], {"big": "rock"}) diff --git a/tests/test_data_gateway/test_routine.py b/tests/test_data_gateway/test_routine.py new file mode 100644 index 00000000..51144f15 --- /dev/null +++ b/tests/test_data_gateway/test_routine.py @@ -0,0 +1,125 @@ +import multiprocessing +import time +from unittest import TestCase +from unittest.mock import patch + +from data_gateway.routine import Routine + + +def create_record_commands_action(): + """Create a list in which commands will be recorded when the `record_commands` function is given as an action to + the routine. 
+ + :return (list, callable): the list that actions are recorded in, and the function that causes them to be recorded + """ + recorded_commands = [] + + def record_commands(command): + recorded_commands.append((command, time.perf_counter())) + + return recorded_commands, record_commands + + +class TestRoutine(TestCase): + def test_routine_with_no_period_runs_commands_once(self): + """Test that commands can be scheduled to run once when a period isn't given.""" + recorded_commands, record_commands = create_record_commands_action() + + routine = Routine( + commands=[("first-command", 0.1), ("second-command", 0.3)], + action=record_commands, + ) + + start_time = time.perf_counter() + routine.run(stop_signal=multiprocessing.Value("i", 0)) + + self.assertEqual(recorded_commands[0][0], "first-command") + self.assertAlmostEqual(recorded_commands[0][1], start_time + 0.1, delta=0.2) + + self.assertEqual(recorded_commands[1][0], "second-command") + self.assertAlmostEqual(recorded_commands[1][1], start_time + 0.3, delta=0.2) + + def test_error_raised_if_any_delay_is_greater_than_period(self): + """Test that an error is raised if any of the command delays is greater than the period.""" + with self.assertRaises(ValueError): + Routine( + commands=[("first-command", 10), ("second-command", 0.3)], + action=None, + period=1, + ) + + def test_error_raised_if_stop_after_time_is_less_than_period(self): + """Test that an error is raised if the `stop_after` time is less than the period.""" + with self.assertRaises(ValueError): + Routine( + commands=[("first-command", 0.1), ("second-command", 0.3)], + action=None, + period=1, + stop_after=0.5, + ) + + def test_warning_raised_if_stop_after_time_provided_without_a_period(self): + """Test that a warning is raised if the `stop_after` time is provided without a period.""" + with patch("data_gateway.routine.logger") as mock_logger: + Routine( + commands=[("first-command", 10), ("second-command", 0.3)], + action=None, + stop_after=0.5, + ) + + self.assertEqual( + mock_logger.warning.call_args_list[0].args[0], + "The `stop_after` parameter is ignored unless `period` is also given.", + ) + + def test_routine_with_period(self): + """Test that commands can be scheduled to repeat at the given period and then stop after a certain time.""" + recorded_commands, record_commands = create_record_commands_action() + + routine = Routine( + commands=[("first-command", 0.1), ("second-command", 0.3)], + action=record_commands, + period=0.4, + stop_after=1, + ) + + start_time = time.perf_counter() + routine.run(stop_signal=multiprocessing.Value("i", 0)) + + self.assertEqual(recorded_commands[0][0], "first-command") + self.assertAlmostEqual(recorded_commands[0][1], start_time + 0.1, delta=0.2) + + self.assertEqual(recorded_commands[1][0], "second-command") + self.assertAlmostEqual(recorded_commands[1][1], start_time + 0.3, delta=0.2) + + self.assertEqual(recorded_commands[2][0], "first-command") + self.assertAlmostEqual(recorded_commands[2][1], start_time + 0.1 + routine.period, delta=0.2) + + self.assertEqual(recorded_commands[3][0], "second-command") + self.assertAlmostEqual(recorded_commands[3][1], start_time + 0.3 + routine.period, delta=0.2) + + def test_routine_only_runs_until_stop_command(self): + """Test that a routine only runs until the "stop" command is received.""" + recorded_commands, record_commands = create_record_commands_action() + + routine = Routine( + commands=[("first-command", 0.1), ("stop", 0.3), ("command-after-stop", 0.5)], + action=record_commands, + ) + + 
stop_signal = multiprocessing.Value("i", 0) + start_time = time.perf_counter() + + routine.run(stop_signal=stop_signal) + + # Check that only the first two commands (i.e. up until the `stop` command) are scheduled and carried out. + self.assertEqual(len(recorded_commands), 2) + + self.assertEqual(recorded_commands[0][0], "first-command") + self.assertAlmostEqual(recorded_commands[0][1], start_time + 0.1, delta=0.2) + + self.assertEqual(recorded_commands[1][0], "stop") + self.assertAlmostEqual(recorded_commands[1][1], start_time + 0.3, delta=0.2) + + # Check that the stop signal has been sent. + self.assertEqual(stop_signal.value, 1) diff --git a/tests/test_packet_reader.py b/tests/test_packet_reader.py deleted file mode 100644 index 21d90d25..00000000 --- a/tests/test_packet_reader.py +++ /dev/null @@ -1,446 +0,0 @@ -import json -import os -import tempfile -from unittest.mock import patch - -from octue.cloud import storage -from octue.cloud.storage.client import GoogleCloudStorageClient - -from data_gateway import exceptions -from data_gateway.configuration import Configuration -from data_gateway.dummy_serial import DummySerial -from data_gateway.packet_reader import PacketReader -from tests import LENGTH, PACKET_KEY, RANDOM_BYTES, TEST_BUCKET_NAME, TEST_PROJECT_NAME -from tests.base import BaseTestCase - - -class TestPacketReader(BaseTestCase): - """Test packet reader with different sensors. NOTE: The payloads are generated randomly. Consequently, - two consecutive packets are extremely unlikely to have consecutive timestamps. This will trigger lost packet - warning during tests. - """ - - @classmethod - def setUpClass(cls): - """Set up the class with a window size and a Google Cloud Storage client. - - :return None: - """ - cls.WINDOW_SIZE = 10 - cls.storage_client = GoogleCloudStorageClient(project_name=TEST_PROJECT_NAME) - - def _check_windows_are_uploaded_to_cloud(self, packet_reader, sensor_names, number_of_windows_to_check=5): - """Check that non-trivial windows from a packet reader for a particular sensor are uploaded to cloud storage.""" - number_of_windows = packet_reader.uploader._window_number - self.assertTrue(number_of_windows > 0) - - for i in range(number_of_windows_to_check): - data = json.loads( - self.storage_client.download_as_string( - bucket_name=TEST_BUCKET_NAME, - path_in_bucket=storage.path.join( - packet_reader.uploader.output_directory, - packet_reader.uploader._session_subdirectory, - f"window-{i}.json", - ), - ) - ) - - for name in sensor_names: - lines = data["sensor_data"][name] - self.assertTrue(len(lines[0]) > 1) - - def _check_data_is_written_to_files(self, packet_reader, temporary_directory, sensor_names): - """Check that non-trivial data is written to the given file.""" - window_directory = os.path.join(temporary_directory, packet_reader.writer._session_subdirectory) - windows = [file for file in os.listdir(window_directory) if file.startswith(packet_reader.writer._file_prefix)] - self.assertTrue(len(windows) > 0) - - for window in windows: - with open(os.path.join(window_directory, window)) as f: - data = json.load(f) - - for name in sensor_names: - lines = data["sensor_data"][name] - self.assertTrue(len(lines[0]) > 1) - - def test_error_is_raised_if_unknown_sensor_type_packet_is_received(self): - """Test that an `UnknownPacketTypeException` is raised if an unknown sensor type packet is received.""" - serial_port = DummySerial(port="test") - packet_type = bytes([0]) - - serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[0]))) 
- serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[1]))) - - with tempfile.TemporaryDirectory() as temporary_directory: - packet_reader = PacketReader( - save_locally=True, - upload_to_cloud=True, - output_directory=temporary_directory, - window_size=self.WINDOW_SIZE, - project_name=TEST_PROJECT_NAME, - bucket_name=TEST_BUCKET_NAME, - ) - with self.assertRaises(exceptions.UnknownPacketTypeError): - packet_reader.read_packets(serial_port, stop_when_no_more_data=False) - - def test_configuration_file_is_persisted(self): - """Test that the configuration file is persisted.""" - serial_port = DummySerial(port="test") - packet_type = bytes([34]) - - serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[0]))) - serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[1]))) - - with tempfile.TemporaryDirectory() as temporary_directory: - packet_reader = PacketReader( - save_locally=True, - upload_to_cloud=True, - output_directory=temporary_directory, - window_size=self.WINDOW_SIZE, - project_name=TEST_PROJECT_NAME, - bucket_name=TEST_BUCKET_NAME, - ) - packet_reader.read_packets(serial_port, stop_when_no_more_data=True) - - configuration_path = os.path.join( - temporary_directory, packet_reader.session_subdirectory, "configuration.json" - ) - - # Check configuration file is present and valid locally. - with open(configuration_path) as f: - Configuration.from_dict(json.load(f)) - - # Check configuration file is present and valid on the cloud. - configuration = self.storage_client.download_as_string( - bucket_name=TEST_BUCKET_NAME, - path_in_bucket=storage.path.join( - packet_reader.uploader.output_directory, packet_reader.session_subdirectory, "configuration.json" - ), - ) - - # Test configuration is valid. - Configuration.from_dict(json.loads(configuration)) - - def test_update_handles_fails_if_start_and_end_handles_are_incorrect(self): - """Test that an error is raised if the start and end handles are incorrect when trying to update handles.""" - serial_port = DummySerial(port="test") - - # Set packet type to handles update packet. - packet_type = bytes([255]) - - # Set first two bytes of payload to incorrect range for updating handles. - payload = bytearray(RANDOM_BYTES[0]) - payload[0:1] = int(0).to_bytes(1, "little") - payload[2:3] = int(255).to_bytes(1, "little") - serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, payload))) - - with tempfile.TemporaryDirectory() as temporary_directory: - packet_reader = PacketReader( - save_locally=True, - upload_to_cloud=False, - output_directory=temporary_directory, - window_size=self.WINDOW_SIZE, - project_name=TEST_PROJECT_NAME, - bucket_name=TEST_BUCKET_NAME, - ) - - with patch("data_gateway.packet_reader.logger") as mock_logger: - packet_reader.read_packets(serial_port, stop_when_no_more_data=True) - self.assertIn("Handle error", mock_logger.method_calls[0].args[0]) - - def test_update_handles(self): - """Test that the handles can be updated.""" - serial_port = DummySerial(port="test") - - # Set packet type to handles update packet. - packet_type = bytes([255]) - - # Set first two bytes of payload to correct range for updating handles. 
- payload = bytearray(RANDOM_BYTES[0]) - payload[0:1] = int(0).to_bytes(1, "little") - payload[2:3] = int(26).to_bytes(1, "little") - serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, payload))) - - with tempfile.TemporaryDirectory() as temporary_directory: - packet_reader = PacketReader( - save_locally=True, - upload_to_cloud=False, - output_directory=temporary_directory, - window_size=self.WINDOW_SIZE, - project_name=TEST_PROJECT_NAME, - bucket_name=TEST_BUCKET_NAME, - ) - - with patch("data_gateway.packet_reader.logger") as mock_logger: - packet_reader.read_packets(serial_port, stop_when_no_more_data=True) - self.assertIn("Successfully updated handles", mock_logger.method_calls[0].args[0]) - - def test_packet_reader_with_baros_p_sensor(self): - """Test that the packet reader works with the Baro_P sensor.""" - serial_port = DummySerial(port="test") - packet_type = bytes([34]) - - serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[0]))) - serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[1]))) - - with tempfile.TemporaryDirectory() as temporary_directory: - packet_reader = PacketReader( - save_locally=True, - upload_to_cloud=True, - output_directory=temporary_directory, - window_size=self.WINDOW_SIZE, - project_name=TEST_PROJECT_NAME, - bucket_name=TEST_BUCKET_NAME, - ) - packet_reader.read_packets(serial_port, stop_when_no_more_data=True) - self._check_data_is_written_to_files(packet_reader, temporary_directory, sensor_names=["Baros_P"]) - - self._check_windows_are_uploaded_to_cloud(packet_reader, sensor_names=["Baros_P"], number_of_windows_to_check=1) - - def test_packet_reader_with_baros_t_sensor(self): - """Test that the packet reader works with the Baro_T sensor.""" - serial_port = DummySerial(port="test") - packet_type = bytes([34]) - - serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[0]))) - serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[1]))) - - with tempfile.TemporaryDirectory() as temporary_directory: - packet_reader = PacketReader( - save_locally=True, - upload_to_cloud=True, - output_directory=temporary_directory, - window_size=self.WINDOW_SIZE, - project_name=TEST_PROJECT_NAME, - bucket_name=TEST_BUCKET_NAME, - ) - packet_reader.read_packets(serial_port, stop_when_no_more_data=True) - self._check_data_is_written_to_files(packet_reader, temporary_directory, sensor_names=["Baros_T"]) - - self._check_windows_are_uploaded_to_cloud(packet_reader, sensor_names=["Baros_T"], number_of_windows_to_check=1) - - def test_packet_reader_with_diff_baros_sensor(self): - """Test that the packet reader works with the Diff_Baros sensor.""" - serial_port = DummySerial(port="test") - packet_type = bytes([36]) - - serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[0]))) - serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[1]))) - - with tempfile.TemporaryDirectory() as temporary_directory: - packet_reader = PacketReader( - save_locally=True, - upload_to_cloud=True, - output_directory=temporary_directory, - window_size=self.WINDOW_SIZE, - project_name=TEST_PROJECT_NAME, - bucket_name=TEST_BUCKET_NAME, - ) - packet_reader.read_packets(serial_port, stop_when_no_more_data=True) - self._check_data_is_written_to_files(packet_reader, temporary_directory, sensor_names=["Diff_Baros"]) - - self._check_windows_are_uploaded_to_cloud( - packet_reader, - sensor_names=["Diff_Baros"], - number_of_windows_to_check=1, - ) - - 
def test_packet_reader_with_mic_sensor(self): - """Test that the packet reader works with the mic sensor.""" - serial_port = DummySerial(port="test") - packet_type = bytes([38]) - - serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[0]))) - serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[1]))) - - with tempfile.TemporaryDirectory() as temporary_directory: - packet_reader = PacketReader( - save_locally=True, - upload_to_cloud=True, - output_directory=temporary_directory, - window_size=self.WINDOW_SIZE, - project_name=TEST_PROJECT_NAME, - bucket_name=TEST_BUCKET_NAME, - ) - packet_reader.read_packets(serial_port, stop_when_no_more_data=True) - self._check_data_is_written_to_files(packet_reader, temporary_directory, sensor_names=["Mics"]) - - self._check_windows_are_uploaded_to_cloud(packet_reader, sensor_names=["Mics"], number_of_windows_to_check=1) - - def test_packet_reader_with_acc_sensor(self): - """Test that the packet reader works with the acc sensor.""" - serial_port = DummySerial(port="test") - packet_type = bytes([42]) - - serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[0]))) - serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[1]))) - - with tempfile.TemporaryDirectory() as temporary_directory: - packet_reader = PacketReader( - save_locally=True, - upload_to_cloud=True, - output_directory=temporary_directory, - window_size=self.WINDOW_SIZE, - project_name=TEST_PROJECT_NAME, - bucket_name=TEST_BUCKET_NAME, - ) - packet_reader.read_packets(serial_port, stop_when_no_more_data=True) - self._check_data_is_written_to_files(packet_reader, temporary_directory, sensor_names=["Acc"]) - - self._check_windows_are_uploaded_to_cloud(packet_reader, sensor_names=["Acc"], number_of_windows_to_check=1) - - def test_packet_reader_with_gyro_sensor(self): - """Test that the packet reader works with the gyro sensor.""" - serial_port = DummySerial(port="test") - packet_type = bytes([44]) - - serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[0]))) - serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[1]))) - - with tempfile.TemporaryDirectory() as temporary_directory: - packet_reader = PacketReader( - save_locally=True, - upload_to_cloud=True, - output_directory=temporary_directory, - window_size=self.WINDOW_SIZE, - project_name=TEST_PROJECT_NAME, - bucket_name=TEST_BUCKET_NAME, - ) - packet_reader.read_packets(serial_port, stop_when_no_more_data=True) - self._check_data_is_written_to_files(packet_reader, temporary_directory, sensor_names=["Gyro"]) - - self._check_windows_are_uploaded_to_cloud(packet_reader, sensor_names=["Gyro"], number_of_windows_to_check=1) - - def test_packet_reader_with_mag_sensor(self): - """Test that the packet reader works with the mag sensor.""" - serial_port = DummySerial(port="test") - packet_type = bytes([46]) - - serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[0]))) - serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[1]))) - - with tempfile.TemporaryDirectory() as temporary_directory: - packet_reader = PacketReader( - save_locally=True, - upload_to_cloud=True, - output_directory=temporary_directory, - window_size=self.WINDOW_SIZE, - project_name=TEST_PROJECT_NAME, - bucket_name=TEST_BUCKET_NAME, - ) - packet_reader.read_packets(serial_port, stop_when_no_more_data=True) - self._check_data_is_written_to_files(packet_reader, temporary_directory, 
sensor_names=["Mag"]) - - self._check_windows_are_uploaded_to_cloud(packet_reader, sensor_names=["Mag"], number_of_windows_to_check=1) - - def test_packet_reader_with_connections_statistics(self): - """Test that the packet reader works with the connection statistics "sensor".""" - serial_port = DummySerial(port="test") - packet_type = bytes([52]) - - serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[0]))) - serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[1]))) - - with tempfile.TemporaryDirectory() as temporary_directory: - packet_reader = PacketReader( - save_locally=True, - upload_to_cloud=True, - output_directory=temporary_directory, - window_size=self.WINDOW_SIZE, - project_name=TEST_PROJECT_NAME, - bucket_name=TEST_BUCKET_NAME, - ) - packet_reader.read_packets(serial_port, stop_when_no_more_data=True) - - self._check_data_is_written_to_files(packet_reader, temporary_directory, sensor_names=["Constat"]) - - self._check_windows_are_uploaded_to_cloud(packet_reader, sensor_names=["Constat"], number_of_windows_to_check=1) - - def test_packet_reader_with_connections_statistics_in_sleep_mode(self): - """Test that the packet reader works with the connection statistics "sensor" in sleep state. Normally, - randomly generated payloads would trigger packet loss warning in logger. Check that this warning is suppressed - in sleep mode. - """ - serial_port = DummySerial(port="test") - # Enter sleep state - serial_port.write(data=b"".join((PACKET_KEY, bytes([56]), bytes([1]), bytes([1])))) - - packet_type = bytes([52]) - - serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[0]))) - serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[1]))) - - with tempfile.TemporaryDirectory() as temporary_directory: - packet_reader = PacketReader( - save_locally=True, - upload_to_cloud=False, - output_directory=temporary_directory, - window_size=self.WINDOW_SIZE, - project_name=TEST_PROJECT_NAME, - bucket_name=TEST_BUCKET_NAME, - ) - - with patch("data_gateway.packet_reader.logger") as mock_logger: - packet_reader.read_packets(serial_port, stop_when_no_more_data=True) - - self._check_data_is_written_to_files(packet_reader, temporary_directory, sensor_names=["Constat"]) - self.assertEqual(0, mock_logger.warning.call_count) - - def test_all_sensors_together(self): - """Test that the packet reader works with all sensors together.""" - serial_port = DummySerial(port="test") - packet_types = (bytes([34]), bytes([36]), bytes([38]), bytes([42]), bytes([44]), bytes([46])) - sensor_names = ("Baros_P", "Baros_T", "Diff_Baros", "Mics", "Acc", "Gyro", "Mag") - - for packet_type in packet_types: - serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[0]))) - serial_port.write(data=b"".join((PACKET_KEY, packet_type, LENGTH, RANDOM_BYTES[1]))) - - with tempfile.TemporaryDirectory() as temporary_directory: - packet_reader = PacketReader( - save_locally=True, - upload_to_cloud=True, - output_directory=temporary_directory, - window_size=self.WINDOW_SIZE, - project_name=TEST_PROJECT_NAME, - bucket_name=TEST_BUCKET_NAME, - ) - packet_reader.read_packets(serial_port, stop_when_no_more_data=True) - - self._check_data_is_written_to_files(packet_reader, temporary_directory, sensor_names=sensor_names) - - self._check_windows_are_uploaded_to_cloud( - packet_reader, sensor_names=sensor_names, number_of_windows_to_check=1 - ) - - def test_packet_reader_with_info_packets(self): - """Test that the packet 
reader works with info packets.""" - serial_port = DummySerial(port="test") - - packet_types = [bytes([40]), bytes([54]), bytes([56]), bytes([58])] - - payloads = [ - [bytes([1]), bytes([2]), bytes([3])], - [bytes([0]), bytes([1]), bytes([2]), bytes([3])], - [bytes([0]), bytes([1])], - [bytes([0])], - ] - - for index, packet_type in enumerate(packet_types): - for payload in payloads[index]: - serial_port.write(data=b"".join((PACKET_KEY, packet_type, bytes([1]), payload))) - - with tempfile.TemporaryDirectory() as temporary_directory: - packet_reader = PacketReader( - save_locally=True, - upload_to_cloud=False, - output_directory=temporary_directory, - window_size=self.WINDOW_SIZE, - project_name=TEST_PROJECT_NAME, - bucket_name=TEST_BUCKET_NAME, - ) - - with patch("data_gateway.packet_reader.logger") as mock_logger: - packet_reader.read_packets(serial_port, stop_when_no_more_data=True) - self.assertEqual(11, len(mock_logger.method_calls)) diff --git a/tests/test_routine.py b/tests/test_routine.py deleted file mode 100644 index ece6eda5..00000000 --- a/tests/test_routine.py +++ /dev/null @@ -1,73 +0,0 @@ -import time -from unittest import TestCase - -from data_gateway.routine import Routine - - -class TestRoutine(TestCase): - def test_routine_with_no_period_runs_commands_once(self): - """Test that commands can be scheduled to run once when a period isn't given.""" - recorded_commands = [] - - def record_commands(command): - recorded_commands.append((command, time.perf_counter())) - - routine = Routine(commands=[("first-command", 0.1), ("second-command", 0.3)], action=record_commands) - - start_time = time.perf_counter() - routine.run() - - self.assertEqual(recorded_commands[0][0], "first-command") - self.assertAlmostEqual(recorded_commands[0][1], start_time + 0.1, delta=0.2) - - self.assertEqual(recorded_commands[1][0], "second-command") - self.assertAlmostEqual(recorded_commands[1][1], start_time + 0.3, delta=0.2) - - def test_error_raised_if_any_delay_is_greater_than_period(self): - """Test that an error is raised if any of the command delays is greater than the period.""" - with self.assertRaises(ValueError): - Routine(commands=[("first-command", 10), ("second-command", 0.3)], action=None, period=1) - - def test_error_raised_if_stop_after_time_is_less_than_period(self): - """Test that an error is raised if the `stop_after` time is less than the period.""" - with self.assertRaises(ValueError): - Routine(commands=[("first-command", 0.1), ("second-command", 0.3)], action=None, period=1, stop_after=0.5) - - def test_warning_raised_if_stop_after_time_provided_without_a_period(self): - """Test that a warning is raised if the `stop_after` time is provided without a period.""" - with self.assertLogs() as logs_context: - Routine(commands=[("first-command", 10), ("second-command", 0.3)], action=None, stop_after=0.5) - - self.assertEqual( - logs_context.output[0], - "WARNING:data_gateway.routine:The `stop_after` parameter is ignored unless `period` is also given.", - ) - - def test_routine_with_period(self): - """Test that commands can be scheduled to repeat at the given period and then stop after a certain time.""" - recorded_commands = [] - - def record_commands(command): - recorded_commands.append((command, time.perf_counter())) - - routine = Routine( - commands=[("first-command", 0.1), ("second-command", 0.3)], - action=record_commands, - period=0.4, - stop_after=1, - ) - - start_time = time.perf_counter() - routine.run() - - self.assertEqual(recorded_commands[0][0], "first-command") - 
self.assertAlmostEqual(recorded_commands[0][1], start_time + 0.1, delta=0.2)
-
-        self.assertEqual(recorded_commands[1][0], "second-command")
-        self.assertAlmostEqual(recorded_commands[1][1], start_time + 0.3, delta=0.2)
-
-        self.assertEqual(recorded_commands[2][0], "first-command")
-        self.assertAlmostEqual(recorded_commands[2][1], start_time + 0.1 + routine.period, delta=0.2)
-
-        self.assertEqual(recorded_commands[3][0], "second-command")
-        self.assertAlmostEqual(recorded_commands[3][1], start_time + 0.3 + routine.period, delta=0.2)