Skip to content

Commit

Permalink
feat: Add type hints to ProblemReport and Report
Browse files Browse the repository at this point in the history
Add type hints to all methods of `ProblemReport` and `Report` but
exclude those methods that needs reworking.

Signed-off-by: Benjamin Drung <benjamin.drung@canonical.com>
  • Loading branch information
bdrung committed Oct 31, 2023
1 parent 5d900fa commit 9348fe3
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 39 deletions.
60 changes: 32 additions & 28 deletions apport/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import xml.dom
import xml.dom.minidom
import xml.parsers.expat
from typing import Optional
from typing import Optional, Union

import apport.fileutils
import apport.logging
Expand Down Expand Up @@ -327,7 +327,7 @@ class Report(problem_report.ProblemReport):
standard debugging data.
"""

def __init__(self, problem_type="Crash", date=None):
def __init__(self, problem_type: str = "Crash", date: Optional[str] = None) -> None:
"""Initialize a fresh problem report.
date is the desired date/time string; if None (default), the current
Expand All @@ -337,11 +337,11 @@ def __init__(self, problem_type="Crash", date=None):
self.pid, so that e. g. hooks can use it to collect additional data.
"""
problem_report.ProblemReport.__init__(self, problem_type, date)
self.pid = None
self._proc_maps_cache = None
self.pid: Optional[int] = None
self._proc_maps_cache: Optional[list[tuple[int, int, str]]] = None

@staticmethod
def _customized_package_suffix(package):
def _customized_package_suffix(package: str) -> str:
"""Return a string suitable for appending to Package/Dependencies.
If package has only unmodified files, return the empty string. If not,
Expand All @@ -364,7 +364,7 @@ def _customized_package_suffix(package):

return suffix

def add_package(self, package):
def add_package(self, package: str) -> Optional[str]:
"""Add Package: field.
Determine the version of the given package (uses "(not installed") for
Expand Down Expand Up @@ -404,7 +404,7 @@ def _get_transitive_dependencies(self, package: str) -> str:
dependencies += f"{dep} {v}{self._customized_package_suffix(dep)}"
return dependencies

def add_package_info(self, package=None):
def add_package_info(self, package: Optional[str] = None) -> None:
"""Add packaging information.
If package is not given, the report must have ExecutablePath.
Expand Down Expand Up @@ -510,7 +510,7 @@ def add_snap_contact_info(self, snap_contact: str) -> None:
self["SnapGitName"] = m.group(2)
self["CrashDB"] = "snap-github"

def add_os_info(self):
def add_os_info(self) -> None:
"""Add operating system information.
This adds:
Expand All @@ -528,7 +528,7 @@ def add_os_info(self):
if "Architecture" not in self:
self["Architecture"] = packaging.get_system_architecture()

def add_user_info(self):
def add_user_info(self) -> None:
"""Add information about the user.
This adds:
Expand All @@ -550,7 +550,7 @@ def add_user_info(self):
# UserGroups to exist
self["UserGroups"] = "N/A"

def _check_interpreted(self):
def _check_interpreted(self) -> None:
# TODO: Split into smaller functions/methods
# pylint: disable=too-many-branches
"""Check if process is a script.
Expand Down Expand Up @@ -634,7 +634,7 @@ def _check_interpreted(self):
else:
self["UnreportableReason"] = "Cannot determine twistd client program"

def _twistd_executable(self):
def _twistd_executable(self) -> Optional[str]:
"""Determine the twistd client program from ProcCmdline."""
args = self["ProcCmdline"].split("\0")[2:]

Expand All @@ -659,7 +659,7 @@ def _twistd_executable(self):
return None

@staticmethod
def _python_module_path(module):
def _python_module_path(module: str) -> Optional[str]:
"""Determine path of given Python module."""
try:
spec = importlib.util.find_spec(module)
Expand Down Expand Up @@ -827,7 +827,7 @@ def add_proc_environ(self, pid=None, extraenv=None, proc_pid_fd=None):
]
)

def add_kernel_crash_info(self):
def add_kernel_crash_info(self) -> bool:
"""Add information from kernel crash.
This needs a VmCore in the Report.
Expand Down Expand Up @@ -863,7 +863,9 @@ def add_kernel_crash_info(self):
os.unlink(core)
return ret

