# DEVWKS-3627: Make Python applications faster with asyncio!

## Demo 1: Sequential SSH to network devices with netmiko

In [1]:
import time
import sys
from pprint import pprint
from typing import Any, Dict, Iterable

from colorama import Fore
from netmiko import ConnectHandler

from utils import get_devices_conn_params

# CLI commands collected from every device in the demos.
COMMANDS = [
    'show platform software status control-processor brief',
    'show arp',
]

# Per-device connection parameters as returned by utils.get_devices_conn_params();
# the demos iterate over it as a mapping of device name -> connection kwargs.
devices_conn_info = get_devices_conn_params()
num_devices = len(devices_conn_info)

def get_commands_output(device_conn_params: Dict[str, str], commands: Iterable[str]) -> Dict[str, str]:
    """Run CLI commands on one device over SSH and collect their outputs.

    A single ``ConnectHandler`` context is opened for the device and the
    commands are sent one by one, in the order given. The ``with`` block
    leaves the connection context even if a command raises.

    Args:
        device_conn_params: keyword arguments passed as-is to ``ConnectHandler``.
        commands: CLI commands to send to the device.

    Returns:
        Mapping of each command to the output returned for it.
    """
    # Keys are the command strings themselves, hence Dict[str, str].
    result: Dict[str, str] = {}
    with ConnectHandler(**device_conn_params) as device_conn:
        for command in commands:
            result[command] = device_conn.send_command(command)
    return result

Let's get SSH connection details, collect outputs for every device and print them:

In [2]:
# Poll the devices strictly one after another: each call to
# get_commands_output() has to return before the next device is contacted.
start_time = time.time()

for device in devices_conn_info:
    device_conn_params = devices_conn_info[device]
    device_commands_output = get_commands_output(device_conn_params, COMMANDS)
    print(f"====== {Fore.MAGENTA}Outputs from {device}{Fore.RESET} =======")
    for command in device_commands_output:
        output = device_commands_output[command]
        print(f"output for the command {Fore.GREEN}{command}{Fore.RESET}:\n{output}\n")

# The measured time covers both collecting and printing the outputs.
time_to_run = time.time() - start_time
num_devices = len(devices_conn_info)
print(f"{Fore.RED}It took {time_to_run:.2f} seconds to run for {num_devices} devices {Fore.RESET}")

