Skip to content

Commit

Permalink
feat(asyncio): update scripts (#127)
Browse files Browse the repository at this point in the history
* fix(anta.reporter): result does not have a host attribute anymore

* feat(anta): add main ANTA coroutine

* feat(scripts): update check-devices.py and clear-counters.py for asyncio

* refactor(anta.inventory): rename _filtered_inventory() to get_inventory() and remove unnecessary code.

* fix(anta): move main coroutine to a dedicated module and clean code

* CI: Run black+isort on .github/scripts/anta-tester.py

* refactor: clean shebangs

* refactor(anta): improve logs during device connection

* refactor(scripts): use asyncio in collect-eos-commands.py

* refactor(scripts): clean clear-counters.py

* refactor(scripts): use asyncio in evpn-blacklist-recovery.py

* refactor(scripts): improve logs in create-devices-inventory-from-cvp.py

* refactor(anta): improve logs

* clean: remove dependencies

* scripts: improved logs

* Update anta/inventory/__init__.py

Co-authored-by: Guillaume Mulocher <gmulocher@arista.com>

* Update anta/inventory/__init__.py

Co-authored-by: Guillaume Mulocher <gmulocher@arista.com>

* fix: implemented comments

* lint: fix pylint errors

* pytest: update tests

* nice typo

* fix linting errors

* Make Sure You Are Square With Your God Before Trying To Merge This

* TODO: Fix later

Co-authored-by: gmuloc <gmulocher@arista.com>
  • Loading branch information
mtache and gmuloc committed Jan 10, 2023
1 parent 6d50ac7 commit 630bff2
Show file tree
Hide file tree
Showing 35 changed files with 766 additions and 854 deletions.
330 changes: 196 additions & 134 deletions .github/scripts/anta-tester.py

Large diffs are not rendered by default.

3 changes: 0 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ repos:
name: Check for Linting error on Python files
description: This hook runs pylint.
types: [python]
additional_dependencies:
- rich
- lazydocs
args:
- -rn # Only display messages
- -sn # Don't display the score
Expand Down
6 changes: 1 addition & 5 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,11 @@
"admin",
"-p",
"admin123",
"-log",
"INFO",
"-i",
"${workspaceFolder}/.personal/avd-lab.yml",
"-c",
"${workspaceFolder}/.personal/ceos-catalog.yml",
"--table",
"--timeout",
"5"
"--table"
]
},
{
Expand Down
4 changes: 1 addition & 3 deletions anta/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#!/usr/bin/python

"""
anta init
"""
Expand All @@ -12,7 +10,7 @@
"Angélique Phillipps",
"Colin MacGiollaEáin",
"Khelil Sator",
"Matthieu Tache",
"Matthieu Tâche",
"Onur Gashi",
"Paul Lavelle",
"Guillaume Mulocher",
Expand Down
231 changes: 82 additions & 149 deletions anta/inventory/__init__.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,23 @@
#!/usr/bin/python
# coding: utf-8 -*-

"""
Inventory Module for ANTA.
"""

import asyncio
import logging
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional

import yaml
from aioeapi.errors import EapiCommandError
from jsonrpclib import Server
from httpx import ConnectError, HTTPError
from netaddr import IPAddress, IPNetwork
from pydantic import ValidationError
from yaml.loader import SafeLoader

from .exceptions import (InventoryIncorrectSchema, InventoryRootKeyErrors,
InventoryUnknownFormat)
from .exceptions import InventoryIncorrectSchema, InventoryRootKeyErrors
from .models import (DEFAULT_TAG, AntaInventoryInput, InventoryDevice,
InventoryDevices)

# pylint: disable=W1309

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


class AntaInventory:
Expand Down Expand Up @@ -94,16 +87,22 @@ class AntaInventory:
"""

# Root key of inventory part of the inventory file
INVENTORY_ROOT_KEY = 'anta_inventory'
INVENTORY_ROOT_KEY = "anta_inventory"
# Supported Output format
INVENTORY_OUTPUT_FORMAT = ["native", "json"]
# HW model definition in show version
HW_MODEL_KEY = "modelName"

# pylint: disable=R0913
def __init__(self, inventory_file: str, username: str, password: str,
enable_password: Optional[str] = None, timeout: Optional[float] = None,
filter_hosts: Optional[List[str]] = None) -> None:
def __init__(
self,
inventory_file: str,
username: str,
password: str,
enable_password: Optional[str] = None,
timeout: Optional[float] = None,
filter_hosts: Optional[List[str]] = None,
) -> None:
"""Class constructor.
Args:
Expand Down Expand Up @@ -166,55 +165,35 @@ def _is_ip_exist(self, ip: str) -> bool: # TODO mtache: unused, remove this ?
== 1
)

