/
source.py
242 lines (216 loc) · 11.5 KB
/
source.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import json
import socket
from typing import Any, Generator, List, MutableMapping, Union
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import FailureType
from airbyte_cdk.models.airbyte_protocol import (
AirbyteCatalog,
AirbyteConnectionStatus,
AirbyteMessage,
AirbyteStateMessage,
AirbyteStreamStatus,
ConfiguredAirbyteCatalog,
Status,
Type,
)
from airbyte_cdk.sources.source import Source
from airbyte_cdk.utils import AirbyteTracedException
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message
from apiclient import errors
from google.auth import exceptions as google_exceptions
from requests.status_codes import codes as status_codes
from .client import GoogleSheetsClient
from .helpers import Helpers
from .models.spreadsheet import Spreadsheet
from .models.spreadsheet_values import SpreadsheetValues
from .utils import exception_description_by_status_code, safe_name_conversion
# override default socket timeout to be 10 mins instead of 60 sec.
# on behalf of https://github.com/airbytehq/oncall/issues/242
DEFAULT_SOCKET_TIMEOUT: int = 600  # seconds
# NOTE: this is process-wide — every socket created after this module is
# imported inherits the 10-minute timeout, not just Google Sheets requests.
socket.setdefaulttimeout(DEFAULT_SOCKET_TIMEOUT)
class SourceGoogleSheets(Source):
    """
    Airbyte source for a single Google spreadsheet: one stream per grid sheet.

    Spreadsheets API Reference: https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets
    """

    def check(self, logger: AirbyteLogger, config: MutableMapping[str, Any]) -> AirbyteConnectionStatus:
        """Verify that the configured spreadsheet is reachable with our credentials.

        Returns a FAILED status for problems reported back to the user (invalid
        credentials file, unreadable sheet schema, duplicate headers) and raises
        AirbyteTracedException(config_error) for a missing spreadsheet or
        expired/revoked access.
        """
        try:
            client = GoogleSheetsClient(self.get_credentials(config))
        except Exception as e:
            return AirbyteConnectionStatus(status=Status.FAILED, message=f"Please use valid credentials json file. Error: {e}")

        spreadsheet_id = Helpers.get_spreadsheet_id(config["spreadsheet_id"])

        try:
            spreadsheet = client.get(spreadsheetId=spreadsheet_id, includeGridData=False)
        except errors.HttpError as err:
            message = "Config error: "
            # Give a clearer message if it's a common error like 404.
            if err.resp.status == status_codes.NOT_FOUND:
                message += "The spreadsheet link is not valid. Enter the URL of the Google spreadsheet you want to sync."
            raise AirbyteTracedException(
                message=message,
                internal_message=message,
                failure_type=FailureType.config_error,
            ) from err
        except google_exceptions.GoogleAuthError as err:
            message = "Access to the spreadsheet expired or was revoked. Re-authenticate to restore access."
            raise AirbyteTracedException(
                message=message,
                internal_message=message,
                failure_type=FailureType.config_error,
            ) from err

        # Check every grid sheet's header row for duplicate column names.
        # Empty sheets are skipped with a warning; any other read error fails the check.
        spreadsheet_metadata = Spreadsheet.parse_obj(spreadsheet)
        grid_sheets = Helpers.get_grid_sheets(spreadsheet_metadata)

        duplicate_headers_in_sheet = {}
        for sheet_name in grid_sheets:
            try:
                header_row_data = Helpers.get_first_row(client, spreadsheet_id, sheet_name)
                if config.get("names_conversion"):
                    header_row_data = [safe_name_conversion(h) for h in header_row_data]
                _, duplicate_headers = Helpers.get_valid_headers_and_duplicates(header_row_data)
                if duplicate_headers:
                    duplicate_headers_in_sheet[sheet_name] = duplicate_headers
            except Exception as err:
                # Helpers.get_first_row signals an empty sheet via this message prefix.
                if str(err).startswith("Expected data for exactly one row for sheet"):
                    logger.warning(f"Skip empty sheet: {sheet_name}")
                else:
                    logger.error(str(err))
                    return AirbyteConnectionStatus(
                        status=Status.FAILED, message=f"Unable to read the schema of sheet {sheet_name}. Error: {str(err)}"
                    )

        if duplicate_headers_in_sheet:
            duplicate_headers_error_message = ", ".join(
                [
                    f"[sheet:{sheet_name}, headers:{duplicate_sheet_headers}]"
                    for sheet_name, duplicate_sheet_headers in duplicate_headers_in_sheet.items()
                ]
            )
            return AirbyteConnectionStatus(
                status=Status.FAILED,
                message="The following duplicate headers were found in the following sheets. Please fix them to continue: "
                + duplicate_headers_error_message,
            )

        return AirbyteConnectionStatus(status=Status.SUCCEEDED)

    def discover(self, logger: AirbyteLogger, config: MutableMapping[str, Any]) -> AirbyteCatalog:
        """Build an AirbyteCatalog containing one stream per readable grid sheet.

        Empty sheets are skipped with a warning; other per-sheet errors are logged
        but do not abort discovery. A 404/403 on the spreadsheet itself is raised
        as a config error.
        """
        client = GoogleSheetsClient(self.get_credentials(config))
        spreadsheet_id = Helpers.get_spreadsheet_id(config["spreadsheet_id"])
        try:
            logger.info(f"Running discovery on sheet {spreadsheet_id}")
            spreadsheet_metadata = Spreadsheet.parse_obj(client.get(spreadsheetId=spreadsheet_id, includeGridData=False))
            grid_sheets = Helpers.get_grid_sheets(spreadsheet_metadata)
            streams = []
            for sheet_name in grid_sheets:
                try:
                    header_row_data = Helpers.get_first_row(client, spreadsheet_id, sheet_name)
                    if config.get("names_conversion"):
                        header_row_data = [safe_name_conversion(h) for h in header_row_data]
                    stream = Helpers.headers_to_airbyte_stream(logger, sheet_name, header_row_data)
                    streams.append(stream)
                except Exception as err:
                    # Same empty-sheet sentinel as in check(); skip but keep discovering.
                    if str(err).startswith("Expected data for exactly one row for sheet"):
                        logger.warning(f"Skip empty sheet: {sheet_name}")
                    else:
                        logger.error(str(err))
            return AirbyteCatalog(streams=streams)

        except errors.HttpError as err:
            error_description = exception_description_by_status_code(err.resp.status, spreadsheet_id)
            config_error_status_codes = [status_codes.NOT_FOUND, status_codes.FORBIDDEN]
            if err.resp.status in config_error_status_codes:
                message = f"{error_description}. {err.reason}."
                raise AirbyteTracedException(
                    message=message,
                    internal_message=message,
                    failure_type=FailureType.config_error,
                ) from err
            raise Exception(f"Could not discover the schema of your spreadsheet. {error_description}. {err.reason}.")

    def _read(
        self,
        logger: AirbyteLogger,
        config: MutableMapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
    ) -> Generator[AirbyteMessage, None, None]:
        """Yield record and stream-status messages for every sheet in the catalog."""
        client = GoogleSheetsClient(self.get_credentials(config))
        client.Backoff.row_batch_size = config.get("batch_size", 200)

        sheet_to_column_name = Helpers.parse_sheet_and_column_names_from_catalog(catalog)
        stream_name_to_stream = {stream.stream.name: stream for stream in catalog.streams}
        spreadsheet_id = Helpers.get_spreadsheet_id(config["spreadsheet_id"])
        logger.info(f"Starting syncing spreadsheet {spreadsheet_id}")

        # For each sheet in the spreadsheet, get a batch of rows, and as long as there hasn't been
        # a blank row, emit the row batch
        sheet_to_column_index_to_name = Helpers.get_available_sheets_to_column_index_to_name(
            client, spreadsheet_id, sheet_to_column_name, config.get("names_conversion")
        )
        sheet_row_counts = Helpers.get_sheet_row_count(client, spreadsheet_id)
        logger.info(f"Row counts: {sheet_row_counts}")

        for sheet in sheet_to_column_index_to_name.keys():
            logger.info(f"Syncing sheet {sheet}")
            # Sheets available for sync always originate from the catalog, so a
            # missing entry here is a programming error — index directly to fail loudly.
            stream = stream_name_to_stream[sheet].stream
            yield as_airbyte_message(stream, AirbyteStreamStatus.STARTED)

            # We revalidate the sheet here to avoid errors in case the sheet was changed after the sync started
            is_valid, reason = Helpers.check_sheet_is_valid(client, spreadsheet_id, sheet)
            if not is_valid:
                logger.info(f"Skipping syncing sheet {sheet}: {reason}")
                continue

            column_index_to_name = sheet_to_column_index_to_name[sheet]
            row_cursor = 2  # we start syncing past the header row

            # For the loop, it is necessary that the initial row exists when we send a request to the API,
            # if the last row of the interval goes outside the sheet - this is normal, we will return
            # only the real data of the sheet and in the next iteration we will loop out.
            while row_cursor <= sheet_row_counts[sheet]:
                row_batch = SpreadsheetValues.parse_obj(
                    client.get_values(
                        sheet=sheet,
                        row_cursor=row_cursor,
                        spreadsheetId=spreadsheet_id,
                        majorDimension="ROWS",
                    )
                )
                row_cursor += client.Backoff.row_batch_size + 1

                # there should always be exactly one range since we requested only one
                row_values = row_batch.valueRanges[0].values
                if not row_values:
                    # an empty page means we've run past the last populated row
                    break

                yield as_airbyte_message(stream, AirbyteStreamStatus.RUNNING)
                for row in row_values:
                    if not Helpers.is_row_empty(row) and Helpers.row_contains_relevant_data(row, column_index_to_name.keys()):
                        yield AirbyteMessage(
                            type=Type.RECORD, record=Helpers.row_data_to_record_message(sheet, row, column_index_to_name)
                        )

            yield as_airbyte_message(stream, AirbyteStreamStatus.COMPLETE)

    def read(
        self,
        logger: AirbyteLogger,
        config: MutableMapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Union[List[AirbyteStateMessage], MutableMapping[str, Any]] = None,
    ) -> Generator[AirbyteMessage, None, None]:
        """Run a full sync, translating HTTP errors into user-facing outcomes.

        403 is raised as a config error; rate limiting (429) and other HTTP errors
        end the sync early with an informational log. `state` is accepted for
        interface compatibility but not used — this source is full-refresh only.
        """
        spreadsheet_id = Helpers.get_spreadsheet_id(config["spreadsheet_id"])
        try:
            yield from self._read(logger, config, catalog)
        except errors.HttpError as e:
            error_description = exception_description_by_status_code(e.status_code, spreadsheet_id)

            if e.status_code == status_codes.FORBIDDEN:
                raise AirbyteTracedException(
                    message=f"Stopped syncing process. {error_description}",
                    internal_message=error_description,
                    failure_type=FailureType.config_error,
                ) from e
            if e.status_code == status_codes.TOO_MANY_REQUESTS:
                logger.info(f"Stopped syncing process due to rate limits. {error_description}")
            else:
                logger.info(f"{e.status_code}: {e.reason}. {error_description}")
        finally:
            logger.info(f"Finished syncing spreadsheet {spreadsheet_id}")

    @staticmethod
    def get_credentials(config):
        """Return the credentials mapping, upgrading the legacy `credentials_json` config shape to the current `credentials` shape."""
        # backward compatible with old style config
        if config.get("credentials_json"):
            return {"auth_type": "Service", "service_account_info": config.get("credentials_json")}
        return config.get("credentials")