# DEVNET-3627: Make Python applications faster with asyncio!

## Demo 1: Sequential SSH to network devices with netmiko

In [1]:
import time
import sys
from pprint import pprint
from typing import Any, Dict, Iterable

from netmiko import ConnectHandler

from utils import get_devices_conn_params

# CLI commands that every demo below sends to each device.
COMMANDS = [
    'show platform software status control-processor brief',
    'show arp',
]

# Inventory returned by the project helper; the demos iterate it as a mapping of
# device name -> connection parameters.
devices_conn_info = get_devices_conn_params()
num_devices = len(devices_conn_info)

def get_commands_output(device_conn_params: Dict[str, str], commands: Iterable[str]) -> Dict[str, str]:
    """Run every command on a single device and return the outputs keyed by command.

    ``device_conn_params`` is unpacked into ``ConnectHandler`` as keyword
    arguments. The commands are sent one after another over that single
    connection, inside a ``with`` block.
    """
    with ConnectHandler(**device_conn_params) as ssh_session:
        return {cmd: ssh_session.send_command(cmd) for cmd in commands}

Let's get SSH connection details, collect outputs for every device and print them:

In [2]:
# perf_counter() is a monotonic, high-resolution clock, so the measured duration
# cannot be skewed by wall-clock adjustments the way time.time() can.
start_time = time.perf_counter()

# Devices are polled strictly one after another: each call returns only after
# all commands for that device have been collected.
for device, device_conn_params in devices_conn_info.items():
    device_commands_output = get_commands_output(device_conn_params, COMMANDS)
    print(f"====== Outputs from {device} =======")
    for command, output in device_commands_output.items():
        print(f"output for the command **{command}**:\n{output}\n")

time_to_run = time.perf_counter() - start_time
num_devices = len(devices_conn_info)
print(f"It took {time_to_run:.2f} seconds to run for {num_devices} devices")

output for the command **show platform software status control-processor brief**:
Load Average
 Slot  Status  1-Min  5-Min 15-Min
  RP0 Healthy   0.65   0.67   0.71

Memory (kB)
 Slot  Status    Total     Used (Pct)     Free (Pct) Committed (Pct)
  RP0 Healthy  3018936  2593620 (86%)   425316 (14%)   3181900 (105%)

CPU Utilization
 Slot  CPU   User System   Nice   Idle    IRQ   SIRQ IOwait
  RP0    0  14.14   9.43   0.00  76.26   0.00   0.14   0.00


output for the command **show arp**:
Protocol  Address          Age (min)  Hardware Addr   Type   Interface
Internet  100.64.12.1             -   fa16.3ed6.768b  ARPA   GigabitEthernet3
Internet  100.64.12.2           230   fa16.3eab.de82  ARPA   GigabitEthernet3
Internet  100.64.14.1             -   fa16.3ee5.6002  ARPA   GigabitEthernet4
Internet  100.64.14.4           217   fa16.3e4b.cfbb  ARPA   GigabitEthernet4
Internet  100.65.1.1              -   fa16.3e70.284c  ARPA   GigabitEthernet2
Internet  100.65.1.11             0   Incomple

output for the command **show platform software status control-processor brief**:
Load Average
 Slot  Status  1-Min  5-Min 15-Min
  RP0 Healthy   0.68   0.73   0.78

Memory (kB)
 Slot  Status    Total     Used (Pct)     Free (Pct) Committed (Pct)
  RP0 Healthy  3018936  2618096 (87%)   400840 (13%)   3182280 (105%)

CPU Utilization
 Slot  CPU   User System   Nice   Idle    IRQ   SIRQ IOwait
  RP0    0  15.16   7.33   0.00  77.41   0.00   0.08   0.00


output for the command **show arp**:
Protocol  Address          Age (min)  Hardware Addr   Type   Interface
Internet  192.168.1.1             -   001e.7a3f.01bd  ARPA   VirtualPortGroup0
Internet  198.51.100.113        197   fa16.3e68.493d  ARPA   GigabitEthernet2
Internet  198.51.100.114          -   fa16.3e65.ce99  ARPA   GigabitEthernet2

