# Start a Simulated SNMP Agent

In [1]:
import subprocess

# Launch the SNMP simulator at most once per kernel session. Re-running this
# cell leaves a live agent untouched and only (re)starts one when no agent has
# been started yet or the previously started one has already exited.
# The explicit name check replaces a catch-all exception handler so that real
# failures (e.g. the simulator executable being missing) are not masked.
if 'process' not in globals() or process.poll() is not None:
    process = subprocess.Popen(
        [
            'snmpsimd.py',
            '--agent-udpv4-endpoint=127.0.0.1:1161',
            '--agent-udpv6-endpoint=[::1]:1161',
        ],
        stdout=subprocess.DEVNULL,  # discard the simulator's console output
        stderr=subprocess.DEVNULL,
        shell=False,
    )

# Start a Distributed Local Cluster

In [2]:
from dask import delayed
from distributed import Client, LocalCluster, as_completed

# Create the local cluster and its client at most once per kernel session.
# Each name is checked on its own: if constructing the Client fails after the
# LocalCluster was created, re-running the cell reuses that cluster instead of
# spawning a second one and orphaning the first.
if 'cluster' not in globals():
    cluster = LocalCluster()
if 'client' not in globals():
    client = Client(cluster)

# Display the client as the cell output.
client

ValueError: Unable to configure formatter 'verbose'

# Create a DataFrame mimicking a database

In [None]:
from uuid import UUID

import pandas as pd

# Host inventory standing in for a database table. Every record is
# (id, hostname, ip_address, community_string, notes); the notes column labels
# the scenario each row is meant to exercise.
df = pd.DataFrame(
    [
        (
            UUID('3a8083c9-79ff-4ed9-969c-e204cee391b3'),
            'host1',
            'localhost:1161',  # addressed by name rather than by IP
            'recorded/linux-full-walk',
            'DNS Resolution',
        ),
        (
            UUID('83f73383-c7e6-44ff-a063-8c79f339777b'),
            'host2',
            '127.0.0.1',  # no explicit port on this address
            'recorded/linux-full-walk',
            'Timeout',
        ),
        (
            UUID('3cae774a-ec98-4679-b20a-dc1947b0546f'),
            'host3',
            '[::1]:1161',  # bracketed IPv6 address with port
            'recorded/linux-full-walk',
            'IPv6',
        ),
        (
            # same id/hostname as the previous row, so the index is not unique
            UUID('3cae774a-ec98-4679-b20a-dc1947b0546f'),
            'host3',
            '[::1]:1161',
            'recorded/linux-full-walk',
            'Preserved Index',
        ),
    ],
    columns=['id', 'hostname', 'ip_address', 'community_string', 'notes'],
)
# Key the rows by (id, hostname), leaving the remaining three fields as columns.
df = df.set_index(['id', 'hostname'])
df

# Construct a MIB-like Definition of the SNMP Objects to Collect

In [None]:
import numpy as np

from snmp_fetch.var_bind import var_bind
from snmp_fetch.df.functions import astype, set_index, decode

# Definitions are built in an abbreviated hierarchy similar to a MIB.

# Base OID definition for an SNMP table to collect on.
if_table = var_bind(
    oid='1.3.6.1.2.1.2.2.1',
    # numpy dtype of the index on this table
    index=np.dtype([('if_index', np.uint64)]),
    # post-collection processing function that sets the index
    op=set_index('if_index'),
)

# Complete OID definition that extends if_table.
if_admin_status = if_table << var_bind(
    oid='7',
    # numpy dtype of the value of this object
    data=np.dtype([('admin_status', np.uint64)]),
    # post-collection processing function for this value: cast the dtype to a
    # type that supports NaN values in an int column, so the dtype is not
    # implicitly cast to float for missing objects
    op=astype('admin_status', pd.UInt64Dtype()),
)

# Second complete definition under if_table, declared the same way as
# if_admin_status.
if_oper_status = if_table << var_bind(
    oid='8',
    data=np.dtype([('oper_status', np.uint64)]),
    op=astype('oper_status', pd.UInt64Dtype()),
)

# Second base OID definition; it shares the same index as if_table.
ifx_table = var_bind(
    oid='1.3.6.1.2.1.31.1.1.1',
    index=np.dtype([('if_index', np.uint64)]),
    op=set_index('if_index'),
)

# Complete OID definition that extends ifx_table.
ifx_alias = ifx_table << var_bind(
    oid='1',
    # all object values are 64-bit aligned; specifying a string length of one
    # byte less than the aligned value will guarantee a null-terminated string
    data=np.dtype([('alias', 'S255')]),
    # decode the byte strings into python strings
    op=decode('alias'),
)

# All object definitions to be collected.
var_binds = [if_admin_status, if_oper_status, ifx_alias]

# Get Parameters for a Distributed Workload

In [None]:
from snmp_fetch import PduType, SnmpConfig
from snmp_fetch.distributed import distribute, fetch, process_response

# Split the collection job into per-task parameter sets.
task_params = distribute(
    # PDU type to use
    PduType.BULKGET,
    # DataFrame to augment with SNMP data
    df,
    # object definitions to collect
    var_binds,
    # optional SNMP configuration
    config=SnmpConfig(retries=1, timeout=1),
    # optional batch size
    batch_size=1,
    # column containing the host to connect to (default: 'host')
    host='ip_address',
    # column containing the SNMP community string (default: 'snmp_community')
    snmp_community='community_string',
)

# Distribute the Workload and Combine All Results

In [None]:
from toolz.sandbox.core import unzip

# Build one delayed fetch -> process_response chain per entry in task_params.
graph = []

for fetch_params, response_params in task_params:
    response = delayed(fetch)(*fetch_params)
    result = delayed(process_response)(*response_params, response)
    # tasks can be routed to different nodes using dask resources
    # https://distributed.dask.org/en/latest/resources.html
    graph.append(result)

# Run the whole graph on the cluster and split the gathered results into two
# parallel sequences (presumably DataFrames and per-task error lists, going by
# how the cells below consume them).
# unzip() hands back lazy, one-shot iterators; materialize them into lists so
# the cells that consume result_dfs / error_lists can be re-run without finding
# them already exhausted.
result_dfs, error_lists = map(list, unzip(client.gather(client.compute(graph))))

# Display Errors

In [None]:
# Flatten the per-task error collections into one flat list for display.
errors = [
    error
    for task_errors in error_lists
    for error in task_errors
]
errors

# Display Results

In [None]:
# The resulting DataFrame contains the same index and column data as the
# original DataFrame along with the SNMP objects.
# 'if_index' is cast to pd.UInt64Dtype() because index dtypes need to be fixed
# to support NaN.
results = pd.concat(result_dfs, sort=True).pipe(astype('if_index', pd.UInt64Dtype()))
results

# Stop the Simulated SNMP Agent

In [None]:
# Shut the simulator down. The order matters: kill the child process, wait for
# it to exit, then unbind the name. The start cell decides whether to launch an
# agent by checking whether `process` is defined, so deleting it here lets that
# cell start a fresh agent on its next run.
process.kill()
process.communicate()  # blocks until the killed process has terminated
del process