fetch backfills logs in GQL #22038

Merged: 22 commits, Jun 5, 2024

11 changes: 11 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default.

@@ -1,8 +1,9 @@
import json
from typing import TYPE_CHECKING, Optional, Sequence

import dagster._check as check
import graphene
from dagster import AssetKey
from dagster import AssetKey, _seven
from dagster._core.definitions.backfill_policy import BackfillPolicy, BackfillPolicyType
from dagster._core.definitions.partition import PartitionsSubset
from dagster._core.definitions.partition_key_range import PartitionKeyRange
@@ -15,6 +16,7 @@
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.instance import DagsterInstance
from dagster._core.remote_representation.external import ExternalPartitionSet
from dagster._core.storage.captured_log_manager import CapturedLogManager
from dagster._core.storage.dagster_run import DagsterRun, RunPartitionData, RunRecord, RunsFilter
from dagster._core.storage.tags import (
ASSET_PARTITION_RANGE_END_TAG,
@@ -279,6 +281,10 @@ class Meta:
tags = non_null_list("dagster_graphql.schema.tags.GraphenePipelineTag")
title = graphene.Field(graphene.String)
description = graphene.Field(graphene.String)
logEvents = graphene.Field(
graphene.NonNull("dagster_graphql.schema.instigation.GrapheneInstigationEventConnection"),
cursor=graphene.String(),
)

def __init__(self, backfill_job: PartitionBackfill):
self._backfill_job = check.inst_param(backfill_job, "backfill_job", PartitionBackfill)
@@ -523,6 +529,54 @@ def resolve_title(self, _graphene_info: ResolveInfo) -> Optional[str]:
def resolve_description(self, _graphene_info: ResolveInfo) -> Optional[str]:
return self._backfill_job.description

def resolve_logEvents(self, graphene_info: ResolveInfo, cursor: Optional[str] = None):
from ..schema.instigation import (
GrapheneInstigationEvent,
GrapheneInstigationEventConnection,
)
from ..schema.logs.log_level import GrapheneLogLevel

backfill_log_key_prefix = self._backfill_job.log_storage_prefix

instance = graphene_info.context.instance

if not isinstance(instance.compute_log_manager, CapturedLogManager):
return GrapheneInstigationEventConnection(events=[], cursor="", hasMore=False)

if not instance.backfill_log_storage_enabled():
return GrapheneInstigationEventConnection(events=[], cursor="", hasMore=False)

records, new_cursor = instance.compute_log_manager.read_log_lines_for_log_key_prefix(
backfill_log_key_prefix, cursor
)

events = []
for line in records:
if not line:
continue
try:
record_dict = _seven.json.loads(line)
except json.JSONDecodeError:
continue

exc_info = record_dict.get("exc_info")
message = record_dict.get("msg")
if exc_info:
message = f"{message}\n\n{exc_info}"
event = GrapheneInstigationEvent(
message=message,
level=GrapheneLogLevel.from_level(record_dict["levelno"]),
timestamp=int(record_dict["created"] * 1000),
)

events.append(event)

return GrapheneInstigationEventConnection(
events=events,
cursor=new_cursor.to_string() if new_cursor else None,
hasMore=new_cursor.has_more_now if new_cursor else False,
)
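
For illustration only: a client could page through the new logEvents field by feeding the returned cursor back into the next request. This is a minimal sketch using the execute_dagster_graphql test helper that appears in the tests below; the query shape follows the schema fields defined above, and backfill_id/context are assumed to already exist.

```python
# Sketch (assumed setup): page through backfill logs until hasMore is False.
PAGED_BACKFILL_LOGS_QUERY = """
  query BackfillLogs($backfillId: String!, $cursor: String) {
    partitionBackfillOrError(backfillId: $backfillId) {
      ... on PartitionBackfill {
        logEvents(cursor: $cursor) {
          events { message timestamp level }
          cursor
          hasMore
        }
      }
    }
  }
"""

cursor = None
while True:
    result = execute_dagster_graphql(
        context,
        PAGED_BACKFILL_LOGS_QUERY,
        variables={"backfillId": backfill_id, "cursor": cursor},
    )
    connection = result.data["partitionBackfillOrError"]["logEvents"]
    for event in connection["events"]:
        print(event["timestamp"], event["level"], event["message"])
    if not connection["hasMore"]:
        break
    cursor = connection["cursor"]
```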


class GrapheneBackfillNotFoundError(graphene.ObjectType):
class Meta:
@@ -16,6 +16,8 @@
from dagster._core.execution.asset_backfill import AssetBackfillData
from dagster._core.instance import DagsterInstance
from dagster._core.test_utils import ensure_dagster_tests_import, instance_for_test
from dagster._daemon import get_default_daemon_logger
from dagster._daemon.backfill import execute_backfill_iteration
from dagster_graphql.client.query import LAUNCH_PARTITION_BACKFILL_MUTATION
from dagster_graphql.test.utils import (
GqlResult,
@@ -93,6 +95,23 @@
}
"""

ASSET_BACKFILL_LOGS_QUERY = """
query BackfillLogsByAsset($backfillId: String!) {
partitionBackfillOrError(backfillId: $backfillId) {
... on PartitionBackfill {
logEvents {
events {
message
timestamp
level
}
cursor
}
}
}
}
"""

ASSET_BACKFILL_PREVIEW_QUERY = """
query assetBackfillPreview($params: AssetBackfillPreviewParams!) {
assetBackfillPreview(params: $params) {
@@ -939,3 +958,50 @@ def _get_error_message(launch_backfill_result: GqlResult) -> Optional[str]:
if "message" in launch_backfill_result.data["launchPartitionBackfill"]
else None
)


def test_backfill_logs():
repo = get_repo()
all_asset_keys = repo.asset_graph.materializable_asset_keys

with instance_for_test() as instance:
# need to override this method on the instance since it defaults to False in OSS. When we enable this
# feature in OSS we can remove this override
def override_backfill_storage_setting(self):
return True

instance.backfill_log_storage_enabled = override_backfill_storage_setting.__get__(
instance, DagsterInstance
)
with define_out_of_process_context(__file__, "get_repo", instance) as context:
# launchPartitionBackfill
launch_backfill_result = execute_dagster_graphql(
context,
LAUNCH_PARTITION_BACKFILL_MUTATION,
variables={
"backfillParams": {
"partitionNames": ["a", "b"],
"assetSelection": [key.to_graphql_input() for key in all_asset_keys],
}
},
)
backfill_id, asset_backfill_data = _get_backfill_data(
launch_backfill_result, instance, repo
)
assert asset_backfill_data.target_subset.asset_keys == all_asset_keys

list(
execute_backfill_iteration(
context.process_context, get_default_daemon_logger("BackfillDaemon")
)
)

backfill_logs = execute_dagster_graphql(
context,
ASSET_BACKFILL_LOGS_QUERY,
variables={
"backfillId": backfill_id,
},
)

assert len(backfill_logs.data["partitionBackfillOrError"]["logEvents"]["events"]) > 0
38 changes: 38 additions & 0 deletions python_modules/dagster/dagster/_core/captured_log_api.py
@@ -0,0 +1,38 @@
import base64
from typing import NamedTuple, Sequence

from dagster._seven import json


class LogLineCursor(NamedTuple):
"""Representation of a log line cursor, to keep track of the place in the logs.
The captured logs are stored in multiple files in the same directory. The cursor keeps
track of the file name and the number of lines read so far.

line=-1 means that the entire file has been read and the next file should be read. This covers the
case when an entire file has been read, but the next file does not exist in storage yet.
line=0 means no lines from the file have been read.
line=n means lines 0 through n-1 have been read from the file.

has_more_now indicates if there are more log lines that can be read immediately. If the process writing
logs is still running, but has not written a log file, has_more_now will be False once all currently readable
log files have been read. It does not mean that no new logs will be written in the future.
"""

log_key: Sequence[str]
line: int # maybe rename line_offset?
has_more_now: bool

def __str__(self) -> str:
return self.to_string()

def to_string(self) -> str:
raw = json.dumps(
{"log_key": self.log_key, "line": self.line, "has_more_now": self.has_more_now}
)
return base64.b64encode(bytes(raw, encoding="utf-8")).decode("utf-8")

@staticmethod
def parse(cursor_str: str) -> "LogLineCursor":
raw = json.loads(base64.b64decode(cursor_str).decode("utf-8"))
return LogLineCursor(raw["log_key"], raw["line"], raw["has_more_now"])
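
A minimal sketch of how the cursor round-trips through its opaque string form (usage assumed, not part of the diff):

```python
# The cursor serializes to base64-encoded JSON, so it can be handed to GraphQL
# clients as an opaque string and parsed back on the next request.
cursor = LogLineCursor(log_key=["backfill", "abc123", "1"], line=100, has_more_now=True)
opaque = cursor.to_string()
assert LogLineCursor.parse(opaque) == cursor
```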
4 changes: 4 additions & 0 deletions python_modules/dagster/dagster/_core/execution/backfill.py
@@ -161,6 +161,10 @@ def partition_set_name(self) -> Optional[str]:

return self.partition_set_origin.partition_set_name

@property
def log_storage_prefix(self) -> Sequence[str]:
return ["backfill", self.backfill_id]

@property
def user(self) -> Optional[str]:
if self.tags:
@@ -1,10 +1,12 @@
import os
from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import IO, Callable, Generator, Iterator, NamedTuple, Optional, Sequence
from typing import IO, Callable, Generator, Iterator, NamedTuple, Optional, Sequence, Tuple

from typing_extensions import Final, Self

import dagster._check as check
from dagster._core.captured_log_api import LogLineCursor
from dagster._core.storage.compute_log_manager import ComputeIOType

MAX_BYTES_CHUNK_READ: Final = 4194304 # 4 MB
@@ -258,3 +260,91 @@ def unsubscribe(self, subscription: CapturedLogSubscription) -> None:
def build_log_key_for_run(self, run_id: str, step_key: str) -> Sequence[str]:
"""Legacy adapter to translate run_id/key to captured log manager-based log_key."""
return [run_id, "compute_logs", step_key]

def get_log_keys_for_log_key_prefix(
self, log_key_prefix: Sequence[str]
) -> Sequence[Sequence[str]]:
"""Returns the logs keys for a given log key prefix. This is determined by looking at the
directory defined by the log key prefix and creating a log_key for each file in the directory.
"""
# NOTE: This method was introduced to support backfill logs, which are always stored as .err files.
# Thus the implementations only look for .err files when determining the log_keys. If other file extensions
# need to be supported, an io_type parameter will need to be added to this method
raise NotImplementedError("Must implement get_log_keys_for_log_key_prefix")

Member:
is there any reason why the initial implementation shouldn't also look for .out files?

Contributor Author:
not really. Mostly just reducing surface area to test against, but it wouldn't be hard to add. I have some follow-up PRs planned for this stack, so I'll add expanding to .out files as part of that so it doesn't block the backfill work from landing.

Contributor Author:
having this raise an error rather than be an abstract method so I don't have to implement it for all CapturedLogManagers yet. Follow-up PRs will add those implementations and then I can switch to abstractmethod.


def _get_log_lines_for_log_key(self, log_key: Sequence[str]) -> Sequence[str]:
"""For a log key, gets the corresponding file, and splits the file into lines."""
log_data = self.get_log_data(log_key)
# Note: This method was implemented to support backfill logs, which are always stored as .err files.
# If other file extensions need to be supported, this method will need to be updated to look at the
# correct part of log_data based on an io_type parameter
raw_logs = log_data.stderr.decode("utf-8") if log_data.stderr else ""
log_lines = raw_logs.split("\n")

return log_lines

def read_log_lines_for_log_key_prefix(
self, log_key_prefix: Sequence[str], cursor: Optional[str], num_lines: int = 100
) -> Tuple[Sequence[str], Optional[LogLineCursor]]:
"""For a given directory defined by log_key_prefix that contains files, read the logs from the files
as if they are a single continuous file. Reads num_lines lines at a time. Returns the lines read and the next cursor.

Note that the has_more_now attribute of the cursor indicates if there are more logs that can be read immediately.
If has_more_now is False, the process producing logs could still be running and dump more logs into the
directory at a later time.
"""
num_lines = int(os.getenv("DAGSTER_CAPTURED_LOG_CHUNK_SIZE", "1000"))
# find all of the log_keys to read from and sort them in the order to be read
log_keys = sorted(
self.get_log_keys_for_log_key_prefix(log_key_prefix), key=lambda x: "/".join(x)
)
if len(log_keys) == 0:
return [], None

log_cursor = LogLineCursor.parse(cursor) if cursor else None
if log_cursor is None:
log_key_to_fetch_idx = 0
line_cursor = 0
else:
log_key_to_fetch_idx = log_keys.index(log_cursor.log_key)
line_cursor = log_cursor.line

if line_cursor == -1:
# a line_cursor of -1 means the entirety of the file has been read, but the next file
# didn't exist yet. So we see if a new file has been added.
# if the next file doesn't exist yet, return
if log_key_to_fetch_idx + 1 >= len(log_keys):
return [], log_cursor
log_key_to_fetch_idx += 1
line_cursor = 0

log_lines = self._get_log_lines_for_log_key(log_keys[log_key_to_fetch_idx])
records = []
has_more = True

while len(records) < num_lines:
remaining_log_lines = log_lines[line_cursor:]
remaining_lines_to_fetch = num_lines - len(records)
if remaining_lines_to_fetch < len(remaining_log_lines):
records.extend(remaining_log_lines[:remaining_lines_to_fetch])
line_cursor += remaining_lines_to_fetch
else:
records.extend(remaining_log_lines)
line_cursor = -1

if line_cursor == -1:
# we've read the entirety of the file, update the cursor
if log_key_to_fetch_idx + 1 >= len(log_keys):
# no more files to process
has_more = False
break
log_key_to_fetch_idx += 1
line_cursor = 0
if len(records) < num_lines:
# we still need more records, so fetch the next file
log_lines = self._get_log_lines_for_log_key(log_keys[log_key_to_fetch_idx])

new_cursor = LogLineCursor(
log_key=log_keys[log_key_to_fetch_idx], line=line_cursor, has_more_now=has_more
)
return records, new_cursor
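
A rough sketch of the intended call pattern (names such as compute_log_manager, backfill_id, and process are assumptions for illustration; the prefix matches PartitionBackfill.log_storage_prefix shown above):

```python
# Sketch: drain all currently readable backfill log lines, resuming from a cursor.
log_key_prefix = ["backfill", backfill_id]
cursor = None
while True:
    lines, next_cursor = compute_log_manager.read_log_lines_for_log_key_prefix(
        log_key_prefix, cursor.to_string() if cursor else None
    )
    for line in lines:
        process(line)  # e.g. json-decode structured records, as the GraphQL resolver does
    if next_cursor is None or not next_cursor.has_more_now:
        break  # nothing more to read right now; the backfill may still write more later
    cursor = next_cursor
```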
@@ -243,6 +243,27 @@ def subscribe(
def unsubscribe(self, subscription):
self.on_unsubscribe(subscription)

def get_log_keys_for_log_key_prefix(
self, log_key_prefix: Sequence[str]
) -> Sequence[Sequence[str]]:
"""Returns the logs keys for a given log key prefix. This is determined by looking at the
directory defined by the log key prefix and creating a log_key for each file in the directory.
"""
base_dir_path = Path(self._base_dir).resolve()
directory = base_dir_path.joinpath(*log_key_prefix)
objects = directory.iterdir()
results = []
list_key_prefix = list(log_key_prefix)

for obj in objects:
# Note: This method was implemented to support backfill logs, which are always stored as .err files.
# If other file extensions need to be supported, this method will need to be updated to match those
# extensions based on an io_type parameter
if obj.is_file() and obj.suffix == "." + IO_TYPE_EXTENSION[ComputeIOType.STDERR]:
results.append(list_key_prefix + [obj.stem])

return results
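
For example (hypothetical layout, with manager standing in for the local compute log manager), a prefix maps to one log key per .err file in the corresponding directory:

```python
# Assume <base_dir>/backfill/abc123/ contains xyz.err, uvw.err, and uvw.out.
log_keys = manager.get_log_keys_for_log_key_prefix(["backfill", "abc123"])
# Only the .err captures produce log keys; the .out file is ignored here.
assert sorted(log_keys) == [
    ["backfill", "abc123", "uvw"],
    ["backfill", "abc123", "xyz"],
]
```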

###############################################
#
# Methods for the ComputeLogManager interface