output for the command **show platform software status control-processor brief**:
Load Average
 Slot  Status  1-Min  5-Min 15-Min
  RP0 Healthy   0.67   1.64   1.71

Memory (kB)
 Slot  Status    Total

## Demo 2: Connect to devices via SSH using netmiko and threading

In [3]:
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def get_commands_output(device_conn_params: Dict[str, str], commands: Iterable[str]) -> Dict[str, str]:
    """Run every command on a single device and return the outputs keyed by command.

    This is the blocking netmiko helper that each worker thread executes for
    one device: ``device_conn_params`` is unpacked into ``ConnectHandler`` and
    the commands are sent one after another over that connection.
    """
    with ConnectHandler(**device_conn_params) as ssh_session:
        return {cmd: ssh_session.send_command(cmd) for cmd in commands}

# Bind the command list so the pool only has to supply the per-device parameters.
worker = partial(get_commands_output, commands=COMMANDS)
# perf_counter() is monotonic, so the elapsed time is immune to wall-clock changes.
start_time = time.perf_counter()

# Leaving the ``with`` block waits for every submitted call to finish.
with ThreadPoolExecutor(max_workers=10) as pool:
    results = pool.map(worker, devices_conn_info.values())

# map() yields results in the order the inputs were given, so they line up
# with the device names when zipped with the inventory.
for hostname, result in zip(devices_conn_info, results):
    print(f"====== Outputs from device {hostname} were collected =======")
    for command, output in result.items():
        print(f"output for the command **{command}**:\n{output}\n")

time_to_run = time.perf_counter() - start_time
print(f"It took {time_to_run:.2f} seconds to run for {num_devices} devices")

    

output for the command **show platform software status control-processor brief**:
Load Average
 Slot  Status  1-Min  5-Min 15-Min
  RP0 Healthy   0.86   0.73   0.74

Memory (kB)
 Slot  Status    Total     Used (Pct)     Free (Pct) Committed (Pct)
  RP0 Healthy  3018936  2593748 (86%)   425188 (14%)   3181900 (105%)

CPU Utilization
 Slot  CPU   User System   Nice   Idle    IRQ   SIRQ IOwait
  RP0    0  12.79   8.04   0.00  78.99   0.00   0.16   0.00


output for the command **show arp**:
Protocol  Address          Age (min)  Hardware Addr   Type   Interface
Internet  100.64.12.1             -   fa16.3ed6.768b  ARPA   GigabitEthernet3
Internet  100.64.12.2           232   fa16.3eab.de82  ARPA   GigabitEthernet3
Internet  100.64.14.1             -   fa16.3ee5.6002  ARPA   GigabitEthernet4
Internet  100.64.14.4           219   fa16.3e4b.cfbb  ARPA   GigabitEthernet4
Internet  100.65.1.1              -   fa16.3e70.284c  ARPA   GigabitEthernet2
Internet  100.65.1.11             0   Incomple

## Demo 2b: Connect to devices via SSH using netmiko and threading (race condition)

In [4]:
def get_commands_output_broken(device_conn_params: Dict[str, str], commands: Iterable[str]) -> Dict[str, str]:
    """Run every command on one device, print a preview of each output, and return all outputs.

    Deliberately "broken" variant that demonstrates a race condition: the
    preview is printed from inside the function, so when several threads run
    it at once their prints compete for stdout and appear in an
    unpredictable order.

    Returns a dict mapping each command to its full output.
    """
    outputs: Dict[str, str] = {}
    with ConnectHandler(**device_conn_params) as ssh_session:
        for cmd in commands:
            outputs[cmd] = ssh_session.send_command(cmd)
            # Only the first three lines are shown to keep the demo output short.
            preview = '\n'.join(outputs[cmd].splitlines()[:3])
            print(f"{device_conn_params['host']} - {cmd}:\n{preview}")
    return outputs

