Skip to content

Commit

Permalink
🐛 Source Google Sheets: added stop reading in case of 429 error (#29427)
Browse files Browse the repository at this point in the history
  • Loading branch information
darynaishchenko committed Aug 21, 2023
1 parent 70a7e9a commit 32bb251
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ COPY source_google_sheets ./source_google_sheets
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.3.4
LABEL io.airbyte.version=0.3.5
LABEL io.airbyte.name=airbyte/source-google-sheets
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 71607ba1-c0ac-4799-8049-7f4b90dd50f7
dockerImageTag: 0.3.4
dockerImageTag: 0.3.5
dockerRepository: airbyte/source-google-sheets
githubIssueLabel: source-google-sheets
icon: google-sheets.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog:
) from err
raise Exception(f"Could not run discovery: {reason}")

def read(
def _read(
self,
logger: AirbyteLogger,
config: json,
Expand Down Expand Up @@ -206,7 +206,22 @@ def read(
else:
logger.info(f"Skipping syncing sheet {sheet}: {reason}")

logger.info(f"Finished syncing spreadsheet {spreadsheet_id}")
def read(
    self,
    logger: AirbyteLogger,
    config: json,
    catalog: ConfiguredAirbyteCatalog,
    state: Union[List[AirbyteStateMessage], MutableMapping[str, Any]] = None,
) -> Generator[AirbyteMessage, None, None]:
    """Yield the messages produced by ``_read`` and stop quietly on Google API HTTP errors.

    Everything ``self._read`` yields is passed through unchanged. If an
    ``errors.HttpError`` escapes from it, the error is logged at info level
    and the generator ends without re-raising; a 429 status gets a dedicated
    rate-limit message. The "Finished syncing" line is logged in every case.

    :param logger: logger that receives the informational messages
    :param config: connector configuration; ``config['spreadsheet_id']`` is read for the final log line
    :param catalog: configured catalog forwarded to ``_read``
    :param state: optional state forwarded to ``_read``
    :return: generator of the messages yielded by ``_read``
    """
    try:
        yield from self._read(logger, config, catalog, state)
    except errors.HttpError as http_error:
        status = http_error.status_code
        if status != 429:
            message = f"{status}: {http_error.reason}"
        else:
            # Rate limit reached: reading stops here instead of failing the sync.
            message = f"Stopped syncing process due to rate limits. {http_error.reason}"
        logger.info(message)
    finally:
        # Resolved only here so the closing line is emitted on success, on a
        # handled HTTP error, and while any other exception propagates.
        spreadsheet_id = Helpers.get_spreadsheet_id(config["spreadsheet_id"])
        logger.info(f"Finished syncing spreadsheet {spreadsheet_id}")

@staticmethod
def get_credentials(config):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,22 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import logging

import pytest
import requests
from airbyte_cdk.models.airbyte_protocol import (
AirbyteStream,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
DestinationSyncMode,
SyncMode,
)
from airbyte_cdk.utils import AirbyteTracedException
from apiclient import errors
from source_google_sheets import SourceGoogleSheets
from source_google_sheets.client import GoogleSheetsClient
from source_google_sheets.helpers import SCOPES
from source_google_sheets.helpers import SCOPES, Helpers


def test_invalid_credentials_error_message(invalid_config):
Expand Down Expand Up @@ -58,3 +67,29 @@ def test_discover_403_error(mocker, invalid_config):
expected_message = ("Forbidden when requesting spreadsheet with id invalid_spreadsheet_id. The caller does not have right permissions. "
"See docs for more details here: https://cloud.google.com/service-infrastructure/docs/service-control/reference/rpc/google.api/servicecontrol.v1#code")
assert e.value.args[0] == expected_message


def test_read_429_error(mocker, invalid_config, caplog):
    """Check that ``read`` yields no records and logs the rate-limit reason on an HTTP 429 error."""
    source = SourceGoogleSheets()

    # Fake response carrying the 429 status and the reason text that the
    # final assertion expects to find in the captured log output.
    rate_limit_response = requests.Response()
    rate_limit_response.status = 429
    rate_limit_response.reason = "Request a higher quota limit"

    # Bypass real client construction and make listing the sheets raise the HTTP error.
    mocker.patch.object(GoogleSheetsClient, "__init__", lambda s, credentials, scopes=SCOPES: None)
    mocker.patch.object(GoogleSheetsClient, "get", return_value=mocker.Mock)
    rate_limit_error = errors.HttpError(resp=rate_limit_response, content=b'')
    mocker.patch.object(Helpers, "get_sheets_in_spreadsheet", side_effect=rate_limit_error)

    # Minimal single-stream catalog: one sheet whose columns are all strings.
    stream_name = "soccer_team"
    column_names = frozenset(["arsenal", "chelsea", "manutd", "liverpool"])
    stream_schema = {"properties": {column: {"type": "string"} for column in column_names}}
    configured_stream = ConfiguredAirbyteStream(
        stream=AirbyteStream(name=stream_name, json_schema=stream_schema, supported_sync_modes=["full_refresh"]),
        sync_mode=SyncMode.full_refresh,
        destination_sync_mode=DestinationSyncMode.overwrite,
    )
    catalog = ConfiguredAirbyteCatalog(streams=[configured_stream])

    airbyte_logger = logging.getLogger("airbyte")
    records = list(source.read(logger=airbyte_logger, config=invalid_config, catalog=catalog))

    assert records == []
    assert "Stopped syncing process due to rate limits. Request a higher quota limit" in caplog.text
3 changes: 2 additions & 1 deletion docs/integrations/sources/google-sheets.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ The [Google API rate limit](https://developers.google.com/sheets/api/limits) is

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|-----------------------------------------------------------------------------------|
| 0.3.4 | 2023-05-15 | [29453](https://github.com/airbytehq/airbyte/pull/29453) | Update spec descriptions |
| 0.3.5 | 2023-08-16 | [29427](https://github.com/airbytehq/airbyte/pull/29427) | Add stop reading in case of 429 error |
| 0.3.4 | 2023-05-15 | [29453](https://github.com/airbytehq/airbyte/pull/29453) | Update spec descriptions |
| 0.3.3 | 2023-08-10 | [29327](https://github.com/airbytehq/airbyte/pull/29327) | Add user-friendly error message for 404 and 403 error while discover |
| 0.3.2 | 2023-08-09 | [29246](https://github.com/airbytehq/airbyte/pull/29246) | Add checking while reading to skip modified sheets |
| 0.3.1 | 2023-07-06 | [28033](https://github.com/airbytehq/airbyte/pull/28033) | Fixed several reported vulnerabilities (25 total), CVE-2022-37434, CVE-2022-42898 |
Expand Down

0 comments on commit 32bb251

Please sign in to comment.