In [1]:
import pandas as pd
import os
from dotenv import load_dotenv
import logging
import paramiko
from datetime import date

# Configure basic logging: INFO and above, timestamped
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Project layout: PROJECT_ROOT is the parent of the current working directory
PROJECT_ROOT = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
DATA_DIR = os.path.join(PROJECT_ROOT, 'data')
ENV_FILE_PATH = os.path.join(PROJECT_ROOT, '.env')

# Analysis tracker workbook, stored inside the data directory
ANALYSIS_TRACKER_FILENAME = 'log_analysis_tracker.xlsx'
ANALYSIS_TRACKER_PATH = os.path.join(DATA_DIR, ANALYSIS_TRACKER_FILENAME)

# Name of the tracker column that holds the remote log directory paths
LOG_PATH_COLUMN = 'remote_log_directory'

# SFTP port used for the Docker container connection
SFTP_PORT = 2222

# Root folder under which downloaded logs are stored locally
LOCAL_LOG_STORAGE_BASE = os.path.join(DATA_DIR, 'downloaded_logs')

# Marker strings searched for in the logs (keep these in sync with your sample logs)
SUCCESS_KEYWORD = 'Execution Return Code: 0'
FAILURE_KEYWORD = '*** Failure'
ERROR_KEYWORD = '*** Error:'

# --- Helper Functions ---

def get_analysis_tracker(filename: str, required_col: str) -> pd.DataFrame | None:
    """Loads the analysis tracker Excel file into a pandas DataFrame."""
    if not os.path.exists(filename):
        logging.error(f"Tracker file not found: '{filename}'. Please create it.")
        return None

    logging.info(f"Loading tracker file: '{filename}'")
    try:
        df = pd.read_excel(filename, header=0)
        logging.info(f"Successfully loaded tracker with shape: {df.shape}")
        if required_col not in df.columns:
            logging.error(f"Tracker file '{filename}' is missing the required column: '{required_col}'")
            return None
        logging.info(f"Required column '{required_col}' found in tracker.")
        return df
    except Exception as e:
        logging.error(f"Failed to read tracker file '{filename}': {e}")
        return None

def get_current_date_string() -> str:
    """Format the current local date as a compact 'YYYYMMDD' string."""
    today = date.today()
    return f"{today:%Y%m%d}"

def get_log_download_directory(base_dir: str, date_string: str) -> str:
    """Build the per-date log folder path: ``<base_dir>/<date_string>_logs``."""
    dated_folder = "{}_logs".format(date_string)
    return os.path.join(base_dir, dated_folder)

