Skip to content

Commit

Permalink
Source Facebook Marketing #5190 - use dpath util for excluding fields
Browse files Browse the repository at this point in the history
  • Loading branch information
vitaliizazmic committed Oct 18, 2021
1 parent 8c8d9dd commit 9b76ac3
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ class FullRefreshConfig(BaseConfig):
config_path: str = config_path
configured_catalog_path: str = configured_catalog_path
timeout_seconds: int = timeout_seconds
# ignored fields doesn't support nested fields so far
ignored_fields: Optional[List[str]] = Field(description="List of fields for ignoring in sequential reads test")
# Ignored fields doesn't support splitting by stream so far. The specified fields will be searched in all streams.
ignored_fields: Optional[List[str]] = Field(description="List of fields path for ignoring in sequential reads test")


class IncrementalConfig(BaseConfig):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from logging import Logger
from pathlib import Path
from subprocess import run
from typing import Any, List, Mapping, MutableMapping, Optional
from typing import Any, List, MutableMapping, Optional

import pytest
from airbyte_cdk.models import AirbyteRecordMessage, AirbyteStream, ConfiguredAirbyteCatalog, ConnectorSpecification, Type
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from functools import partial

import pytest
Expand All @@ -11,23 +12,19 @@

@pytest.mark.default_timeout(20 * 60)
class TestFullRefresh(BaseTest):
def test_sequential_reads(
self, inputs, connector_config, configured_catalog, docker_runner: ConnectorRunner, detailed_logger
):
ignored_fields = getattr(inputs, "ignored_fields") or {}
print(ignored_fields)
# configured_catalog = full_refresh_only_catalog(configured_catalog)
# output = docker_runner.call_read(connector_config, configured_catalog)
# records_1 = [message.record.data for message in output if message.type == Type.RECORD]
def test_sequential_reads(self, inputs, connector_config, configured_catalog, docker_runner: ConnectorRunner, detailed_logger):
ignored_fields = getattr(inputs, "ignored_fields")
configured_catalog = full_refresh_only_catalog(configured_catalog)
output = docker_runner.call_read(connector_config, configured_catalog)
records_1 = [message.record.data for message in output if message.type == Type.RECORD]

# output = docker_runner.call_read(connector_config, configured_catalog)
# records_2 = [message.record.data for message in output if message.type == Type.RECORD]
output = docker_runner.call_read(connector_config, configured_catalog)
records_2 = [message.record.data for message in output if message.type == Type.RECORD]

# serializer = partial(serialize, exclude_fields=ignored_fields)
# output_diff = set(map(serializer, records_1))
# output_diff = set(map(serializer, records_1)) - set(map(serializer, records_2))
# if output_diff:
# msg = "The two sequential reads should produce either equal set of records or one of them is a strict subset of the other"
# detailed_logger.info(msg)
# detailed_logger.log_json_list(output_diff)
# pytest.fail(msg)
serializer = partial(serialize, exclude_fields=ignored_fields)
output_diff = set(map(serializer, records_1)) - set(map(serializer, records_2))
if output_diff:
msg = "The two sequential reads should produce either equal set of records or one of them is a strict subset of the other"
detailed_logger.info(msg)
detailed_logger.log_json_list(output_diff)
pytest.fail(msg)
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@
"ConnectorRunner",
"diff_dicts",
"serialize",
"verify_records_schema"
"verify_records_schema",
]
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@

from collections import UserDict
from pathlib import Path
from typing import Any, Iterable, List, Mapping
from typing import Iterable, List

import pytest
from airbyte_cdk.models import Type
from yaml import load

try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#


import functools
import json
from typing import List, Mapping, Optional

import dpath.util
import icdiff
import py
from pprintpp import pformat
Expand Down Expand Up @@ -68,10 +68,14 @@ def __eq__(self, other):
return hash(self) == hash(other)


def serialize(value, exclude_fields=None) -> str:
def serialize(value, exclude_fields: List = None) -> str:
"""Simplify comparison of nested dicts/lists"""
print(value)
if isinstance(value, Mapping):
# If value is Mapping, some fields can be excluded
if exclude_fields:
for field in exclude_fields:
if dpath.util.search(value, field):
dpath.util.delete(value, field)
return DictWithHash(value)
if isinstance(value, List):
return sorted([serialize(v) for v in value])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
ignored_fields:
- "ads_insights_age_and_gender/cost_per_estimated_ad_recallers"
- "cost_per_estimated_ad_recallers"

0 comments on commit 9b76ac3

Please sign in to comment.