worker = partial(get_commands_output_broken, commands=COMMANDS)
# perf_counter() is monotonic, so the elapsed time is immune to wall-clock changes.
start_time = time.perf_counter()

with ThreadPoolExecutor(max_workers=10) as pool:
    # Drain the iterator: map() re-raises a worker's exception only when its
    # result is retrieved, so discarding the iterator would hide failed devices.
    broken_results = list(pool.map(worker, devices_conn_info.values()))

time_to_run = time.perf_counter() - start_time
print(f"It took {time_to_run:.2f} seconds to run for {num_devices} devices")


198.18.1.107 - show platform software status control-processor brief:
Load Average
 Slot  Status  1-Min  5-Min 15-Min
  RP0 Healthy   0.94   1.04   1.01
198.18.1.107 - show arp:
Protocol  Address          Age (min)  Hardware Addr   Type   Interface
Internet  192.168.1.1             -   001e.14f8.43bd  ARPA   VirtualPortGroup0
Internet  198.51.100.105        197   fa16.3e34.b5d4  ARPA   GigabitEthernet2
198.18.1.108 - show platform software status control-processor brief:
Load Average
 Slot  Status  1-Min  5-Min 15-Min
  RP0 Healthy   1.28   1.13   1.10
198.18.1.110 - show platform software status control-processor brief:
Load Average
 Slot  Status  1-Min  5-Min 15-Min
  RP0 Healthy   0.64   1.44   1.63
198.18.1.103 - show platform software status control-processor brief:
Load Average
 Slot  Status  1-Min  5-Min 15-Min
  RP0 Healthy   0.81   0.82   0.89
198.18.1.102 - show platform software status control-processor brief:
Load Average
 Slot  Status  1-Min  5-Min 15-Min
  RP0 Healthy   0

## Demo 3: asyncio building blocks

In [17]:
import asyncio
import random
from datetime import datetime

async def greet_cisco_live(num):
    """Sleep for a random 1-5 seconds without blocking the loop, then print a timestamped greeting.

    ``num`` only identifies the coroutine in the printed message.
    """
    delay = random.randint(1, 5)
    # Awaiting the sleep hands control back to the event loop so other tasks can run.
    await asyncio.sleep(delay)
    timestamp = datetime.now().strftime("%H:%M:%S.%f")
    greeting = (
        f"Hello CLUS! My number is {num} and "
        f"I've been waiting {delay} seconds to tell you that!"
    )
    print(f"{timestamp}: {greeting}")

    
# NOTE(review): run_until_complete() needs a loop that is not already running;
# in a kernel that already runs an event loop, `await asyncio.gather(*tasks)`
# would be needed instead - confirm against the target environment.
loop = asyncio.get_event_loop()

# Schedule all ten greetings up front so that their sleeps overlap.
tasks = [
    loop.create_task(greet_cisco_live(num))
    for num in range(10)
]

start_time = datetime.now().strftime("%H:%M:%S.%f")
print(f"{start_time}: Started")
# gather() re-raises the first exception raised by any task. asyncio.wait()
# only stores it on the task, where it went unnoticed because the result of
# the wait was discarded.
_ = loop.run_until_complete(asyncio.gather(*tasks))



12:41:51.305595: started
12:41:52.309165: Hello CLUS! My number is 8 and I've been waiting 1 seconds to tell you that!
12:41:53.309206: Hello CLUS! My number is 0 and I've been waiting 2 seconds to tell you that!
12:41:53.309396: Hello CLUS! My number is 4 and I've been waiting 2 seconds to tell you that!
12:41:54.308465: Hello CLUS! My number is 1 and I've been waiting 3 seconds to tell you that!
12:41:54.308678: Hello CLUS! My number is 2 and I've been waiting 3 seconds to tell you that!
12:41:54.308742: Hello CLUS! My number is 5 and I've been waiting 3 seconds to tell you that!
12:41:54.308796: Hello CLUS! My number is 6 and I've been waiting 3 seconds to tell you that!
12:41:56.310142: Hello CLUS! My number is 3 and I've been waiting 5 seconds to tell you that!
12:41:56.310300: Hello CLUS! My number is 7 and I've been waiting 5 seconds to tell you that!
12:41:56.310343: Hello CLUS! My number is 9 and I've been waiting 5 seconds to tell you that!


