tracks helpers usage and source names (#497)
* tracks helper usage

* tracks source names

* adds source names to extract info

* bumps version to 0.3.4

* uses typed dict for sources info
rudolfix committed Jul 17, 2023
1 parent fb9dabf commit 15c32da
Showing 14 changed files with 174 additions and 74 deletions.
42 changes: 3 additions & 39 deletions dlt/cli/utils.py
@@ -1,19 +1,15 @@
import ast
import inspect
import os
import tempfile
import time
import contextlib
from typing import Any, Callable, Tuple
from typing import Callable

from dlt.common import git
from dlt.common.reflection.utils import set_ast_parents
from dlt.common.storages import FileStorage
from dlt.common.typing import TFun
from dlt.common.runtime.telemetry import start_telemetry
from dlt.common.runtime.segment import track
from dlt.common.configuration import resolve_configuration
from dlt.common.configuration.specs import RunConfiguration
from dlt.common.runtime.telemetry import with_telemetry

from dlt.reflection.script_visitor import PipelineScriptVisitor

@@ -62,39 +58,7 @@ def ensure_git_command(command: str) -> None:


def track_command(command: str, track_before: bool, *args: str) -> Callable[[TFun], TFun]:
    """Adds telemetry to f: TFun and adds optional f *args values to `properties` of the telemetry event"""
    def decorator(f: TFun) -> TFun:
        sig: inspect.Signature = inspect.signature(f)
        def _wrap(*f_args: Any, **f_kwargs: Any) -> Any:
            # look for additional arguments
            bound_args = sig.bind(*f_args, **f_kwargs)
            props = {p: bound_args.arguments[p] for p in args if p in bound_args.arguments}
            start_ts = time.time()

            def _track(success: bool) -> None:
                with contextlib.suppress(Exception):
                    props["elapsed"] = time.time() - start_ts
                    props["success"] = success
                    # resolve runtime config and init telemetry
                    c = resolve_configuration(RunConfiguration())
                    start_telemetry(c)
                    track("command", command, props)

            # some commands should be tracked before execution
            if track_before:
                _track(True)
                return f(*f_args, **f_kwargs)
            # some commands are tracked after execution, where we can pass the success
            try:
                rv = f(*f_args, **f_kwargs)
                _track(rv == 0)
                return rv
            except Exception:
                _track(False)
                raise

        return _wrap  # type: ignore
    return decorator
    return with_telemetry("command", command, track_before, *args)


def get_telemetry_status() -> bool:
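With this commit, track_command becomes a thin alias over the new with_telemetry decorator in dlt.common.runtime.telemetry. A minimal sketch of how it is applied (init_command is a hypothetical CLI handler, not part of this diff):

@track_command("init", False, "source_name")
def init_command(source_name: str) -> int:
    # "source_name" is bound from the call arguments and copied into the
    # event properties; an int return value of 0 is tracked as success
    return 0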
12 changes: 10 additions & 2 deletions dlt/common/pipeline.py
@@ -1,7 +1,6 @@
import os
import datetime # noqa: 251
import humanize
import inspect
import contextlib
from typing import Any, Callable, ClassVar, Dict, List, NamedTuple, Optional, Protocol, Sequence, TYPE_CHECKING, Tuple, TypedDict

@@ -25,8 +24,16 @@
from dlt.common.data_writers.writers import TLoaderFileFormat


class ExtractDataInfo(TypedDict):
    name: str
    data_type: str


class ExtractInfo(NamedTuple):
    """A tuple holding information on extracted data items. Returned by pipeline `extract` method."""

    extract_data_info: List[ExtractDataInfo]

    def asdict(self) -> DictStrAny:
        return {}

@@ -209,7 +216,8 @@ def __call__(
        table_name: str = None,
        write_disposition: TWriteDisposition = None,
        columns: Sequence[TColumnSchema] = None,
        schema: Schema = None
        schema: Schema = None,
        loader_file_format: TLoaderFileFormat = None
    ) -> LoadInfo:
        ...

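ExtractDataInfo describes each item passed to extract as a name plus a data_type, and ExtractInfo now carries the full list. A small sketch of the shapes (the entries are illustrative, not taken from this diff):

info = ExtractInfo(extract_data_info=[
    {"name": "my_source", "data_type": "source"},  # a named dlt source
    {"name": "", "data_type": "list"}              # unnamed raw data
])
assert info.asdict() == {}  # asdict remains a stub for now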
6 changes: 4 additions & 2 deletions dlt/common/runtime/segment.py
@@ -13,13 +13,14 @@
from dlt.common.configuration.paths import get_dlt_data_dir

from dlt.common.runtime import logger

from dlt.common.configuration.specs import RunConfiguration
from dlt.common.runtime.exec_info import exec_info_names, in_continuous_integration
from dlt.common.typing import DictStrAny, StrAny
from dlt.common.utils import uniq_id
from dlt.version import __version__, DLT_PKG_NAME

TEventCategory = Literal["pipeline", "command"]
TEventCategory = Literal["pipeline", "command", "helper"]

_THREAD_POOL: ThreadPoolExecutor = None
_SESSION: requests.Session = None
@@ -202,9 +203,10 @@ def _send_event(
    headers = _segment_request_header(_WRITE_KEY)

    def _future_send() -> None:
        # import time
        # start_ts = time.time()
        resp = _SESSION.post(_SEGMENT_ENDPOINT, headers=headers, json=payload, timeout=_SEGMENT_REQUEST_TIMEOUT)
        # print(f"sending to Segment done {resp.status_code} {time.time() - start_ts}")
        # print(f"SENDING TO Segment done {resp.status_code} {time.time() - start_ts} {base64.b64decode(_WRITE_KEY)}")
        # handle different failure cases
        if resp.status_code != 200:
            logger.debug(
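With "helper" added to TEventCategory, helper integrations can send events through the same track call used for pipelines and commands. A hedged example (the property values are made up for illustration):

track("helper", "airflow_add_run", {"decompose": "none", "elapsed": 0.1, "success": True})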
69 changes: 61 additions & 8 deletions dlt/common/runtime/telemetry.py
@@ -1,17 +1,24 @@
import time
import contextlib
import inspect
from typing import Any, Callable

from dlt.common.configuration.specs import RunConfiguration
from dlt.common.runtime.segment import init_segment, disable_segment
from dlt.common.typing import TFun
from dlt.common.configuration import resolve_configuration
from dlt.common.runtime.segment import TEventCategory, init_segment, disable_segment, track

from dlt.common.runtime.sentry import init_sentry, disable_sentry


_TELEMETRY_ENABLED = False
_TELEMETRY_STARTED = False


def start_telemetry(config: RunConfiguration) -> None:
    # enable telemetry only once

    global _TELEMETRY_ENABLED
    if _TELEMETRY_ENABLED:
    global _TELEMETRY_STARTED
    if _TELEMETRY_STARTED:
        return

    if config.sentry_dsn:
@@ -20,15 +27,61 @@ def start_telemetry(config: RunConfiguration) -> None:
    if config.dlthub_telemetry:
        init_segment(config)

    _TELEMETRY_ENABLED = True
    _TELEMETRY_STARTED = True


def stop_telemetry() -> None:
    global _TELEMETRY_ENABLED
    if not _TELEMETRY_ENABLED:
    global _TELEMETRY_STARTED
    if not _TELEMETRY_STARTED:
        return

    disable_sentry()
    disable_segment()

    _TELEMETRY_ENABLED = False
    _TELEMETRY_STARTED = False


def is_telemetry_started() -> bool:
    return _TELEMETRY_STARTED


def with_telemetry(category: TEventCategory, command: str, track_before: bool, *args: str) -> Callable[[TFun], TFun]:
    """Adds telemetry to f: TFun and adds optional f *args values to `properties` of the telemetry event"""
    def decorator(f: TFun) -> TFun:
        sig: inspect.Signature = inspect.signature(f)
        def _wrap(*f_args: Any, **f_kwargs: Any) -> Any:
            # look for additional arguments
            bound_args = sig.bind(*f_args, **f_kwargs)
            props = {p: bound_args.arguments[p] for p in args if p in bound_args.arguments}
            start_ts = time.time()

            def _track(success: bool) -> None:
                with contextlib.suppress(Exception):
                    props["elapsed"] = time.time() - start_ts
                    props["success"] = success
                    # resolve runtime config and init telemetry
                    if not _TELEMETRY_STARTED:
                        c = resolve_configuration(RunConfiguration())
                        start_telemetry(c)
                    track(category, command, props)

            # some commands should be tracked before execution
            if track_before:
                _track(True)
                return f(*f_args, **f_kwargs)
            # some commands are tracked after execution, where we can pass the success
            try:
                rv = f(*f_args, **f_kwargs)
                # if the decorated function returns an int, 0 is a success - used to track dlt commands
                if isinstance(rv, int):
                    success = rv == 0
                else:
                    success = True
                _track(success)
                return rv
            except Exception:
                _track(False)
                raise

        return _wrap  # type: ignore
    return decorator
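with_telemetry can wrap any callable and initializes telemetry lazily on the first tracked call. A minimal usage sketch, assuming a hypothetical helper function:

@with_telemetry("helper", "example_helper", False, "package_name")
def run_helper(package_name: str) -> None:
    ...  # helper work happens here

run_helper("some_package")
# emits a "helper" event named "example_helper" whose properties include
# package_name, elapsed and success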
3 changes: 2 additions & 1 deletion dlt/helpers/airflow_helper.py
@@ -4,11 +4,11 @@
from tenacity import retry_if_exception, wait_exponential, stop_after_attempt, Retrying, RetryCallState

from dlt.common.exceptions import MissingDependencyException
from dlt.common.runtime.telemetry import with_telemetry

try:
    from airflow.configuration import conf
    from airflow.utils.task_group import TaskGroup
    #from airflow.decorators import task
    from airflow.operators.python import PythonOperator
    from airflow.operators.python import get_current_context
except ImportError:
@@ -118,6 +118,7 @@ def __init__(
        if ConfigProvidersContext in Container():
            del Container()[ConfigProvidersContext]

    @with_telemetry("helper", "airflow_add_run", False, "decompose")
    def add_run(
        self,
        pipeline: Pipeline,
3 changes: 3 additions & 0 deletions dlt/helpers/dbt/runner.py
@@ -19,6 +19,8 @@
from dlt.helpers.dbt.configuration import DBTRunnerConfiguration
from dlt.helpers.dbt.exceptions import IncrementalSchemaOutOfSyncError, PrerequisitesException, DBTNodeResult, DBTProcessingError

from dlt.common.runtime.telemetry import with_telemetry


class DBTPackageRunner:
    """A Python wrapper over a dbt package
@@ -256,6 +258,7 @@ def run_all(self,
            raise


@with_telemetry("helper", "dbt_create_runner", False, "package_profile_name")
@with_config(spec=DBTRunnerConfiguration, sections=(known_sections.DBT_PACKAGE_RUNNER,))
def create_runner(
    venv: Venv,
9 changes: 4 additions & 5 deletions dlt/pipeline/helpers.py
@@ -1,19 +1,18 @@
import contextlib
from typing import Callable, Sequence, Iterable, Optional, Any, List, Iterator, Dict, Union, TypedDict
from typing import Callable, Sequence, Iterable, Optional, Any, List, Dict, Tuple, Union, TypedDict
from itertools import chain

from dlt.common.jsonpath import resolve_paths, TAnyJsonPath, compile_paths

from dlt.common.exceptions import TerminalException
from dlt.common.schema.utils import get_child_tables, group_tables_by_resource, compile_simple_regexes, compile_simple_regex
from dlt.common.schema.utils import group_tables_by_resource, compile_simple_regexes, compile_simple_regex
from dlt.common.schema.typing import TSimpleRegex
from dlt.common.typing import REPattern
from dlt.destinations.exceptions import DatabaseUndefinedRelation
from dlt.common.pipeline import TSourceState, _reset_resource_state, _sources_state, _delete_source_state_keys, _get_matching_resources

from dlt.destinations.exceptions import DatabaseUndefinedRelation
from dlt.pipeline.exceptions import PipelineStepFailed, PipelineHasPendingDataException
from dlt.pipeline.typing import TPipelineStep
from dlt.pipeline import Pipeline
from dlt.common.pipeline import TSourceState, _reset_resource_state, _sources_state, _delete_source_state_keys, _get_matching_resources


def retry_load(retry_on_pipeline_steps: Sequence[TPipelineStep] = ("load",)) -> Callable[[BaseException], bool]:
9 changes: 4 additions & 5 deletions dlt/pipeline/pipeline.py
@@ -24,6 +24,7 @@
from dlt.common.storages import LiveSchemaStorage, NormalizeStorage, LoadStorage, SchemaStorage, FileStorage, NormalizeStorageConfiguration, SchemaStorageConfiguration, LoadStorageConfiguration
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.reference import DestinationReference, JobClientBase, DestinationClientConfiguration, DestinationClientDwhConfiguration, TDestinationReferenceArg, DestinationClientStagingConfiguration, DestinationClientDwhConfiguration
from dlt.common.destination.capabilities import INTERNAL_LOADER_FILE_FORMATS
from dlt.common.pipeline import ExtractInfo, LoadInfo, NormalizeInfo, PipelineContext, SupportsPipeline, TPipelineLocalState, TPipelineState, StateInjectableContext
from dlt.common.schema import Schema
from dlt.common.utils import is_interactive
@@ -44,12 +45,10 @@
from dlt.pipeline.configuration import PipelineConfiguration
from dlt.pipeline.progress import _Collector, _NULL_COLLECTOR
from dlt.pipeline.exceptions import CannotRestorePipelineException, InvalidPipelineName, PipelineConfigMissing, PipelineNotActive, PipelineStepFailed, SqlClientNotAvailable
from dlt.pipeline.trace import PipelineTrace, PipelineStepTrace, load_trace, merge_traces, start_trace, start_trace_step, end_trace_step, end_trace
from dlt.pipeline.trace import PipelineTrace, PipelineStepTrace, load_trace, merge_traces, start_trace, start_trace_step, end_trace_step, end_trace, describe_extract_data
from dlt.pipeline.typing import TPipelineStep
from dlt.pipeline.state_sync import STATE_ENGINE_VERSION, load_state_from_destination, merge_state_if_changed, migrate_state, state_resource, json_encode_state, json_decode_state

from dlt.common.destination.capabilities import INTERNAL_LOADER_FILE_FORMATS


def with_state_sync(may_extract_state: bool = False) -> Callable[[TFun], TFun]:

@@ -285,10 +284,10 @@ def extract(
            # TODO: if we fail here we should probably wipe out the whole extract folder
            for extract_id in extract_ids:
                storage.commit_extract_files(extract_id)
            return ExtractInfo()
            return ExtractInfo(describe_extract_data(data))
        except Exception as exc:
            # TODO: provide metrics from extractor
            raise PipelineStepFailed(self, "extract", exc, ExtractInfo()) from exc
            raise PipelineStepFailed(self, "extract", exc, ExtractInfo(describe_extract_data(data))) from exc

    @with_runtime_trace
    @with_schemas_sync
39 changes: 36 additions & 3 deletions dlt/pipeline/trace.py
@@ -2,18 +2,19 @@
import pickle
import datetime # noqa: 251
import dataclasses
from typing import Any, List, NamedTuple, Optional, Protocol, Sequence

from collections.abc import Sequence as C_Sequence
from typing import Any, List, Tuple, NamedTuple, Optional, Protocol, Sequence
import humanize

from dlt.common import pendulum
from dlt.common.runtime.logger import suppress_and_warn
from dlt.common.configuration import is_secret_hint
from dlt.common.configuration.utils import _RESOLVED_TRACES
from dlt.common.pipeline import SupportsPipeline
from dlt.common.pipeline import ExtractDataInfo, SupportsPipeline
from dlt.common.typing import StrAny
from dlt.common.utils import uniq_id

from dlt.extract.source import DltResource, DltSource
from dlt.pipeline.typing import TPipelineStep
from dlt.pipeline.exceptions import PipelineStepFailed

@@ -212,3 +213,35 @@ def load_trace(trace_path: str) -> PipelineTrace:
    except (AttributeError, FileNotFoundError):
        # on incompatible pickling / file not found return no trace
        return None


def describe_extract_data(data: Any) -> List[ExtractDataInfo]:
    """Extract source and resource names from data passed to extract"""
    data_info: List[ExtractDataInfo] = []

    def add_item(item: Any) -> bool:
        if isinstance(item, (DltResource, DltSource)):
            # record names of sources/resources
            data_info.append({
                "name": item.name,
                "data_type": "resource" if isinstance(item, DltResource) else "source"
            })
            return False
        else:
            # anything else
            data_info.append({
                "name": "",
                "data_type": type(item).__name__
            })
            return True

    item: Any = data
    if isinstance(data, C_Sequence) and len(data) > 0:
        for item in data:
            # add_item returns True if a non-named item was encountered; in that case we break
            if add_item(item):
                break
        return data_info

    add_item(item)
    return data_info
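A usage sketch of describe_extract_data: named resources and sources are recorded individually, while the first non-named item in a sequence ends the scan with just its type name (my_data is an illustrative name, not from this diff):

describe_extract_data(dlt.resource([1, 2, 3], name="my_data"))
# -> [{"name": "my_data", "data_type": "resource"}]
describe_extract_data([{"a": 1}, {"a": 2}])
# -> [{"name": "", "data_type": "dict"}]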
