From 7b3777fe03d7e8587283b445d06c9bac91af2e8a Mon Sep 17 00:00:00 2001 From: miroslavpojer Date: Sun, 5 Oct 2025 14:16:13 +0200 Subject: [PATCH 1/2] #178 - Improve performance of fetching missing issues - Added multiple threads to fetching of missing issues. - Added multiple threads to record creation. --- release_notes_generator/data/miner.py | 134 +++++++++++++++++- .../record/factory/default_record_factory.py | 87 +++++++++--- 2 files changed, 198 insertions(+), 23 deletions(-) diff --git a/release_notes_generator/data/miner.py b/release_notes_generator/data/miner.py index 98bdd2b0..8cae46a4 100644 --- a/release_notes_generator/data/miner.py +++ b/release_notes_generator/data/miner.py @@ -21,6 +21,8 @@ import logging import sys import traceback +from asyncio import Lock +from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Optional import semver @@ -96,8 +98,11 @@ def mine_missing_sub_issues(self, data: MinedData) -> tuple[dict[Issue, Reposito logger.info("Mapping sub-issues...") data.parents_sub_issues = self._scan_sub_issues_for_parents([get_id(i, r) for i, r in data.issues.items()]) + logger.info("Fetch all repositories in cache...") + self._fetch_all_repositories_in_cache(data) + logger.info("Fetching missing issues...") - fetched_issues = self._fetch_missing_issues(data) + fetched_issues = self._fetch_missing_issues_parallel(data) logger.info("Getting PRs and Commits for missing issues...") prs_of_fetched_cross_repo_issues = self._fetch_prs_for_fetched_cross_issues(fetched_issues) @@ -187,6 +192,133 @@ def _fetch_missing_issues(self, data: MinedData) -> dict[Issue, Repository]: logger.debug("Fetched %d missing issues.", len(fetched_issues)) return fetched_issues + def _fetch_all_repositories_in_cache(self, data: MinedData) -> None: + def _check_repo_and_add(iid: str): + org, repo, num = parse_issue_id(iid) + full_name = f"{org}/{repo}" + if data.get_repository(full_name) is None: + new_repo = self._fetch_repository(full_name) + if new_repo is None: + logger.error("Repository fetch returned None for %s", full_name) + return None + else: + data.add_repository(new_repo) + logger.debug("Fetched missing repository: %s", full_name) + + # check keys + for iid in data.parents_sub_issues.keys(): + _check_repo_and_add(iid) + + # check values + for ids in data.parents_sub_issues.values(): + for iid in ids: + _check_repo_and_add(iid) + + def _fetch_missing_issues_parallel( + self, + data: MinedData, + max_workers: int = 8, + ) -> dict[Issue, Repository]: + """ + Parallel version of _fetch_missing_issues. + Threaded to speed up GitHub API calls while avoiding data races. 
+ """ + fetched_issues: dict[Issue, Repository] = {} + origin_issue_ids = {get_id(i, r) for i, r in data.issues.items()} + + # Worklist: only parents not already present among origin_issue_ids + to_check: list[str] = [pid for pid in data.parents_sub_issues.keys() if pid not in origin_issue_ids] + + # Collect IDs to remove (those that don't meet criteria) and errors + issues_for_remove: set[str] = set() + errors: list[tuple[str, str]] = [] # (parent_id, error_msg) + + def should_fetch(issue: Issue) -> bool: + # Mirrors original logic + if not issue.closed_at: + return False + if data.since: + # if since > closed_at => skip + if issue.closed_at and data.since > issue.closed_at: + return False + return True + + def worker(parent_id: str) -> tuple[str, Optional[Issue], Optional[Repository], Optional[str]]: + """ + Returns (parent_id, issue|None, repo|None, error|None) + - issue=None & error=None => mark for remove (didn't meet criteria) + - issue=None & error!=None => log error + """ + try: + org, repo, num = parse_issue_id(parent_id) + except Exception as e: # defensive + return (parent_id, None, None, f"parse_issue_id failed: {e}") + + r = data.get_repository(f"{org}/{repo}") + if r is None: + return (parent_id, None, None, f"Cannot get repository for {org}/{repo}") + + # GitHub call + try: + logger.debug("Fetching missing issue: %s", parent_id) + issue = self._safe_call(r.get_issue)(num) + except Exception as e: + return (parent_id, None, r, f"get_issue failed: {e}") + + if issue is None: + return (parent_id, None, r, "Issue not found") + + # Criteria + if should_fetch(issue): + return (parent_id, issue, r, None) + else: + return (parent_id, None, r, None) # means: mark for remove + + if not to_check: + logger.debug("Fetched 0 missing issues (nothing to check).") + return fetched_issues + + # Thread pool + with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="fetch-issue") as ex: + futures = {ex.submit(worker, pid): pid for pid in to_check} + for fut in as_completed(futures): + parent_id = futures[fut] + try: + pid, issue, repo, err = fut.result() + except Exception as e: + errors.append((parent_id, f"worker crash: {e}")) + continue + + if err: + # Log and skip; don't remove mapping unless you’re sure you want to drop errored items + logger.error("Error fetching %s: %s", pid, err) + continue + + if issue is None: + # Did not meet criteria => schedule removal + issues_for_remove.add(pid) + else: + # Add to results + fetched_issues[issue] = repo # repo non-None here + + # Apply removals AFTER parallelism to avoid concurrent mutation + if issues_for_remove: + for iid in issues_for_remove: + data.parents_sub_issues.pop(iid, None) + for sub_issues in data.parents_sub_issues.values(): + # parents_sub_issues can be dict[str, list[str]] or now dict[str, str] per your later change; + # if it's list[str], this removal is ok; if changed to str, guard it. + if isinstance(sub_issues, list) and iid in sub_issues: + sub_issues.remove(iid) + + logger.debug( + "Fetched %d missing issues in parallel (removed %d).", + len(fetched_issues), + len(issues_for_remove), + ) + + return fetched_issues + def _fetch_repository(self, full_name: str) -> Optional[Repository]: """ Fetch a repository by its full name. 
diff --git a/release_notes_generator/record/factory/default_record_factory.py b/release_notes_generator/record/factory/default_record_factory.py index c54b2f21..b45e543f 100644 --- a/release_notes_generator/record/factory/default_record_factory.py +++ b/release_notes_generator/record/factory/default_record_factory.py @@ -19,6 +19,7 @@ """ import logging +from concurrent.futures import ThreadPoolExecutor from typing import cast, Optional from github import Github @@ -70,45 +71,31 @@ def generate(self, data: MinedData) -> dict[str, Record]: Returns: dict[str, Record]: A dictionary of records indexed by their IDs. """ - logger.debug("Creation of records started...") + logger.info("Creation of records started...") - # Before the loop, compute a flat set of all sub-issue IDs - all_sub_issue_ids = {iid for sublist in data.parents_sub_issues.values() for iid in sublist} - - for issue, repo in data.issues.items(): - iid = get_id(issue, repo) - - if len(data.parents_sub_issues.get(iid, [])) > 0: - # issue has sub-issues - it is either hierarchy issue or sub-hierarchy issue - self._create_record_for_hierarchy_issue(issue, iid) - - elif iid in all_sub_issue_ids: - # issue has no sub-issues - it is sub-issue - self._create_record_for_sub_issue(issue, iid) - - else: - # issue is not sub-issue and has no sub-issues - it is issue - self._create_record_for_issue(issue, iid) + built = build_issue_records_parallel(self, data, max_workers=8) + self._records.update(built) + self.__registered_issues.update(built.keys()) # dev note: Each issue is now in records dict by its issue number - all on same level - no hierarchy # --> This is useful for population by PRs and commits - logger.debug("Registering Commits to Pull Requests and Pull Requests to Issues...") + logger.info("Registering Commits to Pull Requests and Pull Requests to Issues...") for pull, repo in data.pull_requests.items(): self._register_pull_and_its_commits_to_issue(pull, get_id(pull, repo), data, target_repository=repo) if data.pull_requests_of_fetched_cross_issues: - logger.debug("Register cross-repo Pull Requests to its issues") + logger.info("Register cross-repo Pull Requests to its issues") for iid, prs in data.pull_requests_of_fetched_cross_issues.items(): self._register_cross_repo_prs_to_issue(iid, prs) - logger.debug("Registering direct commits to records...") + logger.info("Registering direct commits to records...") for commit, repo in data.commits.items(): if commit.sha not in self.__registered_commits: self._records[get_id(commit, repo)] = CommitRecord(commit) # dev note: now we have all PRs and commits registered to issues or as stand-alone records - logger.debug("Building issues hierarchy...") + logger.info("Building issues hierarchy...") sub_i_ids = list({iid for sublist in data.parents_sub_issues.values() for iid in sublist}) sub_i_prts = {sub_issue: parent for parent, sublist in data.parents_sub_issues.items() for sub_issue in sublist} @@ -292,3 +279,59 @@ def order_hierarchy_levels(self, level: int = 0) -> None: top_hierarchy_records = [rec for rec in self._records.values() if isinstance(rec, HierarchyIssueRecord)] for rec in top_hierarchy_records: rec.order_hierarchy_levels(level=level) + + def _build_record_for_hierarchy_issue(self, issue: Issue, iid: str, issue_labels: Optional[list[str]] = None) -> Record: + if issue_labels is None: + issue_labels = self._get_issue_labels_mix_with_type(issue) + skip_record = any(lbl in ActionInputs.get_skip_release_notes_labels() for lbl in issue_labels) + return 
HierarchyIssueRecord(issue=issue, skip=skip_record, issue_labels=issue_labels) + + def _build_record_for_sub_issue(self, issue: Issue, iid: str, issue_labels: Optional[list[str]] = None) -> Record: + if issue_labels is None: + issue_labels = self._get_issue_labels_mix_with_type(issue) + skip_record = any(lbl in ActionInputs.get_skip_release_notes_labels() for lbl in issue_labels) + rec = SubIssueRecord(issue, issue_labels, skip_record) + # preserve cross-repo flag behavior + if iid.split("#")[0] != self._home_repository.full_name: + rec.is_cross_repo = True + return rec + + def _build_record_for_issue(self, issue: Issue, iid: str, issue_labels: Optional[list[str]] = None) -> Record: + if issue_labels is None: + issue_labels = self._get_issue_labels_mix_with_type(issue) + skip_record = any(lbl in ActionInputs.get_skip_release_notes_labels() for lbl in issue_labels) + return IssueRecord(issue=issue, skip=skip_record, issue_labels=issue_labels) + +def build_issue_records_parallel(gen, data, max_workers: int = 8) -> dict[str, "Record"]: + """ + Build issue records in parallel with no side effects on `gen`. + Returns: {iid: Record} + """ + parents_sub_issues = data.parents_sub_issues # read-only snapshot for this phase + all_sub_issue_ids = {iid for subs in parents_sub_issues.values() for iid in subs} + issues_items = list(data.issues.items()) # snapshot + + def _classify_and_build(issue, repo) -> tuple[str, "Record"]: + iid = get_id(issue, repo) + + # classification + if len(parents_sub_issues.get(iid, [])) > 0: + # hierarchy node (has sub-issues) + rec = gen._build_record_for_hierarchy_issue(issue, iid) + elif iid in all_sub_issue_ids: + # leaf sub-issue + rec = gen._build_record_for_sub_issue(issue, iid) + else: + # plain issue + rec = gen._build_record_for_issue(issue, iid) + return iid, rec + + results: dict[str, "Record"] = {} + if not issues_items: + return results + + with ThreadPoolExecutor(max_workers=8, thread_name_prefix="build-issue-rec") as ex: + for iid, rec in ex.map(lambda ir: _classify_and_build(*ir), issues_items): + results[iid] = rec + + return results From 5d8756ab04d57df0dcc3633f4b2f2929ae47cc0d Mon Sep 17 00:00:00 2001 From: miroslavpojer Date: Sun, 5 Oct 2025 14:44:19 +0200 Subject: [PATCH 2/2] Local fix of checkers and linters. 
--- release_notes_generator/data/miner.py | 88 +++---------------- .../record/factory/default_record_factory.py | 77 ++++++++-------- 2 files changed, 49 insertions(+), 116 deletions(-) diff --git a/release_notes_generator/data/miner.py b/release_notes_generator/data/miner.py index 8cae46a4..da967aec 100644 --- a/release_notes_generator/data/miner.py +++ b/release_notes_generator/data/miner.py @@ -21,7 +21,6 @@ import logging import sys import traceback -from asyncio import Lock from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Optional @@ -102,7 +101,7 @@ def mine_missing_sub_issues(self, data: MinedData) -> tuple[dict[Issue, Reposito self._fetch_all_repositories_in_cache(data) logger.info("Fetching missing issues...") - fetched_issues = self._fetch_missing_issues_parallel(data) + fetched_issues = self._fetch_missing_issues(data) logger.info("Getting PRs and Commits for missing issues...") prs_of_fetched_cross_repo_issues = self._fetch_prs_for_fetched_cross_issues(fetched_issues) @@ -131,79 +130,18 @@ def _scan_sub_issues_for_parents(self, parents_to_check: list[str]) -> dict[str, return parents_sub_issues - def _fetch_missing_issues(self, data: MinedData) -> dict[Issue, Repository]: - """ - Fetch missing issues. - - Parameters: - data (MinedData): The mined data containing origin sets of issues and pull requests. - Returns: - dict[Issue, Repository]: A dictionary mapping fetched issues to their repositories. - """ - fetched_issues: dict[Issue, Repository] = {} - - origin_issue_ids = {get_id(i, r) for i, r in data.issues.items()} - issues_for_remove: list[str] = [] - for parent_id in data.parents_sub_issues.keys(): - if parent_id in origin_issue_ids: - continue - - # fetch issue by id - org, repo, num = parse_issue_id(parent_id) - - if data.get_repository(f"{org}/{repo}") is None: - new_repo = self._fetch_repository(f"{org}/{repo}") - if new_repo is not None: - # cache for subsequent lookups - data.add_repository(new_repo) - - issue = None - r = data.get_repository(f"{org}/{repo}") - if r is not None: - logger.debug("Fetching missing issue: %s", parent_id) - issue = self._safe_call(r.get_issue)(num) - if issue is None: - logger.error("Issue not found: %s", parent_id) - continue - - fetch: bool = True - if not issue.closed_at: - fetch = False - elif data.since: - if issue.closed_at and data.since > issue.closed_at: - fetch = False - - if fetch: - # add to issues list - fetched_issues[issue] = r - else: - logger.debug("Skipping issue %s since it does not meet criteria.", parent_id) - issues_for_remove.append(parent_id) - else: - logger.error("Cannot get repository for issue %s. 
Skipping...", parent_id) - - # remove issue which does not meet criteria - for iid in issues_for_remove: - data.parents_sub_issues.pop(iid, None) - for sub_issues in data.parents_sub_issues.values(): - if iid in sub_issues: - sub_issues.remove(iid) - - logger.debug("Fetched %d missing issues.", len(fetched_issues)) - return fetched_issues - def _fetch_all_repositories_in_cache(self, data: MinedData) -> None: def _check_repo_and_add(iid: str): - org, repo, num = parse_issue_id(iid) + org, repo, _num = parse_issue_id(iid) full_name = f"{org}/{repo}" if data.get_repository(full_name) is None: new_repo = self._fetch_repository(full_name) if new_repo is None: logger.error("Repository fetch returned None for %s", full_name) - return None - else: - data.add_repository(new_repo) - logger.debug("Fetched missing repository: %s", full_name) + return + + data.add_repository(new_repo) + logger.debug("Fetched missing repository: %s", full_name) # check keys for iid in data.parents_sub_issues.keys(): @@ -214,10 +152,10 @@ def _check_repo_and_add(iid: str): for iid in ids: _check_repo_and_add(iid) - def _fetch_missing_issues_parallel( - self, - data: MinedData, - max_workers: int = 8, + def _fetch_missing_issues( + self, + data: MinedData, + max_workers: int = 8, ) -> dict[Issue, Repository]: """ Parallel version of _fetch_missing_issues. @@ -271,8 +209,8 @@ def worker(parent_id: str) -> tuple[str, Optional[Issue], Optional[Repository], # Criteria if should_fetch(issue): return (parent_id, issue, r, None) - else: - return (parent_id, None, r, None) # means: mark for remove + + return (parent_id, None, r, None) # means: mark for remove if not to_check: logger.debug("Fetched 0 missing issues (nothing to check).") @@ -299,7 +237,7 @@ def worker(parent_id: str) -> tuple[str, Optional[Issue], Optional[Repository], issues_for_remove.add(pid) else: # Add to results - fetched_issues[issue] = repo # repo non-None here + fetched_issues[issue] = repo # type: ignore[assignment] # Apply removals AFTER parallelism to avoid concurrent mutation if issues_for_remove: diff --git a/release_notes_generator/record/factory/default_record_factory.py b/release_notes_generator/record/factory/default_record_factory.py index b45e543f..c4d161c0 100644 --- a/release_notes_generator/record/factory/default_record_factory.py +++ b/release_notes_generator/record/factory/default_record_factory.py @@ -180,26 +180,6 @@ def _register_cross_repo_prs_to_issue(self, iid: str, prs: list[PullRequest]) -> for pr in prs: cast(IssueRecord, self._records[iid]).register_pull_request(pr) - def _create_record_for_hierarchy_issue(self, i: Issue, iid: str, issue_labels: Optional[list[str]] = None) -> None: - """ - Create a hierarchy issue record and register sub-issues. - - Parameters: - i: The issue to create the record for. - issue_labels: The labels of the issue. 
- - Returns: - None - """ - # check for skip labels presence and skip when detected - if issue_labels is None: - issue_labels = self._get_issue_labels_mix_with_type(i) - skip_record = any(item in issue_labels for item in ActionInputs.get_skip_release_notes_labels()) - - self._records[iid] = HierarchyIssueRecord(issue=i, skip=skip_record, issue_labels=issue_labels) - self.__registered_issues.add(iid) - logger.debug("Created record for hierarchy issue %s: %s", iid, i.title) - def _get_issue_labels_mix_with_type(self, issue: Issue) -> list[str]: labels: list[str] = [label.name for label in issue.get_labels()] @@ -210,20 +190,6 @@ def _get_issue_labels_mix_with_type(self, issue: Issue) -> list[str]: return labels - def _create_record_for_sub_issue(self, issue: Issue, iid: str, issue_labels: Optional[list[str]] = None) -> None: - if issue_labels is None: - issue_labels = self._get_issue_labels_mix_with_type(issue) - - skip_record = any(item in issue_labels for item in ActionInputs.get_skip_release_notes_labels()) - logger.debug("Created record for sub issue %s: %s", iid, issue.title) - self.__registered_issues.add(iid) - self._records[iid] = SubIssueRecord(issue, issue_labels, skip_record) - - if iid.split("#")[0] == self._home_repository.full_name: - return - - self._records[iid].is_cross_repo = True - def _re_register_hierarchy_issues(self, sub_issues_ids: list[str], sub_issue_parents: dict[str, str]): logger.debug("Re-registering hierarchy issues ...") reduced_sub_issue_ids: list[str] = sub_issues_ids[:] @@ -280,13 +246,32 @@ def order_hierarchy_levels(self, level: int = 0) -> None: for rec in top_hierarchy_records: rec.order_hierarchy_levels(level=level) - def _build_record_for_hierarchy_issue(self, issue: Issue, iid: str, issue_labels: Optional[list[str]] = None) -> Record: + def build_record_for_hierarchy_issue(self, issue: Issue, issue_labels: Optional[list[str]] = None) -> Record: + """ + Build a hierarchy issue record. + + Parameters: + issue (Issue): The issue to build. + issue_labels (list[str]): The labels to use for this issue. + Returns: + Record: The built record. + """ if issue_labels is None: issue_labels = self._get_issue_labels_mix_with_type(issue) skip_record = any(lbl in ActionInputs.get_skip_release_notes_labels() for lbl in issue_labels) return HierarchyIssueRecord(issue=issue, skip=skip_record, issue_labels=issue_labels) - def _build_record_for_sub_issue(self, issue: Issue, iid: str, issue_labels: Optional[list[str]] = None) -> Record: + def build_record_for_sub_issue(self, issue: Issue, iid: str, issue_labels: Optional[list[str]] = None) -> Record: + """ + Build a sub issue record. + + Parameters: + issue (Issue): The issue to build. + iid (str): The id to use for this issue. + issue_labels (list[str]): The labels to use for this issue. + Returns: + Record: The built record. + """ if issue_labels is None: issue_labels = self._get_issue_labels_mix_with_type(issue) skip_record = any(lbl in ActionInputs.get_skip_release_notes_labels() for lbl in issue_labels) @@ -296,12 +281,22 @@ def _build_record_for_sub_issue(self, issue: Issue, iid: str, issue_labels: Opti rec.is_cross_repo = True return rec - def _build_record_for_issue(self, issue: Issue, iid: str, issue_labels: Optional[list[str]] = None) -> Record: + def build_record_for_issue(self, issue: Issue, issue_labels: Optional[list[str]] = None) -> Record: + """ + Build an issue record. + + Parameters: + issue (Issue): The issue to build. + issue_labels (list[str]): The labels to use for this issue. 
+ Returns: + Record: The built record. + """ if issue_labels is None: issue_labels = self._get_issue_labels_mix_with_type(issue) skip_record = any(lbl in ActionInputs.get_skip_release_notes_labels() for lbl in issue_labels) return IssueRecord(issue=issue, skip=skip_record, issue_labels=issue_labels) + def build_issue_records_parallel(gen, data, max_workers: int = 8) -> dict[str, "Record"]: """ Build issue records in parallel with no side effects on `gen`. @@ -317,20 +312,20 @@ def _classify_and_build(issue, repo) -> tuple[str, "Record"]: # classification if len(parents_sub_issues.get(iid, [])) > 0: # hierarchy node (has sub-issues) - rec = gen._build_record_for_hierarchy_issue(issue, iid) + rec = gen.build_record_for_hierarchy_issue(issue) elif iid in all_sub_issue_ids: # leaf sub-issue - rec = gen._build_record_for_sub_issue(issue, iid) + rec = gen.build_record_for_sub_issue(issue, iid) else: # plain issue - rec = gen._build_record_for_issue(issue, iid) + rec = gen.build_record_for_issue(issue) return iid, rec results: dict[str, "Record"] = {} if not issues_items: return results - with ThreadPoolExecutor(max_workers=8, thread_name_prefix="build-issue-rec") as ex: + with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="build-issue-rec") as ex: for iid, rec in ex.map(lambda ir: _classify_and_build(*ir), issues_items): results[iid] = rec
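
Note on the concurrency pattern used by both commits (a minimal sketch, not part of the patch): independent, read-only GitHub calls are submitted to a ThreadPoolExecutor, results are collected with as_completed in the submitting thread, and any mutation of shared structures (such as pruning data.parents_sub_issues) is deferred until the pool has drained, so no explicit lock is needed for this phase. The names fetch_issue, fetch_all, parent_ids, and shared_map below are illustrative only, not identifiers from this repository.

    from concurrent.futures import ThreadPoolExecutor, as_completed
    from typing import Optional

    def fetch_issue(parent_id: str) -> tuple[str, Optional[str]]:
        # Stand-in for a network call such as repository.get_issue(num); returns
        # None when the issue would not meet the closed_at/since criteria.
        # Workers only read their inputs and return values; they never touch
        # shared dictionaries, which keeps the parallel phase race-free.
        return parent_id, None if parent_id.endswith("#0") else f"issue {parent_id}"

    def fetch_all(parent_ids: list[str],
                  shared_map: dict[str, list[str]],
                  max_workers: int = 8) -> dict[str, str]:
        fetched: dict[str, str] = {}
        to_remove: set[str] = set()
        with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="fetch-issue") as ex:
            futures = {ex.submit(fetch_issue, pid): pid for pid in parent_ids}
            for fut in as_completed(futures):
                pid, issue = fut.result()   # consumed in the submitting thread
                if issue is None:
                    to_remove.add(pid)      # schedule removal; mutate nothing yet
                else:
                    fetched[pid] = issue
        # Apply removals only after the pool has finished, mirroring the
        # "Apply removals AFTER parallelism" step in _fetch_missing_issues.
        for pid in to_remove:
            shared_map.pop(pid, None)
        return fetched

The record-creation change in default_record_factory.py follows the same rule with ex.map: the build_record_for_* helpers are pure (they return a Record instead of writing to self._records), and generate() merges the results returned by build_issue_records_parallel into the shared dictionaries only after the executor has exited.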