In [1]:
import requests
import os
import json
from typing import Dict, List, Any, TypedDict, Optional, Set, Tuple
from langchain_community.chat_models import ChatOpenAI
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.messages import SystemMessage, HumanMessage
from langchain_core.runnables import RunnablePassthrough
from langgraph.graph import StateGraph, END
from openpyxl import Workbook, load_workbook

# GitHub API configuration. The token is read from the GITHUB_TOKEN environment
# variable and is None when that variable is not set; the code that uses it
# checks for that case before making requests.
GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN")
GITHUB_API_URL = "https://api.github.com"

class EnhancedAnalyzedIssuesStorage:
    """Excel-backed record of analyzed GitHub issues and visited result pages.

    The workbook at ``filename`` holds an 'Analyzed Issues' sheet (one row per
    issue) and a 'Visited Pages' sheet (one row per fetched page). Both are
    mirrored in memory:

    * ``analyzed_issues``: ``{owner: {repo: [issue_id_as_str, ...]}}``
    * ``visited_pages``: ``{owner: {repo: {page_number, ...}}}``
    """

    _ISSUES_SHEET = 'Analyzed Issues'
    _PAGES_SHEET = 'Visited Pages'
    _ISSUES_HEADER = ('issue_id', 'repo_owner', 'repo_name',
                      'issue_description', 'issue_prompt', 'issue_url')
    _PAGES_HEADER = ('repo_owner', 'repo_name', 'page_number')

    def __init__(self, filename='analyzed_issues.xlsx'):
        """Bind the storage to ``filename`` and load (or create) the workbook."""
        self.filename = filename
        self.analyzed_issues = {}
        self.visited_pages = {}
        self._load()

    def _issues_sheet(self, workbook):
        """Return the issues sheet by title, falling back to the active sheet.

        Looking the sheet up by name avoids writing issues into 'Visited Pages'
        when the file was last saved with that sheet selected.
        """
        if self._ISSUES_SHEET in workbook.sheetnames:
            return workbook[self._ISSUES_SHEET]
        return workbook.active

    def _pages_sheet(self, workbook):
        """Return ``(sheet, created)`` for 'Visited Pages', creating it if absent."""
        if self._PAGES_SHEET in workbook.sheetnames:
            return workbook[self._PAGES_SHEET], False
        sheet = workbook.create_sheet(self._PAGES_SHEET)
        sheet.append(list(self._PAGES_HEADER))
        return sheet, True

    @staticmethod
    def _first_three(row):
        """Return the first three cells of ``row``, padding with ``None``."""
        return (list(row) + [None] * 3)[:3]

    def _load(self):
        """Populate the in-memory state from the workbook.

        Creates the workbook with a header row when the file does not exist.
        If the file cannot be read, an error is printed and both in-memory
        structures are left empty.
        """
        if not os.path.exists(self.filename):
            # Create the file with the initial header columns.
            workbook = Workbook()
            sheet = workbook.active
            sheet.title = self._ISSUES_SHEET
            sheet.append(list(self._ISSUES_HEADER))
            workbook.save(self.filename)
            return

        analyzed = {}
        visited = {}
        try:
            workbook = load_workbook(self.filename)

            # Load analyzed issues. Only the first three columns are needed,
            # so extra or missing trailing columns do not abort the load.
            issue_rows = self._issues_sheet(workbook).iter_rows(min_row=2, values_only=True)
            for row in issue_rows:
                if not row or not row[0]:  # skip empty rows
                    continue
                issue_id, owner, repo = self._first_three(row)
                analyzed.setdefault(owner, {}).setdefault(repo, []).append(str(issue_id))

            # Load visited pages, creating the sheet for older files.
            pages_sheet, created = self._pages_sheet(workbook)
            if created:
                workbook.save(self.filename)

            for row in pages_sheet.iter_rows(min_row=2, values_only=True):
                if not row or not row[0]:  # skip empty rows
                    continue
                owner, repo, page = self._first_three(row)
                try:
                    page_number = int(page)
                except (TypeError, ValueError):
                    # One malformed row should not discard everything loaded.
                    continue
                visited.setdefault(owner, {}).setdefault(repo, set()).add(page_number)

        except Exception as e:
            print(f"Error reading {self.filename}: {str(e)}")
            analyzed = {}
            visited = {}

        self.analyzed_issues = analyzed
        self.visited_pages = visited

    def save_issue(self, owner: str, repo: str, issue_id: int, description: str, prompt: str, url: str):
        """Append an analyzed issue to the workbook and record it in memory.

        Returns:
            True when the issue was written; False when it was already
            recorded or the workbook could not be updated. The in-memory
            record is only updated after a successful write, so a False
            return never leaves the issue marked as analyzed.
        """
        issue_key = str(issue_id)

        # Do not add it twice.
        if self.is_analyzed(owner, repo, issue_id):
            return False

        try:
            workbook = load_workbook(self.filename)
            sheet = self._issues_sheet(workbook)
            sheet.append([issue_key, owner, repo, description, prompt, url])
            workbook.save(self.filename)
        except Exception as e:
            print(f"Error saving to {self.filename}: {str(e)}")
            return False

        self.analyzed_issues.setdefault(owner, {}).setdefault(repo, []).append(issue_key)
        return True

    def is_analyzed(self, owner: str, repo: str, issue_id: int) -> bool:
        """Return whether ``issue_id`` is recorded for ``owner/repo``."""
        return str(issue_id) in self.analyzed_issues.get(owner, {}).get(repo, [])

    def mark_page_as_visited(self, owner: str, repo: str, page: int):
        """Record ``page`` as visited in memory and append it to the workbook.

        The in-memory record is kept even if the workbook write fails (the
        failure is printed), since the page was visited in this session.
        """
        pages = self.visited_pages.setdefault(owner, {}).setdefault(repo, set())

        # Do not add it twice.
        if page in pages:
            return

        pages.add(page)

        try:
            workbook = load_workbook(self.filename)
            sheet, _ = self._pages_sheet(workbook)
            sheet.append([owner, repo, page])
            workbook.save(self.filename)
        except Exception as e:
            print(f"Error saving visited page to {self.filename}: {str(e)}")

    def is_page_visited(self, owner: str, repo: str, page: int) -> bool:
        """Return whether ``page`` is recorded as visited for ``owner/repo``."""
        return page in self.visited_pages.get(owner, {}).get(repo, set())

    def get_visited_pages(self, owner: str, repo: str) -> Set[int]:
        """Return a copy of the visited page numbers for ``owner/repo``.

        A copy is returned so callers cannot mutate the storage's own set.
        """
        return set(self.visited_pages.get(owner, {}).get(repo, ()))

