In [None]:
#!/usr/bin/env python3
"""
Script to refactor Python files one at a time, up to a limit per run, by sending them to an LLM for optimization.
Handles scripts longer than LLM_MAX_TOKENS by splitting them into chunks.
Maintains a persistent function/variable name mapping and applies it across all scripts.
Tracks processed files and saves refactored code back to the original files.
Allows skipping specified subfolders.
"""

import os
import requests
import re
from typing import Dict, Tuple, Set, List
import ast
from tqdm import tqdm
import chardet

# Constants
# Root directory that is walked recursively for ".py" files to refactor.
DEFAULT_DIRECTORY = r"C:\Users\harold.noble\Desktop\open-webui\app"
SKIP_FOLDERS = {".github", "code_helper", "ollama-0"}  # Folders to skip during processing
# Persistent state files. These are relative paths, so they resolve against the
# current working directory of the process.
NAME_MAPPING_FILE = "function_variable_mapping.txt"
PROCESSED_FILES_TRACKER = "processed_files.txt"
# Upper bound on how many not-yet-processed files main() handles per invocation.
FILES_PER_RUN = 10
# Defaults for the request sent by fetch_llm_response().
LLM_MODEL = "qwen2.5:14b"
LLM_TEMPERATURE = 0.5
LLM_TOP_P = 0.9
LLM_MAX_TOKENS = 8000  # Max tokens in LLM response
TOKEN_BUFFER = 200  # Reserve tokens for prompt and mappings
MAX_INPUT_TOKENS = LLM_MAX_TOKENS - TOKEN_BUFFER  # Max tokens for script content
# Port used when the OLLAMA_PORT environment variable is not set.
DEFAULT_OLLAMA_PORT = "11434"
# HTTP timeout for a single LLM request (3 minutes).
TIMEOUT_SECONDS = 60*3

# LLM Prompt Template
# Rendered with str.format(script_content=...); keep the template free of any
# other curly braces. The requested output layout puts the "---" separator on
# a line of its own because the response parser splits on that line.
REFACTOR_PROMPT = """
You are an expert Python developer tasked with refactoring a Python script chunk. Perform these tasks:

1. **Add Docstrings**: Replace all existing comments with detailed docstrings for the module, functions, and classes. Follow PEP 257: include purpose, parameters (type and description), return values, and side effects. Use triple quotes (\"\"\" or ''') for docstrings; retain # only for inline notes not suited to docstrings.

2. **Improve Formatting**: Reformat the code adhering to Black's style (e.g., 88-character line length, consistent indentation, sorted imports alphabetically). Ensure readability and consistency.

3. **Optimize Code**: Enhance performance without sacrificing readability—simplify logic, remove inefficiencies, and streamline operations where possible.

4. **Enhance Error Handling**: Implement robust error handling with specific exceptions (e.g., ValueError, IOError) and actionable error messages instead of generic ones.

5. **Cleanup Dependencies**: Remove unused imports, dead code, and any i18n references (e.g., gettext or similar).

6. **Rename Functions and Variables**: For functions and variables defined in this chunk (not imported), convert names to snake_case and make them more descriptive. Provide mappings for renamed items.

7. **Verify Documentation**: Ensure every function and class has complete, accurate, up-to-date docstrings.

Here is the Python script chunk to refactor:

{script_content}

Return the refactored chunk, then a line containing only ---, then the list of renamed functions/variables (one per line), exactly in this format:

```
<refactored code>
```
---
```
<original_name> -> <new_name>
```

If no renames occur, include an empty mapping section.
"""


def estimate_tokens(text: str) -> int:
    """Return a rough token count for ``text``.

    Uses the heuristic of one token per four characters and always adds one,
    so even an empty string is counted as a single token.
    """
    full_groups, _ = divmod(len(text), 4)
    return full_groups + 1


def fetch_llm_response(
    prompt: str,
    model: str = LLM_MODEL,
    temperature: float = LLM_TEMPERATURE,
    top_p: float = LLM_TOP_P,
    max_tokens: int = LLM_MAX_TOKENS
) -> str:
    """Send a prompt to the local Ollama ``/api/generate`` endpoint.

    Args:
        prompt: Non-empty prompt text.
        model: Model name sent in the request.
        temperature: Sampling temperature.
        top_p: Nucleus-sampling cutoff.
        max_tokens: Maximum number of tokens to generate.

    Returns:
        The generated text (the ``"response"`` field of the JSON reply).

    Raises:
        ValueError: If ``prompt`` is empty or not a string, or if the reply
            does not contain a ``"response"`` field.
        requests.RequestException: If the HTTP request fails or returns an
            error status.
    """
    if not prompt or not isinstance(prompt, str):
        raise ValueError("Prompt must be a non-empty string")

    ollama_port = os.getenv("OLLAMA_PORT", DEFAULT_OLLAMA_PORT)
    url = f"http://localhost:{ollama_port}/api/generate"
    payload = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        # Ollama reads sampling parameters from the nested "options" object;
        # the generation limit is called "num_predict" there. Top-level
        # "temperature"/"top_p"/"max_tokens" keys are not part of its API.
        "options": {
            "temperature": temperature,
            "top_p": top_p,
            "num_predict": max_tokens,
        },
    }
    headers = {"Content-Type": "application/json"}
    # NOTE(review): the model's context window ("num_ctx" option) is left at
    # the server default here; confirm it is large enough for the prompts sent.

    try:
        response = requests.post(url, json=payload, headers=headers, timeout=TIMEOUT_SECONDS)
        response.raise_for_status()
        body = response.json()
    except requests.RequestException as e:
        raise requests.RequestException(f"Failed to get response from LLM API: {e}") from e

    # Surface a malformed reply as ValueError (which callers already handle)
    # instead of letting a KeyError/TypeError escape.
    if not isinstance(body, dict) or "response" not in body:
        raise ValueError("LLM API reply is missing the 'response' field")
    return body["response"]


def load_name_mapping(mapping_file: str) -> Dict[str, str]:
    """Load existing function/variable name mappings from a file.

    Each useful line has the form ``original_name -> new_name``. Blank lines
    and lines starting with ``#`` are ignored. Lines that do not contain
    exactly one ``->``, or whose original or new name is empty, are skipped
    rather than aborting the load.

    Args:
        mapping_file: Path of the mapping file; it may not exist yet.

    Returns:
        Dict from original name to new name (empty if the file is missing).
    """
    mapping: Dict[str, str] = {}
    if not os.path.exists(mapping_file):
        return mapping

    with open(mapping_file, "r") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            # Exactly one arrow: "a -> b -> c" is ambiguous, so it is skipped.
            if line.count("->") != 1:
                continue
            original, _, new = line.partition("->")
            original, new = original.strip(), new.strip()
            # An empty name would later build a regex that matches everywhere.
            if original and new:
                mapping[original] = new
    return mapping


def save_name_mapping(mapping_file: str, mapping: Dict[str, str]):
    """Write the name mappings to ``mapping_file``, replacing its contents.

    The file starts with a two-line comment header and a blank line, followed
    by one ``original -> new`` line per entry, sorted by original name.
    """
    header_lines = (
        "# function_variable_mapping.txt\n",
        "# Format: original_name -> new_name\n\n",
    )
    with open(mapping_file, "w") as out:
        out.writelines(header_lines)
        out.writelines(
            f"{old_name} -> {renamed}\n" for old_name, renamed in sorted(mapping.items())
        )


def load_processed_files(tracker_file: str) -> Set[str]:
    """Read the tracker file and return the file paths recorded in it.

    Blank lines and lines starting with ``#`` are ignored. A missing tracker
    file yields an empty set.
    """
    if not os.path.exists(tracker_file):
        return set()

    with open(tracker_file, "r") as tracker:
        entries = (raw_line.strip() for raw_line in tracker)
        return {entry for entry in entries if entry and not entry.startswith("#")}


def save_processed_files(tracker_file: str, processed: Set[str]):
    """Write the processed file paths to ``tracker_file``, replacing its contents.

    The file starts with a two-line comment header and a blank line, followed
    by one path per line in sorted order.
    """
    header_lines = (
        "# processed_files.txt\n",
        "# Tracks files already refactored\n\n",
    )
    with open(tracker_file, "w") as out:
        out.writelines(header_lines)
        out.writelines(f"{path}\n" for path in sorted(processed))


def apply_existing_mapping(script_content: str, mapping: Dict[str, str]) -> str:
    """Rename whole-word occurrences of mapped names in ``script_content``.

    All names are replaced in a single pass over the text, so text produced
    by one rename is never renamed again by another entry (a mapping such as
    ``{"a": "b", "b": "a"}`` swaps the two names instead of collapsing them).
    Longer names are tried first. Empty original names are ignored because
    they would match at every word boundary. Replacement text is inserted
    literally.

    Args:
        script_content: Source text to rewrite.
        mapping: Dict from original name to new name.

    Returns:
        The rewritten text (unchanged if there is nothing to apply).
    """
    names = sorted((name for name in mapping if name), key=len, reverse=True)
    if not names:
        return script_content

    pattern = re.compile(r"\b(?:" + "|".join(re.escape(name) for name in names) + r")\b")
    # A callable replacement avoids backslash/group-reference processing of
    # the new name.
    return pattern.sub(lambda match: mapping[match.group(0)], script_content)


