Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions GenerateReport.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def main() -> None:
if release_info:
for entry in release_info:
if 'requires_dist' in entry:
logger.debug(f"requires_dist: \n{requires_dist}")
logger.debug(f"requires_dist: \n{entry['requires_dist']}")
cur_ver_deps.extend(entry['requires_dist'])
break
if not cur_ver_deps:
Expand Down Expand Up @@ -199,7 +199,7 @@ def main() -> None:

# Get Upgrade Instruction
if suggested in ("unknown", "Up-to-date"):
instruction = {"base_package": f"{pkg}=={version}", "dependencies": []}
instruction = {"base_package": f"{pkg}=={cur_ver}", "dependencies": []}
else:
instruction = generate_upgrade_instruction(pkg, suggested)

Expand Down
3 changes: 2 additions & 1 deletion utils/CheckDependency.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
from datetime import datetime, timedelta
import os
from dotenv import load_dotenv

# ---------------- Logging Configuration ----------------
from logging import Formatter, StreamHandler
Expand Down Expand Up @@ -44,7 +45,7 @@ def formatTime(self, record, datefmt=None):

# Step 1: Load base package list and save original line
base_packages = {}
with open(requirements_file) as f:
with open(REQUIREMENTS_FILE) as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
Expand Down
2 changes: 1 addition & 1 deletion utils/CustodianUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,4 @@ def custom_sort_key(row: dict, CustomOrder: dict) -> tuple:

PkgName = row.get("Package Name", "").lower()

return (CustodianRank, PkgTypeRank, PkgName)
return (CustodianRank, PkgTypeRank, PkgName)
2 changes: 1 addition & 1 deletion utils/PyPiUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@ def GetPyPiInfo(package: str) -> dict | None:
return r.json()
except Exception as e:
logger.warning(f"Fetch PyPI metadata for {package} failed: {e}")
return None
return None
30 changes: 8 additions & 22 deletions utils/SGTUtils.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,28 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Custom logging formatter that ensures timestamps are in Asia/Singapore timezone.
Compatible with Python 3.9+ (using zoneinfo) and Python <3.9 (fallback to pytz).
"""
"""Utilities for timezone-aware logging and timestamps."""

from logging import Formatter
from datetime import datetime

try:
from zoneinfo import ZoneInfo # Python 3.9+
except ImportError:
from pytz import timezone as ZoneInfo # fallback for Python <3.9
except ImportError: # pragma: no cover - fallback for <3.9
from pytz import timezone as ZoneInfo

SG_TZ = ZoneInfo("Asia/Singapore")

class SGTFormatter(Formatter):
"""
Custom logging formatter that outputs timestamps in Asia/Singapore timezone.
"""
"""Logging formatter that outputs times in Singapore timezone."""

def formatTime(self, record, datefmt=None):
ct = datetime.fromtimestamp(record.created, SG_TZ)
if datefmt:
return ct.strftime(datefmt)
return ct.isoformat()

# Timezone support for UTC+8 (Singapore)
try:
from zoneinfo import ZoneInfo # Python 3.9+
except ImportError:
from pytz import timezone as ZoneInfo # fallback for Python <3.9
SG_TZ = ZoneInfo("Asia/Singapore")

def now_sg() -> datetime:
"""
Get the current datetime in Singapore timezone (UTC+8).
"""Return current time in Singapore timezone (UTC+8)."""
return datetime.now(SG_TZ)

Returns:
datetime: Current datetime in Asia/Singapore timezone.
"""
return datetime.now(SG_TZ)
2 changes: 1 addition & 1 deletion utils/UpgradeInstruction.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,4 @@ def generate_upgrade_instruction(base_package: str, target_version: str) -> dict

result = generate_upgrade_instruction(args.package, args.version)
print("Upgrade Instruction:")
print(result)
print(result)
38 changes: 35 additions & 3 deletions utils/VersionSuggester.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,38 @@ def get_all_versions(pkg: str) -> list:
data = r.json()
return [v for v in data.get("releases", {})]

def suggest_upgrade_version(all_versions: list, current_version: str) -> str:
"""Suggest the best upgrade version from ``all_versions``.

Preference is given to the newest version within the same major
release. If none is newer in the same major version, the absolute
newest version is returned. ``Up-to-date`` is returned when no
newer release exists.
"""
try:
cur_ver = version.parse(current_version)
parsed_versions = []
for v in all_versions:
try:
pv = version.parse(v)
parsed_versions.append((pv, v))
except InvalidVersion:
continue

newer_versions = [v for (pv, v) in parsed_versions if pv > cur_ver]
if not newer_versions:
return "Up-to-date"

same_major = [v for (pv, v) in parsed_versions
if pv > cur_ver and pv.major == cur_ver.major]
if same_major:
return same_major[-1]

return newer_versions[-1]
except Exception as e:
logger.error(f"Suggest upgrade error for {current_version}: {e}")
return "unknown"

async def suggest_safe_minor_upgrade(pkg: str, current_version: str, all_versions: list) -> str:
"""
Suggest the highest minor upgrade version that is not vulnerable.
Expand Down Expand Up @@ -95,13 +127,13 @@ def main():
pkg_name = args.package # used in suggest_safe_minor_upgrade

versions = get_all_versions(pkg_name)
basic = suggest_upgrade_version(pkg, versions, args.current)
basic = suggest_upgrade_version(versions, args.current)
print(f"Suggested upgrade: {basic}")

if args.safe_minor:
safe = asyncio.run(
suggest_safe_minor_upgrade(pkg_name, args.current, versions)
)
suggest_safe_minor_upgrade(pkg_name, args.current, versions)
)
print(f"Safe minor upgrade: {safe}")

if __name__ == "__main__":
Expand Down
3 changes: 1 addition & 2 deletions utils/VulnChecker.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,7 @@ async def check_multiple_versions(
owns_session = session is None

if owns_session:
async with aiohttp.ClientSession() as session:
session = aiohttp.ClientSession()
session = aiohttp.ClientSession()

try:
tasks = [fetch_osv(session, package, v, sem) for v in versions]
Expand Down