Skip to content

Commit

Permalink
Google Sheets: add row_id to rows and use as primary key (#19215)
Browse files Browse the repository at this point in the history
* source-google-sheets: add row_id to rows and use as primary key

* Update Dockerfile

* Update google-sheets.md

* Update Dockerfile

* Update google-sheets.md

* auto-bump connector version

Co-authored-by: Vincent Koc <vincentkoc@ieee.org>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
3 people committed Nov 14, 2022
1 parent d59275d commit 44ac470
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@
- name: Google Sheets
sourceDefinitionId: 71607ba1-c0ac-4799-8049-7f4b90dd50f7
dockerRepository: airbyte/source-google-sheets
dockerImageTag: 0.2.21
dockerImageTag: 0.2.30
documentationUrl: https://docs.airbyte.com/integrations/sources/google-sheets
icon: google-sheets.svg
sourceType: file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5393,7 +5393,7 @@
oauthFlowOutputParameters:
- - "access_token"
- - "refresh_token"
- dockerImage: "airbyte/source-google-sheets:0.2.21"
- dockerImage: "airbyte/source-google-sheets:0.2.30"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/google-sheets"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY google_sheets_source ./google_sheets_source
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.21
LABEL io.airbyte.version=0.2.30
LABEL io.airbyte.name=airbyte/source-google-sheets
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,15 @@ def read(
if len(row_values) == 0:
break

row_id = row_cursor
for row in row_values:
if not Helpers.is_row_empty(row) and Helpers.row_contains_relevant_data(row, column_index_to_name.keys()):
yield AirbyteMessage(type=Type.RECORD, record=Helpers.row_data_to_record_message(sheet, row, column_index_to_name))
yield AirbyteMessage(
type=Type.RECORD, record=Helpers.row_data_to_record_message(sheet, row_id, row, column_index_to_name)
)
row_id += 1

row_cursor += ROW_BATCH_SIZE + 1
logger.info(f"Finished syncing spreadsheet {spreadsheet_id}")

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,22 @@ def headers_to_airbyte_stream(logger: AirbyteLogger, sheet_name: str, header_row
if duplicate_fields:
logger.warn(f"Duplicate headers found in {sheet_name}. Ignoring them :{duplicate_fields}")

props = {field: {"type": "string"} for field in fields}
props["row_id"] = {"type": "integer"}
sheet_json_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": ["row_id"],
# For simplicity, the type of every cell is a string
"properties": {field: {"type": "string"} for field in fields},
"properties": props,
}

return AirbyteStream(name=sheet_name, json_schema=sheet_json_schema, supported_sync_modes=[SyncMode.full_refresh])
return AirbyteStream(
name=sheet_name,
json_schema=sheet_json_schema,
supported_sync_modes=[SyncMode.full_refresh],
source_defined_primary_key=[["row_id"]],
)

@staticmethod
def get_valid_headers_and_duplicates(header_row_values: List[str]) -> (List[str], List[str]):
Expand Down Expand Up @@ -121,8 +129,10 @@ def parse_sheet_and_column_names_from_catalog(catalog: ConfiguredAirbyteCatalog)
return sheet_to_column_name

@staticmethod
def row_data_to_record_message(sheet_name: str, cell_values: List[str], column_index_to_name: Dict[int, str]) -> AirbyteRecordMessage:
data = {}
def row_data_to_record_message(
sheet_name: str, row_id: int, cell_values: List[str], column_index_to_name: Dict[int, str]
) -> AirbyteRecordMessage:
data = {"row_id": row_id}
for relevant_index in sorted(column_index_to_name.keys()):
if relevant_index >= len(cell_values):
break
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,18 @@ def test_headers_to_airbyte_stream(self):
sheet_name = "sheet1"
header_values = ["h1", "h2", "h3"]

props = {header: {"type": "string"} for header in header_values}
props["row_id"] = {"type": "integer"}
expected_stream = AirbyteStream(
name=sheet_name,
json_schema={
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": ["row_id"],
# For simplicity, the type of every cell is a string
"properties": {header: {"type": "string"} for header in header_values},
"properties": props,
},
source_defined_primary_key=[["row_id"]],
supported_sync_modes=[SyncMode.full_refresh],
)

Expand All @@ -57,15 +61,20 @@ def test_duplicate_headers_to_ab_stream_ignores_duplicates(self):
header_values = ["h1", "h1", "h3"]

# h1 is ignored because it is duplicate
expected_stream_header_values = ["h3"]
props = {
"h3": {"type": "string"},
"row_id": {"type": "integer"},
}
expected_stream = AirbyteStream(
name=sheet_name,
json_schema={
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": ["row_id"],
# For simplicity, the type of every cell is a string
"properties": {header: {"type": "string"} for header in expected_stream_header_values},
"properties": props,
},
source_defined_primary_key=[["row_id"]],
supported_sync_modes=[SyncMode.full_refresh],
)

Expand All @@ -81,9 +90,11 @@ def test_headers_to_airbyte_stream_blank_values_terminate_row(self):
json_schema={
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": ["row_id"],
# For simplicity, the type of every cell is a string
"properties": {"h1": {"type": "string"}},
"properties": {"h1": {"type": "string"}, "row_id": {"type": "integer"}},
},
source_defined_primary_key=[["row_id"]],
supported_sync_modes=[SyncMode.full_refresh],
)
actual_stream = Helpers.headers_to_airbyte_stream(logger, sheet_name, header_values)
Expand Down Expand Up @@ -143,10 +154,11 @@ def test_row_data_to_record_message(self):
sheet = "my_sheet"
cell_values = ["v1", "v2", "v3", "v4"]
column_index_to_name = {0: "c1", 3: "c4"}
row_id = 1

actual = Helpers.row_data_to_record_message(sheet, cell_values, column_index_to_name)
actual = Helpers.row_data_to_record_message(sheet, row_id, cell_values, column_index_to_name)

expected = AirbyteRecordMessage(stream=sheet, data={"c1": "v1", "c4": "v4"}, emitted_at=1)
expected = AirbyteRecordMessage(stream=sheet, data={"row_id": row_id, "c1": "v1", "c4": "v4"}, emitted_at=1)
self.assertEqual(expected.stream, actual.stream)
self.assertEqual(expected.data, actual.data)

Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/google-sheets.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ The [Google API rate limit](https://developers.google.com/sheets/api/limits) is

| Version | Date | Pull Request | Subject |
| ------- | ---------- | -------------------------------------------------------- | ----------------------------------------------------------------------------- |
| 0.2.30 | 2022-10-09 | [](https://github.com/airbytehq/airbyte/pull/) | Add row_id to rows and use as primary key |
| 0.2.21 | 2022-10-04 | [15591](https://github.com/airbytehq/airbyte/pull/15591) | Clean instantiation of AirbyteStream |
| 0.2.20 | 2022-10-10 | [17766](https://github.com/airbytehq/airbyte/pull/17766) | Fix null pointer exception when parsing the spreadsheet id. |
| 0.2.19 | 2022-09-29 | [17410](https://github.com/airbytehq/airbyte/pull/17410) | Use latest CDK. |
Expand Down

0 comments on commit 44ac470

Please sign in to comment.