output for the command [32mshow platform software status control-processor brief[39m:
Load Average
 Slot  Status  1-Min  5-Min 15-Min
  RP0 Healthy   0.60   0.61   0.71

Memory (kB)
 Slot  Status    Total     Used (Pct)     Free (Pct) Committed (Pct)
  RP0 Healthy  3018936  2594760 (86%)   424176 (14%)   3168640 (105%)

CPU Utilization
 Slot  CPU   User System   Nice   Idle    IRQ   SIRQ IOwait
  RP0    0  10.89   6.36   0.00  82.67   0.00   0.06   0.00


output for the command [32mshow arp[39m:
Protocol  Address          Age (min)  Hardware Addr   Type   Interface
Internet  100.64.12.1             -   fa16.3e3f.563a  ARPA   GigabitEthernet3
Internet  100.64.12.2           111   fa16.3ead.651e  ARPA   GigabitEthernet3
Internet  100.64.14.1             -   fa16.3e13.e71d  ARPA   GigabitEthernet4
Internet  100.64.14.4            95   fa16.3ed3.3152  ARPA   GigabitEthernet4
Internet  100.65.1.1              -   fa16.3ee8.1e5b  ARPA   GigabitEthernet2
Internet  192.168.1.1             

output for the command [32mshow platform software status control-processor brief[39m:
Load Average
 Slot  Status  1-Min  5-Min 15-Min
  RP0 Healthy   1.12   1.00   1.00

Memory (kB)
 Slot  Status    Total     Used (Pct)     Free (Pct) Committed (Pct)
  RP0 Healthy  3018936  2572448 (85%)   446488 (15%)   3182284 (105%)

CPU Utilization
 Slot  CPU   User System   Nice   Idle    IRQ   SIRQ IOwait
  RP0    0  15.73   9.11   0.00  75.09   0.00   0.06   0.00


output for the command [32mshow arp[39m:
Protocol  Address          Age (min)  Hardware Addr   Type   Interface
Internet  192.168.1.1             -   001e.f61c.e6bd  ARPA   VirtualPortGroup0
Internet  198.51.100.113         71   fa16.3ed2.e61d  ARPA   GigabitEthernet2
Internet  198.51.100.114          -   fa16.3edd.60c6  ARPA   GigabitEthernet2

output for the command [32mshow platform software status control-processor brief[39m:
Load Average
 Slot  Status  1-Min  5-Min 15-Min
  RP0 Healthy   0.67   0.73   0.73

Memory (kB)
 Slo

## Demo 2: Connect to devices via SSH using netmiko and threading

In [3]:
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def get_commands_output(device_conn_params: Dict[str, str], commands: Iterable[str]) -> Dict[str, str]:
    """Send every command to a single device and return ``{command: output}``.

    The device is reached through one ``ConnectHandler`` context built from
    ``device_conn_params``; commands are sent in iteration order.

    NOTE(review): this repeats an earlier definition of the same name in the
    notebook; the later definition is the one in effect after this cell runs.
    """
    outputs: Dict[str, str] = {}
    with ConnectHandler(**device_conn_params) as ssh:
        # Entries are added one at a time as each command returns.
        outputs.update((cmd, ssh.send_command(cmd)) for cmd in commands)
    return outputs

# Bind the command list once so the pool only has to supply the per-device
# connection parameters.
worker = partial(get_commands_output, commands=COMMANDS)
start_time = time.time()

# Leaving the `with` block waits for all submitted work, so `results` can be
# consumed afterwards; pool.map() yields results in the order of its inputs.
with ThreadPoolExecutor(max_workers=10) as pool:
    results = pool.map(worker, devices_conn_info.values())

for hostname, result in zip(devices_conn_info.keys(), results):
    print(f"====== {Fore.MAGENTA}Outputs from device {hostname} were collected {Fore.RESET}=======")
    for command in result:
        output = result[command]
        print(f"output for the command {Fore.GREEN}{command}{Fore.RESET}:\n{output}\n")

time_to_run = time.time() - start_time
print(f"{Fore.RED}It took {time_to_run:.2f} seconds to run for {num_devices} devices{Fore.RESET}")

    

output for the command [32mshow platform software status control-processor brief[39m:
Load Average
 Slot  Status  1-Min  5-Min 15-Min
  RP0 Healthy   0.66   0.67   0.72

Memory (kB)
 Slot  Status    Total     Used (Pct)     Free (Pct) Committed (Pct)
  RP0 Healthy  3018936  2594728 (86%)   424208 (14%)   3168640 (105%)

CPU Utilization
 Slot  CPU   User System   Nice   Idle    IRQ   SIRQ IOwait
  RP0    0  12.61   6.70   0.00  80.63   0.00   0.04   0.00


output for the command [32mshow arp[39m:
Protocol  Address          Age (min)  Hardware Addr   Type   Interface
Internet  100.64.12.1             -   fa16.3e3f.563a  ARPA   GigabitEthernet3
Internet  100.64.12.2           114   fa16.3ead.651e  ARPA   GigabitEthernet3
Internet  100.64.14.1             -   fa16.3e13.e71d  ARPA   GigabitEthernet4
Internet  100.64.14.4            98   fa16.3ed3.3152  ARPA   GigabitEthernet4
Internet  100.65.1.1              -   fa16.3ee8.1e5b  ARPA   GigabitEthernet2
Internet  192.168.1.1             

## Demo 2b: Connect to devices via SSH using netmiko and threading (race condition)

In [None]:
def get_commands_output_broken(device_conn_params: Dict[str, str], commands: Iterable[str]) -> Dict[str, str]:
    """Collect command outputs from one device, printing a preview as it goes.

    Deliberately "broken" variant used to demonstrate a race condition: the
    function prints from inside the worker, so when several workers run at
    the same time their print calls are not coordinated with each other.

    For every command it prints the device's ``host`` value, the command and
    the first three lines of the output, then stores the full output.

    Returns:
        Mapping of each command to its complete output.
    """
    collected: Dict[str, str] = {}
    with ConnectHandler(**device_conn_params) as ssh:
        for cmd in commands:
            full_output = ssh.send_command(cmd)
            preview_lines = full_output.splitlines()[:3]
            preview = '\n'.join(preview_lines)
            ip_addr = device_conn_params['host']
            print(f"{ip_addr} {cmd}:\n{preview}")
            collected[cmd] = full_output
    return collected

# Same thread-pool run as before, but with the variant that prints from
# inside the worker threads.
worker = partial(get_commands_output_broken, commands=COMMANDS)
start_time = time.time()

with ThreadPoolExecutor(max_workers=10) as pool:
    # Drain the iterator so that any exception raised in a worker surfaces
    # here; the collected results themselves are discarded.
    tuple(pool.map(worker, devices_conn_info.values()))

time_to_run = time.time() - start_time
print(f"{Fore.RED}It took {time_to_run:.2f} seconds to run for {num_devices} devices{Fore.RESET}")


198.18.1.110 show platform software status control-processor brief:
Load Average
 Slot  Status  1-Min  5-Min 15-Min
  RP0 Healthy   0.57   0.70   0.72


## Demo 3: asyncio building blocks

In [None]:
import asyncio
import random
from datetime import datetime

async def greet_cisco_live(num):
    """Sleep for a random 1-5 seconds without blocking, then print a greeting.

    The printed line starts with the current wall-clock time
    (``HH:MM:SS.ffffff``) and mentions both ``num`` and how long this
    coroutine slept.
    """
    delay = random.randint(1, 5)
    # Awaiting asyncio.sleep() hands control back to the event loop, so other
    # coroutines can run while this one waits.
    await asyncio.sleep(delay)
    timestamp = datetime.now().strftime("%H:%M:%S.%f")
    message = (
        f"{timestamp}: Hello Cisco Live! My number is {num} and "
        f"I've been waiting {delay} seconds to tell you that!"
    )
    print(message)

    
# NOTE(review): loop.run_until_complete() raises RuntimeError when the loop is
# already running, which may be the case inside a Jupyter kernel - confirm the
# environment this cell is meant for.
loop = asyncio.get_event_loop()

# Ten greeters are scheduled up front; each one picks its own random delay.
tasks = [
    loop.create_task(greeting)
    for greeting in map(greet_cisco_live, range(10))
]

start_time = datetime.now().strftime("%H:%M:%S.%f")
print(f"{start_time}: Started")
# Block until every scheduled task has finished.
loop.run_until_complete(asyncio.gather(*tasks))
print("Done")



## Demo 4: asyncio SSH using netdev

In [None]:
import asyncio
import netdev

def get_commands_output(device_conn_params: Dict[str, str], commands: Iterable[str]) -> Dict[str, str]:
    """Synchronous collector: one ``ConnectHandler`` context, commands sent in order.

    Returns a dict that maps every command to the output received for it.

    NOTE(review): this is another copy of a function already defined earlier
    in the notebook; the driver code of this demo calls the async variant.
    """
    output_by_command: Dict[str, str] = {}
    with ConnectHandler(**device_conn_params) as session:
        for cmd in commands:
            output_by_command[cmd] = session.send_command(cmd)
    return output_by_command

async def async_get_commands_output(device_conn_params: Dict[str, str], commands: Iterable[str]) -> Dict[str, str]:
    """Asynchronously run CLI commands on one device and collect their outputs.

    Commands are sent one by one over a single ``netdev`` connection; every
    send is awaited, so the event loop can serve other devices meanwhile.

    Args:
        device_conn_params: keyword arguments for ``netdev.create``. The
            ``'global_delay_factor'`` key, if present, is removed from this
            dict IN PLACE before connecting.
        commands: CLI commands to send to the device.

    Returns:
        Mapping of each command to the output returned for it.

    NOTE(review): the in-place removal is kept on purpose - other code that
    passes these same dicts to ``netdev.create`` without filtering appears to
    rely on it. Confirm before switching to a filtered copy.
    """
    result: Dict[str, str] = {}
    # 'global_delay_factor' is not passed on to netdev.create(). The default
    # makes the removal idempotent: calling this twice with the same dict
    # (e.g. re-running the cell) used to raise KeyError on the second call.
    device_conn_params.pop('global_delay_factor', None)
    async with netdev.create(**device_conn_params) as device_conn:
        for command in commands:
            result[command] = await device_conn.send_command(command)
    return result

# NOTE(review): loop.run_until_complete() raises RuntimeError when the loop is
# already running, which may be the case inside a Jupyter kernel - confirm the
# environment this cell is meant for.
loop = asyncio.get_event_loop()

start_time = time.time()

# One task per device; they all run on the same event loop.
tasks = [
    loop.create_task(async_get_commands_output(conn_params, COMMANDS))
    for conn_params in devices_conn_info.values()
]

loop.run_until_complete(asyncio.gather(*tasks))

# `tasks` was built in the iteration order of `devices_conn_info`, so zipping
# the two pairs every hostname with its own task.
for hostname, task in zip(devices_conn_info, tasks):
    print(f"====== {Fore.MAGENTA}Outputs from device {hostname} were collected{Fore.RESET} =======")
    device_outputs = task.result()
    for command in device_outputs:
        output = device_outputs[command]
        print(f"output for the command {Fore.GREEN}{command}{Fore.RESET}:\n{output}\n")

time_to_run = time.time() - start_time
print(f"{Fore.RED}It took {time_to_run:.2f} seconds to run for {num_devices} devices{Fore.RESET}")
    
    

## Demo 5: asyncio REST using aiohttp

In [None]:
import asyncio
import aiohttp

# Base RESTCONF URL template; `{host}` is substituted per device via str.format().
RESTCONF_ROOT = "https://{host}/restconf/data"
# Resource path appended to RESTCONF_ROOT to query memory statistics.
RESTCONF_MEMORY_STATS_ENDPOINT = (
    "/cisco-platform-software/control-processes"
    "/control-process=platform-fru-rp,0,0,-1/memory-stats"
)
# HTTP headers sent with the RESTCONF request (YANG data encoded as JSON).
HEADERS = {
    "Accept": "application/yang-data+json",
    "Content-Type": "application/yang-data+json",
}

async def get_memory_stats(device_conn_params: Dict[str, str]) -> Dict[str, str]:
    """Fetch memory statistics from one device over RESTCONF.

    Reads ``host``, ``username`` and ``password`` from ``device_conn_params``,
    issues a single GET with HTTP basic authentication and returns the decoded
    JSON body of the response.

    The request is sent with ``ssl=False``, which in aiohttp turns off TLS
    certificate verification.
    NOTE(review): presumably acceptable for lab devices only - confirm.
    """
    async with aiohttp.ClientSession() as session:
        host = device_conn_params["host"]
        username = device_conn_params["username"]
        password = device_conn_params["password"]

        url_template = RESTCONF_ROOT + RESTCONF_MEMORY_STATS_ENDPOINT
        url = url_template.format(host=host)
        request_options = {
            "headers": HEADERS,
            "auth": aiohttp.BasicAuth(username, password),
            "ssl": False,
        }
        async with session.get(url, **request_options) as response:
            payload = await response.json()
            return payload

# NOTE(review): loop.run_until_complete() raises RuntimeError when the loop is
# already running, which may be the case inside a Jupyter kernel - confirm the
# environment this cell is meant for.
loop = asyncio.get_event_loop()

start_time = time.time()

# One RESTCONF request per device, all scheduled on the same event loop.
tasks = [
    loop.create_task(get_memory_stats(conn_params))
    for conn_params in devices_conn_info.values()
]

loop.run_until_complete(asyncio.gather(*tasks))

# `tasks` follows the iteration order of `devices_conn_info`.
for hostname, task in zip(devices_conn_info, tasks):
    print(f"====== {Fore.MAGENTA}Memory stats on {hostname}{Fore.RESET} =======")
    memory_stats = task.result()
    pprint(memory_stats)

time_to_run = time.time() - start_time
print(f"{Fore.RED}It took {time_to_run:.2f} seconds to run for {num_devices} devices{Fore.RESET}")

## Demo 6: Check cabling using asyncio and netdev

In [None]:
import asyncio
import csv
import sys

import netdev
from deepdiff import DeepDiff
from ruamel.yaml import YAML

from constants import (
    TOPOLOGY_FILE,
    CDP_COMMAND,
)
from utils import parse_show_cdp_neighbors, normalize_interface_name

# Shared ruamel.yaml instance; the notebook uses it to dump the cabling diff.
yaml = YAML()

def parse_device_cabling() -> Dict[str, Dict[str, Any]]:
    """Read the documented (desired) cabling from the topology CSV file.

    The file at ``TOPOLOGY_FILE`` must provide the columns ``local_hostname``,
    ``local_port``, ``remote_hostname`` and ``remote_port``. Both port names
    are passed through ``normalize_interface_name``.

    Returns:
        A nested mapping of the form::

            {local_hostname: {local_port: {"connected_device": {"name": ..., "port": ...}}}}

        If the same local port of a device appears in several rows, the last
        row wins.

    Raises:
        KeyError: if a row lacks one of the required columns.
    """
    result: Dict[str, Dict[str, Any]] = {}
    # newline="" is what the csv module documentation asks for when a file
    # object is handed to a reader, so the reader does its own newline handling.
    with open(TOPOLOGY_FILE, "r", newline="") as f:
        for row in csv.DictReader(f):
            # Fetch this device's port table, creating it on first sight.
            device_ports = result.setdefault(row["local_hostname"], {})
            local_port = normalize_interface_name(row["local_port"])
            device_ports[local_port] = {
                "connected_device": {
                    "name": row["remote_hostname"],
                    "port": normalize_interface_name(row["remote_port"]),
                }
            }
    return result


async def get_device_neighbors(device_params: Dict[str, str]) -> Dict[str, Dict[str, Any]]:
    """Collect the CDP neighbors of one device over an async SSH session.

    Sends ``CDP_COMMAND`` through a ``netdev`` connection and returns whatever
    ``parse_show_cdp_neighbors`` produces from the command output.

    Args:
        device_params: connection parameters for the device. The dict is not
            modified; a ``'global_delay_factor'`` entry, if present, is left
            out of the arguments given to ``netdev.create``.

    Returns:
        The parsed neighbor information for this device.
    """
    # The notebook's other netdev-based collector removes 'global_delay_factor'
    # before calling netdev.create(); presumably netdev does not accept it.
    # Filter it here as well so this function does not rely on another cell
    # having stripped the key from the shared dicts beforehand.
    netdev_params = {
        key: value
        for key, value in device_params.items()
        if key != "global_delay_factor"
    }
    async with netdev.create(**netdev_params) as device_conn:
        show_cdp_neighbor_output = await device_conn.send_command(CDP_COMMAND)
        return parse_show_cdp_neighbors(show_cdp_neighbor_output)
    

In [None]:
start_time = time.time()

# Desired state: what the topology file says the cabling should be.
desired_cabling = parse_device_cabling()

# NOTE(review): loop.run_until_complete() raises RuntimeError when the loop is
# already running, which may be the case inside a Jupyter kernel - confirm the
# environment this cell is meant for.
loop = asyncio.get_event_loop()

# Actual state: ask every device for its neighbors, all on one event loop.
tasks = [
    loop.create_task(get_device_neighbors(conn_params))
    for conn_params in devices_conn_info.values()
]

loop.run_until_complete(asyncio.gather(*tasks))

# `tasks` follows the iteration order of `devices_conn_info`.
existing_cabling = {
    hostname: task.result()
    for hostname, task in zip(devices_conn_info, tasks)
}

time_to_run = time.time() - start_time
num_devices = len(devices_conn_info)
print(f"{Fore.RED}It took {time_to_run:.2f} seconds to run for {num_devices} devices{Fore.RESET}")

# Compare the documented cabling with what the devices reported.
cabling_errors = DeepDiff(desired_cabling, existing_cabling)
cabling_errors_dict = dict(cabling_errors)
if not cabling_errors:
    print("Cabling 100% coincides with the documentation! Congratulations!\n")
else:
    print("Some cabling errors have been found:")
    yaml.dump(cabling_errors_dict, sys.stdout)

print("======== Existing cabling ========")
pprint(existing_cabling)
print("\n======== Desired cabling ========")
pprint(desired_cabling)


## END