def fetch_github_issues(owner: str, repo: str, page: int = 1, per_page: int = 10,
                        timeout: float = 30.0):
    """Fetch one page of issues for ``owner/repo`` from the GitHub search API.

    Args:
        owner: Repository owner.
        repo: Repository name.
        page: Page number sent as the ``page`` query parameter.
        per_page: Number of results requested per page.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on a stalled
            connection.

    Returns:
        The decoded JSON body of the response.

    Raises:
        ValueError: If ``GITHUB_TOKEN`` is not set.
        requests.RequestException: On connection failure, timeout, or a
            non-success HTTP status (via ``raise_for_status``).
    """
    if not GITHUB_TOKEN:
        raise ValueError("GITHUB_TOKEN environment variable not set")

    headers = {
        'Authorization': f'token {GITHUB_TOKEN}',
        'Accept': 'application/vnd.github.v3+json',
    }
    params = {
        # "is:issue" restricts the search to issues and excludes pull requests.
        'q': f'repo:{owner}/{repo} is:issue',
        'page': page,
        'per_page': per_page,
    }
    response = requests.get(
        f'{GITHUB_API_URL}/search/issues',
        headers=headers,
        params=params,
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()

def generate_issue_description_and_prompt(issue_data: Dict[str, Any]) -> Tuple[str, str]:
    """Generate a short description and a solution prompt for an issue via an LLM.

    Args:
        issue_data: Issue dictionary; the 'title' and 'body' keys are read.

    Returns:
        A ``(description, prompt)`` tuple of strings. If the LLM call or the
        JSON parsing fails, the error is printed and a generic description and
        prompt built from the issue title are returned instead.
    """
    # Initialize the OpenAI chat model.
    llm = ChatOpenAI(temperature=0)

    # Extract the relevant issue fields; both may be missing or None.
    title = issue_data.get('title', '') or ''
    body = issue_data.get('body', '') or ''
    if len(body) > 1000:  # cap the body size to keep the request small
        body = body[:1000] + "..."

    # Prompts sent to the LLM (runtime text, intentionally kept in Spanish).
    system_prompt = """
    Eres un experto en análisis de problemas técnicos en repositorios de código. 
    Tu tarea es analizar issues de GitHub y proporcionar:
    
    1. Una descripción concisa (máximo 5 líneas) que resuma el problema planteado en el issue.
    2. Un prompt bien estructurado (máximo 10 líneas) que podría usarse para solicitar una solución a este issue a un modelo de lenguaje.
    
    Responde en formato JSON con las claves "description" y "prompt".
    """

    user_prompt = f"""
    Analiza el siguiente issue de GitHub:
    
    Título: {title}
    
    Descripción:
    {body}
    
    Genera:
    1. Una descripción concisa (máximo 5 líneas) que resuma el problema.
    2. Un prompt optimizado (máximo 10 líneas) para solicitar una solución a este problema.
    """

    def _as_text(value: Any) -> str:
        # The model may answer with a list of lines or another JSON type;
        # normalize to a single string so it can be stored in a cell.
        if isinstance(value, str):
            return value
        if isinstance(value, (list, tuple)):
            return "\n".join(str(item) for item in value)
        return "" if value is None else str(value)

    try:
        # A chat model takes a list of messages as input. Piping a dict of
        # message objects into the model is rejected by LangChain because the
        # dict values are not runnables, so the messages are passed to
        # invoke() directly.
        chain = llm | JsonOutputParser()
        messages = [
            SystemMessage(content=system_prompt),
            HumanMessage(content=user_prompt),
        ]
        result = chain.invoke(messages)
        return _as_text(result.get("description", "")), _as_text(result.get("prompt", ""))
    except Exception as e:
        print(f"Error generating description and prompt: {str(e)}")
        # Fallback when the LLM call or parsing fails.
        description = f"Issue relacionado con: {title}"
        prompt = f"¿Cómo resolver el problema '{title}' en un repositorio de GitHub?"
        return description, prompt

def analyze_github_issues(owner: str, repo: str, target_issues: int, per_page: int = 10,
                          max_consecutive_errors: int = 5) -> int:
    """Analyze GitHub issues for a repository until a target count is reached.

    Pages are fetched starting after the highest page already recorded as
    visited. Each issue not yet recorded is summarized and saved to storage.

    Args:
        owner: Repository owner.
        repo: Repository name.
        target_issues: Number of issues to analyze in this run.
        per_page: Number of issues to fetch per page.
        max_consecutive_errors: Stop after this many pages in a row fail.

    Returns:
        Number of issues analyzed and saved in this run.
    """
    storage = EnhancedAnalyzedIssuesStorage()
    analyzed_count = 0
    consecutive_errors = 0

    # Resume after the highest page that was already visited, if any.
    visited_pages = storage.get_visited_pages(owner, repo)
    current_page = max(visited_pages) + 1 if visited_pages else 1

    while analyzed_count < target_issues:
        # Skip pages that are already recorded as visited.
        if storage.is_page_visited(owner, repo, current_page):
            current_page += 1
            continue

        try:
            print(f"Fetching page {current_page} for {owner}/{repo}...")
            response_data = fetch_github_issues(owner, repo, current_page, per_page)
            issues = response_data.get('items', [])

            # An empty page means there is nothing left. It is not marked as
            # visited, so a later run checks it again for new issues.
            if not issues:
                print(f"No more issues found for {owner}/{repo}")
                break

            for issue in issues:
                # Target reached with issues still pending on this page:
                # leave the page unmarked so the next run revisits it.
                if analyzed_count >= target_issues:
                    break

                issue_number = issue.get('number')
                if not issue_number or storage.is_analyzed(owner, repo, issue_number):
                    continue

                description, prompt = generate_issue_description_and_prompt(issue)

                url = issue.get('html_url', '')
                saved = storage.save_issue(owner, repo, issue_number, description, prompt, url)

                if saved:
                    analyzed_count += 1
                    print(f"Analyzed issue #{issue_number} - Total: {analyzed_count}/{target_issues}")
            else:
                # Every issue on the page was handled; only now is it safe to
                # record the page as visited.
                storage.mark_page_as_visited(owner, repo, current_page)

            consecutive_errors = 0
            current_page += 1

        except Exception as e:
            print(f"Error processing page {current_page}: {str(e)}")
            consecutive_errors += 1

            # Count failures rather than comparing the page number, so a
            # resumed run is not cut short and a persistent failure (e.g.
            # rate limiting) does not trigger a long series of requests.
            if consecutive_errors >= max_consecutive_errors:
                print("Too many errors, stopping")
                break

            # Try the next page.
            current_page += 1

    return analyzed_count

# Ejemplo de uso
# Example usage: runs only when the file is executed directly as a script.
if __name__ == "__main__":
    import sys

    if len(sys.argv) < 4:
        print("Usage: python github_issue_analyzer.py <owner> <repo> <target_issues>")
        sys.exit(1)

    owner = sys.argv[1]
    repo = sys.argv[2]

    # Report a non-numeric count as a usage error instead of a traceback.
    try:
        target_issues = int(sys.argv[3])
    except ValueError:
        print(f"ERROR: <target_issues> must be an integer, got {sys.argv[3]!r}")
        print("Usage: python github_issue_analyzer.py <owner> <repo> <target_issues>")
        sys.exit(1)

    # Check that the GitHub token is configured before doing any work.
    if not GITHUB_TOKEN:
        print("ERROR: GITHUB_TOKEN environment variable not set")
        print("Please set it with: export GITHUB_TOKEN=your_token_here")
        sys.exit(1)

    # Run the analysis.
    analyzed = analyze_github_issues(owner, repo, target_issues)
    print(f"Successfully analyzed {analyzed} issues from {owner}/{repo}")

Usage: python github_issue_analyzer.py <owner> <repo> <target_issues>


SystemExit: 1

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
