In [1]:
# Failure GHA step log extractor

import requests
import json
import os
from datetime import datetime
import time
from dotenv import load_dotenv
import os
import re

# Configuración
load_dotenv()
GITHUB_TOKEN = os.getenv('GITHUB_TOKEN')  # Token de Github
OWNER = "davidibanezibanez"  # Owner del repositorio
REPO = "example-failing-workflows"  # Nombre del repositorio

def sanitize_filename(name, max_length=100):
    # Solo permite letras, números, guiones, guiones bajos y espacios
        name = re.sub(r'[^a-zA-Z0-9 \-_]', '', name)
        return name.strip()[:max_length]

def extract_error_lines(logs):
    error_lines = []
    for line in logs.split('\n'):
        if '##[error]' in line or 'Error:' in line or 'error:' in line:
            error_lines.append(line)
    return '\n'.join(error_lines) if error_lines else '[No se detectó un mensaje de error explícito]'

class GitHubWorkflowLogger:
    def __init__(self, token):
        self.token = token
        self.headers = {
            'Authorization': f'token {token}',
            'Accept': 'application/vnd.github.v3+json'
        }
        self.base_url = 'https://api.github.com'
    
    def get_workflow_runs(self, owner, repo, per_page=10):
        """Obtiene los workflow runs con status failure de un repositorio"""
        url = f"{self.base_url}/repos/{owner}/{repo}/actions/runs"
        params = {
            'status': 'failure',
            'per_page': per_page
        }
        
        try:
            response = requests.get(url, headers=self.headers, params=params)
            response.raise_for_status()
            return response.json()['workflow_runs']
        except requests.exceptions.RequestException as e:
            print(f"Error obteniendo workflow runs: {e}")
            return []
    
    def get_jobs_for_run(self, owner, repo, run_id):
        """Obtiene los jobs de un workflow run específico"""
        url = f"{self.base_url}/repos/{owner}/{repo}/actions/runs/{run_id}/jobs"
        
        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            return response.json()['jobs']
        except requests.exceptions.RequestException as e:
            print(f"Error obteniendo jobs para run {run_id}: {e}")
            return []
    
    def get_job_logs(self, owner, repo, job_id):
        """Obtiene los logs de un job específico"""
        url = f"{self.base_url}/repos/{owner}/{repo}/actions/jobs/{job_id}/logs"
        
        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            return response.text
        except requests.exceptions.RequestException as e:
            print(f"Error obteniendo logs para job {job_id}: {e}")
            return None
    
    def find_failed_steps_in_logs(self, logs):
        """Identifica los steps que fallaron en los logs"""
        failed_steps = []
        lines = logs.split('\n')
        current_step = None
        step_logs = []
        
        for line in lines:
            # Detectar inicio de step
            if '##[group]' in line and 'Run ' in line:
                if current_step and step_logs:
                    # Verificar si el step anterior falló
                    if any('##[error]' in log_line or 'Error:' in log_line for log_line in step_logs):
                        failed_steps.append({
                            'step_name': current_step,
                            'logs': '\n'.join(step_logs)
                        })
                
                current_step = line.replace('##[group]', '').strip()
                step_logs = []
            
            step_logs.append(line)
        
        # Verificar el último step
        if current_step and step_logs:
            if any('##[error]' in log_line or 'Error:' in log_line for log_line in step_logs):
                failed_steps.append({
                    'step_name': current_step,
                    'logs': '\n'.join(step_logs)
                })
        
        return failed_steps
    
    def create_output_directory(self):
        """Crea el directorio de salida si no existe"""
        output_dir = 'github_failure_logs'
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        return output_dir
    
    def save_failure_log(self, owner, repo, workflow_name, job_name, step_info, run_id, job_id, output_dir):
        """Guarda el log de un failure en un archivo"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        safe_workflow_name = "".join(c for c in workflow_name if c.isalnum() or c in (' ', '-', '_')).rstrip()
        safe_job_name = "".join(c for c in job_name if c.isalnum() or c in (' ', '-', '_')).rstrip()
        safe_step_name = sanitize_filename(step_info['step_name'])
        
        filename = f"{owner}_{repo}_{safe_workflow_name}_{safe_job_name}_{safe_step_name}_{timestamp}.txt"
        filepath = os.path.join(output_dir, filename)
        
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write("Github workflow failure log\n")
            f.write(f"Repositorio: {owner}/{repo}\n")
            f.write(f"Workflow: {workflow_name}\n")
            f.write(f"Job: {job_name}\n")
            f.write(f"Step que falló: {step_info['step_name']}\n")
            f.write(f"Run ID: {run_id}\n")
            f.write(f"Job ID: {job_id}\n")
            f.write(f"Fecha de análisis: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write("Mensaje(s) de error detectado(s):\n")
            f.write(extract_error_lines(step_info['logs']))
        
        return filepath
    
    def analyze_repository(self, owner, repo):
        """Analiza un repositorio específico en busca de workflow failures"""
        print(f"Analizando repositorio: {owner}/{repo}")
        
        output_dir = self.create_output_directory()
        failure_count = 0
        
        # Obtener workflow runs fallidos
        workflow_runs = self.get_workflow_runs(owner, repo)
        
        if not workflow_runs:
            print(f"No se encontraron workflow runs fallidos en {owner}/{repo}")
            return
        
        print(f"Encontrados {len(workflow_runs)} workflow runs fallidos")
        
        for run in workflow_runs:
            run_id = run['id']
            workflow_name = run['name']
            
            print(f"Procesando workflow run: {workflow_name} (ID: {run_id})")
            
            # Obtener jobs del workflow run
            jobs = self.get_jobs_for_run(owner, repo, run_id)
            
            for job in jobs:
                if job['conclusion'] == 'failure':
                    job_id = job['id']
                    job_name = job['name']
                    
                    print(f"  Analizando job fallido: {job_name}")
                    
                    # Obtener logs del job
                    logs = self.get_job_logs(owner, repo, job_id)
                    
                    if logs:
                        # Encontrar steps que fallaron
                        failed_steps = self.find_failed_steps_in_logs(logs)
                        
                        if failed_steps:
                            for step_info in failed_steps:
                                filepath = self.save_failure_log(
                                    owner, repo, workflow_name, job_name, 
                                    step_info, run_id, job_id, output_dir
                                )
                                print(f"    Guardado log de failure: {filepath}")
                                failure_count += 1
                        else:
                            # Si no se pueden identificar steps específicos, guardar todo el log del job
                            step_info = {
                                'step_name': 'Job_Complete_Log',
                                'logs': logs
                            }
                            filepath = self.save_failure_log(
                                owner, repo, workflow_name, job_name, 
                                step_info, run_id, job_id, output_dir
                            )
                            print(f"    Guardado log completo del job: {filepath}")
                            failure_count += 1
            
            # Breve pausa para no sobrecargar la API
            time.sleep(0.5)
        
        print(f"\nAnálisis completado. Se guardaron {failure_count} logs de failures.")

# Ejecución
if not GITHUB_TOKEN:
    print("ERROR: configurar token de GitHub en la variable GITHUB_TOKEN")
elif not OWNER or not REPO:
    print("ERROR: configurar OWNER y REPO")
else:
    # Crear instancia del logger
    logger = GitHubWorkflowLogger(GITHUB_TOKEN)
    
    # Analizar repositorio
    logger.analyze_repository(OWNER, REPO)


Analizando repositorio: davidibanezibanez/example-failing-workflows
Encontrados 4 workflow runs fallidos
Procesando workflow run: Fail Workflow 1 (ID: 15814985792)
  Analizando job fallido: intentional-fail-job-1
    Guardado log de failure: github_failure_logs\davidibanezibanez_example-failing-workflows_Fail Workflow 1_intentional-fail-job-1_2025-06-23T0406595385363Z Run exit 1_20250623_000752.txt
Procesando workflow run: Fail Workflow 4 (ID: 15814985795)
  Analizando job fallido: intentional-fail-job-4
    Guardado log de failure: github_failure_logs\davidibanezibanez_example-failing-workflows_Fail Workflow 4_intentional-fail-job-4_2025-06-23T0406594452798Z Run echo This is Job 1_20250623_000754.txt
  Analizando job fallido: intentional-fail-job-4_1
    Guardado log de failure: github_failure_logs\davidibanezibanez_example-failing-workflows_Fail Workflow 4_intentional-fail-job-4_1_2025-06-23T0406595170977Z Run echo This is Job 2_20250623_000755.txt
Procesando workflow run: Fail Workf