fetch backfills logs in GQL (#22038)
## Summary & Motivation
Fetches backfill logs using the captured log manager. 

Adds functionality to the captured log manager to read a sequence of
files in a directory as if they were a single file, by sorting the file
names and then reading through the files N lines at a time. The majority
of the logic lives in the base CapturedLogManager class, but each
implementing class needs to provide a method that returns the log keys
for a given directory. I implemented this method for the
LocalComputeLogManager to enable testing; follow-up PRs can implement it
for the remaining ComputeLogManagers. Internal PR
dagster-io/internal#9879 implements the method for the cloud compute log
manager.

Fetching logs via GQL is gated on an instance method we can override to
control rollout.

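For illustration only, here is how a client might page through the new `logEvents` field using the returned cursor. The query shape matches what this PR adds; the `execute_query` helper, the backfill id, and the loop are hypothetical and not part of this change:

```python
# Sketch only: `execute_query(query, variables)` stands in for whatever GraphQL client you use.
BACKFILL_LOGS_QUERY = """
query BackfillLogs($backfillId: String!, $cursor: String) {
  partitionBackfillOrError(backfillId: $backfillId) {
    ... on PartitionBackfill {
      logEvents(cursor: $cursor) {
        events { message timestamp level }
        cursor
        hasMore
      }
    }
  }
}
"""


def fetch_all_backfill_logs(execute_query, backfill_id):
    events, cursor = [], None
    while True:
        result = execute_query(
            BACKFILL_LOGS_QUERY, {"backfillId": backfill_id, "cursor": cursor}
        )
        connection = result["partitionBackfillOrError"]["logEvents"]
        events.extend(connection["events"])
        cursor = connection["cursor"]
        if not connection["hasMore"]:
            return events
```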
## How I Tested These Changes
New unit tests for the pagination scheme, plus a unit test that runs a
backfill and then fetches the logs.
jamiedemaria authored and benpankow committed Jun 5, 2024
1 parent 2e18999 commit d1a54bc
Showing 11 changed files with 497 additions and 8 deletions.


11 changes: 11 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts


@@ -1,8 +1,9 @@
import json
from typing import TYPE_CHECKING, Optional, Sequence

import dagster._check as check
import graphene
from dagster import AssetKey
from dagster import AssetKey, _seven
from dagster._core.definitions.backfill_policy import BackfillPolicy, BackfillPolicyType
from dagster._core.definitions.partition import PartitionsSubset
from dagster._core.definitions.partition_key_range import PartitionKeyRange
@@ -15,6 +16,7 @@
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.instance import DagsterInstance
from dagster._core.remote_representation.external import ExternalPartitionSet
from dagster._core.storage.captured_log_manager import CapturedLogManager
from dagster._core.storage.dagster_run import DagsterRun, RunPartitionData, RunRecord, RunsFilter
from dagster._core.storage.tags import (
ASSET_PARTITION_RANGE_END_TAG,
@@ -279,6 +281,10 @@ class Meta:
tags = non_null_list("dagster_graphql.schema.tags.GraphenePipelineTag")
title = graphene.Field(graphene.String)
description = graphene.Field(graphene.String)
logEvents = graphene.Field(
graphene.NonNull("dagster_graphql.schema.instigation.GrapheneInstigationEventConnection"),
cursor=graphene.String(),
)

def __init__(self, backfill_job: PartitionBackfill):
self._backfill_job = check.inst_param(backfill_job, "backfill_job", PartitionBackfill)
@@ -523,6 +529,54 @@ def resolve_title(self, _graphene_info: ResolveInfo) -> Optional[str]:
def resolve_description(self, _graphene_info: ResolveInfo) -> Optional[str]:
return self._backfill_job.description

def resolve_logEvents(self, graphene_info: ResolveInfo, cursor: Optional[str] = None):
from ..schema.instigation import (
GrapheneInstigationEvent,
GrapheneInstigationEventConnection,
)
from ..schema.logs.log_level import GrapheneLogLevel

backfill_log_key_prefix = self._backfill_job.log_storage_prefix

instance = graphene_info.context.instance

if not isinstance(instance.compute_log_manager, CapturedLogManager):
return GrapheneInstigationEventConnection(events=[], cursor="", hasMore=False)

if not instance.backfill_log_storage_enabled():
return GrapheneInstigationEventConnection(events=[], cursor="", hasMore=False)

records, new_cursor = instance.compute_log_manager.read_log_lines_for_log_key_prefix(
backfill_log_key_prefix, cursor
)

events = []
for line in records:
if not line:
continue
try:
record_dict = _seven.json.loads(line)
except json.JSONDecodeError:
continue

exc_info = record_dict.get("exc_info")
message = record_dict.get("msg")
if exc_info:
message = f"{message}\n\n{exc_info}"
event = GrapheneInstigationEvent(
message=message,
level=GrapheneLogLevel.from_level(record_dict["levelno"]),
timestamp=int(record_dict["created"] * 1000),
)

events.append(event)

return GrapheneInstigationEventConnection(
events=events,
cursor=new_cursor.to_string() if new_cursor else None,
hasMore=new_cursor.has_more_now if new_cursor else False,
)


class GrapheneBackfillNotFoundError(graphene.ObjectType):
class Meta:
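As an aside on the `resolve_logEvents` resolver above: each captured log line is expected to be a JSON-serialized Python logging record, and the resolver only reads the `msg`, `levelno`, `created`, and `exc_info` fields. A minimal, invented example of one such line and the conversion the resolver performs:

```python
# Illustrative only; the field values are made up.
import json

sample_line = json.dumps(
    {
        "msg": "Launching run for partition a",
        "levelno": 20,  # INFO
        "created": 1717550000.0,  # seconds since the epoch
        "exc_info": None,
    }
)

record = json.loads(sample_line)
timestamp_ms = int(record["created"] * 1000)  # the resolver reports timestamps in milliseconds
```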
@@ -16,6 +16,8 @@
from dagster._core.execution.asset_backfill import AssetBackfillData
from dagster._core.instance import DagsterInstance
from dagster._core.test_utils import ensure_dagster_tests_import, instance_for_test
from dagster._daemon import get_default_daemon_logger
from dagster._daemon.backfill import execute_backfill_iteration
from dagster_graphql.client.query import LAUNCH_PARTITION_BACKFILL_MUTATION
from dagster_graphql.test.utils import (
GqlResult,
@@ -93,6 +95,23 @@
}
"""

ASSET_BACKFILL_LOGS_QUERY = """
query BackfillLogsByAsset($backfillId: String!) {
partitionBackfillOrError(backfillId: $backfillId) {
... on PartitionBackfill {
logEvents {
events {
message
timestamp
level
}
cursor
}
}
}
}
"""

ASSET_BACKFILL_PREVIEW_QUERY = """
query assetBackfillPreview($params: AssetBackfillPreviewParams!) {
assetBackfillPreview(params: $params) {
@@ -939,3 +958,50 @@ def _get_error_message(launch_backfill_result: GqlResult) -> Optional[str]:
if "message" in launch_backfill_result.data["launchPartitionBackfill"]
else None
)


def test_backfill_logs():
repo = get_repo()
all_asset_keys = repo.asset_graph.materializable_asset_keys

with instance_for_test() as instance:
# need to override this method on the instance since it defaults to False in OSS. When we enable this
# feature in OSS we can remove this override
def override_backfill_storage_setting(self):
return True

instance.backfill_log_storage_enabled = override_backfill_storage_setting.__get__(
instance, DagsterInstance
)
with define_out_of_process_context(__file__, "get_repo", instance) as context:
# launchPartitionBackfill
launch_backfill_result = execute_dagster_graphql(
context,
LAUNCH_PARTITION_BACKFILL_MUTATION,
variables={
"backfillParams": {
"partitionNames": ["a", "b"],
"assetSelection": [key.to_graphql_input() for key in all_asset_keys],
}
},
)
backfill_id, asset_backfill_data = _get_backfill_data(
launch_backfill_result, instance, repo
)
assert asset_backfill_data.target_subset.asset_keys == all_asset_keys

list(
execute_backfill_iteration(
context.process_context, get_default_daemon_logger("BackfillDaemon")
)
)

backfill_logs = execute_dagster_graphql(
context,
ASSET_BACKFILL_LOGS_QUERY,
variables={
"backfillId": backfill_id,
},
)

assert len(backfill_logs.data["partitionBackfillOrError"]["logEvents"]["events"]) > 0
38 changes: 38 additions & 0 deletions python_modules/dagster/dagster/_core/captured_log_api.py
@@ -0,0 +1,38 @@
import base64
from typing import NamedTuple, Sequence

from dagster._seven import json


class LogLineCursor(NamedTuple):
"""Representation of a log line cursor, to keep track of the place in the logs.
The captured logs are stored in multiple files in the same directory. The cursor keeps
track of the file name and the number of lines read so far.
line=-1 means that the entire file has been read and the next file should be read. This covers the
case when an entire file has been read, but the next file does not exist in storage yet.
line=0 means no lines from the file have been read.
line=n means lines 0 through n-1 have been read from the file.
has_more_now indicates if there are more log lines that can be read immediately. If the process writing
logs is still running, but has not written a log file, has_more_now will be False once all currently readable
log files have been read. It does not mean that no new logs will be written in the future.
"""

log_key: Sequence[str]
line: int # maybe rename line_offset?
has_more_now: bool

def __str__(self) -> str:
return self.to_string()

def to_string(self) -> str:
raw = json.dumps(
{"log_key": self.log_key, "line": self.line, "has_more_now": self.has_more_now}
)
return base64.b64encode(bytes(raw, encoding="utf-8")).decode("utf-8")

@staticmethod
def parse(cursor_str: str) -> "LogLineCursor":
raw = json.loads(base64.b64decode(cursor_str).decode("utf-8"))
return LogLineCursor(raw["log_key"], raw["line"], raw["has_more_now"])
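A quick round-trip of the new cursor, purely for illustration (the log key values are invented): the cursor serializes to an opaque base64 token, which is what travels through the GraphQL `cursor` field.

```python
from dagster._core.captured_log_api import LogLineCursor

cursor = LogLineCursor(log_key=["backfill", "abc123", "0001"], line=100, has_more_now=True)
token = cursor.to_string()  # opaque base64 string, safe to hand back to a client
assert LogLineCursor.parse(token) == cursor
```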
4 changes: 4 additions & 0 deletions python_modules/dagster/dagster/_core/execution/backfill.py
@@ -161,6 +161,10 @@ def partition_set_name(self) -> Optional[str]:

return self.partition_set_origin.partition_set_name

@property
def log_storage_prefix(self) -> Sequence[str]:
return ["backfill", self.backfill_id]

@property
def user(self) -> Optional[str]:
if self.tags:
@@ -1,10 +1,12 @@
import os
from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import IO, Callable, Generator, Iterator, NamedTuple, Optional, Sequence
from typing import IO, Callable, Generator, Iterator, NamedTuple, Optional, Sequence, Tuple

from typing_extensions import Final, Self

import dagster._check as check
from dagster._core.captured_log_api import LogLineCursor
from dagster._core.storage.compute_log_manager import ComputeIOType

MAX_BYTES_CHUNK_READ: Final = 4194304 # 4 MB
@@ -258,3 +260,91 @@ def unsubscribe(self, subscription: CapturedLogSubscription) -> None:
def build_log_key_for_run(self, run_id: str, step_key: str) -> Sequence[str]:
"""Legacy adapter to translate run_id/key to captured log manager-based log_key."""
return [run_id, "compute_logs", step_key]

def get_log_keys_for_log_key_prefix(
self, log_key_prefix: Sequence[str]
) -> Sequence[Sequence[str]]:
"""Returns the logs keys for a given log key prefix. This is determined by looking at the
directory defined by the log key prefix and creating a log_key for each file in the directory.
"""
# NOTE: This method was introduced to support backfill logs, which are always stored as .err files.
# Thus the implementations only look for .err files when determining the log_keys. If other file extensions
# need to be supported, an io_type parameter will need to be added to this method
raise NotImplementedError("Must implement get_log_keys_for_log_key_prefix")

def _get_log_lines_for_log_key(self, log_key: Sequence[str]) -> Sequence[str]:
"""For a log key, gets the corresponding file, and splits the file into lines."""
log_data = self.get_log_data(log_key)
# Note: This method was implemented to support backfill logs, which are always stored as .err files.
# If other file extensions need to be supported, this method will need to be updated to look at the
# correct part of log_data based on an io_type parameter
raw_logs = log_data.stderr.decode("utf-8") if log_data.stderr else ""
log_lines = raw_logs.split("\n")

return log_lines

def read_log_lines_for_log_key_prefix(
self, log_key_prefix: Sequence[str], cursor: Optional[str], num_lines: int = 100
) -> Tuple[Sequence[str], Optional[LogLineCursor]]:
"""For a given directory defined by log_key_prefix that contains files, read the logs from the files
as if they are a single continuous file. Reads num_lines lines at a time. Returns the lines read and the next cursor.
Note that the has_more_now attribute of the cursor indicates if there are more logs that can be read immediately.
If has_more_now is False, the process producing logs could still be running and dump more logs into the
directory at a later time.
"""
num_lines = int(os.getenv("DAGSTER_CAPTURED_LOG_CHUNK_SIZE", "1000"))
# find all of the log_keys to read from and sort them in the order to be read
log_keys = sorted(
self.get_log_keys_for_log_key_prefix(log_key_prefix), key=lambda x: "/".join(x)
)
if len(log_keys) == 0:
return [], None

log_cursor = LogLineCursor.parse(cursor) if cursor else None
if log_cursor is None:
log_key_to_fetch_idx = 0
line_cursor = 0
else:
log_key_to_fetch_idx = log_keys.index(log_cursor.log_key)
line_cursor = log_cursor.line

if line_cursor == -1:
# line_cursor for -1 means the entirety of the file has been read, but the next file
# didn't exist yet. So we see if a new file has been added.
# if the next file doesn't exist yet, return
if log_key_to_fetch_idx + 1 >= len(log_keys):
return [], log_cursor
log_key_to_fetch_idx += 1
line_cursor = 0

log_lines = self._get_log_lines_for_log_key(log_keys[log_key_to_fetch_idx])
records = []
has_more = True

while len(records) < num_lines:
remaining_log_lines = log_lines[line_cursor:]
remaining_lines_to_fetch = num_lines - len(records)
if remaining_lines_to_fetch < len(remaining_log_lines):
records.extend(remaining_log_lines[:remaining_lines_to_fetch])
line_cursor += remaining_lines_to_fetch
else:
records.extend(remaining_log_lines)
line_cursor = -1

if line_cursor == -1:
# we've read the entirety of the file, update the cursor
if log_key_to_fetch_idx + 1 >= len(log_keys):
# no more files to process
has_more = False
break
log_key_to_fetch_idx += 1
line_cursor = 0
if len(records) < num_lines:
# we still need more records, so fetch the next file
log_lines = self._get_log_lines_for_log_key(log_keys[log_key_to_fetch_idx])

new_cursor = LogLineCursor(
log_key=log_keys[log_key_to_fetch_idx], line=line_cursor, has_more_now=has_more
)
return records, new_cursor
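For illustration, a caller could drain everything that is currently readable under a prefix by looping until `has_more_now` is False. This is a sketch, assuming `compute_log_manager` is any CapturedLogManager whose subclass implements `get_log_keys_for_log_key_prefix`:

```python
def read_all_available_lines(compute_log_manager, log_key_prefix):
    """Collect every line readable right now; more may still be written later."""
    lines, cursor = [], None
    while True:
        chunk, new_cursor = compute_log_manager.read_log_lines_for_log_key_prefix(
            log_key_prefix, cursor
        )
        lines.extend(chunk)
        if new_cursor is None or not new_cursor.has_more_now:
            return lines, new_cursor
        cursor = new_cursor.to_string()
```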
@@ -243,6 +243,27 @@ def subscribe(
def unsubscribe(self, subscription):
self.on_unsubscribe(subscription)

def get_log_keys_for_log_key_prefix(
self, log_key_prefix: Sequence[str]
) -> Sequence[Sequence[str]]:
"""Returns the logs keys for a given log key prefix. This is determined by looking at the
directory defined by the log key prefix and creating a log_key for each file in the directory.
"""
base_dir_path = Path(self._base_dir).resolve()
directory = base_dir_path.joinpath(*log_key_prefix)
objects = directory.iterdir()
results = []
list_key_prefix = list(log_key_prefix)

for obj in objects:
# Note: This method was implemented to support backfill logs, which are always stored as .err files.
# If other file extensions need to be supported, this method will need to be updated to look for
# files with the correct extension based on an io_type parameter
if obj.is_file() and obj.suffix == "." + IO_TYPE_EXTENSION[ComputeIOType.STDERR]:
results.append(list_key_prefix + [obj.stem])

return results

###############################################
#
# Methods for the ComputeLogManager interface
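A small local demonstration of `get_log_keys_for_log_key_prefix` (a sketch: the directory layout and ids are invented, result ordering is not guaranteed, and it assumes `LocalComputeLogManager` can be constructed directly from a base directory): every `.err` file under the prefix directory becomes one log key.

```python
import tempfile
from pathlib import Path

from dagster._core.storage.local_compute_log_manager import LocalComputeLogManager

with tempfile.TemporaryDirectory() as base_dir:
    log_dir = Path(base_dir) / "backfill" / "xyz123"
    log_dir.mkdir(parents=True)
    (log_dir / "0001.err").write_text("first chunk of backfill logs\n")
    (log_dir / "0002.err").write_text("second chunk of backfill logs\n")

    manager = LocalComputeLogManager(base_dir)
    print(manager.get_log_keys_for_log_key_prefix(["backfill", "xyz123"]))
    # e.g. [['backfill', 'xyz123', '0001'], ['backfill', 'xyz123', '0002']]
```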
