Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redis-py 3 #11

Merged
merged 2 commits into from
Mar 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,34 @@ workflows:
version: 2
workflow:
jobs:
- test-2.7
- test-3.5
- test-3.6
- test-3.7
- test-3.8

defaults: &defaults
working_directory: ~/code
steps:
- checkout
- run:
name: Install dependencies
command: pip install --user -r requirements.txt
- run:
name: Test
command: python setup.py test
command: pytest tests.py

jobs:
test-2.7:
test-3.6:
<<: *defaults
docker:
- image: circleci/python:3.6
- image: redis:6.2.0
test-3.7:
<<: *defaults
docker:
- image: circleci/python:2.7
- image: redis:3
test-3.5:
- image: circleci/python:3.7
- image: redis:6.2.0
test-3.8:
<<: *defaults
docker:
- image: circleci/python:3.5
- image: redis:3
- image: circleci/python:3.8
- image: redis:6.2.0
86 changes: 49 additions & 37 deletions redis_hashring/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

# Amount of points on the ring. Must not be higher than 2**32 because we're
# using CRC32 to compute the checksum.
RING_SIZE = 2**32
RING_SIZE = 2 ** 32

# Default amount of replicas per node
RING_REPLICAS = 16
Expand Down Expand Up @@ -70,14 +70,15 @@ def __init__(self, conn, key, n_replicas=RING_REPLICAS):
host = socket.gethostname()
pid = os.getpid()
# Create unique identifiers for the replicas
self.replicas = [(
random.randrange(2**32),
'{host}:{pid}:{rand}'.format(
host=host,
pid=pid,
rand=binascii.hexlify(os.urandom(4)).decode()
self.replicas = [
(
random.randrange(2 ** 32),
'{host}:{pid}:{rand}'.format(
host=host, pid=pid, rand=binascii.hexlify(os.urandom(4)).decode()
),
)
) for n in range(n_replicas)]
for n in range(n_replicas)
]

# List of tuples of ranges this node is responsible for, where a tuple
# (a, b) includes any N matching a <= N < b.
Expand Down Expand Up @@ -154,44 +155,52 @@ def debug_print(self):
hostname, pid, rnd = replica.split(':')
node = ':'.join([hostname, pid])

abs_size = (ring[(n+1) % n_replicas][0] - ring[n][0]) % RING_SIZE
size = 100. / RING_SIZE * abs_size
abs_size = (ring[(n + 1) % n_replicas][0] - ring[n][0]) % RING_SIZE
size = 100.0 / RING_SIZE * abs_size
delay = int(now - heartbeat)

nodes[node].append((hostname, pid, abs_size, delay, expired))

print('{start:10} {size:5.2f}% {delay:6}s {replica}{extra}'.format(
start=start,
replica=replica,
delay=delay,
size=size,
extra=' (EXPIRED)' if expired else ''
))
print(
'{start:10} {size:5.2f}% {delay:6}s {replica}{extra}'.format(
start=start,
replica=replica,
delay=delay,
size=size,
extra=' (EXPIRED)' if expired else '',
)
)

print()
print('Hash ring "{key}" nodes:'.format(key=self.key))

if nodes:
print('{:8} {:8} {:7} {:20} {:5}'.format('Range', 'Replicas', 'Delay', 'Hostname', 'PID'))
print(
'{:8} {:8} {:7} {:20} {:5}'.format(
'Range', 'Replicas', 'Delay', 'Hostname', 'PID'
)
)
else:
print('(no nodes)')

for k, v in nodes.items():
hostname, pid = v[0][0], v[0][1]
abs_size = sum(replica[2] for replica in v)
size = 100. / RING_SIZE * abs_size
size = 100.0 / RING_SIZE * abs_size
delay = max(replica[3] for replica in v)
expired = any(replica[4] for replica in v)
count = len(v)
print('{size:5.2f}% {count:8} {delay:6}s {hostname:20} {pid:5}{extra}'.format(
start=start,
count=count,
hostname=hostname,
pid=pid,
delay=delay,
size=size,
extra=' (EXPIRED)' if expired else ''
))
print(
'{size:5.2f}% {count:8} {delay:6}s {hostname:20} {pid:5}{extra}'.format(
start=start,
count=count,
hostname=hostname,
pid=pid,
delay=delay,
size=size,
extra=' (EXPIRED)' if expired else '',
)
)

def heartbeat(self):
"""
Expand All @@ -201,10 +210,12 @@ def heartbeat(self):
pipeline = self.conn.pipeline()
now = time.time()
for replica in self.replicas:
pipeline.zadd(self.key, '{start}:{name}'.format(
start=replica[0],
name=replica[1]
), now)
pipeline.zadd(
self.key,
{
'{start}:{name}'.format(start=replica[0], name=replica[1]): now,
},
)
ret = pipeline.execute()

# Only notify the other nodes if we're not in the ring yet.
Expand All @@ -217,10 +228,9 @@ def remove(self):
"""
pipeline = self.conn.pipeline()
for replica in self.replicas:
pipeline.zrem(self.key, '{start}:{name}'.format(
start=replica[0],
name=replica[1]
))
pipeline.zrem(
self.key, '{start}:{name}'.format(start=replica[0], name=replica[1])
)
pipeline.execute()
self._notify()

Expand Down Expand Up @@ -251,7 +261,7 @@ def update(self):
self.ranges = []
for n, (start, replica) in enumerate(ring):
if replica in replica_set:
end = ring[(n+1) % n_replicas][0] % RING_SIZE
end = ring[(n + 1) % n_replicas][0] % RING_SIZE
if start < end:
self.ranges.append((start, end))
elif end < start:
Expand Down Expand Up @@ -330,6 +340,7 @@ def gevent_start(self):
"""
import gevent
import gevent.select

self._poller_greenlet = gevent.spawn(self.poll)
self._select = gevent.select.select
self.heartbeat()
Expand All @@ -340,6 +351,7 @@ def gevent_stop(self):
Helper method to stop the node for gevent-based applications.
"""
import gevent

gevent.kill(self._poller_greenlet)
self.remove()
self._select = select.select
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
redis==3.5.3
pytest==6.2.2
18 changes: 11 additions & 7 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,27 @@

setup(
name='redis-hashring',
version='0.1.3',
version='0.2.0',
author='Close Engineering',
author_email='engineering@close.com',
url='http://github.com/closeio/redis-hashring',
license='MIT',
description='Python library for distributed applications using a Redis hash ring',
test_suite='tests',
tests_require=['redis==2.10.6'],
description=(
'Python library for distributed applications using a Redis hash ring'
),
install_requires=['redis>=3,<4'],
platforms='any',
classifiers=[
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Operating System :: OS Independent',
'Topic :: Software Development :: Libraries :: Python Modules',
'Programming Language :: Python',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3 :: Only',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
],
packages=[
'redis_hashring',
Expand Down
56 changes: 30 additions & 26 deletions tests.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,43 @@
import pytest
from redis import Redis
import unittest
from redis_hashring import RingNode

TEST_KEY = 'hashring-test'

class HashRingTestCase(unittest.TestCase):
def setUp(self):
self.redis = Redis()
self.redis.delete(TEST_KEY)

def get_node(self, n_replicas, total_replicas):
node = RingNode(self.redis, TEST_KEY, n_replicas=n_replicas)
@pytest.fixture
def redis():
redis = Redis()
yield redis
redis.delete(TEST_KEY)

self.assertEqual(len(node.replicas), n_replicas)
self.assertEqual(self.redis.zcard(TEST_KEY), total_replicas-n_replicas)

node.heartbeat()
def get_node(redis, n_replicas, total_replicas):
node = RingNode(redis, TEST_KEY, n_replicas=n_replicas)

self.assertEqual(self.redis.zcard(TEST_KEY), total_replicas)
self.assertEqual(len(node.ranges), 0)
assert len(node.replicas) == n_replicas
assert redis.zcard(TEST_KEY) == total_replicas - n_replicas

return node
node.heartbeat()

def test_node(self):
node1 = self.get_node(1, 1)
node1.update()
self.assertEqual(len(node1.ranges), 1)
assert redis.zcard(TEST_KEY) == total_replicas
assert len(node.ranges) == 0

node2 = self.get_node(1, 2)
node1.update()
node2.update()
self.assertEqual(len(node1.ranges) + len(node2.ranges), 3)
return node

node3 = self.get_node(2, 4)
node1.update()
node2.update()
node3.update()
self.assertEqual(len(node1.ranges) + len(node2.ranges) + len(node3.ranges), 5)

def test_node(redis):
node1 = get_node(redis, 1, 1)
node1.update()
assert len(node1.ranges) == 1

node2 = get_node(redis, 1, 2)
node1.update()
node2.update()
assert len(node1.ranges) + len(node2.ranges) == 3

node3 = get_node(redis, 2, 4)
node1.update()
node2.update()
node3.update()
assert len(node1.ranges) + len(node2.ranges) + len(node3.ranges) == 5
6 changes: 0 additions & 6 deletions tox.ini

This file was deleted.