# **Log Analyzer** 📊

This Python script processes web server log files to extract and analyze key information, such as request counts, frequently accessed endpoints, and suspicious activity (e.g., failed login attempts). Parsed lines are represented by a `LogEntry` data class, and lines that cannot be parsed are logged and skipped rather than stopping the run.

---

##### Objective

The goal of this script is to:
1. Count Requests per IP Address
2. Identify the Most Frequently Accessed Endpoint
3. Detect Suspicious Activity (e.g., failed login attempts)

---

##### Features

- Count Requests per IP: Tracks how many requests each IP address makes.
- Most Accessed Endpoint: Identifies which endpoint is accessed the most.
- Suspicious Activity Detection: Flags IP addresses whose failed login attempts (HTTP status `401` or an "Invalid credentials" message) exceed a configurable threshold (default: 10).
- CSV Export: Results are saved into a CSV file (`log_analysis_results.csv`).
- Logging: Detailed logging of all actions and any issues encountered during the analysis.
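
The threshold rule can be illustrated with a small standalone sketch. `flag_suspicious` is a hypothetical helper written for this example, not a function in the script, and the counts are made up. It assumes the script's strict "more than" comparison, so an IP with exactly the threshold number of failures is not flagged.

```python
from collections import Counter
from typing import List, Tuple


def flag_suspicious(failed: Counter, threshold: int = 10) -> List[Tuple[str, int]]:
    """Return (ip, count) pairs whose failure count is strictly above the threshold."""
    return [(ip, count) for ip, count in failed.items() if count > threshold]


# Made-up failure counts, for illustration only.
failed_logins = Counter({"203.0.113.5": 12, "192.168.1.100": 10, "10.0.0.2": 3})

# With the default threshold of 10, only the first IP is reported.
print(flag_suspicious(failed_logins))
```

Lowering the threshold (for example `flag_suspicious(failed_logins, threshold=9)`) would also report the IP with exactly 10 failures.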

---

##### Requirements

- Python 3.8 or later.
- Libraries: Python standard library (`re`, `csv`, `sys`, `logging`, `pathlib`, `collections`, `typing`, `dataclasses`).

---

##### Usage

###### Prepare Your Log File
- Download or create a log file named `sample.log` and place it in the directory you run the script from; the script reads that path by default. A sample log is included with this project.

###### Run the Script
To analyze the log file, run the script from the terminal:

```bash
python3 log_analysis_script.py
```

###### Results
The script will display the following analysis results in the terminal:
- IP Request Counts: A count of requests per IP address.
- Most Accessed Endpoint: The endpoint with the highest number of accesses.
- Suspicious Activities: A list of IP addresses with suspicious failed login attempts.

Additionally, the results will be exported to `log_analysis_results.csv`, containing:
- IP Request Counts
- Most Accessed Endpoint
- Suspicious Activities

---

##### Code Overview

###### LogEntry Class

A data class representing a parsed log entry:
```python
@dataclass
class LogEntry:
    ip_address: str
    method: str
    endpoint: str
    status_code: str
    message: Optional[str] = None
```
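
As a rough illustration of how such a data class behaves, the sketch below redefines `LogEntry` so the snippet runs on its own and builds two entries from the sample log lines in this README. Note that `status_code` is a string, so comparisons should use `'401'` rather than `401`.

```python
from dataclasses import dataclass, asdict
from typing import Optional


@dataclass
class LogEntry:
    ip_address: str
    method: str
    endpoint: str
    status_code: str
    message: Optional[str] = None


# Values taken from the sample log lines in this README.
ok = LogEntry("192.168.1.1", "GET", "/home", "200")
failed = LogEntry("203.0.113.5", "POST", "/login", "401", "Invalid credentials")

print(ok.message)      # None, because no message was supplied
print(asdict(failed))  # plain dict, convenient for CSV or JSON output
print(ok == LogEntry("192.168.1.1", "GET", "/home", "200"))  # compared field by field
```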

###### LogAnalyzer Class

The core class for processing log files and generating reports:
- `parse_log_file`: Reads and parses the log file line by line.
- `_process_log_entry`: Tracks IP request counts, endpoint access counts, and failed login attempts.
- `get_most_accessed_endpoint`: Returns the most accessed endpoint and its access count.
- `get_suspicious_activities`: Flags IP addresses with suspicious login activity based on failed login attempts.
- `display_results`: Displays the analysis results in the terminal.
- `export_to_csv`: Exports the results to a CSV file.
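
The tallying behind these methods can be sketched with `collections.Counter`, which the script uses for its counts. The request list below is invented, and `most_accessed` is a hypothetical stand-in that mirrors the `("N/A", 0)` fallback of `get_most_accessed_endpoint` when no entries were parsed.

```python
from collections import Counter
from typing import Iterable, Tuple


def most_accessed(endpoints: Iterable[str]) -> Tuple[str, int]:
    """Return the most common endpoint and its count, or ("N/A", 0) if empty."""
    counts = Counter(endpoints)
    return counts.most_common(1)[0] if counts else ("N/A", 0)


# Invented (ip, endpoint) pairs, for illustration only.
requests = [
    ("192.168.1.1", "/home"),
    ("203.0.113.5", "/login"),
    ("203.0.113.5", "/login"),
    ("10.0.0.2", "/about"),
]

ip_counts = Counter(ip for ip, _ in requests)
print(ip_counts.most_common())                              # busiest IPs first
print(most_accessed(endpoint for _, endpoint in requests))  # busiest endpoint
print(most_accessed([]))                                    # fallback for an empty log
```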

###### Main Execution Function

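The `main()` function initializes the `LogAnalyzer`, parses the log file, displays the results, and exports them to CSV. Any unhandled exception is logged at CRITICAL level and the script exits with status 1: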

```python
def main():
    try:
        log_file_path = Path('sample.log')
        failed_login_threshold = 10

        analyzer = LogAnalyzer(
            log_file_path,
            failed_login_threshold,
            log_level=logging.INFO
        )

        analyzer.parse_log_file()
        analyzer.display_results()
        analyzer.export_to_csv()

    except Exception as e:
        logging.critical(f"Unhandled exception: {e}")
        sys.exit(1)
```
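
`main()` hard-codes the log path and the threshold. One possible variation, shown only as a sketch, reads them from the command line with `argparse`. The `parse_args` helper and the `--threshold` option are assumptions made for this example and are not part of the script.

```python
import argparse
from pathlib import Path
from typing import List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse a log path and threshold; defaults match the values in main()."""
    parser = argparse.ArgumentParser(description="Analyze a web server log file.")
    parser.add_argument(
        "log_file", nargs="?", type=Path, default=Path("sample.log"),
        help="log file to analyze (default: sample.log)",
    )
    parser.add_argument(
        "--threshold", type=int, default=10,
        help="failed logins above this count are flagged (default: 10)",
    )
    return parser.parse_args(argv)


# Passing an explicit list keeps the example independent of sys.argv.
args = parse_args(["access.log", "--threshold", "5"])
print(args.log_file, args.threshold)
```

The parsed values could then be passed on as `LogAnalyzer(args.log_file, args.threshold)`.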

---

##### Log Format

The script expects each line to follow the common NGINX/Apache access log layout, starting with an IPv4 address. The trailing quoted message is optional:

```
<IP Address> - - [<Date>] "<HTTP Method> <Endpoint> HTTP/1.1" <Status Code> <Response Size> "<Message>"
```

Sample log entries:
```
192.168.1.1 - - [03/Dec/2024:10:12:34 +0000] "GET /home HTTP/1.1" 200 512
203.0.113.5 - - [03/Dec/2024:10:12:35 +0000] "POST /login HTTP/1.1" 401 128 "Invalid credentials"
10.0.0.2 - - [03/Dec/2024:10:12:36 +0000] "GET /about HTTP/1.1" 200 256
```
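
To make the format concrete, the sketch below parses two of the sample lines with a named-group regular expression. `LINE_RE` is a simplified pattern written for this example and may differ from the one used in the script; it assumes single spaces between fields and an optional quoted message at the end.

```python
import re

# Simplified pattern for illustration; the group names are this example's own.
LINE_RE = re.compile(
    r'^(?P<ip>\d+\.\d+\.\d+\.\d+) \S+ \S+ \[(?P<date>[^\]]+)\] '
    r'"(?P<method>\w+) (?P<endpoint>\S+) [^"]*" '
    r'(?P<status>\d{3}) (?P<size>\d+|-)'
    r'(?: "(?P<message>[^"]*)")?$'
)

samples = [
    '192.168.1.1 - - [03/Dec/2024:10:12:34 +0000] "GET /home HTTP/1.1" 200 512',
    '203.0.113.5 - - [03/Dec/2024:10:12:35 +0000] "POST /login HTTP/1.1" 401 128 "Invalid credentials"',
]

for line in samples:
    match = LINE_RE.match(line)
    if match:
        # message is None when the line has no trailing quoted text
        print(match.groupdict())
    else:
        print("unparseable:", line)
```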

---

##### CSV Output Format

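The script writes a single CSV file made up of three sections. Each section starts with a title row, followed by a header row and the data rows, and sections are separated by a blank line. The tables below show each section with example values: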

###### IP Request Counts
| IP Address       | Request Count |
|------------------|---------------|
| 192.168.1.1      | 234           |
| 203.0.113.5      | 187           |

###### Most Accessed Endpoint
| Endpoint     | Access Count |
|--------------|--------------|
| /home        | 403          |

###### Suspicious Activities
| IP Address    | Failed Login Count |
|---------------|--------------------|
| 192.168.1.100 | 56                 |
| 203.0.113.34  | 12                 |

---

##### Logging

Messages are logged both to the console (standard error) and to a file (`log_analysis.log`), which is overwritten on each run:
- INFO: Logs regular operations.
- WARNING: Logs lines that couldn't be parsed.
- ERROR: Logs errors when processing individual log entries, a missing input file, or a failed CSV export.
- CRITICAL: Logs critical errors, such as issues reading the log file or unhandled exceptions.
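
A console-plus-file setup along these lines can be sketched as follows. The `build_logger` helper and the logger name `log_analysis_demo` are assumptions for this example; the snippet writes to a temporary directory so it does not touch a real `log_analysis.log`.

```python
import logging
import sys
import tempfile
from pathlib import Path


def build_logger(log_path: Path, level: int = logging.INFO) -> logging.Logger:
    """Create a logger that writes to stderr and to log_path (overwritten each run)."""
    logger = logging.getLogger("log_analysis_demo")
    logger.setLevel(level)
    logger.handlers.clear()  # avoid duplicate handlers if called more than once
    formatter = logging.Formatter('%(asctime)s - %(levelname)s: %(message)s')
    for handler in (logging.StreamHandler(sys.stderr),
                    logging.FileHandler(log_path, mode='w')):
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger


log_path = Path(tempfile.mkdtemp()) / "log_analysis.log"
logger = build_logger(log_path)

logger.info("Results exported")
logger.warning("Unparseable log line 7")
logger.debug("not shown at INFO level")  # filtered out by the INFO threshold
```

Attaching handlers to a named logger is a design choice of this sketch: `logging.basicConfig` has no effect when the root logger already has handlers (unless `force=True` is passed), which can be surprising in notebooks or larger applications.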

---

Submitted by: Anush Dubey (anushdubey881@gmail.com)

# Python Script

import re
import csv
import sys
import logging
from pathlib import Path
from typing import List, Tuple, Optional
from collections import Counter
from dataclasses import dataclass

@dataclass
class LogEntry:
    """
    Structured data class to represent a parsed log entry.
    Allows for more robust and type-safe log parsing.
    """
    ip_address: str
    method: str
    endpoint: str
    status_code: str
    message: Optional[str] = None

class LogAnalyzer:
    def __init__(
        self,
        log_file_path: Path,
        failed_login_threshold: int = 10,
        log_level: int = logging.INFO
    ):
        """
        Initialize the Log Analyzer with advanced configuration.

        :param log_file_path: Path to the log file
        :param failed_login_threshold: Threshold for suspicious login attempts
        :param log_level: Logging verbosity level
        """
        # Configure logging
        logging.basicConfig(
            level=log_level,
            format='%(asctime)s - %(levelname)s: %(message)s',
            handlers=[
                logging.StreamHandler(sys.stderr),
                logging.FileHandler('log_analysis.log', mode='w')
            ]
        )
        self.logger = logging.getLogger(__name__)

        # Validate input file
        self.log_file_path = Path(log_file_path)
        if not self.log_file_path.is_file():
            self.logger.error(f"Log file not found: {self.log_file_path}")
            raise FileNotFoundError(f"Log file not found: {self.log_file_path}")

        # Configuration parameters
        self.failed_login_threshold = failed_login_threshold

        # Analysis storage
        self.parsed_entries: List[LogEntry] = []
        self.ip_request_counts: Counter = Counter()
        self.endpoint_access_counts: Counter = Counter()
        self.failed_login_attempts: Counter = Counter()

    def parse_log_file(self) -> None:
        """
        Advanced log file parsing with comprehensive error handling.
        """
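        # Groups: IP, method, endpoint, status code, optional trailing message.
        # The response size is matched but not captured, so `message` holds only
        # the text that follows it (or None when nothing follows).
        log_pattern = re.compile(
            r'^(\d+\.\d+\.\d+\.\d+).*?"(\w+)\s+(\S+)\s+[^"]*"\s+(\d+)(?:\s+(?:\d+|-))?\s*(.*)$'
        )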

        try:
            with open(self.log_file_path, 'r') as log_file:
                for line_num, line in enumerate(log_file, 1):
                    try:
                        match = log_pattern.match(line.strip())
                        if match:
                            groups = match.groups()
                            log_entry = LogEntry(
                                ip_address=groups[0],
                                method=groups[1],
                                endpoint=groups[2],
                                status_code=groups[3],
                                message=groups[4] or None
                            )
                            self._process_log_entry(log_entry)
                        else:
                            self.logger.warning(f"Unparseable log line {line_num}: {line.strip()}")
                    except Exception as entry_error:
                        self.logger.error(f"Error processing line {line_num}: {entry_error}")

        except IOError as file_error:
            self.logger.critical(f"File reading error: {file_error}")
            raise

    def _process_log_entry(self, entry: LogEntry) -> None:
        """
        Process individual log entries and track metrics.

        :param entry: Parsed log entry
        """
        # Track IP requests
        self.ip_request_counts[entry.ip_address] += 1

        # Track endpoint access
        self.endpoint_access_counts[entry.endpoint] += 1

        # Detect suspicious login attempts
        if (entry.status_code == '401' or
            (entry.message and 'Invalid credentials' in entry.message)):
            self.failed_login_attempts[entry.ip_address] += 1

    def get_most_accessed_endpoint(self) -> Tuple[str, int]:
        """
        Find the most frequently accessed endpoint.

        :return: Tuple of (endpoint, access_count)
        """
        return self.endpoint_access_counts.most_common(1)[0] if self.endpoint_access_counts else ("N/A", 0)

    def get_suspicious_activities(self) -> List[Tuple[str, int]]:
        """
        Identify IP addresses with suspicious login activity.

        :return: List of (IP, failed_login_count) tuples exceeding threshold
        """
        return [
            (ip, count)
            for ip, count in self.failed_login_attempts.items()
            if count > self.failed_login_threshold
        ]

    def display_results(self) -> None:
        """
        Display comprehensive analysis results with formatting.
        """
        print("\n" + "=" * 50)
        print("VRV SECURITY - LOG ANALYSIS REPORT")
        print("=" * 50)

        # IP Request Counts
        print("\n--- IP Request Counts ---")
        for ip, count in self.ip_request_counts.most_common():
            print(f"{ip:<20} {count:>5} requests")

        # Most Accessed Endpoint
        endpoint, access_count = self.get_most_accessed_endpoint()
        print("\n--- Most Accessed Endpoint ---")
        print(f"{endpoint} (Accessed {access_count} times)")

        # Suspicious Activities
        suspicious_ips = self.get_suspicious_activities()
        print("\n--- Suspicious Activities ---")
        if suspicious_ips:
            for ip, count in suspicious_ips:
                print(f"{ip:<20} {count:>3} failed login attempts")
        else:
            print("No suspicious activities detected.")

    def export_to_csv(
        self,
        output_file: Path = Path('log_analysis_results.csv')
    ) -> None:
        """
        Export analysis results to a structured CSV file.

        :param output_file: Path to the output CSV file
        """
        try:
            with open(output_file, 'w', newline='') as csvfile:
                csv_writer = csv.writer(csvfile)

                # IP Request Counts
                csv_writer.writerow(["IP Request Counts"])
                csv_writer.writerow(["IP Address", "Request Count"])
                for ip, count in self.ip_request_counts.most_common():
                    csv_writer.writerow([ip, count])

                # Most Accessed Endpoint
                csv_writer.writerow([])
                csv_writer.writerow(["Most Accessed Endpoint"])
                csv_writer.writerow(["Endpoint", "Access Count"])
                endpoint, access_count = self.get_most_accessed_endpoint()
                csv_writer.writerow([endpoint, access_count])

                # Suspicious Activities
                csv_writer.writerow([])
                csv_writer.writerow(["Suspicious Activities"])
                csv_writer.writerow(["IP Address", "Failed Login Count"])
                for ip, count in self.get_suspicious_activities():
                    csv_writer.writerow([ip, count])

            self.logger.info(f"Results exported to {output_file}")

        except IOError as e:
            self.logger.error(f"CSV export failed: {e}")
            raise

def main():
    """
    Main execution function with robust error handling.
    """
    try:
        # Configurable parameters
        log_file_path = Path('sample.log')
        failed_login_threshold = 10

        # Initialize and run log analyzer
        analyzer = LogAnalyzer(
            log_file_path,
            failed_login_threshold,
            log_level=logging.INFO
        )

        # Process log file
        analyzer.parse_log_file()

        # Display results
        analyzer.display_results()

        # Export to CSV
        analyzer.export_to_csv()

    except Exception as e:
        logging.critical(f"Unhandled exception: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()


VRV SECURITY - LOG ANALYSIS REPORT

--- IP Request Counts ---
203.0.113.5              8 requests
198.51.100.23            8 requests
192.168.1.1              7 requests
10.0.0.2                 6 requests
192.168.1.100            5 requests

--- Most Accessed Endpoint ---
/login (Accessed 13 times)

--- Suspicious Activities ---
No suspicious activities detected.
