# AI-Driven Log Triage Tool: Self-Contained Notebook

This Jupyter Notebook is a self-contained implementation of an AI-driven log analyzer powered by a Replicate-hosted model (e.g., IBM Granite), using LangChain for model interaction. It scans log files from a local ZIP, detects errors, and generates verbose, expert-level recommendations for DevOps engineers debugging pipelines or applications. All code is included, with dependencies installed within the notebook.

**Features**:
- Processes ZIP files containing `.log` files.
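- Detects errors with regular expressions matching `ERROR`, `Exception`/`exception`, and `failed with code <N>`; other levels such as `WARNING` are not flagged.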
- Generates recommendations using a Replicate-hosted model via LangChain.
- Outputs text or JSON reports.

**Sample Data**:
- 3 generic `.log` files (`build.log`, `test.log`, `deploy.log`).
- Licensed under [CDLA-Permissive-2.0](https://cdla.dev/permissive-2.0/).

**Prerequisites**:
- **Python**: 3.8+ (tested with 3.13).
- **Git**: Required for `ibm-granite-community/utils` (Git 2.22+ for partial clone support).
- **Replicate API Key**: Sign up at [replicate.com](https://replicate.com/) and set the `REPLICATE_API_TOKEN` environment variable.
- **Dependencies**: `requests`, `click`, `langchain`, `langchain_community`, `replicate`, and `ibm-granite-community/utils` (installed in this notebook).
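
Before running the model cells, it can help to confirm that the token is visible to the notebook kernel. The following is a minimal sketch; the helper name `has_replicate_token` is hypothetical and not part of the notebook, which reads the token through `get_env_var` in Step 2.

```python
import os


def has_replicate_token(env=None) -> bool:
    """Return True if REPLICATE_API_TOKEN is set to a non-empty value.

    `env` defaults to os.environ; passing a plain dict makes the check easy to test.
    This helper is illustrative only and is not used elsewhere in the notebook.
    """
    env = os.environ if env is None else env
    return bool(env.get("REPLICATE_API_TOKEN", "").strip())


if has_replicate_token():
    print("REPLICATE_API_TOKEN is set.")
else:
    print("REPLICATE_API_TOKEN is not set in this kernel's environment.")
```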

**License**:
- Code: MIT License.
- Sample Data: CDLA-Permissive-2.0.

**MIT License**:
Copyright (c) 2025 Your Name. Permission is hereby granted to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies, subject to including this notice.

## Step 1: Install Dependencies

Install the required packages into the active Python environment (ideally a virtual environment).

In [None]:
!pip install git+https://github.com/ibm-granite-community/utils \
    langchain_community \
    langchain \
    replicate \
    requests \
    click
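
After installation, a quick check that the packages are importable can save a confusing failure later in Step 2. This is a minimal sketch using only the standard library; `missing_modules` is a hypothetical helper, and the module list is an assumption based on the imports used in this notebook.

```python
import importlib.util


def missing_modules(module_names):
    """Return the subset of top-level module names the import system cannot find."""
    return [name for name in module_names if importlib.util.find_spec(name) is None]


# Top-level module names this notebook relies on (assumed from its import statements).
required = ["langchain", "langchain_community", "replicate", "click", "ibm_granite_community"]

missing = missing_modules(required)
if missing:
    print("Not importable yet:", ", ".join(missing))
else:
    print("All required modules are importable.")
```

If anything is reported as missing, rerunning the install cell and restarting the kernel is usually the first thing to try.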

## Step 2: Define Core Functions

Core logic for log triage, including verbose AI recommendations using LangChain and Replicate.

In [None]:
import re
import json
import zipfile
import os
import tempfile
from typing import List, Dict, Optional
from langchain_community.llms import Replicate
from langchain.prompts import PromptTemplate
from ibm_granite_community.notebook_utils import get_env_var

# Initializing Replicate model with LangChain
model_id = "ibm-granite/granite-3.3-8b-instruct"
try:
    llm = Replicate(
        model=model_id,
        replicate_api_token=get_env_var('REPLICATE_API_TOKEN'),
        model_kwargs={
            "temperature": 0.1,
            "max_tokens": 4096,
        },
    )
    print(f"Successfully initialized model: {model_id}")
except Exception as e:
    print(f"Error initializing Replicate model: {str(e)}")
    raise

# Defining prompt template
prompt_template = PromptTemplate(
    input_variables=["file", "line_number", "message", "previous_line"],
    template="""
You are an advanced AI-powered log analysis expert, designed to diagnose and resolve issues in system logs with precision and insight. Your expertise lies in identifying error patterns, interpreting log context, and providing actionable recommendations to streamline DevOps workflows. Given the following log details from the file '{file}':
- Line Number: {line_number}
- Error Message: {message}
- Previous Line (for context): {previous_line}

Perform a thorough analysis of the error, focusing on the log file’s context (e.g., build, test, or deployment logs) and common DevOps scenarios. Provide a detailed recommendation in the following structured format:
1. **Problem**: Clearly describe the issue indicated by the error message, including its potential impact on the system or process.
2. **Possible Causes**: List 2-3 likely reasons for the error, drawing on common log patterns, system behaviors, or configuration issues.
3. **Solution**: Recommend specific, actionable steps to resolve the issue, prioritizing the most likely cause. Include diagnostic commands, configuration checks, or code adjustments as needed. Ensure the steps are practical for a DevOps engineer to implement.

Ensure your response is clear, concise, and tailored to the provided log details, avoiding overly generic advice. If the error is ambiguous, suggest diagnostic steps to gather more information.
"""
)

def extract_zip(zip_path: str, temp_dir: str) -> List[str]:
    """Extract ZIP file to temporary directory and return list of .log files.

    Args:
        zip_path (str): Path to the ZIP file.
        temp_dir (str): Directory to extract files to.

    Returns:
        List[str]: List of paths to extracted .log files.

    Raises:
        ValueError: If the ZIP is invalid or contains no .log files.
    """
    log_files = []
    try:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)
        for root, _, files in os.walk(temp_dir):
            for file in files:
                if file.endswith('.log'):
                    log_files.append(os.path.join(root, file))
        if not log_files:
            raise ValueError("No .log files found in ZIP")
        return log_files
    except zipfile.BadZipFile as e:
        # Chain the original exception so the root cause stays visible in tracebacks.
        raise ValueError("Invalid ZIP file") from e

def scan_logs(log_file: str) -> List[Dict[str, str]]:
    """Scan a log file for errors and return list of issues.

    Args:
        log_file (str): Path to the log file.

    Returns:
        List[Dict[str, str]]: List of issues with file, line number, message, and previous line.
    """
    issues = []
    error_patterns = [r'ERROR.*', r'[Ee]xception.*', r'failed with code \d+']
    with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
        lines = f.readlines()
        for i, line in enumerate(lines, 1):
            for pattern in error_patterns:
                if re.search(pattern, line):
                    previous_line = lines[i-2].strip() if i > 1 else ''
                    issues.append({
                        'file': os.path.basename(log_file),
                        'line_number': str(i),
                        'message': line.strip(),
                        'previous_line': previous_line
                    })
                    break
    return issues

def get_ai_recommendation(issue: Dict[str, str]) -> str:
    """Generate AI recommendation using Replicate model via LangChain.

    Args:
        issue (Dict[str, str]): Issue details (file, line_number, message, previous_line).

    Returns:
        str: AI-generated recommendation.
    """
    try:
        chain = prompt_template | llm
        response = chain.invoke({
            "file": issue['file'],
            "line_number": issue['line_number'],
            "message": issue['message'],
            "previous_line": issue['previous_line'] or 'N/A'
        })
        return response
    except Exception as e:
        return f"""
1. **Problem**: Pipeline failure detected in {issue['file']} at line {issue['line_number']}: {issue['message']}. This error may halt the process or indicate a critical issue in the system.
2. **Possible Causes**:
   - Misconfiguration in the pipeline or application settings.
   - Insufficient system resources (e.g., memory, disk space).
   - A bug or unhandled exception in the application code.
3. **Solution**:
   - Check configuration files for errors or inconsistencies.
   - Verify system resources using commands like `df -h` for disk space or `free -m` for memory.
   - Review the application code at the relevant section, focusing on error handling and input validation.
   - Enable debug logging to capture additional context and rerun the process.
Error details: {str(e)}
"""

def generate_report(issues: List[Dict[str, str]]) -> List[Dict]:
    """Generate report with recommendations for all issues.

    Args:
        issues (List[Dict[str, str]]): List of issues from scan_logs.

    Returns:
        List[Dict]: Report with file, line_number, error, and recommendation.
    """
    report = []
    for issue in issues:
        recommendation = get_ai_recommendation(issue)
        report.append({
            'file': issue['file'],
            'line_number': issue['line_number'],
            'error': issue['message'],
            'recommendation': recommendation
        })
    return report

def output_report(report: List[Dict], output_format: str, output_file: Optional[str] = None):
    """Output report in text or JSON format.

    Args:
        report (List[Dict]): Report from generate_report.
        output_format (str): 'text' or 'json'.
        output_file (Optional[str]): File path for JSON output (optional).
    """
    if output_format == 'json':
        report_json = json.dumps(report, indent=2)
        if output_file:
            with open(output_file, 'w') as f:
                f.write(report_json)
        else:
            print(report_json)
    else:
        for item in report:
            print(f"File: {item['file']}")
            print(f"Line: {item['line_number']}")
            print(f"Error: {item['error']}")
            print(f"Recommendation:\n{item['recommendation']}\n")

def generate_sample_data(zip_path: str = 'sample_logs.zip') -> str:
    """Generate sample log data under CDLA-Permissive-2.0 license.

    Args:
        zip_path (str): Path to save the ZIP file (default: 'sample_logs.zip').

    Returns:
        str: Path to the generated ZIP file.
    """
    sample_logs = {
        'build.log': [
            '[2025-04-23 10:00:01] Starting build process...',
            '[2025-04-23 10:00:02] Compiling source code...',
            '[2025-04-23 10:00:03] ERROR: Process failed with code 1',
            '[2025-04-23 10:00:04] Build aborted.'
        ],
        'test.log': [
            '[2025-04-23 10:01:01] Running test suite...',
            '[2025-04-23 10:01:02] Test case 1 passed.',
            '[2025-04-23 10:01:03] Exception: Null pointer detected',
            '[2025-04-23 10:01:04] Test suite failed.'
        ],
        'deploy.log': [
            '[2025-04-23 10:02:01] Starting deployment...',
            '[2025-04-23 10:02:02] Configuring server...',
            '[2025-04-23 10:02:03] WARNING: Deployment timeout',
            '[2025-04-23 10:02:04] Deployment incomplete.'
        ]
    }
    with tempfile.TemporaryDirectory() as temp_dir:
        for log_file, lines in sample_logs.items():
            with open(os.path.join(temp_dir, log_file), 'w') as f:
                f.write('\n'.join(lines))
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for log_file in sample_logs.keys():
                zipf.write(os.path.join(temp_dir, log_file), log_file)
    return zip_path

print("Core functions defined successfully.")
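
To see which lines the patterns in `scan_logs` actually flag, the matching step can be tried on in-memory lines without any files or model calls. The sketch below reuses the three patterns from `scan_logs`; the function `find_issues` and its `patterns` parameter are hypothetical names introduced for illustration.

```python
import re
from typing import Dict, Iterable, List

# Same patterns as in scan_logs above.
DEFAULT_PATTERNS = (r'ERROR.*', r'[Ee]xception.*', r'failed with code \d+')


def find_issues(lines: List[str], patterns: Iterable[str] = DEFAULT_PATTERNS) -> List[Dict[str, str]]:
    """Return one issue per line that matches any pattern (hypothetical helper)."""
    compiled = [re.compile(p) for p in patterns]
    issues = []
    for i, line in enumerate(lines, 1):
        if any(rx.search(line) for rx in compiled):
            issues.append({
                'line_number': str(i),
                'message': line.strip(),
                # The line before the match, or '' when the match is on line 1.
                'previous_line': lines[i - 2].strip() if i > 1 else '',
            })
    return issues


# Contents of the sample deploy.log from generate_sample_data.
deploy_log = [
    '[2025-04-23 10:02:01] Starting deployment...',
    '[2025-04-23 10:02:02] Configuring server...',
    '[2025-04-23 10:02:03] WARNING: Deployment timeout',
    '[2025-04-23 10:02:04] Deployment incomplete.',
]

# With the default patterns, nothing in deploy.log is flagged.
print(find_issues(deploy_log))

# Adding a WARNING pattern flags line 3.
print(find_issues(deploy_log, list(DEFAULT_PATTERNS) + [r'WARNING.*']))
```

With the default patterns, the sample `deploy.log` yields no issues because none of them match `WARNING`, so only `build.log` and `test.log` contribute to the sample report.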

## Step 3: Define CLI-Like Function

This function mimics CLI behavior, allowing analysis of local ZIP files with command-line-style arguments.

In [None]:
import click

@click.command()
@click.argument('zip_path', type=click.Path())
@click.option('--output-format', default='text', type=click.Choice(['text', 'json']), help='Output format: text or json')
@click.option('--output-file', default=None, type=click.Path(), help='Output file path for JSON report')
@click.option('--generate-sample', is_flag=True, help='Generate sample log data')
def run_triage(zip_path: str, output_format: str, output_file: Optional[str], generate_sample: bool) -> None:
    """Run the log triage tool with CLI-like arguments.

    Args:
        zip_path (str): Path to the ZIP file containing logs.
        output_format (str): 'text' or 'json' (default: 'text').
        output_file (Optional[str]): File path for JSON output (optional).
        generate_sample (bool): If True, generate sample log data (default: False).
    """
    # Generate sample data first if --generate-sample is set
    if generate_sample:
        zip_path = generate_sample_data(zip_path)
        click.echo(f"Generated sample data at {zip_path}")

    # Check if zip_path exists after generation
    if not os.path.exists(zip_path):
        click.echo(f"Error: {zip_path} does not exist")
        return

    with tempfile.TemporaryDirectory() as temp_dir:
        try:
            # Extract ZIP
            log_files = extract_zip(zip_path, temp_dir)
            click.echo(f"Found {len(log_files)} .log files")

            # Scan logs
            all_issues = []
            for log_file in log_files:
                issues = scan_logs(log_file)
                all_issues.extend(issues)
                click.echo(f"Scanned {log_file}: {len(issues)} issues")

            if not all_issues:
                click.echo("No issues found in logs")
                return

            # Generate report
            report = generate_report(all_issues)
            click.echo(f"Generated report for {len(report)} issues")

            # Output report
            output_report(report, output_format, output_file)

        except ValueError as e:
            click.echo(f"Error: {str(e)}")
        except Exception as e:
            click.echo(f"Unexpected error: {str(e)}")

print("CLI-like function defined successfully.")

## Step 4: Test Sample Data (Text Output)

Generate `sample_logs.zip` and analyze it, displaying results in text format. Ensure Step 3 is executed before running this cell.

In [None]:
from click.testing import CliRunner

if 'run_triage' not in globals():
    raise NameError("run_triage is not defined. Please run Step 3 first.")

# Use CliRunner to invoke run_triage programmatically
runner = CliRunner()
result = runner.invoke(run_triage, ['sample_logs.zip', '--output-format', 'text', '--generate-sample'])
print(result.output)

if result.exception:
    print(f"Error: {result.exception}")

## Step 5: Test Sample Data (JSON Output)

Generate `sample_logs.zip`, analyze it, and save the report as `report.json`. Ensure Step 3 is executed before running this cell.

In [None]:
from click.testing import CliRunner

if 'run_triage' not in globals():
    raise NameError("run_triage is not defined. Please run Step 3 first.")

# Use CliRunner to invoke run_triage programmatically
runner = CliRunner()
result = runner.invoke(run_triage, ['sample_logs.zip', '--output-format', 'json', '--output-file', 'report.json', '--generate-sample'])
print(result.output)

if result.exception:
    print(f"Error: {result.exception}")

if os.path.exists('report.json'):
    with open('report.json', 'r') as f:
        print(f.read())
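
Once a JSON report exists, it can be post-processed with the standard library. The sketch below counts issues per log file; `count_issues_by_file` is a hypothetical helper, and the inline report is an illustrative stand-in with the same keys that `generate_report` produces, using placeholder text where the model's recommendation would be.

```python
import json
from collections import Counter

# Illustrative report in the shape produced by generate_report
# (keys: file, line_number, error, recommendation). Recommendations are placeholders.
sample_report_json = json.dumps([
    {"file": "build.log", "line_number": "3",
     "error": "[2025-04-23 10:00:03] ERROR: Process failed with code 1",
     "recommendation": "(model output would appear here)"},
    {"file": "test.log", "line_number": "3",
     "error": "[2025-04-23 10:01:03] Exception: Null pointer detected",
     "recommendation": "(model output would appear here)"},
])


def count_issues_by_file(report_json: str) -> dict:
    """Parse a JSON report string and count reported issues per log file."""
    report = json.loads(report_json)
    return dict(Counter(item["file"] for item in report))


print(count_issues_by_file(sample_report_json))
```

To run this against a real report, read `report.json` into a string and pass it to the function in place of `sample_report_json`.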

## Step 6: Programmatic Use

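Call the core functions from Step 2 directly, bypassing the click wrapper, to show that each stage can be used on its own. This cell writes `sample_logs.zip` to the current directory.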

In [None]:
zip_path = generate_sample_data()
with tempfile.TemporaryDirectory() as temp_dir:
    log_files = extract_zip(zip_path, temp_dir)
    all_issues = []
    for log_file in log_files:
        issues = scan_logs(log_file)
        all_issues.extend(issues)
    if all_issues:
        report = generate_report(all_issues)
        output_report(report, 'text')

## Next Steps & Future Enhancement Ideas

- **Testing**: Use Steps 4 and 5 to test with sample data, or Step 6 for programmatic use. Run Steps 1 to 3 before Steps 4 and 5; Step 6 needs only Steps 1 and 2.
- **Customization**: Modify the `PromptTemplate` for specific error types or add more sample logs.
- **Extend for Your Logs**: Leverage this notebook’s functionality (e.g., `extract_zip`, `scan_logs`, `get_ai_recommendation`) to create a tailored log analyzer. Replace `sample_logs.zip` with your ZIP file, adjust regex patterns in `scan_logs` for domain-specific errors, or enhance the `PromptTemplate` for custom context.
- **Incorporate Troubleshooting Guides**: Append a troubleshooting guide to the `PromptTemplate` or preprocess it into the pipeline. For example, parse guide steps and include them in `get_ai_recommendation` for precise solutions.
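- **Scalable Approach with Embeddings and Vector Search**: For large log volumes or long troubleshooting guides, embed the guide passages, store them in a vector index, and retrieve the passages most similar to each error message so that only relevant context is added to the prompt.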
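
The troubleshooting-guide and vector-search ideas above can be prototyped without extra services. The sketch below is a toy stand-in, not a real embedding pipeline: it uses bag-of-words term counts in place of learned embeddings and a linear scan in place of a vector index. The guide snippets and the names `embed`, `cosine`, and `top_match` are all hypothetical and exist only for illustration.

```python
import math
import re
from collections import Counter
from typing import List, Tuple

# Hypothetical troubleshooting snippets; a real guide would be loaded from a file.
GUIDE_SNIPPETS = [
    "Build process failed with non-zero exit code: inspect compiler output and dependency versions.",
    "Null pointer exception in tests: check fixture setup and uninitialized objects.",
    "Deployment timeout: verify server reachability and increase the timeout setting.",
]


def embed(text: str) -> Counter:
    """Toy 'embedding': lowercase word counts (a stand-in for a learned embedding)."""
    return Counter(re.findall(r"[a-z]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def top_match(query: str, snippets: List[str]) -> Tuple[str, float]:
    """Return the snippet most similar to the query and its score (linear scan)."""
    q = embed(query)
    scored = [(s, cosine(q, embed(s))) for s in snippets]
    return max(scored, key=lambda pair: pair[1])


message = "[2025-04-23 10:01:03] Exception: Null pointer detected"
snippet, score = top_match(message, GUIDE_SNIPPETS)
print(f"{score:.2f}  {snippet}")
```

The retrieved snippet could then be passed to the model as extra context, for example through an additional input variable in the `PromptTemplate`. Swapping `embed` for a real embedding model and the linear scan for a vector store would be the next step toward the scalable version.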