Add SourceFile integration #716

Merged: 25 commits, Nov 2, 2020
04acf12
Copy fresh source template to init csv-source
ChristopheDuong Oct 26, 2020
924506e
Start CSV Source
ChristopheDuong Oct 26, 2020
2dfaa28
Implement CSV Source
ChristopheDuong Oct 26, 2020
e4bc915
Handle Exception cases while loading CSVs
ChristopheDuong Oct 26, 2020
9eb53f8
Fix codestyle formatting
ChristopheDuong Oct 26, 2020
59b1b47
Handle different reader methods for the future
ChristopheDuong Oct 27, 2020
d2f2d6f
Handle private buckets on GCS & S3
ChristopheDuong Oct 27, 2020
e5f5aad
Setting up tests
ChristopheDuong Oct 27, 2020
0dfb77d
Rename CsvSource to FileSource
ChristopheDuong Oct 27, 2020
62c50a5
Add call to other formats of readers
ChristopheDuong Oct 27, 2020
d761e76
Tweak File Source Configuration naming
ChristopheDuong Oct 28, 2020
5957ab5
Merge remote-tracking branch 'origin/master' into source-pandas
ChristopheDuong Oct 28, 2020
e26189b
Setting standard python tests
ChristopheDuong Oct 28, 2020
863d3c9
Adapt to new spec.json
ChristopheDuong Oct 29, 2020
c6381b2
Print more details while running docker tests
ChristopheDuong Oct 29, 2020
1843141
Add integration tests and implement proper spec.json
ChristopheDuong Oct 29, 2020
1769521
Update docs
ChristopheDuong Oct 29, 2020
fbecf0e
Fix integration tests for AWS
ChristopheDuong Oct 29, 2020
53e6b10
Handle catalog when reading
ChristopheDuong Oct 29, 2020
99160ed
Merge remote-tracking branch 'origin/master' into source-file branch
ChristopheDuong Oct 30, 2020
203d080
Fixing some typos
ChristopheDuong Oct 30, 2020
c32625d
Update to mirror source-python-template
ChristopheDuong Oct 30, 2020
3c2a60e
Tweaks from reviews
ChristopheDuong Nov 2, 2020
68c1f16
Add descriptions in spec.json
ChristopheDuong Nov 2, 2020
068c684
Enable ssh/scp/sftp sources using paramiko library
ChristopheDuong Nov 2, 2020
@@ -0,0 +1,7 @@
{
Contributor:

Shouldn't this be in STANDARD_SOURCE_DEFINITION and not STANDARD_SOURCE?

Contributor (author):

I am not sure I understand your comment... I don't see a folder named STANDARD_SOURCE_DEFINITION in airbyte-config/init/src/main/resources/config/.

Where is STANDARD_SOURCE_DEFINITION?

"sourceId": "778daa7c-feaf-4db6-96f3-70fd645acc77",
"name": "File",
"dockerRepository": "airbyte/source-file",
"dockerImageTag": "0.1.0",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-file"
}
2 changes: 1 addition & 1 deletion airbyte-integrations/README.md
@@ -19,7 +19,7 @@ to tell Airbyte to use the latest version of your integration.
```
1. Publish the new version to Docker Hub.
```
./tools/integrations/manage.sh publish airbyte-integrations/connectors/source-postgres-signer
./tools/integrations/manage.sh publish airbyte-integrations/connectors/source-postgres-singer
```
1. Update the connector version inside the `STANDARD_SOURCE` (or `STANDARD_DESTINATION` directory) to the one you just published.
This will update Airbyte to use this new version by default.
@@ -24,7 +24,7 @@ This is an autogenerated file describing the high-level steps you need to take t
1. Integration tests go in the `integration_tests` folder. For any tests that require external APIs or resources, you may need to provide instructions on how to set up a testing instance/account for Airbyte's CI.
However, for initial local development, you can place any sensitive credentials inside the `secrets/credentials.json` file -- this is gitignored by default (see the sketch after this checklist).
1. Update `README.md` to document the usage of your integration. If API credentials are required to run the integration, please document how they can be obtained or link to a how-to guide.
1. For Airybte core contributors, make sure to add the secret to RPass under the secret name as listed in `README.md`.
1. For Airbyte core contributors, make sure to add the secret to RPass under the secret name as listed in `README.md`.
1. Add your source to the source registry in `airbyte-config/init`.

Once you've done all the above, delete this file :)
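As noted in the checklist above, here is a minimal sketch of how a local test could pick up those credentials (illustrative only; the fixture name and the flat-JSON layout of `secrets/credentials.json` are assumptions, not part of this PR):

```
import json
from pathlib import Path

import pytest


@pytest.fixture(scope="session")
def secret_config() -> dict:
    """Load the gitignored secrets/credentials.json so local tests can use real credentials."""
    secrets_path = Path(__file__).parent / "secrets" / "credentials.json"
    with open(secrets_path) as secrets_file:
        return json.load(secrets_file)
```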
2 changes: 2 additions & 0 deletions airbyte-integrations/connectors/source-file/.dockerignore
@@ -0,0 +1,2 @@
build

15 changes: 15 additions & 0 deletions airbyte-integrations/connectors/source-file/Dockerfile
@@ -0,0 +1,15 @@
FROM airbyte/integration-base-python:dev

RUN apt-get update && apt-get install -y jq curl bash && rm -rf /var/lib/apt/lists/*

ENV CODE_PATH="source_file"
ENV AIRBYTE_IMPL_MODULE="source_file"
ENV AIRBYTE_IMPL_PATH="SourceFile"

WORKDIR /airbyte/integration_code
COPY $CODE_PATH ./$CODE_PATH
COPY setup.py ./
RUN pip install ".[main]"

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/source-file
22 changes: 22 additions & 0 deletions airbyte-integrations/connectors/source-file/Dockerfile.test
@@ -0,0 +1,22 @@
FROM airbyte/base-python-test:dev

RUN apt-get update && rm -rf /var/lib/apt/lists/*

ENV CODE_PATH="integration_tests"
ENV AIRBYTE_TEST_MODULE="integration_tests"
ENV AIRBYTE_TEST_PATH="SourceFileStandardTest"
ENV AIRBYTE_TEST_CASE=true

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/source-file-standard-test

WORKDIR /airbyte/integration_code
COPY source_file source_file
COPY $CODE_PATH $CODE_PATH
COPY secrets $CODE_PATH
COPY source_file/*.json $CODE_PATH
COPY setup.py ./

RUN pip install ".[integration_tests]"

WORKDIR /airbyte
36 changes: 36 additions & 0 deletions airbyte-integrations/connectors/source-file/README.md
@@ -0,0 +1,36 @@
# Testing Source File

This integration reads data from files, which can be stored locally or remotely (e.g. over HTTPS, GCS, or S3).

## Necessary Credentials for tests

In order to run the integration tests for this connector, you need to do the following (a sketch of how the tests consume these secrets follows the list):
1. Testing Google Cloud Storage (GCS)
1. Download and store your Google [Service Account](https://console.cloud.google.com/iam-admin/serviceaccounts) JSON key file in `secrets/gcs.json`; it should look something like this:
Contributor:

Maybe I missed it, but how does gcs.json get imported into config.json?

Contributor (author):

It's used in airbyte-integrations/connectors/source-file/integration_tests/integration_source_test.py, which contains the "custom" integration tests, not the standard_tests.

The content of the JSON is copied into the configuration as a string. In the UI, you would need to do the same and copy/paste the content of the JSON.

Then, in the source.py of this connector, we can either manipulate the dict object directly once we parse that string, or we have to produce a temporary file with the content of the JSON (depending on which Google API we are using).
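In other words, the handling looks roughly like this (a sketch with a hypothetical helper name, not the actual source.py; only the `service_account_json` config key is taken from this PR's tests):

```
import json
import tempfile


def service_account_from_config(config: dict):
    """Hypothetical helper: the config carries the service-account JSON as a
    plain string, so we either parse it back into a dict or write it out to a
    temporary key file, depending on which Google API is being called."""
    creds_string = config["service_account_json"]

    # Case 1: APIs that accept credential info directly as a parsed object.
    creds_dict = json.loads(creds_string)

    # Case 2: APIs that only accept a path to a key file on disk.
    with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as key_file:
        key_file.write(creds_string)
        key_file_path = key_file.name

    return creds_dict, key_file_path
```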

```
{
"type": "service_account",
"project_id": "XXXXXXX",
"private_key_id": "XXXXXXXX",
"private_key": "-----BEGIN PRIVATE KEY-----\nXXXXXXXXXX\n-----END PRIVATE KEY-----\n",
"client_email": "XXXXX@XXXXXX.iam.gserviceaccount.com",
"client_id": "XXXXXXXXX",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/XXXXXXX0XXXXXX.iam.gserviceaccount.com"
}

```
1. Your Service Account should have [Storage Admin Rights](https://console.cloud.google.com/iam-admin/iam) (to create Buckets, read and store files in GCS)

1. Testing Amazon S3
1. Create a file at `secrets/aws.json`
```
{
"aws_access_key_id": "XXXXXXX",
"aws_secret_access_key": "XXXXXXX"
}
```
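For reference, the integration tests in this PR splice these secret files into the connector config roughly as follows (simplified from `integration_tests/integration_source_test.py` further down in this diff; the helper names here are illustrative):

```
import json


def gcs_test_config(base_config: dict, service_account_file: str = "../secrets/gcs.json") -> dict:
    # The whole service-account JSON is passed to the connector as a single string field.
    config = dict(base_config, storage="gs://")
    with open(service_account_file) as json_file:
        config["service_account_json"] = json.dumps(json.load(json_file))
    return config


def aws_test_config(base_config: dict, aws_credentials: str = "../secrets/aws.json") -> dict:
    # The S3 key pair is copied into dedicated config fields.
    config = dict(base_config, storage="s3://")
    with open(aws_credentials) as json_file:
        aws = json.load(json_file)
    config["aws_access_key_id"] = aws["aws_access_key_id"]
    config["aws_secret_access_key"] = aws["aws_secret_access_key"]
    return config
```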


1 change: 1 addition & 0 deletions airbyte-integrations/connectors/source-file/base_python
26 changes: 26 additions & 0 deletions airbyte-integrations/connectors/source-file/build.gradle
@@ -0,0 +1,26 @@
project.ext.pyModule = 'source_file'
apply from: rootProject.file('tools/gradle/commons/integrations/python.gradle')
apply from: rootProject.file('tools/gradle/commons/integrations/image.gradle')
apply from: rootProject.file('tools/gradle/commons/integrations/test-image.gradle')
apply from: rootProject.file('tools/gradle/commons/integrations/integration-test.gradle')
apply from: rootProject.file('tools/gradle/commons/integrations/standard-source-test-python.gradle')


standardSourceTestPython {
ext {
imageName = "${extractImageName(project.file('Dockerfile'))}:dev"
pythonContainerName = "${extractImageName(project.file('Dockerfile.test'))}:dev"
}
}

task unitTest(type: PythonTask){
command = "setup.py test"
}

build.dependsOn(unitTest)
build.dependsOn ':airbyte-integrations:bases:base-python-test:build'
buildImage.dependsOn ':airbyte-integrations:bases:base-python:buildImage'
integrationTest.dependsOn(buildImage)

buildTestImage.dependsOn ':airbyte-integrations:bases:base-python-test:buildImage'
standardSourceTestPython.dependsOn(buildTestImage)
@@ -0,0 +1,28 @@
"""
MIT License

Copyright (c) 2020 Airbyte

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""

from .integration_source_test import TestSourceFile
from .standard_source_test import SourceFileStandardTest

__all__ = ["SourceFileStandardTest", "TestSourceFile"]
@@ -0,0 +1,34 @@
{
"streams": [
{
"name": "my_own_data_sample/my_file.csv",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"date": {
"type": "string"
},
"key": {
"type": "string"
},
"total_confirmed": {
"type": "number"
},
"total_healed": {
"type": "number"
},
"total_deceased": {
"type": "number"
},
"total_recovered": {
"type": "number"
},
"total_tested": {
"type": "number"
}
}
}
}
]
}
@@ -0,0 +1,7 @@
{
"format": "csv",
"reader_options": "{\"sep\": \",\", \"nrows\": 20}",
"storage": "https://",
"url": "storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv",
"reader_impl": "gcsfs"
}
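Note that `reader_options` in the sample config above is itself a JSON object encoded as a string. Here is a minimal sketch of how a source could decode it and hand it to pandas (illustrative only, not the actual source.py; the URL comes from the sample config):

```
import json

import pandas as pd


def load_dataframes_sketch(config: dict) -> list:
    """Illustrative only: decode the JSON-encoded reader_options string and
    forward the resulting keyword arguments to pandas."""
    reader_options = json.loads(config.get("reader_options", "{}"))
    url = config["storage"] + config["url"]
    # With the sample config above this is equivalent to:
    # pd.read_csv("https://storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv",
    #             sep=",", nrows=20)
    return [pd.read_csv(url, **reader_options)]
```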
@@ -0,0 +1,165 @@
"""
MIT License

Copyright (c) 2020 Airbyte

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""

import json
import os
import tempfile
import uuid

import boto3
import pytest
from base_python import AirbyteLogger
from botocore.errorfactory import ClientError
from google.api_core.exceptions import Conflict
from google.cloud import storage
from source_file import SourceFile


class TestSourceFile(object):
service_account_file: str = "../secrets/gcs.json"
aws_credentials: str = "../secrets/aws.json"
cloud_bucket_name: str = "airbytetestbucket"

@pytest.fixture(scope="class")
def download_gcs_public_data(self):
print("\nDownload public dataset from gcs to local /tmp")
config = get_config()
config["storage"] = "https://"
config["url"] = "storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv"
df = run_load_dataframes(config)
tmp_file = tempfile.NamedTemporaryFile(delete=False)
df.to_csv(tmp_file.name, index=False)
yield tmp_file.name
os.remove(tmp_file.name)
print(f"\nLocal File {tmp_file.name} is now deleted")

@pytest.fixture(scope="class")
def create_gcs_private_data(self, download_gcs_public_data):
storage_client = storage.Client.from_service_account_json(self.service_account_file)
bucket_name = create_unique_gcs_bucket(storage_client, self.cloud_bucket_name)
print(f"\nUpload dataset to private gcs bucket {bucket_name}")
bucket = storage_client.get_bucket(bucket_name)
blob = bucket.blob("myfile.csv")
blob.upload_from_filename(download_gcs_public_data)
yield f"{bucket_name}/myfile.csv"
bucket.delete(force=True)
print(f"\nGCS Bucket {bucket_name} is now deleted")

@pytest.fixture(scope="class")
def create_aws_private_data(self, download_gcs_public_data):
with open(self.aws_credentials) as json_file:
aws_config = json.load(json_file)
region = "eu-west-3"
location = {"LocationConstraint": region}
s3_client = boto3.client(
"s3",
aws_access_key_id=aws_config["aws_access_key_id"],
aws_secret_access_key=aws_config["aws_secret_access_key"],
region_name=region,
)
bucket_name = self.cloud_bucket_name
print(f"\nUpload dataset to private aws bucket {bucket_name}")
try:
s3_client.head_bucket(Bucket=bucket_name)
except ClientError:
s3_client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration=location)
s3_client.upload_file(download_gcs_public_data, bucket_name, "myfile.csv")
yield f"{bucket_name}/myfile.csv"
s3 = boto3.resource(
"s3", aws_access_key_id=aws_config["aws_access_key_id"], aws_secret_access_key=aws_config["aws_secret_access_key"]
)
bucket = s3.Bucket(bucket_name)
bucket.objects.all().delete()
print(f"\nS3 Bucket {bucket_name} is now deleted")

@pytest.mark.parametrize(
"reader_impl, storage, url",
[
("gcfs", "https://", "storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv"),
("smart_open", "https://", "storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv"),
("smart_open", "file://", "local"),
],
)
def test_local_data(self, download_gcs_public_data, reader_impl, storage, url):
config = get_config()
config["storage"] = storage
if url != "local":
config["url"] = url
else:
config["url"] = download_gcs_public_data
config["reader_impl"] = reader_impl
run_load_dataframes(config)

@pytest.mark.parametrize("reader_impl", ["gcsfs", "smart_open"])
def test_remote_gcs_load(self, create_gcs_private_data, reader_impl):
config = get_config()
config["storage"] = "gs://"
config["url"] = create_gcs_private_data
config["reader_impl"] = reader_impl
with open(self.service_account_file) as json_file:
config["service_account_json"] = json.dumps(json.load(json_file))
run_load_dataframes(config)

@pytest.mark.parametrize("reader_impl", ["s3fs", "smart_open"])
def test_remote_aws_load(self, create_aws_private_data, reader_impl):
config = get_config()
config["storage"] = "s3://"
config["url"] = create_aws_private_data
config["reader_impl"] = reader_impl
with open(self.aws_credentials) as json_file:
aws_config = json.load(json_file)
config["aws_access_key_id"] = aws_config["aws_access_key_id"]
config["aws_secret_access_key"] = aws_config["aws_secret_access_key"]
run_load_dataframes(config)


def run_load_dataframes(config):
df_list = SourceFile.load_dataframes(config=config, logger=AirbyteLogger(), skip_data=False)
assert len(df_list) == 1 # Properly load 1 DataFrame
df = df_list[0]
assert len(df.columns) == 10 # DataFrame should have 10 columns
assert len(df.index) == 42 # DataFrame should have 42 rows of data
return df


def get_config():
return {"format": "csv", "reader_options": '{"sep": ",", "nrows": 42}'}


def create_unique_gcs_bucket(storage_client, name: str) -> str:
"""
Make a unique bucket to which we'll upload the file.
(GCS buckets are part of a single global namespace.)
"""
for i in range(0, 5):
bucket_name = f"{name}-{uuid.uuid1()}"
try:
bucket = storage_client.bucket(bucket_name)
bucket.storage_class = "STANDARD"
# fixed locations are cheaper...
storage_client.create_bucket(bucket, location="us-east1")
print(f"\nNew GCS bucket created {bucket_name}")
return bucket_name
except Conflict:
print(f"\nError: {bucket_name} already exists!")