schema contract #594

Merged: 85 commits, Nov 21, 2023
Commits (85 total; the file changes shown below are from 6 commits)
78d6d71
basic schema freezing
sh-rp Aug 27, 2023
09d8c63
small changes
sh-rp Aug 27, 2023
aec10b1
temp
sh-rp Aug 28, 2023
9441dcb
add new schema update mode
sh-rp Aug 28, 2023
edad4ad
fix linting errors and one bug
sh-rp Aug 31, 2023
bef7ea4
move freeze code to schema
sh-rp Aug 31, 2023
fc6f083
some work on schema evolution modes
sh-rp Sep 4, 2023
b36a74f
add tests
sh-rp Sep 4, 2023
5659827
small tests change
sh-rp Sep 4, 2023
5343f4b
Merge branch 'devel' into d#/data_contracts
sh-rp Sep 4, 2023
894875f
small fix
sh-rp Sep 5, 2023
8bccfe5
fix some tests
sh-rp Sep 5, 2023
cfd3f64
add global override for schema evolution
sh-rp Sep 5, 2023
e05855c
finish implemention of global override
sh-rp Sep 5, 2023
3f07127
better tests
sh-rp Sep 6, 2023
6308369
carry over schema settings on update
sh-rp Sep 6, 2023
ab0b8d7
add tests for single values
sh-rp Sep 6, 2023
6e99ed9
small changes to tests and code
sh-rp Sep 6, 2023
1a10ec4
fix small error
sh-rp Sep 6, 2023
175bee5
add tests for data contract interaction
sh-rp Sep 6, 2023
18b9341
fix tests
sh-rp Sep 7, 2023
6dcaa7d
Merge branch 'devel' into d#/data_contracts
sh-rp Sep 12, 2023
881d79a
some PR work
sh-rp Sep 12, 2023
e707580
update schema management
sh-rp Sep 13, 2023
b3dc41d
fix schema related tests
sh-rp Sep 13, 2023
ac8f766
add nice schema tests
sh-rp Sep 13, 2023
869f278
add docs page
sh-rp Sep 13, 2023
0f22ba0
small test fix
sh-rp Sep 13, 2023
463c447
smaller PR fixes
sh-rp Sep 14, 2023
db3f447
more work
sh-rp Sep 17, 2023
c17577a
tests update
rudolfix Sep 18, 2023
fb1d224
almost there
sh-rp Sep 23, 2023
6a15fa2
tmp
sh-rp Sep 26, 2023
d66c2e6
fix freeze tests
sh-rp Sep 26, 2023
5e238c5
Merge branch 'devel' into d#/data_contracts
sh-rp Sep 26, 2023
bf9da7e
cleanup
sh-rp Sep 26, 2023
9ba2496
Merge branch 'devel' into d#/data_contracts
sh-rp Sep 26, 2023
038d03a
create data contracts page
sh-rp Sep 26, 2023
84384c3
small cleanup
sh-rp Sep 26, 2023
44dfb69
add pydantic dep to destination tests
sh-rp Sep 26, 2023
ece2bfc
Merge branch 'devel' into d#/data_contracts
sh-rp Sep 28, 2023
7b8f2d2
rename contract settings
sh-rp Sep 29, 2023
00c540b
rename schema contract dict keys
sh-rp Sep 29, 2023
3ed4630
some work
sh-rp Sep 29, 2023
333217c
more work...
sh-rp Sep 29, 2023
d69e54d
more work
sh-rp Oct 1, 2023
041da6d
move checking of new tables into extract function
sh-rp Oct 2, 2023
b72a1a9
fix most tests
sh-rp Oct 2, 2023
f2abadf
Merge branch 'devel' into d#/data_contracts
sh-rp Oct 2, 2023
2ae36e3
fix linter after merge
sh-rp Oct 2, 2023
d85e04f
small cleanup
sh-rp Oct 2, 2023
1d7be25
Merge branch 'devel' into d#/data_contracts
rudolfix Oct 10, 2023
96785c4
Merge branch 'devel' into d#/data_contracts
sh-rp Oct 17, 2023
302d909
post merge code updates
sh-rp Oct 17, 2023
4a1fab0
small fixes
sh-rp Oct 17, 2023
903f000
some cleanup
sh-rp Oct 18, 2023
912dd8b
update docs
sh-rp Oct 18, 2023
ef1b10f
Merge branch 'devel' into d#/data_contracts
rudolfix Oct 29, 2023
256920e
Merge branch 'devel' into d#/data_contracts
rudolfix Nov 1, 2023
168f0da
makes bumping version optional in Schema, preserves hashes on replace…
rudolfix Nov 1, 2023
760cc43
extracts on single pipeline schema
rudolfix Nov 1, 2023
b645927
allows to control relational normalizer descend with send
rudolfix Nov 12, 2023
8989a75
refactors data contract apply to generate filters instead of actual f…
rudolfix Nov 12, 2023
57842cc
detects if bytes string possibly contains pue characters
rudolfix Nov 12, 2023
441299f
applies schema contracts in item normalizer, uses binary stream, dete…
rudolfix Nov 12, 2023
fc0eb47
methods to remove and rename arrow columns, need arrow 12+
rudolfix Nov 12, 2023
9977227
implements contracts in extract, fixes issues in apply hints, arrow d…
rudolfix Nov 12, 2023
d74242a
always uses pipeline schema when extracting
rudolfix Nov 12, 2023
e9344ee
returns new items count from buffered write
rudolfix Nov 12, 2023
e980396
bumps pyarrow to 12, temporary removes snowflake extra
rudolfix Nov 12, 2023
5e2d131
Merge branch 'devel' into d#/data_contracts
rudolfix Nov 12, 2023
3dc4fa5
fixes arrow imports and normalizer config
rudolfix Nov 13, 2023
c76788a
fixes normalizer config tests and pipeline state serialization
rudolfix Nov 13, 2023
423a163
normalizes arrow tables before saving
rudolfix Nov 13, 2023
c1c32d6
adds validation and model synth for contracts to pydantic helper
rudolfix Nov 16, 2023
c24b643
splits extractor into files, improves pydantic validator
rudolfix Nov 16, 2023
2b97a4f
runs tests on ci with minimal dependencies
rudolfix Nov 16, 2023
c35ec2d
fixes deps in ci workflows
rudolfix Nov 16, 2023
340ed3d
re-adds snowflake connector
rudolfix Nov 16, 2023
35c10b7
Merge branch 'devel' into d#/data_contracts
rudolfix Nov 18, 2023
2d260e8
updates pydantic helper
rudolfix Nov 19, 2023
aa990f5
improves contract violation exception
rudolfix Nov 19, 2023
a6a782b
splits source and resource in extract, adds more tests
rudolfix Nov 19, 2023
f4d2cac
temp disable pydantic 1 tests
rudolfix Nov 19, 2023
f556584
fixes generic type parametrization on 3.8
rudolfix Nov 20, 2023
5 changes: 5 additions & 0 deletions dlt/common/schema/exceptions.py
@@ -69,3 +69,8 @@ def __init__(self, schema_name: str, init_engine: int, from_engine: int, to_engi
self.from_engine = from_engine
self.to_engine = to_engine
super().__init__(f"No engine upgrade path in schema {schema_name} from {init_engine} to {to_engine}, stopped at {from_engine}")


class SchemaFrozenException(SchemaException):
def __init__(self, msg: str) -> None:
super().__init__(msg)
40 changes: 37 additions & 3 deletions dlt/common/schema/schema.py
@@ -1,6 +1,6 @@
import yaml
from copy import copy, deepcopy
from typing import ClassVar, Dict, List, Mapping, Optional, Sequence, Tuple, Any, cast
from typing import ClassVar, Dict, List, Mapping, Optional, Sequence, Tuple, Any, cast, Literal
from dlt.common import json

from dlt.common.utils import extend_list_deduplicated
@@ -11,10 +11,13 @@
from dlt.common.schema import utils
from dlt.common.data_types import py_type_to_sc_type, coerce_value, TDataType
from dlt.common.schema.typing import (COLUMN_HINTS, SCHEMA_ENGINE_VERSION, LOADS_TABLE_NAME, VERSION_TABLE_NAME, TColumnSchemaBase, TPartialTableSchema, TSchemaSettings, TSimpleRegex, TStoredSchema,
TSchemaTables, TTableSchema, TTableSchemaColumns, TColumnSchema, TColumnProp, TColumnHint, TTypeDetections, TWriteDisposition)
TSchemaTables, TTableSchema, TTableSchemaColumns, TColumnSchema, TColumnProp, TColumnHint, TTypeDetections, TWriteDisposition, TSchemaUpdateMode)
from dlt.common.schema.exceptions import (CannotCoerceColumnException, CannotCoerceNullException, InvalidSchemaName,
ParentTableNotFoundException, SchemaCorruptedException)
from dlt.common.validation import validate_dict
from dlt.common.schema.exceptions import SchemaFrozenException




class Schema:
@@ -174,7 +177,32 @@ def coerce_row(self, table_name: str, parent_table: str, row: StrAny) -> Tuple[D
updated_table_partial["columns"][new_col_name] = new_col_def

return new_row, updated_table_partial


def check_schema_update(self, table_name: str, row: DictStrAny, partial_table: TPartialTableSchema, schema_update_mode: TSchemaUpdateMode) -> Tuple[DictStrAny, TPartialTableSchema]:
Collaborator:
this is a solid implementation, but I'd like to describe it in the ticket (we can reuse it for the docs) and maybe modify the requirements.

"""Checks if schema update mode allows for the requested changes, filter row or reject update, depending on the mode"""
has_columns = self.has_data_columns
# if there is a schema update and the schema is frozen in trim mode, drop unknown fields from the row
if has_columns and partial_table and schema_update_mode == "freeze-and-trim":
# do not create new tables
if table_name not in self.tables or not len(self.tables[table_name].get("columns", {})):
return None, None
# pop unknown values
for item in list(row.keys()):
if item not in self.tables[table_name]["columns"]:
row.pop(item)
return row, None

# if there is a schema update and the schema is frozen in discard mode, drop the row entirely
elif has_columns and partial_table and schema_update_mode == "freeze-and-discard":
return None, None

# if there is a schema update and the schema is frozen in raise mode, reject the update
elif has_columns and partial_table and schema_update_mode == "freeze-and-raise":
raise SchemaFrozenException(f"Trying to modify table {table_name} but schema is frozen.")

return row, partial_table


def update_schema(self, partial_table: TPartialTableSchema) -> TPartialTableSchema:
table_name = partial_table["name"]
parent_table_name = partial_table.get("parent")
@@ -324,6 +352,12 @@ def tables(self) -> TSchemaTables:
def settings(self) -> TSchemaSettings:
return self._settings

@property
def has_data_columns(self) -> bool:
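# NOTE: only the first data table is checked; returns True if it already has any columns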
for table in self.data_tables():
return bool(table.get("columns", None))
return False

def to_pretty_json(self, remove_defaults: bool = True) -> str:
d = self.to_dict(remove_defaults=remove_defaults)
return json.dumps(d, pretty=True)
2 changes: 2 additions & 0 deletions dlt/common/schema/typing.py
@@ -104,3 +104,5 @@ class TStoredSchema(TypedDict, total=False):
settings: Optional[TSchemaSettings]
tables: TSchemaTables
normalizers: TNormalizersConfig

TSchemaUpdateMode = Literal["evolve", "freeze-and-trim", "freeze-and-raise", "freeze-and-discard"]
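
For reviewers, a small self-contained sketch of what the four modes are meant to do with a row that introduces unknown columns. This is an illustration only, not the PR's code; the actual check in Schema.check_schema_update above additionally skips freezing while the schema has no data columns yet.

from typing import Optional, Set, Tuple

def apply_update_mode(mode: str, row: dict, known_columns: Set[str], has_schema_change: bool) -> Tuple[Optional[dict], bool]:
    """Toy model of TSchemaUpdateMode; returns (row_to_load, keep_schema_change)."""
    if mode == "evolve" or not has_schema_change:
        return row, has_schema_change  # accept the row and any schema change
    if mode == "freeze-and-trim":
        # keep the row but drop fields the frozen schema does not know about
        return {k: v for k, v in row.items() if k in known_columns}, False
    if mode == "freeze-and-discard":
        return None, False  # silently drop the whole row
    if mode == "freeze-and-raise":
        raise RuntimeError("schema is frozen")  # stands in for SchemaFrozenException
    raise ValueError(f"unknown mode: {mode}")

# example: trimming keeps only the columns the frozen schema already has
trimmed, _ = apply_update_mode("freeze-and-trim", {"id": 1, "surprise": "x"}, {"id", "name"}, True)
assert trimmed == {"id": 1}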
5 changes: 3 additions & 2 deletions dlt/normalize/configuration.py
@@ -1,15 +1,16 @@
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Literal

from dlt.common.configuration import configspec
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.runners.configuration import PoolRunnerConfiguration, TPoolType
from dlt.common.storages import LoadStorageConfiguration, NormalizeStorageConfiguration, SchemaStorageConfiguration

from dlt.common.schema.typing import TSchemaUpdateMode

@configspec
class NormalizeConfiguration(PoolRunnerConfiguration):
pool_type: TPoolType = "process"
destination_capabilities: DestinationCapabilitiesContext = None # injectable
schema_update_mode: TSchemaUpdateMode = "evolve"
_schema_storage_config: SchemaStorageConfiguration
_normalize_storage_config: NormalizeStorageConfiguration
_load_storage_config: LoadStorageConfiguration
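
For context on the new schema_update_mode setting above, a short sketch of how a pipeline run would opt into one of the frozen modes. It mirrors the environment-variable approach used in tests/load/test_freeze_schema.py further down; the pipeline name and sample data are made up for illustration.

import os
import dlt

# select a contract mode before the pipeline runs; "evolve" stays the default
os.environ["NORMALIZE__SCHEMA_UPDATE_MODE"] = "freeze-and-raise"

pipeline = dlt.pipeline(pipeline_name="frozen_example", destination="duckdb")

# first run: the schema has no data columns yet, so tables are created normally
pipeline.run([{"id": 1, "name": "first"}], table_name="items")

# a later run that introduces new columns would now fail with PipelineStepFailed
# wrapping SchemaFrozenException:
# pipeline.run([{"id": 2, "name": "second", "new_attribute": "x"}], table_name="items")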
5 changes: 5 additions & 0 deletions dlt/normalize/exceptions.py
@@ -0,0 +1,5 @@
from dlt.common.exceptions import DltException

class NormalizeException(DltException):
def __init__(self, msg: str) -> None:
super().__init__(msg)
66 changes: 37 additions & 29 deletions dlt/normalize/normalize.py
@@ -67,14 +67,14 @@ def load_or_create_schema(schema_storage: SchemaStorage, schema_name: str) -> Sc

@staticmethod
def w_normalize_files(
normalize_config: NormalizeConfiguration,
normalize_storage_config: NormalizeStorageConfiguration,
loader_storage_config: LoadStorageConfiguration,
destination_caps: DestinationCapabilitiesContext,
stored_schema: TStoredSchema,
load_id: str,
extracted_items_files: Sequence[str],
) -> TWorkerRV:

schema_updates: List[TSchemaUpdate] = []
total_items = 0
row_counts: TRowCount = {}
@@ -98,7 +98,7 @@ def w_normalize_files(
items_count = 0
for line_no, line in enumerate(f):
items: List[TDataItem] = json.loads(line)
partial_update, items_count, r_counts = Normalize._w_normalize_chunk(load_storage, schema, load_id, root_table_name, items)
partial_update, items_count, r_counts = Normalize._w_normalize_chunk(normalize_config, load_storage, schema, load_id, root_table_name, items)
schema_updates.append(partial_update)
total_items += items_count
merge_row_count(row_counts, r_counts)
@@ -127,7 +127,7 @@ def w_normalize_files(
return schema_updates, total_items, load_storage.closed_files(), row_counts

@staticmethod
def _w_normalize_chunk(load_storage: LoadStorage, schema: Schema, load_id: str, root_table_name: str, items: List[TDataItem]) -> Tuple[TSchemaUpdate, int, TRowCount]:
def _w_normalize_chunk(config: NormalizeConfiguration, load_storage: LoadStorage, schema: Schema, load_id: str, root_table_name: str, items: List[TDataItem]) -> Tuple[TSchemaUpdate, int, TRowCount]:
column_schemas: Dict[str, TTableSchemaColumns] = {} # quick access to column schema for writers below
schema_update: TSchemaUpdate = {}
schema_name = schema.name
@@ -139,31 +139,38 @@ def _w_normalize_chunk(load_storage: LoadStorage, schema: Schema, load_id: str,
# filter row, may eliminate some or all fields
row = schema.filter_row(table_name, row)
# do not process empty rows
if row:
# decode pua types
for k, v in row.items():
row[k] = custom_pua_decode(v) # type: ignore
# coerce row of values into schema table, generating partial table with new columns if any
row, partial_table = schema.coerce_row(table_name, parent_table, row)
# theres a new table or new columns in existing table
if partial_table:
# update schema and save the change
schema.update_schema(partial_table)
table_updates = schema_update.setdefault(table_name, [])
table_updates.append(partial_table)
# update our columns
column_schemas[table_name] = schema.get_table_columns(table_name)
# get current columns schema
columns = column_schemas.get(table_name)
if not columns:
columns = schema.get_table_columns(table_name)
column_schemas[table_name] = columns
# store row
# TODO: it is possible to write to single file from many processes using this: https://gitlab.com/warsaw/flufl.lock
load_storage.write_data_item(load_id, schema_name, table_name, row, columns)
# count total items
items_count += 1
increase_row_count(row_counts, table_name, 1)
if not row:
continue
# decode pua types
for k, v in row.items():
row[k] = custom_pua_decode(v) # type: ignore
# coerce row of values into schema table, generating partial table with new columns if any
row, partial_table = schema.coerce_row(table_name, parent_table, row)
# check update
row, partial_table = schema.check_schema_update(table_name, row, partial_table, config.schema_update_mode)
Collaborator:
don't call it if partial table is None! this is 99.99% of cases and this code is time critical
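
A possible shape for that optimization, as a hypothetical helper (not part of this diff): only call the contract check when coerce_row actually produced a schema change.

from typing import Any, Dict, Optional, Tuple

def contract_checked_row(schema: Any, config: Any, table_name: str, row: Dict[str, Any], partial_table: Optional[Dict[str, Any]]) -> Tuple[Optional[Dict[str, Any]], Optional[Dict[str, Any]]]:
    """Hypothetical helper: skip the contract check on the hot path where there is no schema change."""
    if partial_table is None:  # the overwhelmingly common case: nothing new to validate
        return row, None
    return schema.check_schema_update(table_name, row, partial_table, config.schema_update_mode)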


if not row:
continue

# there's a new table or new columns in an existing table
if partial_table:
# update schema and save the change
schema.update_schema(partial_table)
table_updates = schema_update.setdefault(table_name, [])
table_updates.append(partial_table)
# update our columns
column_schemas[table_name] = schema.get_table_columns(table_name)
# get current columns schema
columns = column_schemas.get(table_name)
if not columns:
columns = schema.get_table_columns(table_name)
column_schemas[table_name] = columns
# store row
# TODO: it is possible to write to single file from many processes using this: https://gitlab.com/warsaw/flufl.lock
load_storage.write_data_item(load_id, schema_name, table_name, row, columns)
# count total items
items_count += 1
increase_row_count(row_counts, table_name, 1)
signals.raise_if_signalled()
return schema_update, items_count, row_counts

@@ -196,7 +203,7 @@ def map_parallel(self, schema: Schema, load_id: str, files: Sequence[str]) -> TM
workers = self.pool._processes # type: ignore
chunk_files = self.group_worker_files(files, workers)
schema_dict: TStoredSchema = schema.to_dict()
config_tuple = (self.normalize_storage.config, self.load_storage.config, self.config.destination_capabilities, schema_dict)
config_tuple = (self.config, self.normalize_storage.config, self.load_storage.config, self.config.destination_capabilities, schema_dict)
Collaborator:
just FYI: this goes through process boundary and will be pickled. pay attention

param_chunk = [[*config_tuple, load_id, files] for files in chunk_files]
tasks: List[Tuple[AsyncResult[TWorkerRV], List[Any]]] = []
row_counts: TRowCount = {}
@@ -249,6 +256,7 @@ def map_parallel(self, schema: Schema, load_id: str, files: Sequence[str]) -> TM

def map_single(self, schema: Schema, load_id: str, files: Sequence[str]) -> TMapFuncRV:
result = Normalize.w_normalize_files(
self.config,
self.normalize_storage.config,
self.load_storage.config,
self.config.destination_capabilities,
76 changes: 76 additions & 0 deletions tests/load/test_freeze_schema.py
@@ -0,0 +1,76 @@
import dlt, os, pytest
from dlt.common.utils import uniq_id

from tests.load.pipeline.utils import load_table_counts
from tests.load.pipeline.utils import destinations_configs, DestinationTestConfiguration
from dlt.pipeline.exceptions import PipelineStepFailed
from dlt.common.schema.exceptions import SchemaFrozenException
from dlt.common.schema import utils

SCHEMA_UPDATE_MODES = ["evolve", "freeze-and-trim", "freeze-and-raise", "freeze-and-discard"]

@pytest.mark.parametrize("destination_config", destinations_configs(default_sql_configs=True, subset=["duckdb"]), ids=lambda x: x.name)
@pytest.mark.parametrize("update_mode", SCHEMA_UPDATE_MODES)
def test_freeze_schema(update_mode: str, destination_config: DestinationTestConfiguration) -> None:

# run the pipeline under the given schema update mode
# the first run creates the schema; in the freeze modes no further schema updates are accepted after that
os.environ['NORMALIZE__SCHEMA_UPDATE_MODE'] = update_mode
pipeline = destination_config.setup_pipeline("test_freeze_schema_2", dataset_name="freeze" + uniq_id())

@dlt.resource(name="items", write_disposition="append")
def load_items():
global offset
for _, index in enumerate(range(0, 10), 1):
yield {
"id": index,
"name": f"item {index}"
}

@dlt.resource(name="items", write_disposition="append")
def load_items_with_subitems():
global offset
for _, index in enumerate(range(0, 10), 1):
yield {
"id": index,
"name": f"item {index}",
"new_attribute": "hello",
"sub_items": [{
"id": index + 1000,
"name": f"sub item {index + 1000}"
},{
"id": index + 2000,
"name": f"sub item {index + 2000}"
}]
}

pipeline.run([load_items], loader_file_format=destination_config.file_format)
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
# check data
assert table_counts["items"] == 10
schema_hash = utils.generate_version_hash(pipeline.default_schema.to_dict())

# on freeze and raise we expect an exception
if update_mode == "freeze-and-raise":
with pytest.raises(PipelineStepFailed) as py_ex:
pipeline.run([load_items_with_subitems], loader_file_format=destination_config.file_format)
assert isinstance(py_ex.value.__context__, SchemaFrozenException)
else:
pipeline.run([load_items_with_subitems], loader_file_format=destination_config.file_format)

# check data
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
assert table_counts["items"] == (20 if update_mode not in ["freeze-and-raise", "freeze-and-discard"] else 10)

# frozen schemas should not have changed
if update_mode != "evolve":
assert schema_hash == utils.generate_version_hash(pipeline.default_schema.to_dict())
assert "items__sub_items" not in table_counts
# schema was not migrated to contain new attribute
assert "new_attribute" not in pipeline.default_schema.tables["items"]["columns"]
# regular mode evolves the schema
else:
assert table_counts["items__sub_items"] == 20
# schema was migrated to contain the new attribute
assert "new_attribute" in pipeline.default_schema.tables["items"]["columns"]
