# Log Summarization and Analysis Notebook

This notebook demonstrates how to:
1. Load processed logs from S3 by querying the Glue Data Catalog table with Athena
2. Generate human-readable summaries
3. Identify critical errors
4. Suggest potential fixes

It builds on the existing log processing infrastructure.

In [None]:
import boto3
import pandas as pd
import numpy as np
import json
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import re
from IPython.display import display, HTML
import os
import sys

# For embedding generation
from sentence_transformers import SentenceTransformer

# For vector storage
import faiss

# For clustering
from sklearn.cluster import DBSCAN

# For LLM-based summarization
import openai

# Configure AWS service clients. No region or credentials are passed here, so
# boto3's default session configuration is used.
s3 = boto3.client('s3')
athena = boto3.client('athena')
glue = boto3.client('glue')

# OpenAI API key: never hardcode it in the notebook (rendered outputs and commits
# can leak it). Export OPENAI_API_KEY before launching Jupyter; it is applied here if set.
if os.environ.get("OPENAI_API_KEY"):
    openai.api_key = os.environ["OPENAI_API_KEY"]

# S3 bucket and paths. Each value can be overridden with an environment variable
# of the same name; the defaults below are used otherwise.
LOG_BUCKET = os.environ.get("LOG_BUCKET", "first-order-application-logs")
ATHENA_RESULTS = os.environ.get("ATHENA_RESULTS", f"s3://{LOG_BUCKET}/athena-results/")
MODEL_PATH = os.environ.get("MODEL_PATH", f"s3://{LOG_BUCKET}/models/")

# Glue database and table (also overridable via environment variables)
GLUE_DATABASE = os.environ.get("GLUE_DATABASE", "first-order-glue-db")
LOGS_TABLE = os.environ.get("LOGS_TABLE", "fluent_bit_logs")  # This should match the table created by your Glue crawler

## 1. Load and Process Log Data

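First, we'll query Athena for the processed logs in the Glue database, then extract structured features (log level, component, action, status, and error details) from each log line. If the Athena query fails, a small built-in sample of log lines is used instead so the rest of the notebook can still run. (Embedding generation is not implemented in this notebook yet, even though the embedding libraries are imported above.)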

In [None]:
def run_athena_query(query, database=GLUE_DATABASE, output_location=ATHENA_RESULTS,
                     poll_interval=1.0):
    """Run a query on Athena and return the results as a DataFrame.

    Starts the query, polls its status until it reaches a terminal state, then
    reads every page of results into a single DataFrame.

    Args:
        query: SQL text to execute.
        database: Glue/Athena database used as the query execution context.
        output_location: S3 URI where Athena writes its result files.
        poll_interval: Seconds to sleep between status checks.

    Returns:
        pandas.DataFrame whose columns are the result-set column labels. Values
        are the raw ``VarCharValue`` strings; fields without one become None.

    Raises:
        Exception: if the query finishes in the FAILED or CANCELLED state.
    """
    import time  # only this helper needs it

    response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': output_location},
    )
    query_execution_id = response['QueryExecutionId']

    # Poll until a terminal state. Sleep between calls so we don't spin on the
    # API (and get throttled) while the query is QUEUED/RUNNING.
    while True:
        status = athena.get_query_execution(
            QueryExecutionId=query_execution_id
        )['QueryExecution']['Status']
        state = status['State']
        if state == 'SUCCEEDED':
            break
        if state in ('FAILED', 'CANCELLED'):
            reason = status.get('StateChangeReason', 'no reason given')
            raise Exception(f"Query {state.lower()}: {reason}")
        time.sleep(poll_interval)

    # GetQueryResults is paged (at most 1000 rows per call), so walk every page.
    # For SELECT queries only the first page starts with the header row.
    columns = None
    rows = []
    paginator = athena.get_paginator('get_query_results')
    pages = paginator.paginate(QueryExecutionId=query_execution_id)
    for page_number, page in enumerate(pages):
        result_set = page['ResultSet']
        if columns is None:
            columns = [col['Label'] for col in result_set['ResultSetMetadata']['ColumnInfo']]
        page_rows = result_set['Rows']
        if page_number == 0:
            page_rows = page_rows[1:]  # Skip the header row
        for row in page_rows:
            rows.append([field.get('VarCharValue') for field in row['Data']])

    return pd.DataFrame(rows, columns=columns)

def extract_features(logs_df):
    """Extract structured features from the ``log`` column of a logs DataFrame.

    Adds the columns ``log_level``, ``component``, ``action``, ``status``,
    ``error_type`` and ``error_details``; each is None where nothing was
    recognised. The input DataFrame is not modified.

    Args:
        logs_df: DataFrame with a ``log`` column of lines shaped like
            ``"[2025/04/01 23:59:58] [ info] [output:s3:s3.0] Successfully uploaded ..."``.
            Null (None/NaN) ``log`` values are treated as empty strings.

    Returns:
        A copy of ``logs_df`` with the feature columns added.
    """
    # Work on a copy so the caller's DataFrame is left untouched.
    df = logs_df.copy()

    for column in ('log_level', 'component', 'action', 'status', 'error_type', 'error_details'):
        df[column] = None

    # Compile once, outside the row loop.
    # The level may be padded inside its brackets (e.g. "[ info]"), so allow
    # whitespace around it.
    level_re = re.compile(r'\[\s*(info|warn|error|debug)\s*\]', re.IGNORECASE)
    # "[kind:plugin:instance]", e.g. "[output:s3:s3.0]". Brackets and whitespace
    # are excluded from the parts so the leading "[YYYY/MM/DD HH:MM:SS]"
    # timestamp (which also contains colons) cannot match.
    component_re = re.compile(r'\[([^\[\]\s:]+):([^\[\]\s:]+):([^\[\]\s]+)\]')
    upload_error_re = re.compile(r'Failed to upload.*: (\w+)')
    timeout_re = re.compile(r'timeout after (\d+)s')
    retry_re = re.compile(r'attempt (\d+)/(\d+)')

    for idx, row in df.iterrows():
        raw_msg = row.get('log', '')
        # Athena NULLs arrive as None and pandas may hold NaN; regexes need a str.
        log_msg = raw_msg if isinstance(raw_msg, str) else ''

        # Extract log level
        level_match = level_re.search(log_msg)
        if level_match:
            df.at[idx, 'log_level'] = level_match.group(1).lower()

        # Extract component
        component_match = component_re.search(log_msg)
        if component_match:
            df.at[idx, 'component'] = ':'.join(component_match.groups())

        # Extract action and status (first matching rule wins)
        if "Successfully" in log_msg and "uploaded" in log_msg:
            df.at[idx, 'action'] = "upload"
            df.at[idx, 'status'] = "success"
        elif "Failed to upload" in log_msg:
            df.at[idx, 'action'] = "upload"
            df.at[idx, 'status'] = "failure"

            # Extract error details
            error_match = upload_error_re.search(log_msg)
            if error_match:
                df.at[idx, 'error_type'] = error_match.group(1)
                df.at[idx, 'error_details'] = log_msg.partition("Failed to upload")[2].strip()
        elif "Connection timeout" in log_msg:
            df.at[idx, 'action'] = "connect"
            df.at[idx, 'status'] = "failure"
            df.at[idx, 'error_type'] = "Timeout"

            # Extract timeout details
            timeout_match = timeout_re.search(log_msg)
            if timeout_match:
                df.at[idx, 'error_details'] = f"Timeout after {timeout_match.group(1)} seconds"
        elif "Invalid JSON" in log_msg:
            df.at[idx, 'action'] = "parse"
            df.at[idx, 'status'] = "failure"
            df.at[idx, 'error_type'] = "InvalidFormat"
            df.at[idx, 'error_details'] = "Invalid JSON format"
        elif "Scanning log file" in log_msg:
            df.at[idx, 'action'] = "scan"
            df.at[idx, 'status'] = "info"
        elif "File rotated" in log_msg:
            df.at[idx, 'action'] = "rotate"
            df.at[idx, 'status'] = "info"
        elif "Retrying" in log_msg and "upload" in log_msg:
            df.at[idx, 'action'] = "retry"
            df.at[idx, 'status'] = "warning"

            # Extract retry details
            retry_match = retry_re.search(log_msg)
            if retry_match:
                df.at[idx, 'error_details'] = f"Attempt {retry_match.group(1)} of {retry_match.group(2)}"
        elif "Started" in log_msg:
            df.at[idx, 'action'] = "start"
            df.at[idx, 'status'] = "info"
        elif "Shutting down" in log_msg:
            df.at[idx, 'action'] = "shutdown"
            df.at[idx, 'status'] = "info"

    return df

# Query logs from the last 24 hours.
# NOTE(review): datetime.now() is the notebook host's local time; confirm it
# matches the timezone of the `date` column before relying on the 24h window.
since = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d %H:%M:%S')
# "date" is double-quoted so it is parsed as a column identifier rather than
# the DATE type keyword.
query = f"""
SELECT *
FROM {LOGS_TABLE}
WHERE "date" >= TIMESTAMP '{since}'
LIMIT 1000
"""

try:
    logs_df = run_athena_query(query)
    if logs_df is None:
        # Don't rely on len(None) raising; make the fallback explicit.
        raise RuntimeError("Athena query did not return a result")
    print(f"Retrieved {len(logs_df)} logs")
except Exception as e:
    print(f"Error querying logs: {e}")
    print("Using sample data instead...")
    
    # Create sample data for demonstration
    logs_df = pd.DataFrame({
        'date': [datetime.now() - timedelta(minutes=i) for i in range(10)],
        'log': [
            "[2025/04/01 23:59:58] [ info] [output:s3:s3.0] Successfully uploaded object /path/to/object",
            "[2025/04/01 23:59:57] [ info] [input:tail:tail.0] Scanning log file /var/log/containers/app-xyz.log",
            "[2025/04/01 23:59:56] [ error] [output:s3:s3.0] Failed to upload object: AccessDenied",
            "[2025/04/01 23:59:55] [ warn] [filter:kubernetes:kubernetes.0] Missing annotation",
            "[2025/04/01 23:59:54] [ error] [output:s3:s3.0] Connection timeout after 30s",
            "[2025/04/01 23:59:53] [ info] [input:tail:tail.0] File rotated: /var/log/containers/app-xyz.log",
            "[2025/04/01 23:59:52] [ error] [parser:json:json.0] Invalid JSON format",
            "[2025/04/01 23:59:51] [ warn] [output:s3:s3.0] Retrying upload (attempt 3/5)",
            "[2025/04/01 23:59:50] [ info] [engine] Shutting down",
            "[2025/04/01 23:59:49] [ info] [engine] Started (version 1.8.15)"
        ],
        'kubernetes.pod_name': ['app-xyz-1234', 'app-xyz-1234', 'app-xyz-1234', 'app-xyz-1234', 'app-xyz-1234',
                              'app-xyz-1234', 'app-xyz-1234', 'app-xyz-1234', 'fluent-bit-qmn54', 'fluent-bit-qmn54'],
        'kubernetes.namespace_name': ['default', 'default', 'default', 'default', 'default',
                                    'default', 'default', 'default', 'logging', 'logging']
    })

# Extract features
features_df = extract_features(logs_df)
display(features_df.head())

## 2. Identify Critical Errors and Patterns

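Next, we'll flag critical errors by matching the extracted error types and actions against a table of known patterns, grouping repeated occurrences. We'll also find component/action combinations that recur in the logs.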

In [None]:
def _is_more_recent(candidate, current):
    """Return True when timestamp ``candidate`` should replace ``current``.

    Missing values (None/NaN/NaT) never win, and any real timestamp beats a
    missing one, so rows without a ``date`` cannot raise a comparison error.
    """
    if pd.isna(candidate):
        return False
    if pd.isna(current):
        return True
    return candidate > current


def identify_critical_errors(df):
    """Match extracted log features against known critical-error patterns.

    Args:
        df: DataFrame produced by ``extract_features``. The ``error_type`` and
            ``action`` columns are matched; ``date``, ``log``, ``error_details``,
            ``kubernetes.pod_name`` and ``kubernetes.namespace_name`` are read
            when present (None otherwise).

    Returns:
        list of dicts, one per (severity, category, description) group, in
        pattern order. Keys: timestamp, log, pod_name, namespace, severity,
        category, description, details, error_type and count. The per-row
        fields describe the most recent occurrence in the group.
    """
    # Define critical error patterns
    critical_patterns = [
        # Access denied errors
        {'pattern': 'AccessDenied', 'severity': 'high', 'category': 'permission', 
         'description': 'S3 access denied error', 'column': 'error_type'},
        # Connection timeouts
        {'pattern': 'Timeout', 'severity': 'medium', 'category': 'network', 
         'description': 'Connection timeout', 'column': 'error_type'},
        # Invalid formats
        {'pattern': 'InvalidFormat', 'severity': 'medium', 'category': 'data', 
         'description': 'Invalid data format', 'column': 'error_type'},
        # Retry attempts
        {'pattern': 'retry', 'severity': 'low', 'category': 'operation', 
         'description': 'Operation retry', 'column': 'action'}
    ]

    # Fields taken from the newest occurrence when repeats are grouped.
    latest_fields = ('timestamp', 'log', 'pod_name', 'namespace', 'details', 'error_type')

    grouped_errors = {}
    for pattern in critical_patterns:
        matches = df[df[pattern['column']] == pattern['pattern']]
        key = f"{pattern['severity']}_{pattern['category']}_{pattern['description']}"

        for _, row in matches.iterrows():
            error = {
                'timestamp': row.get('date'),
                'log': row.get('log'),
                'pod_name': row.get('kubernetes.pod_name'),
                'namespace': row.get('kubernetes.namespace_name'),
                'severity': pattern['severity'],
                'category': pattern['category'],
                'description': pattern['description'],
                'details': row.get('error_details'),
                # Carried through so downstream code can look up fixes by type.
                'error_type': row.get('error_type'),
                'count': 1
            }

            group = grouped_errors.get(key)
            if group is None:
                grouped_errors[key] = error
                continue

            group['count'] += 1
            # Keep the most recent occurrence
            if _is_more_recent(error['timestamp'], group['timestamp']):
                for field in latest_fields:
                    group[field] = error[field]

    return list(grouped_errors.values())

def identify_patterns(df):
    """Summarise (component, action) combinations that occur at least twice.

    Args:
        df: DataFrame with ``component``, ``action``, ``status`` and ``log``
            columns (as produced by ``extract_features``).

    Returns:
        dict keyed by ``"<component>_<action>"``. Each value holds the component,
        action, most common status ('unknown' when every status is missing),
        the number of rows, and up to three sample log lines.
    """
    recurring = {}

    for group_key, rows in df.groupby(['component', 'action']):
        component, action = group_key
        # Ignore groups with a missing key part and one-off events.
        if len(rows) < 2 or pd.isna(action) or pd.isna(component):
            continue

        statuses = rows['status']
        if statuses.isna().all():
            dominant_status = 'unknown'
        else:
            dominant_status = statuses.mode().iloc[0]

        recurring[f"{component}_{action}"] = dict(
            component=component,
            action=action,
            status=dominant_status,
            count=len(rows),
            sample_logs=rows['log'].head(3).tolist(),
        )

    return recurring

# Run the critical-error detector over the extracted features and report each group.
critical_errors = identify_critical_errors(features_df)
print(f"Identified {len(critical_errors)} critical errors")
for error in critical_errors:
    severity_label = error['severity'].upper()
    print(
        f"- {severity_label} {error['category']}: {error['description']} (count: {error['count']})",
        f"  Example: {error['log']}",
        "",
        sep="\n",
    )

# Find recurring component/action combinations and preview only the first three.
patterns = identify_patterns(features_df)
print(f"\nIdentified {len(patterns)} patterns")
preview = list(patterns.values())[:3]
for pattern in preview:
    samples = pattern['sample_logs']
    example = samples[0] if samples else 'No sample'
    print(
        f"- {pattern['component']} {pattern['action']} ({pattern['count']} logs)",
        f"  Example: {example}",
        "",
        sep="\n",
    )

## 3. Generate Human-Readable Summaries and Suggest Fixes

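Now we'll print a short human-readable summary of each critical error and suggest fixes from a lookup table of common remediations keyed by error type. Low-severity findings are skipped.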

In [None]:
# Example fixes for common errors, keyed by the error type extracted from the logs
common_fixes = {
    'AccessDenied': [
        "Check IAM permissions for the FluentBit service account",
        "Verify S3 bucket policy allows write access from the cluster's IP range",
        "Ensure KMS key permissions are properly configured if using SSE-KMS"
    ],
    'Timeout': [
        "Check network connectivity between the cluster and S3 endpoint",
        "Verify VPC endpoints are properly configured",
        "Consider increasing the timeout setting in FluentBit configuration"
    ],
    'InvalidFormat': [
        "Review the log format configuration in FluentBit",
        "Check for malformed JSON in application logs",
        "Add a parser filter to handle the specific log format"
    ]
}

# Display suggested fixes for identified errors (low-severity findings are skipped)
print("Suggested Fixes for Critical Errors:\n")
for error in critical_errors:
    if error['severity'] == 'low':
        continue
        
    print(f"For {error['severity'].upper()} {error['category']}: {error['description']}")
    
    # An error record is not guaranteed to carry an 'error_type' key; .get()
    # avoids a KeyError and falls through to the "no specific fixes" message.
    fixes = common_fixes.get(error.get('error_type'), [])
    if fixes:
        for i, fix in enumerate(fixes, 1):
            print(f"  {i}. {fix}")
    else:
        print("  No specific fixes available for this error type.")
    print()

## 4. Next Steps for Log Processing Pipeline

Based on the analysis, here are the recommended next steps to enhance the log processing pipeline:

In [None]:
# Recommended follow-up work, printed as Markdown-style text.
next_steps = [
    "1. **Deploy the Glue Job**: Configure and deploy the log_processor.py as a scheduled Glue job",
    "2. **Set Up Monitoring**: Create CloudWatch alarms for critical error patterns",
    "3. **Implement Automated Remediation**: Use AWS Lambda to automatically fix common issues",
    "4. **Create Dashboard**: Build a QuickSight dashboard for log insights",
    "5. **Integrate with ChatOps**: Send summaries to Slack/Teams channels"
]

print("# Recommended Next Steps for Log Processing Pipeline\n")
# Every step is followed by one blank line.
print("\n\n".join(next_steps), end="\n\n")

# Phased rollout: (heading, tasks) pairs, separated by a blank line when printed.
plan_phases = (
    ("1. **Short-term (1-2 weeks)**", (
        "Configure and deploy the Glue job with daily schedule",
        "Set up basic CloudWatch alarms for critical errors",
    )),
    ("2. **Medium-term (2-4 weeks)**", (
        "Develop QuickSight dashboard for log insights",
        "Implement ChatOps integration for daily summaries",
    )),
    ("3. **Long-term (1-2 months)**", (
        "Implement automated remediation with AWS Lambda",
        "Enhance the log processor with more advanced ML capabilities",
    )),
)

print("## Implementation Plan\n")
phase_blocks = [
    "\n".join([heading] + [f"   - {task}" for task in tasks])
    for heading, tasks in plan_phases
]
print("\n\n".join(phase_blocks))