Skip to content

Commit

Permalink
some work on schema evolution modes
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Sep 4, 2023
1 parent bef7ea4 commit fc6f083
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 40 deletions.
29 changes: 19 additions & 10 deletions dlt/common/schema/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from dlt.common.schema import utils
from dlt.common.data_types import py_type_to_sc_type, coerce_value, TDataType
from dlt.common.schema.typing import (COLUMN_HINTS, SCHEMA_ENGINE_VERSION, LOADS_TABLE_NAME, VERSION_TABLE_NAME, TColumnSchemaBase, TPartialTableSchema, TSchemaSettings, TSimpleRegex, TStoredSchema,
TSchemaTables, TTableSchema, TTableSchemaColumns, TColumnSchema, TColumnProp, TColumnHint, TTypeDetections, TWriteDisposition, TSchemaUpdateMode)
TSchemaTables, TTableSchema, TTableSchemaColumns, TColumnSchema, TColumnProp, TColumnHint, TTypeDetections, TSchemaEvolutionModes)
from dlt.common.schema.exceptions import (CannotCoerceColumnException, CannotCoerceNullException, InvalidSchemaName,
ParentTableNotFoundException, SchemaCorruptedException)
from dlt.common.validation import validate_dict
Expand Down Expand Up @@ -177,13 +177,23 @@ def coerce_row(self, table_name: str, parent_table: str, row: StrAny) -> Tuple[D
updated_table_partial["columns"][new_col_name] = new_col_def

return new_row, updated_table_partial
def check_schema_update(self, table_name: str, row: DictStrAny, partial_table: TPartialTableSchema, schema_update_mode: TSchemaUpdateMode) -> Tuple[DictStrAny, TPartialTableSchema]:

def check_schema_update(self, parent_table: str, table_name: str, row: DictStrAny, partial_table: TPartialTableSchema) -> Tuple[DictStrAny, TPartialTableSchema]:
"""Checks if schema update mode allows for the requested changes, filter row or reject update, depending on the mode"""

# for now we defined the schema as new if there are no data columns defined
has_columns = self.has_data_columns
if not has_columns:
return row, partial_table

# resolve evolution settings
table_with_settings = parent_table or table_name
evolution_settings = self.tables.get(table_with_settings, {}).get("schema_evolution_settings", "evolve")
if isinstance(evolution_settings, str):
evolution_settings = TSchemaEvolutionModes(table=evolution_settings, column=evolution_settings, column_variant=evolution_settings)

# if there is a schema update and we froze schema and filter additional data, clean up
if has_columns and partial_table and schema_update_mode == "freeze-and-trim":
# do not create new tables
if evolution_settings["table"] == "freeze-and-trim":
if table_name not in self.tables or not len(self.tables[table_name].get("columns", {})):
return None, None
# pop unknown values
Expand All @@ -193,16 +203,15 @@ def check_schema_update(self, table_name: str, row: DictStrAny, partial_table: T
return row, None

# if there is a schema update and we froze schema and discard additional rows, do nothing
elif has_columns and partial_table and schema_update_mode == "freeze-and-discard":
elif evolution_settings["table"] == "freeze-and-discard":
return None, None

# if there is a schema update and we disallow any data not fitting the schema, raise!
elif has_columns and partial_table and schema_update_mode == "freeze-and-raise":
elif evolution_settings["table"] == "freeze-and-raise":
raise SchemaFrozenException(f"Trying to modify table {table_name} but schema is frozen.")

return row, partial_table



def update_schema(self, partial_table: TPartialTableSchema) -> TPartialTableSchema:
table_name = partial_table["name"]
parent_table_name = partial_table.get("parent")
Expand Down
15 changes: 12 additions & 3 deletions dlt/common/schema/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ class TColumnSchema(TColumnSchemaBase, total=False):
TColumnName = NewType("TColumnName", str)
SIMPLE_REGEX_PREFIX = "re:"

TSchemaEvolutionMode = Literal["evolve", "freeze-and-trim", "freeze-and-raise", "freeze-and-discard"]

class TSchemaEvolutionModes(TypedDict, total=False):
"""TypedDict defining the schema update settings"""
table: TSchemaEvolutionMode
column: TSchemaEvolutionMode
column_variant: TSchemaEvolutionMode

TSchemaEvolutionSettings = Union[TSchemaEvolutionMode, TSchemaEvolutionModes]

class TRowFilters(TypedDict, total=True):
excludes: Optional[List[TSimpleRegex]]
Expand All @@ -72,7 +81,7 @@ class TTableSchema(TypedDict, total=False):
name: Optional[str]
description: Optional[str]
write_disposition: Optional[TWriteDisposition]
table_sealed: Optional[bool]
schema_evolution_settings: Optional[TSchemaEvolutionSettings]
parent: Optional[str]
filters: Optional[TRowFilters]
columns: TTableSchemaColumns
Expand All @@ -86,8 +95,9 @@ class TPartialTableSchema(TTableSchema):
TSchemaTables = Dict[str, TTableSchema]
TSchemaUpdate = Dict[str, List[TPartialTableSchema]]


class TSchemaSettings(TypedDict, total=False):
schema_sealed: Optional[bool]
schema_evolution_settings: Optional[TSchemaEvolutionSettings]
detections: Optional[List[TTypeDetections]]
default_hints: Optional[Dict[TColumnHint, List[TSimpleRegex]]]
preferred_types: Optional[Dict[TSimpleRegex, TDataType]]
Expand All @@ -105,4 +115,3 @@ class TStoredSchema(TypedDict, total=False):
tables: TSchemaTables
normalizers: TNormalizersConfig

TSchemaUpdateMode = Literal["evolve", "freeze-and-trim", "freeze-and-raise", "freeze-and-discard"]
11 changes: 9 additions & 2 deletions dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from dlt.common.schema import detections
from dlt.common.schema.typing import (SCHEMA_ENGINE_VERSION, LOADS_TABLE_NAME, SIMPLE_REGEX_PREFIX, VERSION_TABLE_NAME, TColumnName, TPartialTableSchema, TSchemaTables, TSchemaUpdate,
TSimpleRegex, TStoredSchema, TTableSchema, TTableSchemaColumns, TColumnSchemaBase, TColumnSchema, TColumnProp,
TColumnHint, TTypeDetectionFunc, TTypeDetections, TWriteDisposition)
TColumnHint, TTypeDetectionFunc, TTypeDetections, TWriteDisposition, TSchemaEvolutionSettings, TSchemaEvolutionModes)
from dlt.common.schema.exceptions import (CannotCoerceColumnException, ParentTableNotFoundException, SchemaEngineNoUpgradePathException, SchemaException,
TablePropertiesConflictException, InvalidSchemaName)

Expand Down Expand Up @@ -403,6 +403,10 @@ def merge_tables(table: TTableSchema, partial_table: TPartialTableSchema) -> TPa
if table.get('parent') is None and (resource := partial_table.get('resource')):
table['resource'] = resource

partial_e_s = partial_table.get("schema_evolution_settings")
if partial_e_s:
table["schema_evolution_settings"] = partial_e_s

return diff_table


Expand Down Expand Up @@ -568,7 +572,8 @@ def new_table(
write_disposition: TWriteDisposition = None,
columns: Sequence[TColumnSchema] = None,
validate_schema: bool = False,
resource: str = None
resource: str = None,
schema_evolution_settings: TSchemaEvolutionSettings = None,
) -> TTableSchema:

table: TTableSchema = {
Expand All @@ -579,10 +584,12 @@ def new_table(
table["parent"] = parent_table_name
assert write_disposition is None
assert resource is None
assert schema_evolution_settings is None
else:
# set write disposition only for root tables
table["write_disposition"] = write_disposition or DEFAULT_WRITE_DISPOSITION
table["resource"] = resource or table_name
table["schema_evolution_settings"] = schema_evolution_settings
if validate_schema:
validate_dict_ignoring_xkeys(
spec=TColumnSchema,
Expand Down
3 changes: 3 additions & 0 deletions dlt/common/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ def validate_dict(spec: Type[_TypedDict], doc: StrAny, path: str, filter_f: TFil

def verify_prop(pk: str, pv: Any, t: Any) -> None:
if is_optional_type(t):
# pass if value actually is none
if pv is None:
return
t = extract_optional_type(t)

if is_literal_type(t):
Expand Down
9 changes: 7 additions & 2 deletions dlt/extract/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from dlt.common.pipeline import PipelineContext
from dlt.common.source import _SOURCES, SourceInfo
from dlt.common.schema.schema import Schema
from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns, TWriteDisposition, TAnySchemaColumns
from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns, TWriteDisposition, TAnySchemaColumns, TSchemaEvolutionSettings
from dlt.extract.utils import ensure_table_schema_columns_hint
from dlt.common.storages.exceptions import SchemaNotFoundError
from dlt.common.storages.schema_storage import SchemaStorage
Expand Down Expand Up @@ -200,6 +200,7 @@ def resource(
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
merge_key: TTableHintTemplate[TColumnNames] = None,
schema_evolution_settings: TTableHintTemplate[TSchemaEvolutionSettings] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None
) -> Callable[TResourceFunParams, DltResource]:
Expand All @@ -215,6 +216,7 @@ def resource(
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
merge_key: TTableHintTemplate[TColumnNames] = None,
schema_evolution_settings: TTableHintTemplate[TSchemaEvolutionSettings] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None
) -> Callable[[Callable[TResourceFunParams, Any]], DltResource]:
Expand All @@ -230,6 +232,7 @@ def resource(
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
merge_key: TTableHintTemplate[TColumnNames] = None,
schema_evolution_settings: TTableHintTemplate[TSchemaEvolutionSettings] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None
) -> DltResource:
Expand All @@ -245,6 +248,7 @@ def resource(
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
merge_key: TTableHintTemplate[TColumnNames] = None,
schema_evolution_settings: TTableHintTemplate[TSchemaEvolutionSettings] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
depends_on: TUnboundDltResource = None,
Expand Down Expand Up @@ -311,7 +315,8 @@ def make_resource(_name: str, _section: str, _data: Any, incremental: Incrementa
write_disposition=write_disposition,
columns=schema_columns,
primary_key=primary_key,
merge_key=merge_key
merge_key=merge_key,
schema_evolution_settings=schema_evolution_settings
)
return DltResource.from_data(_data, _name, _section, table_template, selected, cast(DltResource, depends_on), incremental=incremental)

Expand Down
9 changes: 5 additions & 4 deletions dlt/extract/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import List, TypedDict, cast, Any

from dlt.common.schema.utils import DEFAULT_WRITE_DISPOSITION, merge_columns, new_column, new_table
from dlt.common.schema.typing import TColumnNames, TColumnProp, TColumnSchema, TPartialTableSchema, TTableSchemaColumns, TWriteDisposition, TAnySchemaColumns
from dlt.common.schema.typing import TColumnNames, TColumnProp, TColumnSchema, TPartialTableSchema, TTableSchemaColumns, TWriteDisposition, TAnySchemaColumns, TSchemaEvolutionSettings
from dlt.common.typing import TDataItem
from dlt.common.validation import validate_dict_ignoring_xkeys

Expand All @@ -23,6 +23,7 @@ class TTableSchemaTemplate(TypedDict, total=False):
primary_key: TTableHintTemplate[TColumnNames]
merge_key: TTableHintTemplate[TColumnNames]
incremental: Incremental[Any]
schema_evolution_settings: TSchemaEvolutionSettings


class DltResourceSchema:
Expand Down Expand Up @@ -181,7 +182,8 @@ def new_table_template(
write_disposition: TTableHintTemplate[TWriteDisposition] = None,
columns: TTableHintTemplate[TTableSchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
merge_key: TTableHintTemplate[TColumnNames] = None
merge_key: TTableHintTemplate[TColumnNames] = None,
schema_evolution_settings: TTableHintTemplate[TSchemaEvolutionSettings] = None,
) -> TTableSchemaTemplate:
if not table_name:
raise TableNameMissing()
Expand All @@ -194,8 +196,7 @@ def new_table_template(
column["name"] = name
column_list.append(column)
columns = column_list # type: ignore

new_template: TTableSchemaTemplate = new_table(table_name, parent_table_name, write_disposition=write_disposition, columns=columns) # type: ignore
new_template: TTableSchemaTemplate = new_table(table_name, parent_table_name, write_disposition=write_disposition, columns=columns, schema_evolution_settings=schema_evolution_settings) # type: ignore
if primary_key:
new_template["primary_key"] = primary_key
if merge_key:
Expand Down
2 changes: 0 additions & 2 deletions dlt/normalize/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.runners.configuration import PoolRunnerConfiguration, TPoolType
from dlt.common.storages import LoadStorageConfiguration, NormalizeStorageConfiguration, SchemaStorageConfiguration
from dlt.common.schema.typing import TSchemaUpdateMode

@configspec
class NormalizeConfiguration(PoolRunnerConfiguration):
pool_type: TPoolType = "process"
destination_capabilities: DestinationCapabilitiesContext = None # injectable
schema_update_mode: TSchemaUpdateMode = "evolve"
_schema_storage_config: SchemaStorageConfiguration
_normalize_storage_config: NormalizeStorageConfiguration
_load_storage_config: LoadStorageConfiguration
Expand Down
14 changes: 6 additions & 8 deletions dlt/normalize/normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ def load_or_create_schema(schema_storage: SchemaStorage, schema_name: str) -> Sc

@staticmethod
def w_normalize_files(
normalize_config: NormalizeConfiguration,
normalize_storage_config: NormalizeStorageConfiguration,
loader_storage_config: LoadStorageConfiguration,
destination_caps: DestinationCapabilitiesContext,
Expand Down Expand Up @@ -98,7 +97,7 @@ def w_normalize_files(
items_count = 0
for line_no, line in enumerate(f):
items: List[TDataItem] = json.loads(line)
partial_update, items_count, r_counts = Normalize._w_normalize_chunk(normalize_config, load_storage, schema, load_id, root_table_name, items)
partial_update, items_count, r_counts = Normalize._w_normalize_chunk(load_storage, schema, load_id, root_table_name, items)
schema_updates.append(partial_update)
total_items += items_count
merge_row_count(row_counts, r_counts)
Expand Down Expand Up @@ -127,7 +126,7 @@ def w_normalize_files(
return schema_updates, total_items, load_storage.closed_files(), row_counts

@staticmethod
def _w_normalize_chunk(config: NormalizeConfiguration, load_storage: LoadStorage, schema: Schema, load_id: str, root_table_name: str, items: List[TDataItem]) -> Tuple[TSchemaUpdate, int, TRowCount]:
def _w_normalize_chunk(load_storage: LoadStorage, schema: Schema, load_id: str, root_table_name: str, items: List[TDataItem]) -> Tuple[TSchemaUpdate, int, TRowCount]:
column_schemas: Dict[str, TTableSchemaColumns] = {} # quick access to column schema for writers below
schema_update: TSchemaUpdate = {}
schema_name = schema.name
Expand All @@ -146,9 +145,9 @@ def _w_normalize_chunk(config: NormalizeConfiguration, load_storage: LoadStorage
row[k] = custom_pua_decode(v) # type: ignore
# coerce row of values into schema table, generating partial table with new columns if any
row, partial_table = schema.coerce_row(table_name, parent_table, row)
# check update
row, partial_table = schema.check_schema_update(table_name, row, partial_table, config.schema_update_mode)

# if we detect a migration, the check update
if partial_table:
row, partial_table = schema.check_schema_update(parent_table, table_name, row, partial_table)
if not row:
continue

Expand Down Expand Up @@ -203,7 +202,7 @@ def map_parallel(self, schema: Schema, load_id: str, files: Sequence[str]) -> TM
workers = self.pool._processes # type: ignore
chunk_files = self.group_worker_files(files, workers)
schema_dict: TStoredSchema = schema.to_dict()
config_tuple = (self.config, self.normalize_storage.config, self.load_storage.config, self.config.destination_capabilities, schema_dict)
config_tuple = (self.normalize_storage.config, self.load_storage.config, self.config.destination_capabilities, schema_dict)
param_chunk = [[*config_tuple, load_id, files] for files in chunk_files]
tasks: List[Tuple[AsyncResult[TWorkerRV], List[Any]]] = []
row_counts: TRowCount = {}
Expand Down Expand Up @@ -256,7 +255,6 @@ def map_parallel(self, schema: Schema, load_id: str, files: Sequence[str]) -> TM

def map_single(self, schema: Schema, load_id: str, files: Sequence[str]) -> TMapFuncRV:
result = Normalize.w_normalize_files(
self.config,
self.normalize_storage.config,
self.load_storage.config,
self.config.destination_capabilities,
Expand Down
17 changes: 8 additions & 9 deletions tests/load/test_freeze_schema.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import dlt, os, pytest
from dlt.common.utils import uniq_id
import duckdb

from tests.load.pipeline.utils import load_table_counts
from tests.load.pipeline.utils import destinations_configs, DestinationTestConfiguration
Expand All @@ -9,16 +10,14 @@

SCHEMA_UPDATE_MODES = ["evolve", "freeze-and-trim", "freeze-and-raise", "freeze-and-discard"]

@pytest.mark.parametrize("destination_config", destinations_configs(default_sql_configs=True, subset=["duckdb"]), ids=lambda x: x.name)
@pytest.mark.parametrize("update_mode", SCHEMA_UPDATE_MODES)
def test_freeze_schema(update_mode: str, destination_config: DestinationTestConfiguration) -> None:
def test_freeze_schema(update_mode: str) -> None:

# freeze pipeline, drop additional values
# this will allow for the first run to create the schema, but will not accept further updates after that
os.environ['NORMALIZE__SCHEMA_UPDATE_MODE'] = update_mode
pipeline = destination_config.setup_pipeline("test_freeze_schema_2", dataset_name="freeze" + uniq_id())
pipeline = dlt.pipeline(pipeline_name=uniq_id(), destination='duckdb', credentials=duckdb.connect(':memory:'))

@dlt.resource(name="items", write_disposition="append")
@dlt.resource(name="items", write_disposition="append", schema_evolution_settings=update_mode)
def load_items():
global offset
for _, index in enumerate(range(0, 10), 1):
Expand All @@ -27,7 +26,7 @@ def load_items():
"name": f"item {index}"
}

@dlt.resource(name="items", write_disposition="append")
@dlt.resource(name="items", write_disposition="append", schema_evolution_settings=update_mode)
def load_items_with_subitems():
global offset
for _, index in enumerate(range(0, 10), 1):
Expand All @@ -44,7 +43,7 @@ def load_items_with_subitems():
}]
}

pipeline.run([load_items], loader_file_format=destination_config.file_format)
pipeline.run([load_items])
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
# check data
assert table_counts["items"] == 10
Expand All @@ -53,10 +52,10 @@ def load_items_with_subitems():
# on freeze and raise we expect an exception
if update_mode == "freeze-and-raise":
with pytest.raises(PipelineStepFailed) as py_ex:
pipeline.run([load_items_with_subitems], loader_file_format=destination_config.file_format)
pipeline.run([load_items_with_subitems])
assert isinstance(py_ex.value.__context__, SchemaFrozenException)
else:
pipeline.run([load_items_with_subitems], loader_file_format=destination_config.file_format)
pipeline.run([load_items_with_subitems])

# check data
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
Expand Down

0 comments on commit fc6f083

Please sign in to comment.