Source SFTP-Bulk: Support custom CSV separators (#19224)

* manage different separators

* tests

* update tests

* add optional parameter for separator

* add default separator

* typo

* add trailing newline

* fix spec.json order

* bump dockerfile version

* update changelog

* auto-bump connector version

* retrigger checks

* retrigger checks

---------

Co-authored-by: Sunny Hashmi <6833405+sh4sh@users.noreply.github.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
3 people committed Apr 20, 2023
1 parent a6acd13 commit a6d6b06
Showing 12 changed files with 105 additions and 25 deletions.
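In short: the connector spec gains an optional `separator` field (default `","`), and when it is left empty the client detects the delimiter with Python's `csv.Sniffer`. A hedged sketch of what a source config could look like with the new field — the host and credential values below are illustrative placeholders, not part of this diff:

```python
# Hypothetical source-sftp-bulk config after this change.
# Only "separator" is new here; every other value is a placeholder for illustration.
config = {
    "host": "sftp.example.com",   # placeholder
    "port": 22,                   # placeholder
    "username": "demo",           # placeholder
    "file_type": "csv",
    "separator": ";",             # new optional field; omit it to fall back to csv.Sniffer detection
    "folder_path": "/logs/2022",
    "file_pattern": "log-([0-9]{4})([0-9]{2})([0-9]{2})",
    "file_most_recent": False,
    "start_date": "2017-01-25T00:00:00Z",
}
```

The diffs below thread this option through the connector spec, the SFTP client, schema inference, the stream, and the tests.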
@@ -28243,7 +28243,7 @@
"sourceDefinitionId": "31e3242f-dee7-4cdc-a4b8-8e06c5458517",
"name": "SFTP Bulk",
"dockerRepository": "airbyte/source-sftp-bulk",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://docs.airbyte.com/integrations/sources/sftp-bulk",
"icon": "sftp.svg",
"sourceType": "file",
@@ -28307,28 +28307,36 @@
"order": 6,
"examples": [ "csv", "json" ]
},
"separator": {
"title": "CSV Separator (Optional)",
"description": "The separator used in the CSV files. Define None if you want to use the Sniffer functionality",
"type": "string",
"default": ",",
"examples": [ "," ],
"order": 7
},
"folder_path": {
"title": "Folder Path (Optional)",
"description": "The directory to search files for sync",
"type": "string",
"default": "",
"examples": [ "/logs/2022" ],
"order": 7
"order": 8
},
"file_pattern": {
"title": "File Pattern (Optional)",
"description": "The regular expression to specify files for sync in a chosen Folder Path",
"type": "string",
"default": "",
"examples": [ "log-([0-9]{4})([0-9]{2})([0-9]{2}) - This will filter files which `log-yearmmdd`" ],
"order": 8
"order": 9
},
"file_most_recent": {
"title": "Most recent file (Optional)",
"description": "Sync only the most recent file for the configured folder path and file pattern",
"type": "boolean",
"default": false,
"order": 9
"order": 10
},
"start_date": {
"type": "string",
@@ -28337,7 +28345,7 @@
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"examples": [ "2017-01-25T00:00:00Z" ],
"description": "The date from which you'd like to replicate data for all incremental streams, in the format YYYY-MM-DDT00:00:00Z. All data generated after this date will be replicated.",
"order": 10
"order": 11
}
}
},
@@ -2497,7 +2497,7 @@
- name: SFTP Bulk
sourceDefinitionId: 31e3242f-dee7-4cdc-a4b8-8e06c5458517
dockerRepository: airbyte/source-sftp-bulk
dockerImageTag: 0.1.1
dockerImageTag: 0.1.2
documentationUrl: https://docs.airbyte.com/integrations/sources/sftp-bulk
icon: sftp.svg
sourceType: file
@@ -18186,7 +18186,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-sftp-bulk:0.1.1"
- dockerImage: "airbyte/source-sftp-bulk:0.1.2"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/source/ftp"
connectionSpecification:
@@ -18255,14 +18255,23 @@
examples:
- "csv"
- "json"
separator:
title: "CSV Separator (Optional)"
description: "The separator used in the CSV files. Define None if you want\
\ to use the Sniffer functionality"
type: "string"
default: ","
examples:
- ","
order: 7
folder_path:
title: "Folder Path (Optional)"
description: "The directory to search files for sync"
type: "string"
default: ""
examples:
- "/logs/2022"
order: 7
order: 8
file_pattern:
title: "File Pattern (Optional)"
description: "The regular expression to specify files for sync in a chosen\
@@ -18271,14 +18280,14 @@
default: ""
examples:
- "log-([0-9]{4})([0-9]{2})([0-9]{2}) - This will filter files which `log-yearmmdd`"
order: 8
order: 9
file_most_recent:
title: "Most recent file (Optional)"
description: "Sync only the most recent file for the configured folder path\
\ and file pattern"
type: "boolean"
default: false
order: 9
order: 10
start_date:
type: "string"
title: "Start Date"
@@ -18289,7 +18298,7 @@
description: "The date from which you'd like to replicate data for all incremental\
\ streams, in the format YYYY-MM-DDT00:00:00Z. All data generated after\
\ this date will be replicated."
order: 10
order: 11
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
@@ -13,5 +13,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/source-sftp-bulk
@@ -0,0 +1,3 @@
string_col;int_col
"hello";1
"foo";2
@@ -187,6 +187,17 @@ def test_get_files_pattern_json(config: Mapping, configured_catalog: ConfiguredA
assert res.record.data["int_col"] == 2


def test_get_files_pattern_json_new_separator(config: Mapping, configured_catalog: ConfiguredAirbyteCatalog):
source = SourceFtp()
result_iter = source.read(logger, {**config, "file_pattern": "test_2.+"}, configured_catalog, None)
result = list(result_iter)
assert len(result) == 1
for res in result:
assert res.type == Type.RECORD
assert res.record.data["string_col"] == "hello"
assert res.record.data["int_col"] == 1


def test_get_files_pattern_no_match_json(config: Mapping, configured_catalog: ConfiguredAirbyteCatalog):
source = SourceFtp()
result = source.read(logger, {**config, "file_pattern": "bad_pattern.+"}, configured_catalog, None)
@@ -197,7 +208,7 @@ def test_get_files_no_pattern_csv(config: Mapping, configured_catalog: Configure
source = SourceFtp()
result_iter = source.read(logger, {**config, "file_type": "csv", "folder_path": "files/csv"}, configured_catalog, None)
result = list(result_iter)
assert len(result) == 2
assert len(result) == 4
for res in result:
assert res.type == Type.RECORD
assert res.record.data["string_col"] in ["foo", "hello"]
@@ -217,6 +228,33 @@ def test_get_files_pattern_csv(config: Mapping, configured_catalog: ConfiguredAi
assert res.record.data["int_col"] in [1, 2]


def test_get_files_pattern_csv_new_separator(config: Mapping, configured_catalog: ConfiguredAirbyteCatalog):
source = SourceFtp()
result_iter = source.read(
logger, {**config, "file_type": "csv", "folder_path": "files/csv", "file_pattern": "test_2.+"}, configured_catalog, None
)
result = list(result_iter)
assert len(result) == 2
for res in result:
assert res.type == Type.RECORD
assert res.record.data["string_col"] in ["foo", "hello"]
assert res.record.data["int_col"] in [1, 2]


def test_get_files_pattern_csv_new_separator_with_config(config: Mapping, configured_catalog: ConfiguredAirbyteCatalog):
source = SourceFtp()
result_iter = source.read(
logger, {**config, "file_type": "csv", "folder_path": "files/csv", "separator": ";", "file_pattern": "test_2.+"},
configured_catalog, None
)
result = list(result_iter)
assert len(result) == 2
for res in result:
assert res.type == Type.RECORD
assert res.record.data["string_col"] in ["foo", "hello"]
assert res.record.data["int_col"] in [1, 2]


def test_get_files_pattern_no_match_csv(config: Mapping, configured_catalog: ConfiguredAirbyteCatalog):
source = SourceFtp()
result = source.read(
@@ -2,6 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import csv
import io
import logging
import os
@@ -168,15 +169,25 @@ def get_files(self, prefix, search_pattern=None, modified_since=None, most_recen

return sorted_files

def peek_line(self, f):
pos = f.tell()
line = f.readline()
f.seek(pos)
return line

@backoff.on_exception(backoff.expo, (socket.timeout), max_tries=5, factor=2)
def fetch_file(self, fn: Mapping[str, Any], file_type="csv") -> pd.DataFrame:
def fetch_file(self, fn: Mapping[str, Any], separator, file_type="csv") -> pd.DataFrame:
try:
with self._connection.open(fn["filepath"], "rb") as f:
with self._connection.open(fn["filepath"], "r") as f:
df: pd.DataFrame = None

if not separator:
dialect = csv.Sniffer().sniff(self.peek_line(f=f))
separator = dialect.delimiter

# Using pandas to make reading files in different formats easier
if file_type == "csv":
df = pd.read_csv(f)
df = pd.read_csv(f, engine="python", sep=separator)
elif file_type == "json":
df = pd.read_json(f, lines=True)
else:
@@ -196,10 +207,10 @@ def fetch_file(self, fn: Mapping[str, Any], file_type="csv") -> pd.DataFrame:

raise Exception("Unable to read file: %s" % e) from e

def fetch_files(self, files, file_type="csv") -> Tuple[datetime, Dict[str, Any]]:
def fetch_files(self, files, separator, file_type="csv") -> Tuple[datetime, Dict[str, Any]]:
logger.info("Fetching %s files", len(files))
for fn in files:
records = self.fetch_file(fn, file_type)
records = self.fetch_file(fn=fn, separator=separator, file_type=file_type)
yield (fn["last_modified"], records.to_dict("records"))

self.close()
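Condensed into a standalone function, the logic added to `fetch_file` amounts to: peek at the first line without consuming it, sniff the delimiter when none is configured, then hand pandas an explicit separator with the python engine. A minimal sketch under those assumptions (not the connector class itself):

```python
import csv
import io
from typing import Optional

import pandas as pd


def read_csv_with_optional_separator(f: io.TextIOBase, separator: Optional[str]) -> pd.DataFrame:
    """Sketch of the separator handling added above; not the actual SFTPClient method."""
    # Peek the first line without consuming it, mirroring peek_line().
    pos = f.tell()
    first_line = f.readline()
    f.seek(pos)

    # Fall back to csv.Sniffer when no separator was configured.
    if not separator:
        separator = csv.Sniffer().sniff(first_line).delimiter

    # engine="python" lets pandas honor arbitrary single-character or regex separators.
    return pd.read_csv(f, engine="python", sep=separator)


# Usage with the semicolon-delimited fixture shown earlier:
buf = io.StringIO('string_col;int_col\n"hello";1\n"foo";2\n')
print(read_csv_with_optional_separator(buf, None))
```

Note that the file is now opened in text mode (`"r"` instead of `"rb"`), since `csv.Sniffer.sniff` expects a string sample.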
@@ -55,7 +55,7 @@ def _infer_json_schema(self, config: Mapping[str, Any], connection: SFTPClient)

# Get last file to infer schema
# Use pandas `infer_objects` to infer dtypes
df = connection.fetch_file(files[-1], config["file_type"])
df = connection.fetch_file(fn=files[-1], file_type=config["file_type"], separator=config.get("separator"))
df = df.infer_objects()

# Default column used for incremental sync
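`_infer_json_schema` now forwards the configured separator so the sample file is parsed the same way as during sync, then relies on pandas' `infer_objects()` for dtypes. The diff doesn't show how dtypes become a JSON schema, so the mapping below is an assumption offered only to illustrate the idea:

```python
import pandas as pd


def dataframe_to_json_schema(df: pd.DataFrame) -> dict:
    """Assumed dtype -> JSON schema mapping, in the spirit of _infer_json_schema (not the connector's code)."""
    kind_map = {"i": "number", "f": "number", "b": "boolean"}  # int, float, bool kinds; anything else -> string
    properties = {
        column: {"type": ["null", kind_map.get(dtype.kind, "string")]}
        for column, dtype in df.infer_objects().dtypes.items()
    }
    return {"type": "object", "properties": properties}


df = pd.DataFrame({"string_col": ["hello", "foo"], "int_col": [1, 2]})
print(dataframe_to_json_schema(df))
# {'type': 'object', 'properties': {'string_col': {'type': ['null', 'string']}, 'int_col': {'type': ['null', 'number']}}}
```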
@@ -65,13 +65,21 @@
"order": 6,
"examples": ["csv", "json"]
},
"separator": {
"title": "CSV Separator (Optional)",
"description": "The separator used in the CSV files. Define None if you want to use the Sniffer functionality",
"type": "string",
"default": ",",
"examples": [","],
"order": 7
},
"folder_path": {
"title": "Folder Path (Optional)",
"description": "The directory to search files for sync",
"type": "string",
"default": "",
"examples": ["/logs/2022"],
"order": 7
"order": 8
},
"file_pattern": {
"title": "File Pattern (Optional)",
@@ -81,14 +89,14 @@
"examples": [
"log-([0-9]{4})([0-9]{2})([0-9]{2}) - This will filter files which `log-yearmmdd`"
],
"order": 8
"order": 9
},
"file_most_recent": {
"title": "Most recent file (Optional)",
"description": "Sync only the most recent file for the configured folder path and file pattern",
"type": "boolean",
"default": false,
"order": 9
"order": 10
},
"start_date": {
"type": "string",
@@ -97,7 +105,7 @@
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"examples": ["2017-01-25T00:00:00Z"],
"description": "The date from which you'd like to replicate data for all incremental streams, in the format YYYY-MM-DDT00:00:00Z. All data generated after this date will be replicated.",
"order": 10
"order": 11
}
}
}
@@ -69,7 +69,9 @@ def read_records(
most_recent_only=self._only_most_recent_file,
)

for cursor, records in self.connection.fetch_files(files, self.config["file_type"]):
for cursor, records in self.connection.fetch_files(
files=files, file_type=self.config["file_type"], separator=self.config.get("separator")
):
if cursor and sync_mode == SyncMode.incremental:
if self._cursor_value and cursor > self._cursor_value:
self._cursor_value = cursor
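`read_records` consumes `fetch_files` as a generator of `(last_modified, records)` tuples, advancing the incremental cursor to the newest timestamp it has seen. A self-contained sketch of that consumption pattern with illustrative stand-in names (not the Airbyte stream classes):

```python
from datetime import datetime
from typing import Any, Dict, Iterator, List, Tuple


def fetch_files_stub() -> Iterator[Tuple[datetime, List[Dict[str, Any]]]]:
    """Stand-in for SFTPClient.fetch_files: one (last_modified, records) pair per file."""
    yield datetime(2023, 4, 1), [{"string_col": "hello", "int_col": 1}]
    yield datetime(2023, 4, 2), [{"string_col": "foo", "int_col": 2}]


cursor_value = None
for cursor, records in fetch_files_stub():
    # Mirror the incremental logic above: keep the newest last_modified as the cursor.
    if cursor and (cursor_value is None or cursor > cursor_value):
        cursor_value = cursor
    for record in records:
        print(record)

print("state cursor:", cursor_value)
```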
2 changes: 1 addition & 1 deletion connectors.md
@@ -208,7 +208,7 @@
| **S3** | <img alt="S3 icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config-oss/init-oss/src/main/resources/icons/s3.svg" height="30" height="30"/> | Source | airbyte/source-s3:2.1.2 | generally_available | [docs](https://docs.airbyte.com/integrations/sources/s3) | [connectors/source/s3](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/s3) | [source-s3](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-s3) | <small>`69589781-7828-43c5-9f63-8925b1c1ccc2`</small> |
| **SAP Fieldglass** | <img alt="SAP Fieldglass icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config-oss/init-oss/src/main/resources/icons/sapfieldglass.svg" height="30" height="30"/> | Source | airbyte/source-sap-fieldglass:0.1.0 | alpha | [docs](https://docs.airbyte.com/integrations/sources/sap-fieldglass) | [connectors/source/sap-fieldglass](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/sap-fieldglass) | [source-sap-fieldglass](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sap-fieldglass) | <small>`ec5f3102-fb31-4916-99ae-864faf8e7e25`</small> |
| **SFTP** | <img alt="SFTP icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config-oss/init-oss/src/main/resources/icons/sftp.svg" height="30" height="30"/> | Source | airbyte/source-sftp:0.1.2 | alpha | [docs](https://docs.airbyte.com/integrations/sources/sftp) | [connectors/source/sftp](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/sftp) | [source-sftp](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sftp) | <small>`a827c52e-791c-4135-a245-e233c5255199`</small> |
| **SFTP Bulk** | <img alt="SFTP Bulk icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config-oss/init-oss/src/main/resources/icons/sftp.svg" height="30" height="30"/> | Source | airbyte/source-sftp-bulk:0.1.1 | alpha | [docs](https://docs.airbyte.com/integrations/sources/sftp-bulk) | [connectors/source/sftp-bulk](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/sftp-bulk) | [source-sftp-bulk](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sftp-bulk) | <small>`31e3242f-dee7-4cdc-a4b8-8e06c5458517`</small> |
| **SFTP Bulk** | <img alt="SFTP Bulk icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config-oss/init-oss/src/main/resources/icons/sftp.svg" height="30" height="30"/> | Source | airbyte/source-sftp-bulk:0.1.2 | alpha | [docs](https://docs.airbyte.com/integrations/sources/sftp-bulk) | [connectors/source/sftp-bulk](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/sftp-bulk) | [source-sftp-bulk](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sftp-bulk) | <small>`31e3242f-dee7-4cdc-a4b8-8e06c5458517`</small> |
| **SalesLoft** | <img alt="SalesLoft icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config-oss/init-oss/src/main/resources/icons/salesloft.svg" height="30" height="30"/> | Source | airbyte/source-salesloft:1.0.0 | beta | [docs](https://docs.airbyte.com/integrations/sources/salesloft) | [connectors/source/salesloft](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/salesloft) | [source-salesloft](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-salesloft) | <small>`41991d12-d4b5-439e-afd0-260a31d4c53f`</small> |
| **Salesforce** | <img alt="Salesforce icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config-oss/init-oss/src/main/resources/icons/salesforce.svg" height="30" height="30"/> | Source | airbyte/source-salesforce:2.0.9 | generally_available | [docs](https://docs.airbyte.com/integrations/sources/salesforce) | [connectors/source/salesforce](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/salesforce) | [source-salesforce](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-salesforce) | <small>`b117307c-14b6-41aa-9422-947e34922962`</small> |
| **Sample Data (Faker)** | <img alt="Sample Data (Faker) icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config-oss/init-oss/src/main/resources/icons/faker.svg" height="30" height="30"/> | Source | airbyte/source-faker:2.0.3 | beta | [docs](https://docs.airbyte.com/integrations/sources/faker) | [connectors/source/faker](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/source/faker) | [source-faker](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-faker) | <small>`dfd88b22-b603-4c3d-aad7-3701784586b1`</small> |
1 change: 1 addition & 0 deletions docs/integrations/sources/sftp-bulk.md
@@ -60,5 +60,6 @@ More formats \(e.g. Apache Avro\) will be supported in the future.

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-------------|:----------------|
| 0.1.2 | 2023-04-19 | [#19224](https://github.com/airbytehq/airbyte/pull/19224) | Support custom CSV separators |
| 0.1.1 | 2023-03-17 | [#24180](https://github.com/airbytehq/airbyte/pull/24180) | Fix field order |
| 0.1.0 | 2021-24-05 | | Initial version |
