From 7263f6d89076f902f6c0c8f9b8c15c967f0fb7c5 Mon Sep 17 00:00:00 2001 From: Kudo Chien Date: Wed, 28 Oct 2015 14:41:26 +0800 Subject: [PATCH] Fix multiple key[] (key[].a,key[].b) issue --- collection_filter/collection_filter.py | 28 ++--- collection_filter/dict_utils.py | 57 +++++++++ tests/test_collection_filter.py | 11 ++ tests/test_dict_utils.py | 164 +++++++++++++++++++++++++ 4 files changed, 239 insertions(+), 21 deletions(-) create mode 100644 collection_filter/dict_utils.py create mode 100644 tests/test_dict_utils.py diff --git a/collection_filter/collection_filter.py b/collection_filter/collection_filter.py index cd55726..6d76e2d 100644 --- a/collection_filter/collection_filter.py +++ b/collection_filter/collection_filter.py @@ -1,21 +1,7 @@ import copy - - -def _mergedict(dict1, dict2): - '''Merge two dictionaries and return the new dictionary - ''' - def _merge_inner(inner_dict1, inner_dict2): - for key, value in inner_dict1.items(): - if isinstance(value, dict): - # get node or create one - node = inner_dict2.setdefault(key, {}) - _merge_inner(value, node) - else: - inner_dict2[key] = value - return inner_dict2 - - # Immutable dict2 - return _merge_inner(dict1, copy.copy(dict2)) +from .dict_utils import ( + dict_union, +) def _get_next_field(query): @@ -124,12 +110,12 @@ def collection_filter(data, fields): for field in fields.split(','): # [2] For each dot notated sub field, do further query recursively next_field, remain_query = _get_next_field(field) - subset = _inner_filter(copy.copy(data), next_field, remain_query) + subset = _inner_filter(copy.deepcopy(data), next_field, remain_query) if data_as_list: # [3-1] For list, set each element as merged dictionary for idx in range(len(data)): - result[idx] = _mergedict(result[idx], subset[idx]) + result[idx] = dict_union(result[idx], subset[idx]) else: - # [3-2] For dictionary, simply do merge - result = _mergedict(result, subset) + # [3-2] For dictionary, do union + result = dict_union(result, subset) return result diff --git a/collection_filter/dict_utils.py b/collection_filter/dict_utils.py new file mode 100644 index 0000000..25a7e45 --- /dev/null +++ b/collection_filter/dict_utils.py @@ -0,0 +1,57 @@ +import copy + + +def dict_union(dict1, dict2): + '''Return the union of two dictionaries + ''' + def _union_inner(inner_dict1, inner_dict2): + new_dict = copy.deepcopy(inner_dict1) + for key, value in inner_dict2.items(): + if type(value) == dict: + node = new_dict.setdefault(key, {}) + unioned_value = _union_inner(value, node) + elif type(value) == list: + if inner_dict1.get(key): + unioned_value = [_union_inner(inner_dict1[key][idx], + inner_dict2[key][idx]) + for idx in range(len(value))] + else: + unioned_value = copy.copy(inner_dict2[key]) + else: + unioned_value = new_dict.get(key) or value + + new_dict[key] = unioned_value + + return new_dict + + # Immutable dict2 + return _union_inner(dict1, dict2) + + +def dict_intersect(dict1, dict2): + '''Return the intersection of two dictionaries + ''' + def _intersect_inner(inner_dict1, inner_dict2): + new_dict = {} + for key, value1 in inner_dict1.items(): + value2 = inner_dict2.get(key) + if not value2: + continue + if type(value1) != type(value2): + intersected_value = value2 + elif type(value1) == dict: + intersected_value = _intersect_inner(value1, value2) + elif type(value1) == list: + intersected_value = [_intersect_inner(value1[idx], + value2[idx]) + for idx in + range(min(len(value1), len(value2)))] + else: + intersected_value = value2 + + new_dict[key] = intersected_value + + return new_dict + + # Immutable dict2 + return _intersect_inner(dict1, dict2) diff --git a/tests/test_collection_filter.py b/tests/test_collection_filter.py index 05d6e6e..33a9774 100644 --- a/tests/test_collection_filter.py +++ b/tests/test_collection_filter.py @@ -97,6 +97,17 @@ def test_DataDictIncludeListWithFieldsDeepKeyQuery_ReturnSubsetData(self): # Assert assert result == {'aList': [{'elem1': 1}, {}, {}]} + def test_DataDictIncludeListWithTwoFieldsDeepKeyQuery_ReturnUnionSubsetData(self): + # Arrange + data = {'foo': 1, 'aList': [{'foo': 1, 'bar': 2, 'dontcare': 99}, {'foo': 3, 'bar': 4, 'dontcare': 100}]} + fields = 'aList[].foo,aList[].bar' + + # Act + result = collection_filter(data, fields) + + # Assert + assert result == {'aList': [{'foo': 1, 'bar': 2}, {'foo': 3, 'bar': 4}]} + def test_DataDictIncludeListComplexWithFieldsDeepKeyQuery_ReturnSubsetData(self): # Arrange data = {'foo': 1, 'aList': [{'elem1': {'foo': 1, 'bar': 2}}, {'elem2': {'foo': 'bar'}}, {'elem3': {'foo': 'bar'}}]} diff --git a/tests/test_dict_utils.py b/tests/test_dict_utils.py new file mode 100644 index 0000000..b5b4b6c --- /dev/null +++ b/tests/test_dict_utils.py @@ -0,0 +1,164 @@ +# flake8: noqa for E501 +import pytest + +from collection_filter import dict_utils + + +class Test_DictUtils(object): + + def test_UnionSimpleDict_ReturnUnionData(self): + # Arrange + dict1 = {'foo': 1} + dict2 = {'bar': 2} + + # Act + result = dict_utils.dict_union(dict1, dict2) + + # Assert + assert result == {'foo': 1, 'bar': 2} + + def test_UnionDeepDict_ReturnUnionData(self): + # Arrange + dict1 = {'deep': {'foo': 1}} + dict2 = {'deep': {'bar': 2}} + + # Act + result = dict_utils.dict_union(dict1, dict2) + + # Assert + assert result == {'deep': {'foo': 1, 'bar': 2}} + + def test_UnionDeepDictWithExtraDataInDict1_KeepExtraData(self): + # Arrange + dict1 = {'deep': {'foo': 1}, 'extra': True} + dict2 = {'deep': {'bar': 2}} + + # Act + result = dict_utils.dict_union(dict1, dict2) + + # Assert + assert result == {'deep': {'foo': 1, 'bar': 2}, 'extra': True} + + def test_UnionDeepDictWithExtraDataInDict2_KeepExtraData(self): + # Arrange + dict1 = {'deep': {'foo': 1}} + dict2 = {'deep': {'bar': 2}, 'extra': True} + + # Act + result = dict_utils.dict_union(dict1, dict2) + + # Assert + assert result == {'deep': {'foo': 1, 'bar': 2}, 'extra': True} + + def test_UnionDictImmutateDict_DictsAsOrigin(self): + # Arrange + dict1 = {'deep': {'foo': 1}} + dict2 = {'deep': {'bar': 2}} + + # Act + result = dict_utils.dict_union(dict1, dict2) + + # Assert + assert dict1 == {'deep': {'foo': 1}} + assert dict2 == {'deep': {'bar': 2}} + + def test_UnionDictWithList_ReturnUnionData(self): + # Arrange + dict1 = {'deep': [{'foo': 1}, {'foo': 2}]} + dict2 = {'deep': [{'bar': 3}, {'bar': 4}]} + + # Act + result = dict_utils.dict_union(dict1, dict2) + + # Assert + assert result == {'deep': [{'foo': 1, 'bar': 3}, {'foo': 2, 'bar': 4}]} + + def test_UnionDictListFromEmpty_ReturnUnionData(self): + # Arrange + dict1 = {} + dict2 = {'deep': [{'bar': 3}, {'bar': 4}]} + + # Act + result = dict_utils.dict_union(dict1, dict2) + + # Assert + assert result == {'deep': [{'bar': 3}, {'bar': 4}]} + + def test_IntersectDictSimple_ReturnIntersectData(self): + # Arrange + dict1 = {'foo': 1, 'bar': 2} + dict2 = {'foo': 1} + + # Act + result = dict_utils.dict_intersect(dict1, dict2) + + # Assert + assert result == {'foo': 1} + + def test_IntersectDictDeep_ReturnIntersectData(self): + # Arrange + dict1 = {'deep': {'foo': 1, 'bar': 2}} + dict2 = {'deep': {'foo': 1}} + + # Act + result = dict_utils.dict_intersect(dict1, dict2) + + # Assert + assert result == {'deep': {'foo': 1}} + + + def test_IntersectDictToEmpty_ReturnEmptyData(self): + # Arrange + dict1 = {'foo': 1, 'bar': 2} + dict2 = {} + + # Act + result = dict_utils.dict_intersect(dict1, dict2) + + # Assert + assert result == {} + + def test_IntersectDictImmutable_ExpectOriginData(self): + # Arrange + dict1 = {'foo': 1, 'bar': 2} + dict2 = {'foo': 1} + + # Act + result = dict_utils.dict_intersect(dict1, dict2) + + # Assert + assert dict1 == {'foo': 1, 'bar': 2} + assert dict2 == {'foo': 1} + + def test_IntersectDictDifferentType_ReturnDict2Value(self): + # Arrange + dict1 = {'foo': 1} + dict2 = {'foo': [1, 2, 3]} + + # Act + result = dict_utils.dict_intersect(dict1, dict2) + + # Assert + assert result == {'foo': [1, 2, 3]} + + def test_IntersectDictDifferentPrimitiveValue_ReturnDict2Value(self): + # Arrange + dict1 = {'foo': 1} + dict2 = {'foo': 2} + + # Act + result = dict_utils.dict_intersect(dict1, dict2) + + # Assert + assert result == {'foo': 2} + + def test_IntersectDictWithList_ReturnDeepIntersect(self): + # Arrange + dict1 = {'deep': [{'foo': 1}, {'bar': 2}]} + dict2 = {'deep': [{'foo': 1}]} + + # Act + result = dict_utils.dict_intersect(dict1, dict2) + + # Assert + assert result == {'deep': [{'foo': 1}]}