fix(ingest/bigquery): Lowering significantly the memory usage of the BigQuery connector (datahub-project#7315)
treff7es authored and Oleg Ruban committed Feb 28, 2023
1 parent 9c2cacd commit 70009cd
Showing 11 changed files with 223 additions and 157 deletions.
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
@@ -576,6 +576,7 @@ def get_long_description():
     "datahub.ingestion.sink.plugins": [
         "file = datahub.ingestion.sink.file:FileSink",
         "console = datahub.ingestion.sink.console:ConsoleSink",
+        "blackhole = datahub.ingestion.sink.blackhole:BlackHoleSink",
         "datahub-kafka = datahub.ingestion.sink.datahub_kafka:DatahubKafkaSink",
         "datahub-rest = datahub.ingestion.sink.datahub_rest:DatahubRestSink",
         "datahub-lite = datahub.ingestion.sink.datahub_lite:DataHubLiteSink",
10 changes: 7 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -123,11 +123,15 @@ class CliReport(Report):
     py_version: str = sys.version
     py_exec_path: str = sys.executable
     os_details: str = platform.platform()
+    _peek_memory_usage: int = 0
 
     def compute_stats(self) -> None:
-        self.mem_info = humanfriendly.format_size(
-            psutil.Process(os.getpid()).memory_info().rss
-        )
+        mem_usage = psutil.Process(os.getpid()).memory_info().rss
+        if self._peek_memory_usage < mem_usage:
+            self._peek_memory_usage = mem_usage
+        self.peek_memory_usage = humanfriendly.format_size(self._peek_memory_usage)
+
+        self.mem_info = humanfriendly.format_size(self._peek_memory_usage)
         return super().compute_stats()


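Note: with this change the report keeps the highest RSS sample seen across compute_stats() calls instead of only the latest reading. A minimal standalone sketch of the same peak-tracking pattern, assuming only psutil and humanfriendly; the function and variable names are illustrative, not part of the diff:

```python
import os

import humanfriendly
import psutil

_peak_rss = 0  # highest resident set size observed so far, in bytes


def sample_peak_memory() -> str:
    """Sample the current process RSS and return the formatted peak seen so far."""
    global _peak_rss
    rss = psutil.Process(os.getpid()).memory_info().rss
    if rss > _peak_rss:
        _peak_rss = rss
    return humanfriendly.format_size(_peak_rss)
```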
16 changes: 16 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/sink/blackhole.py
@@ -0,0 +1,16 @@
+import logging
+
+from datahub.configuration.common import ConfigModel
+from datahub.ingestion.api.common import RecordEnvelope
+from datahub.ingestion.api.sink import Sink, SinkReport, WriteCallback
+
+logger = logging.getLogger(__name__)
+
+
+class BlackHoleSink(Sink[ConfigModel, SinkReport]):
+    def write_record_async(
+        self, record_envelope: RecordEnvelope, write_callback: WriteCallback
+    ) -> None:
+        if write_callback:
+            self.report.report_record_written(record_envelope)
+            write_callback.on_success(record_envelope, {})
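For context, a hedged sketch of selecting the new sink by its entry-point name in a programmatic pipeline run. Pipeline.create, run, and pretty_print_summary come from the DataHub ingestion API and the "blackhole" name from this change; the file-source block is a placeholder assumption:

```python
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        # Placeholder source for illustration only; substitute any configured source.
        "source": {"type": "file", "config": {"path": "metadata_events.json"}},
        # The blackhole sink drops every record after reporting it as written,
        # which is handy for profiling ingestion (e.g. memory) without emitting metadata.
        "sink": {"type": "blackhole", "config": {}},
    }
)
pipeline.run()
pipeline.pretty_print_summary()
```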
metadata-ingestion/src/datahub/ingestion/sink/sink_registry.py
@@ -13,3 +13,8 @@ def _check_sink_classes(cls: Type[Sink]) -> None:
 
 sink_registry = PluginRegistry[Sink](extra_cls_check=_check_sink_classes)
 sink_registry.register_from_entrypoint("datahub.ingestion.sink.plugins")
+
+# These sinks are always enabled
+assert sink_registry.get("console")
+assert sink_registry.get("file")
+assert sink_registry.get("blackhole")
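As a small usage note, the asserts above exercise the same lookup a recipe relies on: the registry resolves a sink's entry-point name to its registered class. A hedged sketch, assuming the module path datahub.ingestion.sink.sink_registry:

```python
from datahub.ingestion.sink.sink_registry import sink_registry

# Resolve the class registered under the "blackhole" entry-point name,
# exactly as the asserts above do.
blackhole_cls = sink_registry.get("blackhole")
print(blackhole_cls.__name__)  # expected: BlackHoleSink
```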