Skip to content

Commit

Permalink
Merge pull request #371 from rcypher-databricks/dlt
Browse files Browse the repository at this point in the history
Cleanup logging/error messages
  • Loading branch information
rcypher-databricks committed Jun 21, 2023
2 parents a64918c + 642d917 commit 372e934
Showing 1 changed file with 19 additions and 12 deletions.
31 changes: 19 additions & 12 deletions dbt/adapters/databricks/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,15 +501,19 @@ def pollRefreshPipeline(

pipeline_id = _get_table_view_pipeline_id(host, headers, model_name)
pipeline = _get_pipeline_state(host, headers, pipeline_id)
# get the most recently created update for the pipeline
latest_update = _find_update(pipeline)
if not latest_update:
raise dbt.exceptions.DbtRuntimeError(f"Nod update created for pipeline: {pipeline_id}")
raise dbt.exceptions.DbtRuntimeError(f"No update created for pipeline: {pipeline_id}")

state = latest_update.get("state")
# we use update_id to retrieve the update in the polling loop
update_id = latest_update.get("update_id", "")
prev_state = state

print(f"refreshing {model_name}, pipeline: {pipeline_id}, update: {update_id} {state}")
logger.info(
f"refreshing {model_name}, pipeline: {pipeline_id}, update: {update_id} {state}"
)

start = time.time()
exceeded_timeout = False
Expand All @@ -522,6 +526,7 @@ def pollRefreshPipeline(
time.sleep(polling_interval)

pipeline = _get_pipeline_state(host, headers, pipeline_id)
# get the update we are currently polling
update = _find_update(pipeline, update_id)
if not update:
raise dbt.exceptions.DbtRuntimeError(
Expand All @@ -530,7 +535,7 @@ def pollRefreshPipeline(

state = update.get("state")
if state != prev_state:
print(
logger.info(
f"refreshing {model_name}, pipeline: {pipeline_id}, update: {update_id} {state}"
)
prev_state = state
Expand All @@ -541,10 +546,12 @@ def pollRefreshPipeline(
if msg:
logger.error(msg)

# another update may have been created due to retry_on_fail settings
# get the latest update and see if it is a new one
latest_update = _find_update(pipeline)
if not latest_update:
raise dbt.exceptions.DbtRuntimeError(
f"Nod update created for pipeline: {pipeline_id}"
f"No update created for pipeline: {pipeline_id}"
)

latest_update_id = latest_update.get("update_id", "")
Expand All @@ -557,10 +564,10 @@ def pollRefreshPipeline(

if state == "FAILED":
msg = _get_update_error_msg(host, headers, pipeline_id, update_id)
raise dbt.exceptions.DbtRuntimeError(f"error refreshing {model_name} {msg}")
raise dbt.exceptions.DbtRuntimeError(f"error refreshing model {model_name} {msg}")

if state == "CANCELED":
raise dbt.exceptions.DbtRuntimeError(f"refreshing {model_name} cancelled")
raise dbt.exceptions.DbtRuntimeError(f"refreshing model {model_name} cancelled")

return

Expand Down Expand Up @@ -873,26 +880,26 @@ def _should_poll_refresh(sql: str) -> Tuple[bool, str]:


def _get_table_view_pipeline_id(host: str, headers: dict, name: str) -> str:
url = f"https://{host}/api/2.1/unity-catalog/tables/{name}"
resp1 = requests.get(url, headers=headers)
table_url = f"https://{host}/api/2.1/unity-catalog/tables/{name}"
resp1 = requests.get(table_url, headers=headers)
if resp1.status_code != 200:
raise dbt.exceptions.DbtRuntimeError(
f"Error getting materialized view/streaming table info: {name}"
f"Error getting info for materialized view/streaming table: {name}"
)

pipeline_id = resp1.json().get("pipeline_id", "")
if not pipeline_id:
raise dbt.exceptions.DbtRuntimeError(
f"Error materialized view/streaming table {name} does not have a pipeline id"
f"Materialized view/streaming table {name} does not have a pipeline id"
)

return pipeline_id


def _get_pipeline_state(host: str, headers: dict, pipeline_id: str) -> dict:
url = f"https://{host}/api/2.0/pipelines/{pipeline_id}"
pipeline_url = f"https://{host}/api/2.0/pipelines/{pipeline_id}"

response = requests.get(url, headers=headers)
response = requests.get(pipeline_url, headers=headers)
if response.status_code != 200:
raise dbt.exceptions.DbtRuntimeError(f"Error getting pipeline info: {pipeline_id}")

Expand Down

0 comments on commit 372e934

Please sign in to comment.