## Telemetry-Validation-Service

In [None]:
%%writefile configs/telemetry_service.conf

rabbitmq: {
    ip = "localhost"
    port = 5672
    username = log6953fe
    password = log6953fe
    vhost = /
    exchange_name = log6953fe_AMQP
    exchange_type = topic
}
mongodb: {
    database_name = log6953fe_db
    collection_name = trips
    vhost = localhost
    port = 27017
    username = log6953fe
    password = log6953fe
    connection_str: "mongodb://log6953fe:log6953fe@localhost:27017"
}


In [None]:
%%writefile telemetry_validation_service.py
import sys
import os

current_dir = os.getcwd()

assert os.path.basename(current_dir) == 'services', 'Current directory is not services'

parent_dir = os.path.dirname(current_dir)

software_dir = os.path.join(parent_dir, 'software')

assert os.path.exists(software_dir), 'software folder not found in the repository root'

sys.path.append(software_dir)

import json
import traceback
import logging
import math
import time
from enum import Enum
from communication.protocol import EARTH_RADIUS, MAX_ALLOW_DISTANCE, ROUTING_KEY_STM_TELEMETRY_VALIDATION, ROUTING_KEY_STM_TELEMETRY_VALIDATION_VIEW, ROUTING_KEY_STM_NOTIFICATION
from communication.mongodb import MongoDB
from communication.rpc_server import RPCServer
from config.config import load_config, config_logger


class CoordinateStatus(Enum):
    OK = "OK"
    ANOMALY = "ANOMALY"
    NOT_FOUND = "NOT_FOUND"

    def __str__(self):
        return f"{self.value}"

    @classmethod
    def parse_from_distance(cls, distance, max_allowed_dist):
        return CoordinateStatus.OK if distance <= max_allowed_dist else CoordinateStatus.ANOMALY


logging.config.fileConfig("configs/logging.conf")