## Demo 4: asyncio SSH using netdev

In [18]:
import asyncio
import netdev

def get_commands_output(device_conn_params: Dict[str, str], commands: Iterable[str]) -> Dict[str, str]:
    """Run every command on a single device and return the outputs keyed by command.

    Blocking netmiko variant: ``device_conn_params`` is unpacked into
    ``ConnectHandler`` and the commands are sent one after another over that
    connection. Presumably repeated here for comparison with the asynchronous
    variant defined next to it.
    """
    with ConnectHandler(**device_conn_params) as ssh_session:
        return {cmd: ssh_session.send_command(cmd) for cmd in commands}

async def async_get_commands_output(device_conn_params: Dict[str, str], commands: Iterable[str]) -> Dict[str, str]:
    """Asynchronously run every command on one device and return the outputs keyed by command.

    ``device_conn_params`` is unpacked into ``netdev.create``. Each command is
    awaited in turn over that one connection, so commands for a single device
    stay sequential; other tasks can run while this one waits.
    """
    async with netdev.create(**device_conn_params) as ssh_session:
        return {cmd: await ssh_session.send_command(cmd) for cmd in commands}

loop = asyncio.get_event_loop()

# perf_counter() is monotonic, so the elapsed time is immune to wall-clock changes.
start_time = time.perf_counter()

# One task per device; all SSH sessions progress concurrently on the event loop.
tasks = [
    loop.create_task(async_get_commands_output(device_conn_params, COMMANDS))
    for device_conn_params in devices_conn_info.values()
]

# asyncio.wait() raises ValueError for an empty collection, so skip it when
# the inventory contains no devices.
if tasks:
    loop.run_until_complete(asyncio.wait(tasks))

# task.result() returns the collected outputs, or re-raises the exception the
# task failed with.
for hostname, task in zip(devices_conn_info, tasks):
    print(f"====== Outputs from device {hostname} were collected =======")
    for command, output in task.result().items():
        print(f"output for the command **{command}**:\n{output}\n")

time_to_run = time.perf_counter() - start_time
print(f"It took {time_to_run:.2f} seconds to run for {num_devices} devices")
    
    

output for the command **show platform software status control-processor brief**:
Load Average
 Slot  Status  1-Min  5-Min 15-Min
  RP0 Healthy   0.69   0.78   0.79

Memory (kB)
 Slot  Status    Total     Used (Pct)     Free (Pct) Committed (Pct)
  RP0 Healthy  3018936  2592916 (86%)   426020 (14%)   3169580 (105%)

CPU Utilization
 Slot  CPU   User System   Nice   Idle    IRQ   SIRQ IOwait
  RP0    0  11.91   7.67   0.00  80.32   0.00   0.08   0.00


output for the command **show arp**:
Protocol  Address          Age (min)  Hardware Addr   Type   Interface
Internet  100.64.12.1             -   fa16.3ed6.768b  ARPA   GigabitEthernet3
Internet  100.64.12.2           241   fa16.3eab.de82  ARPA   GigabitEthernet3
Internet  100.64.14.1             -   fa16.3ee5.6002  ARPA   GigabitEthernet4
Internet  100.64.14.4           228   fa16.3e4b.cfbb  ARPA   GigabitEthernet4
Internet  100.65.1.1              -   fa16.3e70.284c  ARPA   GigabitEthernet2
Internet  192.168.1.1             -   001e.f6f

## Demo 5: asyncio REST using aiohttp

In [19]:
import asyncio
import aiohttp

