162 changes: 116 additions & 46 deletions release_notes_generator/data/miner.py
@@ -21,6 +21,7 @@
import logging
import sys
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Optional

import semver
@@ -96,6 +97,9 @@ def mine_missing_sub_issues(self, data: MinedData) -> tuple[dict[Issue, Reposito
logger.info("Mapping sub-issues...")
data.parents_sub_issues = self._scan_sub_issues_for_parents([get_id(i, r) for i, r in data.issues.items()])

logger.info("Fetch all repositories in cache...")
self._fetch_all_repositories_in_cache(data)

logger.info("Fetching missing issues...")
fetched_issues = self._fetch_missing_issues(data)

@@ -126,65 +130,131 @@ def _scan_sub_issues_for_parents(self, parents_to_check: list[str]) -> dict[str,

return parents_sub_issues
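For orientation: the code below leans on the shape of `data.parents_sub_issues`, which maps each parent issue id to the ids of its sub-issues. A hypothetical sketch of that shape (the exact id format is an assumption inferred from the `parse_issue_id` calls that follow):

# Hypothetical illustration only; not part of this diff.
parents_sub_issues: dict[str, list[str]] = {
    "org/repo#10": ["org/repo#11", "other-org/other-repo#7"],  # parent with two sub-issues
    "org/repo#11": [],  # issue with no sub-issues of its own
}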

    def _fetch_all_repositories_in_cache(self, data: MinedData) -> None:
        """Ensure every repository referenced by a parent or sub-issue id is cached in `data`."""
def _check_repo_and_add(iid: str):
org, repo, _num = parse_issue_id(iid)
full_name = f"{org}/{repo}"
if data.get_repository(full_name) is None:
new_repo = self._fetch_repository(full_name)
if new_repo is None:
logger.error("Repository fetch returned None for %s", full_name)
return

data.add_repository(new_repo)
logger.debug("Fetched missing repository: %s", full_name)

# check keys
for iid in data.parents_sub_issues.keys():
_check_repo_and_add(iid)

# check values
for ids in data.parents_sub_issues.values():
for iid in ids:
_check_repo_and_add(iid)
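`_fetch_all_repositories_in_cache` relies on two helpers whose implementations are outside this diff; the following is a hedged sketch of the assumed behavior, not the project's actual code:

# Hypothetical sketches; the real helpers live elsewhere in the repository and may differ.
def parse_issue_id(iid: str) -> tuple[str, str, int]:
    # "org/repo#123" -> ("org", "repo", 123)
    full_name, num = iid.split("#", 1)
    org, repo = full_name.split("/", 1)
    return org, repo, int(num)

def get_id(item, repo) -> str:
    # Inverse direction: an Issue/PullRequest plus its Repository -> "org/repo#123"
    return f"{repo.full_name}#{item.number}"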

def _fetch_missing_issues(
self,
data: MinedData,
max_workers: int = 8,
) -> dict[Issue, Repository]:
"""
Fetch missing issues.

Parameters:
data (MinedData): The mined data containing origin sets of issues and pull requests.
Returns:
dict[Issue, Repository]: A dictionary mapping fetched issues to their repositories.
Parallel version of _fetch_missing_issues.
Threaded to speed up GitHub API calls while avoiding data races.
"""
fetched_issues: dict[Issue, Repository] = {}

origin_issue_ids = {get_id(i, r) for i, r in data.issues.items()}
# Worklist: only parents not already present among origin_issue_ids
to_check: list[str] = [pid for pid in data.parents_sub_issues.keys() if pid not in origin_issue_ids]

# Collect IDs to remove (those that don't meet criteria) and errors
issues_for_remove: set[str] = set()
errors: list[tuple[str, str]] = [] # (parent_id, error_msg)

def should_fetch(issue: Issue) -> bool:
# Mirrors original logic
if not issue.closed_at:
return False
if data.since:
# if since > closed_at => skip
if issue.closed_at and data.since > issue.closed_at:
return False
return True

def worker(parent_id: str) -> tuple[str, Optional[Issue], Optional[Repository], Optional[str]]:
"""
Returns (parent_id, issue|None, repo|None, error|None)
- issue=None & error=None => mark for remove (didn't meet criteria)
- issue=None & error!=None => log error
"""
try:
org, repo, num = parse_issue_id(parent_id)
except Exception as e: # defensive
return (parent_id, None, None, f"parse_issue_id failed: {e}")

            r = data.get_repository(f"{org}/{repo}")
            if r is None:
                return (parent_id, None, None, f"Cannot get repository for {org}/{repo}")

# GitHub call
try:
logger.debug("Fetching missing issue: %s", parent_id)
issue = self._safe_call(r.get_issue)(num)
except Exception as e:
return (parent_id, None, r, f"get_issue failed: {e}")

if issue is None:
return (parent_id, None, r, "Issue not found")

# Criteria
if should_fetch(issue):
return (parent_id, issue, r, None)

return (parent_id, None, r, None) # means: mark for remove

if not to_check:
logger.debug("Fetched 0 missing issues (nothing to check).")
return fetched_issues

# Thread pool
with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="fetch-issue") as ex:
futures = {ex.submit(worker, pid): pid for pid in to_check}
for fut in as_completed(futures):
parent_id = futures[fut]
try:
pid, issue, repo, err = fut.result()
except Exception as e:
errors.append((parent_id, f"worker crash: {e}"))
continue

                if err:
                    # Log and skip; errored items keep their mapping entry rather than being dropped
                    logger.error("Error fetching %s: %s", pid, err)
                    continue

                if issue is None:
                    # Did not meet criteria => schedule removal
                    logger.debug("Skipping issue %s since it does not meet criteria.", pid)
                    issues_for_remove.add(pid)
                else:
                    # Add to results
                    fetched_issues[issue] = repo  # type: ignore[assignment]

# Apply removals AFTER parallelism to avoid concurrent mutation
if issues_for_remove:
for iid in issues_for_remove:
data.parents_sub_issues.pop(iid, None)
for sub_issues in data.parents_sub_issues.values():
                # parents_sub_issues values are expected to be list[str]; guard the removal
                # in case the mapping shape changes (e.g. values become str) in a later revision.
if isinstance(sub_issues, list) and iid in sub_issues:
sub_issues.remove(iid)

        for pid, msg in errors:
            logger.error("Worker failed for %s: %s", pid, msg)

        logger.debug(
            "Fetched %d missing issues in parallel (removed %d, worker errors %d).",
            len(fetched_issues),
            len(issues_for_remove),
            len(errors),
        )

return fetched_issues
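The worker above wraps the API call as `self._safe_call(r.get_issue)(num)`; `_safe_call` itself is not shown in this diff. A minimal sketch of such a wrapper, assuming it converts exceptions into `None` so that "call failed" and "object missing" can be handled uniformly (an assumption, not the actual implementation):

# Hypothetical sketch of the assumed _safe_call wrapper.
def _safe_call(self, fn):
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception:
            logger.exception("GitHub call %s failed", getattr(fn, "__name__", fn))
            return None
    return wrapper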

def _fetch_repository(self, full_name: str) -> Optional[Repository]:
150 changes: 94 additions & 56 deletions release_notes_generator/record/factory/default_record_factory.py
@@ -19,6 +19,7 @@
"""

import logging
from concurrent.futures import ThreadPoolExecutor
from typing import cast, Optional

from github import Github
@@ -70,45 +71,31 @@ def generate(self, data: MinedData) -> dict[str, Record]:
Returns:
dict[str, Record]: A dictionary of records indexed by their IDs.
"""
logger.debug("Creation of records started...")
logger.info("Creation of records started...")

# Before the loop, compute a flat set of all sub-issue IDs
all_sub_issue_ids = {iid for sublist in data.parents_sub_issues.values() for iid in sublist}

for issue, repo in data.issues.items():
iid = get_id(issue, repo)

if len(data.parents_sub_issues.get(iid, [])) > 0:
# issue has sub-issues - it is either hierarchy issue or sub-hierarchy issue
self._create_record_for_hierarchy_issue(issue, iid)

elif iid in all_sub_issue_ids:
# issue has no sub-issues - it is sub-issue
self._create_record_for_sub_issue(issue, iid)

else:
# issue is not sub-issue and has no sub-issues - it is issue
self._create_record_for_issue(issue, iid)
built = build_issue_records_parallel(self, data, max_workers=8)
self._records.update(built)
self.__registered_issues.update(built.keys())

# dev note: Each issue is now in records dict by its issue number - all on same level - no hierarchy
# --> This is useful for population by PRs and commits

logger.debug("Registering Commits to Pull Requests and Pull Requests to Issues...")
logger.info("Registering Commits to Pull Requests and Pull Requests to Issues...")
for pull, repo in data.pull_requests.items():
self._register_pull_and_its_commits_to_issue(pull, get_id(pull, repo), data, target_repository=repo)

if data.pull_requests_of_fetched_cross_issues:
logger.debug("Register cross-repo Pull Requests to its issues")
logger.info("Register cross-repo Pull Requests to its issues")
for iid, prs in data.pull_requests_of_fetched_cross_issues.items():
self._register_cross_repo_prs_to_issue(iid, prs)

logger.debug("Registering direct commits to records...")
logger.info("Registering direct commits to records...")
for commit, repo in data.commits.items():
if commit.sha not in self.__registered_commits:
self._records[get_id(commit, repo)] = CommitRecord(commit)

# dev note: now we have all PRs and commits registered to issues or as stand-alone records
logger.debug("Building issues hierarchy...")
logger.info("Building issues hierarchy...")

sub_i_ids = list({iid for sublist in data.parents_sub_issues.values() for iid in sublist})
sub_i_prts = {sub_issue: parent for parent, sublist in data.parents_sub_issues.items() for sub_issue in sublist}
@@ -193,26 +180,6 @@ def _register_cross_repo_prs_to_issue(self, iid: str, prs: list[PullRequest]) ->
for pr in prs:
cast(IssueRecord, self._records[iid]).register_pull_request(pr)

def _create_record_for_hierarchy_issue(self, i: Issue, iid: str, issue_labels: Optional[list[str]] = None) -> None:
"""
Create a hierarchy issue record and register sub-issues.

Parameters:
i: The issue to create the record for.
issue_labels: The labels of the issue.

Returns:
None
"""
# check for skip labels presence and skip when detected
if issue_labels is None:
issue_labels = self._get_issue_labels_mix_with_type(i)
skip_record = any(item in issue_labels for item in ActionInputs.get_skip_release_notes_labels())

self._records[iid] = HierarchyIssueRecord(issue=i, skip=skip_record, issue_labels=issue_labels)
self.__registered_issues.add(iid)
logger.debug("Created record for hierarchy issue %s: %s", iid, i.title)

def _get_issue_labels_mix_with_type(self, issue: Issue) -> list[str]:
labels: list[str] = [label.name for label in issue.get_labels()]

@@ -223,20 +190,6 @@ def _get_issue_labels_mix_with_type(self, issue: Issue) -> list[str]:

return labels

def _create_record_for_sub_issue(self, issue: Issue, iid: str, issue_labels: Optional[list[str]] = None) -> None:
if issue_labels is None:
issue_labels = self._get_issue_labels_mix_with_type(issue)

skip_record = any(item in issue_labels for item in ActionInputs.get_skip_release_notes_labels())
logger.debug("Created record for sub issue %s: %s", iid, issue.title)
self.__registered_issues.add(iid)
self._records[iid] = SubIssueRecord(issue, issue_labels, skip_record)

if iid.split("#")[0] == self._home_repository.full_name:
return

self._records[iid].is_cross_repo = True

def _re_register_hierarchy_issues(self, sub_issues_ids: list[str], sub_issue_parents: dict[str, str]):
logger.debug("Re-registering hierarchy issues ...")
reduced_sub_issue_ids: list[str] = sub_issues_ids[:]
@@ -292,3 +245,88 @@ def order_hierarchy_levels(self, level: int = 0) -> None:
top_hierarchy_records = [rec for rec in self._records.values() if isinstance(rec, HierarchyIssueRecord)]
for rec in top_hierarchy_records:
rec.order_hierarchy_levels(level=level)

def build_record_for_hierarchy_issue(self, issue: Issue, issue_labels: Optional[list[str]] = None) -> Record:
"""
Build a hierarchy issue record.

Parameters:
            issue (Issue): The issue to build the record from.
issue_labels (list[str]): The labels to use for this issue.
Returns:
Record: The built record.
"""
if issue_labels is None:
issue_labels = self._get_issue_labels_mix_with_type(issue)
skip_record = any(lbl in ActionInputs.get_skip_release_notes_labels() for lbl in issue_labels)
return HierarchyIssueRecord(issue=issue, skip=skip_record, issue_labels=issue_labels)

def build_record_for_sub_issue(self, issue: Issue, iid: str, issue_labels: Optional[list[str]] = None) -> Record:
"""
Build a sub issue record.

Parameters:
            issue (Issue): The issue to build the record from.
iid (str): The id to use for this issue.
issue_labels (list[str]): The labels to use for this issue.
Returns:
Record: The built record.
"""
if issue_labels is None:
issue_labels = self._get_issue_labels_mix_with_type(issue)
skip_record = any(lbl in ActionInputs.get_skip_release_notes_labels() for lbl in issue_labels)
rec = SubIssueRecord(issue, issue_labels, skip_record)
# preserve cross-repo flag behavior
if iid.split("#")[0] != self._home_repository.full_name:
rec.is_cross_repo = True
return rec

def build_record_for_issue(self, issue: Issue, issue_labels: Optional[list[str]] = None) -> Record:
"""
Build an issue record.

Parameters:
            issue (Issue): The issue to build the record from.
issue_labels (list[str]): The labels to use for this issue.
Returns:
Record: The built record.
"""
if issue_labels is None:
issue_labels = self._get_issue_labels_mix_with_type(issue)
skip_record = any(lbl in ActionInputs.get_skip_release_notes_labels() for lbl in issue_labels)
return IssueRecord(issue=issue, skip=skip_record, issue_labels=issue_labels)
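All three builders share the same skip check. A hypothetical example of how it evaluates, assuming `ActionInputs.get_skip_release_notes_labels()` returns `["skip-release-notes"]`:

# Hypothetical values for illustration; the real labels come from ActionInputs.
skip_labels = ["skip-release-notes"]
issue_labels = ["bug", "skip-release-notes"]
skip_record = any(lbl in skip_labels for lbl in issue_labels)
assert skip_record is True  # the record is still created, just marked as skipped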


def build_issue_records_parallel(gen, data, max_workers: int = 8) -> dict[str, "Record"]:
"""
Build issue records in parallel with no side effects on `gen`.
Returns: {iid: Record}
"""
parents_sub_issues = data.parents_sub_issues # read-only snapshot for this phase
all_sub_issue_ids = {iid for subs in parents_sub_issues.values() for iid in subs}
issues_items = list(data.issues.items()) # snapshot

def _classify_and_build(issue, repo) -> tuple[str, "Record"]:
iid = get_id(issue, repo)

# classification
if len(parents_sub_issues.get(iid, [])) > 0:
# hierarchy node (has sub-issues)
rec = gen.build_record_for_hierarchy_issue(issue)
elif iid in all_sub_issue_ids:
# leaf sub-issue
rec = gen.build_record_for_sub_issue(issue, iid)
else:
# plain issue
rec = gen.build_record_for_issue(issue)
return iid, rec

results: dict[str, "Record"] = {}
if not issues_items:
return results

with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="build-issue-rec") as ex:
for iid, rec in ex.map(lambda ir: _classify_and_build(*ir), issues_items):
results[iid] = rec

return results
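Because `_classify_and_build` only reads the snapshots taken above and returns a fresh record, `ex.map` can fan the work out across threads safely; all mutation of shared state (`gen._records`, registered ids) happens afterwards on the calling thread in `generate`. A minimal standalone demonstration of the same fan-out/merge pattern:

# Self-contained demonstration of the fan-out/merge pattern used above.
from concurrent.futures import ThreadPoolExecutor

items = [("a", 1), ("b", 2), ("c", 3)]
with ThreadPoolExecutor(max_workers=2) as ex:
    results = dict(ex.map(lambda kv: (kv[0], kv[1] * 10), items))
# results == {"a": 10, "b": 20, "c": 30}; the merge into a dict happens on the calling thread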