diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql index f5e581f5a86ca..c7816513a1071 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql @@ -2786,6 +2786,7 @@ type PartitionBackfill { tags: [PipelineTag!]! title: String description: String + logEvents(cursor: String): InstigationEventConnection! } enum BulkActionStatus { diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts index c3d65e5f2ff55..f306c642511f2 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts @@ -3065,6 +3065,7 @@ export type PartitionBackfill = { id: Scalars['String']['output']; isAssetBackfill: Scalars['Boolean']['output']; isValidSerialization: Scalars['Boolean']['output']; + logEvents: InstigationEventConnection; numCancelable: Scalars['Int']['output']; numPartitions: Maybe<Scalars['Int']['output']>; partitionNames: Maybe<Array<Scalars['String']['output']>>; @@ -3083,6 +3084,10 @@ export type PartitionBackfill = { user: Maybe<Scalars['String']['output']>; }; +export type PartitionBackfillLogEventsArgs = { + cursor?: InputMaybe<Scalars['String']['input']>; +}; + export type PartitionBackfillPartitionsTargetedForAssetKeyArgs = { assetKey?: InputMaybe<AssetKeyInput>; }; @@ -10644,6 +10649,12 @@ export const buildPartitionBackfill = ( overrides && overrides.hasOwnProperty('isValidSerialization') ? overrides.isValidSerialization! : false, + logEvents: + overrides && overrides.hasOwnProperty('logEvents') + ? overrides.logEvents! + : relationshipsToOmit.has('InstigationEventConnection') + ? ({} as InstigationEventConnection) + : buildInstigationEventConnection({}, relationshipsToOmit), numCancelable: overrides && overrides.hasOwnProperty('numCancelable') ? overrides.numCancelable!
: 53, numPartitions: diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py index 7ed29af144aa0..32d5aba180c2b 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py @@ -1,8 +1,9 @@ +import json from typing import TYPE_CHECKING, Optional, Sequence import dagster._check as check import graphene -from dagster import AssetKey +from dagster import AssetKey, _seven from dagster._core.definitions.backfill_policy import BackfillPolicy, BackfillPolicyType from dagster._core.definitions.partition import PartitionsSubset from dagster._core.definitions.partition_key_range import PartitionKeyRange @@ -15,6 +16,7 @@ from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill from dagster._core.instance import DagsterInstance from dagster._core.remote_representation.external import ExternalPartitionSet +from dagster._core.storage.captured_log_manager import CapturedLogManager from dagster._core.storage.dagster_run import DagsterRun, RunPartitionData, RunRecord, RunsFilter from dagster._core.storage.tags import ( ASSET_PARTITION_RANGE_END_TAG, @@ -279,6 +281,10 @@ class Meta: tags = non_null_list("dagster_graphql.schema.tags.GraphenePipelineTag") title = graphene.Field(graphene.String) description = graphene.Field(graphene.String) + logEvents = graphene.Field( + graphene.NonNull("dagster_graphql.schema.instigation.GrapheneInstigationEventConnection"), + cursor=graphene.String(), + ) def __init__(self, backfill_job: PartitionBackfill): self._backfill_job = check.inst_param(backfill_job, "backfill_job", PartitionBackfill) @@ -523,6 +529,54 @@ def resolve_title(self, _graphene_info: ResolveInfo) -> Optional[str]: def resolve_description(self, _graphene_info: ResolveInfo) -> Optional[str]: return self._backfill_job.description + def resolve_logEvents(self, graphene_info: ResolveInfo, cursor: Optional[str] = None): + from ..schema.instigation import ( + GrapheneInstigationEvent, + GrapheneInstigationEventConnection, + ) + from ..schema.logs.log_level import GrapheneLogLevel + + backfill_log_key_prefix = self._backfill_job.log_storage_prefix + + instance = graphene_info.context.instance + + if not isinstance(instance.compute_log_manager, CapturedLogManager): + return GrapheneInstigationEventConnection(events=[], cursor="", hasMore=False) + + if not instance.backfill_log_storage_enabled(): + return GrapheneInstigationEventConnection(events=[], cursor="", hasMore=False) + + records, new_cursor = instance.compute_log_manager.read_log_lines_for_log_key_prefix( + backfill_log_key_prefix, cursor + ) + + events = [] + for line in records: + if not line: + continue + try: + record_dict = _seven.json.loads(line) + except json.JSONDecodeError: + continue + + exc_info = record_dict.get("exc_info") + message = record_dict.get("msg") + if exc_info: + message = f"{message}\n\n{exc_info}" + event = GrapheneInstigationEvent( + message=message, + level=GrapheneLogLevel.from_level(record_dict["levelno"]), + timestamp=int(record_dict["created"] * 1000), + ) + + events.append(event) + + return GrapheneInstigationEventConnection( + events=events, + cursor=new_cursor.to_string() if new_cursor else None, + hasMore=new_cursor.has_more_now if new_cursor else False, + ) + class GrapheneBackfillNotFoundError(graphene.ObjectType): class Meta: diff --git 
a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py index 7cfe4a8515c3f..cd8639e9ca25a 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py @@ -16,6 +16,8 @@ from dagster._core.execution.asset_backfill import AssetBackfillData from dagster._core.instance import DagsterInstance from dagster._core.test_utils import ensure_dagster_tests_import, instance_for_test +from dagster._daemon import get_default_daemon_logger +from dagster._daemon.backfill import execute_backfill_iteration from dagster_graphql.client.query import LAUNCH_PARTITION_BACKFILL_MUTATION from dagster_graphql.test.utils import ( GqlResult, @@ -93,6 +95,23 @@ } """ +ASSET_BACKFILL_LOGS_QUERY = """ + query BackfillLogsByAsset($backfillId: String!) { + partitionBackfillOrError(backfillId: $backfillId) { + ... on PartitionBackfill { + logEvents { + events { + message + timestamp + level + } + cursor + } + } + } + } +""" + ASSET_BACKFILL_PREVIEW_QUERY = """ query assetBackfillPreview($params: AssetBackfillPreviewParams!) { assetBackfillPreview(params: $params) { @@ -939,3 +958,50 @@ def _get_error_message(launch_backfill_result: GqlResult) -> Optional[str]: if "message" in launch_backfill_result.data["launchPartitionBackfill"] else None ) + + +def test_backfill_logs(): + repo = get_repo() + all_asset_keys = repo.asset_graph.materializable_asset_keys + + with instance_for_test() as instance: + # need to override this method on the instance since it defaults to False in OSS. When we enable this + # feature in OSS we can remove this override + def override_backfill_storage_setting(self): + return True + + instance.backfill_log_storage_enabled = override_backfill_storage_setting.__get__( + instance, DagsterInstance + ) + with define_out_of_process_context(__file__, "get_repo", instance) as context: + # launchPartitionBackfill + launch_backfill_result = execute_dagster_graphql( + context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "partitionNames": ["a", "b"], + "assetSelection": [key.to_graphql_input() for key in all_asset_keys], + } + }, + ) + backfill_id, asset_backfill_data = _get_backfill_data( + launch_backfill_result, instance, repo + ) + assert asset_backfill_data.target_subset.asset_keys == all_asset_keys + + list( + execute_backfill_iteration( + context.process_context, get_default_daemon_logger("BackfillDaemon") + ) + ) + + backfill_logs = execute_dagster_graphql( + context, + ASSET_BACKFILL_LOGS_QUERY, + variables={ + "backfillId": backfill_id, + }, + ) + + assert len(backfill_logs.data["partitionBackfillOrError"]["logEvents"]["events"]) > 0 diff --git a/python_modules/dagster/dagster/_core/captured_log_api.py b/python_modules/dagster/dagster/_core/captured_log_api.py new file mode 100644 index 0000000000000..a86e8b3f9df90 --- /dev/null +++ b/python_modules/dagster/dagster/_core/captured_log_api.py @@ -0,0 +1,38 @@ +import base64 +from typing import NamedTuple, Sequence + +from dagster._seven import json + + +class LogLineCursor(NamedTuple): + """Representation of a log line cursor, to keep track of the place in the logs. + The captured logs are stored in multiple files in the same directory. The cursor keeps + track of the file name and the number of lines read so far.
+ + line=-1 means that the entire file has been read and the next file should be read. This covers the + case when an entire file has been read, but the next file does not exist in storage yet. + line=0 means no lines from the file have been read. + line=n means lines 0 through n-1 have been read from the file. + + has_more_now indicates if there are more log lines that can be read immediately. If the process writing + logs is still running, but has not written a log file, has_more_now will be False once all currently readable + log files have been read. It does not mean that no new logs will be written in the future. + """ + + log_key: Sequence[str] + line: int # maybe rename line_offset? + has_more_now: bool + + def __str__(self) -> str: + return self.to_string() + + def to_string(self) -> str: + raw = json.dumps( + {"log_key": self.log_key, "line": self.line, "has_more_now": self.has_more_now} + ) + return base64.b64encode(bytes(raw, encoding="utf-8")).decode("utf-8") + + @staticmethod + def parse(cursor_str: str) -> "LogLineCursor": + raw = json.loads(base64.b64decode(cursor_str).decode("utf-8")) + return LogLineCursor(raw["log_key"], raw["line"], raw["has_more_now"]) diff --git a/python_modules/dagster/dagster/_core/execution/backfill.py b/python_modules/dagster/dagster/_core/execution/backfill.py index 4781c0a50f907..3e2431b673c1c 100644 --- a/python_modules/dagster/dagster/_core/execution/backfill.py +++ b/python_modules/dagster/dagster/_core/execution/backfill.py @@ -161,6 +161,10 @@ def partition_set_name(self) -> Optional[str]: return self.partition_set_origin.partition_set_name + @property + def log_storage_prefix(self) -> Sequence[str]: + return ["backfill", self.backfill_id] + @property def user(self) -> Optional[str]: if self.tags: diff --git a/python_modules/dagster/dagster/_core/storage/captured_log_manager.py b/python_modules/dagster/dagster/_core/storage/captured_log_manager.py index 618827d938cf0..c187884dabdfa 100644 --- a/python_modules/dagster/dagster/_core/storage/captured_log_manager.py +++ b/python_modules/dagster/dagster/_core/storage/captured_log_manager.py @@ -1,10 +1,12 @@ +import os from abc import ABC, abstractmethod from contextlib import contextmanager -from typing import IO, Callable, Generator, Iterator, NamedTuple, Optional, Sequence +from typing import IO, Callable, Generator, Iterator, NamedTuple, Optional, Sequence, Tuple from typing_extensions import Final, Self import dagster._check as check +from dagster._core.captured_log_api import LogLineCursor from dagster._core.storage.compute_log_manager import ComputeIOType MAX_BYTES_CHUNK_READ: Final = 4194304 # 4 MB @@ -258,3 +260,91 @@ def unsubscribe(self, subscription: CapturedLogSubscription) -> None: def build_log_key_for_run(self, run_id: str, step_key: str) -> Sequence[str]: """Legacy adapter to translate run_id/key to captured log manager-based log_key.""" return [run_id, "compute_logs", step_key] + + def get_log_keys_for_log_key_prefix( + self, log_key_prefix: Sequence[str] + ) -> Sequence[Sequence[str]]: + """Returns the log keys for a given log key prefix. This is determined by looking at the + directory defined by the log key prefix and creating a log_key for each file in the directory. + """ + # NOTE: This method was introduced to support backfill logs, which are always stored as .err files. + # Thus the implementations only look for .err files when determining the log_keys.
If other file extensions + # need to be supported, an io_type parameter will need to be added to this method + raise NotImplementedError("Must implement get_log_keys_for_log_key_prefix") + + def _get_log_lines_for_log_key(self, log_key: Sequence[str]) -> Sequence[str]: + """For a log key, gets the corresponding file, and splits the file into lines.""" + log_data = self.get_log_data(log_key) + # Note: This method was implemented to support backfill logs, which are always stored as .err files. + # If other file extensions need to be supported, this method will need to be updated to look at the + # correct part of log_data based on an io_type parameter + raw_logs = log_data.stderr.decode("utf-8") if log_data.stderr else "" + log_lines = raw_logs.split("\n") + + return log_lines + + def read_log_lines_for_log_key_prefix( + self, log_key_prefix: Sequence[str], cursor: Optional[str], num_lines: int = 1000 + ) -> Tuple[Sequence[str], Optional[LogLineCursor]]: + """For a given directory defined by log_key_prefix that contains files, read the logs from the files + as if they are a single continuous file. Reads num_lines lines at a time (the DAGSTER_CAPTURED_LOG_CHUNK_SIZE + environment variable, if set, overrides num_lines). Returns the lines read and the next cursor. + + Note that the has_more_now attribute of the cursor indicates if there are more logs that can be read immediately. + If has_more_now is False, the process producing logs could still be running and dump more logs into the + directory at a later time. + """ + num_lines = int(os.getenv("DAGSTER_CAPTURED_LOG_CHUNK_SIZE", str(num_lines))) + # find all of the log_keys to read from and sort them in the order to be read + log_keys = sorted( + self.get_log_keys_for_log_key_prefix(log_key_prefix), key=lambda x: "/".join(x) + ) + if len(log_keys) == 0: + return [], None + + log_cursor = LogLineCursor.parse(cursor) if cursor else None + if log_cursor is None: + log_key_to_fetch_idx = 0 + line_cursor = 0 + else: + log_key_to_fetch_idx = log_keys.index(log_cursor.log_key) + line_cursor = log_cursor.line + + if line_cursor == -1: + # a line_cursor of -1 means the entirety of the file has been read, but the next file + # didn't exist yet. So we see if a new file has been added.
+ + # if the next file doesn't exist yet, return + if log_key_to_fetch_idx + 1 >= len(log_keys): + return [], log_cursor + log_key_to_fetch_idx += 1 + line_cursor = 0 + + log_lines = self._get_log_lines_for_log_key(log_keys[log_key_to_fetch_idx]) + records = [] + has_more = True + + while len(records) < num_lines: + remaining_log_lines = log_lines[line_cursor:] + remaining_lines_to_fetch = num_lines - len(records) + if remaining_lines_to_fetch < len(remaining_log_lines): + records.extend(remaining_log_lines[:remaining_lines_to_fetch]) + line_cursor += remaining_lines_to_fetch + else: + records.extend(remaining_log_lines) + line_cursor = -1 + + if line_cursor == -1: + # we've read the entirety of the file, update the cursor + if log_key_to_fetch_idx + 1 >= len(log_keys): + # no more files to process + has_more = False + break + log_key_to_fetch_idx += 1 + line_cursor = 0 + if len(records) < num_lines: + # we still need more records, so fetch the next file + log_lines = self._get_log_lines_for_log_key(log_keys[log_key_to_fetch_idx]) + + new_cursor = LogLineCursor( + log_key=log_keys[log_key_to_fetch_idx], line=line_cursor, has_more_now=has_more + ) + return records, new_cursor diff --git a/python_modules/dagster/dagster/_core/storage/local_compute_log_manager.py b/python_modules/dagster/dagster/_core/storage/local_compute_log_manager.py index e85e379d30292..b03f0562ccbcb 100644 --- a/python_modules/dagster/dagster/_core/storage/local_compute_log_manager.py +++ b/python_modules/dagster/dagster/_core/storage/local_compute_log_manager.py @@ -243,6 +243,27 @@ def subscribe( def unsubscribe(self, subscription): self.on_unsubscribe(subscription) + def get_log_keys_for_log_key_prefix( + self, log_key_prefix: Sequence[str] + ) -> Sequence[Sequence[str]]: + """Returns the log keys for a given log key prefix. This is determined by looking at the + directory defined by the log key prefix and creating a log_key for each file in the directory. + """ + base_dir_path = Path(self._base_dir).resolve() + directory = base_dir_path.joinpath(*log_key_prefix) + objects = directory.iterdir() + results = [] + list_key_prefix = list(log_key_prefix) + + for obj in objects: + # Note: This method was implemented to support backfill logs, which are always stored as .err files. + # If other file extensions need to be supported, this method will need to be updated to match the + # corresponding file extension based on an io_type parameter + if obj.is_file() and obj.suffix == "."
+ IO_TYPE_EXTENSION[ComputeIOType.STDERR]: + results.append(list_key_prefix + [obj.stem]) + + return results + ############################################### # # Methods for the ComputeLogManager interface diff --git a/python_modules/dagster/dagster/_daemon/backfill.py b/python_modules/dagster/dagster/_daemon/backfill.py index 08f8143717bae..39d6cb8db2bb2 100644 --- a/python_modules/dagster/dagster/_daemon/backfill.py +++ b/python_modules/dagster/dagster/_daemon/backfill.py @@ -16,16 +16,16 @@ @contextmanager def _get_instigation_logger_if_log_storage_enabled( - instance, backfill_id: str, default_logger: logging.Logger + instance, backfill: PartitionBackfill, default_logger: logging.Logger ): if instance.backfill_log_storage_enabled(): evaluation_time = pendulum.now("UTC") - log_key = ["backfill", backfill_id, evaluation_time.strftime("%Y%m%d_%H%M%S")] + log_key = [*backfill.log_storage_prefix, evaluation_time.strftime("%Y%m%d_%H%M%S")] with InstigationLogger( log_key, instance, repository_name=None, - name=backfill_id, + name=backfill.backfill_id, ) as _logger: backfill_logger = cast(logging.Logger, _logger) yield backfill_logger @@ -69,9 +69,7 @@ def execute_backfill_jobs( # refetch, in case the backfill was updated in the meantime backfill = cast(PartitionBackfill, instance.get_backfill(backfill_id)) - with _get_instigation_logger_if_log_storage_enabled( - instance, backfill.backfill_id, logger - ) as _logger: + with _get_instigation_logger_if_log_storage_enabled(instance, backfill, logger) as _logger: # create a logger that will always include the backfill_id as an `extra` backfill_logger = cast( logging.Logger, diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index 9d410d35b5d4a..9dbd8386ca5f9 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -1,3 +1,4 @@ +import json import os import random import string @@ -24,6 +25,7 @@ Nothing, Out, StaticPartitionMapping, + _seven, asset, daily_partitioned_config, define_asset_job, @@ -51,6 +53,7 @@ InProcessCodeLocationOrigin, RemoteRepositoryOrigin, ) +from dagster._core.storage.captured_log_manager import CapturedLogManager from dagster._core.storage.dagster_run import IN_PROGRESS_RUN_STATUSES, DagsterRunStatus, RunsFilter from dagster._core.storage.tags import ( ASSET_PARTITION_RANGE_END_TAG, @@ -2272,3 +2275,88 @@ def test_old_dynamic_partitions_job_backfill( list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) assert instance.get_runs_count() == 4 + + +def test_asset_backfill_logs( + instance: DagsterInstance, + workspace_context: WorkspaceProcessContext, + external_repo: ExternalRepository, +): + # need to override this method on the instance since it defaults to False in OSS.
When we enable this + # feature in OSS we can remove this override + def override_backfill_storage_setting(self): + return True + + instance.backfill_log_storage_enabled = override_backfill_storage_setting.__get__( + instance, DagsterInstance + ) + + partition_keys = static_partitions.get_partition_keys() + asset_selection = [AssetKey("foo"), AssetKey("a1"), AssetKey("bar")] + instance.add_backfill( + PartitionBackfill.from_asset_partitions( + asset_graph=workspace_context.create_request_context().asset_graph, + backfill_id="backfill_with_asset_selection", + tags={"custom_tag_key": "custom_tag_value"}, + backfill_timestamp=pendulum.now().timestamp(), + asset_selection=asset_selection, + partition_names=partition_keys, + dynamic_partitions_store=instance, + all_partitions=False, + title=None, + description=None, + ) + ) + assert instance.get_runs_count() == 0 + backfill = instance.get_backfill("backfill_with_asset_selection") + assert backfill + assert backfill.status == BulkActionStatus.REQUESTED + + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + assert instance.get_runs_count() == 3 + wait_for_all_runs_to_start(instance, timeout=15) + assert instance.get_runs_count() == 3 + wait_for_all_runs_to_finish(instance, timeout=15) + + os.environ["DAGSTER_CAPTURED_LOG_CHUNK_SIZE"] = "20" + + cm = instance.compute_log_manager + + assert isinstance(cm, CapturedLogManager) + + logs, cursor = cm.read_log_lines_for_log_key_prefix( + ["backfill", backfill.backfill_id], cursor=None + ) + assert cursor is not None + assert logs + for log_line in logs: + if not log_line: + continue + try: + record_dict = _seven.json.loads(log_line) + except json.JSONDecodeError: + continue + assert record_dict.get("msg") + + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + backfill = instance.get_backfill("backfill_with_asset_selection") + assert backfill + assert backfill.status == BulkActionStatus.COMPLETED + + # set num_lines high so we know we get all of the remaining logs + os.environ["DAGSTER_CAPTURED_LOG_CHUNK_SIZE"] = "100" + logs, cursor = cm.read_log_lines_for_log_key_prefix( + ["backfill", backfill.backfill_id], + cursor=cursor.to_string(), + ) + + assert cursor is not None + assert not cursor.has_more_now + for log_line in logs: + if not log_line: + continue + try: + record_dict = _seven.json.loads(log_line) + except json.JSONDecodeError: + continue + assert record_dict.get("msg") diff --git a/python_modules/dagster/dagster_tests/storage_tests/test_captured_log_manager.py b/python_modules/dagster/dagster_tests/storage_tests/test_captured_log_manager.py index 6d9c56cb20470..fca964e3fa358 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/test_captured_log_manager.py +++ b/python_modules/dagster/dagster_tests/storage_tests/test_captured_log_manager.py @@ -1,12 +1,15 @@ +import os import sys import tempfile from contextlib import contextmanager from typing import Any, Generator, Mapping, Sequence +import pendulum import pytest from dagster import job, op from dagster._core.events import DagsterEventType from dagster._core.storage.captured_log_manager import CapturedLogContext +from dagster._core.storage.compute_log_manager import ComputeIOType from dagster._core.storage.local_compute_log_manager import LocalComputeLogManager from dagster._core.storage.noop_compute_log_manager import NoOpComputeLogManager from dagster._core.test_utils import instance_for_test @@ -86,3 +89,118 @@ def my_job(): assert ( 
entry.dagster_event.logs_captured_data.external_stderr_url == "https://fake.com/stderr" ) + + +def test_get_log_keys_for_log_key_prefix(): + with tempfile.TemporaryDirectory() as tmpdir_path: + cm = LocalComputeLogManager(tmpdir_path) + evaluation_time = pendulum.now() + log_key_prefix = ["test_log_bucket", evaluation_time.strftime("%Y%m%d_%H%M%S")] + + def write_log_file(file_id: int): + full_log_key = [*log_key_prefix, f"{file_id}"] + with cm.open_log_stream(full_log_key, ComputeIOType.STDERR) as f: + f.write("foo") + + for i in range(4): + write_log_file(i) + + log_keys = cm.get_log_keys_for_log_key_prefix(log_key_prefix) + assert sorted(log_keys) == [ + [*log_key_prefix, "0"], + [*log_key_prefix, "1"], + [*log_key_prefix, "2"], + [*log_key_prefix, "3"], + ] + + +def test_read_log_lines_for_log_key_prefix(): + """Tests that we can read a sequence of files in a bucket as if they are a single file.""" + with tempfile.TemporaryDirectory() as tmpdir_path: + cm = LocalComputeLogManager(tmpdir_path) + evaluation_time = pendulum.now() + log_key_prefix = ["test_log_bucket", evaluation_time.strftime("%Y%m%d_%H%M%S")] + + all_logs = [] + + def write_log_file(file_id: int): + full_log_key = [*log_key_prefix, f"{file_id}"] + with cm.open_log_stream(full_log_key, ComputeIOType.STDERR) as f: + num_lines = 10 + for j in range(num_lines): + msg = f"file: {file_id}, line: {j}" + all_logs.append(msg) + f.write(msg) + if j < num_lines - 1: + f.write("\n") + + for i in range(4): + write_log_file(i) + + all_logs_iter = iter(all_logs) + + os.environ["DAGSTER_CAPTURED_LOG_CHUNK_SIZE"] = "10" + # read the entirety of the first file + log_lines, cursor = cm.read_log_lines_for_log_key_prefix(log_key_prefix, cursor=None) + assert len(log_lines) == 10 + assert cursor.has_more_now + assert cursor.log_key == [*log_key_prefix, "1"] + assert cursor.line == 0 + for ll in log_lines: + assert ll == next(all_logs_iter) + + # read half of the next log file + os.environ["DAGSTER_CAPTURED_LOG_CHUNK_SIZE"] = "5" + log_lines, cursor = cm.read_log_lines_for_log_key_prefix( + log_key_prefix, + cursor=cursor.to_string(), + ) + assert len(log_lines) == 5 + assert cursor.has_more_now + assert cursor.log_key == [*log_key_prefix, "1"] + assert cursor.line == 5 + for ll in log_lines: + assert ll == next(all_logs_iter) + + # read the next ten lines, five will be in the second file, five will be in the third + os.environ["DAGSTER_CAPTURED_LOG_CHUNK_SIZE"] = "10" + log_lines, cursor = cm.read_log_lines_for_log_key_prefix( + log_key_prefix, cursor=cursor.to_string() + ) + assert len(log_lines) == 10 + assert cursor.has_more_now + assert cursor.log_key == [*log_key_prefix, "2"] + assert cursor.line == 5 + for ll in log_lines: + assert ll == next(all_logs_iter) + + # read the remaining 15 lines, but request 20 + os.environ["DAGSTER_CAPTURED_LOG_CHUNK_SIZE"] = "20" + log_lines, cursor = cm.read_log_lines_for_log_key_prefix( + log_key_prefix, + cursor=cursor.to_string(), + ) + assert len(log_lines) == 15 + assert not cursor.has_more_now + assert cursor.log_key == [*log_key_prefix, "3"] + # processed up to the end of the file, but there is not another file to process so cursor should be -1 + assert cursor.line == -1 + for ll in log_lines: + assert ll == next(all_logs_iter) + + # write a final log file + + write_log_file(4) + + os.environ["DAGSTER_CAPTURED_LOG_CHUNK_SIZE"] = "15" + log_lines, cursor = cm.read_log_lines_for_log_key_prefix( + log_key_prefix, + cursor=cursor.to_string(), + ) + assert len(log_lines) == 10 + assert not 
cursor.has_more_now + assert cursor.log_key == [*log_key_prefix, "4"] + # processed up to the end of the file, but there is not another file to process so cursor should be -1 + assert cursor.line == -1 + for ll in log_lines: + assert ll == next(all_logs_iter)
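A couple of illustrative sketches for reviewers; they are not part of the diff. First, what the opaque cursor returned by read_log_lines_for_log_key_prefix (and echoed through the new logEvents GraphQL field) encodes; the backfill id, timestamp, and line number are made-up values:

from dagster._core.captured_log_api import LogLineCursor

# A cursor pointing at line 25 of one of the daemon's timestamped .err files
# under the backfill's log storage prefix (["backfill", <backfill_id>]).
cursor = LogLineCursor(
    log_key=["backfill", "my_backfill_id", "20240101_000000"],  # hypothetical values
    line=25,
    has_more_now=True,
)

cursor_str = cursor.to_string()  # base64-encoded JSON, safe to hand back as the `cursor` argument
assert LogLineCursor.parse(cursor_str) == cursor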
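Second, a minimal sketch of how a caller might drain all currently available log lines for a prefix by paging on the cursor, mirroring the setup in test_captured_log_manager.py above; the prefix, backfill id, file names, and chunk size are illustrative:

import os
import tempfile

from dagster._core.storage.compute_log_manager import ComputeIOType
from dagster._core.storage.local_compute_log_manager import LocalComputeLogManager

with tempfile.TemporaryDirectory() as tmpdir:
    cm = LocalComputeLogManager(tmpdir)
    # Matches PartitionBackfill.log_storage_prefix for a hypothetical backfill id.
    log_key_prefix = ["backfill", "example_backfill_id"]

    # Write two small .err files under the prefix, as the daemon would on successive iterations.
    for file_id in range(2):
        with cm.open_log_stream([*log_key_prefix, f"{file_id}"], ComputeIOType.STDERR) as f:
            f.write("\n".join(f"file {file_id}, line {j}" for j in range(5)))

    # read_log_lines_for_log_key_prefix sizes its chunks from this env var when it is set.
    os.environ["DAGSTER_CAPTURED_LOG_CHUNK_SIZE"] = "4"

    all_lines = []
    cursor_str = None
    while True:
        lines, cursor = cm.read_log_lines_for_log_key_prefix(log_key_prefix, cursor=cursor_str)
        all_lines.extend(lines)
        if cursor is None or not cursor.has_more_now:
            break
        cursor_str = cursor.to_string()

    assert len(all_lines) == 10  # 2 files x 5 lines, read in chunks of 4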