🐛 Source S3: Loading of files' metadata (#8252)
antixar committed Feb 1, 2022
1 parent c3e9ef2 commit 91eff1d
Showing 35 changed files with 1,161 additions and 835 deletions.
@@ -2,7 +2,7 @@
"sourceDefinitionId": "69589781-7828-43c5-9f63-8925b1c1ccc2",
"name": "S3",
"dockerRepository": "airbyte/source-s3",
"dockerImageTag": "0.1.9",
"dockerImageTag": "0.1.10",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/s3",
"icon": "s3.svg"
}
@@ -620,7 +620,7 @@
- name: S3
sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
dockerRepository: airbyte/source-s3
dockerImageTag: 0.1.9
dockerImageTag: 0.1.10
documentationUrl: https://docs.airbyte.io/integrations/sources/s3
icon: s3.svg
sourceType: file
@@ -6454,7 +6454,7 @@
path_in_connector_config:
- "credentials"
- "client_secret"
- dockerImage: "airbyte/source-s3:0.1.9"
- dockerImage: "airbyte/source-s3:0.1.10"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/s3"
changelogUrl: "https://docs.airbyte.io/integrations/sources/s3"
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/Dockerfile
@@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.version=0.1.10
LABEL io.airbyte.name=airbyte/source-s3
@@ -26,16 +26,19 @@ tests:
basic_read:
# for CSV format
- config_path: "secrets/config.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/configured_catalog.json"
expect_records:
path: "integration_tests/expected_records.txt"
# for Parquet format
- config_path: "secrets/parquet_config.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/parquet_configured_catalog.json"
expect_records:
path: "integration_tests/parquet_expected_records.txt"
# for custom server
- config_path: "integration_tests/config_minio.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/configured_catalog.json"
      # expected records contain the _ab_source_file_last_modified property, which
      # changes every time the s3 file is modified, and for the custom server it is
@@ -46,18 +49,21 @@ tests:
incremental:
# for CSV format
- config_path: "secrets/config.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/configured_catalog.json"
cursor_paths:
test: ["_ab_source_file_last_modified"]
future_state_path: "integration_tests/abnormal_state.json"
# for Parquet format
- config_path: "secrets/parquet_config.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/parquet_configured_catalog.json"
cursor_paths:
test: ["_ab_source_file_last_modified"]
future_state_path: "integration_tests/abnormal_state.json"
# for custom server
- config_path: "integration_tests/config_minio.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/configured_catalog.json"
cursor_paths:
test: ["_ab_source_file_last_modified"]
@@ -66,10 +72,13 @@ tests:
full_refresh:
# for CSV format
- config_path: "secrets/config.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/configured_catalog.json"
# for Parquet format
- config_path: "secrets/parquet_config.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/parquet_configured_catalog.json"
# for custom server
- config_path: "integration_tests/config_minio.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/configured_catalog.json"
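
The timeout_seconds: 1800 entries above give each acceptance-test case a 30-minute budget. A minimal sketch, not the source-acceptance-test implementation, of the kind of per-test time guard such a setting implies (Unix-only, using signal.alarm; run_read is a placeholder name):

```python
# Hypothetical illustration of a per-test time budget like timeout_seconds: 1800.
# Not the source-acceptance-test implementation; signal.alarm is Unix-only.
import signal
from contextlib import contextmanager
from types import FrameType
from typing import Iterator, Optional


@contextmanager
def time_limit(seconds: int) -> Iterator[None]:
    def _raise_timeout(signum: int, frame: Optional[FrameType]) -> None:
        raise TimeoutError(f"test exceeded its {seconds}s budget")

    previous = signal.signal(signal.SIGALRM, _raise_timeout)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)  # cancel the pending alarm
        signal.signal(signal.SIGALRM, previous)


# usage: run a full read under the same 1800-second budget the config declares
# with time_limit(1800):
#     run_read(config, configured_catalog)
```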
@@ -3,36 +3,14 @@
#


import shutil
import tempfile
from zipfile import ZipFile
from typing import Iterable

import docker
import pytest

pytest_plugins = ("source_acceptance_test.plugin",)


@pytest.fixture(scope="session", autouse=True)
def connector_setup():
def connector_setup() -> Iterable[None]:
"""This fixture is a placeholder for external resources that acceptance test might require."""
yield


@pytest.fixture(scope="session", autouse=True)
def minio_setup():
client = docker.from_env()
tmp_dir = tempfile.mkdtemp()
with ZipFile("./integration_tests/minio_data.zip") as archive:
archive.extractall(tmp_dir)

container = client.containers.run(
"minio/minio",
f"server {tmp_dir}/minio_data",
network_mode="host",
volumes=["/tmp:/tmp", "/var/run/docker.sock:/var/run/docker.sock"],
detach=True,
)
yield
shutil.rmtree(tmp_dir)
container.stop()
@@ -6,7 +6,7 @@
"aws_access_key_id": "123456",
"aws_secret_access_key": "123456key",
"path_prefix": "",
"endpoint": "http://localhost:9000"
"endpoint": "http://<local_ip>:9000"
},
"format": {
"filetype": "csv"
@@ -0,0 +1,112 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import json
import time
from pathlib import Path
from typing import Any, Iterable, List, Mapping
from zipfile import ZipFile

import docker
import pytest
import requests # type: ignore[import]
from airbyte_cdk import AirbyteLogger
from docker.errors import APIError
from netifaces import AF_INET, ifaddresses, interfaces
from requests.exceptions import ConnectionError # type: ignore[import]

from .integration_test import TMP_FOLDER, TestIncrementalFileStreamS3

LOGGER = AirbyteLogger()


def get_local_ip() -> str:
all_interface_ips: List[str] = []
for iface_name in interfaces():
all_interface_ips += [i["addr"] for i in ifaddresses(iface_name).setdefault(AF_INET, [{"addr": None}]) if i["addr"]]
LOGGER.info(f"detected interface IPs: {all_interface_ips}")
for ip in sorted(all_interface_ips):
if not ip.startswith("127."):
return ip

    assert False, "no non-localhost interface found"


@pytest.fixture(scope="session")
def minio_credentials() -> Mapping[str, Any]:
config_template = Path(__file__).parent / "config_minio.template.json"
    assert config_template.is_file(), f"not found {config_template}"
config_file = Path(__file__).parent / "config_minio.json"
config_file.write_text(config_template.read_text().replace("<local_ip>", get_local_ip()))
credentials = {}
with open(str(config_file)) as f:
credentials = json.load(f)
return credentials


@pytest.fixture(scope="session", autouse=True)
def minio_setup(minio_credentials: Mapping[str, Any]) -> Iterable[None]:

with ZipFile("./integration_tests/minio_data.zip") as archive:
archive.extractall(TMP_FOLDER)
client = docker.from_env()
    # Minio should be attached to a non-localhost interface
    # because another test container needs a direct connection to it.
local_ip = get_local_ip()
LOGGER.debug(f"minio settings: {minio_credentials}")
try:
container = client.containers.run(
image="minio/minio:RELEASE.2021-10-06T23-36-31Z",
command=f"server {TMP_FOLDER}",
name="ci_test_minio",
auto_remove=True,
volumes=[f"/{TMP_FOLDER}/minio_data:/{TMP_FOLDER}"],
detach=True,
ports={"9000/tcp": (local_ip, 9000)},
)
except APIError as err:
if err.status_code == 409:
for container in client.containers.list():
if container.name == "ci_test_minio":
LOGGER.info("minio was started before")
break
else:
raise

check_url = f"http://{local_ip}:9000/minio/health/live"
checked = False
for _ in range(120): # wait 1 min
time.sleep(0.5)
LOGGER.info(f"try to connect to {check_url}")
try:
data = requests.get(check_url)
except ConnectionError as err:
LOGGER.warn(f"minio error: {err}")
continue
if data.status_code == 200:
checked = True
LOGGER.info("Run a minio/minio container...")
break
else:
LOGGER.info(f"minio error: {data.response.text}")
if not checked:
assert False, "couldn't connect to minio!!!"

yield
    # the minio container is intentionally left running because it is needed for all integration and acceptance tests


def pytest_sessionfinish(session: Any, exitstatus: Any) -> None:
"""tries to find and remove all temp buckets"""
instance = TestIncrementalFileStreamS3()
instance._s3_connect(instance.credentials)
temp_buckets = []
for bucket in instance.s3_resource.buckets.all():
if bucket.name.startswith(instance.temp_bucket_prefix):
temp_buckets.append(bucket.name)
for bucket_name in temp_buckets:
bucket = instance.s3_resource.Bucket(bucket_name)
bucket.objects.all().delete()
bucket.delete()
LOGGER.info(f"S3 Bucket {bucket_name} is now deleted")
@@ -4,16 +4,26 @@


import json
import os
import shutil
import time
from typing import Iterator, List, Mapping
from typing import Any, Dict, Iterator, List, Mapping

import boto3
from airbyte_cdk.logger import AirbyteLogger
import pytest
from airbyte_cdk import AirbyteLogger
from botocore.errorfactory import ClientError
from source_s3.source import SourceS3
from source_s3.stream import IncrementalFileStreamS3
from unit_tests.abstract_test_parser import memory_limit
from unit_tests.test_csv_parser import generate_big_file

from .integration_test_abstract import HERE, SAMPLE_DIR, AbstractTestIncrementalFileStream

TMP_FOLDER = "/tmp/test_minio_source_s3"
if not os.path.exists(TMP_FOLDER):
os.makedirs(TMP_FOLDER)

LOGGER = AirbyteLogger()


@@ -35,7 +45,7 @@ def credentials(self) -> Mapping:
def provider(self, bucket_name: str) -> Mapping:
return {"storage": "S3", "bucket": bucket_name}

def _s3_connect(self, credentials: Mapping):
def _s3_connect(self, credentials: Mapping) -> None:
region = "eu-west-3"
self.s3_client = boto3.client(
"s3",
@@ -85,9 +95,42 @@ def cloud_files(self, cloud_bucket_name: str, credentials: Mapping, files_to_upl
self.s3_client.upload_file(str(filepath), bucket_name, upload_path, ExtraArgs=extra_args)
yield f"{bucket_name}/{upload_path}"

def teardown_infra(self, cloud_bucket_name: str, credentials: Mapping):
def teardown_infra(self, cloud_bucket_name: str, credentials: Mapping) -> None:
self._s3_connect(credentials)
bucket = self.s3_resource.Bucket(cloud_bucket_name)
bucket.objects.all().delete()
bucket.delete()
LOGGER.info(f"S3 Bucket {cloud_bucket_name} is now deleted")


class TestIntegrationCsvFiles:
logger = AirbyteLogger()

@memory_limit(150) # max used memory should be less than 150Mb
def read_source(self, credentials: Dict[str, Any], catalog: Dict[str, Any]) -> int:
read_count = 0
for msg in SourceS3().read(logger=self.logger, config=credentials, catalog=catalog):
if msg.record:
read_count += 1
return read_count

@pytest.mark.order(1)
def test_big_file(self, minio_credentials: Dict[str, Any]) -> None:
"""tests a big csv file (>= 1.0G records)"""
# generates a big CSV files separately
big_file_folder = os.path.join(TMP_FOLDER, "minio_data", "test-bucket", "big_files")
shutil.rmtree(big_file_folder, ignore_errors=True)
os.makedirs(big_file_folder)
filepath = os.path.join(big_file_folder, "file.csv")

# please change this value if you need to test another file size
future_file_size = 0.5 # in gigabytes
_, file_size = generate_big_file(filepath, future_file_size, 500)
expected_count = sum(1 for _ in open(filepath)) - 1
self.logger.info(f"generated file {filepath} with size {file_size}Gb, lines: {expected_count}")

minio_credentials["path_pattern"] = "big_files/file.csv"
minio_credentials["format"]["block_size"] = 5 * 1024 ** 2
source = SourceS3()
catalog = source.read_catalog(HERE / "configured_catalog.json")
assert self.read_source(minio_credentials, catalog) == expected_count
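
The memory_limit decorator applied to read_source above is imported from unit_tests.abstract_test_parser and is not shown in this diff; a minimal sketch of what such a decorator could look like, using tracemalloc to track peak Python allocations, might be:

```python
# Hypothetical sketch of a memory_limit decorator; the real implementation in
# unit_tests.abstract_test_parser may differ (e.g. it may measure process RSS).
import functools
import tracemalloc
from typing import Any, Callable


def memory_limit(max_memory_in_mb: int) -> Callable:
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            tracemalloc.start()
            try:
                result = func(*args, **kwargs)
            finally:
                _, peak_bytes = tracemalloc.get_traced_memory()
                tracemalloc.stop()
            used_mb = peak_bytes / 1024 ** 2
            assert used_mb < max_memory_in_mb, f"used {used_mb:.1f}Mb, limit is {max_memory_in_mb}Mb"
            return result

        return wrapper

    return decorator
```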
