Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(anta.cli): Make CV certs verified by default #700

Merged
merged 11 commits into from
Jun 10, 2024
22 changes: 18 additions & 4 deletions anta/cli/get/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from typing import TYPE_CHECKING, Any

import click
import requests
from cvprac.cvp_client import CvpClient
from cvprac.cvp_client_errors import CvpApiError
from rich.pretty import pretty_repr
Expand All @@ -36,14 +37,27 @@
@click.option("--username", "-u", help="CloudVision username", type=str, required=True)
@click.option("--password", "-p", help="CloudVision password", type=str, required=True)
@click.option("--container", "-c", help="CloudVision container where devices are configured", type=str)
def from_cvp(ctx: click.Context, output: Path, host: str, username: str, password: str, container: str | None) -> None:
@click.option(
"--ignore-cert",
help="Ignore verifying the SSL certificate when connecting to CloudVision",
show_envvar=True,
is_flag=True,
default=False,
)
def from_cvp(ctx: click.Context, output: Path, host: str, username: str, password: str, container: str | None, *, ignore_cert: bool) -> None:
# pylint: disable=too-many-arguments
"""Build ANTA inventory from Cloudvision.
"""Build ANTA inventory from CloudVision.

TODO - handle get_inventory and get_devices_in_container failure
NOTE: Only username/password authentication is supported for on-premises CloudVision instances.
Token authentication for both on-premises and CloudVision as a Service (CVaaS) is not supported.
"""
# TODO: - Handle get_cv_token, get_inventory and get_devices_in_container failures.
logger.info("Getting authentication token for user '%s' from CloudVision instance '%s'", username, host)
token = get_cv_token(cvp_ip=host, cvp_username=username, cvp_password=password)
try:
token = get_cv_token(cvp_ip=host, cvp_username=username, cvp_password=password, verify_cert=not ignore_cert)
except requests.exceptions.SSLError as error:
logger.error("Authentication to CloudVison failed: %s.", error)
ctx.exit(ExitCode.USAGE_ERROR)

clnt = CvpClient()
try:
Expand Down
27 changes: 22 additions & 5 deletions anta/cli/get/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,24 +77,41 @@ def wrapper(
return wrapper


def get_cv_token(cvp_ip: str, cvp_username: str, cvp_password: str) -> str:
"""Generate AUTH token from CVP using password."""
# TODO: need to handle requests error
def get_cv_token(cvp_ip: str, cvp_username: str, cvp_password: str, *, verify_cert: bool) -> str:
"""Generate the authentication token from CloudVision using username and password.

TODO: need to handle requests error

Args:
----
cvp_ip: IP address of CloudVision.
cvp_username: Username to connect to CloudVision.
cvp_password: Password to connect to CloudVision.
verify_cert: Enable or disable certificate verification when connecting to CloudVision.

Returns
-------
token(str): The token to use in further API calls to CloudVision.

Raises
------
requests.ssl.SSLError: If the certificate verification fails

"""
# use CVP REST API to generate a token
url = f"https://{cvp_ip}/cvpservice/login/authenticate.do"
payload = json.dumps({"userId": cvp_username, "password": cvp_password})
headers = {"Content-Type": "application/json", "Accept": "application/json"}

response = requests.request("POST", url, headers=headers, data=payload, verify=False, timeout=10)
response = requests.request("POST", url, headers=headers, data=payload, verify=verify_cert, timeout=10)
return response.json()["sessionId"]


def write_inventory_to_file(hosts: list[AntaInventoryHost], output: Path) -> None:
"""Write a file inventory from pydantic models."""
i = AntaInventoryInput(hosts=hosts)
with output.open(mode="w", encoding="UTF-8") as out_fd:
out_fd.write(yaml.dump({AntaInventory.INVENTORY_ROOT_KEY: i.model_dump(exclude_unset=True)}))
out_fd.write(yaml.dump({AntaInventory.INVENTORY_ROOT_KEY: yaml.safe_load(i.yaml())}))
logger.info("ANTA inventory file has been created: '%s'", output)


Expand Down
15 changes: 15 additions & 0 deletions anta/inventory/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
from __future__ import annotations

import logging
import math

import yaml
from pydantic import BaseModel, ConfigDict, IPvAnyAddress, IPvAnyNetwork

from anta.custom_types import Hostname, Port
Expand Down Expand Up @@ -82,3 +84,16 @@ class AntaInventoryInput(BaseModel):
networks: list[AntaInventoryNetwork] | None = None
hosts: list[AntaInventoryHost] | None = None
ranges: list[AntaInventoryRange] | None = None

def yaml(self) -> str:
"""Return a YAML representation string of this model.

