In [44]:
from llama_index.llms.ollama import Ollama

# Load a local Llama 3.3 model served through Ollama; temperature=0.2 keeps
# sampling fairly conservative.
# NOTE(review): this `llm` appears unused by the cells below — the log-analysis
# agent builds its own OpenAI LLM. Remove this cell or wire it in.
llm = Ollama(model="llama3.3:latest", temperature=0.2)


In [None]:
import os
from openai import OpenAI
from dotenv import load_dotenv

load_dotenv()

# The API key lives in the environment / .env under the custom name OAK. Mirror it
# into OPENAI_API_KEY, which the OpenAI client (and LlamaIndex's OpenAI LLM used
# later) read by default. Fail with a clear message instead of the opaque
# TypeError os.environ raises when assigned None.
openai_api_key = os.getenv('OAK')
if not openai_api_key:
    raise RuntimeError("OAK is not set; add it to your environment or .env file")
os.environ["OPENAI_API_KEY"] = openai_api_key

client = OpenAI(api_key=openai_api_key)

# Connectivity check: one tiny chat completion.
chat_completion = client.chat.completions.create(
    messages=[
        {
            "role": "user",
            "content": "Say this is a test",
        }
    ],
    model="gpt-3.5-turbo",
)
print(chat_completion)

ChatCompletion(id='chatcmpl-B6r7Sv4DH6Dal9MDfOx20R7yefv0X', choices=[Choice(finish_reason='stop', index=0, logprobs=None, message=ChatCompletionMessage(content='This is a test.', refusal=None, role='assistant', audio=None, function_call=None, tool_calls=None))], created=1740973858, model='gpt-3.5-turbo-0125', object='chat.completion', service_tier='default', system_fingerprint=None, usage=CompletionUsage(completion_tokens=6, prompt_tokens=12, total_tokens=18, completion_tokens_details=CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), prompt_tokens_details=PromptTokensDetails(audio_tokens=0, cached_tokens=0)))


In [22]:
import json
import pandas as pd

# Raw log export and the trimmed copy that LogParser.from_csv consumes.
RAW_LOGS_CSV = "./csv/Explore-logs-A-data-2025-03-02 22_52_03.csv"
PROCESSED_LOGS_CSV = "./csv/processed_logs.csv"

initial_csv = pd.read_csv(RAW_LOGS_CSV)

# LogParser reads exactly the `Time` and `Line` columns, so select them by name rather
# than by position: a reordered export still works, and a missing column fails here
# with a KeyError instead of surfacing later in the parser.
initial_csv[["Time", "Line"]].to_csv(PROCESSED_LOGS_CSV, index=False)

# Each `Line` cell holds one JSON log record with keys such as level, time, pid,
# hostname, reqId and msg; failed requests also carry an `err` object with
# message, name, stack, code, status and the request `config` (url, method, params).

In [None]:
import pandas as pd
import json
from datetime import datetime
# import matplotlib.pyplot as plt
from typing import Dict, List, Any, Optional, Union
# from llama_index import VectorStoreIndex, Document, ServiceContext
from llama_index.llms.openai import OpenAI
from llama_index.agent.openai import OpenAIAgent
from llama_index.core.tools import FunctionTool


