Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Source File - add support for custom encoding #15293

Merged
Merged 8 commits on Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
70 changes: 33 additions & 37 deletions airbyte-integrations/connectors/source-file/source_file/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,14 @@ class URLFile:
```
"""

def __init__(self, url: str, provider: dict):
def __init__(self, url: str, provider: dict, binary=None, encoding=None):
self._url = url
self._provider = provider
self._file = None
self.args = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: self._reader_args

"mode": "rb" if binary else "r",
"encoding": encoding,
}

def __enter__(self):
return self._file
Expand All @@ -74,29 +78,28 @@ def close(self):
self._file.close()
self._file = None

def open(self, binary=False):
def open(self):
self.close()
try:
self._file = self._open(binary=binary)
self._file = self._open()
except google.api_core.exceptions.NotFound as err:
raise FileNotFoundError(self.url) from err
return self

def _open(self, binary):
mode = "rb" if binary else "r"
def _open(self):
storage = self.storage_scheme
url = self.url

if storage == "gs://":
return self._open_gcs_url(binary=binary)
return self._open_gcs_url()
elif storage == "s3://":
return self._open_aws_url(binary=binary)
return self._open_aws_url()
elif storage == "azure://":
return self._open_azblob_url(binary=binary)
return self._open_azblob_url()
elif storage == "webhdfs://":
host = self._provider["host"]
port = self._provider["port"]
return smart_open.open(f"webhdfs://{host}:{port}/{url}", mode=mode)
return smart_open.open(f"webhdfs://{host}:{port}/{url}", **self.args)
elif storage in ("ssh://", "scp://", "sftp://"):
user = self._provider["user"]
host = self._provider["host"]
Expand All @@ -114,19 +117,15 @@ def _open(self, binary):
uri = f"{storage}{user}:{password}@{host}:{port}/{url}"
else:
uri = f"{storage}{user}@{host}:{port}/{url}"
return smart_open.open(uri, transport_params=transport_params, mode=mode)
return smart_open.open(uri, transport_params=transport_params, **self.args)
elif storage in ("https://", "http://"):
transport_params = None
if "user_agent" in self._provider and self._provider["user_agent"]:
airbyte_version = environ.get("AIRBYTE_VERSION", "0.0")
transport_params = {"headers": {"Accept-Encoding": "identity", "User-Agent": f"Airbyte/{airbyte_version}"}}
logger.info(f"TransportParams: {transport_params}")
return smart_open.open(
self.full_url,
mode=mode,
transport_params=transport_params,
)
return smart_open.open(self.full_url, mode=mode)
return smart_open.open(self.full_url, transport_params=transport_params, **self.args)
return smart_open.open(self.full_url, **self.args)

@property
def url(self) -> str:
Expand Down Expand Up @@ -168,8 +167,7 @@ def storage_scheme(self) -> str:
logger.error(f"Unknown Storage provider in: {self.full_url}")
return ""

def _open_gcs_url(self, binary) -> object:
mode = "rb" if binary else "r"
def _open_gcs_url(self) -> object:
service_account_json = self._provider.get("service_account_json")
credentials = None
if service_account_json:
Expand All @@ -185,28 +183,27 @@ def _open_gcs_url(self, binary) -> object:
client = GCSClient(credentials=credentials, project=credentials._project_id)
else:
client = GCSClient.create_anonymous_client()
file_to_close = smart_open.open(self.full_url, transport_params=dict(client=client), mode=mode)
file_to_close = smart_open.open(self.full_url, transport_params={"client": client}, **self.args)

return file_to_close

def _open_aws_url(self, binary):
mode = "rb" if binary else "r"
def _open_aws_url(self):
aws_access_key_id = self._provider.get("aws_access_key_id")
aws_secret_access_key = self._provider.get("aws_secret_access_key")
use_aws_account = aws_access_key_id and aws_secret_access_key

if use_aws_account:
aws_access_key_id = self._provider.get("aws_access_key_id", "")
aws_secret_access_key = self._provider.get("aws_secret_access_key", "")
result = smart_open.open(f"{self.storage_scheme}{aws_access_key_id}:{aws_secret_access_key}@{self.url}", mode=mode)
url = f"{self.storage_scheme}{aws_access_key_id}:{aws_secret_access_key}@{self.url}"
result = smart_open.open(url, **self.args)
else:
config = botocore.client.Config(signature_version=botocore.UNSIGNED)
params = {"client": boto3.client("s3", config=config)}
result = smart_open.open(self.full_url, transport_params=params, mode=mode)
result = smart_open.open(self.full_url, transport_params=params, **self.args)
return result

def _open_azblob_url(self, binary):
mode = "rb" if binary else "r"
def _open_azblob_url(self):
storage_account = self._provider.get("storage_account")
storage_acc_url = f"https://{storage_account}.blob.core.windows.net"
sas_token = self._provider.get("sas_token", None)
Expand All @@ -220,14 +217,15 @@ def _open_azblob_url(self, binary):
# assuming anonymous public read access given no credential
client = BlobServiceClient(account_url=storage_acc_url)

result = smart_open.open(f"{self.storage_scheme}{self.url}", transport_params=dict(client=client), mode=mode)
return result
url = f"{self.storage_scheme}{self.url}"
return smart_open.open(url, transport_params=dict(client=client), **self.args)


class Client:
"""Class that manages reading and parsing data from streams"""

reader_class = URLFile
binary_formats = {"excel", "feather", "parquet", "orc", "pickle"}

def __init__(self, dataset_name: str, url: str, provider: dict, format: str = None, reader_options: str = None):
self._dataset_name = dataset_name
Expand All @@ -243,6 +241,9 @@ def __init__(self, dataset_name: str, url: str, provider: dict, format: str = No
logger.error(error_msg)
raise ConfigurationError(error_msg) from err

self.binary_source = self._reader_format in self.binary_formats
self.encoding = self._reader_options.get("encoding")

@property
def stream_name(self) -> str:
if self._dataset_name:
Expand Down Expand Up @@ -336,17 +337,12 @@ def dtype_to_json_type(dtype) -> str:

@property
def reader(self) -> reader_class:
return self.reader_class(url=self._url, provider=self._provider)

@property
def binary_source(self):
binary_formats = {"excel", "feather", "parquet", "orc", "pickle"}
return self._reader_format in binary_formats
return self.reader_class(url=self._url, provider=self._provider, binary=self.binary_source, encoding=self.encoding)

def read(self, fields: Iterable = None) -> Iterable[dict]:
"""Read data from the stream"""
with self.reader.open(binary=self.binary_source) as fp:
if self._reader_format == "json" or self._reader_format == "jsonl":
with self.reader.open() as fp:
if self._reader_format in ["json", "jsonl"]:
yield from self.load_nested_json(fp)
elif self._reader_format == "yaml":
fields = set(fields) if fields else None
Expand Down Expand Up @@ -376,8 +372,8 @@ def _stream_properties(self, fp):
def streams(self) -> Iterable:
"""Discovers available streams"""
# TODO handle discovery of directories of multiple files instead
with self.reader.open(binary=self.binary_source) as fp:
if self._reader_format == "json" or self._reader_format == "jsonl":
with self.reader.open() as fp:
if self._reader_format in ["json", "jsonl"]:
json_schema = self.load_nested_json_schema(fp)
else:
json_schema = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def check(self, logger, config: Mapping) -> AirbyteConnectionStatus:
client = self._get_client(config)
logger.info(f"Checking access to {client.reader.full_url}...")
try:
with client.reader.open(binary=client.binary_source):
with client.reader.open():
return AirbyteConnectionStatus(status=Status.SUCCEEDED)
except Exception as err:
reason = f"Failed to load {client.reader.full_url}: {repr(err)}\n{traceback.format_exc()}"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import logging
from pathlib import Path

from source_file.source import SourceFile

HERE = Path(__file__).parent.absolute()


def test_csv_with_utf16_encoding():
    """Discover a local UTF-16 CSV file and verify the inferred JSON schema.

    Exercises the connector's custom-encoding support by passing
    ``{"encoding":"utf_16"}`` through ``reader_options`` and checking that
    discovery still types each column correctly.
    """
    source_config = {
        "dataset_name": "AAA",
        "format": "csv",
        "reader_options": '{"encoding":"utf_16"}',
        "url": f"{HERE}/../integration_tests/sample_files/test_utf16.csv",
        "provider": {"storage": "local"},
    }

    # Every discovered column is nullable; only the base type varies per header.
    column_types = {
        "header1": "string",
        "header2": "number",
        "header3": "number",
        "header4": "boolean",
    }
    expected_schema = {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {name: {"type": [base, "null"]} for name, base in column_types.items()},
        "type": "object",
    }

    discovered_catalog = SourceFile().discover(logger=logging.getLogger("airbyte"), config=source_config)
    first_stream = next(iter(discovered_catalog.streams))
    assert first_stream.json_schema == expected_schema