deterministic table name collision resolution for normalization (#2206)
* deterministic collision handling for table names

* remove debugging print statement

* fmt

* fix flake check

* fix

* fix

* fix usage

* respond to more feedback

* fix everything except truncation

* fix everything but expected values

* add test for just table name middle truncation

* handle inconsistent suffixes

* update tests

* fmt

* refactor (again)

* fix

* update comments

* remove formatting

* use full path

* remove logging

* remove print statements
jrhizor committed Mar 1, 2021
1 parent 9f5b21d commit fa505c7
Showing 20 changed files with 22,716 additions and 18,411 deletions.
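At a high level, this commit replaces the previous counter-based collision handling (appending `_1`, `_2`, ... in registration order) with a short hash of the stream's JSON path, so nested table names come out the same on every run. Below is a minimal sketch of the resulting name shape, using hypothetical stream names and suffix; the real logic lives in `get_table_name` and `hash_json_path` in the stream processor diff further down.

```python
import hashlib

def short_path_hash(json_path):
    # 3-character SHA-1 digest of the nesting path, mirroring hash_json_path() in the diff below
    return hashlib.sha1("&airbyte&".join(json_path).encode("utf-8")).hexdigest()[:3]

# Hypothetical nested stream: parent 'users', child array 'address', intermediate suffix 'ab1'.
parent, child, suffix = "users", "address", "ab1"
print(f"{parent}_{short_path_hash([parent, child])}_{child}_{suffix}")
# -> 'users_<3 hex chars>_address_ab1', identical on every run for this path
```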
@@ -43,14 +43,14 @@ class CatalogProcessor:
This is relying on a StreamProcessor to handle the conversion of a stream to a table one at a time.
"""

def __init__(self, output_directory: str, integration_type: DestinationType):
def __init__(self, output_directory: str, destination_type: DestinationType):
"""
@param output_directory is the path to the directory where this processor should write the resulting SQL files (DBT models)
@param integration_type is the destination type of warehouse
@param destination_type is the destination type of warehouse
"""
self.output_directory: str = output_directory
self.integration_type: DestinationType = integration_type
self.name_transformer: DestinationNameTransformer = DestinationNameTransformer(integration_type)
self.destination_type: DestinationType = destination_type
self.name_transformer: DestinationNameTransformer = DestinationNameTransformer(destination_type)

def process(self, catalog_file: str, json_column_name: str, target_schema: str):
"""
@@ -74,7 +74,7 @@ def process(self, catalog_file: str, json_column_name: str, target_schema: str):
json_column_name=json_column_name,
target_schema=target_schema,
name_transformer=self.name_transformer,
destination_type=self.integration_type,
destination_type=self.destination_type,
tables_registry=tables_registry,
):
# Check properties
@@ -121,7 +121,7 @@ def build_stream_processor(

stream_processor = StreamProcessor.create(
stream_name=stream_name,
integration_type=destination_type,
destination_type=destination_type,
raw_schema=raw_schema_name,
schema=schema_name,
json_column_name=f"'{json_column_name}'",
@@ -53,21 +53,21 @@ class DestinationNameTransformer:
- column
"""

def __init__(self, integration_type: DestinationType):
def __init__(self, destination_type: DestinationType):
"""
@param integration_type is the destination type of warehouse
@param destination_type is the destination type of warehouse
"""
self.integration_type: DestinationType = integration_type
self.destination_type: DestinationType = destination_type

# Public methods

def needs_quotes(self, input_name: str) -> bool:
"""
@param input_name to test if it needs to manipulated with quotes or not
"""
if is_reserved_keyword(input_name, self.integration_type):
if is_reserved_keyword(input_name, self.destination_type):
return True
if self.integration_type.value == DestinationType.BIGQUERY.value:
if self.destination_type.value == DestinationType.BIGQUERY.value:
return False
doesnt_start_with_alphaunderscore = match("[^A-Za-z_]", input_name[0]) is not None
contains_non_alphanumeric = match(".*[^A-Za-z0-9_].*", input_name) is not None
@@ -100,21 +100,46 @@ def normalize_column_name(self, column_name: str, in_jinja: bool = False, trunca
"""
return self.__normalize_identifier_name(column_name=column_name, in_jinja=in_jinja, truncate=truncate)

def truncate_identifier_name(self, input_name: str, custom_limit: int = -1) -> str:
"""
@param input_name is the identifier name to middle truncate
@param custom_limit uses a custom length as the max instead of the destination max length
"""
limit = custom_limit if custom_limit > 0 else self.get_name_max_length()

if limit < len(input_name):
middle = round(limit / 2)
# truncate in the middle to preserve prefix/suffix instead
prefix = input_name[: limit - middle - 1]
suffix = input_name[1 - middle :]
# Add extra characters '__', signaling a truncate in identifier
print(f"Truncating {input_name} (#{len(input_name)}) to {prefix}__{suffix} (#{2 + len(prefix) + len(suffix)})")
input_name = f"{prefix}__{suffix}"

return input_name

def get_name_max_length(self):
if self.destination_type.value in DESTINATION_SIZE_LIMITS:
destination_limit = DESTINATION_SIZE_LIMITS[self.destination_type.value]
return destination_limit - TRUNCATE_DBT_RESERVED_SIZE - TRUNCATE_RESERVED_SIZE
else:
raise KeyError(f"Unknown destination type {self.destination_type}")

# Private methods

def __normalize_non_column_identifier_name(self, input_name: str, in_jinja: bool = False, truncate: bool = True) -> str:
# We force standard naming for non column names (see issue #1785)
result = transform_standard_naming(input_name)
result = self.__normalize_naming_conventions(result)
if truncate:
result = self.__truncate_identifier_name(result)
result = self.truncate_identifier_name(result)
result = self.__normalize_identifier_case(result, is_quoted=False)
return result

def __normalize_identifier_name(self, column_name: str, in_jinja: bool = False, truncate: bool = True) -> str:
result = self.__normalize_naming_conventions(column_name)
if truncate:
result = self.__truncate_identifier_name(result)
result = self.truncate_identifier_name(result)
if self.needs_quotes(result):
result = result.replace('"', '""')
result = result.replace("'", "\\'")
@@ -132,44 +157,28 @@ def __normalize_identifier_name(self, column_name: str, in_jinja: bool = False,

def __normalize_naming_conventions(self, input_name: str) -> str:
result = input_name
if self.integration_type.value == DestinationType.BIGQUERY.value:
if self.destination_type.value == DestinationType.BIGQUERY.value:
result = transform_standard_naming(result)
doesnt_start_with_alphaunderscore = match("[^A-Za-z_]", result[0]) is not None
if doesnt_start_with_alphaunderscore:
result = f"_{result}"
return result

def __truncate_identifier_name(self, input_name: str) -> str:
if self.integration_type.value in DESTINATION_SIZE_LIMITS:
destination_limit = DESTINATION_SIZE_LIMITS[self.integration_type.value]
limit = destination_limit - TRUNCATE_DBT_RESERVED_SIZE - TRUNCATE_RESERVED_SIZE
if limit < len(input_name):
middle = round(limit / 2)
# truncate in the middle to preserve prefix/suffix instead
prefix = input_name[: limit - middle - 1]
suffix = input_name[1 - middle :]
# Add extra characters '__', signaling a truncate in identifier
print(f"Truncating {input_name} (#{len(input_name)}) to {prefix}__{suffix} (#{2 + len(prefix) + len(suffix)})")
input_name = f"{prefix}__{suffix}"
else:
raise KeyError(f"Unknown integration type {self.integration_type}")
return input_name

def __normalize_identifier_case(self, input_name: str, is_quoted: bool = False) -> str:
result = input_name
if self.integration_type.value == DestinationType.BIGQUERY.value:
if self.destination_type.value == DestinationType.BIGQUERY.value:
pass
elif self.integration_type.value == DestinationType.REDSHIFT.value:
elif self.destination_type.value == DestinationType.REDSHIFT.value:
# all tables (even quoted ones) are coerced to lowercase.
result = input_name.lower()
elif self.integration_type.value == DestinationType.POSTGRES.value:
elif self.destination_type.value == DestinationType.POSTGRES.value:
if not is_quoted and not self.needs_quotes(input_name):
result = input_name.lower()
elif self.integration_type.value == DestinationType.SNOWFLAKE.value:
elif self.destination_type.value == DestinationType.SNOWFLAKE.value:
if not is_quoted and not self.needs_quotes(input_name):
result = input_name.upper()
else:
raise KeyError(f"Unknown integration type {self.integration_type}")
raise KeyError(f"Unknown destination type {self.destination_type}")
return result


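The truncation helper promoted to a public method in the DestinationNameTransformer diff above keeps the head and tail of an over-long identifier and marks the cut with `__`. A small standalone sketch of that behavior (a hypothetical free function mirroring `truncate_identifier_name`, with an explicit limit instead of the destination-specific one):

```python
def middle_truncate(input_name: str, limit: int) -> str:
    # Mirrors DestinationNameTransformer.truncate_identifier_name for a fixed limit.
    if limit < len(input_name):
        middle = round(limit / 2)
        # keep the prefix and suffix of the identifier, dropping the middle
        prefix = input_name[: limit - middle - 1]
        suffix = input_name[1 - middle :]
        # the '__' marks where characters were removed
        input_name = f"{prefix}__{suffix}"
    return input_name

print(middle_truncate("some_very_long_nested_property_name", 20))
# -> 'some_very__erty_name' (20 characters: 9-char prefix + '__' + 9-char suffix)
```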
@@ -22,6 +22,7 @@
SOFTWARE.
"""

import hashlib
import os
from typing import Dict, List, Optional, Set, Tuple

@@ -41,15 +42,18 @@
jinja_call,
)

# minimum length of parent name used for nested streams
MINIMUM_PARENT_LENGTH = 10


class StreamProcessor(object):
"""
Takes as input an Airbyte Stream as described in the (configured) Airbyte Catalog (stored as Json Schema).
Takes as input an Airbyte Stream as described in the (configured) Airbyte Catalog's Json Schema.
Associated input raw data is expected to be stored in a staging area table.
This processor generates SQL models to transform such a stream into a final table in the destination schema.
This is done by generating a DBT pipeline of transformations (multiple SQL models queries) that may be materialized
in the intermediate schema "raw_schema" (changing the dbt_project.yml settings.
in the intermediate schema "raw_schema" (changing the dbt_project.yml settings).
The final output data should be written in "schema".
The pipeline includes transformations such as:
@@ -66,7 +70,7 @@ class StreamProcessor(object):
def __init__(
self,
stream_name: str,
integration_type: DestinationType,
destination_type: DestinationType,
raw_schema: str,
schema: str,
json_column_name: str,
@@ -78,15 +82,15 @@ def __init__(
See StreamProcessor.create()
"""
self.stream_name: str = stream_name
self.integration_type: DestinationType = integration_type
self.destination_type: DestinationType = destination_type
self.raw_schema: str = raw_schema
self.schema: str = schema
self.json_column_name: str = json_column_name
self.properties: Dict = properties
self.tables_registry: Set[str] = tables_registry
self.from_table: str = from_table

self.name_transformer: DestinationNameTransformer = DestinationNameTransformer(integration_type)
self.name_transformer: DestinationNameTransformer = DestinationNameTransformer(destination_type)
self.json_path: List[str] = [stream_name]
self.final_table_name: str = ""
self.sql_outputs: Dict[str, str] = {}
@@ -112,7 +116,7 @@ def create_from_parent(
"""
result = StreamProcessor.create(
stream_name=child_name,
integration_type=parent.integration_type,
destination_type=parent.destination_type,
raw_schema=parent.raw_schema,
schema=parent.schema,
json_column_name=json_column_name,
@@ -128,7 +132,7 @@
@staticmethod
def create(
stream_name: str,
integration_type: DestinationType,
destination_type: DestinationType,
raw_schema: str,
schema: str,
json_column_name: str,
@@ -139,7 +143,7 @@ def create(
"""
@param stream_name of the stream being processed
@param integration_type is the destination type of warehouse
@param destination_type is the destination type of warehouse
@param raw_schema is the name of the staging intermediate schema where to create internal tables/views
@param schema is the name of the schema where to store the final tables where to store the transformed data
@@ -151,7 +155,7 @@
"""
return StreamProcessor(
stream_name,
integration_type,
destination_type,
raw_schema,
schema,
json_column_name,
@@ -339,13 +343,13 @@ def cast_property_type(self, property_name: str, column_name: str, jinja_column:
return f"cast({column_name} as {sql_type}) as {column_name}"

def cast_property_type_as_array(self, property_name: str, column_name: str) -> str:
if self.integration_type.value == DestinationType.BIGQUERY.value:
if self.destination_type.value == DestinationType.BIGQUERY.value:
# TODO build a struct/record type from properties JSON schema
pass
return column_name

def cast_property_type_as_object(self, property_name: str, column_name: str) -> str:
if self.integration_type.value == DestinationType.BIGQUERY.value:
if self.destination_type.value == DestinationType.BIGQUERY.value:
# TODO build a struct/record type from properties JSON schema
pass
return jinja_call("type_json()")
@@ -454,7 +458,7 @@ def generate_new_table_name(self, is_intermediate: bool, suffix: str) -> str:
Generates a new table names that is not registered in the schema yet (based on normalized_stream_name())
"""
tables_registry = self.tables_registry.union(self.local_registry)
new_table_name = table_name = self.normalized_stream_name()
new_table_name = self.normalized_stream_name()
if not is_intermediate and self.parent is None:
# Top-level stream has priority on table_names
if new_table_name in tables_registry:
@@ -463,18 +467,10 @@
# see alias in dbt: https://docs.getdbt.com/docs/building-a-dbt-project/building-models/using-custom-aliases/
pass
pass
elif not self.parent:
new_table_name = get_table_name(self.name_transformer, "", new_table_name, suffix, self.json_path)
else:
if suffix:
new_table_name = self.name_transformer.normalize_table_name(f"{table_name}_{suffix}")
if new_table_name in tables_registry:
# TODO handle collisions deterministically between intermediate tables and children
for i in range(1, 1000):
if suffix:
new_table_name = self.name_transformer.normalize_table_name(f"{table_name}_{i}_{suffix}")
else:
new_table_name = self.name_transformer.normalize_table_name(f"{table_name}_{i}")
if new_table_name not in tables_registry:
break
new_table_name = get_table_name(self.name_transformer, "_".join(self.json_path[:-1]), new_table_name, suffix, self.json_path)
if not is_intermediate:
self.final_table_name = new_table_name
return new_table_name
@@ -595,3 +591,35 @@ def find_properties_object(path: List[str], field: str, properties) -> Dict[str,
if child:
result.update(child)
return result


def hash_json_path(json_path: List[str]) -> str:
lineage = "&airbyte&".join(json_path)
h = hashlib.sha1()
h.update(lineage.encode("utf-8"))
return h.hexdigest()[:3]


def get_table_name(name_transformer: DestinationNameTransformer, parent: str, child: str, suffix: str, json_path: List[str]) -> str:
max_length = name_transformer.get_name_max_length() - 2 # less two for the underscores
json_path_hash = hash_json_path(json_path)
norm_suffix = suffix if not suffix or suffix.startswith("_") else f"_{suffix}"
norm_parent = parent if not parent else name_transformer.normalize_table_name(parent, False, False)
norm_child = name_transformer.normalize_table_name(child, False, False)
min_parent_length = min(MINIMUM_PARENT_LENGTH, len(norm_parent))

# no parent
if not parent:
return name_transformer.truncate_identifier_name(f"{norm_child}{norm_suffix}")
# if everything fits without truncation, don't truncate anything
elif (len(norm_parent) + len(norm_child) + len(json_path_hash) + len(norm_suffix)) < max_length:
return f"{norm_parent}_{json_path_hash}_{norm_child}{norm_suffix}"
# if everything fits except for the parent, just truncate the parent
elif (len(norm_child) + len(json_path_hash) + len(norm_suffix)) < (max_length - min_parent_length):
max_parent_length = max_length - len(norm_child) - len(json_path_hash) - len(norm_suffix)
return f"{norm_parent[:max_parent_length]}_{json_path_hash}_{norm_child}{norm_suffix}"
# otherwise first truncate parent to the minimum length and middle truncate the child
else:
norm_child_max_length = max_length - min_parent_length - len(json_path_hash) - len(norm_suffix)
trunc_norm_child = name_transformer.truncate_identifier_name(norm_child, norm_child_max_length)
return f"{norm_parent[:min_parent_length]}_{json_path_hash}_{trunc_norm_child}{norm_suffix}"
@@ -71,11 +71,11 @@ def parse(self, args) -> None:
}

def process_catalog(self) -> None:
integration_type = DestinationType.from_string(self.config["integration_type"])
destination_type = DestinationType.from_string(self.config["integration_type"])
schema = self.config["schema"]
output = self.config["output_path"]
json_col = self.config["json_column"]
processor = CatalogProcessor(output_directory=output, integration_type=integration_type)
processor = CatalogProcessor(output_directory=output, destination_type=destination_type)
for catalog_file in self.config["catalog"]:
print(f"Processing {catalog_file}...")
processor.process(catalog_file=catalog_file, json_column_name=json_col, target_schema=schema)
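For context, a minimal sketch of driving the renamed API end to end; the module paths, file names, and schema below are assumptions for illustration, not taken from this commit, while the call signatures match the diff above.

```python
# Assumed import paths; adjust to the actual package layout of base-normalization.
from normalization.destination_type import DestinationType
from normalization.transform_catalog.catalog_processor import CatalogProcessor

# Values are placeholders; only the keyword arguments are taken from the diff above.
destination_type = DestinationType.from_string("postgres")
processor = CatalogProcessor(output_directory="models/generated", destination_type=destination_type)
processor.process(
    catalog_file="destination_catalog.json",
    json_column_name="_airbyte_data",
    target_schema="public",
)
```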
(remaining changed files not shown)
