fix(ingest/bigquery) Lowering significantly the memory usage of the BigQuery connector #7315
Conversation
Adding blackhole sink for testing
@@ -576,6 +576,7 @@ def get_long_description():
    "datahub.ingestion.sink.plugins": [
        "file = datahub.ingestion.sink.file:FileSink",
        "console = datahub.ingestion.sink.console:ConsoleSink",
        "blackhole = datahub.ingestion.sink.blackhole:BlackHoleSink",
Pretty fun :) DevNull!
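The "DevNull" comparison fits: a black-hole sink accepts every record and discards it, which is useful for measuring ingestion-side memory without the cost of a real sink. A minimal sketch of the idea — the class and method names here are illustrative, not DataHub's actual `Sink` interface:

```python
class BlackHoleSink:
    """Hypothetical sketch: accept everything, keep nothing (like /dev/null)."""

    def __init__(self) -> None:
        self.records_written = 0

    def write_record(self, record) -> None:
        # Intentionally discard the record; only count it for reporting.
        self.records_written += 1

    def close(self) -> None:
        # Nothing was buffered, so there is nothing to flush.
        pass
```

In a pipeline config this would be selected like any other sink plugin (`console`, `file`), just with no output side effects.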
mem_usage = psutil.Process(os.getpid()).memory_info().rss
if self._peek_memory_usage < mem_usage:
    self._peek_memory_usage = mem_usage
self.peek_memory_usage = humanfriendly.format_size(self._peek_memory_usage)
Thanks - this is SUPER useful since logs get cut off
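The diff samples the process RSS via psutil and keeps the high-water mark. A sketch of that pattern, with the sampling function separated out so the max-tracking logic stands on its own (names here are illustrative; the real code uses `psutil.Process(os.getpid()).memory_info().rss` directly, and psutil is assumed optional in this sketch):

```python
import os


def current_rss_bytes() -> int:
    """Current resident set size in bytes; 0 if psutil is unavailable."""
    try:
        import psutil
        return psutil.Process(os.getpid()).memory_info().rss
    except ImportError:
        return 0


class PeakMemoryTracker:
    """Keep the highest RSS seen across repeated samples."""

    def __init__(self) -> None:
        self._peak = 0

    def sample(self, rss: int) -> None:
        # Only ever move the high-water mark upward.
        if rss > self._peak:
            self._peak = rss

    @property
    def peak(self) -> int:
        return self._peak
```

Recording the peak in the ingestion report means the number survives even when the tail of the log is truncated.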
]
for dataset in self.db_views[project_id]:
for dataset in self.db_views.keys():
    tables[dataset].extend(
qq -- is there a reason we are adding VIEWS into a structure called TABLES?
It is just for simplicity to pass in all the table/view names and validate usage with it. Maybe I should add some more expressive names.
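What the author describes — merging table and view names into one per-dataset structure purely to validate usage against every known name — could be sketched like this (structure and function name are illustrative, not the PR's actual helper):

```python
from collections import defaultdict
from typing import Dict, List


def merge_table_and_view_names(
    db_tables: Dict[str, List[str]],
    db_views: Dict[str, List[str]],
) -> Dict[str, List[str]]:
    """Combine table and view names per dataset for usage validation."""
    names: Dict[str, List[str]] = defaultdict(list)
    for dataset, tables in db_tables.items():
        names[dataset].extend(tables)
    for dataset, views in db_views.items():
        names[dataset].extend(views)
    return dict(names)
```

A more expressive name like `known_dataset_entities` would address the reviewer's point without changing behavior.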
BigqueryTableIdentifier(
    project_id, dataset, table.name
).get_table_name()
for table in self.db_views[dataset]
for view in self.db_views?
@@ -692,25 +696,50 @@ def _process_schema(
    project_id,
)

columns = BigQueryDataDictionary.get_columns_for_dataset(
This is a dictionary? Of table name to columns?
Oh this fetches ALL columns for ALL tables in the dataset. very interesting.
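The reviewer's observation is the core of the memory/latency trade in this PR: one bulk metadata query per dataset, whose rows are then grouped into a mapping of table name to columns, instead of one query per table. A sketch of the grouping step, assuming each row carries a table name and a column name (the real query presumably hits BigQuery's `INFORMATION_SCHEMA`; the row shape here is illustrative):

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def group_columns_by_table(
    rows: List[Tuple[str, str]]
) -> Dict[str, List[str]]:
    """Group (table_name, column_name) rows from one dataset-wide
    metadata query into a table -> columns mapping."""
    columns: Dict[str, List[str]] = defaultdict(list)
    for table_name, column_name in rows:
        columns[table_name].append(column_name)
    return dict(columns)
```

One round trip per dataset amortizes query latency across all of its tables, at the cost of holding that dataset's column metadata in memory at once.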
view, view_columns, project_id, dataset_name
)

# This methode is used to generate the ignore list for datatypes the profiler doesn't support we have to do it here
typo: method
logger.warning(
    f"Table doesn't have any column or unable to get columns for table: {table_identifier}"
)

yield from self.gen_table_dataset_workunits(table, project_id, schema_name)
# If table has time partitioning, set the data type of the partitioning field
if table.time_partitioning:
Why do we now need this??
BigQuery's table metadata returns the partition column's name but not its data type, so we have to look it up in the list of columns.
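The author's explanation can be sketched as a simple lookup over the already-fetched column list; `Column` here is a stand-in for the PR's typed column object, and the names are illustrative:

```python
from typing import List, Optional


class Column:
    """Stand-in for the connector's typed column metadata object."""

    def __init__(self, name: str, data_type: str) -> None:
        self.name = name
        self.data_type = data_type


def partition_column_type(
    columns: List[Column], partition_field: str
) -> Optional[str]:
    """Resolve the data type of the time-partitioning field, which
    BigQuery's table metadata reports by name only."""
    for column in columns:
        if column.name == partition_field:
            return column.data_type
    return None
```

The resolved type is what later lets the profiler build a correct partition WHERE clause.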
yield from self._process_table(conn, table, project_id, dataset_name)
tables = self.get_tables_for_dataset(conn, project_id, dataset_name)
for table in tables:
    table_columns = columns.get(table.name, []) if columns else []
Nice handling!
table_identifier.project_id,
table_identifier.dataset,
) not in self.schema_columns.keys():
    columns = BigQueryDataDictionary.get_columns_for_dataset(
Was this making a full call for EVERY table before?
dataset_name=table_identifier.dataset,
column_limit=column_limit,
)
self.schema_columns[
Ah I see we were caching this. Wow! All columns cached here. :(
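The caching pattern the reviewer is reacting to — the first table in a dataset triggers one bulk fetch, every later table in that dataset hits the cache keyed by `(project, dataset)` — could be sketched like this (`fetch` stands in for `BigQueryDataDictionary.get_columns_for_dataset`; names are illustrative):

```python
from typing import Callable, Dict, List, Tuple

ColumnMap = Dict[str, List[str]]  # table name -> column names


class SchemaColumnCache:
    """Cache dataset-wide column metadata keyed by (project, dataset)."""

    def __init__(self, fetch: Callable[[str, str], ColumnMap]) -> None:
        self._fetch = fetch
        self._cache: Dict[Tuple[str, str], ColumnMap] = {}

    def columns_for(self, project: str, dataset: str) -> ColumnMap:
        key = (project, dataset)
        if key not in self._cache:
            # One bulk fetch per dataset; subsequent tables reuse it.
            self._cache[key] = self._fetch(project, dataset)
        return self._cache[key]
```

The memory cost the reviewer laments is exactly this: all columns of a dataset stay resident for the cache's lifetime, so evicting a dataset's entry once it is fully processed would bound the footprint.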
if table.time_partitioning.type_ in ("HOUR", "DAY", "MONTH", "YEAR"):
partition_where_clause = f"{partition_column_type}(`{table.time_partitioning.field}`) BETWEEN {partition_column_type}('{partition_datetime}') AND {partition_column_type}('{upper_bound_partition_datetime}')"
if table.time_partitioning.type in ("HOUR", "DAY", "MONTH", "YEAR"):
Please ensure we have sufficient try: except around this method. There are a LOT of new changes and it makes me quite nervous.
I wouldn't worry about this now: here we have typed objects with non-optional properties, whereas earlier we used BigQuery's object, which was backed by a dict (that is where the KeyError happened, because its to-string method failed).
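The author's point — that a dict-backed object fails late (a KeyError at format time) while a typed object with non-optional fields fails at construction, so downstream code can rely on the fields existing — can be illustrated with a dataclass (names here are illustrative, not the PR's actual types):

```python
from dataclasses import dataclass


@dataclass
class TimePartitioning:
    """Typed stand-in for the dict-backed BigQuery metadata object."""
    type: str   # e.g. "DAY"
    field: str  # name of the partition column


def describe(tp: TimePartitioning) -> str:
    # No dict lookups: the attributes are guaranteed by the type,
    # so there is no KeyError path at formatting time.
    return f"{tp.type} partitioning on {tp.field}"
```

With the dict-backed object, `metadata["type"]` on a malformed payload raises only when the string is built; with the dataclass, a missing field is a TypeError at the construction site, which is far easier to locate.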
normalized_table_name = BigqueryTableIdentifier(
    project_id=project, dataset=dataset, table=table.name
).get_table_name()
for column in table.columns:
I'm glad we are better organizing this.
@@ -257,7 +247,7 @@ def get_bigquery_profile_request(
    + 1
)

if not table.columns:
if not table.column_count:
nice...
metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
LGTM
self._peek_memory_usage = mem_usage
self.peek_memory_usage = humanfriendly.format_size(self._peek_memory_usage)

self.mem_info = humanfriendly.format_size(self._peek_memory_usage)
Shouldn't be on peek memory usage
# These sinks are always enabled
assert sink_registry.get("console")
assert sink_registry.get("file")
assert sink_registry.get("blackhole")
Looks like a bad merge - we don't need these lines