Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Revert "fix: Respect zipped symlinks (#1140)"" #1482

Merged
merged 3 commits into from Apr 15, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
79 changes: 72 additions & 7 deletions samcli/local/lambdafn/zip.py
Expand Up @@ -15,6 +15,70 @@

LOG = logging.getLogger(__name__)

S_IFLNK = 0xA


def _is_symlink(file_info):
"""
Check the upper 4 bits of the external attribute for a symlink.
See: https://unix.stackexchange.com/questions/14705/the-zip-formats-external-file-attribute

Parameters
----------
file_info : zipfile.ZipInfo
The ZipInfo for a ZipFile

Returns
-------
bool
A response regarding whether the ZipInfo defines a symlink or not.
"""

return (file_info.external_attr >> 28) == 0xA


def _extract(file_info, output_dir, zip_ref):
"""
Unzip the given file into the given directory while preserving file permissions in the process.

Parameters
----------
file_info : zipfile.ZipInfo
The ZipInfo for a ZipFile

output_dir : str
Path to the directory where the it should be unzipped to

zip_ref : zipfile.ZipFile
The ZipFile we are working with.

Returns
-------
string
Returns the target path the Zip Entry was extracted to.
"""

# Handle any regular file/directory entries
if not _is_symlink(file_info):
return zip_ref.extract(file_info, output_dir)

source = zip_ref.read(file_info.filename).decode("utf8")
link_name = os.path.normpath(os.path.join(output_dir, file_info.filename))

# make leading dirs if needed
leading_dirs = os.path.dirname(link_name)
if not os.path.exists(leading_dirs):
os.makedirs(leading_dirs)

# If the link already exists, delete it or symlink() fails
if os.path.lexists(link_name):
os.remove(link_name)

# Create a symbolic link pointing to source named link_name.
os.symlink(source, link_name)

return link_name


def unzip(zip_file_path, output_dir, permission=None):
"""
Expand All @@ -36,15 +100,16 @@ def unzip(zip_file_path, output_dir, permission=None):

# For each item in the zip file, extract the file and set permissions if available
for file_info in zip_ref.infolist():
name = file_info.filename
extracted_path = os.path.join(output_dir, name)

zip_ref.extract(name, output_dir)
_set_permissions(file_info, extracted_path)
extracted_path = _extract(file_info, output_dir, zip_ref)

_override_permissions(extracted_path, permission)
# If the extracted_path is a symlink, do not set the permissions. If the target of the symlink does not
# exist, then os.chmod will fail with FileNotFoundError
if not os.path.islink(extracted_path):
_set_permissions(file_info, extracted_path)
_override_permissions(extracted_path, permission)

_override_permissions(output_dir, permission)
if not os.path.islink(extracted_path):
_override_permissions(output_dir, permission)


def _override_permissions(path, permission):
Expand Down
17 changes: 17 additions & 0 deletions tests/integration/local/invoke/test_integrations_cli.py
Expand Up @@ -400,6 +400,23 @@ def test_skip_pull_image_in_env_var(self):
process_stderr = stderr.strip()
self.assertIn("Requested to skip pulling images", process_stderr.decode("utf-8"))

@skipIf(SKIP_LAYERS_TESTS, "Skip layers tests in Appveyor only")
@pytest.mark.flaky(reruns=3)
def test_invoke_returns_execpted_results_from_git_function(self):
command_list = self.get_command_list(
"GitLayerFunction", template_path=self.template_path, event_path=self.event_path
)

process = Popen(command_list, stdout=PIPE)
try:
stdout, _ = process.communicate(timeout=TIMEOUT)
except TimeoutExpired:
process.kill()
raise

process_stdout = stdout.strip()
self.assertEqual(process_stdout.decode("utf-8"), '"git init passed"')


class TestUsingConfigFiles(InvokeIntegBase):
template = Path("template.yml")
Expand Down
8 changes: 8 additions & 0 deletions tests/integration/testdata/invoke/main.py
@@ -1,6 +1,7 @@
import time
import os
import sys
import subprocess

print ("Loading function")

Expand Down Expand Up @@ -46,3 +47,10 @@ def echo_event(event, context):

def raise_exception(event, context):
raise Exception("Lambda is raising an exception")


def execute_git(event, context):
return_code = subprocess.call(['git', 'init', '/tmp/samtesting'])
assert return_code == 0

return "git init passed"
9 changes: 9 additions & 0 deletions tests/integration/testdata/invoke/template.yml
Expand Up @@ -179,3 +179,12 @@ Resources:
Handler: main.echo_event
Runtime: python3.6
CodeUri: .

GitLayerFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: .
Handler: main.execute_git
Runtime: python3.8
Layers:
- arn:aws:lambda:us-east-1:553035198032:layer:git-lambda2:5
120 changes: 96 additions & 24 deletions tests/unit/local/lambdafn/test_zip.py
Expand Up @@ -7,7 +7,6 @@
from tempfile import NamedTemporaryFile, mkdtemp
from unittest import TestCase
from unittest import skipIf

from unittest.mock import Mock, patch
from parameterized import parameterized, param

Expand All @@ -19,51 +18,82 @@

@skipIf(SKIP_UNZIP_PERMISSION_TESTS, "Skip UnZip Permissions tests in Windows only")
class TestUnzipWithPermissions(TestCase):
files_with_permissions = {
"folder1/1.txt": 0o644,
"folder1/2.txt": 0o777,
"folder2/subdir/1.txt": 0o666,
"folder2/subdir/2.txt": 0o400,
"""
External Attribute Magic = type + permission + DOS is-dir flag?

