Skip to content

Commit

Permalink
✨Source Airtable: add table name to data records (airbytehq#31044)
Browse files Browse the repository at this point in the history
  • Loading branch information
Anton Karpets authored and ariesgun committed Oct 23, 2023
1 parent 6049c92 commit 9713938
Show file tree
Hide file tree
Showing 11 changed files with 65 additions and 49 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-airtable/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_airtable ./source_airtable
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=4.0.0
LABEL io.airbyte.version=4.1.0
LABEL io.airbyte.name=airbyte/source-airtable

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 14c6e7ea-97ed-4f5e-a7b5-25e9a80b8212
dockerImageTag: 4.0.0
dockerImageTag: 4.1.0
dockerRepository: airbyte/source-airtable
githubIssueLabel: source-airtable
icon: airtable.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class SchemaTypes:
"singleSelect": SchemaTypes.string,
"externalSyncSource": SchemaTypes.string,
"url": SchemaTypes.string,
# referal default type
# referral default type
"simpleText": SchemaTypes.string,
}

Expand All @@ -89,6 +89,7 @@ def get_json_schema(table: Dict[str, Any]) -> Dict[str, str]:
properties: Dict = {
"_airtable_id": SchemaTypes.string,
"_airtable_created_time": SchemaTypes.string,
"_airtable_table_name": SchemaTypes.string,
}

fields: Dict = table.get("fields", {})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def discover(self, logger: AirbyteLogger, config) -> AirbyteCatalog:
f"{base_name}/{SchemaHelpers.clean_name(table.get('name'))}/{table.get('id')}",
SchemaHelpers.get_json_schema(table),
),
"table_name": table.get("name"),
}
)
return AirbyteCatalog(streams=[stream["stream"] for stream in self.streams_catalog])
Expand All @@ -101,5 +102,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
stream_path=stream["stream_path"],
stream_name=stream["stream"].name,
stream_schema=stream["stream"].json_schema,
table_name=stream["table_name"],
authenticator=self._auth,
)
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,12 @@ def path(self, **kwargs) -> str:


class AirtableStream(HttpStream, ABC):
def __init__(self, stream_path: str, stream_name: str, stream_schema, **kwargs):
def __init__(self, stream_path: str, stream_name: str, stream_schema, table_name: str, **kwargs):
super().__init__(**kwargs)
self.stream_path = stream_path
self.stream_name = stream_name
self.stream_schema = stream_schema
self.table_name = table_name

url_base = URL_BASE
primary_key = "id"
Expand Down Expand Up @@ -146,6 +147,7 @@ def process_records(self, records) -> Iterable[Mapping[str, Any]]:
yield {
"_airtable_id": record.get("id"),
"_airtable_created_time": record.get("createdTime"),
"_airtable_table_name": self.table_name,
**{SchemaHelpers.clean_name(k): v for k, v in data.items()},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,12 @@ def streams_json_response():


@pytest.fixture
def streams_processed_response():
def streams_processed_response(table):
return [
{
"_airtable_id": "some_id",
"_airtable_created_time": "2022-12-02T19:50:00.000Z",
"_airtable_table_name": table,
"field1": True,
"field2": "test",
"field3": 123,
Expand All @@ -109,14 +110,15 @@ def expected_json_schema():
"properties": {
"_airtable_created_time": {"type": ["null", "string"]},
"_airtable_id": {"type": ["null", "string"]},
"_airtable_table_name": {"type": ["null", "string"]},
"test": {"type": ["null", "string"]},
},
"type": "object",
}


@pytest.fixture(scope="function", autouse=True)
def prepared_stream():
def prepared_stream(table):
return {
"stream_path": "some_base_id/some_table_id",
"stream": AirbyteStream(
Expand All @@ -128,12 +130,14 @@ def prepared_stream():
"properties": {
"_airtable_id": {"type": ["null", "string"]},
"_airtable_created_time": {"type": ["null", "string"]},
"_airtable_table_name": {"type": ["null", "string"]},
"name": {"type": ["null", "string"]},
},
},
supported_sync_modes=[SyncMode.full_refresh],
supported_destination_sync_modes=[DestinationSyncMode.overwrite, DestinationSyncMode.append_dedup],
),
"table_name": table,
}


Expand All @@ -144,6 +148,7 @@ def make(name):
stream_path=prepared_stream["stream_path"],
stream_name=name,
stream_schema=prepared_stream["stream"].json_schema,
table_name=prepared_stream["table_name"],
authenticator=fake_auth,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
"_airtable_id": {
"type": ["null", "string"]
},
"_airtable_table_name": {
"type": ["null", "string"]
},
"assignee_(from_table_6)": {
"items": {
"type": ["null", "number"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import json
import os
from typing import Any, Mapping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def stream_instance(self, prepared_stream):
stream_path=prepared_stream["stream_path"],
stream_name=prepared_stream["stream"].name,
stream_schema=prepared_stream["stream"].json_schema,
table_name=prepared_stream["table_name"],
authenticator=MagicMock(),
)

Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/airtable.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ See information about rate limits [here](https://airtable.com/developers/web/api

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------|
| 4.1.0 | 2023-10-10 | [31044](https://github.com/airbytehq/airbyte/pull/31044) | Add source table name to output records |
| 4.0.0 | 2023-10-09 | [31181](https://github.com/airbytehq/airbyte/pull/31181) | Additional schema processing for the FORMULA schema type: Convert to simple data types |
| 3.0.1 | 2023-05-10 | [25946](https://github.com/airbytehq/airbyte/pull/25946) | Skip stream if it does not appear in catalog |
| 3.0.0 | 2023-03-20 | [22704](https://github.com/airbytehq/airbyte/pull/22704) | Fix for stream name uniqueness |
Expand Down

0 comments on commit 9713938

Please sign in to comment.