In [64]:
from langchain.chat_models import ChatOpenAI
import os
import json
import re
from typing import List, Dict, Any, Optional
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain.agents import initialize_agent, AgentType
from langchain_core.tools import Tool
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.pydantic_v1 import BaseModel, Field
from langchain.memory import ConversationBufferMemory
from dotenv import load_dotenv

load_dotenv()

# Define response schema
class ErrorResolution(BaseModel):
    error_explanation: str = Field(description="Explanation of what the error means")
    fixes: List[str] = Field(description="List of potential fixes for the error")
    code_suggestion: str = Field(description="Code suggestion to fix the error")

llm = ChatOpenAI(
            model="gpt-4o",
            temperature=0.7
        )

# File operation tools
class FileOperations:
    @staticmethod
    def read_file(file_path: str) -> str:
        """Read the content of a file"""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        except Exception as e:
            return f"Error reading file: {str(e)}"

    @staticmethod
    def write_file(file_path: str, content: str) -> str:
        """Write content to a file"""
        try:
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(content)
            return f"Content successfully written to {file_path}"
        except Exception as e:
            return f"Error writing to file: {str(e)}"
        
    def list_file(self, path:str = ".") -> str:
        """List files in the current directory"""
        try:
            files = os.listdir(path)
            return f"Files in the current directory: {', '.join(files)}"
        except Exception as e:
            return f"Error listing files: {str(e)}"
        
# Add recursive file search capability with log file filtering
def find_log_files(directory='.', extensions=['.log', '.txt'], max_depth=4):
    """
    Recursively search for log files with specified extensions
    
    Args:
        directory: Starting directory for search
        extensions: File extensions to consider as log files
        max_depth: Maximum recursion depth
        
    Returns:
        List of log file paths found
    """
    log_files = []
    
    def _search_dir(current_dir, current_depth):
        if current_depth > max_depth:
            return
            
        try:
            for item in os.listdir(current_dir):
                item_path = os.path.join(current_dir, item)
                
                # Check if it's a file with log extension
                if os.path.isfile(item_path):
                    _, ext = os.path.splitext(item_path)
                    if ext.lower() in extensions:
                        log_files.append(item_path)
                
                # Recurse into directories
                elif os.path.isdir(item_path):
                    _search_dir(item_path, current_depth + 1)
        except Exception as e:
            print(f"Error accessing {current_dir}: {e}")
    
    _search_dir(directory, 1)
    return log_files


# Create tools
def create_tools():
    file_ops = FileOperations()
    
    tools = [
        Tool(
            name="read_file",
            func=file_ops.read_file,
            description="Read content from a file. Input should be the file path."
        ),
        Tool(
            name="write_file",
            func=file_ops.write_file,
            description="Write content to a file. Input should be a JSON string with 'file_path' and 'content' keys."
        ),
        Tool(
            name="list_file",
            func=file_ops.list_file,
            description="List files in the current directory."
        ),
        Tool(
            name="find_log_files",
            func=find_log_files,
            description="Find all log files (with .log or .txt extensions) in the current directory and subdirectories."
        )
    ]
    
    return tools

In [79]:
prompt=ChatPromptTemplate.from_messages([
        ("system", """You are an expert error resolver agent specializing in Python errors. Your task is to:
        
        1. First use find_log_files tool to locate all log files
        2. Read each log file's content using read_file tool
        3. Analyze the errors in each log file
        4. Generate solutions for each error
        
        For each error:
        1. Understand the error message and its context in traceback
        2. Provide detailed explanation of the error
        3. List specific fixes to resolve the error
        4. Suggest code modifications
        
        IMPORTANT: Your final answer for EACH error MUST be a valid JSON object with these fields:
        {{
            "error_explanation": "detailed explanation of what the error means and why it occurred",
            "fixes": ["fix1", "fix2", "etc"],
            "code_suggestion": "code snippet to fix the error"
        }}
         
         STRICTLY follow the JSON format. Do not add any other text or explanation outside of the JSON object.
        
        Group all errors by log file in your final response.
        """),
        MessagesPlaceholder(variable_name="chat_history"),
        ("human", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ])

In [112]:
from ToolKit import extract_errors_for_llm_analysis

# Option 1: Extract errors from a single log file
errors = extract_errors_for_llm_analysis("HealthApp.log")

Error extracting from HealthApp.log: global flags not at the start of the expression at position 1


In [110]:
errors

'No errors found in the provided logs.'

In [81]:
tools = create_tools()
memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)

