# fix(ingest): refactor sync-async config, thread-safety for sink repor… #5705

Merged
The reporter's `create()` method now forces the `datahub-rest` sink into synchronous mode, replacing the old async-with-sync-fallback flag:

```diff
@@ -97,7 +97,10 @@ def create(
         sink_class = sink_registry.get(sink_type)
         sink_config = sink_config_holder.dict().get("config") or {}
         if sink_type == "datahub-rest":
-            sink_config["use_sync_emitter_on_async_failure"] = True
+            # for the rest emitter we want to use sync mode to emit
+            # regardless of the default sink config since that makes it
+            # immune to process shutdown related failures
+            sink_config["mode"] = "SYNC"
 
         sink: Sink = sink_class.create(sink_config, ctx)
         return cls(sink, reporter_config.report_recipe, ctx)
```
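
For context, a minimal sketch (not DataHub code; the `emit()` stub and record name are invented) of the failure mode this guards against: records handed to a background executor are only durable once the pool drains, whereas a synchronous call has already completed before control returns to the caller.

```python
# Illustrative sketch only -- emit() and the record name are hypothetical.
from concurrent.futures import ThreadPoolExecutor

def emit(record: str) -> None:
    ...  # imagine an HTTP POST to the DataHub server here

executor = ThreadPoolExecutor(max_workers=4)

# ASYNC: the record may still be queued if the process dies or the pool is
# torn down (e.g., shutdown(wait=False)) before a worker picks it up.
executor.submit(emit, "ingestion-run-summary")

# SYNC: emit() has returned (or raised) before the next statement runs, so a
# subsequent shutdown cannot lose this record.
emit("ingestion-run-summary")
```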
Expand Down
**metadata-ingestion/src/datahub/ingestion/run/pipeline.py** (3 additions, 3 deletions)

```diff
@@ -272,13 +272,13 @@ def _notify_reporters_on_ingestion_completion(self) -> None:
         for reporter in self.reporters:
             try:
                 reporter.on_completion(
-                    status="FAILURE"
+                    status="CANCELLED"
+                    if self.final_status == "cancelled"
+                    else "FAILURE"
                     if self.source.get_report().failures
                     or self.sink.get_report().failures
                     else "SUCCESS"
                     if self.final_status == "completed"
-                    else "CANCELLED"
-                    if self.final_status == "cancelled"
                     else "UNKNOWN",
                     report=self._get_structured_report(),
                     ctx=self.ctx,
```

A collaborator commented on the new `status=` expression:

> this ternary is pretty unwieldy
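
One way the review comment could be addressed (a sketch, not part of this PR): lift the chained conditional into a small helper so each case reads top to bottom. The function name is invented; the branch order matches the new code in the diff above.

```python
# Hypothetical refactor sketch -- not committed in this PR.
def compute_completion_status(
    final_status: str, source_failures: bool, sink_failures: bool
) -> str:
    if final_status == "cancelled":
        return "CANCELLED"
    if source_failures or sink_failures:
        return "FAILURE"
    if final_status == "completed":
        return "SUCCESS"
    return "UNKNOWN"

# e.g. status=compute_completion_status(
#     self.final_status,
#     bool(self.source.get_report().failures),
#     bool(self.sink.get_report().failures),
# )
```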
**metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py** (38 additions, 16 deletions)

```diff
@@ -2,12 +2,15 @@
 import contextlib
 import functools
 import logging
+import threading
 from concurrent.futures import ThreadPoolExecutor
 from dataclasses import dataclass
 from datetime import timedelta
+from enum import Enum
 from threading import BoundedSemaphore
 from typing import Union, cast
 
+from pydantic import validator
 from tdigest import TDigest
 
 from datahub.cli.cli_utils import set_env_variables_override_config
```
```diff
@@ -28,26 +31,47 @@
 logger = logging.getLogger(__name__)
 
 
+class SyncOrAsync(Enum):
+    SYNC = "SYNC"
+    ASYNC = "ASYNC"
+
+
 class DatahubRestSinkConfig(DatahubClientConfig):
     max_pending_requests: int = 1000
-    use_sync_emitter_on_async_failure: bool = False
+    mode: SyncOrAsync = SyncOrAsync.ASYNC
+
+    @validator("mode", pre=True)
+    def str_to_enum_value(cls, v):
+        if v and isinstance(v, str):
+            return v.upper()
 
 
 @dataclass
 class DataHubRestSinkReport(SinkReport):
     gms_version: str = ""
     pending_requests: int = 0
     _digest = TDigest()
+    _lock = threading.Lock()
 
     def compute_stats(self) -> None:
         super().compute_stats()
-        self.ninety_fifth_percentile_write_latency_in_millis = self._digest.percentile(
-            95
-        )
-        self.fiftieth_percentile_write_latency_in_millis = self._digest.percentile(50)
+        self._lock.acquire()
+        try:
+            self.ninety_fifth_percentile_write_latency_in_millis = (
+                self._digest.percentile(95)
+            )
+            self.fiftieth_percentile_write_latency_in_millis = self._digest.percentile(
+                50
+            )
+        finally:
+            self._lock.release()
 
     def report_write_latency(self, delta: timedelta) -> None:
-        self._digest.update(round(delta.total_seconds() * 1000.0))
+        self._lock.acquire()
+        try:
+            self._digest.update(round(delta.total_seconds() * 1000.0))
+        finally:
+            self._lock.release()
 
 
 class BoundedExecutor:
```
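
A standalone sketch of how the `mode` field parses, assuming pydantic v1 (which the `from pydantic import validator` import implies): the `pre=True` hook upper-cases string input before pydantic coerces it into the enum, so `mode: sync` and `mode: SYNC` are both accepted in a recipe. The `SinkConfig` model here is a simplified stand-in, not the real `DatahubRestSinkConfig`.

```python
from enum import Enum

from pydantic import BaseModel, validator  # pydantic v1 API

class SyncOrAsync(Enum):
    SYNC = "SYNC"
    ASYNC = "ASYNC"

class SinkConfig(BaseModel):  # simplified stand-in for DatahubRestSinkConfig
    mode: SyncOrAsync = SyncOrAsync.ASYNC

    @validator("mode", pre=True)
    def str_to_enum_value(cls, v):
        if v and isinstance(v, str):
            return v.upper()
        return v  # note: the PR's validator has no fallback return, so a
                  # non-string value (e.g. an enum member) would become None

assert SinkConfig(mode="sync").mode is SyncOrAsync.SYNC
assert SinkConfig().mode is SyncOrAsync.ASYNC
```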
```diff
@@ -197,23 +221,21 @@ def write_record_async(
         write_callback: WriteCallback,
     ) -> None:
         record = record_envelope.record
-        try:
+        if self.config.mode == SyncOrAsync.ASYNC:
             write_future = self.executor.submit(self.emitter.emit, record)
             write_future.add_done_callback(
                 functools.partial(
                     self._write_done_callback, record_envelope, write_callback
                 )
             )
             self.report.pending_requests += 1
-        except RuntimeError:
-            if self.config.use_sync_emitter_on_async_failure:
-                try:
-                    (start, end) = self.emitter.emit(record)
-                    write_callback.on_success(record_envelope, success_metadata={})
-                except Exception as e:
-                    write_callback.on_failure(record_envelope, e, failure_metadata={})
-            else:
-                raise
+        else:
+            # execute synchronously
+            try:
+                (start, end) = self.emitter.emit(record)
+                write_callback.on_success(record_envelope, success_metadata={})
+            except Exception as e:
+                write_callback.on_failure(record_envelope, e, failure_metadata={})
 
     def get_report(self) -> SinkReport:
         return self.report
```
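
In ASYNC mode the done-callbacks run on executor worker threads, so the sink report's TDigest can presumably be updated concurrently with `compute_stats()`; that appears to be what the new `_lock` serializes (an inference from the diff, not verified against the full file). A standalone sketch of the pattern, using `with lock:` as an equivalent of the diff's acquire/try/finally:

```python
import threading

from tdigest import TDigest  # same library as in the diff

digest = TDigest()
lock = threading.Lock()

def record_latency(millis: float) -> None:
    with lock:  # mirrors report_write_latency
        digest.update(millis)

def latency_percentiles() -> "tuple[float, float]":
    with lock:  # mirrors compute_stats
        return digest.percentile(50), digest.percentile(95)

# Four writer threads updating the digest, as executor callbacks would.
writers = [
    threading.Thread(target=lambda: [record_latency(5.0) for _ in range(100)])
    for _ in range(4)
]
for t in writers:
    t.start()
for t in writers:
    t.join()
p50, p95 = latency_percentiles()  # also safe to call while writers run
```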