class LogParser:
    """Turn a CSV log export into pandas DataFrames.

    ``from_csv`` decodes the JSON in each ``Line`` cell into one raw record, and
    ``structure_logs`` flattens those records into the fixed set of columns that
    ``LogAnalyzer`` queries.
    """

    # Output columns of structure_logs, in order. Declaring them up front means every
    # column exists even when no record carries an `err` / `err.config` object (or the
    # input is empty), so lookups such as logs_df['api_url'] do not raise KeyError.
    _STRUCTURED_COLUMNS = [
        'timestamp', 'datetime', 'level', 'message', 'request_id',
        'error_name', 'error_message', 'error_code', 'error_status', 'error_stack',
        'api_url', 'api_method', 'api_params',
    ]

    @staticmethod
    def from_csv(file_path: str) -> pd.DataFrame:
        """Load a CSV with ``Time`` and ``Line`` columns and decode each line's JSON.

        Args:
            file_path: Path to the CSV file.

        Returns:
            One row per successfully decoded JSON object. Its keys become columns, plus
            a ``timestamp`` column copied from ``Time`` (converted with ``int``).

        Rows whose ``Line`` is not valid JSON, is empty, or does not decode to a JSON
        object are reported with ``print`` and skipped. A missing ``Time`` value (NaN)
        still raises ``ValueError``.
        """
        df = pd.read_csv(file_path)

        parsed_data = []
        for _, row in df.iterrows():
            timestamp = int(row['Time'])
            try:
                log_entry = json.loads(row['Line'])
            except (json.JSONDecodeError, TypeError):
                # TypeError covers empty cells, which read_csv returns as float NaN.
                print(f"Error parsing JSON at timestamp {timestamp}")
                continue
            if not isinstance(log_entry, dict):
                # Valid JSON but not an object (e.g. a bare string or list): it has no
                # fields to extract, and the item assignment below would fail.
                print(f"Skipping non-object JSON at timestamp {timestamp}")
                continue
            log_entry['timestamp'] = timestamp
            parsed_data.append(log_entry)

        return pd.DataFrame(parsed_data)

    @staticmethod
    def structure_logs(logs_df: pd.DataFrame) -> pd.DataFrame:
        """Flatten raw log records into one row per log with fixed columns.

        Args:
            logs_df: Output of ``from_csv``. It must have a ``timestamp`` column
                (epoch milliseconds, given the ``/ 1000`` below). It may have
                ``level``, ``msg``, ``reqId`` and ``err`` columns.

        Returns:
            A DataFrame with exactly the columns in ``_STRUCTURED_COLUMNS``, even when
            ``logs_df`` is empty. Fields absent from a record are ``None``/NaN.
        """
        structured_logs = []

        for _, log in logs_df.iterrows():
            # In a frame mixing records with and without `err`, the missing cells are
            # NaN (a float), not a dict. Normalise to {} so the .get() calls are safe.
            err = log.get('err')
            if not isinstance(err, dict):
                err = {}
            config = err.get('config')
            if not isinstance(config, dict):
                config = {}

            structured_logs.append({
                'timestamp': log['timestamp'],
                'datetime': datetime.fromtimestamp(log['timestamp'] / 1000).strftime('%Y-%m-%d %H:%M:%S'),
                'level': log.get('level'),
                'message': log.get('msg'),
                'request_id': log.get('reqId'),
                'error_name': err.get('name'),
                'error_message': err.get('message'),
                'error_code': err.get('code'),
                'error_status': err.get('status'),
                # Kept so stack traces can still be analysed after structuring.
                'error_stack': err.get('stack'),
                # Request details recorded in the error's config, when available.
                'api_url': config.get('url'),
                'api_method': config.get('method'),
                'api_params': config.get('params'),
            })

        return pd.DataFrame(structured_logs, columns=list(LogParser._STRUCTURED_COLUMNS))


