In [190]:
# http://luca.ntop.org/Teaching/Appunti/asn1.html
# https://tools.ietf.org/html/rfc1442#section-7.1.1
# https://stackoverflow.com/questions/14572006/net-snmp-returned-types
# UInteger32 = Application | 7 = 0b01000000 | 0b00000111 = 0x47

#ENV SCALA_VERSION 2.13.1
#ENV SBT_VERSION 1.3.8
# apk add openjdk8
# RUN \
#  wget -O - https://downloads.typesafe.com/scala/$SCALA_VERSION/scala-$SCALA_VERSION.tgz | tar xfz - -C /opt/ && \
#  echo >> /root/.bashrc && \

#  echo "export PATH=/opt/scala-$SCALA_VERSION/bin:$PATH" >> /root/.bashrc

#RUN wget -O - https://piccolo.link/sbt-$SBT_VERSION.tgz | tar xfz - -C /opt/
#export PATH=/opt/sbt/bin:$PATH
# sbt universal:packageBin
# unzip jvm/target/universal/kaitai-struct-compiler-0.9-SNAPSHOT.zip -d /opt/
#&& \
#  export PATH=/opt/kaitai-struct-compiler-0.9-SNAPSHOT/bin:$PATH
import numpy as np
import snmp_stream as snmp
import attr
from ip_forward_mib import IpForwardMib

response = snmp.walk(
    '127.0.0.1:1161',
    ('recorded/linux-full-walk', 'V2C'),
    [
        '1.3.6.1.2.1.4.24.4.1.1',
        '1.3.6.1.2.1.4.24.4.1.10',
        '1.3.6.1.2.1.4.24.4.1.11',
        '1.3.6.1.2.1.4.24.4.1.12',
        '1.3.6.1.2.1.4.24.4.1.13',
        '1.3.6.1.2.1.4.24.4.1.14',
        '1.3.6.1.2.1.4.24.4.1.15',
        '1.3.6.1.2.1.4.24.4.1.16',
        '1.3.6.1.2.1.4.24.4.1.2',
        '1.3.6.1.2.1.4.24.4.1.3',
        '1.3.6.1.2.1.4.24.4.1.4',
        '1.3.6.1.2.1.4.24.4.1.5',
        '1.3.6.1.2.1.4.24.4.1.6',
        '1.3.6.1.2.1.4.24.4.1.7',
        '1.3.6.1.2.1.4.24.4.1.8',
        '1.3.6.1.2.1.4.24.4.1.9',        
    ],
    req_id='IP-FORWARD-MIB',
    config={'retries': 1, 'timeout': 3}
)

@attr.s
class Parser:
    def run_parser(self, data, pos):
        raise NotImplementedError()

    def bind(self, k):
        m = self
        class BindParser(Parser):
            def run_parser(self, data, pos):
                a, pos = m.run_parser(data, pos)
                return k(a).run_parser(data, pos)
        return BindParser()

@attr.s
class Return:
    value = attr.ib()
    
    def run_parser(self, data, pos):
        return self.value, pos

@attr.s
class Word(Parser):
    dtype = attr.ib(default=np.dtype('u1'))
    
    def run_parser(self, data, pos):
        return int(data[pos:(pos:=pos+self.dtype.itemsize)].view(self.dtype)[0]), pos 

@attr.s
class Partial:
    data = attr.ib()
    pos = attr.ib()
    def parse(self, parser):
        a, pos = parser.run_parser(self.data, self.pos)
        return a, Partial(self.data, pos)

def parse(parser, data):
    a, pos = parser.run_parser(data, 0)
    return a, Partial(data, pos)

def align(parser, by):
    class AlignParser(Parser):
        def run_parser(self, data, pos):
            if isinstance(by, np.dtype):
                pos += (by.itemsize - pos) % by.itemsize
            else:
                pos += (by - pos) % by
            return parser.run_parser(data, pos)
    return AlignParser()

@attr.s
class String(Parser):
    size = attr.ib()
    def run_parser(self, data, pos):
        a = data[pos:(pos:=pos+self.size)].tostring().decode("utf-8")
        return a, pos

@attr.s
class OctetString(Parser):
    size = attr.ib()
    dtype = attr.ib(default=np.dtype('u1'))
    def run_parser(self, data, pos):
        a = data[pos:(pos:=pos+self.size*self.dtype.itemsize)].view(self.dtype)
        return a, pos

