Skip to content

Commit

Permalink
able to fetch logs together in gql
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed May 22, 2024
1 parent 8378bb2 commit 9d68b9d
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 1 deletion.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import json
from typing import TYPE_CHECKING, Optional, Sequence

import dagster._check as check
import graphene
from dagster import AssetKey
from dagster import AssetKey, _seven
from dagster._core.definitions.backfill_policy import BackfillPolicy, BackfillPolicyType
from dagster._core.definitions.partition import PartitionsSubset
from dagster._core.definitions.partition_key_range import PartitionKeyRange
Expand All @@ -20,7 +21,9 @@
)
from dagster._core.instance import DagsterInstance
from dagster._core.remote_representation.external import ExternalPartitionSet
from dagster._core.storage.captured_log_manager import CapturedLogManager
from dagster._core.storage.dagster_run import DagsterRun, RunPartitionData, RunRecord, RunsFilter
from dagster._core.storage.local_compute_log_manager import LocalComputeLogManager
from dagster._core.storage.tags import (
ASSET_PARTITION_RANGE_END_TAG,
ASSET_PARTITION_RANGE_START_TAG,
Expand Down Expand Up @@ -286,6 +289,9 @@ class Meta:
hasResumePermission = graphene.NonNull(graphene.Boolean)
user = graphene.Field(graphene.String)
tags = non_null_list("dagster_graphql.schema.tags.GraphenePipelineTag")
logEvents = graphene.Field(
graphene.NonNull("dagster_graphql.schema.instigation.GrapheneInstigationEventConnection")
)

def __init__(self, backfill_job: PartitionBackfill):
self._backfill_job = check.inst_param(backfill_job, "backfill_job", PartitionBackfill)
Expand Down Expand Up @@ -524,6 +530,64 @@ def resolve_hasResumePermission(self, graphene_info: ResolveInfo) -> bool:
def resolve_user(self, _graphene_info: ResolveInfo) -> Optional[str]:
return self._backfill_job.user

def resolve_logEvents(self, graphene_info: ResolveInfo):
from ..schema.instigation import (
GrapheneInstigationEvent,
GrapheneInstigationEventConnection,
)
from ..schema.logs.log_level import GrapheneLogLevel

backfill_id = self._backfill_job.backfill_id
# TODO find a better way to keep this in sync. maybe store as part of the asset backfill data?
backfill_log_dir = ["backfill", backfill_id]

instance = graphene_info.context.instance

if not isinstance(instance.compute_log_manager, CapturedLogManager):
return GrapheneInstigationEventConnection(events=[], cursor="", hasMore=False)

# TODO - remove when impl for other compute log managers
if not isinstance(instance.compute_log_manager, LocalComputeLogManager):
return GrapheneInstigationEventConnection(events=[], cursor="", hasMore=False)

events = []
log_file_names = sorted(
instance.compute_log_manager.get_log_files_in_directory(backfill_log_dir)
)
for log_file in log_file_names:
log_key = [*backfill_log_dir, log_file]
log_data = instance.compute_log_manager.get_log_data(log_key)
raw_logs = log_data.stderr.decode("utf-8") if log_data.stderr else ""

records = []

for line in raw_logs.split("\n"):
if not line:
continue
try:
records.append(_seven.json.loads(line))
except json.JSONDecodeError:
continue

for record_dict in records:
exc_info = record_dict.get("exc_info")
message = record_dict.get("msg")
if exc_info:
message = f"{message}\n\n{exc_info}"
event = GrapheneInstigationEvent(
message=message,
level=GrapheneLogLevel.from_level(record_dict["levelno"]),
timestamp=int(record_dict["created"] * 1000),
)

events.append(event)

return GrapheneInstigationEventConnection(
events=events,
cursor=None,
hasMore=False,
)


class GrapheneBackfillNotFoundError(graphene.ObjectType):
class Meta:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,16 @@ def open_log_stream(
def is_capture_complete(self, log_key: Sequence[str]) -> bool:
return os.path.exists(self.complete_artifact_path(log_key))

def get_log_files_in_directory(self, log_key_dir: Sequence[str]) -> Sequence[str]:
base_dir_path = Path(self._base_dir).resolve()
log_dir = base_dir_path.joinpath(*log_key_dir).resolve()

return [
filename.rsplit(".", 1)[0] # TODO - need to remove the file extension. feels hacky
for filename in os.listdir(log_dir)
if os.path.isfile(os.path.join(log_dir, filename))
]

def get_log_data(
self, log_key: Sequence[str], cursor: Optional[str] = None, max_bytes: Optional[int] = None
) -> CapturedLogData:
Expand Down

0 comments on commit 9d68b9d

Please sign in to comment.