Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(redshift): Improve redshift error handling with new structured reporting system #10870

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1216,8 +1216,22 @@ def _generate_single_profile(
except Exception as e:
if not self.config.catch_exceptions:
raise e
logger.exception(f"Encountered exception while profiling {pretty_name}")
self.report.report_warning(pretty_name, f"Profiling exception {e}")

error_message = str(e).lower()
if "permission denied" in error_message:
self.report.warning(
title="Unauthorized to extract data profile statistics",
message="We were denied access while attempting to generate profiling statistics for some assets. Please ensure the provided user has permission to query these tables and views.",
context=f"Asset: {pretty_name}",
exc=e,
)
else:
self.report.warning(
title="Failed to extract statistics for some assets",
message="Caught unexpected exception while attempting to extract profiling statistics for some assets.",
context=f"Asset: {pretty_name}",
exc=e,
)
return None
finally:
if batch is not None and self.base_engine.engine.name == TRINO:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from typing import Callable, Iterable, TypeVar, Union

import redshift_connector
from typing_extensions import ParamSpec

from datahub.ingestion.source.redshift.report import RedshiftReport

T = TypeVar("T")
P = ParamSpec("P")


def handle_redshift_exceptions(
    report: RedshiftReport,
    func: Callable[P, T],
    *args: P.args,
    **kwargs: P.kwargs,
) -> Union[T, None]:
    """Invoke *func*, converting any redshift_connector error into a structured
    failure on *report* instead of letting it propagate.

    Returns the result of *func*, or None when a redshift_connector.Error was
    caught and reported.
    """
    try:
        result = func(*args, **kwargs)
    except redshift_connector.Error as err:
        # Only driver-level errors are swallowed; any other exception still propagates.
        report_redshift_failure(report, err)
        return None
    return result


def handle_redshift_exceptions_yield(
    report: RedshiftReport,
    func: Callable[P, Iterable[T]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> Iterable[T]:
    """Drive the iterable produced by *func*, reporting (rather than raising)
    any redshift_connector error hit during the call or during iteration.
    """
    try:
        # The call itself stays inside the try block so that connector errors
        # raised while creating the iterable are reported the same way as
        # errors raised mid-iteration.
        yield from func(*args, **kwargs)
    except redshift_connector.Error as err:
        report_redshift_failure(report, err)


def report_redshift_failure(
report: RedshiftReport, e: redshift_connector.Error
) -> None:
error_message = str(e).lower()
if "permission denied" in error_message:
if "svv_table_info" in error_message:
report.report_failure(
title="Permission denied",
message="Failed to extract metadata due to insufficient permission to access 'svv_table_info' table. Please ensure the provided database user has access.",
exc=e,
)
elif "svl_user_info" in error_message:
report.report_failure(
title="Permission denied",
message="Failed to extract metadata due to insufficient permission to access 'svl_user_info' table. Please ensure the provided database user has access.",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a huge fan of this pattern - ideally error handling code should be close to the code that causes the errors, especially for these sorts of cases where the error message is extremely custom

For the generic permission denied / failed to extract metadata, they're fine here. But the svv and svl table ones feel like they should be caught closer to the cause of the error

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These happen in roughly three places — lineage, usage, and the regular table extraction path — so a single shared handler avoids duplicating the logic.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also the code is a mess in there so I think this fallback is okay.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general I agree though that this is too much of a fallback, but in this case it would be a pretty significant undertaking to cover each place we are issuing queries via the client object

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(And this covered the cases I tested with a new sample user who I progressively added privileges for)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can also see I'm just catching the "redshift" connector errors here as of now, also. So it's fairly targeted to mean that the exception came from a redshift connector call.

exc=e,
)
else:
report.report_failure(
title="Permission denied",
message="Failed to extract metadata due to insufficient permissions.",
exc=e,
)
else:
report.report_failure(
title="Failed to extract some metadata",
message="Failed to extract some metadata from Redshift.",
exc=e,
)
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import collections
import logging
import traceback
from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple, Union

import redshift_connector
Expand Down Expand Up @@ -241,8 +240,10 @@ def _populate_lineage_agg(
processor(lineage_row)
except Exception as e:
self.report.warning(
f"lineage-v2-extract-{lineage_type.name}",
f"Error was {e}, {traceback.format_exc()}",
title="Failed to extract some lineage",
message=f"Failed to extract lineage of type {lineage_type.name}",
context=f"Query: '{query}'",
exc=e,
)
self._lineage_v1.report_status(f"extract-{lineage_type.name}", False)

Expand Down Expand Up @@ -409,3 +410,9 @@ def _process_external_tables(
def generate(self) -> Iterable[MetadataWorkUnit]:
    """Yield workunits from the SQL parsing aggregator, then surface any
    observed query parse failures as a single structured failure.
    """
    for mcp in self.aggregator.gen_metadata():
        yield mcp.as_workunit()

    parse_failures = self.aggregator.report.observed_query_parse_failures
    if len(parse_failures) > 0:
        # Roll all parse failures into one report entry rather than one each.
        self.report.report_failure(
            title="Failed to extract some SQL lineage",
            message="Unexpected error(s) while attempting to extract lineage from SQL queries. See the full logs for more details.",
            context=f"Query Parsing Failures: {parse_failures}",
        )
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,26 @@ def get_workunits(
if not self.config.schema_pattern.allowed(schema):
continue
for table in tables[db].get(schema, {}):
if (
not self.config.profiling.profile_external_tables
and table.type == "EXTERNAL_TABLE"
):
self.report.profiling_skipped_other[schema] += 1
logger.info(
f"Skipping profiling of external table {db}.{schema}.{table.name}"
)
continue
if table.type == "EXTERNAL_TABLE":
if not self.config.profiling.profile_external_tables:
# Case 1: If user did not tell us to profile external tables, simply log this.
self.report.profiling_skipped_other[schema] += 1
logger.info(
f"Skipping profiling of external table {db}.{schema}.{table.name}"
)
# Continue, since we should not profile this table.
continue
elif self.config.profiling.profile_table_level_only:
# Case 2: User DID tell us to profile external tables, but only at the table level.
# Currently, we do not support this combination. The user needs to also set
# profile_table_level_only to False in order to profile.
self.report.report_warning(
title="Skipped profiling for external tables",
message="External tables are not supported for profiling when 'profile_table_level_only' config is set to 'True'. Please set 'profile_table_level_only' to 'False' in order to profile external Redshift tables.",
context=f"External Table: {db}.{schema}.{table.name}",
)
# Continue, since we were unable to retrieve cheap profiling stats from svv_table_info.
continue
# Emit the profile work unit
profile_request = self.get_profile_request(table, schema, db)
if profile_request is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def list_tables(
else:
return f"{tables_query} UNION {external_tables_query}"

# NOTE(review): `list_columns` appears to be unused — confirm whether this is dead code or an unintentionally dropped call site.
list_columns: str = """
SELECT
n.nspname as "schema",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
DatasetSubTypes,
)
from datahub.ingestion.source.redshift.config import RedshiftConfig
from datahub.ingestion.source.redshift.exception import handle_redshift_exceptions_yield
from datahub.ingestion.source.redshift.lineage import RedshiftLineageExtractor
from datahub.ingestion.source.redshift.lineage_v2 import RedshiftSqlLineageV2
from datahub.ingestion.source.redshift.profile import RedshiftProfiler
Expand Down Expand Up @@ -411,17 +412,33 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
]

def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
    """Top-level extraction entry point: connect, prime the per-database
    caches, and delegate to _extract_metadata under a broad redshift
    exception guard.
    """
    conn = self._try_get_redshift_connection(self.config)
    if conn is None:
        # Connection failure has already been reported; short circuit the connector.
        return

    db = self.config.database
    logger.info(f"Processing db {db}")
    self.report.report_ingestion_stage_start(METADATA_EXTRACTION)
    self.db_tables[db] = defaultdict()
    self.db_views[db] = defaultdict()
    self.db_schemas.setdefault(db, {})

    # TODO: Ideally, exception handling would live next to each place the
    # connection is used instead of this broad fallback; for now this gives
    # wide coverage cheaply.
    yield from handle_redshift_exceptions_yield(
        self.report, self._extract_metadata, conn, db
    )

def _extract_metadata(
self, connection: redshift_connector.Connection, database: str
) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:

yield from self.gen_database_container(
database=database,
)

self.cache_tables_and_views(connection, database)

self.report.tables_in_mem_size[database] = humanfriendly.format_size(
Expand Down Expand Up @@ -556,6 +573,7 @@ def process_schema(
):
for table in self.db_tables[schema.database][schema.name]:
table.columns = schema_columns[schema.name].get(table.name, [])
table.column_count = len(table.columns)
table_wu_generator = self._process_table(
table, database=database
)
Expand All @@ -575,8 +593,10 @@ def process_schema(
f"Table processed: {schema.database}.{schema.name}.{table.name}"
)
else:
logger.info(
f"No tables in cache for {schema.database}.{schema.name}, skipping"
self.report.info(
title="No tables found in some schemas",
message="No tables found in some schemas. This may be due to insufficient privileges for the provided user.",
context=f"Schema: {schema.database}.{schema.name}",
)
else:
logger.info("Table processing disabled, skipping")
Expand All @@ -589,6 +609,7 @@ def process_schema(
):
for view in self.db_views[schema.database][schema.name]:
view.columns = schema_columns[schema.name].get(view.name, [])
view.column_count = len(view.columns)
yield from self._process_view(
table=view, database=database, schema=schema
)
Expand All @@ -603,8 +624,10 @@ def process_schema(
f"Table processed: {schema.database}.{schema.name}.{view.name}"
)
else:
logger.info(
f"No views in cache for {schema.database}.{schema.name}, skipping"
self.report.info(
title="No views found in some schemas",
message="No views found in some schemas. This may be due to insufficient privileges for the provided user.",
context=f"Schema: {schema.database}.{schema.name}",
)
else:
logger.info("View processing disabled, skipping")
Expand Down Expand Up @@ -1092,3 +1115,43 @@ def add_config_to_report(self):
self.config.start_time,
self.config.end_time,
)

def _try_get_redshift_connection(
    self,
    config: RedshiftConfig,
) -> Optional[redshift_connector.Connection]:
    """Attempt to open a Redshift connection, mapping known connector errors
    to user-friendly structured failures.

    Returns the live connection, or None when connecting failed (in which
    case a failure has been recorded on the report).
    """
    try:
        return RedshiftSource.get_redshift_connection(config)
    except redshift_connector.Error as e:
        text = str(e).lower()
        # Ordered (substrings, title, message) patterns; the first pattern
        # whose substrings all appear in the error text wins.
        known_failures = (
            (
                ("password authentication failed",),
                "Invalid credentials",
                "Failed to connect to Redshift. Please verify your username, password, and database.",
            ),
            (
                ("timeout",),
                "Unable to connect",
                "Failed to connect to Redshift. Please verify your host name and port number.",
            ),
            (
                ("communication error",),
                "Unable to connect",
                "Failed to connect to Redshift. Please verify that the host name is valid and reachable.",
            ),
            (
                ("database", "does not exist"),
                "Database does not exist",
                "Failed to connect to Redshift. Please verify that the provided database exists and the provided user has access to it.",
            ),
        )
        for needles, title, message in known_failures:
            if all(needle in text for needle in needles):
                self.report.report_failure(title=title, message=message, exc=e)
                break
        else:
            # Fallback for anything we do not specifically recognize.
            self.report.report_failure(
                title="Unable to connect",
                message="Failed to connect to Redshift. Please verify your connection details.",
                exc=e,
            )
        return None
Loading
Loading