Skip to content

Commit

Permalink
Adopt standalone utils functions into Runner (#3572)
Browse files Browse the repository at this point in the history
  • Loading branch information
ssbarnea committed Jun 22, 2023
1 parent f43b629 commit e2c1247
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 100 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ repos:
- rich>=13.2.0
- ruamel-yaml>=0.17.31
- ruamel-yaml-clib>=0.2.7
- spdx-tools
- spdx-tools>=0.7.1
- subprocess-tee
- types-PyYAML
- types-jsonschema>=4.4.2
Expand Down Expand Up @@ -187,7 +187,7 @@ repos:
- rich>=13.2.0
- ruamel-yaml>=0.17.31
- ruamel-yaml-clib>=0.2.7
- spdx-tools
- spdx-tools>=0.7.1
- typing_extensions
- wcmatch
- yamllint
Expand Down
104 changes: 102 additions & 2 deletions src/ansiblelint/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@
import warnings
from dataclasses import dataclass
from fnmatch import fnmatch
from pathlib import Path
from typing import TYPE_CHECKING, Any

from ansible.errors import AnsibleError
from ansible.parsing.splitter import split_args
from ansible.parsing.yaml.constructor import AnsibleMapping
from ansible.plugins.loader import add_all_plugin_dirs
from ansible_compat.runtime import AnsibleWarning

import ansiblelint.skip_utils
Expand All @@ -31,12 +36,20 @@
from ansiblelint.logger import timed_info
from ansiblelint.rules.syntax_check import OUTPUT_PATTERNS, AnsibleSyntaxCheckRule
from ansiblelint.text import strip_ansi_escape
from ansiblelint.utils import (
PLAYBOOK_DIR,
_include_children,
_roles_children,
_taskshandlers_children,
template,
)

if TYPE_CHECKING:
from collections.abc import Generator
from pathlib import Path
from typing import Callable

from ansiblelint.config import Options
from ansiblelint.constants import FileType
from ansiblelint.rules import RulesCollection

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -415,7 +428,7 @@ def _emit_matches(self, files: list[Lintable]) -> Generator[MatchError, None, No
while visited != self.lintables:
for lintable in self.lintables - visited:
try:
children = ansiblelint.utils.find_children(lintable)
children = self.find_children(lintable)
for child in children:
if self.is_excluded(child):
continue
Expand All @@ -430,6 +443,93 @@ def _emit_matches(self, files: list[Lintable]) -> Generator[MatchError, None, No
yield MatchError(lintable=lintable, rule=LoadingFailureRule())
visited.add(lintable)

def find_children(self, lintable: Lintable) -> list[Lintable]:
"""Traverse children of a single file or folder."""
if not lintable.path.exists():
return []
playbook_dir = str(lintable.path.parent)
ansiblelint.utils.set_collections_basedir(lintable.path.parent)
add_all_plugin_dirs(playbook_dir or ".")
if lintable.kind == "role":
playbook_ds = AnsibleMapping({"roles": [{"role": str(lintable.path)}]})
elif lintable.kind not in ("playbook", "tasks"):
return []
else:
try:
playbook_ds = ansiblelint.utils.parse_yaml_from_file(str(lintable.path))
except AnsibleError as exc:
raise SystemExit(exc) from exc
results = []
# playbook_ds can be an AnsibleUnicode string, which we consider invalid
if isinstance(playbook_ds, str):
raise MatchError(lintable=lintable, rule=LoadingFailureRule())
for item in ansiblelint.utils.playbook_items(playbook_ds):
# if lintable.kind not in ["playbook"]:
for child in self.play_children(
lintable.path.parent,
item,
lintable.kind,
playbook_dir,
):
# We avoid processing parametrized children
path_str = str(child.path)
if "$" in path_str or "{{" in path_str:
continue

# Repair incorrect paths obtained when old syntax was used, like:
# - include: simpletask.yml tags=nginx
valid_tokens = []
for token in split_args(path_str):
if "=" in token:
break
valid_tokens.append(token)
path = " ".join(valid_tokens)
if path != path_str:
child.path = Path(path)
child.name = child.path.name

results.append(child)
return results

def play_children(
self,
basedir: Path,
item: tuple[str, Any],
parent_type: FileType,
playbook_dir: str,
) -> list[Lintable]:
"""Flatten the traversed play tasks."""
# pylint: disable=unused-argument
delegate_map: dict[str, Callable[[str, Any, Any, FileType], list[Lintable]]] = {
"tasks": _taskshandlers_children,
"pre_tasks": _taskshandlers_children,
"post_tasks": _taskshandlers_children,
"block": _taskshandlers_children,
"include": _include_children,
"ansible.builtin.include": _include_children,
"import_playbook": _include_children,
"ansible.builtin.import_playbook": _include_children,
"roles": _roles_children,
"dependencies": _roles_children,
"handlers": _taskshandlers_children,
"include_tasks": _include_children,
"ansible.builtin.include_tasks": _include_children,
"import_tasks": _include_children,
"ansible.builtin.import_tasks": _include_children,
}
(k, v) = item
add_all_plugin_dirs(str(basedir.resolve()))

if k in delegate_map and v:
v = template(
basedir,
v,
{"playbook_dir": PLAYBOOK_DIR or str(basedir.resolve())},
fail_on_undefined=False,
)
return delegate_map[k](str(basedir), k, v, parent_type)
return []


def _get_matches(rules: RulesCollection, options: Options) -> LintResult:
lintables = ansiblelint.utils.get_lintables(opts=options, args=options.lintables)
Expand Down
99 changes: 5 additions & 94 deletions src/ansiblelint/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,13 @@
from dataclasses import _MISSING_TYPE, dataclass, field
from functools import cache
from pathlib import Path
from typing import Any, Callable
from typing import Any

import yaml
from ansible.errors import AnsibleError, AnsibleParserError
from ansible.module_utils.parsing.convert_bool import boolean
from ansible.parsing.dataloader import DataLoader
from ansible.parsing.mod_args import ModuleArgsParser
from ansible.parsing.splitter import split_args
from ansible.parsing.yaml.constructor import AnsibleConstructor, AnsibleMapping
from ansible.parsing.yaml.loader import AnsibleLoader
from ansible.parsing.yaml.objects import AnsibleBaseYAMLObject, AnsibleSequence
Expand All @@ -50,7 +49,6 @@

from ansiblelint._internal.rules import (
AnsibleParserErrorRule,
LoadingFailureRule,
RuntimeErrorRule,
)
from ansiblelint.app import get_app
Expand Down Expand Up @@ -234,7 +232,8 @@ def tokenize(line: str) -> tuple[str, list[str], dict[str, str]]:
return (command, args, kwargs)


def _playbook_items(pb_data: AnsibleBaseYAMLObject) -> ItemsView: # type: ignore[type-arg]
def playbook_items(pb_data: AnsibleBaseYAMLObject) -> ItemsView: # type: ignore[type-arg]
"""Return a list of items from within the playbook."""
if isinstance(pb_data, dict):
return pb_data.items()
if not pb_data:
Expand All @@ -246,62 +245,13 @@ def _playbook_items(pb_data: AnsibleBaseYAMLObject) -> ItemsView: # type: ignor
return [item for play in pb_data if play for item in play.items()] # type: ignore[return-value]


def _set_collections_basedir(basedir: Path) -> None:
# Sets the playbook directory as playbook_paths for the collection loader
def set_collections_basedir(basedir: Path) -> None:
"""Set the playbook directory as playbook_paths for the collection loader."""
# Ansible expects only absolute paths inside `playbook_paths` and will
# produce weird errors if we use a relative one.
AnsibleCollectionConfig.playbook_paths = str(basedir.resolve())


def find_children(lintable: Lintable) -> list[Lintable]:
"""Traverse children of a single file or folder."""
if not lintable.path.exists():
return []
playbook_dir = str(lintable.path.parent)
_set_collections_basedir(lintable.path.parent)
add_all_plugin_dirs(playbook_dir or ".")
if lintable.kind == "role":
playbook_ds = AnsibleMapping({"roles": [{"role": str(lintable.path)}]})
elif lintable.kind not in ("playbook", "tasks"):
return []
else:
try:
playbook_ds = parse_yaml_from_file(str(lintable.path))
except AnsibleError as exc:
raise SystemExit(exc) from exc
results = []
# playbook_ds can be an AnsibleUnicode string, which we consider invalid
if isinstance(playbook_ds, str):
raise MatchError(lintable=lintable, rule=LoadingFailureRule())
for item in _playbook_items(playbook_ds):
# if lintable.kind not in ["playbook"]:
for child in play_children(
lintable.path.parent,
item,
lintable.kind,
playbook_dir,
):
# We avoid processing parametrized children
path_str = str(child.path)
if "$" in path_str or "{{" in path_str:
continue

# Repair incorrect paths obtained when old syntax was used, like:
# - include: simpletask.yml tags=nginx
valid_tokens = []
for token in split_args(path_str):
if "=" in token:
break
valid_tokens.append(token)
path = " ".join(valid_tokens)
if path != path_str:
child.path = Path(path)
child.name = child.path.name

results.append(child)
return results


def template(
basedir: Path,
value: Any,
Expand All @@ -328,45 +278,6 @@ def template(
return value


def play_children(
basedir: Path,
item: tuple[str, Any],
parent_type: FileType,
playbook_dir: str, # noqa: ARG001
) -> list[Lintable]:
"""Flatten the traversed play tasks."""
# pylint: disable=unused-argument
delegate_map: dict[str, Callable[[str, Any, Any, FileType], list[Lintable]]] = {
"tasks": _taskshandlers_children,
"pre_tasks": _taskshandlers_children,
"post_tasks": _taskshandlers_children,
"block": _taskshandlers_children,
"include": _include_children,
"ansible.builtin.include": _include_children,
"import_playbook": _include_children,
"ansible.builtin.import_playbook": _include_children,
"roles": _roles_children,
"dependencies": _roles_children,
"handlers": _taskshandlers_children,
"include_tasks": _include_children,
"ansible.builtin.include_tasks": _include_children,
"import_tasks": _include_children,
"ansible.builtin.import_tasks": _include_children,
}
(k, v) = item
add_all_plugin_dirs(str(basedir.resolve()))

if k in delegate_map and v:
v = template(
basedir,
v,
{"playbook_dir": PLAYBOOK_DIR or str(basedir.resolve())},
fail_on_undefined=False,
)
return delegate_map[k](str(basedir), k, v, parent_type)
return []


def _include_children(
basedir: str,
k: str,
Expand Down
6 changes: 4 additions & 2 deletions test/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,11 @@ def test_get_rules_dirs_with_custom_rules(
assert get_rules_dirs(user_ruledirs, use_default=use_default) == expected


def test_find_children() -> None:
def test_find_children(default_rules_collection: RulesCollection) -> None:
"""Verify correct function of find_children()."""
utils.find_children(Lintable("examples/playbooks/find_children.yml"))
Runner(
rules=default_rules_collection,
).find_children(Lintable("examples/playbooks/find_children.yml"))


def test_find_children_in_task(default_rules_collection: RulesCollection) -> None:
Expand Down

0 comments on commit e2c1247

Please sign in to comment.