-
Notifications
You must be signed in to change notification settings - Fork 1
/
sourcecode_page.py
141 lines (118 loc) · 4.52 KB
/
sourcecode_page.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import base64
import datetime
import os
import re
from functools import cached_property
from typing import TYPE_CHECKING, TypedDict, Optional
import click
import requests
from pip_rating.sources.base import SourceBase
if TYPE_CHECKING:
from pip_rating.packages import Package
# Matches a GitHub repository URL: group 1 is the owner, group 2 the repository.
# The captured groups stop at "/", "#" and "?" so that a fragment or query
# string (e.g. ".../repo#readme") is never mistaken for part of the repository
# name. "http://" and a leading "www." are accepted as well.
GITHUB_REPOSITORY_URL = r"https?://(?:www\.)?github\.com/([^/#?]+)/([^/#?]+).*"

# GitHub REST API endpoint returning the README of a repository.
GITHUB_README_URL = "https://api.github.com/repos/{owner}/{repo}/readme"

# Install commands looked for in a README; group 1 of each pattern is the first
# token following the command (a package name, or an option such as "-r").
PIP_INSTALL_PATTERNS = [
    re.compile(r"pip3? +install +(?:-U +|--upgrade +|)([A-Za-z0-9_\-.]+)"),
    re.compile(r"poetry +add +([A-Za-z0-9_\-.]+)"),
    re.compile(r"pipenv +install +([A-Za-z0-9_\-.]+)"),
]

# Optional token used to authenticate GitHub API requests.
github_token = os.environ.get("GITHUB_TOKEN", "")

# Set once the rate-limit warning has been printed, so it is shown only once.
github_warning = False
def get_github_readme(owner: str, repo: str, timeout: float = 10.0) -> str:
    """Fetch and decode the README of a GitHub repository.

    This is a best-effort lookup: any request failure or undecodable payload
    yields an empty string instead of an exception.

    Args:
        owner: Account or organisation that owns the repository.
        repo: Repository name.
        timeout: Seconds to wait for GitHub before giving up.

    Returns:
        The README text, or "" when it is missing or could not be fetched.

    Side effects:
        The first time a rate-limit response is seen, a hint is printed to
        stderr and the module-level ``github_warning`` flag is set so the hint
        is not repeated.
    """
    global github_warning

    headers = {}
    if github_token:
        headers["Authorization"] = f"Bearer {github_token}"

    try:
        with requests.get(
            GITHUB_README_URL.format(owner=owner, repo=repo),
            headers=headers,
            # Without a timeout a stalled connection would block forever.
            timeout=timeout,
        ) as response:
            response.raise_for_status()
            content = response.json().get("content", "")
            if not content:
                return ""
            # The payload is base64; tolerate READMEs that are not valid UTF-8.
            return base64.b64decode(content).decode("utf-8", errors="replace")
    except requests.RequestException as e:
        failed = e.response
        rate_limited = (
            failed is not None
            and failed.status_code in (403, 429)
            and (
                failed.reason == "rate limit exceeded"
                # The reason phrase is not guaranteed; GitHub also reports an
                # exhausted quota through this header.
                or failed.headers.get("X-RateLimit-Remaining") == "0"
            )
        )
        if rate_limited and not github_warning:
            if github_token:
                hint = "Check your GITHUB_TOKEN environment variable."
            else:
                hint = "Set GITHUB_TOKEN environment variable to increase the limit."
            click.echo(f"GitHub rate limit exceeded. {hint}", err=True)
            github_warning = True
    except ValueError:
        # Malformed JSON body or invalid base64: treat as "no README".
        return ""
    return ""
class Sourcecode(TypedDict):
    """README-derived information stored for a package."""

    # True / False / None outcome of scanning the README for install commands.
    package_in_readme: Optional[bool]
    # README text that was scanned ("" when none was retrieved).
    readme_content: str