In [2]:
def analyze_downloaded_logs(local_log_dir: str) -> dict:
    """
    Classifies every downloaded log file in a local directory by outcome keyword.

    Each regular file is read fully and scanned from its last line towards its
    first; the first line encountered that contains one of the module-level
    keywords decides the status, so the marker closest to the end of the file
    wins. Within a single line the keywords are checked in the order
    SUCCESS_KEYWORD, FAILURE_KEYWORD, ERROR_KEYWORD. Sub-directories inside
    ``local_log_dir`` are ignored.

    Args:
        local_log_dir (str): The local directory containing downloaded logs for a specific date.

    Returns:
        dict: A dictionary mapping log filenames to their determined status:
              'success', 'failure', 'error', 'unknown' (no keyword found),
              'parse_error' (the file could not be read) or 'not_found'
              (the file disappeared between listing and opening).
              Special cases:
              - ``{'error': 'directory_not_found'}`` if ``local_log_dir`` is
                not an existing directory.
              - ``{}`` if the directory contains no regular files.
    """
    log_analysis_results = {}
    logging.info(f"Starting analysis of logs in directory: {local_log_dir}")

    if not os.path.isdir(local_log_dir):
        logging.error(f"Local log directory not found: {local_log_dir}. Cannot analyze.")
        return {'error': 'directory_not_found'}  # Sentinel: directory missing

    # Keep regular files only: a sub-directory cannot be opened as a log and
    # would otherwise be counted as a log file and reported as 'parse_error'.
    # Sorted so the processing (and logging) order is deterministic.
    local_log_files = sorted(
        entry for entry in os.listdir(local_log_dir)
        if os.path.isfile(os.path.join(local_log_dir, entry))
    )
    if not local_log_files:
        logging.warning(f"No log files found in local directory: {local_log_dir}")
        return {}  # Nothing to analyze

    logging.info(f"Found {len(local_log_files)} log files to analyze.")

    for log_filename in local_log_files:
        local_log_path = os.path.join(local_log_dir, log_filename)
        analysis_status = 'unknown'  # Default when no keyword is found

        try:
            logging.debug(f"Analyzing file: {log_filename}")
            # Undecodable bytes are dropped rather than aborting the read
            with open(local_log_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.readlines()

            # Walk backwards so the marker nearest the end of the file wins
            for line in reversed(content):
                if SUCCESS_KEYWORD in line:
                    analysis_status = 'success'
                    logging.debug(f"Found success keyword in {log_filename}")
                    break
                elif FAILURE_KEYWORD in line:
                    analysis_status = 'failure'
                    logging.warning(f"Found failure keyword in {log_filename}: {line.strip()}")
                    break
                elif ERROR_KEYWORD in line:
                    analysis_status = 'error'
                    logging.warning(f"Found error keyword in {log_filename}: {line.strip()}")
                    break
            else:
                # Loop finished without a break: no keyword anywhere in the file
                logging.info(f"No specific keywords found in {log_filename}, status set to 'unknown'.")

        except FileNotFoundError:
            # Only possible if the file is removed between listing and opening
            logging.error(f"Log file vanished during analysis?: {local_log_path}")
            analysis_status = 'not_found'
        except Exception as e:
            logging.error(f"Error reading or parsing log file {log_filename}: {e}")
            analysis_status = 'parse_error'

        log_analysis_results[log_filename] = analysis_status
        logging.info(f"Analysis result for '{log_filename}': {analysis_status}")

    logging.info("Log analysis finished.")
    return log_analysis_results

# --- Execute Analysis ---

# Resolve the dated folder that should hold today's downloaded logs
today_date_str = get_current_date_string()
local_log_dir_to_analyze = get_log_download_directory(LOCAL_LOG_STORAGE_BASE, today_date_str)

# Classify every log found in that folder
analysis_results = analyze_downloaded_logs(local_log_dir_to_analyze)

# Report the outcome
print("\n--- Analysis Results ---")
if not analysis_results:
    # Empty result: nothing was analyzed
    print(f"No logs were analyzed from '{local_log_dir_to_analyze}'. Directory might be empty or non-existent.")
elif analysis_results.get('error') == 'directory_not_found':
    # Sentinel returned by analyze_downloaded_logs for a missing directory
    print(f"Error: Local log directory not found at '{local_log_dir_to_analyze}'")
else:
    # Tally each status once from a snapshot of the values
    _statuses = list(analysis_results.values())
    success_count = _statuses.count('success')
    failure_count = _statuses.count('failure')
    error_count = _statuses.count('error')
    unknown_count = _statuses.count('unknown')
    # Whatever is left over is parse_error / not_found
    other_count = len(_statuses) - success_count - failure_count - error_count - unknown_count

    print(f"Analysis Summary for logs in '{local_log_dir_to_analyze}':")
    print(f"  - Success: {success_count}")
    print(f"  - Failure: {failure_count}")
    print(f"  - Error:   {error_count}")
    print(f"  - Unknown: {unknown_count}")
    if other_count > 0:
        print(f"  - Other (parse_error/not_found): {other_count}")

    print("\nIndividual File Status:")
    for filename in sorted(analysis_results):
        status = analysis_results[filename]
        print(f"  - {filename}: {status}")


2025-05-01 16:35:22,657 - INFO - Starting analysis of logs in directory: /Users/benkaan/Desktop/projects/remote-log-analysis-automation/data/downloaded_logs/20250501_logs
2025-05-01 16:35:22,657 - INFO - Found 12 log files to analyze.
2025-05-01 16:35:22,658 - INFO - Analysis result for 'job_conversion_rate-20250501_083000.log': error
2025-05-01 16:35:22,659 - INFO - Analysis result for 'job_payment_proc-20250501_083000.log': error
2025-05-01 16:35:22,660 - INFO - Analysis result for 'job_report_monthly-20250501_083000.log': success
2025-05-01 16:35:22,661 - INFO - Analysis result for 'job_tax_calc-20250501_083000.log': success
2025-05-01 16:35:22,662 - INFO - Analysis result for 'job_email_blast-20250501_083000.log': success
2025-05-01 16:35:22,663 - INFO - Analysis result for 'job_update_crm-20250501_083000.log': success
2025-05-01 16:35:22,663 - INFO - Analysis result for 'job_web_traffic-20250501_083000.log': success
2025-05-01 16:35:22,664 - INFO - Analysis result for 'job_report_


--- Analysis Results ---
Analysis Summary for logs in '/Users/benkaan/Desktop/projects/remote-log-analysis-automation/data/downloaded_logs/20250501_logs':
  - Success: 8
  - Failure: 0
  - Error:   4
  - Unknown: 0

Individual File Status:
  - job_conversion_rate-20250501_083000.log: error
  - job_email_blast-20250501_083000.log: success
  - job_invoice_gen-20250501_083000.log: success
  - job_payment_proc-20250501_083000.log: error
  - job_report_daily-20250501_083000.log: success
  - job_report_monthly-20250501_083000.log: success
  - job_report_weekly-20250501_083000.log: error
  - job_roi_report-20250501_083000.log: success
  - job_segment_users-20250501_083000.log: error
  - job_tax_calc-20250501_083000.log: success
  - job_update_crm-20250501_083000.log: success
  - job_web_traffic-20250501_083000.log: success