async def _is_device_online(self, device: InventoryDevice) -> bool:
"""
_is_device_online Check if device is online.
Checks the target device to ensure that the eAPI port is
open and accepting connections.
If device is ready to serve request, method returns True, else return False.
Args:
device (InventoryDevice): InventoryDevice structure to test
Returns:
bool: True if device ready, False by default.
"""
logger.debug(f'Checking connection to device {device.name}')
# Check connectivity
online = await device.session.check_connection()
# pylint: disable=W0703
if not online:
logger.warning(f'Cannot open port to {device.name}')
return False
return True

###########################################################################
# Internal methods
###########################################################################

async def _read_device_hw(self, device: InventoryDevice) -> Optional[str]:
async def _read_device_hw(self, device: InventoryDevice) -> None:
"""
_read_device_hw Read HW model from the device and update entry with correct value.
It returns HW model name from show version or None if device is not reachable
or if it cannot find the modelName key
_read_device_hw Get HW model name from show version and update the hw_model attribute.
Args:
device (InventoryDevice): Device to update
Returns:
str: HW value read from the device using show version.
"""
logger.debug(f'Reading HW information for {device.name}')
logger.debug(f"Reading HW information for {device.name}")
try:
response = await device.session.cli(command='show version')
# pylint: disable=W0703
response = await device.session.cli(command="show version")
except EapiCommandError as e:
logger.warning(f'Cannot run CLI commands on device {device.name}: {str(e)}')
return None
logger.warning(
f"Cannot get HW information from device {device.name}: {e.errmsg}"
)
except (HTTPError, ConnectError) as e:
logger.warning(
f"Cannot get HW information from device {device.name}: {type(e).__name__}{'' if not str(e) else f' ({str(e)})'}"
)
else:
return response[self.HW_MODEL_KEY] if self.HW_MODEL_KEY in response else None
if self.HW_MODEL_KEY in response:
device.hw_model = response[self.HW_MODEL_KEY]
else:
logger.warning(
f"Cannot get HW information from device {device.name}: cannot parse 'show version'"
)

async def _refresh_device_fact(self, device: InventoryDevice) -> None:
"""
Expand All @@ -231,15 +210,23 @@ async def _refresh_device_fact(self, device: InventoryDevice) -> None:
Returns:
InventoryDevice: Updated structure with devices information
"""
logger.debug(f'Refreshing device {device.name}')
device.is_online, hw_model = await asyncio.gather(self._is_device_online(device=device), self._read_device_hw(device=device))
if device.is_online and hw_model:
device.established = True
device.hw_model = hw_model
logger.debug(f"Refreshing device {device.name}")
device.is_online = await device.session.check_connection()
if device.is_online:
await self._read_device_hw(device=device)
else:
device.established = False
logger.warning(
f"Could not connect to device {device.name}: cannot open eAPI port"
)
device.established = bool(device.is_online and device.hw_model)

