
Commit

Merged in feature/enable-gzip (pull request #67)
Enable transparently gzipped collections

* Merged in feature/enable-gzip-use-binary-mode (pull request #69)

    Feature/enable gzip use binary mode

    * Refactor the collection I/O streams to support binary modes.

        This streamlines operating on gzip-compressed files (see the usage
        sketch after this message).

        Use an io.BytesIO() buffer for compressed collection construction.

    * Implement unit test to check compression ratio of compressed collections.

    * Implement reading/writing of compressed files for collections.

    * Add unit tests to check buffer size.

    * Fix an issue where the collection was not flushed to file after construction.

        This issue is related to implementation internals and does not affect
        the public API.

    Approved-by: Eric Harper <harperic@umich.edu>
    Approved-by: Carl Simon Adorf <csadorf@umich.edu>

* Update changelog, contributors

* Update docstring, reorder args

Approved-by: Vyas Ramasubramani <vramasub@umich.edu>
Approved-by: Carl Simon Adorf <csadorf@umich.edu>
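
A minimal usage sketch of the feature (assuming `Collection` is importable from the top-level `signac` package; the filename is hypothetical):

    import os

    from signac import Collection

    # A '.gz' suffix implies compresslevel=9 by default, and compressed
    # collections must be opened in a binary mode such as 'ab+'.
    with Collection.open('docs.txt.gz', mode='ab+') as c:
        c.insert_one({'x': 1})
        c.flush()  # writes the gzip-compressed JSON lines to disk

    assert os.path.getsize('docs.txt.gz') > 0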
harperic authored and csadorf committed Jan 30, 2019
1 parent 86d51c5 commit 615d396
Showing 4 changed files with 191 additions and 34 deletions.
1 change: 1 addition & 0 deletions changelog.txt
@@ -25,6 +25,7 @@ Added

- Adds an ``H5Store`` class, useful for storing array-like data with an HDF5 backend. Accessible via ``with job.data:`` or ``with project.data:``.
- Add automatic cast of numpy arrays to lists when storing them within a `JSONDict`, e.g., a `job.statepoint` or `job.document`.
- Enable the ``Collection`` class to manage collections stored in gzip files.

Fixed
+++++
1 change: 1 addition & 0 deletions contributors.txt
@@ -6,3 +6,4 @@ Jens Glaser
Bradley Dice
Tim Moore
Pengji Zhou
Eric Harper
114 changes: 96 additions & 18 deletions signac/contrib/collection.py
@@ -338,16 +338,22 @@ class Collection(object):
time complexity of O(N) in the worst case and O(1) on average.
All documents must have a primary key value. The default primary
key is `_id`.
:param compresslevel: The level of compression to use. Any positive value
enables compression and is passed on to the underlying gzip implementation.
The default is 0 (no compression).
"""

def __init__(self, docs=None, primary_key='_id', _trust=False):
def __init__(self, docs=None, primary_key='_id', compresslevel=0, _trust=False):
if isinstance(docs, six.string_types):
raise ValueError(
"First argument cannot be of str type. "
"Did you mean to use {}.open()?".format(type(self).__name__))
self.index_rebuild_threshold = 0.1
self._primary_key = primary_key
self._file = io.StringIO()
if compresslevel > 0:
self._file = io.BytesIO()
else:
self._file = io.StringIO()
self._compresslevel = compresslevel
self._requires_flush = False
self._dirty = set()
self._indexes = dict()
@@ -358,7 +364,6 @@ def __init__(self, docs=None, primary_key='_id', _trust=False):
if self._primary_key not in doc:
doc[self._primary_key] = self._next_default_id()
self.__setitem__(doc[self._primary_key], doc, _trust=_trust)
self._requires_flush = False # not needed after initial read!
self._update_indexes()

def _assert_open(self):
@@ -854,6 +859,15 @@ def delete_one(self, filter):
for _id in to_delete:
del self[_id]

def _dump(self, text_buffer):
"Dump collection content serialized to JSON to text-buffer."
if six.PY2:
for doc in self._docs.values():
text_buffer.write(unicode(json.dumps(doc) + '\n', 'utf-8')) # noqa
else:
for doc in self._docs.values():
text_buffer.write((json.dumps(doc) + '\n'))

def dump(self, file=sys.stdout):
"""Dump the collection in JSON-encoding to file.
@@ -871,21 +885,51 @@ def dump(self, file=sys.stdout):
:param file: The file to write the encoded blob to.
"""
self._assert_open()
for doc in self._docs.values():
file.write(json.dumps(doc) + '\n')
if self._compresslevel > 0:
import gzip
with gzip.GzipFile(
compresslevel=self._compresslevel, fileobj=file, mode='wb') as gzipfile:
text_io = io.TextIOWrapper(gzipfile, encoding='utf-8')
self._dump(text_io)
text_io.flush()
else:
self._dump(file)
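
The write path above layers a UTF-8 text codec over the compressed binary stream. A standalone sketch of the same standard-library pattern, separate from this diff:

    import gzip
    import io
    import json

    docs = [{'_id': '0'}, {'_id': '1'}]

    buffer = io.BytesIO()  # binary sink, like the one backing compressed collections
    with gzip.GzipFile(fileobj=buffer, mode='wb', compresslevel=9) as gzipfile:
        text_io = io.TextIOWrapper(gzipfile, encoding='utf-8')
        for doc in docs:
            text_io.write(json.dumps(doc) + '\n')
        text_io.flush()   # push the encoded text through to the gzip layer
        text_io.detach()  # release gzipfile so closing it below is safe

    assert buffer.getvalue()[:2] == b'\x1f\x8b'  # gzip magic number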

@classmethod
def _open(cls, file):
def _open(cls, file, compresslevel=0):
try:
docs = (json.loads(line) for line in file)
collection = cls(docs=docs)
except (IOError, io.UnsupportedOperation):
collection = cls()
if compresslevel > 0:
import gzip
with gzip.GzipFile(fileobj=file, mode='rb') as gzipfile:
if six.PY2 or (sys.version_info.major == 3 and sys.version_info.minor < 6):
text = gzipfile.read().decode('utf-8')
docs = [json.loads(line) for line in text.splitlines()]
collection = cls(docs=docs)
else:
text_io = io.TextIOWrapper(gzipfile, encoding='utf-8')
collection = cls(docs=(json.loads(line) for line in text_io))
text_io.detach()
else:
collection = cls(docs=(json.loads(line) for line in file))
except (IOError, io.UnsupportedOperation) as error:
if str(error) in ('not readable', 'read'):
collection = cls()
else:
raise error
except AttributeError as e:
# This error occurs under Python 2.7 and has been evaluated as safe
# to accept in this manner.
if str(e) == "'GzipFile' object has no attribute 'extrastart'":
collection = cls()
else:
raise AttributeError(e)
collection._file = file
collection._compresslevel = compresslevel
collection._requires_flush = False # not needed after initial read
return collection
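
`TextIOWrapper` expects the full buffered-reader interface from the object it wraps, which evidently motivated the pre-3.6 fallback above. A sketch of the modern read path (Python 3.6+; the filename is hypothetical):

    import gzip
    import io
    import json

    with open('docs.txt.gz', 'rb') as file:
        with gzip.GzipFile(fileobj=file, mode='rb') as gzipfile:
            text_io = io.TextIOWrapper(gzipfile, encoding='utf-8')
            docs = [json.loads(line) for line in text_io]
            text_io.detach()  # hand gzipfile back before the with-blocks close it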

@classmethod
def open(cls, filename, mode='a+'):
def open(cls, filename, mode=None, compresslevel=None):
"""Open a collection associated with a file on disk.
Using this factory method will return a collection that is
@@ -913,14 +957,39 @@ def open(cls, filename, mode='a+'):
The open-modes work as expected, so for example to open a collection
file in *read-only* mode, use ``Collection.open('collection.txt', 'r')``.
Opening a gzip (`*.gz`) file also works as expected. Because gzip does not
support a combined read and write mode, `mode=*+` is not available; be
sure to open the file in read, write, or append mode as required. Due to
the manner in which gzip works, opening a file with `mode=wt` will
effectively erase the current file, so take care when using `mode=wt`.
"""
if compresslevel is None:
compresslevel = 9 if filename.endswith('.gz') else 0

logger.debug("Open collection '{}'.".format(filename))
if filename == ':memory:':
file = io.StringIO()
if mode is not None:
raise RuntimeError("File open-mode must be None for in-memory collection.")
return cls(compresslevel=compresslevel) # That's the default open mode.
else:
file = open(filename, mode)
# Set default mode
if mode is None:
mode = 'ab+'

file = io.open(filename, mode)
file.seek(0)
return cls._open(file)

if 'b' in mode:
if compresslevel > 0:
return cls._open(file, compresslevel=compresslevel)
else:
return cls._open(io.TextIOWrapper(file, encoding='utf-8'))
elif compresslevel > 0:
raise RuntimeError(
"Compressed collections must be opened in binary mode, for example: 'ab+'.")
else:
return cls._open(file)
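
To illustrate the mode handling above, a short hedged sketch (filenames hypothetical):

    from signac import Collection

    # Text collection; mode defaults to 'ab+'.
    c1 = Collection.open('docs.txt')

    # A '.gz' suffix implies compresslevel=9; a binary mode is required.
    c2 = Collection.open('docs.txt.gz', mode='ab+')

    # Requesting compression together with a text mode is rejected.
    try:
        Collection.open('docs.txt', mode='a+', compresslevel=9)
    except RuntimeError as error:
        print(error)  # "Compressed collections must be opened in binary mode, ..."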

def flush(self):
"""Write all changes to the associated file.
@@ -938,9 +1007,18 @@ def flush(self):
logger.debug("Flushed collection.")
else:
logger.debug("Flush collection to file '{}'.".format(self._file))
self._file.truncate(0)
self.dump(self._file)
self._file.flush()
try:
self._file.truncate(0)
except ValueError as error:
if isinstance(error, io.UnsupportedOperation):
raise error # Python 3
elif str(error).lower() == "file not open for writing":
raise io.UnsupportedOperation(error) # Python 2
else:
raise error # unrelated error
else:
self.dump(self._file)
self._file.flush()
self._requires_flush = False
else:
logger.debug("Flushed collection (no changes).")
109 changes: 93 additions & 16 deletions tests/test_collection.py
@@ -1,3 +1,4 @@
from __future__ import division
import os
import io
import warnings
@@ -61,6 +62,13 @@ def setUp(self):
def test_init(self):
self.assertEqual(len(self.c), 0)

def test_buffer_size(self):
docs = [{'a': i, '_id': str(i)} for i in range(10)]
self.c = Collection(docs)
self.assertEqual(len(self.c._file.getvalue()), 0)
self.c.flush()
self.assertGreater(len(self.c._file.getvalue()), 0)

def test_init_with_list_with_ids_sequential(self):
docs = [{'a': i, '_id': str(i)} for i in range(10)]
self.c = Collection(docs)
@@ -538,6 +546,30 @@ def test_find_logical_operators(self):
self.assertEqual(len(self.c.find({'$not': {'$not': expr}})), expectation)


class CompressedCollectionTest(CollectionTest):

def setUp(self):
self.c = Collection.open(filename=':memory:', compresslevel=9)

def test_compression(self):
# Fill with data
docs = [dict(_id=str(i)) for i in range(10)]
self.c.update(docs)
self.c.flush()

# Create uncompressed copy to compare to.
c2 = Collection(self.c)
c2.flush()

self.assertIsInstance(self.c._file, io.BytesIO)
size_compressed = len(self.c._file.getvalue())
self.assertGreater(size_compressed, 0)
size_uncompressed = len(c2._file.getvalue().encode('utf-8'))
self.assertGreater(size_uncompressed, 0)
compression_ratio = size_uncompressed / size_compressed
self.assertGreater(compression_ratio, 1.0)


class FileCollectionTestReadOnly(unittest.TestCase):

def setUp(self):
@@ -559,44 +591,49 @@ def test_write_on_readonly(self):
self.assertEqual(len(list(c)), 10)
c.insert_one(dict())
self.assertEqual(len(list(c)), 11)
if six.PY2:
with self.assertRaises(IOError):
c.flush()
with self.assertRaises(IOError):
c.close()
else:
with self.assertRaises(io.UnsupportedOperation):
c.flush()
with self.assertRaises(io.UnsupportedOperation):
c.close()
with self.assertRaises(io.UnsupportedOperation):
c.flush()
with self.assertRaises(io.UnsupportedOperation):
c.close()
with self.assertRaises(RuntimeError):
c.find()


class FileCollectionTest(CollectionTest):
mode = 'w'
filename = 'test.txt'

def setUp(self):
self._tmp_dir = TemporaryDirectory(prefix='signac_collection_')
self._fn_collection = os.path.join(self._tmp_dir.name, 'test.txt')
self._fn_collection = os.path.join(self._tmp_dir.name, self.filename)
self.addCleanup(self._tmp_dir.cleanup)
self.c = Collection.open(self._fn_collection, mode=self.mode)
self.addCleanup(self.c.close)

def test_reopen(self):
def test_write_and_flush(self):
docs = [dict(_id=str(i)) for i in range(10)]
self.c.update(docs)
self.c.flush()
self.assertGreater(os.path.getsize(self._fn_collection), 0)

with Collection.open(self._fn_collection) as c:
c.update(docs)
def test_write_flush_and_reopen(self):
docs = [dict(_id=str(i)) for i in range(10)]
self.c.update(docs)
self.c.flush()
self.assertGreater(os.path.getsize(self._fn_collection), 0)

with Collection.open(self._fn_collection) as c:
self.assertEqual(len(c), len(docs))
for doc in self.c:
self.assertIn(doc['_id'], c)


class FileCollectionTestAppendPlus(FileCollectionTest):
mode = 'a+'
class BinaryFileCollectionTest(FileCollectionTest):
mode = 'wb'


class FileCollectionAppendTest(FileCollectionTest):
mode = 'a'

def test_file_size(self):
docs = [dict(_id=str(i)) for i in range(10)]
@@ -617,5 +654,45 @@ def test_file_size(self):
self.assertEqual(len(list(f)), len(docs))


class BinaryFileCollectionAppendTest(FileCollectionAppendTest):
mode = 'ab'


class FileCollectionAppendPlusTest(FileCollectionAppendTest):
mode = 'a+'


class BinaryFileCollectionAppendPlusTest(FileCollectionAppendTest):
mode = 'ab+'


class ZippedFileCollectionTest(FileCollectionTest):
filename = 'test.txt.gz'
mode = 'wb'

def test_compression_level(self):
docs = [dict(_id=str(i)) for i in range(10)]
self.c.update(docs)
self.c.flush()
fn_txt = self._fn_collection + '.txt'
with Collection.open(fn_txt) as c_text:
c_text.update(self.c)
size_txt = os.path.getsize(fn_txt)
size_gz = os.path.getsize(self._fn_collection)
self.assertGreater(size_txt, 0)
self.assertGreater(size_gz, 0)
compression_ratio = size_txt / size_gz
self.assertGreater(compression_ratio, 1.0)


class ZippedFileCollectionAppendTest(ZippedFileCollectionTest):
filename = 'test.txt.gz'
mode = 'ab'


class ZippedFileCollectionAppendPlusTest(ZippedFileCollectionAppendTest):
mode = 'ab+'


if __name__ == '__main__':
unittest.main()
