fix(ingest/snowflake): Improve memory usage of metadata extraction #7349
Conversation
key=lambda x: (x[1]["name"].casefold(), x[1]["name"])
if "name" in x[1]
Result of running black
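For context, a minimal runnable sketch of the case-insensitive sort pattern in the hunk above (the `entries` list and the `else` fallback here are hypothetical, not from the diff):

```python
entries = [("a", {"name": "Beta"}), ("b", {"name": "alpha"}), ("c", {})]

# casefold() lowercases aggressively for case-insensitive ordering; the raw
# name is kept as a tiebreaker so names equal under casefold sort stably.
ordered = sorted(
    entries,
    key=lambda x: (x[1]["name"].casefold(), x[1]["name"])
    if "name" in x[1]
    else ("", ""),
)
print([e[0] for e in ordered])  # ['c', 'b', 'a']
```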
self.state_handler.add_to_state(
    dataset_urn, int(datetime.now().timestamp() * 1000)
)
for request, profile in self.generate_profiles(
No changes below, just changing indentation
from datahub.ingestion.source.snowflake.constants import SnowflakeEdition
from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport
from datahub.ingestion.source_report.sql.snowflake import SnowflakeReport
from datahub.ingestion.source_report.usage.snowflake_usage import SnowflakeUsageReport

@dataclass
Pretty sure we want this on every class, even though it inherits from a dataclass?
_processed_tags: MutableSet[str] = field(default_factory=set)
_scanned_tags: MutableSet[str] = field(default_factory=set)
As a result of adding @dataclass
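A minimal sketch of why the decorator is needed on each subclass (class names here are hypothetical): inheriting from a dataclass does not make a class a dataclass, so new fields declared with `field(...)` would otherwise never be processed into the generated `__init__`.

```python
from dataclasses import dataclass, field
from typing import MutableSet

@dataclass
class BaseReport:
    tables_scanned: int = 0

@dataclass  # without this, the field() sentinel below is never processed
class TagReport(BaseReport):
    # default_factory gives every instance its own set; a bare `= set()`
    # mutable default would be rejected by the dataclass machinery.
    _processed_tags: MutableSet[str] = field(default_factory=set)

r1, r2 = TagReport(), TagReport()
r1._processed_tags.add("tag1")
assert "tag1" not in r2._processed_tags  # each report gets its own set
```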
if self.config.profiling.enabled and len(databases) != 0:
    yield from self.profiler.get_workunits(databases)
# TODO: The checkpoint state for stale entity detection can be committed here.
Just fixing a typo
@@ -504,29 +492,36 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
                yield from self._process_database(snowflake_db)

            except SnowflakePermissionError as e:
-               # FIXME - This may break satetful ingestion if new tables than previous run are emitted above
+               # FIXME - This may break stateful ingestion if new tables than previous run are emitted above
Thank you!
nice, lgtm
Although this PR is merged, I'm submitting these comments as they are still valid. They can probably be discussed offline and handled in a follow-up PR, if that makes sense.
(I had added these comments earlier, but they did not get published because I forgot to click the "Submit Review" button at the end.)
    Tuple[str, str], Dict[str, List[SnowflakeFK]]
] = {}
# Caches tables for a single database. Consider moving to disk or S3 when possible.
self.db_tables: Dict[str, List[SnowflakeTable]] = {}
Okay, so db_views is not here since we don't profile views, right?
Just confirming my understanding.
Yup! I initially had neither, but had to add this one back for profiling
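A rough sketch of the resulting pattern (all names hypothetical): `db_tables` only ever holds one database's tables, because it is reset before each database and consumed by profiling before moving on.

```python
from typing import Dict, List

class PerDatabaseCacheSketch:
    def __init__(self, profiling_enabled: bool) -> None:
        self.profiling_enabled = profiling_enabled
        self.db_tables: Dict[str, List[str]] = {}  # schema name -> table names

    def process_database(self, db_name: str, schemas: Dict[str, List[str]]) -> None:
        self.db_tables = {}  # drop the previous database's tables from memory
        for schema_name, tables in schemas.items():
            self.db_tables[schema_name] = tables  # filled during the schema pass
        if self.profiling_enabled and self.db_tables:
            self.profile(db_name)  # profile before the next database is scanned

    def profile(self, db_name: str) -> None:
        for schema_name, tables in self.db_tables.items():
            print(f"profiling {db_name}.{schema_name}: {tables}")
```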
for snowflake_schema in snowflake_db.schemas:
    yield from self._process_schema(snowflake_schema, db_name)

if self.config.profiling.enabled and self.db_tables:
Okay - so earlier, profiling started only after all basic workunits (schemaMetadata, subTypes, datasetProperties) had been ingested for tables from all databases. This change alters that behavior: profiling now runs per database.
Since profiling usually takes longer, is it possible to refactor this so that profiling happens after ingesting all tables from all databases?
We need two things out of the technical schema phase (subtypes, schema, etc.):
1. A list of all tables/views, for lineage, usage, operational history, etc.
2. A database-wise table metadata listing (size, last updated, number of rows) for the profiler.
Probably we can create profiler requests in advance and start profiling later? See the sketch below.
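A hedged sketch of that suggestion (names are illustrative, not the connector's actual API): collect lightweight profile requests per database during the schema pass, then run all profiling at the end.

```python
from dataclasses import dataclass, field
from typing import Iterator, List

@dataclass
class ProfileRequest:
    db_name: str
    table_name: str
    row_count: int  # small stats (size, rows, last updated) captured up front

@dataclass
class DeferredProfilerSketch:
    pending: List[ProfileRequest] = field(default_factory=list)

    def collect(self, db_name: str, tables: List[dict]) -> None:
        # Keep only what the profiler needs, not the full table metadata.
        for t in tables:
            self.pending.append(ProfileRequest(db_name, t["name"], t["rows"]))

    def run(self) -> Iterator[str]:
        # Invoked once, after technical metadata for all databases is emitted.
        for req in self.pending:
            yield f"profile {req.db_name}.{req.table_name} ({req.row_count} rows)"
```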
# Should not be more than the number of databases / schemas scanned.
# Maps (function name) -> (stat_name) -> (stat_value)
lru_cache_info: Dict[str, Dict[str, int]] = field(default_factory=dict)
nice!
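For illustration, one way such a report field could be filled from `functools.lru_cache` statistics (the cached function here is a hypothetical stand-in for a real metadata query):

```python
from functools import lru_cache
from typing import Dict, List

@lru_cache(maxsize=1)
def get_tables_for_database(db_name: str) -> List[str]:
    return []  # stand-in for the real metadata query

get_tables_for_database("db1")
get_tables_for_database("db1")  # served from cache

# cache_info() returns CacheInfo(hits, misses, maxsize, currsize).
lru_cache_info: Dict[str, Dict[str, int]] = {}
info = get_tables_for_database.cache_info()
lru_cache_info["get_tables_for_database"] = {
    "hits": info.hits,
    "misses": info.misses,
    "currsize": info.currsize,
}
print(lru_cache_info)  # {'get_tables_for_database': {'hits': 1, 'misses': 1, 'currsize': 1}}
```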
Similar to #7315, this attempts to reduce the memory usage of the snowflake connector by removing global data stores. It differs from the bigquery changes in that I attempt to remove the global stores entirely, instead using the `lru_cache` decorator to cache the results of repeated function calls. We order our metadata extraction queries such that we should be issuing no additional queries while only caching the results of a single query at a time (vs. caching all the results). I have to keep a single store, `db_tables`, to support per-database profiling, but I make it per-database rather than fully global. This does not include any lineage memory optimization -- I'll do that in a follow-up PR.
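A minimal sketch of the `lru_cache` idea under that query ordering (the class and query are illustrative, not the connector's actual API): since databases are processed one at a time, a cache of size 1 suffices, and each new database's query evicts the previous result.

```python
from functools import lru_cache
from typing import List

class QueryCacheSketch:
    @lru_cache(maxsize=1)
    def tables_for_database(self, db_name: str) -> List[str]:
        # One bulk query per database; repeated calls for the same database
        # hit the cache, while a different database replaces the cached rows.
        print(f"querying snowflake for tables in {db_name}")
        return []

conn = QueryCacheSketch()
conn.tables_for_database("db1")  # runs the query
conn.tables_for_database("db1")  # cache hit, no query
conn.tables_for_database("db2")  # evicts db1's result (maxsize=1)
```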
Checklist