Skip to content

Commit

Permalink
Airbyte CDK: add filter to RemoveFields (#35326)
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
  • Loading branch information
artem1205 committed Feb 21, 2024
1 parent 29bcceb commit 3355c5c
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 17 deletions.
2 changes: 1 addition & 1 deletion airbyte-cdk/python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pip install -e ".[dev]" # [dev] installs development-only dependencies
If the iteration you are working on includes changes to the models, you might want to regenerate them. In order to do that, you can run:

```bash
./gradlew :airbyte-cdk:python:format
./gradlew :airbyte-cdk:python:build
```

This will generate the files based on the schemas, add the license information and format the code. If you want to only do the former and rely on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1847,6 +1847,19 @@ definitions:
type:
type: string
enum: [RemoveFields]
condition:
description: The predicate to filter a property by a property value. Property will be removed if it is empty OR expression is evaluated to True.,
type: string
default: ""
interpolation_context:
- config
- property
- parameters
examples:
- "{{ property|string == '' }}"
- "{{ property is integer }}"
- "{{ property|length > 5 }}"
- "{{ property == 'some_string_to_match' }}"
field_pointers:
title: Field Paths
description: Array of paths defining the field to remove. Each item is an array whose field describe the path of a field to remove.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,16 @@ class SchemaNormalization(Enum):

class RemoveFields(BaseModel):
type: Literal['RemoveFields']
condition: Optional[str] = Field(
'',
description="The predicate to filter a property by a property value. Property will be removed if it is empty OR expression is evaluated to True.",
examples=[
"{{ property|string == '' }}",
'{{ property is integer }}',
'{{ property|length > 5 }}',
"{{ property == 'some_string_to_match' }}",
],
)
field_pointers: List[List[str]] = Field(
...,
description='Array of paths defining the field to remove. Each item is an array whose field describe the path of a field to remove.',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@ def create_record_selector(

@staticmethod
def create_remove_fields(model: RemoveFieldsModel, config: Config, **kwargs: Any) -> RemoveFields:
return RemoveFields(field_pointers=model.field_pointers, parameters={})
return RemoveFields(field_pointers=model.field_pointers, condition=model.condition or "", parameters={})

def create_selective_authenticator(self, model: SelectiveAuthenticatorModel, config: Config, **kwargs: Any) -> DeclarativeAuthenticator:
authenticators = {name: self._create_component_from_model(model=auth, config=config) for name, auth in model.authenticators.items()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import dpath.exceptions
import dpath.util
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.declarative.types import Config, FieldPointer, StreamSlice, StreamState

Expand Down Expand Up @@ -40,6 +41,10 @@ class RemoveFields(RecordTransformation):

field_pointers: List[FieldPointer]
parameters: InitVar[Mapping[str, Any]]
condition: str = ""

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._filter_interpolator = InterpolatedBoolean(condition=self.condition, parameters=parameters)

def transform(
self,
Expand All @@ -55,7 +60,11 @@ def transform(
for pointer in self.field_pointers:
# the dpath library by default doesn't delete fields from arrays
try:
dpath.util.delete(record, pointer)
dpath.util.delete(
record,
pointer,
afilter=(lambda x: self._filter_interpolator.eval(config or {}, property=x)) if self.condition else None,
)
except dpath.exceptions.PathNotFound:
# if the (potentially nested) property does not exist, silently skip
pass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,79 @@


@pytest.mark.parametrize(
["input_record", "field_pointers", "expected"],
["input_record", "field_pointers", "condition", "expected"],
[
pytest.param({"k1": "v", "k2": "v"}, [["k1"]], {"k2": "v"}, id="remove a field that exists (flat dict)"),
pytest.param({"k1": "v", "k2": "v"}, [["k3"]], {"k1": "v", "k2": "v"}, id="remove a field that doesn't exist (flat dict)"),
pytest.param({"k1": "v", "k2": "v"}, [["k1"], ["k2"]], {}, id="remove multiple fields that exist (flat dict)"),
pytest.param({"k1": "v", "k2": "v"}, [["k1"]], None, {"k2": "v"}, id="remove a field that exists (flat dict), condition = None"),
pytest.param({"k1": "v", "k2": "v"}, [["k1"]], "", {"k2": "v"}, id="remove a field that exists (flat dict)"),
pytest.param({"k1": "v", "k2": "v"}, [["k3"]], "", {"k1": "v", "k2": "v"}, id="remove a field that doesn't exist (flat dict)"),
pytest.param({"k1": "v", "k2": "v"}, [["k1"], ["k2"]], "", {}, id="remove multiple fields that exist (flat dict)"),
# TODO: should we instead splice the element out of the array? I think that's the more intuitive solution
# Otherwise one could just set the field's value to null.
pytest.param({"k1": [1, 2]}, [["k1", 0]], {"k1": [None, 2]}, id="remove field inside array (int index)"),
pytest.param({"k1": [1, 2]}, [["k1", "0"]], {"k1": [None, 2]}, id="remove field inside array (string index)"),
pytest.param({"k1": [1, 2]}, [["k1", 0]], "", {"k1": [None, 2]}, id="remove field inside array (int index)"),
pytest.param({"k1": [1, 2]}, [["k1", "0"]], "", {"k1": [None, 2]}, id="remove field inside array (string index)"),
pytest.param(
{"k1": "v", "k2": "v", "k3": [0, 1], "k4": "v"},
[["k1"], ["k2"], ["k3", 0]],
"",
{"k3": [None, 1], "k4": "v"},
id="test all cases (flat)",
),
pytest.param({"k1": [0, 1]}, [[".", "k1", 10]], {"k1": [0, 1]}, id="remove array index that doesn't exist (flat)"),
pytest.param({".": {"k1": [0, 1]}}, [[".", "k1", 10]], {".": {"k1": [0, 1]}}, id="remove array index that doesn't exist (nested)"),
pytest.param({".": {"k2": "v", "k1": "v"}}, [[".", "k1"]], {".": {"k2": "v"}}, id="remove nested field that exists"),
pytest.param({"k1": [0, 1]}, [[".", "k1", 10]], "", {"k1": [0, 1]}, id="remove array index that doesn't exist (flat)"),
pytest.param(
{".": {"k2": "v", "k1": "v"}}, [[".", "k3"]], {".": {"k2": "v", "k1": "v"}}, id="remove field that doesn't exist (nested)"
{".": {"k1": [0, 1]}}, [[".", "k1", 10]], "", {".": {"k1": [0, 1]}}, id="remove array index that doesn't exist (nested)"
),
pytest.param({".": {"k2": "v", "k1": "v"}}, [[".", "k1"], [".", "k2"]], {".": {}}, id="remove multiple fields that exist (nested)"),
pytest.param({".": {"k2": "v", "k1": "v"}}, [[".", "k1"]], "", {".": {"k2": "v"}}, id="remove nested field that exists"),
pytest.param(
{".": {"k1": [0, 1]}}, [[".", "k1", 0]], {".": {"k1": [None, 1]}}, id="remove multiple fields that exist in arrays (nested)"
{".": {"k2": "v", "k1": "v"}}, [[".", "k3"]], "", {".": {"k2": "v", "k1": "v"}}, id="remove field that doesn't exist (nested)"
),
pytest.param(
{".": {"k2": "v", "k1": "v"}}, [[".", "k1"], [".", "k2"]], "", {".": {}}, id="remove multiple fields that exist (nested)"
),
pytest.param(
{".": {"k1": [0, 1]}},
[[".", "k1", 0]],
"",
{".": {"k1": [None, 1]}},
id="remove multiple fields that exist in arrays (nested)",
),
pytest.param(
{".": {"k1": [{"k2": "v", "k3": "v"}, {"k4": "v"}]}},
[[".", "k1", 0, "k2"], [".", "k1", 1, "k4"]],
"",
{".": {"k1": [{"k3": "v"}, {}]}},
id="remove fields that exist in arrays (deeply nested)",
),
pytest.param(
{"k1": "v", "k2": "v"},
[["**"]],
"{{ False }}",
{"k1": "v", "k2": "v"},
id="do not remove any field if condition is boolean False",
),
pytest.param({"k1": "v", "k2": "v"}, [["**"]], "{{ True }}", {}, id="remove all field if condition is boolean True"),
pytest.param(
{"k1": "v", "k2": "v1", "k3": "v1", "k4": {"k_nested": "v1", "k_nested2": "v2"}},
[["**"]],
"{{ property == 'v1' }}",
{"k1": "v", "k4": {"k_nested2": "v2"}},
id="recursively remove any field that matches property condition and leave that does not",
),
pytest.param(
{"k1": "v", "k2": "some_long_string", "k3": "some_long_string", "k4": {"k_nested": "v1", "k_nested2": "v2"}},
[["**"]],
"{{ property|length > 5 }}",
{"k1": "v", "k4": {"k_nested": "v1", "k_nested2": "v2"}},
id="remove any field that have length > 5 and leave that does not",
),
pytest.param(
{"k1": 255, "k2": "some_string", "k3": "some_long_string", "k4": {"k_nested": 123123, "k_nested2": "v2"}},
[["**"]],
"{{ property is integer }}",
{"k2": "some_string", "k3": "some_long_string", "k4": {"k_nested2": "v2"}},
id="recursively remove any field that of type integer and leave that does not",
),
],
)
def test_remove_fields(input_record: Mapping[str, Any], field_pointers: List[FieldPointer], expected: Mapping[str, Any]):
transformation = RemoveFields(field_pointers=field_pointers, parameters={})
def test_remove_fields(input_record: Mapping[str, Any], field_pointers: List[FieldPointer], condition: str, expected: Mapping[str, Any]):
transformation = RemoveFields(field_pointers=field_pointers, condition=condition, parameters={})
assert transformation.transform(input_record) == expected

0 comments on commit 3355c5c

Please sign in to comment.