diff --git a/Makefile b/Makefile index a938359..efe77b4 100644 --- a/Makefile +++ b/Makefile @@ -20,7 +20,7 @@ init: test: rm -f .coverage vdf/*.pyc tests/*.pyc - PYTHONHASHSEED=0 nosetests --verbosity 2 --with-coverage --cover-package=vdf + PYTHONHASHSEED=0 nosetests --verbosity 1 --with-coverage --cover-package=vdf pylint: pylint -r n -f colorized vdf || true diff --git a/tests/test_vdf_dict.py b/tests/test_vdf_dict.py index 16c0c36..72b483a 100644 --- a/tests/test_vdf_dict.py +++ b/tests/test_vdf_dict.py @@ -2,150 +2,276 @@ from vdf import VDFDict -class DuplicateOrderedDict_test(unittest.TestCase): - map_test = ( +class VDFDictCase(unittest.TestCase): + def test_init(self): + with self.assertRaises(ValueError): + VDFDict("asd zxc") + with self.assertRaises(ValueError): + VDFDict(5) + with self.assertRaises(ValueError): + VDFDict((('1',1), ('2', 2))) + + def test_repr(self): + self.assertIsInstance(repr(VDFDict()), str) + + def test_len(self): + self.assertEqual(len(VDFDict()), 0) + self.assertEqual(len(VDFDict({'1':1})), 1) + + def test_verify_key_tuple(self): + a = VDFDict() + with self.assertRaises(ValueError): + a._verify_key_tuple([]) + with self.assertRaises(ValueError): + a._verify_key_tuple((1,)) + with self.assertRaises(ValueError): + a._verify_key_tuple((1,1,1)) + with self.assertRaises(TypeError): + a._verify_key_tuple((None, 'asd')) + with self.assertRaises(TypeError): + a._verify_key_tuple(('1', 'asd')) + with self.assertRaises(TypeError): + a._verify_key_tuple((1, 1)) + with self.assertRaises(TypeError): + a._verify_key_tuple((1, None)) + + def test_normalize_key(self): + a = VDFDict() + self.assertEqual(a._normalize_key('AAA'), (0, 'AAA')) + self.assertEqual(a._normalize_key((5, 'BBB')), (5, 'BBB')) + + def test_normalize_key_exception(self): + a = VDFDict() + with self.assertRaises(TypeError): + a._normalize_key(5) + with self.assertRaises(TypeError): + a._normalize_key([]) + with self.assertRaises(TypeError): + a._normalize_key(None) + + def test_setitem(self): + a = list(zip(map(str, range(5, 0, -1)), range(50, 0, -10))) + b = VDFDict() + for k,v in a: + b[k] = v + self.assertEqual(a, list(b.items())) + + def test_setitem_with_duplicates(self): + a = list(zip(['5']*5, range(50, 0, -10))) + b = VDFDict() + for k,v in a: + b[k] = v + self.assertEqual(a, list(b.items())) + + def test_setitem_key_exceptions(self): + with self.assertRaises(TypeError): + VDFDict()[5] = None + with self.assertRaises(TypeError): + VDFDict()[(0, 5)] = None + with self.assertRaises(ValueError): + VDFDict()[(0, '5', 1)] = None + + def test_setitem_key_valid_types(self): + VDFDict()['5'] = None + VDFDict({'5': None})[(0, '5')] = None + + def test_setitem_keyerror_fullkey(self): + with self.assertRaises(KeyError): + VDFDict([("1", None)])[(1, "1")] = None + + def test_getitem(self): + a = VDFDict([('1',2), ('1',3)]) + self.assertEqual(a['1'], 2) + self.assertEqual(a[(0, '1')], 2) + self.assertEqual(a[(1, '1')], 3) + + def test_del(self): + a = VDFDict([("1",1),("1",2),("5",51),("1",3),("5",52)]) + b = [("1",1),("1",2),("1",3),("5",52)] + del a["5"] + self.assertEqual(list(a.items()), b) + + def test_del_by_fullkey(self): + a = VDFDict([("1",1),("1",2),("5",51),("1",3),("5",52)]) + b = [("1",1),("1",2),("1",3),("5",52)] + del a[(0, "5")] + self.assertEqual(list(a.items()), b) + + def test_del_first_duplicate(self): + a = [("1",1),("1",2),("1",3),("1",4)] + b = VDFDict(a) + + del b["1"] + del b["1"] + del b[(0, "1")] + del b[(0, "1")] + + self.assertEqual(len(b), 0) + + def test_del_exception(self): + with self.assertRaises(KeyError): + a = VDFDict() + del a["1"] + with self.assertRaises(KeyError): + a = VDFDict({'1':1}) + del a[(1, "1")] + + def test_iter(self): + a = VDFDict({"1": 1}) + iter(a).__iter__ + self.assertEqual(len(list(iter(a))), 1) + + def test_in(self): + a = VDFDict({"1":2, "3":4, "5":6}) + self.assertTrue('1' in a) + self.assertTrue((0, '1') in a) + self.assertFalse('6' in a) + self.assertFalse((1, '1') in a) + + def test_eq(self): + self.assertEqual(VDFDict(), VDFDict()) + self.assertNotEqual(VDFDict(), VDFDict({'1':1})) + self.assertNotEqual(VDFDict(), {'1':1}) + a = [("a", 1), ("b", 5), ("a", 11)] + self.assertEqual(VDFDict(a), VDFDict(a)) + self.assertNotEqual(VDFDict(a), VDFDict(a[1:])) + + def test_clear(self): + a = VDFDict([("1",2),("1",2),("5",3),("1",2)]) + a.clear() + self.assertEqual(len(a), 0) + self.assertEqual(len(a.keys()), 0) + self.assertEqual(len(list(a.iterkeys())), 0) + self.assertEqual(len(a.values()), 0) + self.assertEqual(len(list(a.itervalues())), 0) + self.assertEqual(len(a.items()), 0) + self.assertEqual(len(list(a.iteritems())), 0) + + def test_get(self): + a = VDFDict([('1',11), ('1',22)]) + self.assertEqual(a.get('1'), 11) + self.assertEqual(a.get((1, '1')), 22) + self.assertEqual(a.get('2', 33), 33) + self.assertEqual(a.get((0, '2'), 44), 44) + + def test_setdefault(self): + a = VDFDict([('1',11), ('1',22)]) + self.assertEqual(a.setdefault('1'), 11) + self.assertEqual(a.setdefault((0, '1')), 11) + self.assertEqual(a.setdefault('2'), None) + self.assertEqual(a.setdefault((0, '2')), None) + self.assertEqual(a.setdefault('3', 33), 33) + + def test_pop(self): + a = VDFDict([('1',11),('2',22),('1',33),('2',44),('2',55)]) + self.assertEqual(a.pop('1'), 11) + self.assertEqual(a.pop('1'), 33) + with self.assertRaises(KeyError): + a.pop('1') + self.assertEqual(a.pop((1, '2')), 44) + self.assertEqual(a.pop((1, '2')), 55) + + def test_popitem(self): + a = [('1',11),('2',22),('1',33)] + b = VDFDict(a) + self.assertEqual(b.popitem(), a.pop()) + self.assertEqual(b.popitem(), a.pop()) + self.assertEqual(b.popitem(), a.pop()) + with self.assertRaises(KeyError): + b.popitem() + + def test_update(self): + a = VDFDict([("1",2),("1",2),("5",3),("1",2)]) + b = VDFDict() + b.update([("1",2),("1",2)]) + b.update([("5",3),("1",2)]) + self.assertEqual(list(a.items()), list(b.items())) + + def test_update_exceptions(self): + a = VDFDict() + with self.assertRaises(TypeError): + a.update(None) + with self.assertRaises(TypeError): + a.update(1) + with self.assertRaises(TypeError): + a.update("asd zxc") + with self.assertRaises(ValueError): + a.update([(1,1,1), (2,2,2)]) + + map_test = [ ("1", 2), ("4", 3),("4", 3),("4", 2), ("7", 2), ("1", 2), - ) + ] def test_keys(self): _dict = VDFDict(self.map_test) self.assertSequenceEqual( - tuple(_dict.keys()), - tuple(x[0] for x in self.map_test)) - + list(_dict.keys()), + list(x[0] for x in self.map_test)) + def test_values(self): _dict = VDFDict(self.map_test) self.assertSequenceEqual( - tuple(_dict.values()), - tuple(x[1] for x in self.map_test)) - + list(_dict.values()), + list(x[1] for x in self.map_test)) + def test_items(self): _dict = VDFDict(self.map_test) self.assertSequenceEqual( - tuple(_dict.items()), + list(_dict.items()), self.map_test) - - def test_in(self): - a = VDFDict({"1":2, "3":4, "5":6}) - self.assertTrue('1' in a) - self.assertFalse('6' in a) - - def test_direct_access_set(self): - a = {"1":2, "3":4, "5":6} - b = VDFDict() - for k,v in a.items(): - b[k] = v - self.assertDictEqual(a, b) - + def test_direct_access_get(self): b = dict() a = VDFDict({"1":2, "3":4, "5":6}) for k,v in a.items(): b[k] = v - self.assertDictEqual(a, b) - + self.assertEqual(dict(a.items()), b) + def test_duplicate_keys(self): - items = (('key1', 1), ('key1', 2), ('key3', 3), ('key1', 1)) - keys = tuple(x[0] for x in items) - values = tuple(x[1] for x in items) - _dict = VDFDict((('key1', 1), ('key1', 2), ('key3', 3), ('key1', 1))) - self.assertSequenceEqual(tuple(_dict.items()), items) - self.assertSequenceEqual(tuple(_dict.keys()), keys) - self.assertSequenceEqual(tuple(_dict.values()), values) - - def test_update(self): - a = VDFDict((("1",2),("1",2),("5",3),("1",2))) - b = VDFDict() - b.update((("1",2),("1",2))) - b.update((("5",3),("1",2))) - self.assertSequenceEqual(tuple(a.items()), tuple(b.items())) - - def test_update_2(self): + items = [('key1', 1), ('key1', 2), ('key3', 3), ('key1', 1)] + keys = [x[0] for x in items] + values = [x[1] for x in items] + _dict = VDFDict(items) + self.assertEqual(list(_dict.items()), items) + self.assertEqual(list(_dict.keys()), keys) + self.assertEqual(list(_dict.values()), values) + + def test_same_type_init(self): self.assertSequenceEqual( tuple(VDFDict(self.map_test).items()), tuple(VDFDict(VDFDict(self.map_test)).items())) - - def test_del(self): - """ Tests del """ - a = VDFDict((("1",2),("1",2),("5",3),("1",2))) - b = VDFDict((("1",2),("1",2),("1",2))) - del a["5"] - self.assertSequenceEqual(tuple(a.items()), tuple(b.items())) - - def test_remove_all(self): - a = VDFDict((("1",2),("1",2),("5",3),("1",2))) - b = VDFDict((("5",3),)) - a.remove_all_by_key("1") - self.assertSequenceEqual(tuple(a.items()), tuple(b.items())) - - def test_clear(self): - a = VDFDict((("1",2),("1",2),("5",3),("1",2))) - a.clear() - self.assertEqual(len(a), 0) - - def test_get_all(self): - a = VDFDict((("1",2),("1",2**31),("5",3),("1",2))) - self.assertSequenceEqual( - tuple(a.get_all_by_key("1")), - (2,2**31,2)) - - def test_get(self): - a = VDFDict({'key': 'foo'}) - self.assertEqual(a.get("key"), a["key"]) - def test_repr(self): - a = VDFDict(self.map_test) + def test_get_all_for(self): + a = VDFDict([("1",2),("1",2**31),("5",3),("1",2)]) self.assertEqual( - repr(a), - "VDFDict(%s)" % repr(self.map_test) + list(a.get_all_for("1")), + [2,2**31,2], ) - - def test_exception_insert(self): - """ Only strings (and tuples) are supported as keys """ + def test_get_all_for_invalid_key(self): a = VDFDict() - self.assertRaises(TypeError, a.__setitem__, 5, "foo") + with self.assertRaises(TypeError): + a.get_all_for(None) + with self.assertRaises(TypeError): + a.get_all_for(5) + with self.assertRaises(TypeError): + a.get_all_for((0, '5')) - def test_exception_remove_all(self): - """ Only strings are supported as keys """ - a = VDFDict() - self.assertRaises(TypeError, a.remove_all_by_key, 5) - - def test_exception_get_all(self): - """ Only strings are supported as keys """ - a = VDFDict((("1",2),("1",2**31),("5",3),("1",2))) - self.assertRaises(TypeError, a.get_all_by_key, 5) - - def test_exception_del(self): - a = VDFDict((("1",2),("1",2**31),("5",3),("1",2))) - self.assertRaises(KeyError, a.__delitem__, "7") - - def test_exception_update_1(self): - a = VDFDict((("1",2),("1",2**31),("5",3),("1",2))) - self.assertRaises(TypeError, a.update, 7) - - def test_exception_update_2(self): - a = VDFDict((("1",2),("1",2**31),("5",3),("1",2))) - class foo(): - def items(self): - return None - self.assertRaises(TypeError, a.update, foo()) - - def test_exception_update_3(self): - a = VDFDict((("1",2),("1",2**31),("5",3),("1",2))) - self.assertRaises(TypeError, a.update, range(10)) - - def test_exception_update_4(self): - a = VDFDict((("1",2),("1",2**31),("5",3),("1",2))) - class foo(): - def items(self): - return ((1,2,3),(4,)) - self.assertRaises(TypeError, a.update, foo()) - - def test_exception_set_item(self): + def test_remove_all_for(self): + a = VDFDict([("1",2),("1",2),("5",3),("1",2)]) + a.remove_all_for("1") + self.assertEqual(list(a.items()), [("5",3)]) + self.assertEqual(len(a), 1) + + def test_remove_all_for_invalid_key(self): a = VDFDict() - self.assertRaises(KeyError, a.__setitem__, (7, "key"), "value") + with self.assertRaises(TypeError): + a.remove_all_for(None) + with self.assertRaises(TypeError): + a.remove_all_for(5) + with self.assertRaises(TypeError): + a.remove_all_for((0, '5')) diff --git a/vdf/vdict.py b/vdf/vdict.py index fb94db3..e9de546 100644 --- a/vdf/vdict.py +++ b/vdf/vdict.py @@ -1,142 +1,200 @@ -import sys -from collections import Iterable +import sys +from collections import Counter if sys.version_info[0] >= 3: + _range = range string_type = str - _iter_helper = lambda x: x + import collections as _c + class _kView(_c.KeysView): + def __iter__(self): + return self._mapping.iterkeys() + class _vView(_c.ValuesView): + def __iter__(self): + return self._mapping.itervalues() + class _iView(_c.ItemsView): + def __iter__(self): + return self._mapping.iteritems() else: + _range = xrange string_type = basestring - _iter_helper = tuple - + _kView = lambda x: list(x.iterkeys()) + _vView = lambda x: list(x.itervalues()) + _iView = lambda x: list(x.iteritems()) + class VDFDict(dict): - def __init__(self, values=None): + def __init__(self, data=None): """ - A dictionary implmentation which allows duplicate keys and contains the insert order. - - - ``values`` can be used to initialize this `DuplicateOrderedDict` instance.. - Dict like objects and iterables containing iterables of the length 2 are supported. + This is a dictionary that supports duplicate keys and preserves insert order + + ``data`` can be a ``dict``, or a sequence of key-value tuples. (e.g. ``[('key', 'value'),..]``) + The only supported type for key is str. + + Get/set duplicates is done by tuples ``(index, key)``, where index is the duplicate index + for the specified key. (e.g. ``(0, 'key')``, ``(1, 'key')``...) + + When the ``key`` is ``str``, instead of tuple, set will create a duplicate and get will look up ``(0, key)`` """ self.__omap = [] - if not values is None: - self.update(values) - + self.__kcount = Counter() + + if data is not None: + if not isinstance(data, (list, dict)): + raise ValueError("Expected data to be list of pairs or dict, got %s" % type(data)) + self.update(data) + def __repr__(self): out = "%s(" % self.__class__.__name__ - out += "%s)" % repr(tuple(self.items())) + out += "%s)" % repr(list(self.iteritems())) return out - + def __len__(self): return len(self.__omap) - - def __setitem__(self, key, value): - if not isinstance(key, tuple): - idx = 0 - while True: - if (idx, key) not in self: - self.__omap.append((idx, key)) - break - else: - idx += 1 - key = (idx, key) + + def _verify_key_tuple(self, key): + if len(key) != 2: + raise ValueError("Expected key tuple length to be 2, got %d" % len(key)) + if not isinstance(key[0], int): + raise TypeError("Key index should be an int") + if not isinstance(key[1], string_type): + raise TypeError("Key value should be a str") + + def _normalize_key(self, key): + if isinstance(key, string_type): + key = (0, key) + elif isinstance(key, tuple): + self._verify_key_tuple(key) else: + raise TypeError("Expected key to be a str or tuple, got %s" % type(key)) + return key + + def __setitem__(self, key, value): + if isinstance(key, string_type): + key = (self.__kcount[key], key) + self.__omap.append(key) + elif isinstance(key, tuple): + self._verify_key_tuple(key) if key not in self: - raise KeyError("%s doesn\'t exist" % repr(key)) - if not isinstance(key[1], string_type): - raise TypeError("The key need to be a string") + raise KeyError("%s doesn't exist" % repr(key)) + else: + raise TypeError("Expected either a str or tuple for key") super(VDFDict, self).__setitem__(key, value) - + self.__kcount[key[1]] += 1 + def __getitem__(self, key): - if not isinstance(key, tuple): - key = (0, key) - return super(VDFDict, self).__getitem__(key) - + return super(VDFDict, self).__getitem__(self._normalize_key(key)) + def __delitem__(self, key): - if not isinstance(key, tuple): - key = (0, key) - try: - self.__omap.remove(key) - except ValueError: - raise KeyError(key) - return dict.__delitem__(self, key) - + key = self._normalize_key(key) + result = super(VDFDict, self).__delitem__(key) + + start_idx = self.__omap.index(key) + del self.__omap[start_idx] + + dup_idx, skey = key + self.__kcount[skey] -= 1 + tail_count = self.__kcount[skey] - dup_idx + + if tail_count > 0: + for idx in _range(start_idx, len(self.__omap)): + if self.__omap[idx][1] == skey: + oldkey = self.__omap[idx] + newkey = (dup_idx, skey) + super(VDFDict, self).__setitem__(newkey, self[oldkey]) + super(VDFDict, self).__delitem__(oldkey) + self.__omap[idx] = newkey + + dup_idx += 1 + tail_count -= 1 + if tail_count == 0: + break + + if self.__kcount[skey] == 0: + del self.__kcount[skey] + + return result + def __iter__(self): - return iter(self.keys()) - - def __contains__(self, item): - if isinstance(item, tuple): - return dict.__contains__(self, item) - return dict.__contains__(self, (0, item)) - + return iter(self.iterkeys()) + + def __contains__(self, key): + return super(VDFDict, self).__contains__(self._normalize_key(key)) + def __eq__(self, other): - """ - This only returns true if the k,v pairs of `other` - are returned in the same order. - """ - if isinstance(other, dict): - other = tuple(other.items()) - return other == tuple(self.items()) - + if isinstance(other, VDFDict): + return list(self.items()) == list(other.items()) + else: + return False + def __ne__(self, other): - return not self.__eq__(other) - + return not self.__eq__(other) + def clear(self): - dict.clear(self) + super(VDFDict, self).clear() + self.__kcount.clear() self.__omap = list() - - def get(self, key, default=None): - if not isinstance(key, tuple): - key = (0, key) - return dict.get(self, key, default) - - def iteritems(self): - return ((key[1], self[key]) for key in self.__omap) - - def items(self): - return _iter_helper(self.iteritems()) - + + def get(self, key, *args): + return super(VDFDict, self).get(self._normalize_key(key), *args) + + def setdefault(self, key, default=None): + if key not in self: + self.__setitem__(key, default) + return self.__getitem__(key) + + def pop(self, key): + key = self._normalize_key(key) + value = self.__getitem__(key) + self.__delitem__(key) + return value + + def popitem(self): + if not self.__omap: + raise KeyError("VDFDict is empty") + key = self.__omap[-1] + return key[1], self.pop(key) + + def update(self, data=None, **kwargs): + if isinstance(data, dict): + data = data.items() + elif not isinstance(data, list): + raise TypeError("Expected data to be a list or dict, got %s" % type(data)) + + for key, value in data: + self.__setitem__(key, value) + def iterkeys(self): return (key[1] for key in self.__omap) - + def keys(self): - return _iter_helper(self.iterkeys()) - - def itervalues(self): + return _kView(self) + + def itervalues(self): return (self[key] for key in self.__omap) - + def values(self): - return _iter_helper(self.itervalues()) + return _vView(self) - def update(self, data=None, **kwargs): - if not data is None: - if hasattr(data, 'items'): - data = data.items() - if not isinstance(data, Iterable): - raise TypeError('Argument or its items method need to provide an iterable.') - for kv in data: - if not hasattr(kv, '__len__') or len(kv) != 2: - raise TypeError('Argument, or its keys method need to provide iterables of the length 2.') - self[kv[0]] = kv[1] - if len(kwargs) > 0: - self.update(kwargs) - - def get_all_by_key(self, key): - """ Returns all values of the given key as a generator """ + def iteritems(self): + return ((key[1], self[key]) for key in self.__omap) + + def items(self): + return _iView(self) + + def get_all_for(self, key): + """ Returns all values of the given key """ if not isinstance(key, string_type): - raise TypeError("Key need to be a string.") - return (self[d] for d in self.__omap if d[1] == key) - - def remove_all_by_key(self, key): + raise TypeError("Key needs to be a string.") + return [self[(idx, key)] for idx in _range(self.__kcount[key])] + + def remove_all_for(self, key): """ Removes all items with the given key """ if not isinstance(key, string_type): raise TypeError("Key need to be a string.") - to_del = list() - for d in self.__omap: - if d[1] == key: - to_del.append(d) - for d in to_del: - del self[d] - - - \ No newline at end of file + + for idx in _range(self.__kcount[key]): + super(VDFDict, self).__delitem__((idx, key)) + + self.__omap = list(filter(lambda x: x[1] != key, self.__omap)) + + del self.__kcount[key]