Skip to content

Commit

Permalink
Merge pull request #511 from dlt-hub/rfix/adds-destination-fingerprint
Browse files Browse the repository at this point in the history
adds destination fingerprint
  • Loading branch information
rudolfix committed Jul 23, 2023
2 parents 449fe7e + 5b4c851 commit 95644ec
Show file tree
Hide file tree
Showing 22 changed files with 114 additions and 22 deletions.
5 changes: 2 additions & 3 deletions dlt/common/configuration/resolve.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@

from dlt.common.configuration.specs.base_configuration import BaseConfiguration, CredentialsConfiguration, is_secret_hint, extract_inner_hint, is_context_inner_hint, is_base_configuration_inner_hint, is_valid_hint
from dlt.common.configuration.specs.config_section_context import ConfigSectionContext
from dlt.common.configuration.container import Container
from dlt.common.configuration.specs.exceptions import NativeValueError
from dlt.common.configuration.specs.config_providers_context import ConfigProvidersContext
from dlt.common.configuration.container import Container
from dlt.common.configuration.utils import log_traces, deserialize_value
from dlt.common.configuration.exceptions import (
FinalConfigFieldException, LookupTrace, ConfigFieldMissingException, ConfigurationWrongTypeException,
ValueNotSecretException, InvalidNativeValue, UnmatchedConfigHintResolversException)

from dlt.common.configuration.specs.exceptions import NativeValueError

TConfiguration = TypeVar("TConfiguration", bound=BaseConfiguration)


Expand Down
4 changes: 4 additions & 0 deletions dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ class DestinationClientConfiguration(BaseConfiguration):
destination_name: str = None # which destination to load data to
credentials: Optional[CredentialsConfiguration]

def fingerprint(self) -> str:
"""Returns a destination fingerprint, which is a hash of selected configuration fields — e.g. the host in the case of a connection string"""
return ""

def __str__(self) -> str:
"""Return displayable destination location"""
return str(self.credentials)
Expand Down
1 change: 1 addition & 0 deletions dlt/common/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class LoadInfo(NamedTuple):
destination_displayable_credentials: str
staging_name: str
staging_displayable_credentials: str
destination_fingerprint: str
dataset_name: str
loads_ids: List[str]
"""ids of the loaded packages"""
Expand Down
8 changes: 8 additions & 0 deletions dlt/destinations/bigquery/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

from dlt.common.configuration import configspec
from dlt.common.configuration.specs import GcpServiceAccountCredentials
from dlt.common.utils import digest128

from dlt.common.destination.reference import DestinationClientDwhConfiguration


Expand All @@ -26,6 +28,12 @@ def get_location(self) -> str:
warnings.warn("Setting BigQuery location in the credentials is deprecated. Please set the location directly in bigquery section ie. destinations.bigquery.location='EU'")
return self.credentials.location

def fingerprint(self) -> str:
"""Returns a fingerprint of project_id"""
if self.credentials and self.credentials.project_id:
return digest128(self.credentials.project_id)
return ""

