Skip to content

Commit

Permalink
fix: Respect zipped symlinks (#1140)
Browse files Browse the repository at this point in the history
  • Loading branch information
bubba-h57 authored and jfuss committed Jul 26, 2019
1 parent 3370cbf commit ad9a171
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 28 deletions.
71 changes: 66 additions & 5 deletions samcli/local/lambdafn/zip.py
Expand Up @@ -19,6 +19,70 @@

LOG = logging.getLogger(__name__)

S_IFLNK = 0xA


def _is_symlink(file_info):
"""
Check the upper 4 bits of the external attribute for a symlink.
See: https://unix.stackexchange.com/questions/14705/the-zip-formats-external-file-attribute
Parameters
----------
file_info : zipfile.ZipInfo
The ZipInfo for a ZipFile
Returns
-------
bool
A response regarding whether the ZipInfo defines a symlink or not.
"""

return (file_info.external_attr >> 28) == 0xA


def _extract(file_info, output_dir, zip_ref):
"""
Unzip the given file into the given directory while preserving file permissions in the process.
Parameters
----------
file_info : zipfile.ZipInfo
The ZipInfo for a ZipFile
output_dir : str
Path to the directory where the it should be unzipped to
zip_ref : zipfile.ZipFile
The ZipFile we are working with.
Returns
-------
string
Returns the target path the Zip Entry was extracted to.
"""

# Handle any regular file/directory entries
if not _is_symlink(file_info):
return zip_ref.extract(file_info, output_dir)

source = zip_ref.read(file_info.filename).decode('utf8')
link_name = os.path.normpath(os.path.join(output_dir, file_info.filename))

# make leading dirs if needed
leading_dirs = os.path.dirname(link_name)
if not os.path.exists(leading_dirs):
os.makedirs(leading_dirs)

# If the link already exists, delete it or symlink() fails
if os.path.lexists(link_name):
os.remove(link_name)

# Create a symbolic link pointing to source named link_name.
os.symlink(source, link_name)

return link_name


def unzip(zip_file_path, output_dir, permission=None):
"""
Expand All @@ -40,10 +104,7 @@ def unzip(zip_file_path, output_dir, permission=None):

# For each item in the zip file, extract the file and set permissions if available
for file_info in zip_ref.infolist():
name = file_info.filename
extracted_path = os.path.join(output_dir, name)

zip_ref.extract(name, output_dir)
extracted_path = _extract(file_info, output_dir, zip_ref)
_set_permissions(file_info, extracted_path)

_override_permissions(extracted_path, permission)
Expand Down Expand Up @@ -81,7 +142,7 @@ def _set_permissions(zip_file_info, extracted_path):
"""

# Permission information is stored in first two bytes.
permission = zip_file_info.external_attr >> 16
permission = (zip_file_info.external_attr >> 16) & 511
if not permission:
# Zips created on certain Windows machines, however, might not have any permission information on them.
# Skip setting a permission on these files.
Expand Down
154 changes: 131 additions & 23 deletions tests/unit/local/lambdafn/test_zip.py
Expand Up @@ -14,51 +14,102 @@

class TestUnzipWithPermissions(TestCase):

files_with_permissions = {
"folder1/1.txt": 0o644,
"folder1/2.txt": 0o777,
"folder2/subdir/1.txt": 0o666,
"folder2/subdir/2.txt": 0o400
"""
External Attribute Magic = type + permission + DOS is-dir flag?
TTTTugsrwxrwxrwx0000000000ADVSHR
^^^^____________________________ File Type [UPPER 4 bits, 29-32]
^___________________________ setuid [bit 28]
^__________________________ setgid [bit 27]
^_________________________ sticky [bit 26]
^^^^^^^^^________________ Permissions [bits 17-25]
^^^^^^^^________ Other [bits 9-16]
^^^^^^^^ DOS attribute bits: [LOWER 8 bits]
Interesting File Types
S_IFDIR 0040000 /* directory */
S_IFREG 0100000 /* regular */
S_IFLNK 0120000 /* symbolic link */
See: https://unix.stackexchange.com/questions/14705/%20the-zip-formats-external-file-attribute
"""

files_with_external_attr = {
"1.txt": {
"file_type": 0o10,
"contents": b'foo',
"permissions": 0o644,
},
"folder1/2.txt": {
"file_type": 0o10,
"contents": b'bar',
"permissions": 0o777,
},
"folder2/subdir/3.txt": {
"file_type": 0o10,
"contents": b'foo bar',
"permissions": 0o666,
},
"folder2/subdir/4.txt": {
"file_type": 0o10,
"contents": b'bar foo',
"permissions": 0o400,
},
"symlinkToF2": {
"file_type": 0o12,
"contents": b'1.txt',
"permissions": 0o644,
}
}

expected_files = 0
expected_symlinks = 0
actual_files = 0
actual_symlinks = 0

@parameterized.expand([param(True), param(False)])
def test_must_unzip(self, check_permissions):
def test_must_unzip(self, verify_external_attributes):
self._reset(verify_external_attributes)

with self._create_zip(self.files_with_permissions, check_permissions) as zip_file_name:
with self._create_zip(self.files_with_external_attr, verify_external_attributes) as zip_file_name:
with self._temp_dir() as extract_dir:

unzip(zip_file_name, extract_dir)

for root, dirs, files in os.walk(extract_dir):
for file in files:
filepath = os.path.join(extract_dir, root, file)
perm = oct(stat.S_IMODE(os.stat(filepath).st_mode))
key = os.path.relpath(filepath, extract_dir)
expected_permission = oct(self.files_with_permissions[key])
self._verify_file(extract_dir, file, root, verify_external_attributes)

self.assertIn(key, self.files_with_permissions)
self._verify_file_count(verify_external_attributes)

if check_permissions:
self.assertEquals(expected_permission,
perm,
"File {} has wrong permission {}".format(key, perm))
@contextmanager
def _reset(self, verify_external_attributes):
self.expected_files = 0
self.expected_symlinks = 0
self.actual_files = 0
self.actual_symlinks = 0
if verify_external_attributes:
for filename, data in self.files_with_external_attr.items():
if data["file_type"] == 0o12:
self.expected_symlinks += 1
elif data["file_type"] == 0o10:
self.expected_files += 1

@contextmanager
def _create_zip(self, files_with_permissions, add_permissions=True):
def _create_zip(self, file_dict, add_attributes=True):

zipfilename = None
data = b'hello world'
try:
zipfilename = NamedTemporaryFile(mode="w+b").name

zf = zipfile.ZipFile(zipfilename, "w", zipfile.ZIP_DEFLATED)
for filename, perm in files_with_permissions.items():
for filename, data in file_dict.items():

fileinfo = zipfile.ZipInfo(filename)

if add_permissions:
fileinfo.external_attr = perm << 16
if add_attributes:
fileinfo.external_attr = (data["file_type"] << 28) | (data["permissions"] << 16)

zf.writestr(fileinfo, data)
zf.writestr(fileinfo, data["contents"])

zf.close()

Expand All @@ -68,6 +119,63 @@ def _create_zip(self, files_with_permissions, add_permissions=True):
if zipfilename:
os.remove(zipfilename)

@contextmanager
def _verify_file(self, extract_dir, file, root, verify_external_attributes):
filepath = os.path.join(extract_dir, root, file)
key = os.path.relpath(filepath, extract_dir)
mode = os.lstat(filepath).st_mode
actual_permissions = oct(stat.S_IMODE(mode))
expected_permission = oct(self.files_with_external_attr[key]["permissions"])

self.assertIn(key, self.files_with_external_attr)
if verify_external_attributes:
self._verify_external_attributes(
actual_permissions,
expected_permission,
key,
mode)

@contextmanager
def _verify_external_attributes(self, actual_permissions, expected_permission, key,
mode):
if stat.S_ISREG(mode):
self.assertTrue(
self.files_with_external_attr[key]["file_type"] == 0o10,
"Expected a regular file."
)
self.actual_files += 1
elif stat.S_ISLNK(mode):
self.assertTrue(
self.files_with_external_attr[key]["file_type"] == 0o12,
"Expected a Symlink."
)
self.actual_symlinks += 1
return

self.assertEquals(
expected_permission,
actual_permissions,
"File {} has wrong permission {}, expected {}.".format(
key,
actual_permissions,
expected_permission
)
)

@contextmanager
def _verify_file_count(self, verify_external_attributes):
if verify_external_attributes:
self.assertEqual(
self.expected_files,
self.actual_files,
"Expected {} files but found {}.".format(self.expected_files, self.actual_files)
)
self.assertEqual(
self.expected_symlinks,
self.actual_symlinks,
"Expected {} symlinks but found {}.".format(self.expected_symlinks, self.actual_symlinks)
)

@contextmanager
def _temp_dir(self):
name = None
Expand Down

0 comments on commit ad9a171

Please sign in to comment.