Skip to content

Commit

Permalink
Use custom encoding for RingData, not pickle.
Browse files Browse the repository at this point in the history
Serialize RingData in a versioned, custom format which is a combination
of a JSON-encoded header and .tostring() dumps of the
replica2part2dev_id arrays.  This format deserializes hundreds of times
faster than rings serialized with Python 2.7's pickle (a significant
performance regression for ring loading between Python 2.6 and Python
2.7).  Fixes bug 1031954.

swift.common.ring.ring.RingData is now responsible for serialization and
deserialization of its data via a new load() class method and save()
object method.  The new implementation is backward-compatible; if a ring
does not begin with a new-style magic string, it is assumed to be an
old-style pickle-dumped ring and is handled as before.  So new Swift
code can read old rings, but old Swift code will not be able to read
newly-serialized rings. THIS SHOULD BE MENTIONED PROMINENTLY IN THE
RELEASE NOTES.

I didn't want to bite of more than necessary, so I didn't mess with
builder file serialization.

Change-Id: I799b9a4c894d54fb16592443904ac055b2638e2d
  • Loading branch information
dbishop committed Aug 5, 2012
1 parent ef3f8bb commit f8ce43a
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 54 deletions.
15 changes: 6 additions & 9 deletions bin/swift-ring-builder
Expand Up @@ -610,13 +610,11 @@ swift-ring-builder <builder_file> rebalance
print '-' * 79
status = EXIT_WARNING
ts = time()
pickle.dump(builder.get_ring().to_dict(),
GzipFile(pathjoin(backup_dir, '%d.' % ts +
basename(ring_file)), 'wb'), protocol=2)
builder.get_ring().save(
pathjoin(backup_dir, '%d.' % ts + basename(ring_file)))
pickle.dump(builder.to_dict(), open(pathjoin(backup_dir,
'%d.' % ts + basename(argv[1])), 'wb'), protocol=2)
pickle.dump(builder.get_ring().to_dict(), GzipFile(ring_file, 'wb'),
protocol=2)
builder.get_ring().save(ring_file)
pickle.dump(builder.to_dict(), open(argv[1], 'wb'), protocol=2)
exit(status)

Expand Down Expand Up @@ -644,10 +642,9 @@ swift-ring-builder <builder_file> write_ring
'"rebalance"?'
else:
print 'Warning: Writing an empty ring'
pickle.dump(ring_data.to_dict(),
GzipFile(pathjoin(backup_dir, '%d.' % time() +
basename(ring_file)), 'wb'), protocol=2)
pickle.dump(ring_data.to_dict(), GzipFile(ring_file, 'wb'), protocol=2)
ring_data.save(
pathjoin(backup_dir, '%d.' % time() + basename(ring_file)))
ring_data.save(ring_file)
exit(EXIT_SUCCESS)

def pretend_min_part_hours_passed():
Expand Down
6 changes: 3 additions & 3 deletions swift/common/ring/builder.py
Expand Up @@ -29,9 +29,9 @@

