Skip to content

Commit

Permalink
fix(ingest): presto-on-hive - not failing on Hive type parsing error (#6118)
Browse files Browse the repository at this point in the history

Co-authored-by: Shirshanka Das <shirshanka@apache.org>
  • Loading branch information
treff7es and shirshanka committed Oct 5, 2022
1 parent 95afc1d commit 2f79b50
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 15 deletions.
18 changes: 12 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py
Expand Up @@ -507,22 +507,28 @@ def to_mce_fields(


def avro_schema_to_mce_fields(
    avro_schema_string: str,
    is_key_schema: bool = False,
    default_nullable: bool = False,
    swallow_exceptions: bool = True,
) -> List[SchemaField]:
    """
    Converts an avro schema into schema fields compatible with MCE.

    :param avro_schema_string: String representation of the AVRO schema.
    :param is_key_schema: True if it is a key-schema. Default is False (value-schema).
    :param default_nullable: Nullability applied to fields that do not declare
        one explicitly.
    :param swallow_exceptions: True if the caller wants exceptions to be suppressed;
        a parse failure is then logged and an empty list returned. When False,
        the original exception is re-raised to the caller.
    :return: The list of MCE compatible SchemaFields.
    """
    try:
        return list(
            AvroToMceSchemaConverter.to_mce_fields(
                avro_schema_string, is_key_schema, default_nullable
            )
        )
    except Exception:
        if swallow_exceptions:
            # Best-effort mode: record the failure but let ingestion continue.
            logger.exception(f"Failed to parse {avro_schema_string} into mce fields.")
            return []
        else:
            # Caller opted into strict parsing; propagate the original error.
            raise
39 changes: 30 additions & 9 deletions metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py
@@ -1,10 +1,14 @@
import json
import logging
import re
import uuid
from typing import Any, Dict, List, Optional, Union

from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields
from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField
from datahub.metadata.schema_classes import NullTypeClass, SchemaFieldDataTypeClass

logger: logging.Logger = logging.getLogger(__name__)


class HiveColumnToAvroConverter:
Expand Down Expand Up @@ -102,7 +106,7 @@ def _parse_datatype_string(
@staticmethod
def _parse_struct_fields_string(s: str, **kwargs: Any) -> Dict[str, object]:
parts = HiveColumnToAvroConverter._ignore_brackets_split(s, ",")
fields = []
fields: List[Dict] = []
for part in parts:
name_and_type = HiveColumnToAvroConverter._ignore_brackets_split(
part.strip(), HiveColumnToAvroConverter._STRUCT_TYPE_SEPARATOR
Expand All @@ -123,7 +127,9 @@ def _parse_struct_fields_string(s: str, **kwargs: Any) -> Dict[str, object]:
field_type = HiveColumnToAvroConverter._parse_datatype_string(
name_and_type[1]
)
fields.append({"name": field_name, "type": field_type})

if not any(field["name"] == field_name for field in fields):
fields.append({"name": field_name, "type": field_type})

if kwargs.get("ustruct_seqn") is not None:
struct_name = f'__structn_{kwargs["ustruct_seqn"]}_{str(uuid.uuid4()).replace("-", "")}'
Expand Down Expand Up @@ -259,13 +265,28 @@ def get_schema_fields_for_hive_column(
default_nullable: bool = False,
is_part_of_key: bool = False,
) -> List[SchemaField]:
avro_schema_json = get_avro_schema_for_hive_column(
hive_column_name=hive_column_name, hive_column_type=hive_column_type
)
schema_fields = avro_schema_to_mce_fields(
avro_schema_string=json.dumps(avro_schema_json),
default_nullable=default_nullable,
)

try:
avro_schema_json = get_avro_schema_for_hive_column(
hive_column_name=hive_column_name, hive_column_type=hive_column_type
)
schema_fields = avro_schema_to_mce_fields(
avro_schema_string=json.dumps(avro_schema_json),
default_nullable=default_nullable,
swallow_exceptions=False,
)
except Exception as e:
logger.warning(
f"Unable to parse column {hive_column_name} and type {hive_column_type} the error was: {e}"
)
schema_fields = [
SchemaField(
fieldPath=hive_column_name,
type=SchemaFieldDataTypeClass(type=NullTypeClass()),
nativeDataType=hive_column_type,
)
]

assert schema_fields
if HiveColumnToAvroConverter.is_primitive_hive_type(hive_column_type):
# Primitive avro schema does not have any field names. Append it to fieldPath.
Expand Down
Empty file.
@@ -0,0 +1,37 @@
from datahub.metadata.schema_classes import (
NullTypeClass,
NumberTypeClass,
RecordTypeClass,
)
from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_column


def test_get_avro_schema_for_hive_column():
    """A primitive Hive type ("int") maps to a single numeric MCE field."""
    schema_fields = get_schema_fields_for_hive_column("test", "int")
    assert schema_fields[0].type.type == NumberTypeClass()
    # A primitive column produces exactly one field — no struct wrapper, no members.
    assert len(schema_fields) == 1


def test_get_avro_schema_for_struct_hive_column():
    """A struct column maps to a record field plus one field per member."""
    fields = get_schema_fields_for_hive_column("test", "struct<test:int>")
    assert fields[0].type.type == RecordTypeClass()
    # One field for the struct itself plus its single member.
    assert len(fields) == 2


def test_get_avro_schema_for_struct_hive_with_duplicate_column():
    """Duplicate member names inside a struct are deduplicated."""
    fields = get_schema_fields_for_hive_column(
        "test", "struct<test:int, test2:int, test:int>"
    )
    assert fields[0].type.type == RecordTypeClass()
    # One field for the struct itself plus the two distinct member names
    # ("test" appears twice but survives only once after deduplication).
    assert len(fields) == 3


def test_get_avro_schema_for_struct_hive_with_duplicate_column2():
    """An unparseable Hive type falls back to a single NullType field."""
    invalid_schema: str = "struct!test:intdsfs, test2:int, test:int>"
    fields = get_schema_fields_for_hive_column("test", invalid_schema)
    # The fallback preserves the column name and the raw native type string.
    assert len(fields) == 1
    fallback = fields[0]
    assert fallback.type.type == NullTypeClass()
    assert fallback.fieldPath == "test"
    assert fallback.nativeDataType == invalid_schema

0 comments on commit 2f79b50

Please sign in to comment.