Skip to content

Commit

Permalink
Implemented "Bulk" request.
Browse files Browse the repository at this point in the history
References #16
  • Loading branch information
exhuma committed Oct 6, 2016
1 parent 0c8dd57 commit 86b7f50
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 0 deletions.
83 changes: 83 additions & 0 deletions puresnmp/pdu.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,3 +190,86 @@ class SetRequest(SnmpMessage):
Represents an SNMP SET Request.
"""
TAG = 3


class BulkGetRequest(Type):
"""
Represents a SNMP GetBulk request
"""
TYPECLASS = TypeInfo.CONTEXT
TAG = 5

@classmethod
def decode(cls, data):
"""
This method takes a :py:class:`bytes` object and converts it to
an application object.
"""
# TODO (advanced): recent tests revealed that this is *not symmetric*
# with __bytes__ of this class. This should be ensured!
if not data:
raise EmptyMessage('No data to decode!')
request_id, data = pop_tlv(data)
non_repeaters, data = pop_tlv(data)
max_repeaters, data = pop_tlv(data)
values, data = pop_tlv(data)

oids = [str(*oid) for oid, _ in values]

return cls(
request_id,
non_repeaters,
max_repeaters,
*oids
)

def __init__(self, request_id, non_repeaters, max_repeaters, *oids):
self.request_id = request_id
self.non_repeaters = non_repeaters
self.max_repeaters = max_repeaters
self.varbinds = []
for oid in oids:
self.varbinds.append(VarBind(oid, Null()))

def __bytes__(self):
wrapped_varbinds = [Sequence(vb.oid, vb.value) for vb in self.varbinds]
data = [
Integer(self.request_id),
Integer(self.non_repeaters),
Integer(self.max_repeaters),
Sequence(*wrapped_varbinds)
]
payload = b''.join([bytes(chunk) for chunk in data])

tinfo = TypeInfo(TypeInfo.CONTEXT, TypeInfo.CONSTRUCTED, self.TAG)
length = encode_length(len(payload))
return bytes(tinfo) + length + payload

def __repr__(self):
return '%s(%r, %r)' % (
self.__class__.__name__,
self.request_id, self.varbinds)

def __eq__(self, other):
# pylint: disable=unidiomatic-typecheck
return (type(other) == type(self) and
self.request_id == other.request_id and
self.non_repeaters == other.non_repeaters and
self.max_repeaters == other.max_repeaters and
self.varbinds == other.varbinds)

def pretty(self) -> str: # pragma: no cover
"""
Returns a "prettified" string representing the SNMP message.
"""
lines = [
self.__class__.__name__,
' Request ID: %s' % self.request_id,
' Non Repeaters: %s' % self.non_repeaters,
' Max Repeaters: %s' % self.max_repeaters,
' Varbinds: ',
]
for bind in self.varbinds:
lines.append(' %s: %s' % (bind.oid, bind.value))

return '\n'.join(lines)
4 changes: 4 additions & 0 deletions puresnmp/test/data/bulk_get_request.hex
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
0000: 30 37 02 01 01 04 06 70 75 62 6C 69 63 A5 2A 02 07.....public.*.
0016: 04 1A 12 02 6A 02 01 00 02 01 05 30 1C 30 0C 06 ....j......0.0..
0032: 08 2B 06 01 02 01 02 02 00 05 00 30 0C 06 08 2B .+.........0...+
0048: 06 01 02 01 02 03 00 05 00 .........
28 changes: 28 additions & 0 deletions puresnmp/test/test_pdus.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from ..exc import SnmpError
from ..x690.types import (
Integer,
Null,
ObjectIdentifier,
OctetString,
Sequence,
)
from ..pdu import (
BulkGetRequest,
GetNextRequest,
GetRequest,
GetResponse,
Expand Down Expand Up @@ -190,3 +192,29 @@ def test_request(self):
)
result = bytes(packet)
self.assertBytesEqual(result, expected)


class TestBulkGet(ByteTester):
"""
BulkGet also receives a default "get" response, so there's no need to test
this in this TestCase.
"""

def test_request(self):
expected = readbytes('bulk_get_request.hex')

request = BulkGetRequest(
437387882,
0, # non-repeaters
5, # max-repeaters
VarBind(ObjectIdentifier.from_string('1.3.6.1.2.1.2.2.0'), Null()),
VarBind(ObjectIdentifier.from_string('1.3.6.1.2.1.2.3.0'), Null()),
)
packet = Sequence(
Integer(Version.V2C),
OctetString('public'),
request
)

result = bytes(packet)
self.assertBytesEqual(result, expected)

0 comments on commit 86b7f50

Please sign in to comment.