if TYPE_CHECKING:
def __init__(
self,
Expand Down
23 changes: 20 additions & 3 deletions dlt/destinations/filesystem/configuration.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from urllib.parse import urlparse

from typing import Final, Type, Optional, Union
from typing import Final, Type, Optional, Union, TYPE_CHECKING

from dlt.common.configuration import configspec, resolve_type
from dlt.common.destination.reference import CredentialsConfiguration, DestinationClientStagingConfiguration
from dlt.common.configuration.specs import GcpCredentials, GcpServiceAccountCredentials, AwsCredentials, GcpOAuthCredentials

from dlt.common.configuration.specs import GcpServiceAccountCredentials, AwsCredentials, GcpOAuthCredentials
from dlt.common.utils import digest128
from dlt.common.configuration.exceptions import ConfigurationValueError


Expand Down Expand Up @@ -44,6 +44,12 @@ def resolve_credentials_type(self) -> Type[CredentialsConfiguration]:
# use known credentials or empty credentials for unknown protocol
return PROTOCOL_CREDENTIALS.get(self.protocol) or Optional[CredentialsConfiguration] # type: ignore[return-value]

def fingerprint(self) -> str:
"""Returns a fingerprint of bucket_url"""
if self.bucket_url:
return digest128(self.bucket_url)
return ""

def __str__(self) -> str:
"""Return displayable destination location"""
url = urlparse(self.bucket_url)
Expand All @@ -54,3 +60,14 @@ def __str__(self) -> str:
new_netloc += f":{url.port}"
return url._replace(netloc=new_netloc).geturl()
return self.bucket_url

if TYPE_CHECKING:
def __init__(
self,
destination_name: str = None,
credentials: Optional[GcpServiceAccountCredentials] = None,
dataset_name: str = None,
default_schema_name: Optional[str] = None,
bucket_url: str = None
) -> None:
...
2 changes: 1 addition & 1 deletion dlt/destinations/filesystem/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def __init__(
file_name = FileStorage.get_file_name_from_file_path(local_path)
self.config = config
self.dataset_path = dataset_path
self.destination_file_name = LoadFilesystemJob.make_destination_filename(file_name, schema_name, load_id)

super().__init__(file_name)
fs_client, _ = client_from_config(config)
Expand Down Expand Up @@ -62,7 +63,6 @@ def __init__(
except FileNotFoundError:
pass

self.destination_file_name = LoadFilesystemJob.make_destination_filename(file_name, schema_name, load_id)
fs_client.put_file(local_path, self.make_remote_path())

@staticmethod
Expand Down
5 changes: 3 additions & 2 deletions dlt/destinations/filesystem/filesystem_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@

from dlt.common.exceptions import MissingDependencyException
from dlt.common.typing import DictStrAny
from dlt.common.configuration.specs import CredentialsWithDefault
from dlt.destinations.filesystem.configuration import FilesystemClientConfiguration, GcpCredentials, GcpServiceAccountCredentials, AwsCredentials
from dlt.common.configuration.specs import CredentialsWithDefault, GcpCredentials, AwsCredentials

from dlt.destinations.filesystem.configuration import FilesystemClientConfiguration

from dlt import version

Expand Down
7 changes: 7 additions & 0 deletions dlt/destinations/motherduck/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from dlt.common.configuration import configspec
from dlt.common.destination.reference import DestinationClientDwhConfiguration
from dlt.common.typing import TSecretValue
from dlt.common.utils import digest128
from dlt.common.configuration.exceptions import ConfigurationValueError

from dlt.destinations.duckdb.configuration import DuckDbBaseCredentials
Expand Down Expand Up @@ -43,3 +44,9 @@ class MotherDuckClientConfiguration(DestinationClientDwhConfiguration):
credentials: MotherDuckCredentials

create_indexes: bool = False # should unique indexes be created, this slows loading down massively

def fingerprint(self) -> str:
"""Returns a fingerprint of the user access token"""
if self.credentials and self.credentials.password:
return digest128(self.credentials.password)
return ""
10 changes: 9 additions & 1 deletion dlt/destinations/postgres/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@

from dlt.common.configuration import configspec
from dlt.common.configuration.specs import ConnectionStringCredentials
from dlt.common.destination.reference import DestinationClientDwhConfiguration
from dlt.common.utils import digest128
from dlt.common.typing import TSecretValue

from dlt.common.destination.reference import DestinationClientDwhConfiguration


@configspec
class PostgresCredentials(ConnectionStringCredentials):
Expand Down Expand Up @@ -38,3 +40,9 @@ class PostgresClientConfiguration(DestinationClientDwhConfiguration):
credentials: PostgresCredentials

create_indexes: bool = True

def fingerprint(self) -> str:
"""Returns a fingerprint of the host part of the connection string"""
if self.credentials and self.credentials.host:
return digest128(self.credentials.host)
return ""
7 changes: 7 additions & 0 deletions dlt/destinations/redshift/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from dlt.common.typing import TSecretValue
from dlt.common.configuration import configspec
from dlt.common.utils import digest128

from dlt.destinations.postgres.configuration import PostgresCredentials, PostgresClientConfiguration

Expand All @@ -19,3 +20,9 @@ class RedshiftClientConfiguration(PostgresClientConfiguration):
destination_name: Final[str] = "redshift" # type: ignore
credentials: RedshiftCredentials
staging_iam_role: Optional[str] = None

def fingerprint(self) -> str:
"""Returns a fingerprint of the host part of the connection string"""
if self.credentials and self.credentials.host:
return digest128(self.credentials.host)
return ""
9 changes: 8 additions & 1 deletion dlt/destinations/snowflake/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from dlt.common.configuration.exceptions import ConfigurationValueError
from dlt.common.configuration import configspec
from dlt.common.destination.reference import DestinationClientDwhConfiguration
from dlt.common.utils import digest128


def _read_private_key(private_key: str, password: Optional[str] = None) -> bytes:
Expand Down Expand Up @@ -90,4 +91,10 @@ class SnowflakeClientConfiguration(DestinationClientDwhConfiguration):
stage_name: Optional[str] = None
"""Use an existing named stage instead of the default. Default uses the implicit table stage per table"""
keep_staged_files: bool = True
"""Whether to keep or delete the staged files after COPY INTO succeeds"""
"""Whether to keep or delete the staged files after COPY INTO succeeds"""

def fingerprint(self) -> str:
"""Returns a fingerprint of the host part of the connection string"""
if self.credentials and self.credentials.host:
return digest128(self.credentials.host)
return ""
1 change: 1 addition & 0 deletions dlt/load/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ def get_load_info(self, pipeline: SupportsPipeline, started_at: datetime.datetim
str(self.initial_client_config),
self.initial_staging_client_config.destination_name if self.initial_staging_client_config else None,
str(self.initial_staging_client_config) if self.initial_staging_client_config else None,
self.initial_client_config.fingerprint(),
dataset_name,
list(load_ids),
load_packages,
Expand Down
6 changes: 3 additions & 3 deletions dlt/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
from dlt.common.configuration.container import Container
from dlt.common.configuration.exceptions import ConfigFieldMissingException, ContextDefaultCannotBeCreated
from dlt.common.configuration.specs.config_section_context import ConfigSectionContext
from dlt.common.exceptions import DestinationLoadingViaStagingNotSupported, DestinationNoStagingMode, MissingDependencyException, DestinationIncompatibleLoaderFileFormatException
from dlt.common.configuration.resolve import initialize_credentials
from dlt.common.exceptions import (DestinationLoadingViaStagingNotSupported, DestinationNoStagingMode, MissingDependencyException,
DestinationIncompatibleLoaderFileFormatException)
from dlt.common.normalizers import default_normalizers, import_normalizers
from dlt.common.runtime import signals, initialize_runtime
from dlt.common.schema.exceptions import InvalidDatasetName
Expand Down Expand Up @@ -49,8 +51,6 @@
from dlt.pipeline.typing import TPipelineStep
from dlt.pipeline.state_sync import STATE_ENGINE_VERSION, load_state_from_destination, merge_state_if_changed, migrate_state, state_resource, json_encode_state, json_decode_state

from dlt.common.configuration.resolve import initialize_credentials


def with_state_sync(may_extract_state: bool = False) -> Callable[[TFun], TFun]:

Expand Down
3 changes: 3 additions & 0 deletions dlt/pipeline/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ def on_end_trace_step(trace: PipelineTrace, step: PipelineStepTrace, pipeline: S
if step.step == "extract" and step_info:
assert isinstance(step_info, ExtractInfo)
props["extract_data"] = step_info.extract_data_info
if step.step == "load" and step_info:
assert isinstance(step_info, LoadInfo)
props["destination_fingerprint"] = step_info.destination_fingerprint
dlthub_telemetry_track("pipeline", step.step, props)


Expand Down
3 changes: 2 additions & 1 deletion docs/website/docs/reference/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ Anonymous telemetry is sent when:
case of `dlt init` command, we also send the requested destination and data source names.
- When `pipeline.run` is called, we send information when
[extract, normalize and load](explainers/how-dlt-works.md) steps are completed. The data contains
the destination name (e.g. `duckdb`), elapsed time, and if the step succeeded or not.
the destination name (e.g. `duckdb`), destination fingerprint (which is a hash of selected destination configuration fields), elapsed time, and if the step succeeded or not.
- When `dbt` and `airflow` helpers are used

Here is an example `dlt init` telemetry message:
Expand Down Expand Up @@ -107,6 +107,7 @@ Example for `load` pipeline run step:
"event": "pipeline_load",
"properties": {
"destination_name": "duckdb",
"destination_fingerprint": "",
"elapsed": 2.234885,
"event_category": "pipeline",
"event_name": "load",
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "dlt"
version = "0.3.5a0"
version = "0.3.5"
description = "DLT is an open-source python-native scalable data loading framework that does not require any devops efforts to run."
authors = ["dltHub Inc. <services@dlthub.com>"]
maintainers = [ "Marcin Rudolf <marcin@dlthub.com>", "Adrian Brudaru <adrian@dlthub.com>", "Ty Dunn <ty@dlthub.com>"]
Expand Down
6 changes: 5 additions & 1 deletion tests/load/bigquery/test_bigquery_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from dlt.common.configuration.specs import gcp_credentials
from dlt.common.configuration.specs.exceptions import InvalidGoogleNativeCredentialsType
from dlt.common.storages import FileStorage
from dlt.common.utils import uniq_id, custom_environ
from dlt.common.utils import digest128, uniq_id, custom_environ

from dlt.destinations.bigquery.bigquery import BigQueryClient, BigQueryClientConfiguration
from dlt.destinations.exceptions import LoadJobNotExistsException, LoadJobTerminalException
Expand Down Expand Up @@ -187,6 +187,7 @@ def test_bigquery_configuration() -> None:
assert config.http_timeout == 15.0
assert config.retry_deadline == 60.0
assert config.file_upload_timeout == 1800.0
assert config.fingerprint() == digest128("chat-analytics-rasa-ci")

# credentials location is deprecated
os.environ["CREDENTIALS__LOCATION"] = "EU"
Expand All @@ -202,6 +203,9 @@ def test_bigquery_configuration() -> None:
config = resolve_configuration(BigQueryClientConfiguration(dataset_name="dataset"), sections=("destination", "bigquery"))
assert config.file_upload_timeout == 20000.0

# default fingerprint is empty
assert BigQueryClientConfiguration(dataset_name="dataset").fingerprint() == ""


def test_bigquery_job_errors(client: BigQueryClient, file_storage: FileStorage) -> None:
# non existing job
Expand Down
12 changes: 9 additions & 3 deletions tests/load/filesystem/test_filesystem_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
from typing import Sequence, Tuple, List

import pytest
from dlt.common.schema.schema import Schema

from dlt.common.utils import digest128
from dlt.common.storages import LoadStorage, FileStorage
from dlt.common.destination.reference import LoadJob
from dlt.destinations.filesystem.filesystem import FilesystemClient, LoadFilesystemJob

from dlt.destinations.filesystem.filesystem import FilesystemClient, LoadFilesystemJob, FilesystemClientConfiguration
from dlt.load import Load
from dlt.destinations.job_impl import EmptyLoadJob

Expand All @@ -30,6 +30,12 @@ def logger_autouse() -> None:
"event_loop_interrupted.839c6e6b514e427687586ccc65bf133f.0.jsonl"
]


def test_filesystem_configuration() -> None:
assert FilesystemClientConfiguration().fingerprint() == ""
assert FilesystemClientConfiguration(bucket_url="s3://cool").fingerprint() == digest128("s3://cool")


@pytest.mark.parametrize('write_disposition', ('replace', 'append', 'merge'))
def test_successful_load(write_disposition: str, all_buckets_env: str, filesystem_client: FilesystemClient) -> None:
"""Test load is successful with an empty destination dataset"""
Expand Down
2 changes: 2 additions & 0 deletions tests/load/pipeline/test_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ def extended_rows():
info = p.load(dataset_name=dataset_name)
# test __str__
print(info)
# test fingerprint in load
assert info.destination_fingerprint == p._destination_client().config.fingerprint()
# print(p.default_schema.to_pretty_yaml())
schema = p.default_schema
assert "simple_rows" in schema.tables
Expand Down
10 changes: 8 additions & 2 deletions tests/load/redshift/test_redshift_table_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import sqlfluff
from copy import deepcopy

from dlt.common.utils import uniq_id, custom_environ
from dlt.common.utils import uniq_id, custom_environ, digest128
from dlt.common.schema import Schema
from dlt.common.configuration import resolve_configuration

Expand All @@ -24,13 +24,19 @@ def client(schema: Schema) -> RedshiftClient:
return RedshiftClient(schema, RedshiftClientConfiguration(dataset_name="test_" + uniq_id(), credentials=RedshiftCredentials()))


def test_configuration() -> None:
def test_redshift_configuration() -> None:
# check names normalized
with custom_environ({"DESTINATION__REDSHIFT__CREDENTIALS__DATABASE": "UPPER_CASE_DATABASE", "DESTINATION__REDSHIFT__CREDENTIALS__PASSWORD": " pass\n"}):
C = resolve_configuration(RedshiftCredentials(), sections=("destination", "redshift"))
assert C.database == "upper_case_database"
assert C.password == "pass"

# check fingerprint
assert RedshiftClientConfiguration().fingerprint() == ""
# based on host
c = resolve_configuration(RedshiftCredentials(), explicit_value="postgres://user1:pass@host1/db1?warehouse=warehouse1&role=role1")
assert RedshiftClientConfiguration(credentials=c).fingerprint() == digest128("host1")


def test_create_table(client: RedshiftClient) -> None:
# non existing table
Expand Down
7 changes: 7 additions & 0 deletions tests/load/snowflake/test_snowflake_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from dlt.common.configuration.resolve import resolve_configuration
from dlt.common.configuration.exceptions import ConfigurationValueError
from dlt.common.utils import digest128

from dlt.destinations.snowflake.configuration import SnowflakeClientConfiguration, SnowflakeCredentials

Expand Down Expand Up @@ -80,3 +81,9 @@ def test_snowflake_credentials_native_value(environment) -> None:
assert c.private_key == "pk"


def test_snowflake_configuration() -> None:
# default fingerprint is empty
assert SnowflakeClientConfiguration().fingerprint() == ""
# based on host
c = resolve_configuration(SnowflakeCredentials(), explicit_value="snowflake://user1:pass@host1/db1?warehouse=warehouse1&role=role1")
assert SnowflakeClientConfiguration(credentials=c).fingerprint() == digest128("host1")
3 changes: 3 additions & 0 deletions tests/pipeline/test_pipeline_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ def test_trace_telemetry() -> None:
# check extract info
if step == "extract":
assert event["properties"]["extract_data"] == [{"name": "", "data_type": "int"}]
if step == "load":
# dummy has empty fingerprint
assert event["properties"]["destination_fingerprint"] == ""
# we have two failed files (state and data) that should be logged by sentry
assert len(SENTRY_SENT_ITEMS) == 2

Expand Down

0 comments on commit 95644ec

Please sign in to comment.