Skip to content

Commit

Permalink
Merge 7263f6d into c8941ac
Browse files Browse the repository at this point in the history
  • Loading branch information
Kudo committed Oct 28, 2015
2 parents c8941ac + 7263f6d commit a29488f
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 21 deletions.
28 changes: 7 additions & 21 deletions collection_filter/collection_filter.py
@@ -1,21 +1,7 @@
import copy


def _mergedict(dict1, dict2):
'''Merge two dictionaries and return the new dictionary
'''
def _merge_inner(inner_dict1, inner_dict2):
for key, value in inner_dict1.items():
if isinstance(value, dict):
# get node or create one
node = inner_dict2.setdefault(key, {})
_merge_inner(value, node)
else:
inner_dict2[key] = value
return inner_dict2

# Immutable dict2
return _merge_inner(dict1, copy.copy(dict2))
from .dict_utils import (
dict_union,
)


def _get_next_field(query):
Expand Down Expand Up @@ -124,12 +110,12 @@ def collection_filter(data, fields):
for field in fields.split(','):
# [2] For each dot notated sub field, do further query recursively
next_field, remain_query = _get_next_field(field)
subset = _inner_filter(copy.copy(data), next_field, remain_query)
subset = _inner_filter(copy.deepcopy(data), next_field, remain_query)
if data_as_list:
# [3-1] For list, set each element as merged dictionary
for idx in range(len(data)):
result[idx] = _mergedict(result[idx], subset[idx])
result[idx] = dict_union(result[idx], subset[idx])
else:
# [3-2] For dictionary, simply do merge
result = _mergedict(result, subset)
# [3-2] For dictionary, do union
result = dict_union(result, subset)
return result
57 changes: 57 additions & 0 deletions collection_filter/dict_utils.py
@@ -0,0 +1,57 @@
import copy


def dict_union(dict1, dict2):
'''Return the union of two dictionaries
'''
def _union_inner(inner_dict1, inner_dict2):
new_dict = copy.deepcopy(inner_dict1)
for key, value in inner_dict2.items():
if type(value) == dict:
node = new_dict.setdefault(key, {})
unioned_value = _union_inner(value, node)
elif type(value) == list:
if inner_dict1.get(key):
unioned_value = [_union_inner(inner_dict1[key][idx],
inner_dict2[key][idx])
for idx in range(len(value))]
else:
unioned_value = copy.copy(inner_dict2[key])
else:
unioned_value = new_dict.get(key) or value

new_dict[key] = unioned_value

return new_dict

# Immutable dict2
return _union_inner(dict1, dict2)


def dict_intersect(dict1, dict2):
'''Return the intersection of two dictionaries
'''
def _intersect_inner(inner_dict1, inner_dict2):
new_dict = {}
for key, value1 in inner_dict1.items():
value2 = inner_dict2.get(key)
if not value2:
continue
if type(value1) != type(value2):
intersected_value = value2
elif type(value1) == dict:
intersected_value = _intersect_inner(value1, value2)
elif type(value1) == list:
intersected_value = [_intersect_inner(value1[idx],
value2[idx])
for idx in
range(min(len(value1), len(value2)))]
else:
intersected_value = value2

new_dict[key] = intersected_value

return new_dict

# Immutable dict2
return _intersect_inner(dict1, dict2)
11 changes: 11 additions & 0 deletions tests/test_collection_filter.py
Expand Up @@ -97,6 +97,17 @@ def test_DataDictIncludeListWithFieldsDeepKeyQuery_ReturnSubsetData(self):
# Assert
assert result == {'aList': [{'elem1': 1}, {}, {}]}

def test_DataDictIncludeListWithTwoFieldsDeepKeyQuery_ReturnUnionSubsetData(self):
# Arrange
data = {'foo': 1, 'aList': [{'foo': 1, 'bar': 2, 'dontcare': 99}, {'foo': 3, 'bar': 4, 'dontcare': 100}]}
fields = 'aList[].foo,aList[].bar'

# Act
result = collection_filter(data, fields)

# Assert
assert result == {'aList': [{'foo': 1, 'bar': 2}, {'foo': 3, 'bar': 4}]}

def test_DataDictIncludeListComplexWithFieldsDeepKeyQuery_ReturnSubsetData(self):
# Arrange
data = {'foo': 1, 'aList': [{'elem1': {'foo': 1, 'bar': 2}}, {'elem2': {'foo': 'bar'}}, {'elem3': {'foo': 'bar'}}]}
Expand Down
164 changes: 164 additions & 0 deletions tests/test_dict_utils.py
@@ -0,0 +1,164 @@
# flake8: noqa for E501
import pytest

from collection_filter import dict_utils


class Test_DictUtils(object):

