diff --git a/deirokay/_utils.py b/deirokay/_utils.py
index 213c757..1a4a3f4 100644
--- a/deirokay/_utils.py
+++ b/deirokay/_utils.py
@@ -1,5 +1,5 @@
 from difflib import get_close_matches
-from typing import Generator, Iterable, TypeVar
+from typing import Generator, Iterable, TypeVar, Union
 
 from jinja2.nativetypes import Environment
 
@@ -77,3 +77,14 @@ def render_dict(env: Environment, dict_: dict, template: dict):
             render_dict(env, value, template)
         elif isinstance(value, list):
             render_list(env, value, template)
+
+
+T = TypeVar('T')
+
+
+def noneor(*operands: T) -> Union[T, None]:
+    """Return the first non-None operand, or None if all are None."""
+    for item in operands:
+        if item is not None:
+            return item
+    return None
diff --git a/deirokay/statements/builtin/contain.py b/deirokay/statements/builtin/contain.py
index 6d6030c..c21db84 100644
--- a/deirokay/statements/builtin/contain.py
+++ b/deirokay/statements/builtin/contain.py
@@ -1,22 +1,28 @@
 """
 Statement to check the presence (or absence) of values in a scope.
 """
+import warnings
 from typing import List
 
+import dask.dataframe  # lazy module
 import numpy  # lazy module
 import pandas  # lazy module
 
 from deirokay._typing import DeirokayStatement
+from deirokay._utils import noneor
 from deirokay.enums import Backend
 from deirokay.parser import get_dtype_treater, get_treater_instance
+from deirokay.parser.loader import data_treater
 
 from ..multibackend import profile, report
 from .base_statement import BaseStatement
 
+NODEFAULT = object()
+
 
 class Contain(BaseStatement):
     """
-    Checks if the given scope contains specific values. We can also
+    Checks if the given scope contains specific values. You may also
     check the number of their occurrences by specifying a minimum and
     maximum value of frequency.
 
@@ -24,9 +30,19 @@ class Contain(BaseStatement):
 
     * `rule` (required): One of `all`, `only` or `all_and_only`.
     * `values` (required): A list of values to which the rule applies.
-    * `parser` (required): The parser to be used to parse the `values`.
+    * `multicolumn`: A boolean indicating whether each of the `values`
+      should be interpreted as a tuple of values from multiple columns
+      or as a single value. When set to False and evaluated over a
+      scope containing more than one column, the rule is applied over
+      the values of all the original columns pooled as a single
+      column. Default: False.
+    * `parser`: The parser (or a list) to be used to parse the `values`.
      Corresponds to the `parser` parameter of the `treater` function
      (see `deirokay.data_reader` method).
+      Either `parser` or `parsers` must be declared.
+    * `parsers`: An alias for `parser`, recommended when `multicolumn`
+      is set to True.
+      Either `parser` or `parsers` must be declared.
    * `min_occurrences`: a global minimum number of occurrences for
      each of the `values`. Default: 1 for `all` and `all_and_only`
      rules, 0 for `only`.
@@ -35,7 +51,7 @@ class Contain(BaseStatement):
    * `occurrences_per_value`: a list of dictionaries overriding the
      global boundaries. Each dictionary may have the following keys:
 
-        * `values` (required): a value (or a list) to which the
+        * `values` (required): a list of values to which the
          occurrence bounds below must apply.
        * `min_occurrences`: a minimum number of occurrences for these
          values. Default: global `min_occurrences` parameter.
        * `max_occurrences`: a maximum number of occurrences for these
          values. Default: global `max_occurrences` parameter.
@@ -46,8 +62,9 @@ class Contain(BaseStatement):
      dictionaries in `occurrences_per_value` (but yet present
      on the main `values` list).
 
-    * `verbose`: if `True`, the report will include the percentage of
-      occurrences for each value. Default: `True`.
+    * `report_limit`: if set to a positive integer, limits the number
+      of items generated in the statement report; if set to None, no
+      limit is applied. Default: 32.
 
     The `all` rule checks if all the `values` declared are present in
     the scope (possibly tolerating other values not declared in
@@ -73,6 +90,10 @@ class Contain(BaseStatement):
     applied to all the `values` declared, and only these. It means
     you cannot (yet) specify boundaries for values you didn't declare.
 
+    Null values are considered valid for the purposes of the statement
+    evaluation and must be explicitly passed in `values` if you wish
+    to allow (or disallow) them.
+
     You may also notice that, by tweaking the expected number of
     occurrences, you may end up having the very same behaviour
     regardless of the `rule` you choose.
 
@@ -188,224 +209,229 @@ class Contain(BaseStatement):
          - 3.48.48.136
          parser: {dtype: string}
 
+    * You want the pair 'San Diego' and '2022' to appear at most twice
+      in your dataset (for any reason).
+
+    .. code-block:: yaml
+
+        scope: [city, year]
+        statements:
+        - type: contain
+          rule: all
+          min_occurrences: 0
+          max_occurrences: 2
+          values:
+          - ['San Diego', 2022]
+          parsers:
+          - {dtype: string}
+          - {dtype: integer}
+
     """
 
     name = 'contain'
     expected_parameters = [
         'rule',
         'values',
+        'multicolumn',
         'parser',
+        'parsers',
         'min_occurrences',
         'max_occurrences',
         'occurrences_per_value',
-        'verbose'
+        'report_limit',
     ]
-    supported_backends: List[Backend] = [Backend.PANDAS]
+    supported_backends: List[Backend] = [Backend.PANDAS, Backend.DASK]
+
+    # Each tuple holds the default bound for declared `values` and the
+    # default bound for undeclared values found in the data
+    DEFAULT_MIN_OCCURRENCES = {
+        'all': (1, 0),
+        'only': (0, 0),
+        'all_and_only': (1, 0)
+    }
+    DEFAULT_MAX_OCCURRENCES = {
+        'all': (numpy.inf, numpy.inf),
+        'only': (numpy.inf, 0),
+        'all_and_only': (numpy.inf, 0)
+    }
+    DEFAULT_REPORT_LIMIT = 32
 
     def __init__(self, *args, **kwargs) -> None:
         super().__init__(*args, **kwargs)
 
         self.rule = self.options['rule']
-        self.treater = get_treater_instance(self.options['parser'],
-                                            backend=self.get_backend())
-        self.values = self.treater(self.options['values'])
+        assert self.rule in ('all', 'only', 'all_and_only')
+        self.multicolumn = self.options.get('multicolumn', False)
+        _parsers = self.options.get('parser') or self.options['parsers']
+        if self.multicolumn:
+            self.parsers = _parsers
+        else:
+            self.parsers = [_parsers]
+        self.treaters = [
+            get_treater_instance(parser, backend=self.get_backend())
+            for parser in self.parsers
+        ]
 
         self.min_occurrences = self.options.get('min_occurrences', None)
         self.max_occurrences = self.options.get('max_occurrences', None)
         self.occurrences_per_value = self.options.get(
             'occurrences_per_value', []
         )
-        self.verbose = self.options.get('verbose', True)
+        self.report_limit = self.options.get('report_limit', NODEFAULT)
 
         self._set_default_minmax_occurrences()
-        self._assert_parameters()
 
     def _set_default_minmax_occurrences(self) -> None:
-        min_occurrences_rule_default = {
-            'all': 1,
-            'only': 0,
-            'all_and_only': 1
-        }
-        max_occurrences_rule_default = {
-            'all': numpy.inf,
-            'only': numpy.inf,
-            'all_and_only': numpy.inf
+        final_min = noneor(self.min_occurrences,
+                           Contain.DEFAULT_MIN_OCCURRENCES[self.rule][0])
+        final_max = noneor(self.max_occurrences,
+                           Contain.DEFAULT_MAX_OCCURRENCES[self.rule][0])
+        if (
+            self.max_occurrences is not None and
+            self.max_occurrences < final_min
+        ):
+            final_min = self.max_occurrences
+        if (
+            self.min_occurrences is not None and
+            self.min_occurrences > final_max
+        ):
+            final_max = self.min_occurrences
+        assert final_min >= 0
+        assert final_max >= 0
+        self.min_occurrences = final_min
+        self.max_occurrences = final_max
+
def _generate_analysis(self, value_counts): + if self.multicolumn: + def _unpack_row(row, *args): + return (*row, *args) + else: + def _unpack_row(row, *args): + return (row, *args) + + listed_values = [ + _unpack_row( + row, + self.min_occurrences, + self.max_occurrences + ) + for row in self.options['values'] + ] + occurences_per_value = [ + _unpack_row( + row, + noneor(item.get('min_occurrences'), self.min_occurrences), + noneor(item.get('max_occurrences'), self.max_occurrences), + ) + for item in self.occurrences_per_value + for row in item['values'] + ] + + occurrence_limits = pandas.DataFrame( + occurences_per_value + listed_values, + columns=value_counts.index.names + ['min', 'max'] + ) + options = { + col: parser + for col, parser in zip(value_counts.index.names, self.parsers) } + data_treater(occurrence_limits, options, backend=Backend.PANDAS) - if self.min_occurrences is None: - self.min_occurrences = min_occurrences_rule_default[self.rule] - if self.max_occurrences is None: - self.max_occurrences = max_occurrences_rule_default[self.rule] + occurrence_limits.drop_duplicates( + subset=value_counts.index.names, + keep='first', + inplace=True + ) + occurrence_limits.set_index(value_counts.index.names, inplace=True) - def _assert_parameters(self) -> None: - assert self.rule in ('all', 'only', 'all_and_only') - assert self.min_occurrences >= 0 - assert self.max_occurrences >= 0 + analysis = value_counts.to_frame().reset_index().merge( + occurrence_limits.reset_index(), how='outer' + ) + analysis['count'].fillna(0, inplace=True) + analysis['min'].fillna(Contain.DEFAULT_MIN_OCCURRENCES[self.rule][1], + inplace=True) + analysis['max'].fillna(Contain.DEFAULT_MAX_OCCURRENCES[self.rule][1], + inplace=True) + analysis['result'] = ( + analysis['count'].ge(analysis['min']) + & + analysis['count'].le(analysis['max']) + ) + return analysis + + def _generate_report(self, analysis): + columns = [analysis[col] for col in analysis.columns[:-4]] + serialized = ( + treater.serialize(column) + for treater, column in zip(self.treaters, columns) + ) + rows = zip(*(s['values'] for s in serialized)) + values_report = sorted([ + { + 'value': value_row, + 'count': analysis_row.count, + 'result': analysis_row.result, + } + for value_row, analysis_row in zip(rows, analysis.itertuples()) + ], key=lambda x: x['result']) + + if ( + self.report_limit is NODEFAULT and + len(values_report) > Contain.DEFAULT_REPORT_LIMIT + ): + self.report_limit = Contain.DEFAULT_REPORT_LIMIT + warnings.warn( + "The 'contain' statement's report size was automatically" + f' truncated to {Contain.DEFAULT_REPORT_LIMIT} items to' + ' prevent unexpectedly long logs.\n' + 'If you wish to set a different' + ' size limit or even not set a limit at all (None),' + ' please declare the `report_limit` parameter explicitely.', + Warning + ) + + return { + 'values': ( + values_report if self.report_limit is NODEFAULT else + values_report if self.report_limit is None else + values_report[:self.report_limit] + ) + } @report(Backend.PANDAS) def _report_pandas(self, df: 'pandas.DataFrame') -> dict: # Concat all columns - count_isin = ( - pandas.concat(df[col] for col in df.columns).value_counts() + _cols = df.columns.tolist() + + if not self.multicolumn: + # Columns are assumed to be of same Dtype + df = pandas.concat([df[col] for col in _cols]).to_frame() + + value_counts = ( + df.groupby(_cols, dropna=False)[_cols[0]].size() + .rename('count') ) - self.value_count = count_isin.to_dict() - - # Generate report - values = 
-        values = self.treater.serialize(self.value_count.keys())['values']
-        freqs = [int(freq) for freq in self.value_count.values()]
-
-        if self.verbose:
-            # Include percentage in reports
-            total = int(count_isin.sum())
-            rel_freqs = (freq*100/total for freq in freqs)
-            values_report = [
-                {'value': value, 'count': freq, 'perc': pfreq}
-                for value, freq, pfreq in zip(values, freqs, rel_freqs)
-            ]
-        else:
-            values_report = [
-                {'value': value, 'count': freq}
-                for value, freq in zip(values, freqs)
-            ]
-        return {
-            'values': values_report
-        }
+        analysis = self._generate_analysis(value_counts)
+        return self._generate_report(analysis)
+
+    @report(Backend.DASK)
+    def _report_dask(self, df: 'dask.dataframe.DataFrame') -> dict:
+        # Concat all columns
+        _cols = df.columns.tolist()
+
+        if not self.multicolumn:
+            # Columns are assumed to be of the same Dtype:
+            # pool their values as a single column
+            df = (dask.dataframe.concat([df[col] for col in _cols])
+                  .to_frame(_cols[0]))
+            _cols = _cols[:1]
+
+        value_counts = (
+            df.groupby(_cols, dropna=False)[_cols[0]].size()
+            .rename('count')
+        )
+        analysis = self._generate_analysis(value_counts.compute())
+        return self._generate_report(analysis)
 
     # docstr-coverage:inherited
     def result(self, report: dict) -> bool:
-        self._set_min_max_boundaries()
-        self._set_values_scope()
-
-        if not self._check_interval(self.value_count):
-            return False
-        if not self._check_rule(self.value_count):
-            return False
-        return True
-
-    def _set_min_max_boundaries(self) -> None:
-        # Global boundaries
-        min_max_boundaries = {}
-        for value in self.values:
-            min_max_boundaries.update({
-                value: {
-                    'min_occurrences': self.min_occurrences,
-                    'max_occurrences': self.max_occurrences
-                }
-            })
-
-        # Dedicated boundaries
-        if self.occurrences_per_value:
-            for occurrence in self.occurrences_per_value:
-                values = occurrence['values']
-                values = [values] if not isinstance(values, list) else values
-                values = self.treater(values)
-
-                for value in values:
-                    min_max_boundaries[value][
-                        'min_occurrences'
-                    ] = occurrence.get(
-                        'min_occurrences', self.min_occurrences
-                    )
-
-                    min_max_boundaries[value][
-                        'max_occurrences'
-                    ] = occurrence.get(
-                        'max_occurrences', self.max_occurrences
-                    )
-
-        self.min_max_boundaries = min_max_boundaries
-
-    def _set_values_scope(self):
-        """
-        Set the scope of values to be analyzed according to the given
-        `self.rule`. Excludes the cases of values where its
-        corresponding `max_occurrences` is zero, since these cases
-        won't matter for the `rule` analysis, as they must be absent
-        in the column.
-        """
-        values_col = [
-            value for value in self.min_max_boundaries
-            if self.min_max_boundaries[value]['max_occurrences'] != 0
-        ]
-        self.values_scope_filter = values_col
-
-    def _check_interval(self, value_count: dict) -> bool:
-        """
-        Check if each value is inside an interval of min and max
-        number of occurrencies. These values are set globally in
-        `self.min_occurrencies` and `self.max_occurrencies`, but the
-        user can specify dedicated intervals for a given value in
-        `self.occurrences_per_value`
-        """
-        for value in self.values:
-            min_value = self.min_max_boundaries[value][
-                'min_occurrences'
-            ]
-            max_value = self.min_max_boundaries[value][
-                'max_occurrences'
-            ]
-            if value in value_count:
-                if not (
-                    min_value <= value_count[value] <= max_value
-                ):
-                    return False
-            else:
-                if self.rule != 'only' and max_value != 0 and min_value != 0:
-                    return False
-        return True
-
-    def _check_rule(self, value_count: dict) -> bool:
-        """
-        Checks if given columns attend the given requirements
-        of presence or absence of values, according to a criteria
-        specified in `self.rule`
-
-        Parameters
-        ----------
-        value_count: dict
-            Got from `report` method, it contains the count of
-            occurrences for each column for each value
-
-        Notes
-        -----
-        `self.rule` parameter defines the criteria to use for checking
-        the presence or absence of values in a column. Its values
-        should be:
-
-        * all: all the values in `self.values` are present in the
-        column, but there can be other values also
-        * only: only the values in `self.values` (but not necessarilly
-        all of them) are present in the given column
-        * all_and_only: the column must contain exactly the values in
-        `self.values` - neither more than less. As the name says, it
-        is an `and` boolean operation between `all` and `only` modes
-        """
-        if self.rule == 'all':
-            return self._check_all(value_count)
-        elif self.rule == 'only':
-            return self._check_only(value_count)
-        else:
-            return (self._check_all(value_count) and
-                    self._check_only(value_count))
-
-    def _check_all(self, value_count: dict) -> bool:
-        """
-        Checks if values in df contains all the expected values
-        """
-        values_in_col = set(value_count.keys())
-        values = set(self.values_scope_filter)
-        if values - values_in_col:
-            return False
-        return True
-
-    def _check_only(self, value_count: dict) -> bool:
-        """
-        Checks if all values in df are inside the expected values
-        """
-        values_in_col = set(value_count.keys())
-        values = set(self.values_scope_filter)
-        if values_in_col - values:
-            return False
-        return True
+        # Failing items come first in the report (see
+        # `_generate_report`), so a truncated report cannot hide a
+        # failing value
+        return all(
+            item['result'] for item in report['values']
+        )
 
     @profile(Backend.PANDAS)
     @staticmethod
@@ -444,3 +470,41 @@ def _profile_pandas(df: 'pandas.DataFrame') -> DeirokayStatement:
             statement_template['min_occurrences'] = min_occurrences
 
         return statement_template
+
+    @profile(Backend.DASK)
+    @staticmethod
+    def _profile_dask(df: 'dask.dataframe.DataFrame') -> DeirokayStatement:
+        if df.dtypes.nunique() > 1:
+            raise NotImplementedError(
+                "Refusing to mix up different types of columns"
+            )
+
+        series = dask.dataframe.concat([df[col] for col in df.columns])
+
+        unique_series = series.drop_duplicates().dropna()
+        if len(unique_series) > 20:
+            raise NotImplementedError("Won't generate too long statements!")
+
+        value_frequency = series.value_counts()
+        min_occurrences = int(value_frequency.min().compute())
+
+        statement_template = {
+            'type': 'contain',
+            'rule': 'all'
+        }  # type: DeirokayStatement
+        # Get most common type to infer treater
+        try:
+            statement_template.update(
+                get_dtype_treater(unique_series.map(type).mode().compute()[0])
+                .attach_backend(Backend.DASK)
+                .serialize(unique_series)  # type: ignore
+            )
+        except TypeError:
+            raise NotImplementedError("Can't handle mixed types")
+        # Sort allowing `None` values, which will appear last
+        statement_template['values'].sort(key=lambda x: (x is None, x))
+
+        if min_occurrences != 1:
+            statement_template['min_occurrences'] = min_occurrences
+
+        return statement_template
diff --git a/tests/statements/test_contain.csv b/tests/statements/test_contain.csv
index b469fee..ddcbcda 100644
--- a/tests/statements/test_contain.csv
+++ b/tests/statements/test_contain.csv
@@ -2,10 +2,10 @@ test_rule_1;test_rule_2;test_rule_3;test_maxmin;test_not_contain
 RJ;RJ;RJ;RJ;RJ
 ES;ES;ES;RJ;ES
 SC;SC;SC;RJ;SP
-AC;AC;AC;SP;MG
-SP;;SP;SP;MG
-AB;;AC;ES;SP
-XY;;;ES;SP
-hello_world;;;ES;SP
-hello_deirokay;;;ES;BA
-hello_life;;;ES;BA
+AC;SC;AC;SP;MG
+SP;RJ;;SP;MG
+;ES;AC;ES;SP
+;SC;AC;ES;SP
+;SC;AC;ES;SP
+;SC;AC;ES;BA
+;SC;;ES;BA
diff --git a/tests/statements/test_contain.py b/tests/statements/test_contain.py
index 17ea3c7..944ca66 100644
--- a/tests/statements/test_contain.py
+++ b/tests/statements/test_contain.py
@@ -9,10 +9,10 @@
 @pytest.mark.parametrize('rule, scope, result',
                          [('all', 'test_rule_1', 'pass'),
                           ('all', 'test_rule_2', 'fail'),
-                          ('all', 'test_rule_3', 'pass'),
+                          ('all', 'test_rule_3', 'fail'),
                           ('only', 'test_rule_1', 'fail'),
                           ('only', 'test_rule_2', 'pass'),
-                          ('only', 'test_rule_3', 'pass')])
+                          ('only', 'test_rule_3', 'fail')])
 def test_rules(rule, scope, result, backend):
     df = data_reader('tests/statements/test_contain.csv',
                      options='tests/statements/test_contain_options.yaml',
@@ -157,3 +157,73 @@
         validate(df, against=assertions, raise_exception=False)
         ['items'][0]['statements'][0]['report']['result']
     ) == 'pass'
+
+
+@pytest.mark.parametrize('backend', list(Backend))
+def test_multicolumn(backend):
+    df = data_reader('tests/statements/test_contain.csv',
+                     options='tests/statements/test_contain_options.yaml',
+                     backend=backend)
+    assertions = {
+        'name': 'all_test_rule',
+        'items': [
+            {
+                'scope': ['test_rule_1', 'test_rule_2'],
+                'statements': [
+                    {
+                        'type': 'contain',
+                        'rule': 'all',
+                        'values': [
+                            ['RJ', 'RJ'],
+                            ['ES', 'ES']
+                        ],
+                        'multicolumn': True,
+                        'parsers': 2*[{'dtype': 'string'}],
+                        'occurrences_per_value': [
+                            {
+                                'values': [('RJ', 'RJ'), ('ES', 'ES')],
+                                'min_occurrences': 1,
+                            }
+                        ]
+                    }
+                ]
+            }
+        ]
+    }
+    assert (
+        validate(df, against=assertions, raise_exception=False)
+        ['items'][0]['statements'][0]['report']['result']
+    ) == 'pass'
+
+
+@pytest.mark.parametrize('backend', list(Backend))
+@pytest.mark.parametrize('scope, rule, result, values', [
+    ('test_rule_1', 'all', 'pass', ['AC', 'SP', None]),
+    ('test_rule_1', 'only', 'fail', [None]),
+    ('test_rule_2', 'all', 'fail', [None]),
+    ('test_not_contain', 'all', 'fail', ['RJ', 'SP', 'ES', None]),
+])
+def test_null_values(scope, rule, result, values, backend):
+    df = data_reader('tests/statements/test_contain.csv',
+                     options='tests/statements/test_contain_options.yaml',
+                     backend=backend)
+    assertions = {
+        'name': 'all_test_rule',
+        'items': [
+            {
+                'scope': scope,
+                'statements': [
+                    {
+                        'type': 'contain',
+                        'rule': rule,
+                        'values': values,
+                        'parser': {'dtype': 'string'}
+                    }
+                ]
+            }
+        ]
+    }
+    assert (
+        validate(df, against=assertions, raise_exception=False)
+        ['items'][0]['statements'][0]['report']['result']
+    ) == result
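
Usage sketch (not part of the diff above): a minimal example of how the new `multicolumn`, `parsers` and `report_limit` parameters fit together, reusing the 'San Diego'/2022 example from the docstring. The CSV path and column names are hypothetical, and the inline `options` dict is assumed to follow the same column-options format as the YAML options files referenced by the tests; `data_reader` and `validate` are called exactly as in the tests.

    from deirokay import data_reader, validate

    # Hypothetical dataset with `city` and `year` columns
    df = data_reader(
        'sales.csv',
        options={'columns': {'city': {'dtype': 'string'},
                             'year': {'dtype': 'integer'}}}
    )

    assertions = {
        'name': 'city_year_checks',
        'items': [{
            'scope': ['city', 'year'],
            'statements': [{
                'type': 'contain',
                'rule': 'all',
                'min_occurrences': 0,
                'max_occurrences': 2,
                # With `multicolumn`, each entry is a (city, year) pair
                'values': [['San Diego', 2022]],
                'multicolumn': True,
                'parsers': [{'dtype': 'string'}, {'dtype': 'integer'}],
                'report_limit': None,  # never truncate the report
            }]
        }]
    }

    result = validate(df, against=assertions, raise_exception=False)
    print(result['items'][0]['statements'][0]['report']['result'])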