def add_gdb_info(self, rootdir=None, gdb_sandbox=None):
def add_gdb_info(
self, rootdir: Optional[str] = None, gdb_sandbox: Optional[str] = None
) -> None:
# TODO: Split into smaller functions/methods
# pylint: disable=too-many-branches,too-many-locals,too-many-statements
"""Add information from gdb.
Expand Down Expand Up @@ -1159,7 +1161,7 @@ def _add_hooks_info(

return False

def search_bug_patterns(self, url):
def search_bug_patterns(self, url: Optional[str]) -> Optional[str]:
r"""Check bug patterns loaded from the specified url.
Return bug URL on match, or None otherwise.
Expand Down Expand Up @@ -1306,7 +1308,7 @@ def check_ignored(self) -> bool:

return False

def mark_ignore(self):
def mark_ignore(self) -> None:
"""Ignore future crashes of this executable.
Add a ignore list entry for this report to ~/.apport-ignore.xml, so
Expand Down Expand Up @@ -1351,7 +1353,7 @@ def mark_ignore(self):

dom.unlink()

def has_useful_stacktrace(self):
def has_useful_stacktrace(self) -> bool:
"""Check whether StackTrace can be considered 'useful'.
The current heuristic is to consider it useless if it either is shorter
Expand All @@ -1368,7 +1370,7 @@ def has_useful_stacktrace(self):

return unknown_fn.count(True) <= len(unknown_fn) / 2.0

def stacktrace_top_function(self):
def stacktrace_top_function(self) -> Optional[str]:
"""Return topmost function in StacktraceTop."""
for line in self.get("StacktraceTop", "").splitlines():
fname = line.split("(")[0].strip()
Expand Down Expand Up @@ -1514,7 +1516,7 @@ def standard_title(self):

return None

