
fix(ingestion): add logging, make job more resilient to errors (#4331)
anshbansal committed Mar 7, 2022
1 parent 2903646 commit beb51eb
Showing 10 changed files with 180 additions and 97 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -59,4 +59,5 @@ smoke-test/spark-smoke-test/__pycache__/
 metadata-ingestion/generated/**
 
 # docs
-docs/generated/
+docs/generated/
+tmp*
5 changes: 4 additions & 1 deletion metadata-ingestion/source_docs/snowflake.md
@@ -17,7 +17,7 @@ create or replace role datahub_role;

 // Grant privileges to use and select from your target warehouses / dbs / schemas / tables
 grant operate, usage on warehouse <your-warehouse> to role datahub_role;
-grant usage on <your-database> to role datahub_role;
+grant usage on DATABASE <your-database> to role datahub_role;
 grant usage on all schemas in database <your-database> to role datahub_role;
 grant select on all tables in database <your-database> to role datahub_role;
 grant select on all external tables in database <your-database> to role datahub_role;
@@ -27,6 +27,9 @@ grant select on all views in database <your-database> to role datahub_role;
 grant usage on future schemas in database "<your-database>" to role datahub_role;
 grant select on future tables in database "<your-database>" to role datahub_role;
 
+// Grant privileges on snowflake default database - needed for lineage
+grant imported privileges on DATABASE snowflake to role datahub_role;
+
 // Create a new DataHub user and assign the DataHub role to it
 create user datahub_user display_name = 'DataHub' password='' default_role = datahub_role default_warehouse = '<your-warehouse>';
 
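The new `imported privileges` grant is what exposes the read-only `snowflake.account_usage` share that the lineage queries in this commit read from. A quick way to confirm the role can actually see it is sketched below; the connection URL placeholders are assumptions, not part of this commit.

```python
# Sanity-check sketch (assumed URL format from snowflake-sqlalchemy):
# verify datahub_role can read the account_usage share used for lineage.
from sqlalchemy import create_engine

engine = create_engine(
    "snowflake://<user>:<password>@<account>/?role=datahub_role"
)
with engine.connect() as conn:
    count = conn.execute(
        "select count(*) from snowflake.account_usage.access_history"
    ).scalar()
    print(f"access_history rows visible to datahub_role: {count}")
```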
9 changes: 7 additions & 2 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
@@ -136,12 +136,17 @@ def __init__(
self._session.mount("http://", adapter)
self._session.mount("https://", adapter)

def test_connection(self) -> None:
def test_connection(self) -> str:
response = self._session.get(f"{self._gms_server}/config")
if response.status_code == 200:
config: dict = response.json()
if config.get("noCode") == "true":
return
return (
config.get("versions", {})
.get("linkedin/datahub", {})
.get("version", "")
)

else:
# Looks like we either connected to an old GMS or to some other service. Let's see if we can determine which before raising an error
# A common misconfiguration is connecting to datahub-frontend so we special-case this check
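With this change `test_connection` doubles as a version probe: it still errors on misconfiguration (the branch truncated below the hunk), but on success it returns the GMS version string, empty if the server does not report one. A minimal usage sketch, with the server URL assumed:

```python
# Sketch: consume the new return value (the URL is a placeholder assumption).
from datahub.emitter.rest_emitter import DatahubRestEmitter

emitter = DatahubRestEmitter("http://localhost:8080")
gms_version = emitter.test_connection()  # "" when the server omits a version
print(f"Connected to GMS {gms_version or '(unknown version)'}")
```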
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -14,6 +14,7 @@ class SourceReport(Report):

     warnings: Dict[str, List[str]] = field(default_factory=dict)
     failures: Dict[str, List[str]] = field(default_factory=dict)
+    cli_version: str = ""
 
     def report_workunit(self, wu: WorkUnit) -> None:
         self.workunits_produced += 1
2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -8,6 +8,7 @@
 import click
 from pydantic import validator
 
+import datahub
 from datahub.configuration.common import (
     ConfigModel,
     DynamicTypedConfig,
@@ -178,6 +179,7 @@ def run(self) -> None:

         callback = LoggingCallback()
         extractor: Extractor = self.extractor_class()
+        self.source.get_report().cli_version = datahub.nice_version_name()
         for wu in itertools.islice(
             self.source.get_workunits(), 10 if self.preview_mode else None
         ):
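`datahub.nice_version_name()` stamps the running CLI's version onto the source report at the start of the run. A rough, hypothetical stand-in for what such a helper does (the real implementation may differ):

```python
# Hypothetical equivalent of datahub.nice_version_name(); not from this commit.
from importlib.metadata import version

def nice_version_name() -> str:
    try:
        return version("acryl-datahub")  # the package's PyPI distribution name
    except Exception:
        return "unknown"
```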
11 changes: 8 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
@@ -24,17 +24,22 @@ class DatahubRestSinkConfig(DatahubClientConfig):
     pass
 
 
+@dataclass
+class DataHubRestSinkReport(SinkReport):
+    gms_version: str = ""
+
+
 @dataclass
 class DatahubRestSink(Sink):
     config: DatahubRestSinkConfig
     emitter: DatahubRestEmitter
-    report: SinkReport
+    report: DataHubRestSinkReport
     treat_errors_as_warnings: bool = False
 
     def __init__(self, ctx: PipelineContext, config: DatahubRestSinkConfig):
         super().__init__(ctx)
         self.config = config
-        self.report = SinkReport()
+        self.report = DataHubRestSinkReport()
         self.emitter = DatahubRestEmitter(
             self.config.server,
             self.config.token,
@@ -45,7 +50,7 @@ def __init__(self, ctx: PipelineContext, config: DatahubRestSinkConfig):
             extra_headers=self.config.extra_headers,
             ca_certificate_path=self.config.ca_certificate_path,
         )
-        self.emitter.test_connection()
+        self.report.gms_version = self.emitter.test_connection()
         self.executor = concurrent.futures.ThreadPoolExecutor(
             max_workers=self.config.max_threads
         )
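Since the source report now carries `cli_version` and the rest sink report carries `gms_version`, a run summary can flag client/server drift. A post-run check could look like the sketch below; the field names come from this diff, but the `pipeline.source`/`pipeline.sink` accessors are assumptions.

```python
# Sketch of a post-run drift check (accessors assumed, not from this commit).
from datahub.ingestion.run.pipeline import Pipeline

def check_version_drift(pipeline: Pipeline) -> None:
    cli = pipeline.source.get_report().cli_version
    gms = pipeline.sink.get_report().gms_version
    if cli and gms and cli != gms:
        print(f"Note: CLI version {cli} differs from GMS version {gms}")
```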
14 changes: 8 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
@@ -364,9 +364,10 @@ def _compute_bigquery_lineage_via_gcp_logging(
             )
             self.lineage_metadata = self._create_lineage_map(parsed_entries)
         except Exception as e:
-            logger.error(
-                "Error computing lineage information using GCP logs.",
-                e,
+            self.error(
+                logger,
+                "lineage-gcp-logs",
+                f"Error was {e}",
             )
 
     def _compute_bigquery_lineage_via_exported_bigquery_audit_metadata(
@@ -385,9 +386,10 @@ def _compute_bigquery_lineage_via_exported_bigquery_audit_metadata(
             )
             self.lineage_metadata = self._create_lineage_map(parsed_entries)
         except Exception as e:
-            logger.error(
-                "Error computing lineage information using exported GCP audit logs.",
-                e,
+            self.error(
+                logger,
+                "lineage-exported-gcp-audit-logs",
+                f"Error: {e}",
             )
 
     def _make_bigquery_client(
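The switch from a bare `logger.error(...)` to `self.error(logger, key, reason)` routes failures through the source report, so they surface in the run summary instead of only in the log stream. The helper's shape is roughly the following sketch; the actual methods live on the source's base class and `self.report` is the source's `SourceReport`.

```python
import logging

class ReportingHelpersSketch:
    """Approximate shape of the error/warn helpers this diff adopts (assumed)."""

    def error(self, log: logging.Logger, key: str, reason: str) -> None:
        self.report.report_failure(key, reason)  # recorded; shown in run summary
        log.error(f"{key} => {reason}")

    def warn(self, log: logging.Logger, key: str, reason: str) -> None:
        self.report.report_warning(key, reason)  # non-fatal; ingestion continues
        log.warning(f"{key} => {reason}")
```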
103 changes: 72 additions & 31 deletions metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py
@@ -66,6 +66,10 @@ class SnowflakeReport(SQLSourceReport):
     num_view_to_table_edges_scanned: int = 0
     num_external_table_edges_scanned: int = 0
     upstream_lineage: Dict[str, List[str]] = field(default_factory=dict)
+    # https://community.snowflake.com/s/topic/0TO0Z000000Unu5WAC/releases
+    saas_version: str = ""
+    role: str = ""
+    role_grants: List[str] = field(default_factory=list)
 
 
 class BaseSnowflakeConfig(BaseTimeWindowConfig):
@@ -203,27 +207,59 @@ def create(cls, config_dict, ctx):
         config = SnowflakeConfig.parse_obj(config_dict)
         return cls(config, ctx)
 
-    def get_inspectors(self) -> Iterable[Inspector]:
-        url = self.config.get_sql_alchemy_url(database=None)
+    def get_metadata_engine(
+        self, database: Optional[str] = None
+    ) -> sqlalchemy.engine.Engine:
+        url = self.config.get_sql_alchemy_url(database=database)
         logger.debug(f"sql_alchemy_url={url}")
 
-        db_listing_engine = create_engine(
+        return create_engine(
             url,
             connect_args=self.config.get_sql_alchemy_connect_args(),
             **self.config.options,
         )
 
+    def inspect_version(self) -> Any:
+        db_engine = self.get_metadata_engine()
+        logger.info("Checking current version")
+        for db_row in db_engine.execute("select CURRENT_VERSION()"):
+            self.report.saas_version = db_row[0]
+
+    def inspect_role_grants(self) -> Any:
+        db_engine = self.get_metadata_engine()
+        cur_role = None
+        if self.config.role is None:
+            for db_row in db_engine.execute("select CURRENT_ROLE()"):
+                cur_role = db_row[0]
+        else:
+            cur_role = self.config.role
+
+        if cur_role is None:
+            return
+
+        self.report.role = cur_role
+        logger.info(f"Current role is {cur_role}")
+        if cur_role.lower() == "accountadmin":
+            return
+
+        logger.info(f"Checking grants for role {cur_role}")
+        for db_row in db_engine.execute(text(f"show grants to role {cur_role}")):
+            privilege = db_row["privilege"]
+            granted_on = db_row["granted_on"]
+            name = db_row["name"]
+            self.report.role_grants.append(
+                f"{privilege} granted on {granted_on} {name}"
+            )
+
+    def get_inspectors(self) -> Iterable[Inspector]:
+        db_listing_engine = self.get_metadata_engine(database=None)
 
         for db_row in db_listing_engine.execute(text("SHOW DATABASES")):
             db = db_row.name
             if self.config.database_pattern.allowed(db):
                 # We create a separate engine for each database in order to ensure that
                 # they are isolated from each other.
                 self.current_database = db
-                engine = create_engine(
-                    self.config.get_sql_alchemy_url(database=db),
-                    connect_args=self.config.get_sql_alchemy_connect_args(),
-                    **self.config.options,
-                )
+                engine = self.get_metadata_engine(database=db)
 
                 with engine.connect() as conn:
                     inspector = inspect(conn)
@@ -273,9 +309,11 @@ def _populate_view_upstream_lineage(self, engine: sqlalchemy.engine.Engine) -> N
f"Upstream->View: Lineage[View(Down)={view_name}]:Upstream={view_upstream}"
)
except Exception as e:
logger.warning(
f"Extracting the upstream view lineage from Snowflake failed."
f"Please check your permissions. Continuing...\nError was {e}."
self.warn(
logger,
"view_upstream_lineage",
"Extracting the upstream view lineage from Snowflake failed."
+ f"Please check your permissions. Continuing...\nError was {e}.",
)
logger.info(f"A total of {num_edges} View upstream edges found.")
self.report.num_table_to_view_edges_scanned = num_edges
@@ -387,9 +425,11 @@ def _populate_view_downstream_lineage(
                 num_edges += 1
 
         except Exception as e:
-            logger.warning(
+            self.warn(
+                logger,
+                "view_downstream_lineage",
                 f"Extracting the view lineage from Snowflake failed."
-                f"Please check your permissions. Continuing...\nError was {e}."
+                f"Please check your permissions. Continuing...\nError was {e}.",
             )
         logger.info(
             f"Found {num_edges} View->Table edges. Removed {num_false_edges} false Table->Table edges."
@@ -399,16 +439,12 @@ def _populate_view_lineage(self) -> None:
     def _populate_view_lineage(self) -> None:
         if not self.config.include_view_lineage:
             return
-        url = self.config.get_sql_alchemy_url()
-        logger.debug(f"sql_alchemy_url={url}")
-        engine = create_engine(url, **self.config.options)
+        engine = self.get_metadata_engine(database=None)
         self._populate_view_upstream_lineage(engine)
         self._populate_view_downstream_lineage(engine)
 
     def _populate_external_lineage(self) -> None:
-        url = self.config.get_sql_alchemy_url()
-        logger.debug(f"sql_alchemy_url={url}")
-        engine = create_engine(url, **self.config.options)
+        engine = self.get_metadata_engine(database=None)
         # Handles the case where a table is populated from an external location via copy.
         # Eg: copy into category_english from 's3://acryl-snow-demo-olist/olist_raw_data/category_english'credentials=(aws_key_id='...' aws_secret_key='...') pattern='.*.csv';
         query: str = """
@@ -464,21 +500,17 @@ def _populate_external_lineage(self) -> None:
             )
             num_edges += 1
         except Exception as e:
-            logger.warning(
+            self.warn(
+                logger,
+                "external_lineage",
                 f"Populating external table lineage from Snowflake failed."
-                f"Please check your premissions. Continuing...\nError was {e}."
+                f"Please check your premissions. Continuing...\nError was {e}.",
             )
         logger.info(f"Found {num_edges} external lineage edges.")
         self.report.num_external_table_edges_scanned = num_edges
 
     def _populate_lineage(self) -> None:
-        url = self.config.get_sql_alchemy_url()
-        logger.debug(f"sql_alchemy_url={url}")
-        engine = create_engine(
-            url,
-            connect_args=self.config.get_sql_alchemy_connect_args(),
-            **self.config.options,
-        )
+        engine = self.get_metadata_engine(database=None)
         query: str = """
 WITH table_lineage_history AS (
     SELECT
@@ -521,9 +553,11 @@ def _populate_lineage(self) -> None:
f"Lineage[Table(Down)={key}]:Table(Up)={self._lineage_map[key]}"
)
except Exception as e:
logger.warning(
self.warn(
logger,
"lineage",
f"Extracting lineage from Snowflake failed."
f"Please check your premissions. Continuing...\nError was {e}."
f"Please check your premissions. Continuing...\nError was {e}.",
)
logger.info(
f"A total of {num_edges} Table->Table edges found"
@@ -611,6 +645,13 @@ def _get_upstream_lineage_info(

     # Override the base class method.
     def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
+        try:
+            self.inspect_version()
+        except Exception as e:
+            self.report.report_failure("version", f"Error: {e}")
+            return
+
+        self.inspect_role_grants()
         for wu in super().get_workunits():
             if (
                 self.config.include_table_lineage
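Net effect on the Snowflake source's entry point: connectivity and auth problems now fail fast as a single report failure, and the role's grants are captured for diagnosis before any scanning begins. A condensed restatement of the control flow (names from this diff, body abbreviated):

```python
# Condensed sketch of the new get_workunits() pre-flight; not new behavior,
# just the pattern this commit introduces, restated for clarity.
def get_workunits(self):
    try:
        self.inspect_version()  # cheapest probe: select CURRENT_VERSION()
    except Exception as e:
        self.report.report_failure("version", f"Error: {e}")  # fail fast via the report
        return
    self.inspect_role_grants()  # best effort: grants land in report.role_grants
    yield from super().get_workunits()
```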
