Skip to content

Commit

Permalink
MRG: Merge pull request #30 from aerosense-ai/feature/multi-process-g…
Browse files Browse the repository at this point in the history
…ateway

Add multi-processing
  • Loading branch information
cortadocodes committed Feb 15, 2022
2 parents eb1ee44 + c143921 commit dee5899
Show file tree
Hide file tree
Showing 23 changed files with 1,299 additions and 1,098 deletions.
11 changes: 11 additions & 0 deletions data_gateway/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,14 @@

__all__ = ("exceptions",)
MICROPHONE_SENSOR_NAME = "Mics"


def stop_gateway(logger, stop_signal):
"""Stop the gateway's multiple processes by sending the stop signal.
:param logging.Logger logger: a logger to log that the stop signal has been sent
:param multiprocessing.Value stop_signal: a value of 0 means don't stop; a value of 1 means stop
:return None:
"""
logger.info("Sending stop signal.")
stop_signal.value = 1
193 changes: 21 additions & 172 deletions data_gateway/cli.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
import json
import logging
import multiprocessing
import os
import time

import click
import pkg_resources
import requests
import serial
from requests import HTTPError
from slugify import slugify

from data_gateway.configuration import Configuration
from data_gateway.dummy_serial import DummySerial
from data_gateway.exceptions import DataMustBeSavedError, WrongNumberOfSensorCoordinatesError
from data_gateway.routine import Routine
from data_gateway.data_gateway import DataGateway
from data_gateway.exceptions import WrongNumberOfSensorCoordinatesError


SUPERVISORD_PROGRAM_NAME = "AerosenseGateway"
CREATE_INSTALLATION_CLOUD_FUNCTION_URL = "https://europe-west6-aerosense-twined.cloudfunctions.net/create-installation"

logger = logging.getLogger(__name__)
global_cli_context = {}

logger = multiprocessing.get_logger()


@click.group(context_settings={"help_option_names": ["-h", "--help"]})
Expand All @@ -44,15 +42,16 @@ def gateway_cli(logger_uri, log_level):
"""
from octue.log_handlers import apply_log_handler, get_remote_handler

# Apply log handler locally.
apply_log_handler(log_level=log_level.upper(), include_thread_name=True)
# Store log level to apply to multi-processed logger in `DataGateway` in the `start` command.
global_cli_context["log_level"] = log_level.upper()

# Stream logs to remote handler if required.
if logger_uri is not None:
apply_log_handler(
logger=logger,
handler=get_remote_handler(logger_uri=logger_uri),
log_level=log_level.upper(),
include_thread_name=True,
log_level=global_cli_context["log_level"],
include_process_name=True,
)


Expand Down Expand Up @@ -165,87 +164,24 @@ def start(
nodes/sensors via the serial port by typing them into stdin and pressing enter. These commands are:
[startBaros, startMics, startIMU, getBattery, stop].
"""
import sys
import threading

from data_gateway.packet_reader import PacketReader

if not save_locally and no_upload_to_cloud:
raise DataMustBeSavedError(
"Data from the gateway must either be saved locally or uploaded to the cloud. Please adjust the CLI "
"options provided."
)

config = _load_configuration(configuration_path=config_file)
config.session_data["label"] = label

serial_port = _get_serial_port(serial_port, configuration=config, use_dummy_serial_port=use_dummy_serial_port)
routine = _load_routine(routine_path=routine_file, interactive=interactive, serial_port=serial_port)
output_directory = _update_and_create_output_directory(output_directory_path=output_dir)