class RingBuilder(object):
"""
Used to build swift.common.RingData instances to be written to disk and
used with swift.common.ring.Ring instances. See bin/ring-builder.py for
example usage.
Used to build swift.common.ring.RingData instances to be written to disk
and used with swift.common.ring.Ring instances. See bin/swift-ring-builder
for example usage.
The instance variable devs_changed indicates if the device information has
changed since the last balancing. This can be used by tools to know whether
Expand Down
97 changes: 76 additions & 21 deletions swift/common/ring/ring.py
Expand Up @@ -13,11 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import array
import cPickle as pickle
from collections import defaultdict
from gzip import GzipFile
from os.path import getmtime
from struct import unpack_from
import json
from os.path import getmtime, join as pathjoin
import struct
from time import time
import os
from io import BufferedReader
Expand All @@ -34,6 +36,70 @@ def __init__(self, replica2part2dev_id, devs, part_shift):
self._replica2part2dev_id = replica2part2dev_id
self._part_shift = part_shift

@classmethod
def deserialize_v1(cls, gz_file):
json_len, = struct.unpack('!I', gz_file.read(4))
ring_dict = json.loads(gz_file.read(json_len))
ring_dict['replica2part2dev_id'] = []
partition_count = 1 << (32 - ring_dict['part_shift'])
for x in xrange(ring_dict['replica_count']):
ring_dict['replica2part2dev_id'].append(
array.array('H', gz_file.read(2 * partition_count)))
return ring_dict

@classmethod
def load(cls, filename):
"""
Load ring data from a file.
:param filename: Path to a file serialized by the save() method.
:returns: A RingData instance containing the loaded data.
"""
gz_file = GzipFile(filename, 'rb')
# Python 2.6 GzipFile doesn't support BufferedIO
if hasattr(gz_file, '_checkReadable'):
gz_file = BufferedReader(gz_file)

# See if the file is in the new format
magic = gz_file.read(4)
if magic == 'R1NG':
version, = struct.unpack('!H', gz_file.read(2))
if version == 1:
ring_data = cls.deserialize_v1(gz_file)
else:
raise Exception('Unknown ring format version %d' % version)
else:
# Assume old-style pickled ring
gz_file.seek(0)
ring_data = pickle.load(gz_file)
if not hasattr(ring_data, 'devs'):
ring_data = RingData(ring_data['replica2part2dev_id'],
ring_data['devs'], ring_data['part_shift'])
return ring_data

def serialize_v1(self, file_obj):
# Write out new-style serialization magic and version:
file_obj.write(struct.pack('!4sH', 'R1NG', 1))
ring = self.to_dict()
json_text = json.dumps(
{'devs': ring['devs'], 'part_shift': ring['part_shift'],
'replica_count': len(ring['replica2part2dev_id'])})
json_len = len(json_text)
file_obj.write(struct.pack('!I', json_len))
file_obj.write(json_text)
for part2dev_id in ring['replica2part2dev_id']:
file_obj.write(part2dev_id.tostring())

def save(self, filename):
"""
Serialize this RingData instance to disk.
:param filename: File into which this instance should be serialized.
"""
gz_file = GzipFile(filename, 'wb')
self.serialize_v1(gz_file)
gz_file.close()

def to_dict(self):
return {'devs': self.devs,
'replica2part2dev_id': self._replica2part2dev_id,
Expand All @@ -44,43 +110,32 @@ class Ring(object):
"""
Partitioned consistent hashing ring.
:param pickle_gz_path: path to ring file
:param serialized_path: path to serialized RingData instance
:param reload_time: time interval in seconds to check for a ring change
"""

def __init__(self, pickle_gz_path, reload_time=15, ring_name=None):
def __init__(self, serialized_path, reload_time=15, ring_name=None):
# can't use the ring unless HASH_PATH_SUFFIX is set
validate_configuration()
if ring_name:
self.pickle_gz_path = os.path.join(pickle_gz_path,
self.serialized_path = os.path.join(serialized_path,
ring_name + '.ring.gz')
else:
self.pickle_gz_path = os.path.join(pickle_gz_path)
self.serialized_path = os.path.join(serialized_path)
self.reload_time = reload_time
self._reload(force=True)

def _reload(self, force=False):
self._rtime = time() + self.reload_time
if force or self.has_changed():
ring_data = pickle.load(self._get_gz_file())
if not hasattr(ring_data, 'devs'):
ring_data = RingData(ring_data['replica2part2dev_id'],
ring_data['devs'], ring_data['part_shift'])
self._mtime = getmtime(self.pickle_gz_path)
ring_data = RingData.load(self.serialized_path)
self._mtime = getmtime(self.serialized_path)
self._devs = ring_data.devs

self._replica2part2dev_id = ring_data._replica2part2dev_id
self._part_shift = ring_data._part_shift
self._rebuild_tier_data()

def _get_gz_file(self):
gz_file = GzipFile(self.pickle_gz_path, 'rb')
if hasattr(gz_file, '_checkReadable'):
return BufferedReader(gz_file)
else:
# Python 2.6 doesn't support BufferedIO
return gz_file

def _rebuild_tier_data(self):
self.tier2devs = defaultdict(list)
for dev in self._devs:
Expand Down Expand Up @@ -121,7 +176,7 @@ def has_changed(self):
:returns: True if the ring on disk has changed, False otherwise
"""
return getmtime(self.pickle_gz_path) != self._mtime
return getmtime(self.serialized_path) != self._mtime

