Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
pypichecker: Add checker for PyPI packages
- Loading branch information
Showing
4 changed files
with
199 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
import logging | ||
from datetime import datetime | ||
from distutils.version import StrictVersion, LooseVersion | ||
import operator | ||
import re | ||
import typing as t | ||
|
||
import requests | ||
|
||
from ..lib.externaldata import Checker, ExternalFile | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
PYPI_INDEX = "https://pypi.org/pypi" | ||
OPERATORS = { | ||
"<": operator.lt, | ||
"<=": operator.le, | ||
">": operator.gt, | ||
">=": operator.ge, | ||
"==": operator.eq, | ||
"!=": operator.ne, | ||
} | ||
BDIST_RE = re.compile(r"^(\S+)-(\d[\d.]*\d)-(\S+)-(\S+)-(\S+).whl$") | ||
|
||
|
||
def _version_matches(version: str, constraints: t.List[t.Tuple[str, str]]): | ||
for ver_oper, ver_limit in constraints: | ||
oper = OPERATORS[ver_oper] | ||
try: | ||
matches = oper(StrictVersion(version), StrictVersion(ver_limit)) | ||
except ValueError: | ||
matches = oper(LooseVersion(version), LooseVersion(ver_limit)) | ||
return matches | ||
return None | ||
|
||
|
||
def _filter_releases( | ||
pypy_releases: t.Dict[str, t.List[t.Dict]], | ||
constraints: t.List[t.Tuple[str, str]], | ||
packagetype: str, | ||
) -> t.Generator[t.Tuple[str, t.List[t.Dict], datetime], None, None]: | ||
for pypi_version, pypi_downloads in pypy_releases.items(): | ||
downloads = [] | ||
dates = [] | ||
if not _version_matches(pypi_version, constraints): | ||
continue | ||
for download in pypi_downloads: | ||
if download["packagetype"] == packagetype: | ||
downloads.append(download) | ||
dates.append( | ||
datetime.fromisoformat(download["upload_time_iso_8601"].rstrip("Z")) | ||
) | ||
if downloads: | ||
yield (pypi_version, downloads, max(dates)) | ||
|
||
|
||
class Wheel(t.NamedTuple): | ||
name: str | ||
version: str | ||
python_version: str | ||
python_abi: str | ||
platform: str | ||
|
||
@classmethod | ||
def parse(cls, wheel_filename: str): | ||
match = BDIST_RE.match(wheel_filename) | ||
assert match is not None, f"{wheel_filename} didn't match {BDIST_RE.pattern}" | ||
return cls(*match.groups()) | ||
|
||
def is_supported(self): | ||
return self.python_abi == "none" and self.platform == "any" | ||
|
||
|
||
class PyPIChecker(Checker): | ||
CHECKER_DATA_TYPE = "pypi" | ||
|
||
def __init__(self): | ||
self.session = requests.Session() | ||
|
||
def check(self, external_data): | ||
package_name = external_data.checker_data["name"] | ||
package_type = external_data.checker_data.get("packagetype", "sdist") | ||
with self.session.get(f"{PYPI_INDEX}/{package_name}/json") as response: | ||
response.raise_for_status() | ||
pypi_data = response.json() | ||
|
||
if "versions" in external_data.checker_data: | ||
constraints = [(o, l) for o, l in external_data.checker_data["versions"]] | ||
else: | ||
constraints = [] | ||
|
||
releases = sorted( | ||
_filter_releases(pypi_data["releases"], constraints, package_type), | ||
key=lambda r: r[2], | ||
) | ||
|
||
pypi_version, pypi_downloads, pypi_date = releases[-1] | ||
|
||
pypi_download = None | ||
for pypi_download in pypi_downloads: | ||
if package_type == "bdist_wheel": | ||
wheel = Wheel.parse(pypi_download["filename"]) | ||
if not wheel.is_supported(): | ||
continue | ||
break | ||
else: | ||
log.error("Couldn't find %s for package %s", package_type, package_name) | ||
return | ||
|
||
new_version = ExternalFile( | ||
url=pypi_download["url"], | ||
checksum=pypi_download["digests"]["sha256"], | ||
size=pypi_download["size"], | ||
version=pypi_version, | ||
timestamp=pypi_date, | ||
) | ||
external_data.set_new_version(new_version) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
id: com.valvesoftware.Steam | ||
modules: | ||
|
||
- name: python-modules | ||
sources: | ||
|
||
- type: file | ||
url: https://files.pythonhosted.org/packages/6d/38/c21ef5034684ffc0412deefbb07d66678332290c14bb5269c85145fbd55e/setuptools-50.3.2-py3-none-any.whl | ||
sha256: 2c242a0856fbad7efbe560df4a7add9324f340cf48df43651e9604924466794a | ||
x-checker-data: | ||
type: pypi | ||
name: setuptools | ||
packagetype: bdist_wheel | ||
|
||
- type: file | ||
url: https://files.pythonhosted.org/packages/64/c2/b80047c7ac2478f9501676c988a5411ed5572f35d1beff9cae07d321512c/PyYAML-5.3.1.tar.gz | ||
sha256: b8eac752c5e14d3eca0e6dd9199cd627518cb5ec06add0de9d32baeee6fe645d | ||
x-checker-data: | ||
type: pypi | ||
name: PyYAML | ||
packagetype: sdist | ||
|
||
- type: file | ||
url: https://files.pythonhosted.org/packages/7a/c2/bf87cef932c45cb7b7a79a0a954e3307fcff209c7639182a2b9ae0127959/vdf-3.1-py2.py3-none-any.whl | ||
sha256: a5da182b3ef888d45f39862725bc7bb2836515c9fc329843001e506e73bb5cd4 | ||
x-checker-data: | ||
type: pypi | ||
name: vdf | ||
versions: | ||
- ["==", "3.2"] | ||
packagetype: bdist_wheel | ||
|
||
|
||
- type: file | ||
url: "https://files.pythonhosted.org/packages/3e/02/b09732ca4b14405ff159c470a612979acfc6e8645dc32f83ea0129709f7a/Pillow-7.2.0.tar.gz" | ||
sha256: "97f9e7953a77d5a70f49b9a48da7776dc51e9b738151b22dacf101641594a626" | ||
x-checker-data: | ||
type: pypi | ||
name: Pillow | ||
packagetype: bdist_wheel |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
import os | ||
import unittest | ||
|
||
from src.checker import ManifestChecker | ||
from src.lib.utils import init_logging | ||
|
||
TEST_MANIFEST = os.path.join( | ||
os.path.dirname(__file__), "com.valvesoftware.Steam.yml" | ||
) | ||
|
||
|
||
class TestRustChecker(unittest.TestCase): | ||
def setUp(self): | ||
init_logging() | ||
|
||
def test_check(self): | ||
checker = ManifestChecker(TEST_MANIFEST) | ||
ext_data = checker.check() | ||
|
||
self.assertEqual(len(ext_data), 4) | ||
for data in ext_data: | ||
if data.filename != "Pillow-7.2.0.tar.gz": | ||
self.assertIsNotNone(data.new_version) | ||
self.assertIsNotNone(data.new_version.url) | ||
self.assertIsNotNone(data.new_version.checksum) | ||
self.assertIsNotNone(data.new_version.version) | ||
self.assertNotEqual(data.new_version.url, data.current_version.url) | ||
self.assertNotEqual(data.new_version.checksum, data.current_version.checksum) | ||
if data.filename == "setuptools-50.3.2-py3-none-any.whl": | ||
self.assertRegex(data.new_version.url, r"https://files.pythonhosted.org/packages/[a-f0-9/]+/setuptools-[\d\.]+-[\S\.]+-none-any.whl") | ||
elif data.filename == "PyYAML-5.3.1.tar.gz": | ||
self.assertRegex(data.new_version.url, r"https://files.pythonhosted.org/packages/[a-f0-9/]+/PyYAML-[\d\.]+.(tar.(gz|xz|bz2)|zip)") | ||
elif data.filename == "vdf-3.1-py2.py3-none-any.whl": | ||
self.assertRegex(data.new_version.url, r"https://files.pythonhosted.org/packages/[a-f0-9/]+/vdf-[\d\.]+-[\S\.]+-none-any.whl") | ||
self.assertEqual(data.new_version.version, "3.2") | ||
elif data.filename == "Pillow-7.2.0.tar.gz": | ||
self.assertIsNone(data.new_version) | ||
else: | ||
self.fail(f"Unknown data {data.filename}") | ||
|