Skip to content

Commit

Permalink
Source Airtable: skip missing streams (#25946)
Browse files Browse the repository at this point in the history
* Source Airtable: skip missing streams

* Move stream removal to a separate method, cover with tests

* Update changelog

* Fix flake warnings

* Update docs/integrations/sources/airtable.md

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* Update docs/integrations/sources/airtable.md

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* Automated Change

* Update link to docs in warning

* Automated Change

* Automated Change

* Automated Change

* “Empty-Commit”

---------

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
Co-authored-by: arsenlosenko <arsenlosenko@users.noreply.github.com>
  • Loading branch information
3 people committed May 18, 2023
1 parent f481dd8 commit 9fb3542
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 7 deletions.

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-airtable/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_airtable ./source_airtable
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=3.0.0
LABEL io.airbyte.version=3.0.1
LABEL io.airbyte.name=airbyte/source-airtable
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 14c6e7ea-97ed-4f5e-a7b5-25e9a80b8212
dockerImageTag: 3.0.0
dockerImageTag: 3.0.1
dockerRepository: airbyte/source-airtable
githubIssueLabel: source-airtable
icon: airtable.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@


import logging
from typing import Any, Iterable, List, Mapping, Tuple
from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Tuple, Union

from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import AirbyteCatalog
from airbyte_cdk.models import AirbyteCatalog, AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.utils.schema_helpers import split_config

from .auth import AirtableAuth
from .schema_helpers import SchemaHelpers
Expand All @@ -35,6 +36,35 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
except Exception as e:
return False, str(e)

def _remove_missed_streams_from_catalog(
self, logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog
) -> ConfiguredAirbyteCatalog:
config, _ = split_config(config)
stream_instances = {s.name: s for s in self.streams(config)}
for index, configured_stream in enumerate(catalog.streams):
stream_instance = stream_instances.get(configured_stream.stream.name)
if not stream_instance:
table_id = configured_stream.stream.name.split("/")[2]
similar_streams = [s for s in stream_instances if s.endswith(table_id)]
logger.warn(
f"The requested stream {configured_stream.stream.name} was not found in the source. Please check if this stream was renamed or removed previously and reset data, removing from catalog for this sync run. For more information please refer to documentation: https://docs.airbyte.com/integrations/sources/airtable/#note-on-changed-table-names-and-deleted-tables"
f" Similar streams: {similar_streams}"
f" Available streams: {stream_instances.keys()}"
)
del catalog.streams[index]
return catalog

def read(
self,
logger: logging.Logger,
config: Mapping[str, Any],
catalog: ConfiguredAirbyteCatalog,
state: Union[List[AirbyteStateMessage], MutableMapping[str, Any]] = None,
) -> Iterator[AirbyteMessage]:
"""Override to provide filtering of catalog in case if streams were renamed/removed previously"""
catalog = self._remove_missed_streams_from_catalog(logger, config, catalog)
return super().read(logger, config, catalog, state)

def discover(self, logger: AirbyteLogger, config) -> AirbyteCatalog:
"""
Override to provide the dynamic schema generation capabilities,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@


import pytest
from airbyte_cdk.models import AirbyteStream
from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog
from airbyte_cdk.models.airbyte_protocol import DestinationSyncMode, SyncMode
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
from source_airtable.streams import AirtableStream


@pytest.fixture
Expand Down Expand Up @@ -149,3 +150,48 @@ def prepared_stream():
supported_destination_sync_modes=[DestinationSyncMode.overwrite, DestinationSyncMode.append_dedup],
)
}


@pytest.fixture
def make_airtable_stream(prepared_stream):
def make(name):
return AirtableStream(
stream_path=prepared_stream["stream_path"],
stream_name=name,
stream_schema=prepared_stream["stream"].json_schema,
authenticator=fake_auth
)
return make


@pytest.fixture
def make_stream(prepared_stream):
def make(name):
return {
"stream_path": prepared_stream["stream_path"],
"stream": AirbyteStream(
name=name,
json_schema=prepared_stream["stream"].json_schema,
supported_sync_modes=[SyncMode.full_refresh],
supported_destination_sync_modes=[DestinationSyncMode.overwrite, DestinationSyncMode.append_dedup],
),
"sync_mode": SyncMode.full_refresh,
"destination_sync_mode": DestinationSyncMode.overwrite
}
return make


@pytest.fixture
def fake_catalog(make_stream):
stream1 = make_stream(name="test_base/test_table1/abcdef")
stream2 = make_stream(name="test_base/test_table2/qwerty")
return ConfiguredAirbyteCatalog(
streams=[stream1, stream2],
)


@pytest.fixture
def fake_streams(make_airtable_stream):
stream1 = make_airtable_stream(name="test_base/test_table1/abcdef")
stream2 = make_airtable_stream(name="test_base/test_table2_renamed/qwerty")
yield [stream1, stream2]
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#


import logging
from unittest.mock import MagicMock

import pytest
Expand Down Expand Up @@ -59,4 +60,14 @@ def test_streams(config, fake_bases_response, fake_tables_response, expected_dis
assert len(streams) == 1
assert [stream.name for stream in streams] == expected_discovery_stream_name


def test_remove_missed_streams_from_catalog(mocker, config, fake_catalog, fake_streams, caplog):
logger = logging.getLogger(__name__)
source = SourceAirtable()
mocker.patch("source_airtable.source.SourceAirtable.streams", return_value=fake_streams)
streams_before = len(fake_catalog.streams)
catalog = source._remove_missed_streams_from_catalog(logger=logger, config=config, catalog=fake_catalog)
assert streams_before - len(catalog.streams) == 1
assert len(caplog.messages) == 1
assert caplog.text.startswith("WARNING")
#
5 changes: 4 additions & 1 deletion docs/integrations/sources/airtable.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Currently, this source connector works with `Standard` subscription plan only. `
When using OAuth, you may see a `400` or `401` error causing a failed sync. You can re-authenticate your Airtable connector to solve the issue temporarily. We are working on a permanent fix that you can follow [here](https://github.com/airbytehq/airbyte/issues/25278).
:::

1. Click **Set up source**.
5. Click **Set up source**.
<!-- /env:cloud -->

<!-- env:oss -->
Expand All @@ -44,6 +44,8 @@ When using OAuth, you may see a `400` or `401` error causing a failed sync. You
5. Click **Set up source**.
<!-- /env:oss -->

### Note on changed table names and deleted tables
Please keep in mind that if you start syncing a table via Airbyte, then rename it in your Airtable account, the connector will not continue syncing that table until you reset your connection schema and select it again. At that point, the table will begin syncing to a table with the new name in the destination. This is because there is no way for Airtable to tell Airbyte which tables have been renamed. Similarly, if you delete a table that was previously syncing, the connector will stop syncing it.

## Supported sync modes

Expand Down Expand Up @@ -109,6 +111,7 @@ See information about rate limits [here](https://airtable.com/developers/web/api

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------|
| 3.0.1 | 2023-05-10 | [25946](https://github.com/airbytehq/airbyte/pull/25946) | Skip stream if it does not appear in catalog |
| 3.0.0 | 2023-03-20 | [22704](https://github.com/airbytehq/airbyte/pull/22704) | Fix for stream name uniqueness |
| 2.0.4 | 2023-03-15 | [24093](https://github.com/airbytehq/airbyte/pull/24093) | Update spec and doc |
| 2.0.3 | 2023-02-02 | [22311](https://github.com/airbytehq/airbyte/pull/22311) | Fix for `singleSelect` types when discovering the schema |
Expand Down

0 comments on commit 9fb3542

Please sign in to comment.