69 changes: 69 additions & 0 deletions lineage/bigquery_query.py
@@ -0,0 +1,69 @@
from typing import Optional
from lineage.query_context import QueryContext
from sqllineage.exceptions import SQLLineageException
from lineage.table_resolver import TableResolver
from lineage.query import Query
from lineage.utils import get_logger

logger = get_logger(__name__)


class BigQueryQuery(Query):

EMPTY_QUERY_TYPE = ''
DROP_PREFIX = 'DROP'
ALTER_PREFIX = 'ALTER'
PLATFORM_TYPE = 'BIGQUERY'

@staticmethod
def from_dict(query_dict: dict):
query_context = QueryContext.from_dict(query_dict.pop('query_context'))
if 'platform_type' in query_dict:
query_dict.pop('platform_type')
return BigQueryQuery(**query_dict, query_context=query_context)

@staticmethod
def _parse_table_json_column(table_resolver: TableResolver, table_json_column: dict) -> Optional[str]:
if table_json_column is None:
return None

project = table_json_column.get('project_id')
dataset = table_json_column.get('dataset_id')
table = table_json_column.get('table_id')

if project is None or dataset is None or table is None:
return None

        # Table ids starting with 'anon' are BigQuery's anonymous (cached result)
        # tables, which are not meaningful lineage nodes - skip them.
        if table.startswith('anon'):
            return None

return table_resolver.name_qualification(f'{project}.{dataset}.{table}')

def parse(self, full_table_names: bool = False) -> None:
try:
table_resolver = TableResolver(profile_database_name=self._profile_database_name,
profile_schema_name=self._profile_schema_name,
full_table_names=full_table_names)

target_table = self._parse_table_json_column(table_resolver, self._query_context.destination_table)
source_tables = set()
for referenced_table in self._query_context.referenced_tables:
source_table = self._parse_table_json_column(table_resolver, referenced_table)
source_tables.add(source_table)

query_type = self.EMPTY_QUERY_TYPE
if self._query_context.query_type is not None:
query_type = self._query_context.query_type

if query_type.startswith(self.DROP_PREFIX):
self.dropped_tables.add(target_table)
elif query_type.startswith(self.ALTER_PREFIX):
_, _, self.renamed_tables, _ = \
self._parse_query_text(table_resolver, self._raw_query_text)
else:
self.source_tables = source_tables
self.target_tables.add(target_table)
except SQLLineageException as exc:
logger.debug(f'SQLLineageException was raised while parsing this query -\n{self._raw_query_text}\n'
f'Error was -\n{exc}.')
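For orientation, a minimal usage sketch of the class above - an assumption-laden example, not code from the PR: the constructor keywords mirror how BigQueryQuery is instantiated in bigquery_query_history.py further down, the QueryContext arguments mirror the call there, and the table dicts use the keys that _parse_table_json_column reads. All names and values are placeholders.

from lineage.bigquery_query import BigQueryQuery
from lineage.query_context import QueryContext

# Placeholder context; in practice these values come from a BigQuery job row.
context = QueryContext(query_time=None,  # in practice, the job's end_time
                       query_volume=42,
                       query_type='INSERT',
                       user_name='analyst@example.com',
                       destination_table={'project_id': 'my-project',
                                          'dataset_id': 'analytics',
                                          'table_id': 'orders'},
                       referenced_tables=[{'project_id': 'my-project',
                                           'dataset_id': 'raw',
                                           'table_id': 'orders'}])

query = BigQueryQuery(raw_query_text='INSERT INTO analytics.orders SELECT * FROM raw.orders',
                      query_context=context,
                      profile_database_name='my-project',
                      profile_schema_name='analytics')
query.parse(full_table_names=False)
# query.source_tables and query.target_tables now hold the resolved names.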

96 changes: 96 additions & 0 deletions lineage/bigquery_query_history.py
@@ -0,0 +1,96 @@
from typing import Optional

from google.cloud import bigquery
from lineage.bigquery_query import BigQueryQuery
from lineage.query import Query
from lineage.query_context import QueryContext
from lineage.query_history import QueryHistory
from lineage.utils import get_logger
from datetime import datetime

logger = get_logger(__name__)


class BigQueryQueryHistory(QueryHistory):
INFORMATION_SCHEMA_QUERY_HISTORY = """
SELECT query, end_time, dml_statistics.inserted_row_count + dml_statistics.updated_row_count, statement_type,
user_email, destination_table, referenced_tables

FROM region-{location}.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE
project_id = @project_id
AND creation_time BETWEEN @start_time AND {creation_time_range_end_expr}
AND end_time BETWEEN @start_time AND {end_time_range_end_expr}
AND job_type = "QUERY"
AND state = "DONE"
AND error_result is NULL
AND query NOT like '%JOBS_BY_PROJECT%'
ORDER BY end_time
"""
INFO_SCHEMA_END_TIME_UP_TO_CURRENT_TIMESTAMP = 'CURRENT_TIMESTAMP()'
INFO_SCHEMA_END_TIME_UP_TO_PARAMETER = '@end_time'

def __init__(self, con, should_export_query_history: bool = True, ignore_schema: bool = False,
dataset: str = None) -> None:
self.dataset = dataset
super().__init__(con, should_export_query_history, ignore_schema)

@classmethod
def _build_history_query(cls, start_date: datetime, end_date: datetime, database_name: str, location: str) -> \
(str, []):
query_parameters = [bigquery.ScalarQueryParameter("project_id", "STRING", database_name),
bigquery.ScalarQueryParameter("start_time", "TIMESTAMP", start_date)]

end_time_range_end_expr = cls.INFO_SCHEMA_END_TIME_UP_TO_CURRENT_TIMESTAMP
creation_time_range_end_expr = cls.INFO_SCHEMA_END_TIME_UP_TO_CURRENT_TIMESTAMP
if end_date is not None:
query_parameters.append(bigquery.ScalarQueryParameter("end_time",
"TIMESTAMP",
cls._include_end_date(end_date)))
end_time_range_end_expr = cls.INFO_SCHEMA_END_TIME_UP_TO_PARAMETER
creation_time_range_end_expr = cls.INFO_SCHEMA_END_TIME_UP_TO_PARAMETER

        query = cls.INFORMATION_SCHEMA_QUERY_HISTORY.format(
            location=location,
            creation_time_range_end_expr=creation_time_range_end_expr,
            end_time_range_end_expr=end_time_range_end_expr)
return query, query_parameters

def _query_history_table(self, start_date: datetime, end_date: datetime) -> [Query]:
database_name = self.get_database_name()
query, query_parameters = self._build_history_query(start_date, end_date, database_name, self._con.location)

job_config = bigquery.QueryJobConfig(
query_parameters=query_parameters
)

job = self._con.query(query, job_config=job_config)

logger.debug("Finished executing bigquery jobs history query")

queries = []
rows = job.result()
for row in rows:
query_context = QueryContext(query_time=row[1],
query_volume=row[2],
query_type=row[3],
user_name=row[4],
destination_table=row[5],
referenced_tables=row[6])

query = BigQueryQuery(raw_query_text=row[0],
query_context=query_context,
profile_database_name=database_name,
profile_schema_name=self.get_schema_name())

queries.append(query)
logger.debug("Finished fetching bigquery history job results")

return queries

def get_database_name(self) -> str:
return self._con.project

def get_schema_name(self) -> Optional[str]:
return self.dataset if not self._ignore_schema else None
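A short sketch of the two time-range modes of _build_history_query above: with end_date=None the upper bounds render as CURRENT_TIMESTAMP() and only project_id and start_time are bound, while a concrete end_date binds the @end_time parameter as well. Note that _include_end_date is defined on the parent QueryHistory class and is not part of this diff; dates and names below are placeholders.

from datetime import datetime
from lineage.bigquery_query_history import BigQueryQueryHistory

# Open-ended range: upper bounds become CURRENT_TIMESTAMP().
query, params = BigQueryQueryHistory._build_history_query(
    datetime(2021, 9, 1), None, 'my-project', 'us')
assert '@end_time' not in query and len(params) == 2

# Bounded range: upper bounds become the @end_time parameter.
query, params = BigQueryQueryHistory._build_history_query(
    datetime(2021, 9, 1), datetime(2021, 9, 2), 'my-project', 'us')
assert '@end_time' in query and len(params) == 3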
23 changes: 22 additions & 1 deletion lineage/dbt_utils.py
@@ -2,7 +2,10 @@
import dbt.config
from dbt.context.base import generate_base_context
from dbt.exceptions import DbtConfigError

from dbt.adapters.bigquery.connections import BigQueryConnectionManager
import google.cloud.bigquery
import google.cloud.exceptions
from google.api_core import client_info
from lineage.exceptions import ConfigError
from lineage.utils import get_logger

@@ -31,3 +34,21 @@ def extract_credentials_and_data_from_profiles(profiles_dir: str, profile_name:
logger.debug(f"Failed parsing selected profile - {profiles_dir}, {profile_name}, {exc}")
raise ConfigError(f"Failed parsing selected profile - {profiles_dir}, {profile_name}")


def get_bigquery_client(profile_credentials):
if profile_credentials.impersonate_service_account:
creds = \
BigQueryConnectionManager.get_impersonated_bigquery_credentials(profile_credentials)
else:
creds = BigQueryConnectionManager.get_bigquery_credentials(profile_credentials)

database = profile_credentials.database
location = getattr(profile_credentials, 'location', None)

    info = client_info.ClientInfo(user_agent='elementary')
return google.cloud.bigquery.Client(
database,
creds,
location=location,
client_info=info,
)
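A sketch of the intended wiring between the two helpers in this file, assuming extract_credentials_and_data_from_profiles returns the dbt credentials object first - its body is collapsed in this diff, so the return shape is inferred, and the profile names are placeholders.

from lineage.dbt_utils import extract_credentials_and_data_from_profiles, get_bigquery_client

credentials, profile_data = extract_credentials_and_data_from_profiles('~/.dbt', 'my_bigquery_profile')
client = get_bigquery_client(credentials)
print(client.project, client.location)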
6 changes: 6 additions & 0 deletions lineage/exceptions.py
@@ -7,3 +7,9 @@ class ConfigError(Error):
"""Exception raised for errors in the profiles configuration"""
def __init__(self, message):
self.message = message


class SerializationError(Error):
"""Exception raised for errors during serialization / deserialization"""
def __init__(self, message):
self.message = message
123 changes: 23 additions & 100 deletions lineage/lineage_graph.py
@@ -1,17 +1,11 @@
 import itertools
-from typing import Optional
 
 import networkx as nx
-import sqlparse
 from lineage.exceptions import ConfigError
-from sqllineage.core import LineageAnalyzer, LineageResult
-from sqllineage.exceptions import SQLLineageException
 from pyvis.network import Network
 import webbrowser
 
-from lineage.query_context import QueryContext
 from lineage.query import Query
 from lineage.utils import get_logger
-from sqllineage.models import Schema, Table
 from tqdm import tqdm
 import pkg_resources

@@ -70,83 +64,25 @@ class LineageGraph(object):
     SELECTED_NODE_COLOR = '#0925C7'
     SELECTED_NODE_TITLE = 'Selected table<br/>'
 
-    def __init__(self, profile_database_name: str, profile_schema_name: str = None, show_isolated_nodes: bool = False,
-                 full_table_names: bool = False) -> None:
+    def __init__(self, show_isolated_nodes: bool = False, show_full_table_names: bool = False) -> None:
         self._lineage_graph = nx.DiGraph()
         self._show_isolated_nodes = show_isolated_nodes
-        self._profile_database_name = profile_database_name
-        self._profile_schema_name = profile_schema_name
-        self._show_full_table_name = full_table_names
-
-    @staticmethod
-    def _parse_query(query: str) -> [LineageResult]:
-        parsed_query = sqlparse.parse(query.strip())
-        analyzed_statements = [LineageAnalyzer().analyze(statement) for statement in parsed_query
-                               if statement.token_first(skip_cm=True, skip_ws=True)]
-        return analyzed_statements
+        self._show_full_table_names = show_full_table_names
 
-    @staticmethod
-    def _resolve_table_qualification(table: Table, database_name: str, schema_name: str) -> Table:
-        if not table.schema:
-            if database_name is not None and schema_name is not None:
-                table.schema = Schema(f'{database_name}.{schema_name}')
-        else:
-            parsed_query_schema_name = str(table.schema)
-            if '.' not in parsed_query_schema_name:
-                # Resolved schema is either empty or fully qualified with db_name.schema_name
-                if database_name is not None:
-                    table.schema = Schema(f'{database_name}.{parsed_query_schema_name}')
-                else:
-                    table.schema = Schema()
-        return table
-
-    def _should_ignore_table(self, table: Table) -> bool:
-        if self._profile_schema_name is not None:
-            if str(table.schema) == str(Schema(f'{self._profile_database_name}.{self._profile_schema_name}')):
-                return False
-        else:
-            if str(Schema(self._profile_database_name)) in str(table.schema):
-                return False
+    def _update_lineage_graph(self, query: Query) -> None:
+        query.parse(self._show_full_table_names)
 
-        return True
+        # Handle drop tables, if they exist in the statement
+        for dropped_table_name in query.dropped_tables:
+            self._remove_node(dropped_table_name)
 
-    def _name_qualification(self, table: Table, database_name: str, schema_name: str) -> Optional[str]:
-        table = self._resolve_table_qualification(table, database_name, schema_name)
+        # Handle rename tables
+        for old_table_name, new_table_name in query.renamed_tables:
+            self._rename_node(old_table_name, new_table_name)
 
-        if self._should_ignore_table(table):
-            return None
+        self._add_nodes_and_edges(query.source_tables, query.target_tables, query.get_context_as_html())
 
-        if self._show_full_table_name:
-            return str(table)
-
-        return str(table).rsplit('.', 1)[-1]
-
-    def _update_lineage_graph(self, analyzed_statements: [LineageResult], query_context: QueryContext) -> None:
-        database_name = query_context.queried_database
-        schema_name = query_context.queried_schema
-        for analyzed_statement in analyzed_statements:
-            # Handle drop tables, if they exist in the statement
-            dropped_tables = analyzed_statement.drop
-            for dropped_table in dropped_tables:
-                dropped_table_name = self._name_qualification(dropped_table, database_name, schema_name)
-                self._remove_node(dropped_table_name)
-
-            # Handle rename tables
-            renamed_tables = analyzed_statement.rename
-            for old_table, new_table in renamed_tables:
-                old_table_name = self._name_qualification(old_table, database_name, schema_name)
-                new_table_name = self._name_qualification(new_table, database_name, schema_name)
-                self._rename_node(old_table_name, new_table_name)
-
-            # sqllineage lib marks CTEs as intermediate tables. Remove CTEs (WITH statements) from the source tables.
-            sources = {self._name_qualification(source, database_name, schema_name)
-                       for source in analyzed_statement.read - analyzed_statement.intermediate}
-            targets = {self._name_qualification(target, database_name, schema_name)
-                       for target in analyzed_statement.write}
-
-            self._add_nodes_and_edges(sources, targets, query_context)
-
-    def _add_nodes_and_edges(self, sources: {str}, targets: {str}, query_context: QueryContext) -> None:
+    def _add_nodes_and_edges(self, sources: {str}, targets: {str}, query_context_html: str) -> None:
         if None in sources:
             sources.remove(None)
         if None in targets:
@@ -160,10 +96,10 @@ def _add_nodes_and_edges(self, sources: {str}, targets: {str}, query_context: Qu
             self._lineage_graph.add_nodes_from(sources)
         elif len(targets) > 0 and len(sources) == 0:
             if self._show_isolated_nodes:
-                self._lineage_graph.add_nodes_from(targets, title=query_context.to_html())
+                self._lineage_graph.add_nodes_from(targets, title=query_context_html)
         else:
             self._lineage_graph.add_nodes_from(sources)
-            self._lineage_graph.add_nodes_from(targets, title=query_context.to_html())
+            self._lineage_graph.add_nodes_from(targets, title=query_context_html)
         for source, target in itertools.product(sources, targets):
             self._lineage_graph.add_edge(source, target)
 
@@ -194,43 +130,30 @@ def _remove_node(self, node: str) -> None:
         if self._lineage_graph.has_node(predecessor) and self._lineage_graph.degree(predecessor) == 0:
             self._lineage_graph.remove_node(predecessor)
 
-    def init_graph_from_query_list(self, queries: [tuple]) -> None:
+    def init_graph_from_query_list(self, queries: [Query]) -> None:
         logger.debug(f'Loading {len(queries)} queries into the lineage graph')
-        for query, query_context in tqdm(queries, desc="Updating lineage graph", colour='green'):
-            try:
-                analyzed_statements = self._parse_query(query)
-            except SQLLineageException as exc:
-                logger.debug(f'SQLLineageException was raised while parsing this query -\n{query}\n'
-                             f'Error was -\n{exc}.')
-                continue
-
-            self._update_lineage_graph(analyzed_statements, query_context)
+        for query in tqdm(queries, desc="Updating lineage graph", colour='green'):
+            self._update_lineage_graph(query)
 
         logger.debug(f'Finished updating lineage graph!')
 
     def filter_on_table(self, selected_table: str, direction: str = None, depth: int = None) -> None:
         logger.debug(f'Filtering lineage graph on table - {selected_table}')
-        resolved_selected_table_name = self._name_qualification(Table(selected_table), self._profile_database_name,
-                                                                self._profile_schema_name)
-        logger.debug(f'Qualified table name - {resolved_selected_table_name}')
-        if resolved_selected_table_name is None:
-            raise ConfigError(f'Could not resolve table name - {selected_table}, please make sure to '
-                              f'specify a table name that exists in the database configured in your profiles file.')
 
         if direction == self.DOWNSTREAM_DIRECTION:
-            self._lineage_graph = self._downstream_graph(resolved_selected_table_name, depth)
+            self._lineage_graph = self._downstream_graph(selected_table, depth)
         elif direction == self.UPSTREAM_DIRECTION:
-            self._lineage_graph = self._upstream_graph(resolved_selected_table_name, depth)
+            self._lineage_graph = self._upstream_graph(selected_table, depth)
         elif direction == self.BOTH_DIRECTIONS:
-            downstream_graph = self._downstream_graph(resolved_selected_table_name, depth)
-            upstream_graph = self._upstream_graph(resolved_selected_table_name, depth)
+            downstream_graph = self._downstream_graph(selected_table, depth)
+            upstream_graph = self._upstream_graph(selected_table, depth)
             self._lineage_graph = nx.compose(upstream_graph, downstream_graph)
         else:
             raise ConfigError(f'Direction must be one of the following - {self.UPSTREAM_DIRECTION}|'
                               f'{self.DOWNSTREAM_DIRECTION}|{self.BOTH_DIRECTIONS}, '
                               f'Got - {direction} instead.')
 
-        self._update_selected_node_attributes(resolved_selected_table_name)
+        self._update_selected_node_attributes(selected_table)
         logger.debug(f'Finished filtering lineage graph on table - {selected_table}')
         pass
 
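Taken together, the lineage_graph.py changes move SQL parsing out of LineageGraph and into the Query subclasses, so the graph now consumes pre-resolved table names. A minimal sketch of the new flow under that reading - in practice the queries list would be the [Query] output of a QueryHistory subclass such as BigQueryQueryHistory above, and note that filter_on_table now expects the table name exactly as it appears in the graph, since _name_qualification no longer runs here:

from lineage.lineage_graph import LineageGraph

queries = []  # in practice: the [Query] list produced by a QueryHistory subclass

graph = LineageGraph(show_isolated_nodes=False, show_full_table_names=False)
graph.init_graph_from_query_list(queries)  # each Query parses itself now
graph.filter_on_table('analytics.orders',  # must match a node name in the graph
                      direction=LineageGraph.BOTH_DIRECTIONS,
                      depth=1)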