In [None]:
%pip install requests python-dotenv

# Workflow Run Analysis
Fetches GitHub Actions workflow run logs for a repository, extracts the JSON reports embedded in them, and summarizes the issue counts.

In [None]:
import os
import requests
import json
import re
import zipfile
import io
from dotenv import load_dotenv

# Load environment variables from .env file
# Assuming .env is in the same directory or adjust path as needed
# NOTE(review): the path is relative, so it is resolved against the kernel's
# current working directory — confirm that is where the .env file lives.
load_dotenv('.env')

# Configuration
# Repository whose workflow runs are analysed (owner / name on GitHub).
REPO_OWNER = "chagong"
REPO_NAME = "IssueLens"
# JSON file that the cells below write extracted results to and read them from.
OUTPUT_FILE = "workflow-runs.json"
# Token lookup order: GITHUB_TOKEN, then GH_TOKEN, then GITHUB_PAT. The first
# non-empty value wins; the result is falsy (None or "") when none is usable.
GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN") or os.environ.get("GH_TOKEN") or os.environ.get("GITHUB_PAT")

In [None]:
def get_headers():
    """
    Build the HTTP headers used for authenticated GitHub REST API requests.

    Returns:
        dict: Authorization (Bearer token), Accept and API-version headers.

    Raises:
        ValueError: if the module-level GITHUB_TOKEN is empty or unset.
    """
    if GITHUB_TOKEN:
        headers = {}
        headers["Authorization"] = f"Bearer {GITHUB_TOKEN}"
        headers["Accept"] = "application/vnd.github+json"
        headers["X-GitHub-Api-Version"] = "2022-11-28"
        return headers

    raise ValueError("Please set GITHUB_TOKEN, GH_TOKEN, or GITHUB_PAT environment variable.")

def list_workflow_runs():
    """
    List ALL workflow runs for the repository, handling pagination.

    Pages of up to 100 runs are requested from the repository's Actions
    "runs" endpoint until a page comes back empty or shorter than the page
    size.

    Returns:
        list: the ``workflow_runs`` entries of every fetched page, in the
        order the API returned them.

    Raises:
        ValueError: from get_headers() when no token is configured.
        requests.HTTPError: when a page request returns an error status.
        requests.RequestException: on connection problems or when a request
            exceeds the timeout.
    """
    url = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/actions/runs"
    all_runs = []
    page = 1
    per_page = 100
    # Without a timeout a stalled connection would block the notebook forever.
    timeout_seconds = 30

    print("Starting to fetch all workflow runs...")

    # The headers do not change between pages, so build them once.
    headers = get_headers()

    while True:
        params = {
            "per_page": per_page,
            "page": page,
        }
        response = requests.get(
            url, headers=headers, params=params, timeout=timeout_seconds
        )
        response.raise_for_status()

        data = response.json()
        runs = data.get("workflow_runs", [])

        if not runs:
            break

        all_runs.extend(runs)
        print(f"  Fetched {len(runs)} runs from page {page}. Total so far: {len(all_runs)}")

        # A short page means this was the last one; skip the extra request.
        if len(runs) < per_page:
            break

        page += 1

    return all_runs

def fetch_run_logs(run_id):
    """
    Fetch the log zip archive for a specific run.

    Args:
        run_id: identifier of the workflow run whose logs are requested.

    Returns:
        bytes | None: the raw response body on HTTP 200; None when the
        request fails or returns any other status (a warning is printed).

    Raises:
        ValueError: from get_headers() when no token is configured.
    """
    url = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/actions/runs/{run_id}/logs"
    try:
        # Follow redirects (allow_redirects=True is default for requests).
        # The body is read in full right below, so streaming is not requested;
        # that also lets requests release the connection on error responses.
        response = requests.get(url, headers=get_headers(), timeout=60)
    except requests.RequestException as exc:
        # Treat transport failures like a bad status: warn and let the caller
        # move on to the next run instead of aborting the whole batch.
        print(f"Warning: Failed to fetch logs for run {run_id}. Error: {exc}")
        return None

    if response.status_code == 200:
        return response.content

    print(
        f"Warning: Failed to fetch logs for run {run_id}. Status: {response.status_code}"
    )
    return None

def extract_json_from_log_content(content):
    """
    Pull JSON objects out of triple-backtick fenced blocks in a log's text.

    Every line inside a fence is stripped and has a leading GitHub Actions
    timestamp (e.g. ``2026-01-21T01:55:25.0319581Z``) removed. The text from
    the first ``{`` to the last ``}`` of the fence is then parsed as JSON.

    Args:
        content: the log text to scan.

    Returns:
        list: the successfully parsed objects, in order of appearance. Fences
        without a brace pair or with invalid JSON are skipped.
    """
    # Matches ``` or ```json fences; the body may span several lines.
    fence_re = re.compile(r"```(?:json)?(.*?)```", re.DOTALL | re.IGNORECASE)
    # Timestamp prefix: YYYY-MM-DDTHH:MM:SS.fractionZ followed by whitespace.
    stamp_re = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z\s+")

    parsed = []
    for fence in fence_re.finditer(content):
        body = "\n".join(
            stamp_re.sub("", raw_line.strip())
            for raw_line in fence.group(1).splitlines()
        )

        # Only the outermost { ... } span is considered.
        opening = body.find("{")
        closing = body.rfind("}")
        if opening == -1 or closing < opening:
            continue

        try:
            parsed.append(json.loads(body[opening : closing + 1]))
        except json.JSONDecodeError:
            pass

    return parsed