class LogAnalyzer:
    """Query helpers over a structured log frame (the output of ``LogParser.structure_logs``).

    Every row is counted as one error (see ``generate_error_summary``). The frame is
    therefore presumably pre-filtered to error entries — NOTE(review): confirm for your
    exports. Optional columns that a method needs but the frame lacks are treated as
    all-missing instead of raising ``KeyError``.
    """

    def __init__(self, logs_df: pd.DataFrame):
        self.logs_df = logs_df

    def _column(self, name: str) -> pd.Series:
        """Return column ``name``, or an all-``None`` object Series if the frame lacks it."""
        if name in self.logs_df.columns:
            return self.logs_df[name]
        return pd.Series([None] * len(self.logs_df), index=self.logs_df.index, dtype=object)

    def count_errors_by_type(self) -> Dict[str, int]:
        """Count rows per ``error_name``, most frequent first; missing names are not counted."""
        return self._column('error_name').value_counts().to_dict()

    def count_errors_by_status(self) -> Dict[int, int]:
        """Count rows per HTTP status (``error_status``), most frequent first."""
        return self._column('error_status').value_counts().to_dict()

    def find_most_frequent_urls(self) -> Dict[str, int]:
        """Count rows per request URL (``api_url``), most frequent first."""
        return self._column('api_url').value_counts().to_dict()

    def errors_over_time(self, interval: str = '1min') -> Dict[datetime, int]:
        """Count rows per time bucket.

        Args:
            interval: pandas resample rule, e.g. ``'1min'``, ``'5min'``, ``'1h'``.

        Returns:
            Mapping of bucket start (pandas ``Timestamp``) to row count, including empty
            buckets between the first and last log. Requires a ``datetime`` column.
        """
        df_copy = self.logs_df.copy()
        df_copy['datetime'] = pd.to_datetime(df_copy['datetime'])
        df_copy = df_copy.set_index('datetime')
        error_counts = df_copy.resample(interval).size()
        return error_counts.to_dict()

    def search_logs_by_request_id(self, request_id: str) -> List[Dict[str, Any]]:
        """Return all rows whose ``request_id`` equals ``request_id``, as record dicts."""
        matching_logs = self.logs_df[self._column('request_id') == request_id]
        return matching_logs.to_dict('records')

    def search_logs_by_channel_id(self, channel_id: str) -> List[Dict[str, Any]]:
        """Return all rows whose request params (``api_params``) have ``channelid == channel_id``."""
        mask = self._column('api_params').apply(
            lambda params: isinstance(params, dict) and params.get('channelid') == channel_id
        ).astype(bool)  # astype keeps an empty result usable as a boolean mask
        return self.logs_df[mask].to_dict('records')

    def generate_error_summary(self) -> Dict[str, Any]:
        """Summarise error counts for the whole frame.

        Returns:
            Dict with ``total_errors`` (row count), ``error_types``, ``status_codes``,
            ``top_url_status_combinations`` (keys are ``(url, status)`` tuples) and
            ``channel_id_frequencies`` (counts of ``api_params['channelid']``).
        """
        total_errors = len(self.logs_df)
        error_types = self.count_errors_by_type()
        status_codes = self.count_errors_by_status()

        # Repeated failures: how often each (URL, status) pair occurs. groupby skips
        # rows where either key is missing.
        if {'api_url', 'error_status'}.issubset(self.logs_df.columns):
            url_status_counts = (
                self.logs_df.groupby(['api_url', 'error_status']).size().sort_values(ascending=False)
            )
        else:
            url_status_counts = pd.Series(dtype='int64')

        # Which channel IDs the failing requests were made for.
        channel_counts = self._column('api_params').apply(
            lambda params: params.get('channelid') if isinstance(params, dict) else None
        ).value_counts().to_dict()

        return {
            "total_errors": total_errors,
            "error_types": error_types,
            "status_codes": status_codes,
            "top_url_status_combinations": url_status_counts.to_dict(),
            "channel_id_frequencies": channel_counts
        }

    def _stack_traces(self) -> List[str]:
        """Collect the non-empty stack-trace strings present in the frame."""
        if 'error_stack' in self.logs_df.columns:
            candidates = self.logs_df['error_stack']
        elif 'err' in self.logs_df.columns:
            # Raw (unstructured) frames keep the stack inside the `err` dict.
            candidates = self.logs_df['err'].apply(
                lambda err: err.get('stack') if isinstance(err, dict) else None
            )
        else:
            candidates = []
        return [stack for stack in candidates if isinstance(stack, str)]

    def analyze_stack_traces(self) -> Dict[str, int]:
        """Return the 10 most common stripped stack-trace lines with their counts.

        Each non-blank line of every stack (the error header and each ``at ...`` frame,
        including file line numbers) is counted as one component.
        """
        stack_trace_components = [
            line.strip()
            for stack in self._stack_traces()
            for line in stack.split('\n')
            if line.strip()
        ]
        return pd.Series(stack_trace_components, dtype=object).value_counts().head(10).to_dict()


