From 44847d82beccfb80d7ff05efb7acb6a2c59c32ca Mon Sep 17 00:00:00 2001 From: ykurochkin Date: Thu, 24 Dec 2020 19:22:11 +0200 Subject: [PATCH] create Client class for Google Sheets with backoff all methods --- .../google_sheets_source/client.py | 53 +++++++++++++++++++ .../google_sheets_source.py | 25 +++++---- .../google_sheets_source/helpers.py | 5 +- .../integration_tests/integration_test.py | 23 ++++---- 4 files changed, 79 insertions(+), 27 deletions(-) create mode 100644 airbyte-integrations/connectors/source-google-sheets/google_sheets_source/client.py diff --git a/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/client.py b/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/client.py new file mode 100644 index 0000000000000..778351012f06b --- /dev/null +++ b/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/client.py @@ -0,0 +1,53 @@ +""" +MIT License + +Copyright (c) 2020 Airbyte + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +""" + +import backoff +from apiclient import discovery, errors +from requests.status_codes import codes as status_codes + + +def error_handler(error): + return error.resp.status != status_codes.TOO_MANY_REQUESTS + + +class GoogleSheetsClient: + @staticmethod + @backoff.on_exception(backoff.expo, errors.HttpError, max_time=60, giveup=error_handler) + def get(client: discovery.Resource, **kwargs): + return client.get(**kwargs).execute() + + @staticmethod + @backoff.on_exception(backoff.expo, errors.HttpError, max_time=60, giveup=error_handler) + def create(client: discovery.Resource, **kwargs): + return client.create(**kwargs).execute() + + @staticmethod + @backoff.on_exception(backoff.expo, errors.HttpError, max_time=60, giveup=error_handler) + def get_values(client: discovery.Resource, **kwargs): + return client.values().batchGet(**kwargs).execute() + + @staticmethod + @backoff.on_exception(backoff.expo, errors.HttpError, max_time=60, giveup=error_handler) + def update_values(client: discovery.Resource, **kwargs): + return client.values().batchUpdate(**kwargs).execute() diff --git a/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/google_sheets_source.py b/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/google_sheets_source.py index 8f9645a086eb8..e75c186ab4bf9 100644 --- a/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/google_sheets_source.py +++ b/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/google_sheets_source.py @@ -25,11 +25,12 @@ import json from typing import Dict, Generator -import backoff from airbyte_protocol import AirbyteCatalog, AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, Status, Type -from apiclient import discovery, errors +from apiclient import errors from base_python import AirbyteLogger, Source +from requests.status_codes import codes as status_codes +from .client import GoogleSheetsClient from .helpers import Helpers from .models.spreadsheet import Spreadsheet from .models.spreadsheet_values import SpreadsheetValues @@ -51,11 +52,11 @@ def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus: spreadsheet_id = config["spreadsheet_id"] try: # Attempt to get first row of sheet - client.get(spreadsheetId=spreadsheet_id, includeGridData=False, ranges="1:1").execute() + GoogleSheetsClient.get(client, spreadsheetId=spreadsheet_id, includeGridData=False, ranges="1:1") except errors.HttpError as err: reason = str(err) # Give a clearer message if it's a common error like 404. - if err.resp.status == 404: + if err.resp.status == status_codes.NOT_FOUND: reason = "Requested spreadsheet was not found." logger.error(f"Formatted error: {reason}") return AirbyteConnectionStatus(status=Status.FAILED, message=str(reason)) @@ -67,7 +68,9 @@ def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog: spreadsheet_id = config["spreadsheet_id"] try: logger.info(f"Running discovery on sheet {spreadsheet_id}") - spreadsheet_metadata = Spreadsheet.parse_obj(client.get(spreadsheetId=spreadsheet_id, includeGridData=False).execute()) + spreadsheet_metadata = Spreadsheet.parse_obj( + GoogleSheetsClient.get(client, spreadsheetId=spreadsheet_id, includeGridData=False) + ) sheet_names = [sheet.properties.title for sheet in spreadsheet_metadata.sheets] streams = [] for sheet_name in sheet_names: @@ -78,7 +81,7 @@ def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog: except errors.HttpError as err: reason = str(err) - if err.resp.status == 404: + if err.resp.status == status_codes.NOT_FOUND: reason = "Requested spreadsheet was not found." raise Exception(f"Could not run discovery: {reason}") @@ -102,7 +105,9 @@ def read( while not encountered_blank_row: range = f"{sheet}!{row_cursor}:{row_cursor + ROW_BATCH_SIZE}" logger.info(f"Fetching range {range}") - row_batch = self._parse_rows_data(client=client, spreadsheet_id=spreadsheet_id, range=range, major_dimension="ROWS") + row_batch = SpreadsheetValues.parse_obj( + GoogleSheetsClient.get_values(client, spreadsheetId=spreadsheet_id, ranges=range, majorDimension="ROWS") + ) row_cursor += ROW_BATCH_SIZE + 1 # there should always be one range since we requested only one value_ranges = row_batch.valueRanges[0] @@ -121,9 +126,3 @@ def read( elif Helpers.row_contains_relevant_data(row, column_index_to_name.keys()): yield AirbyteMessage(type=Type.RECORD, record=Helpers.row_data_to_record_message(sheet, row, column_index_to_name)) logger.info(f"Finished syncing spreadsheet {spreadsheet_id}") - - @backoff.on_exception(backoff.expo, errors.HttpError, max_time=60) - def _parse_rows_data(self, client: discovery.Resource, spreadsheet_id: str, range: str, major_dimension: str) -> SpreadsheetValues: - return SpreadsheetValues.parse_obj( - client.values().batchGet(spreadsheetId=spreadsheet_id, ranges=range, majorDimension=major_dimension).execute() - ) diff --git a/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/helpers.py b/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/helpers.py index 3de89c7f9aaf6..70fdb74a5914e 100644 --- a/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/helpers.py +++ b/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/helpers.py @@ -30,6 +30,7 @@ from apiclient import discovery from google.oauth2 import service_account +from .client import GoogleSheetsClient from .models.spreadsheet import RowData, Spreadsheet SCOPES = ["https://www.googleapis.com/auth/spreadsheets.readonly", "https://www.googleapis.com/auth/drive.readonly"] @@ -88,7 +89,7 @@ def get_formatted_row_values(row_data: RowData) -> List[str]: @staticmethod def get_first_row(client: discovery.Resource, spreadsheet_id: str, sheet_name: str) -> List[str]: spreadsheet = Spreadsheet.parse_obj( - client.get(spreadsheetId=spreadsheet_id, includeGridData=True, ranges=f"{sheet_name}!1:1").execute() + GoogleSheetsClient.get(client, spreadsheetId=spreadsheet_id, includeGridData=True, ranges=f"{sheet_name}!1:1") ) # There is only one sheet since we are specifying the sheet in the requested ranges. @@ -151,7 +152,7 @@ def get_available_sheets_to_column_index_to_name( @staticmethod def get_sheets_in_spreadsheet(client: discovery.Resource, spreadsheet_id: str): - spreadsheet_metadata = Spreadsheet.parse_obj(client.get(spreadsheetId=spreadsheet_id, includeGridData=False).execute()) + spreadsheet_metadata = Spreadsheet.parse_obj(GoogleSheetsClient.get(client, spreadsheetId=spreadsheet_id, includeGridData=False)) return [sheet.properties.title for sheet in spreadsheet_metadata.sheets] @staticmethod diff --git a/airbyte-integrations/connectors/source-google-sheets/integration_tests/integration_test.py b/airbyte-integrations/connectors/source-google-sheets/integration_tests/integration_test.py index 68ec9b25e2550..6e007fea472f2 100644 --- a/airbyte-integrations/connectors/source-google-sheets/integration_tests/integration_test.py +++ b/airbyte-integrations/connectors/source-google-sheets/integration_tests/integration_test.py @@ -28,10 +28,10 @@ from pathlib import Path from typing import Dict -import backoff from airbyte_protocol import ConfiguredAirbyteCatalog, ConnectorSpecification -from apiclient import discovery, errors +from apiclient import discovery from base_python_test import StandardSourceTestIface +from google_sheets_source.client import GoogleSheetsClient from google_sheets_source.helpers import Helpers from google_sheets_source.models.spreadsheet import Spreadsheet @@ -64,7 +64,6 @@ def setup(self) -> None: sheets_client = Helpers.get_authenticated_sheets_client(self._get_creds(), SCOPES) spreadsheet_id = self._create_spreadsheet(sheets_client) - self._set_spreadsheet_data(sheets_client, spreadsheet_id) self._write_spreadsheet_id(spreadsheet_id) def teardown(self) -> None: @@ -91,7 +90,6 @@ def _get_creds(self) -> Dict[str, str]: def _get_tmp_dir(): return "/test_root/gsheet_test" - @backoff.on_exception(backoff.expo, errors.HttpError, max_time=60) def _create_spreadsheet(self, sheets_client: discovery.Resource) -> str: """ :return: spreadsheetId @@ -101,12 +99,9 @@ def _create_spreadsheet(self, sheets_client: discovery.Resource) -> str: "sheets": [{"properties": {"title": "sheet1"}}, {"properties": {"title": "sheet2"}}], } - spreadsheet = Spreadsheet.parse_obj(sheets_client.create(body=request).execute()) + spreadsheet = Spreadsheet.parse_obj(GoogleSheetsClient.create(sheets_client, body=request)) spreadsheet_id = spreadsheet.spreadsheetId - return spreadsheet_id - @backoff.on_exception(backoff.expo, errors.HttpError, max_time=60) - def _set_spreadsheet_data(self, sheets_client: discovery.Resource, spreadsheet_id: str): rows = [["header1", "irrelevant", "header3", "", "ignored"]] rows.extend([f"a{i}", "dontmindme", i] for i in range(300)) rows.append(["lonely_left_value", "", ""]) @@ -114,11 +109,15 @@ def _set_spreadsheet_data(self, sheets_client: discovery.Resource, spreadsheet_i rows.append(["", "", ""]) rows.append(["orphan1", "orphan2", "orphan3"]) - sheets_client.values().batchUpdate( + GoogleSheetsClient.update_values( + sheets_client, spreadsheetId=spreadsheet_id, body={"data": {"majorDimension": "ROWS", "values": rows, "range": "sheet1"}, "valueInputOption": "RAW"}, - ).execute() - sheets_client.values().batchUpdate( + ) + GoogleSheetsClient.update_values( + sheets_client, spreadsheetId=spreadsheet_id, body={"data": {"majorDimension": "ROWS", "values": rows, "range": "sheet2"}, "valueInputOption": "RAW"}, - ).execute() + ) + + return spreadsheet_id