# Base RESTCONF URL; the ``{host}`` placeholder is filled in per device.
RESTCONF_ROOT = "https://{host}/restconf/data"
# Path, relative to the RESTCONF root, of the memory statistics resource.
RESTCONF_MEMORY_STATS_ENDPOINT = (
    "/cisco-platform-software/control-processes"
    "/control-process=platform-fru-rp,0,0,-1/memory-stats"
)
# Request and accept YANG data encoded as JSON.
HEADERS = {
    "Accept": "application/yang-data+json",
    "Content-Type": "application/yang-data+json",
}

async def get_memory_stats(device_conn_params: Dict[str, str]) -> Dict[str, Any]:
    """Fetch memory statistics from one device over RESTCONF.

    Args:
        device_conn_params: must contain the ``host``, ``username`` and
            ``password`` keys; HTTP basic authentication is used.

    Returns:
        The decoded JSON body of the response.

    Raises:
        KeyError: if a required connection parameter is missing.
        aiohttp.ClientResponseError: if the device answers with an HTTP error
            status (for example wrong credentials or an unknown resource).
    """
    host = device_conn_params["host"]
    credentials = aiohttp.BasicAuth(
        device_conn_params["username"],
        device_conn_params["password"],
    )
    url = (RESTCONF_ROOT + RESTCONF_MEMORY_STATS_ENDPOINT).format(host=host)

    async with aiohttp.ClientSession() as session:
        # ssl=False turns off TLS certificate verification.
        # NOTE(review): presumably because the lab devices use self-signed
        # certificates; do not reuse this setting outside a lab.
        async with session.get(url, headers=HEADERS, auth=credentials, ssl=False) as response:
            # Fail loudly on 4xx/5xx instead of handing an error document back
            # to the caller as if it were memory statistics.
            response.raise_for_status()
            return await response.json()

loop = asyncio.get_event_loop()

# perf_counter() is monotonic, so the elapsed time is immune to wall-clock changes.
start_time = time.perf_counter()

# One RESTCONF request per device, all in flight concurrently.
tasks = [
    loop.create_task(get_memory_stats(device_conn_params))
    for device_conn_params in devices_conn_info.values()
]

# asyncio.wait() raises ValueError for an empty collection, so skip it when
# the inventory contains no devices.
if tasks:
    loop.run_until_complete(asyncio.wait(tasks))

# task.result() returns the decoded response, or re-raises the exception the
# task failed with.
for hostname, task in zip(devices_conn_info, tasks):
    print(f"====== Memory stats on {hostname} =======")
    pprint(task.result())

time_to_run = time.perf_counter() - start_time
print(f"It took {time_to_run:.2f} seconds to run for {num_devices} devices")

