Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 231 additions & 0 deletions deepnote_toolkit/execution_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
"""
This module provides execution timeout monitoring for Jupyter notebook cells.
It can detect long-running executions and optionally send warnings or interrupt them.
"""

import os
import signal
import threading
import time
from typing import Optional

import requests
from IPython.core.interactiveshell import ExecutionInfo, ExecutionResult

from .get_webapp_url import get_absolute_userpod_api_url
from .logging import LoggerManager


class ExecutionTimeoutMonitor:
    """
    Monitors execution duration and can send warnings or interrupt stuck executions.

    Lifecycle: ``on_pre_execute`` records the execution start and arms one or two
    ``threading.Timer``s (warning and optional auto-interrupt); ``on_post_execute``
    cancels them. All timer state is guarded by ``_execution_lock`` because the
    timer callbacks fire on background threads.
    """

    def __init__(
        self,
        warning_threshold_seconds: int = 240,
        timeout_seconds: int = 300,
        enable_auto_interrupt: bool = False,
    ):
        """
        Initialize the execution timeout monitor.

        Args:
            warning_threshold_seconds: Seconds after which to send a warning (default: 240s = 4min)
            timeout_seconds: Seconds after which to consider execution stuck (default: 300s = 5min)
            enable_auto_interrupt: Whether to automatically interrupt stuck executions (default: False)
        """
        self.logger = LoggerManager().get_logger()
        self.warning_threshold = warning_threshold_seconds
        self.timeout_threshold = timeout_seconds
        self.enable_auto_interrupt = enable_auto_interrupt
        # Metadata ({"code": ..., "start": ...}) for the in-flight execution;
        # None when no cell is running.
        self.current_execution: Optional[dict] = None
        self.warning_timer: Optional[threading.Timer] = None
        self.timeout_timer: Optional[threading.Timer] = None
        # Protects current_execution and both timers: timer callbacks run on
        # background threads concurrently with the pre/post-execute hooks.
        self._execution_lock = threading.Lock()

    def on_pre_execute(self, info: ExecutionInfo) -> None:
        """
        Called before executing a cell.
        Records the execution start and arms the warning/timeout timers.
        """
        cell_preview = info.raw_cell[:100] if info.raw_cell else "<empty>"

        with self._execution_lock:
            # Defensive: if the previous execution never reached
            # on_post_execute (e.g. the event was dropped), its timers could
            # still be pending. Cancel them before arming new ones so a stale
            # timer cannot fire against the new execution.
            self._cancel_timers()

            self.current_execution = {
                "code": cell_preview,
                "start": time.time(),
            }

            # Start warning timer (armed inside the lock so _cancel_timers
            # from a concurrent on_post_execute cannot miss it).
            if self.warning_threshold > 0:
                self.warning_timer = threading.Timer(
                    self.warning_threshold, self._send_warning
                )
                self.warning_timer.daemon = True
                self.warning_timer.start()

            # Start timeout timer (only when auto-interrupt is enabled).
            if self.enable_auto_interrupt and self.timeout_threshold > 0:
                self.timeout_timer = threading.Timer(
                    self.timeout_threshold, self._interrupt_execution
                )
                self.timeout_timer.daemon = True
                self.timeout_timer.start()

        self.logger.debug(
            "Timeout monitoring started: warning=%ds, timeout=%ds, auto_interrupt=%s",
            self.warning_threshold,
            self.timeout_threshold,
            self.enable_auto_interrupt,
        )

    def on_post_execute(self, result: ExecutionResult) -> None:
        """
        Called after executing a cell.
        Cancels any pending timers and clears the execution record.
        """
        with self._execution_lock:
            self._cancel_timers()
            self.current_execution = None

    def _cancel_timers(self) -> None:
        """Cancel all active timers. Caller must hold ``_execution_lock``."""
        if self.warning_timer:
            self.warning_timer.cancel()
            self.warning_timer = None
        if self.timeout_timer:
            self.timeout_timer.cancel()
            self.timeout_timer = None

    def _send_warning(self) -> None:
        """Timer callback: warn when execution runs longer than the threshold."""
        # Capture execution data while holding the lock; the cell may finish
        # (and current_execution be cleared) at any moment.
        with self._execution_lock:
            if not self.current_execution:
                return
            execution_data = self.current_execution.copy()

        # Process outside the lock to avoid blocking the execute hooks.
        duration = time.time() - execution_data["start"]
        code_preview = execution_data["code"][:50]

        self.logger.warning(
            "LONG_EXECUTION | duration=%.1fs | preview=%s",
            duration,
            code_preview.replace("\n", "\\n"),
        )

        # Best-effort report to the webapp (errors are logged, not raised).
        self._report_to_webapp(duration, code_preview, warning=True)

    def _interrupt_execution(self) -> None:
        """Timer callback: interrupt execution after the timeout threshold."""
        # Capture execution data while holding the lock (see _send_warning).
        with self._execution_lock:
            if not self.current_execution:
                return
            execution_data = self.current_execution.copy()

        # Process outside the lock to avoid blocking the execute hooks.
        duration = time.time() - execution_data["start"]
        code_preview = execution_data["code"][:50]

        self.logger.error(
            "TIMEOUT_INTERRUPT | duration=%.1fs | Sending SIGINT to interrupt execution",
            duration,
        )

        # Report to webapp before interrupting.
        self._report_to_webapp(duration, code_preview, warning=False)

        # Send SIGINT to interrupt the execution (simulates Ctrl+C).
        try:
            os.kill(os.getpid(), signal.SIGINT)
        except Exception as e:  # pylint: disable=broad-exception-caught
            self.logger.error("Failed to send SIGINT: %s", e)

    def _report_to_webapp(
        self, duration: float, code_preview: str, warning: bool
    ) -> None:
        """
        Report execution warning/timeout to webapp.

        Best-effort: any failure is logged and swallowed so reporting can
        never break the user's execution.

        Args:
            duration: Execution duration in seconds
            code_preview: Preview of the code being executed
            warning: Whether this is a warning (True) or timeout (False)
        """
        try:
            endpoint = "warning" if warning else "timeout"
            url = get_absolute_userpod_api_url(f"execution/{endpoint}")

            payload = {
                "duration": duration,
                "code_preview": code_preview,
                "threshold": (
                    self.warning_threshold if warning else self.timeout_threshold
                ),
            }

            # Short timeout: this runs on a timer thread but must not hang.
            response = requests.post(url, json=payload, timeout=2)
            response.raise_for_status()

            self.logger.debug("Successfully reported %s to webapp", endpoint)

        except Exception as e:  # pylint: disable=broad-exception-caught
            self.logger.error("Failed to report to webapp: %s", e)