Returns
-------
The YAML representation string of this model.
"""
# TODO: Pydantic and YAML serialization/deserialization is not supported natively.
# This could be improved.
# https://github.com/pydantic/pydantic/issues/1043
# Explore if this worth using this: https://github.com/NowanIlfideme/pydantic-yaml
return yaml.safe_dump(yaml.safe_load(self.model_dump_json(serialize_as_any=True, exclude_unset=True)), indent=2, width=math.inf)
27 changes: 19 additions & 8 deletions docs/cli/inv-from-cvp.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,32 @@

In large setups, it might be beneficial to construct your inventory based on CloudVision. The `from-cvp` entrypoint of the `get` command enables the user to create an ANTA inventory from CloudVision.

!!! info
The current implementation only works with on-premises CloudVision instances, not with CloudVision as a Service (CVaaS).

### Command overview

```bash
anta get from-cvp --help
Usage: anta get from-cvp [OPTIONS]

Build ANTA inventory from Cloudvision
Build ANTA inventory from CloudVision.

NOTE: Only username/password authentication is supported for on-premises CloudVision instances.
Token authentication for both on-premises and CloudVision as a Service (CVaaS) is not supported.

Options:
-ip, --cvp-ip TEXT CVP IP Address [required]
-u, --cvp-username TEXT CVP Username [required]
-p, --cvp-password TEXT CVP Password / token [required]
-c, --cvp-container TEXT Container where devices are configured
-d, --inventory-directory PATH Path to save inventory file
--help Show this message and exit.
-o, --output FILE Path to save inventory file [env var: ANTA_INVENTORY;
required]
--overwrite Do not prompt when overriding current inventory [env
var: ANTA_GET_FROM_CVP_OVERWRITE]
-host, --host TEXT CloudVision instance FQDN or IP [required]
-u, --username TEXT CloudVision username [required]
-p, --password TEXT CloudVision password [required]
-c, --container TEXT CloudVision container where devices are configured
--ignore-cert By default connection to CV will use HTTPS
certificate, set this flag to disable it [env var:
ANTA_GET_FROM_CVP_IGNORE_CERT]
--help Show this message and exit.
```

The output is an inventory where the name of the container is added as a tag for each host:
Expand Down
48 changes: 34 additions & 14 deletions tests/units/cli/get/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from unittest.mock import ANY, patch

import pytest
import requests
from cvprac.cvp_client_errors import CvpApiError

from anta.cli._main import anta
Expand All @@ -24,19 +25,25 @@


@pytest.mark.parametrize(
("cvp_container", "cvp_connect_failure"),
("cvp_container", "verify_cert", "cv_token_failure", "cvp_connect_failure"),
[
pytest.param(None, False, id="all devices"),
pytest.param("custom_container", False, id="custom container"),
pytest.param(None, True, id="cvp connect failure"),
pytest.param(None, True, False, False, id="all devices - verify cert"),
pytest.param(None, True, True, False, id="all devices - fail SSL check"),
pytest.param(None, False, False, False, id="all devices - do not verify cert"),
pytest.param("custom_container", False, False, False, id="custom container"),
pytest.param(None, False, False, True, id="cvp connect failure"),
],
)
def test_from_cvp(
tmp_path: Path,
click_runner: CliRunner,
cvp_container: str | None,
verify_cert: bool,
cv_token_failure: bool,
cvp_connect_failure: bool,
) -> None:
# pylint: disable=too-many-arguments
# ruff: noqa: C901
"""Test `anta get from-cvp`.

This test verifies that username and password are NOT mandatory to run this command
Expand All @@ -57,14 +64,20 @@ def test_from_cvp(

if cvp_container is not None:
cli_args.extend(["--container", cvp_container])
if not verify_cert:
cli_args.extend(["--ignore-cert"])

def mock_get_cv_token(*_args: str, **_kwargs: str) -> None:
if cv_token_failure:
raise requests.exceptions.SSLError

def mock_cvp_connect(_self: CvpClient, *_args: str, **_kwargs: str) -> None:
if cvp_connect_failure:
raise CvpApiError(msg="mocked CvpApiError")

# always get a token
with (
patch("anta.cli.get.commands.get_cv_token", return_value="dummy_token"),
patch("anta.cli.get.commands.get_cv_token", autospec=True, side_effect=mock_get_cv_token),
patch(
"cvprac.cvp_client.CvpClient.connect",
autospec=True,
Expand All @@ -79,20 +92,27 @@ def mock_cvp_connect(_self: CvpClient, *_args: str, **_kwargs: str) -> None:
):
result = click_runner.invoke(anta, cli_args)

if not cvp_connect_failure:
if not cvp_connect_failure and not cv_token_failure:
assert output.exists()

if cv_token_failure:
assert "Authentication to CloudVison failed" in result.output
assert result.exit_code == ExitCode.USAGE_ERROR
return

mocked_cvp_connect.assert_called_once()
if not cvp_connect_failure:
assert "Connected to CloudVision" in result.output
if cvp_container is not None:
mocked_get_devices_in_container.assert_called_once_with(ANY, cvp_container)
else:
mocked_get_inventory.assert_called_once()
assert result.exit_code == ExitCode.OK
else:

if cvp_connect_failure:
assert "Error connecting to CloudVision" in result.output
assert result.exit_code == ExitCode.USAGE_ERROR
return

assert "Connected to CloudVision" in result.output
if cvp_container is not None:
mocked_get_devices_in_container.assert_called_once_with(ANY, cvp_container)
else:
mocked_get_inventory.assert_called_once()
assert result.exit_code == ExitCode.OK


@pytest.mark.parametrize(
Expand Down
13 changes: 10 additions & 3 deletions tests/units/cli/get/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,14 @@
DATA_DIR: Path = Path(__file__).parents[3].resolve() / "data"


def test_get_cv_token() -> None:
@pytest.mark.parametrize(
"verify_cert",
[
pytest.param(True, id="Verify cert enabled"),
pytest.param(False, id="Verify cert disabled"),
],
)
def test_get_cv_token(verify_cert: bool) -> None:
"""Test anta.get.utils.get_cv_token."""
ip_addr = "42.42.42.42"
username = "ant"
Expand All @@ -29,13 +36,13 @@ def test_get_cv_token() -> None:
mocked_ret = MagicMock(autospec=requests.Response)
mocked_ret.json.return_value = {"sessionId": "simple"}
patched_request.return_value = mocked_ret
res = get_cv_token(ip_addr, username, password)
res = get_cv_token(ip_addr, username, password, verify_cert=verify_cert)
patched_request.assert_called_once_with(
"POST",
"https://42.42.42.42/cvpservice/login/authenticate.do",
headers={"Content-Type": "application/json", "Accept": "application/json"},
data='{"userId": "ant", "password": "formica"}',
verify=False,
verify=verify_cert,
timeout=10,
)
assert res == "simple"
Expand Down
Loading