Source File: add validate_and_transform of config (#18116)
Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
grubberr committed Oct 19, 2022
1 parent cd36d53 commit 5b81897
Showing 12 changed files with 63 additions and 53 deletions.
@@ -317,7 +317,7 @@
 - name: File
   sourceDefinitionId: 778daa7c-feaf-4db6-96f3-70fd645acc77
   dockerRepository: airbyte/source-file
-  dockerImageTag: 0.2.24
+  dockerImageTag: 0.2.26
   documentationUrl: https://docs.airbyte.com/integrations/sources/file
   icon: file.svg
   sourceType: file

@@ -3109,7 +3109,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-file:0.2.24"
+- dockerImage: "airbyte/source-file:0.2.26"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/sources/file"
     connectionSpecification:

2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-file/Dockerfile
@@ -17,5 +17,5 @@ COPY source_file ./source_file
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

-LABEL io.airbyte.version=0.2.25
+LABEL io.airbyte.version=0.2.26
 LABEL io.airbyte.name=airbyte/source-file

@@ -90,7 +90,7 @@ def test__read_from_public_provider(download_gcs_public_data, storage_provider,
     config = {
         "format": "csv",
         "dataset_name": "output",
-        "reader_options": json.dumps({"sep": separator, "nrows": 42}),
+        "reader_options": {"sep": separator, "nrows": 42},
         "provider": {"storage": storage_provider, "user_agent": False},
         "url": url,
     }
@@ -103,7 +103,7 @@ def test__read_from_private_gcs(google_cloud_service_credentials, private_google
         "dataset_name": "output",
         "format": "csv",
         "url": private_google_cloud_file,
-        "reader_options": json.dumps({"sep": ",", "nrows": 42}),
+        "reader_options": {"sep": ",", "nrows": 42},
         "provider": {
             "storage": "GCS",
             "service_account_json": json.dumps(google_cloud_service_credentials),
@@ -117,7 +117,7 @@ def test__read_from_private_aws(aws_credentials, private_aws_file):
         "dataset_name": "output",
         "format": "csv",
         "url": private_aws_file,
-        "reader_options": json.dumps({"sep": ",", "nrows": 42}),
+        "reader_options": {"sep": ",", "nrows": 42},
         "provider": {
             "storage": "S3",
             "aws_access_key_id": aws_credentials["aws_access_key_id"],
@@ -132,7 +132,7 @@ def test__read_from_public_azblob(azblob_credentials, public_azblob_file):
         "dataset_name": "output",
         "format": "csv",
         "url": public_azblob_file,
-        "reader_options": json.dumps({"sep": ",", "nrows": 42}),
+        "reader_options": {"sep": ",", "nrows": 42},
         "provider": {"storage": "AzBlob", "storage_account": azblob_credentials["storage_account"]},
     }
     check_read(config)
@@ -143,7 +143,7 @@ def test__read_from_private_azblob_shared_key(azblob_credentials, private_azblob
         "dataset_name": "output",
         "format": "csv",
         "url": private_azblob_file,
-        "reader_options": json.dumps({"sep": ",", "nrows": 42}),
+        "reader_options": {"sep": ",", "nrows": 42},
         "provider": {
             "storage": "AzBlob",
             "storage_account": azblob_credentials["storage_account"],
@@ -158,7 +158,7 @@ def test__read_from_private_azblob_sas_token(azblob_credentials, private_azblob_
         "dataset_name": "output",
         "format": "csv",
         "url": private_azblob_file,
-        "reader_options": json.dumps({"sep": ",", "nrows": 42}),
+        "reader_options": {"sep": ",", "nrows": 42},
         "provider": {
             "storage": "AzBlob",
             "storage_account": azblob_credentials["storage_account"],
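
All six integration-test configs above make the same mechanical change: "reader_options" goes from a JSON string to a plain dict, matching the new Client signature further down. A minimal before/after sketch, with illustrative values:

import json

reader_options = {"sep": ",", "nrows": 42}

# Before: the config carried a JSON string that Client parsed itself.
old_style = {"reader_options": json.dumps(reader_options)}

# After: tests hand Client an already-parsed dict; JSON parsing for real
# user configs now happens once, in SourceFile._validate_and_transform.
new_style = {"reader_options": reader_options}
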
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-file/setup.py
@@ -6,7 +6,7 @@
 from setuptools import find_packages, setup

 MAIN_REQUIREMENTS = [
-    "airbyte-cdk~=0.1",
+    "airbyte-cdk~=0.2.0",
     "gcsfs==2022.7.1",
     "genson==1.2.2",
     "google-cloud-storage==2.5.0",

12 changes: 2 additions & 10 deletions airbyte-integrations/connectors/source-file/source_file/client.py
@@ -230,20 +230,12 @@ class Client:
     reader_class = URLFile
     binary_formats = {"excel", "excel_binary", "feather", "parquet", "orc", "pickle"}

-    def __init__(self, dataset_name: str, url: str, provider: dict, format: str = None, reader_options: str = None):
+    def __init__(self, dataset_name: str, url: str, provider: dict, format: str = None, reader_options: dict = None):
         self._dataset_name = dataset_name
         self._url = url
         self._provider = provider
         self._reader_format = format or "csv"
-        self._reader_options = {}
-        if reader_options:
-            try:
-                self._reader_options = json.loads(reader_options)
-            except json.decoder.JSONDecodeError as err:
-                error_msg = f"Failed to parse reader options {repr(err)}\n{reader_options}\n{traceback.format_exc()}"
-                logger.error(error_msg)
-                raise ConfigurationError(error_msg) from err
-
+        self._reader_options = reader_options or {}
         self.binary_source = self._reader_format in self.binary_formats
         self.encoding = self._reader_options.get("encoding")
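
With this change Client expects reader_options as an already-parsed dict and simply defaults to an empty one. A minimal construction sketch (the URL and option values are illustrative, not from the commit):

from source_file.client import Client

client = Client(
    dataset_name="demo",
    url="https://example.com/demo.csv",  # illustrative URL
    provider={"storage": "HTTPS", "user_agent": False},
    format="csv",
    reader_options={"sep": ",", "nrows": 42},  # a dict, no longer a JSON string
)

# encoding is read straight off the dict; None means pandas' default applies
assert client.encoding is None
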
32 changes: 17 additions & 15 deletions airbyte-integrations/connectors/source-file/source_file/source.py
@@ -3,6 +3,7 @@
 #


+import json
 import logging
 import traceback
 from datetime import datetime
@@ -19,9 +20,9 @@
     Type,
 )
 from airbyte_cdk.sources import Source
-from pandas.errors import ParserError

 from .client import Client
+from .utils import dropbox_force_download


 class SourceFile(Source):
@@ -77,29 +78,28 @@ def _get_client(self, config: Mapping):

         return client

+    def _validate_and_transform(self, config: Mapping[str, Any]):
+        if "reader_options" in config:
+            try:
+                config["reader_options"] = json.loads(config["reader_options"])
+            except ValueError:
+                raise Exception("reader_options is not valid JSON")
+        else:
+            config["reader_options"] = {}
+        config["url"] = dropbox_force_download(config["url"])
+        return config
+
     def check(self, logger, config: Mapping) -> AirbyteConnectionStatus:
         """
         Check involves verifying that the specified file is reachable with
         our credentials.
         """
+        config = self._validate_and_transform(config)
         client = self._get_client(config)
         logger.info(f"Checking access to {client.reader.full_url}...")
         try:
-            with client.reader.open() as f:
-                if config.get("provider").get("storage") == "HTTPS":
-                    # on behalf of https://github.com/airbytehq/alpha-beta-issues/issues/224
-                    # some providers like Dropbox creates the Shared Public URLs with ?dl=0 query param,
-                    # this requires user interaction before accessing the file,
-                    # we should validate this on the Check Connection stage to avoid sync issues further.
-                    client.CSV_CHUNK_SIZE = 2
-                    next(client.load_dataframes(f))
-                    return AirbyteConnectionStatus(status=Status.SUCCEEDED)
-            # for all other formats and storrage providers
             with client.reader.open():
                 return AirbyteConnectionStatus(status=Status.SUCCEEDED)
-        except ParserError:
-            reason = f"Failed to load {client.reader.full_url}, check the URL is valid and allows to download file directly"
-            logger.error(reason)
-            return AirbyteConnectionStatus(status=Status.FAILED, message=reason)
         except Exception as err:
             reason = f"Failed to load {client.reader.full_url}: {repr(err)}\n{traceback.format_exc()}"
             logger.error(reason)
@@ -110,6 +110,7 @@ def discover(self, logger: AirbyteLogger, config: Mapping) -> AirbyteCatalog:
         Returns an AirbyteCatalog representing the available streams and fields in this integration. For example, given valid credentials to a
         Remote CSV File, returns an Airbyte catalog where each csv file is a stream, and each column is a field.
         """
+        config = self._validate_and_transform(config)
         client = self._get_client(config)
         name = client.stream_name

@@ -130,6 +131,7 @@ def read(
         state: MutableMapping[str, Any] = None,
     ) -> Iterator[AirbyteMessage]:
         """Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, catalog, and state."""
+        config = self._validate_and_transform(config)
         client = self._get_client(config)
         fields = self.selected_fields(catalog)
         name = client.stream_name
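
The new _validate_and_transform is now the single place where reader_options JSON is parsed and Dropbox URLs are rewritten; check, discover, and read all funnel through it. A rough sketch of the transformation it applies (the config values are illustrative):

from source_file.source import SourceFile

config = {
    "dataset_name": "demo",
    "format": "csv",
    "provider": {"storage": "HTTPS", "user_agent": False},
    "url": "https://www.dropbox.com/s/abc123/demo.csv?dl=0",  # illustrative link
    "reader_options": '{"sep": ";", "nrows": 10}',  # arrives as a JSON string
}

config = SourceFile()._validate_and_transform(config)

assert config["reader_options"] == {"sep": ";", "nrows": 10}  # parsed to a dict
assert config["url"].endswith("dl=1")  # Dropbox link now forces a direct download

Note that the method mutates the incoming mapping in place, which is why the unit tests below start passing deepcopy(config) into source.read.
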
19 changes: 19 additions & 0 deletions airbyte-integrations/connectors/source-file/source_file/utils.py
@@ -0,0 +1,19 @@
+#
+# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+#
+
+
+from urllib.parse import parse_qs, urlencode, urlparse
+
+
+def dropbox_force_download(url):
+    """
+    https://help.dropbox.com/share/force-download
+    """
+    parse_result = urlparse(url)
+    if parse_result.netloc.split(".")[-2:] == ["dropbox", "com"]:
+        qs = parse_qs(parse_result.query)
+        if qs.get("dl") == ["0"]:
+            qs["dl"] = "1"
+            parse_result = parse_result._replace(query=urlencode(qs))
+    return parse_result.geturl()
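
The guard only rewrites links that are both on a dropbox.com host and explicitly carry dl=0; everything else round-trips untouched. A quick sketch of the behavior (hypothetical URLs):

from source_file.utils import dropbox_force_download

# A Dropbox shared link with ?dl=0 is rewritten to force a direct download.
assert dropbox_force_download("https://www.dropbox.com/s/abc123/demo.csv?dl=0") == \
    "https://www.dropbox.com/s/abc123/demo.csv?dl=1"

# Non-Dropbox hosts pass through unchanged, dl=0 and all.
assert dropbox_force_download("https://example.com/demo.csv?dl=0") == \
    "https://example.com/demo.csv?dl=0"
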
@@ -35,7 +35,7 @@ def invalid_config(read_file):


 @pytest.fixture
-def non_direct_url_provided_config():
+def config_dropbox_link():
     return {
         "dataset_name": "test",
         "format": "csv",
@@ -134,13 +134,3 @@ def test_open_gcs_url():
     provider.update({"service_account_json": '{service_account_json": "service_account_json"}'})
     with pytest.raises(ConfigurationError):
         assert URLFile(url="", provider=provider)._open_gcs_url()
-
-
-def test_client_wrong_reader_options():
-    with pytest.raises(ConfigurationError):
-        Client(
-            dataset_name="test_dataset",
-            url="scp://test_dataset",
-            provider={"provider": {"storage": "HTTPS", "reader_impl": "gcsfs", "user_agent": False}},
-            reader_options='{encoding":"utf_16"}',
-        )
@@ -4,6 +4,7 @@

 import json
 import logging
+from copy import deepcopy
 from unittest.mock import PropertyMock

 import jsonschema
@@ -68,6 +69,7 @@ def get_catalog(properties):
         stream=AirbyteStream(
             name="test",
             json_schema={"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": properties},
+            supported_sync_modes=[SyncMode.full_refresh],
         ),
         sync_mode=SyncMode.full_refresh,
         destination_sync_mode=DestinationSyncMode.overwrite,
@@ -91,7 +93,7 @@ def test_nan_to_null(absolute_path, test_files):
     )

     source = SourceFile()
-    records = source.read(logger=logger, config=config, catalog=catalog)
+    records = source.read(logger=logger, config=deepcopy(config), catalog=catalog)
     records = [r.record.data for r in records]
     assert records == [
         {"col1": "key1", "col2": 1.11, "col3": None},
@@ -101,7 +103,7 @@
     ]

     config.update({"format": "yaml", "url": f"{absolute_path}/{test_files}/formats/yaml/demo.yaml"})
-    records = source.read(logger=logger, config=config, catalog=catalog)
+    records = source.read(logger=logger, config=deepcopy(config), catalog=catalog)
     records = [r.record.data for r in records]
     assert records == []

@@ -128,10 +130,8 @@ def test_check_invalid_config(source, invalid_config):
     assert actual.status == expected.status


-def test_check_non_direct_url_provided_config(source, non_direct_url_provided_config):
-    expected = AirbyteConnectionStatus(status=Status.FAILED)
-    actual = source.check(logger=logger, config=non_direct_url_provided_config)
-    assert actual.status == expected.status
+def test_discover_dropbox_link(source, config_dropbox_link):
+    source.discover(logger=logger, config=config_dropbox_link)


@@ -145,3 +145,9 @@ def test_discover(source, config, client):

     with pytest.raises(Exception):
         source.discover(logger=logger, config=config)
+
+
+def test_check_wrong_reader_options(source, config):
+    config["reader_options"] = '{encoding":"utf_16"}'
+    with pytest.raises(Exception):
+        source.check(logger=logger, config=config)
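
The new test_check_wrong_reader_options replaces the deleted Client-level test: malformed reader_options JSON now fails inside _validate_and_transform before a Client is ever built. A standalone sketch of that failure path, assuming a config shaped like the fixtures:

import pytest
from source_file.source import SourceFile

config = {
    "dataset_name": "demo",
    "format": "csv",
    "provider": {"storage": "HTTPS", "user_agent": False},
    "url": "https://example.com/demo.csv",  # illustrative URL
    "reader_options": '{encoding":"utf_16"}',  # missing opening quote -> invalid JSON
}

with pytest.raises(Exception, match="reader_options is not valid JSON"):
    SourceFile()._validate_and_transform(config)
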
3 changes: 2 additions & 1 deletion docs/integrations/sources/file.md
@@ -129,7 +129,8 @@ In order to read large files from a remote location, this connector uses the [sm

 | Version | Date | Pull Request | Subject |
 | ------- | ---------- | -------------------------------------------------------- | -------------------------------------------------------- |
-| 0.2.25 | 2022-10-14 | [17994](https://github.com/airbytehq/airbyte/pull/17994) | Handle `UnicodeDecodeError` during discover step.
+| 0.2.26 | 2022-10-18 | [18116](https://github.com/airbytehq/airbyte/pull/18116) | Transform Dropbox shared link |
+| 0.2.25 | 2022-10-14 | [17994](https://github.com/airbytehq/airbyte/pull/17994) | Handle `UnicodeDecodeError` during discover step. |
 | 0.2.24 | 2022-10-03 | [17504](https://github.com/airbytehq/airbyte/pull/17504) | Validate data for `HTTPS` while `check_connection` |
 | 0.2.23 | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/pull/17304) | Migrate to per-stream state. |
 | 0.2.22 | 2022-09-15 | [16772](https://github.com/airbytehq/airbyte/pull/16772) | Fix schema generation for JSON files containing arrays |