# Global instance: module-level singleton populated by
# setup_execution_timeout_monitor(); kept alive so the registered IPython
# event handlers keep a live reference.
_timeout_monitor: Optional[ExecutionTimeoutMonitor] = None


def setup_execution_timeout_monitor(
    warning_threshold_seconds: int = 240,
    timeout_seconds: int = 300,
    enable_auto_interrupt: bool = False,
) -> None:
    """
    Set up execution timeout monitoring.

    This is optional and should be called during runtime initialization if needed.
    A no-op (with a warning) when no IPython shell is available; any failure is
    logged rather than raised.

    Args:
        warning_threshold_seconds: Seconds after which to send a warning (default: 240s = 4min)
        timeout_seconds: Seconds after which to consider execution stuck (default: 300s = 5min)
        enable_auto_interrupt: Whether to automatically interrupt stuck executions (default: False)
    """
    global _timeout_monitor  # pylint: disable=global-statement

    try:
        # Imported lazily so this module can be loaded outside IPython.
        from IPython import get_ipython

        shell = get_ipython()
        if shell is None:
            LoggerManager().get_logger().warning(
                "IPython instance not available, skipping timeout monitor setup"
            )
            return

        monitor = ExecutionTimeoutMonitor(
            warning_threshold_seconds=warning_threshold_seconds,
            timeout_seconds=timeout_seconds,
            enable_auto_interrupt=enable_auto_interrupt,
        )

        # Hook the monitor into the cell-execution lifecycle.
        shell.events.register("pre_execute", monitor.on_pre_execute)
        shell.events.register("post_execute", monitor.on_post_execute)
        _timeout_monitor = monitor

        LoggerManager().get_logger().info(
            "Execution timeout monitor initialized: warning=%ds, timeout=%ds, auto_interrupt=%s",
            warning_threshold_seconds,
            timeout_seconds,
            enable_auto_interrupt,
        )

    except Exception as e:  # pylint: disable=broad-exception-caught
        LoggerManager().get_logger().error(
            "Failed to set up timeout monitor: %s", e
        )