agent = create_openai_tools_agent(
    llm=llm,
    tools=create_tools(),
    prompt=prompt
)

agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    memory=memory,
    return_intermediate_steps=True
)

In [83]:
res = agent_executor.invoke({
    "input": "Please find all log files in the current directory ""."" and analyze them for errors."
})



[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3m
Invoking: `find_log_files` with `.`


[0m[36;1m[1;3m['.\\logs.log'][0m[32;1m[1;3m
Invoking: `read_file` with `.\logs.log`


[0m[36;1m[1;3m2025-04-25 22:54:02,462 - ERROR - Exception occurred
Traceback (most recent call last):
  File "c:\Users\shubh\Desktop\Work\Calfus_hack\log_simulator.py", line 20, in <module>
    result = 1 / 0
             ~~^~~
ZeroDivisionError: division by zero
2025-04-25 22:54:02,462 - ERROR - Exception occurred
Traceback (most recent call last):
  File "c:\Users\shubh\Desktop\Work\Calfus_hack\log_simulator.py", line 26, in <module>
    with open('non_existent_file.txt', 'r') as f:
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
FileNotFoundError: [Errno 2] No such file or directory: 'non_existent_file.txt'
2025-04-25 22:54:02,467 - ERROR - Exception occurred
Traceback (most recent call last):
  File "c:\Users\shubh\Desktop\Work\Calfus_hack\log_simulator.py", line 34, in <module>
    value = d['



In [84]:
from pprint import pprint

pprint(res["output"])

('Here are the error analyses and solutions for the errors found in the log '
 'file `logs.log`:\n'
 '\n'
 '### Error 1: ZeroDivisionError\n'
 '```json\n'
 '{\n'
 '    "error_explanation": "A ZeroDivisionError occurs when you attempt to '
 'divide a number by zero, which is mathematically undefined.",\n'
 '    "fixes": ["Ensure the denominator is not zero before performing '
 'division."],\n'
 '    "code_suggestion": "if denominator != 0:\\n    result = numerator / '
 'denominator\\nelse:\\n    print(\'Denominator is zero, cannot divide.\')"\n'
 '}\n'
 '```\n'
 '\n'
 '### Error 2: FileNotFoundError\n'
 '```json\n'
 '{\n'
 '    "error_explanation": "A FileNotFoundError occurs when an attempt is made '
 'to open a file that does not exist at the specified path.",\n'
 '    "fixes": ["Check if the file exists before trying to open it.", "Ensure '
 'the file path is correct.", "Use exception handling to handle the error '
 'gracefully."],\n'
 '    "code_suggestion": "import os\\nif '
 "os.p

In [92]:
def list_code_files(root_dir: str, extensions: Optional[List[str]] = None) -> List[str]:
    """
    Recursively lists all files in root_dir matching given extensions.
    Returns a list of absolute file paths.
    """
    if extensions is None:
        extensions = ['.py', '.js', '.ts', '.java', '.cpp', '.go', '.rb', '.rs']
    code_files = []
    for dirpath, _, filenames in os.walk(root_dir):
        for f in filenames:
            if any(f.endswith(ext) for ext in extensions):
                abs_path = os.path.abspath(os.path.join(dirpath, f))
                code_files.append(abs_path)
    return code_files

In [95]:
list_code_files(".")

['c:\\Users\\shubh\\Desktop\\Work\\Calfus_hack\\agent.py',
 'c:\\Users\\shubh\\Desktop\\Work\\Calfus_hack\\app.py',
 'c:\\Users\\shubh\\Desktop\\Work\\Calfus_hack\\codebase_QA.py',
 'c:\\Users\\shubh\\Desktop\\Work\\Calfus_hack\\database.py',
 'c:\\Users\\shubh\\Desktop\\Work\\Calfus_hack\\driver.py',
 'c:\\Users\\shubh\\Desktop\\Work\\Calfus_hack\\log_obs.py',
 'c:\\Users\\shubh\\Desktop\\Work\\Calfus_hack\\log_simulator.py',
 'c:\\Users\\shubh\\Desktop\\Work\\Calfus_hack\\main.py',
 'c:\\Users\\shubh\\Desktop\\Work\\Calfus_hack\\sample\\loganalyzer\\analyzer.py',
 'c:\\Users\\shubh\\Desktop\\Work\\Calfus_hack\\sample\\loganalyzer\\api.py',
 'c:\\Users\\shubh\\Desktop\\Work\\Calfus_hack\\sample\\loganalyzer\\main.py',
 'c:\\Users\\shubh\\Desktop\\Work\\Calfus_hack\\sample\\loganalyzer\\monitor.py',
 'c:\\Users\\shubh\\Desktop\\Work\\Calfus_hack\\sample\\loganalyzer\\output.py',
 'c:\\Users\\shubh\\Desktop\\Work\\Calfus_hack\\sample\\loganalyzer\\__init__.py']

In [96]:
from langchain.schema import Document

def load_with_line_metadata(file_path: str, chunk_size: int = 200) -> List[Document]:
    """
    Loads a file and splits into Document chunks with line number metadata.
    Uses absolute file paths for reliable reference.
    """
    docs = []
    # Convert to absolute path to ensure consistency
    abs_file_path = os.path.abspath(file_path)
    
    try:
        with open(abs_file_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
            
        for i in range(0, len(lines), chunk_size):
            chunk_lines = lines[i:i + chunk_size]
            docs.append(
                Document(
                    page_content=''.join(chunk_lines),
                    metadata={
                        'source': abs_file_path,  # Store absolute path
                        'start_line': i + 1,
                        'end_line': i + len(chunk_lines)
                    }
                )
            )
    except Exception as e:
        print(f"Error loading {abs_file_path}: {str(e)}")
    
    return docs

In [90]:
code_files = list_code_files('.')

In [97]:
load_with_line_metadata(code_files[0], chunk_size=10)

[Document(metadata={'source': 'c:\\Users\\shubh\\Desktop\\Work\\Calfus_hack\\agent.py', 'start_line': 1, 'end_line': 10}, page_content='import os\nimport json\nimport re\nfrom typing import List, Dict, Any, Optional\nimport logging\nfrom langchain_openai import ChatOpenAI\nfrom langchain.agents import AgentExecutor, create_openai_tools_agent\nfrom langchain_core.tools import Tool\nfrom langchain.prompts import ChatPromptTemplate, MessagesPlaceholder\nfrom langchain.pydantic_v1 import BaseModel, Field\n'),
 Document(metadata={'source': 'c:\\Users\\shubh\\Desktop\\Work\\Calfus_hack\\agent.py', 'start_line': 11, 'end_line': 20}, page_content='from langchain.memory import ConversationBufferMemory\nfrom dotenv import load_dotenv\n\n# Load environment variables\nload_dotenv()\n\n# Configure logging\nlogging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")\nlogger = logging.getLogger(__name__)\n\n'),
 Document(metadata={'source': 'c:\\Users\\shubh\\Desktop\\

In [98]:
def read_file(full_path, st, end):
    """
    Reads raw text from code files.
    Accepts the full file path, start line (st), and end line (end) as arguments.
    If st=0 and end=0, the entire file is read.
    """
    # Ensure the path is valid
    if not os.path.exists(full_path):
        return f"Error: File not found: {full_path}"

    # Read the file
    try:
        with open(full_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()

        # If st=0 and end=0, read the entire file
        if st == 0 and end == 0:
            return ''.join(lines)

        # Clamp values
        st = max(1, st)
        end = min(len(lines), end) if end > 0 else len(lines)

        # Return the requested lines
        return ''.join(lines[st - 1: end])
    except Exception as e:
        return f"Error reading file: {str(e)}"

In [104]:
read_file(code_files[0], 1, 10)

'import os\nimport json\nimport re\nfrom typing import List, Dict, Any, Optional\nimport logging\nfrom langchain_openai import ChatOpenAI\nfrom langchain.agents import AgentExecutor, create_openai_tools_agent\nfrom langchain_core.tools import Tool\nfrom langchain.prompts import ChatPromptTemplate, MessagesPlaceholder\nfrom langchain.pydantic_v1 import BaseModel, Field\n'

# Fetching Logs from Loki

This section adds functionality to fetch logs from a Loki instance for advanced log analysis.

In [None]:
import requests
from datetime import datetime, timedelta
import pandas as pd
import urllib.parse

class LokiLogFetcher:
    def __init__(self, base_url="http://localhost:3100"):
        """Initialize the Loki log fetcher with the Loki server URL.
        
        Args:
            base_url: The URL of the Loki server, default is http://localhost:3100
        """
        self.base_url = base_url
        self.query_endpoint = f"{base_url}/loki/api/v1/query_range"
    
    def query_logs(self, query, start_time=None, end_time=None, limit=100, direction="BACKWARD"):
        """Query logs from Loki.
        
        Args:
            query: LogQL query string
            start_time: Start time in ISO format or datetime object
            end_time: End time in ISO format or datetime object
            limit: Maximum number of entries to return
            direction: Query direction (BACKWARD or FORWARD)
            
        Returns:
            DataFrame containing the logs or raw response if pandas is not available
        """
        # Set default time range if not provided
        if end_time is None:
            end_time = datetime.now()
        if start_time is None:
            start_time = end_time - timedelta(hours=1)
            
        # Convert datetime objects to ISO format strings
        if isinstance(start_time, datetime):
            start_time = start_time.isoformat()
        if isinstance(end_time, datetime):
            end_time = end_time.isoformat()
            
        # Prepare query parameters
        params = {
            "query": query,
            "start": start_time,
            "end": end_time,
            "limit": limit,
            "direction": direction
        }
        
        # Make the request to Loki
        try:
            response = requests.get(self.query_endpoint, params=params)
            response.raise_for_status()  # Raise an exception for HTTP errors
            data = response.json()
            
            # Process the response into a DataFrame
            if "data" in data and "result" in data["data"]:
                results = data["data"]["result"]
                
                # Prepare data for DataFrame
                log_entries = []
                for result in results:
                    labels = result.get("stream", {})
                    for value in result.get("values", []):
                        timestamp_ns, log_line = value
                        timestamp = datetime.fromtimestamp(int(timestamp_ns) / 1e9)
                        entry = {
                            "timestamp": timestamp,
                            "log": log_line,
                            **labels  # Include all labels as columns
                        }
                        log_entries.append(entry)
                
                if log_entries:
                    return pd.DataFrame(log_entries)
                else:
                    print("No log entries found.")
                    return pd.DataFrame()
            else:
                print(f"Unexpected response format: {data}")
                return data
        except requests.exceptions.RequestException as e:
            print(f"Error querying Loki: {e}")
            return None
    
    def analyze_error_patterns(self, logs_df):
        """Analyze error patterns in the logs.
        
        Args:
            logs_df: DataFrame containing logs
            
        Returns:
            Dictionary with error analysis
        """
        if logs_df is None or logs_df.empty:
            return {"error": "No logs to analyze"}
            
        # Filter for error logs (customize based on your log format)
        error_df = logs_df[logs_df["log"].str.contains("error|exception|fail", case=False, regex=True)].copy()
        
        # Count occurrences of different error types
        error_counts = {}
        error_patterns = [
            ("ZeroDivisionError", r"ZeroDivisionError"),
            ("FileNotFoundError", r"FileNotFoundError"),
            ("KeyError", r"KeyError"),
            ("IndexError", r"IndexError"),
            ("AttributeError", r"AttributeError"),
            ("TypeError", r"TypeError"),
            ("ValueError", r"ValueError"),
            ("NameError", r"NameError"),
            ("Import Error", r"ImportError|ModuleNotFoundError"),
            ("Permission Error", r"PermissionError"),
            ("Connection Error", r"ConnectionError|ConnectionRefusedError"),
            ("Timeout Error", r"TimeoutError"),
            ("Other Error", r"Error|Exception")  # Catch-all for other errors
        ]
        
        for error_name, pattern in error_patterns:
            mask = error_df["log"].str.contains(pattern, case=False, regex=True)
            count = mask.sum()
            if count > 0:
                error_counts[error_name] = count
                
        # Time series analysis of errors
        error_df["hour"] = error_df["timestamp"].dt.hour
        hourly_counts = error_df.groupby("hour").size().to_dict()
        
        return {
            "total_logs": len(logs_df),
            "error_logs": len(error_df),
            "error_types": error_counts,
            "hourly_distribution": hourly_counts
        }

# Create an instance of the Loki log fetcher
loki_fetcher = LokiLogFetcher(base_url="http://localhost:3100")

In [None]:
# Example query to fetch error logs from Loki
# Replace with your actual label selectors
query = '{job="application"}'

# Get logs from the last hour (modify as needed)
end_time = datetime.now()
start_time = end_time - timedelta(hours=1)

try:
    # Fetch logs
    logs_df = loki_fetcher.query_logs(
        query=query,
        start_time=start_time,
        end_time=end_time,
        limit=1000
    )
    
    if logs_df is not None and not logs_df.empty:
        print(f"Retrieved {len(logs_df)} log entries.")
        # Display the first few log entries
        display(logs_df.head())
    else:
        print("No logs retrieved or connection failed.")
except Exception as e:
    print(f"Error fetching logs: {e}")

In [None]:
# Analyze error patterns in the retrieved logs
try:
    if 'logs_df' in locals() and logs_df is not None and not logs_df.empty:
        error_analysis = loki_fetcher.analyze_error_patterns(logs_df)
        
        # Display error analysis results
        print(f"Total logs analyzed: {error_analysis['total_logs']}")
        print(f"Error logs found: {error_analysis['error_logs']} ({error_analysis['error_logs']/error_analysis['total_logs']*100:.2f}%)\n")
        
        print("Error type distribution:")
        for error_type, count in error_analysis['error_types'].items():
            print(f"  - {error_type}: {count} occurrences")
        
        # Create a bar chart of error types
        if error_analysis['error_types']:
            error_types_df = pd.DataFrame(list(error_analysis['error_types'].items()), 
                                         columns=['Error Type', 'Count'])
            error_types_df.sort_values('Count', ascending=False, inplace=True)
            
            # Plot using matplotlib
            import matplotlib.pyplot as plt
            plt.figure(figsize=(12, 6))
            plt.bar(error_types_df['Error Type'], error_types_df['Count'])
            plt.xticks(rotation=45, ha='right')
            plt.title('Error Types in Logs')
            plt.xlabel('Error Type')
            plt.ylabel('Count')
            plt.tight_layout()
            plt.show()
            
            # Plot hourly distribution
            if error_analysis['hourly_distribution']:
                hours = sorted(error_analysis['hourly_distribution'].keys())
                counts = [error_analysis['hourly_distribution'][h] for h in hours]
                
                plt.figure(figsize=(12, 6))
                plt.plot(hours, counts, marker='o')
                plt.title('Hourly Distribution of Errors')
                plt.xlabel('Hour')
                plt.ylabel('Error Count')
                plt.xticks(range(0, 24, 1))
                plt.grid(True, alpha=0.3)
                plt.tight_layout()
                plt.show()
    else:
        print("No logs available for analysis.")
except Exception as e:
    print(f"Error during analysis: {e}")

## Advanced Error Analysis with LLM

Using our existing agent to analyze specific error messages from Loki logs.

In [None]:
def extract_error_contexts(logs_df, error_pattern="error|exception|fail", context_lines=5):
    """Extract error messages with surrounding context.
    
    Args:
        logs_df: DataFrame containing logs
        error_pattern: Regex pattern to identify error logs
        context_lines: Number of lines of context before and after the error
        
    Returns:
        List of dictionaries containing error contexts
    """
    if logs_df is None or logs_df.empty:
        return []
        
    # Create a copy and ensure logs are sorted by timestamp
    df = logs_df.sort_values('timestamp').copy()
    df.reset_index(drop=True, inplace=True)
    
    # Find error log indices
    error_indices = df[df['log'].str.contains(error_pattern, case=False, regex=True)].index
    
    error_contexts = []
    for idx in error_indices:
        # Get context before and after the error
        start_idx = max(0, idx - context_lines)
        end_idx = min(len(df) - 1, idx + context_lines)
        
        context_logs = df.loc[start_idx:end_idx, 'log'].tolist()
        error_log = df.loc[idx, 'log']
        timestamp = df.loc[idx, 'timestamp']
        
        # Extract additional context if available
        labels = {col: df.loc[idx, col] for col in df.columns 
                  if col not in ['timestamp', 'log', 'hour']}
        
        error_contexts.append({
            'timestamp': timestamp,
            'error_log': error_log,
            'context_logs': context_logs,
            'labels': labels
        })
    
    return error_contexts

In [None]:
# Extract error contexts from the logs
if 'logs_df' in locals() and logs_df is not None and not logs_df.empty:
    error_contexts = extract_error_contexts(logs_df)
    print(f"Found {len(error_contexts)} errors with context.")
    
    # Display the first error context as an example
    if error_contexts:
        print("\nExample error context:")
        print(f"Timestamp: {error_contexts[0]['timestamp']}")
        print(f"Error: {error_contexts[0]['error_log']}")
        print("Context logs:")
        for i, log in enumerate(error_contexts[0]['context_logs']):
            print(f"  {i+1}. {log[:100]}{'...' if len(log) > 100 else ''}")
else:
    print("No logs available for extracting error contexts.")

In [None]:
def analyze_error_with_llm(error_context, agent_executor):
    """Analyze a single error context using the LLM agent.
    
    Args:
        error_context: Dictionary containing error context
        agent_executor: LangChain agent executor to use for analysis
        
    Returns:
        Analysis result from the agent
    """
    # Format the error and context as a prompt
    error_prompt = f"""Analyze this error and provide a solution:
    
    Error log: {error_context['error_log']}
    
    Context (surrounding log entries):
    """
    
    # Add the context logs
    for i, log in enumerate(error_context['context_logs']):
        error_prompt += f"\n{i+1}. {log}"
    
    # Add any additional labels/metadata
    if error_context['labels']:
        error_prompt += "\n\nAdditional context:"
        for key, value in error_context['labels'].items():
            error_prompt += f"\n{key}: {value}"
    
    # Run analysis with the agent
    try:
        result = agent_executor.invoke({"input": error_prompt})
        return {
            'timestamp': error_context['timestamp'],
            'error_log': error_context['error_log'],
            'analysis': result['output']
        }
    except Exception as e:
        return {
            'timestamp': error_context['timestamp'],
            'error_log': error_context['error_log'],
            'analysis': f"Error during analysis: {str(e)}"
        }

In [None]:
# Analyze a sample of errors using our LLM agent
if 'error_contexts' in locals() and error_contexts:
    # Take a sample of up to 3 errors to analyze (to limit API usage)
    sample_size = min(3, len(error_contexts))
    sample_errors = error_contexts[:sample_size]
    
    analyses = []
    for i, error_context in enumerate(sample_errors):
        print(f"Analyzing error {i+1}/{sample_size}...")
        analysis = analyze_error_with_llm(error_context, agent_executor)
        analyses.append(analysis)
    
    # Display the analyses
    for i, analysis in enumerate(analyses):
        print(f"\n--- Error Analysis {i+1} ---")
        print(f"Timestamp: {analysis['timestamp']}")
        print(f"Error: {analysis['error_log']}")
        print(f"\nAnalysis & Solution:\n{analysis['analysis']}")
        print("\n" + "-"*50)
else:
    print("No error contexts available to analyze.")