def obsolete_packages(self):
def obsolete_packages(self) -> list[str]:
"""Return list of obsolete packages in Package and Dependencies."""
obsolete = []
for line in (
Expand All @@ -1533,7 +1535,7 @@ def obsolete_packages(self):
obsolete.append(pkg)
return obsolete

def crash_signature(self):
def crash_signature(self) -> Optional[str]:
# TODO: Split into smaller functions/methods
# pylint: disable=too-many-branches,too-many-return-statements
# pylint: disable=too-many-statements
Expand Down Expand Up @@ -1678,7 +1680,7 @@ def crash_signature(self):
return None

@staticmethod
def _extract_function_and_address(line):
def _extract_function_and_address(line: str) -> Optional[str]:
parsed = re.search(r"\[.*\] (.*)$", line)
if parsed:
match = parsed.group(1)
Expand All @@ -1687,7 +1689,7 @@ def _extract_function_and_address(line):
return match
return None

def crash_signature_addresses(self):
def crash_signature_addresses(self) -> Optional[str]:
"""Compute heuristic duplicate signature for a signal crash.
This should be used if crash_signature() fails, i. e. Stacktrace does
Expand Down Expand Up @@ -1747,7 +1749,7 @@ def crash_signature_addresses(self):

return f"{self['ExecutablePath']}:{self['Signal']}:{':'.join(stack)}"

def anonymize(self):
def anonymize(self) -> None:
"""Remove user identifying strings from the report.
This particularly removes the user name, host name, and IPs
Expand Down Expand Up @@ -1940,7 +1942,7 @@ def _address_to_offset(self, addr):

return None

def _build_proc_maps_cache(self):
def _build_proc_maps_cache(self) -> None:
"""Generate self._proc_maps_cache from ProcMaps field.
This only gets done once.
Expand Down Expand Up @@ -1974,16 +1976,18 @@ def _build_proc_maps_cache(self):
)

@staticmethod
def get_logind_session(pid=None, proc_pid_fd=None):
def get_logind_session(
pid: Optional[int] = None, proc_pid_fd: Optional[int] = None
) -> Optional[tuple[str, float]]:
"""Get logind session path and start time.
Return (session_id, session_start_timestamp) if process is in a logind
session, or None otherwise.
"""
if proc_pid_fd is not None:
cgroup_file = os.open("cgroup", os.O_RDONLY, dir_fd=proc_pid_fd)
if proc_pid_fd is None:
cgroup_file: Union[int, str] = f"/proc/{pid}/cgroup"
else:
cgroup_file = f"/proc/{pid}/cgroup"
cgroup_file = os.open("cgroup", os.O_RDONLY, dir_fd=proc_pid_fd)

# determine cgroup
try:
Expand Down
22 changes: 11 additions & 11 deletions problem_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import time
import typing
import zlib
from typing import Optional
from typing import Optional, Union

# magic number (0x1F 0x8B) and compression method (0x08 for DEFLATE)
GZIP_HEADER_START = b"\037\213\010"
Expand Down Expand Up @@ -108,7 +108,7 @@ def splitlines(self):
class ProblemReport(collections.UserDict):
"""Class to store, load, and handle problem reports."""

def __init__(self, problem_type="Crash", date=None):
def __init__(self, problem_type: str = "Crash", date: Optional[str] = None) -> None:
"""Initialize a fresh problem report.
problem_type can be 'Crash', 'Packaging', 'KernelCrash' or
Expand All @@ -120,7 +120,7 @@ def __init__(self, problem_type="Crash", date=None):
super().__init__({"ProblemType": problem_type, "Date": date})

# keeps track of keys which were added since the last ctor or load()
self.old_keys = set()
self.old_keys: set[str] = set()

def add_tags(self, tags: typing.Iterable[str]) -> None:
"""Add tags to the report. Duplicates are dropped."""
Expand Down Expand Up @@ -299,7 +299,7 @@ def get_timestamp(self) -> Optional[int]:
except locale.Error:
return None

def has_removed_fields(self):
def has_removed_fields(self) -> bool:
"""Check if the report has any keys which were not loaded.
This could happen when using binary=False in load().
Expand Down Expand Up @@ -337,7 +337,7 @@ def _decompress_line(cls, line, decompressor, value=b""):
return decompressor, value

@staticmethod
def is_binary(string):
def is_binary(string: Union[bytes, str]) -> bool:
"""Check if the given strings contains binary data."""
if isinstance(string, bytes):
for c in string:
Expand All @@ -346,7 +346,7 @@ def is_binary(string):
return False

@classmethod
def _try_unicode(cls, value):
def _try_unicode(cls, value: Union[bytes, str]) -> Union[bytes, str]:
"""Try to convert bytearray value to Unicode."""
if isinstance(value, bytes) and not cls.is_binary(value):
try:
Expand Down Expand Up @@ -545,7 +545,7 @@ def _write_key_and_binary_value_compressed_and_encoded_to_file(
file.write(base64.b64encode(block))
file.write(b"\n")

def add_to_existing(self, reportfile, keep_times=False):
def add_to_existing(self, reportfile: str, keep_times: bool = False) -> None:
"""Add this report's data to an already existing report file.
The file will be temporarily chmod'ed to 000 to prevent frontends
Expand Down Expand Up @@ -711,7 +711,7 @@ def write_mime(
file.write(msg.as_string().encode("UTF-8"))
file.write(b"\n")

def __setitem__(self, k, v):
def __setitem__(self, k: str, v: Union[bytes, CompressedValue, str, tuple]) -> None:
assert hasattr(k, "isalnum")
if not k.replace(".", "").replace("-", "").replace("_", "").isalnum():
raise ValueError(
Expand Down Expand Up @@ -739,7 +739,7 @@ def __setitem__(self, k, v):

return self.data.__setitem__(k, v)

def new_keys(self):
def new_keys(self) -> set[str]:
"""Return newly added keys.
Return the set of keys which have been added to the report since it
Expand All @@ -748,7 +748,7 @@ def new_keys(self):
return set(self.data.keys()) - self.old_keys

@staticmethod
def _strip_gzip_header(line):
def _strip_gzip_header(line: bytes) -> bytes:
"""Strip gzip header from line and return the rest."""
flags = line[3]
offset = 10
Expand All @@ -768,6 +768,6 @@ def _strip_gzip_header(line):
return line[offset:]

@staticmethod
def _assert_bin_mode(file):
def _assert_bin_mode(file: object) -> None:
"""Assert that given file object is in binary mode."""
assert not hasattr(file, "encoding"), "file stream must be in binary mode"

0 comments on commit 9348fe3

Please sign in to comment.