Skip to content

Commit

Permalink
pythongh-106531: Refresh zipfile._path with zipp 3.18. (python#116835)
Browse files Browse the repository at this point in the history
* pythongh-106531: Refresh zipfile._path with zipp 3.18.

* Add blurb
  • Loading branch information
jaraco authored and adorilson committed Mar 25, 2024
1 parent 4f69412 commit 2d12163
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 54 deletions.
8 changes: 6 additions & 2 deletions Lib/test/test_zipfile/_path/test_complexity.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,17 @@ def make_zip_path(self, depth=1, width=1) -> zipfile.Path:
@classmethod
def make_names(cls, width, letters=string.ascii_lowercase):
"""
>>> list(TestComplexity.make_names(1))
['a']
>>> list(TestComplexity.make_names(2))
['a', 'b']
>>> list(TestComplexity.make_names(30))
['aa', 'ab', ..., 'bd']
>>> list(TestComplexity.make_names(17124))
['aaa', 'aab', ..., 'zip']
"""
# determine how many products are needed to produce width
n_products = math.ceil(math.log(width, len(letters)))
n_products = max(1, math.ceil(math.log(width, len(letters))))
inputs = (letters,) * n_products
combinations = itertools.product(*inputs)
names = map(''.join, combinations)
Expand Down Expand Up @@ -80,7 +84,7 @@ def test_glob_depth(self):
max_n=100,
min_n=1,
)
assert best <= big_o.complexities.Quadratic
assert best <= big_o.complexities.Linear

@pytest.mark.flaky
def test_glob_width(self):
Expand Down
23 changes: 8 additions & 15 deletions Lib/test/test_zipfile/_path/test_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import sys
import unittest
import zipfile
import zipfile._path

from ._functools import compose
from ._itertools import Counter
Expand All @@ -20,16 +21,6 @@ class itertools:
Counter = Counter


def add_dirs(zf):
"""
Given a writable zip file zf, inject directory entries for
any directories implied by the presence of children.
"""
for name in zipfile.CompleteDirs._implied_dirs(zf.namelist()):
zf.writestr(name, b"")
return zf


def build_alpharep_fixture():
"""
Create a zip file with this structure:
Expand Down Expand Up @@ -76,7 +67,7 @@ def build_alpharep_fixture():

alpharep_generators = [
Invoked.wrap(build_alpharep_fixture),
Invoked.wrap(compose(add_dirs, build_alpharep_fixture)),
Invoked.wrap(compose(zipfile._path.CompleteDirs.inject, build_alpharep_fixture)),
]

pass_alpharep = parameterize(['alpharep'], alpharep_generators)
Expand Down Expand Up @@ -210,11 +201,12 @@ def test_open_write(self):
with zf.joinpath('file.txt').open('w', encoding="utf-8") as strm:
strm.write('text file')

def test_open_extant_directory(self):
@pass_alpharep
def test_open_extant_directory(self, alpharep):
"""
Attempting to open a directory raises IsADirectoryError.
"""
zf = zipfile.Path(add_dirs(build_alpharep_fixture()))
zf = zipfile.Path(alpharep)
with self.assertRaises(IsADirectoryError):
zf.joinpath('b').open()

Expand All @@ -226,11 +218,12 @@ def test_open_binary_invalid_args(self, alpharep):
with self.assertRaises(ValueError):
root.joinpath('a.txt').open('rb', 'utf-8')

def test_open_missing_directory(self):
@pass_alpharep
def test_open_missing_directory(self, alpharep):
"""
Attempting to open a missing directory raises FileNotFoundError.
"""
zf = zipfile.Path(add_dirs(build_alpharep_fixture()))
zf = zipfile.Path(alpharep)
with self.assertRaises(FileNotFoundError):
zf.joinpath('z').open()

Expand Down
65 changes: 51 additions & 14 deletions Lib/zipfile/_path/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
import contextlib
import pathlib
import re
import sys

from .glob import translate
from .glob import Translator


__all__ = ['Path']
Expand Down Expand Up @@ -147,6 +148,16 @@ def make(cls, source):
source.__class__ = cls
return source

@classmethod
def inject(cls, zf: zipfile.ZipFile) -> zipfile.ZipFile:
"""
Given a writable zip file zf, inject directory entries for
any directories implied by the presence of children.
"""
for name in cls._implied_dirs(zf.namelist()):
zf.writestr(name, b"")
return zf


class FastLookup(CompleteDirs):
"""
Expand All @@ -168,8 +179,10 @@ def _name_set(self):


def _extract_text_encoding(encoding=None, *args, **kwargs):
# stacklevel=3 so that the caller of the caller see any warning.
return io.text_encoding(encoding, 3), args, kwargs
# compute stack level so that the caller of the caller sees any warning.
is_pypy = sys.implementation.name == 'pypy'
stack_level = 3 + is_pypy
return io.text_encoding(encoding, stack_level), args, kwargs


