Skip to content

Commit

Permalink
Merge 972d201 into d2705d4
Browse files Browse the repository at this point in the history
  • Loading branch information
andreoliwa committed Jan 8, 2022
2 parents d2705d4 + 972d201 commit da358f4
Show file tree
Hide file tree
Showing 21 changed files with 269 additions and 428 deletions.
3 changes: 2 additions & 1 deletion docs/generate_rst.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from nitpick import PROJECT_NAME, __version__
from nitpick.constants import (
CONFIG_FILES,
DOT,
EDITOR_CONFIG,
PACKAGE_JSON,
PRE_COMMIT_CONFIG_YAML,
Expand Down Expand Up @@ -127,7 +128,7 @@ def __lt__(self, other: "FileType") -> bool:
@property
def sort_key(self) -> str:
"""Sort key of this element."""
return ("0" if self.text.startswith("Any") else "1") + self.text.casefold().replace(".", "")
return ("0" if self.text.startswith("Any") else "1") + self.text.casefold().replace(DOT, "")

@property
def text_with_url(self) -> str:
Expand Down
8 changes: 4 additions & 4 deletions docs/ideas/lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@
import jmespath
from identify import identify

from nitpick.documents import TomlDoc, YamlDoc
from nitpick.generic import flatten, search_dict
from nitpick.blender import DictBlender, TomlDoc, YamlDoc, search_json
from nitpick.constants import DOT

workflow = YamlDoc(path=Path(".github/workflows/python.yaml"))


def find(expression):
"""Find with JMESpath."""
print(f"\nExpression: {expression}")
rv = search_dict(jmespath.compile(expression), workflow.as_object, {})
rv = search_json(workflow.as_object, jmespath.compile(expression), {})
print(f"Type: {type(rv)}")
pprint(rv)

Expand Down Expand Up @@ -56,7 +56,7 @@ def main():
print(json.dumps(config, indent=2))

click.secho("Flattened JSON", fg="yellow")
print(json.dumps(flatten(config), indent=2))
print(json.dumps(DictBlender(config, separator=DOT).flat_dict, indent=2))

# find("jobs.build")
# find("jobs.build.strategy.matrix")
Expand Down
188 changes: 170 additions & 18 deletions src/nitpick/documents.py → src/nitpick/blender.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
"""Configuration file formats."""
"""Dictionary blender and configuration file formats.
.. testsetup::
from nitpick.generic import *
"""
import abc
import json
import re
from collections import OrderedDict
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Type, Union
from typing import Any, Dict, List, MutableMapping, Optional, Tuple, Type, Union

import dictdiffer
import jmespath
import toml
import tomlkit
from autorepr import autorepr
from jmespath.parser import ParsedResult
from more_itertools import always_iterable
from ruamel.yaml import YAML, RoundTripRepresenter, StringIO
from ruamel.yaml.comments import CommentedMap, CommentedSeq
from sortedcontainers import SortedDict

from nitpick.generic import flatten, search_dict, unflatten
from nitpick.constants import DOT, DOUBLE_QUOTE, SEPARATOR_FLATTEN, SEPARATOR_QUOTED_SPLIT, SINGLE_QUOTE
from nitpick.typedefs import JsonDict, PathOrStr, YamlObject, YamlValue

DICT_CLASSES = (dict, SortedDict, OrderedDict, CommentedMap)
Expand All @@ -33,6 +41,41 @@ def compare_lists_with_dictdiffer(actual: List[Any], expected: List[Any]) -> Lis
return []


def search_json(
json_data: Union[MutableMapping[str, Any], List[Any]],
jmespath_expression: Union[ParsedResult, str],
default: Any = None,
) -> Any:
"""Search a dictionary or list using a JMESPath expression. Return a default value if not found.
>>> data = {"root": {"app": [1, 2], "test": "something"}}
>>> search_json(data, "root.app", None)
[1, 2]
>>> search_json(data, "root.test", None)
'something'
>>> search_json(data, "root.unknown", "")
''
>>> search_json(data, "root.unknown", None)
>>> search_json(data, "root.unknown")
>>> search_json(data, jmespath.compile("root.app"), [])
[1, 2]
>>> search_json(data, jmespath.compile("root.whatever"), "xxx")
'xxx'
:param jmespath_expression: A compiled JMESPath expression or a string with an expression.
:param json_data: The dictionary to be searched.
:param default: Default value in case nothing is found.
:return: The object that was found or the default value.
"""
if isinstance(jmespath_expression, str):
rv = jmespath.search(jmespath_expression, json_data)
else:
rv = jmespath_expression.search(json_data)
return rv or default


def search_element_by_unique_key( # pylint: disable=too-many-locals
actual_list: List[Any], expected_list: List[Any], nested_key: str, parent_key: str = ""
) -> Tuple[List, List]:
Expand All @@ -42,19 +85,19 @@ def search_element_by_unique_key( # pylint: disable=too-many-locals
"""
jmes_search_key = f"{parent_key}[].{nested_key}" if parent_key else nested_key

actual_keys = search_dict(f"[].{jmes_search_key}", actual_list, [])
actual_keys = search_json(actual_list, f"[].{jmes_search_key}", [])
if not actual_keys:
# There are no actual keys in the current YAML: let's insert the whole expected block
return expected_list, actual_list + expected_list

actual_indexes = {
key: index for index, element in enumerate(actual_list) for key in search_dict(jmes_search_key, element, [])
key: index for index, element in enumerate(actual_list) for key in search_json(element, jmes_search_key, [])
}

display = []
replace = actual_list.copy()
for element in expected_list:
expected_keys = search_dict(jmes_search_key, element, None)
expected_keys = search_json(element, jmes_search_key, None)
if not expected_keys:
# There are no expected keys in this current element: let's insert the whole element
display.append(element)
Expand All @@ -73,8 +116,8 @@ def search_element_by_unique_key( # pylint: disable=too-many-locals
continue

jmes_nested = f"{parent_key}[?{nested_key}=='{expected_key}']"
actual_nested = search_dict(jmes_nested, actual_list[index], [])
expected_nested = search_dict(jmes_nested, element, [{}])
actual_nested = search_json(actual_list[index], jmes_nested, [])
expected_nested = search_json(element, jmes_nested, [{}])
diff_nested = compare_lists_with_dictdiffer(actual_nested, expected_nested)
if not diff_nested:
continue
Expand All @@ -99,6 +142,123 @@ def set_key_if_not_empty(dict_: JsonDict, key: str, value: Any) -> None:
dict_[key] = value


def quoted_split(string_: str, separator=DOT) -> List[str]:
"""Split a string by a separator, but considering quoted parts (single or double quotes).
>>> quoted_split("my.key.without.quotes")
['my', 'key', 'without', 'quotes']
>>> quoted_split('"double.quoted.string"')
['double.quoted.string']
>>> quoted_split('"double.quoted.string".and.after')
['double.quoted.string', 'and', 'after']
>>> quoted_split('something.before."double.quoted.string"')
['something', 'before', 'double.quoted.string']
>>> quoted_split("'single.quoted.string'")
['single.quoted.string']
>>> quoted_split("'single.quoted.string'.and.after")
['single.quoted.string', 'and', 'after']
>>> quoted_split("something.before.'single.quoted.string'")
['something', 'before', 'single.quoted.string']
"""
if DOUBLE_QUOTE not in string_ and SINGLE_QUOTE not in string_:
return string_.split(separator)

quoted_regex = re.compile(
f"([{SINGLE_QUOTE}{DOUBLE_QUOTE}][^{SINGLE_QUOTE}{DOUBLE_QUOTE}]+[{SINGLE_QUOTE}{DOUBLE_QUOTE}])"
)

def remove_quotes(match):
return match.group(0).strip("".join([SINGLE_QUOTE, DOUBLE_QUOTE])).replace(separator, SEPARATOR_QUOTED_SPLIT)

return [
part.replace(SEPARATOR_QUOTED_SPLIT, separator)
for part in quoted_regex.sub(remove_quotes, string_).split(separator)
]


def unflatten(dict_, separator=DOT, sort=True) -> OrderedDict:
"""Turn back a flattened dict created by :py:meth:`flatten()` into a nested dict.
>>> expected = {'my': {'sub': {'path': True}, 'home': 4}, 'another': {'path': 3}}
>>> unflatten({"my.sub.path": True, "another.path": 3, "my.home": 4}) == expected
True
>>> unflatten({"repo": "conflicted key", "repo.name": "?", "repo.path": "?"})
Traceback (most recent call last):
...
TypeError: 'str' object does not support item assignment
"""
# TODO: move flatten() and unflatten() to DictBlender: they depend on each other and keep state between calls.
items: OrderedDict = OrderedDict()
for root_key, root_value in sorted(dict_.items()) if sort else dict_.items():
all_keys = quoted_split(root_key, separator)
sub_items = items
for key in all_keys[:-1]:
try:
sub_items = sub_items[key]
except KeyError:
sub_items[key] = OrderedDict()
sub_items = sub_items[key]

sub_items[all_keys[-1]] = root_value

return items


class DictBlender:
"""A blender of dictionaries: keep adding dictionaries and mix them all at the end.
.. note::
This class intentionally doesn't inherit from the standard ``dict()``.
It's an unnecessary hassle to override and deal with all those magic dunder methods.
"""

def __init__(
self, original_dict: JsonDict = None, *, extend_lists=True, separator: str = SEPARATOR_FLATTEN
) -> None:
self._current_flat_dict: OrderedDict = OrderedDict()
self._current_lists: Dict[str, List[Any]] = {}
self.extend_lists = extend_lists
self.separator = separator
self.add(original_dict or {})

def add(self, other: JsonDict) -> None:
"""Add another dictionary to the existing data."""
self._current_flat_dict.update(self._flatten(other))

def _flatten(self, other_dict: JsonDict, parent_key="") -> JsonDict:
"""Flatten a nested dict.
Adapted from `this StackOverflow question <https://stackoverflow.com/a/6027615>`_.
"""
items: List[Tuple[str, Any]] = []
for key, value in other_dict.items():
quoted_key = f"{DOUBLE_QUOTE}{key}{DOUBLE_QUOTE}" if self.separator in str(key) else key
new_key = str(parent_key) + self.separator + str(quoted_key) if parent_key else quoted_key
if isinstance(value, dict):
flat_dict = self._flatten(value, new_key)
items.extend(flat_dict.items())
elif isinstance(value, (list, tuple)) and self.extend_lists:
# If the value is a list or tuple, append to a previously existing list.
existing_list = self._current_lists.get(new_key, [])
existing_list.extend(list(value))
self._current_lists[new_key] = existing_list

items.append((new_key, existing_list))
else:
items.append((new_key, value))
return dict(items)

@property
def flat_dict(self):
"""Return a flat dictionary with the current content."""
return self._current_flat_dict

def mix(self, sort=True) -> JsonDict:
"""Mix all dictionaries, replacing values with identical keys and extending lists."""
return unflatten(self._current_flat_dict, self.separator, sort)


class Comparison:
"""A comparison between two dictionaries, computing missing items and differences."""

Expand All @@ -108,8 +268,8 @@ def __init__(
expected: JsonDict,
doc_class: Type["BaseDoc"],
) -> None:
self.flat_actual = self._normalize_value(actual)
self.flat_expected = self._normalize_value(expected)
self.flat_actual = DictBlender(actual, separator=DOT).flat_dict
self.flat_expected = DictBlender(expected, separator=DOT).flat_dict

self.doc_class = doc_class

Expand Down Expand Up @@ -143,14 +303,6 @@ def has_changes(self) -> bool:
"""Return True is there is a difference or something missing."""
return bool(self.missing or self.diff or self.replace)

@staticmethod
def _normalize_value(value: JsonDict) -> Dict:
if isinstance(value, BaseDoc):
dict_value = value.as_object
else:
dict_value = value
return flatten(dict_value)


class BaseDoc(metaclass=abc.ABCMeta):
"""Base class for configuration file formats.
Expand Down
2 changes: 1 addition & 1 deletion src/nitpick/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
from click.exceptions import Exit
from loguru import logger

from nitpick.blender import TomlDoc
from nitpick.constants import PROJECT_NAME, TOOL_KEY, TOOL_NITPICK_KEY
from nitpick.core import Nitpick
from nitpick.documents import TomlDoc
from nitpick.enums import OptionEnum
from nitpick.exceptions import QuitComplainingError
from nitpick.generic import relative_to_current_dir
Expand Down
3 changes: 2 additions & 1 deletion src/nitpick/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@
NITPICK_MINIMUM_VERSION_JMEX = jmespath.compile("nitpick.minimum_version")

#: Dot/slash is used to indicate a local style file
DOT = "."
SLASH = os.path.sep
DOT_SLASH = f".{SLASH}"
DOT_SLASH = f"{DOT}{SLASH}"

GIT_AT_REFERENCE = "@"

Expand Down
5 changes: 3 additions & 2 deletions src/nitpick/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from marshmallow.validate import Length
from more_itertools import always_iterable

from nitpick.constants import DOT
from nitpick.exceptions import pretty_exception

__all__ = ("Dict", "List", "String", "Nested", "Field")
Expand Down Expand Up @@ -56,9 +57,9 @@ def string_or_list_field(object_dict, parent_object_dict): # pylint: disable=un
def validate_section_dot_field(section_field: str) -> bool:
"""Validate if the combination section/field has a dot separating them."""
common = "Use <section_name>.<field_name>"
if "." not in section_field:
if DOT not in section_field:
raise ValidationError(f"Dot is missing. {common}")
parts = section_field.split(".")
parts = section_field.split(DOT)
if len(parts) > 2:
raise ValidationError(f"There's more than one dot. {common}")
if not parts[0].strip():
Expand Down

0 comments on commit da358f4

Please sign in to comment.