def _add_device_to_inventory(self, host: str, port: Optional[int] = None, name: Optional[str] = None, tags: Optional[List[str]] = None) -> None:
def _add_device_to_inventory(
self,
host: str,
port: Optional[int] = None,
name: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> None:
"""Add a InventoryDevice to final inventory.
Create InventoryDevice and append to existing inventory
Expand All @@ -250,20 +237,20 @@ def _add_device_to_inventory(self, host: str, port: Optional[int] = None, name:
name (str): Optional name of the device
"""
kwargs: Dict[str, Any] = {
'host': host,
'username': self._username,
'password': self._password,
"host": host,
"username": self._username,
"password": self._password,
}
if name:
kwargs['name'] = name
kwargs["name"] = name
if port:
kwargs['port'] = port
kwargs["port"] = port
if self._enable_password:
kwargs['enable_password'] = self._enable_password
kwargs["enable_password"] = self._enable_password
if tags:
kwargs['tags'] = tags
kwargs["tags"] = tags
if self.timeout:
kwargs['timeout'] = self.timeout
kwargs["timeout"] = self.timeout
device = InventoryDevice(**kwargs)
self._inventory.append(device)

Expand All @@ -274,7 +261,9 @@ def _inventory_read_hosts(self) -> None:
"""
assert self._read_inventory.hosts is not None
for host in self._read_inventory.hosts:
self._add_device_to_inventory(host.host, host.port, host.name, tags=host.tags)
self._add_device_to_inventory(
host.host, host.port, host.name, tags=host.tags
)

def _inventory_read_networks(self) -> None:
"""Read input data from networks section and create inventory structure.
Expand All @@ -299,30 +288,6 @@ def _inventory_read_ranges(self) -> None:
self._add_device_to_inventory(str(range_increment), tags=range_def.tags)
range_increment += 1

def _filtered_inventory(self, established_only: bool = False, tags: Optional[List[str]] = None) -> InventoryDevices:
"""
_filtered_inventory Generate a temporary inventory filtered.
Args:
established_only (bool, optional): Do we have to include non-established devices. Defaults to False.
tags (List[str], optional): List of tags to use to filter devices. Default is [default].
Returns:
InventoryDevices: A inventory with concerned devices
"""
inventory_filtered_tags = InventoryDevices()
for device in self._inventory:
if tags and any(tag in tags for tag in device.tags):
inventory_filtered_tags.append(device)
if not established_only:
return inventory_filtered_tags

inventory_final = InventoryDevices()
for device in inventory_filtered_tags:
if device.established:
inventory_final.append(device)
return inventory_final

###########################################################################
# Public methods
###########################################################################
Expand All @@ -331,81 +296,49 @@ def _filtered_inventory(self, established_only: bool = False, tags: Optional[Lis
# GET methods
###########################################################################

# TODO refactor this to avoid having a union of return of types ..
def get_inventory(
self,
format_out: str = "native",
established_only: bool = True,
tags: Optional[List[str]] = None,
) -> Union[List[InventoryDevice], str, InventoryDevices]:
"""get_inventory Expose device inventory.
Provides inventory has a list of InventoryDevice objects. If requried, it can be exposed in JSON format. Also, by default expose only active devices.
self, established_only: bool = False, tags: Optional[List[str]] = None
) -> InventoryDevices:
"""
get_inventory Returns a new filtered inventory.
Args:
format (str, optional): Format output, can be native, list or JSON. Defaults to 'native'.
established_only (bool, optional): Allow to expose also unreachable devices. Defaults to True.
established_only (bool, optional): Whether or not including non-established devices in the Inventory.
Default False.
tags (List[str], optional): List of tags to use to filter devices. Default is [default].
Returns:
InventoryDevices: List of InventoryDevice
InventoryDevices: An inventory with concerned devices
"""
if tags is None:
tags = [DEFAULT_TAG]

if format_out not in ["native", "json", "list"]:
raise InventoryUnknownFormat(
f"Unsupported inventory format: {format_out}. Only supported format are: {self.INVENTORY_OUTPUT_FORMAT}"
)

inventory = self._filtered_inventory(established_only, tags)

if format_out == "list":
# pylint: disable=R1721
return [dev for dev in inventory]

if format_out == 'json':
return inventory.json(exclude={'__root__': {'__all__': {'session'}}})

return inventory

def get_device(self, host_ip: str) -> Optional[InventoryDevice]: # TODO mtache: unused, remove this ?
"""Get device information from a given IP.
Args:
host_ip (str): IP address of the device
Returns:
InventoryDevice: Device information
"""
if self._is_ip_exist(host_ip):
return [dev for dev in self._inventory if str(dev.host) == str(host_ip)][0]
return None

def get_device_session(self, host_ip: str) -> Server: # TODO mtache: unused, remove this ?
"""Expose RPC session of a given host from our inventory.
Provide RPC session if the session exists, if not, it returns None
Args:
host_ip (str): IP address of the host to match
inventory_filtered_tags = InventoryDevices()
for device in self._inventory:
if tags and any(tag in tags for tag in device.tags):
inventory_filtered_tags.append(device)
if not established_only:
return inventory_filtered_tags

Returns:
jsonrpclib.Server: Instance to the device. None if session does not exist
"""
device = self.get_device(host_ip=host_ip)
if device is None:
return None
return device.session
inventory_final = InventoryDevices()
for device in inventory_filtered_tags:
if device.established:
inventory_final.append(device)
return inventory_final

###########################################################################
# MISC methods
###########################################################################

async def connect_inventory(self) -> None:
"""connect_inventory Helper to prepare inventory with network data."""
logger.debug('Refreshing facts for current inventory')
results = await asyncio.gather(*(self._refresh_device_fact(device) for device in self._inventory), return_exceptions=True)
logger.debug("Refreshing facts for current inventory")
results = await asyncio.gather(
*(self._refresh_device_fact(device) for device in self._inventory),
return_exceptions=True,
)
for r in results:
if isinstance(r, Exception):
logger.error(f"Error when connecting to device: {r.__class__.__name__}: {r}")
logger.error(
f"Error when initiating inventory: {r.__class__.__name__}{'' if not str(r) else f' ({str(r)})'}"
)

0 comments on commit 630bff2

Please sign in to comment.