Skip to content

Commit

Permalink
Emit multiple error trace messages and continue syncs by default (#34636
Browse files Browse the repository at this point in the history
)
  • Loading branch information
brianjlai committed Feb 7, 2024
1 parent e1f7925 commit cc2a6e2
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 52 deletions.
45 changes: 32 additions & 13 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
Status,
StreamDescriptor,
SyncMode,
)
from airbyte_cdk.models import Type as MessageType
Expand All @@ -27,6 +28,7 @@
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config
from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
from airbyte_cdk.utils.event_timing import create_timer
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
Expand Down Expand Up @@ -133,27 +135,44 @@ def read(
logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
yield stream_status_as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.COMPLETE)
except AirbyteTracedException as e:
logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}")
logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
yield stream_status_as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
if self.continue_sync_on_stream_failure:
stream_name_to_exception[stream_instance.name] = e
else:
raise e
yield e.as_sanitized_airbyte_message(stream_descriptor=StreamDescriptor(name=configured_stream.stream.name))
stream_name_to_exception[stream_instance.name] = e
if self.stop_sync_on_stream_failure:
logger.info(
f"Stopping sync on error from stream {configured_stream.stream.name} because {self.name} does not support continuing syncs on error."
)
break
except Exception as e:
yield from self._emit_queued_messages()
logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}")
logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
yield stream_status_as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
display_message = stream_instance.get_error_display_message(e)
if display_message:
raise AirbyteTracedException.from_exception(e, message=display_message) from e
raise e
traced_exception = AirbyteTracedException.from_exception(e, message=display_message)
else:
traced_exception = AirbyteTracedException.from_exception(e)
yield traced_exception.as_sanitized_airbyte_message(
stream_descriptor=StreamDescriptor(name=configured_stream.stream.name)
)
stream_name_to_exception[stream_instance.name] = traced_exception
if self.stop_sync_on_stream_failure:
logger.info(f"{self.name} does not support continuing syncs on error from stream {configured_stream.stream.name}")
break
finally:
timer.finish_event()
logger.info(f"Finished syncing {configured_stream.stream.name}")
logger.info(timer.report())

if self.continue_sync_on_stream_failure and len(stream_name_to_exception) > 0:
raise AirbyteTracedException(message=self._generate_failed_streams_error_message(stream_name_to_exception))
if len(stream_name_to_exception) > 0:
error_message = self._generate_failed_streams_error_message(stream_name_to_exception)
logger.info(error_message)
# We still raise at least one exception when a stream raises an exception because the platform
# currently relies on a non-zero exit code to determine if a sync attempt has failed
raise AirbyteTracedException(message=error_message)
logger.info(f"Finished syncing {self.name}")

@property
Expand Down Expand Up @@ -282,17 +301,17 @@ def message_repository(self) -> Union[None, MessageRepository]:
return _default_message_repository

@property
def continue_sync_on_stream_failure(self) -> bool:
def stop_sync_on_stream_failure(self) -> bool:
"""
WARNING: This function is in-development which means it is subject to change. Use at your own risk.
By default, a source should raise an exception and stop the sync when it encounters an error while syncing a stream. This
method can be overridden on a per-source basis so that a source will continue syncing streams other streams even if an
exception is raised for a stream.
By default, when a source encounters an exception while syncing a stream, it will emit an error trace message and then
continue syncing the next stream. This can be overwridden on a per-source basis so that the source will stop the sync
on the first error seen and emit a single error trace message for that stream.
"""
return False

@staticmethod
def _generate_failed_streams_error_message(stream_failures: Mapping[str, AirbyteTracedException]) -> str:
failures = ", ".join([f"{stream}: {exception.__repr__()}" for stream, exception in stream_failures.items()])
failures = ", ".join([f"{stream}: {filter_secrets(exception.__repr__())}" for stream, exception in stream_failures.items()])
return f"During the sync, the following streams did not sync successfully: {failures}"
17 changes: 16 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/utils/traced_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
AirbyteTraceMessage,
FailureType,
Status,
StreamDescriptor,
TraceType,
)
from airbyte_cdk.models import Type as MessageType
Expand Down Expand Up @@ -43,7 +44,7 @@ def __init__(
self._exception = exception
super().__init__(internal_message)

def as_airbyte_message(self) -> AirbyteMessage:
def as_airbyte_message(self, stream_descriptor: StreamDescriptor = None) -> AirbyteMessage:
"""
Builds an AirbyteTraceMessage from the exception
"""
Expand All @@ -60,6 +61,7 @@ def as_airbyte_message(self) -> AirbyteMessage:
internal_message=self.internal_message,
failure_type=self.failure_type,
stack_trace=stack_trace_str,
stream_descriptor=stream_descriptor,
),
)

Expand Down Expand Up @@ -88,3 +90,16 @@ def from_exception(cls, exc: BaseException, *args, **kwargs) -> "AirbyteTracedEx
:param exc: the exception that caused the error
"""
return cls(internal_message=str(exc), exception=exc, *args, **kwargs) # type: ignore # ignoring because of args and kwargs

def as_sanitized_airbyte_message(self, stream_descriptor: StreamDescriptor = None) -> AirbyteMessage:
"""
Builds an AirbyteTraceMessage from the exception and sanitizes any secrets from the message body
"""
error_message = self.as_airbyte_message(stream_descriptor=stream_descriptor)
if error_message.trace.error.message:
error_message.trace.error.message = filter_secrets(error_message.trace.error.message)
if error_message.trace.error.internal_message:
error_message.trace.error.internal_message = filter_secrets(error_message.trace.error.internal_message)
if error_message.trace.error.stack_trace:
error_message.trace.error.stack_trace = filter_secrets(error_message.trace.error.stack_trace)
return error_message
Loading

0 comments on commit cc2a6e2

Please sign in to comment.