Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for reading and writing encrypted archives with test suites #45

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion libarchive/adapters/archive_read.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import contextlib
import ctypes
import logging
import sys

import libarchive.constants.archive
import libarchive.exception
Expand Down Expand Up @@ -49,6 +50,18 @@ def _archive_read_support_format_all(archive):
message = c_archive_error_string(archive)
raise libarchive.exception.ArchiveError(message)

def _archive_read_add_passphrase(archive, passphrase):
try:
if sys.version_info >= (3, 0):
passphrase = bytes(passphrase, 'utf-8')
else:
passphrase = unicode(passphrase).encode('utf-8')
return libarchive.calls.archive_read.c_archive_read_add_passphrase(
archive, passphrase)
except:
message = c_archive_error_string(archive)
raise libarchive.exception.ArchiveError(message)

def _archive_read_support_format_7zip(archive):
try:
return libarchive.calls.archive_read.\
Expand Down Expand Up @@ -267,14 +280,17 @@ def _set_read_context(archive_res, format_code=None, filter_code=None):
archive_read_support_filter_all(archive_res)

@contextlib.contextmanager
def _enumerator(opener, entry_cls, format_code=None, filter_code=None):
def _enumerator(opener, entry_cls, passphrases=None, format_code=None, filter_code=None):
"""Return an archive enumerator from a user-defined source, using a user-
defined entry type.
"""

archive_res = _archive_read_new()

try:
if passphrases is not None:
for passphrase in passphrases:
r = _archive_read_add_passphrase(archive_res, passphrase)
r = _set_read_context(archive_res, format_code, filter_code)
opener(archive_res)

Expand Down
30 changes: 30 additions & 0 deletions libarchive/adapters/archive_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,30 @@ def _archive_write_data(archive, data):
message = c_archive_error_string(archive)
raise ValueError("No bytes were written. Error? [%s]" % (message))

def _archive_write_set_passphrase(archive, passphrase):
try:
if sys.version_info >= (3, 0):
passphrase = bytes(passphrase, 'utf-8')
else:
passphrase = unicode(passphrase).encode('utf-8')
return libarchive.calls.archive_write.c_archive_write_set_passphrase(
archive, passphrase)
except:
message = c_archive_error_string(archive)
raise libarchive.exception.ArchiveError(message)

def _archive_write_set_options(archive, options):
try:
if sys.version_info >= (3, 0):
options = bytes(options, 'utf-8')
else:
options = unicode(options).encode('utf-8')
return libarchive.calls.archive_write.c_archive_write_set_options(
archive, options)
except:
message = c_archive_error_string(archive)
raise libarchive.exception.ArchiveError(message)

def _archive_write_add_filter_bzip2(archive):
try:
libarchive.calls.archive_write.c_archive_write_add_filter_bzip2(
Expand Down Expand Up @@ -190,12 +214,18 @@ def _set_write_context(archive_res, format_code, filter_code=None):
def _create(opener,
format_code,
files,
passphrase=None,
options="zip:encryption=zipcrypt",
filter_code=None,
block_size=16384):
"""Create an archive from a collection of files (not recursive)."""

a = _archive_write_new()
_set_write_context(a, format_code, filter_code)
if passphrase is not None and \
format_code == libarchive.constants.ARCHIVE_FORMAT_ZIP:
r = _archive_write_set_options(a, options)
r = _archive_write_set_passphrase(a, passphrase)

_LOGGER.debug("Opening archive (create).")
opener(a)
Expand Down
4 changes: 4 additions & 0 deletions libarchive/calls/archive_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,7 @@ def _check_zero_success(value):
c_archive_read_data_block = libarchive.archive_read_data_block
c_archive_read_data_block.argtypes = [c_void_p, POINTER(c_void_p), POINTER(c_size_t), POINTER(c_longlong)]
c_archive_read_data_block.restype = c_int

c_archive_read_add_passphrase = libarchive.archive_read_add_passphrase
c_archive_read_add_passphrase.argtypes = [c_void_p, c_char_p]
c_archive_read_add_passphrase.restype = c_int
8 changes: 8 additions & 0 deletions libarchive/calls/archive_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ def _check_zero_success(value):
c_archive_write_disk_set_options.argtypes = [c_void_p, c_int]
c_archive_write_disk_set_options.restype = _check_zero_success

c_archive_write_set_options = libarchive.archive_write_set_options
c_archive_write_set_options.argtypes = [c_void_p, c_char_p]
c_archive_write_set_options.restype = _check_zero_success

c_archive_write_set_passphrase = libarchive.archive_write_set_passphrase
c_archive_write_set_passphrase.argtypes = [c_void_p, c_char_p]
c_archive_write_set_passphrase.restype = c_int

c_archive_write_header = libarchive.archive_write_header
c_archive_write_header.argtypes = [c_void_p, c_void_p]
c_archive_write_header.restype = _check_zero_success
Expand Down
24 changes: 18 additions & 6 deletions libarchive/test_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@ def chdir(path):
os.chdir(original_path)

@contextlib.contextmanager
def test_archive():
def test_archive(passphrase=None, encryption="traditional"):
with chdir(_APP_PATH):
temp_path = tempfile.mkdtemp()

output_filename = 'archive.7z'
if passphrase is not None:
output_filename = 'archive.7z'
else:
output_filename = 'archive.zip'
output_filepath = os.path.join(temp_path, output_filename)

# Also, write a source file with a unicode name that we can add to
Expand All @@ -61,10 +64,19 @@ def test_archive():
unicode_test_filepath,
]

