Skip to content

Commit

Permalink
fixes column order and adds hints table variants (#1127)
Browse files Browse the repository at this point in the history
* places incomplete columns at the end when inferring from data

* stores per-table hints in resources and allows computing them via item metas; adds tests

* fixes slots class and test
  • Loading branch information
rudolfix committed Mar 24, 2024
1 parent dbb8b1e commit c975ef4
Show file tree
Hide file tree
Showing 12 changed files with 460 additions and 99 deletions.
6 changes: 3 additions & 3 deletions dlt/common/schema/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ def update_table(self, partial_table: TPartialTableSchema) -> TPartialTableSchem
self._schema_tables[table_name] = partial_table
else:
# merge tables performing additional checks
partial_table = utils.merge_tables(table, partial_table)
partial_table = utils.merge_table(table, partial_table)

self.data_item_normalizer.extend_table(table_name)
return partial_table
Expand Down Expand Up @@ -515,7 +515,7 @@ def normalize_table_identifiers(self, table: TTableSchema) -> TTableSchema:
# re-index columns as the name changed; if the name space was reduced then
# some columns may now clash with each other, so make sure that we merge columns that are already there
if new_col_name in new_columns:
new_columns[new_col_name] = utils.merge_columns(
new_columns[new_col_name] = utils.merge_column(
new_columns[new_col_name], c, merge_defaults=False
)
else:
Expand Down Expand Up @@ -782,7 +782,7 @@ def _coerce_non_null_value(
# if there's incomplete new_column then merge it with inferred column
if new_column:
# use all values present in incomplete column to override inferred column - also the defaults
new_column = utils.merge_columns(inferred_column, new_column)
new_column = utils.merge_column(inferred_column, new_column)
else:
new_column = inferred_column

Expand Down
23 changes: 15 additions & 8 deletions dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ def compare_complete_columns(a: TColumnSchema, b: TColumnSchema) -> bool:
return a["data_type"] == b["data_type"] and a["name"] == b["name"]


def merge_columns(
def merge_column(
col_a: TColumnSchema, col_b: TColumnSchema, merge_defaults: bool = True
) -> TColumnSchema:
"""Merges `col_b` into `col_a`. if `merge_defaults` is True, only hints from `col_b` that are not default in `col_a` will be set.
Expand All @@ -354,7 +354,7 @@ def merge_columns(
return col_a


def diff_tables(tab_a: TTableSchema, tab_b: TPartialTableSchema) -> TPartialTableSchema:
def diff_table(tab_a: TTableSchema, tab_b: TPartialTableSchema) -> TPartialTableSchema:
"""Creates a partial table that contains properties found in `tab_b` that are not present or different in `tab_a`.
The name is always present in returned partial.
It returns new columns (not present in tab_a) and merges columns from tab_b into tab_a (overriding non-default hint values).
Expand Down Expand Up @@ -389,7 +389,7 @@ def diff_tables(tab_a: TTableSchema, tab_b: TPartialTableSchema) -> TPartialTabl
None,
)
# all other properties can change
merged_column = merge_columns(copy(col_a), col_b)
merged_column = merge_column(copy(col_a), col_b)
if merged_column != col_a:
new_columns.append(merged_column)
else:
Expand Down Expand Up @@ -428,11 +428,12 @@ def diff_tables(tab_a: TTableSchema, tab_b: TPartialTableSchema) -> TPartialTabl
# return False


def merge_tables(table: TTableSchema, partial_table: TPartialTableSchema) -> TPartialTableSchema:
def merge_table(table: TTableSchema, partial_table: TPartialTableSchema) -> TPartialTableSchema:
"""Merges "partial_table" into "table". `table` is merged in place. Returns the diff partial table.
`table` and `partial_table` names must be identical. A table diff is generated and applied to `table`:
* new columns are added, updated columns are replaced from diff
* incomplete columns in `table` that got completed in `partial_table` are removed to preserve order
* table hints are added or replaced from diff
* nothing gets deleted
"""
Expand All @@ -441,14 +442,20 @@ def merge_tables(table: TTableSchema, partial_table: TPartialTableSchema) -> TPa
raise TablePropertiesConflictException(
table["name"], "name", table["name"], partial_table["name"]
)
diff_table = diff_tables(table, partial_table)
diff = diff_table(table, partial_table)
# remove incomplete columns in table that are complete in diff table
for col_name, column in diff["columns"].items():
if is_complete_column(column):
table_column = table["columns"].get(col_name)
if table_column and not is_complete_column(table_column):
table["columns"].pop(col_name)
# add new columns when all checks passed
table["columns"].update(diff_table["columns"])
table["columns"].update(diff["columns"])
updated_columns = table["columns"]
table.update(diff_table)
table.update(diff)
table["columns"] = updated_columns

return diff_table
return diff


def has_table_seen_data(table: TTableSchema) -> bool:
Expand Down
36 changes: 22 additions & 14 deletions dlt/extract/extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,16 @@ def write_items(self, resource: DltResource, items: TDataItems, meta: Any) -> No
if isinstance(meta, HintsMeta):
# update the resource with new hints, remove all caches so schema is recomputed
# and contracts re-applied
resource.merge_hints(meta.hints)
resource.merge_hints(meta.hints, meta.create_table_variant)
# convert to table meta if created table variant so item is assigned to this table
if meta.create_table_variant:
# name in hints meta must be a string, otherwise merge_hints would fail
meta = TableNameMeta(meta.hints["name"]) # type: ignore[arg-type]
self._reset_contracts_cache()

if table_name := self._get_static_table_name(resource, meta):
# write item belonging to table with static name
self._write_to_static_table(resource, table_name, items)
self._write_to_static_table(resource, table_name, items, meta)
else:
# table has name or other hints depending on data items
self._write_to_dynamic_table(resource, items)
Expand Down Expand Up @@ -157,30 +161,32 @@ def _write_to_dynamic_table(self, resource: DltResource, items: TDataItems) -> N
if table_name in self._filtered_tables:
continue
if table_name not in self._table_contracts or resource._table_has_other_dynamic_hints:
item = self._compute_and_update_table(resource, table_name, item)
item = self._compute_and_update_table(
resource, table_name, item, TableNameMeta(table_name)
)
# write to storage with inferred table name
if table_name not in self._filtered_tables:
self._write_item(table_name, resource.name, item)

def _write_to_static_table(
self, resource: DltResource, table_name: str, items: TDataItems
self, resource: DltResource, table_name: str, items: TDataItems, meta: Any
) -> None:
if table_name not in self._table_contracts:
items = self._compute_and_update_table(resource, table_name, items)
items = self._compute_and_update_table(resource, table_name, items, meta)
if table_name not in self._filtered_tables:
self._write_item(table_name, resource.name, items)

def _compute_table(self, resource: DltResource, items: TDataItems) -> TTableSchema:
def _compute_table(self, resource: DltResource, items: TDataItems, meta: Any) -> TTableSchema:
"""Computes a schema for a new or dynamic table and normalizes identifiers"""
return self.schema.normalize_table_identifiers(resource.compute_table_schema(items))
return self.schema.normalize_table_identifiers(resource.compute_table_schema(items, meta))

def _compute_and_update_table(
self, resource: DltResource, table_name: str, items: TDataItems
self, resource: DltResource, table_name: str, items: TDataItems, meta: Any
) -> TDataItems:
"""
Computes new table and does contract checks. If false is returned, the table may not be created and no items should be written
"""
computed_table = self._compute_table(resource, items)
computed_table = self._compute_table(resource, items, meta)
# overwrite table name (if coming from meta)
computed_table["name"] = table_name
# get or compute contract
Expand All @@ -193,7 +199,7 @@ def _compute_and_update_table(
computed_table["x-normalizer"] = {"evolve-columns-once": True} # type: ignore[typeddict-unknown-key]
existing_table = self.schema._schema_tables.get(table_name, None)
if existing_table:
diff_table = utils.diff_tables(existing_table, computed_table)
diff_table = utils.diff_table(existing_table, computed_table)
else:
diff_table = computed_table

Expand Down Expand Up @@ -300,9 +306,11 @@ def _write_item(
]
super()._write_item(table_name, resource_name, items, columns)

def _compute_table(self, resource: DltResource, items: TDataItems) -> TPartialTableSchema:
def _compute_table(
self, resource: DltResource, items: TDataItems, meta: Any
) -> TPartialTableSchema:
items = items[0]
computed_table = super()._compute_table(resource, items)
computed_table = super()._compute_table(resource, items, Any)

# Merge the columns to include primary_key and other hints that may be set on the resource
arrow_table = copy(computed_table)
Expand All @@ -329,9 +337,9 @@ def _compute_table(self, resource: DltResource, items: TDataItems) -> TPartialTa
return arrow_table

def _compute_and_update_table(
self, resource: DltResource, table_name: str, items: TDataItems
self, resource: DltResource, table_name: str, items: TDataItems, meta: Any
) -> TDataItems:
items = super()._compute_and_update_table(resource, table_name, items)
items = super()._compute_and_update_table(resource, table_name, items, meta)
# filter data item as filters could be updated in compute table
items = [self._apply_contract_filters(item, resource, table_name) for item in items]
return items
Loading

0 comments on commit c975ef4

Please sign in to comment.