New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add SNMPv3 support to nav.Snmp.pynetsnmp implementation #2703 #2710
Changes from all commits
2a0ac88
ddffac5
bc5322d
8060241
c3dd1a8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
# | ||
# Copyright (C) 2023 Sikt | ||
# | ||
# This file is part of Network Administration Visualized (NAV). | ||
# | ||
# NAV is free software: you can redistribute it and/or modify it under | ||
# the terms of the GNU General Public License version 3 as published by | ||
# the Free Software Foundation. | ||
# | ||
# This program is distributed in the hope that it will be useful, but WITHOUT | ||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or | ||
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for | ||
# more details. You should have received a copy of the GNU General Public | ||
# License along with NAV. If not, see <http://www.gnu.org/licenses/>. | ||
# | ||
"""Defines types and enumerations for SNMP communication parameters""" | ||
from enum import Enum | ||
|
||
|
||
class SecurityLevel(Enum): | ||
"""SNMPv3 security levels""" | ||
|
||
NO_AUTH_NO_PRIV = "noAuthNoPriv" | ||
AUTH_NO_PRIV = "authNoPriv" | ||
AUTH_PRIV = "authPriv" | ||
|
||
|
||
class AuthenticationProtocol(Enum): | ||
"""SNMPv3 authentication protocols supported by NET-SNMP""" | ||
|
||
MD5 = "MD5" | ||
SHA = "SHA" | ||
SHA512 = "SHA-512" | ||
SHA384 = "SHA-384" | ||
SHA256 = "SHA-256" | ||
SHA224 = "SHA-224" | ||
|
||
|
||
class PrivacyProtocol(Enum): | ||
"""SNMPv3 privacy protocols supported by NET-SNMP""" | ||
|
||
DES = "DES" | ||
AES = "AES" |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ | |
c_ulong, | ||
c_uint64, | ||
) | ||
from typing import Union, Optional | ||
|
||
from IPy import IP | ||
from pynetsnmp import netsnmp | ||
|
@@ -48,12 +49,14 @@ | |
) | ||
|
||
from nav.oids import OID | ||
from .defines import SecurityLevel, AuthenticationProtocol, PrivacyProtocol | ||
from .errors import ( | ||
EndOfMibViewError, | ||
NoSuchObjectError, | ||
SnmpError, | ||
TimeOutException, | ||
UnsupportedSnmpVersionError, | ||
SNMPv3ConfigurationError, | ||
) | ||
|
||
PDUVarbind = namedtuple("PDUVarbind", ['oid', 'type', 'value']) | ||
|
@@ -85,30 +88,86 @@ class Snmp(object): | |
""" | ||
|
||
def __init__( | ||
self, host, community="public", version="1", port=161, retries=3, timeout=1 | ||
self, | ||
host: str, | ||
community: str = "public", | ||
version: Union[str, int] = "1", | ||
port: Union[str, int] = 161, | ||
retries: int = 3, | ||
timeout: int = 1, | ||
# SNMPv3-only options | ||
sec_level: Optional[SecurityLevel] = None, | ||
auth_protocol: Optional[AuthenticationProtocol] = None, | ||
sec_name: Optional[str] = None, | ||
auth_password: Optional[str] = None, | ||
priv_protocol: Optional[PrivacyProtocol] = None, | ||
priv_password: Optional[str] = None, | ||
): | ||
"""Makes a new Snmp-object. | ||
|
||
:param host: hostname or IP address | ||
:param community: community (password), defaults to "public" | ||
:param port: udp port number, defaults to "161" | ||
:param sec_level: SNMPv3 security level | ||
:param auth_protocol: SNMPv3 authentication protocol | ||
:param sec_name: SNMPv3 securityName | ||
:param auth_password: SNMPv3 authentication password | ||
:param priv_protocol: SNMPv3 privacy protocol | ||
:param priv_password: SNMPv3 privacy password | ||
|
||
""" | ||
|
||
self.handle = None | ||
self.host = host | ||
self.community = str(community) | ||
self.version = str(version) | ||
if self.version == '2': | ||
self.version = '2c' | ||
if self.version not in ('1', '2c'): | ||
if self.version not in ('1', '2c', '3'): | ||
raise UnsupportedSnmpVersionError(self.version) | ||
self.port = int(port) | ||
self.retries = retries | ||
self.timeout = timeout | ||
|
||
self.sec_level = SecurityLevel(sec_level) if sec_level else None | ||
self.auth_protocol = ( | ||
AuthenticationProtocol(auth_protocol) if auth_protocol else None | ||
) | ||
self.sec_name = sec_name | ||
self.auth_password = auth_password | ||
self.priv_protocol = PrivacyProtocol(priv_protocol) if priv_protocol else None | ||
self.priv_password = priv_password | ||
if self.version == "3": | ||
self._verify_snmpv3_params() | ||
|
||
self.handle = _MySnmpSession(self._build_cmdline()) | ||
self.handle.open() | ||
|
||
def _verify_snmpv3_params(self): | ||
if not self.sec_level: | ||
raise SNMPv3ConfigurationError("sec_level is required to be set") | ||
if not self.sec_name: | ||
raise SNMPv3ConfigurationError( | ||
"sec_name is required regardless of security level" | ||
) | ||
if self.sec_level in (SecurityLevel.AUTH_NO_PRIV, SecurityLevel.AUTH_PRIV): | ||
if not self.auth_protocol: | ||
raise SNMPv3ConfigurationError( | ||
f"{self.sec_level.value} requires auth_protocol to be set" | ||
) | ||
if not self.auth_password: | ||
raise SNMPv3ConfigurationError( | ||
f"{self.sec_level.value} requires auth_password to be set" | ||
) | ||
if self.sec_level == SecurityLevel.AUTH_PRIV: | ||
if not self.priv_protocol: | ||
raise SNMPv3ConfigurationError( | ||
f"{self.sec_level.value} requires priv_protocol to be set" | ||
) | ||
if not self.priv_password: | ||
raise SNMPv3ConfigurationError( | ||
f"{self.sec_level.value} requires priv_password to be set" | ||
) | ||
|
||
def _build_cmdline(self): | ||
try: | ||
address = IP(self.host) | ||
|
@@ -117,19 +176,29 @@ def _build_cmdline(self): | |
else: | ||
host = 'udp6:[%s]' % self.host if address.version() == 6 else self.host | ||
|
||
return ( | ||
'-v' + self.version, | ||
'-c', | ||
self.community, | ||
'-r', | ||
str(self.retries), | ||
'-t', | ||
str(self.timeout), | ||
'%s:%s' % (host, self.port), | ||
params = [f"-v{self.version}"] | ||
|
||
if self.version in ("1", "2c"): | ||
params.extend(["-c", self.community]) | ||
elif self.version == "3": | ||
params.extend(["-l", self.sec_level.value, "-u", self.sec_name]) | ||
if self.auth_protocol: | ||
params.extend(["-a", self.auth_protocol.value]) | ||
if self.auth_password: | ||
params.extend(["-A", self.auth_password]) | ||
if self.priv_protocol: | ||
params.extend(["-x", self.priv_protocol.value]) | ||
if self.priv_password: | ||
params.extend(["-X", self.priv_password]) | ||
Comment on lines
+184
to
+192
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the snmpv3-conf was stored in a dataclass, this could be a method on that class. |
||
|
||
params.extend( | ||
["-r", str(self.retries), "-t", str(self.timeout), f"{host}:{self.port}"] | ||
Comment on lines
+194
to
+195
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any specific reason these need to be at the end? I'd be tempted to merge this line with line 179. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
or
(and of course, 20 years of of conventional usage of these command line programs). The version of the commands on my current computer seem to accept the arguments in arbitrary order, however, so I can't be sure whether this was always true, or whether it was introduced at some specific version. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You could add a comment "# stays here: do not meddle with 20 years of ingrained habits" or something :) |
||
) | ||
return tuple(params) | ||
|
||
def __del__(self): | ||
self.handle.close() | ||
if self.handle: | ||
self.handle.close() | ||
|
||
def get(self, query="1.3.6.1.2.1.1.1.0"): | ||
"""Performs an SNMP GET query. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As @stveit suggested in the automerged one, a dataclass for the SNMPv3 config sounds better and better. Then this method could be on that class.
The instance would be stored as a whole in the
Snmp
init, which would make for more verbose code but probably better since theere is only one Snmp-class.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is a good idea(tm). But there is now a bunch of PRs based on this that are waiting to be merged, and all of them need to change as a result of this.
May I suggest doing a refactor PR to fix all the affected bits once they've all been merged? I can add it as a task to #1177 .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea.