In [11]:
from dotenv import load_dotenv
import os

# Load environment variables from .env file
load_dotenv()

import re
import pandas as pd
from tqdm import tqdm
from loguru import logger
from pymongo import MongoClient
import requests 
import time 
import concurrent.futures

# GitHub Personal Access Token (optional, but recommended for higher rate limits)
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
if not GITHUB_TOKEN:
    logger.warning("GitHub token not found. Rate limits may apply.")

BATCH_SIZE = 500 # Number of records to process before writing to CSV

# Logger config
logger.add("logs/pr-to-issues-keyword.log")


3

# Initial data set


mongoDB connection


In [12]:
# Mongo connection
client = MongoClient("mongodb://localhost:27017/")
db = client.github_data
pull_request_collection = db.pull_requests
logger.info("🔌 MongoDB connected.")

[32m2025-06-03 15:08:35.844[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m5[0m - [1m🔌 MongoDB connected.[0m


In [13]:
# Query to filter closed pull requests
qurey = {
    "pull_request.state": "closed",
}

# Fetch all records into a list to prevent cursor timeout during processing
logger.info("Fetching all records from MongoDB...")
all_records_list = list(pull_request_collection.find(qurey))
toal_pull_request_count = len(all_records_list) # Actual number of records fetched
logger.info(f"Successfully fetched {toal_pull_request_count} records.")


[32m2025-06-03 15:08:41.198[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m7[0m - [1mFetching all records from MongoDB...[0m
[32m2025-06-03 15:09:04.007[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m10[0m - [1mSuccessfully fetched 25042 records.[0m


In [14]:
pr_details_list = [] # List to store dictionaries of PR details

def extract_pr_details(record):
    """
    Extracts relevant details (number, link, title, body) from a pull request record.
    """
    pr_data = record.get("pull_request", {})
    # Handle cases where 'pull_request' key might be missing or empty
    if not pr_data: 
        logger.warning(f"Skipping record due to missing 'pull_request' data. Record ID: {record.get('_id', 'N/A')}")
        return None

    pr_number = pr_data.get("number")
    pr_link = pr_data.get("html_url", None)
    pr_title = pr_data.get("title", None)
    # Use "body" for description as it's the common field for PR description
    pr_body = pr_data.get("body", None) 

    if pr_number is None:
        logger.warning(f"Skipping record due to missing PR number. Record ID: {record.get('_id', 'N/A')}")
        return None

    return {
        "pr_number": pr_number,
        "pr_link": pr_link,
        "pr_title": pr_title,
        "pr_body": pr_body, # "description" in user request, mapped to "body"
    }

# Use ThreadPoolExecutor for processing records concurrently
# os.cpu_count() provides a sensible default for max_workers
# Adjust max_workers based on specific I/O vs CPU characteristics if needed
logger.info(f"Starting extraction of PR details for {toal_pull_request_count} records using concurrent.futures...")

with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
    # Submit all tasks to the executor
    future_to_record = {executor.submit(extract_pr_details, record): record for record in all_records_list}
    
    # Process futures as they complete, with a progress bar
    for future in tqdm(concurrent.futures.as_completed(future_to_record), total=len(all_records_list), desc="Extracting PR Details"):
        record_for_future = future_to_record[future] # Get original record for logging in case of error
        try:
            result = future.result()
            if result:  # Add to list if extraction was successful (result is not None)
                pr_details_list.append(result)
        except Exception as exc:
            record_id = record_for_future.get("_id", "N/A")
            logger.error(f"Record ID {record_id} generated an exception during extraction: {exc}")

logger.info(f"Successfully processed all records. Extracted details for {len(pr_details_list)} pull requests.")

[32m2025-06-03 15:09:36.337[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m33[0m - [1mStarting extraction of PR details for 25042 records using concurrent.futures...[0m
Extracting PR Details: 100%|██████████| 25042/25042 [00:00<00:00, 147144.03it/s]
[32m2025-06-03 15:09:42.987[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m50[0m - [1mSuccessfully processed all records. Extracted details for 25042 pull requests.[0m


In [15]:
# Define the path for the output CSV file
output_csv_path = "../data/keyword-linking/pullrequestToIssue/closed_prs_summary.csv"

# Convert the list of dictionaries to a pandas DataFrame and save to CSV
if pr_details_list:
    df_prs = pd.DataFrame(pr_details_list)
    
    # Ensure the columns are in the desired order for the CSV
    # "description" from user request is mapped to "pr_body"
    csv_headers = ["pr_number", "pr_link", "pr_title", "pr_body"]
    df_prs = df_prs[csv_headers]

    try:
        # Save the DataFrame to CSV, overwriting if the file exists
        df_prs.to_csv(output_csv_path, index=False, mode='w')
        logger.info(f"Successfully saved PR details to {output_csv_path}")
    except Exception as e:
        logger.error(f"Failed to save PR details to CSV at {output_csv_path}: {e}")
else:
    logger.warning("No PR details were extracted (or all records were skipped). CSV file will not be created.")

[32m2025-06-03 15:10:25.477[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m16[0m - [1mSuccessfully saved PR details to ../data/keyword-linking/pullrequestToIssue/closed_prs_summary.csv[0m


# Key word processing


## Title


In [11]:
import pandas as pd
import re
from loguru import logger
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from typing import List, Tuple, Optional
import time


# Thread-safe lock for writing results
write_lock = threading.Lock()

csv_path = "../data/keyword-linking/pullrequestToIssue/closed_prs_summary.csv"

def extract_all_issue_numbers_from_title(title: str) -> List[str]:
    """
    Extracts all issue numbers from pull request title.
    Returns a list of issue numbers found.
    """
    if pd.isna(title) or not title.strip():
        return []

   # Covers: fix(es/ed/ing), clos(e/es/ed), resolv(e/es/ed), issue(s), Backport, Revert(s)
    keyword_pattern = r'\b(?:fix(?:es|ed|ing)?|clos(?:e|es|ed)|resolv(?:e|es|ed)|issue|Backport|Revert(?:s)?)\b'

   # Comprehensive patterns to find issue numbers, ordered by specificity
    issue_patterns = [
        # Priority 1: Keywords followed by colon and GitHub URL (most specific)
        rf'{keyword_pattern}:\s+https?://github\.com/[^/\s]+/[^/\s]+/(?:issues|pull)/(\d+)',
        
        # Priority 2: Keywords followed by full GitHub issue URL
        rf'{keyword_pattern}\s+https?://github\.com/[^/\s]+/[^/\s]+/(?:issues|pull)/(\d+)',
        
        # Priority 3: Keywords followed by repo path and issue number
        rf'{keyword_pattern}\s+[^/\s]+/[^/\s]+/issues/(\d+)',
        
        # Priority 4: Keywords + <text> + GitHub issue links + optional plain number
        rf'{keyword_pattern}.*<.*?>\s+https?://github\.com/[^/\s]+/[^/\s]+/(?:issues|pull)/(\d+)(?:\s+(\d+))*',
        
        # Priority 5: Keywords + <text> + multiple issue numbers
        rf'{keyword_pattern}.*<.*?>\s+#(\d+)(?:\s+#(\d+))*',
        
        # Priority 6: Keywords followed by square brackets
        rf'{keyword_pattern}.*\[#(\d+)\]',
        
        # Priority 7: Square brackets followed by keywords
        rf'\[#(\d+)\].*{keyword_pattern}',
        
        # Priority 8: Keywords followed by (from #number)
        rf'{keyword_pattern}.*\(from\s+#(\d+)\)',
        
        # Priority 9: Keywords followed by "part of #number" or "part of issue #number"
        rf'{keyword_pattern}.*part\s+of\s+(?:issue\s+)?#(\d+)',
        
        # Priority 10: Keywords followed by "/fix-#number" or "/fix-number"
        rf'{keyword_pattern}.*(?:/fix-#(\d+)|/fix-(\d+))',
        
        # Priority 11: Multiple issues in one line (e.g., "fixes #123, #456, #789")
        rf'{keyword_pattern}[^#]*?((?:#\d+(?:\s*,\s*#\d+)*)+)',
        
        # Priority 12: Keywords followed by any text and #number
        rf'{keyword_pattern}.*#(\d+)',
        
        # Priority 13: Keywords followed by #number (direct)
        rf'{keyword_pattern}\s+#(\d+)',
        
        # Priority 14: Keywords followed by plain number (no #)
        rf'{keyword_pattern}\s+(\d+)\b',
        
        # Priority 15: #number followed by keyword
        rf'#(\d+)\s+{keyword_pattern}',
        
        # Priority 16: Specific "Address review suggestions" pattern
        rf'Address\s+review\s+suggestions.*#(\d+)',
        
        # Priority 17: Multiple issue numbers after "issues" (e.g., "issues #1159 #947")
        r'\bissues?\s+#(\d+)(?:\s+#(\d+))*',
        
        # Priority 18: GitHub URLs without keywords (issues only, not pull requests)
        r'https?://github\.com/[^/\s]+/[^/\s]+/issues/(\d+)',
    ]

    found_issues = set()  # Use set to avoid duplicates
    
    for pattern in issue_patterns:
        matches = re.finditer(pattern, title, re.IGNORECASE)
        for match in matches:
            groups = match.groups()
            for group in groups:
                if group:
                    # Handle multiple issues in one match (e.g., "#123, #456")
                    if '#' in group:
                        issue_nums = re.findall(r'#(\d+)', group)
                        found_issues.update(issue_nums)
                    else:
                        found_issues.add(group)
    
    return list(found_issues)

def generate_issue_link(issue_number: str, pr_link: str) -> str:
    """
    Generate GitHub issue link based on issue number and PR link.
    Assumes same repository as the PR.
    """
    if not issue_number or not pr_link:
        return ""
    
    # Extract repository info from PR link
    # Example: https://github.com/owner/repo/pull/123 -> https://github.com/owner/repo/issues/456
    match = re.match(r'(https?://github\.com/[^/]+/[^/]+)/', pr_link)
    if match:
        repo_base = match.group(1)
        return f"{repo_base}/issues/{issue_number}"
    
    return ""

def process_row(row_data: Tuple[int, pd.Series]) -> List[dict]:
    """
    Process a single row and return list of records (one per issue found).
    """
    idx, row = row_data
    
    try:
        title = str(row.get('pr_title', ''))
        pr_link = str(row.get('pr_link', ''))
        
        # Extract all issue numbers from title
        issue_numbers = extract_all_issue_numbers_from_title(title)
        
        results = []
        
        if issue_numbers:
            # Create separate record for each issue found
            for issue_num in issue_numbers:
                issue_link = generate_issue_link(issue_num, pr_link)
                
                # Create new record with all original data plus extracted info
                new_record = row.to_dict()
                new_record.update({
                    'extracted_issue_number': issue_num,
                    'extracted_issue_link': issue_link,
                    'found_in_title': True,
                    'original_row_index': idx
                })
                results.append(new_record)
                
            logger.info(f"Row {idx}: Found {len(issue_numbers)} issues in title: {issue_numbers}")
        else:
            # No issues found, keep original record with null values
            new_record = row.to_dict()
            new_record.update({
                'extracted_issue_number': None,
                'extracted_issue_link': None,
                'found_in_title': False,
                'original_row_index': idx
            })
            results.append(new_record)
            
        return results
        
    except Exception as e:
        logger.error(f"Error processing row {idx}: {str(e)}")
        # Return original record with error info
        new_record = row.to_dict()
        new_record.update({
            'extracted_issue_number': None,
            'extracted_issue_link': None,
            'found_in_title': False,
            'original_row_index': idx,
            'processing_error': str(e)
        })
        return [new_record]

def run():
    """
    Main function to process the CSV and create new output file.
    """
    logger.info("Starting issue number extraction process")
    
    try:
        # Load the CSV file
        logger.info(f"Loading CSV file: {csv_path}")
        df = pd.read_csv(csv_path)
        logger.info(f"Loaded {len(df)} records from CSV")
        
        # Prepare data for parallel processing
        row_data = list(df.iterrows())
        
        # Process rows in parallel
        all_results = []
        
        logger.info("Starting parallel processing...")
        with ThreadPoolExecutor(max_workers=4) as executor:
            # Submit all tasks
            future_to_row = {executor.submit(process_row, row): row[0] for row in row_data}
            
            # Process completed tasks with progress bar
            for future in tqdm(as_completed(future_to_row), total=len(future_to_row), desc="Processing rows"):
                try:
                    row_results = future.result()
                    with write_lock:
                        all_results.extend(row_results)
                except Exception as e:
                    row_idx = future_to_row[future]
                    logger.error(f"Failed to process row {row_idx}: {str(e)}")
        
        # Create new DataFrame from results
        logger.info(f"Creating new DataFrame with {len(all_results)} records")
        new_df = pd.DataFrame(all_results)
        
        # Generate output filename
        output_path = csv_path.replace('.csv', '_with_extracted_issues.csv')
        
        # Save to new CSV
        logger.info(f"Saving results to: {output_path}")
        new_df.to_csv(output_path, index=False)
        
        # Log statistics
        total_original_rows = len(df)
        total_new_rows = len(new_df)
        rows_with_issues = len(new_df[new_df['found_in_title'] == True])
        unique_issues_found = len(new_df[new_df['extracted_issue_number'].notna()]['extracted_issue_number'].unique())
        
        logger.info("="*50)
        logger.info("PROCESSING SUMMARY")
        logger.info("="*50)
        logger.info(f"Original rows: {total_original_rows}")
        logger.info(f"New rows: {total_new_rows}")
        logger.info(f"Rows with issues found: {rows_with_issues}")
        logger.info(f"Unique issues extracted: {unique_issues_found}")
        logger.info(f"Expansion ratio: {total_new_rows/total_original_rows:.2f}x")
        logger.info(f"Output file: {output_path}")
        logger.info("="*50)
        
        print(f"\n✅ Processing completed successfully!")
        print(f"📊 Original rows: {total_original_rows}")
        print(f"📊 New rows: {total_new_rows}")
        print(f"📊 Rows with issues: {rows_with_issues}")
        print(f"📊 Unique issues found: {unique_issues_found}")
        print(f"💾 Output saved to: {output_path}")
        
        return new_df
        
    except Exception as e:
        logger.error(f"Fatal error in main process: {str(e)}")
        raise

run()


[32m2025-06-05 11:20:43.995[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m179[0m - [1mStarting issue number extraction process[0m
[32m2025-06-05 11:20:43.995[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m183[0m - [1mLoading CSV file: ../data/keyword-linking/pullrequestToIssue/closed_prs_summary.csv[0m


[32m2025-06-05 11:20:44.332[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m185[0m - [1mLoaded 25042 records from CSV[0m
[32m2025-06-05 11:20:45.488[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun[0m:[36m193[0m - [1mStarting parallel processing...[0m
[32m2025-06-05 11:20:45.521[0m | [1mINFO    [0m | [36m__main__[0m:[36mprocess_row[0m:[36m148[0m - [1mRow 41: Found 1 issues in title: ['12492'][0m
[32m2025-06-05 11:20:45.531[0m | [1mINFO    [0m | [36m__main__[0m:[36mprocess_row[0m:[36m148[0m - [1mRow 89: Found 1 issues in title: ['12492'][0m
[32m2025-06-05 11:20:45.539[0m | [1mINFO    [0m | [36m__main__[0m:[36mprocess_row[0m:[36m148[0m - [1mRow 131: Found 1 issues in title: ['12584'][0m
[32m2025-06-05 11:20:45.549[0m | [1mINFO    [0m | [36m__main__[0m:[36mprocess_row[0m:[36m148[0m - [1mRow 218: Found 1 issues in title: ['3440'][0m
[32m2025-06-05 11:20:45.549[0m | [1mINFO    [0m | [36m__main__[0m:[36mprocess_


✅ Processing completed successfully!
📊 Original rows: 25042
📊 New rows: 25065
📊 Rows with issues: 313
📊 Unique issues found: 291
💾 Output saved to: ../data/keyword-linking/pullrequestToIssue/closed_prs_summary_with_extracted_issues.csv


Unnamed: 0,pr_number,pr_link,pr_title,pr_body,extracted_issue_number,extracted_issue_link,found_in_title,original_row_index
0,18210,https://github.com/ballerina-platform/ballerin...,Fix intermittent test failures in Kafka tests,## Purpose\r\n> $subject\r\n\r\nFixes #18209 \...,,,False,3310
1,36110,https://github.com/ballerina-platform/ballerin...,Extend the Type Cast code action for unsupport...,## Purpose\r\n$subject to add a new code actio...,,,False,15162
2,5663,https://github.com/ballerina-platform/ballerin...,Add initial packerina v4 lang server migration,## Purpose\r\n> Initial packerina v4 related m...,,,False,5949
3,37005,https://github.com/ballerina-platform/ballerin...,Support type reference for function parameters,## Purpose\r\n$subject\r\nFixes #36991\r\n\r\n...,,,False,15169
4,27149,https://github.com/ballerina-platform/ballerin...,Fix build failures,"## Purpose\r\n> Describe the problems, issues,...",,,False,4685
...,...,...,...,...,...,...,...,...
25060,20086,https://github.com/ballerina-platform/ballerin...,Fix API Doc generation issue with private type...,## Purpose\r\n> Fix API Doc generation issue w...,,,False,25040
25061,19302,https://github.com/ballerina-platform/ballerin...,Fix closure var init in worker decl,## Purpose\r\n```ballerina\r\npublic function ...,,,False,25041
25062,4547,https://github.com/ballerina-platform/ballerin...,Fix #4338: stop all server connectors if one f...,"## Purpose\r\n> Currently, if at least one ser...",4338,https://github.com/ballerina-platform/ballerin...,True,24911
25063,11109,https://github.com/ballerina-platform/ballerin...,Fix issue #10721,## Purpose\r\nFix #10721 and the relevant exam...,10721,https://github.com/ballerina-platform/ballerin...,True,24961


## Discription

In [12]:
import pandas as pd
import os  
import re
from loguru import logger
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from typing import List, Tuple, Optional
import time


def extract_all_issue_numbers_from_text(text: str) -> List[str]:
    """
    Extracts all issue numbers from a given text string.
    Returns a list of unique issue numbers found.
    """
    if pd.isna(text) or not text.strip():
        return []

    # Covers: fix(es/ed/ing), clos(e/es/ed), resolv(e/es/ed), issue(s)
    keyword_pattern = r'\b(?:fix(?:es|ed|ing)?|clos(?:e|es|ed)|resolv(?:e|es|ed)|issue(?:s)?)\b'

    # Patterns to find issue numbers, ordered by specificity (same as in CELL 11)
    issue_patterns = [
        # Matches: keyword + full GitHub issue/PR URL (e.g., "fixes https://github.com/org/repo/issues/123")
        rf'{keyword_pattern}\s+https?://github\.com/[^/]+/[^/]+/(?:issues|pull)/(\d+)',
        # Matches: keyword + #issue_number (e.g., "closes #456")
        rf'{keyword_pattern}\s+#(\d+)\b',
        # Matches: keyword + issue_number (e.g., "resolves 789") - less common, but possible
        rf'{keyword_pattern}\s+(\d+)\b',
        # Matches: keyword + text + (from #issue_number) (e.g., "fix: cherry pick (from #101)")
        rf'{keyword_pattern}.*\(from\s+#(\d+)\)',
        # Matches: keyword + text + #issue_number (e.g., "fixed: related to #112")
        rf'{keyword_pattern}.*#(\d+)\b',
        # Matches: keyword + text + (part of #issue_number or part of issue #issue_number) (e.g. "Fixes part of #123", "resolves part of issue #456")
        rf'{keyword_pattern}.*(?:part of\s+(?:issue\s+)?#(\d+)\b)',
        # Matches: keyword + text + /fix-#issue_number or /fix-issue_number (e.g. "fix /fix-#123", "closes /fix-456")
        rf'{keyword_pattern}.*(?:/fix-#(\d+)\b|/fix-(\d+)\b)',
        # Matches: "Fixes: " + full GitHub issue URL (e.g., "Fixes: https://github.com/org/repo/issues/123") - common in commit messages
        rf'Fixes:\s+https?://github\.com/[^/]+/[^/]+/issues/(\d+)',
        # Matches: keyword + multiple #issue_numbers separated by commas (e.g., "fixes #123, #456, #789")
        rf'{keyword_pattern}[^#]*?((?:#\d+(?:\s*,\s*#\d+)*)+)',
        # Matches: standalone full GitHub issue URL (e.g., "https://github.com/org/repo/issues/334") - broad
        r'https?://github\.com/[^/]+/[^/]+/issues/(\d+)',
        # Matches: "Fix " + org/repo/issues/issue_number (e.g., "Fix org/repo/issues/556") - specific to a common typo/format
        rf'Fix\s+[^/]+/[^/]+/issues/(\d+)'
    ]

    found_issues = set()  # Use set to avoid duplicates
    
    for pattern in issue_patterns:
        matches = re.finditer(pattern, text, re.IGNORECASE)
        for match in matches:
            groups = match.groups()
            for group_val in groups:
                if group_val:
                    # Handle multiple issues in one match (e.g., "#123, #456")
                    if '#' in group_val:
                        issue_nums_in_group = re.findall(r'#(\d+)', group_val)
                        found_issues.update(issue_nums_in_group)
                    else:
                        found_issues.add(group_val)
    
    return list(found_issues)

def process_row_for_body(row_data: Tuple[int, pd.Series]) -> List[dict]:
    """
    Process a single row to extract issue numbers from 'pr_body'.
    Returns a list of dictionaries, one for each issue found, or one original if none found.
    """
    idx, row = row_data
    
    try:
        pr_body = str(row.get('pr_body', ''))
        pr_link = str(row.get('pr_link', ''))
        
        # Extract all issue numbers from pr_body
        issue_numbers = extract_all_issue_numbers_from_text(pr_body)
        
        results = []
        
        if issue_numbers:
            # Create separate record for each issue found in body
            for issue_num in issue_numbers:
                issue_link = generate_issue_link(issue_num, pr_link) # generate_issue_link from CELL 11
                
                new_record = row.to_dict()
                new_record.update({
                    'extracted_issue_number_body': issue_num,
                    'extracted_issue_link_body': issue_link,
                    'found_in_body': True,
                    'original_row_index': idx  # Keep track of the original row
                })
                results.append(new_record)
                
            logger.info(f"Row {idx}: Found {len(issue_numbers)} issues in body: {issue_numbers}")
        else:
            # No issues found in body, keep original record with null/false values for body extraction
            new_record = row.to_dict()
            new_record.update({
                'extracted_issue_number_body': None,
                'extracted_issue_link_body': None,
                'found_in_body': False,
                'original_row_index': idx
            })
            results.append(new_record)
            
        return results
        
    except Exception as e:
        logger.error(f"Error processing row {idx} for body extraction: {str(e)}")
        # Return original record with error info for body extraction
        new_record = row.to_dict()
        new_record.update({
            'extracted_issue_number_body': None,
            'extracted_issue_link_body': None,
            'found_in_body': False,
            'original_row_index': idx,
            'processing_error_body': str(e)
        })
        return [new_record]

def run_body_extraction():
    """
    Main function to process the CSV for issues in 'pr_body' and create a new output file.
    """
    logger.info("Starting issue number extraction from PR BODY content")
    
    try:
        # csv_path is expected to be defined from a previous cell (e.g., CELL 11)
        # Defaulting if not found, but ideally it's already set.
        current_csv_path = csv_path if 'csv_path' in globals() else "../data/keyword-linking/pullrequestToIssue/closed_prs_summary.csv"

        logger.info(f"Loading CSV file: {current_csv_path}")
        df = pd.read_csv(current_csv_path)
        logger.info(f"Loaded {len(df)} records from CSV for body extraction")
        
        # Prepare data for parallel processing
        row_data_for_body = list(df.iterrows())
        
        all_results_body = []
        
        logger.info("Starting parallel processing for PR bodies...")
        # Using max_workers=4 for consistency with CELL 11, adjust if needed
        with ThreadPoolExecutor(max_workers= os.cpu_count()) as executor:
            future_to_row_body = {executor.submit(process_row_for_body, row_bd): row_bd[0] for row_bd in row_data_for_body}
            
            for future_bd in tqdm(as_completed(future_to_row_body), total=len(future_to_row_body), desc="Processing PR Bodies"):
                try:
                    row_results_bd = future_bd.result()
                    # write_lock is expected to be defined from a previous cell (e.g., CELL 11)
                    with write_lock: 
                        all_results_body.extend(row_results_bd)
                except Exception as e_bd:
                    row_idx_bd = future_to_row_body[future_bd]
                    logger.error(f"Failed to process row {row_idx_bd} for body extraction in main loop: {str(e_bd)}")
        
        logger.info(f"Creating new DataFrame with {len(all_results_body)} records from body extraction")
        new_df_body = pd.DataFrame(all_results_body)
        
        # Generate output filename for body extraction results
        output_path_body = current_csv_path.replace('.csv', '_with_extracted_issues_from_body.csv')
        
        logger.info(f"Saving body extraction results to: {output_path_body}")
        new_df_body.to_csv(output_path_body, index=False)
        
        # Log statistics for body extraction
        total_original_rows_bd = len(df)
        total_new_rows_bd = len(new_df_body)
        rows_with_issues_in_body = len(new_df_body[new_df_body['found_in_body'] == True])
        unique_issues_found_in_body = 0
        if 'extracted_issue_number_body' in new_df_body.columns:
             unique_issues_found_in_body = len(new_df_body[new_df_body['extracted_issue_number_body'].notna()]['extracted_issue_number_body'].unique())
        
        logger.info("="*50)
        logger.info("PR BODY EXTRACTION PROCESSING SUMMARY")
        logger.info("="*50)
        logger.info(f"Original rows processed: {total_original_rows_bd}")
        logger.info(f"New rows generated (from body): {total_new_rows_bd}")
        logger.info(f"Rows with issues found in body: {rows_with_issues_in_body}")
        logger.info(f"Unique issues extracted from body: {unique_issues_found_in_body}")
        if total_original_rows_bd > 0:
            logger.info(f"Expansion ratio (body): {total_new_rows_bd/total_original_rows_bd:.2f}x")
        logger.info(f"Output file (body extraction): {output_path_body}")
        logger.info("="*50)
        
        print(f"\n✅ PR Body issue extraction completed successfully!")
        print(f"📊 Original rows: {total_original_rows_bd}")
        print(f"📊 New rows (from body): {total_new_rows_bd}")
        print(f"📊 Rows with issues found in body: {rows_with_issues_in_body}")
        print(f"📊 Unique issues found in body: {unique_issues_found_in_body}")
        print(f"💾 Output saved to: {output_path_body}")
        
        return new_df_body
        
    except FileNotFoundError:
        logger.error(f"Error: The CSV file was not found at {current_csv_path}.")
        print(f"❌ Error: The CSV file was not found at {current_csv_path}.")
    except Exception as e:
        logger.error(f"Fatal error in PR body extraction process: {str(e)}")
        print(f"❌ Fatal error in PR body extraction process: {str(e)}")
        raise

# Execute the body extraction process
# This will use csv_path, generate_issue_link, and write_lock from the previous cells' scope.
result_df_body = run_body_extraction()

[32m2025-06-05 16:29:41.757[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun_body_extraction[0m:[36m127[0m - [1mStarting issue number extraction from PR BODY content[0m
[32m2025-06-05 16:29:41.759[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun_body_extraction[0m:[36m134[0m - [1mLoading CSV file: ../data/keyword-linking/pullrequestToIssue/closed_prs_summary.csv[0m
[32m2025-06-05 16:29:42.119[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun_body_extraction[0m:[36m136[0m - [1mLoaded 25042 records from CSV for body extraction[0m
[32m2025-06-05 16:29:43.551[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun_body_extraction[0m:[36m143[0m - [1mStarting parallel processing for PR bodies...[0m
[32m2025-06-05 16:29:43.567[0m | [1mINFO    [0m | [36m__main__[0m:[36mprocess_row_for_body[0m:[36m96[0m - [1mRow 16: Found 1 issues in body: ['6892'][0m
[32m2025-06-05 16:29:43.582[0m | [1mINFO    [0m | [36m__main__[0m:[36mprocess_row_for_body[0m:[3


✅ PR Body issue extraction completed successfully!
📊 Original rows: 25042
📊 New rows (from body): 29081
📊 Rows with issues found in body: 14104
📊 Unique issues found in body: 10375
💾 Output saved to: ../data/keyword-linking/pullrequestToIssue/closed_prs_summary_with_extracted_issues_from_body.csv