class LogVisualizer:
    """Container for the structured log frame used by the chart helpers.

    Every plotting method is currently commented out, together with the matplotlib
    import in the imports cell. For now the class only stores the frame it receives.
    """

    def __init__(self, logs_df: pd.DataFrame) -> None:
        # The structured frame to plot; the disabled chart helpers below read from it.
        self.logs_df = logs_df

    # def errors_over_time(self, output_path: str = 'error_frequency.png') -> str:
    #     """Generate a visualization of errors over time."""
    #     df_copy = self.logs_df.copy()
    #     df_copy['datetime'] = pd.to_datetime(df_copy['datetime'])
    #     df_copy = df_copy.set_index('datetime')
    #     error_counts = df_copy.resample('1min').size()
    #
    #     plt.figure(figsize=(12, 6))
    #     error_counts.plot(kind='line')
    #     plt.title('Error Frequency Over Time')
    #     plt.xlabel('Time')
    #     plt.ylabel('Number of Errors')
    #     plt.tight_layout()
    #
    #     # Save the plot to a file
    #     plt.savefig(output_path)
    #     plt.close()
    #
    #     return f"Visualization saved to {output_path}"

    # def error_types_pie_chart(self, output_path: str = 'error_types.png') -> str:
    #     """Generate a pie chart of error types."""
    #     error_types = self.logs_df['error_name'].value_counts()
    #
    #     plt.figure(figsize=(10, 10))
    #     plt.pie(error_types, labels=error_types.index, autopct='%1.1f%%')
    #     plt.title('Distribution of Error Types')
    #     plt.tight_layout()
    #
    #     plt.savefig(output_path)
    #     plt.close()
    #
    #     return f"Visualization saved to {output_path}"

    # def status_code_bar_chart(self, output_path: str = 'status_codes.png') -> str:
    #     """Generate a bar chart of HTTP status codes."""
    #     status_codes = self.logs_df['error_status'].value_counts()
    #
    #     plt.figure(figsize=(12, 6))
    #     status_codes.plot(kind='bar')
    #     plt.title('HTTP Status Code Distribution')
    #     plt.xlabel('Status Code')
    #     plt.ylabel('Count')
    #     plt.tight_layout()
    #
    #     plt.savefig(output_path)
    #     plt.close()
    #
    #     return f"Visualization saved to {output_path}"


class LogAnalysisAgent:
    """Chat agent (LlamaIndex ``OpenAIAgent``) whose tools are ``LogAnalyzer`` query methods."""

    # System prompt for the agent. It is dedented before use so the model does not
    # receive this source indentation as leading whitespace on every line.
    _SYSTEM_PROMPT = """
        You are a log analysis expert. Your job is to help the user understand patterns and issues in their
        microservice logs. Use the available tools to analyze the log data and provide insights.

        Focus on:
        1. Identifying error patterns
        2. Finding the root causes of errors
        3. Suggesting potential fixes
        4. Highlighting unusual behavior

        Always show your reasoning and be specific in your recommendations.
        """

    def __init__(self, analyzer: LogAnalyzer, visualizer: LogVisualizer, model: str = "gpt-3.5-turbo"):
        self.analyzer = analyzer
        self.visualizer = visualizer
        self.model = model
        self.agent = self._setup_agent()

    def _setup_agent(self) -> OpenAIAgent:
        """Build the tool-calling agent over ``self.analyzer``.

        Returns:
            An ``OpenAIAgent`` backed by ``OpenAI(model=self.model)``. It runs with
            ``verbose=True``, so each tool call and its output are printed.
        """
        import textwrap

        analyzer = self.analyzer
        # (tool name, callable, description). Methods are passed as bound methods rather
        # than wrapped in lambdas, so FunctionTool builds each tool's argument schema
        # from the method's own signature (e.g. ``request_id: str``).
        tool_specs = [
            ("count_errors_by_type", analyzer.count_errors_by_type,
             "Count errors by their type/name"),
            ("count_errors_by_status", analyzer.count_errors_by_status,
             "Count errors by HTTP status code"),
            ("find_most_frequent_urls", analyzer.find_most_frequent_urls,
             "Find the most frequently occurring URLs in error logs"),
            # ("errors_over_time", analyzer.errors_over_time,
            #  "Analyze error frequency over time with specified interval (e.g., '1min', '5min', '1h')"),
            ("search_logs_by_request_id", analyzer.search_logs_by_request_id,
             "Search for logs with a specific request ID"),
            ("search_logs_by_channel_id", analyzer.search_logs_by_channel_id,
             "Search for logs related to a specific channel ID"),
            ("generate_error_summary", analyzer.generate_error_summary,
             "Generate a summary of error patterns in the logs"),
            ("analyze_stack_traces", analyzer.analyze_stack_traces,
             "Extract common patterns from stack traces"),
            # Visualizer tools are disabled while its plotting methods are commented out:
            # ("visualize_errors_over_time", self.visualizer.errors_over_time,
            #  "Generate a visualization of errors over time"),
            # ("visualize_error_types", self.visualizer.error_types_pie_chart,
            #  "Generate a pie chart of error types"),
            # ("visualize_status_codes", self.visualizer.status_code_bar_chart,
            #  "Generate a bar chart of HTTP status codes"),
        ]
        tools = [
            FunctionTool.from_defaults(fn=fn, name=name, description=description)
            for name, fn, description in tool_specs
        ]

        # The LLM that drives tool selection and writes the answers.
        llm = OpenAI(model=self.model)

        return OpenAIAgent.from_tools(
            tools,
            llm=llm,
            verbose=True,
            system_prompt=textwrap.dedent(self._SYSTEM_PROMPT).strip(),
        )

    def chat(self, question: str) -> Any:
        """Send ``question`` to the agent and return its response object."""
        return self.agent.chat(question)

    # def create_vector_index(self, sample_size: int = 50) -> VectorStoreIndex:
    #     """Create a vector index for more complex queries (optional feature)."""
    #     logs_df = self.analyzer.logs_df
    #     llm = OpenAI(model=self.model)
    #     service_context = ServiceContext.from_defaults(llm=llm)
    #
    #     # Sample logs to create documents
    #     sample_size = min(sample_size, len(logs_df))
    #     documents = [Document(text=str(row.to_dict())) for _, row in logs_df.sample(sample_size).iterrows()]
    #
    #     return VectorStoreIndex.from_documents(documents, service_context=service_context)


