Skip to content

Commit

Permalink
Merge pull request #2694 from kconvey/kconvey-retry-upstream
Browse files Browse the repository at this point in the history
Add retry of additional errors
  • Loading branch information
beckjake committed Aug 12, 2020
2 parents 89775fa + 4456872 commit 1bd82d4
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 6 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -4,6 +4,7 @@
### Features
- Add support for impersonating a service account using `impersonate_service_account` in the BigQuery profile configuration ([#2677](https://github.com/fishtown-analytics/dbt/issues/2677)) ([docs](https://docs.getdbt.com/reference/warehouse-profiles/bigquery-profile#service-account-impersonation))
- Macros in the current project can override internal dbt macros that are called through `execute_macros`. ([#2301](https://github.com/fishtown-analytics/dbt/issues/2301), [#2686](https://github.com/fishtown-analytics/dbt/pull/2686))
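- Retry additional transient errors in the BigQuery adapter (connection resets, connection errors, and `BadRequest` responses), reopening the connection after connection failures ([#2694](https://github.com/fishtown-analytics/dbt/pull/2694), follow-up to [#1963](https://github.com/fishtown-analytics/dbt/pull/1963))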


### Breaking changes
Expand All @@ -16,6 +17,7 @@

Contributors:
- [@bbhoss](https://github.com/bbhoss) ([#2677](https://github.com/fishtown-analytics/dbt/pull/2677))
- [@kconvey](https://github.com/kconvey) ([#2694](https://github.com/fishtown-analytics/dbt/pull/2694))

## dbt 0.18.0b2 (July 30, 2020)

Expand Down
29 changes: 26 additions & 3 deletions plugins/bigquery/dbt/adapters/bigquery/connections.py
@@ -1,5 +1,6 @@
from contextlib import contextmanager
from dataclasses import dataclass
from requests.exceptions import ConnectionError
from typing import Optional, Any, Dict

import google.auth
Expand All @@ -25,6 +26,18 @@

BQ_QUERY_JOB_SPLIT = '-----Query Job SQL Follows-----'

# Errors after which the connection is closed and reopened before the next
# retry attempt (see `_retry_and_handle`).
# NOTE: `ConnectionError` here is `requests.exceptions.ConnectionError`
# (imported at the top of this module), which shadows the builtin of the same
# name. The builtin `ConnectionResetError` is listed separately because it is
# not a subclass of the requests exception.
REOPENABLE_ERRORS = (
    ConnectionResetError,
    ConnectionError,
)

# Errors for which `_is_retryable` returns True. Derived from
# REOPENABLE_ERRORS so the two tuples cannot drift apart: every error that
# triggers a reconnect must also be retried.
# NOTE(review): BadRequest is treated as retryable here -- confirm this cannot
# mask permanent errors such as invalid SQL.
RETRYABLE_ERRORS = (
    google.cloud.exceptions.ServerError,
    google.cloud.exceptions.BadRequest,
    *REOPENABLE_ERRORS,
)


class Priority(StrEnum):
Interactive = 'interactive'
Expand Down Expand Up @@ -390,12 +403,20 @@ def _query_and_results(self, client, sql, conn, job_params, timeout=None):

def _retry_and_handle(self, msg, conn, fn):
    """Retry a function call within the context of exception_handler.

    :param msg: value handed to ``self.exception_handler``.
    :param conn: connection whose retry count is read via
        ``self.get_retries`` and which is closed and reopened after a
        reopenable error.
    :param fn: callable passed to ``retry.retry_target`` as its target.
    :return: whatever ``retry.retry_target`` returns.
    """
    def reopen_conn_on_error(error):
        # Passed to retry_target as `on_error`: after a connection-level
        # failure, replace the connection before the next attempt.
        if isinstance(error, REOPENABLE_ERRORS):
            logger.warning('Reopening connection after {!r}', error)
            self.close(conn)
            self.open(conn)

    with self.exception_handler(msg):
        return retry.retry_target(
            target=fn,
            predicate=_ErrorCounter(self.get_retries(conn)).count_error,
            sleep_generator=self._retry_generator(),
            deadline=None,
            on_error=reopen_conn_on_error)

def _retry_generator(self):
"""Generates retry intervals that exponentially back off."""
Expand Down Expand Up @@ -425,5 +446,7 @@ def count_error(self, error):


def _is_retryable(error):
    """Return true for errors that are unlikely to occur again if retried.

    An error qualifies when it is an instance of any class listed in
    RETRYABLE_ERRORS.
    """
    return isinstance(error, RETRYABLE_ERRORS)
36 changes: 33 additions & 3 deletions test/unit/test_bigquery_adapter.py
Expand Up @@ -3,6 +3,7 @@
import re
import unittest
from contextlib import contextmanager
from requests.exceptions import ConnectionError
from unittest.mock import patch, MagicMock, Mock

import hologram
Expand Down Expand Up @@ -427,9 +428,10 @@ def setUp(self):

self.connections.get_thread_connection = lambda: self.mock_connection

def test_retry_and_handle(self):
@patch(
'dbt.adapters.bigquery.connections._is_retryable', return_value=True)
def test_retry_and_handle(self, is_retryable):
self.connections.DEFAULT_MAXIMUM_DELAY = 2.0
dbt.adapters.bigquery.connections._is_retryable = lambda x: True

@contextmanager
def dummy_handler(msg):
Expand All @@ -453,14 +455,42 @@ def raiseDummyException():
raiseDummyException)
self.assertEqual(DummyException.count, 9)

@patch(
    'dbt.adapters.bigquery.connections._is_retryable', return_value=True)
def test_retry_connection_reset(self, is_retryable):
    # A target that always raises ConnectionResetError must surface that
    # error to the caller, with the connection closed and reopened once.
    @contextmanager
    def passthrough_handler(msg):
        yield

    def always_reset():
        raise ConnectionResetError("Connection broke")

    self.connections.DEFAULT_MAXIMUM_DELAY = 2.0
    self.connections.exception_handler = passthrough_handler
    # Replace open/close with mocks so the reconnect can be observed.
    self.connections.open = MagicMock()
    self.connections.close = MagicMock()

    conn = Mock(credentials=Mock(retries=1))
    with self.assertRaises(ConnectionResetError):
        self.connections._retry_and_handle(
            "some sql", conn, always_reset)

    self.connections.close.assert_called_once_with(conn)
    self.connections.open.assert_called_once_with(conn)

def test_is_retryable(self):
    # Server errors, bad requests and connection failures are retryable;
    # a plain ClientError is not.
    _is_retryable = dbt.adapters.bigquery.connections._is_retryable
    exceptions = dbt.adapters.bigquery.impl.google.cloud.exceptions
    internal_server_error = exceptions.InternalServerError('code broke')
    bad_request_error = exceptions.BadRequest('code broke')
    # `ConnectionError` is the requests exception imported by this module.
    connection_error = ConnectionError('code broke')
    connection_reset_error = ConnectionResetError('code broke')
    client_error = exceptions.ClientError('bad code')

    self.assertTrue(_is_retryable(internal_server_error))
    self.assertTrue(_is_retryable(bad_request_error))
    self.assertTrue(_is_retryable(connection_error))
    self.assertTrue(_is_retryable(connection_reset_error))
    self.assertFalse(_is_retryable(client_error))

def test_drop_dataset(self):
mock_table = Mock()
Expand Down

0 comments on commit 1bd82d4

Please sign in to comment.