Skip to content

Commit

Permalink
🎉 Source File: Convert 'nan' to 'null' (#15768)
Browse files Browse the repository at this point in the history
Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
  • Loading branch information
grubberr committed Aug 20, 2022
1 parent ca65136 commit e9fa2c4
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@
- name: File
sourceDefinitionId: 778daa7c-feaf-4db6-96f3-70fd645acc77
dockerRepository: airbyte/source-file
dockerImageTag: 0.2.18
dockerImageTag: 0.2.19
documentationUrl: https://docs.airbyte.io/integrations/sources/file
icon: file.svg
sourceType: file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2291,7 +2291,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-file:0.2.18"
- dockerImage: "airbyte/source-file:0.2.19"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/file"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-file/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ COPY source_file ./source_file
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.18
LABEL io.airbyte.version=0.2.19
LABEL io.airbyte.name=airbyte/source-file
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
col1;col2;col3
key1;1.11;
key2;;2.22
key3;;
key4;3.33;
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import boto3
import botocore
import google
import numpy as np
import pandas as pd
import smart_open
from airbyte_cdk.entrypoint import logger
Expand Down Expand Up @@ -357,7 +358,7 @@ def read(self, fields: Iterable = None) -> Iterable[dict]:
fp = self._cache_stream(fp)
for df in self.load_dataframes(fp):
columns = fields.intersection(set(df.columns)) if fields else df.columns
df = df.where(pd.notnull(df), None)
df.replace({np.nan: None}, inplace=True)
yield from df[list(columns)].to_dict(orient="records")

def _cache_stream(self, fp):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
#


import logging
import traceback
from datetime import datetime
from typing import Generator, Iterable, Mapping
from typing import Any, Iterable, Iterator, Mapping, MutableMapping

from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import (
Expand Down Expand Up @@ -108,8 +109,12 @@ def discover(self, logger: AirbyteLogger, config: Mapping) -> AirbyteCatalog:
return AirbyteCatalog(streams=streams)

def read(
self, logger: AirbyteLogger, config: Mapping, catalog: ConfiguredAirbyteCatalog, state_path: Mapping[str, any]
) -> Generator[AirbyteMessage, None, None]:
self,
logger: logging.Logger,
config: Mapping[str, Any],
catalog: ConfiguredAirbyteCatalog,
state: MutableMapping[str, Any] = None,
) -> Iterator[AirbyteMessage]:
"""Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, catalog, and state."""
client = self._get_client(config)
fields = self.selected_fields(catalog)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import json
import logging
from pathlib import Path

from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode
from source_file.source import SourceFile

HERE = Path(__file__).parent.absolute()
Expand Down Expand Up @@ -33,3 +35,47 @@ def test_csv_with_utf16_encoding():
catalog = SourceFile().discover(logger=logging.getLogger("airbyte"), config=config_local_csv_utf16)
stream = next(iter(catalog.streams))
assert stream.json_schema == expected_schema


def get_catalog(properties):
return ConfiguredAirbyteCatalog(
streams=[
ConfiguredAirbyteStream(
stream=AirbyteStream(
name="test",
json_schema={"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": properties},
),
sync_mode=SyncMode.full_refresh,
destination_sync_mode=DestinationSyncMode.overwrite,
)
]
)


def test_nan_to_null():
"""make sure numpy.nan converted to None"""
config = {
"dataset_name": "test",
"format": "csv",
"reader_options": json.dumps({"sep": ";"}),
"url": f"{HERE}/../integration_tests/sample_files/test_nan.csv",
"provider": {"storage": "local"},
}

catalog = get_catalog(
{
"col1": {"type": ["string", "null"]},
"col2": {"type": ["number", "null"]},
"col3": {"type": ["number", "null"]},
}
)

source = SourceFile()
records = source.read(logger=logging.getLogger("airbyte"), config=config, catalog=catalog)
records = [r.record.data for r in records]
assert records == [
{"col1": "key1", "col2": 1.11, "col3": None},
{"col1": "key2", "col2": None, "col3": 2.22},
{"col1": "key3", "col2": None, "col3": None},
{"col1": "key4", "col2": 3.33, "col3": None},
]
1 change: 1 addition & 0 deletions docs/integrations/sources/file.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ In order to read large files from a remote location, this connector uses the [sm

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|---------------------------------------------------|
| 0.2.19 | 2022-08-19 | [15768](https://github.com/airbytehq/airbyte/pull/15768) | Convert 'nan' to 'null' |
| 0.2.18 | 2022-08-16 | [15698](https://github.com/airbytehq/airbyte/pull/15698) | Cache binary stream to file for discover |
| 0.2.17 | 2022-08-11 | [15501](https://github.com/airbytehq/airbyte/pull/15501) | Cache binary stream to file |
| 0.2.16 | 2022-08-10 | [15293](https://github.com/airbytehq/airbyte/pull/15293) | added support for encoding reader option |
Expand Down

0 comments on commit e9fa2c4

Please sign in to comment.