In [1]:
import os
import sys

# Make the project root importable so that `app.*` modules resolve.
# The root is taken to be the parent of the current working directory,
# i.e. the notebook is expected to run from a sub-folder of the project.
project_root = os.path.abspath(os.path.join(os.getcwd(), '..'))

# Only report an addition when one actually happened; re-running the cell
# must not claim the path was added again.
if project_root not in sys.path:
    sys.path.append(project_root)
    print(f"Added {project_root} to Python path")
else:
    print(f"{project_root} is already on Python path")

Added c:\Machine Learning\log-analyzer to Python path


In [2]:
from app.core.rule_converter import RuleConverter

# File handed to the converter as `file_path`
# (presumably a repomix dump of the application source - confirm).
REPOMIX_DUMP_PATH = "C:\\web development projects\\FOG\\laser-tag\\fog-laser-tag\\src\\moments\\repomix-output.xml"

# Sample log lines from the application; passed along so the LLM can see
# the log format.
sample_logs = """
2025-10-05 14:47:03 | INFO     | Starting recording for game abc123
2025-10-05 14:47:15 | INFO     | Recording stopped for game abc123
"""

converter = RuleConverter()
rules = converter.invoke(file_path=REPOMIX_DUMP_PATH, sample_logs=sample_logs)

# Use rules
# from log_chunker import LogChunker
# chunker = LogChunker(rules)
# chunks = chunker.chunk_file('your_app.log')

  SYSTEM_PROMPT = """


In [6]:
rules

[Rule(rule_name='General Error', priority=1, type='Multi-line Error', start_pattern='^.*(ERROR|Exception|Traceback).*', end_pattern='^(\\d{4}-\\d{2}-\\d{2}.*|$)'),
 Rule(rule_name='Application Update Process', priority=2, type='Transaction', start_pattern='^.*Updating\\s+app\\s+to\\s+version\\s+([0-9.]+)\\s+from\\s+pre-signed\\s+URL', end_pattern='^.*Version\\s+([0-9.]+)\\s+is\\s+ready\\s+for\\s+the\\s+next\\s+launch'),
 Rule(rule_name='Application Launch Process', priority=2, type='Transaction', start_pattern='^.*Attempting\\s+to\\s+launch:\\s+.*app\\.exe', end_pattern='^.*Successfully\\s+launched\\s+version\\s+([0-9.]+)\\.'),
 Rule(rule_name='Recording Session', priority=2, type='Transaction', start_pattern='^.*Starting\\s+recording\\s+for\\s+game\\s+([a-zA-Z0-9-]+)', end_pattern='^.*Recording\\s+stopped\\s+successfully\\s+for\\s+game\\s+([a-zA-Z0-9-]+)'),
 Rule(rule_name='Cloudflare Token Generation', priority=3, type='Transaction', start_pattern='^.*Creating\\s+token\\s+for\\s+game

In [4]:
from app.core.rule_converter import RuleConverter
import re

# Sample entries in the full "timestamp | LEVEL | module:function:line - message" layout.
sample_logs = """
2025-10-05 14:47:03 | INFO | app.api.recording:start_recording:32 - Starting recording for game 68e236ef72613cc4b4be0905 of type soloDeathMatch
2025-10-05 14:47:04 | ERROR | app.core.camera_manager:can_record:20 - Camera 3 could not be opened.
2025-10-05 14:52:54 | INFO | app.api.recording:stop_recording:65 - Recording stopped successfully for game 68e236ef72613cc4b4be0905
"""

# Probe the start pattern of the first three rules against a single INFO line.
test_line = "2025-10-05 21:02:36 | INFO | app.core.analyzer_manager:send_frame:30 - Adding clip"
for rule in rules[:3]:
    match = re.search(rule.start_pattern, test_line)
    verdict = '✓ MATCH' if match else '✗ no match'
    print(f"{rule.rule_name}: {verdict}")

General Error: ✗ no match
Application Update Process: ✗ no match
Application Launch Process: ✗ no match


In [7]:
"""
Log Chunker Module - Groups log lines into meaningful chunks based on rules
"""
import re
from typing import List, Optional, Dict, Tuple
from dataclasses import dataclass, field
from enum import Enum
from app.models import Rule


class ChunkStatus(Enum):
    """Status of a chunk"""
    # Set by Chunk.mark_complete().
    COMPLETE = "complete"
    # Default status of a freshly created Chunk.
    INCOMPLETE = "incomplete"
    # Used by LogChunker for single-line chunks whose entry matched no rule start pattern.
    UNMATCHED = "unmatched"


@dataclass
class Chunk:
    """A group of log lines collected under one rule.

    Fields:
        rule_name: Name of the rule that produced the chunk.
        rule_type: Rule type, e.g. 'Transaction' or 'Multi-line Error'.
        lines: The collected log lines, in insertion order.
        status: Lifecycle state; starts as INCOMPLETE.
        start_line_num: Line number of the first added line (0 until set).
        end_line_num: Line number of the most recently added line.
        extracted_ids: Identifiers captured from the log line, e.g. {'game_id': 'ABC123'}.
        metadata: Free-form extra information about the chunk.
    """
    rule_name: str
    rule_type: str
    lines: List[str] = field(default_factory=list)
    status: ChunkStatus = ChunkStatus.INCOMPLETE
    start_line_num: int = 0
    end_line_num: Optional[int] = None
    extracted_ids: Dict[str, str] = field(default_factory=dict)
    metadata: Dict = field(default_factory=dict)

    def add_line(self, line: str, line_num: int):
        """Append `line` and update the start/end line numbers."""
        self.lines.append(line)
        # A falsy start (the default 0) means no start has been recorded yet.
        self.start_line_num = self.start_line_num or line_num
        self.end_line_num = line_num

    def get_text(self) -> str:
        """Return the collected lines joined with newlines."""
        return "\n".join(self.lines)

    def mark_complete(self):
        """Flag the chunk as COMPLETE."""
        self.status = ChunkStatus.COMPLETE

    def to_dict(self) -> dict:
        """Return a plain-dict view of the chunk for serialization."""
        return dict(
            rule_name=self.rule_name,
            rule_type=self.rule_type,
            status=self.status.value,
            start_line_num=self.start_line_num,
            end_line_num=self.end_line_num,
            line_count=len(self.lines),
            text=self.get_text(),
            extracted_ids=self.extracted_ids,
            metadata=self.metadata,
        )


class LogChunker:
    """
    Chunks log files based on provided rules.
    Handles both transactions (with unique IDs) and multi-line errors.

    A 'Transaction' rule opens a chunk on its start pattern and completes it
    on its end pattern; only the start and end lines are stored in the chunk.
    A 'Multi-line Error' rule opens a chunk on its start pattern and absorbs
    the following continuation lines (lines that do not look like a new log
    entry) until a new entry matches the rule's end pattern or another
    multi-line error starts.
    """

    def __init__(
        self,
        rules: List[Rule],
        log_line_pattern: str = r'^\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}\s+\|',
        max_incomplete_buffer: int = 10000,
        force_close_after_lines: int = 1000,
        debug: bool = False
    ):
        """
        Initialize the log chunker.

        Args:
            rules: List of Rule objects to use for chunking
            log_line_pattern: Regex pattern to identify start of a new log entry
                Default matches: "YYYY-MM-DD HH:MM:SS |" with flexible whitespace
            max_incomplete_buffer: Maximum number of incomplete chunks to track
            force_close_after_lines: Force close incomplete chunks after this many lines
            debug: Enable debug output

        Rules whose start or end pattern is not a valid regex are reported
        and skipped. Compiled patterns are attached to each kept rule as
        `_start_regex` / `_end_regex`.
        """
        self.log_line_pattern = re.compile(log_line_pattern)
        self.max_incomplete_buffer = max_incomplete_buffer
        self.force_close_after_lines = force_close_after_lines
        self.debug = debug

        # Build a fresh list instead of removing from the list being iterated:
        # removing during iteration skips the element after each removed one,
        # which would leave that rule without compiled regexes.
        usable_rules = []
        for rule in sorted(rules, key=lambda r: r.priority):  # Sort by priority
            try:
                start_regex = re.compile(rule.start_pattern)
                end_regex = re.compile(rule.end_pattern)
            except re.error as e:
                print(f"⚠️  Skipping rule '{rule.rule_name}' - invalid pattern: {e}")
                continue
            rule._start_regex = start_regex
            rule._end_regex = end_regex
            usable_rules.append(rule)
        self.rules = usable_rules

    def chunk_file(self, file_path: str) -> List[Chunk]:
        """
        Chunk a log file into meaningful groups.

        Args:
            file_path: Path to the log file

        Returns:
            List of Chunk objects
        """
        # Undecodable bytes are replaced rather than raising.
        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
            return self.chunk_lines(f)

    def chunk_lines(self, lines) -> List[Chunk]:
        """
        Chunk an iterable of log lines.

        For every line that starts a new log entry, in this order:
          1. an open multi-line error is closed if its rule's end pattern
             matches the line;
          2. the line is tried as the END of an active transaction;
          3. otherwise it is tried as a START (first matching rule in
             priority order wins);
          4. otherwise it becomes a single-line UNMATCHED chunk.
        Lines that do not start a new entry are appended to the open
        multi-line error, if any; with none open they are not stored.
        After every line, over-age transactions are force-closed and the
        active-transaction buffer is trimmed.

        Args:
            lines: Iterable of log lines (file object or list)

        Returns:
            List of Chunk objects. Chunks still open at the end of input are
            appended with their status left as INCOMPLETE.
        """
        completed_chunks: List[Chunk] = []
        active_transactions: Dict[str, Chunk] = {}  # key: rule_name + extracted_ids
        active_multiline: Optional[Chunk] = None
        active_multiline_rule = None  # rule that opened active_multiline

        for line_num, raw_line in enumerate(lines, start=1):
            # Strip the line terminator, including the '\r' of CRLF input.
            line = raw_line.rstrip('\r\n')

            if self.debug:
                print(f"\n--- Line {line_num}: {line[:80]}...")
                print(f"    Active transactions: {len(active_transactions)}")
                print(f"    Active multiline: {active_multiline is not None}")

            # Check if this line is the start of a new log entry
            is_new_entry = bool(self.log_line_pattern.match(line))

            if is_new_entry:
                # 1. Close the open multi-line error when this entry matches
                #    its end pattern, whatever else the entry turns out to be.
                if active_multiline is not None and active_multiline_rule._end_regex.match(line):
                    active_multiline.mark_complete()
                    completed_chunks.append(active_multiline)
                    active_multiline = None
                    active_multiline_rule = None
                    if self.debug:
                        print(f"    ✓ Closed multiline error (end pattern matched)")

                # 2. END of an active transaction?
                handled = self._close_matching_transaction(
                    line, line_num, active_transactions, completed_chunks
                )

                # 3. START of a new chunk?
                if not handled:
                    for rule in self.rules:
                        start_match = rule._start_regex.search(line)
                        if not start_match:
                            if self.debug and line_num <= 5:
                                # Show why first few lines don't match (for debugging)
                                print(f"    ✗ No match: {rule.rule_name}")
                            continue

                        if self.debug:
                            print(f"    ✓ Matched START: {rule.rule_name} ({rule.type})")

                        if rule.type == 'Transaction':
                            extracted_ids = self._extract_ids(start_match)
                            chunk_key = self._make_chunk_key(rule.rule_name, extracted_ids)

                            # A still-open chunk with the same key would be
                            # overwritten and lost; emit it as incomplete.
                            superseded = active_transactions.pop(chunk_key, None)
                            if superseded is not None:
                                superseded.metadata['superseded'] = True
                                completed_chunks.append(superseded)

                            chunk = Chunk(
                                rule_name=rule.rule_name,
                                rule_type=rule.type,
                                extracted_ids=extracted_ids
                            )
                            chunk.add_line(line, line_num)
                            active_transactions[chunk_key] = chunk
                            handled = True

                        elif rule.type == 'Multi-line Error':
                            # Only one multi-line error is open at a time.
                            if active_multiline is not None:
                                active_multiline.mark_complete()
                                completed_chunks.append(active_multiline)

                            active_multiline = Chunk(
                                rule_name=rule.rule_name,
                                rule_type=rule.type
                            )
                            active_multiline.add_line(line, line_num)
                            active_multiline_rule = rule
                            handled = True

                        break  # First matching rule wins (priority-based)

                # 4. Nothing claimed the entry: record it as unmatched.
                if not handled:
                    unmatched_chunk = Chunk(
                        rule_name="Unmatched",
                        rule_type="Single Line",
                        status=ChunkStatus.UNMATCHED
                    )
                    unmatched_chunk.add_line(line, line_num)
                    completed_chunks.append(unmatched_chunk)

                    if self.debug:
                        print(f"    → Created unmatched chunk")

            elif active_multiline is not None:
                # Continuation line (e.g. a traceback line) of the open error.
                active_multiline.add_line(line, line_num)
                if self.debug:
                    print(f"    → Added to active multiline error")

            # Housekeeping runs for every line, matched or not.
            self._force_close_old_chunks(active_transactions, completed_chunks, line_num)

            if len(active_transactions) > self.max_incomplete_buffer:
                self._flush_oldest_chunks(active_transactions, completed_chunks)

        # Emit whatever is still open at end of input.
        if active_multiline is not None:
            completed_chunks.append(active_multiline)

        completed_chunks.extend(active_transactions.values())

        return completed_chunks

    def _close_matching_transaction(
        self,
        line: str,
        line_num: int,
        active_transactions: Dict[str, Chunk],
        completed_chunks: List[Chunk]
    ) -> bool:
        """Complete the first active transaction that `line` ends.

        Transaction rules are tried in priority order. For a rule whose end
        pattern matches, the first active chunk of that rule with compatible
        IDs receives the line, is marked complete and is moved to
        `completed_chunks`.

        Returns:
            True if a transaction was completed, False otherwise.
        """
        if not active_transactions:
            return False

        for rule in self.rules:
            if rule.type != 'Transaction':
                continue

            end_match = rule._end_regex.search(line)
            if not end_match:
                continue

            extracted_ids = self._extract_ids(end_match)
            for key, chunk in active_transactions.items():
                if chunk.rule_name == rule.rule_name and self._ids_match(chunk.extracted_ids, extracted_ids):
                    chunk.add_line(line, line_num)
                    chunk.mark_complete()
                    completed_chunks.append(chunk)
                    # Safe: we return immediately, so iteration does not continue.
                    del active_transactions[key]

                    if self.debug:
                        print(f"    ✓ Matched END for transaction: {rule.rule_name}")
                    return True

        return False

    def _make_chunk_key(self, rule_name: str, extracted_ids: Dict[str, str]) -> str:
        """Create unique key for a transaction chunk"""
        id_str = '_'.join(f"{k}={v}" for k, v in sorted(extracted_ids.items()))
        return f"{rule_name}::{id_str}" if id_str else f"{rule_name}::no_id"

    def _extract_ids(self, match) -> Dict[str, str]:
        """Extract captured groups from regex match as IDs.

        Keys are 'id_<n>' with n the zero-based group position; groups that
        did not participate or captured an empty string are omitted.
        """
        return {f'id_{i}': group for i, group in enumerate(match.groups()) if group}

    def _ids_match(self, ids1: Dict[str, str], ids2: Dict[str, str]) -> bool:
        """Check if extracted IDs match (for pairing start/end of transactions)"""
        if not ids1 or not ids2:
            return True  # If no IDs extracted, consider it a match

        common_keys = set(ids1.keys()) & set(ids2.keys())
        if not common_keys:
            return True

        # Intentionally lenient: one agreeing group is enough. Patterns may
        # also capture text that always differs between start and end lines
        # (e.g. a group wrapping the timestamped prefix).
        return any(ids1[key] == ids2[key] for key in common_keys)

    def _force_close_old_chunks(
        self,
        active_transactions: Dict[str, Chunk],
        completed_chunks: List[Chunk],
        current_line: int
    ):
        """Force close chunks that have been open too long.

        A chunk started more than `force_close_after_lines` lines before
        `current_line` is tagged with metadata['forced_close'] = True and
        moved to `completed_chunks` (its status is left unchanged).
        """
        expired_keys = [
            key
            for key, chunk in active_transactions.items()
            if current_line - chunk.start_line_num > self.force_close_after_lines
        ]

        for key in expired_keys:
            chunk = active_transactions.pop(key)
            chunk.metadata['forced_close'] = True
            completed_chunks.append(chunk)

    def _flush_oldest_chunks(
        self,
        active_transactions: Dict[str, Chunk],
        completed_chunks: List[Chunk]
    ):
        """Flush oldest chunks when buffer is full.

        The oldest 10% of active chunks by start line (at least one) are
        tagged with metadata['buffer_flush'] = True and moved to
        `completed_chunks`. Does nothing when there are no active chunks.
        """
        oldest_first = sorted(active_transactions.items(), key=lambda item: item[1].start_line_num)
        flush_count = max(1, len(oldest_first) // 10)

        for key, chunk in oldest_first[:flush_count]:
            chunk.metadata['buffer_flush'] = True
            completed_chunks.append(chunk)
            del active_transactions[key]


In [8]:
rules

[Rule(rule_name='General Error', priority=1, type='Multi-line Error', start_pattern='^.*(ERROR|Exception|Traceback).*', end_pattern='^(\\d{4}-\\d{2}-\\d{2}.*|$)'),
 Rule(rule_name='Application Update Process', priority=2, type='Transaction', start_pattern='^.*Updating\\s+app\\s+to\\s+version\\s+([0-9.]+)\\s+from\\s+pre-signed\\s+URL', end_pattern='^.*Version\\s+([0-9.]+)\\s+is\\s+ready\\s+for\\s+the\\s+next\\s+launch'),
 Rule(rule_name='Application Launch Process', priority=2, type='Transaction', start_pattern='^.*Attempting\\s+to\\s+launch:\\s+.*app\\.exe', end_pattern='^.*Successfully\\s+launched\\s+version\\s+([0-9.]+)\\.'),
 Rule(rule_name='Recording Session', priority=2, type='Transaction', start_pattern='^.*Starting\\s+recording\\s+for\\s+game\\s+([a-zA-Z0-9-]+)', end_pattern='^.*Recording\\s+stopped\\s+successfully\\s+for\\s+game\\s+([a-zA-Z0-9-]+)'),
 Rule(rule_name='Cloudflare Token Generation', priority=3, type='Transaction', start_pattern='^.*Creating\\s+token\\s+for\\s+game

In [9]:

# Locate the log file relative to the project root computed in the first
# cell instead of hard-coding a machine-specific absolute path.
log_path = os.path.join(project_root, 'data', 'python.log')

# debug=True prints a per-line trace of the matching decisions.
chunker = LogChunker(rules, debug=True)
chunks = chunker.chunk_file(log_path)


--- Line 1: 2025-10-05 11:06:21 | INFO     | app.core.configurations:ensure_config_directory...
    Active transactions: 0
    Active multiline: False
    ✗ No match: General Error
    ✗ No match: Application Update Process
    ✗ No match: Application Launch Process
    ✗ No match: Recording Session
    ✗ No match: Cloudflare Token Generation
    ✗ No match: Download Session Tracking
    ✗ No match: File Encryption
    ✗ No match: File Decryption
    ✗ No match: Template Update Process
    ✗ No match: Photo Processing Lifecycle
    ✗ No match: Video Analyzer Lifecycle
    ✗ No match: Camera Thread Lifecycle
    ✗ No match: Video Clip Creation
    ✗ No match: Full Video Concatenation Process
    ✗ No match: Highlight Video Creation Process
    ✗ No match: Concurrent Video Processing Lifecycle
    ✗ No match: Socket.IO Client Connection Lifecycle
    ✗ No match: Application Shutdown Process
    → Created unmatched chunk

--- Line 2: 2025-10-05 11:06:21 | INFO     | app.core.configuratio

In [10]:
len(chunks)

7512

In [20]:
import re

# Log entry (ERROR level) used to probe the first rule.
test_line = "2025-10-05 11:06:21 | ERROR | app.core.configurations:ensure_config_directory:105 - Config directory ensured"

# Take the first rule in the list and show what is being tested.
rule = rules[0]
print(f"Testing rule: {rule.rule_name}")
print(f"Start pattern: {rule.start_pattern}")
print(f"Test line: {test_line}\n")

# Compile the rule's start pattern and search the test line with it.
pattern = re.compile(rule.start_pattern)
match = pattern.search(test_line)
print(f"Match result: {match}")

# On a miss, print the pattern and the line side by side (first 100 chars each).
if match is None:
    for message in (
        "\n❌ Pattern doesn't match!",
        "\nLet's check what your pattern expects vs what you have:",
        f"Pattern looks for: {rule.start_pattern[:100]}",
        f"Your log has: {test_line[:100]}",
    ):
        print(message)

Testing rule: General Multi-line Error
Test line: 2025-10-05 11:06:21 | ERROR | app.core.configurations:ensure_config_directory:105 - Config directory ensured

Match result: <re.Match object; span=(0, 108), match='2025-10-05 11:06:21 | ERROR | app.core.configurat>


In [21]:
# Try the start pattern of every rule against one log line and report
# either the captured groups or the absence of a match.
test_line = "2025-10-05 21:02:36 | INFO | app.core.analyzer_manager:send_frame:30 - Adding clip"

print("Testing patterns against:", test_line, "\n")

for position, rule in enumerate(rules, start=1):  # 1-based position for display
    print(f"\n{'='*80}")
    print(f"Rule {position}: {rule.rule_name} ({rule.type})")
    # Only the first 150 characters of the pattern are shown.
    print(f"Start pattern: {rule.start_pattern[:150]}...")

    # A failure while testing one rule is reported and does not stop the loop.
    try:
        match = re.search(rule.start_pattern, test_line)
        print(f"✓ MATCHES! Extracted: {match.groups()}" if match else f"✗ No match")
    except Exception as e:
        print(f"✗ Error testing: {e}")

Testing patterns against: 2025-10-05 21:02:36 | INFO | app.core.analyzer_manager:send_frame:30 - Adding clip 


Rule 1: General Multi-line Error (Multi-line Error)
✗ No match

Rule 2: API Request Error (Multi-line Error)
Start pattern: ^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}\s+\|\s+ERROR\s+\|\s+app\.api\..*:\d+\s+-\s+Error (?:opening|closing|detecting|in|while|uploading|serving|delet...
✗ No match

Rule 3: Recording Session (Transaction)
Start pattern: ^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}\s+\|\s+INFO\s+\|\s+app\.api\.recording:start_recording:\d+\s+-\s+Starting recording for game ([a-zA-Z0-9-]+)\s...
✗ No match

Rule 4: Download URL Generation (Transaction)
Start pattern: ^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}\s+\|\s+DEBUG\s+\|\s+app\.api\.download:get_download_url:\d+\s+-\s+Creating token for game_id:\s+([a-zA-Z0-9-]+...
✗ No match

Rule 5: Camera API Control (Transaction)
Start pattern: ^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}\s+\|\s+INFO\s+\|\s+app\.api\.cameras:open_cam:\d+\s+-\

In [22]:
# Show the first Transaction chunk that holds more than one line.
print("\n\nActive Transactions:")
for chunk in chunks:
    # Skip everything that is not a multi-line transaction.
    if chunk.rule_type != "Transaction" or len(chunk.lines) <= 1:
        continue

    print(f"\n{chunk.rule_name}:")
    print(f"  Lines: {len(chunk.lines)}")
    print(f"  Status: {chunk.status}")
    print(f"  First line: {chunk.lines[0][:100]}")
    # The filter above guarantees at least two lines, so a last line exists.
    print(f"  Last line: {chunk.lines[-1][:100]}")
    break



Active Transactions:

Recording Session:
  Lines: 2
  Status: ChunkStatus.COMPLETE
  First line: 2025-10-05 14:47:03 | INFO     | app.api.recording:start_recording:32 - Starting recording for game 
  Last line: 2025-10-05 14:52:54 | INFO     | app.api.recording:stop_recording:65 - Recording stopped successfull
