diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 00039b67303d00..777e544741ada1 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -574,7 +574,7 @@ - name: Google Sheets sourceDefinitionId: 71607ba1-c0ac-4799-8049-7f4b90dd50f7 dockerRepository: airbyte/source-google-sheets - dockerImageTag: 0.2.21 + dockerImageTag: 0.2.30 documentationUrl: https://docs.airbyte.com/integrations/sources/google-sheets icon: google-sheets.svg sourceType: file diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 68b80e6e6323d5..b7bb4c1c60652e 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -5393,7 +5393,7 @@ oauthFlowOutputParameters: - - "access_token" - - "refresh_token" -- dockerImage: "airbyte/source-google-sheets:0.2.21" +- dockerImage: "airbyte/source-google-sheets:0.2.30" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/google-sheets" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-google-sheets/Dockerfile b/airbyte-integrations/connectors/source-google-sheets/Dockerfile index eeea91de04c03e..95235659c8b1b7 100644 --- a/airbyte-integrations/connectors/source-google-sheets/Dockerfile +++ b/airbyte-integrations/connectors/source-google-sheets/Dockerfile @@ -34,5 +34,5 @@ COPY google_sheets_source ./google_sheets_source ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.21 +LABEL io.airbyte.version=0.2.30 LABEL io.airbyte.name=airbyte/source-google-sheets diff --git a/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/google_sheets_source.py b/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/google_sheets_source.py index 8588720eeba3ce..32b48ee7f16ccb 100644 --- a/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/google_sheets_source.py +++ b/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/google_sheets_source.py @@ -169,9 +169,15 @@ def read( if len(row_values) == 0: break + row_id = row_cursor for row in row_values: if not Helpers.is_row_empty(row) and Helpers.row_contains_relevant_data(row, column_index_to_name.keys()): - yield AirbyteMessage(type=Type.RECORD, record=Helpers.row_data_to_record_message(sheet, row, column_index_to_name)) + yield AirbyteMessage( + type=Type.RECORD, record=Helpers.row_data_to_record_message(sheet, row_id, row, column_index_to_name) + ) + row_id += 1 + + row_cursor += ROW_BATCH_SIZE + 1 logger.info(f"Finished syncing spreadsheet {spreadsheet_id}") @staticmethod diff --git a/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/helpers.py b/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/helpers.py index c321c416b8f9b0..d88c68114ea765 100644 --- a/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/helpers.py +++ b/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/helpers.py @@ -53,14 +53,22 @@ def headers_to_airbyte_stream(logger: AirbyteLogger, sheet_name: str, header_row if duplicate_fields: logger.warn(f"Duplicate headers found in {sheet_name}. Ignoring them :{duplicate_fields}") + props = {field: {"type": "string"} for field in fields} + props["row_id"] = {"type": "integer"} sheet_json_schema = { "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", + "required": ["row_id"], # For simplicity, the type of every cell is a string - "properties": {field: {"type": "string"} for field in fields}, + "properties": props, } - return AirbyteStream(name=sheet_name, json_schema=sheet_json_schema, supported_sync_modes=[SyncMode.full_refresh]) + return AirbyteStream( + name=sheet_name, + json_schema=sheet_json_schema, + supported_sync_modes=[SyncMode.full_refresh], + source_defined_primary_key=[["row_id"]], + ) @staticmethod def get_valid_headers_and_duplicates(header_row_values: List[str]) -> (List[str], List[str]): @@ -121,8 +129,10 @@ def parse_sheet_and_column_names_from_catalog(catalog: ConfiguredAirbyteCatalog) return sheet_to_column_name @staticmethod - def row_data_to_record_message(sheet_name: str, cell_values: List[str], column_index_to_name: Dict[int, str]) -> AirbyteRecordMessage: - data = {} + def row_data_to_record_message( + sheet_name: str, row_id: int, cell_values: List[str], column_index_to_name: Dict[int, str] + ) -> AirbyteRecordMessage: + data = {"row_id": row_id} for relevant_index in sorted(column_index_to_name.keys()): if relevant_index >= len(cell_values): break diff --git a/airbyte-integrations/connectors/source-google-sheets/unit_tests/test_helpers.py b/airbyte-integrations/connectors/source-google-sheets/unit_tests/test_helpers.py index decb7fe26fbaa8..77cf54be8a38a8 100644 --- a/airbyte-integrations/connectors/source-google-sheets/unit_tests/test_helpers.py +++ b/airbyte-integrations/connectors/source-google-sheets/unit_tests/test_helpers.py @@ -27,14 +27,18 @@ def test_headers_to_airbyte_stream(self): sheet_name = "sheet1" header_values = ["h1", "h2", "h3"] + props = {header: {"type": "string"} for header in header_values} + props["row_id"] = {"type": "integer"} expected_stream = AirbyteStream( name=sheet_name, json_schema={ "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", + "required": ["row_id"], # For simplicity, the type of every cell is a string - "properties": {header: {"type": "string"} for header in header_values}, + "properties": props, }, + source_defined_primary_key=[["row_id"]], supported_sync_modes=[SyncMode.full_refresh], ) @@ -57,15 +61,20 @@ def test_duplicate_headers_to_ab_stream_ignores_duplicates(self): header_values = ["h1", "h1", "h3"] # h1 is ignored because it is duplicate - expected_stream_header_values = ["h3"] + props = { + "h3": {"type": "string"}, + "row_id": {"type": "integer"}, + } expected_stream = AirbyteStream( name=sheet_name, json_schema={ "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", + "required": ["row_id"], # For simplicity, the type of every cell is a string - "properties": {header: {"type": "string"} for header in expected_stream_header_values}, + "properties": props, }, + source_defined_primary_key=[["row_id"]], supported_sync_modes=[SyncMode.full_refresh], ) @@ -81,9 +90,11 @@ def test_headers_to_airbyte_stream_blank_values_terminate_row(self): json_schema={ "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", + "required": ["row_id"], # For simplicity, the type of every cell is a string - "properties": {"h1": {"type": "string"}}, + "properties": {"h1": {"type": "string"}, "row_id": {"type": "integer"}}, }, + source_defined_primary_key=[["row_id"]], supported_sync_modes=[SyncMode.full_refresh], ) actual_stream = Helpers.headers_to_airbyte_stream(logger, sheet_name, header_values) @@ -143,10 +154,11 @@ def test_row_data_to_record_message(self): sheet = "my_sheet" cell_values = ["v1", "v2", "v3", "v4"] column_index_to_name = {0: "c1", 3: "c4"} + row_id = 1 - actual = Helpers.row_data_to_record_message(sheet, cell_values, column_index_to_name) + actual = Helpers.row_data_to_record_message(sheet, row_id, cell_values, column_index_to_name) - expected = AirbyteRecordMessage(stream=sheet, data={"c1": "v1", "c4": "v4"}, emitted_at=1) + expected = AirbyteRecordMessage(stream=sheet, data={"row_id": row_id, "c1": "v1", "c4": "v4"}, emitted_at=1) self.assertEqual(expected.stream, actual.stream) self.assertEqual(expected.data, actual.data) diff --git a/docs/integrations/sources/google-sheets.md b/docs/integrations/sources/google-sheets.md index 976be4c5403775..f56245ff5d63f1 100644 --- a/docs/integrations/sources/google-sheets.md +++ b/docs/integrations/sources/google-sheets.md @@ -71,6 +71,7 @@ The [Google API rate limit](https://developers.google.com/sheets/api/limits) is | Version | Date | Pull Request | Subject | | ------- | ---------- | -------------------------------------------------------- | ----------------------------------------------------------------------------- | +| 0.2.30 | 2022-10-09 | [](https://github.com/airbytehq/airbyte/pull/) | Add row_id to rows and use as primary key | | 0.2.21 | 2022-10-04 | [15591](https://github.com/airbytehq/airbyte/pull/15591) | Clean instantiation of AirbyteStream | | 0.2.20 | 2022-10-10 | [17766](https://github.com/airbytehq/airbyte/pull/17766) | Fix null pointer exception when parsing the spreadsheet id. | | 0.2.19 | 2022-09-29 | [17410](https://github.com/airbytehq/airbyte/pull/17410) | Use latest CDK. |