# A separator is a line that holds only "---", optionally surrounded by
# spaces, tabs or code-fence backticks.
_SEPARATOR_LINE_RE = re.compile(r"^[ \t`]*---[ \t`]*$", re.MULTILINE)
# An opening code fence with an optional language tag, e.g. "```python".
_OPENING_FENCE_RE = re.compile(r"\A[ \t\n]*```[\w+-]*[ \t]*\n")


def extract_mapping_from_response(response: str) -> Tuple[str, Dict[str, str]]:
    """Extract refactored code and new name mappings from an LLM response.

    The response is split at the first line consisting solely of ``---``
    (dashes inside code, such as ``# --------`` comment rules, do not count).
    Everything before it is the code, with surrounding code fences and an
    optional language tag removed; everything after it is scanned for
    ``original -> new`` lines. Mapping lines with more than one arrow or with
    an empty side are skipped, and backticks around names are removed.

    Args:
        response: Raw text returned by the LLM.

    Returns:
        Tuple of (refactored code, dict of original name to new name).

    Raises:
        ValueError: If the response contains no separator line.
    """
    normalized = response.replace("\r\n", "\n")
    separator = _SEPARATOR_LINE_RE.search(normalized)
    if separator is None:
        raise ValueError("LLM response format invalid: missing code or mapping section")

    code_section = normalized[:separator.start()]
    mapping_section = normalized[separator.end():]

    # Drop "```python" (or a bare "```") before stripping the leftover fence
    # characters, otherwise the language tag would stay as a line of code.
    code_section = _OPENING_FENCE_RE.sub("", code_section, count=1)
    refactored_code = code_section.strip("`\n")

    new_mapping: Dict[str, str] = {}
    for line in mapping_section.split("\n"):
        line = line.strip()
        if line.count("->") != 1:
            continue
        original, _, new = line.partition("->")
        original = original.strip(" \t`")
        new = new.strip(" \t`")
        if original and new:
            new_mapping[original] = new
    return refactored_code, new_mapping


def split_script_into_chunks(script_content: str, max_tokens: int = MAX_INPUT_TOKENS) -> List[str]:
    """Split a script into line-aligned chunks of at most ``max_tokens`` each.

    A new chunk is started whenever the estimated token count would exceed
    ``max_tokens`` and also at the start of every function definition
    (``def`` or ``async def``, at any nesting level). A function's start is
    its first decorator line when it has decorators, so decorators stay in
    the same chunk as the function they decorate. If the script cannot be
    parsed, only the token limit is used.

    A single line that is larger than ``max_tokens`` on its own still becomes
    one (oversized) chunk.

    Args:
        script_content: Full source text.
        max_tokens: Estimated-token budget per chunk.

    Returns:
        List of chunk strings; joining them with newlines restores the lines
        of the input in order.
    """
    chunks: List[str] = []
    current_chunk: List[str] = []
    current_token_count = 0
    lines = script_content.splitlines()

    # Zero-based line indexes at which a function (including decorators) begins.
    function_starts: Set[int] = set()
    try:
        tree = ast.parse(script_content)
    except (SyntaxError, ValueError):
        # Unparseable source: fall back to pure token-limit splitting.
        pass
    else:
        for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                first_line = min(
                    [node.lineno] + [decorator.lineno for decorator in node.decorator_list]
                )
                function_starts.add(first_line - 1)

    for i, line in enumerate(lines):
        line_tokens = estimate_tokens(line)
        # Check if adding this line exceeds the token limit
        if current_token_count + line_tokens > max_tokens and current_chunk:
            chunks.append("\n".join(current_chunk))
            current_chunk = []
            current_token_count = 0

        # Start a new chunk at function boundaries if possible
        if i in function_starts and current_chunk:
            chunks.append("\n".join(current_chunk))
            current_chunk = [line]
            current_token_count = line_tokens
        else:
            current_chunk.append(line)
            current_token_count += line_tokens

    if current_chunk:
        chunks.append("\n".join(current_chunk))

    return chunks


