From 615d396adeae6014e9a7d00ee4d151fbf06b4611 Mon Sep 17 00:00:00 2001 From: Eric Harper Date: Wed, 30 Jan 2019 13:50:52 +0000 Subject: [PATCH] Merged in feature/enable-gzip (pull request #67) Enable transparently gzipped collections * Merged in feature/enable-gzip-use-binary-mode (pull request #69) Feature/enable gzip use binary mode * Refactor the collection io streams to support binary modes. This streamlines operating on zipped binary files. Use io.BytesIO() file for compressed collection construction. * Implement unit test to check compression ratio of compressed collections. * Implement reading/writing of compressed files for collections. * Add unit tests to check buffer size. * Fix issue where collection is not flushed to file after construction. This issue is related to implementation internals and does not affect the public API. Approved-by: Eric Harper Approved-by: Carl Simon Adorf * Update changelog, contributors * Update docstring, reorder args Approved-by: Vyas Ramasubramani Approved-by: Carl Simon Adorf --- changelog.txt | 1 + contributors.txt | 1 + signac/contrib/collection.py | 114 +++++++++++++++++++++++++++++------ tests/test_collection.py | 109 ++++++++++++++++++++++++++++----- 4 files changed, 191 insertions(+), 34 deletions(-) diff --git a/changelog.txt b/changelog.txt index 5f9c3cac0..2f921e87a 100644 --- a/changelog.txt +++ b/changelog.txt @@ -25,6 +25,7 @@ Added - Adds an ``H5Store`` class, useful for storing array-like data with an HDF5 backend. Accessible via ``with job.data:`` or ``with project.data:``. - Add automatic cast of numpy arrays to lists when storing them within a `JSONDict`, e.g., a `job.statepoint` or `job.document`. + - Enable `Collection` class to manage collections stored in gzip files. Fixed +++++ diff --git a/contributors.txt b/contributors.txt index 066de6d94..2428d45da 100644 --- a/contributors.txt +++ b/contributors.txt @@ -6,3 +6,4 @@ Jens Glaser Bradley Dice Tim Moore Pengji Zhou +Eric Harper diff --git a/signac/contrib/collection.py b/signac/contrib/collection.py index 4d9844c17..101612532 100644 --- a/signac/contrib/collection.py +++ b/signac/contrib/collection.py @@ -338,16 +338,22 @@ class Collection(object): time complexity of O(N) in the worst case and O(1) on average. All documents must have a primary key value. The default primary key is `_id`. + :param compresslevel: The level of compression to use. Any positive value + implies compression and is used by the underlying gzip implementation. + Default value is 0 (no compression). """ - - def __init__(self, docs=None, primary_key='_id', _trust=False): + def __init__(self, docs=None, primary_key='_id', compresslevel=0, _trust=False): if isinstance(docs, six.string_types): raise ValueError( "First argument cannot be of str type. " "Did you mean to use {}.open()?".format(type(self).__name__)) self.index_rebuild_threshold = 0.1 self._primary_key = primary_key - self._file = io.StringIO() + if compresslevel > 0: + self._file = io.BytesIO() + else: + self._file = io.StringIO() + self._compresslevel = compresslevel self._requires_flush = False self._dirty = set() self._indexes = dict() @@ -358,7 +364,6 @@ def __init__(self, docs=None, primary_key='_id', _trust=False): if self._primary_key not in doc: doc[self._primary_key] = self._next_default_id() self.__setitem__(doc[self._primary_key], doc, _trust=_trust) - self._requires_flush = False # not needed after initial read! self._update_indexes() def _assert_open(self): @@ -854,6 +859,15 @@ def delete_one(self, filter): for _id in to_delete: del self[_id] + def _dump(self, text_buffer): + "Dump collection content serialized to JSON to text-buffer." + if six.PY2: + for doc in self._docs.values(): + text_buffer.write(unicode(json.dumps(doc) + '\n', 'utf-8')) # noqa + else: + for doc in self._docs.values(): + text_buffer.write((json.dumps(doc) + '\n')) + def dump(self, file=sys.stdout): """Dump the collection in JSON-encoding to file. @@ -871,21 +885,51 @@ def dump(self, file=sys.stdout): :param file: The file to write the encoded blob to. """ self._assert_open() - for doc in self._docs.values(): - file.write(json.dumps(doc) + '\n') + if self._compresslevel > 0: + import gzip + with gzip.GzipFile( + compresslevel=self._compresslevel, fileobj=file, mode='wb') as gzipfile: + text_io = io.TextIOWrapper(gzipfile, encoding='utf-8') + self._dump(text_io) + text_io.flush() + else: + self._dump(file) @classmethod - def _open(cls, file): + def _open(cls, file, compresslevel=0): try: - docs = (json.loads(line) for line in file) - collection = cls(docs=docs) - except (IOError, io.UnsupportedOperation): - collection = cls() + if compresslevel > 0: + import gzip + with gzip.GzipFile(fileobj=file, mode='rb') as gzipfile: + if six.PY2 or (sys.version_info.major == 3 and sys.version_info.minor < 6): + text = gzipfile.read().decode('utf-8') + docs = [json.loads(line) for line in text.splitlines()] + collection = cls(docs=docs) + else: + text_io = io.TextIOWrapper(gzipfile, encoding='utf-8') + collection = cls(docs=(json.loads(line) for line in text_io)) + text_io.detach() + else: + collection = cls(docs=(json.loads(line) for line in file)) + except (IOError, io.UnsupportedOperation) as error: + if str(error) in ('not readable', 'read'): + collection = cls() + else: + raise error + except AttributeError as e: + # This error occurs in python27 and has been evaluated as being + # fine to accept in this manner + if str(e) == "'GzipFile' object has no attribute 'extrastart'": + collection = cls() + else: + raise AttributeError(e) collection._file = file + collection._compresslevel = compresslevel + collection._requires_flush = False # not needed after initial read return collection @classmethod - def open(cls, filename, mode='a+'): + def open(cls, filename, mode=None, compresslevel=None): """Open a collection associated with a file on disk. Using this factory method will return a collection that is @@ -913,14 +957,39 @@ def open(cls, filename, mode='a+'): The open-modes work as expected, so for example to open a collection file in *read-only* mode, use ``Collection.open('collection.txt', 'r')``. + + Opening a gzip (`*.gz`) file also works as expected. Because gzip does not + support a combined read and write mode, `mode=*+` is not available. Be + sure to open the file in read, write, or append mode as required. Due to + the manner in which gzip works, opening a file in `mode=wt` will + effectively erase the current file, so take care using `mode=wt`. """ + if compresslevel is None: + compresslevel = 9 if filename.endswith('.gz') else 0 + logger.debug("Open collection '{}'.".format(filename)) if filename == ':memory:': - file = io.StringIO() + if mode is not None: + raise RuntimeError("File open-mode must be None for in-memory collection.") + return cls(compresslevel=compresslevel) # That's the default open mode. else: - file = open(filename, mode) + # Set default mode + if mode is None: + mode = 'ab+' + + file = io.open(filename, mode) file.seek(0) - return cls._open(file) + + if 'b' in mode: + if compresslevel > 0: + return cls._open(file, compresslevel=compresslevel) + else: + return cls._open(io.TextIOWrapper(file, encoding='utf-8')) + elif compresslevel > 0: + raise RuntimeError( + "Compressed collections must be opened in binary mode, for example: 'ab+'.") + else: + return cls._open(file) def flush(self): """Write all changes to the associated file. @@ -938,9 +1007,18 @@ def flush(self): logger.debug("Flushed collection.") else: logger.debug("Flush collection to file '{}'.".format(self._file)) - self._file.truncate(0) - self.dump(self._file) - self._file.flush() + try: + self._file.truncate(0) + except ValueError as error: + if isinstance(error, io.UnsupportedOperation): + raise error # Python 3 + elif str(error).lower() == "file not open for writing": + raise io.UnsupportedOperation(error) # Python 2 + else: + raise error # unrelated error + else: + self.dump(self._file) + self._file.flush() self._requires_flush = False else: logger.debug("Flushed collection (no changes).") diff --git a/tests/test_collection.py b/tests/test_collection.py index d64159041..aeaa95820 100644 --- a/tests/test_collection.py +++ b/tests/test_collection.py @@ -1,3 +1,4 @@ +from __future__ import division import os import io import warnings @@ -61,6 +62,13 @@ def setUp(self): def test_init(self): self.assertEqual(len(self.c), 0) + def test_buffer_size(self): + docs = [{'a': i, '_id': str(i)} for i in range(10)] + self.c = Collection(docs) + self.assertEqual(len(self.c._file.getvalue()), 0) + self.c.flush() + self.assertGreater(len(self.c._file.getvalue()), 0) + def test_init_with_list_with_ids_sequential(self): docs = [{'a': i, '_id': str(i)} for i in range(10)] self.c = Collection(docs) @@ -538,6 +546,30 @@ def test_find_logical_operators(self): self.assertEqual(len(self.c.find({'$not': {'$not': expr}})), expectation) +class CompressedCollectionTest(CollectionTest): + + def setUp(self): + self.c = Collection.open(filename=':memory:', compresslevel=9) + + def test_compression(self): + # Fill with data + docs = [dict(_id=str(i)) for i in range(10)] + self.c.update(docs) + self.c.flush() + + # Create uncompressed copy to compare to. + c2 = Collection(self.c) + c2.flush() + + self.assertIsInstance(self.c._file, io.BytesIO) + size_compressed = len(self.c._file.getvalue()) + self.assertGreater(size_compressed, 0) + size_uncompressed = len(c2._file.getvalue().encode('utf-8')) + self.assertGreater(size_uncompressed, 0) + compresslevel = size_uncompressed / size_compressed + self.assertGreater(compresslevel, 1.0) + + class FileCollectionTestReadOnly(unittest.TestCase): def setUp(self): @@ -559,35 +591,36 @@ def test_write_on_readonly(self): self.assertEqual(len(list(c)), 10) c.insert_one(dict()) self.assertEqual(len(list(c)), 11) - if six.PY2: - with self.assertRaises(IOError): - c.flush() - with self.assertRaises(IOError): - c.close() - else: - with self.assertRaises(io.UnsupportedOperation): - c.flush() - with self.assertRaises(io.UnsupportedOperation): - c.close() + with self.assertRaises(io.UnsupportedOperation): + c.flush() + with self.assertRaises(io.UnsupportedOperation): + c.close() with self.assertRaises(RuntimeError): c.find() class FileCollectionTest(CollectionTest): mode = 'w' + filename = 'test.txt' def setUp(self): self._tmp_dir = TemporaryDirectory(prefix='signac_collection_') - self._fn_collection = os.path.join(self._tmp_dir.name, 'test.txt') + self._fn_collection = os.path.join(self._tmp_dir.name, self.filename) self.addCleanup(self._tmp_dir.cleanup) self.c = Collection.open(self._fn_collection, mode=self.mode) self.addCleanup(self.c.close) - def test_reopen(self): + def test_write_and_flush(self): docs = [dict(_id=str(i)) for i in range(10)] + self.c.update(docs) + self.c.flush() + self.assertGreater(os.path.getsize(self._fn_collection), 0) - with Collection.open(self._fn_collection) as c: - c.update(docs) + def test_write_flush_and_reopen(self): + docs = [dict(_id=str(i)) for i in range(10)] + self.c.update(docs) + self.c.flush() + self.assertGreater(os.path.getsize(self._fn_collection), 0) with Collection.open(self._fn_collection) as c: self.assertEqual(len(c), len(docs)) @@ -595,8 +628,12 @@ def test_reopen(self): self.assertIn(doc['_id'], c) -class FileCollectionTestAppendPlus(FileCollectionTest): - mode = 'a+' +class BinaryFileCollectionTest(CollectionTest): + mode = 'wb' + + +class FileCollectionAppendTest(FileCollectionTest): + mode = 'a' def test_file_size(self): docs = [dict(_id=str(i)) for i in range(10)] @@ -617,5 +654,45 @@ def test_file_size(self): self.assertEqual(len(list(f)), len(docs)) +class BinaryFileCollectionAppendTest(FileCollectionAppendTest): + mode = 'ab' + + +class FileCollectionAppendPlusTest(FileCollectionAppendTest): + mode = 'a+' + + +class BinaryFileCollectionAppendPlusTest(FileCollectionAppendTest): + mode = 'ab+' + + +class ZippedFileCollectionTest(FileCollectionTest): + filename = 'test.txt.gz' + mode = 'wb' + + def test_compression_level(self): + docs = [dict(_id=str(i)) for i in range(10)] + self.c.update(docs) + self.c.flush() + fn_txt = self._fn_collection + '.txt' + with Collection.open(fn_txt) as c_text: + c_text.update(self.c) + size_txt = os.path.getsize(fn_txt) + size_gz = os.path.getsize(self._fn_collection) + self.assertGreater(size_txt, 0) + self.assertGreater(size_gz, 0) + compresslevel = size_txt / size_gz + self.assertGreater(compresslevel, 1.0) + + +class ZippedFileCollectionAppendTest(ZippedFileCollectionTest): + filename = 'test.txt.gz' + mode = 'ab' + + +class ZippedFileCollectionAppendPlusTest(ZippedFileCollectionAppendTest): + mode = 'ab' + + if __name__ == '__main__': unittest.main()