Skip to content
This repository has been archived by the owner on Feb 5, 2022. It is now read-only.

Replace calls with generators as a variadic args, when passing large data into the collection #7

Merged
merged 7 commits into from
Mar 22, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions sider/hash.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from .session import Session
from .types import Bulk, ByteString
from .transaction import query, manipulative
from . import utils


class Hash(collections.MutableMapping):
Expand Down Expand Up @@ -350,7 +351,12 @@ def update(self, mapping={}, **keywords):
mapping = mapping.items()
value = dict(mapping)
value.update(keywords)
self._raw_update(value, self.session.client)
pipe = self.session.client
if self.session.current_transaction is None:
pipe = pipe.pipeline()
self._raw_update(value, pipe)
if self.session.current_transaction is None:
pipe.execute()

def _raw_update(self, value, pipe, encoded=False):
items = getattr(value, 'iteritems', value.items)()
Expand All @@ -361,7 +367,9 @@ def _raw_update(self, value, pipe, encoded=False):
encode_value = self.value_type.encode
flatten = (val for k, v in items
for val in (encode_key(k), encode_value(v)))
pipe.execute_command('HMSET', self.key, *flatten)
n = 100 # FIXME: it is an arbitarary magic number.
for chunk in utils.chunk(flatten, n * 2):
pipe.execute_command('HMSET', self.key, *chunk)

def __repr__(self):
cls = type(self)
Expand Down
5 changes: 4 additions & 1 deletion sider/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .session import Session
from .transaction import manipulative, query
from .warnings import PerformanceWarning
from . import utils


class List(collections.MutableSequence):
Expand Down Expand Up @@ -271,7 +272,9 @@ def _raw_extend(self, iterable, pipe, encoded=False):
else:
if not encoded:
iterable = (encode(v) for v in iterable)
pipe.rpush(self.key, *iterable)
n = 100 # FIXME: it is an arbitarary magic number.
for chunk in utils.chunk(iterable, n):
pipe.rpush(self.key, *chunk)

def insert(self, index, value):
"""Inserts the ``value`` right after the offset ``index``.
Expand Down
7 changes: 5 additions & 2 deletions sider/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"""
from __future__ import absolute_import
import warnings
from redis.client import StrictRedis, Redis
from redis.client import StrictRedis, Redis, BasePipeline
from .threadlocal import LocalDict
from .types import Value, ByteString
from .transaction import Transaction
Expand Down Expand Up @@ -53,7 +53,10 @@ def server_version(self):
try:
info = self._server_info
except AttributeError:
info = self.client.info()
client = self.client
if isinstance(client, BasePipeline):
client = self.context_locals['original_client']
info = client.info()
self._server_info = info
return info['redis_version']

Expand Down
20 changes: 11 additions & 9 deletions sider/set.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from .session import Session
from .types import Bulk, ByteString
from .transaction import manipulative, query
from . import utils


class Set(collections.MutableSet):
Expand Down Expand Up @@ -747,7 +748,7 @@ def update(self, *sets):
def block(trial, transaction):
pipe = self.session.client
if online_sets:
keys = (operand.key for operand in online_sets)
keys = [operand.key for operand in online_sets]
self.session.mark_manipulative()
pipe.sunionstore(self.key, self.key, *keys)
update = self._raw_update
Expand All @@ -758,14 +759,15 @@ def block(trial, transaction):
def _raw_update(self, members, pipe):
key = self.key
encode = self.value_type.encode
members = [encode(v) for v in members]
if members:
self.session.mark_manipulative()
if self.session.server_version_info < (2, 4, 0):
for member in members:
pipe.sadd(key, member)
else:
pipe.sadd(key, *members)
members = (encode(v) for v in members)
self.session.mark_manipulative()
if self.session.server_version_info < (2, 4, 0):
for member in members:
pipe.sadd(key, member)
else:
n = 100 # FIXME: it is an arbitarary magic number.
for chunk in utils.chunk(members, n):
pipe.sadd(key, *chunk)

