Skip to content

Commit

Permalink
captured log manager (#9429)
Browse files Browse the repository at this point in the history
* captured log manager

* update, add delete method, add explicit write stream

* move external_url from metadata to capture context

* remove public on_progress, comments
  • Loading branch information
prha committed Oct 28, 2022
1 parent 24af700 commit 631c3ef
Show file tree
Hide file tree
Showing 5 changed files with 611 additions and 64 deletions.
26 changes: 25 additions & 1 deletion python_modules/dagit/dagit/webserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from dagster import __version__ as dagster_version
from dagster._core.debug import DebugRunPayload
from dagster._core.storage.compute_log_manager import ComputeIOType
from dagster._core.storage.local_compute_log_manager import LocalComputeLogManager
from dagster._core.workspace.context import BaseWorkspaceRequestContext, IWorkspaceProcessContext
from dagster._seven import json
from dagster._utils import Counter, traced_counter
Expand Down Expand Up @@ -161,7 +162,7 @@ async def download_compute_logs_endpoint(self, request: Request):
)

if not path.exists(file):
raise HTTPException(404)
raise HTTPException(404, detail="No log files available for download")

return FileResponse(
context.instance.compute_log_manager.get_local_path(
Expand All @@ -172,6 +173,25 @@ async def download_compute_logs_endpoint(self, request: Request):
filename=f"{run_id}_{step_key}.{file_type}",
)

async def download_captured_logs_endpoint(self, request: Request):
[*log_key, file_extension] = request.path_params["path"].split("/")
context = self.make_request_context(request)

if not isinstance(context.instance.compute_log_manager, LocalComputeLogManager):
raise HTTPException(
404, detail="Compute log manager is not compatible for local downloads"
)

location = context.instance.compute_log_manager.get_captured_local_path(
log_key, file_extension
)

if not location or not path.exists(location):
raise HTTPException(404, detail="No log files available for download")

filebase = "__".join(log_key)
return FileResponse(location, filename=f"{filebase}.{file_extension}")

def index_html_endpoint(self, _request: Request):
"""
Serves root html
Expand Down Expand Up @@ -260,6 +280,10 @@ def build_routes(self):
"/download/{run_id:str}/{step_key:str}/{file_type:str}",
self.download_compute_logs_endpoint,
),
Route(
"/logs/{path:path}",
self.download_captured_logs_endpoint,
),
Route(
"/dagit/notebook",
self.download_notebook,
Expand Down
241 changes: 241 additions & 0 deletions python_modules/dagster/dagster/_core/storage/captured_log_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
# pylint: disable=unused-argument

from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import IO, Generator, List, NamedTuple, Optional

from dagster._core.storage.compute_log_manager import ComputeIOType

MAX_BYTES_CHUNK_READ = 4194304 # 4 MB


class CapturedLogContext(
NamedTuple(
"_CapturedLogContext",
[
("log_key", List[str]),
("external_url", Optional[str]),
],
)
):
"""
Object representing the context in which logs are captured. Can be used by external logging
sidecar implementations to point dagit to an external url to view compute logs instead of a
Dagster-managed location.
"""

def __new__(cls, log_key: List[str], external_url: Optional[str] = None):
return super(CapturedLogContext, cls).__new__(cls, log_key, external_url=external_url)


class CapturedLogData(
NamedTuple(
"_CapturedLogData",
[
("log_key", List[str]),
("stdout", Optional[bytes]),
("stderr", Optional[bytes]),
("cursor", Optional[str]),
],
)
):
"""
Object representing captured log data, either a partial chunk of the log data or the full
capture. Contains the raw bytes and optionally the cursor offset for the partial chunk.
"""

def __new__(
cls,
log_key: List[str],
stdout: Optional[bytes] = None,
stderr: Optional[bytes] = None,
cursor: Optional[str] = None,
):
return super(CapturedLogData, cls).__new__(cls, log_key, stdout, stderr, cursor)


class CapturedLogMetadata(
NamedTuple(
"_CapturedLogMetadata",
[
("stdout_location", Optional[str]),
("stderr_location", Optional[str]),
("stdout_download_url", Optional[str]),
("stderr_download_url", Optional[str]),
],
)
):
"""
Object representing metadata info for the captured log data, containing a display string for
the location of the log data and a URL for direct download of the captured log data.
"""

def __new__(
cls,
stdout_location: Optional[str] = None,
stderr_location: Optional[str] = None,
stdout_download_url: Optional[str] = None,
stderr_download_url: Optional[str] = None,
):
return super(CapturedLogMetadata, cls).__new__(
cls,
stdout_location=stdout_location,
stderr_location=stderr_location,
stdout_download_url=stdout_download_url,
stderr_download_url=stderr_download_url,
)


class CapturedLogSubscription:
def __init__(self, manager, log_key, cursor):
self._manager = manager
self._log_key = log_key
self._cursor = cursor
self._observer = None

def __call__(self, observer):
self._observer = observer
self.fetch()
if self._manager.is_capture_complete(self._log_key):
self.complete()
return self

@property
def log_key(self):
return self._log_key

def dispose(self):
# called when the connection gets closed, allowing the observer to get GC'ed
if self._observer and callable(getattr(self._observer, "dispose", None)):
self._observer.dispose()
self._observer = None
self._manager.on_unsubscribe(self)

def fetch(self):
if not self._observer:
return

should_fetch = True
while should_fetch:
log_data = self._manager.get_log_data(
self._log_key,
self._cursor,
max_bytes=MAX_BYTES_CHUNK_READ,
)
if not self._cursor or log_data.cursor != self._cursor:
self._observer.on_next(log_data)
self._cursor = log_data.cursor
should_fetch = _has_max_data(log_data.stdout) or _has_max_data(log_data.stderr)

def complete(self):
if not self._observer:
return
self._observer.on_completed()


def _has_max_data(chunk):
return chunk and len(chunk) >= MAX_BYTES_CHUNK_READ


class CapturedLogManager(ABC):
"""Abstract base class for capturing the unstructured logs (stdout/stderr) in the current
process, stored / retrieved with a provided log_key."""

@abstractmethod
@contextmanager
def capture_logs(self, log_key: List[str]) -> Generator[CapturedLogContext, None, None]:
"""
Context manager for capturing the stdout/stderr within the current process, and persisting
it under the given log key.
Args:
log_key (List[String]): The log key identifying the captured logs
"""

@abstractmethod
@contextmanager
def open_log_stream(
self, log_key: List[str], io_type: ComputeIOType
) -> Generator[Optional[IO], None, None]:
"""
Context manager for providing an IO stream that enables the caller to write to a log stream
managed by the captured log manager, to be read later using the given log key.
Args:
log_key (List[String]): The log key identifying the captured logs
"""

@abstractmethod
def is_capture_complete(self, log_key: List[str]) -> bool:
"""Flag indicating when the log capture for a given log key has completed.
Args:
log_key (List[String]): The log key identifying the captured logs
Returns:
Boolean
"""

@abstractmethod
def get_log_data(
self,
log_key: List[str],
cursor: Optional[str] = None,
max_bytes: Optional[int] = None,
) -> CapturedLogData:
"""Returns a chunk of the captured stdout logs for a given log key
Args:
log_key (List[String]): The log key identifying the captured logs
cursor (Optional[str]): A cursor representing the position of the log chunk to fetch
max_bytes (Optional[int]): A limit on the size of the log chunk to fetch
Returns:
CapturedLogData
"""

@abstractmethod
def get_log_metadata(self, log_key: List[str]) -> CapturedLogMetadata:
"""Returns the metadata of the captured logs for a given log key, including
displayable information on where the logs are persisted.
Args:
log_key (List[String]): The log key identifying the captured logs
Returns:
CapturedLogMetadata
"""

@abstractmethod
def delete_logs(self, log_key: List[str]):
"""Deletes the captured logs for a given log key.
Args:
log_key (List[String]): The log key identifying the captured logs
"""

@abstractmethod
def subscribe(
self, log_key: List[str], cursor: Optional[str] = None
) -> CapturedLogSubscription:
"""Registers an observable object for log data
Args:
log_key (List[String]): The log key identifying the captured logs
cursor (Optional[String]): The string cursor marking the position within the log stream
Returns:
ComputeLogSubscription
"""

@abstractmethod
def unsubscribe(self, subscription: CapturedLogSubscription):
"""Deregisters an observable object from receiving log updates
Args:
subscription (CapturedLogSubscription): subscription object which manages when to send
back data to the subscriber
"""

def build_log_key_for_run(self, run_id, step_key):
"""Legacy adapter to translate run_id/key to captured log manager-based log_key"""
return [run_id, "compute_logs", step_key]

0 comments on commit 631c3ef

Please sign in to comment.