class LogAnalysisSystem:
    """Top-level wiring: parse a log CSV, then analyze it directly or through the agent."""

    # Chart files the report may link to. They are listed only if present on disk,
    # because the visualizer's plotting methods may be disabled.
    _VISUALIZATION_FILES = [
        ("Error Frequency Over Time", "error_time_series.png"),
        ("Error Types Distribution", "error_types_chart.png"),
        ("HTTP Status Codes", "status_codes_chart.png"),
    ]

    @classmethod
    def from_csv(cls, file_path: str, model: str = "gpt-3.5-turbo") -> "LogAnalysisSystem":
        """Build the full pipeline (parser -> analyzer/visualizer -> agent) from a CSV file."""
        raw_logs_df = LogParser.from_csv(file_path)
        structured_logs_df = LogParser.structure_logs(raw_logs_df)

        analyzer = LogAnalyzer(structured_logs_df)
        visualizer = LogVisualizer(structured_logs_df)

        agent = LogAnalysisAgent(analyzer, visualizer, model)

        return cls(structured_logs_df, analyzer, visualizer, agent)

    def __init__(
        self,
        logs_df: pd.DataFrame,
        analyzer: LogAnalyzer,
        visualizer: LogVisualizer,
        agent: LogAnalysisAgent
    ):
        self.logs_df = logs_df
        self.analyzer = analyzer
        self.visualizer = visualizer
        self.agent = agent

    def run_interactive_session(self) -> None:
        """Prompt for questions until the user types 'exit'; blank input is ignored."""
        print("Log Analysis Agent is ready! Type 'exit' to end the session.")

        while True:
            question = input("\nWhat would you like to know about your logs? ").strip()
            if question.lower() == 'exit':
                break
            if not question:
                # Don't spend an LLM call on an empty message.
                continue

            print("Analyzing...")
            response = self.agent.chat(question)
            print(f"Response: {response}")

    def run_batch_analysis(self, questions: List[str]) -> Dict[str, Any]:
        """Ask each question in order; return a mapping of question -> agent response."""
        results = {}
        for question in questions:
            print(f"Analyzing: {question}")
            response = self.agent.chat(question)
            results[question] = response

        return results

    @staticmethod
    def _json_key(key: Any) -> Any:
        """Convert a dict key into a type ``json.dumps`` accepts as an object key."""
        if isinstance(key, tuple):
            # e.g. the (url, status) keys of top_url_status_combinations.
            return " | ".join(str(part) for part in key)
        if key is None or isinstance(key, (str, int, float, bool)):
            return key
        return str(key)

    @staticmethod
    def _to_json_safe(value: Any) -> Any:
        """Recursively rewrite dict keys (and tuples) so ``json.dumps`` can serialize ``value``."""
        if isinstance(value, dict):
            return {
                LogAnalysisSystem._json_key(k): LogAnalysisSystem._to_json_safe(v)
                for k, v in value.items()
            }
        if isinstance(value, (list, tuple)):
            return [LogAnalysisSystem._to_json_safe(v) for v in value]
        return value

    def generate_comprehensive_report(self, output_file: str = "log_analysis_report.md") -> str:
        """Write a Markdown report that combines summary statistics with agent insights.

        Args:
            output_file: Destination path; it is overwritten and written as UTF-8.

        Returns:
            A human-readable message naming the written file.
        """
        import os

        summary = self.analyzer.generate_error_summary()

        # Visualizations are disabled while the visualizer methods are commented out:
        # time_viz = self.visualizer.errors_over_time("error_time_series.png")
        # types_viz = self.visualizer.error_types_pie_chart("error_types_chart.png")
        # status_viz = self.visualizer.status_code_bar_chart("status_codes_chart.png")
        visualization_lines = [
            f"- {label}: {path}"
            for label, path in self._VISUALIZATION_FILES
            if os.path.exists(path)
        ]
        visualization_section = (
            "\n".join(visualization_lines)
            if visualization_lines
            else "- None generated (visualizations are currently disabled)"
        )

        # Ask the agent for insights.
        standard_questions = [
            "What are the most common error patterns in these logs?",
            "What might be causing these errors based on the data?",
            "What recommendations would you make to fix these issues?",
            "Are there any unusual patterns worth investigating further?"
        ]

        insights = self.run_batch_analysis(standard_questions)

        # summary contains (url, status) tuple keys, which json.dumps rejects with
        # "keys must be str, int, float, bool or None". Stringify such keys first;
        # default=str covers any remaining non-JSON values (e.g. numpy scalars).
        details_json = json.dumps(self._to_json_safe(summary), indent=2, default=str)
        stack_json = json.dumps(
            self._to_json_safe(self.analyzer.analyze_stack_traces()), indent=2, default=str
        )

        report = f"""# Microservice Log Analysis Report

## Summary Statistics
- Total Errors: {summary['total_errors']}
- Unique Error Types: {len(summary['error_types'])}
- Status Codes: {summary['status_codes']}

## Error Patterns
{insights[standard_questions[0]]}

## Probable Causes
{insights[standard_questions[1]]}

## Recommendations
{insights[standard_questions[2]]}

## Unusual Patterns
{insights[standard_questions[3]]}

## Visualizations
{visualization_section}

## Detailed Error Breakdown
```
{details_json}
```

## Stack Trace Analysis
```
{stack_json}
```
"""

        # Explicit UTF-8: LLM answers can contain characters outside the platform's
        # default encoding.
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(report)

        return f"Comprehensive report generated at {output_file}"