from itertools import count
def repeat(n, parser):
    inf = (_ for _ in iter(int, 1))
    class RepeatParser(Parser):
        def run_parser(self, data, pos):
            a = list(
                a
                for _ in (range(n) if n is not None else inf)
                for a, _pos in (parser.run_parser(data, pos),)
                if(data[(pos:=_pos):].size != 0) or inf.close()
            )
            return a, pos
    return RepeatParser()

@attr.s
class Datetime(Parser):
    unit = attr.ib()
    dtype = attr.ib()
    
    def run_parser(self, data, pos):
        return data[pos:(pos:=pos+self.dtype.itemsize)].view(f'datetime64[{self.unit}]')[0], pos 
        
        
@attr.s
class VarBind(Parser):
    size_t = attr.ib()
    oid_t = attr.ib()
    def run_parser(self, data, pos):
        timestamp, pos = Datetime('s', size_t).run_parser(data, pos)
        root_oid_idx, pos = Word(self.size_t).run_parser(data, pos)
        value_type, pos = Word(self.size_t).run_parser(data, pos)
        idx, pos = align(Word(self.size_t).bind(lambda size: OctetString(size)), by=self.size_t).run_parser(data, pos)
        value, pos = align(Word(self.size_t).bind(lambda size: OctetString(size)), by=self.size_t).run_parser(data, pos)
        return (timestamp, root_oid_idx, value_type, idx, value), pos
        

endianess, cont = parse(Word().bind(lambda x: Return('<' if x == 0 else '>')), response.results)
size_t, cont = cont.parse(Word().bind(lambda x: Return(np.dtype(endianess+'u'+str(x)))))
oid_t, cont = cont.parse(Word().bind(lambda x: Return(np.dtype(endianess+'u'+str(x)))))
metadata, cont = cont.parse(
    align(Word(size_t).bind(lambda size: String(size)), by=16)
)
root_oids, cont = cont.parse(
    align(Word(size_t), by=size_t)
    .bind(lambda r: repeat(r, align(Word(size_t).bind(lambda size: OctetString(size, oid_t)), by=size_t)))
)
var_binds, cont = cont.parse(
    repeat(None, align(Word(size_t).bind(lambda size: OctetString(size)), by=size_t))
)


print(size_t, oid_t)
print(metadata)
print(root_oids)
print(cont)
parse(VarBind(size_t, oid_t), var_binds[-1])[0]

