Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish stream status messages in CDK #24994

Merged
merged 42 commits into from
Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
3ed82e1
Publish stream status messages in CDK
jdpgrailsdev Apr 7, 2023
d49e33d
Automated Commit - Formatting Changes
jdpgrailsdev Apr 7, 2023
52b7c66
Convert to StreamDescriptor
jdpgrailsdev Apr 7, 2023
361b7b5
Automated Commit - Formatting Changes
jdpgrailsdev Apr 7, 2023
b716aba
Bump to latest protocol model
jdpgrailsdev Apr 13, 2023
681cc58
Automated Commit - Formatting Changes
jdpgrailsdev Apr 13, 2023
31cbde1
Merge branch 'master' into jonathan/cdk-stream-status-messages
jdpgrailsdev Apr 13, 2023
c31f0bc
Bump protocol version
jdpgrailsdev Apr 13, 2023
6735aee
Add tests for stream status message creation
jdpgrailsdev Apr 13, 2023
f53ef81
Formatting
jdpgrailsdev Apr 13, 2023
105a1a2
Formatting
jdpgrailsdev Apr 13, 2023
f64867f
Fix failing test
jdpgrailsdev Apr 13, 2023
89b4e50
Actually emit state message
jdpgrailsdev Apr 13, 2023
5ba0e30
Automated Commit - Formatting Changes
jdpgrailsdev Apr 13, 2023
8752a28
Merge branch 'master' into jonathan/cdk-stream-status-messages
jdpgrailsdev Apr 13, 2023
b242ec4
Merge branch 'master' into jonathan/cdk-stream-status-messages
jdpgrailsdev Apr 13, 2023
21f4e8e
Bump airbyte-protocol
jdpgrailsdev Apr 14, 2023
9e063b8
Merge branch 'master' into jonathan/cdk-stream-status-messages
jdpgrailsdev Apr 14, 2023
8f02fce
Merge branch 'master' into jonathan/cdk-stream-status-messages
jdpgrailsdev Apr 14, 2023
9b37b7b
PR feedback
jdpgrailsdev Apr 15, 2023
59d4595
Fix parameter input
jdpgrailsdev Apr 15, 2023
311f5e7
Merge branch 'master' into jonathan/cdk-stream-status-messages
jdpgrailsdev Apr 17, 2023
207c64b
Merge branch 'master' into jonathan/cdk-stream-status-messages
jdpgrailsdev Apr 18, 2023
1297f6b
Correctly yield status message
jdpgrailsdev Apr 19, 2023
92e409f
Merge branch 'master' into jonathan/cdk-stream-status-messages
jdpgrailsdev Apr 19, 2023
91caacf
PR feedback
jdpgrailsdev Apr 19, 2023
ef16df4
Formatting
jdpgrailsdev Apr 19, 2023
bd5a74b
Fix failing tests
jdpgrailsdev Apr 19, 2023
9069ca2
Automated Commit - Formatting Changes
jdpgrailsdev Apr 19, 2023
557c158
Revert accidental change
jdpgrailsdev Apr 19, 2023
518a34d
Merge branch 'jonathan/cdk-stream-status-messages' of github.com:airb…
jdpgrailsdev Apr 19, 2023
a2de336
Merge branch 'master' into jonathan/cdk-stream-status-messages
jdpgrailsdev Apr 20, 2023
3ae7137
Automated Change
jdpgrailsdev Apr 20, 2023
4ae440b
Merge branch 'master' into jonathan/cdk-stream-status-messages
jdpgrailsdev Apr 21, 2023
718453e
Replace STOPPED with COMPLETE/INCOMPLETE
jdpgrailsdev Apr 21, 2023
c381cec
Merge branch 'master' into jonathan/cdk-stream-status-messages
jdpgrailsdev Apr 21, 2023
90d6202
Merge branch 'master' into jonathan/cdk-stream-status-messages
jdpgrailsdev Apr 24, 2023
d6e676e
Merge branch 'master' into jonathan/cdk-stream-status-messages
jdpgrailsdev Apr 25, 2023
e80397e
Merge branch 'master' into jonathan/cdk-stream-status-messages
jdpgrailsdev Apr 25, 2023
709edb8
Update source-facebook-marketing changelog
jdpgrailsdev Apr 25, 2023
95cffdb
Revert "Update source-facebook-marketing changelog"
jdpgrailsdev Apr 26, 2023
569cce5
Merge branch 'master' into jonathan/cdk-stream-status-messages
jdpgrailsdev Apr 26, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
AirbyteLogMessage,
AirbyteMessage,
AirbyteStateMessage,
AirbyteStreamStatus,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
Level,
Expand All @@ -28,6 +29,7 @@
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config
from airbyte_cdk.utils.event_timing import create_timer
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