def deduplicate_list(data):
    """
    Remove duplicate JSON objects from a list, keeping first occurrences.

    Two items are duplicates when their JSON serialization with sorted keys
    is identical, so dictionary key order does not matter.

    Args:
        data: iterable of JSON-serializable items.

    Returns:
        list: the first occurrence of each distinct item, in original order.
    """
    first_by_signature = {}
    for entry in data:
        # Sorted keys give a deterministic signature for equal dictionaries.
        signature = json.dumps(entry, sort_keys=True)
        # setdefault keeps the earliest entry; dicts preserve insertion order.
        first_by_signature.setdefault(signature, entry)
    return list(first_by_signature.values())

## 1. Fetch Workflow Runs
Fetch workflow runs, extract JSON from logs, deduplicate, and save to file.

In [None]:
# Fetch every workflow run, extract the JSON objects embedded in each run's
# log archive, deduplicate them and write the result to OUTPUT_FILE.
if not GITHUB_TOKEN:
    print("Error: GITHUB_TOKEN, GH_TOKEN, or GITHUB_PAT environment variable is not set.")
    print("Please export your GitHub PAT before running this script.")
else:
    print(f"Fetching recent workflow runs for {REPO_OWNER}/{REPO_NAME}...")
    runs = []
    listing_ok = False
    try:
        runs = list_workflow_runs()
        listing_ok = True
        print(f"Found {len(runs)} total runs.")
    except Exception as e:
        print(f"Error listing runs: {e}")
        runs = []

    if not listing_ok:
        # Writing now would replace any earlier results with an empty list.
        print(f"Skipping extraction; {OUTPUT_FILE} was left unchanged.")
    else:
        all_extracted_data = []

        for run in runs:
            run_id = run["id"]
            run_name = run.get("name", "unknown")
            print(f"Processing Run ID: {run_id} ({run_name})...")

            log_zip_content = fetch_run_logs(run_id)
            if not log_zip_content:
                # fetch_run_logs already printed a warning for this run.
                continue

            try:
                with zipfile.ZipFile(io.BytesIO(log_zip_content)) as z:
                    # Iterate over all files in the zip
                    for filename in z.namelist():
                        with z.open(filename) as f:
                            try:
                                # Log files are typically UTF-8; undecodable
                                # bytes are replaced rather than failing.
                                content = f.read().decode("utf-8", errors="replace")
                                extracted = extract_json_from_log_content(content)
                                if extracted:
                                    print(
                                        f"  -> Found {len(extracted)} JSON object(s) in {filename}"
                                    )
                                    all_extracted_data.extend(extracted)
                            except Exception as e:
                                # If a single file fails, log and continue
                                print(f"  -> Error processing file {filename}: {e}")
            except zipfile.BadZipFile:
                print(f"  -> Invalid zip file received for run {run_id}")
            except Exception as e:
                print(f"  -> Error processing logs for run {run_id}: {e}")

        # Deduplicate data
        unique_data = deduplicate_list(all_extracted_data)
        print(
            f"\nDeduplication complete: {len(all_extracted_data)} -> {len(unique_data)} items"
        )

        # Store results
        try:
            with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
                json.dump(unique_data, f, indent=4)
            print(f"\nCompleted! Saved {len(unique_data)} total JSON objects to:")
            print(f"{OUTPUT_FILE}")
        except Exception as e:
            print(f"Error saving output file: {e}")

## 2. Deduplicate Existing Data
Run this cell if you want to deduplicate an existing JSON file without fetching new logs.

In [None]:
# Deduplicate the entries already stored in OUTPUT_FILE and write them back
# to the same file. No network access is involved.
if os.path.exists(OUTPUT_FILE):
    print(f"Reading from {OUTPUT_FILE}...")
    try:
        with open(OUTPUT_FILE, "r", encoding="utf-8") as source:
            data = json.load(source)
        print(f"Read {len(data)} items.")

        unique_data = deduplicate_list(data)
        print(f"Deduplication complete: {len(data)} -> {len(unique_data)} items")

        # The file is rewritten in place, even when nothing was removed.
        with open(OUTPUT_FILE, "w", encoding="utf-8") as target:
            json.dump(unique_data, target, indent=4)
        print(f"Saved deduplicated data to {OUTPUT_FILE}")
    except Exception as e:
        print(f"Error processing file: {e}")
else:
    print(f"File not found: {OUTPUT_FILE}")

## 3. Count Statistics
Calculate total and critical issue counts from the data.

In [3]:
# Sum the 'totalIssues' and 'criticalIssues' fields over every entry stored
# in OUTPUT_FILE and print a short report. Missing fields count as 0.
if os.path.exists(OUTPUT_FILE):
    print(f"Reading from {OUTPUT_FILE}...")
    try:
        with open(OUTPUT_FILE, "r", encoding="utf-8") as source:
            data = json.load(source)

        # One running tally per field, updated entry by entry.
        tallies = {"totalIssues": 0, "criticalIssues": 0}
        for item in data:
            for field in tallies:
                tallies[field] += item.get(field, 0)
        total_issues = tallies["totalIssues"]
        critical_issues = tallies["criticalIssues"]

        divider = "-" * 30
        print(divider)
        print(f"Total entries processed: {len(data)}")
        print(f"Sum of 'totalIssues':    {total_issues}")
        print(f"Sum of 'criticalIssues': {critical_issues}")
        print(divider)

    except Exception as e:
        print(f"Error processing file: {e}")
else:
    print(f"File not found: {OUTPUT_FILE}")

Reading from workflow-runs.json...
------------------------------
Total entries processed: 60
Sum of 'totalIssues':    304
Sum of 'criticalIssues': 53
------------------------------