def process_single_file(file_path: str, global_mapping: Dict[str, str]) -> bool:
    """Refactor one file through the LLM and overwrite it with the result.

    The file is read as UTF-8 (falling back to a chardet-detected encoding,
    then latin1), existing name mappings are applied, and the content is sent
    to the LLM either in one request or split into chunks when it is too
    large. The refactored code is written back as UTF-8.

    ``global_mapping`` is updated with the newly reported renames only after
    the file has been written successfully, so a failed file never leaks
    renames into the shared mapping.

    Args:
        file_path: Path of the Python file to refactor in place.
        global_mapping: Shared original-name -> new-name dict; mutated on success.

    Returns:
        True if the file was refactored and written (or was empty and needed
        no work), False on any read, LLM, parsing or write failure.
    """
    try:
        # Try UTF-8 first
        with open(file_path, "r", encoding="utf-8") as f:
            original_content = f.read()
    except UnicodeDecodeError:
        # Fallback: try to detect or use 'latin1' as a last resort
        try:
            with open(file_path, "rb") as f:
                raw = f.read()
            detected = chardet.detect(raw)
            encoding = detected["encoding"] or "latin1"
            with open(file_path, "r", encoding=encoding) as f:
                original_content = f.read()
            print(f"Warning: {file_path} decoded with fallback encoding '{encoding}'")
        except Exception as e:  # best-effort fallback: report and skip the file
            print(f"Error decoding {file_path}: {e}")
            return False
    except IOError as e:
        print(f"Error reading {file_path}: {e}")
        return False

    # Empty files (e.g. package __init__.py) have nothing to refactor; sending
    # them to the LLM would only invite invented content.
    if not original_content.strip():
        print(f"Skipping {file_path}: file is empty, nothing to refactor")
        return True

    # Apply existing mappings first
    content_with_mappings = apply_existing_mapping(original_content, global_mapping)
    prompt_tokens = estimate_tokens(REFACTOR_PROMPT)
    total_tokens = estimate_tokens(content_with_mappings) + prompt_tokens

    if total_tokens <= MAX_INPUT_TOKENS:
        # Small enough to process in one go
        prompt = REFACTOR_PROMPT.format(script_content=content_with_mappings)
        try:
            response = fetch_llm_response(prompt)
            refactored_code, new_mapping = extract_mapping_from_response(response)
        except (requests.RequestException, ValueError) as e:
            print(f"Error processing {file_path}: {e}")
            return False
    else:
        # Split into chunks and process sequentially
        print(f"Script {file_path} exceeds {MAX_INPUT_TOKENS} tokens ({total_tokens}). Splitting into chunks...")
        # Every chunk is wrapped in the prompt, so the prompt's own size must
        # come out of the per-chunk budget, as it does in the check above.
        chunk_budget = max(1, MAX_INPUT_TOKENS - prompt_tokens)
        chunks = split_script_into_chunks(content_with_mappings, chunk_budget)
        refactored_chunks = []
        new_mapping = {}

        for i, chunk in enumerate(chunks, 1):
            print(f"Processing chunk {i}/{len(chunks)} of {file_path}")
            prompt = REFACTOR_PROMPT.format(script_content=chunk)
            try:
                response = fetch_llm_response(prompt)
                refactored_chunk, chunk_mapping = extract_mapping_from_response(response)
            except (requests.RequestException, ValueError) as e:
                print(f"Error processing chunk {i} of {file_path}: {e}")
                return False

            refactored_chunks.append(refactored_chunk)
            new_mapping.update(chunk_mapping)
            # Apply new mappings to remaining chunks. ``i`` is 1-based, so it
            # is also the 0-based index of the next, not yet processed chunk.
            for j in range(i, len(chunks)):
                chunks[j] = apply_existing_mapping(chunks[j], chunk_mapping)

        refactored_code = "\n\n".join(refactored_chunks)

    # Never replace real source with nothing.
    if not refactored_code.strip():
        print(f"Error processing {file_path}: LLM returned no code; file left unchanged")
        return False
    if not refactored_code.endswith("\n"):
        refactored_code += "\n"

    try:
        with open(file_path, "w", encoding="utf-8") as f:  # Write back as UTF-8
            f.write(refactored_code)
    except IOError as e:
        print(f"Error writing to {file_path}: {e}")
        return False

    # Only publish the renames once the refactored file is safely on disk.
    global_mapping.update(new_mapping)
    print(f"Refactored and overwrote {file_path}")
    return True

