# Motion GPS Module

Copyright 2022 Michael George (AKA Logiqx).

This file is part of [sse-results](https://github.com/Logiqx/sse-results) and is distributed under the terms of the GNU General Public License.

sse-results is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

sse-results is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with sse-results. If not, see <https://www.gnu.org/licenses/>.

In [1]:
import os
import logging

import json

import socket
from urllib.request import urlopen

import unittest

from common import projdir, testExit

from constants import *

## Default Values

Default values, used when the corresponding details are missing from the config file

In [2]:
# Level applied to the console handler created by getLogger() for the unit tests
UNITTEST_LOGGING_LEVEL = logging.WARNING

In [3]:
# Communications defaults: the TCP port used to reach the Motion, plus the
# timeouts (in seconds) for the connection check and for fetching JSON
DEFAULT_HTTP_PORT = 80
DEFAULT_PING_TIMEOUT = 5
DEFAULT_JSON_TIMEOUT = 10

In [4]:
# Motion status codes: zero is "not yet known", positive values record progress
# and negative values record a failure to reach the Motion
HOST_STATUS_UNKNOWN = 0

# Progress: CONNECTED is set once a TCP connection has been accepted
# NOTE(review): INSPECTED / COMPLETED are presumably set by later processing stages - confirm
HOST_STATUS_CONNECTED, HOST_STATUS_INSPECTED, HOST_STATUS_COMPLETED = 1, 2, 3

# Failures: connection refused, connection timed out, hostname could not be resolved
HOST_STATUS_REFUSED, HOST_STATUS_TIMEOUT, HOST_STATUS_GAIERROR = -1, -2, -3

In [5]:
# --- Warning thresholds (percentages), used when the config file has none ---

# Warn when the Motion filesystem is fuller than this
DEFAULT_FILESYSTEM_THRESHOLD = 95

# Warn when the Motion battery level is lower than this
DEFAULT_BATTERY_THRESHOLD = 60

# --- Expected Motion settings, used when the config file has none ---

# GNSS rate
DEFAULT_GNSS_RATE = 5

# One log per day (0 = setting not expected to be enabled)
DEFAULT_ONE_LOG = 0

# Timezone offset (0 = UTC)
DEFAULT_TIMEZONE = 0

# Custom LED flash
DEFAULT_FLASH = 1

## Motion Class

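Everything relating to a single Motion device: address lookup, connection status, fetching its JSON documents and checking its info / settings against the config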

In [6]:
class Motion:
    '''Motion GPS

    Represents a single Motion device, known by its numerical identifier.

    Provides address lookup, a TCP connection check, retrieval of the JSON documents
    served by the device (info.json, settings.json, logs.json) and checks of the
    interpreted values against the supplied config.
    '''

    def __init__(self, config, logger, identifier, fileId):
        """Simple init method

        config     -- configuration (nested dictionaries); the 'Motion' section is read by the check methods
        logger     -- logger used for all messages
        identifier -- numerical identifier of the Motion, also used to build the hostname
        fileId     -- file ID associated with the Motion (stored but not used within this class)
        """

        self.config = config
        self.logger = logger
        self.identifier = identifier
        self.fileId = fileId

        # Standard hostname includes the numerical identifier
        self.hostname = f'motion-{identifier}'

        # Dictionary of users, indexed by date
        self.users = {}

        # IP address and port
        self.addrInfo = None

        # Network / processing status
        self.status = HOST_STATUS_UNKNOWN

        # Raw info.json and the values derived from it
        self.info = None
        self.hardware = None
        self.firmware = None
        self.filesystemLevel = None
        self.batteryLevel = None

        # Raw settings.json and the values derived from it
        self.settings = None
        self.username = None
        self.gnssRate = None
        self.oneLogPerDay = None
        self.timezone = None
        self.customLedFlash = None

        # Derived from logs.json
        self.logs = None


    def _configValue(self, default, *keys):
        """Return self.config['Motion'][key1][key2]... or the default if any level is missing"""

        value = self.config

        try:
            for key in ('Motion',) + keys:
                value = value[key]

        # Missing key / index, or a level which cannot be subscripted (e.g. config is None)
        except (LookupError, TypeError):
            return default

        return value


    def getAddrInfo(self, port=DEFAULT_HTTP_PORT):
        """Get address information (IP address) for Motion

        Stores and returns the socket address (first result of socket.getaddrinfo),
        or None if the hostname could not be resolved."""

        try:
            addrInfo = socket.getaddrinfo(self.hostname, port, proto=socket.IPPROTO_TCP)[0][4]

        # Only really expect socket.gaierror but anything unexpected is treated the same way
        except Exception:
            addrInfo = None

        else:
            # Sky SR101 router will sometimes return 10.10.10.10 and should be treated the same as socket.gaierror
            if addrInfo[0] == '10.10.10.10':
                addrInfo = None

        self.addrInfo = addrInfo

        return self.addrInfo


    def getStatus(self, timeout=DEFAULT_PING_TIMEOUT):
        """Check the status of the Motion

        Note the use of self.addrInfo instead of using the hostname with s.connect((host, port)).

        This avoids erroneous 'gaierror: [Errno -5] No address associated with hostname' under certain circumstances.

        Stores and returns one of the HOST_STATUS_* values."""

        # Ensure address information is available
        if self.addrInfo is None:
            self.getAddrInfo()

        # Status can only be checked if address information is available
        if not self.addrInfo:
            self.status = HOST_STATUS_GAIERROR
            return self.status

        try:
            # The context manager guarantees the socket is closed, even if the connection fails
            with socket.socket() as s:
                # Attempt connection to the standard HTTP socket as determined by socket.getaddrinfo()
                s.settimeout(timeout)
                s.connect(self.addrInfo)

                # Close the connection in a timely fashion - good practice
                try:
                    s.shutdown(socket.SHUT_RDWR)
                except OSError:
                    # The connection was accepted, so a failed shutdown must not change the outcome
                    pass

        # This should only happen if the IP address does not belong to a Motion
        except ConnectionRefusedError:
            self.status = HOST_STATUS_REFUSED

        # This should only happen when the Motion has been disconnected from the WiFi
        except socket.timeout:
            self.status = HOST_STATUS_TIMEOUT

        # Just in case of some other (unexpected) issue!
        except Exception:
            self.status = HOST_STATUS_UNKNOWN

        # The Motion is accepting connections. Yay!
        else:
            self.status = HOST_STATUS_CONNECTED

        return self.status


    def fetchJson(self, path, port=DEFAULT_HTTP_PORT, timeout=DEFAULT_JSON_TIMEOUT):
        """Fetch JSON from the Motion

        path    -- path of the document, relative to the root of the Motion web server
        port    -- HTTP port, only included in the URL when it is not the default
        timeout -- timeout in seconds, passed to urlopen

        Returns the parsed JSON, or None if the address of the Motion is not known.

        Any exception raised whilst fetching the document is logged and re-raised."""

        result = None

        # Ensure address information is available
        if self.addrInfo is None:
            self.getAddrInfo()

        # JSON can only be fetched if address information is available
        if self.addrInfo:
            # Use addrInfo to avoid unnecessary DNS lookup which can take several seconds
            host = self.addrInfo[0]

            # The URL is unchanged for the default port but a non-default port is honoured
            if port == DEFAULT_HTTP_PORT:
                url = 'http://{}/{}'.format(host, path)
            else:
                url = 'http://{}:{}/{}'.format(host, port, path)

            try:
                # The urlopen() function is a simple but effective way to open a URL
                with urlopen(url, timeout=timeout) as response:
                    txt = response.read()

            except Exception:
                self.logger.error("%s was unable to provide %s", self.hostname, path)
                raise

            # Parse the JSON and store it in a dictionary
            result = json.loads(txt)

        return result


    def fetchInfo(self):
        """Fetch info.json from the Motion and interpret it

        Raises TypeError if nothing was fetched because the address of the Motion is not known."""

        self.info = self.fetchJson('info.json')

        self.interpretInfo()


    def interpretInfo(self):
        """Interpret info.json

        Populates hardware, firmware, filesystemLevel and batteryLevel from self.info.

        Raises ValueError if the identifier reported by the Motion is not the expected one."""

        motionInfo = self.info['motion']

        identifier = motionInfo['identifier']
        if identifier != self.identifier:
            self.logger.error("%s identifier is incorrect (%s)", self.hostname, identifier)
            raise ValueError(f"{self.hostname} identifier is incorrect ({identifier})")

        # A "hardware" section (containing "code" and "name") was introduced in 3144
        hardware = motionInfo['hardware']
        try:
            self.hardware = hardware['name']
        except (TypeError, KeyError):
            # Older firmware provides the hardware as a simple value
            self.hardware = hardware

        # A "firmware" section (containing "code", "name" and "special") was introduced in 3144
        firmware = motionInfo['firmware']
        try:
            self.firmware = firmware['name']
        except (TypeError, KeyError):
            # Older firmware provides a number, which is stored as a string
            self.firmware = str(firmware)

        self.filesystemLevel = motionInfo['filesystem']['level']
        self.batteryLevel = motionInfo['battery']['level']


    def fetchSettings(self):
        """Fetch settings.json from the Motion and interpret it

        Raises TypeError if nothing was fetched because the address of the Motion is not known."""

        self.settings = self.fetchJson('settings.json')

        self.interpretSettings()


    def interpretSettings(self):
        """Interpret settings.json

        Populates username, gnssRate, oneLogPerDay, timezone and customLedFlash from self.settings.

        All values except the username are converted to integers."""

        settings = self.settings['settings']

        self.username = settings['username']
        self.gnssRate = int(settings['gnss_rate'])
        self.oneLogPerDay = int(settings['one_log_per_day'])
        self.timezone = int(settings['timezone'])
        self.customLedFlash = int(settings['custom_led']['flash'])


    def fetchLogs(self):
        """Fetch logs.json from the Motion"""

        self.logs = self.fetchJson('logs.json')


    def checkInfo(self):
        """Check Motion information such as firmware, storage and battery

        The info must have been interpreted beforehand because the storage and battery
        checks compare the levels against a number, which raises TypeError for None."""

        self.checkFirmware()
        self.checkStorage()
        self.checkBattery()


    def checkFirmware(self):
        """Check firmware is acceptable

        The firmware is approved if it is in config['Motion']['Info']['Firmwares'].

        Nothing is approved when the config has no such entry."""

        firmwares = self._configValue({}, 'Info', 'Firmwares')

        if self.firmware not in firmwares:
            self.logger.warning("%s firmware is not approved (%s)", self.hostname, self.firmware)
        else:
            self.logger.debug("%s firmware is approved (%s)", self.hostname, self.firmware)


    def checkStorage(self):
        """Check storage is not above acceptable threshold

        The threshold is config['Motion']['Info']['Thresholds']['Storage'], if present."""

        threshold = self._configValue(DEFAULT_FILESYSTEM_THRESHOLD, 'Info', 'Thresholds', 'Storage')

        if self.filesystemLevel > threshold:
            self.logger.warning("%s storage is %s%% full", self.hostname, self.filesystemLevel)
        else:
            self.logger.debug("%s storage is %s%% full", self.hostname, self.filesystemLevel)


    def checkBattery(self):
        """Check battery is not below acceptable threshold

        The threshold is config['Motion']['Info']['Thresholds']['Battery'], if present."""

        threshold = self._configValue(DEFAULT_BATTERY_THRESHOLD, 'Info', 'Thresholds', 'Battery')

        if self.batteryLevel < threshold:
            self.logger.warning("%s battery level is %s%%", self.hostname, self.batteryLevel)
        else:
            self.logger.info("%s battery level is %s%%", self.hostname, self.batteryLevel)


    def checkSettings(self):
        """Check Motion settings such as GNSS rate, one log per day, timezone and LED flash"""

        self.checkSetting('GNSS rate', 'GNSS Rate', DEFAULT_GNSS_RATE, self.gnssRate)
        self.checkSetting('one log per day', 'One Log Per Day', DEFAULT_ONE_LOG, self.oneLogPerDay)
        self.checkSetting('timezone', 'Timezone', DEFAULT_TIMEZONE, self.timezone)
        self.checkSetting('LED flash', 'LED Flash', DEFAULT_FLASH, self.customLedFlash)


    def checkSetting(self, description, configSetting, default, actual):
        """Check a single setting has the expected value

        description   -- description of the setting, used in log messages
        configSetting -- name of the setting within config['Motion']['Settings']
        default       -- expected value when the config does not provide one
        actual        -- value reported by the Motion
        """

        expected = self._configValue(default, 'Settings', configSetting)

        if actual != expected:
            self.logger.warning("%s %s is incorrect (%s)", self.hostname, description, actual)
        else:
            self.logger.debug("%s %s is %s", self.hostname, description, actual)

## Unit Tests

Some basic unit tests, not relying upon Motion being connected to the WiFi

In [7]:
class TestMotionInit(unittest.TestCase):
    '''Class to test motion initialisation'''

    def setUp(self):
        """Create a fresh Motion before every test"""

        self.motion = Motion(config, logger, 470, 'GEO21MIC')


    def testIdentifier(self):
        """The identifier passed to the constructor is stored unchanged"""

        self.assertEqual(self.motion.identifier, 470)


    def testHostname(self):
        """The hostname is built from the identifier"""

        self.assertEqual(self.motion.hostname, 'motion-470')


    def testFileId(self):
        """The file ID passed to the constructor is stored unchanged"""

        self.assertEqual(self.motion.fileId, 'GEO21MIC')

In [8]:
class TestMotionInfo3129(unittest.TestCase):
    '''Class to test motion info of firmware 3129'''

    # Sample info.json where hardware and firmware are simple values
    INFO = {
        "motion": {
            "wifi": "station",
            "username": "K888",
            "identifier": 470,
            "hardware": "wireless-mini",
            "firmware": 3129,
            "filesystem": {"level": 3},
            "battery": {"level": 62, "charging": False},
            "units": {"speed": "kn", "distance": "km"},
        }
    }

    # Attribute values expected after interpretation, checked in this order
    EXPECTED = (
        ("identifier", 470),
        ("hardware", "wireless-mini"),
        ("firmware", "3129"),
        ("filesystemLevel", 3),
        ("batteryLevel", 62),
    )

    def testInterpretation(self):
        """Interpreting the sample info.json populates the expected attributes"""

        motion = Motion(config, logger, 470, 'GEO21MIC')
        motion.info = self.INFO

        motion.interpretInfo()

        for attribute, expected in self.EXPECTED:
            self.assertEqual(getattr(motion, attribute), expected)

In [9]:
class TestMotionInfo3144(unittest.TestCase):
    '''Class to test motion info of firmware 3144'''

    # Sample info.json where hardware and firmware are sections with a "name"
    INFO = {
        "motion": {
            "wifi": "station",
            "username": "K888",
            "identifier": 470,
            "hardware": {"code": 106, "name": "Wireless Mini M10"},
            "firmware": {"code": 3144, "name": "3144 Bypass", "special": ""},
            "filesystem": {"level": 3},
            "battery": {"level": 62, "charging": False},
            "units": {"speed": "kn", "distance": "km"},
        }
    }

    # Attribute values expected after interpretation, checked in this order
    EXPECTED = (
        ("identifier", 470),
        ("hardware", "Wireless Mini M10"),
        ("firmware", "3144 Bypass"),
        ("filesystemLevel", 3),
        ("batteryLevel", 62),
    )

    def testInterpretation(self):
        """Interpreting the sample info.json populates the expected attributes"""

        motion = Motion(config, logger, 470, 'GEO21MIC')
        motion.info = self.INFO

        motion.interpretInfo()

        for attribute, expected in self.EXPECTED:
            self.assertEqual(getattr(motion, attribute), expected)

In [10]:
class TestMotionSettings(unittest.TestCase):
    '''Class to test motion settings of firmware 3144'''

    def testInterpretation(self):
        """Interpreting a sample settings.json populates the expected attributes"""

        # Four WiFi slots, only the first of which has a login
        wifi = [{"login": login, "password_length": "0"} for login in ("WSW", "", "", "")]

        # Custom LED configuration with four rows
        customLed = {
            "flash": "1",
            "substract": "0",
            "rows": [{"code": code} for code in ("23", "24", "67", "68")],
        }

        motion = Motion(config, logger, 470, 'GEO21MIC')

        motion.settings = {
            "settings": {
                "hardware": "wireless-mini",
                "username": "K888",
                "speed_unit": "kn",
                "distance_unit": "km",
                "gnss_rate": "5",
                "one_log_per_day": "1",
                "timezone": "60",
                "wifi": wifi,
                "custom_led": customLed,
            }
        }

        motion.interpretSettings()

        # Numeric settings arrive as strings and are expected to be converted to integers
        expectations = (
            ("username", "K888"),
            ("gnssRate", 5),
            ("oneLogPerDay", 1),
            ("timezone", 60),
            ("customLedFlash", 1),
        )

        for attribute, expected in expectations:
            self.assertEqual(getattr(motion, attribute), expected)

## Run Unit Tests

Note: Only run unit tests when running this script directly, not during an import

In [12]:
def loadConfig():
    '''Load the server config (server.json in the config directory) and return the parsed JSON'''

    configPath = os.path.join(projdir, CONFIG_DIR, 'server.json')

    # Parse directly from the file object rather than reading the text first
    with open(configPath, 'r', encoding='utf-8') as configFile:
        return json.load(configFile)

In [11]:
def getLogger():
    """Return the 'server' logger, adding a console handler the first time it is needed"""

    serverLogger = logging.getLogger('server')
    serverLogger.setLevel(logging.DEBUG)

    # Nothing more to do when handlers already exist, e.g. this script being run repeatedly in IDE
    if serverLogger.hasHandlers():
        return serverLogger

    # Console handler which only reports messages at UNITTEST_LOGGING_LEVEL or above
    console = logging.StreamHandler()
    console.setLevel(UNITTEST_LOGGING_LEVEL)

    # Comma separated log messages: timestamp, level, message
    console.setFormatter(logging.Formatter(fmt="%(asctime)s,%(levelname)s,%(message)s",
                                           datefmt="%Y-%m-%d %H:%M:%S"))

    serverLogger.addHandler(console)

    return serverLogger

In [13]:
# Only run the unit tests when this script is run directly, not during an import
if __name__ == '__main__':
    
    # config and logger are deliberately module-level names: the test classes above
    # pass them to Motion() as globals, so both must exist before the tests start
    config = loadConfig()

    logger = getLogger()
    
    # argv is given explicitly instead of using sys.argv; its first element stands in
    # for the program name, hence 'first-arg-is-ignored'
    # NOTE(review): testExit is imported from common - presumably controls whether unittest calls sys.exit(); confirm
    unittest.main(argv=['first-arg-is-ignored'], exit=testExit)

......
----------------------------------------------------------------------
Ran 6 tests in 0.003s

OK
