Skip to content

Commit

Permalink
fix(ingest): refactor sync-async config, thread-safety for sink repor…
Browse files Browse the repository at this point in the history
…ting
  • Loading branch information
shirshanka committed Aug 22, 2022
1 parent 9511715 commit 38ac85f
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 20 deletions.
Expand Up @@ -97,7 +97,10 @@ def create(
sink_class = sink_registry.get(sink_type)
sink_config = sink_config_holder.dict().get("config") or {}
if sink_type == "datahub-rest":
sink_config["use_sync_emitter_on_async_failure"] = True
# for the rest emitter we want to use sync mode to emit
# regardless of the default sink config since that makes it
# immune to process shutdown related failures
sink_config["mode"] = "SYNC"

sink: Sink = sink_class.create(sink_config, ctx)
return cls(sink, reporter_config.report_recipe, ctx)
Expand Down
6 changes: 3 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
Expand Up @@ -272,13 +272,13 @@ def _notify_reporters_on_ingestion_completion(self) -> None:
for reporter in self.reporters:
try:
reporter.on_completion(
status="FAILURE"
status="CANCELLED"
if self.final_status == "cancelled"
else "FAILURE"
if self.source.get_report().failures
or self.sink.get_report().failures
else "SUCCESS"
if self.final_status == "completed"
else "CANCELLED"
if self.final_status == "cancelled"
else "UNKNOWN",
report=self._get_structured_report(),
ctx=self.ctx,
Expand Down
54 changes: 38 additions & 16 deletions metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
Expand Up @@ -2,12 +2,15 @@
import contextlib
import functools
import logging
import threading
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import timedelta
from enum import Enum
from threading import BoundedSemaphore
from typing import Union, cast

from pydantic import validator
from tdigest import TDigest

from datahub.cli.cli_utils import set_env_variables_override_config
Expand All @@ -28,26 +31,47 @@
logger = logging.getLogger(__name__)


class SyncOrAsync(Enum):
SYNC = "SYNC"
ASYNC = "ASYNC"


class DatahubRestSinkConfig(DatahubClientConfig):
max_pending_requests: int = 1000
use_sync_emitter_on_async_failure: bool = False
mode: SyncOrAsync = SyncOrAsync.ASYNC

@validator("mode", pre=True)
def str_to_enum_value(cls, v):
if v and isinstance(v, str):
return v.upper()


@dataclass
class DataHubRestSinkReport(SinkReport):
gms_version: str = ""
pending_requests: int = 0
_digest = TDigest()
_lock = threading.Lock()

def compute_stats(self) -> None:
super().compute_stats()
self.ninety_fifth_percentile_write_latency_in_millis = self._digest.percentile(
95
)
self.fiftieth_percentile_write_latency_in_millis = self._digest.percentile(50)
self._lock.acquire()
try:
self.ninety_fifth_percentile_write_latency_in_millis = (
self._digest.percentile(95)
)
self.fiftieth_percentile_write_latency_in_millis = self._digest.percentile(
50
)
finally:
self._lock.release()

def report_write_latency(self, delta: timedelta) -> None:
self._digest.update(round(delta.total_seconds() * 1000.0))
self._lock.acquire()
try:
self._digest.update(round(delta.total_seconds() * 1000.0))
finally:
self._lock.release()


class BoundedExecutor:
Expand Down Expand Up @@ -197,23 +221,21 @@ def write_record_async(
write_callback: WriteCallback,
) -> None:
record = record_envelope.record
try:
if self.config.mode == SyncOrAsync.ASYNC:
write_future = self.executor.submit(self.emitter.emit, record)
write_future.add_done_callback(
functools.partial(
self._write_done_callback, record_envelope, write_callback
)
)
self.report.pending_requests += 1
except RuntimeError:
if self.config.use_sync_emitter_on_async_failure:
try:
(start, end) = self.emitter.emit(record)
write_callback.on_success(record_envelope, success_metadata={})
except Exception as e:
write_callback.on_failure(record_envelope, e, failure_metadata={})
else:
raise
else:
# execute synchronously
try:
(start, end) = self.emitter.emit(record)
write_callback.on_success(record_envelope, success_metadata={})
except Exception as e:
write_callback.on_failure(record_envelope, e, failure_metadata={})

def get_report(self) -> SinkReport:
return self.report
Expand Down

0 comments on commit 38ac85f

Please sign in to comment.