class TelemetryValidationService(RPCServer):

    def __init__(self, _rabbitmq_config:dict=None, _mongodb_config:dict=None):
        self._logger = logging.getLogger("TelemetryValidationService")

        _configs = load_config("configs/telemetry_service.conf")

        if _rabbitmq_config is None:
            self._logger.debug("RabbitMQ config value is empty, reverting to default configs.")
            _rabbitmq_config = _configs['rabbitmq']

        if _mongodb_config is None:
            self._logger.debug("MongoDB config value is empty, reverting to default configs.")
            _mongodb_config = _configs["mongodb"]

        self.mongodb_client = MongoDB(**_mongodb_config)
        super().__init__(**_rabbitmq_config)


    def setup(self):
        # Subscribe to any message coming from the STM GPS routes update
        super(TelemetryValidationService, self).setup(
            routing_key=ROUTING_KEY_STM_TELEMETRY_VALIDATION,
            queue_name=ROUTING_KEY_STM_TELEMETRY_VALIDATION,
            on_message_callback=self.process_telemetry
        )
        # Declare validated STM GPS routes
        self.declare_local_queue(routing_key=ROUTING_KEY_STM_NOTIFICATION)
        self.declare_local_queue(routing_key=ROUTING_KEY_STM_TELEMETRY_VALIDATION_VIEW)
        self._logger.info("TelemetryValidationService setup complete.")


    @staticmethod
    def _deg_to_rad(x):
        return x * math.pi / 180

    def _calculate_distance(self, from_coordinates:tuple, to_coordinates:tuple):
        from_lat, from_lng = from_coordinates[0], from_coordinates[1]
        to_lat, to_lng = to_coordinates[0], to_coordinates[1]
        half_d_lat = self._deg_to_rad((to_lat - from_lat) / 2)
        half_d_lon = self._deg_to_rad((to_lng - from_lng) / 2)
        a = (math.sin(half_d_lat) ** 2 + math.cos(self._deg_to_rad(from_lat)) * math.cos(self._deg_to_rad(to_lat)) * math.sin(half_d_lon) ** 2)
        return EARTH_RADIUS * (2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)))


    def _routes_sort(self, route):
        try:
            return int(route['stop_sequence'])
        except KeyError as __e:
            self._logger.error(f"Error at extracting timestamp: {str(__e)}")
            return 0


    def _determine_coordinates_status(self, result, data) -> tuple[CoordinateStatus,str]:
        routes = result['routes'] if 'routes' in result else []
        max_allowed_distance = result['maximum_distance'] if 'maximum_distance' in result else MAX_ALLOW_DISTANCE
        bus_stop = {'stop_name': ''}

        if not routes:
            # If no bus stops found return NOT_FOUND
            return CoordinateStatus.NOT_FOUND, bus_stop['stop_name']

        routes.sort(key=self._routes_sort, reverse=False)
        stop_sequence = int(data['vehicle.current_stop_sequence']) - 1
        bus_stop = routes[stop_sequence]
        form_coordinates = (bus_stop['latitude'], bus_stop['longitude'])
        to_coordinates = (data['vehicle.position.latitude'], data['vehicle.position.longitude'])
        distance = self._calculate_distance(form_coordinates, to_coordinates)
        status = CoordinateStatus.parse_from_distance(distance, max_allowed_distance)

        if CoordinateStatus.ANOMALY == status:
            self._logger.debug(f"Distance: {distance}, Status: {status}, Bus Stop: {bus_stop}, Vehicle: {data}")

        if  max_allowed_distance <= distance <= 1.01: # check against the next bus stop
            bus_stop = routes[stop_sequence + 1]
            form_coordinates = (bus_stop['latitude'], bus_stop['longitude'])
            distance = self._calculate_distance(form_coordinates, to_coordinates)
            status = CoordinateStatus.parse_from_distance(distance, max_allowed_distance)
            self._logger.debug(f"Distance: {distance}, Status: {status}, Bus Stop: {bus_stop}, Vehicle: {data}")

        elif distance >= 1.01: # reverse the order of bus stops
            routes.sort(key=self._routes_sort, reverse=True)
            bus_stop = routes[stop_sequence]
            form_coordinates = (bus_stop['latitude'], bus_stop['longitude'])
            distance = self._calculate_distance(form_coordinates, to_coordinates)
            status = CoordinateStatus.parse_from_distance(distance, max_allowed_distance)

            self._logger.debug(f"Distance: {distance}, Status: {status}, Bus Stop: {bus_stop}, Vehicle: {data}")

        return status, bus_stop['stop_name']


    def process_telemetry(self, ch, mthd, prop, json_payload:str):
        self._logger.info(f"Received JSON payload: {json_payload}")

        payload_dict = json.loads(json_payload)

        if 'data' not in payload_dict:
            self._logger.error("Service received empty payload to be processed")
            return

        anomaly_data = []
        for i, data in enumerate(payload_dict["data"]):

            try:
                assert 'vehicle.trip.route_id' in payload_dict["data"][i], "Data dictionary must contain 'vehicle.trip.route_id' field."
                assert 'vehicle.position.latitude' in payload_dict["data"][i], "Data dictionary must contain 'vehicle.position.latitude' field."
                assert 'vehicle.position.longitude' in payload_dict["data"][i], "Data dictionary must contain 'vehicle.position.longitude' field."
                assert 'vehicle.current_stop_sequence' in payload_dict["data"][i], "Data dictionary must contain 'vehicle.current_stop_sequence' field."

                route_id =  int(data['vehicle.trip.route_id'])
                results = self.mongodb_client.database["bus_stops"].find({"route_id": route_id})
                results = list(results)
                bus_stop = ""

                if results and len(results) > 0 and 'routes' in results[0]:
                    self._logger.debug(f"Database returned: {results[0]}")
                    coordinates_status, bus_stop = self._determine_coordinates_status(results[0], payload_dict["data"][i])

                else:
                    self._logger.error(f"No data found in the database. Cannot validate coordinates for route : {route_id}")
                    coordinates_status = CoordinateStatus.NOT_FOUND

                payload_dict["data"][i]["vehicle.position.coordinates.status"] = coordinates_status.value
                payload_dict["data"][i]["vehicle.current_bus_stop"] = bus_stop
                if coordinates_status == CoordinateStatus.ANOMALY:
                    anomaly_data.append(payload_dict["data"][i])

            except AssertionError as __e:
                self._logger.error(f"{str(__e)} for : {payload_dict['data'][i]}")

        message = {"source": "stm_telemetry_validation_service", "time": time.time_ns(), "data": payload_dict["data"]}

        # Publish to the view queue
        self._logger.debug(f"Message: {message}")
        self._logger.debug(f"Published to queue: {ROUTING_KEY_STM_TELEMETRY_VALIDATION_VIEW}")
        self.send_message(ROUTING_KEY_STM_TELEMETRY_VALIDATION_VIEW, message)

        # Store to MongoDB
        # self.mongodb_client.save(payload_dict["data"])

        # Publish anomaly messages
        if anomaly_data:
            message['data'] = anomaly_data
            self._logger.info(f"Message: {message}")
            self._logger.info(f"Published to queue: {ROUTING_KEY_STM_NOTIFICATION}")
            self.send_message(ROUTING_KEY_STM_NOTIFICATION, message)


if __name__ == '__main__':
    service = TelemetryValidationService()
    service.setup()

    while True:
        try:
            service.start_serving()
        except KeyboardInterrupt:
            exit(0)

        except Exception as e:
            print(f"The following exception occurred: {e}")
            traceback.print_tb(e.__traceback__)
            exit(0)


In [None]:
import subprocess
import time
import sys

SERVICE_LOG = "Services.log"

service_proc = subprocess.Popen([sys.executable, "telemetry_validation_service.py"])

time.sleep(6)

print(f"TelemetryValidationService = {service_proc.pid}")

assert service_proc.poll() is None, "TelemetryValidationService process has crashed"

In [28]:
service_proc.terminate()
service_proc.wait() 

assert service_proc.returncode is not None, "Process has not exited"


In [None]:
import os

if not os.path.exists(SERVICE_LOG):
    _f = open(SERVICE_LOG, "w")

with open(SERVICE_LOG, 'r') as __f:
    print(__f.read())
    