Skip to content

Commit

Permalink
Merge pull request #350 from dlt-hub/rfix/standard-resource-state
Browse files Browse the repository at this point in the history
* `resource_state` got final interface and is now exposed in `dlt.current.resource_state`
* adds transformer overload that may be used when creating transformers dynamically to pass the decorated function
* `source.with_resources` creates a clone of resource and selects in the clone. previously source was modified in place
* you can write back the secrets and configuration using `dlt.config` and `dlt.secrets` indexers

docs for the above are coming
  • Loading branch information
rudolfix authored May 21, 2023
2 parents 0e37033 + 6ff388d commit 5cae979
Show file tree
Hide file tree
Showing 55 changed files with 859 additions and 313 deletions.
2 changes: 1 addition & 1 deletion dlt/cli/init_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
from dlt.common.configuration.providers import CONFIG_TOML, SECRETS_TOML, ConfigTomlProvider, SecretsTomlProvider
from dlt.common.normalizers import default_normalizers, import_normalizers
from dlt.common.pipeline import get_dlt_repos_dir
from dlt.common.source import _SOURCES
from dlt.version import DLT_PKG_NAME, __version__
from dlt.common.destination.reference import DestinationReference
from dlt.common.reflection.utils import rewrite_python_script
from dlt.common.schema.exceptions import InvalidSchemaName
from dlt.common.storages.file_storage import FileStorage

from dlt.extract.decorators import _SOURCES
import dlt.reflection.names as n
from dlt.reflection.script_inspector import inspect_pipeline_script, load_script_module

Expand Down
6 changes: 3 additions & 3 deletions dlt/cli/pipeline_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dlt.cli.exceptions import CliCommandException

from dlt.common import json
from dlt.common.pipeline import _resource_state, get_dlt_pipelines_dir, TSourceState
from dlt.common.pipeline import resource_state, get_dlt_pipelines_dir, TSourceState
from dlt.common.destination.reference import TDestinationReferenceArg
from dlt.common.runners import Venv
from dlt.common.runners.stdout import iter_stdout
Expand Down Expand Up @@ -97,8 +97,8 @@ def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str, ver
if sources_state:
source_state = next(iter(sources_state.items()))[1] if is_single_schema else sources_state.get(schema_name)
if source_state:
resource_state = _resource_state(resource_name, source_state)
res_state_slots = len(resource_state)
resource_state_ = resource_state(resource_name, source_state)
res_state_slots = len(resource_state_)
fmt.echo("%s with %s table(s) and %s resource state slot(s)" % (fmt.bold(resource_name), fmt.bold(str(len(tables))), fmt.bold(str(res_state_slots))))
fmt.echo()
fmt.echo("Working dir content:")
Expand Down
2 changes: 1 addition & 1 deletion dlt/cli/source_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
from dlt.common.configuration.specs import BaseConfiguration
from dlt.common.reflection.utils import creates_func_def_name_node
from dlt.common.typing import is_optional_type
from dlt.common.source import SourceInfo

from dlt.cli.config_toml_writer import WritableConfigValue
from dlt.cli.exceptions import CliCommandException
from dlt.extract.decorators import SourceInfo
from dlt.reflection.script_visitor import PipelineScriptVisitor


Expand Down
20 changes: 20 additions & 0 deletions dlt/common/configuration/accessors.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ def __getitem__(self, field: str) -> Any:
else:
return value

def __setitem__(self, field: str, value: Any) -> None:
sections = field.split(".")
key = sections.pop()
self.writable_provider.set_value(key, value, None, *sections)

def get(self, field: str, expected_type: Type[TConfigAny] = None) -> TConfigAny:
value: TConfigAny
value, _ = self._get_value(field, expected_type)
Expand All @@ -47,6 +52,11 @@ def config_providers(self) -> Sequence[ConfigProvider]:
def default_type(self) -> AnyType:
pass

@property
@abc.abstractmethod
def writable_provider(self) -> ConfigProvider:
pass

def _get_providers_from_context(self) -> Sequence[ConfigProvider]:
return Container()[ConfigProvidersContext].providers

Expand Down Expand Up @@ -87,6 +97,11 @@ def config_providers(self) -> Sequence[ConfigProvider]:
def default_type(self) -> AnyType:
return AnyType

@property
def writable_provider(self) -> ConfigProvider:
"""find first writable provider that does not support secrets - should be config.toml"""
return next(p for p in self._get_providers_from_context() if p.is_writable and not p.supports_secrets)

value: ClassVar[None] = ConfigValue
"A placeholder that tells dlt to replace it with actual config value during the call to a source or resource decorated function."

Expand All @@ -103,6 +118,11 @@ def config_providers(self) -> Sequence[ConfigProvider]:
def default_type(self) -> AnyType:
return TSecretValue

@property
def writable_provider(self) -> ConfigProvider:
"""find first writable provider that supports secrets - should be secrets.toml"""
return next(p for p in self._get_providers_from_context() if p.is_writable and p.supports_secrets)

value: ClassVar[None] = ConfigValue
"A placeholder that tells dlt to replace it with actual secret during the call to a source or resource decorated function."