def get_part_nodes(self, part):
"""
Expand Down Expand Up @@ -172,7 +227,7 @@ def get_nodes(self, account, container=None, obj=None):
key = hash_path(account, container, obj, raw_digest=True)
if time() > self._rtime:
self._reload()
part = unpack_from('>I', key)[0] >> self._part_shift
part = struct.unpack_from('>I', key)[0] >> self._part_shift
seen_ids = set()
return part, [self._devs[r[part]] for r in self._replica2part2dev_id
if not (r[part] in seen_ids or seen_ids.add(r[part]))]
Expand Down
65 changes: 44 additions & 21 deletions test/unit/common/ring/test_ring.py
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import array
import cPickle as pickle
import os
import unittest
Expand All @@ -25,6 +26,20 @@

class TestRingData(unittest.TestCase):

def setUp(self):
self.testdir = os.path.join(os.path.dirname(__file__), 'ring_data')
rmtree(self.testdir, ignore_errors=1)
os.mkdir(self.testdir)

def tearDown(self):
rmtree(self.testdir, ignore_errors=1)

def assert_ring_data_equal(self, rd_expected, rd_got):
self.assertEquals(rd_expected._replica2part2dev_id,
rd_got._replica2part2dev_id)
self.assertEquals(rd_expected.devs, rd_got.devs)
self.assertEquals(rd_expected._part_shift, rd_got._part_shift)

def test_attrs(self):
r2p2d = [[0, 1, 0, 1], [0, 1, 0, 1]]
d = [{'id': 0, 'zone': 0}, {'id': 1, 'zone': 1}]
Expand All @@ -34,11 +49,23 @@ def test_attrs(self):
self.assertEquals(rd.devs, d)
self.assertEquals(rd._part_shift, s)

def test_pickleable(self):
def test_can_load_pickled_ring_data(self):
rd = ring.RingData([[0, 1, 0, 1], [0, 1, 0, 1]],
[{'id': 0, 'zone': 0}, {'id': 1, 'zone': 1}], 30)
ring_fname = os.path.join(self.testdir, 'foo.ring.gz')
for p in xrange(pickle.HIGHEST_PROTOCOL):
pickle.loads(pickle.dumps(rd, protocol=p))
pickle.dump(rd, GzipFile(ring_fname, 'wb'), protocol=p)
ring_data = ring.RingData.load(ring_fname)
self.assert_ring_data_equal(rd, ring_data)

def test_roundtrip_serialization(self):
ring_fname = os.path.join(self.testdir, 'foo.ring.gz')
rd = ring.RingData(
[array.array('H', [0, 1, 0, 1]), array.array('H',[0, 1, 0, 1])],
[{'id': 0, 'zone': 0}, {'id': 1, 'zone': 1}], 30)
rd.save(ring_fname)
rd2 = ring.RingData.load(ring_fname)
self.assert_ring_data_equal(rd, rd2)


class TestRing(unittest.TestCase):
Expand All @@ -49,9 +76,10 @@ def setUp(self):
rmtree(self.testdir, ignore_errors=1)
os.mkdir(self.testdir)
self.testgz = os.path.join(self.testdir, 'whatever.ring.gz')
self.intended_replica2part2dev_id = [[0, 1, 0, 1],
[0, 1, 0, 1],
[3, 4, 3, 4]]
self.intended_replica2part2dev_id = [
array.array('H', [0, 1, 0, 1]),
array.array('H', [0, 1, 0, 1]),
array.array('H', [3, 4, 3, 4])]
self.intended_devs = [{'id': 0, 'zone': 0, 'weight': 1.0,
'ip': '10.1.1.1', 'port': 6000},
{'id': 1, 'zone': 0, 'weight': 1.0,
Expand All @@ -63,9 +91,8 @@ def setUp(self):
'ip': '10.1.2.2', 'port': 6000}]
self.intended_part_shift = 30
self.intended_reload_time = 15
pickle.dump(ring.RingData(self.intended_replica2part2dev_id,
self.intended_devs, self.intended_part_shift),
GzipFile(self.testgz, 'wb'))
ring.RingData(self.intended_replica2part2dev_id,
self.intended_devs, self.intended_part_shift).save(self.testgz)
self.ring = ring.Ring(self.testdir,
reload_time=self.intended_reload_time, ring_name='whatever')

Expand All @@ -78,7 +105,7 @@ def test_creation(self):
self.assertEquals(self.ring._part_shift, self.intended_part_shift)
self.assertEquals(self.ring.devs, self.intended_devs)
self.assertEquals(self.ring.reload_time, self.intended_reload_time)
self.assertEquals(self.ring.pickle_gz_path, self.testgz)
self.assertEquals(self.ring.serialized_path, self.testgz)
# test invalid endcap
_orig_hash_path_suffix = utils.HASH_PATH_SUFFIX
try:
Expand All @@ -99,9 +126,8 @@ def test_reload(self):
orig_mtime = self.ring._mtime
self.assertEquals(len(self.ring.devs), 5)
self.intended_devs.append({'id': 3, 'zone': 3, 'weight': 1.0})
pickle.dump(ring.RingData(self.intended_replica2part2dev_id,
self.intended_devs, self.intended_part_shift),
GzipFile(self.testgz, 'wb'))
ring.RingData(self.intended_replica2part2dev_id,
self.intended_devs, self.intended_part_shift).save(self.testgz)
sleep(0.1)
self.ring.get_nodes('a')
self.assertEquals(len(self.ring.devs), 6)
Expand All @@ -113,9 +139,8 @@ def test_reload(self):
orig_mtime = self.ring._mtime
self.assertEquals(len(self.ring.devs), 6)
self.intended_devs.append({'id': 5, 'zone': 4, 'weight': 1.0})
pickle.dump(ring.RingData(self.intended_replica2part2dev_id,
self.intended_devs, self.intended_part_shift),
GzipFile(self.testgz, 'wb'))
ring.RingData(self.intended_replica2part2dev_id,
self.intended_devs, self.intended_part_shift).save(self.testgz)
sleep(0.1)
self.ring.get_part_nodes(0)
self.assertEquals(len(self.ring.devs), 7)
Expand All @@ -128,9 +153,8 @@ def test_reload(self):
part, nodes = self.ring.get_nodes('a')
self.assertEquals(len(self.ring.devs), 7)
self.intended_devs.append({'id': 6, 'zone': 5, 'weight': 1.0})
pickle.dump(ring.RingData(self.intended_replica2part2dev_id,
self.intended_devs, self.intended_part_shift),
GzipFile(self.testgz, 'wb'))
ring.RingData(self.intended_replica2part2dev_id,
self.intended_devs, self.intended_part_shift).save(self.testgz)
sleep(0.1)
self.ring.get_more_nodes(part).next()
self.assertEquals(len(self.ring.devs), 8)
Expand All @@ -142,9 +166,8 @@ def test_reload(self):
orig_mtime = self.ring._mtime
self.assertEquals(len(self.ring.devs), 8)
self.intended_devs.append({'id': 5, 'zone': 4, 'weight': 1.0})
pickle.dump(ring.RingData(self.intended_replica2part2dev_id,
self.intended_devs, self.intended_part_shift),
GzipFile(self.testgz, 'wb'))
ring.RingData(self.intended_replica2part2dev_id,
self.intended_devs, self.intended_part_shift).save(self.testgz)
sleep(0.1)
self.assertEquals(len(self.ring.devs), 9)
self.assertNotEquals(self.ring._mtime, orig_mtime)
Expand Down

0 comments on commit f8ce43a

Please sign in to comment.