Skip to content

Commit

Permalink
🐛 [CDK, Declarative Source]: fix bug when type is missing for `anyO…
Browse files Browse the repository at this point in the history
…f` in nested arrays (#40667)
  • Loading branch information
bazarnov committed Jul 8, 2024
1 parent 202f935 commit 0c237d8
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 36 deletions.
108 changes: 72 additions & 36 deletions airbyte-cdk/python/airbyte_cdk/utils/schema_inferrer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@
from genson.schema.strategies.object import Object
from genson.schema.strategies.scalar import Number

# schema keywords
_TYPE = "type"
_NULL_TYPE = "null"
_OBJECT_TYPE = "object"
_ANY_OF = "anyOf"
_ITEMS = "items"
_PROPERTIES = "properties"
_REQUIRED = "required"


class NoRequiredObj(Object):
Expand Down Expand Up @@ -86,36 +93,65 @@ def accumulate(self, record: AirbyteRecordMessage) -> None:
"""Uses the input record to add to the inferred schemas maintained by this object"""
self.stream_to_builder[record.stream].add_object(record.data)

def _null_type_in_any_of(self, node: InferredSchema) -> bool:
if _ANY_OF in node:
return {_TYPE: _NULL_TYPE} in node[_ANY_OF]
else:
return False

def _remove_type_from_any_of(self, node: InferredSchema) -> None:
if _ANY_OF in node:
node.pop(_TYPE, None)

def _clean_any_of(self, node: InferredSchema) -> None:
if len(node[_ANY_OF]) == 2 and self._null_type_in_any_of(node):
real_type = node[_ANY_OF][1] if node[_ANY_OF][0][_TYPE] == _NULL_TYPE else node[_ANY_OF][0]
node.update(real_type)
node[_TYPE] = [node[_TYPE], _NULL_TYPE]
node.pop(_ANY_OF)
# populate `type` for `anyOf` if it's not present to pass all other checks
elif len(node[_ANY_OF]) == 2 and not self._null_type_in_any_of(node):
node[_TYPE] = [_NULL_TYPE]

def _clean_properties(self, node: InferredSchema) -> None:
for key, value in list(node[_PROPERTIES].items()):
if isinstance(value, dict) and value.get(_TYPE, None) == _NULL_TYPE:
node[_PROPERTIES].pop(key)
else:
self._clean(value)

def _ensure_null_type_on_top(self, node: InferredSchema) -> None:
if isinstance(node[_TYPE], list):
if _NULL_TYPE in node[_TYPE]:
# we want to make sure null is always at the end as it makes schemas more readable
node[_TYPE].remove(_NULL_TYPE)
node[_TYPE].append(_NULL_TYPE)
else:
node[_TYPE] = [node[_TYPE], _NULL_TYPE]

def _clean(self, node: InferredSchema) -> InferredSchema:
"""
Recursively cleans up a produced schema:
- remove anyOf if one of them is just a null value
- remove properties of type "null"
"""

if isinstance(node, dict):
if "anyOf" in node:
if len(node["anyOf"]) == 2 and {"type": _NULL_TYPE} in node["anyOf"]:
real_type = node["anyOf"][1] if node["anyOf"][0]["type"] == _NULL_TYPE else node["anyOf"][0]
node.update(real_type)
node["type"] = [node["type"], _NULL_TYPE]
node.pop("anyOf")
if "properties" in node and isinstance(node["properties"], dict):
for key, value in list(node["properties"].items()):
if isinstance(value, dict) and value.get("type", None) == _NULL_TYPE:
node["properties"].pop(key)
else:
self._clean(value)
if "items" in node:
self._clean(node["items"])
if _ANY_OF in node:
self._clean_any_of(node)

if _PROPERTIES in node and isinstance(node[_PROPERTIES], dict):
self._clean_properties(node)

if _ITEMS in node:
self._clean(node[_ITEMS])

# this check needs to follow the "anyOf" cleaning as it might populate `type`
if isinstance(node["type"], list):
if _NULL_TYPE in node["type"]:
# we want to make sure null is always at the end as it makes schemas more readable
node["type"].remove(_NULL_TYPE)
node["type"].append(_NULL_TYPE)
else:
node["type"] = [node["type"], _NULL_TYPE]
self._ensure_null_type_on_top(node)

# remove added `type: ["null"]` for `anyOf` nested node
self._remove_type_from_any_of(node)

return node

def _add_required_properties(self, node: InferredSchema) -> InferredSchema:
Expand All @@ -124,7 +160,7 @@ def _add_required_properties(self, node: InferredSchema) -> InferredSchema:
node as required.
"""
# Removing nullable for the root as when we call `_clean`, we make everything nullable
node["type"] = "object"
node[_TYPE] = _OBJECT_TYPE

exceptions = []
for field in [x for x in [self._pk, self._cursor_field] if x]:
Expand Down Expand Up @@ -164,42 +200,42 @@ def _add_field_as_required(self, node: InferredSchema, path: List[str], traveled
if not traveled_path:
traveled_path = []

if "properties" not in node:
if _PROPERTIES not in node:
# This validation is only relevant when `traveled_path` is empty
raise ValueError(
f"Path {traveled_path} does not refer to an object but is `{node}` and hence {path} can't be marked as required."
)

next_node = path[0]
if next_node not in node["properties"]:
if next_node not in node[_PROPERTIES]:
raise ValueError(f"Path {traveled_path} does not have field `{next_node}` in the schema and hence can't be marked as required.")

if "type" not in node:
if _TYPE not in node:
# We do not expect this case to happen but we added a specific error message just in case
raise ValueError(
f"Unknown schema error: {traveled_path} is expected to have a type but did not. Schema inferrence is probably broken"
)

if node["type"] not in ["object", ["null", "object"], ["object", "null"]]:
if node[_TYPE] not in [_OBJECT_TYPE, [_NULL_TYPE, _OBJECT_TYPE], [_OBJECT_TYPE, _NULL_TYPE]]:
raise ValueError(f"Path {traveled_path} is expected to be an object but was of type `{node['properties'][next_node]['type']}`")

if "required" not in node or not node["required"]:
node["required"] = [next_node]
elif next_node not in node["required"]:
node["required"].append(next_node)
if _REQUIRED not in node or not node[_REQUIRED]:
node[_REQUIRED] = [next_node]
elif next_node not in node[_REQUIRED]:
node[_REQUIRED].append(next_node)

traveled_path.append(next_node)
self._add_field_as_required(node["properties"][next_node], path[1:], traveled_path)
self._add_field_as_required(node[_PROPERTIES][next_node], path[1:], traveled_path)

def _is_leaf(self, path: List[str]) -> bool:
return len(path) == 0

def _remove_null_from_type(self, node: InferredSchema) -> None:
if isinstance(node["type"], list):
if "null" in node["type"]:
node["type"].remove("null")
if len(node["type"]) == 1:
node["type"] = node["type"][0]
if isinstance(node[_TYPE], list):
if _NULL_TYPE in node[_TYPE]:
node[_TYPE].remove(_NULL_TYPE)
if len(node[_TYPE]) == 1:
node[_TYPE] = node[_TYPE][0]

def get_stream_schema(self, stream_name: str) -> Optional[InferredSchema]:
"""
Expand Down
86 changes: 86 additions & 0 deletions airbyte-cdk/python/unit_tests/utils/test_schema_inferrer.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,92 @@
{"my_stream": {"field_A": {"type": ["string", "null"]}}},
id="test_null_string",
),
pytest.param(
[
{
"stream": "data_with_nested_arrays",
"data": {
"root_property_object": {
"property_array": [
{
"title": "Nested_1",
"type": "multi-value",
"value": ["XL"]
},
{
"title": "Nested_2",
"type": "location",
"value": {
"nested_key_1": "GB",
"nested_key_2": "United Kingdom"
}
}
],
}
}
},
],
{
"data_with_nested_arrays": {
"root_property_object": {
"type": [
"object",
"null"
],
"properties": {
"property_array": {
"type": [
"array",
"null"
],
"items": {
"type": [
"object",
"null"
],
"properties": {
"title": {
"type": [
"string",
"null"
]
},
"type": {
"type": [
"string",
"null"
]
},
"value": {
"anyOf": [
{
"type": "array",
"items": {
"type": "string"
}
},
{
"type": "object",
"properties": {
"nested_key_1": {
"type": "string"
},
"nested_key_2": {
"type": "string"
}
}
}
]
}
}
}
}
}
}
}
},
id="test_data_with_nested_arrays",
),
],
)
def test_schema_derivation(input_records: List, expected_schemas: Mapping):
Expand Down

0 comments on commit 0c237d8

Please sign in to comment.