Expand Down Expand Up @@ -113,17 +115,20 @@ def read(
continue
try:
timer.start_event(f"Syncing stream {configured_stream.stream.name}")
self._emit_stream_status(configured_stream, AirbyteStreamStatus.STARTED, None)
yield from self._read_stream(
logger=logger,
stream_instance=stream_instance,
configured_stream=configured_stream,
state_manager=state_manager,
internal_config=internal_config,
)
self._emit_stream_status(configured_stream, AirbyteStreamStatus.STOPPED, True)
except AirbyteTracedException as e:
raise e
except Exception as e:
logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}")
self._emit_stream_status(configured_stream, AirbyteStreamStatus.STOPPED, False)
jdpgrailsdev marked this conversation as resolved.
Show resolved Hide resolved
display_message = stream_instance.get_error_display_message(e)
if display_message:
raise AirbyteTracedException.from_exception(e, message=display_message) from e
Expand Down Expand Up @@ -185,6 +190,9 @@ def _read_stream(
for record in record_iterator:
if record.type == MessageType.RECORD:
record_counter += 1
if record_counter == 1:
# If we just read the first record of the stream, emit the transition to the RUNNING state
self._emit_stream_status(configured_stream, AirbyteStreamStatus.RUNNING, None)
yield record

logger.info(f"Read {record_counter} records from {stream_name} stream")
Expand Down Expand Up @@ -336,3 +344,9 @@ def _get_message(self, record_data_or_message: Union[StreamData, AirbyteMessage]
return record_data_or_message
else:
return stream_data_to_airbyte_message(stream.name, record_data_or_message, stream.transformer, stream.get_json_schema())

def _emit_stream_status(self, stream: ConfiguredAirbyteStream, stream_status: AirbyteStreamStatus, successful: bool):
    """
    Emit an AirbyteStreamStatusTraceMessage for the given stream by printing it as JSON.

    :param stream: the configured stream whose status is being reported
    :param stream_status: the status to publish for the stream
    :param successful: outcome flag forwarded unchanged to the status message; callers pass None
        when the status carries no outcome
    """
    # Forward the caller-supplied status and outcome. Hard-coding RUNNING/None here would
    # make every transition be reported as RUNNING regardless of the arguments.
    status_message = stream_status_as_airbyte_message(stream, stream_status, successful)
    print(status_message.json(exclude_unset=True))
jdpgrailsdev marked this conversation as resolved.
Show resolved Hide resolved
29 changes: 29 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/utils/stream_status_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


from datetime import datetime

from airbyte_cdk.models import AirbyteMessage, AirbyteStreamStatusTraceMessage, AirbyteTraceMessage, StreamDescriptor, TraceType
from airbyte_cdk.models import Type as MessageType


def as_airbyte_message(stream, current_status, successful) -> AirbyteMessage:
    """
    Wrap a stream status update in an AirbyteMessage of type TRACE.

    :param stream: object whose ``.stream`` attribute exposes ``name`` and ``namespace``
    :param current_status: value stored in the ``status`` field of the status trace
    :param successful: value stored in the ``success`` field of the status trace
    :return: an AirbyteMessage carrying a STREAM_STATUS trace message
    """
    # Timestamp of emission, expressed in milliseconds.
    emitted_at_ms = datetime.now().timestamp() * 1000.0

    descriptor = StreamDescriptor(name=stream.stream.name, namespace=stream.stream.namespace)
    status_payload = AirbyteStreamStatusTraceMessage(
        stream_descriptor=descriptor,
        status=current_status,
        success=successful,
    )

    return AirbyteMessage(
        type=MessageType.TRACE,
        trace=AirbyteTraceMessage(
            type=TraceType.STREAM_STATUS,
            emitted_at=emitted_at_ms,
            stream_status=status_payload,
        ),
    )
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
packages=find_packages(exclude=("unit_tests",)),
package_data={"airbyte_cdk": ["py.typed", "sources/declarative/declarative_component_schema.yaml"]},
install_requires=[
"airbyte-protocol-models==1.0.0",
"airbyte-protocol-models==0.3.5",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a downgrade... but I don't think we ever were on v1.0.0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. We were never on 1.0.0 even though we set that as the dependency version. The new changes were released as a patch on the current version (0.3), which is the reason for the correction.

"backoff",
"dpath~=2.0.1",
"isodate~=0.6.1",
Expand Down
74 changes: 74 additions & 0 deletions airbyte-cdk/python/unit_tests/utils/test_stream_status_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.models import (
AirbyteMessage,
AirbyteStream,
AirbyteStreamStatus,
ConfiguredAirbyteStream,
DestinationSyncMode,
SyncMode,
TraceType,
)
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message

# Shared fixtures for the tests in this module: a full-refresh stream called "name" in
# namespace "namespace", and a configured stream wrapping it with overwrite destination mode.
stream = AirbyteStream(name="name", namespace="namespace", json_schema={}, supported_sync_modes=[SyncMode.full_refresh])
configured_stream = ConfiguredAirbyteStream(stream=stream, sync_mode=SyncMode.full_refresh, destination_sync_mode=DestinationSyncMode.overwrite)


def test_started_as_message():
    """A STARTED status becomes a TRACE/STREAM_STATUS message with no success flag."""
    stream_status = AirbyteStreamStatus.STARTED
    airbyte_message = stream_status_as_airbyte_message(configured_stream, stream_status, None)

    # isinstance is the idiomatic type check (an exact type() comparison rejects subclasses).
    assert isinstance(airbyte_message, AirbyteMessage)
    assert airbyte_message.type == MessageType.TRACE
    assert airbyte_message.trace.type == TraceType.STREAM_STATUS
    assert airbyte_message.trace.emitted_at > 0
    assert airbyte_message.trace.stream_status.stream_descriptor.name == configured_stream.stream.name
    assert airbyte_message.trace.stream_status.stream_descriptor.namespace == configured_stream.stream.namespace
    assert airbyte_message.trace.stream_status.status == stream_status
    assert airbyte_message.trace.stream_status.success is None


def test_running_as_message():
    """A RUNNING status becomes a TRACE/STREAM_STATUS message with no success flag."""
    stream_status = AirbyteStreamStatus.RUNNING
    airbyte_message = stream_status_as_airbyte_message(configured_stream, stream_status, None)

    # isinstance is the idiomatic type check (an exact type() comparison rejects subclasses).
    assert isinstance(airbyte_message, AirbyteMessage)
    assert airbyte_message.type == MessageType.TRACE
    assert airbyte_message.trace.type == TraceType.STREAM_STATUS
    assert airbyte_message.trace.emitted_at > 0
    assert airbyte_message.trace.stream_status.stream_descriptor.name == configured_stream.stream.name
    assert airbyte_message.trace.stream_status.stream_descriptor.namespace == configured_stream.stream.namespace
    assert airbyte_message.trace.stream_status.status == stream_status
    assert airbyte_message.trace.stream_status.success is None


def test_stopped_successful_as_message():
    """A STOPPED status with success=True is carried through to the trace message."""
    stream_status = AirbyteStreamStatus.STOPPED
    airbyte_message = stream_status_as_airbyte_message(configured_stream, stream_status, True)

    # isinstance is the idiomatic type check (an exact type() comparison rejects subclasses).
    assert isinstance(airbyte_message, AirbyteMessage)
    assert airbyte_message.type == MessageType.TRACE
    assert airbyte_message.trace.type == TraceType.STREAM_STATUS
    assert airbyte_message.trace.emitted_at > 0
    assert airbyte_message.trace.stream_status.stream_descriptor.name == configured_stream.stream.name
    assert airbyte_message.trace.stream_status.stream_descriptor.namespace == configured_stream.stream.namespace
    assert airbyte_message.trace.stream_status.status == stream_status
    assert airbyte_message.trace.stream_status.success is True


def test_stopped_failed_as_message():
    """A STOPPED status with success=False is carried through to the trace message."""
    stream_status = AirbyteStreamStatus.STOPPED
    airbyte_message = stream_status_as_airbyte_message(configured_stream, stream_status, False)

    # isinstance is the idiomatic type check (an exact type() comparison rejects subclasses).
    assert isinstance(airbyte_message, AirbyteMessage)
    assert airbyte_message.type == MessageType.TRACE
    assert airbyte_message.trace.type == TraceType.STREAM_STATUS
    assert airbyte_message.trace.emitted_at > 0
    assert airbyte_message.trace.stream_status.stream_descriptor.name == configured_stream.stream.name
    assert airbyte_message.trace.stream_status.stream_descriptor.namespace == configured_stream.stream.namespace
    assert airbyte_message.trace.stream_status.status == stream_status
    assert airbyte_message.trace.stream_status.success is False
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class DataTypeEnumTest {
@Test
void testConversionFromJsonSchemaPrimitiveToDataType() {
assertEquals(5, DataType.class.getEnumConstants().length);
assertEquals(16, JsonSchemaPrimitive.class.getEnumConstants().length);
assertEquals(17, JsonSchemaPrimitive.class.getEnumConstants().length);

assertEquals(DataType.STRING, DataType.fromValue(JsonSchemaPrimitive.STRING.toString().toLowerCase()));
assertEquals(DataType.NUMBER, DataType.fromValue(JsonSchemaPrimitive.NUMBER.toString().toLowerCase()));
Expand Down
2 changes: 1 addition & 1 deletion deps.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[versions]
airbyte-protocol = "1.0.0"
airbyte-protocol = "0.3.5"
commons_io = "2.7"
connectors-destination-testcontainers-clickhouse = "1.17.3"
connectors-destination-testcontainers-elasticsearch = "1.17.3"
Expand Down