Commit a868257

fix(ingest/snowflake): Improve memory usage of metadata extraction (d…

asikowitz authored and Oleg Ruban committed Feb 28, 2023
1 parent 91205e1 commit a868257
Showing 6 changed files with 121 additions and 117 deletions.
metadata-ingestion/scripts/docgen.py (4 changes: 2 additions & 2 deletions)
@@ -679,8 +679,8 @@ def generate(
     i = 0
     for platform_id, platform_docs in sorted(
         source_documentation.items(),
-        key=lambda x: (x[1]['name'].casefold(), x[1]['name'])
-        if 'name' in x[1]
+        key=lambda x: (x[1]["name"].casefold(), x[1]["name"])
+        if "name" in x[1]
         else (x[0].casefold(), x[0]),
     ):
         if source and platform_id != source:
@@ -1,7 +1,7 @@
 import dataclasses
 import logging
 from datetime import datetime
-from typing import Callable, Iterable, List, Optional, cast
+from typing import Callable, Dict, Iterable, List, Optional, cast
 
 from snowflake.sqlalchemy import snowdialect
 from sqlalchemy import create_engine, inspect
@@ -53,7 +53,7 @@ def __init__(
         self.logger = logger
 
     def get_workunits(
-        self, databases: List[SnowflakeDatabase]
+        self, database: SnowflakeDatabase, db_tables: Dict[str, List[SnowflakeTable]]
     ) -> Iterable[MetadataWorkUnit]:
         # Extra default SQLAlchemy option for better connection pooling and threading.
         # https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow
@@ -62,60 +62,57 @@ def get_workunits(
                 "max_overflow", self.config.profiling.max_workers
             )
 
-        for db in databases:
-            if not self.config.database_pattern.allowed(db.name):
-                continue
-            profile_requests = []
-            for schema in db.schemas:
-                if not is_schema_allowed(
-                    self.config.schema_pattern,
-                    schema.name,
-                    db.name,
-                    self.config.match_fully_qualified_names,
-                ):
-                    continue
-                for table in schema.tables:
-                    # Emit the profile work unit
-                    profile_request = self.get_snowflake_profile_request(
-                        table, schema.name, db.name
-                    )
-                    if profile_request is not None:
-                        profile_requests.append(profile_request)
-
-            if len(profile_requests) == 0:
-                continue
-
-            table_profile_requests = cast(List[TableProfilerRequest], profile_requests)
-
-            for request, profile in self.generate_profiles(
-                table_profile_requests,
-                self.config.profiling.max_workers,
-                db.name,
-                platform=self.platform,
-                profiler_args=self.get_profile_args(),
-            ):
-                if profile is None:
-                    continue
-                profile.sizeInBytes = cast(
-                    SnowflakeProfilerRequest, request
-                ).table.size_in_bytes
-                dataset_name = request.pretty_name
-                dataset_urn = make_dataset_urn_with_platform_instance(
-                    self.platform,
-                    dataset_name,
-                    self.config.platform_instance,
-                    self.config.env,
-                )
-
-                # We don't add to the profiler state if we only do table level profiling as it always happens
-                if self.state_handler:
-                    self.state_handler.add_to_state(
-                        dataset_urn, int(datetime.now().timestamp() * 1000)
-                    )
-
-                yield MetadataChangeProposalWrapper(
-                    entityUrn=dataset_urn, aspect=profile
-                ).as_workunit()
+        profile_requests = []
+        for schema in database.schemas:
+            if not is_schema_allowed(
+                self.config.schema_pattern,
+                schema.name,
+                database.name,
+                self.config.match_fully_qualified_names,
+            ):
+                continue
+            for table in db_tables[schema.name]:
+                profile_request = self.get_snowflake_profile_request(
+                    table, schema.name, database.name
+                )
+                if profile_request is not None:
+                    profile_requests.append(profile_request)
+
+        if len(profile_requests) == 0:
+            return
+
+        table_profile_requests = cast(List[TableProfilerRequest], profile_requests)
+
+        for request, profile in self.generate_profiles(
+            table_profile_requests,
+            self.config.profiling.max_workers,
+            database.name,
+            platform=self.platform,
+            profiler_args=self.get_profile_args(),
+        ):
+            if profile is None:
+                continue
+            profile.sizeInBytes = cast(
+                SnowflakeProfilerRequest, request
+            ).table.size_in_bytes
+            dataset_name = request.pretty_name
+            dataset_urn = make_dataset_urn_with_platform_instance(
+                self.platform,
+                dataset_name,
+                self.config.platform_instance,
+                self.config.env,
+            )
+
+            # We don't add to the profiler state if we only do table level profiling as it always happens
+            if self.state_handler:
+                self.state_handler.add_to_state(
+                    dataset_urn, int(datetime.now().timestamp() * 1000)
+                )
+
+            yield MetadataChangeProposalWrapper(
+                entityUrn=dataset_urn, aspect=profile
+            ).as_workunit()
 
     def get_snowflake_profile_request(
         self,
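The signature change in this hunk means profile requests are now accumulated for one database at a time, with the caller supplying that database's tables explicitly instead of the profiler walking every database itself. Below is a minimal, self-contained sketch of that call shape; `Database`, `Schema`, `Table`, `get_workunits`, and the driver loop are illustrative stand-ins inferred from the diff, not the real DataHub classes or caller.

```python
from typing import Dict, Iterable, List


class Table:
    def __init__(self, name: str) -> None:
        self.name = name


class Schema:
    def __init__(self, name: str) -> None:
        self.name = name


class Database:
    def __init__(self, name: str, schemas: List[Schema]) -> None:
        self.name = name
        self.schemas = schemas


def get_workunits(database: Database, db_tables: Dict[str, List[Table]]) -> Iterable[str]:
    # Mirrors the new shape: requests are built only for the given database
    # and streamed out, rather than collected across all databases at once.
    for schema in database.schemas:
        for table in db_tables.get(schema.name, []):
            yield f"profile:{database.name}.{schema.name}.{table.name}"


# Hypothetical driver loop: fetch one database's tables, profile them, move on,
# so only a single database's metadata is held in memory at any point.
for database in [Database("DB1", [Schema("PUBLIC")]), Database("DB2", [Schema("ANALYTICS")])]:
    db_tables = {s.name: [Table("ORDERS"), Table("CUSTOMERS")] for s in database.schemas}
    for wu in get_workunits(database, db_tables):
        print(wu)
```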
@@ -1,11 +1,13 @@
-from typing import MutableSet, Optional
+from dataclasses import dataclass, field
+from typing import Dict, MutableSet, Optional
 
 from datahub.ingestion.source.snowflake.constants import SnowflakeEdition
 from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport
 from datahub.ingestion.source_report.sql.snowflake import SnowflakeReport
 from datahub.ingestion.source_report.usage.snowflake_usage import SnowflakeUsageReport
 
 
+@dataclass
 class SnowflakeV2Report(SnowflakeReport, SnowflakeUsageReport, ProfilingSqlReport):
     account_locator: Optional[str] = None
     region: Optional[str] = None
@@ -25,6 +27,12 @@ class SnowflakeV2Report(SnowflakeReport, SnowflakeUsageReport, ProfilingSqlReport):
     view_downstream_lineage_query_secs: float = -1
     external_lineage_queries_secs: float = -1
 
+    # Reports how many times we reset in-memory `functools.lru_cache` caches of data,
+    # which occurs when we move on to a different database / schema.
+    # Should not be more than the number of databases / schemas scanned.
+    # Maps (function name) -> (stat_name) -> (stat_value)
+    lru_cache_info: Dict[str, Dict[str, int]] = field(default_factory=dict)
+
     # These will be non-zero if snowflake information_schema queries fail with error -
     # "Information schema query returned too much data. Please repeat query with more selective predicates."
     # This will result in overall increase in time complexity
@@ -39,8 +47,8 @@ class SnowflakeV2Report(SnowflakeReport, SnowflakeUsageReport, ProfilingSqlReport):
 
     rows_zero_objects_modified: int = 0
 
-    _processed_tags: MutableSet[str] = set()
-    _scanned_tags: MutableSet[str] = set()
+    _processed_tags: MutableSet[str] = field(default_factory=set)
+    _scanned_tags: MutableSet[str] = field(default_factory=set)
 
     edition: Optional[SnowflakeEdition] = None
 
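The new `lru_cache_info` field stores per-function cache statistics as plain dictionaries. A small, self-contained sketch of how such a field can be filled from `functools.lru_cache`'s `cache_info()`; the `Report` class and `get_tables_for_database` function below are stand-ins, not the actual wiring in the source.

```python
from dataclasses import dataclass, field
from functools import lru_cache
from typing import Dict, List


@dataclass
class Report:
    # Same shape as SnowflakeV2Report.lru_cache_info: function name -> stat name -> value.
    lru_cache_info: Dict[str, Dict[str, int]] = field(default_factory=dict)


@lru_cache(maxsize=1)
def get_tables_for_database(db_name: str) -> List[str]:
    # Stand-in for an expensive information_schema query.
    return [f"{db_name}.PUBLIC.ORDERS"]


get_tables_for_database("DB1")
get_tables_for_database("DB1")  # cache hit
get_tables_for_database("DB2")  # different key: the single cached entry is replaced

report = Report()
# cache_info() returns a namedtuple (hits, misses, maxsize, currsize);
# _asdict() converts it into the Dict[str, int] shape the report field expects.
report.lru_cache_info["get_tables_for_database"] = dict(
    get_tables_for_database.cache_info()._asdict()
)
print(report.lru_cache_info)
# {'get_tables_for_database': {'hits': 1, 'misses': 2, 'maxsize': 1, 'currsize': 1}}
```

The companion switch from `set()` to `field(default_factory=set)` goes with the new `@dataclass` decorator: dataclasses reject bare mutable defaults, and `default_factory` gives each report instance its own set rather than a class-level one shared between instances.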
@@ -2,6 +2,7 @@
 from collections import defaultdict
 from dataclasses import dataclass, field
 from datetime import datetime
+from functools import lru_cache
 from typing import Dict, List, Optional
 
 import pandas as pd
@@ -95,8 +96,8 @@ class SnowflakeSchema:
     created: Optional[datetime]
     last_altered: Optional[datetime]
     comment: Optional[str]
-    tables: List[SnowflakeTable] = field(default_factory=list)
-    views: List[SnowflakeView] = field(default_factory=list)
+    tables: List[str] = field(default_factory=list)
+    views: List[str] = field(default_factory=list)
     tags: Optional[List[SnowflakeTag]] = None
@@ -237,6 +238,7 @@ def get_schemas_for_database(self, db_name: str) -> List[SnowflakeSchema]:
             snowflake_schemas.append(snowflake_schema)
         return snowflake_schemas
 
+    @lru_cache(maxsize=1)
     def get_tables_for_database(
         self, db_name: str
     ) -> Optional[Dict[str, List[SnowflakeTable]]]:
@@ -291,6 +293,7 @@ def get_tables_for_schema(
             )
         return tables
 
+    @lru_cache(maxsize=1)
     def get_views_for_database(
         self, db_name: str
     ) -> Optional[Dict[str, List[SnowflakeView]]]:
@@ -338,6 +341,7 @@ def get_views_for_schema(
             )
         return views
 
+    @lru_cache(maxsize=1)
     def get_columns_for_schema(
         self, schema_name: str, db_name: str
     ) -> Optional[Dict[str, List[SnowflakeColumn]]]:
@@ -393,6 +397,7 @@ def get_columns_for_table(
             )
         return columns
 
+    @lru_cache(maxsize=1)
     def get_pk_constraints_for_schema(
         self, schema_name: str, db_name: str
     ) -> Dict[str, SnowflakePK]:
@@ -409,6 +414,7 @@ def get_pk_constraints_for_schema(
             constraints[row["table_name"]].column_names.append(row["column_name"])
         return constraints
 
+    @lru_cache(maxsize=1)
     def get_fk_constraints_for_schema(
         self, schema_name: str, db_name: str
     ) -> Dict[str, List[SnowflakeFK]]:
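Each of the getters decorated above now keeps at most one result in memory: `@lru_cache(maxsize=1)` caches only the most recent call, so asking for a different database (or schema) evicts the previous result instead of accumulating it. A self-contained illustration of that eviction behaviour; `MetadataFetcher` is a toy stand-in for the Snowflake data dictionary, not the real class.

```python
from functools import lru_cache


class MetadataFetcher:
    """Toy stand-in for the Snowflake data dictionary."""

    def __init__(self) -> None:
        self.query_count = 0

    @lru_cache(maxsize=1)
    def get_tables_for_database(self, db_name: str):
        # Pretend this is an expensive information_schema query returning many rows.
        self.query_count += 1
        return {f"{db_name}.PUBLIC": ["ORDERS", "CUSTOMERS"]}


fetcher = MetadataFetcher()
fetcher.get_tables_for_database("DB1")
fetcher.get_tables_for_database("DB1")  # served from the single-entry cache
fetcher.get_tables_for_database("DB2")  # evicts DB1's result; memory stays bounded

print(fetcher.query_count)  # 2: one real fetch per database
print(MetadataFetcher.get_tables_for_database.cache_info())
# CacheInfo(hits=1, misses=2, maxsize=1, currsize=1)
```

Note that when `lru_cache` is applied to an instance method, `self` is part of the cache key, so the cache also holds a reference to the instance for as long as the entry lives.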