TTTTugsrwxrwxrwx0000000000ADVSHR
^^^^____________________________ File Type [UPPER 4 bits, 29-32]
^___________________________ setuid [bit 28]
^__________________________ setgid [bit 27]
^_________________________ sticky [bit 26]
^^^^^^^^^________________ Permissions [bits 17-25]
^^^^^^^^________ Other [bits 9-16]
^^^^^^^^ DOS attribute bits: [LOWER 8 bits]

Interesting File Types
S_IFDIR 0040000 /* directory */
S_IFREG 0100000 /* regular */
S_IFLNK 0120000 /* symbolic link */

See: https://unix.stackexchange.com/questions/14705/%20the-zip-formats-external-file-attribute
"""

files_with_external_attr = {
"1.txt": {"file_type": 0o10, "contents": b"foo", "permissions": 0o644},
"folder1/2.txt": {"file_type": 0o10, "contents": b"bar", "permissions": 0o777},
"folder2/subdir/3.txt": {"file_type": 0o10, "contents": b"foo bar", "permissions": 0o666},
"folder2/subdir/4.txt": {"file_type": 0o10, "contents": b"bar foo", "permissions": 0o400},
"symlinkToF2": {"file_type": 0o12, "contents": b"1.txt", "permissions": 0o644},
}

expected_files = 0
expected_symlinks = 0
actual_files = 0
actual_symlinks = 0

@parameterized.expand([param(True), param(False)])
def test_must_unzip(self, check_permissions):
def test_must_unzip(self, verify_external_attributes):
self._reset(verify_external_attributes)

with self._create_zip(self.files_with_permissions, check_permissions) as zip_file_name:
with self._create_zip(self.files_with_external_attr, verify_external_attributes) as zip_file_name:
with self._temp_dir() as extract_dir:

unzip(zip_file_name, extract_dir)

for root, dirs, files in os.walk(extract_dir):
for file in files:
filepath = os.path.join(extract_dir, root, file)
perm = oct(stat.S_IMODE(os.stat(filepath).st_mode))
key = os.path.relpath(filepath, extract_dir)
expected_permission = oct(self.files_with_permissions[key])
self._verify_file(extract_dir, file, root, verify_external_attributes)

self.assertIn(key, self.files_with_permissions)
self._verify_file_count(verify_external_attributes)

if check_permissions:
self.assertEqual(
expected_permission, perm, "File {} has wrong permission {}".format(key, perm)
)
@contextmanager
def _reset(self, verify_external_attributes):
self.expected_files = 0
self.expected_symlinks = 0
self.actual_files = 0
self.actual_symlinks = 0
if verify_external_attributes:
for filename, data in self.files_with_external_attr.items():
if data["file_type"] == 0o12:
self.expected_symlinks += 1
elif data["file_type"] == 0o10:
self.expected_files += 1

@contextmanager
def _create_zip(self, files_with_permissions, add_permissions=True):
def _create_zip(self, file_dict, add_attributes=True):

zipfilename = None
data = b"hello world"
try:
zipfilename = NamedTemporaryFile(mode="w+b").name

zf = zipfile.ZipFile(zipfilename, "w", zipfile.ZIP_DEFLATED)
for filename, perm in files_with_permissions.items():
for filename, data in file_dict.items():

fileinfo = zipfile.ZipInfo(filename)

if add_permissions:
fileinfo.external_attr = perm << 16
if add_attributes:
fileinfo.external_attr = (data["file_type"] << 28) | (data["permissions"] << 16)

zf.writestr(fileinfo, data)
zf.writestr(fileinfo, data["contents"])

zf.close()

Expand All @@ -73,6 +103,48 @@ def _create_zip(self, files_with_permissions, add_permissions=True):
if zipfilename:
os.remove(zipfilename)

@contextmanager
def _verify_file(self, extract_dir, file, root, verify_external_attributes):
filepath = os.path.join(extract_dir, root, file)
key = os.path.relpath(filepath, extract_dir)
mode = os.lstat(filepath).st_mode
actual_permissions = oct(stat.S_IMODE(mode))
expected_permission = oct(self.files_with_external_attr[key]["permissions"])

self.assertIn(key, self.files_with_external_attr)
if verify_external_attributes:
self._verify_external_attributes(actual_permissions, expected_permission, key, mode)

@contextmanager
def _verify_external_attributes(self, actual_permissions, expected_permission, key, mode):
if stat.S_ISREG(mode):
self.assertTrue(self.files_with_external_attr[key]["file_type"] == 0o10, "Expected a regular file.")
self.actual_files += 1
elif stat.S_ISLNK(mode):
self.assertTrue(self.files_with_external_attr[key]["file_type"] == 0o12, "Expected a Symlink.")
self.actual_symlinks += 1
return

self.assertEqual(
expected_permission,
actual_permissions,
"File {} has wrong permission {}, expected {}.".format(key, actual_permissions, expected_permission),
)

@contextmanager
def _verify_file_count(self, verify_external_attributes):
if verify_external_attributes:
self.assertEqual(
self.expected_files,
self.actual_files,
"Expected {} files but found {}.".format(self.expected_files, self.actual_files),
)
self.assertEqual(
self.expected_symlinks,
self.actual_symlinks,
"Expected {} symlinks but found {}.".format(self.expected_symlinks, self.actual_symlinks),
)

@contextmanager
def _temp_dir(self):
name = None
Expand Down