Skip to content

Commit

Permalink
[ISSUE #26909] add latest connector config control message to connect… (
Browse files Browse the repository at this point in the history
#26922)

* [ISSUE #26909] add latest connector config control message to connector builder API

* [ISSUE #26909] flake

* Automated Commit - Formatting Changes

* [ISSUE #26909] fallback on in-memory dict if no config control message

* [ISSUE #26909] update and add tests
  • Loading branch information
maxi297 committed Jun 7, 2023
1 parent 6bf8c21 commit 4625cef
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 23 deletions.
36 changes: 17 additions & 19 deletions airbyte-cdk/python/airbyte_cdk/connector_builder/message_grouper.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
from airbyte_cdk.utils import AirbyteTracedException
from airbyte_cdk.utils.schema_inferrer import SchemaInferrer
from airbyte_protocol.models.airbyte_protocol import (
AirbyteControlMessage,
AirbyteLogMessage,
AirbyteMessage,
AirbyteTraceMessage,
ConfiguredAirbyteCatalog,
Level,
OrchestratorType,
TraceType,
)
from airbyte_protocol.models.airbyte_protocol import Type as MessageType
Expand Down Expand Up @@ -52,6 +54,7 @@ def get_message_groups(

slices = []
log_messages = []
latest_config_update: AirbyteControlMessage = None
for message_group in self._get_message_groups(
self._read_stream(source, config, configured_catalog),
schema_inferrer,
Expand All @@ -63,7 +66,9 @@ def get_message_groups(
if message_group.type == TraceType.ERROR:
error_message = f"{message_group.error.message} - {message_group.error.stack_trace}"
log_messages.append(LogMessage(**{"message": error_message, "level": "ERROR"}))

elif isinstance(message_group, AirbyteControlMessage):
if not latest_config_update or latest_config_update.emitted_at <= message_group.emitted_at:
latest_config_update = message_group
else:
slices.append(message_group)

Expand All @@ -74,11 +79,12 @@ def get_message_groups(
inferred_schema=schema_inferrer.get_stream_schema(
configured_catalog.streams[0].stream.name
), # The connector builder currently only supports reading from a single stream at a time
latest_config_update=latest_config_update.connectorConfig.config if latest_config_update else self._clean_config(config),
)

def _get_message_groups(
self, messages: Iterator[AirbyteMessage], schema_inferrer: SchemaInferrer, limit: int
) -> Iterable[Union[StreamReadPages, AirbyteLogMessage, AirbyteTraceMessage]]:
) -> Iterable[Union[StreamReadPages, AirbyteControlMessage, AirbyteLogMessage, AirbyteTraceMessage]]:
"""
Message groups are partitioned according to when request log messages are received. Subsequent response log messages
and record messages belong to the prior request log message and when we encounter another request, append the latest
Expand Down Expand Up @@ -135,6 +141,8 @@ def _get_message_groups(
current_page_records.append(message.record.data)
records_count += 1
schema_inferrer.accumulate(message.record)
elif message.type == MessageType.CONTROL and message.control.type == OrchestratorType.CONNECTOR_CONFIG:
yield message.control
else:
self._close_page(current_page_request, current_page_response, current_slice_pages, current_page_records, validate_page_complete=not had_error)
yield StreamReadSlices(pages=current_slice_pages, slice_descriptor=current_slice_descriptor)
Expand Down Expand Up @@ -217,20 +225,10 @@ def _has_reached_limit(self, slices: List[StreamReadPages]):
def _parse_slice_description(self, log_message):
return json.loads(log_message.replace(AbstractSource.SLICE_LOG_PREFIX, "", 1))

@classmethod
def _create_configure_catalog(cls, stream_name: str) -> ConfiguredAirbyteCatalog:
return ConfiguredAirbyteCatalog.parse_obj(
{
"streams": [
{
"stream": {
"name": stream_name,
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
}
]
}
)
@staticmethod
def _clean_config(config: Mapping[str, Any]):
cleaned_config = deepcopy(config)
for key in config.keys():
if key.startswith("__"):
del cleaned_config[key]
return cleaned_config
1 change: 1 addition & 0 deletions airbyte-cdk/python/airbyte_cdk/connector_builder/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class StreamRead(object):
slices: List[StreamReadSlices]
test_read_limit_reached: bool
inferred_schema: Optional[Dict[str, Any]]
latest_config_update: Optional[Dict[str, Any]]


@dataclass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ def test_read():
],
test_read_limit_reached=False,
inferred_schema=None,
latest_config_update={}
)

expected_airbyte_message = AirbyteMessage(
Expand All @@ -367,6 +368,7 @@ def test_read():
],
"test_read_limit_reached": False,
"inferred_schema": None,
"latest_config_update": {}
},
emitted_at=1,
),
Expand Down Expand Up @@ -407,7 +409,8 @@ def check_config_against_spec(self):
pages=[StreamReadPages(records=[], request=None, response=None)],
slice_descriptor=None, state=None)],
test_read_limit_reached=False,
inferred_schema=None)
inferred_schema=None,
latest_config_update={})

expected_message = AirbyteMessage(
type=MessageType.RECORD,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,15 @@
import pytest
from airbyte_cdk.connector_builder.message_grouper import MessageGrouper
from airbyte_cdk.connector_builder.models import HttpRequest, HttpResponse, LogMessage, StreamRead, StreamReadPages
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteRecordMessage, Level
from airbyte_cdk.models import (
AirbyteControlConnectorConfigMessage,
AirbyteControlMessage,
AirbyteLogMessage,
AirbyteMessage,
AirbyteRecordMessage,
Level,
OrchestratorType,
)
from airbyte_cdk.models import Type as MessageType
from unit_tests.connector_builder.utils import create_configured_catalog

Expand Down Expand Up @@ -463,9 +471,9 @@ def test_get_grouped_messages_with_many_slices(mock_entrypoint_read):
)
)

connecto_builder_handler = MessageGrouper(MAX_PAGES_PER_SLICE, MAX_SLICES)
connector_builder_handler = MessageGrouper(MAX_PAGES_PER_SLICE, MAX_SLICES)

stream_read: StreamRead = connecto_builder_handler.get_message_groups(
stream_read: StreamRead = connector_builder_handler.get_message_groups(
source=mock_source, config=CONFIG, configured_catalog=create_configured_catalog("hashiras")
)

Expand Down Expand Up @@ -530,6 +538,76 @@ def test_read_stream_returns_error_if_stream_does_not_exist():
assert "ERROR" in actual_response.logs[0].level


@patch('airbyte_cdk.connector_builder.message_grouper.AirbyteEntrypoint.read')
def test_given_control_message_then_stream_read_has_config_update(mock_entrypoint_read):
updated_config = {"x": 1}
mock_source = make_mock_source(mock_entrypoint_read, iter(
any_request_and_response_with_a_record() + [connector_configuration_control_message(1, updated_config)]
))
connector_builder_handler = MessageGrouper(MAX_PAGES_PER_SLICE, MAX_SLICES)
stream_read: StreamRead = connector_builder_handler.get_message_groups(
source=mock_source, config=CONFIG, configured_catalog=create_configured_catalog("hashiras")
)

assert stream_read.latest_config_update == updated_config


@patch('airbyte_cdk.connector_builder.message_grouper.AirbyteEntrypoint.read')
def test_given_no_control_message_then_use_in_memory_config_change_as_update(mock_entrypoint_read):
mock_source = make_mock_source(mock_entrypoint_read, iter(any_request_and_response_with_a_record()))
connector_builder_handler = MessageGrouper(MAX_PAGES_PER_SLICE, MAX_SLICES)
full_config = {**CONFIG, **{"__injected_declarative_manifest": MANIFEST}}
stream_read: StreamRead = connector_builder_handler.get_message_groups(
source=mock_source, config=full_config, configured_catalog=create_configured_catalog("hashiras")
)

assert stream_read.latest_config_update == CONFIG


@patch('airbyte_cdk.connector_builder.message_grouper.AirbyteEntrypoint.read')
def test_given_multiple_control_messages_then_stream_read_has_latest_based_on_emitted_at(mock_entrypoint_read):
earliest = 0
earliest_config = {"earliest": 0}
latest = 1
latest_config = {"latest": 1}
mock_source = make_mock_source(mock_entrypoint_read, iter(
any_request_and_response_with_a_record() +
[
# here, we test that even if messages are emitted in a different order, we still rely on `emitted_at`
connector_configuration_control_message(latest, latest_config),
connector_configuration_control_message(earliest, earliest_config),
]
)
)
connector_builder_handler = MessageGrouper(MAX_PAGES_PER_SLICE, MAX_SLICES)
stream_read: StreamRead = connector_builder_handler.get_message_groups(
source=mock_source, config=CONFIG, configured_catalog=create_configured_catalog("hashiras")
)

assert stream_read.latest_config_update == latest_config


@patch('airbyte_cdk.connector_builder.message_grouper.AirbyteEntrypoint.read')
def test_given_multiple_control_messages_with_same_timestamp_then_stream_read_has_latest_based_on_message_order(mock_entrypoint_read):
emitted_at = 0
earliest_config = {"earliest": 0}
latest_config = {"latest": 1}
mock_source = make_mock_source(mock_entrypoint_read, iter(
any_request_and_response_with_a_record() +
[
connector_configuration_control_message(emitted_at, earliest_config),
connector_configuration_control_message(emitted_at, latest_config),
]
)
)
connector_builder_handler = MessageGrouper(MAX_PAGES_PER_SLICE, MAX_SLICES)
stream_read: StreamRead = connector_builder_handler.get_message_groups(
source=mock_source, config=CONFIG, configured_catalog=create_configured_catalog("hashiras")
)

assert stream_read.latest_config_update == latest_config


def make_mock_source(mock_entrypoint_read, return_value: Iterator) -> MagicMock:
mock_source = MagicMock()
mock_entrypoint_read.return_value = return_value
Expand All @@ -550,3 +628,22 @@ def record_message(stream: str, data: dict) -> AirbyteMessage:

def slice_message(slice_descriptor: str = '{"key": "value"}') -> AirbyteMessage:
return AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message="slice:" + slice_descriptor))


def connector_configuration_control_message(emitted_at: float, config: dict) -> AirbyteMessage:
return AirbyteMessage(
type=MessageType.CONTROL,
control=AirbyteControlMessage(
type=OrchestratorType.CONNECTOR_CONFIG,
emitted_at=emitted_at,
connectorConfig=AirbyteControlConnectorConfigMessage(config=config),
)
)


def any_request_and_response_with_a_record():
return [
request_log_message({"request": 1}),
response_log_message({"response": 2}),
record_message("hashiras", {"name": "Shinobu Kocho"}),
]

0 comments on commit 4625cef

Please sign in to comment.