def intersection_update(self, *sets):
"""Updates the set with the intersection of itself and
Expand Down
3 changes: 2 additions & 1 deletion sider/sortedset.py
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,8 @@ def block(trial, transaction):
zincrby(key, value=el, amount=score)
if online_sets:
keys = [set_.key for set_ in online_sets]
session.client.zunionstore(key, len(keys) + 1, key, *keys)
keys.insert(0, key)
session.client.zunionstore(key, keys)
session.transaction(block, [key], ignore_double=True)

def __repr__(self):
Expand Down
12 changes: 12 additions & 0 deletions sider/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import itertools


def chunk(iterable, n):
    """Lazily splits ``iterable`` into consecutive lists of at most ``n``
    elements.

    It returns an iterator of lists rather than a list, so the iterable
    is consumed chunk by chunk.  Every chunk has exactly ``n`` elements
    except possibly the last one, which holds the remainder.  An empty
    iterable produces no chunks at all.

    :param iterable: the iterable to split
    :param n: the maximum length of each chunk.  it must be at least 1
    :returns: an iterator of lists
    :raises ValueError: when ``n`` is less than 1

    """
    if n < 1:
        # islice(i, 0) is always empty, which would end the iteration at
        # once and silently discard every element of the iterable.
        raise ValueError('n must be a positive integer, not ' + repr(n))
    i = iter(iterable)
    # Two-argument iter() keeps calling the lambda until it returns
    # the sentinel [], that is, until the underlying iterator is exhausted.
    return iter(lambda: list(itertools.islice(i, n)), [])
8 changes: 8 additions & 0 deletions sidertests/test_hash.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,14 @@ def test_update(session):
hashx.update(1234)


def test_massive_update(session):
    """A single ``update()`` call stores a mapping of 1235 fields whose
    values are up to 1234 characters long.

    """
    huge_data = {}
    for i in xrange(1235):
        letter = chr(ord('a') + (i % 26))
        huge_data['{0}'.format(i)] = letter * i
    target = session.get(key('test_hash_massive_update'), Hash)
    target.update(huge_data)
    assert huge_data == dict(target)


def test_repr(session):
keyid = key('test_hash_repr')
hash_ = session.set(keyid, {1: 2, 3: 4, 5: 6}, Hash(NInt, NInt))
Expand Down
2 changes: 2 additions & 0 deletions sidertests/test_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,8 @@ def test_extend(session):
assert ['a', 'b', 'c', 'd', 'e'] == list(list_)
list_.extend(['fg', 'hi'])
assert ['a', 'b', 'c', 'd', 'e', 'fg', 'hi'] == list(list_)
list_.extend(str(i) for i in range(1, 4))
assert ['a', 'b', 'c', 'd', 'e', 'fg', 'hi', '1', '2', '3'] == list(list_)
with raises(TypeError):
list_.extend([object(), object()])
listx = session.set(key('test_listx_extend'), [1, 2], List(NInt))
Expand Down
68 changes: 67 additions & 1 deletion sidertests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,32 @@
from .env import NInt, get_client, key
from .env import session
from sider.session import Session
from sider.types import (Set as SetT, List as ListT, Integer)
from sider.types import (Set as SetT,
List as ListT,
Hash as HashT,
SortedSet as SortedSetT,
Integer)
from sider.set import Set
from sider.list import List
from sider.hash import Hash
from sider.sortedset import SortedSet


class CustomRedis(StrictRedis):
    """Test-only subclass of :class:`StrictRedis` which adds no behavior
    of its own.

    """


def ensure_encoding_error(excinfo):
    """Ensure that given error is raised from :meth:`~Bulk.encode()`

    It rules out the error message that is produced when a non-sequence
    is unpacked as variadic arguments, and then checks that the innermost
    traceback entry is a function named ``encode``.

    :param excinfo: the exception info object bound by
                    ``with raises(...) as excinfo``

    .. seealso:: <https://gist.github.com/Kroisse/5211709>

    """
    # str() instead of the ``message`` attribute, which has been deprecated
    # since Python 2.6 and does not exist on Python 3 exceptions.
    message = str(excinfo.value)
    assert 'argument after * must be a sequence' not in message, \
        'Ensure not to pass an iterable object as variadic arguments'
    assert excinfo.traceback[-1].name == 'encode'


def test_warn_old_client():
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter('always')
Expand Down Expand Up @@ -51,6 +68,15 @@ def test_getset_set(session):
session.set(key('test_session_getset_set'), 1234, SetT)
with raises(TypeError):
session.set(key('test_session_getset_set'), 'abc', SetT)
with raises(TypeError) as excinfo:
session.set(key('test_session_getset_set'), set([1, 2, 3]), SetT)
ensure_encoding_error(excinfo)


def test_set_empty_set(session):
    """Setting an empty :class:`set` gives an empty :class:`Set` back."""
    empty = session.set(key('test_session_set_empty_set'), set(), SetT)
    assert isinstance(empty, Set)
    assert set() == set(empty)


def test_getset_list(session):
Expand All @@ -62,6 +88,46 @@ def test_getset_list(session):
assert list(lst) == ['a', 'b', 'c']
with raises(TypeError):
session.set(key('test_session_getset_list'), 1234, ListT)
with raises(TypeError) as excinfo:
session.set(key('test_session_getset_list'), [1, 2, 3], ListT)
ensure_encoding_error(excinfo)


def test_set_empty_list(session):
    """Setting an empty :class:`list` gives an empty :class:`List` back."""
    empty = session.set(key('test_session_set_empty_list'), [], ListT)
    assert isinstance(empty, List)
    assert [] == list(empty)


def test_getset_hash(session):
    """Round-trips a dict through ``session.set()`` and ``session.get()``
    as a :class:`Hash`, and expects :exc:`TypeError` for unacceptable
    values.

    """
    keyid = key('test_session_getset_hash')
    expected = {'a': 'b', 'c': 'd'}
    stored = session.set(keyid, {'a': 'b', 'c': 'd'}, HashT)
    assert isinstance(stored, Hash)
    assert expected == dict(stored)
    loaded = session.get(keyid, HashT)
    assert isinstance(loaded, Hash)
    assert expected == dict(loaded)
    # Neither an integer nor a string is a mapping.
    for invalid in 1234, 'abc':
        with raises(TypeError):
            session.set(keyid, invalid, HashT)
    # Integer values have to be rejected by the encoder itself.
    with raises(TypeError) as excinfo:
        session.set(keyid, {'a': 1, 'b': 2}, HashT)
    ensure_encoding_error(excinfo)


def test_set_empty_hash(session):
    """Setting an empty :class:`dict` gives an empty :class:`Hash` back."""
    empty = session.set(key('test_session_set_empty_hash'), {}, HashT)
    assert isinstance(empty, Hash)
    assert {} == dict(empty)


def test_set_empty_sortedset(session):
    """Setting an empty :class:`set` as a sorted set gives an empty
    :class:`SortedSet` back.

    """
    keyid = key('test_session_set_empty_sortedset')
    empty = session.set(keyid, set(), SortedSetT)
    assert isinstance(empty, SortedSet)
    assert {} == dict(empty)


def test_version_info(session):
Expand Down
7 changes: 7 additions & 0 deletions sidertests/test_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,13 @@ def resetx():
set_ |= setx == S(['a', 'b', 'c', 1, 2, 3])


def test_massive_update(session):
    """A single ``update()`` call adds 1010 distinct members."""
    huge_data = set(map('{0}'.format, range(1010)))
    target = session.get(key('test_set_massive_update'), Set)
    target.update(huge_data)
    assert huge_data == set(target)


def test_update_t(session):
session2 = get_session()
keyid = key('test_set_update_t')
Expand Down
20 changes: 20 additions & 0 deletions sidertests/test_sortedset.py
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,26 @@ def reset2():
assert dict(setx) == {1: 1, 2: 3, 3: 4.1, 4: 6, 5: 1}


def test_massive_update(session):
    """Updates a sorted set with 2600 scored members, then with a plain
    set and with another sorted set, checking the accumulated scores
    after each step.

    """
    expected = {}
    for code in xrange(ord('a'), ord('z') + 1):
        for repeat in xrange(1, 101):
            expected[chr(code) * repeat] = (code - ord('a') + 1) * repeat
    target = session.get(key('test_sortedset_massive_update'), SortedSet)
    target.update(expected)
    assert dict(target) == expected
    # Updating with a plain set is expected to raise each score by 1.
    letters = set('abcdefghijklmnopqrstuvwxyz')
    target.update(letters)
    for letter in letters:
        expected[letter] += 1
    assert dict(target) == expected
    # Updating with another sorted set is expected to add up its scores.
    scores = dict((chr(code), code)
                  for code in xrange(ord('a'), ord('z') + 1))
    other = session.set(key('test_sortedsetx_massive_update'),
                        scores, SortedSet)
    target.update(other)
    for member, score in other.items():
        expected[member] += score
    assert dict(target) == expected


def test_repr(session):
keyid = key('test_sortedset_repr')
set_ = session.set(keyid, set([1, 2, 3]), IntSet)
Expand Down
28 changes: 28 additions & 0 deletions sidertests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import collections
from sider import utils


def test_chunk():
    """:func:`utils.chunk()` yields full-length chunks followed by
    a shorter remainder, keeping the order of the elements.

    """
    data_length = 1000
    chunk_length = 7
    # Materialized so that its slices are lists, like the chunks they are
    # compared with, whatever type range() itself returns.
    data = list(range(data_length))
    chunks = utils.chunk(data, chunk_length)
    assert isinstance(chunks, collections.Iterable)
    chunks = list(chunks)
    full_chunks, remainder = divmod(data_length, chunk_length)
    assert len(chunks) == full_chunks + 1
    for i, chunk in enumerate(chunks[:-1]):
        assert len(chunk) == chunk_length
        assert chunk == data[i * chunk_length:(i + 1) * chunk_length]
    # The offset of the last chunk is computed explicitly rather than
    # taken from the loop variable left over by the loop above.
    assert len(chunks[-1]) == remainder
    assert chunks[-1] == data[full_chunks * chunk_length:]


def test_chunk_short_data():
    """Data shorter than the chunk length comes out as one short chunk."""
    result = list(utils.chunk('asdf', 5))
    assert 1 == len(result)
    assert ['a', 's', 'd', 'f'] == result[0]


def test_empty_chunk():
    """An empty iterable produces no chunks at all."""
    result = list(utils.chunk([], 10))
    assert 0 == len(result)