uint64 uint64
IP-FORWARD-MIB
[array([ 1,  3,  6,  1,  2,  1,  4, 24,  4,  1,  1], dtype=uint64), array([ 1,  3,  6,  1,  2,  1,  4, 24,  4,  1, 10], dtype=uint64), array([ 1,  3,  6,  1,  2,  1,  4, 24,  4,  1, 11], dtype=uint64), array([ 1,  3,  6,  1,  2,  1,  4, 24,  4,  1, 12], dtype=uint64), array([ 1,  3,  6,  1,  2,  1,  4, 24,  4,  1, 13], dtype=uint64), array([ 1,  3,  6,  1,  2,  1,  4, 24,  4,  1, 14], dtype=uint64), array([ 1,  3,  6,  1,  2,  1,  4, 24,  4,  1, 15], dtype=uint64), array([ 1,  3,  6,  1,  2,  1,  4, 24,  4,  1, 16], dtype=uint64), array([ 1,  3,  6,  1,  2,  1,  4, 24,  4,  1,  2], dtype=uint64), array([ 1,  3,  6,  1,  2,  1,  4, 24,  4,  1,  3], dtype=uint64), array([ 1,  3,  6,  1,  2,  1,  4, 24,  4,  1,  4], dtype=uint64), array([ 1,  3,  6,  1,  2,  1,  4, 24,  4,  1,  5], dtype=uint64), array([ 1,  3,  6,  1,  2,  1,  4, 24,  4,  1,  6], dtype=uint64), array([ 1,  3,  6,  1,  2,  1,  4, 24,  4,  1,  7], dtype=uint64), array([ 1,  3,  6,  1,  2,  1,  

(numpy.datetime64('2020-02-28T23:25:06'),
 13,
 2,
 array([195,   0,   0,   0,   0,   0,   0,   0, 218,   0,   0,   0,   0],
       dtype=uint8),
 array([  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0, 255,   0,   0,   0,   0,   0,   0,   0, 255,   0,
          0,   0,   0,   0,   0,   0, 255,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   8,   0,   0,   0,   0,   0,   0,   0,   2,   0,   0,
          0,   0,   0,   0,   0], dtype=uint8))

In [161]:
a = np.array([0, 0, 0, 0]).view(np.dtype('timedelta64[s]'))
a

array([0, 0, 0, 0], dtype='timedelta64[s]')

In [62]:
print(np.array2string(response.results[0:].reshape(response.results[0:].size >> 3, 8), edgeitems=20, threshold=10000))

[[  0   8   8   0   0   0   0   0]
 [  0   0   0   0   0   0   0   0]
 [ 14   0   0   0   0   0   0   0]
 [ 73  80  45  70  79  82  87  65]
 [ 82  68  45  77  73  66   0   0]
 [ 16   0   0   0   0   0   0   0]
 [ 11   0   0   0   0   0   0   0]
 [  1   0   0   0   0   0   0   0]
 [  3   0   0   0   0   0   0   0]
 [  6   0   0   0   0   0   0   0]
 [  1   0   0   0   0   0   0   0]
 [  2   0   0   0   0   0   0   0]
 [  1   0   0   0   0   0   0   0]
 [  4   0   0   0   0   0   0   0]
 [ 24   0   0   0   0   0   0   0]
 [  4   0   0   0   0   0   0   0]
 [  1   0   0   0   0   0   0   0]
 [  1   0   0   0   0   0   0   0]
 [ 11   0   0   0   0   0   0   0]
 [  1   0   0   0   0   0   0   0]
 [  3   0   0   0   0   0   0   0]
 [  6   0   0   0   0   0   0   0]
 [  1   0   0   0   0   0   0   0]
 [  2   0   0   0   0   0   0   0]
 [  1   0   0   0   0   0   0   0]
 [  4   0   0   0   0   0   0   0]
 [ 24   0   0   0   0   0   0   0]
 [  4   0   0   0   0   0   0   0]
 [  1   0   0   0   

In [54]:
pos = 0


reserved = res[:(pos:=pos+16)]
endianess = '<' if reserved[2] == 0 else '>'
size_t = np.dtype(endianess + 'u' + str(reserved[0]))
oid_t = np.dtype(endianess + 'u' + str(reserved[1]))
print(size_t, oid_t, pos)

def size(pos, res, f):
    size = int(res[pos:(pos:=pos+size_t.itemsize)].view(size_t)[0])
    res, pos = res[pos:(pos:=(pos+size))].view(dtype), pos
    v, pos = f(0, res)
    return v, pos

def sized_array(pos, res, dtype=np.dtype('u1')):
    size = int(res[pos:(pos:=pos+size_t.itemsize)].view(size_t)[0])
    return res[pos:(pos:=(pos+size))].view(dtype), pos

def align(pos, dtype):
    return pos + ((dtype.itemsize - pos) % dtype.itemsize)

def word(pos, res, dtype=np.dtype('u1')):
    return res[pos:(pos:=pos+dtype.itemsize)].view(dtype)[0], pos

def string(pos, res, n):
    

metadata, pos = sized_array(pos, res)
pos = align(pos, size_t)
metadata = metadata.tostring().decode("utf-8")
print(metadata)

#root_oid_len, pos = word(pos, res, dtype=size_t)
#print(root_oid_len)

print(pos)
print(np.array2string(res[pos:].reshape(res[pos:].size >> 3, 8), edgeitems=20, threshold=10000))

SyntaxError: invalid syntax (<ipython-input-54-58e88bf08aed>, line 77)

IP-FORWARD-MIB


In [31]:

#root_oids = []
#for i in range(count):
#res

In [None]:
[
    '.'+'.'.join([str(suboid.as_int) for suboid in oid.suboid])
    for oid in res.body.root_oids.oid
]

In [3]:
len(res.body.var_binds.var_bind)

45

In [97]:
even

[0,
 2,
 4,
 6,
 8,
 10,
 12,
 14,
 16,
 18,
 20,
 22,
 24,
 26,
 28,
 30,
 32,
 34,
 36,
 38,
 40,
 42,
 44,
 46,
 48]