Skip to content

Commit

Permalink
#1740 Issue: fix failing Google Sheets Source with large spreadsheet (#…
Browse files Browse the repository at this point in the history
…1762)


Co-authored-by: Sherif Nada <snadalive@gmail.com>
  • Loading branch information
yevhenii-ldv and sherifnada committed Jan 21, 2021
1 parent 51a1d9f commit c2dab06
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "71607ba1-c0ac-4799-8049-7f4b90dd50f7",
"name": "Google Sheets",
"dockerRepository": "airbyte/source-google-sheets",
"dockerImageTag": "0.1.6",
"dockerImageTag": "0.1.7",
"documentationUrl": "https://hub.docker.com/repository/docker/airbyte/source-google-sheets"
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
- sourceDefinitionId: 71607ba1-c0ac-4799-8049-7f4b90dd50f7
name: Google Sheets
dockerRepository: airbyte/source-google-sheets
dockerImageTag: 0.1.6
dockerImageTag: 0.1.7
documentationUrl: https://hub.docker.com/repository/docker/airbyte/source-google-sheets
- sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
name: MySQL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ COPY $CODE_PATH ./$CODE_PATH
COPY setup.py ./
RUN pip install .

LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.name=airbyte/source-google-sheets
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,18 @@ class GoogleSheetsClient:
def __init__(self, credentials: Dict[str, str], scopes: List[str] = SCOPES):
self.client = Helpers.get_authenticated_sheets_client(credentials, scopes)

@backoff.on_exception(backoff.expo, errors.HttpError, max_time=60, giveup=error_handler)
@backoff.on_exception(backoff.expo, errors.HttpError, max_time=120, giveup=error_handler)
def get(self, **kwargs):
return self.client.get(**kwargs).execute()

@backoff.on_exception(backoff.expo, errors.HttpError, max_time=60, giveup=error_handler)
@backoff.on_exception(backoff.expo, errors.HttpError, max_time=120, giveup=error_handler)
def create(self, **kwargs):
return self.client.create(**kwargs).execute()

@backoff.on_exception(backoff.expo, errors.HttpError, max_time=60, giveup=error_handler)
@backoff.on_exception(backoff.expo, errors.HttpError, max_time=120, giveup=error_handler)
def get_values(self, **kwargs):
return self.client.values().batchGet(**kwargs).execute()

@backoff.on_exception(backoff.expo, errors.HttpError, max_time=60, giveup=error_handler)
@backoff.on_exception(backoff.expo, errors.HttpError, max_time=120, giveup=error_handler)
def update_values(self, **kwargs):
return self.client.values().batchUpdate(**kwargs).execute()
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,22 @@ def read(
# For each sheet in the spreadsheet, get a batch of rows, and as long as there hasn't been
# a blank row, emit the row batch
sheet_to_column_index_to_name = Helpers.get_available_sheets_to_column_index_to_name(client, spreadsheet_id, sheet_to_column_name)
sheet_row_counts = Helpers.get_sheet_row_count(client, spreadsheet_id)
logger.info(f"Row counts: {sheet_row_counts}")
for sheet in sheet_to_column_index_to_name.keys():
logger.info(f"Syncing sheet {sheet}")
column_index_to_name = sheet_to_column_index_to_name[sheet]
row_cursor = 2 # we start syncing past the header row
while True:
# For the loop, it is necessary that the initial row exists when we send a request to the API,
# if the last row of the interval goes outside the sheet - this is normal, we will return
# only the real data of the sheet and in the next iteration we will loop out.
while row_cursor <= sheet_row_counts[sheet]:
range = f"{sheet}!{row_cursor}:{row_cursor + ROW_BATCH_SIZE}"
logger.info(f"Fetching range {range}")
row_batch = SpreadsheetValues.parse_obj(
client.get_values(spreadsheetId=spreadsheet_id, ranges=range, majorDimension="ROWS")
)

row_cursor += ROW_BATCH_SIZE + 1
# there should always be one range since we requested only one
value_ranges = row_batch.valueRanges[0]
Expand All @@ -122,8 +128,6 @@ def read(
break

for row in row_values:
if Helpers.is_row_empty(row):
continue
elif Helpers.row_contains_relevant_data(row, column_index_to_name.keys()):
if not Helpers.is_row_empty(row) and Helpers.row_contains_relevant_data(row, column_index_to_name.keys()):
yield AirbyteMessage(type=Type.RECORD, record=Helpers.row_data_to_record_message(sheet, row, column_index_to_name))
logger.info(f"Finished syncing spreadsheet {spreadsheet_id}")
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,15 @@ def get_available_sheets_to_column_index_to_name(
return available_sheets_to_column_index_to_name

@staticmethod
def get_sheets_in_spreadsheet(client, spreadsheet_id: str):
def get_sheets_in_spreadsheet(client, spreadsheet_id: str) -> List[str]:
spreadsheet_metadata = Spreadsheet.parse_obj(client.get(spreadsheetId=spreadsheet_id, includeGridData=False))
return [sheet.properties.title for sheet in spreadsheet_metadata.sheets]

@staticmethod
def get_sheet_row_count(client, spreadsheet_id: str) -> Dict[str, int]:
spreadsheet_metadata = Spreadsheet.parse_obj(client.get(spreadsheetId=spreadsheet_id, includeGridData=False))
return {sheet.properties.title: sheet.properties.gridProperties["rowCount"] for sheet in spreadsheet_metadata.sheets}

@staticmethod
def is_row_empty(cell_values: List[str]) -> bool:
for cell in cell_values:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def _create_spreadsheet(self, sheets_client: GoogleSheetsClient) -> str:
spreadsheet_id = spreadsheet.spreadsheetId

rows = [["header1", "irrelevant", "header3", "", "ignored"]]
rows.extend([f"a{i}", "dontmindme", i] for i in range(300))
rows.extend([f"a{i}", "dontmindme", i] for i in range(320))
rows.append(["lonely_left_value", "", ""])
rows.append(["", "", "lonelyrightvalue"])
rows.append(["", "", ""])
Expand Down

0 comments on commit c2dab06

Please sign in to comment.