diff --git a/concrete/util/file_io.py b/concrete/util/file_io.py index eefcb87..03d181a 100644 --- a/concrete/util/file_io.py +++ b/concrete/util/file_io.py @@ -656,3 +656,64 @@ class CommunicationWriterTGZ(CommunicationWriterTar): def __init__(self, tar_filename=None): super(CommunicationWriterTGZ, self).__init__(tar_filename, gzip=True) + + +class CommunicationWriterZip(object): + '''Class for writing one or more Communications to a .zip archive + + Sample usage:: + + with CommunicationWriterZip('multiple_comms.zip') as writer: + writer.write(comm_object_one, 'comm_one.concrete') + writer.write(comm_object_two, 'comm_two.concrete') + writer.write(comm_object_three, 'comm_three.concrete') + ''' + + def __init__(self, zip_filename=None): + ''' + Args: + zip_filename (str): if specified, open file at this path + during construction (a file can alternatively be opened + after construction using the open method) + ''' + if zip_filename is not None: + self.open(zip_filename) + + def open(self, zip_filename=None): + ''' + Open specified zip file for writing. + + Args: + zip_filename (str): path to file to open for writing + ''' + self.zip_f = zipfile.ZipFile(zip_filename, 'w') + + def close(self): + ''' + Close zip file. + ''' + self.zip_f.close() + + def write(self, comm, comm_filename=None): + ''' + Write communication to zip file.' + + Args: + comm (Communication): communication to write to zip file + comm_filename (str): desired filename of communication + within zip file (by default the filename will be the + communication id appended with a .concrete extension) + ''' + if comm_filename is None: + comm_filename = comm.id + '.concrete' + + thrift_bytes = TSerialization.serialize( + comm, protocol_factory=factory.protocolFactory) + + self.zip_f.writestr(comm_filename, thrift_bytes) + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self.close() diff --git a/integration-tests/test_file_io.py b/integration-tests/test_file_io.py index 77eae4c..0948e49 100644 --- a/integration-tests/test_file_io.py +++ b/integration-tests/test_file_io.py @@ -2,13 +2,16 @@ import gzip import os import tarfile -import time +from time import time, localtime +from calendar import timegm +import zipfile from concrete.util import ( CommunicationReader, CommunicationWriter, CommunicationWriterTar, CommunicationWriterTGZ, + CommunicationWriterZip, read_communication_from_file, FileType ) @@ -532,7 +535,7 @@ def test_CommunicationWriterTar_single_file(output_file, login_info): assert "simple_1.concrete" == tarinfo.name assert tarinfo.isreg() - assert tarinfo.mtime > time.time() - TIME_MARGIN + assert tarinfo.mtime > time() - TIME_MARGIN assert os.stat('tests/testdata/simple_1.concrete').st_size == tarinfo.size assert 0o644 == tarinfo.mode assert login_info['uid'] == tarinfo.uid @@ -560,7 +563,7 @@ def test_CommunicationWriterTar_single_file_ctx_mgr(output_file, login_info): assert "simple_1.concrete" == tarinfo.name assert tarinfo.isreg() - assert tarinfo.mtime > time.time() - TIME_MARGIN + assert tarinfo.mtime > time() - TIME_MARGIN assert os.stat('tests/testdata/simple_1.concrete').st_size == tarinfo.size assert 0o644 == tarinfo.mode assert login_info['uid'] == tarinfo.uid @@ -646,7 +649,7 @@ def test_CommunicationWriterTar_single_file_default_name(output_file, assert comm.id + '.concrete' == tarinfo.name assert tarinfo.isreg() - assert tarinfo.mtime > time.time() - TIME_MARGIN + assert tarinfo.mtime > time() - TIME_MARGIN assert os.stat('tests/testdata/simple_1.concrete').st_size == tarinfo.size assert 0o644 == tarinfo.mode assert login_info['uid'] == tarinfo.uid @@ -678,7 +681,7 @@ def test_CommunicationWriterTGZ_single_file(output_file, login_info): assert "simple_1.concrete" == tarinfo.name assert tarinfo.isreg() - assert tarinfo.mtime > time.time() - TIME_MARGIN + assert tarinfo.mtime > time() - TIME_MARGIN assert os.stat('tests/testdata/simple_1.concrete').st_size == tarinfo.size assert 0o644 == tarinfo.mode assert login_info['uid'] == tarinfo.uid @@ -706,7 +709,7 @@ def test_CommunicationWriterTGZ_single_file_ctx_mgr(output_file, login_info): assert "simple_1.concrete" == tarinfo.name assert tarinfo.isreg() - assert tarinfo.mtime > time.time() - TIME_MARGIN + assert tarinfo.mtime > time() - TIME_MARGIN assert os.stat('tests/testdata/simple_1.concrete').st_size == tarinfo.size assert 0o644 == tarinfo.mode assert login_info['uid'] == tarinfo.uid @@ -739,7 +742,7 @@ def test_CommunicationWriterTGZ_single_file_default_name(output_file, assert comm.id + '.concrete' == tarinfo.name assert tarinfo.isreg() - assert tarinfo.mtime > time.time() - TIME_MARGIN + assert tarinfo.mtime > time() - TIME_MARGIN assert os.stat('tests/testdata/simple_1.concrete').st_size == tarinfo.size assert 0o644 == tarinfo.mode assert login_info['uid'] == tarinfo.uid @@ -751,3 +754,111 @@ def test_CommunicationWriterTGZ_single_file_default_name(output_file, assert tarinfo is None f.close() + + +def test_CommunicationWriterZip_single_file(output_file, login_info): + comm = read_communication_from_file("tests/testdata/simple_1.concrete") + writer = CommunicationWriterZip() + try: + writer.open(output_file) + writer.write(comm, "simple_1.concrete") + finally: + writer.close() + + assert zipfile.is_zipfile(output_file) + + f = zipfile.ZipFile(output_file) + + [zipinfo] = f.infolist() + + assert "simple_1.concrete" == zipinfo.filename + assert timegm(zipinfo.date_time) > timegm(localtime()) - TIME_MARGIN + assert os.stat('tests/testdata/simple_1.concrete').st_size == zipinfo.file_size + + f.close() + + +def test_CommunicationWriterZip_single_file_ctx_mgr(output_file, login_info): + comm = read_communication_from_file("tests/testdata/simple_1.concrete") + with CommunicationWriterZip(output_file) as writer: + writer.write(comm, "simple_1.concrete") + + assert zipfile.is_zipfile(output_file) + + f = zipfile.ZipFile(output_file) + + [zipinfo] = f.infolist() + + assert "simple_1.concrete" == zipinfo.filename + assert timegm(zipinfo.date_time) > timegm(localtime()) - TIME_MARGIN + assert os.stat('tests/testdata/simple_1.concrete').st_size == zipinfo.file_size + + f.close() + + +def test_CommunicationWriterZip_single_file_fixed_point(output_file, + login_info): + comm = read_communication_from_file("tests/testdata/simple_1.concrete") + with CommunicationWriterZip(output_file) as writer: + writer.write(comm, "simple_1.concrete") + + assert zipfile.is_zipfile(output_file) + + f = zipfile.ZipFile(output_file) + + [zipinfo] = f.infolist() + + assert "simple_1.concrete" == zipinfo.filename + actual_data = f.open(zipinfo).read() + with open('tests/testdata/simple_1.concrete', 'rb') as expected_f: + expected_data = expected_f.read() + assert expected_data == actual_data + + f.close() + + +def test_CommunicationWriterZip_single_file_fixed_point_unicode(output_file, + login_info): + comm = read_communication_from_file( + "tests/testdata/les-deux-chandeliers.concrete" + ) + with CommunicationWriterZip(output_file) as writer: + writer.write(comm, "les-deux-chandeliers.concrete") + + assert zipfile.is_zipfile(output_file) + + f = zipfile.ZipFile(output_file) + + [zipinfo] = f.infolist() + + assert "les-deux-chandeliers.concrete" == zipinfo.filename + actual_data = f.open(zipinfo).read() + with open('tests/testdata/les-deux-chandeliers.concrete', + 'rb') as expected_f: + expected_data = expected_f.read() + assert expected_data == actual_data + + f.close() + + +def test_CommunicationWriterZip_single_file_default_name(output_file, + login_info): + comm = read_communication_from_file("tests/testdata/simple_1.concrete") + writer = CommunicationWriterZip() + try: + writer.open(output_file) + writer.write(comm) + finally: + writer.close() + + assert zipfile.is_zipfile(output_file) + + f = zipfile.ZipFile(output_file) + + [zipinfo] = f.infolist() + + assert comm.id + '.concrete' == zipinfo.filename + assert timegm(zipinfo.date_time) > timegm(localtime()) - TIME_MARGIN + assert os.stat('tests/testdata/simple_1.concrete').st_size == zipinfo.file_size + + f.close()