Commit

testing
jamiedemaria committed May 30, 2024
1 parent e56032a commit 931f345
Showing 3 changed files with 93 additions and 1 deletion.
@@ -243,7 +243,9 @@ def subscribe(
def unsubscribe(self, subscription):
self.on_unsubscribe(subscription)

def get_log_keys_for_log_key_prefix(self, log_key_prefix: Sequence[str]) -> Sequence[str]:
def get_log_keys_for_log_key_prefix(
self, log_key_prefix: Sequence[str]
) -> Sequence[Sequence[str]]:
"""Returns the logs keys for a given log key prefix. This is determined by looking at the
directory defined by the log key prefix and creating a log_key for each file in the directory.
"""
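
For illustration, a minimal sketch of what a filesystem-backed version of this lookup could look like, following the docstring above; the base_dir layout and file-extension handling are assumptions for this sketch, not the actual LocalComputeLogManager implementation:

import os
from typing import Sequence


def get_log_keys_for_log_key_prefix(
    base_dir: str, log_key_prefix: Sequence[str]
) -> Sequence[Sequence[str]]:
    # Treat the prefix as a directory under base_dir and emit one log key per file in it.
    directory = os.path.join(base_dir, *log_key_prefix)
    if not os.path.isdir(directory):
        return []
    return [
        [*log_key_prefix, os.path.splitext(filename)[0]]
        for filename in sorted(os.listdir(directory))
    ]
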
67 changes: 67 additions & 0 deletions python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
@@ -1,3 +1,4 @@
import json
import os
import random
import string
@@ -33,6 +34,7 @@
op,
repository,
)
from dagster._core.captured_log_api import LOG_STREAM_COMPLETED_SIGIL
from dagster._core.definitions import (
StaticPartitionsDefinition,
)
@@ -2281,3 +2283,68 @@ def test_old_dynamic_partitions_job_backfill(
list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))

assert instance.get_runs_count() == 4


def test_asset_backfill_logs(
instance: DagsterInstance,
workspace_context: WorkspaceProcessContext,
external_repo: ExternalRepository,
):
# need to override this method on the instance since it defaults to False in OSS. When we enable this
# feature in OSS we can remove this override
def override_backfill_storage_setting(self):
return True

instance.backfill_log_storage_enabled = override_backfill_storage_setting.__get__(
instance, DagsterInstance
)

partition_keys = static_partitions.get_partition_keys()
asset_selection = [AssetKey("foo"), AssetKey("a1"), AssetKey("bar")]
instance.add_backfill(
PartitionBackfill.from_asset_partitions(
asset_graph=workspace_context.create_request_context().asset_graph,
backfill_id="backfill_with_asset_selection",
tags={"custom_tag_key": "custom_tag_value"},
backfill_timestamp=pendulum.now().timestamp(),
asset_selection=asset_selection,
partition_names=partition_keys,
dynamic_partitions_store=instance,
all_partitions=False,
title=None,
description=None,
)
)
assert instance.get_runs_count() == 0
backfill = instance.get_backfill("backfill_with_asset_selection")
assert backfill
assert backfill.status == BulkActionStatus.REQUESTED

list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
assert instance.get_runs_count() == 3
wait_for_all_runs_to_start(instance, timeout=30)
assert instance.get_runs_count() == 3
wait_for_all_runs_to_finish(instance, timeout=30)

logs, cursor = instance.compute_log_manager.read_json_log_lines_for_log_key_prefix(
["backfill", backfill.backfill_id], cursor=None, num_lines=15
)
assert logs
for log_line in logs:
assert log_line.get("msg")

list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill("backfill_with_asset_selection")
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED

# set num_lines high so we know we get all of the remaining logs
logs, cursor = instance.compute_log_manager.read_json_log_lines_for_log_key_prefix(
["backfill", backfill.backfill_id], cursor=cursor.to_string(), num_lines=100
)

assert not cursor.has_more
for log_line in logs:
assert log_line.get("msg")

assert LOG_STREAM_COMPLETED_SIGIL not in json.dumps(logs[-1])
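
Since the cursor round-trips through to_string() and exposes has_more, a caller could drain every captured backfill log line with a loop like the hedged sketch below (built only from the calls used in this test, not from a documented API):

def read_all_backfill_log_lines(instance, backfill_id, page_size=100):
    # Page through the captured backfill logs until the cursor reports nothing left.
    all_lines = []
    cursor_str = None
    while True:
        lines, cursor = instance.compute_log_manager.read_json_log_lines_for_log_key_prefix(
            ["backfill", backfill_id], cursor=cursor_str, num_lines=page_size
        )
        all_lines.extend(lines)
        if not cursor.has_more:
            return all_lines
        cursor_str = cursor.to_string()
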
@@ -92,6 +92,29 @@ def my_job():
)


def test_get_log_keys_for_log_key_prefix():
with tempfile.TemporaryDirectory() as tmpdir_path:
cm = LocalComputeLogManager(tmpdir_path)
evaluation_time = pendulum.now()
log_key_prefix = ["test_log_bucket", evaluation_time.strftime("%Y%m%d_%H%M%S")]

def write_log_file(file_id: int):
full_log_key = [*log_key_prefix, f"{file_id}"]
with cm.open_log_stream(full_log_key, ComputeIOType.STDERR) as f:
f.write("foo")

for i in range(4):
write_log_file(i)

log_keys = cm.get_log_keys_for_log_key_prefix(log_key_prefix)
assert log_keys == [
[*log_key_prefix, "0"],
[*log_key_prefix, "1"],
[*log_key_prefix, "2"],
[*log_key_prefix, "3"],
]


def test_get_json_log_lines_for_log_key_prefix():
"""Tests that we can read a sequence of files in a bucket as if they are a single file."""
with tempfile.TemporaryDirectory() as tmpdir_path:
