diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 104896a7126799..a1fc463c74e2a6 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -317,7 +317,7 @@ - name: File sourceDefinitionId: 778daa7c-feaf-4db6-96f3-70fd645acc77 dockerRepository: airbyte/source-file - dockerImageTag: 0.2.24 + dockerImageTag: 0.2.26 documentationUrl: https://docs.airbyte.com/integrations/sources/file icon: file.svg sourceType: file diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 180f109f593a67..ad8b845e0b7eca 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -3109,7 +3109,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-file:0.2.24" +- dockerImage: "airbyte/source-file:0.2.26" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/file" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-file/Dockerfile b/airbyte-integrations/connectors/source-file/Dockerfile index faf40d209f6ef1..b3f1178648ba9c 100644 --- a/airbyte-integrations/connectors/source-file/Dockerfile +++ b/airbyte-integrations/connectors/source-file/Dockerfile @@ -17,5 +17,5 @@ COPY source_file ./source_file ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.25 +LABEL io.airbyte.version=0.2.26 LABEL io.airbyte.name=airbyte/source-file diff --git a/airbyte-integrations/connectors/source-file/integration_tests/client_storage_providers_test.py 
b/airbyte-integrations/connectors/source-file/integration_tests/client_storage_providers_test.py index 69086321f2384a..89d3463f79febb 100644 --- a/airbyte-integrations/connectors/source-file/integration_tests/client_storage_providers_test.py +++ b/airbyte-integrations/connectors/source-file/integration_tests/client_storage_providers_test.py @@ -90,7 +90,7 @@ def test__read_from_public_provider(download_gcs_public_data, storage_provider, config = { "format": "csv", "dataset_name": "output", - "reader_options": json.dumps({"sep": separator, "nrows": 42}), + "reader_options": {"sep": separator, "nrows": 42}, "provider": {"storage": storage_provider, "user_agent": False}, "url": url, } @@ -103,7 +103,7 @@ def test__read_from_private_gcs(google_cloud_service_credentials, private_google "dataset_name": "output", "format": "csv", "url": private_google_cloud_file, - "reader_options": json.dumps({"sep": ",", "nrows": 42}), + "reader_options": {"sep": ",", "nrows": 42}, "provider": { "storage": "GCS", "service_account_json": json.dumps(google_cloud_service_credentials), @@ -117,7 +117,7 @@ def test__read_from_private_aws(aws_credentials, private_aws_file): "dataset_name": "output", "format": "csv", "url": private_aws_file, - "reader_options": json.dumps({"sep": ",", "nrows": 42}), + "reader_options": {"sep": ",", "nrows": 42}, "provider": { "storage": "S3", "aws_access_key_id": aws_credentials["aws_access_key_id"], @@ -132,7 +132,7 @@ def test__read_from_public_azblob(azblob_credentials, public_azblob_file): "dataset_name": "output", "format": "csv", "url": public_azblob_file, - "reader_options": json.dumps({"sep": ",", "nrows": 42}), + "reader_options": {"sep": ",", "nrows": 42}, "provider": {"storage": "AzBlob", "storage_account": azblob_credentials["storage_account"]}, } check_read(config) @@ -143,7 +143,7 @@ def test__read_from_private_azblob_shared_key(azblob_credentials, private_azblob "dataset_name": "output", "format": "csv", "url": private_azblob_file, - 
"reader_options": json.dumps({"sep": ",", "nrows": 42}), + "reader_options": {"sep": ",", "nrows": 42}, "provider": { "storage": "AzBlob", "storage_account": azblob_credentials["storage_account"], @@ -158,7 +158,7 @@ def test__read_from_private_azblob_sas_token(azblob_credentials, private_azblob_ "dataset_name": "output", "format": "csv", "url": private_azblob_file, - "reader_options": json.dumps({"sep": ",", "nrows": 42}), + "reader_options": {"sep": ",", "nrows": 42}, "provider": { "storage": "AzBlob", "storage_account": azblob_credentials["storage_account"], diff --git a/airbyte-integrations/connectors/source-file/setup.py b/airbyte-integrations/connectors/source-file/setup.py index 8ec1aba3f4b7d9..9203aba8120d97 100644 --- a/airbyte-integrations/connectors/source-file/setup.py +++ b/airbyte-integrations/connectors/source-file/setup.py @@ -6,7 +6,7 @@ from setuptools import find_packages, setup MAIN_REQUIREMENTS = [ - "airbyte-cdk~=0.1", + "airbyte-cdk~=0.2.0", "gcsfs==2022.7.1", "genson==1.2.2", "google-cloud-storage==2.5.0", diff --git a/airbyte-integrations/connectors/source-file/source_file/client.py b/airbyte-integrations/connectors/source-file/source_file/client.py index 9d6ee5ae2aa5ab..6a953dc4f558e4 100644 --- a/airbyte-integrations/connectors/source-file/source_file/client.py +++ b/airbyte-integrations/connectors/source-file/source_file/client.py @@ -230,20 +230,12 @@ class Client: reader_class = URLFile binary_formats = {"excel", "excel_binary", "feather", "parquet", "orc", "pickle"} - def __init__(self, dataset_name: str, url: str, provider: dict, format: str = None, reader_options: str = None): + def __init__(self, dataset_name: str, url: str, provider: dict, format: str = None, reader_options: dict = None): self._dataset_name = dataset_name self._url = url self._provider = provider self._reader_format = format or "csv" - self._reader_options = {} - if reader_options: - try: - self._reader_options = json.loads(reader_options) - except 
def _validate_and_transform(self, config: Mapping[str, Any]):
    """Return a normalised copy of the connector configuration.

    * ``reader_options`` is decoded from its JSON string form. A missing,
      ``None`` or blank value becomes an empty dict, and a value that is
      already a dict is passed through, so the method can safely be applied
      to a config that was normalised before.
    * ``url`` is passed through ``dropbox_force_download``.

    The caller's mapping is not modified; a new dict is returned.

    :param config: raw connector configuration.
    :return: a new dict with ``reader_options`` and ``url`` normalised.
    :raises Exception: if ``reader_options`` is a non-blank string that is
        not valid JSON.
    """
    # Shallow copy: only top-level keys are reassigned below, so the caller's
    # config (typed as a read-only Mapping) stays untouched.
    normalized = dict(config)

    reader_options = normalized.get("reader_options")
    if isinstance(reader_options, str):
        if reader_options.strip():
            try:
                reader_options = json.loads(reader_options)
            except ValueError as err:
                raise Exception("reader_options is not valid JSON") from err
        else:
            # A blank string means "no options were provided".
            reader_options = {}
    elif reader_options is None:
        reader_options = {}
    normalized["reader_options"] = reader_options

    normalized["url"] = dropbox_force_download(normalized["url"])
    return normalized
""" + config = self._validate_and_transform(config) client = self._get_client(config) logger.info(f"Checking access to {client.reader.full_url}...") try: - with client.reader.open() as f: - if config.get("provider").get("storage") == "HTTPS": - # on behalf of https://github.com/airbytehq/alpha-beta-issues/issues/224 - # some providers like Dropbox creates the Shared Public URLs with ?dl=0 query param, - # this requires user interaction before accessing the file, - # we should validate this on the Check Connection stage to avoid sync issues further. - client.CSV_CHUNK_SIZE = 2 - next(client.load_dataframes(f)) - return AirbyteConnectionStatus(status=Status.SUCCEEDED) - # for all other formats and storrage providers + with client.reader.open(): return AirbyteConnectionStatus(status=Status.SUCCEEDED) - except ParserError: - reason = f"Failed to load {client.reader.full_url}, check the URL is valid and allows to download file directly" - logger.error(reason) - return AirbyteConnectionStatus(status=Status.FAILED, message=reason) except Exception as err: reason = f"Failed to load {client.reader.full_url}: {repr(err)}\n{traceback.format_exc()}" logger.error(reason) @@ -110,6 +110,7 @@ def discover(self, logger: AirbyteLogger, config: Mapping) -> AirbyteCatalog: Returns an AirbyteCatalog representing the available streams and fields in this integration. For example, given valid credentials to a Remote CSV File, returns an Airbyte catalog where each csv file is a stream, and each column is a field. 
""" + config = self._validate_and_transform(config) client = self._get_client(config) name = client.stream_name @@ -130,6 +131,7 @@ def read( state: MutableMapping[str, Any] = None, ) -> Iterator[AirbyteMessage]: """Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, catalog, and state.""" + config = self._validate_and_transform(config) client = self._get_client(config) fields = self.selected_fields(catalog) name = client.stream_name diff --git a/airbyte-integrations/connectors/source-file/source_file/utils.py b/airbyte-integrations/connectors/source-file/source_file/utils.py new file mode 100644 index 00000000000000..7dd61daf63ae42 --- /dev/null +++ b/airbyte-integrations/connectors/source-file/source_file/utils.py @@ -0,0 +1,19 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +from urllib.parse import parse_qs, urlencode, urlparse + + +def dropbox_force_download(url): + """ + https://help.dropbox.com/share/force-download + """ + parse_result = urlparse(url) + if parse_result.netloc.split(".")[-2:] == ["dropbox", "com"]: + qs = parse_qs(parse_result.query) + if qs.get("dl") == ["0"]: + qs["dl"] = "1" + parse_result = parse_result._replace(query=urlencode(qs)) + return parse_result.geturl() diff --git a/airbyte-integrations/connectors/source-file/unit_tests/conftest.py b/airbyte-integrations/connectors/source-file/unit_tests/conftest.py index 20a1b25560de0d..4dab8b9f5def09 100644 --- a/airbyte-integrations/connectors/source-file/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-file/unit_tests/conftest.py @@ -35,7 +35,7 @@ def invalid_config(read_file): @pytest.fixture -def non_direct_url_provided_config(): +def config_dropbox_link(): return { "dataset_name": "test", "format": "csv", diff --git a/airbyte-integrations/connectors/source-file/unit_tests/test_client.py b/airbyte-integrations/connectors/source-file/unit_tests/test_client.py index 25efd8c0ecf654..95791d597d21a5 
100644 --- a/airbyte-integrations/connectors/source-file/unit_tests/test_client.py +++ b/airbyte-integrations/connectors/source-file/unit_tests/test_client.py @@ -134,13 +134,3 @@ def test_open_gcs_url(): provider.update({"service_account_json": '{service_account_json": "service_account_json"}'}) with pytest.raises(ConfigurationError): assert URLFile(url="", provider=provider)._open_gcs_url() - - -def test_client_wrong_reader_options(): - with pytest.raises(ConfigurationError): - Client( - dataset_name="test_dataset", - url="scp://test_dataset", - provider={"provider": {"storage": "HTTPS", "reader_impl": "gcsfs", "user_agent": False}}, - reader_options='{encoding":"utf_16"}', - ) diff --git a/airbyte-integrations/connectors/source-file/unit_tests/test_source.py b/airbyte-integrations/connectors/source-file/unit_tests/test_source.py index 41f3b0027ac3ad..ea71245a484569 100644 --- a/airbyte-integrations/connectors/source-file/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-file/unit_tests/test_source.py @@ -4,6 +4,7 @@ import json import logging +from copy import deepcopy from unittest.mock import PropertyMock import jsonschema @@ -68,6 +69,7 @@ def get_catalog(properties): stream=AirbyteStream( name="test", json_schema={"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": properties}, + supported_sync_modes=[SyncMode.full_refresh], ), sync_mode=SyncMode.full_refresh, destination_sync_mode=DestinationSyncMode.overwrite, @@ -91,7 +93,7 @@ def test_nan_to_null(absolute_path, test_files): ) source = SourceFile() - records = source.read(logger=logger, config=config, catalog=catalog) + records = source.read(logger=logger, config=deepcopy(config), catalog=catalog) records = [r.record.data for r in records] assert records == [ {"col1": "key1", "col2": 1.11, "col3": None}, @@ -101,7 +103,7 @@ def test_nan_to_null(absolute_path, test_files): ] config.update({"format": "yaml", "url": 
f"{absolute_path}/{test_files}/formats/yaml/demo.yaml"}) - records = source.read(logger=logger, config=config, catalog=catalog) + records = source.read(logger=logger, config=deepcopy(config), catalog=catalog) records = [r.record.data for r in records] assert records == [] @@ -128,10 +130,8 @@ def test_check_invalid_config(source, invalid_config): assert actual.status == expected.status -def test_check_non_direct_url_provided_config(source, non_direct_url_provided_config): - expected = AirbyteConnectionStatus(status=Status.FAILED) - actual = source.check(logger=logger, config=non_direct_url_provided_config) - assert actual.status == expected.status +def test_discover_dropbox_link(source, config_dropbox_link): + source.discover(logger=logger, config=config_dropbox_link) def test_discover(source, config, client): @@ -145,3 +145,9 @@ def test_discover(source, config, client): with pytest.raises(Exception): source.discover(logger=logger, config=config) + + +def test_check_wrong_reader_options(source, config): + config["reader_options"] = '{encoding":"utf_16"}' + with pytest.raises(Exception): + source.check(logger=logger, config=config) diff --git a/docs/integrations/sources/file.md b/docs/integrations/sources/file.md index f10136ec89f39b..06dffb57829716 100644 --- a/docs/integrations/sources/file.md +++ b/docs/integrations/sources/file.md @@ -129,7 +129,8 @@ In order to read large files from a remote location, this connector uses the [sm | Version | Date | Pull Request | Subject | | ------- | ---------- | -------------------------------------------------------- | -------------------------------------------------------- | -| 0.2.25 | 2022-10-14 | [17994](https://github.com/airbytehq/airbyte/pull/17994) | Handle `UnicodeDecodeError` during discover step. 
+| 0.2.26 | 2022-10-18 | [18116](https://github.com/airbytehq/airbyte/pull/18116) | Transform Dropbox shared link | +| 0.2.25 | 2022-10-14 | [17994](https://github.com/airbytehq/airbyte/pull/17994) | Handle `UnicodeDecodeError` during discover step. | | 0.2.24 | 2022-10-03 | [17504](https://github.com/airbytehq/airbyte/pull/17504) | Validate data for `HTTPS` while `check_connection` | | 0.2.23 | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/pull/17304) | Migrate to per-stream state. | | 0.2.22 | 2022-09-15 | [16772](https://github.com/airbytehq/airbyte/pull/16772) | Fix schema generation for JSON files containing arrays |