diff --git a/.github/ISSUE_TEMPLATE/bug_report.yaml b/.github/ISSUE_TEMPLATE/bug_report.yaml new file mode 100644 index 0000000..6adb01d --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug_report.yaml @@ -0,0 +1,27 @@ +name: Bug Report +description: File a bug report +title: "[Bug]: " +labels: ["bug", "needs-triage"] +body: + - type: checkboxes + id: terms + attributes: + label: Please try to fill out as much of the information below as you can. Thank you! + options: + - label: Yes, I've searched similar issues on GitHub and didn't find any. + required: true + - type: input + id: app_version + attributes: + label: Which version contains the bug? + placeholder: 1.0.0 + - type: textarea + id: description + attributes: + label: Describe the bug + description: Please provide a concise description of the bug, add any relevant output or error messages. You can use markdown. + - type: textarea + id: recreate + attributes: + label: How to recreate the bug? + description: Please provide the steps to recreate the issue. diff --git a/.github/ISSUE_TEMPLATE/documentation.yaml b/.github/ISSUE_TEMPLATE/documentation.yaml new file mode 100644 index 0000000..088b14f --- /dev/null +++ b/.github/ISSUE_TEMPLATE/documentation.yaml @@ -0,0 +1,10 @@ +name: Documentation +description: Suggest documentation improvements +title: "[Documentation]: " +labels: ["documentation"] +body: + - type: textarea + id: description + attributes: + label: Describe the improvements you'd like. + description: Please provide as much context as possible. You can use markdown. diff --git a/.github/ISSUE_TEMPLATE/feature_request.yaml b/.github/ISSUE_TEMPLATE/feature_request.yaml new file mode 100644 index 0000000..12c9e2e --- /dev/null +++ b/.github/ISSUE_TEMPLATE/feature_request.yaml @@ -0,0 +1,15 @@ +name: Feature Request +description: Request a feature or enhancement +title: "[Feature]: " +labels: ["feature", "needs-triage"] +body: + - type: markdown + attributes: + value: | + Please try to fill out as much of the information below as you can. Thank you! + **Note:** If you want to sponsor new features, contact us at info@netways.de + - type: textarea + id: description + attributes: + label: Describe the feature request + description: Please provide a concise description of the feature. You can use markdown. diff --git a/.github/ISSUE_TEMPLATE/question.yaml b/.github/ISSUE_TEMPLATE/question.yaml new file mode 100644 index 0000000..65183ea --- /dev/null +++ b/.github/ISSUE_TEMPLATE/question.yaml @@ -0,0 +1,10 @@ +name: Question +description: Ask a question +title: "[Question]: " +labels: ["question"] +body: + - type: textarea + id: description + attributes: + label: Ask a question + description: Please provide as much context as possible. You can use markdown. diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..ad45155 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,6 @@ +version: 2 +updates: +- package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: monthly diff --git a/.github/workflows/unittest.yml b/.github/workflows/unittest.yml new file mode 100644 index 0000000..9666768 --- /dev/null +++ b/.github/workflows/unittest.yml @@ -0,0 +1,23 @@ +name: CI + +on: [push, pull_request] + +jobs: + gitHubActionForPytest: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [3.10, 3.11, 3.12, 3.13, 3.14] + name: GitHub Action + steps: + - name: Checkout + uses: actions/checkout@v6 + - name: Install dependencies + run: | + python -m pip install -r requirements.txt -r requirements-dev.txt + - name: Lint + run: | + make lint + - name: Test + run: | + make coverage diff --git a/.gitignore b/.gitignore index 7da0c3b..ece1395 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,65 @@ -.idea -.venv +# Byte-compiled / optimized / DLL files __pycache__/ +*.py[cod] + +# C extensions +*.so + +# Distribution / packaging +.Python +env/ +venv/ +.venv/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*,cover + +# Translations +*.mo +*.pot + +# Django stuff: +*.log + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Editors +\#* +.\#* +.idea *.bak diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 0000000..04b6c62 --- /dev/null +++ b/.pylintrc @@ -0,0 +1,15 @@ +# pylint config +[FORMAT] +good-names=s,r,e +[MESSAGES CONTROL] +disable=fixme, + invalid-name, + consider-using-f-string, + missing-module-docstring, + too-many-instance-attributes, + too-many-arguments, + too-many-branches, + too-many-locals, + too-many-statements, + redefined-outer-name, + no-member, diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..8014a40 --- /dev/null +++ b/Makefile @@ -0,0 +1,9 @@ +.PHONY: lint test coverage + +lint: + python -m pylint check_sensorProbe2plus.py +test: + python -m unittest -v test_check_sensorProbe2plus.py +coverage: + python -m coverage run -m unittest -b test_check_sensorProbe2plus.py + python -m coverage report -m --include check_sensorProbe2plus.py diff --git a/README.md b/README.md index 86f288b..8e511e0 100644 --- a/README.md +++ b/README.md @@ -1,35 +1,27 @@ -## check_sensorProbe2plus ## - -### Description ### +# check_sensorProbe2plus This plugin serves the purpose of receiving data from the SensorProbe2+ and checking its state. - -### Dependencies ### +## Dependencies + [PySNMP](https://github.com/etingof/pysnmp) -+ [enum34](https://pypi.org/project/enum34) -### Usage ### +## Usage ``` -check_sensorProbe2plus.py -H -C [-p] [-V] [-v] [-h] -``` +usage: check_sensorProbe2plus.py [-h] [-V] [-v] [-p PORT] -H HOSTNAME -C COMMUNITY -#### required arguments: #### +Check plugin for AKCP SensorProbe2+ -+ **HOSTNAME:** host of the SensorProbe2+ - `` -H, --hostname `` -+ **COMMUNITY:** read community of the SensorProbe2+ - `` -C, --community `` +options: + -h, --help show this help message and exit + -V, --version + -v, --verbose increase output verbosity (-v or -vv) + -p, --port PORT port of the sensors to check (shows all if not set) -#### optional arguments: #### - -+ **HELP** show the help message and exit - `` -h, --help `` -+ **VERSION** shows the current version of the check plugin - `` -V, --version `` -+ **VERBOSE** increases output verbosity (-v or -vv) - `` -v, --verbose `` -+ **PORT** port of the sensor to check (shows all if not set) - `` -p, --port `` +required arguments: + -H, --hostname HOSTNAME + host of the sensor probe + -C, --community COMMUNITY + read community of the sensor probe +``` diff --git a/check_sensorProbe2plus.py b/check_sensorProbe2plus.py index 45cd6f9..50bccb8 100755 --- a/check_sensorProbe2plus.py +++ b/check_sensorProbe2plus.py @@ -2,10 +2,6 @@ # ------------------------------------------------------------------------------ # check_sensorProbe2plus.py - A check plugin for AKCP SensorProbe2+. # Copyright (C) 2017 NETWAYS GmbH, www.netways.de -# Authors: Noah Hilverling -# Jennifer Mourek -# -# Version: 1.0 # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License @@ -22,14 +18,36 @@ # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # ------------------------------------------------------------------------------ +""" +check_sensorProbe2plus.py is a Check Plugin for Nagios similar monitoring systems +like Icinga or Nameon. +It retrieves data from a AKCP SensorProbe2+ devices and alerts on problematic +conditions +""" + import argparse +import asyncio as other_asyncio_name import sys -from pysnmp.entity.rfc3413.oneliner import cmdgen +import typing from enum import Enum, IntEnum +import pysnmp +# pylint: disable=import-error,no-name-in-module +from pysnmp.hlapi.v3arch.asyncio import SnmpEngine as pySnmp_engine +from pysnmp.hlapi.v3arch.asyncio import next_cmd as pySnmp_next_cmd + +# Version number +VERSION = 1.1 + +# Root for sensor OIDs +SENSORS_OID = (1, 3, 6, 1, 4, 1, 3854, 3, 5) + +# pylint: disable=consider-using-f-string + -# Translate the OID indexes to keywords class Types(IntEnum): + """Translate the OID indexes to keywords""" + CATEGORY = 0 NAME = 2 UNIT = 5 @@ -41,8 +59,9 @@ class Types(IntEnum): VALUE = 20 -# Translate the Nagios state IDs to keywords class NagiosState(Enum): + """Translate the Nagios state IDs to keywords""" + OK = 0 WARNING = 1 CRITICAL = 2 @@ -67,31 +86,35 @@ class NagiosState(Enum): 22: "Power", 24: "Fuel", 26: "Tank sender", - 27: "Door" + 27: "Door", } -# Convert a state from AKCP format to Nagios def convert_state_to_nagios(state_to_convert): + """ + Convert a state from AKCP format to Nagios + """ state_to_convert = int(state_to_convert) if state_to_convert == 1: return NagiosState.UNKNOWN - elif state_to_convert == 2: + if state_to_convert == 2: return NagiosState.OK - elif state_to_convert == 3 or state_to_convert == 5: + if state_to_convert in (3, 5): return NagiosState.WARNING - elif state_to_convert == 4 or state_to_convert == 6: + if state_to_convert in (4, 6): return NagiosState.CRITICAL - else: - print("State %s is out of range. That should not happen." % state_to_convert) - return NagiosState.UNKNOWN + + print("State %s is out of range. That should not happen." % state_to_convert) + return NagiosState.UNKNOWN -# Display a one line status message with: -# - most important Nagios state and short message -# - number and name of the sensors in any state but OK -# - thresholds of the sensors def print_status_message(sensor_states, perf_data): + """ + Display a one line status message with: + - most important Nagios state and short message + - number and name of the sensors in any state but OK + - thresholds of the sensors + """ warning_name_string = "" for name in sensor_states["WARNING"]: if warning_name_string: @@ -106,206 +129,290 @@ def print_status_message(sensor_states, perf_data): result_message = "" if len(sensor_states["WARNING"]) > 0 and len(sensor_states["CRITICAL"]) > 0: - result_message = "CRITICAL sensorProbe2plus: Sensor reports state CRITICAL for %d sensor%s (%s) " \ - "and state WARNING for %d sensor%s (%s)" % ( - len(sensor_states["CRITICAL"]), - "s" if len(sensor_states["CRITICAL"]) > 1 else "", - critical_name_string, - len(sensor_states["WARNING"]), - "s" if len(sensor_states["WARNING"]) > 1 else "", - warning_name_string - ) + result_message = ( + "CRITICAL sensorProbe2plus: Sensor reports state CRITICAL for {} sensor{} ({}) " + "and state WARNING for {} sensor{} ({})".format( + len(sensor_states["CRITICAL"]), + "s" if len(sensor_states["CRITICAL"]) > 1 else "", + critical_name_string, + len(sensor_states["WARNING"]), + "s" if len(sensor_states["WARNING"]) > 1 else "", + warning_name_string, + ) + ) elif len(sensor_states["WARNING"]) > 0: - result_message = "WARNING sensorProbe2plus: Sensor reports state WARNING for %d sensor%s (%s)" % ( - len(sensor_states["WARNING"]), "s" if len(sensor_states["WARNING"]) > 1 else "", warning_name_string) + result_message = ( + "WARNING sensorProbe2plus: Sensor reports state WARNING for %d sensor%s (%s)" + % ( + len(sensor_states["WARNING"]), + "s" if len(sensor_states["WARNING"]) > 1 else "", + warning_name_string, + ) + ) elif len(sensor_states["CRITICAL"]) > 0: - result_message = "CRITICAL sensorProbe2plus: Sensor reports state CRITICAL for %d sensor%s (%s)" % ( - len(sensor_states["CRITICAL"]), "s" if len(sensor_states["CRITICAL"]) > 1 else "", critical_name_string) + result_message = ( + "CRITICAL sensorProbe2plus: Sensor reports state CRITICAL for %d sensor%s (%s)" + % ( + len(sensor_states["CRITICAL"]), + "s" if len(sensor_states["CRITICAL"]) > 1 else "", + critical_name_string, + ) + ) else: result_message = "OK sensorProbe2plus: Sensor reports that everything is fine" # Add the performance data to the end of the first output line result_message += "|" - for singlePerfData in perf_data: - result_message += singlePerfData + " " + for single_perfdata in perf_data: + result_message += single_perfdata + " " # Print summary and performance data print(result_message) -# Version number -version = 1.0 - -# Initialise variables -verbose = 0 -hostname = "" -community = "" -port = 0 - -# Arguments for the CLI command -parser = argparse.ArgumentParser(description='Check plugin for AKCP SensorProbe2+') -parser.add_argument("-V", "--version", action="store_true") -parser.add_argument("-v", "--verbose", action="count", default=0, help="increase output verbosity (-v or -vv)") -parser.add_argument("-p", "--port", help="port of the sensors to check (shows all if not set)", type=int, default=0) -required = parser.add_argument_group('required arguments') -required.add_argument("-H", "--hostname", help="host of the sensor probe", required=True) -required.add_argument("-C", "--community", help="read community of the sensor probe", required=True) - -args = parser.parse_args() - -# Print version if version argument is given -if args.version: - print("AKCP SensorProbe2+ Version %s" % version) - sys.exit() -else: - # Assert arguments to their variables - verbose = args.verbose if args.verbose <= 2 else 2 - hostname = args.hostname - community = args.community - port = args.port - -# The state with the highest importance (CRITICAL -> WARNING -> OK) -mostImportantState = NagiosState.OK - -# Array of messages to print after first line if verbose -stateMessages = [] - -# Performance data for each sensor as string -perfData = [] - -# Root for sensor dictionary tree -sensorPorts = {} - -# Root for sensor OIDs -sensorsOID = (1, 3, 6, 1, 4, 1, 3854, 3, 5) - -generator = cmdgen.CommandGenerator() -communityData = cmdgen.CommunityData(community) -transport = cmdgen.UdpTransportTarget((hostname, 161)) -command = getattr(generator, 'nextCmd') - -errorIndication, errorStatus, errorIndex, result = command(communityData, transport, sensorsOID) - -# Check if an exception occurred -if errorIndication: - print("%s sensorProbe2plus: %s" % (NagiosState.UNKNOWN.name, errorIndication)) - mostImportantState = NagiosState.UNKNOWN -elif errorStatus: - print(('%s sensorProbe2plus: %s at %s' % (NagiosState.CRITICAL.name, - errorStatus.prettyPrint(), - errorIndex and result[int(errorIndex)-1] or '?'))) - mostImportantState = NagiosState.CRITICAL -else: - # Sort results - for data in result: - oid = data[0][0] - value = data[0][1] - - # Filter relevant OIDs - valueIndex = int(oid[11]) - try: - Types(valueIndex) - except ValueError as err: - continue - - # Get sensor category - category = int(oid[9]) - if category == 1 or category > 27: - continue - - # Filter ports if port is given in arguments - sensorPort = int(oid[15]) - if args.port != 0 and args.port - 1 != sensorPort: - continue - - # Numeric index of sensor - sensorIndex = int(oid[16]) - - # Add needed dictionaries if not yet existing - if sensorPort not in sensorPorts: - sensorPorts[sensorPort] = {} - if sensorIndex not in sensorPorts[sensorPort]: - sensorPorts[sensorPort][sensorIndex] = {} - - # Store data in dictionary tree - sensorPorts[sensorPort][sensorIndex][valueIndex] = value - sensorPorts[sensorPort][sensorIndex][Types.CATEGORY] = categories[category] - - # Check if there is no sensor on the given port - if len(sensorPorts) < 1: - print("%s sensorProbe2plus: There is no sensor on the given port" % NagiosState.UNKNOWN.name) - sys.exit(NagiosState.UNKNOWN.value) - - # Sensor names sorted by state - namesByState = {"OK": [], "WARNING": [], "CRITICAL": [], "UNKNOWN": []} - - # Iterate through sensors - for sensorPort, sensorIndexes in sensorPorts.items(): - for sensorIndex, valueIndexes in sensorIndexes.items(): - # Convert state to Nagios states - state = convert_state_to_nagios(valueIndexes[Types.STATE]) - - # Redetermines most important state - if hasattr(state, 'value') and state.value > mostImportantState.value: - mostImportantState = state - else: - mostImportantState = NagiosState.UNKNOWN - - # Sort sensor name by state - namesByState[state.name].append(valueIndexes[Types.NAME]) - - # Check if sensor has no value - if Types.VALUE not in valueIndexes: - # Value replacement for sensors without value - value = 0 if state == NagiosState.OK else 1 - - # Status message for sensor - stateMessages.append( - "%s %s" % (state.name, valueIndexes[Types.NAME])) - - # Add performance data to performance data array - perfData.append("'%s'=%s;" % (valueIndexes[Types.NAME], value)) - else: - # Convert temperatures into right format - if valueIndexes[Types.UNIT] == "C": - valueIndexes[Types.VALUE] = float(valueIndexes[Types.VALUE]) / 10 - valueIndexes[Types.LOW_CRITICAL] = float(valueIndexes[Types.LOW_CRITICAL]) / 10 - valueIndexes[Types.LOW_WARNING] = float(valueIndexes[Types.LOW_WARNING]) / 10 - valueIndexes[Types.HIGH_WARNING] = float(valueIndexes[Types.HIGH_WARNING]) / 10 - valueIndexes[Types.HIGH_CRITICAL] = float(valueIndexes[Types.HIGH_CRITICAL]) / 10 - - # Status message for sensor - stateMessage = '%s %s sensor "%s": %s%s' % (state.name, - valueIndexes[Types.CATEGORY], - valueIndexes[Types.NAME], - valueIndexes[Types.VALUE], - valueIndexes[Types.UNIT]) - - # Add thresholds to verbose sensor messages - if verbose > 1: - stateMessage += " (%s:%s/%s:%s)" % (valueIndexes[Types.LOW_WARNING], - valueIndexes[Types.HIGH_WARNING], - valueIndexes[Types.LOW_CRITICAL], - valueIndexes[Types.HIGH_CRITICAL]) - - stateMessages.append(stateMessage) - - # Add performance data to performance data array - perfData.append("'%s'=%s%s;%s:%s;%s:%s" % (valueIndexes[Types.NAME], - valueIndexes[Types.VALUE], - valueIndexes[Types.UNIT], - valueIndexes[Types.LOW_WARNING], - valueIndexes[Types.HIGH_WARNING], - valueIndexes[Types.LOW_CRITICAL], - valueIndexes[Types.HIGH_CRITICAL])) - - # Print first line of output - print_status_message(namesByState, perfData) - - # Add additional information for each sensor to the output if verbose - if verbose > 0: - for message in stateMessages: - print(message) - - # Exit with most important state - sys.exit(mostImportantState.value) +def parse_args() -> tuple[int, str, str, int]: + """ + Parse CLI arguments + """ + parser = argparse.ArgumentParser(description="Check plugin for AKCP SensorProbe2+") + parser.add_argument("-V", "--version", action="store_true") + parser.add_argument( + "-v", + "--verbose", + action="count", + default=0, + help="increase output verbosity (-v or -vv)", + ) + parser.add_argument( + "-p", + "--port", + help="port of the sensors to check (shows all if not set)", + type=int, + default=0, + ) + required = parser.add_argument_group("required arguments") + required.add_argument( + "-H", "--hostname", help="host of the sensor probe", required=True + ) + required.add_argument( + "-C", "--community", help="read community of the sensor probe", required=True + ) + + args = parser.parse_args() + + # Print version if version argument is given + if args.version: + print("check_sensorProbe2plus version %s" % VERSION) + sys.exit() + else: + # Assert arguments to their variables + verbose = args.verbose if args.verbose <= 2 else 2 + hostname = args.hostname + community = args.community + port = args.port + return (verbose, hostname, community, port) + + +def execute(hostname, sensor_port_input, community, verbose) -> None: + """ + Execute the actual test + """ + # Array of messages to print after first line if verbose + state_messages = [] + + # Performance data for each sensor as string + perfdata = [] + + # Root for sensor dictionary tree + sensor_ports: dict[int, typing.Any] = {} + + error_indication, error_status, error_index, result = other_asyncio_name.run( + snmp_query(hostname, 161, SENSORS_OID, community) + ) + + # The state with the highest importance (CRITICAL -> WARNING -> OK) + most_important_state = NagiosState.OK + + if error_indication: + print("%s sensorProbe2plus: %s" % (NagiosState.UNKNOWN.name, error_indication)) + most_important_state = NagiosState.UNKNOWN + elif error_status: + print( + ( + "%s sensorProbe2plus: %s at %s" + % ( + NagiosState.CRITICAL.name, + error_status.prettyPrint(), + error_index and result[int(error_index) - 1] or "?", + ) + ) + ) + most_important_state = NagiosState.CRITICAL + else: + # Sort results + for data in result: + oid = data[0][0] + value = data[0][1] + + # Filter relevant OIDs + value_index = int(oid[11]) + try: + Types(value_index) + except ValueError: + continue + + # Get sensor category + category = int(oid[9]) + if category == 1 or category > 27: + continue + + # Filter ports if port is given in arguments + sensor_port = int(oid[15]) + if sensor_port_input != 0 and sensor_port_input - 1 != sensor_port: + continue + + # Numeric index of sensor + sensor_index = int(oid[16]) + + # Add needed dictionaries if not yet existing + if sensor_port not in sensor_ports: + sensor_ports[sensor_port] = {} + if sensor_index not in sensor_ports[sensor_port]: + sensor_ports[sensor_port][sensor_index] = {} + + # Store data in dictionary tree + sensor_ports[sensor_port][sensor_index][value_index] = value + sensor_ports[sensor_port][sensor_index][Types.CATEGORY] = categories[ + category + ] + + # Check if there is no sensor on the given port + if len(sensor_ports) < 1: + print( + "%s sensorProbe2plus: There is no sensor on the given port" + % NagiosState.UNKNOWN.name + ) + sys.exit(NagiosState.UNKNOWN.value) + + # Sensor names sorted by state + names_by_state: dict[str, list] = { + "OK": [], + "WARNING": [], + "CRITICAL": [], + "UNKNOWN": [], + } + + # Iterate through sensors + for sensor_port, sensor_indexes in sensor_ports.items(): + for sensor_index, value_indexes in sensor_indexes.items(): + # Convert state to Nagios states + state = convert_state_to_nagios(value_indexes[Types.STATE]) + + # Redetermines most important state + if hasattr(state, "value") and state.value > most_important_state.value: + most_important_state = state + else: + most_important_state = NagiosState.UNKNOWN + + # Sort sensor name by state + names_by_state[state.name].append(value_indexes[Types.NAME]) + + # Check if sensor has no value + if Types.VALUE not in value_indexes: + # Value replacement for sensors without value + value = 0 if state == NagiosState.OK else 1 + + # Status message for sensor + state_messages.append( + "%s %s" % (state.name, value_indexes[Types.NAME]) + ) + + # Add performance data to performance data array + perfdata.append("'%s'=%s;" % (value_indexes[Types.NAME], value)) + else: + # Convert temperatures into right format + if value_indexes[Types.UNIT] == "C": + value_indexes[Types.VALUE] = ( + float(value_indexes[Types.VALUE]) / 10 + ) + value_indexes[Types.LOW_CRITICAL] = ( + float(value_indexes[Types.LOW_CRITICAL]) / 10 + ) + value_indexes[Types.LOW_WARNING] = ( + float(value_indexes[Types.LOW_WARNING]) / 10 + ) + value_indexes[Types.HIGH_WARNING] = ( + float(value_indexes[Types.HIGH_WARNING]) / 10 + ) + value_indexes[Types.HIGH_CRITICAL] = ( + float(value_indexes[Types.HIGH_CRITICAL]) / 10 + ) + + # Status message for sensor + state_message = '%s %s sensor "%s": %s%s' % ( + state.name, + value_indexes[Types.CATEGORY], + value_indexes[Types.NAME], + value_indexes[Types.VALUE], + value_indexes[Types.UNIT], + ) + + # Add thresholds to verbose sensor messages + if verbose > 1: + state_message += " (%s:%s/%s:%s)" % ( + value_indexes[Types.LOW_WARNING], + value_indexes[Types.HIGH_WARNING], + value_indexes[Types.LOW_CRITICAL], + value_indexes[Types.HIGH_CRITICAL], + ) + + state_messages.append(state_message) + + # Add performance data to performance data array + perfdata.append( + "'%s'=%s%s;%s:%s;%s:%s" + % ( + value_indexes[Types.NAME], + value_indexes[Types.VALUE], + value_indexes[Types.UNIT], + value_indexes[Types.LOW_WARNING], + value_indexes[Types.HIGH_WARNING], + value_indexes[Types.LOW_CRITICAL], + value_indexes[Types.HIGH_CRITICAL], + ) + ) + + # Print first line of output + print_status_message(names_by_state, perfdata) + + # Add additional information for each sensor to the output if verbose + if verbose > 0: + for message in state_messages: + print(message) + + # Exit with most important state + sys.exit(most_important_state.value) + + +async def snmp_query(hostname, port, oid, community): + """ + snmp_query executes the actual query + """ + snmp_engine = pySnmp_engine() + + snmp_object = pysnmp.smi.rfc1902.ObjectType(pysnmp.smi.rfc1902.ObjectIdentity(oid)) + # pylint: disable=c-extension-no-member + error_indication, error_status, error_index, result = await pySnmp_next_cmd( + snmp_engine, + pysnmp.hlapi.v3arch.asyncio.auth.CommunityData(community), + await pysnmp.hlapi.v3arch.asyncio.UdpTransportTarget.create((hostname, port)), + pysnmp.hlapi.v3arch.asyncio.ContextData(), + snmp_object, + ) + + return (error_indication, error_status, error_index, result) + + +if __name__ == "__main__": + verbose, hostname, community, sensor_port = parse_args() + execute(hostname, sensor_port, community, verbose) diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..368140c --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,2 @@ +pylint==3.3.7 +coverage==7.9.0 diff --git a/requirements.txt b/requirements.txt index a748911..d39f41e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1 @@ -enum34==1.1.10 -pysnmp==4.4.12 +pysnmp==7.1.26 diff --git a/test_check_sensorProbe2plus.py b/test_check_sensorProbe2plus.py new file mode 100644 index 0000000..796df4b --- /dev/null +++ b/test_check_sensorProbe2plus.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python3 + +import unittest +import unittest.mock as mock +import sys + +sys.path.append('..') + +from check_sensorProbe2plus import convert_state_to_nagios +from check_sensorProbe2plus import print_status_message + +from check_sensorProbe2plus import NagiosState + +class UtilTesting(unittest.TestCase): + + def test_state(self): + actual = convert_state_to_nagios(1) + expected = NagiosState.UNKNOWN + + self.assertEqual(actual, expected) + + @mock.patch('builtins.print') + def test_status_message(self, mock_print): + sensor_states = { + "OK": [], + "WARNING": [], + "CRITICAL": [], + "UNKNOWN": [], + } + + print_status_message(sensor_states, "perfdata") + + calls = [mock.call('OK sensorProbe2plus: Sensor reports that everything is fine|p e r f d a t a ')] + + mock_print.assert_has_calls(calls) +