class Path:
Expand All @@ -194,13 +207,13 @@ class Path:
Path accepts the zipfile object itself or a filename
>>> root = Path(zf)
>>> path = Path(zf)
From there, several path operations are available.
Directory iteration (including the zip file itself):
>>> a, b = root.iterdir()
>>> a, b = path.iterdir()
>>> a
Path('mem/abcde.zip', 'a.txt')
>>> b
Expand Down Expand Up @@ -238,16 +251,38 @@ class Path:
'mem/abcde.zip/b/c.txt'
At the root, ``name``, ``filename``, and ``parent``
resolve to the zipfile. Note these attributes are not
valid and will raise a ``ValueError`` if the zipfile
has no filename.
resolve to the zipfile.
>>> root.name
>>> str(path)
'mem/abcde.zip/'
>>> path.name
'abcde.zip'
>>> str(root.filename).replace(os.sep, posixpath.sep)
'mem/abcde.zip'
>>> str(root.parent)
>>> path.filename == pathlib.Path('mem/abcde.zip')
True
>>> str(path.parent)
'mem'
If the zipfile has no filename, such attribtues are not
valid and accessing them will raise an Exception.
>>> zf.filename = None
>>> path.name
Traceback (most recent call last):
...
TypeError: ...
>>> path.filename
Traceback (most recent call last):
...
TypeError: ...
>>> path.parent
Traceback (most recent call last):
...
TypeError: ...
# workaround python/cpython#106763
>>> pass
"""

__repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})"
Expand Down Expand Up @@ -364,8 +399,10 @@ def glob(self, pattern):
raise ValueError(f"Unacceptable pattern: {pattern!r}")

prefix = re.escape(self.at)
matches = re.compile(prefix + translate(pattern)).fullmatch
return map(self._next, filter(matches, self.root.namelist()))
tr = Translator(seps='/')
matches = re.compile(prefix + tr.translate(pattern)).fullmatch
names = (data.filename for data in self.root.filelist)
return map(self._next, filter(matches, names))

def rglob(self, pattern):
return self.glob(f'**/{pattern}')
Expand Down
112 changes: 89 additions & 23 deletions Lib/zipfile/_path/glob.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,97 @@
import os
import re


def translate(pattern):
r"""
Given a glob pattern, produce a regex that matches it.
_default_seps = os.sep + str(os.altsep) * bool(os.altsep)

>>> translate('*.txt')
'[^/]*\\.txt'
>>> translate('a?txt')
'a.txt'
>>> translate('**/*')
'.*/[^/]*'

class Translator:
"""
>>> Translator('xyz')
Traceback (most recent call last):
...
AssertionError: Invalid separators
>>> Translator('')
Traceback (most recent call last):
...
AssertionError: Invalid separators
"""
return ''.join(map(replace, separate(pattern)))

seps: str

def __init__(self, seps: str = _default_seps):
assert seps and set(seps) <= set(_default_seps), "Invalid separators"
self.seps = seps

def translate(self, pattern):
"""
Given a glob pattern, produce a regex that matches it.
"""
return self.extend(self.translate_core(pattern))

def extend(self, pattern):
r"""
Extend regex for pattern-wide concerns.
Apply '(?s:)' to create a non-matching group that
matches newlines (valid on Unix).
Append '\Z' to imply fullmatch even when match is used.
"""
return rf'(?s:{pattern})\Z'

def translate_core(self, pattern):
r"""
Given a glob pattern, produce a regex that matches it.
>>> t = Translator()
>>> t.translate_core('*.txt').replace('\\\\', '')
'[^/]*\\.txt'
>>> t.translate_core('a?txt')
'a[^/]txt'
>>> t.translate_core('**/*').replace('\\\\', '')
'.*/[^/][^/]*'
"""
self.restrict_rglob(pattern)
return ''.join(map(self.replace, separate(self.star_not_empty(pattern))))

def replace(self, match):
"""
Perform the replacements for a match from :func:`separate`.
"""
return match.group('set') or (
re.escape(match.group(0))
.replace('\\*\\*', r'.*')
.replace('\\*', rf'[^{re.escape(self.seps)}]*')
.replace('\\?', r'[^/]')
)

def restrict_rglob(self, pattern):
"""
Raise ValueError if ** appears in anything but a full path segment.
>>> Translator().translate('**foo')
Traceback (most recent call last):
...
ValueError: ** must appear alone in a path segment
"""
seps_pattern = rf'[{re.escape(self.seps)}]+'
segments = re.split(seps_pattern, pattern)
if any('**' in segment and segment != '**' for segment in segments):
raise ValueError("** must appear alone in a path segment")

def star_not_empty(self, pattern):
"""
Ensure that * will not match an empty segment.
"""

def handle_segment(match):
segment = match.group(0)
return '?*' if segment == '*' else segment

not_seps_pattern = rf'[^{re.escape(self.seps)}]+'
return re.sub(not_seps_pattern, handle_segment, pattern)


def separate(pattern):
Expand All @@ -25,16 +104,3 @@ def separate(pattern):
['a', '[?]', 'txt']
"""
return re.finditer(r'([^\[]+)|(?P<set>[\[].*?[\]])|([\[][^\]]*$)', pattern)


def replace(match):
"""
Perform the replacements for a match from :func:`separate`.
"""

return match.group('set') or (
re.escape(match.group(0))
.replace('\\*\\*', r'.*')
.replace('\\*', r'[^/]*')
.replace('\\?', r'.')
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Refreshed zipfile._path from `zipp 3.18
<https://zipp.readthedocs.io/en/latest/history.html#v3-18-0>`_, providing
better compatibility for PyPy, better glob performance for deeply nested
zipfiles, and providing internal access to ``CompleteDirs.inject`` for use
in other tests (like importlib.resources).

0 comments on commit 2d12163

Please sign in to comment.