From 2a37631fccd0596c61202a9d1b10bd8a4184458c Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Thu, 18 Aug 2022 22:30:13 +0300 Subject: [PATCH 1/7] make sure numpy.nan converted to None Signed-off-by: Sergey Chvalyuk --- .../connectors/source-file/Dockerfile | 2 +- .../source-file/source_file/client.py | 3 +- .../source-file/source_file/source.py | 11 +++-- .../source-file/unit_tests/test_source.py | 45 +++++++++++++++++++ 4 files changed, 56 insertions(+), 5 deletions(-) diff --git a/airbyte-integrations/connectors/source-file/Dockerfile b/airbyte-integrations/connectors/source-file/Dockerfile index d8658ef0179aa..1cac4f99413e7 100644 --- a/airbyte-integrations/connectors/source-file/Dockerfile +++ b/airbyte-integrations/connectors/source-file/Dockerfile @@ -17,5 +17,5 @@ COPY source_file ./source_file ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.18 +LABEL io.airbyte.version=0.2.19 LABEL io.airbyte.name=airbyte/source-file diff --git a/airbyte-integrations/connectors/source-file/source_file/client.py b/airbyte-integrations/connectors/source-file/source_file/client.py index dabeee9fd32d4..0cb3a92549589 100644 --- a/airbyte-integrations/connectors/source-file/source_file/client.py +++ b/airbyte-integrations/connectors/source-file/source_file/client.py @@ -13,6 +13,7 @@ import boto3 import botocore import google +import numpy as np import pandas as pd import smart_open from airbyte_cdk.entrypoint import logger @@ -357,7 +358,7 @@ def read(self, fields: Iterable = None) -> Iterable[dict]: fp = self._cache_stream(fp) for df in self.load_dataframes(fp): columns = fields.intersection(set(df.columns)) if fields else df.columns - df = df.where(pd.notnull(df), None) + df.replace({np.nan: None}, inplace=True) yield from df[list(columns)].to_dict(orient="records") def _cache_stream(self, fp): diff --git a/airbyte-integrations/connectors/source-file/source_file/source.py b/airbyte-integrations/connectors/source-file/source_file/source.py index ba8ae07334c82..a178f5524cdfa 100644 --- a/airbyte-integrations/connectors/source-file/source_file/source.py +++ b/airbyte-integrations/connectors/source-file/source_file/source.py @@ -3,9 +3,10 @@ # +import logging import traceback from datetime import datetime -from typing import Generator, Iterable, Mapping +from typing import Any, Iterable, Iterator, Mapping, MutableMapping from airbyte_cdk import AirbyteLogger from airbyte_cdk.models import ( @@ -108,8 +109,12 @@ def discover(self, logger: AirbyteLogger, config: Mapping) -> AirbyteCatalog: return AirbyteCatalog(streams=streams) def read( - self, logger: AirbyteLogger, config: Mapping, catalog: ConfiguredAirbyteCatalog, state_path: Mapping[str, any] - ) -> Generator[AirbyteMessage, None, None]: + self, + logger: logging.Logger, + config: Mapping[str, Any], + catalog: ConfiguredAirbyteCatalog, + state: MutableMapping[str, Any] = None, + ) -> Iterator[AirbyteMessage]: """Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, catalog, and state.""" client = self._get_client(config) fields = self.selected_fields(catalog) diff --git a/airbyte-integrations/connectors/source-file/unit_tests/test_source.py b/airbyte-integrations/connectors/source-file/unit_tests/test_source.py index 8f6ed5277c620..79719e4543255 100644 --- a/airbyte-integrations/connectors/source-file/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-file/unit_tests/test_source.py @@ -2,9 +2,11 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +import json import logging from pathlib import Path +from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode from source_file.source import SourceFile HERE = Path(__file__).parent.absolute() @@ -33,3 +35,46 @@ def test_csv_with_utf16_encoding(): catalog = SourceFile().discover(logger=logging.getLogger("airbyte"), config=config_local_csv_utf16) stream = next(iter(catalog.streams)) assert stream.json_schema == expected_schema + + +def get_catalog(properties): + return ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream( + name="test", + json_schema={"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": properties}, + ), + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.overwrite, + ) + ] + ) + + +def test_nan_to_null(): + """make sure numpy.nan converted to None""" + config = { + "dataset_name": "test", + "format": "csv", + "reader_options": json.dumps({"sep": ";"}), + "url": f"{HERE}/../integration_tests/sample_files/test_nan.csv", + "provider": {"storage": "local"}, + } + + catalog = get_catalog( + { + "col1": {"type": ["string", "null"]}, + "col2": {"type": ["number", "null"]}, + } + ) + + source = SourceFile() + records = source.read(logger=logging.getLogger("airbyte"), config=config, catalog=catalog) + records = [r.record.data for r in records] + assert records == [ + {"col1": "key1", "col2": 1.1}, + {"col1": "key2", "col2": None}, + {"col1": "key3", "col2": None}, + {"col1": "key4", "col2": 2.2}, + ] From 55848e992e25a141daabe2ab66027621c0f88b3e Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Thu, 18 Aug 2022 23:56:25 +0300 Subject: [PATCH 2/7] test_nan.csv added Signed-off-by: Sergey Chvalyuk --- .../source-file/integration_tests/sample_files/test_nan.csv | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 airbyte-integrations/connectors/source-file/integration_tests/sample_files/test_nan.csv diff --git a/airbyte-integrations/connectors/source-file/integration_tests/sample_files/test_nan.csv b/airbyte-integrations/connectors/source-file/integration_tests/sample_files/test_nan.csv new file mode 100644 index 0000000000000..a753667f11a42 --- /dev/null +++ b/airbyte-integrations/connectors/source-file/integration_tests/sample_files/test_nan.csv @@ -0,0 +1,5 @@ +col1;col2 +key1;1.1 +key2; +key3; +key4;2.2 From 4dc9e783acad39becdc18ec0f64bd51079dbf5c5 Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Fri, 19 Aug 2022 00:00:34 +0300 Subject: [PATCH 3/7] test_nan.csv updated Signed-off-by: Sergey Chvalyuk --- .../integration_tests/sample_files/test_nan.csv | 10 +++++----- .../connectors/source-file/unit_tests/test_source.py | 9 +++++---- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/airbyte-integrations/connectors/source-file/integration_tests/sample_files/test_nan.csv b/airbyte-integrations/connectors/source-file/integration_tests/sample_files/test_nan.csv index a753667f11a42..9b8ffc48dd417 100644 --- a/airbyte-integrations/connectors/source-file/integration_tests/sample_files/test_nan.csv +++ b/airbyte-integrations/connectors/source-file/integration_tests/sample_files/test_nan.csv @@ -1,5 +1,5 @@ -col1;col2 -key1;1.1 -key2; -key3; -key4;2.2 +col1;col2;col3 +key1;1.11; +key2;;5.55 +key3;; +key4;2.22; diff --git a/airbyte-integrations/connectors/source-file/unit_tests/test_source.py b/airbyte-integrations/connectors/source-file/unit_tests/test_source.py index 79719e4543255..44c591fc28ad7 100644 --- a/airbyte-integrations/connectors/source-file/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-file/unit_tests/test_source.py @@ -66,6 +66,7 @@ def test_nan_to_null(): { "col1": {"type": ["string", "null"]}, "col2": {"type": ["number", "null"]}, + "col3": {"type": ["number", "null"]}, } ) @@ -73,8 +74,8 @@ def test_nan_to_null(): records = source.read(logger=logging.getLogger("airbyte"), config=config, catalog=catalog) records = [r.record.data for r in records] assert records == [ - {"col1": "key1", "col2": 1.1}, - {"col1": "key2", "col2": None}, - {"col1": "key3", "col2": None}, - {"col1": "key4", "col2": 2.2}, + {'col1': 'key1', 'col2': 1.11, 'col3': None}, + {'col1': 'key2', 'col2': None, 'col3': 5.55}, + {'col1': 'key3', 'col2': None, 'col3': None}, + {'col1': 'key4', 'col2': 2.22, 'col3': None} ] From d64799f142122e7da24a066e4ce514926947276d Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Fri, 19 Aug 2022 00:02:14 +0300 Subject: [PATCH 4/7] change numbers Signed-off-by: Sergey Chvalyuk --- .../source-file/integration_tests/sample_files/test_nan.csv | 4 ++-- .../connectors/source-file/unit_tests/test_source.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-file/integration_tests/sample_files/test_nan.csv b/airbyte-integrations/connectors/source-file/integration_tests/sample_files/test_nan.csv index 9b8ffc48dd417..99c0784b46aea 100644 --- a/airbyte-integrations/connectors/source-file/integration_tests/sample_files/test_nan.csv +++ b/airbyte-integrations/connectors/source-file/integration_tests/sample_files/test_nan.csv @@ -1,5 +1,5 @@ col1;col2;col3 key1;1.11; -key2;;5.55 +key2;;2.22 key3;; -key4;2.22; +key4;3.33; diff --git a/airbyte-integrations/connectors/source-file/unit_tests/test_source.py b/airbyte-integrations/connectors/source-file/unit_tests/test_source.py index 44c591fc28ad7..e03db78738d8b 100644 --- a/airbyte-integrations/connectors/source-file/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-file/unit_tests/test_source.py @@ -75,7 +75,7 @@ def test_nan_to_null(): records = [r.record.data for r in records] assert records == [ {'col1': 'key1', 'col2': 1.11, 'col3': None}, - {'col1': 'key2', 'col2': None, 'col3': 5.55}, + {'col1': 'key2', 'col2': None, 'col3': 2.22}, {'col1': 'key3', 'col2': None, 'col3': None}, - {'col1': 'key4', 'col2': 2.22, 'col3': None} + {'col1': 'key4', 'col2': 3.33, 'col3': None} ] From daf1f78a83bd5a9ea0770407060dd1f9ab035e5f Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Fri, 19 Aug 2022 00:05:48 +0300 Subject: [PATCH 5/7] file.md updated Signed-off-by: Sergey Chvalyuk --- docs/integrations/sources/file.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/integrations/sources/file.md b/docs/integrations/sources/file.md index af2584fd7bf0d..57123a3ddc22c 100644 --- a/docs/integrations/sources/file.md +++ b/docs/integrations/sources/file.md @@ -127,6 +127,7 @@ In order to read large files from a remote location, this connector uses the [sm | Version | Date | Pull Request | Subject | |---------|------------|----------------------------------------------------------|---------------------------------------------------| +| 0.2.19 | 2022-08-19 | [15768](https://github.com/airbytehq/airbyte/pull/15768) | Convert 'nan' to 'null' | | 0.2.18 | 2022-08-16 | [15698](https://github.com/airbytehq/airbyte/pull/15698) | Cache binary stream to file for discover | | 0.2.17 | 2022-08-11 | [15501](https://github.com/airbytehq/airbyte/pull/15501) | Cache binary stream to file | | 0.2.16 | 2022-08-10 | [15293](https://github.com/airbytehq/airbyte/pull/15293) | added support for encoding reader option | From 7da1ae6ae58603e43df6d61aa08c86f16856585d Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Fri, 19 Aug 2022 00:07:16 +0300 Subject: [PATCH 6/7] test_nan_to_null updated Signed-off-by: Sergey Chvalyuk --- .../connectors/source-file/unit_tests/test_source.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-file/unit_tests/test_source.py b/airbyte-integrations/connectors/source-file/unit_tests/test_source.py index e03db78738d8b..3c8647287ca5d 100644 --- a/airbyte-integrations/connectors/source-file/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-file/unit_tests/test_source.py @@ -74,8 +74,8 @@ def test_nan_to_null(): records = source.read(logger=logging.getLogger("airbyte"), config=config, catalog=catalog) records = [r.record.data for r in records] assert records == [ - {'col1': 'key1', 'col2': 1.11, 'col3': None}, - {'col1': 'key2', 'col2': None, 'col3': 2.22}, - {'col1': 'key3', 'col2': None, 'col3': None}, - {'col1': 'key4', 'col2': 3.33, 'col3': None} + {"col1": "key1", "col2": 1.11, "col3": None}, + {"col1": "key2", "col2": None, "col3": 2.22}, + {"col1": "key3", "col2": None, "col3": None}, + {"col1": "key4", "col2": 3.33, "col3": None}, ] From 7033fb265cb99523e70ca39e89de191b871a64c1 Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Fri, 19 Aug 2022 22:15:01 +0000 Subject: [PATCH 7/7] auto-bump connector version [ci skip] --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 7f803fedd1ca4..f08f6bc16fb98 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -279,7 +279,7 @@ - name: File sourceDefinitionId: 778daa7c-feaf-4db6-96f3-70fd645acc77 dockerRepository: airbyte/source-file - dockerImageTag: 0.2.18 + dockerImageTag: 0.2.19 documentationUrl: https://docs.airbyte.io/integrations/sources/file icon: file.svg sourceType: file diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index a7045cb8f4592..f09b8eb0cea21 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -2291,7 +2291,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-file:0.2.18" +- dockerImage: "airbyte/source-file:0.2.19" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/file" connectionSpecification: