Skip to content

Commit

Permalink
Source S3: run incremental syncs with concurrency (#34895)
Browse files Browse the repository at this point in the history
  • Loading branch information
clnoll committed Feb 25, 2024
1 parent a6b3f0c commit 6ed63f5
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 109 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/metadata.yaml
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
dockerImageTag: 4.5.6
dockerImageTag: 4.5.7
dockerRepository: airbyte/source-s3
documentationUrl: https://docs.airbyte.com/integrations/sources/s3
githubIssueLabel: source-s3
Expand Down
116 changes: 58 additions & 58 deletions airbyte-integrations/connectors/source-s3/poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/pyproject.toml
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "4.5.6"
version = "4.5.7"
name = "source-s3"
description = "Source implementation for S3."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand Down
Expand Up @@ -21,6 +21,8 @@


class SourceS3(FileBasedSource):
_concurrency_level = DEFAULT_CONCURRENCY

@classmethod
def read_config(cls, config_path: str) -> Mapping[str, Any]:
"""
Expand Down
Expand Up @@ -30,7 +30,6 @@
class SourceS3StreamReader(AbstractFileBasedStreamReader):
def __init__(self):
super().__init__()
self._s3_client = None

@property
def config(self) -> Config:
Expand All @@ -56,24 +55,24 @@ def s3_client(self) -> BaseClient:
# We shouldn't hit this; config should always get set before attempting to
# list or read files.
raise ValueError("Source config is missing; cannot create the S3 client.")
if self._s3_client is None:
client_kv_args = _get_s3_compatible_client_args(self.config) if self.config.endpoint else {}

# Set the region_name if it's provided in the config
if self.config.region_name:
client_kv_args["region_name"] = self.config.region_name
client_kv_args = _get_s3_compatible_client_args(self.config) if self.config.endpoint else {}

if self.config.role_arn:
self._s3_client = self._get_iam_s3_client(client_kv_args)
else:
self._s3_client = boto3.client(
"s3",
aws_access_key_id=self.config.aws_access_key_id,
aws_secret_access_key=self.config.aws_secret_access_key,
**client_kv_args,
)
# Set the region_name if it's provided in the config
if self.config.region_name:
client_kv_args["region_name"] = self.config.region_name

if self.config.role_arn:
_s3_client = self._get_iam_s3_client(client_kv_args)
else:
_s3_client = boto3.client(
"s3",
aws_access_key_id=self.config.aws_access_key_id,
aws_secret_access_key=self.config.aws_secret_access_key,
**client_kv_args,
)

return self._s3_client
return _s3_client

def _get_iam_s3_client(self, client_kv_args: dict) -> BaseClient:
"""
Expand Down
Expand Up @@ -8,15 +8,15 @@
from datetime import datetime, timedelta
from itertools import product
from typing import Any, Dict, List, Optional, Set
from unittest.mock import patch
from unittest.mock import MagicMock, patch

import pytest
from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec
from airbyte_cdk.sources.file_based.exceptions import ErrorListingFiles, FileBasedSourceError
from airbyte_cdk.sources.file_based.file_based_stream_reader import FileReadMode
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from botocore.stub import Stubber
from moto import mock_sts
from moto import mock_s3, mock_sts
from pydantic import AnyUrl
from source_s3.v4.config import Config
from source_s3.v4.stream_reader import SourceS3StreamReader
Expand Down Expand Up @@ -124,10 +124,51 @@ def test_get_matching_files(
except Exception as exc:
raise exc

stub = set_stub(reader, mocked_response, multiple_pages)
files = list(reader.get_matching_files(globs, None, logger))
stub.deactivate()
assert set(f.uri for f in files) == expected_uris
with patch.object(SourceS3StreamReader, 's3_client', new_callable=MagicMock) as mock_s3_client:
_setup_mock_s3_client(mock_s3_client, mocked_response, multiple_pages)
files = list(reader.get_matching_files(globs, None, logger))
assert set(f.uri for f in files) == expected_uris


def _setup_mock_s3_client(mock_s3_client, mocked_response, multiple_pages):
responses = []
if multiple_pages and len(mocked_response) > 1:
# Split the mocked_response for pagination simulation
first_half = mocked_response[:len(mocked_response) // 2]
second_half = mocked_response[len(mocked_response) // 2:]

responses.append({
"IsTruncated": True,
"Contents": first_half,
"KeyCount": len(first_half),
"NextContinuationToken": "token",
})

responses.append({
"IsTruncated": False,
"Contents": second_half,
"KeyCount": len(second_half),
})
else:
responses.append({
"IsTruncated": False,
"Contents": mocked_response,
"KeyCount": len(mocked_response),
})

def list_objects_v2_side_effect(Bucket, Prefix=None, ContinuationToken=None, **kwargs):
if ContinuationToken == "token":
return responses[1]
return responses[0]

mock_s3_client.list_objects_v2 = MagicMock(side_effect=list_objects_v2_side_effect)


def _split_mocked_response(mocked_response, multiple_pages):
if not multiple_pages:
return mocked_response, []
split_index = len(mocked_response) // 2
return mocked_response[:split_index], mocked_response[split_index:]


@patch("boto3.client")
Expand Down Expand Up @@ -196,9 +237,9 @@ def test_open_file_calls_any_open_with_the_right_encoding(smart_open_mock):
with reader.open_file(RemoteFile(uri="", last_modified=datetime.now()), FileReadMode.READ, encoding, logger) as fp:
fp.read()

smart_open_mock.assert_called_once_with(
"s3://test/", transport_params={"client": reader.s3_client}, mode=FileReadMode.READ.value, encoding=encoding
)
assert smart_open_mock.call_args.args == ("s3://test/",)
assert smart_open_mock.call_args.kwargs["mode"] == FileReadMode.READ.value
assert smart_open_mock.call_args.kwargs["encoding"] == encoding


def test_get_s3_client_without_config_raises_exception():
Expand All @@ -218,29 +259,6 @@ def documentation_url(cls) -> AnyUrl:
stream_reader.config = other_config


def set_stub(reader: SourceS3StreamReader, contents: List[Dict[str, Any]], multiple_pages: bool) -> Stubber:
s3_stub = Stubber(reader.s3_client)
split_contents_idx = int(len(contents) / 2) if multiple_pages else -1
page1, page2 = contents[:split_contents_idx], contents[split_contents_idx:]
resp = {
"KeyCount": len(page1),
"Contents": page1,
}
if page2:
resp["NextContinuationToken"] = "token"
s3_stub.add_response("list_objects_v2", resp)
if page2:
s3_stub.add_response(
"list_objects_v2",
{
"KeyCount": len(page2),
"Contents": page2,
},
)
s3_stub.activate()
return s3_stub


@mock_sts
@patch("source_s3.v4.stream_reader.boto3.client")
def test_get_iam_s3_client(boto3_client_mock):
Expand Down Expand Up @@ -303,4 +321,4 @@ def test_filter_file_by_start_date(start_date: datetime, last_modified_date: dat
start_date=start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
)

assert expected_result == reader.is_modified_after_start_date(last_modified_date)
assert expected_result == reader.is_modified_after_start_date(last_modified_date)
1 change: 1 addition & 0 deletions docs/integrations/sources/s3.md
Expand Up @@ -264,6 +264,7 @@ To perform the text extraction from PDF and Docx files, the connector uses the [

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------|
| 4.5.7 | 2024-02-23 | [34895](https://github.com/airbytehq/airbyte/pull/34895) | Run incremental syncs with concurrency |
| 4.5.6 | 2024-02-21 | [35246](https://github.com/airbytehq/airbyte/pull/35246) | Fixes bug that occurred when creating CSV streams with tab delimiter. |
| 4.5.5 | 2024-02-18 | [35392](https://github.com/airbytehq/airbyte/pull/35392) | Add support filtering by start date |
| 4.5.4 | 2024-02-15 | [35055](https://github.com/airbytehq/airbyte/pull/35055) | Temporarily revert concurrency |
Expand Down

0 comments on commit 6ed63f5

Please sign in to comment.