Skip to content

Commit

Permalink
Add unit tests for normalization. #2048 (#2211)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopheDuong committed Feb 26, 2021
1 parent 14acd25 commit 79ce443
Show file tree
Hide file tree
Showing 14 changed files with 36,037 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,46 @@ def process(self, catalog_file: str, json_column_name: str, target_schema: str):
catalog = read_json(catalog_file)
# print(json.dumps(catalog, separators=(",", ":")))
substreams = []
for stream_processor in self.build_stream_processor(
catalog=catalog,
json_column_name=json_column_name,
target_schema=target_schema,
name_transformer=self.name_transformer,
destination_type=self.integration_type,
tables_registry=tables_registry,
):
# Check properties
if not stream_processor.properties:
raise EOFError("Invalid Catalog: Unexpected empty properties in catalog")

raw_table_name = self.name_transformer.normalize_table_name(f"_airbyte_raw_{stream_processor.stream_name}", truncate=False)
add_table_to_sources(schema_to_source_tables, stream_processor.schema, raw_table_name)

nested_processors = stream_processor.process()
add_table_to_registry(tables_registry, stream_processor)
if nested_processors and len(nested_processors) > 0:
substreams += nested_processors
for file in stream_processor.sql_outputs:
output_sql_file(os.path.join(self.output_directory, file), stream_processor.sql_outputs[file])
self.write_yaml_sources_file(schema_to_source_tables)
self.process_substreams(substreams, tables_registry)

@staticmethod
def build_stream_processor(
catalog: Dict,
json_column_name: str,
target_schema: str,
name_transformer: DestinationNameTransformer,
destination_type: DestinationType,
tables_registry: Set[str],
) -> List[StreamProcessor]:
result = []
for configured_stream in get_field(catalog, "streams", "Invalid Catalog: 'streams' is not defined in Catalog"):
stream_config = get_field(configured_stream, "stream", "Invalid Stream: 'stream' is not defined in Catalog streams")
schema_name = self.name_transformer.normalize_schema_name(target_schema)
raw_schema_name = self.name_transformer.normalize_schema_name(f"_airbyte_{target_schema}", truncate=False)
schema_name = name_transformer.normalize_schema_name(target_schema)
raw_schema_name = name_transformer.normalize_schema_name(f"_airbyte_{target_schema}", truncate=False)
stream_name = get_field(stream_config, "name", f"Invalid Stream: 'name' is not defined in stream: {str(stream_config)}")
raw_table_name = self.name_transformer.normalize_table_name(f"_airbyte_raw_{stream_name}", truncate=False)
raw_table_name = name_transformer.normalize_table_name(f"_airbyte_raw_{stream_name}", truncate=False)

message = f"'json_schema'.'properties' are not defined for stream {stream_name}"
properties = get_field(get_field(stream_config, "json_schema", message), "properties", message)
Expand All @@ -85,26 +119,18 @@ def process(self, catalog_file: str, json_column_name: str, target_schema: str):
if not properties:
raise EOFError("Invalid Catalog: Unexpected empty properties in catalog")

add_table_to_sources(schema_to_source_tables, schema_name, raw_table_name)

stream_processor = StreamProcessor.create(
stream_name=stream_name,
integration_type=self.integration_type,
integration_type=destination_type,
raw_schema=raw_schema_name,
schema=schema_name,
json_column_name=f"'{json_column_name}'",
properties=properties,
tables_registry=tables_registry,
from_table=from_table,
)
nested_processors = stream_processor.process()
add_table_to_registry(tables_registry, stream_processor)
if nested_processors and len(nested_processors) > 0:
substreams += nested_processors
for file in stream_processor.sql_outputs:
output_sql_file(os.path.join(self.output_directory, file), stream_processor.sql_outputs[file])
self.write_yaml_sources_file(schema_to_source_tables)
self.process_substreams(substreams, tables_registry)
result.append(stream_processor)
return result

def process_substreams(self, substreams: List[StreamProcessor], tables_registry: Set[str]):
"""
Expand Down
Loading

0 comments on commit 79ce443

Please sign in to comment.