def test_UnionSimpleDict_ReturnUnionData(self):
# Arrange
dict1 = {'foo': 1}
dict2 = {'bar': 2}

# Act
result = dict_utils.dict_union(dict1, dict2)

# Assert
assert result == {'foo': 1, 'bar': 2}

def test_UnionDeepDict_ReturnUnionData(self):
# Arrange
dict1 = {'deep': {'foo': 1}}
dict2 = {'deep': {'bar': 2}}

# Act
result = dict_utils.dict_union(dict1, dict2)

# Assert
assert result == {'deep': {'foo': 1, 'bar': 2}}

def test_UnionDeepDictWithExtraDataInDict1_KeepExtraData(self):
# Arrange
dict1 = {'deep': {'foo': 1}, 'extra': True}
dict2 = {'deep': {'bar': 2}}

# Act
result = dict_utils.dict_union(dict1, dict2)

# Assert
assert result == {'deep': {'foo': 1, 'bar': 2}, 'extra': True}

def test_UnionDeepDictWithExtraDataInDict2_KeepExtraData(self):
# Arrange
dict1 = {'deep': {'foo': 1}}
dict2 = {'deep': {'bar': 2}, 'extra': True}

# Act
result = dict_utils.dict_union(dict1, dict2)

# Assert
assert result == {'deep': {'foo': 1, 'bar': 2}, 'extra': True}

def test_UnionDictImmutateDict_DictsAsOrigin(self):
# Arrange
dict1 = {'deep': {'foo': 1}}
dict2 = {'deep': {'bar': 2}}

# Act
result = dict_utils.dict_union(dict1, dict2)

# Assert
assert dict1 == {'deep': {'foo': 1}}
assert dict2 == {'deep': {'bar': 2}}

def test_UnionDictWithList_ReturnUnionData(self):
# Arrange
dict1 = {'deep': [{'foo': 1}, {'foo': 2}]}
dict2 = {'deep': [{'bar': 3}, {'bar': 4}]}

# Act
result = dict_utils.dict_union(dict1, dict2)

# Assert
assert result == {'deep': [{'foo': 1, 'bar': 3}, {'foo': 2, 'bar': 4}]}

def test_UnionDictListFromEmpty_ReturnUnionData(self):
# Arrange
dict1 = {}
dict2 = {'deep': [{'bar': 3}, {'bar': 4}]}

# Act
result = dict_utils.dict_union(dict1, dict2)

# Assert
assert result == {'deep': [{'bar': 3}, {'bar': 4}]}

def test_IntersectDictSimple_ReturnIntersectData(self):
# Arrange
dict1 = {'foo': 1, 'bar': 2}
dict2 = {'foo': 1}

# Act
result = dict_utils.dict_intersect(dict1, dict2)

# Assert
assert result == {'foo': 1}

def test_IntersectDictDeep_ReturnIntersectData(self):
# Arrange
dict1 = {'deep': {'foo': 1, 'bar': 2}}
dict2 = {'deep': {'foo': 1}}

# Act
result = dict_utils.dict_intersect(dict1, dict2)

# Assert
assert result == {'deep': {'foo': 1}}


def test_IntersectDictToEmpty_ReturnEmptyData(self):
# Arrange
dict1 = {'foo': 1, 'bar': 2}
dict2 = {}

# Act
result = dict_utils.dict_intersect(dict1, dict2)

# Assert
assert result == {}

def test_IntersectDictImmutable_ExpectOriginData(self):
# Arrange
dict1 = {'foo': 1, 'bar': 2}
dict2 = {'foo': 1}

# Act
result = dict_utils.dict_intersect(dict1, dict2)

# Assert
assert dict1 == {'foo': 1, 'bar': 2}
assert dict2 == {'foo': 1}

def test_IntersectDictDifferentType_ReturnDict2Value(self):
# Arrange
dict1 = {'foo': 1}
dict2 = {'foo': [1, 2, 3]}

# Act
result = dict_utils.dict_intersect(dict1, dict2)

# Assert
assert result == {'foo': [1, 2, 3]}

def test_IntersectDictDifferentPrimitiveValue_ReturnDict2Value(self):
# Arrange
dict1 = {'foo': 1}
dict2 = {'foo': 2}

# Act
result = dict_utils.dict_intersect(dict1, dict2)

# Assert
assert result == {'foo': 2}

def test_IntersectDictWithList_ReturnDeepIntersect(self):
# Arrange
dict1 = {'deep': [{'foo': 1}, {'bar': 2}]}
dict2 = {'deep': [{'foo': 1}]}

# Act
result = dict_utils.dict_intersect(dict1, dict2)

# Assert
assert result == {'deep': [{'foo': 1}]}

0 comments on commit a29488f

Please sign in to comment.