From a092f770da8c5b5b3f4b0fec624cad8aca770bd0 Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Tue, 18 Oct 2022 19:15:47 +0300 Subject: [PATCH 1/8] improve reader_options Signed-off-by: Sergey Chvalyuk --- .../connectors/source-file/source_file/client.py | 12 ++---------- .../connectors/source-file/source_file/source.py | 14 ++++++++++++++ .../source-file/unit_tests/test_client.py | 12 ++---------- .../source-file/unit_tests/test_source.py | 11 +++++++++-- 4 files changed, 27 insertions(+), 22 deletions(-) diff --git a/airbyte-integrations/connectors/source-file/source_file/client.py b/airbyte-integrations/connectors/source-file/source_file/client.py index d07ba892f93b9..79fd2f8deb907 100644 --- a/airbyte-integrations/connectors/source-file/source_file/client.py +++ b/airbyte-integrations/connectors/source-file/source_file/client.py @@ -230,20 +230,12 @@ class Client: reader_class = URLFile binary_formats = {"excel", "excel_binary", "feather", "parquet", "orc", "pickle"} - def __init__(self, dataset_name: str, url: str, provider: dict, format: str = None, reader_options: str = None): + def __init__(self, dataset_name: str, url: str, provider: dict, format: str = None, reader_options: dict = None): self._dataset_name = dataset_name self._url = url self._provider = provider self._reader_format = format or "csv" - self._reader_options = {} - if reader_options: - try: - self._reader_options = json.loads(reader_options) - except json.decoder.JSONDecodeError as err: - error_msg = f"Failed to parse reader options {repr(err)}\n{reader_options}\n{traceback.format_exc()}" - logger.error(error_msg) - raise ConfigurationError(error_msg) from err - + self._reader_options = reader_options or {} self.binary_source = self._reader_format in self.binary_formats self.encoding = self._reader_options.get("encoding") diff --git a/airbyte-integrations/connectors/source-file/source_file/source.py b/airbyte-integrations/connectors/source-file/source_file/source.py index ae0e58497f0f5..a565e29d91873 100644 --- a/airbyte-integrations/connectors/source-file/source_file/source.py +++ b/airbyte-integrations/connectors/source-file/source_file/source.py @@ -3,6 +3,7 @@ # +import json import logging import traceback from datetime import datetime @@ -77,11 +78,22 @@ def _get_client(self, config: Mapping): return client + def _validate_and_transform(self, config: Mapping[str, Any]): + if "reader_options" in config: + try: + config["reader_options"] = json.loads(config["reader_options"]) + except ValueError: + raise Exception("reader_options is not valid JSON") + else: + config["reader_options"] = {} + return config + def check(self, logger, config: Mapping) -> AirbyteConnectionStatus: """ Check involves verifying that the specified file is reachable with our credentials. """ + config = self._validate_and_transform(config) client = self._get_client(config) logger.info(f"Checking access to {client.reader.full_url}...") try: @@ -110,6 +122,7 @@ def discover(self, logger: AirbyteLogger, config: Mapping) -> AirbyteCatalog: Returns an AirbyteCatalog representing the available streams and fields in this integration. For example, given valid credentials to a Remote CSV File, returns an Airbyte catalog where each csv file is a stream, and each column is a field. """ + config = self._validate_and_transform(config) client = self._get_client(config) name = client.stream_name @@ -130,6 +143,7 @@ def read( state: MutableMapping[str, Any] = None, ) -> Iterator[AirbyteMessage]: """Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, catalog, and state.""" + config = self._validate_and_transform(config) client = self._get_client(config) fields = self.selected_fields(catalog) name = client.stream_name diff --git a/airbyte-integrations/connectors/source-file/unit_tests/test_client.py b/airbyte-integrations/connectors/source-file/unit_tests/test_client.py index 72825ac664f59..95791d597d21a 100644 --- a/airbyte-integrations/connectors/source-file/unit_tests/test_client.py +++ b/airbyte-integrations/connectors/source-file/unit_tests/test_client.py @@ -17,6 +17,7 @@ def wrong_format_client(): format="wrong", ) + @pytest.fixture def csv_format_client(): return Client( @@ -26,6 +27,7 @@ def csv_format_client(): format="csv", ) + @pytest.mark.parametrize( "storage, expected_scheme", [ @@ -132,13 +134,3 @@ def test_open_gcs_url(): provider.update({"service_account_json": '{service_account_json": "service_account_json"}'}) with pytest.raises(ConfigurationError): assert URLFile(url="", provider=provider)._open_gcs_url() - - -def test_client_wrong_reader_options(): - with pytest.raises(ConfigurationError): - Client( - dataset_name="test_dataset", - url="scp://test_dataset", - provider={"provider": {"storage": "HTTPS", "reader_impl": "gcsfs", "user_agent": False}}, - reader_options='{encoding":"utf_16"}', - ) diff --git a/airbyte-integrations/connectors/source-file/unit_tests/test_source.py b/airbyte-integrations/connectors/source-file/unit_tests/test_source.py index 41f3b0027ac3a..c9649273764db 100644 --- a/airbyte-integrations/connectors/source-file/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-file/unit_tests/test_source.py @@ -4,6 +4,7 @@ import json import logging +from copy import deepcopy from unittest.mock import PropertyMock import jsonschema @@ -91,7 +92,7 @@ def test_nan_to_null(absolute_path, test_files): ) source = SourceFile() - records = source.read(logger=logger, config=config, catalog=catalog) + records = source.read(logger=logger, config=deepcopy(config), catalog=catalog) records = [r.record.data for r in records] assert records == [ {"col1": "key1", "col2": 1.11, "col3": None}, @@ -101,7 +102,7 @@ def test_nan_to_null(absolute_path, test_files): ] config.update({"format": "yaml", "url": f"{absolute_path}/{test_files}/formats/yaml/demo.yaml"}) - records = source.read(logger=logger, config=config, catalog=catalog) + records = source.read(logger=logger, config=deepcopy(config), catalog=catalog) records = [r.record.data for r in records] assert records == [] @@ -145,3 +146,9 @@ def test_discover(source, config, client): with pytest.raises(Exception): source.discover(logger=logger, config=config) + + +def test_check_wrong_reader_options(source, config): + config["reader_options"] = '{encoding":"utf_16"}' + with pytest.raises(Exception): + source.check(logger=logger, config=config) From b03abdc2ac0128fdff1ee1e1a84ba8e9e17b7e58 Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Tue, 18 Oct 2022 22:16:18 +0300 Subject: [PATCH 2/8] dropbox_force_download added Signed-off-by: Sergey Chvalyuk --- .../source-file/source_file/client.py | 1 - .../source-file/source_file/source.py | 2 ++ .../source-file/source_file/utils.py | 19 +++++++++++++++++++ .../source-file/unit_tests/conftest.py | 2 +- .../source-file/unit_tests/test_source.py | 6 ++---- 5 files changed, 24 insertions(+), 6 deletions(-) create mode 100644 airbyte-integrations/connectors/source-file/source_file/utils.py diff --git a/airbyte-integrations/connectors/source-file/source_file/client.py b/airbyte-integrations/connectors/source-file/source_file/client.py index 79fd2f8deb907..6a953dc4f558e 100644 --- a/airbyte-integrations/connectors/source-file/source_file/client.py +++ b/airbyte-integrations/connectors/source-file/source_file/client.py @@ -309,7 +309,6 @@ def load_dataframes(self, fp, skip_data=False) -> Iterable: logger.error(error_msg) raise ConfigurationError(error_msg) from err - reader_options = {**self._reader_options} try: if self._reader_format == "csv": diff --git a/airbyte-integrations/connectors/source-file/source_file/source.py b/airbyte-integrations/connectors/source-file/source_file/source.py index a565e29d91873..2e5023ae6724b 100644 --- a/airbyte-integrations/connectors/source-file/source_file/source.py +++ b/airbyte-integrations/connectors/source-file/source_file/source.py @@ -23,6 +23,7 @@ from pandas.errors import ParserError from .client import Client +from .utils import dropbox_force_download class SourceFile(Source): @@ -86,6 +87,7 @@ def _validate_and_transform(self, config: Mapping[str, Any]): raise Exception("reader_options is not valid JSON") else: config["reader_options"] = {} + config["url"] = dropbox_force_download(config["url"]) return config def check(self, logger, config: Mapping) -> AirbyteConnectionStatus: diff --git a/airbyte-integrations/connectors/source-file/source_file/utils.py b/airbyte-integrations/connectors/source-file/source_file/utils.py new file mode 100644 index 0000000000000..7dd61daf63ae4 --- /dev/null +++ b/airbyte-integrations/connectors/source-file/source_file/utils.py @@ -0,0 +1,19 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +from urllib.parse import parse_qs, urlencode, urlparse + + +def dropbox_force_download(url): + """ + https://help.dropbox.com/share/force-download + """ + parse_result = urlparse(url) + if parse_result.netloc.split(".")[-2:] == ["dropbox", "com"]: + qs = parse_qs(parse_result.query) + if qs.get("dl") == ["0"]: + qs["dl"] = "1" + parse_result = parse_result._replace(query=urlencode(qs)) + return parse_result.geturl() diff --git a/airbyte-integrations/connectors/source-file/unit_tests/conftest.py b/airbyte-integrations/connectors/source-file/unit_tests/conftest.py index 20a1b25560de0..4dab8b9f5def0 100644 --- a/airbyte-integrations/connectors/source-file/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-file/unit_tests/conftest.py @@ -35,7 +35,7 @@ def invalid_config(read_file): @pytest.fixture -def non_direct_url_provided_config(): +def config_dropbox_link(): return { "dataset_name": "test", "format": "csv", diff --git a/airbyte-integrations/connectors/source-file/unit_tests/test_source.py b/airbyte-integrations/connectors/source-file/unit_tests/test_source.py index c9649273764db..4a5a814dc4933 100644 --- a/airbyte-integrations/connectors/source-file/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-file/unit_tests/test_source.py @@ -129,10 +129,8 @@ def test_check_invalid_config(source, invalid_config): assert actual.status == expected.status -def test_check_non_direct_url_provided_config(source, non_direct_url_provided_config): - expected = AirbyteConnectionStatus(status=Status.FAILED) - actual = source.check(logger=logger, config=non_direct_url_provided_config) - assert actual.status == expected.status +def test_discover_dropbox_link(source, config_dropbox_link): + source.discover(logger=logger, config=config_dropbox_link) def test_discover(source, config, client): From c4c5d48586dd64e1b47f833b579b451badca9d60 Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Tue, 18 Oct 2022 22:18:59 +0300 Subject: [PATCH 3/8] revert back old check Signed-off-by: Sergey Chvalyuk --- .../connectors/source-file/source_file/source.py | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/airbyte-integrations/connectors/source-file/source_file/source.py b/airbyte-integrations/connectors/source-file/source_file/source.py index 2e5023ae6724b..68d8592914cdb 100644 --- a/airbyte-integrations/connectors/source-file/source_file/source.py +++ b/airbyte-integrations/connectors/source-file/source_file/source.py @@ -20,7 +20,6 @@ Type, ) from airbyte_cdk.sources import Source -from pandas.errors import ParserError from .client import Client from .utils import dropbox_force_download @@ -99,21 +98,8 @@ def check(self, logger, config: Mapping) -> AirbyteConnectionStatus: client = self._get_client(config) logger.info(f"Checking access to {client.reader.full_url}...") try: - with client.reader.open() as f: - if config.get("provider").get("storage") == "HTTPS": - # on behalf of https://github.com/airbytehq/alpha-beta-issues/issues/224 - # some providers like Dropbox creates the Shared Public URLs with ?dl=0 query param, - # this requires user interaction before accessing the file, - # we should validate this on the Check Connection stage to avoid sync issues further. - client.CSV_CHUNK_SIZE = 2 - next(client.load_dataframes(f)) - return AirbyteConnectionStatus(status=Status.SUCCEEDED) - # for all other formats and storrage providers + with client.reader.open(): return AirbyteConnectionStatus(status=Status.SUCCEEDED) - except ParserError: - reason = f"Failed to load {client.reader.full_url}, check the URL is valid and allows to download file directly" - logger.error(reason) - return AirbyteConnectionStatus(status=Status.FAILED, message=reason) except Exception as err: reason = f"Failed to load {client.reader.full_url}: {repr(err)}\n{traceback.format_exc()}" logger.error(reason) From b8ff3470976fa05497840e2c9aab02ffd43bae66 Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Tue, 18 Oct 2022 22:24:42 +0300 Subject: [PATCH 4/8] file.md updated Signed-off-by: Sergey Chvalyuk --- airbyte-integrations/connectors/source-file/Dockerfile | 2 +- docs/integrations/sources/file.md | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-file/Dockerfile b/airbyte-integrations/connectors/source-file/Dockerfile index faf40d209f6ef..b3f1178648ba9 100644 --- a/airbyte-integrations/connectors/source-file/Dockerfile +++ b/airbyte-integrations/connectors/source-file/Dockerfile @@ -17,5 +17,5 @@ COPY source_file ./source_file ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.25 +LABEL io.airbyte.version=0.2.26 LABEL io.airbyte.name=airbyte/source-file diff --git a/docs/integrations/sources/file.md b/docs/integrations/sources/file.md index f10136ec89f39..15e26a7d9956a 100644 --- a/docs/integrations/sources/file.md +++ b/docs/integrations/sources/file.md @@ -129,7 +129,8 @@ In order to read large files from a remote location, this connector uses the [sm | Version | Date | Pull Request | Subject | | ------- | ---------- | -------------------------------------------------------- | -------------------------------------------------------- | -| 0.2.25 | 2022-10-14 | [17994](https://github.com/airbytehq/airbyte/pull/17994) | Handle `UnicodeDecodeError` during discover step. +| 0.2.26 | 2022-10-18 | [18116](https://github.com/airbytehq/airbyte/pull/18116) | DropBox: Force a shared link to download | +| 0.2.25 | 2022-10-14 | [17994](https://github.com/airbytehq/airbyte/pull/17994) | Handle `UnicodeDecodeError` during discover step. | | 0.2.24 | 2022-10-03 | [17504](https://github.com/airbytehq/airbyte/pull/17504) | Validate data for `HTTPS` while `check_connection` | | 0.2.23 | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/pull/17304) | Migrate to per-stream state. | | 0.2.22 | 2022-09-15 | [16772](https://github.com/airbytehq/airbyte/pull/16772) | Fix schema generation for JSON files containing arrays | From 3a0bb83e5c32fa77a19e73a340ca727f748ce725 Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Wed, 19 Oct 2022 09:14:14 +0300 Subject: [PATCH 5/8] integration_tests fixed Signed-off-by: Sergey Chvalyuk --- .../client_storage_providers_test.py | 12 ++++++------ docs/integrations/sources/file.md | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/airbyte-integrations/connectors/source-file/integration_tests/client_storage_providers_test.py b/airbyte-integrations/connectors/source-file/integration_tests/client_storage_providers_test.py index 69086321f2384..89d3463f79feb 100644 --- a/airbyte-integrations/connectors/source-file/integration_tests/client_storage_providers_test.py +++ b/airbyte-integrations/connectors/source-file/integration_tests/client_storage_providers_test.py @@ -90,7 +90,7 @@ def test__read_from_public_provider(download_gcs_public_data, storage_provider, config = { "format": "csv", "dataset_name": "output", - "reader_options": json.dumps({"sep": separator, "nrows": 42}), + "reader_options": {"sep": separator, "nrows": 42}, "provider": {"storage": storage_provider, "user_agent": False}, "url": url, } @@ -103,7 +103,7 @@ def test__read_from_private_gcs(google_cloud_service_credentials, private_google "dataset_name": "output", "format": "csv", "url": private_google_cloud_file, - "reader_options": json.dumps({"sep": ",", "nrows": 42}), + "reader_options": {"sep": ",", "nrows": 42}, "provider": { "storage": "GCS", "service_account_json": json.dumps(google_cloud_service_credentials), @@ -117,7 +117,7 @@ def test__read_from_private_aws(aws_credentials, private_aws_file): "dataset_name": "output", "format": "csv", "url": private_aws_file, - "reader_options": json.dumps({"sep": ",", "nrows": 42}), + "reader_options": {"sep": ",", "nrows": 42}, "provider": { "storage": "S3", "aws_access_key_id": aws_credentials["aws_access_key_id"], @@ -132,7 +132,7 @@ def test__read_from_public_azblob(azblob_credentials, public_azblob_file): "dataset_name": "output", "format": "csv", "url": public_azblob_file, - "reader_options": json.dumps({"sep": ",", "nrows": 42}), + "reader_options": {"sep": ",", "nrows": 42}, "provider": {"storage": "AzBlob", "storage_account": azblob_credentials["storage_account"]}, } check_read(config) @@ -143,7 +143,7 @@ def test__read_from_private_azblob_shared_key(azblob_credentials, private_azblob "dataset_name": "output", "format": "csv", "url": private_azblob_file, - "reader_options": json.dumps({"sep": ",", "nrows": 42}), + "reader_options": {"sep": ",", "nrows": 42}, "provider": { "storage": "AzBlob", "storage_account": azblob_credentials["storage_account"], @@ -158,7 +158,7 @@ def test__read_from_private_azblob_sas_token(azblob_credentials, private_azblob_ "dataset_name": "output", "format": "csv", "url": private_azblob_file, - "reader_options": json.dumps({"sep": ",", "nrows": 42}), + "reader_options": {"sep": ",", "nrows": 42}, "provider": { "storage": "AzBlob", "storage_account": azblob_credentials["storage_account"], diff --git a/docs/integrations/sources/file.md b/docs/integrations/sources/file.md index 15e26a7d9956a..06dffb5782971 100644 --- a/docs/integrations/sources/file.md +++ b/docs/integrations/sources/file.md @@ -129,7 +129,7 @@ In order to read large files from a remote location, this connector uses the [sm | Version | Date | Pull Request | Subject | | ------- | ---------- | -------------------------------------------------------- | -------------------------------------------------------- | -| 0.2.26 | 2022-10-18 | [18116](https://github.com/airbytehq/airbyte/pull/18116) | DropBox: Force a shared link to download | +| 0.2.26 | 2022-10-18 | [18116](https://github.com/airbytehq/airbyte/pull/18116) | Transform Dropbox shared link | | 0.2.25 | 2022-10-14 | [17994](https://github.com/airbytehq/airbyte/pull/17994) | Handle `UnicodeDecodeError` during discover step. | | 0.2.24 | 2022-10-03 | [17504](https://github.com/airbytehq/airbyte/pull/17504) | Validate data for `HTTPS` while `check_connection` | | 0.2.23 | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/pull/17304) | Migrate to per-stream state. | From b000583d1e5cbf3e5055eb408d5f7ca5e4abde45 Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Wed, 19 Oct 2022 18:17:58 +0300 Subject: [PATCH 6/8] fix 'supported_sync_modes' added Signed-off-by: Sergey Chvalyuk --- .../connectors/source-file/unit_tests/test_source.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-integrations/connectors/source-file/unit_tests/test_source.py b/airbyte-integrations/connectors/source-file/unit_tests/test_source.py index 4a5a814dc4933..c66d75b7ff1c3 100644 --- a/airbyte-integrations/connectors/source-file/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-file/unit_tests/test_source.py @@ -69,6 +69,7 @@ def get_catalog(properties): stream=AirbyteStream( name="test", json_schema={"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": properties}, + supported_sync_modes=[SyncMode.full_refresh] ), sync_mode=SyncMode.full_refresh, destination_sync_mode=DestinationSyncMode.overwrite, From 72a45f8268b7f42ad99d7247415a8d2b9e374cde Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Wed, 19 Oct 2022 21:11:48 +0300 Subject: [PATCH 7/8] fix Signed-off-by: Sergey Chvalyuk --- airbyte-integrations/connectors/source-file/setup.py | 2 +- .../connectors/source-file/unit_tests/test_source.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-file/setup.py b/airbyte-integrations/connectors/source-file/setup.py index 8ec1aba3f4b7d..9203aba8120d9 100644 --- a/airbyte-integrations/connectors/source-file/setup.py +++ b/airbyte-integrations/connectors/source-file/setup.py @@ -6,7 +6,7 @@ from setuptools import find_packages, setup MAIN_REQUIREMENTS = [ - "airbyte-cdk~=0.1", + "airbyte-cdk~=0.2.0", "gcsfs==2022.7.1", "genson==1.2.2", "google-cloud-storage==2.5.0", diff --git a/airbyte-integrations/connectors/source-file/unit_tests/test_source.py b/airbyte-integrations/connectors/source-file/unit_tests/test_source.py index c66d75b7ff1c3..ea71245a48456 100644 --- a/airbyte-integrations/connectors/source-file/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-file/unit_tests/test_source.py @@ -69,7 +69,7 @@ def get_catalog(properties): stream=AirbyteStream( name="test", json_schema={"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": properties}, - supported_sync_modes=[SyncMode.full_refresh] + supported_sync_modes=[SyncMode.full_refresh], ), sync_mode=SyncMode.full_refresh, destination_sync_mode=DestinationSyncMode.overwrite, From af84b228f0d873b83407c69c9eab25ae11a32a15 Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Wed, 19 Oct 2022 22:16:07 +0300 Subject: [PATCH 8/8] bump 0.2.26 Signed-off-by: Sergey Chvalyuk --- airbyte-integrations/connectors/source-file-secure/Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-file-secure/Dockerfile b/airbyte-integrations/connectors/source-file-secure/Dockerfile index 793605451fdb4..e38177292b4c8 100644 --- a/airbyte-integrations/connectors/source-file-secure/Dockerfile +++ b/airbyte-integrations/connectors/source-file-secure/Dockerfile @@ -1,4 +1,4 @@ -FROM airbyte/source-file:0.2.25 +FROM airbyte/source-file:0.2.26 WORKDIR /airbyte/integration_code COPY source_file_secure ./source_file_secure @@ -9,5 +9,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.25 +LABEL io.airbyte.version=0.2.26 LABEL io.airbyte.name=airbyte/source-file-secure