class SourcecodeCacheDict(TypedDict):
    """Shape of the cache entry produced for the source code page."""

    # Name of the package the entry belongs to.
    package_name: str
    # ISO-8601 timestamp of when the entry was built.
    updated_at: str
    # Identifier of where the README came from.
    source: str
    # README text together with the result of scanning it.
    sourcecode: Sourcecode
def replace_chars(package_name: str) -> str:
    """Normalise a package name so differently spelled variants compare equal.

    The name is lower-cased and every "_" and "." becomes "-".
    """
    lowered = package_name.lower()
    return lowered.translate(str.maketrans("_.", "--"))
def search_in_readme(content: str, package_name: str) -> Optional[bool]:
    """Check whether the README's install commands name ``package_name``.

    Every match of every pattern in ``PIP_INSTALL_PATTERNS`` is inspected.
    Captures that start with "-" are command-line options, not package names,
    and are ignored. Names are compared after ``replace_chars`` normalisation.

    Args:
        content: README text to scan.
        package_name: Name of the package being rated.

    Returns:
        True if an install command names ``package_name``; False if install
        commands were found but none of them names it; None if no install
        command with a package name was found at all (e.g. an empty README).
    """
    # Normalise the wanted name once instead of once per candidate.
    wanted = replace_chars(package_name)
    saw_other_package = False
    for pattern in PIP_INSTALL_PATTERNS:
        for candidate in pattern.findall(content):
            if candidate.startswith("-"):
                continue
            # NOTE(review): a capture such as "." (from `pip install .`) is
            # treated as a package name here -- confirm this is intended.
            if replace_chars(candidate) == wanted:
                return True
            saw_other_package = True
    return False if saw_other_package else None
class SourcecodePage(SourceBase):
    """Source that checks whether a package's GitHub README mentions it."""

    source_name = "sourcecode_page"

    def __init__(self, package: "Package"):
        """Remember ``package`` and initialise the base source with its name."""
        self.package = package
        super().__init__(package.name)

    def get_cache_data(self) -> SourcecodeCacheDict:
        """Fetch the package's README from GitHub and build the cache entry.

        The project URLs from the PyPI metadata are scanned for GitHub
        repository links. Each distinct repository is tried in order until one
        returns a non-empty README, so a non-repository link that merely looks
        like one (e.g. ``github.com/sponsors/<user>``) does not prevent the
        real repository from being used.

        Returns:
            The cache entry; ``readme_content`` is "" when no README was found.
        """
        project_urls = self.package.pypi.package["info"].get("project_urls") or {}
        content = ""
        attempted = set()
        for url in project_urls.values():
            if not isinstance(url, str):
                continue
            github_match = re.match(GITHUB_REPOSITORY_URL, url)
            if not github_match:
                continue
            owner, repo = github_match.group(1), github_match.group(2)
            # Several project URLs usually point at the same repository
            # (source, issues, ...); query each repository only once.
            key = (owner.lower(), repo.lower())
            if key in attempted:
                continue
            attempted.add(key)
            content = get_github_readme(owner, repo)
            if content:
                break
        package_in_readme = search_in_readme(content, self.package.name)
        return {
            # presumably set by SourceBase.__init__ -- verify against the base class
            "package_name": self.package_name,
            # NOTE(review): naive local timestamp, kept as-is because the cache
            # expiry logic that reads it is not visible here.
            "updated_at": datetime.datetime.now().isoformat(),
            "source": "github",
            "sourcecode": {
                "package_in_readme": package_in_readme,
                "readme_content": content,
            },
        }

    @cached_property
    def package_in_readme(self) -> Optional[bool]:
        """Whether the README names this package (True / False / None).

        Reads the cached entry when it has not expired; otherwise a fresh
        entry is obtained through ``save_to_cache``.
        """
        if self.is_cache_expired:
            cache = self.save_to_cache()
        else:
            cache = self.get_from_cache()
        return cache["sourcecode"]["package_in_readme"]