From 07c4fbf7e103a6b9af12e03cf06554e1b0c79f60 Mon Sep 17 00:00:00 2001 From: Baz Date: Mon, 9 Jan 2023 23:37:34 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20=F0=9F=8E=89=20Source=20Airtable?= =?UTF-8?q?:=20migrate=20to=20the=20`Metadata=20API`=20for=20dynamic=20sch?= =?UTF-8?q?ema=20generation=20(#20846)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 27 +-- .../connectors/source-airtable/Dockerfile | 2 +- .../connectors/source-airtable/README.md | 3 +- .../acceptance-test-config.yml | 32 ++-- .../integration_tests/configured_catalog.json | 60 ++++++- .../integration_tests/invalid_config.json | 4 +- .../integration_tests/sample_config.json | 4 +- .../source_airtable/helpers.py | 61 ------- .../source_airtable/schema_helpers.py | 44 +++++ .../source-airtable/source_airtable/source.py | 130 ++++++-------- .../source-airtable/source_airtable/spec.json | 19 +- .../source_airtable/streams.py | 156 +++++++++++++++++ .../source-airtable/unit_tests/conftest.py | 143 ++++++++++++++- .../unit_tests/test_helpers.py | 68 +------- .../source-airtable/unit_tests/test_source.py | 56 ++++-- .../unit_tests/test_streams.py | 165 ++++++++++++++++++ docs/integrations/sources/airtable.md | 5 +- 18 files changed, 695 insertions(+), 286 deletions(-) delete mode 100644 airbyte-integrations/connectors/source-airtable/source_airtable/helpers.py create mode 100644 airbyte-integrations/connectors/source-airtable/source_airtable/schema_helpers.py create mode 100644 airbyte-integrations/connectors/source-airtable/source_airtable/streams.py create mode 100644 airbyte-integrations/connectors/source-airtable/unit_tests/test_streams.py diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 270290de2e930..aa9ba34dddbec 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -16,7 +16,7 @@ - name: Airtable sourceDefinitionId: 14c6e7ea-97ed-4f5e-a7b5-25e9a80b8212 dockerRepository: airbyte/source-airtable - dockerImageTag: 0.1.3 + dockerImageTag: 1.0.0 documentationUrl: https://docs.airbyte.com/integrations/sources/airtable icon: airtable.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index a213914f27ad2..d6c1ca693ea89 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -178,7 +178,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-airtable:0.1.3" +- dockerImage: "airbyte/source-airtable:1.0.0" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/airtable" connectionSpecification: @@ -187,35 +187,16 @@ type: "object" required: - "api_key" - - "base_id" - - "tables" properties: api_key: type: "string" - description: "The API Key for the Airtable account. See the Support Guide for more information on how to obtain this key." + description: "The API Key or PAT for the Airtable account. See the Support\ + \ Guide for more information on how to obtain this key." title: "API Key" airbyte_secret: true examples: - "key1234567890" - base_id: - type: "string" - description: "The Base ID to integrate the data from. You can find the Base\ - \ ID following the link Airtable\ - \ API, log in to your account, select the base you need and find Base\ - \ ID in the docs." - title: "Base ID" - examples: - - "app1234567890" - tables: - type: "array" - items: - type: "string" - description: "The list of Tables to integrate." - title: "Tables" - examples: - - "table 1" - - "table 2" supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] diff --git a/airbyte-integrations/connectors/source-airtable/Dockerfile b/airbyte-integrations/connectors/source-airtable/Dockerfile index 3de8ef2803c0d..5b132280981ff 100644 --- a/airbyte-integrations/connectors/source-airtable/Dockerfile +++ b/airbyte-integrations/connectors/source-airtable/Dockerfile @@ -34,5 +34,5 @@ COPY source_airtable ./source_airtable ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.3 +LABEL io.airbyte.version=1.0.0 LABEL io.airbyte.name=airbyte/source-airtable diff --git a/airbyte-integrations/connectors/source-airtable/README.md b/airbyte-integrations/connectors/source-airtable/README.md index 2df5ecc35132e..1986899b9801e 100644 --- a/airbyte-integrations/connectors/source-airtable/README.md +++ b/airbyte-integrations/connectors/source-airtable/README.md @@ -103,7 +103,8 @@ Customize `acceptance-test-config.yml` file to configure tests. See [Source Acce If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py. To run your integration tests with acceptance tests, from the connector root, run ``` -python -m pytest integration_tests -p integration_tests.acceptance +docker build . --no-cache -t airbyte/source-airtable:dev \ +&& python -m pytest integration_tests -p integration_tests.acceptance ``` To run your integration tests with docker diff --git a/airbyte-integrations/connectors/source-airtable/acceptance-test-config.yml b/airbyte-integrations/connectors/source-airtable/acceptance-test-config.yml index d35ed96ef7610..526086b6b9a61 100644 --- a/airbyte-integrations/connectors/source-airtable/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-airtable/acceptance-test-config.yml @@ -1,20 +1,28 @@ # See [Source Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/source-acceptance-tests-reference) # for more information about how to configure these tests connector_image: airbyte/source-airtable:dev -tests: +acceptance_tests: spec: - - spec_path: "source_airtable/spec.json" + tests: + - spec_path: "source_airtable/spec.json" connection: - - config_path: "secrets/config.json" - status: "succeed" - - config_path: "integration_tests/invalid_config.json" - status: "failed" + tests: + - config_path: "secrets/config.json" + status: "succeed" + - config_path: "integration_tests/invalid_config.json" + status: "failed" discovery: - - config_path: "secrets/config.json" + tests: + - config_path: "secrets/config.json" + # bypassed this check, because discovery mechanism was changed + backward_compatibility_tests_config: + disable_for_version: "0.1.3" basic_read: - - config_path: "secrets/config.json" - configured_catalog_path: "integration_tests/configured_catalog.json" - empty_streams: [] + tests: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + empty_streams: [] full_refresh: - - config_path: "secrets/config.json" - configured_catalog_path: "integration_tests/configured_catalog.json" + tests: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" diff --git a/airbyte-integrations/connectors/source-airtable/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-airtable/integration_tests/configured_catalog.json index 9b0daf0f53fc9..44ede810e797a 100644 --- a/airbyte-integrations/connectors/source-airtable/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-airtable/integration_tests/configured_catalog.json @@ -2,7 +2,63 @@ "streams": [ { "stream": { - "name": "Table 1", + "name": "users/table_1", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": {} + }, + "supported_sync_modes": ["full_refresh"], + "supported_destination_sync_modes": ["overwrite", "append_dedup"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "users/table_2", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": {} + }, + "supported_sync_modes": ["full_refresh"], + "supported_destination_sync_modes": ["overwrite", "append_dedup"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "users/field_type_test", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": {} + }, + "supported_sync_modes": ["full_refresh"], + "supported_destination_sync_modes": ["overwrite", "append_dedup"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "users/50_columns", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": {} + }, + "supported_sync_modes": ["full_refresh"], + "supported_destination_sync_modes": ["overwrite", "append_dedup"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "users/checkboxes", "json_schema": { "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", @@ -16,7 +72,7 @@ }, { "stream": { - "name": "Table 2", + "name": "untitled_base/table_1", "json_schema": { "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", diff --git a/airbyte-integrations/connectors/source-airtable/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-airtable/integration_tests/invalid_config.json index 6675b8bcc309b..ccaeede953927 100644 --- a/airbyte-integrations/connectors/source-airtable/integration_tests/invalid_config.json +++ b/airbyte-integrations/connectors/source-airtable/integration_tests/invalid_config.json @@ -1,5 +1,3 @@ { - "api_key": "key####################", - "base_id": "app####################", - "tables": ["Table 1", "Table 2"] + "api_key": "key123456" } diff --git a/airbyte-integrations/connectors/source-airtable/integration_tests/sample_config.json b/airbyte-integrations/connectors/source-airtable/integration_tests/sample_config.json index 3de6695a01fe0..577e3fa82c861 100644 --- a/airbyte-integrations/connectors/source-airtable/integration_tests/sample_config.json +++ b/airbyte-integrations/connectors/source-airtable/integration_tests/sample_config.json @@ -1,5 +1,3 @@ { - "api_key": "key1234567890", - "base_id": "app1234567890", - "tables": ["Table 1", "Table 2"] + "api_key": "key1234567890" } diff --git a/airbyte-integrations/connectors/source-airtable/source_airtable/helpers.py b/airbyte-integrations/connectors/source-airtable/source_airtable/helpers.py deleted file mode 100644 index 52dee3407c365..0000000000000 --- a/airbyte-integrations/connectors/source-airtable/source_airtable/helpers.py +++ /dev/null @@ -1,61 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - - -from typing import Any, Dict - -import requests -from airbyte_cdk.models import AirbyteStream -from airbyte_cdk.models.airbyte_protocol import DestinationSyncMode, SyncMode -from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator - - -class Helpers(object): - @staticmethod - def get_most_complete_row(auth: TokenAuthenticator, base_id: str, table: str, sample_size: int = 100) -> Dict[str, Any]: - url = f"https://api.airtable.com/v0/{base_id}/{table}?pageSize={sample_size}" - try: - response = requests.get(url, headers=auth.get_auth_header()) - response.raise_for_status() - except requests.exceptions.HTTPError as e: - if e.response.status_code == 401: - raise Exception("Invalid API key") - elif e.response.status_code == 404: - raise Exception(f"Table '{table}' not found") - else: - raise Exception(f"Error getting first row from table {table}: {e}") - json_response = response.json() - records = json_response.get("records", []) - most_complete_row = records[0] - for record in records: - if len(record.keys()) > len(most_complete_row.keys()): - most_complete_row = record - return most_complete_row - - @staticmethod - def get_json_schema(record: Dict[str, Any]) -> Dict[str, str]: - fields = record.get("fields", {}) - properties = { - "_airtable_id": {"type": ["null", "string"]}, - "_airtable_created_time": {"type": ["null", "string"]}, - } - - for field in fields: - properties[field] = {"type": ["null", "string"]} - - json_schema = { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": properties, - } - return json_schema - - @staticmethod - def get_airbyte_stream(table: str, json_schema: Dict[str, Any]) -> AirbyteStream: - return AirbyteStream( - name=table, - json_schema=json_schema, - supported_sync_modes=[SyncMode.full_refresh], - supported_destination_sync_modes=[DestinationSyncMode.overwrite, DestinationSyncMode.append_dedup], - ) diff --git a/airbyte-integrations/connectors/source-airtable/source_airtable/schema_helpers.py b/airbyte-integrations/connectors/source-airtable/source_airtable/schema_helpers.py new file mode 100644 index 0000000000000..edb5cffe466b5 --- /dev/null +++ b/airbyte-integrations/connectors/source-airtable/source_airtable/schema_helpers.py @@ -0,0 +1,44 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +from typing import Any, Dict + +from airbyte_cdk.models import AirbyteStream +from airbyte_cdk.models.airbyte_protocol import DestinationSyncMode, SyncMode + + +class SchemaHelpers: + @staticmethod + def clean_name(name_str: str) -> str: + return name_str.replace(" ", "_").lower().strip() + + @staticmethod + def get_json_schema(table: Dict[str, Any]) -> Dict[str, str]: + fields = table.get("fields", {}) + properties = { + "_airtable_id": {"type": ["null", "string"]}, + "_airtable_created_time": {"type": ["null", "string"]}, + } + + for field in fields: + field_name = SchemaHelpers.clean_name(field.get("name")) + properties[field_name] = {"type": ["null", "string"]} + + json_schema = { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "additionalProperties": True, + "properties": properties, + } + return json_schema + + @staticmethod + def get_airbyte_stream(stream_name: str, json_schema: Dict[str, Any]) -> AirbyteStream: + return AirbyteStream( + name=stream_name, + json_schema=json_schema, + supported_sync_modes=[SyncMode.full_refresh], + supported_destination_sync_modes=[DestinationSyncMode.overwrite, DestinationSyncMode.append_dedup], + ) diff --git a/airbyte-integrations/connectors/source-airtable/source_airtable/source.py b/airbyte-integrations/connectors/source-airtable/source_airtable/source.py index 8d9617dc0fedc..22da4729e45c3 100644 --- a/airbyte-integrations/connectors/source-airtable/source_airtable/source.py +++ b/airbyte-integrations/connectors/source-airtable/source_airtable/source.py @@ -3,99 +3,73 @@ # -from abc import ABC -from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple +import logging +from typing import Any, Iterable, List, Mapping, Tuple -import requests from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import AirbyteCatalog from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator -from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer -from .helpers import Helpers +from .schema_helpers import SchemaHelpers +from .streams import AirtableBases, AirtableStream, AirtableTables -# Basic full refresh stream -class AirtableStream(HttpStream, ABC): - url_base = "https://api.airtable.com/v0/" - primary_key = "id" - transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization) - - def __init__(self, base_id: str, table_name: str, schema, **kwargs): - super().__init__(**kwargs) - self.base_id = base_id - self.table_name = table_name - self.schema = schema - - @property - def name(self): - return self.table_name - - def get_json_schema(self) -> Mapping[str, Any]: - return self.schema - - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - json_response = response.json() - offset = json_response.get("offset", None) - if offset: - return {"offset": offset} - return None - - def request_params( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None - ) -> MutableMapping[str, Any]: - if next_page_token: - return next_page_token - return {} - - def process_records(self, records): - for record in records: - data = record.get("fields", {}) - processed_record = {"_airtable_id": record.get("id"), "_airtable_created_time": record.get("createdTime"), **data} - yield processed_record - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - json_response = response.json() - records = json_response.get("records", []) - records = self.process_records(records) - yield from records +class SourceAirtable(AbstractSource): - def path( - self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None - ) -> str: - return f"{self.base_id}/{self.table_name}" + logger: logging.Logger = logging.getLogger("airbyte") + # prepared streams catalog + streams_catalog: Iterable[Mapping[str, Any]] = [] -# Source -class SourceAirtable(AbstractSource): - def check_connection(self, logger, config) -> Tuple[bool, any]: + def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]: auth = TokenAuthenticator(token=config["api_key"]) - for table in config["tables"]: - try: - Helpers.get_most_complete_row(auth, config["base_id"], table) - except Exception as e: - return False, str(e) - return True, None + try: + # try reading first table from each base, to check the connectivity, + for base in AirtableBases(authenticator=auth).read_records(sync_mode=None): + base_id = base.get("id") + base_name = base.get("name") + self.logger.info(f"Reading first table info for base: {base_name}") + next(AirtableTables(base_id=base_id, authenticator=auth).read_records(sync_mode=None)) + return True, None + except Exception as e: + return False, str(e) def discover(self, logger: AirbyteLogger, config) -> AirbyteCatalog: - streams = [] + """ + Override to provide the dynamic schema generation capabilities, + using resource available for authenticated user. + + Retrieve: Bases, Tables from each Base, generate JSON Schema for each table. + """ auth = TokenAuthenticator(token=config["api_key"]) - for table in config["tables"]: - record = Helpers.get_most_complete_row(auth, config["base_id"], table) - json_schema = Helpers.get_json_schema(record) - airbyte_stream = Helpers.get_airbyte_stream(table, json_schema) - streams.append(airbyte_stream) - return AirbyteCatalog(streams=streams) + # list all bases available for authenticated account + for base in AirtableBases(authenticator=auth).read_records(sync_mode=None): + base_id = base.get("id") + base_name = SchemaHelpers.clean_name(base.get("name")) + # list and process each table under each base to generate the JSON Schema + for table in list(AirtableTables(base_id, authenticator=auth).read_records(sync_mode=None)): + self.streams_catalog.append( + { + "stream_path": f"{base_id}/{table.get('id')}", + "stream": SchemaHelpers.get_airbyte_stream( + f"{base_name}/{SchemaHelpers.clean_name(table.get('name'))}", + SchemaHelpers.get_json_schema(table), + ), + } + ) + return AirbyteCatalog(streams=[stream["stream"] for stream in self.streams_catalog]) def streams(self, config: Mapping[str, Any]) -> List[Stream]: - auth = TokenAuthenticator(token=config["api_key"]) - streams = [] - for table in config["tables"]: - record = Helpers.get_most_complete_row(auth, config["base_id"], table) - json_schema = Helpers.get_json_schema(record) - stream = AirtableStream(base_id=config["base_id"], table_name=table, authenticator=auth, schema=json_schema) - streams.append(stream) - return streams + # trigger discovery to populate the streams_catalog + if not self.streams_catalog: + self.discover(None, config) + # build the stream class from prepared streams_catalog + for stream in self.streams_catalog: + yield AirtableStream( + stream_path=stream["stream_path"], + stream_name=stream["stream"].name, + stream_schema=stream["stream"].json_schema, + authenticator=TokenAuthenticator(token=config["api_key"]), + ) diff --git a/airbyte-integrations/connectors/source-airtable/source_airtable/spec.json b/airbyte-integrations/connectors/source-airtable/source_airtable/spec.json index 3f5bfddbd407e..14b447101fbc5 100644 --- a/airbyte-integrations/connectors/source-airtable/source_airtable/spec.json +++ b/airbyte-integrations/connectors/source-airtable/source_airtable/spec.json @@ -4,29 +4,14 @@ "$schema": "http://json-schema.org/draft-07/schema#", "title": "Airtable Source Spec", "type": "object", - "required": ["api_key", "base_id", "tables"], + "required": ["api_key"], "properties": { "api_key": { "type": "string", - "description": "The API Key for the Airtable account. See the Support Guide for more information on how to obtain this key.", + "description": "The API Key or PAT for the Airtable account. See the Support Guide for more information on how to obtain this key.", "title": "API Key", "airbyte_secret": true, "examples": ["key1234567890"] - }, - "base_id": { - "type": "string", - "description": "The Base ID to integrate the data from. You can find the Base ID following the link Airtable API, log in to your account, select the base you need and find Base ID in the docs.", - "title": "Base ID", - "examples": ["app1234567890"] - }, - "tables": { - "type": "array", - "items": { - "type": "string" - }, - "description": "The list of Tables to integrate.", - "title": "Tables", - "examples": ["table 1", "table 2"] } } } diff --git a/airbyte-integrations/connectors/source-airtable/source_airtable/streams.py b/airbyte-integrations/connectors/source-airtable/source_airtable/streams.py new file mode 100644 index 0000000000000..f990656cf875d --- /dev/null +++ b/airbyte-integrations/connectors/source-airtable/source_airtable/streams.py @@ -0,0 +1,156 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +from abc import ABC +from typing import Any, Iterable, Mapping, MutableMapping, Optional + +import requests +from airbyte_cdk.sources.streams.http import HttpStream +from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer + +URL_BASE: str = "https://api.airtable.com/v0/" + + +class AirtableBases(HttpStream): + def __init__(self, **kwargs): + super().__init__(**kwargs) + + url_base = URL_BASE + primary_key = None + name = "bases" + raise_on_http_errors = True + + def path(self, **kwargs) -> str: + """ + Documentation: https://airtable.com/developers/web/api/list-bases + """ + return "meta/bases" + + def should_retry(self, response: requests.Response) -> bool: + if response.status_code == 403 or response.status_code == 422: + self.logger.error(f"Stream {self.name}: permission denied or entity is unprocessable. Skipping.") + setattr(self, "raise_on_http_errors", False) + return False + return super().should_retry(response) + + def backoff_time(self, response: requests.Response) -> Optional[float]: + """ + Based on official docs: https://airtable.com/developers/web/api/rate-limits + when 429 is received, we should wait at least 30 sec. + """ + if response.status_code == 429: + self.logger.error(f"Stream {self.name}: rate limit exceeded") + return 30.0 + + def next_page_token(self, response: requests.Response, **kwargs) -> str: + """ + The bases list could be more than 100 records, therefore the pagination is required to fetch all of them. + """ + next_page = response.json().get("offset") + if next_page: + return next_page + return None + + def request_params(self, next_page_token: str = None, **kwargs) -> Mapping[str, Any]: + params = {} + if next_page_token: + params["offset"] = next_page_token + return params + + def parse_response(self, response: requests.Response, **kwargs) -> Mapping[str, Any]: + """ + Example output: + { + 'bases': [ + {'id': '_some_id_', 'name': 'users', 'permissionLevel': 'create'}, + {'id': '_some_id_', 'name': 'Test Base', 'permissionLevel': 'create'}, + ] + } + """ + records = response.json().get(self.name) + yield from records + + +class AirtableTables(AirtableBases): + def __init__(self, base_id: list, **kwargs): + super().__init__(**kwargs) + self.base_id = base_id + + name = "tables" + + def path(self, **kwargs) -> str: + """ + Documentation: https://airtable.com/developers/web/api/list-bases + """ + return f"{super().path()}/{self.base_id}/tables" + + +class AirtableStream(HttpStream, ABC): + def __init__(self, stream_path: str, stream_name: str, stream_schema, **kwargs): + super().__init__(**kwargs) + self.stream_path = stream_path + self.stream_name = stream_name + self.stream_schema = stream_schema + + url_base = URL_BASE + primary_key = "id" + transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization) + raise_on_http_errors = True + + @property + def name(self): + return self.stream_name + + def should_retry(self, response: requests.Response) -> bool: + if response.status_code == 403 or response.status_code == 422: + self.logger.error(f"Stream {self.name}: permission denied or entity is unprocessable. Skipping.") + setattr(self, "raise_on_http_errors", False) + return False + return super().should_retry(response) + + def backoff_time(self, response: requests.Response) -> Optional[float]: + """ + Based on official docs: https://airtable.com/developers/web/api/rate-limits + when 429 is received, we should wait at least 30 sec. + """ + if response.status_code == 429: + self.logger.error(f"Stream {self.name}: rate limit exceeded") + return 30.0 + return None + + def get_json_schema(self) -> Mapping[str, Any]: + return self.stream_schema + + def next_page_token(self, response: requests.Response, **kwargs) -> Optional[Mapping[str, Any]]: + next_page = response.json().get("offset") + if next_page: + return next_page + return None + + def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]: + """ + All available params: https://airtable.com/developers/web/api/list-records#query + """ + params = {} + if next_page_token: + params["offset"] = next_page_token + return params + + def process_records(self, records) -> Iterable[Mapping[str, Any]]: + for record in records: + data = record.get("fields") + if len(data) > 0: + yield { + "_airtable_id": record.get("id"), + "_airtable_created_time": record.get("createdTime"), + **data, + } + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + records = response.json().get("records", []) + yield from self.process_records(records) + + def path(self, **kwargs) -> str: + return self.stream_path diff --git a/airbyte-integrations/connectors/source-airtable/unit_tests/conftest.py b/airbyte-integrations/connectors/source-airtable/unit_tests/conftest.py index 09cc0925179e5..1a8eaa92fc822 100644 --- a/airbyte-integrations/connectors/source-airtable/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-airtable/unit_tests/conftest.py @@ -2,9 +2,150 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # + import pytest +from airbyte_cdk.models import AirbyteStream +from airbyte_cdk.models.airbyte_protocol import DestinationSyncMode, SyncMode +from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator @pytest.fixture def config(): - return {"api_key": "key1234567890", "base_id": "app1234567890", "tables": ["Table 1", "Table 2"]} + return { + "api_key": "key1234567890", + } + + +@pytest.fixture +def fake_auth(): + return TokenAuthenticator(token="key1234567890") + + +@pytest.fixture +def fake_bases_response(): + return {"bases": [{"id": 1234, "name": "test_base"}]} + + +@pytest.fixture +def expected_bases_response(): + return [{"id": 1234, "name": "test_base"}] + + +@pytest.fixture +def fake_tables_response(): + return {"tables": [{"id": 5678, "name": "test_table"}]} + + +@pytest.fixture +def expected_discovery_stream_name(): + return ["test_base/test_table"] + + +@pytest.fixture +def field_name_to_cleaned(): + return "The Name (That should be cleaned)" + + +@pytest.fixture +def expected_clean_name(): + return "the_name_(that_should_be_cleaned)" + + +@pytest.fixture +def table(): + return "Table 1" + + +@pytest.fixture +def json_response(): + return { + "records": [ + { + "id": "abc", + "fields": [ + { + 'type': 'singleLineText', + 'id': '_fake_id_', + 'name': 'test', + } + ] + } + ] + } + + +@pytest.fixture +def streams_json_response(): + return { + "records": [ + { + 'id': 'some_id', + 'createdTime': '2022-12-02T19:50:00.000Z', + 'fields': {'field1': True, 'field2': "test", 'field3': 123}, + } + ] + } + + +@pytest.fixture +def streams_processed_response(): + return [ + { + '_airtable_id': 'some_id', + '_airtable_created_time': '2022-12-02T19:50:00.000Z', + 'field1': True, + 'field2': 'test', + 'field3': 123, + } + ] + + +@pytest.fixture +def expected_json_schema(): + return { + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "_airtable_created_time": {"type": ["null", "string"]}, + "_airtable_id": {"type": ["null", "string"]}, + "test": {"type": ["null", "string"]}, + }, + "type": "object", + } + + +@pytest.fixture(scope='function', autouse=True) +def prepared_stream(): + return { + "stream_path": "some_base_id/some_table_id", + "stream": AirbyteStream( + name="test_base/test_table", + json_schema={ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "additionalProperties": True, + "properties": { + "_airtable_id": { + "type": [ + "null", + "string" + ] + }, + "_airtable_created_time": { + "type": [ + "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + } + } + }, + supported_sync_modes=[SyncMode.full_refresh], + supported_destination_sync_modes=[DestinationSyncMode.overwrite, DestinationSyncMode.append_dedup], + ) + } diff --git a/airbyte-integrations/connectors/source-airtable/unit_tests/test_helpers.py b/airbyte-integrations/connectors/source-airtable/unit_tests/test_helpers.py index 8641d2d9339d9..9b07cb2478ea0 100644 --- a/airbyte-integrations/connectors/source-airtable/unit_tests/test_helpers.py +++ b/airbyte-integrations/connectors/source-airtable/unit_tests/test_helpers.py @@ -2,79 +2,21 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from http import HTTPStatus -from unittest.mock import MagicMock, patch -import pytest -from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator -from source_airtable.helpers import Helpers +from source_airtable.schema_helpers import SchemaHelpers -@pytest.fixture -def base_id(): - return "app1234567890" - - -@pytest.fixture -def api_key(): - return "key1234567890" - - -@pytest.fixture -def table(): - return "Table 1" - - -@pytest.fixture -def auth(): - return MagicMock() - - -@pytest.fixture -def json_response(): - return {"records": [{"id": "abc", "fields": {"name": "test"}}]} - - -@pytest.fixture -def expected_json_schema(): - return { - "$schema": "http://json-schema.org/draft-07/schema#", - "properties": { - "_airtable_created_time": {"type": ["null", "string"]}, - "_airtable_id": {"type": ["null", "string"]}, - "name": {"type": ["null", "string"]}, - }, - "type": "object", - } - - -def test_get_most_complete_row(auth, base_id, table, json_response): - with patch("requests.get") as mock_get: - mock_get.return_value.status_code = HTTPStatus.OK - mock_get.return_value.json.return_value = json_response - assert Helpers.get_most_complete_row(auth, base_id, table) == {"id": "abc", "fields": {"name": "test"}} - - -def test_get_most_complete_row_invalid_api_key(base_id, table): - with pytest.raises(Exception): - auth = TokenAuthenticator("invalid_api_key") - Helpers.get_most_complete_row(auth, base_id, table) - - -def test_get_most_complete_row_table_not_found(auth, base_id, table): - with patch("requests.exceptions.HTTPError") as mock_get: - mock_get.return_value.status_code = HTTPStatus.NOT_FOUND - with pytest.raises(Exception): - Helpers.get_most_complete_row(auth, base_id, table) +def test_clean_name(field_name_to_cleaned, expected_clean_name): + assert expected_clean_name == SchemaHelpers.clean_name(field_name_to_cleaned) def test_get_json_schema(json_response, expected_json_schema): - json_schema = Helpers.get_json_schema(json_response["records"][0]) + json_schema = SchemaHelpers.get_json_schema(json_response["records"][0]) assert json_schema == expected_json_schema def test_get_airbyte_stream(table, expected_json_schema): - stream = Helpers.get_airbyte_stream(table, expected_json_schema) + stream = SchemaHelpers.get_airbyte_stream(table, expected_json_schema) assert stream assert stream.name == table assert stream.json_schema == expected_json_schema diff --git a/airbyte-integrations/connectors/source-airtable/unit_tests/test_source.py b/airbyte-integrations/connectors/source-airtable/unit_tests/test_source.py index 14f30ce4cf58e..9308b6e048614 100644 --- a/airbyte-integrations/connectors/source-airtable/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-airtable/unit_tests/test_source.py @@ -2,10 +2,11 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock + +import pytest from airbyte_cdk.models import AirbyteCatalog, ConnectorSpecification -from source_airtable.helpers import Helpers from source_airtable.source import SourceAirtable @@ -16,25 +17,46 @@ def test_spec(config): assert isinstance(spec, ConnectorSpecification) -def test_discover(config, mocker): +@pytest.mark.parametrize( + "status, check_passed", + [ + (200, (True, None)), + (401, (False, '401 Client Error: None for url: https://api.airtable.com/v0/meta/bases')), + ], + ids=["success", "fail"] +) +def test_check_connection(config, status, check_passed, fake_bases_response, fake_tables_response, requests_mock): source = SourceAirtable() - logger_mock, Helpers.get_most_complete_row = MagicMock(), MagicMock() - airbyte_catalog = source.discover(logger_mock, config) - assert [stream.name for stream in airbyte_catalog.streams] == config["tables"] - assert isinstance(airbyte_catalog, AirbyteCatalog) - assert Helpers.get_most_complete_row.call_count == 2 + # fake the bases + requests_mock.get("https://api.airtable.com/v0/meta/bases", status_code=status, json=fake_bases_response) + fake_base_id = fake_bases_response.get("bases")[0].get("id") + # fake the tables based on faked bases + requests_mock.get(f"https://api.airtable.com/v0/meta/bases/{fake_base_id}/tables", status_code=status, json=fake_tables_response) + assert source.check_connection(MagicMock(), config) == check_passed -@patch("requests.get") -def test_check_connection(config): +def test_discover(config, fake_bases_response, fake_tables_response, expected_discovery_stream_name, requests_mock): source = SourceAirtable() - logger_mock = MagicMock() - assert source.check_connection(logger_mock, config) == (True, None) + # fake the bases + requests_mock.get("https://api.airtable.com/v0/meta/bases", status_code=200, json=fake_bases_response) + fake_base_id = fake_bases_response.get("bases")[0].get("id") + # fake the tables based on faked bases + requests_mock.get(f"https://api.airtable.com/v0/meta/bases/{fake_base_id}/tables", status_code=200, json=fake_tables_response) + # generate fake catalog + airbyte_catalog = source.discover(MagicMock(), config) + assert [stream.name for stream in airbyte_catalog.streams] == expected_discovery_stream_name + assert isinstance(airbyte_catalog, AirbyteCatalog) -def test_streams(config): +def test_streams(config, fake_bases_response, fake_tables_response, expected_discovery_stream_name, requests_mock): source = SourceAirtable() - Helpers.get_most_complete_row = MagicMock() - streams = source.streams(config) - assert len(streams) == 2 - assert [stream.name for stream in streams] == config["tables"] + # fake the bases + requests_mock.get("https://api.airtable.com/v0/meta/bases", status_code=200, json=fake_bases_response) + fake_base_id = fake_bases_response.get("bases")[0].get("id") + # fake the tables based on faked bases + requests_mock.get(f"https://api.airtable.com/v0/meta/bases/{fake_base_id}/tables", status_code=200, json=fake_tables_response) + streams = list(source.streams(config)) + assert len(streams) == 1 + assert [stream.name for stream in streams] == expected_discovery_stream_name + +# diff --git a/airbyte-integrations/connectors/source-airtable/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-airtable/unit_tests/test_streams.py new file mode 100644 index 0000000000000..3ef2551739aad --- /dev/null +++ b/airbyte-integrations/connectors/source-airtable/unit_tests/test_streams.py @@ -0,0 +1,165 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +from unittest.mock import MagicMock + +import pytest +import requests +from source_airtable.streams import URL_BASE, AirtableBases, AirtableStream, AirtableTables + + +class TestBases: + + bases_instance = AirtableBases(authenticator=MagicMock()) + + def test_url_base(self): + assert self.bases_instance.url_base == URL_BASE + + def test_primary_key(self): + assert self.bases_instance.primary_key is None + + def test_path(self): + assert self.bases_instance.path() == "meta/bases" + + def test_stream_name(self): + assert self.bases_instance.name == "bases" + + @pytest.mark.parametrize( + ("http_status", "should_retry"), + [ + (200, False), + (403, False), + (422, False), + (401, False), + ], + ) + def test_should_retry(self, http_status, should_retry): + response_mock = MagicMock() + response_mock.status_code = http_status + assert self.bases_instance.should_retry(response_mock) == should_retry + + @pytest.mark.parametrize( + ("http_status", "expected_backoff_time"), + [ + (200, None), + (429, 30), + ], + ) + def test_backoff_time(self, http_status, expected_backoff_time, requests_mock): + url = "https://api.airtable.com/v0/meta/bases/" + requests_mock.get(url, status_code=http_status, json={}) + response = requests.get(url) + assert self.bases_instance.backoff_time(response) == expected_backoff_time + + def test_next_page(self, requests_mock): + url = "https://api.airtable.com/v0/meta/bases/" + requests_mock.get(url, status_code=200, json={"offset": "xyz"}) + response = requests.get(url) + assert self.bases_instance.next_page_token(response) == "xyz" + + @pytest.mark.parametrize( + ("next_page", "expected"), + [ + (None, {}), + ("xyz", {"offset": "xyz"}), + ], + ) + def test_request_params(self, next_page, expected): + assert self.bases_instance.request_params(next_page) == expected + + def test_parse_response(self, fake_bases_response, expected_bases_response, requests_mock): + url = "https://api.airtable.com/v0/meta/bases/" + requests_mock.get(url, status_code=200, json=fake_bases_response) + response = requests.get(url) + assert list(self.bases_instance.parse_response(response)) == expected_bases_response + + +class TestTables: + + tables_instance = AirtableTables(base_id="test_base_id", authenticator=MagicMock()) + + def test_path(self): + assert self.tables_instance.path() == "meta/bases/test_base_id/tables" + + def test_stream_name(self): + assert self.tables_instance.name == "tables" + + +class TestAirtableStream: + + def stream_instance(self, prepared_stream): + return AirtableStream( + stream_path=prepared_stream["stream_path"], + stream_name=prepared_stream["stream"].name, + stream_schema=prepared_stream["stream"].json_schema, + authenticator=MagicMock(), + ) + + def test_streams_url_base(self, prepared_stream): + assert self.stream_instance(prepared_stream).url_base == URL_BASE + + def test_streams_primary_key(self, prepared_stream): + assert self.stream_instance(prepared_stream).primary_key == "id" + + def test_streams_name(self, prepared_stream): + assert self.stream_instance(prepared_stream).name == 'test_base/test_table' + + def test_streams_path(self, prepared_stream): + assert self.stream_instance(prepared_stream).path() == "some_base_id/some_table_id" + + @pytest.mark.parametrize( + ("http_status", "should_retry"), + [ + (200, False), + (403, False), + (422, False), + (401, False), + ], + ) + def test_streams_should_retry(self, http_status, should_retry, prepared_stream): + response_mock = MagicMock() + response_mock.status_code = http_status + assert self.stream_instance(prepared_stream).should_retry(response_mock) == should_retry + + @pytest.mark.parametrize( + ("http_status", "expected_backoff_time"), + [ + (200, None), + (429, 30), + ], + ) + def test_streams_backoff_time(self, http_status, expected_backoff_time, prepared_stream, requests_mock): + url = "https://api.airtable.com/v0/meta/bases/" + requests_mock.get(url, status_code=http_status, json={}) + response = requests.get(url) + assert self.stream_instance(prepared_stream).backoff_time(response) == expected_backoff_time + + def test_streams_get_json_schema(self, prepared_stream): + assert self.stream_instance(prepared_stream).get_json_schema() == prepared_stream['stream'].json_schema + + def test_streams_next_page(self, prepared_stream, requests_mock): + url = "https://api.airtable.com/v0/meta/bases/" + requests_mock.get(url, status_code=200, json={"offset": "xyz"}) + response = requests.get(url) + assert self.stream_instance(prepared_stream).next_page_token(response) == "xyz" + + @pytest.mark.parametrize( + ("next_page", "expected"), + [ + (None, {}), + ("xyz", {"offset": "xyz"}), + ], + ) + def test_streams_request_params(self, next_page, expected, prepared_stream): + assert self.stream_instance(prepared_stream).request_params(next_page) == expected + + def test_streams_parse_response(self, prepared_stream, streams_json_response, streams_processed_response, requests_mock): + stream = self.stream_instance(prepared_stream) + url = f"{stream.url_base}/{stream.path()}" + requests_mock.get(url, status_code=200, json=streams_json_response) + response = requests.get(url) + assert list(stream.parse_response(response)) == streams_processed_response + +# diff --git a/docs/integrations/sources/airtable.md b/docs/integrations/sources/airtable.md index efad9c36bdce5..ea0a3b371456a 100644 --- a/docs/integrations/sources/airtable.md +++ b/docs/integrations/sources/airtable.md @@ -18,14 +18,12 @@ This source allows you to configure any table in your Airtable base. In case you ### Requirements * An Airtable account & API key -* Base ID -* Tables you'd like to replicate ### Setup guide 1. To find your API key, navigate to your [account page](https://airtable.com/account). On your account overview page, under the API heading, there's a button that says "Generate API key." ![img.png](../../.gitbook/assets/airtable_api_key1.png) 2. Generate an API key by clicking the button. If one already exists, click the key to reveal it and copy it. ![img.png](../../.gitbook/assets/airtable_api_key2.png). See [here](https://support.airtable.com/hc/en-us/articles/219046777-How-do-I-get-my-API-key-) for more information on managing your API keys. -3. Find the Airtable base containing the tables you'd like to replicate by visiting https://airtable.com/api and logging in. Once you're logged in, you'll see a list of available bases: ![bases](../../.gitbook/assets/airtable_bases_ui_list1.png). Click the base whose tables you want to replicate. You'll find the base ID on the next page: ![](../../.gitbook/assets/airtable_base_id.png). Copy this ID for use when configuring the connector. + ### Performance Considerations (Airbyte Open-Source) @@ -35,6 +33,7 @@ See information about rate limits [here](https://support.airtable.com/hc/en-us/a | Version | Date | Pull Request | Subject | | :------ | :--------- | :------------------------------------------------------- | :--------------------------------------- | +| 1.0.0 | 2022-12-22 | [20846](https://github.com/airbytehq/airbyte/pull/20846) | Migrated to Metadata API for dynamic schema generation | | 0.1.3 | 2022-10-26 | [18491](https://github.com/airbytehq/airbyte/pull/18491) | Improve schema discovery logic | | 0.1.2 | 2022-04-30 | [12500](https://github.com/airbytehq/airbyte/pull/12500) | Improve input configuration copy | | 0.1.1 | 2021-12-06 | [8425](https://github.com/airbytehq/airbyte/pull/8425) | Update title, description fields in spec |