# Local PR Summarization Pipeline

This notebook implements an offline PR summarization system using:
- Git for extracting branch diffs, commits, and changed files
- Local Ollama API (CodeLlama) for LLM-based summarization
- **Atomic change detection** for precise tracking of all code modifications
- **Smart validation** to ensure no changes are missed in summaries
- **Robust error handling** for LLM timeouts and failures

## Features:
- Extract all PR-relevant data from any two branches
- **Parse diffs into atomic changes** (additions, deletions, modifications) with line numbers
- Engineer optimized prompts that explicitly enumerate all changes
- **Validate LLM output** to ensure coverage of all atomic changes
- **Automatic re-prompting** for missing changes
- Handle large diffs through per-file chunking and truncation
- Aggregate file-level summaries into comprehensive PR summary
- **Transparent failure tracking** and placeholder summaries for failed files
- **On-demand retry** for files that fail due to LLM timeouts
- Fully offline and private

## 1. Git Data Extraction

Extract all relevant PR information: branches, commits, changed files, and diffs.

In [15]:
import subprocess
from typing import List, Dict

# Get current git branch
def get_current_branch(repo_path: str = ".") -> str:
    """Get the name of the current git branch."""
    result = subprocess.run(
        ["git", "rev-parse", "--abbrev-ref", "HEAD"], 
        capture_output=True, 
        text=True,
        cwd=repo_path
    )
    return result.stdout.strip()

# Get base branch (default: main)
def get_base_branch(repo_path: str = ".", default: str = "main") -> str:
    """
    Get the base branch for comparison. 
    Tries to detect main/master, falls back to provided default.
    """
    # Check if main exists
    result = subprocess.run(
        ["git", "rev-parse", "--verify", "main"],
        capture_output=True,
        cwd=repo_path
    )
    if result.returncode == 0:
        return "main"
    
    # Check if master exists
    result = subprocess.run(
        ["git", "rev-parse", "--verify", "master"],
        capture_output=True,
        cwd=repo_path
    )
    if result.returncode == 0:
        return "master"
    
    return default

# Get list of changed files between base and current branch
def get_changed_files(base: str, current: str, repo_path: str = ".") -> List[str]:
    """List all files changed between two branches."""
    result = subprocess.run(
        ["git", "diff", "--name-only", f"{base}...{current}"], 
        capture_output=True, 
        text=True,
        cwd=repo_path
    )
    files = result.stdout.strip().splitlines()
    return [f for f in files if f]  # Filter empty strings

# Get commit messages between base and current branch
def get_commit_messages(base: str, current: str, repo_path: str = ".") -> List[str]:
    """Get all commit messages between two branches."""
    result = subprocess.run(
        ["git", "log", f"{base}..{current}", "--pretty=format:%h - %s"], 
        capture_output=True, 
        text=True,
        cwd=repo_path
    )
    messages = result.stdout.strip().splitlines()
    return [m for m in messages if m]

# Get diff between base and current branch
def get_diff(base: str, current: str, repo_path: str = ".") -> str:
    """Get the full diff between two branches."""
    result = subprocess.run(
        ["git", "diff", f"{base}...{current}"], 
        capture_output=True, 
        text=True,
        cwd=repo_path
    )
    return result.stdout

# Get diff for a specific file
def get_file_diff(base: str, current: str, file_path: str, repo_path: str = ".") -> str:
    """Get the diff for a specific file between two branches."""
    result = subprocess.run(
        ["git", "diff", f"{base}...{current}", "--", file_path],
        capture_output=True,
        text=True,
        cwd=repo_path
    )
    return result.stdout

print("✓ Git utility functions loaded successfully!")

✓ Git utility functions loaded successfully!
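
The helpers above return `result.stdout` without inspecting `returncode`, so a failed git call (for example, an unknown branch name) silently yields an empty string. A minimal sketch of a wrapper that surfaces such failures is shown below. `run_checked` is a hypothetical helper, not part of the notebook, and it is demonstrated with the Python interpreter so that it runs without a repository.

```python
import subprocess
import sys
from typing import List


def run_checked(cmd: List[str], cwd: str = ".") -> str:
    """Run a command and return stdout, raising if it exits non-zero.

    Hypothetical helper: the notebook's git functions do not do this.
    """
    result = subprocess.run(cmd, capture_output=True, text=True, cwd=cwd)
    if result.returncode != 0:
        # Include stderr so the caller can see why the command failed.
        raise RuntimeError(
            f"{' '.join(cmd)} exited with {result.returncode}: {result.stderr.strip()}"
        )
    return result.stdout


# Demonstration with the Python interpreter instead of git.
ok_output = run_checked([sys.executable, "-c", "print('ok')"])

try:
    run_checked([sys.executable, "-c", "import sys; sys.exit(3)"])
    failure_message = ""
except RuntimeError as exc:
    failure_message = str(exc)
```

With such a wrapper, a function like `get_file_diff` could pass its git command to `run_checked` and leave the decision about how to handle the error to the caller.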


## 2. Test Git Extraction

Verify that we can extract all PR data from the current repository.

In [16]:
# Configure repository path (adjust to your Flask repo location)
REPO_PATH = "../flask"

# Extract PR data
current_branch = get_current_branch(REPO_PATH)
base_branch = get_base_branch(REPO_PATH)

print(f"Current branch: {current_branch}")
print(f"Base branch: {base_branch}")

# Get changed files
changed_files = get_changed_files(base_branch, current_branch, REPO_PATH)
print(f"\nChanged files ({len(changed_files)}):")
for i, file in enumerate(changed_files[:10], 1):  # Show first 10
    print(f"  {i}. {file}")
if len(changed_files) > 10:
    print(f"  ... and {len(changed_files) - 10} more")

# Get commit messages
commits = get_commit_messages(base_branch, current_branch, REPO_PATH)
print(f"\nCommit messages ({len(commits)}):")
for commit in commits[:5]:  # Show first 5
    print(f"  - {commit}")
if len(commits) > 5:
    print(f"  ... and {len(commits) - 5} more")

# Get diff size
full_diff = get_diff(base_branch, current_branch, REPO_PATH)
diff_lines = len(full_diff.splitlines())
print(f"\nTotal diff size: {diff_lines} lines ({len(full_diff)} characters)")
print(f"Sample diff (first 500 chars):\n{full_diff[:500]}...")

Current branch: test/pr-summary-demo
Base branch: main

Changed files (6):
  1. .github/workflows/tests.yaml
  2. README.md
  3. pyproject.toml
  4. src/flask/app.py
  5. src/flask/helpers.py
  6. src/flask/views.py

Commit messages (4):
  - 62a77913 - Add debug print and TODO in views.py; update README for PR summarization demo
  - 06e61ab5 - Rename variable in helpers.py for PR summarization demo
  - 1336cc19 - Add test comment to app.py for PR summarization demo
  - ad68a126 - drop experimental 3.13t test env

Total diff size: 239 lines (9727 characters)
Sample diff (first 500 chars):
diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml
index 892573d8..347e90d5 100644
--- a/.github/workflows/tests.yaml
+++ b/.github/workflows/tests.yaml
@@ -18,7 +18,6 @@ jobs:
           - {name: Windows, python: '3.14', os: windows-latest}
           - {name: Mac, python: '3.14', os: macos-latest}
           - {python: '3.13'}
-          - {python: '3.13t'}
           - {python:

## 3. Diff Chunking Strategy

Large diffs are split into per-file chunks, and any per-file diff that is still too long is truncated to its first and last lines, so that each prompt stays small enough for the LLM.

In [17]:
def chunk_diff_by_file(base: str, current: str, changed_files: List[str], repo_path: str = ".") -> Dict[str, str]:
    """
    Split the diff into per-file chunks for manageable summarization.
    Returns a dict mapping file paths to their diffs.
    """
    file_diffs = {}
    for file_path in changed_files:
        diff = get_file_diff(base, current, file_path, repo_path)
        if diff.strip():  # Only include files with actual changes
            file_diffs[file_path] = diff
    return file_diffs

def truncate_large_diff(diff: str, max_lines: int = 100) -> str:
    """
    Truncate very large diffs to focus on beginning and end.
    Useful for summarization when full context isn't needed.
    """
    lines = diff.splitlines()
    if len(lines) <= max_lines:
        return diff
    
    # Take first and last portions
    half = max_lines // 2
    truncated = lines[:half] + ["\n... [truncated middle section] ...\n"] + lines[-half:]
    return "\n".join(truncated)

def should_summarize_file(file_path: str) -> bool:
    """
    Determine if a file should be included in summarization.
    Exclude generated files, lock files, config files, etc.
    """
    exclude_patterns = [
        '.github/',
        'pyproject.toml',
        'package-lock.json',
        'yarn.lock',
        'poetry.lock',
        '.min.js',
        '.min.css',
        '__pycache__',
        '.pyc',
        '.yml',
        '.yaml',
        'requirements.txt',
        'setup.py',  
        'setup.cfg',           
        '.gitignore',        
        'LICENSE',    
        'MANIFEST.in'    
    ]
    for pattern in exclude_patterns:
        if pattern in file_path:
            return False
    return True

print("✓ Chunking utilities loaded successfully!")

✓ Chunking utilities loaded successfully!
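
`should_summarize_file` uses substring matching, so a pattern such as `'LICENSE'` or `'.yml'` also excludes any path that merely contains that text. One possible refinement is sketched below, under the assumption that exclusions should apply separately to directory names, exact file names, and file suffixes. The names `EXCLUDED_DIRS`, `EXCLUDED_NAMES`, `EXCLUDED_SUFFIXES`, and `should_summarize_path` are illustrative only.

```python
from pathlib import PurePosixPath

# Illustrative rule sets; adjust to the repository at hand.
EXCLUDED_DIRS = (".github", "__pycache__")
EXCLUDED_NAMES = {"pyproject.toml", "poetry.lock", "yarn.lock", "LICENSE"}
EXCLUDED_SUFFIXES = (".min.js", ".min.css", ".pyc", ".yml", ".yaml")


def should_summarize_path(file_path: str) -> bool:
    """Return False for paths matching a directory, name, or suffix rule."""
    path = PurePosixPath(file_path)
    # Directory rule: any parent directory component matches exactly.
    if any(part in EXCLUDED_DIRS for part in path.parts[:-1]):
        return False
    # Name rule: the final component matches exactly.
    if path.name in EXCLUDED_NAMES:
        return False
    # Suffix rule: the file name ends with an excluded suffix.
    if path.name.endswith(EXCLUDED_SUFFIXES):
        return False
    return True


results = {
    p: should_summarize_path(p)
    for p in [
        ".github/workflows/tests.yaml",
        "src/flask/app.py",
        "src/LICENSE_checker.py",
        "static/app.min.js",
    ]
}
```

Here `src/LICENSE_checker.py` is kept, whereas the substring approach would drop it because the path contains `LICENSE`.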


## 3.5 Atomic Diff Parsing

Parse git diffs into atomic changes for precise tracking and verification.

In [18]:
import re
from dataclasses import dataclass
from typing import List, Dict, Tuple, Optional

@dataclass
class AtomicChange:
    """Represents a single atomic change in a diff."""
    change_type: str  # 'addition', 'deletion', 'modification'
    line_number: int
    old_line: Optional[int]
    new_line: Optional[int]
    old_content: Optional[str]
    new_content: Optional[str]
    context: str  # Surrounding code context
    
    def __repr__(self):
        if self.change_type == 'addition':
            return f"Line {self.new_line}: + {self.new_content}"
        elif self.change_type == 'deletion':
            return f"Line {self.old_line}: - {self.old_content}"
        else:
            return f"Line {self.old_line}->{self.new_line}: {self.old_content} → {self.new_content}"

def parse_diff_hunks(diff: str) -> List[AtomicChange]:
    """
    Parse a git diff into atomic changes with line numbers and context.
    
    Returns a list of AtomicChange objects, each representing a single
    line addition, deletion, or modification.
    """
    changes = []
    lines = diff.splitlines()
    
    # Track current line numbers
    old_line_num = 0
    new_line_num = 0
    context_buffer = []  # Store recent context lines
    in_hunk = False  # True once a hunk header has been seen for the current file
    
    for i, line in enumerate(lines):
        # Parse hunk headers: @@ -old_start,old_count +new_start,new_count @@
        if line.startswith('@@'):
            match = re.match(r'@@ -(\d+),?\d* \+(\d+),?\d* @@', line)
            if match:
                old_line_num = int(match.group(1))
                new_line_num = int(match.group(2))
                context_buffer = []
                in_hunk = True
            continue
        
        # A new file section starts; its header lines precede the next hunk
        if line.startswith('diff --git'):
            in_hunk = False
            continue
        
        # Skip file headers (index, ---, +++, mode lines). Inside a hunk, lines
        # starting with '---' or '+++' are real changes and must not be skipped.
        if not in_hunk:
            continue
        
        # Build context from last 2 unchanged lines
        if line.startswith(' '):
            context_buffer.append(line[1:])
            if len(context_buffer) > 2:
                context_buffer.pop(0)
            old_line_num += 1
            new_line_num += 1
            
        # Addition
        elif line.startswith('+'):
            content = line[1:].strip()
            if content:  # Ignore empty additions
                context = '\n'.join(context_buffer[-2:]) if context_buffer else ""
                changes.append(AtomicChange(
                    change_type='addition',
                    line_number=new_line_num,
                    old_line=None,
                    new_line=new_line_num,
                    old_content=None,
                    new_content=content,
                    context=context
                ))
            new_line_num += 1
            
        # Deletion
        elif line.startswith('-'):
            content = line[1:].strip()
            if content:  # Ignore empty deletions
                context = '\n'.join(context_buffer[-2:]) if context_buffer else ""
                changes.append(AtomicChange(
                    change_type='deletion',
                    line_number=old_line_num,
                    old_line=old_line_num,
                    new_line=None,
                    old_content=content,
                    new_content=None,
                    context=context
                ))
            old_line_num += 1
    
    return changes

def detect_modifications(changes: List[AtomicChange]) -> List[AtomicChange]:
    """
    Post-process changes to detect modifications (deletion + addition pairs).
    Combines adjacent deletions and additions into modification entries.
    """
    if not changes:
        return changes
    
    modified_changes = []
    i = 0
    
    while i < len(changes):
        current = changes[i]
        
        # Check if this deletion is followed by an addition (likely a modification)
        if (current.change_type == 'deletion' and 
            i + 1 < len(changes) and 
            changes[i + 1].change_type == 'addition' and
            abs(current.line_number - changes[i + 1].line_number) <= 2):
            
            next_change = changes[i + 1]
            
            # Create a modification entry
            modified_changes.append(AtomicChange(
                change_type='modification',
                line_number=current.line_number,
                old_line=current.old_line,
                new_line=next_change.new_line,
                old_content=current.old_content,
                new_content=next_change.new_content,
                context=current.context
            ))
            i += 2  # Skip both the deletion and addition
        else:
            modified_changes.append(current)
            i += 1
    
    return modified_changes

def format_atomic_changes(changes: List[AtomicChange]) -> str:
    """
    Format atomic changes into a clear, enumerated list for LLM prompts.
    """
    if not changes:
        return "No atomic changes detected."
    
    formatted = []
    for idx, change in enumerate(changes, 1):
        if change.change_type == 'addition':
            formatted.append(f"{idx}. **Added** at line {change.new_line}: `{change.new_content}`")
        elif change.change_type == 'deletion':
            formatted.append(f"{idx}. **Removed** at line {change.old_line}: `{change.old_content}`")
        elif change.change_type == 'modification':
            formatted.append(
                f"{idx}. **Changed** at line {change.old_line}: "
                f"`{change.old_content}` → `{change.new_content}`"
            )
    
    return '\n'.join(formatted)

print("✓ Atomic diff parser loaded successfully!")

✓ Atomic diff parser loaded successfully!
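
Line numbers in `parse_diff_hunks` come from the hunk header, which has the form `@@ -old_start,old_count +new_start,new_count @@`; git omits a count when it equals 1. The sketch below isolates that step. `parse_hunk_header` is a hypothetical helper name, and the regular expression is one reasonable reading of the unified diff format rather than the notebook's exact pattern.

```python
import re
from typing import Optional, Tuple

HUNK_HEADER = re.compile(r"^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@")


def parse_hunk_header(line: str) -> Optional[Tuple[int, int, int, int]]:
    """Return (old_start, old_count, new_start, new_count), or None if no match."""
    match = HUNK_HEADER.match(line)
    if not match:
        return None
    old_start, old_count, new_start, new_count = match.groups()
    # A missing count means the hunk covers exactly one line on that side.
    return (
        int(old_start),
        int(old_count) if old_count is not None else 1,
        int(new_start),
        int(new_count) if new_count is not None else 1,
    )


with_counts = parse_hunk_header("@@ -18,7 +18,6 @@ jobs:")
without_counts = parse_hunk_header("@@ -1 +1 @@")
not_a_header = parse_hunk_header("+ added line")
```

The first call uses the header from the sample diff in section 2 and yields the start line and length on each side of the hunk.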


## 4. Prompt Engineering

Create effective prompts for the LLM to generate high-quality summaries.

In [35]:
def create_file_summary_prompt(file_path: str, diff: str, max_diff_lines: int = 150) -> str:
    """
    Create a prompt for summarizing a single file's changes.
    Now includes enumerated atomic changes for precise tracking.
    The summary must be a concise, comprehensive paragraph (not a bullet list).
    """
    # Parse atomic changes from diff
    atomic_changes = parse_diff_hunks(diff)
    atomic_changes = detect_modifications(atomic_changes)
    
    # Format atomic changes
    changes_list = format_atomic_changes(atomic_changes)
    change_count = len(atomic_changes)
    
    # Also include truncated diff for additional context
    truncated_diff = truncate_large_diff(diff, max_diff_lines)
    
    prompt = f"""Summarize the code changes for this file. You must mention ALL {change_count} changes listed below.

File: {file_path}

Atomic Changes ({change_count} total):
{changes_list}

Full Diff Context:
```
{truncated_diff}
```

Requirements:
- Describe ALL {change_count} atomic changes listed above
- Be specific: mention variable names, function names, line additions/deletions
- **Write a single concise paragraph (1-2 sentences), not a bullet list**
- Do not infer or hallucinate changes not shown above

Summary (concise paragraph):"""
    
    return prompt

def create_overall_summary_prompt(
    base_branch: str, 
    current_branch: str, 
    commits: list, 
    changed_files: list,
    file_summaries: list
) -> str:
    """
    Create a prompt for generating an overall PR summary from file-level summaries.
    """
    commits_text = "\n".join(f"  - {commit}" for commit in commits[:10])
    if len(commits) > 10:
        commits_text += f"\n  ... and {len(commits) - 10} more commits"
    
    files_text = "\n".join(f"  - {file}" for file in changed_files[:15])
    if len(changed_files) > 15:
        files_text += f"\n  ... and {len(changed_files) - 15} more files"
    
    summaries_text = "\n\n".join(f"{i+1}. {summary}" for i, summary in enumerate(file_summaries))
    
    prompt = f"""Summarize this pull request based only on the information below. Be concise (2-3 sentences total).

Branch: {current_branch} → {base_branch}

Commits ({len(commits)}):
{commits_text}

Changed files ({len(changed_files)}):
{files_text}

File summaries:

{summaries_text}

Provide a brief PR summary covering: purpose, main changes, and impact. Keep it under 3 sentences total.

Summary:"""
    
    return prompt

print("✓ Prompt templates created successfully!")

✓ Prompt templates created successfully!


## 4.5 Summary Validation

Verify that LLM summaries cover all atomic changes.

In [36]:
def validate_summary_coverage(
    summary: str, 
    atomic_changes: List[AtomicChange],
    file_path: str
) -> Tuple[bool, List[AtomicChange], Dict[str, any]]:
    """
    Validate that the LLM summary mentions all atomic changes.
    
    Returns:
        - is_complete: bool indicating if all changes are covered
        - missing_changes: List of AtomicChange objects not mentioned in summary
        - metrics: Dict with coverage statistics
    """
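    if not summary or not atomic_changes:
        # Nothing to validate; return metrics with the same keys as the normal path
        return True, [], {
            "file": file_path,
            "total_changes": 0,
            "mentioned_changes": 0,
            "missing_changes": 0,
            "coverage_percent": 100.0
        }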
    
    summary_lower = summary.lower()
    missing_changes = []
    mentioned_count = 0
    
    for change in atomic_changes:
        is_mentioned = False
        
        # Check if key content from the change appears in summary
        if change.change_type == 'addition' and change.new_content:
            # Look for the added content or key keywords from it
            content_keywords = extract_keywords(change.new_content)
            if any(kw in summary_lower for kw in content_keywords):
                is_mentioned = True
                
        elif change.change_type == 'deletion' and change.old_content:
            # Look for the deleted content or removal mention
            content_keywords = extract_keywords(change.old_content)
            if any(kw in summary_lower for kw in content_keywords) or \
               'remov' in summary_lower or 'delet' in summary_lower:
                is_mentioned = True
                
        elif change.change_type == 'modification':
            # Look for old or new content, or change/rename keywords
            old_keywords = extract_keywords(change.old_content) if change.old_content else []
            new_keywords = extract_keywords(change.new_content) if change.new_content else []
            if any(kw in summary_lower for kw in old_keywords + new_keywords) or \
               'chang' in summary_lower or 'modif' in summary_lower or 'renam' in summary_lower:
                is_mentioned = True
        
        if is_mentioned:
            mentioned_count += 1
        else:
            missing_changes.append(change)
    
    total = len(atomic_changes)
    coverage = (mentioned_count / total * 100) if total > 0 else 100.0
    
    metrics = {
        "file": file_path,
        "total_changes": total,
        "mentioned_changes": mentioned_count,
        "missing_changes": len(missing_changes),
        "coverage_percent": coverage
    }
    
    is_complete = coverage >= 80.0  # 80% threshold for acceptable coverage
    
    return is_complete, missing_changes, metrics

def extract_keywords(text: str) -> List[str]:
    """
    Extract meaningful keywords from code text for validation.
    Focuses on identifiers, function names, and significant tokens.
    """
    if not text:
        return []
    
    # Remove common noise and split into tokens
    text = text.lower()
    # Extract words that are likely identifiers (alphanumeric + underscore)
    words = re.findall(r'\b[a-z_][a-z0-9_]*\b', text)
    
    # Filter out common keywords and very short words
    common_keywords = {'if', 'else', 'for', 'while', 'return', 'def', 'class', 
                      'import', 'from', 'as', 'in', 'is', 'and', 'or', 'not', 'the', 'a', 'an'}
    keywords = [w for w in words if len(w) > 2 and w not in common_keywords]
    
    return keywords[:5]  # Return the first 5 keywords in order of appearance

def create_reprompt_for_missing_changes(
    file_path: str,
    original_summary: str,
    missing_changes: List[AtomicChange]
) -> str:
    """
    Create a focused re-prompt for changes that were not covered in the initial summary.
    """
    changes_list = format_atomic_changes(missing_changes)
    
    prompt = f"""Your previous summary for {file_path} missed some changes. Please describe these specific changes:

Missing Changes:
{changes_list}

Previous Summary:
{original_summary}

Provide a brief description (1 sentence) of the missing changes above:"""
    
    return prompt

print("✓ Summary validation utilities loaded successfully!")

✓ Summary validation utilities loaded successfully!
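
The coverage check is a heuristic: a change counts as mentioned when any of its extracted keywords appears in the summary, and a deletion or modification also counts if the summary contains a generic verb stem such as `remov` or `chang` anywhere. The self-contained sketch below uses a simplified keyword extractor (an assumption, not the notebook's `extract_keywords`) to show how coverage is computed from keyword hits alone and why the generic-stem shortcut can overstate coverage.

```python
import re
from typing import List


def simple_keywords(code: str) -> List[str]:
    """Simplified stand-in: lowercase identifiers longer than two characters."""
    return [w for w in re.findall(r"[a-z_][a-z0-9_]*", code.lower()) if len(w) > 2]


def keyword_coverage(summary: str, changed_lines: List[str]) -> float:
    """Percentage of changed lines with at least one keyword in the summary."""
    if not changed_lines:
        return 100.0
    summary_lower = summary.lower()
    hits = sum(
        1
        for line in changed_lines
        if any(kw in summary_lower for kw in simple_keywords(line))
    )
    return hits / len(changed_lines) * 100


deleted_lines = ["max_retries = 3", "legacy_cache.clear()"]
summary = "Removed the max_retries setting."

# Keyword-only matching: only the first deleted line is actually mentioned.
strict = keyword_coverage(summary, deleted_lines)

# With the generic-stem shortcut, every deletion counts once "remov" appears.
lenient = 100.0 if "remov" in summary.lower() else strict
```

Keyword-only matching reports that half of the deletions are covered, while the stem shortcut reports full coverage for the same summary, so the reported coverage percentage is best read as an upper bound.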


## 5. Ollama API Integration

Connect to the local Ollama server and send prompts for summarization.

In [None]:
import requests

OLLAMA_URL = "http://localhost:11434/api/generate"
MODEL_NAME = "codellama:7b-instruct"

def call_ollama(prompt: str, model: str = MODEL_NAME, temperature: float = 0.3, timeout: int = 200) -> Optional[str]:
    """
    Send a prompt to the local Ollama API and return the generated response.
    
    Args:
        prompt: The input prompt for the LLM
        model: Model name (default: codellama)
        temperature: Sampling temperature (lower = more focused, higher = more creative)
        timeout: Request timeout in seconds (default: 200)
    
    Returns:
        The generated text response, or None if the request fails
    """
    payload = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        "options": {
            "temperature": temperature,
            "num_predict": 150  # Max tokens for concise summary
        }
    }
    
    try:
        response = requests.post(OLLAMA_URL, json=payload, timeout=timeout)
        response.raise_for_status()
        result = response.json()
        return result.get("response", "").strip()
    except requests.exceptions.Timeout:
        print(f"LLM request timed out after {timeout} seconds")
        return None
    except requests.exceptions.RequestException as e:
        print(f"LLM request failed: {e}")
        return None

def test_ollama_connection() -> bool:
    """Test if Ollama server is running and accessible."""
    try:
        response = requests.get("http://localhost:11434/api/tags", timeout=5)
        if response.status_code == 200:
            models = response.json().get("models", [])
            model_names = [m.get("name", "") for m in models]
            print("✓ Ollama server is running!")
            print(f"  Available models: {', '.join(model_names)}")
            
            if MODEL_NAME in model_names or any(MODEL_NAME in name for name in model_names):
                print(f"  {MODEL_NAME} is available")
                return True
            else:
                print(f"  {MODEL_NAME} not found. Available: {model_names}")
                return False
        return False
    except requests.exceptions.RequestException as e:
        print(f"Cannot connect to Ollama server: {e}")
        print("  Make sure Ollama is running (start the Ollama desktop app)")
        return False

# Test connection
test_ollama_connection()

✓ Ollama server is running!
  Available models: codellama:7b-instruct, llama3:latest
  codellama:7b-instruct is available


True
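
`call_ollama` returns `None` on a timeout or request error and leaves any retry policy to the caller. A minimal sketch of one such policy, retrying with exponential backoff, follows. `retry_with_backoff` and its parameters are hypothetical, and the fake generator stands in for `call_ollama` so the example runs without a server.

```python
import time
from typing import Callable, List, Optional


def retry_with_backoff(
    generate: Callable[[], Optional[str]],
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[str]:
    """Call generate() until it returns a non-empty string or attempts run out."""
    for attempt in range(attempts):
        result = generate()
        if result:
            return result
        if attempt < attempts - 1:
            # Wait base_delay, then 2x, 4x, ... between attempts.
            sleep(base_delay * (2 ** attempt))
    return None


# Fake generator that fails twice, then succeeds (stands in for call_ollama).
responses = iter([None, None, "Renamed a variable in helpers.py."])
recorded_delays: List[float] = []

summary = retry_with_backoff(
    lambda: next(responses),
    attempts=3,
    base_delay=0.5,
    sleep=recorded_delays.append,  # Record delays instead of sleeping.
)
```

In the notebook, the `generate` argument could be a closure such as `lambda: call_ollama(prompt, timeout=llm_timeout)`.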

## 6. Complete PR Summarization Pipeline

Orchestrate all steps: extract data → chunk → summarize → aggregate.

In [37]:
def summarize_pr(
    repo_path: str = ".",
    base_branch: str = None,
    current_branch: str = None,
    max_files_to_summarize: int = 10,
    enable_validation: bool = True,
    retry_missing: bool = True,
    llm_timeout: int = 200,
    verbose: bool = True
) -> Dict[str, any]:
    """
    Complete PR summarization pipeline with atomic change tracking and validation.
    Now includes robust error handling for LLM failures.
    
    Args:
        repo_path: Path to git repository
        base_branch: Base branch for comparison (auto-detected if None)
        current_branch: Current branch (auto-detected if None)
        max_files_to_summarize: Maximum number of files to process
        enable_validation: Whether to validate summaries cover all changes
        retry_missing: Whether to re-prompt for missing changes
        llm_timeout: Timeout in seconds for LLM requests
        verbose: Whether to print progress
    
    Returns a dictionary with:
        - base_branch: str
        - current_branch: str
        - commits: List[str]
        - changed_files: List[str]
        - file_summaries: Dict[str, str]
        - failed_files: List[str] - files that failed to summarize
        - file_metrics: Dict[str, Dict] - validation metrics per file
        - overall_summary: str
        - repo_path: str - for use in retry functions
    """
    
    if verbose:
        print("PR SUMMARIZATION PIPELINE (with atomic change tracking)")
    
    # Step 1: Extract git data
    if verbose:
        print("\n[1/5] Extracting PR data from git...")
    
    if not current_branch:
        current_branch = get_current_branch(repo_path)
    if not base_branch:
        base_branch = get_base_branch(repo_path)
    
    commits = get_commit_messages(base_branch, current_branch, repo_path)
    changed_files = get_changed_files(base_branch, current_branch, repo_path)
    
    if verbose:
        print(f"  Branch: {current_branch} → {base_branch}")
        print(f"  Commits: {len(commits)}")
        print(f"  Changed files: {len(changed_files)}")
    
    if not changed_files:
        return {
            "base_branch": base_branch,
            "current_branch": current_branch,
            "commits": commits,
            "changed_files": changed_files,
            "file_summaries": {},
            "failed_files": [],
            "file_metrics": {},
            "overall_summary": "No changes detected between branches.",
            "repo_path": repo_path
        }
    
    # Step 2: Chunk diffs by file
    if verbose:
        print(f"\n[2/5] Chunking diffs by file...")
    
    file_diffs = chunk_diff_by_file(base_branch, current_branch, changed_files, repo_path)
    
    # Filter files to summarize
    files_to_summarize = [
        f for f in changed_files 
        if should_summarize_file(f) and f in file_diffs
    ][:max_files_to_summarize]
    
    if verbose:
        print(f"  Files to summarize: {len(files_to_summarize)}")
    
    # Step 3: Summarize each file with atomic change tracking
    if verbose:
        print(f"\n[3/5] Generating file-level summaries with atomic change tracking...")
    
    file_summaries = {}
    failed_files = []
    file_metrics = {}
    
    for i, file_path in enumerate(files_to_summarize, 1):
        if verbose:
            print(f"  [{i}/{len(files_to_summarize)}] {file_path}...")
        
        # Parse atomic changes
        diff = file_diffs[file_path]
        atomic_changes = parse_diff_hunks(diff)
        atomic_changes = detect_modifications(atomic_changes)
        
        if verbose:
            print(f"      → {len(atomic_changes)} atomic changes detected")
        
        # Generate initial summary
        prompt = create_file_summary_prompt(file_path, diff)
        summary = call_ollama(prompt, timeout=llm_timeout)
        
        # Handle LLM failures
        if not summary:
            if verbose:
                print("      Failed to generate summary (timeout/error)")
            failed_files.append(file_path)
            file_summaries[file_path] = "Summary could not be generated for this file due to LLM timeout or error."
            continue
        
        # Step 4: Validate coverage
        if enable_validation and atomic_changes:
            is_complete, missing_changes, metrics = validate_summary_coverage(
                summary, atomic_changes, file_path
            )
            file_metrics[file_path] = metrics
            
            if verbose:
                print(f"      Coverage: {metrics['coverage_percent']:.1f}% "
                      f"({metrics['mentioned_changes']}/{metrics['total_changes']} changes)")
            
            # Step 5: Re-prompt for missing changes if enabled
            if not is_complete and retry_missing and missing_changes:
                if verbose:
                    print(f"      Re-prompting for {len(missing_changes)} missing changes...")
                
                reprompt = create_reprompt_for_missing_changes(file_path, summary, missing_changes)
                additional_summary = call_ollama(reprompt, timeout=llm_timeout)
                
                if additional_summary:
                    summary = f"{summary} {additional_summary}"
                    
                    # Re-validate
                    is_complete, missing_changes, metrics = validate_summary_coverage(
                        summary, atomic_changes, file_path
                    )
                    file_metrics[file_path] = metrics
                    
                    if verbose:
                        print(f"      Updated coverage: {metrics['coverage_percent']:.1f}%")
        
        file_summaries[file_path] = summary
    
    if verbose:
        print(f"\n[4/5] Generating overall PR summary...")
    
    # Step 6: Generate overall summary (exclude failed files from summary generation)
    successful_summaries = {
        file: summary for file, summary in file_summaries.items()
        if file not in failed_files
    }
    
    if not successful_summaries:
        overall_summary = "No files could be summarized successfully."
    else:
        summary_list = [f"{file}: {summary}" for file, summary in successful_summaries.items()]
        overall_prompt = create_overall_summary_prompt(
            base_branch, 
            current_branch, 
            commits, 
            changed_files,
            summary_list
        )
        overall_summary = call_ollama(overall_prompt, timeout=llm_timeout)
        if not overall_summary:
            overall_summary = "Error generating overall summary due to LLM timeout or error."
    
    if verbose:
        print(f"\n[5/5] Pipeline complete!")
        print(f"  Successfully summarized: {len(successful_summaries)} files")
        if failed_files:
            print(f"  Failed to summarize: {len(failed_files)} files")
        if file_metrics:
            avg_coverage = sum(m['coverage_percent'] for m in file_metrics.values()) / len(file_metrics)
            print(f"  Average change coverage: {avg_coverage:.1f}%")
    
    return {
        "base_branch": base_branch,
        "current_branch": current_branch,
        "commits": commits,
        "changed_files": changed_files,
        "file_summaries": file_summaries,
        "failed_files": failed_files,
        "file_metrics": file_metrics,
        "overall_summary": overall_summary,
        "repo_path": repo_path
    }

print("✓ PR summarization pipeline ready!")

✓ PR summarization pipeline ready!
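
The dictionary returned by `summarize_pr` is meant to be consumed by later cells. As an illustration of how its documented keys fit together, the sketch below renders a result into Markdown. `render_pr_summary` is a hypothetical helper, and the sample values are made up for the demonstration rather than produced by the pipeline.

```python
from typing import Any, Dict


def render_pr_summary(result: Dict[str, Any]) -> str:
    """Format a summarize_pr-style result dict as a Markdown report."""
    lines = [
        f"# PR summary: {result['current_branch']} → {result['base_branch']}",
        "",
        result["overall_summary"],
        "",
        "## Files",
    ]
    failed = set(result["failed_files"])
    for path, summary in result["file_summaries"].items():
        # Flag files whose summary is only a failure placeholder.
        status = " (failed)" if path in failed else ""
        lines.append(f"- `{path}`{status}: {summary}")
    return "\n".join(lines)


# Made-up sample data with the same keys that summarize_pr documents.
sample = {
    "base_branch": "main",
    "current_branch": "feature/demo",
    "overall_summary": "Example overall summary.",
    "file_summaries": {"a.py": "Example file summary.", "b.py": "Placeholder."},
    "failed_files": ["b.py"],
}
report = render_pr_summary(sample)
```

Marking failed files in the rendered report keeps placeholder summaries from being mistaken for real ones.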


## 6.5 On-Demand Retry for Failed Files

Re-summarize specific files that failed due to LLM timeout/error, with configurable timeout.

In [38]:
def summarize_failed_file(
    result: Dict[str, any],
    file_path: str,
    timeout: int = 600,
    enable_validation: bool = True,
    retry_missing: bool = True,
    verbose: bool = True
) -> Dict[str, any]:
    """
    Retry summarization for a specific file that previously failed.
    
    This function allows users to manually retry files that failed due to LLM
    timeout or errors, potentially with a longer timeout value.
    
    Args:
        result: The PR summarization result dict from summarize_pr
        file_path: The specific file to retry (must be in failed_files list)
        timeout: Timeout in seconds for LLM requests (default: 600 for longer wait)
        enable_validation: Whether to validate summary coverage
        retry_missing: Whether to re-prompt for missing changes
        verbose: Whether to print progress
    
    Returns:
        Updated result dictionary with new summary for the specified file
    
    Usage:
        # After initial run, if helpers.py failed:
        result = summarize_failed_file(result, "src/flask/helpers.py", timeout=600)
    """
    
    # Validate inputs
    if file_path not in result.get('failed_files', []):
        if verbose:
            print(f" {file_path} is not in the failed files list.")
            print(f"   Failed files: {result.get('failed_files', [])}")
        return result
    
    if verbose:
        print(f"\n Retrying summarization for: {file_path}")
        print(f"   Timeout: {timeout} seconds")
    
    # Extract necessary data from result
    repo_path = result['repo_path']
    base_branch = result['base_branch']
    current_branch = result['current_branch']
    
    # Get the diff for this file
    diff = get_file_diff(base_branch, current_branch, file_path, repo_path)
    
    if not diff.strip():
        if verbose:
            print(f"    No diff found for {file_path}")
        return result
    
    # Parse atomic changes
    atomic_changes = parse_diff_hunks(diff)
    atomic_changes = detect_modifications(atomic_changes)
    
    if verbose:
        print(f"   → {len(atomic_changes)} atomic changes detected")
    
    # Generate summary with specified timeout
    prompt = create_file_summary_prompt(file_path, diff)
    summary = call_ollama(prompt, timeout=timeout)
    
    if not summary:
        if verbose:
            print(f"    Summary still failed with {timeout}s timeout")
        return result
    
    if verbose:
        print(f"   ✓ Summary generated successfully!")
    
    # Validate coverage if enabled
    if enable_validation and atomic_changes:
        is_complete, missing_changes, metrics = validate_summary_coverage(
            summary, atomic_changes, file_path
        )
        
        if verbose:
            print(f"   Coverage: {metrics['coverage_percent']:.1f}% "
                  f"({metrics['mentioned_changes']}/{metrics['total_changes']} changes)")
        
        # Re-prompt for missing changes if enabled
        if not is_complete and retry_missing and missing_changes:
            if verbose:
                print(f"   Re-prompting for {len(missing_changes)} missing changes...")
            
            reprompt = create_reprompt_for_missing_changes(file_path, summary, missing_changes)
            additional_summary = call_ollama(reprompt, timeout=timeout)
            
            if additional_summary:
                summary = f"{summary} {additional_summary}"
                
                # Re-validate
                is_complete, missing_changes, metrics = validate_summary_coverage(
                    summary, atomic_changes, file_path
                )
                
                if verbose:
                    print(f"   Updated coverage: {metrics['coverage_percent']:.1f}%")
        
        # Update metrics (create the dict first if the result lacks one)
        result.setdefault('file_metrics', {})[file_path] = metrics
    
    # Update result in-place
    result['file_summaries'][file_path] = summary
    result['failed_files'].remove(file_path)
    
    if verbose:
        print(f"   ✓ Successfully updated summary for {file_path}")
        print(f"   Remaining failed files: {len(result['failed_files'])}")
    
    return result

def list_failed_files(result: dict) -> None:
    """
    Display a list of all files that failed to summarize.
    
    Args:
        result: The PR summarization result dict from summarize_pr
    """
    failed = result.get('failed_files', [])
    
    if not failed:
        print("✓ All files were summarized successfully!")
        return
    
    print(f"\n {len(failed)} file(s) failed to summarize:\n")
    for i, file_path in enumerate(failed, 1):
        print(f"  {i}. {file_path}")
    
    print(f"\nTo retry a specific file, use:")
    print(f"  result = summarize_failed_file(result, '<file_path>', timeout=600)")

print("✓ On-demand retry functions loaded successfully!")


✓ On-demand retry functions loaded successfully!
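
When several files fail, retrying them one at a time gets tedious. The sketch below loops over `result['failed_files']` and retries each file with progressively longer timeouts. The helper name `retry_all_failed`, its `retry_fn` parameter, and the timeout schedule are illustrative assumptions rather than part of the notebook; `retry_fn` is expected to behave like `summarize_failed_file` above, i.e. to remove a file from `failed_files` when it succeeds.

```python
from typing import Any, Callable, Dict, Sequence


def retry_all_failed(
    result: Dict[str, Any],
    retry_fn: Callable[..., Dict[str, Any]],
    timeouts: Sequence[int] = (300, 600, 1200),  # assumed schedule, in seconds
) -> Dict[str, Any]:
    """Retry every failed file, escalating the timeout until it succeeds.

    Hypothetical helper: retry_fn must accept (result, file_path, timeout=...)
    and remove the file from result['failed_files'] on success.
    """
    # Iterate over a copy, because retry_fn mutates the list on success.
    for file_path in list(result.get("failed_files", [])):
        for timeout in timeouts:
            result = retry_fn(result, file_path, timeout=timeout)
            if file_path not in result.get("failed_files", []):
                break  # this file succeeded; move on to the next one
    return result
```

With the functions defined in this notebook, this could be called as `result = retry_all_failed(result, summarize_failed_file)`. Files that still fail at the longest timeout simply stay in `failed_files`.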


## 7. Run PR Summarization

Execute the pipeline on the current repository and display results.

In [39]:
# Run the PR summarization pipeline
result = summarize_pr(
    repo_path=REPO_PATH,
    max_files_to_summarize=5,  # Limit for testing; increase for full analysis
    verbose=True
)

PR SUMMARIZATION PIPELINE (with atomic change tracking)

[1/5] Extracting PR data from git...
  Branch: test/pr-summary-demo → main
  Commits: 4
  Changed files: 6

[2/5] Chunking diffs by file...
  Files to summarize: 4

[3/5] Generating file-level summaries with atomic change tracking...
  [1/4] README.md...
      → 1 atomic changes detected
      Coverage: 100.0% (1/1 changes)
  [2/4] src/flask/app.py...
      → 1 atomic changes detected
      Coverage: 100.0% (1/1 changes)
  [3/4] src/flask/helpers.py...
      → 25 atomic changes detected
LLM request timed out after 200 seconds
       Failed to generate summary (timeout/error)
  [4/4] src/flask/views.py...
      → 1 atomic changes detected
      Coverage: 100.0% (1/1 changes)

[4/5] Generating overall PR summary...

[5/5] Pipeline complete!
  Successfully summarized: 3 files
   Failed to summarize: 1 files
  Average change coverage: 100.0%


## 8. Display Results

Format and display the PR summary in a readable format.

In [40]:
from typing import Any, Dict


def display_pr_summary(result: Dict[str, Any]) -> None:
    """Display the PR summary in a readable format, including validation metrics and failure reporting."""
    
    print("PR SUMMARY")
    
    print(f"\nBranch: {result['current_branch']} → {result['base_branch']}")
    print(f"Commits: {len(result['commits'])}")
    print(f"Changed files: {len(result['changed_files'])}")
    
    # Display commit messages
    print(f"\nRecent Commits:")
    for commit in result['commits'][:5]:
        print(f"   {commit}")
    if len(result['commits']) > 5:
        print(f"   ... and {len(result['commits']) - 5} more")
    
    # Separate successful and failed summaries
    failed_files = result.get('failed_files', [])
    successful_files = [f for f in result['file_summaries'].keys() if f not in failed_files]
    
    # Display file-level summaries with validation metrics
    if successful_files:
        print(f"✓ Successfully Summarized Files ({len(successful_files)}):")
        for file_path in successful_files:
            summary = result['file_summaries'][file_path]
            print(f"\n🔹 {file_path}")
            
            # Show validation metrics if available
            if 'file_metrics' in result and file_path in result['file_metrics']:
                metrics = result['file_metrics'][file_path]
                print(f"    {metrics['mentioned_changes']}/{metrics['total_changes']} changes covered "
                      f"({metrics['coverage_percent']:.1f}%)")
            
            print(f"   {summary}")
    
    # Display failed files prominently
    if failed_files:
        print(f"  Failed to Summarize ({len(failed_files)}):")
        for file_path in failed_files:
            print(f"\n {file_path}")
            print(f"   {result['file_summaries'].get(file_path, 'No placeholder found')}")
        
        print("  To retry a failed file with longer timeout:")
        print("   result = summarize_failed_file(result, '<file_path>', timeout=600)")
    
    # Show overall validation statistics
    if 'file_metrics' in result and result['file_metrics']:
        print(" Validation Summary:")
        total_changes = sum(m['total_changes'] for m in result['file_metrics'].values())
        total_mentioned = sum(m['mentioned_changes'] for m in result['file_metrics'].values())
        avg_coverage = sum(m['coverage_percent'] for m in result['file_metrics'].values()) / len(result['file_metrics'])
        
        print(f"   Total atomic changes tracked: {total_changes}")
        print(f"   Changes mentioned in summaries: {total_mentioned}")
        print(f"   Average coverage: {avg_coverage:.1f}%")
    
    # Display overall summary
    print(" Overall PR Summary:")
    print(result['overall_summary'])
    
# Display the results
display_pr_summary(result)


PR SUMMARY

Branch: test/pr-summary-demo → main
Commits: 4
Changed files: 6

Recent Commits:
   62a77913 - Add debug print and TODO in views.py; update README for PR summarization demo
   06e61ab5 - Rename variable in helpers.py for PR summarization demo
   1336cc19 - Add test comment to app.py for PR summarization demo
   ad68a126 - drop experimental 3.13t test env
✓ Successfully Summarized Files (3):

🔹 README.md
    1/1 changes covered (100.0%)
   The README.md file has been modified to include a new line with the text "Test change for PR summarization demo." at line 55, which adds a new line to the file.

🔹 src/flask/app.py
    1/1 changes covered (100.0%)
   This file contains one atomic change: the addition of a comment at line 1. The comment reads "#Test: Added comment for PR summarization demo". This comment was added to the file as part of a pull request and serves as a demonstration of how changes can be summarized using Git's diff output.

🔹 src/flask/views.py
 

## 9. Export Summary (Optional)

Save the PR summary to a markdown file for documentation or review.

In [26]:
from typing import Any, Dict


def export_summary_to_markdown(result: Dict[str, Any], output_path: str = "pr_summary.md") -> None:
    """Export the PR summary to a markdown file with validation metrics and failure reporting."""
    
    # Separate successful and failed files
    failed_files = result.get('failed_files', [])
    successful_files = [f for f in result['file_summaries'].keys() if f not in failed_files]
    
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(f"# PR Summary: {result['current_branch']} → {result['base_branch']}\n\n")
        
        f.write(f"**Commits:** {len(result['commits'])}  \n")
        f.write(f"**Total Changed Files:** {len(result['changed_files'])}  \n")
        f.write(f"**Successfully Summarized:** {len(successful_files)}  \n")
        
        if failed_files:
            f.write(f"** Failed to Summarize:** {len(failed_files)}  \n")
        
        # Add validation metrics if available
        if 'file_metrics' in result and result['file_metrics']:
            total_changes = sum(m['total_changes'] for m in result['file_metrics'].values())
            total_mentioned = sum(m['mentioned_changes'] for m in result['file_metrics'].values())
            avg_coverage = sum(m['coverage_percent'] for m in result['file_metrics'].values()) / len(result['file_metrics'])
            f.write(f"**Atomic Changes Tracked:** {total_changes}  \n")
            f.write(f"**Average Coverage:** {avg_coverage:.1f}% ({total_mentioned}/{total_changes} changes mentioned overall)  \n")
        
        f.write("\n---\n\n")
        
        f.write("## Commits\n\n")
        for commit in result['commits']:
            f.write(f"- {commit}\n")
        
        # Export successful summaries
        if successful_files:
            f.write("\n## ✓ Successfully Summarized Files\n\n")
            for file_path in successful_files:
                summary = result['file_summaries'][file_path]
                f.write(f"### `{file_path}`\n\n")
                
                # Include metrics if available
                if 'file_metrics' in result and file_path in result['file_metrics']:
                    metrics = result['file_metrics'][file_path]
                    f.write(f"**Changes:** {metrics['mentioned_changes']}/{metrics['total_changes']} "
                           f"({metrics['coverage_percent']:.1f}% coverage)  \n\n")
                
                f.write(f"{summary}\n\n")
        
        # Export failed files section
        if failed_files:
            f.write("\n##  Files That Could Not Be Summarized\n\n")
            f.write("The following files failed to generate summaries due to LLM timeout or errors. ")
            f.write("You can retry these files using the `summarize_failed_file()` function with a longer timeout.\n\n")
            
            for file_path in failed_files:
                f.write(f"### `{file_path}`\n\n")
                f.write(f">  {result['file_summaries'].get(file_path, 'Summary could not be generated.')}\n\n")
                f.write(f"**To retry:** `result = summarize_failed_file(result, '{file_path}', timeout=600)`\n\n")
        
        f.write("\n---\n\n")
        f.write("## Overall Summary\n\n")
        f.write(f"{result['overall_summary']}\n")
        
        # Add footer with retry instructions if there are failures
        if failed_files:
            f.write("\n---\n\n")
            f.write("###  Retry Instructions\n\n")
            f.write("To retry failed file summaries with a longer timeout (e.g., 600 seconds):\n\n")
            f.write("```python\n")
            f.write("# Retry a specific file\n")
            f.write("result = summarize_failed_file(result, '<file_path>', timeout=600)\n\n")
            f.write("# Re-export after successful retry\n")
            f.write(f"export_summary_to_markdown(result, output_path='{output_path}')\n")
            f.write("```\n")
    
    print(f"✓ PR summary exported to {output_path}")
    if failed_files:
        print(f"   Note: {len(failed_files)} file(s) failed - see export for retry instructions")

# Export the summary (adjust output_path as needed)
export_summary_to_markdown(result, output_path="../pr_summary-demo01.md")
print("✓ Export complete.")


✓ PR summary exported to ../pr_summary-demo01.md
   Note: 1 file(s) failed - see export for retry instructions
✓ Export complete.
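
The export reports `avg_coverage`, the mean of the per-file percentages, next to the pooled counts of mentioned and tracked changes. These two views can disagree, because the mean weights a one-change file the same as a 25-change file. The sketch below computes both from a `file_metrics` dict using the keys seen above; the function name `coverage_stats` is hypothetical, and the sample values mirror the per-file figures this notebook reports once `helpers.py` has been retried (three files at 1/1 and one at 24/25).

```python
from typing import Any, Dict


def coverage_stats(file_metrics: Dict[str, Dict[str, Any]]) -> Dict[str, float]:
    """Return both the average-of-files and the pooled coverage percentage."""
    if not file_metrics:
        return {"average_percent": 0.0, "pooled_percent": 0.0}
    metrics = list(file_metrics.values())
    # Mean of per-file percentages: every file counts equally.
    average = sum(m["coverage_percent"] for m in metrics) / len(metrics)
    # Pooled ratio: every atomic change counts equally.
    total = sum(m["total_changes"] for m in metrics)
    mentioned = sum(m["mentioned_changes"] for m in metrics)
    pooled = 100.0 * mentioned / total if total else 0.0
    return {"average_percent": average, "pooled_percent": pooled}


sample = {
    "README.md": {"coverage_percent": 100.0, "mentioned_changes": 1, "total_changes": 1},
    "src/flask/app.py": {"coverage_percent": 100.0, "mentioned_changes": 1, "total_changes": 1},
    "src/flask/views.py": {"coverage_percent": 100.0, "mentioned_changes": 1, "total_changes": 1},
    "src/flask/helpers.py": {"coverage_percent": 96.0, "mentioned_changes": 24, "total_changes": 25},
}
stats = coverage_stats(sample)
print(f"average: {stats['average_percent']:.1f}%, pooled: {stats['pooled_percent']:.1f}%")
# prints: average: 99.0%, pooled: 96.4%
```

Which figure to report is a design choice: the pooled value better reflects how many changes were actually missed, while the average highlights whether any individual file was poorly covered.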


## 10. Test Atomic Change Detection

Verify that the atomic parser correctly identifies all changes in a sample diff.
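
To make "atomic change" concrete, the sketch below walks a unified diff and records each added or deleted line together with its line number, using the `@@ -a,b +c,d @@` hunk headers to keep the old and new line counters in sync. It is a simplified stand-in written for illustration, not the notebook's `parse_diff_hunks`: the names `SimpleChange` and `parse_unified_diff` are hypothetical, the sample diff is made up, and pairing deletions with additions into modifications is left out.

```python
import re
from dataclasses import dataclass
from typing import List, Optional

# Matches hunk headers such as "@@ -32,7 +32,7 @@ def get_debug_flag():"
HUNK_HEADER = re.compile(r"^@@ -(\d+)(?:,\d+)? \+(\d+)(?:,\d+)? @@")


@dataclass
class SimpleChange:
    change_type: str          # "addition" or "deletion"
    old_line: Optional[int]   # line number in the old file (deletions only)
    new_line: Optional[int]   # line number in the new file (additions only)
    content: str


def parse_unified_diff(diff: str) -> List[SimpleChange]:
    changes: List[SimpleChange] = []
    old_no = new_no = 0
    in_hunk = False
    for line in diff.splitlines():
        if line.startswith("diff --git"):
            in_hunk = False  # a new file section starts; wait for its first hunk
            continue
        header = HUNK_HEADER.match(line)
        if header:
            old_no, new_no = int(header.group(1)), int(header.group(2))
            in_hunk = True
        elif not in_hunk:
            continue  # skip file headers such as "---" and "+++"
        elif line.startswith("+"):
            changes.append(SimpleChange("addition", None, new_no, line[1:].strip()))
            new_no += 1
        elif line.startswith("-"):
            changes.append(SimpleChange("deletion", old_no, None, line[1:].strip()))
            old_no += 1
        elif line.startswith("\\"):
            continue  # "\ No newline at end of file" marker
        else:
            old_no += 1  # context line: present in both versions
            new_no += 1
    return changes


sample_diff = """\
--- a/example.py
+++ b/example.py
@@ -1,3 +1,3 @@
 import os
-val = os.environ.get("FLASK_DEBUG")
+val_demo = os.environ.get("FLASK_DEBUG")
 print("done")
"""
for change in parse_unified_diff(sample_diff):
    print(change)
```

For the sample, this yields one deletion at old line 2 followed by one addition at new line 2, which a later pass could merge into a single modification.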

In [12]:
# Test atomic change detection on a sample file
test_file = "src/flask/helpers.py"

if test_file in get_changed_files(base_branch, current_branch, REPO_PATH):
    print(f"Testing atomic change detection for: {test_file}\n")
    
    # Get the diff
    test_diff = get_file_diff(base_branch, current_branch, test_file, REPO_PATH)
    
    # Parse atomic changes
    atomic_changes = parse_diff_hunks(test_diff)
    atomic_changes = detect_modifications(atomic_changes)
    
    print(f"Detected {len(atomic_changes)} atomic changes:\n")
    
    for i, change in enumerate(atomic_changes, 1):
        print(f"{i}. {change.change_type.upper()}")
        if change.change_type == 'addition':
            print(f"   Line {change.new_line}: + {change.new_content}")
        elif change.change_type == 'deletion':
            print(f"   Line {change.old_line}: - {change.old_content}")
        elif change.change_type == 'modification':
            print(f"   Line {change.old_line}→{change.new_line}")
            print(f"   OLD: {change.old_content}")
            print(f"   NEW: {change.new_content}")
        print()
    
    print("\nFormatted for LLM prompt:")
    print(format_atomic_changes(atomic_changes))
else:
    print(f"{test_file} not found in changed files. Choose another file to test.")

Testing atomic change detection for: src/flask/helpers.py

Detected 25 atomic changes:

1. DELETION
   Line 32: - val = os.environ.get("FLASK_DEBUG")

2. MODIFICATION
   Line 33→32
   OLD: return bool(val and val.lower() not in {"0", "false", "no"})
   NEW: val_demo = os.environ.get("FLASK_DEBUG")

3. ADDITION
   Line 33: + return bool(val_demo and val_demo.lower() not in {"0", "false", "no"})

4. MODIFICATION
   Line 43→43
   OLD: val = os.environ.get("FLASK_SKIP_DOTENV")
   NEW: val_demo = os.environ.get("FLASK_SKIP_DOTENV")

5. MODIFICATION
   Line 45→45
   OLD: if not val:
   NEW: if not val_demo:

6. MODIFICATION
   Line 48→48
   OLD: return val.lower() in ("0", "false", "no")
   NEW: return val_demo.lower() in ("0", "false", "no")

7. MODIFICATION
   Line 138→138
   OLD: # Execute the generator to the sentinel value. This ensures the context is
   NEW: # Execute the generator to the sentinel val_demoue. This ensures the context is

8. MODIFICATION
   Line 148→148
   O

## 11. Retry Failed File Summaries (On-Demand)

If some files failed to summarize due to LLM timeouts, you can retry them individually with a longer timeout.

### Regenerate Overall PR Summary After Retry

After retrying failed files, you should regenerate the overall PR summary so it includes all newly summarized files. Use the helper below.

In [41]:
def regenerate_overall_summary(result: dict, llm_timeout: int = 200, verbose: bool = True) -> dict:
    """
    Regenerate the overall PR summary using the latest set of successful file summaries.
    This should be called after retrying failed files so the overall summary is up-to-date.
    """
    failed_files = result.get('failed_files', [])
    successful_summaries = {
        file: summary for file, summary in result['file_summaries'].items()
        if file not in failed_files
    }
    if not successful_summaries:
        overall_summary = "No files could be summarized successfully."
    else:
        summary_list = [f"{file}: {summary}" for file, summary in successful_summaries.items()]
        overall_prompt = create_overall_summary_prompt(
            result['base_branch'],
            result['current_branch'],
            result['commits'],
            result['changed_files'],
            summary_list
        )
        overall_summary = call_ollama(overall_prompt, timeout=llm_timeout)
        if not overall_summary:
            overall_summary = "Error generating overall summary due to LLM timeout or error."
    result['overall_summary'] = overall_summary
    if verbose:
        print("✓ Overall PR summary regenerated.")
    return result


In [42]:
# Check if there are any failed files
list_failed_files(result)

# Retry a specific failed file with a longer timeout
# (adjust the file path to match an entry in the list above):
result = summarize_failed_file(result, "src/flask/helpers.py", timeout=600, verbose=True)

# After the retry, re-display the summary:
display_pr_summary(result)



 1 file(s) failed to summarize:

  1. src/flask/helpers.py

To retry a specific file, use:
  result = summarize_failed_file(result, '<file_path>', timeout=600)

 Retrying summarization for: src/flask/helpers.py
   Timeout: 600 seconds
   → 25 atomic changes detected
   ✓ Summary generated successfully!
   Coverage: 96.0% (24/25 changes)
   ✓ Successfully updated summary for src/flask/helpers.py
   Remaining failed files: 0
PR SUMMARY

Branch: test/pr-summary-demo → main
Commits: 4
Changed files: 6

Recent Commits:
   62a77913 - Add debug print and TODO in views.py; update README for PR summarization demo
   06e61ab5 - Rename variable in helpers.py for PR summarization demo
   1336cc19 - Add test comment to app.py for PR summarization demo
   ad68a126 - drop experimental 3.13t test env
✓ Successfully Summarized Files (4):

🔹 README.md
    1/1 changes covered (100.0%)
   The README.md file has been modified to include a new line with the text "Test change for PR summarizati

In [43]:
result = regenerate_overall_summary(result, llm_timeout=600)

✓ Overall PR summary regenerated.


In [44]:
export_summary_to_markdown(result, output_path="../pr_summary-demo03.md")

✓ PR summary exported to ../pr_summary-demo03.md