132 changes: 132 additions & 0 deletions deepnote_toolkit/execution_tracking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""
This module provides execution tracking for Jupyter notebook cells.
It monitors execution start/end events, duration, and publishes metadata to help
debug stuck executions and improve observability.
"""

import time
from typing import Any, Dict, Optional

from IPython.core.interactiveshell import ExecutionInfo, ExecutionResult

from .ipython_utils import publish_execution_metadata
from .logging import LoggerManager


class ExecutionTracker:
    """
    Tracks execution state of notebook cells.

    Registers as IPython pre/post execute and run-cell handlers; logs start/end
    events with duration and publishes per-execution metadata for the webapp.
    """

    def __init__(self):
        self.logger = LoggerManager().get_logger()
        # Metadata for the in-flight execution; None when idle.
        self.current_execution: Optional[Dict[str, Any]] = None
        # Monotonically increasing count of executions seen by this tracker.
        self.execution_count = 0

    def on_pre_execute(self, info: ExecutionInfo) -> None:
        """
        Called before executing a cell.
        Logs execution start and tracks execution metadata.
        """
        self.execution_count += 1
        cell_preview = info.raw_cell[:100] if info.raw_cell else "<empty>"
        # NOTE: hash() is randomized per interpreter process (PYTHONHASHSEED),
        # so cell_id only correlates log lines within a single kernel session.
        cell_id = hash(info.raw_cell) if info.raw_cell else 0

        self.current_execution = {
            "cell_id": cell_id,
            "code_preview": cell_preview,
            "start_time": time.time(),
            "execution_count": self.execution_count,
        }

        self.logger.info(
            "EXEC_START | count=%d | cell_id=%s | preview=%s",
            self.execution_count,
            cell_id,
            cell_preview[:50].replace("\n", "\\n"),
        )

    def on_post_execute(self, result: ExecutionResult) -> None:
        """
        Called after executing a cell.
        Logs execution end, duration, and success status.
        Publishes execution metadata for webapp consumption.
        """
        if not self.current_execution:
            # post_execute without a matching pre_execute (e.g. handler was
            # registered mid-execution); nothing to report.
            self.logger.warning("EXEC_END called without matching EXEC_START")
            return

        duration = time.time() - self.current_execution["start_time"]
        success = result.error_in_exec is None
        error_name = (
            type(result.error_in_exec).__name__ if result.error_in_exec else None
        )

        self.logger.info(
            "EXEC_END | count=%d | duration=%.2fs | success=%s%s",
            self.current_execution["execution_count"],
            duration,
            success,
            f" | error={error_name}" if error_name else "",
        )

        # Publish metadata to webapp (best-effort; failures are logged).
        try:
            publish_execution_metadata(
                execution_count=self.current_execution["execution_count"],
                duration=duration,
                success=success,
                error_type=error_name,
            )
        except Exception as e:  # pylint: disable=broad-exception-caught
            self.logger.error("Failed to publish execution metadata: %s", e)

        self.current_execution = None

    def on_pre_run_cell(self, info: ExecutionInfo) -> None:
        """Called before running a cell (before pre_execute)."""
        # Per review feedback: do not log raw user code here — only its size.
        # EXEC_START already carries a truncated preview at info level.
        cell_length = len(info.raw_cell) if info.raw_cell else 0
        self.logger.debug("PRE_RUN | cell_length=%d", cell_length)

    def on_post_run_cell(self, result: ExecutionResult) -> None:
        """Called after running a cell (after post_execute)."""
        self.logger.debug("POST_RUN | exec_count=%s", result.execution_count)


# Global instance: module-level singleton populated by
# setup_execution_tracking(); kept alive so the registered IPython event
# handlers keep a live reference.
_execution_tracker: Optional[ExecutionTracker] = None


def setup_execution_tracking() -> None:
    """
    Set up IPython event handlers for execution tracking.
    This should be called during runtime initialization.

    A no-op (with a warning) when no IPython shell is available; any failure
    is logged rather than raised.
    """
    global _execution_tracker  # pylint: disable=global-statement

    try:
        # Imported lazily so this module can be loaded outside IPython.
        from IPython import get_ipython

        shell = get_ipython()
        if shell is None:
            LoggerManager().get_logger().warning(
                "IPython instance not available, skipping execution tracking setup"
            )
            return

        tracker = ExecutionTracker()

        # Hook the tracker into the full cell-execution lifecycle.
        shell.events.register("pre_execute", tracker.on_pre_execute)
        shell.events.register("post_execute", tracker.on_post_execute)
        shell.events.register("pre_run_cell", tracker.on_pre_run_cell)
        shell.events.register("post_run_cell", tracker.on_post_run_cell)
        _execution_tracker = tracker

        LoggerManager().get_logger().info(
            "Execution tracking initialized successfully"
        )

    except Exception as e:  # pylint: disable=broad-exception-caught
        LoggerManager().get_logger().error(
            "Failed to set up execution tracking: %s", e
        )
Loading
Loading