In [1]:
%%capture
%load_ext autoreload
%autoreload 2
%load_ext dotenv
%dotenv

import sys
from pathlib import Path

# Add the project root to the Python path so the `src` package can be imported.
# When the kernel's working directory is named `notebooks`, the root is its
# parent; otherwise the working directory itself is treated as the root.
project_root = Path.cwd().parent if Path.cwd().name == 'notebooks' else Path.cwd()
# Guard the insert so re-running this cell does not pile up duplicate entries.
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

In [2]:
from datetime import datetime
from src.domain import Event, EventStatus, EventLevel

In [3]:
# Sample event used throughout the notebook: an INFO-level event in the NEW
# state, stamped with the current local time and carrying two usage readings.
mem_leak = Event(
    id=1,
    source="apolo_c001",
    level=EventLevel.INFO,
    status=EventStatus.NEW,
    timestamp=datetime.now(),
    metadata=dict(cpu_usage=0.8, memory_usage=0.5),
)

In [4]:
# Show the identifier the event was constructed with.
mem_leak.id

1

In [5]:
# Inspect the lifecycle flags before the event is acknowledged or resolved.
print(mem_leak.is_acknowledged())
print(mem_leak.is_resolved())

False
False


In [6]:
# Acknowledge the event, then resolve it. Both calls act on `mem_leak` itself,
# so every later cell sees the updated object.
mem_leak.acknowledge()
mem_leak.resolve()

In [7]:
from src.domain import IRule, IAlertService, IDisplayService, IEventRepository

In [8]:
class MemoryFastEventRepository(IEventRepository):
    """In-memory event repository backed by a plain dict keyed by event id."""

    def __init__(self):
        """Start with an empty store."""
        self.events_dict = dict()

    def save(self, events: list[Event]) -> None:
        """Store every event under its ``id``.

        An event whose id is already present replaces the stored one.
        """
        self.events_dict.update((item.id, item) for item in events)

    def get_event_by_id(self, id: int) -> Event:
        """Return the event stored under ``id``.

        Raises:
            KeyError: if no event with that id has been saved.
        """
        store = self.events_dict
        return store[id]

    def get_all_events(self) -> list[Event]:
        """Return a new list of all stored events, in insertion order."""
        return [*self.events_dict.values()]

In [9]:
class DummyAlertService(IAlertService):
    """Stand-in alert service that only prints what it was asked to alert on."""

    def alert(self, events: list[Event]) -> None:
        """Print the received events prefixed with ``Alerting``."""
        message = f"Alerting {events}"
        print(message)

class DummyDisplayService(IDisplayService):
    """Stand-in display service that only prints what it was asked to show."""

    def show(self, events: list[Event]) -> None:
        """Print the received events prefixed with ``Displaying``."""
        message = f"Displaying {events}"
        print(message)

In [None]:
from datetime import timedelta

class SimpleRule(IRule):
    """Rule that marks old, high-severity events as alerting.

    An event is passed to ``set_as_alerting()`` when its level is one of
    ``ALERT_LEVELS`` and its timestamp lies more than ``MAX_AGE`` before the
    moment ``apply`` is called. Subclasses may override either constant to tune
    the rule without touching ``apply``.
    """

    # Severity levels that are eligible for alerting.
    ALERT_LEVELS = (EventLevel.ERROR, EventLevel.CRITICAL)
    # How old an eligible event must be before it is flagged.
    MAX_AGE = timedelta(minutes=10)

    def apply(self, events: list[Event]) -> Event | list[Event] | None:
        """Flag qualifying events in place and return the same list.

        Args:
            events: Events to evaluate; matching ones are mutated via
                ``set_as_alerting()``.

        Returns:
            The ``events`` list that was passed in (same object).
        """
        # Compute the cutoff once so every event in the batch is judged
        # against the same instant rather than a drifting "now".
        cutoff = datetime.now() - self.MAX_AGE
        for event in events:
            if event.level in self.ALERT_LEVELS and event.timestamp < cutoff:
                event.set_as_alerting()
        return events


In [11]:
# Instantiate the collaborators that are handed to the event processor below:
# one rule, an in-memory repository, and the two print-only services.
rules = SimpleRule()
repository = MemoryFastEventRepository()
alert_service = DummyAlertService()
display_service = DummyDisplayService()

In [12]:
from src.application.use_cases.real_time_even_processor import RealTimeEventProcessor

# Wire the processor with the repository, rule and services created above.
processor = RealTimeEventProcessor(
    event_repository=repository,
    rule=rules,
    alert_service=alert_service,
    display_service=display_service
)


In [13]:
# Run the processor on a one-element batch containing the sample event.
processor.process([mem_leak])

Alerting []
Displaying Event(id=1, source=apolo_c001, metadata={'cpu_usage': 0.8, 'memory_usage': 0.5}, level=EventLevel.INFO, timestamp=2026-02-07 11:51:54.219090, status=EventStatus.RESOLVED, ack_timestamp=2026-02-07 11:51:54.246594, resolved_timestamp=2026-02-07 11:51:54.246610)
