From d16ef618e51fda67e41428b8b70aa24f8cd3adb7 Mon Sep 17 00:00:00 2001 From: Yevhenii <34103125+yevhenii-ldv@users.noreply.github.com> Date: Thu, 31 Dec 2020 03:13:20 +0200 Subject: [PATCH] Resolving issue #1353: implement backoff in Google Sheets (#1438) * Issue #1353: implement backoff for integration tests * update code for backoff HTTP error while read Google Sheet * create Client class for Google Sheets with backoff all methods * update Google Sheets Source after review #2 * update docker version for google_sheets_source --- .../71607ba1-c0ac-4799-8049-7f4b90dd50f7.json | 2 +- .../resources/seed/source_definitions.yaml | 2 +- .../source-google-sheets/Dockerfile | 2 +- .../google_sheets_source/client.py | 56 +++++++++++++++++++ .../google_sheets_source.py | 18 +++--- .../google_sheets_source/helpers.py | 20 +++---- .../integration_tests/integration_test.py | 19 +++---- .../connectors/source-google-sheets/setup.py | 1 + .../unit_tests/test_helpers.py | 21 ++++--- 9 files changed, 102 insertions(+), 39 deletions(-) create mode 100644 airbyte-integrations/connectors/source-google-sheets/google_sheets_source/client.py diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/71607ba1-c0ac-4799-8049-7f4b90dd50f7.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/71607ba1-c0ac-4799-8049-7f4b90dd50f7.json index 4046fe1526d7a7..6d415abe86f620 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/71607ba1-c0ac-4799-8049-7f4b90dd50f7.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/71607ba1-c0ac-4799-8049-7f4b90dd50f7.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "71607ba1-c0ac-4799-8049-7f4b90dd50f7", "name": "Google Sheets", "dockerRepository": "airbyte/source-google-sheets", - "dockerImageTag": "0.1.4", + "dockerImageTag": "0.1.5", "documentationUrl": "https://hub.docker.com/repository/docker/airbyte/source-google-sheets" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index f145afcde0641c..403148fc4db8b1 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -46,7 +46,7 @@ - sourceDefinitionId: 71607ba1-c0ac-4799-8049-7f4b90dd50f7 name: Google Sheets dockerRepository: airbyte/source-google-sheets - dockerImageTag: 0.1.4 + dockerImageTag: 0.1.5 documentationUrl: https://hub.docker.com/repository/docker/airbyte/source-google-sheets - sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad name: MySQL diff --git a/airbyte-integrations/connectors/source-google-sheets/Dockerfile b/airbyte-integrations/connectors/source-google-sheets/Dockerfile index 1e884a37dd42e7..55f8809122818c 100644 --- a/airbyte-integrations/connectors/source-google-sheets/Dockerfile +++ b/airbyte-integrations/connectors/source-google-sheets/Dockerfile @@ -11,5 +11,5 @@ COPY $CODE_PATH ./$CODE_PATH COPY setup.py ./ RUN pip install . -LABEL io.airbyte.version=0.1.4 +LABEL io.airbyte.version=0.1.5 LABEL io.airbyte.name=airbyte/source-google-sheets diff --git a/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/client.py b/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/client.py new file mode 100644 index 00000000000000..673274876854f9 --- /dev/null +++ b/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/client.py @@ -0,0 +1,56 @@ +""" +MIT License + +Copyright (c) 2020 Airbyte + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +""" + +from typing import Dict, List + +import backoff +from googleapiclient import errors +from requests import codes as status_codes + +from .helpers import SCOPES, Helpers + + +def error_handler(error): + return error.resp.status != status_codes.TOO_MANY_REQUESTS + + +class GoogleSheetsClient: + def __init__(self, credentials: Dict[str, str], scopes: List[str] = SCOPES): + self.client = Helpers.get_authenticated_sheets_client(credentials, scopes) + + @backoff.on_exception(backoff.expo, errors.HttpError, max_time=60, giveup=error_handler) + def get(self, **kwargs): + return self.client.get(**kwargs).execute() + + @backoff.on_exception(backoff.expo, errors.HttpError, max_time=60, giveup=error_handler) + def create(self, **kwargs): + return self.client.create(**kwargs).execute() + + @backoff.on_exception(backoff.expo, errors.HttpError, max_time=60, giveup=error_handler) + def get_values(self, **kwargs): + return self.client.values().batchGet(**kwargs).execute() + + @backoff.on_exception(backoff.expo, errors.HttpError, max_time=60, giveup=error_handler) + def update_values(self, **kwargs): + return self.client.values().batchUpdate(**kwargs).execute() diff --git a/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/google_sheets_source.py b/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/google_sheets_source.py index 72b49ca639bd28..c6322f78012c04 100644 --- a/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/google_sheets_source.py +++ b/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/google_sheets_source.py @@ -28,7 +28,9 @@ from airbyte_protocol import AirbyteCatalog, AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, Status, Type from apiclient import errors from base_python import AirbyteLogger, Source +from requests.status_codes import codes as status_codes +from .client import GoogleSheetsClient from .helpers import Helpers from .models.spreadsheet import Spreadsheet from .models.spreadsheet_values import SpreadsheetValues @@ -46,15 +48,15 @@ def __init__(self): def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus: # Check involves verifying that the specified spreadsheet is reachable with our credentials. - client = Helpers.get_authenticated_sheets_client(json.loads(config["credentials_json"])) + client = GoogleSheetsClient(json.loads(config["credentials_json"])) spreadsheet_id = config["spreadsheet_id"] try: # Attempt to get first row of sheet - client.get(spreadsheetId=spreadsheet_id, includeGridData=False, ranges="1:1").execute() + client.get(spreadsheetId=spreadsheet_id, includeGridData=False, ranges="1:1") except errors.HttpError as err: reason = str(err) # Give a clearer message if it's a common error like 404. - if err.resp.status == 404: + if err.resp.status == status_codes.NOT_FOUND: reason = "Requested spreadsheet was not found." logger.error(f"Formatted error: {reason}") return AirbyteConnectionStatus(status=Status.FAILED, message=str(reason)) @@ -62,11 +64,11 @@ def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus: return AirbyteConnectionStatus(status=Status.SUCCEEDED) def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog: - client = Helpers.get_authenticated_sheets_client(json.loads(config["credentials_json"])) + client = GoogleSheetsClient(json.loads(config["credentials_json"])) spreadsheet_id = config["spreadsheet_id"] try: logger.info(f"Running discovery on sheet {spreadsheet_id}") - spreadsheet_metadata = Spreadsheet.parse_obj(client.get(spreadsheetId=spreadsheet_id, includeGridData=False).execute()) + spreadsheet_metadata = Spreadsheet.parse_obj(client.get(spreadsheetId=spreadsheet_id, includeGridData=False)) sheet_names = [sheet.properties.title for sheet in spreadsheet_metadata.sheets] streams = [] for sheet_name in sheet_names: @@ -77,14 +79,14 @@ def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog: except errors.HttpError as err: reason = str(err) - if err.resp.status == 404: + if err.resp.status == status_codes.NOT_FOUND: reason = "Requested spreadsheet was not found." raise Exception(f"Could not run discovery: {reason}") def read( self, logger: AirbyteLogger, config: json, catalog: ConfiguredAirbyteCatalog, state: Dict[str, any] ) -> Generator[AirbyteMessage, None, None]: - client = Helpers.get_authenticated_sheets_client(json.loads(config["credentials_json"])) + client = GoogleSheetsClient(json.loads(config["credentials_json"])) sheet_to_column_name = Helpers.parse_sheet_and_column_names_from_catalog(catalog) spreadsheet_id = config["spreadsheet_id"] @@ -102,7 +104,7 @@ def read( range = f"{sheet}!{row_cursor}:{row_cursor + ROW_BATCH_SIZE}" logger.info(f"Fetching range {range}") row_batch = SpreadsheetValues.parse_obj( - client.values().batchGet(spreadsheetId=spreadsheet_id, ranges=range, majorDimension="ROWS").execute() + client.get_values(spreadsheetId=spreadsheet_id, ranges=range, majorDimension="ROWS") ) row_cursor += ROW_BATCH_SIZE + 1 # there should always be one range since we requested only one diff --git a/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/helpers.py b/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/helpers.py index 3de89c7f9aaf67..3a00d6010708e2 100644 --- a/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/helpers.py +++ b/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/helpers.py @@ -27,8 +27,8 @@ from typing import Dict, FrozenSet, Iterable, List from airbyte_protocol import AirbyteRecordMessage, AirbyteStream, ConfiguredAirbyteCatalog -from apiclient import discovery from google.oauth2 import service_account +from googleapiclient import discovery from .models.spreadsheet import RowData, Spreadsheet @@ -37,17 +37,17 @@ class Helpers(object): @staticmethod - def get_authenticated_sheets_client(credentials: Dict[str, str], scopes=SCOPES) -> discovery.Resource: + def get_authenticated_sheets_client(credentials: Dict[str, str], scopes: List[str] = SCOPES) -> discovery.Resource: creds = Helpers.get_authenticated_google_credentials(credentials, scopes) return discovery.build("sheets", "v4", credentials=creds).spreadsheets() @staticmethod - def get_authenticated_drive_client(credentials: Dict[str, str], scopes=SCOPES) -> discovery.Resource: + def get_authenticated_drive_client(credentials: Dict[str, str], scopes: List[str] = SCOPES) -> discovery.Resource: creds = Helpers.get_authenticated_google_credentials(credentials, scopes) return discovery.build("drive", "v3", credentials=creds) @staticmethod - def get_authenticated_google_credentials(credentials: Dict[str, str], scopes=SCOPES): + def get_authenticated_google_credentials(credentials: Dict[str, str], scopes: List[str] = SCOPES): return service_account.Credentials.from_service_account_info(credentials, scopes=scopes) @staticmethod @@ -86,10 +86,8 @@ def get_formatted_row_values(row_data: RowData) -> List[str]: return [value.formattedValue for value in row_data.values] @staticmethod - def get_first_row(client: discovery.Resource, spreadsheet_id: str, sheet_name: str) -> List[str]: - spreadsheet = Spreadsheet.parse_obj( - client.get(spreadsheetId=spreadsheet_id, includeGridData=True, ranges=f"{sheet_name}!1:1").execute() - ) + def get_first_row(client, spreadsheet_id: str, sheet_name: str) -> List[str]: + spreadsheet = Spreadsheet.parse_obj(client.get(spreadsheetId=spreadsheet_id, includeGridData=True, ranges=f"{sheet_name}!1:1")) # There is only one sheet since we are specifying the sheet in the requested ranges. returned_sheets = spreadsheet.sheets @@ -133,7 +131,7 @@ def row_data_to_record_message(sheet_name: str, cell_values: List[str], column_i @staticmethod def get_available_sheets_to_column_index_to_name( - client: discovery.Resource, spreadsheet_id: str, requested_sheets_and_columns: Dict[str, FrozenSet[str]] + client, spreadsheet_id: str, requested_sheets_and_columns: Dict[str, FrozenSet[str]] ) -> Dict[str, Dict[int, str]]: available_sheets = Helpers.get_sheets_in_spreadsheet(client, spreadsheet_id) @@ -150,8 +148,8 @@ def get_available_sheets_to_column_index_to_name( return available_sheets_to_column_index_to_name @staticmethod - def get_sheets_in_spreadsheet(client: discovery.Resource, spreadsheet_id: str): - spreadsheet_metadata = Spreadsheet.parse_obj(client.get(spreadsheetId=spreadsheet_id, includeGridData=False).execute()) + def get_sheets_in_spreadsheet(client, spreadsheet_id: str): + spreadsheet_metadata = Spreadsheet.parse_obj(client.get(spreadsheetId=spreadsheet_id, includeGridData=False)) return [sheet.properties.title for sheet in spreadsheet_metadata.sheets] @staticmethod diff --git a/airbyte-integrations/connectors/source-google-sheets/integration_tests/integration_test.py b/airbyte-integrations/connectors/source-google-sheets/integration_tests/integration_test.py index d3fdb36cee3b34..8abc030e0b1f9c 100644 --- a/airbyte-integrations/connectors/source-google-sheets/integration_tests/integration_test.py +++ b/airbyte-integrations/connectors/source-google-sheets/integration_tests/integration_test.py @@ -29,8 +29,8 @@ from typing import Dict from airbyte_protocol import ConfiguredAirbyteCatalog, ConnectorSpecification -from apiclient import discovery from base_python_test import StandardSourceTestIface +from google_sheets_source.client import GoogleSheetsClient from google_sheets_source.helpers import Helpers from google_sheets_source.models.spreadsheet import Spreadsheet @@ -61,8 +61,8 @@ def get_catalog(self) -> ConfiguredAirbyteCatalog: def setup(self) -> None: Path(self._get_tmp_dir()).mkdir(parents=True, exist_ok=True) - sheets_client = Helpers.get_authenticated_sheets_client(self._get_creds(), SCOPES) - spreadsheet_id = GoogleSheetsSourceStandardTest._create_spreadsheet(sheets_client) + sheets_client = GoogleSheetsClient(self._get_creds(), SCOPES) + spreadsheet_id = self._create_spreadsheet(sheets_client) self._write_spreadsheet_id(spreadsheet_id) def teardown(self) -> None: @@ -89,8 +89,7 @@ def _get_creds(self) -> Dict[str, str]: def _get_tmp_dir(): return "/test_root/gsheet_test" - @staticmethod - def _create_spreadsheet(sheets_client: discovery.Resource) -> str: + def _create_spreadsheet(self, sheets_client: GoogleSheetsClient) -> str: """ :return: spreadsheetId """ @@ -99,7 +98,7 @@ def _create_spreadsheet(sheets_client: discovery.Resource) -> str: "sheets": [{"properties": {"title": "sheet1"}}, {"properties": {"title": "sheet2"}}], } - spreadsheet = Spreadsheet.parse_obj(sheets_client.create(body=request).execute()) + spreadsheet = Spreadsheet.parse_obj(sheets_client.create(body=request)) spreadsheet_id = spreadsheet.spreadsheetId rows = [["header1", "irrelevant", "header3", "", "ignored"]] @@ -109,13 +108,13 @@ def _create_spreadsheet(sheets_client: discovery.Resource) -> str: rows.append(["", "", ""]) rows.append(["orphan1", "orphan2", "orphan3"]) - sheets_client.values().batchUpdate( + sheets_client.update_values( spreadsheetId=spreadsheet_id, body={"data": {"majorDimension": "ROWS", "values": rows, "range": "sheet1"}, "valueInputOption": "RAW"}, - ).execute() - sheets_client.values().batchUpdate( + ) + sheets_client.update_values( spreadsheetId=spreadsheet_id, body={"data": {"majorDimension": "ROWS", "values": rows, "range": "sheet2"}, "valueInputOption": "RAW"}, - ).execute() + ) return spreadsheet_id diff --git a/airbyte-integrations/connectors/source-google-sheets/setup.py b/airbyte-integrations/connectors/source-google-sheets/setup.py index 54d3bc43e31606..e260cd19bc6e15 100644 --- a/airbyte-integrations/connectors/source-google-sheets/setup.py +++ b/airbyte-integrations/connectors/source-google-sheets/setup.py @@ -33,6 +33,7 @@ install_requires=[ "airbyte-protocol", "base-python", + "backoff", "requests", "google-auth-httplib2", "google-api-python-client", diff --git a/airbyte-integrations/connectors/source-google-sheets/unit_tests/test_helpers.py b/airbyte-integrations/connectors/source-google-sheets/unit_tests/test_helpers.py index 47137b89e1bf12..88f773b1783a39 100644 --- a/airbyte-integrations/connectors/source-google-sheets/unit_tests/test_helpers.py +++ b/airbyte-integrations/connectors/source-google-sheets/unit_tests/test_helpers.py @@ -23,9 +23,10 @@ """ import unittest -from unittest.mock import Mock +from unittest.mock import Mock, patch from airbyte_protocol import AirbyteRecordMessage, AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream +from google_sheets_source.client import GoogleSheetsClient from google_sheets_source.helpers import Helpers from google_sheets_source.models import CellData, GridData, RowData, Sheet, SheetProperties, Spreadsheet @@ -142,8 +143,10 @@ def test_get_first_row(self): client = Mock() client.get.return_value.execute.return_value = fake_response - - actual = Helpers.get_first_row(client, spreadsheet_id, sheet) + with patch.object(GoogleSheetsClient, "__init__", lambda s, credentials, scopes: None): + sheet_client = GoogleSheetsClient({"fake": "credentials"}, ["auth_scopes"]) + sheet_client.client = client + actual = Helpers.get_first_row(sheet_client, spreadsheet_id, sheet) self.assertEqual(expected_first_row, actual) client.get.assert_called_with(spreadsheetId=spreadsheet_id, includeGridData=True, ranges=f"{sheet}!1:1") @@ -154,8 +157,10 @@ def test_get_sheets_in_spreadsheet(self): client.get.return_value.execute.return_value = Spreadsheet( spreadsheetId=spreadsheet_id, sheets=[Sheet(properties=SheetProperties(title=t)) for t in expected_sheets] ) - - actual_sheets = Helpers.get_sheets_in_spreadsheet(client, spreadsheet_id) + with patch.object(GoogleSheetsClient, "__init__", lambda s, credentials, scopes: None): + sheet_client = GoogleSheetsClient({"fake": "credentials"}, ["auth_scopes"]) + sheet_client.client = client + actual_sheets = Helpers.get_sheets_in_spreadsheet(sheet_client, spreadsheet_id) self.assertEqual(expected_sheets, actual_sheets) client.get.assert_called_with(spreadsheetId=spreadsheet_id, includeGridData=False) @@ -186,9 +191,11 @@ def mock_client_call(spreadsheetId, includeGridData, ranges=None): client = Mock() client.get.side_effect = mock_client_call - + with patch.object(GoogleSheetsClient, "__init__", lambda s, credentials, scopes: None): + sheet_client = GoogleSheetsClient({"fake": "credentials"}, ["auth_scopes"]) + sheet_client.client = client actual = Helpers.get_available_sheets_to_column_index_to_name( - client, spreadsheet_id, {sheet1: frozenset(sheet1_first_row), "doesnotexist": frozenset(["1", "2"])} + sheet_client, spreadsheet_id, {sheet1: frozenset(sheet1_first_row), "doesnotexist": frozenset(["1", "2"])} ) expected = {sheet1: {0: "1", 1: "2", 2: "3", 3: "4"}}