{'Cisco-IOS-XE-platform-software-oper:memory-stats': {'available-number': '426016',
                                                      'available-percent': '14',
                                                      'committed-number': '3169580',
                                                      'committed-percent': 105,
                                                      'free-number': '426016',
                                                      'free-percent': '14',
                                                      'memory-status': 'Healthy',
                                                      'status': {'critical-threshold-percent': 93,
                                                      'total': '3018936',
                                                      'used-number': '2592920',
                                                      'used-percent': '86'}}
{'Cisco-IOS-XE-platform-software-oper:memory-stats': {'available-number': '426676',
                   

## Demo 6: Check cabling using asyncio and netdev

In [22]:
import asyncio
import csv
import sys

import netdev
from deepdiff import DeepDiff
from ruamel.yaml import YAML

from constants import (
    TOPOLOGY_FILE,
    CDP_COMMAND,
)
from utils import parse_show_cdp_neighbors, normalize_interface_name

# Module-level ruamel.yaml serializer; used further down to dump the cabling
# differences to stdout.
yaml = YAML()

def parse_device_cabling() -> Dict[str, Dict[str, Any]]:
    """Read the topology CSV and return the documented cabling per device.

    The file at ``TOPOLOGY_FILE`` must provide the columns ``local_hostname``,
    ``local_port``, ``remote_hostname`` and ``remote_port``.

    Returns:
        A mapping of the form
        ``{local_hostname: {local_port: {"connected_device":
        {"name": remote_hostname, "port": remote_port}}}}``.
        Both port names are passed through ``normalize_interface_name``.
        If a local port appears in several rows, the last row wins.

    Raises:
        OSError: if the topology file cannot be opened.
        KeyError: if a required column is missing.
    """
    result: Dict[str, Dict[str, Any]] = {}
    # newline="" is what the csv module asks for, so that line endings inside
    # quoted fields reach the parser untouched.
    with open(TOPOLOGY_FILE, "r", newline="") as f:
        for row in csv.DictReader(f):
            # Get the per-device dict, creating it the first time the device is seen.
            device = result.setdefault(row["local_hostname"], {})
            local_port = normalize_interface_name(row["local_port"])
            device[local_port] = {
                "connected_device": {
                    "name": row["remote_hostname"],
                    "port": normalize_interface_name(row["remote_port"]),
                }
            }
    return result


async def get_device_neighbors(device_params: Dict[str, str]) -> Dict[str, Dict[str, Any]]:
    """Collect the CDP neighbors of one device.

    Connects with ``netdev`` using ``device_params``, sends ``CDP_COMMAND`` and
    returns the output as parsed by ``parse_show_cdp_neighbors``.
    """
    async with netdev.create(**device_params) as ssh_session:
        return parse_show_cdp_neighbors(await ssh_session.send_command(CDP_COMMAND))
    

In [25]:
# perf_counter() is monotonic, so the elapsed time is immune to wall-clock changes.
start_time = time.perf_counter()

# Documented cabling, read from the topology file.
desired_cabling = parse_device_cabling()

loop = asyncio.get_event_loop()

# One CDP query per device, all running concurrently on the event loop.
tasks = [
    loop.create_task(get_device_neighbors(device_params))
    for device_params in devices_conn_info.values()
]

# asyncio.wait() raises ValueError for an empty collection, so skip it when
# the inventory contains no devices.
if tasks:
    loop.run_until_complete(asyncio.wait(tasks))

# Tasks were created in inventory order, so zipping pairs each result with its
# device name; task.result() re-raises if that device's query failed.
existing_cabling = {
    hostname: task.result()
    for hostname, task in zip(devices_conn_info, tasks)
}

time_to_run = time.perf_counter() - start_time
num_devices = len(devices_conn_info)
print(f"It took {time_to_run:.2f} seconds to run for {num_devices} devices")

# An empty diff is falsy, meaning the discovered cabling matches the documentation.
cabling_errors = DeepDiff(desired_cabling, existing_cabling)
cabling_errors_dict = dict(cabling_errors)
if cabling_errors:
    print("Some cabling errors have been found:")
    yaml.dump(cabling_errors_dict, sys.stdout)
else:
    print("Cabling 100% coincides with the documentation! Congratulations!\n")

print("======== Existing cabling ========")
pprint(existing_cabling)
print("\n======== Desired cabling ========")
pprint(desired_cabling)


It took 9.53 seconds to run for 10 devices
Cabling 100% coincides with the documentation! Congratulations!

Existing cabling:
{'R1': {'GigabitEthernet3': {'connected_device': {'name': 'R2',
                                                  'port': 'GigabitEthernet2'}},
        'GigabitEthernet4': {'connected_device': {'name': 'R4',
                                                  'port': 'GigabitEthernet2'}}},
 'R10': {'GigabitEthernet2': {'connected_device': {'name': 'R8',
                                                   'port': 'GigabitEthernet3'}}},
 'R2': {'GigabitEthernet2': {'connected_device': {'name': 'R1',
                                                  'port': 'GigabitEthernet3'}},
        'GigabitEthernet3': {'connected_device': {'name': 'R3',
                                                  'port': 'GigabitEthernet2'}}},
 'R3': {'GigabitEthernet2': {'connected_device': {'name': 'R2',
                                                  'port': 'GigabitEthernet3'}},
     

## END