diff --git a/common/devpi_common/archive.py b/common/devpi_common/archive.py index bcba05573..fa0f2091a 100644 --- a/common/devpi_common/archive.py +++ b/common/devpi_common/archive.py @@ -2,10 +2,10 @@ remotely based on some code from https://pypi.org/project/Archive/0.3/ """ from io import BytesIO +from pathlib import Path import os import tarfile import zipfile -import py class UnsupportedArchive(ValueError): @@ -86,12 +86,15 @@ def getfile(self, name): return self._archive.extractfile(member) def extract(self, to_path=''): - to_path = py.path.local(to_path) + to_path = Path(to_path) members = self._archive.getmembers() for member in members: - target = to_path.join(member.name, abs=True) - if not target.relto(to_path): - raise ValueError("archive name %r out of bound" % (member.name,)) + target = to_path.joinpath(member.name) + try: + target.relative_to(to_path) + except ValueError as e: + raise ValueError( + f"archive name {member.name!r} out of bound") from e self._archive.extractall(str(to_path)) @@ -114,45 +117,50 @@ def getfile(self, name): def extract(self, to_path='', safe=False): # XXX unify with TarFile.extract - basedir = py.path.local(to_path) + basedir = Path(to_path) unzipfile = self._archive members = unzipfile.namelist() for name in members: - fpath = basedir.join(name, abs=True) - if not fpath.relto(basedir): - raise ValueError("out of bound path name:" + name) - if name.endswith(basedir.sep) or name[-1] == "/": - fpath.ensure(dir=1) + fpath = basedir.joinpath(name) + try: + fpath.relative_to(basedir) + except ValueError as e: + raise ValueError( + f"archive name {name!r} out of bound") from e + if name.endswith((os.sep, "/")): + fpath.mkdir(parents=True, exist_ok=True) else: - fpath.dirpath().ensure(dir=1) + fpath.parent.mkdir(parents=True, exist_ok=True) with fpath.open("wb") as f: f.write(unzipfile.read(name)) +def _zip_dir(f, basedir): + with zipfile.ZipFile(f, "w") as zf: + _writezip(zf, basedir) + + def zip_dir(basedir, dest=None): + basedir = Path(str(basedir)) if dest is None: - f = BytesIO() - else: - f = open(str(dest), "wb") - zip = zipfile.ZipFile(f, "w") - try: - _writezip(zip, basedir) - finally: - zip.close() - if dest is None: - return f.getvalue() + with BytesIO() as f: + _zip_dir(f, basedir) + return f.getvalue() + dest = Path(str(dest)) + with dest.open('wb') as f: + _zip_dir(f, basedir) def _writezip(zip, basedir): - for p in basedir.visit(): - if p.check(dir=1): - if not p.listdir(): - path = p.relto(basedir) + "/" - zipinfo = zipfile.ZipInfo(path) + assert isinstance(basedir, Path) + for p in basedir.rglob("*"): + if p.is_dir(): + if not any(p.iterdir()): + zipinfo = zipfile.ZipInfo(f"{p.relative_to(basedir)}/") zip.writestr(zipinfo, "") else: - path = p.relto(basedir) - zip.writestr(path, p.read("rb")) + path = p.relative_to(basedir) + zip.writestr(str(path), p.read_bytes()) def zip_dict(contentdict): diff --git a/common/pyproject.toml b/common/pyproject.toml index ef64eb428..c718b709e 100644 --- a/common/pyproject.toml +++ b/common/pyproject.toml @@ -10,7 +10,6 @@ description = "Utilities jointly used by devpi-server, devpi-client and others." dependencies = [ "lazy", "packaging>=22", - "py>=1.4.20", "requests>=2.3.0", ] requires-python = ">=3.7" diff --git a/common/testing/test_archive.py b/common/testing/test_archive.py index 66a17c174..52f36679e 100644 --- a/common/testing/test_archive.py +++ b/common/testing/test_archive.py @@ -4,15 +4,11 @@ from devpi_common.archive import zip_dir from io import BytesIO from subprocess import Popen, PIPE -import py import pytest import shutil import sys -datadir = py.path.local(__file__).dirpath("data") - - def check_files(tmpdir): assert tmpdir.join("1").isfile() assert tmpdir.join("sub", "1").isfile() @@ -104,9 +100,10 @@ def test_printdir(self, archive, capsys): def test_tarfile_outofbound(tmpdir): - with Archive(datadir.join("slash.tar.gz")) as archive: - with pytest.raises(ValueError): - archive.extract(tmpdir) + from pathlib import Path + path = Path(__file__).parent / "data" / "slash.tar.gz" + with Archive(path) as archive, pytest.raises(ValueError, match="archive name '.*' out of bound"): + archive.extract(tmpdir) def test_zip_dict(tmpdir): @@ -123,14 +120,17 @@ def test_zip_dir(tmpdir): dest = tmpdir.join("dest.zip") source.ensure("file") source.ensure("sub", "subfile") + source.ensure("empty", dir=True) zip_dir(source, dest) with Archive(dest) as archive: archive.extract(newdest) assert newdest.join("file").isfile() assert newdest.join("sub", "subfile").isfile() + assert newdest.join("empty").isdir() newdest.remove() with Archive(BytesIO(zip_dir(source))) as archive: archive.extract(newdest) assert newdest.join("file").isfile() assert newdest.join("sub", "subfile").isfile() + assert newdest.join("empty").isdir()