# Example usage
if __name__ == "__main__":
    # Build the pipeline from the trimmed CSV written by the preprocessing cell.
    logs_system = LogAnalysisSystem.from_csv("./csv/processed_logs.csv")

    # Option 1: interactive Q&A loop (blocks on input()).
    # logs_system.run_interactive_session()

    # Option 2: batch questions; `results` maps each question to the agent's response.
    questions = [
        "What are the most common error types in the logs?",
        "Analyze the error patterns and suggest potential causes.",
        "Are there specific channel IDs that experience more errors?",
        # "What's the distribution of errors over time?",
        "Can you identify any potential API endpoint issues based on the logs?",
        # "Generate a comprehensive summary of all errors.",
        # "What can you tell me about the stack traces in these errors?"
    ]
    results = logs_system.run_batch_analysis(questions)

    # Option 3: Markdown report. It asks the agent four more questions; the agent keeps
    # chat memory, so its answers may build on the batch above. Show where the report
    # went, because a return value inside this `if` is not displayed by the notebook.
    report_status = logs_system.generate_comprehensive_report()
    print(report_status)

Analyzing: What are the most common error types in the logs?
Added user message to memory: What are the most common error types in the logs?
=== Calling Function ===
Calling function: count_errors_by_type with args: {}
Got output: {'AxiosError': 26}

Analyzing: Analyze the error patterns and suggest potential causes.
Added user message to memory: Analyze the error patterns and suggest potential causes.
=== Calling Function ===
Calling function: generate_error_summary with args: {}
Got output: {'total_errors': 26, 'error_types': {'AxiosError': 26}, 'status_codes': {404: 26}, 'top_url_status_combinations': {('http://fabric.amaginow.svc.cluster.local:5100//api/v3/deliveries', 404): 26}, 'channel_id_frequencies': {'amg78787c7': 9, 'amg78787c6': 6, '': 5, 'amg78787c8': 4, 'amg78787c9': 2}}

Analyzing: Are there specific channel IDs that experience more errors?
Added user message to memory: Are there specific channel IDs that experience more errors?
Analyzing: Can you identify any potential 

TypeError: keys must be str, int, float, bool or None, not tuple