In [12]:
import re
from typing import List, Optional, Pattern


class LogRecord:
    
    def __init__(self, raw: str, timestamp: str, level: str, source: str, message: str):
        self.raw = raw
        self.timestamp = timestamp
        self.level = level
        self.source = source
        self.message = message
    
    def __str__(self) -> str:
        return f"[{self.timestamp}] {self.level} [{self.source}] {self.message}"
    
    def is_error(self) -> bool:
        return self.level == "ERROR"
    
    def is_warning(self) -> bool:
        return self.level == "WARNING"


class LogParser:
    
    DEFAULT_PATTERN = re.compile(
        r'(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})'
        r'\s+(?P<level>\w+)'
        r'\s+\[(?P<source>\w+)\]'
        r'\s+(?P<message>.*)'
    )
    
    def __init__(self, pattern: Optional[Pattern] = None):
        self.pattern = pattern or self.DEFAULT_PATTERN
    
    def parse_line(self, line: str) -> Optional[LogRecord]:
        line = line.strip()
        if not line:
            return None
            
        match = self.pattern.match(line)
        if not match:
            return None
            
        groups = match.groupdict()
        return LogRecord(
            raw=line,
            timestamp=groups.get('timestamp', ''),
            level=groups.get('level', ''),
            source=groups.get('source', ''),
            message=groups.get('message', '')
        )


class LogAnalyzer:
    
    def __init__(self, file_path: str, parser: Optional[LogParser] = None):
        self.file_path = file_path
        self.parser = parser or LogParser()
        self.records: List[LogRecord] = []
    
    def load(self) -> None:
        self.records = []
        with open(self.file_path, 'r', encoding='utf-8') as file:
            for line in file:
                record = self.parser.parse_line(line)
                if record:
                    self.records.append(record)
    
    def filter_by_level(self, level: str) -> List[LogRecord]:
        return [record for record in self.records if record.level == level]
    
    def filter_by_source(self, source: str) -> List[LogRecord]:
        return [record for record in self.records if record.source == source]
    
    def search_in_message(self, pattern: str) -> List[LogRecord]:
        regex = re.compile(pattern)
        return [record for record in self.records if regex.search(record.message)]
    
    def get_stats_by_level(self) -> dict[str, int]:
        stats = {}
        for record in self.records:
            stats[record.level] = stats.get(record.level, 0) + 1
        return stats


# Пример использования библиотеки
if __name__ == "__main__":
    # Создаем тестовый лог-файл
    logs = """2025-12-01 12:45:32,123 INFO  [auth] User "alex" logged in from 192.168.0.10
2025-12-01 12:46:01,987 ERROR [db] Connection timeout for query "SELECT * FROM users"
2025-12-01 12:46:15,050 WARNING [cache] Cache miss for key "profile_123"""
    
    # Сохраняем тестовые данные в файл
    with open("test.log", "w", encoding="utf-8") as f:
        f.write(logs)
    
    print("=== Пример 1: Базовое использование ===")
    analyzer = LogAnalyzer("test.log")
    analyzer.load()
    
    print(f"Всего записей: {len(analyzer.records)}")
    for record in analyzer.records:
        print(f"  {record}")
    print()
    
    print("=== Пример 2: Фильтрация по уровню ERROR ===")
    errors = analyzer.filter_by_level("ERROR")
    print(f"Найдено ошибок: {len(errors)}")
    for error in errors:
        print(f"  {error}")
    print()
    
    
    
    print("=== Пример 3: Поиск по сообщению (содержит 'logged') ===")
    logged_logs = analyzer.search_in_message(r"logged")
    print(f"Найдено записей с 'logged': {len(logged_logs)}")
    for log in logged_logs:
        print(f"  {log}")
    print()
    


    print("=== Пример 4: Использование методов is_error() и is_warning() ===")
    for record in analyzer.records:
        if record.is_error():
            print(f"  ОШИБКА: {record.message}")
        elif record.is_warning():
            print(f"  ПРЕДУПРЕЖДЕНИЕ: {record.message}")
    
    # Очистка тестового файла
    import os
    os.remove("test.log")
  




=== Пример 1: Базовое использование ===
Всего записей: 3
  [2025-12-01 12:45:32,123] INFO [auth] User "alex" logged in from 192.168.0.10
  [2025-12-01 12:46:01,987] ERROR [db] Connection timeout for query "SELECT * FROM users"

=== Пример 2: Фильтрация по уровню ERROR ===
Найдено ошибок: 1
  [2025-12-01 12:46:01,987] ERROR [db] Connection timeout for query "SELECT * FROM users"

=== Пример 3: Поиск по сообщению (содержит 'logged') ===
Найдено записей с 'logged': 1
  [2025-12-01 12:45:32,123] INFO [auth] User "alex" logged in from 192.168.0.10

  ОШИБКА: Connection timeout for query "SELECT * FROM users"
  ПРЕДУПРЕЖДЕНИЕ: Cache miss for key "profile_123