# Start a new thread to parse the serial data while the main thread stays ready to take in commands from stdin.
packet_reader = PacketReader(
data_gateway = DataGateway(
serial_port=serial_port,
configuration_path=config_file,
routine_path=routine_file,
save_locally=save_locally,
upload_to_cloud=not no_upload_to_cloud,
output_directory=output_directory,
interactive=interactive,
output_directory=output_dir,
window_size=window_size,
project_name=gcp_project_name,
bucket_name=gcp_bucket_name,
configuration=config,
label=label,
save_csv_files=save_csv_files,
use_dummy_serial_port=use_dummy_serial_port,
log_level=global_cli_context["log_level"],
)

logger.info("Starting packet reader.")

if not no_upload_to_cloud:
logger.info("Files will be uploaded to cloud storage at intervals of %s seconds.", window_size)

if save_locally:
logger.info(
"Files will be saved locally to disk at %r at intervals of %s seconds.",
os.path.join(packet_reader.output_directory, packet_reader.session_subdirectory),
window_size,
)

# Start packet reader in a separate thread so commands can be sent to it in real time in interactive mode or by a
# routine.
reader_thread = threading.Thread(target=packet_reader.read_packets, args=(serial_port,), daemon=True)
reader_thread.setName("ReaderThread")
reader_thread.start()

try:
if interactive:
# Keep a record of the commands given.
commands_record_file = os.path.join(
packet_reader.output_directory, packet_reader.session_subdirectory, "commands.txt"
)

os.makedirs(os.path.join(packet_reader.output_directory, packet_reader.session_subdirectory), exist_ok=True)

while not packet_reader.stop:
for line in sys.stdin:

with open(commands_record_file, "a") as f:
f.write(line)

if line.startswith("sleep") and line.endswith("\n"):
time.sleep(int(line.split(" ")[-1].strip()))
elif line == "stop\n":
packet_reader.stop = True
break

# Send the command to the node
serial_port.write(line.encode("utf_8"))

else:
if routine is not None:
routine.run()

except KeyboardInterrupt:
packet_reader.stop = True

logger.info("Stopping gateway.")
packet_reader.writer.force_persist()
data_gateway.start()


@gateway_cli.command()
Expand Down Expand Up @@ -331,92 +267,5 @@ def supervisord_conf(config_file):
return 0


def _load_configuration(configuration_path):
"""Load a configuration from the path if it exists, otherwise load the default configuration.
:param str configuration_path:
:return data_gateway.configuration.Configuration:
"""
if os.path.exists(configuration_path):
with open(configuration_path) as f:
configuration = Configuration.from_dict(json.load(f))

logger.info("Loaded configuration file from %r.", configuration_path)
return configuration

configuration = Configuration()
logger.info("No configuration file provided - using default configuration.")
return configuration


def _get_serial_port(serial_port, configuration, use_dummy_serial_port):
"""Get the serial port or a dummy serial port if specified.
:param str serial_port:
:param data_gateway.configuration.Configuration configuration:
:param bool use_dummy_serial_port:
:return serial.Serial:
"""
if not use_dummy_serial_port:
serial_port = serial.Serial(port=serial_port, baudrate=configuration.baudrate)
else:
serial_port = DummySerial(port=serial_port, baudrate=configuration.baudrate)

# The buffer size can only be set on Windows.
if os.name == "nt":
serial_port.set_buffer_size(
rx_size=configuration.serial_buffer_rx_size,
tx_size=configuration.serial_buffer_tx_size,
)
else:
logger.warning("Serial port buffer size can only be set on Windows.")

return serial_port


def _load_routine(routine_path, interactive, serial_port):
"""Load a sensor commands routine from the path if exists, otherwise return no routine. If in interactive mode, the
routine file is ignored. Note that "\n" has to be added to the end of each command sent to the serial port for it to
be executed - this is done automatically in this method.
:param str routine_path:
:param bool interactive:
:param serial.Serial serial_port:
:return data_gateway.routine.Routine|None:
"""
if os.path.exists(routine_path):
if interactive:
logger.warning("Sensor command routine files are ignored in interactive mode.")
return
else:
with open(routine_path) as f:
routine = Routine(
**json.load(f),
action=lambda command: serial_port.write((command + "\n").encode("utf_8")),
)

logger.info("Loaded routine file from %r.", routine_path)
return routine

logger.info(
"No routine file found at %r - no commands will be sent to the sensors unless given in interactive mode.",
routine_path,
)


def _update_and_create_output_directory(output_directory_path):
"""Set the output directory to a path relative to the current directory if the path does not start with "/" and
create it if it does not already exist.
:param str output_directory_path:
:return str:
"""
if not output_directory_path.startswith("/"):
output_directory_path = os.path.join(".", output_directory_path)

os.makedirs(output_directory_path, exist_ok=True)
return output_directory_path


if __name__ == "__main__":
gateway_cli()
Loading

0 comments on commit dee5899

Please sign in to comment.