Skip to content

Commit

Permalink
v0.2.0: Drop redis-py 2 and introduce redis-py 3 (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
jpmelos committed Mar 2, 2021
1 parent f570667 commit db3c0e3
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 85 deletions.
27 changes: 18 additions & 9 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,34 @@ workflows:
version: 2
workflow:
jobs:
- test-2.7
- test-3.5
- test-3.6
- test-3.7
- test-3.8

defaults: &defaults
working_directory: ~/code
steps:
- checkout
- run:
name: Install dependencies
command: pip install --user -r requirements.txt
- run:
name: Test
command: python setup.py test
command: pytest tests.py

jobs:
test-2.7:
test-3.6:
<<: *defaults
docker:
- image: circleci/python:3.6
- image: redis:6.2.0
test-3.7:
<<: *defaults
docker:
- image: circleci/python:2.7
- image: redis:3
test-3.5:
- image: circleci/python:3.7
- image: redis:6.2.0
test-3.8:
<<: *defaults
docker:
- image: circleci/python:3.5
- image: redis:3
- image: circleci/python:3.8
- image: redis:6.2.0
86 changes: 49 additions & 37 deletions redis_hashring/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

# Amount of points on the ring. Must not be higher than 2**32 because we're
# using CRC32 to compute the checksum.
RING_SIZE = 2**32
RING_SIZE = 2 ** 32

# Default amount of replicas per node
RING_REPLICAS = 16
Expand Down Expand Up @@ -70,14 +70,15 @@ def __init__(self, conn, key, n_replicas=RING_REPLICAS):
host = socket.gethostname()
pid = os.getpid()
# Create unique identifiers for the replicas
self.replicas = [(
random.randrange(2**32),
'{host}:{pid}:{rand}'.format(
host=host,
pid=pid,
rand=binascii.hexlify(os.urandom(4)).decode()
self.replicas = [
(
random.randrange(2 ** 32),
'{host}:{pid}:{rand}'.format(
host=host, pid=pid, rand=binascii.hexlify(os.urandom(4)).decode()
),
)
) for n in range(n_replicas)]
for n in range(n_replicas)
]

# List of tuples of ranges this node is responsible for, where a tuple
# (a, b) includes any N matching a <= N < b.
Expand Down Expand Up @@ -154,44 +155,52 @@ def debug_print(self):
hostname, pid, rnd = replica.split(':')
node = ':'.join([hostname, pid])

abs_size = (ring[(n+1) % n_replicas][0] - ring[n][0]) % RING_SIZE
size = 100. / RING_SIZE * abs_size
abs_size = (ring[(n + 1) % n_replicas][0] - ring[n][0]) % RING_SIZE
size = 100.0 / RING_SIZE * abs_size
delay = int(now - heartbeat)

nodes[node].append((hostname, pid, abs_size, delay, expired))

print('{start:10} {size:5.2f}% {delay:6}s {replica}{extra}'.format(
start=start,
replica=replica,
delay=delay,
size=size,
extra=' (EXPIRED)' if expired else ''
))
print(
'{start:10} {size:5.2f}% {delay:6}s {replica}{extra}'.format(
start=start,
replica=replica,
delay=delay,
size=size,
extra=' (EXPIRED)' if expired else '',
)
)

print()
print('Hash ring "{key}" nodes:'.format(key=self.key))

if nodes:
print('{:8} {:8} {:7} {:20} {:5}'.format('Range', 'Replicas', 'Delay', 'Hostname', 'PID'))
print(
'{:8} {:8} {:7} {:20} {:5}'.format(
'Range', 'Replicas', 'Delay', 'Hostname', 'PID'
)
)
else:
print('(no nodes)')

for k, v in nodes.items():
hostname, pid = v[0][0], v[0][1]
abs_size = sum(replica[2] for replica in v)
size = 100. / RING_SIZE * abs_size
size = 100.0 / RING_SIZE * abs_size
delay = max(replica[3] for replica in v)
expired = any(replica[4] for replica in v)
count = len(v)
print('{size:5.2f}% {count:8} {delay:6}s {hostname:20} {pid:5}{extra}'.format(
start=start,
count=count,
hostname=hostname,
pid=pid,
delay=delay,
size=size,
extra=' (EXPIRED)' if expired else ''
))
print(
'{size:5.2f}% {count:8} {delay:6}s {hostname:20} {pid:5}{extra}'.format(
start=start,
count=count,
hostname=hostname,
pid=pid,
delay=delay,
size=size,
extra=' (EXPIRED)' if expired else '',
)
)

def heartbeat(self):
"""
Expand All @@ -201,10 +210,12 @@ def heartbeat(self):
pipeline = self.conn.pipeline()
now = time.time()
for replica in self.replicas:
pipeline.zadd(self.key, '{start}:{name}'.format(
start=replica[0],
name=replica[1]
), now)
pipeline.zadd(
self.key,
{
'{start}:{name}'.format(start=replica[0], name=replica[1]): now,
},
)
ret = pipeline.execute()

# Only notify the other nodes if we're not in the ring yet.
Expand All @@ -217,10 +228,9 @@ def remove(self):
"""
pipeline = self.conn.pipeline()
for replica in self.replicas:
pipeline.zrem(self.key, '{start}:{name}'.format(
start=replica[0],
name=replica[1]
))
pipeline.zrem(
self.key, '{start}:{name}'.format(start=replica[0], name=replica[1])
)
pipeline.execute()
self._notify()

Expand Down Expand Up @@ -251,7 +261,7 @@ def update(self):
self.ranges = []
for n, (start, replica) in enumerate(ring):
if replica in replica_set:
end = ring[(n+1) % n_replicas][0] % RING_SIZE
end = ring[(n + 1) % n_replicas][0] % RING_SIZE
if start < end:
self.ranges.append((start, end))
elif end < start:
Expand Down Expand Up @@ -330,6 +340,7 @@ def gevent_start(self):
"""
import gevent
import gevent.select

self._poller_greenlet = gevent.spawn(self.poll)
self._select = gevent.select.select
self.heartbeat()
Expand All @@ -340,6 +351,7 @@ def gevent_stop(self):
Helper method to stop the node for gevent-based applications.
"""
import gevent

gevent.kill(self._poller_greenlet)
self.remove()
self._select = select.select
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
redis==3.5.3
pytest==6.2.2
18 changes: 11 additions & 7 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,27 @@

setup(
name='redis-hashring',
version='0.1.3',
version='0.2.0',
author='Close Engineering',
author_email='engineering@close.com',
url='http://github.com/closeio/redis-hashring',
license='MIT',
description='Python library for distributed applications using a Redis hash ring',
test_suite='tests',
tests_require=['redis==2.10.6'],
description=(
'Python library for distributed applications using a Redis hash ring'
),
install_requires=['redis>=3,<4'],
platforms='any',
classifiers=[
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Operating System :: OS Independent',
'Topic :: Software Development :: Libraries :: Python Modules',
'Programming Language :: Python',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3 :: Only',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
],
packages=[
'redis_hashring',
Expand Down
56 changes: 30 additions & 26 deletions tests.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,43 @@
import pytest
from redis import Redis
import unittest
from redis_hashring import RingNode

TEST_KEY = 'hashring-test'

class HashRingTestCase(unittest.TestCase):
def setUp(self):
self.redis = Redis()
self.redis.delete(TEST_KEY)

def get_node(self, n_replicas, total_replicas):
node = RingNode(self.redis, TEST_KEY, n_replicas=n_replicas)
@pytest.fixture
def redis():
redis = Redis()
yield redis
redis.delete(TEST_KEY)

self.assertEqual(len(node.replicas), n_replicas)
self.assertEqual(self.redis.zcard(TEST_KEY), total_replicas-n_replicas)

node.heartbeat()
def get_node(redis, n_replicas, total_replicas):
node = RingNode(redis, TEST_KEY, n_replicas=n_replicas)

self.assertEqual(self.redis.zcard(TEST_KEY), total_replicas)
self.assertEqual(len(node.ranges), 0)
assert len(node.replicas) == n_replicas
assert redis.zcard(TEST_KEY) == total_replicas - n_replicas

return node
node.heartbeat()

def test_node(self):
node1 = self.get_node(1, 1)
node1.update()
self.assertEqual(len(node1.ranges), 1)
assert redis.zcard(TEST_KEY) == total_replicas
assert len(node.ranges) == 0

node2 = self.get_node(1, 2)
node1.update()
node2.update()
self.assertEqual(len(node1.ranges) + len(node2.ranges), 3)
return node

node3 = self.get_node(2, 4)
node1.update()
node2.update()
node3.update()
self.assertEqual(len(node1.ranges) + len(node2.ranges) + len(node3.ranges), 5)

def test_node(redis):
node1 = get_node(redis, 1, 1)
node1.update()
assert len(node1.ranges) == 1

node2 = get_node(redis, 1, 2)
node1.update()
node2.update()
assert len(node1.ranges) + len(node2.ranges) == 3

node3 = get_node(redis, 2, 4)
node1.update()
node2.update()
node3.update()
assert len(node1.ranges) + len(node2.ranges) + len(node3.ranges) == 5
6 changes: 0 additions & 6 deletions tox.ini

This file was deleted.

0 comments on commit db3c0e3

Please sign in to comment.