libarchive.public.create_file(
output_filepath,
libarchive.constants.ARCHIVE_FORMAT_7ZIP,
files)
if passphrase is not None:
options = "zip:encryption={}".format(encryption)
libarchive.public.create_file(
output_filepath,
libarchive.constants.ARCHIVE_FORMAT_7ZIP,
files,
options=options,
passphrase=passphrase)
else:
libarchive.public.create_file(
output_filepath,
libarchive.constants.ARCHIVE_FORMAT_ZIP,
files)

assert \
os.path.exists(output_filepath) is True, \
Expand Down
23 changes: 20 additions & 3 deletions tests/adapters/test_archive_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,27 @@


class TestArchiveRead(unittest.TestCase):
def _test_enumerate_from_file(self, passphrase=None, encryption="traditional"):
if passphrase is not None:
with libarchive.test_support.test_archive(passphrase, encryption) as filepath:
with libarchive.adapters.archive_read.file_enumerator(filepath, passphrases=[passphrase]) as e:
list(e)
else:
with libarchive.test_support.test_archive() as filepath:
with libarchive.adapters.archive_read.file_enumerator(filepath) as e:
list(e)

def test_enumerate_from_file(self):
with libarchive.test_support.test_archive() as filepath:
with libarchive.adapters.archive_read.file_enumerator(filepath) as e:
list(e)
self._test_enumerate_from_file()

def test_enumerate_from_file_with_passphrase_traditional(self):
self._test_enumerate_from_file(passphrase="test_passphrase")

def test_enumerate_from_file_with_passphrase_aes128(self):
self._test_enumerate_from_file(passphrase="test_passphrase", encryption="aes128")

def test_enumerate_from_file_with_passphrase_aes256(self):
self._test_enumerate_from_file(passphrase="test_passphrase", encryption="aes256")

def test_enumerate_from_memory(self):
with libarchive.test_support.test_archive() as filepath:
Expand Down
51 changes: 51 additions & 0 deletions tests/adapters/test_archive_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,54 @@ def test_create_file__unicode(self):
]

self.assertEquals(actual, expected)

def test_create_file_with_passphrase_traditional(self):
self._create_file_with_passphrase("traditional")


def test_create_file_with_passphrase_aes128(self):
self._create_file_with_passphrase("aes128")


def test_create_file_with_passphrase_aes256(self):
self._create_file_with_passphrase("aes256")


def _create_file_with_passphrase(self, encryption):
with libarchive.test_support.chdir(_APP_PATH):
temp_path = tempfile.mkdtemp()

output_filename = 'archive.zip'
output_filepath = os.path.join(temp_path, output_filename)
try:
files = [
'libarchive/resources/README.md',
'libarchive/resources/requirements.txt',
]
options = "zip:encryption={}".format(encryption)
libarchive.adapters.archive_write.create_file(
output_filepath,
libarchive.constants.ARCHIVE_FORMAT_ZIP,
files,
options=options,
passphrase="test_passphrase")

assert \
os.path.exists(output_filepath) is True, \
"Test archive was not created correctly."

with libarchive.adapters.archive_read.file_enumerator(output_filepath, passphrases=["test_passphrase"]) as e:
actual = [entry.pathname for entry in e]

finally:
try:
shutil.rmtree(output_path)
except:
pass

expected = [
'libarchive/resources/README.md',
'libarchive/resources/requirements.txt',
]

self.assertEquals(actual, expected)