From 57730ab5aeb2134a9cb36c7ad7b147f3872f69cb Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Fri, 6 Jan 2023 22:38:58 +0100
Subject: [PATCH 1/6] properly restores pipeline state when extract step fails

---
 dlt/common/storages/live_schema_storage.py | 14 +++-
 dlt/extract/decorators.py | 4 +-
 dlt/pipeline/__init__.py | 2 +-
 dlt/pipeline/pipeline.py | 21 +++++--
 tests/load/pipeline/test_pipelines.py | 5 ++
 tests/pipeline/test_pipeline.py | 64 ++++++++++++++++++++++
 6 files changed, 99 insertions(+), 11 deletions(-)

diff --git a/dlt/common/storages/live_schema_storage.py b/dlt/common/storages/live_schema_storage.py
index 2db4f5c16b..3d9ebb1a95 100644
--- a/dlt/common/storages/live_schema_storage.py
+++ b/dlt/common/storages/live_schema_storage.py
@@ -18,7 +18,7 @@ def __getitem__(self, name: str) -> Schema:
         else:
             # return new schema instance
             schema = super().load_schema(name)
-            self._update_live_schema(schema, True)
+            self._update_live_schema(schema)

         return schema

@@ -29,11 +29,15 @@ def load_schema(self, name: str) -> Schema:

     def save_schema(self, schema: Schema) -> str:
         rv = super().save_schema(schema)
-        # -- update the live schema with schema being saved but do not create live instance if not already present
-        # no, cre
-        self._update_live_schema(schema, True)
+        # update the live schema with the schema being saved; if no live schema exists, create one so it is available for a getter
+        self._update_live_schema(schema)
         return rv

+    def remove_schema(self, name: str) -> None:
+        super().remove_schema(name)
+        # also remove the live schema
+        self.live_schemas.pop(name, None)
+
     def initialize_import_schema(self, schema: Schema) -> None:
         if self.config.import_schema_path:
             try:
@@ -50,7 +54,7 @@ def commit_live_schema(self, name: str) -> Schema:
             self._save_schema(live_schema)
         return live_schema

-    def _update_live_schema(self, schema: Schema, can_create_new: bool) -> None:
+    def _update_live_schema(self, schema: Schema, can_create_new: bool = True) -> None:
         live_schema = self.live_schemas.get(schema.name)
         if live_schema:
             # replace content without replacing instance
diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py
index 57431592ca..153a6d46ac 100644
--- a/dlt/extract/decorators.py
+++ b/dlt/extract/decorators.py
@@ -51,6 +51,7 @@ def source(func: Optional[AnyFun] = None, /, name: str = None, max_table_nesting
     ### Passing credentials
     Another important function of the source decorator is to provide credentials and other configuration to the code that extracts data. The decorator may automatically bind the source function arguments to the secret and config values.
+    >>> @dlt.source
     >>> def chess(username, chess_url: str = dlt.config.value, api_secret = dlt.secret.value, title: str = "GM"):
     >>>     return user_profile(username, chess_url, api_secret), user_games(username, chess_url, api_secret, with_titles=title)
     >>>
@@ -199,6 +200,7 @@ def resource(
     ### Passing credentials
     If used as a decorator (`data` argument is a `Generator`), it may automatically bind the source function arguments to the secret and config values.
+ >>> @dlt.resource >>> def user_games(username, chess_url: str = dlt.config.value, api_secret = dlt.secret.value): >>> return requests.get("%s/games/%s" % (chess_url, username), headers={"Authorization": f"Bearer {api_secret}"}) >>> @@ -289,7 +291,7 @@ def transformer( selected: bool = True, spec: Type[BaseConfiguration] = None ) -> Callable[[Callable[Concatenate[TDataItem, TResourceFunParams], Any]], Callable[TResourceFunParams, DltResource]]: - """A form of `dlt resource` that takes input from other resources in order to enrich or transformer the data. + """A form of `dlt resource` that takes input from other resources in order to enrich or transform the data. ### Example >>> @dlt.resource diff --git a/dlt/pipeline/__init__.py b/dlt/pipeline/__init__.py index 82feee077f..2c8f979167 100644 --- a/dlt/pipeline/__init__.py +++ b/dlt/pipeline/__init__.py @@ -26,7 +26,7 @@ def pipeline( full_refresh: bool = False, credentials: Any = None ) -> Pipeline: - """Creates a new instance of `dlt` pipeline, which moves the data from the source ie. a REST API and a destination ie. database or a data lake. + """Creates a new instance of `dlt` pipeline, which moves the data from the source ie. a REST API to a destination ie. database or a data lake. ### Summary The `pipeline` functions allows you to pass the destination name to which the data should be loaded, the name of the dataset and several other options that govern loading of the data. diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index c0965d0dea..599fb0783b 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -1,11 +1,11 @@ import contextlib import os -import datetime # noqa: 251 from contextlib import contextmanager from functools import wraps from collections.abc import Sequence as C_Sequence from typing import Any, Callable, ClassVar, Dict, List, Iterator, Mapping, Optional, Sequence, Tuple, cast, get_type_hints +from dlt import version from dlt.common import json, logger, signals, pendulum from dlt.common.configuration import inject_namespace from dlt.common.configuration.specs import RunConfiguration, NormalizeVolumeConfiguration, SchemaVolumeConfiguration, LoadVolumeConfiguration, PoolRunnerConfiguration @@ -19,7 +19,6 @@ from dlt.common.schema.utils import default_normalizers, import_normalizers from dlt.common.storages.load_storage import LoadStorage from dlt.common.typing import TFun, TSecretValue - from dlt.common.runners import pool_runner as runner, TRunMetrics, initialize_runner from dlt.common.storages import LiveSchemaStorage, NormalizeStorage from dlt.common.destination import DestinationCapabilitiesContext, DestinationReference, JobClientBase, DestinationClientConfiguration, DestinationClientDwhConfiguration, TDestinationReferenceArg @@ -27,6 +26,7 @@ from dlt.common.schema import Schema from dlt.common.storages.file_storage import FileStorage from dlt.common.utils import is_interactive + from dlt.destinations.exceptions import DatabaseUndefinedRelation from dlt.extract.exceptions import SourceExhausted @@ -619,7 +619,10 @@ def list_failed_jobs_in_package(self, load_id: str) -> Sequence[Tuple[str, str]] return failed_jobs def sync_schema(self, schema_name: str = None, credentials: Any = None) -> None: - """Synchronizes the schema `schema_name` with the destination.""" + """Synchronizes the schema `schema_name` with the destination. 
If no name is provided, the default schema will be synchronized.""" + if not schema_name and not self.default_schema_name: + raise PipelineConfigMissing(self.pipeline_name, "default_schema_name", "load", "Pipeline contains no schemas. Please extract any data with `extract` or `run` methods.") + schema = self.schemas[schema_name] if schema_name else self.default_schema client_config = self._get_destination_client_initial_config(credentials) with self._get_destination_client(schema, client_config) as client: @@ -820,7 +823,7 @@ def _get_destination_client(self, schema: Schema, initial_config: DestinationCli client_spec = self.destination.spec() raise MissingDependencyException( f"{client_spec.destination_name} destination", - [f"{logger.DLT_PKG_NAME}[{client_spec.destination_name}]"], + [f"{version.DLT_PKG_NAME}[{client_spec.destination_name}]"], "Dependencies for specific destinations are available as extras of python-dlt" ) @@ -991,9 +994,19 @@ def _get_schemas_from_destination(self, schema_names: Sequence[str], always_down def _managed_state(self, *, extract_state: bool = False) -> Iterator[TPipelineState]: # load or restore state state = self._get_state() + # TODO: we should backup schemas here try: yield state except Exception: + backup_state = self._get_state() + # restore original pipeline props + self._state_to_props(backup_state) + # synchronize schema storage with initial list of schemas, note that we'll not be able to synchronize the schema content + # TODO: we should restore schemas backup here + for existing_schema_name in self._schema_storage.list_schemas(): + if existing_schema_name not in self.schema_names: + self._schema_storage.remove_schema(existing_schema_name) + # raise original exception raise else: self._props_to_state(state) diff --git a/tests/load/pipeline/test_pipelines.py b/tests/load/pipeline/test_pipelines.py index 2d31b2f983..b558f8f792 100644 --- a/tests/load/pipeline/test_pipelines.py +++ b/tests/load/pipeline/test_pipelines.py @@ -9,6 +9,7 @@ from dlt.common import Decimal, json from dlt.common.destination import DestinationReference from dlt.common.schema.schema import Schema +from dlt.common.schema.typing import VERSION_TABLE_NAME from dlt.common.time import sleep from dlt.common.typing import TDataItem from dlt.common.utils import uniq_id @@ -139,6 +140,10 @@ def _data(): p.sync_schema() with p._sql_job_client(schema) as job_client: + # there's some data at all + exists, _ = job_client.get_storage_table(VERSION_TABLE_NAME) + assert exists is True + # such tables are not created but silently ignored exists, _ = job_client.get_storage_table("data_table") assert not exists diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index f2a4421ff2..7b023dde43 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -322,6 +322,70 @@ def r_fail(): assert sentry_sdk.Hub.current.scope.span is None + +def test_pipeline_state_on_extract_exception() -> None: + pipeline_name = "pipe_" + uniq_id() + p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy") + + + @dlt.resource + def data_piece_1(): + yield [1, 2, 3] + yield [3, 4, 5] + + @dlt.resource + def data_piece_2(): + yield [6, 7, 8] + raise NotImplementedError() + + with pytest.raises(PipelineStepFailed): + p.run([data_piece_1, data_piece_2], write_disposition="replace") + + # first run didn't really happen + assert p.first_run is True + assert p.has_data is False + assert p._schema_storage.list_schemas() == [] + assert p.default_schema_name is None + + 
# restore the pipeline + p = dlt.attach(pipeline_name) + assert p.first_run is True + assert p.has_data is False + assert p._schema_storage.list_schemas() == [] + assert p.default_schema_name is None + + # same but with multiple sources generating many schemas + + @dlt.source + def data_schema_1(): + return data_piece_1 + + @dlt.source + def data_schema_2(): + return data_piece_1 + + @dlt.source + def data_schema_3(): + return data_piece_2 + + # new pipeline + pipeline_name = "pipe_" + uniq_id() + p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy") + + with pytest.raises(PipelineStepFailed): + p.run([data_schema_1(), data_schema_2(), data_schema_3()], write_disposition="replace") + + # first run didn't really happen + assert p.first_run is True + assert p.has_data is False + assert p._schema_storage.list_schemas() == [] + assert p.default_schema_name is None + + os.environ["COMPLETED_PROB"] = "1.0" # make it complete immediately + p.run([data_schema_1(), data_schema_2()], write_disposition="replace") + assert p.schema_names == p._schema_storage.list_schemas() + + @pytest.mark.skip("Not implemented") def test_extract_exception() -> None: # make sure that PipelineStepFailed contains right step information From 8cccf7ad55ea26e4a79f57c9c0b906d25fbcce95 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Fri, 6 Jan 2023 22:42:45 +0100 Subject: [PATCH 2/6] fixes config/secrets injection when union with base configuration is used, fixes secret hint detection --- INSTALLATION.md | 5 -- dlt/cli/config_toml_writer.py | 5 +- dlt/common/configuration/accessors.py | 2 +- dlt/common/configuration/resolve.py | 12 +-- dlt/common/configuration/specs/__init__.py | 2 +- .../configuration/specs/base_configuration.py | 41 +++++++--- dlt/common/configuration/utils.py | 13 +-- dlt/common/reflection/spec.py | 3 + dlt/common/typing.py | 15 ++-- docs/examples/google_sheets.py | 2 +- docs/examples/sources/google_sheets.py | 2 +- .../configuration/test_configuration.py | 37 ++++++++- tests/common/configuration/test_inject.py | 79 ++++++++++++++++++- tests/common/test_typing.py | 15 ++-- 14 files changed, 174 insertions(+), 59 deletions(-) delete mode 100644 INSTALLATION.md diff --git a/INSTALLATION.md b/INSTALLATION.md deleted file mode 100644 index b2e52510eb..0000000000 --- a/INSTALLATION.md +++ /dev/null @@ -1,5 +0,0 @@ -```shell -sudo apt-get install python3.11 -sudo apt-get install python3.11-distutils -sudo apt install python3.11-venv -``` \ No newline at end of file diff --git a/dlt/cli/config_toml_writer.py b/dlt/cli/config_toml_writer.py index fbe4a5c6a0..097d7e8307 100644 --- a/dlt/cli/config_toml_writer.py +++ b/dlt/cli/config_toml_writer.py @@ -2,8 +2,7 @@ import tomlkit from tomlkit.items import Table as TOMLTable -from dlt.common.configuration.resolve import extract_inner_hint -from dlt.common.configuration.specs import BaseConfiguration, is_base_configuration_hint +from dlt.common.configuration.specs import BaseConfiguration, is_base_configuration_inner_hint, extract_inner_hint from dlt.common.typing import AnyType, is_final_type, is_optional_type @@ -21,7 +20,7 @@ def write_value(toml_table: TOMLTable, name: str, hint: AnyType, default_value: # get the inner hint to generate cool examples hint = extract_inner_hint(hint) - if is_base_configuration_hint(hint): + if is_base_configuration_inner_hint(hint): inner_table = tomlkit.table(False) toml_table[name] = inner_table write_spec(inner_table, hint()) diff --git a/dlt/common/configuration/accessors.py b/dlt/common/configuration/accessors.py 
b/dlt/common/configuration/accessors.py
index 2192c28bfd..fc2f8ef461 100644 --- a/dlt/common/configuration/accessors.py +++ b/dlt/common/configuration/accessors.py @@ -6,7 +6,7 @@ from dlt.common.configuration.exceptions import LookupTrace from dlt.common.configuration.providers.provider import ConfigProvider -from dlt.common.configuration.specs import is_base_configuration_hint +from dlt.common.configuration.specs import is_base_configuration_inner_hint from dlt.common.configuration.utils import deserialize_value, log_traces from dlt.common.configuration.specs.config_providers_context import ConfigProvidersContext from dlt.common.schema.utils import coerce_value diff --git a/dlt/common/configuration/resolve.py b/dlt/common/configuration/resolve.py index 703c25f400..7e7f0d042a 100644 --- a/dlt/common/configuration/resolve.py +++ b/dlt/common/configuration/resolve.py @@ -4,11 +4,11 @@ from dlt.common.configuration.providers.provider import ConfigProvider from dlt.common.typing import AnyType, StrAny, TSecretValue, is_final_type, is_optional_type -from dlt.common.configuration.specs.base_configuration import BaseConfiguration, CredentialsConfiguration, is_secret_hint, is_context_hint, is_base_configuration_hint +from dlt.common.configuration.specs.base_configuration import BaseConfiguration, CredentialsConfiguration, is_secret_hint, extract_inner_hint, is_context_inner_hint, is_base_configuration_inner_hint from dlt.common.configuration.specs.config_namespace_context import ConfigNamespacesContext from dlt.common.configuration.container import Container from dlt.common.configuration.specs.config_providers_context import ConfigProvidersContext -from dlt.common.configuration.utils import extract_inner_hint, log_traces, deserialize_value +from dlt.common.configuration.utils import log_traces, deserialize_value from dlt.common.configuration.exceptions import (FinalConfigFieldException, LookupTrace, ConfigFieldMissingException, ConfigurationWrongTypeException, ValueNotSecretException, InvalidNativeValue) TConfiguration = TypeVar("TConfiguration", bound=BaseConfiguration) @@ -166,10 +166,10 @@ def _resolve_config_field( value, traces = _resolve_single_value(key, hint, inner_hint, config_namespace, explicit_namespaces, embedded_namespaces) log_traces(config, key, hint, value, default_value, traces) # contexts must be resolved as a whole - if is_context_hint(inner_hint): + if is_context_inner_hint(inner_hint): pass # if inner_hint is BaseConfiguration then resolve it recursively - elif is_base_configuration_hint(inner_hint): + elif is_base_configuration_inner_hint(inner_hint): if isinstance(value, BaseConfiguration): # if resolved value is instance of configuration (typically returned by context provider) embedded_config = value @@ -245,11 +245,11 @@ def _resolve_single_value( # get providers from container providers_context = container[ConfigProvidersContext] # we may be resolving context - if is_context_hint(inner_hint): + if is_context_inner_hint(inner_hint): # resolve context with context provider and do not look further value, _ = providers_context.context_provider.get_value(key, inner_hint) return value, traces - if is_base_configuration_hint(inner_hint): + if is_base_configuration_inner_hint(inner_hint): # cannot resolve configurations directly return value, traces diff --git a/dlt/common/configuration/specs/__init__.py b/dlt/common/configuration/specs/__init__.py index c1eb8dc3f0..aaa2b5edee 100644 --- a/dlt/common/configuration/specs/__init__.py +++ b/dlt/common/configuration/specs/__init__.py @@ -1,5 +1,5 @@ from 
.run_configuration import RunConfiguration # noqa: F401 -from .base_configuration import BaseConfiguration, CredentialsConfiguration, ContainerInjectableContext, is_base_configuration_hint # noqa: F401 +from .base_configuration import BaseConfiguration, CredentialsConfiguration, ContainerInjectableContext, extract_inner_hint, is_base_configuration_inner_hint # noqa: F401 from .normalize_volume_configuration import NormalizeVolumeConfiguration # noqa: F401 from .load_volume_configuration import LoadVolumeConfiguration # noqa: F401 from .schema_volume_configuration import SchemaVolumeConfiguration, TSchemaFileFormat # noqa: F401 diff --git a/dlt/common/configuration/specs/base_configuration.py b/dlt/common/configuration/specs/base_configuration.py index d89528e744..2c88e58fdd 100644 --- a/dlt/common/configuration/specs/base_configuration.py +++ b/dlt/common/configuration/specs/base_configuration.py @@ -18,21 +18,27 @@ _F_ContainerInjectableContext: Any = type(object) -def is_secret_hint(hint: Type[Any]) -> bool: - return hint is TSecretValue or (inspect.isclass(hint) and issubclass(hint, CredentialsConfiguration)) +def is_base_configuration_inner_hint(inner_hint: Type[Any]) -> bool: + return inspect.isclass(inner_hint) and issubclass(inner_hint, BaseConfiguration) + + +def is_context_inner_hint(inner_hint: Type[Any]) -> bool: + return inspect.isclass(inner_hint) and issubclass(inner_hint, ContainerInjectableContext) -def is_base_configuration_hint(hint: Type[Any]) -> bool: - return inspect.isclass(hint) and issubclass(hint, BaseConfiguration) +def is_credentials_inner_hint(inner_hint: Type[Any]) -> bool: + return inspect.isclass(inner_hint) and issubclass(inner_hint, CredentialsConfiguration) -def is_context_hint(hint: Type[Any]) -> bool: - return inspect.isclass(hint) and issubclass(hint, ContainerInjectableContext) +def get_config_if_union_hint(hint: Type[Any]) -> Type[Any]: + if get_origin(hint) is Union: + return next((t for t in get_args(hint) if is_base_configuration_inner_hint(t)), None) + return None def is_valid_hint(hint: Type[Any]) -> bool: hint = extract_inner_type(hint) - hint = get_config_if_union(hint) or hint + hint = get_config_if_union_hint(hint) or hint hint = get_origin(hint) or hint if hint is Any: @@ -40,7 +46,7 @@ def is_valid_hint(hint: Type[Any]) -> bool: if hint is ClassVar: # class vars are skipped by dataclass return True - if is_base_configuration_hint(hint): + if is_base_configuration_inner_hint(hint): return True with contextlib.suppress(TypeError): py_type_to_sc_type(hint) @@ -48,10 +54,21 @@ def is_valid_hint(hint: Type[Any]) -> bool: return False -def get_config_if_union(hint: Type[Any]) -> Type[Any]: - if get_origin(hint) is Union: - return next((t for t in get_args(hint) if is_base_configuration_hint(t)), None) - return None +def extract_inner_hint(hint: Type[Any], preserve_new_types: bool = False) -> Type[Any]: + # extract hint from Optional / Literal / NewType hints + inner_hint = extract_inner_type(hint, preserve_new_types) + # get base configuration from union type + inner_hint = get_config_if_union_hint(inner_hint) or inner_hint + # extract origin from generic types (ie List[str] -> List) + return get_origin(inner_hint) or inner_hint + + +def is_secret_hint(hint: Type[Any]) -> bool: + is_secret = hint is TSecretValue + if not is_secret: + hint = extract_inner_hint(hint, preserve_new_types=True) + is_secret = hint is TSecretValue or is_credentials_inner_hint(hint) + return is_secret @overload diff --git a/dlt/common/configuration/utils.py 
b/dlt/common/configuration/utils.py index fdcf54d0fc..acaafc05d6 100644 --- a/dlt/common/configuration/utils.py +++ b/dlt/common/configuration/utils.py @@ -8,7 +8,7 @@ from dlt.common.schema.utils import coerce_value, py_type_to_sc_type from dlt.common.configuration import DOT_DLT from dlt.common.configuration.exceptions import ConfigValueCannotBeCoercedException, LookupTrace -from dlt.common.configuration.specs.base_configuration import BaseConfiguration, get_config_if_union, is_base_configuration_hint +from dlt.common.configuration.specs.base_configuration import BaseConfiguration, get_config_if_union_hint, is_base_configuration_inner_hint class ResolvedValueTrace(NamedTuple): @@ -28,7 +28,7 @@ def deserialize_value(key: str, value: Any, hint: Type[TAny]) -> TAny: try: if hint != Any: # if deserializing to base configuration, try parse the value - if is_base_configuration_hint(hint): + if is_base_configuration_inner_hint(hint): c = hint() if isinstance(value, dict): c.update(value) @@ -89,15 +89,6 @@ def serialize_value(value: Any) -> Any: return coerce_value("text", value_dt, value) -def extract_inner_hint(hint: Type[Any]) -> Type[Any]: - # extract hint from Optional / Literal / NewType hints - inner_hint = extract_inner_type(hint) - # get base configuration from union type - inner_hint = get_config_if_union(inner_hint) or inner_hint - # extract origin from generic types (ie List[str] -> List) - return get_origin(inner_hint) or inner_hint - - def log_traces(config: BaseConfiguration, key: str, hint: Type[Any], value: Any, default_value: Any, traces: Sequence[LookupTrace]) -> None: if logger.is_logging() and logger.log_level() == "DEBUG": logger.debug(f"Field {key} with type {hint} in {type(config).__name__} {'NOT RESOLVED' if value is None else 'RESOLVED'}") diff --git a/dlt/common/reflection/spec.py b/dlt/common/reflection/spec.py index 76da3ec4a9..09d18da307 100644 --- a/dlt/common/reflection/spec.py +++ b/dlt/common/reflection/spec.py @@ -2,6 +2,7 @@ import inspect from typing import Dict, List, Type, Any, Optional from inspect import Signature, Parameter +from dlt.common.configuration.specs.base_configuration import get_config_if_union_hint from dlt.common.typing import AnyType, AnyFun, TSecretValue from dlt.common.configuration import configspec, is_valid_hint, is_secret_hint @@ -68,6 +69,8 @@ def dlt_config_literal_to_type(arg_name: str) -> AnyType: # try to get type from default if field_type is AnyType and p.default is not None: field_type = type(p.default) + # extract base type from union to let it parse native values + field_type = get_config_if_union_hint(field_type) or field_type # make type optional if explicit None is provided as default if p.default is None: # check if the defaults were attributes of the form .config.value or .secrets.value diff --git a/dlt/common/typing.py b/dlt/common/typing.py index 76a74dd44a..831c55a9de 100644 --- a/dlt/common/typing.py +++ b/dlt/common/typing.py @@ -85,22 +85,23 @@ def is_dict_generic_type(t: Any) -> bool: return False -def extract_inner_type(hint: Type[Any]) -> Type[Any]: - """Gets the inner type from Literal, Optional and NewType +def extract_inner_type(hint: Type[Any], preserve_new_types: bool = False) -> Type[Any]: + """Gets the inner type from Literal, Optional, Final and NewType Args: - hint (Type[Any]): Any type + hint (Type[Any]): Type to extract + preserve_new_types (bool): Do not extract supertype of a NewType Returns: Type[Any]: Inner type if hint was Literal, Optional or NewType, otherwise hint """ if 
is_literal_type(hint):
        # assume that all literals are of the same type
-        return extract_inner_type(type(get_args(hint)[0]))
+        return extract_inner_type(type(get_args(hint)[0]), preserve_new_types)
     if is_optional_type(hint) or is_final_type(hint):
         # extract specialization type and call recursively
-        return extract_inner_type(get_args(hint)[0])
+        return extract_inner_type(get_args(hint)[0], preserve_new_types)
-    if is_newtype_type(hint):
+    if is_newtype_type(hint) and not preserve_new_types:
         # descend into supertypes of NewType
-        return extract_inner_type(hint.__supertype__)
+        return extract_inner_type(hint.__supertype__, preserve_new_types)
     return hint
diff --git a/docs/examples/google_sheets.py b/docs/examples/google_sheets.py
index 295c2aa8c0..64cb69fd3e 100644
--- a/docs/examples/google_sheets.py
+++ b/docs/examples/google_sheets.py
@@ -1,6 +1,6 @@
 import dlt

-from examples.sources.google_sheets import google_spreadsheet
+from sources.google_sheets import google_spreadsheet

 dlt.pipeline(destination="bigquery", full_refresh=False)
 # see example.secrets.toml to where to put credentials
diff --git a/docs/examples/sources/google_sheets.py b/docs/examples/sources/google_sheets.py
index 7960cea41a..a1226cb964 100644
--- a/docs/examples/sources/google_sheets.py
+++ b/docs/examples/sources/google_sheets.py
@@ -8,7 +8,7 @@
 try:
     from apiclient.discovery import build
 except ImportError:
-    raise MissingDependencyException("Google Sheets Source", ["google1", "google2"])
+    raise MissingDependencyException("Google API Client", ["google-api-python-client"])

 # gets sheet values from a single spreadsheet preserving original typing for schema generation
diff --git a/tests/common/configuration/test_configuration.py b/tests/common/configuration/test_configuration.py
index 6b40fdb2cf..7cea75564b 100644
--- a/tests/common/configuration/test_configuration.py
+++ b/tests/common/configuration/test_configuration.py
@@ -1,11 +1,12 @@
 import pytest
 import datetime # noqa: I251
 from unittest.mock import patch
-from typing import Any, Dict, Final, List, Mapping, MutableMapping, NewType, Optional, Type
+from typing import Any, Dict, Final, List, Mapping, MutableMapping, NewType, Optional, Sequence, Type, Union

 from dlt.common import json, pendulum, Decimal, Wei
+from dlt.common.configuration.specs.gcp_client_credentials import GcpClientCredentials
 from dlt.common.utils import custom_environ
-from dlt.common.typing import TSecretValue, extract_inner_type
+from dlt.common.typing import AnyType, DictStrAny, StrAny, TSecretValue, extract_inner_type
 from dlt.common.configuration.exceptions import ConfigFieldMissingTypeHintException, ConfigFieldTypeHintNotSupported, FinalConfigFieldException, InvalidNativeValue, LookupTrace, ValueNotSecretException
 from dlt.common.configuration import configspec, ConfigFieldMissingException, ConfigValueCannotBeCoercedException, resolve, is_valid_hint
 from dlt.common.configuration.specs import BaseConfiguration, RunConfiguration, ConnectionStringCredentials
@@ -805,6 +806,38 @@ def test_resolved_trace(environment: Any) -> None:
     assert traces[".snake"] == ResolvedValueTrace("snake", "h>t>t>t>he", None, InstrumentedConfiguration, [], prov_name, None)


+def test_extract_inner_hint() -> None:
+    # extracts base config from a union
+    assert resolve.extract_inner_hint(Union[GcpClientCredentials, StrAny, str]) is GcpClientCredentials
+    assert resolve.extract_inner_hint(Union[InstrumentedConfiguration, StrAny, str]) is InstrumentedConfiguration
+    # keeps unions
+    assert 
resolve.extract_inner_hint(Union[StrAny, str]) is Union + # ignores specialization in list and dict, leaving origin + assert resolve.extract_inner_hint(List[str]) is list + assert resolve.extract_inner_hint(DictStrAny) is dict + # extracts new types + assert resolve.extract_inner_hint(TSecretValue) is AnyType + # preserves new types on extract + assert resolve.extract_inner_hint(TSecretValue, preserve_new_types=True) is TSecretValue + + +def test_is_secret_hint() -> None: + assert resolve.is_secret_hint(GcpClientCredentials) is True + assert resolve.is_secret_hint(Optional[GcpClientCredentials]) is True + assert resolve.is_secret_hint(InstrumentedConfiguration) is False + # do not recognize new types + TTestSecretNt = NewType("TTestSecretNt", GcpClientCredentials) + assert resolve.is_secret_hint(TTestSecretNt) is False + # recognize unions with credentials + assert resolve.is_secret_hint(Union[GcpClientCredentials, StrAny, str]) is True + # we do not recognize unions if they do not contain configuration types + assert resolve.is_secret_hint(Union[TSecretValue, StrAny, str]) is False + assert resolve.is_secret_hint(Optional[TSecretValue]) is True + assert resolve.is_secret_hint(Optional[str]) is False + assert resolve.is_secret_hint(str) is False + assert resolve.is_secret_hint(AnyType) is False + + def coerce_single_value(key: str, value: str, hint: Type[Any]) -> Any: hint = extract_inner_type(hint) return resolve.deserialize_value(key, value, hint) diff --git a/tests/common/configuration/test_inject.py b/tests/common/configuration/test_inject.py index 97ca774942..24df40b0b1 100644 --- a/tests/common/configuration/test_inject.py +++ b/tests/common/configuration/test_inject.py @@ -1,4 +1,4 @@ -from typing import Any, Optional +from typing import Any, Optional, Union import pytest @@ -6,12 +6,16 @@ from dlt.common.configuration.exceptions import ConfigFieldMissingException from dlt.common.configuration.inject import get_fun_spec, with_config +from dlt.common.configuration.providers import EnvironProvider +from dlt.common.configuration.providers.toml import CONFIG_TOML, SECRETS_TOML, TomlProvider from dlt.common.configuration.specs import BaseConfiguration +from dlt.common.configuration.specs import PostgresCredentials +from dlt.common.configuration.specs.config_providers_context import ConfigProvidersContext from dlt.common.reflection.spec import _get_spec_name_from_f -from dlt.common.typing import TSecretValue +from dlt.common.typing import StrAny, TSecretValue from tests.utils import preserve_environ -from tests.common.configuration.utils import environment +from tests.common.configuration.utils import environment, toml_providers def test_arguments_are_explicit(environment: Any) -> None: @@ -77,15 +81,18 @@ def f_secret(password=dlt.secrets.value): assert f_secret(None) == "password" +@pytest.mark.skip("not implemented") def test_inject_with_non_injectable_param() -> None: # one of parameters in signature has not valid hint and is skipped (ie. 
from_pipe) pass +@pytest.mark.skip("not implemented") def test_inject_without_spec() -> None: pass +@pytest.mark.skip("not implemented") def test_inject_without_spec_kw_only() -> None: pass @@ -104,55 +111,121 @@ def f(pipeline_name=dlt.config.value, value=dlt.secrets.value): assert hasattr(get_fun_spec(f), "pipeline_name") +@pytest.mark.skip("not implemented") def test_inject_with_spec() -> None: pass +@pytest.mark.skip("not implemented") def test_inject_with_str_namespaces() -> None: # namespaces param is str not tuple pass +@pytest.mark.skip("not implemented") def test_inject_with_func_namespace() -> None: # function to get namespaces from the arguments is provided pass +@pytest.mark.skip("not implemented") def test_inject_on_class_and_methods() -> None: pass +@pytest.mark.skip("not implemented") def test_set_defaults_for_positional_args() -> None: # set defaults for positional args that are part of derived SPEC # set defaults for positional args that are part of provided SPEC pass +@pytest.mark.skip("not implemented") def test_inject_spec_remainder_in_kwargs() -> None: # if the wrapped func contains kwargs then all the fields from spec without matching func args must be injected in kwargs pass +@pytest.mark.skip("not implemented") def test_inject_spec_in_kwargs() -> None: # the resolved spec is injected in kwargs pass +@pytest.mark.skip("not implemented") def test_resolved_spec_in_kwargs_pass_through() -> None: # if last_config is in kwargs then use it and do not resolve it anew pass +@pytest.mark.skip("not implemented") def test_inject_spec_into_argument_with_spec_type() -> None: # if signature contains argument with type of SPEC, it gets injected there pass +@pytest.mark.skip("not implemented") def test_initial_spec_from_arg_with_spec_type() -> None: # if signature contains argument with type of SPEC, get its value to init SPEC (instead of calling the constructor()) pass +def test_use_most_specific_union_type(environment: Any, toml_providers: ConfigProvidersContext) -> None: + + @with_config + def postgres_union(local_credentials: Union[PostgresCredentials, str, StrAny] = dlt.secrets.value): + return local_credentials + + @with_config + def postgres_direct(local_credentials: PostgresCredentials = dlt.secrets.value): + return local_credentials + + conn_str = "postgres://loader:loader@localhost:5432/dlt_data" + conn_dict = {"host": "localhost", "database": "dlt_test", "username": "loader", "password": "loader"} + conn_cred = PostgresCredentials() + conn_cred.parse_native_representation(conn_str) + + # pass explicit: str, Dict and credentials object + assert isinstance(postgres_direct(conn_cred), PostgresCredentials) + assert isinstance(postgres_direct(conn_str), PostgresCredentials) + assert isinstance(postgres_direct(conn_dict), PostgresCredentials) + assert isinstance(postgres_union(conn_cred), PostgresCredentials) + assert isinstance(postgres_union(conn_str), PostgresCredentials) + assert isinstance(postgres_union(conn_dict), PostgresCredentials) + + # pass via env as conn string + environment["LOCAL_CREDENTIALS"] = conn_str + assert isinstance(postgres_direct(), PostgresCredentials) + assert isinstance(postgres_union(), PostgresCredentials) + del environment["LOCAL_CREDENTIALS"] + # make sure config is successfully deleted + with pytest.raises(ConfigFieldMissingException): + postgres_union() + # create env with elements + for k, v in conn_dict.items(): + environment[EnvironProvider.get_key_name(k, "local_credentials")] = v + assert isinstance(postgres_direct(), PostgresCredentials) 
+    assert isinstance(postgres_union(), PostgresCredentials)
+
+    environment.clear()
+
+    # pass via toml
+    secrets_toml = toml_providers[SECRETS_TOML]._toml
+    secrets_toml["local_credentials"] = conn_str
+    assert isinstance(postgres_direct(), PostgresCredentials)
+    assert isinstance(postgres_union(), PostgresCredentials)
+    secrets_toml.pop("local_credentials")
+    # make sure config is successfully deleted
+    with pytest.raises(ConfigFieldMissingException):
+        postgres_union()
+    # config_toml = toml_providers[CONFIG_TOML]._toml
+    secrets_toml["local_credentials"] = {}
+    for k, v in conn_dict.items():
+        secrets_toml["local_credentials"][k] = v
+    assert isinstance(postgres_direct(), PostgresCredentials)
+    assert isinstance(postgres_union(), PostgresCredentials)
+
+
 def test_auto_derived_spec_type_name() -> None:
diff --git a/tests/common/test_typing.py b/tests/common/test_typing.py
index a7a29f480d..c8a16a3c04 100644
--- a/tests/common/test_typing.py
+++ b/tests/common/test_typing.py
@@ -1,6 +1,7 @@
 from typing import List, Literal, Mapping, MutableMapping, MutableSequence, NewType, Sequence, TypeVar, TypedDict, Optional, Union

-from dlt.common.configuration.specs.base_configuration import BaseConfiguration, get_config_if_union
+from dlt.common.configuration.specs.base_configuration import BaseConfiguration, get_config_if_union_hint
+from dlt.common.configuration.specs.gcp_client_credentials import GcpClientCredentials
 from dlt.common.typing import StrAny, extract_inner_type, extract_optional_type, is_dict_generic_type, is_list_generic_type, is_literal_type, is_newtype_type, is_optional_type, is_typeddict

@@ -64,13 +65,15 @@ def test_extract_inner_type() -> None:
     l_1 = Literal[1, 2, 3]
     assert extract_inner_type(l_1) is int
     nt_l_2 = NewType("NTL2", float)
+    assert extract_inner_type(nt_l_2, preserve_new_types=True) is nt_l_2
     l_2 = Literal[nt_l_2(1.238), nt_l_2(2.343)]
     assert extract_inner_type(l_2) is float


 def test_get_config_if_union() -> None:
-    assert get_config_if_union(str) is None
-    assert get_config_if_union(Optional[str]) is None
-    assert get_config_if_union(Union[BaseException, str, StrAny]) is None
-    assert get_config_if_union(Union[BaseConfiguration, str, StrAny]) is BaseConfiguration
-    assert get_config_if_union(Union[str, BaseConfiguration, StrAny]) is BaseConfiguration
\ No newline at end of file
+    assert get_config_if_union_hint(str) is None
+    assert get_config_if_union_hint(Optional[str]) is None
+    assert get_config_if_union_hint(Union[BaseException, str, StrAny]) is None
+    assert get_config_if_union_hint(Union[BaseConfiguration, str, StrAny]) is BaseConfiguration
+    assert get_config_if_union_hint(Union[str, BaseConfiguration, StrAny]) is BaseConfiguration
+    assert get_config_if_union_hint(Union[GcpClientCredentials, StrAny, str]) is GcpClientCredentials

From 642ccb2c9a93180a6e0437e68ba4819b2e1d9051 Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Fri, 6 Jan 2023 22:43:09 +0100
Subject: [PATCH 3/6] bumps to version 0.2.0a15

---
 pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 76e4e0bfd5..d082d72942 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "python-dlt"
-version = "0.2.0a14"
+version = "0.2.0a15"
 description = "DLT is an open-source python-native scalable data loading framework that does not require any devops efforts to run."
 authors = ["dltHub Inc. 
"] maintainers = [ "Marcin Rudolf ", "Adrian Brudaru ", "Ty Dunn "] From 00ef00593a7e89750c4152bfb76b8bcd8ca9ee23 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Fri, 6 Jan 2023 22:43:34 +0100 Subject: [PATCH 4/6] makes full_refresh configurable --- dlt/pipeline/configuration.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dlt/pipeline/configuration.py b/dlt/pipeline/configuration.py index c77217833f..6a9ea42f69 100644 --- a/dlt/pipeline/configuration.py +++ b/dlt/pipeline/configuration.py @@ -20,6 +20,8 @@ class PipelineConfiguration(BaseConfiguration): """Enables the tracing. Tracing saves the execution trace locally and is required by `dlt deploy`.""" use_single_dataset: bool = True """Stores all schemas in single dataset. When False, each schema will get a separate dataset with `{dataset_name}_{schema_name}""" + full_refresh: bool = False + """When set to True, each instance of the pipeline with the `pipeline_name` starts from scratch when run and loads the data to a separate dataset.""" runtime: RunConfiguration def on_resolved(self) -> None: From 6855453d1c0647ce6bbc1c7df43ec895e07b388c Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Fri, 6 Jan 2023 22:44:18 +0100 Subject: [PATCH 5/6] allows hash mismatch when loading schema that needs migration --- dlt/common/schema/schema.py | 4 +- dlt/common/schema/typing.py | 10 +- dlt/common/schema/utils.py | 14 +- dlt/common/storages/schema_storage.py | 2 +- .../sheets/google_spreadsheet_v4.schema.json | 397 ++++++++++++++++++ tests/common/storages/test_schema_storage.py | 8 + 6 files changed, 422 insertions(+), 13 deletions(-) create mode 100644 tests/common/cases/schemas/sheets/google_spreadsheet_v4.schema.json diff --git a/dlt/common/schema/schema.py b/dlt/common/schema/schema.py index 3f076541ad..82dd9e73cc 100644 --- a/dlt/common/schema/schema.py +++ b/dlt/common/schema/schema.py @@ -6,7 +6,7 @@ from dlt.common.typing import DictStrAny, StrAny, REPattern, SupportsVariant, VARIANT_FIELD_FORMAT from dlt.common.normalizers.names import TNormalizeBreakPath, TNormalizeMakePath, TNormalizeNameFunc from dlt.common.normalizers.json import TNormalizeJSONFunc -from dlt.common.schema.typing import (LOADS_TABLE_NAME, VERSION_TABLE_NAME, TNormalizersConfig, TPartialTableSchema, TSchemaSettings, TSimpleRegex, TStoredSchema, +from dlt.common.schema.typing import (SCHEMA_ENGINE_VERSION, LOADS_TABLE_NAME, VERSION_TABLE_NAME, TNormalizersConfig, TPartialTableSchema, TSchemaSettings, TSimpleRegex, TStoredSchema, TSchemaTables, TTableSchema, TTableSchemaColumns, TColumnSchema, TColumnProp, TDataType, TColumnHint, TWriteDisposition) from dlt.common.schema import utils @@ -16,7 +16,7 @@ class Schema: - ENGINE_VERSION: ClassVar[int] = 5 + ENGINE_VERSION: ClassVar[int] = SCHEMA_ENGINE_VERSION # name normalization functions normalize_table_name: TNormalizeNameFunc diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index e0d91c6e8a..c554e509ad 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -2,6 +2,12 @@ from dlt.common.typing import StrAny +# current version of schema engine +SCHEMA_ENGINE_VERSION = 5 + +# dlt tables +VERSION_TABLE_NAME = "_dlt_version" +LOADS_TABLE_NAME = "_dlt_loads" TDataType = Literal["text", "double", "bool", "timestamp", "bigint", "binary", "complex", "decimal", "wei"] TColumnHint = Literal["not_null", "partition", "cluster", "primary_key", "foreign_key", "sort", "unique"] @@ -15,10 +21,6 @@ COLUMN_HINTS: Set[TColumnHint] = set(["partition", "cluster", "primary_key", 
"foreign_key", "sort", "unique"]) WRITE_DISPOSITIONS: Set[TWriteDisposition] = set(get_args(TWriteDisposition)) -# dlt tables -VERSION_TABLE_NAME = "_dlt_version" -LOADS_TABLE_NAME = "_dlt_loads" - class TColumnSchemaBase(TypedDict, total=True): name: Optional[str] diff --git a/dlt/common/schema/utils.py b/dlt/common/schema/utils.py index d7e9a9be7b..90108dccd4 100644 --- a/dlt/common/schema/utils.py +++ b/dlt/common/schema/utils.py @@ -21,7 +21,7 @@ from dlt.common.utils import map_nested_in_place, str2bool from dlt.common.validation import TCustomValidator, validate_dict from dlt.common.schema import detections -from dlt.common.schema.typing import LOADS_TABLE_NAME, SIMPLE_REGEX_PREFIX, VERSION_TABLE_NAME, TColumnName, TNormalizersConfig, TPartialTableSchema, TSimpleRegex, TStoredSchema, TTableSchema, TTableSchemaColumns, TColumnSchemaBase, TColumnSchema, TColumnProp, TDataType, TColumnHint, TTypeDetectionFunc, TTypeDetections, TWriteDisposition +from dlt.common.schema.typing import SCHEMA_ENGINE_VERSION, LOADS_TABLE_NAME, SIMPLE_REGEX_PREFIX, VERSION_TABLE_NAME, TColumnName, TNormalizersConfig, TPartialTableSchema, TSimpleRegex, TStoredSchema, TTableSchema, TTableSchemaColumns, TColumnSchemaBase, TColumnSchema, TColumnProp, TDataType, TColumnHint, TTypeDetectionFunc, TTypeDetections, TWriteDisposition from dlt.common.schema.exceptions import CannotCoerceColumnException, ParentTableNotFoundException, SchemaEngineNoUpgradePathException, SchemaException, TablePropertiesConflictException @@ -96,14 +96,16 @@ def generate_version_hash(stored_schema: TStoredSchema) -> str: return base64.b64encode(h.digest()).decode('ascii') -def verify_schema_hash(stored_schema: DictStrAny, empty_hash_verifies: bool = True) -> bool: +def verify_schema_hash(loaded_schema_dict: DictStrAny, verifies_if_not_migrated: bool = False) -> bool: # generates content hash and compares with existing - current_hash: str = stored_schema.get("version_hash") - if not current_hash and empty_hash_verifies: + engine_version: str = loaded_schema_dict.get("engine_version") + # if upgrade is needed, the hash cannot be compared + if verifies_if_not_migrated and engine_version != SCHEMA_ENGINE_VERSION: return True # if hash is present we can assume at least v4 engine version so hash is computable - hash_ = generate_version_hash(cast(TStoredSchema, stored_schema)) - return hash_ == current_hash + stored_schema = cast(TStoredSchema, loaded_schema_dict) + hash_ = generate_version_hash(stored_schema) + return hash_ == stored_schema["version_hash"] def simple_regex_validator(path: str, pk: str, pv: Any, t: Any) -> bool: diff --git a/dlt/common/storages/schema_storage.py b/dlt/common/storages/schema_storage.py index e7b6e53260..cd4aaec377 100644 --- a/dlt/common/storages/schema_storage.py +++ b/dlt/common/storages/schema_storage.py @@ -30,7 +30,7 @@ def load_schema(self, name: str) -> Schema: try: storage_schema = json.loads(self.storage.load(schema_file)) # prevent external modifications of schemas kept in storage - if not verify_schema_hash(storage_schema, empty_hash_verifies=True): + if not verify_schema_hash(storage_schema, verifies_if_not_migrated=True): raise InStorageSchemaModified(name, self.config.schema_volume_path) except FileNotFoundError: # maybe we can import from external storage diff --git a/tests/common/cases/schemas/sheets/google_spreadsheet_v4.schema.json b/tests/common/cases/schemas/sheets/google_spreadsheet_v4.schema.json new file mode 100644 index 0000000000..b74a4a5c51 --- /dev/null +++ 
b/tests/common/cases/schemas/sheets/google_spreadsheet_v4.schema.json @@ -0,0 +1,397 @@ +{ + "version": 3, + "version_hash": "vgxR7fdg9HNUY/7iHKNQTS2qkYaEvKgrSiHIMWoIx7Y=", + "engine_version": 4, + "name": "google_spreadsheet", + "tables": { + "_dlt_version": { + "name": "_dlt_version", + "columns": { + "version": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "version", + "data_type": "bigint", + "nullable": false + }, + "engine_version": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "engine_version", + "data_type": "bigint", + "nullable": false + }, + "inserted_at": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "inserted_at", + "data_type": "timestamp", + "nullable": false + }, + "schema_name": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "schema_name", + "data_type": "text", + "nullable": false + }, + "version_hash": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "version_hash", + "data_type": "text", + "nullable": false + }, + "schema": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "schema", + "data_type": "text", + "nullable": false + } + }, + "write_disposition": "skip", + "description": "Created by DLT. Tracks schema updates" + }, + "_dlt_loads": { + "name": "_dlt_loads", + "columns": { + "load_id": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "load_id", + "data_type": "text", + "nullable": false + }, + "status": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "status", + "data_type": "bigint", + "nullable": false + }, + "inserted_at": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "inserted_at", + "data_type": "timestamp", + "nullable": false + } + }, + "write_disposition": "skip", + "description": "Created by DLT. 
Tracks completed loads" + }, + "_2022_05": { + "name": "_2022_05", + "columns": { + "sender_id": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "sender_id", + "data_type": "text", + "nullable": true + }, + "message_id": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "message_id", + "data_type": "bigint", + "nullable": true + }, + "annotation": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "annotation", + "data_type": "text", + "nullable": true + }, + "confidence": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "confidence", + "data_type": "double", + "nullable": true + }, + "count": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "count", + "data_type": "bigint", + "nullable": true + }, + "added_at": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "added_at", + "data_type": "text", + "nullable": true + }, + "reviewed": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "reviewed", + "data_type": "bool", + "nullable": true + }, + "_dlt_load_id": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "_dlt_load_id", + "data_type": "text", + "nullable": false + }, + "_dlt_id": { + "partition": false, + "cluster": false, + "unique": true, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "_dlt_id", + "data_type": "text", + "nullable": false + } + }, + "write_disposition": "replace" + }, + "model_metadata": { + "name": "model_metadata", + "columns": { + "model_name": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "model_name", + "data_type": "text", + "nullable": true + }, + "accuracy": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "accuracy", + "data_type": "double", + "nullable": true + }, + "last_run_time": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "last_run_time", + "data_type": "text", + "nullable": true + }, + "_dlt_load_id": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "_dlt_load_id", + "data_type": "text", + "nullable": false + }, + "_dlt_id": { + "partition": false, + "cluster": false, + "unique": true, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "_dlt_id", + "data_type": "text", + "nullable": false + } + }, + "write_disposition": "replace" + }, + "_dlt_pipeline_state": { + "name": "_dlt_pipeline_state", + "columns": { + "version": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "version", + "data_type": "bigint", + "nullable": true + }, + "engine_version": { + "partition": 
false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "engine_version", + "data_type": "bigint", + "nullable": true + }, + "pipeline_name": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "pipeline_name", + "data_type": "text", + "nullable": true + }, + "state": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "state", + "data_type": "text", + "nullable": true + }, + "created_at": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "created_at", + "data_type": "timestamp", + "nullable": true + }, + "_dlt_load_id": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "_dlt_load_id", + "data_type": "text", + "nullable": false + }, + "_dlt_id": { + "partition": false, + "cluster": false, + "unique": true, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "_dlt_id", + "data_type": "text", + "nullable": false + } + }, + "write_disposition": "append" + } + }, + "settings": { + "default_hints": { + "not_null": [ + "_dlt_id", + "_dlt_root_id", + "_dlt_parent_id", + "_dlt_list_idx", + "_dlt_load_id" + ], + "foreign_key": [ + "_dlt_parent_id" + ], + "unique": [ + "_dlt_id" + ] + } + }, + "normalizers": { + "detections": [ + "timestamp", + "iso_timestamp" + ], + "names": "dlt.common.normalizers.names.snake_case", + "json": { + "module": "dlt.common.normalizers.json.relational" + } + } +} \ No newline at end of file diff --git a/tests/common/storages/test_schema_storage.py b/tests/common/storages/test_schema_storage.py index 254a3a8449..6e4cb42545 100644 --- a/tests/common/storages/test_schema_storage.py +++ b/tests/common/storages/test_schema_storage.py @@ -48,6 +48,14 @@ def test_load_non_existing(storage: SchemaStorage) -> None: storage.load_schema("nonexisting") +def test_load_schema_with_upgrade() -> None: + # point the storage root to v4 schema google_spreadsheet_v3.schema + storage = LiveSchemaStorage(SchemaVolumeConfiguration(COMMON_TEST_CASES_PATH + "schemas/sheets")) + # the hash when computed on the schema does not match the version_hash in the file so it should raise InStorageSchemaModified + # but because the version upgrade is required, the check is skipped and the load succeeds + storage.load_schema("google_spreadsheet_v4") + + def test_import_non_existing(synced_storage: SchemaStorage) -> None: with pytest.raises(SchemaNotFoundError): synced_storage.load_schema("nonexisting") From 739df5225847b6b313ba8cfdef0dfb95c923198d Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Sun, 8 Jan 2023 19:17:18 +0100 Subject: [PATCH 6/6] fixes remaining tests --- dlt/pipeline/pipeline.py | 9 +++++---- tests/pipeline/test_pipeline.py | 6 +++--- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 599fb0783b..cb4f700077 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -1002,10 +1002,11 @@ def _managed_state(self, *, extract_state: bool = False) -> Iterator[TPipelineSt # restore original pipeline props self._state_to_props(backup_state) # synchronize schema storage with initial list of schemas, note that we'll not be able to synchronize the schema content - # TODO: we should restore 
schemas backup here
-            for existing_schema_name in self._schema_storage.list_schemas():
-                if existing_schema_name not in self.schema_names:
-                    self._schema_storage.remove_schema(existing_schema_name)
+            if self._schema_storage:
+                # TODO: we should restore schemas backup here
+                for existing_schema_name in self._schema_storage.list_schemas():
+                    if existing_schema_name not in self.schema_names:
+                        self._schema_storage.remove_schema(existing_schema_name)
             # raise original exception
             raise
         else:
diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
index 7b023dde43..b8741cda02 100644
--- a/tests/pipeline/test_pipeline.py
+++ b/tests/pipeline/test_pipeline.py
@@ -212,9 +212,9 @@ def i_fail():

     # nothing to normalize
     assert len(storage.list_files_to_normalize_sorted()) == 0
-    # but the schemas are stored in the pipeline
-    p.schemas["default_3"]
-    p.schemas["default_4"]
+    # pipeline state is successfully rolled back after the last extract; the default_3 and default_4 schemas are not present
+    assert set(p.schema_names) == {"default", "default_2"}
+    assert set(p._schema_storage.list_schemas()) == {"default", "default_2"}


 def test_restore_state_on_dummy() -> None:
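
The sketch below is not part of the patch series; it is a minimal illustration of the behavior that patch 1 guarantees and patch 6 finishes testing: a failed extract rolls the pipeline back so it looks as if the run never happened. It is adapted from test_pipeline_state_on_extract_exception above; the pipeline and resource names are illustrative.

import dlt
import pytest
from dlt.pipeline.exceptions import PipelineStepFailed

@dlt.resource
def failing_resource():
    # the first chunk extracts fine, then extraction blows up mid-stream
    yield [1, 2, 3]
    raise NotImplementedError()

# "state_restore_demo" is an illustrative name; "dummy" is the destination used by the tests above
p = dlt.pipeline(pipeline_name="state_restore_demo", destination="dummy")
with pytest.raises(PipelineStepFailed):
    p.run(failing_resource())

# the rolled-back pipeline carries no trace of the failed run: no schemas, no default schema name
assert p.first_run is True
assert p.has_data is False
assert p.default_schema_name is None
assert p._schema_storage.list_schemas() == []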