Skip to content

Commit

Permalink
fixes schema versioning (#1140)
Browse files Browse the repository at this point in the history
* properly recognizes new and modified schemas, fixes several places where version was bumped incorrectly

* fixes saving and importing schemas in schema storage, adds missing tests

* fixes lacking write disposition when creating resource

* skips saving schemas when it was not modified in extract and normalize

* adds tables only tests to drop command

* splits destination exceptions, fixes new schemas in tests

* fixes test
  • Loading branch information
rudolfix committed Mar 24, 2024
1 parent c975ef4 commit b1b92f6
Show file tree
Hide file tree
Showing 43 changed files with 837 additions and 395 deletions.
126 changes: 126 additions & 0 deletions dlt/common/destination/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
from typing import Any, Iterable, List

from dlt.common.exceptions import DltException, TerminalException, TransientException


class DestinationException(DltException):
pass


class UnknownDestinationModule(DestinationException):
def __init__(self, destination_module: str) -> None:
self.destination_module = destination_module
if "." in destination_module:
msg = f"Destination module {destination_module} could not be found and imported"
else:
msg = f"Destination {destination_module} is not one of the standard dlt destinations"
super().__init__(msg)


class InvalidDestinationReference(DestinationException):
def __init__(self, destination_module: Any) -> None:
self.destination_module = destination_module
msg = f"Destination {destination_module} is not a valid destination module."
super().__init__(msg)


class DestinationTerminalException(DestinationException, TerminalException):
pass


class DestinationUndefinedEntity(DestinationTerminalException):
pass


class DestinationTransientException(DestinationException, TransientException):
pass


class DestinationLoadingViaStagingNotSupported(DestinationTerminalException):
def __init__(self, destination: str) -> None:
self.destination = destination
super().__init__(f"Destination {destination} does not support loading via staging.")


class DestinationLoadingWithoutStagingNotSupported(DestinationTerminalException):
def __init__(self, destination: str) -> None:
self.destination = destination
super().__init__(f"Destination {destination} does not support loading without staging.")


class DestinationNoStagingMode(DestinationTerminalException):
def __init__(self, destination: str) -> None:
self.destination = destination
super().__init__(f"Destination {destination} cannot be used as a staging")


class DestinationIncompatibleLoaderFileFormatException(DestinationTerminalException):
def __init__(
self, destination: str, staging: str, file_format: str, supported_formats: Iterable[str]
) -> None:
self.destination = destination
self.staging = staging
self.file_format = file_format
self.supported_formats = supported_formats
supported_formats_str = ", ".join(supported_formats)
if self.staging:
if not supported_formats:
msg = (
f"Staging {staging} cannot be used with destination {destination} because they"
" have no file formats in common."
)
else:
msg = (
f"Unsupported file format {file_format} for destination {destination} in"
f" combination with staging destination {staging}. Supported formats:"
f" {supported_formats_str}"
)
else:
msg = (
f"Unsupported file format {file_format} destination {destination}. Supported"
f" formats: {supported_formats_str}. Check the staging option in the dlt.pipeline"
" for additional formats."
)
super().__init__(msg)


class IdentifierTooLongException(DestinationTerminalException):
def __init__(
self,
destination_name: str,
identifier_type: str,
identifier_name: str,
max_identifier_length: int,
) -> None:
self.destination_name = destination_name
self.identifier_type = identifier_type
self.identifier_name = identifier_name
self.max_identifier_length = max_identifier_length
super().__init__(
f"The length of {identifier_type} {identifier_name} exceeds"
f" {max_identifier_length} allowed for {destination_name}"
)


class DestinationHasFailedJobs(DestinationTerminalException):
def __init__(self, destination_name: str, load_id: str, failed_jobs: List[Any]) -> None:
self.destination_name = destination_name
self.load_id = load_id
self.failed_jobs = failed_jobs
super().__init__(
f"Destination {destination_name} has failed jobs in load package {load_id}"
)


class DestinationSchemaTampered(DestinationTerminalException):
def __init__(self, schema_name: str, version_hash: str, stored_version_hash: str) -> None:
self.version_hash = version_hash
self.stored_version_hash = stored_version_hash
super().__init__(
f"Schema {schema_name} content was changed - by a loader or by destination code - from"
" the moment it was retrieved by load package. Such schema cannot reliably be updated"
f" nor saved. Current version hash: {version_hash} != stored version hash"
f" {stored_version_hash}. If you are using destination client directly, without storing"
" schema in load package, you should first save it into schema storage. You can also"
" use schema._bump_version() in test code to remove modified flag."
)
18 changes: 12 additions & 6 deletions dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@
import inspect

from dlt.common import logger
from dlt.common.exceptions import (
IdentifierTooLongException,
InvalidDestinationReference,
UnknownDestinationModule,
)
from dlt.common.schema import Schema, TTableSchema, TSchemaTables
from dlt.common.schema.exceptions import SchemaException
from dlt.common.schema.utils import (
Expand All @@ -43,13 +38,18 @@
from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration
from dlt.common.configuration.accessors import config
from dlt.common.destination.capabilities import DestinationCapabilitiesContext
from dlt.common.destination.exceptions import (
IdentifierTooLongException,
InvalidDestinationReference,
UnknownDestinationModule,
DestinationSchemaTampered,
)
from dlt.common.schema.utils import is_complete_column
from dlt.common.schema.exceptions import UnknownTableException
from dlt.common.storages import FileStorage
from dlt.common.storages.load_storage import ParsedLoadJobFileName
from dlt.common.configuration.specs import GcpCredentials, AwsCredentialsWithoutDefaults


TLoaderReplaceStrategy = Literal["truncate-and-insert", "insert-from-staging", "staging-optimized"]
TDestinationConfig = TypeVar("TDestinationConfig", bound="DestinationClientConfiguration")
TDestinationClient = TypeVar("TDestinationClient", bound="JobClientBase")
Expand Down Expand Up @@ -318,6 +318,12 @@ def update_stored_schema(
Optional[TSchemaTables]: Returns an update that was applied at the destination.
"""
self._verify_schema()
# make sure that schema being saved was not modified from the moment it was loaded from storage
version_hash = self.schema.version_hash
if self.schema.is_modified:
raise DestinationSchemaTampered(
self.schema.name, version_hash, self.schema.stored_version_hash
)
return expected_update

@abstractmethod
Expand Down
109 changes: 0 additions & 109 deletions dlt/common/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,115 +133,6 @@ class SystemConfigurationException(DltException):
pass


class DestinationException(DltException):
pass


class UnknownDestinationModule(DestinationException):
def __init__(self, destination_module: str) -> None:
self.destination_module = destination_module
if "." in destination_module:
msg = f"Destination module {destination_module} could not be found and imported"
else:
msg = f"Destination {destination_module} is not one of the standard dlt destinations"
super().__init__(msg)


class InvalidDestinationReference(DestinationException):
def __init__(self, destination_module: Any) -> None:
self.destination_module = destination_module
msg = f"Destination {destination_module} is not a valid destination module."
super().__init__(msg)


class DestinationTerminalException(DestinationException, TerminalException):
pass


class DestinationUndefinedEntity(DestinationTerminalException):
pass


class DestinationTransientException(DestinationException, TransientException):
pass


class DestinationLoadingViaStagingNotSupported(DestinationTerminalException):
def __init__(self, destination: str) -> None:
self.destination = destination
super().__init__(f"Destination {destination} does not support loading via staging.")


class DestinationLoadingWithoutStagingNotSupported(DestinationTerminalException):
def __init__(self, destination: str) -> None:
self.destination = destination
super().__init__(f"Destination {destination} does not support loading without staging.")


class DestinationNoStagingMode(DestinationTerminalException):
def __init__(self, destination: str) -> None:
self.destination = destination
super().__init__(f"Destination {destination} cannot be used as a staging")


class DestinationIncompatibleLoaderFileFormatException(DestinationTerminalException):
def __init__(
self, destination: str, staging: str, file_format: str, supported_formats: Iterable[str]
) -> None:
self.destination = destination
self.staging = staging
self.file_format = file_format
self.supported_formats = supported_formats
supported_formats_str = ", ".join(supported_formats)
if self.staging:
if not supported_formats:
msg = (
f"Staging {staging} cannot be used with destination {destination} because they"
" have no file formats in common."
)
else:
msg = (
f"Unsupported file format {file_format} for destination {destination} in"
f" combination with staging destination {staging}. Supported formats:"
f" {supported_formats_str}"
)
else:
msg = (
f"Unsupported file format {file_format} destination {destination}. Supported"
f" formats: {supported_formats_str}. Check the staging option in the dlt.pipeline"
" for additional formats."
)
super().__init__(msg)


class IdentifierTooLongException(DestinationTerminalException):
def __init__(
self,
destination_name: str,
identifier_type: str,
identifier_name: str,
max_identifier_length: int,
) -> None:
self.destination_name = destination_name
self.identifier_type = identifier_type
self.identifier_name = identifier_name
self.max_identifier_length = max_identifier_length
super().__init__(
f"The length of {identifier_type} {identifier_name} exceeds"
f" {max_identifier_length} allowed for {destination_name}"
)


class DestinationHasFailedJobs(DestinationTerminalException):
def __init__(self, destination_name: str, load_id: str, failed_jobs: List[Any]) -> None:
self.destination_name = destination_name
self.load_id = load_id
self.failed_jobs = failed_jobs
super().__init__(
f"Destination {destination_name} has failed jobs in load package {load_id}"
)


class PipelineException(DltException):
def __init__(self, pipeline_name: str, msg: str) -> None:
"""Base class for all pipeline exceptions. Should not be raised."""
Expand Down
10 changes: 2 additions & 8 deletions dlt/common/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,18 @@
from dlt.common.configuration.paths import get_dlt_data_dir
from dlt.common.configuration.specs import RunConfiguration
from dlt.common.destination import TDestinationReferenceArg, TDestination
from dlt.common.exceptions import (
DestinationHasFailedJobs,
PipelineStateNotAvailable,
SourceSectionNotAvailable,
)
from dlt.common.destination.exceptions import DestinationHasFailedJobs
from dlt.common.exceptions import PipelineStateNotAvailable, SourceSectionNotAvailable
from dlt.common.schema import Schema
from dlt.common.schema.typing import TColumnNames, TColumnSchema, TWriteDisposition, TSchemaContract
from dlt.common.source import get_current_pipe_name
from dlt.common.storages.load_storage import LoadPackageInfo
from dlt.common.storages.load_package import PackageStorage

from dlt.common.time import ensure_pendulum_datetime, precise_time
from dlt.common.typing import DictStrAny, REPattern, StrAny, SupportsHumanize
from dlt.common.jsonpath import delete_matches, TAnyJsonPath
from dlt.common.data_writers.writers import DataWriterMetrics, TLoaderFileFormat
from dlt.common.utils import RowCounts, merge_row_counts
from dlt.common.versioned_state import TVersionedState
from dlt.common.storages.load_package import TLoadPackageState


class _StepInfo(NamedTuple):
Expand Down

0 comments on commit b1b92f6

Please sign in to comment.