
schema contract #594

Merged
merged 85 commits into devel from d#/data_contracts
Nov 21, 2023

Conversation

@sh-rp (Collaborator) commented Aug 27, 2023

issue #135
TODO:

  • migrate schemas to new version
  • add support for source level schema evolution control
  • add support for pipeline level overrides
  • add more tests for mixed schema evolution modes

@netlify

netlify bot commented Aug 27, 2023

Deploy Preview for dlt-hub-docs canceled.

🔨 Latest commit: 340ed3d
🔍 Latest deploy log: https://app.netlify.com/sites/dlt-hub-docs/deploys/6556a7a16f33c700080570e0

@sh-rp sh-rp marked this pull request as ready for review August 28, 2023 12:44
@sh-rp sh-rp force-pushed the d#/data_contracts branch 3 times, most recently from 0daed01 to 3bbb8b1 on August 30, 2023 17:44
# coerce row of values into the schema table, generating a partial table with new columns if any
row, partial_table = schema.coerce_row(table_name, parent_table, row)
# check whether the schema update is allowed under the configured update mode
row, partial_table = schema.check_schema_update(table_name, row, partial_table, config.schema_update_mode)
Collaborator

Don't call it if partial_table is None! That is 99.99% of cases and this code is time-critical.
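A hypothetical guard along the lines of this suggestion could look as follows. The names mirror the snippet above but are illustrative, not dlt's actual API:

```python
# Illustrative sketch only: skip the contract check entirely when coerce_row
# produced no schema change (partial_table is None), since that is the hot
# path the reviewer is concerned about. `schema` is any object exposing a
# check_schema_update method; none of these names are guaranteed to match dlt.
def apply_contract_if_needed(schema, table_name, row, partial_table, mode):
    if partial_table is None:
        # no new columns or tables: nothing to validate, return unchanged
        return row, None
    return schema.check_schema_update(table_name, row, partial_table, mode)
```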

@@ -196,7 +203,7 @@ def map_parallel(self, schema: Schema, load_id: str, files: Sequence[str]) -> TM
workers = self.pool._processes # type: ignore
chunk_files = self.group_worker_files(files, workers)
schema_dict: TStoredSchema = schema.to_dict()
- config_tuple = (self.normalize_storage.config, self.load_storage.config, self.config.destination_capabilities, schema_dict)
+ config_tuple = (self.config, self.normalize_storage.config, self.load_storage.config, self.config.destination_capabilities, schema_dict)
Collaborator

Just FYI: this tuple goes through a process boundary and will be pickled. Pay attention.
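To make the reviewer's point concrete: every member of config_tuple must survive a pickle round trip, because the process pool serializes worker arguments across the process boundary. A minimal sketch (the tuple contents below are placeholders, not dlt's real configs):

```python
import pickle

def pickle_round_trip(obj):
    # Serialize and deserialize, as the multiprocessing pool effectively
    # does when it ships arguments to worker processes.
    return pickle.loads(pickle.dumps(obj))

# Placeholder stand-ins for the storage configs and schema dict in config_tuple.
config_tuple = ({"normalize": "cfg"}, {"load": "cfg"}, ("caps",), {"version": 7})
assert pickle_round_trip(config_tuple) == config_tuple
```

Anything unpicklable in the tuple (open file handles, lambdas, live connections) would fail here rather than deep inside a worker.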

@@ -174,7 +177,32 @@ def coerce_row(self, table_name: str, parent_table: str, row: StrAny) -> Tuple[D
updated_table_partial["columns"][new_col_name] = new_col_def

return new_row, updated_table_partial


def check_schema_update(self, table_name: str, row: DictStrAny, partial_table: TPartialTableSchema, schema_update_mode: TSchemaUpdateMode) -> Tuple[DictStrAny, TPartialTableSchema]:
Collaborator

This is a solid implementation, but I'd like to describe it in the ticket (we can reuse it for the docs) and maybe modify the requirements.
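As a rough description for the ticket or the docs, the outcomes of a schema-update check might be sketched like this. The mode names "evolve", "discard-row" and "freeze" are illustrative, not necessarily dlt's exact identifiers:

```python
def resolve_update(row, partial_table, mode):
    # Hypothetical sketch of three contract outcomes, not dlt code.
    if mode == "evolve":
        # accept the new columns: keep both the row and the schema change
        return row, partial_table
    if mode == "discard-row":
        # drop the offending row and discard the schema change
        return None, None
    if mode == "freeze":
        # reject any schema change outright
        raise ValueError(f"schema frozen: rejected update {partial_table!r}")
    raise ValueError(f"unknown schema update mode {mode!r}")
```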

t = extract_optional_type(t)

if is_literal_type(t):
# TODO: support for union types?
if pk == "schema_evolution_settings":
Collaborator (Author)

We should have a look at this.

Collaborator

yes - and that should be trivial
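For reference, unwrapping an optional Literal and reading its allowed values is indeed straightforward with typing introspection, and a union of literals could be handled by walking each member the same way. A sketch only, not the code under review:

```python
from typing import Literal, Optional, Union, get_args, get_origin

def literal_values(t):
    # Unwrap Optional[X], which is Union[X, None], down to the single member.
    if get_origin(t) is Union:
        args = [a for a in get_args(t) if a is not type(None)]
        if len(args) == 1:
            t = args[0]
    # For a Literal type, return its allowed values; otherwise None.
    if get_origin(t) is Literal:
        return get_args(t)
    return None
```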

@sh-rp sh-rp changed the title from "[wip] basic schema freezing" to "basic schema freezing" on Sep 6, 2023
@rudolfix (Collaborator)

@sh-rp changes from the previous version:

  • extract now works on a single schema, the most recent one present in the pipeline, and updates that schema as changes come in
  • instead of filtering data in apply contracts in the schema, I generate filters that get executed in the extract and normalize phases; that's something we could do with Pydantic in the future
  • Pydantic is now tightly integrated with contracts: models are synthesized according to the settings
  • I've improved Arrow support; contracts now apply to it as well
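The "generated filters" idea from the bullets above can be sketched as precomputing the allowed column set once and applying it to every row during extract/normalize. This is an illustrative reconstruction, not dlt's actual code:

```python
def make_column_filter(allowed_columns):
    # Precompute the allowed set once, so the per-row closure stays cheap.
    allowed = frozenset(allowed_columns)

    def filter_row(row):
        # Drop any column the contract does not allow.
        return {k: v for k, v in row.items() if k in allowed}

    return filter_row
```

Synthesizing a Pydantic model that forbids extra fields would achieve a similar effect declaratively, which is presumably what "models are synthesized according to the settings" refers to.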

@rudolfix rudolfix merged commit 02bac98 into devel Nov 21, 2023
37 of 40 checks passed
@rudolfix rudolfix deleted the d#/data_contracts branch November 21, 2023 09:22