def apply_mapping_to_all_files(directory: str, mapping: Dict[str, str], processed: Set[str]):
    """Apply the name mapping to every Python file under ``directory``.

    Files are discovered with ``get_python_files`` (which skips the configured
    folders). Each file is read and, only if the mapping changes its text,
    rewritten. Files are read and written as UTF-8, matching the encoding
    used when refactored files are written back; a file that is not valid
    UTF-8 is reported and left untouched rather than being re-encoded.

    Args:
        directory: Root directory to walk.
        mapping: Dict from original name to new name.
        processed: Set of already processed paths. Currently not consulted;
            kept so existing callers keep working.
    """
    python_files = get_python_files(directory)
    for file_path in python_files:
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                content = f.read()
            updated_content = apply_existing_mapping(content, mapping)
            if updated_content != content:
                with open(file_path, "w", encoding="utf-8") as f:
                    f.write(updated_content)
                print(f"Updated mappings in {file_path}")
        except UnicodeError as e:
            # Not UTF-8: skip instead of crashing the whole run or writing
            # the file back in a different encoding.
            print(f"Error updating {file_path}: not valid UTF-8 ({e})")
        except IOError as e:
            print(f"Error updating {file_path}: {e}")


def get_python_files(directory: str) -> List[str]:
    """
    Collect the paths of all ``.py`` files below ``directory``.

    Sub-directories whose name is listed in SKIP_FOLDERS are not descended
    into. Paths are returned in the order ``os.walk`` yields them.
    """
    found: List[str] = []
    for current_dir, subdirs, filenames in os.walk(directory):
        # Slice-assign so os.walk sees the pruned list and skips those folders.
        subdirs[:] = [name for name in subdirs if name not in SKIP_FOLDERS]
        found.extend(
            os.path.join(current_dir, name)
            for name in filenames
            if name.endswith(".py")
        )
    return found


def main():
    """Refactor up to FILES_PER_RUN unprocessed files and propagate renames.

    Loads the persisted name mapping and processed-file tracker, picks the
    next batch of unprocessed Python files under DEFAULT_DIRECTORY, and
    refactors them one by one with a progress bar. After each successful
    file the mapping is applied to all Python files and both state files are
    saved, so an interrupted run keeps the progress made so far. Files that
    fail are not recorded as processed and are reported in the summary.
    """
    if not os.path.exists(DEFAULT_DIRECTORY):
        print(f"Error: Directory {DEFAULT_DIRECTORY} does not exist.")
        return

    global_mapping = load_name_mapping(NAME_MAPPING_FILE)
    processed_files = load_processed_files(PROCESSED_FILES_TRACKER)

    python_files = get_python_files(DEFAULT_DIRECTORY)
    if not python_files:
        print(f"No Python files found in {DEFAULT_DIRECTORY} (excluding {SKIP_FOLDERS}).")
        return

    remaining_files = [f for f in python_files if f not in processed_files]
    if not remaining_files:
        print("All files have already been processed.")
        return

    files_to_process = remaining_files[:FILES_PER_RUN]
    print(f"Found {len(remaining_files)} unprocessed files. Processing {len(files_to_process)} this run.")
    print(f"Skipping folders: {SKIP_FOLDERS}")

    succeeded = 0
    # Use tqdm to display a progress bar
    with tqdm(total=len(files_to_process), desc="Refactoring Files", unit="file") as pbar:
        for i, file_path in enumerate(files_to_process, 1):
            tqdm.write(f"Processing file {i}/{len(files_to_process)}: {file_path}")
            if process_single_file(file_path, global_mapping):
                succeeded += 1
                processed_files.add(file_path)
                apply_mapping_to_all_files(DEFAULT_DIRECTORY, global_mapping, processed_files)
                save_name_mapping(NAME_MAPPING_FILE, global_mapping)
                save_processed_files(PROCESSED_FILES_TRACKER, processed_files)
            pbar.update(1)

    # Count only files that really succeeded; failed ones stay unprocessed.
    failed = len(files_to_process) - succeeded
    remaining = len(remaining_files) - succeeded
    print(f"Processed {succeeded} files this run. {remaining} files remain unprocessed.")
    if failed:
        print(f"{failed} file(s) failed and will be retried on the next run.")
    print("Refactoring batch complete! Check the results and run again for the next batch.")


if __name__ == "__main__":
    main()

Found 144 unprocessed files. Processing 10 this run.
Skipping folders: {'ollama-0', '.github', 'code_helper'}


Refactoring Files:   0%|          | 0/10 [00:00<?, ?file/s]

Processing file 1/10: C:\Users\harold.noble\Desktop\open-webui\app\backend\webui\config.py
Script C:\Users\harold.noble\Desktop\open-webui\app\backend\webui\config.py exceeds 7800 tokens (18515). Splitting into chunks...
Processing chunk 1/26 of C:\Users\harold.noble\Desktop\open-webui\app\backend\webui\config.py
