Skip to content

Commit

Permalink
Ensure telemetry logging dir is created before using logger (#7192)
Browse files Browse the repository at this point in the history
* Rename method to reflect stateful action, ensure directory is created upon logger initialization

* Re-use call to get dagster home directory

* Dump existing logger if file is destroyed while logger is still open
  • Loading branch information
dpeng817 committed Apr 1, 2022
1 parent 4115505 commit 96d2a25
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 12 deletions.
21 changes: 17 additions & 4 deletions python_modules/dagster/dagster/core/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def _dagster_home_if_set():
return os.path.expanduser(dagster_home_path)


def get_dir_from_dagster_home(target_dir):
def get_or_create_dir_from_dagster_home(target_dir):
"""
If $DAGSTER_HOME is set, return $DAGSTER_HOME/<target_dir>/
Otherwise, return ~/.dagster/<target_dir>/
Expand Down Expand Up @@ -275,11 +275,24 @@ def _check_telemetry_instance_param(args, kwargs, instance_index):


def _get_telemetry_logger():

# If a concurrently running process deleted the logging directory since the
# last action, we need to make sure to re-create the directory
# (the logger does not do this itself.)
dagster_home_path = get_or_create_dir_from_dagster_home("logs")

logging_file_path = os.path.join(dagster_home_path, "event.log")
logger = logging.getLogger("dagster_telemetry_logger")

# If the file we were writing to has been overwritten, dump the existing logger and re-open the stream.
if not os.path.exists(logging_file_path) and len(logger.handlers) > 0:
handler = next(iter(logger.handlers))
handler.close()
logger.removeHandler(handler)

if len(logger.handlers) == 0:
handler = RotatingFileHandler(
os.path.join(get_dir_from_dagster_home("logs"), "event.log"),
logging_file_path,
maxBytes=MAX_BYTES,
backupCount=10,
)
Expand Down Expand Up @@ -334,7 +347,7 @@ def _get_or_set_instance_id():

# Gets the instance_id at $DAGSTER_HOME/.telemetry/id.yaml
def _get_telemetry_instance_id():
telemetry_id_path = os.path.join(get_dir_from_dagster_home(TELEMETRY_STR), "id.yaml")
telemetry_id_path = os.path.join(get_or_create_dir_from_dagster_home(TELEMETRY_STR), "id.yaml")
if not os.path.exists(telemetry_id_path):
return

Expand All @@ -352,7 +365,7 @@ def _set_telemetry_instance_id():
click.secho(TELEMETRY_TEXT)
click.secho(SLACK_PROMPT)

telemetry_id_path = os.path.join(get_dir_from_dagster_home(TELEMETRY_STR), "id.yaml")
telemetry_id_path = os.path.join(get_or_create_dir_from_dagster_home(TELEMETRY_STR), "id.yaml")
instance_id = str(uuid.uuid4())

try: # In case we encounter an error while writing to user's file system
Expand Down
10 changes: 5 additions & 5 deletions python_modules/dagster/dagster/core/telemetry_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import zlib
from contextlib import contextmanager

from .telemetry import MAX_BYTES, get_dir_from_dagster_home
from .telemetry import MAX_BYTES, get_or_create_dir_from_dagster_home


def get_dagster_telemetry_url():
Expand Down Expand Up @@ -45,8 +45,8 @@ def upload_logs(stop_event, raise_errors=False):

try:
last_run = datetime.datetime.now() - datetime.timedelta(minutes=120)
dagster_log_dir = get_dir_from_dagster_home("logs")
dagster_log_queue_dir = get_dir_from_dagster_home(".logs_queue")
dagster_log_dir = get_or_create_dir_from_dagster_home("logs")
dagster_log_queue_dir = get_or_create_dir_from_dagster_home(".logs_queue")
in_progress = False
while not stop_event.is_set():
log_size = 0
Expand Down Expand Up @@ -75,8 +75,8 @@ def upload_logs(stop_event, raise_errors=False):
):
in_progress = True # Prevent concurrent _upload_logs invocations
last_run = datetime.datetime.now()
dagster_log_dir = get_dir_from_dagster_home("logs")
dagster_log_queue_dir = get_dir_from_dagster_home(".logs_queue")
dagster_log_dir = get_or_create_dir_from_dagster_home("logs")
dagster_log_queue_dir = get_or_create_dir_from_dagster_home(".logs_queue")
_upload_logs(
dagster_log_dir, log_size, dagster_log_queue_dir, raise_errors=raise_errors
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@
from dagster.core.definitions.reconstruct import get_ephemeral_repository_name
from dagster.core.telemetry import (
UPDATE_REPO_STATS,
get_dir_from_dagster_home,
get_or_create_dir_from_dagster_home,
hash_name,
log_workspace_stats,
write_telemetry_log_line,
)
from dagster.core.test_utils import instance_for_test
from dagster.core.test_utils import environ, instance_for_test
from dagster.core.workspace.load import load_workspace_process_context_from_yaml_paths
from dagster.utils import file_relative_path, pushd, script_relative_path

Expand Down Expand Up @@ -83,7 +84,9 @@ def test_dagster_telemetry_disabled(caplog):
],
)

assert not os.path.exists(os.path.join(get_dir_from_dagster_home("logs"), "event.log"))
assert not os.path.exists(
os.path.join(get_or_create_dir_from_dagster_home("logs"), "event.log")
)
assert len(caplog.records) == 0
assert result.exit_code == 0

Expand Down Expand Up @@ -181,3 +184,21 @@ def test_hash_name():
assert SequenceMatcher(None, hashes[0], hashes[1]).ratio() < 0.4
assert SequenceMatcher(None, hashes[0], hashes[2]).ratio() < 0.4
assert SequenceMatcher(None, hashes[1], hashes[2]).ratio() < 0.4


def test_write_telemetry_log_line_writes_to_dagster_home():
# Ensures that if logging directory is deleted between writes, it can be re-created without failure.
with tempfile.TemporaryDirectory() as temp_dir:
with environ({"DAGSTER_HOME": temp_dir}):
write_telemetry_log_line({"foo": "bar"})
with open(os.path.join(temp_dir, "logs/event.log"), "r") as f:
res = json.load(f)
assert res == {"foo": "bar"}

os.remove(os.path.join(temp_dir, "logs/event.log"))
os.rmdir(os.path.join(temp_dir, "logs"))

write_telemetry_log_line({"foo": "bar"})
with open(os.path.join(temp_dir, "logs/event.log"), "r") as f:
res = json.load(f)
assert res == {"foo": "bar"}

0 comments on commit 96d2a25

Please sign in to comment.