Expand Down
31 changes: 1 addition & 30 deletions dlt/common/configuration/providers/google_secrets.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,36 +138,7 @@ def _update_from_vault(self, full_key: str, key: str, hint: type, pipeline_name:
secret = self._look_vault(full_key, hint)
self._vault_lookups[full_key] = pendulum.now()
if secret is not None:
self._add_node(secret, key, pipeline_name, sections)

def _add_node(self, secret: str, key: str, pipeline_name: str, sections: Tuple[str, ...]) -> None:
if pipeline_name:
sections = (pipeline_name, ) + sections

doc: Any = auto_cast(secret)
if isinstance(doc, TOMLContainer):
if key is None:
self._toml = doc
else:
# always update the top document
update_dict_nested(self._toml, doc)
else:
if key is None:
raise ValueError("dlt_secrets_toml must contain toml document")

master: TOMLContainer
# descend from root, create tables if necessary
master = self._toml
for k in sections:
if not isinstance(master, dict):
raise KeyError(k)
if k not in master:
master[k] = tomlkit.table()
master = master[k] # type: ignore
if isinstance(doc, dict):
update_dict_nested(master[key], doc) # type: ignore
else:
master[key] = doc
self.set_value(key, auto_cast(secret), pipeline_name, *sections)

@property
def is_empty(self) -> bool:
Expand Down
6 changes: 6 additions & 0 deletions dlt/common/configuration/providers/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ class ConfigProvider(abc.ABC):
def get_value(self, key: str, hint: Type[Any], pipeline_name: str, *sections: str) -> Tuple[Optional[Any], str]:
pass

def set_value(self, key: str, value: Any, pipeline_name: str, *sections: str) -> None:
raise NotImplementedError()

@property
@abc.abstractmethod
def supports_secrets(self) -> bool:
Expand All @@ -29,6 +32,9 @@ def name(self) -> str:
def is_empty(self) -> bool:
return False

@property
def is_writable(self) -> bool:
return False


def get_key_name(key: str, separator: str, /, *sections: str) -> str:
Expand Down
36 changes: 36 additions & 0 deletions dlt/common/configuration/providers/toml.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,35 @@ def get_value(self, key: str, hint: Type[Any], pipeline_name: str, *sections: st
except KeyError:
return None, full_key

def set_value(self, key: str, value: Any, pipeline_name: str, *sections: str) -> None:
if pipeline_name:
sections = (pipeline_name, ) + sections

if isinstance(value, TOMLContainer):
if key is None:
self._toml = value
else:
# always update the top document
# TODO: verify that value contains only the elements under key
update_dict_nested(self._toml, value)
else:
if key is None:
raise ValueError("dlt_secrets_toml must contain toml document")

master: TOMLContainer
# descend from root, create tables if necessary
master = self._toml
for k in sections:
if not isinstance(master, dict):
raise KeyError(k)
if k not in master:
master[k] = tomlkit.table()
master = master[k] # type: ignore
if isinstance(value, dict) and isinstance(master.get(key), dict):
update_dict_nested(master[key], value) # type: ignore
else:
master[key] = value

@property
def supports_sections(self) -> bool:
return True
Expand Down Expand Up @@ -110,6 +139,9 @@ def name(self) -> str:
def supports_secrets(self) -> bool:
return False

@property
def is_writable(self) -> bool:
return True


class SecretsTomlProvider(TomlFileProvider):
Expand All @@ -125,6 +157,10 @@ def name(self) -> str:
def supports_secrets(self) -> bool:
return True

@property
def is_writable(self) -> bool:
return True


class TomlProviderReadException(ConfigProviderException):
def __init__(self, provider_name: str, file_name: str, full_path: str, toml_exception: str) -> None:
Expand Down
3 changes: 0 additions & 3 deletions dlt/common/configuration/specs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
from .run_configuration import RunConfiguration # noqa: F401
from .base_configuration import BaseConfiguration, CredentialsConfiguration, CredentialsWithDefault, ContainerInjectableContext, extract_inner_hint, is_base_configuration_inner_hint, configspec # noqa: F401
from .normalize_volume_configuration import NormalizeVolumeConfiguration # noqa: F401
from .load_volume_configuration import LoadVolumeConfiguration # noqa: F401
from .schema_volume_configuration import SchemaVolumeConfiguration, TSchemaFileFormat # noqa: F401
from .config_section_context import ConfigSectionContext # noqa: F401

from .gcp_credentials import GcpServiceAccountCredentialsWithoutDefaults, GcpServiceAccountCredentials, GcpOAuthCredentialsWithoutDefaults, GcpOAuthCredentials, GcpCredentials # noqa: F401
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class ConnectionStringCredentials(CredentialsConfiguration):
port: Optional[int] = None
query: Optional[Dict[str, str]] = None

__config_gen_annotations__: ClassVar[List[str]] = ["port"]
__config_gen_annotations__: ClassVar[List[str]] = ["port", "password", "host"]

def parse_native_representation(self, native_value: Any) -> None:
if not isinstance(native_value, str):
Expand Down
13 changes: 0 additions & 13 deletions dlt/common/configuration/specs/load_volume_configuration.py

This file was deleted.

12 changes: 0 additions & 12 deletions dlt/common/configuration/specs/normalize_volume_configuration.py

This file was deleted.

19 changes: 0 additions & 19 deletions dlt/common/configuration/specs/schema_volume_configuration.py

This file was deleted.

6 changes: 6 additions & 0 deletions dlt/common/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ def __init__(self, source_name: Optional[str] = None) -> None:
super().__init__(None, msg)


class ResourceNameNotAvailable(PipelineException):
def __init__(self) -> None:
super().__init__(None,
"A resource state was requested but no resource marked callable was found in the call stack. Resource state may be only requested from @dlt.resource decorated function or with explicit resource name.")


class SourceSectionNotAvailable(PipelineException):
def __init__(self) -> None:
msg = "Access to state was requested without source section active. State should be requested from within the @dlt.source and @dlt.resource decorated function."
Expand Down
Loading

0 comments on commit 5cae979

Please sign in to comment.