Source S3: don't require history to be present to identify legacy state format (airbytehq#29520)
clnoll authored and harrytou committed Sep 1, 2023
1 parent 2d9dd27 commit d448248
Showing 7 changed files with 138 additions and 20 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/Dockerfile
@@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

-LABEL io.airbyte.version=3.1.7
+LABEL io.airbyte.version=3.1.8
LABEL io.airbyte.name=airbyte/source-s3
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/metadata.yaml
@@ -5,7 +5,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
-dockerImageTag: 3.1.7
+dockerImageTag: 3.1.8
dockerRepository: airbyte/source-s3
githubIssueLabel: source-s3
icon: s3.svg
32 changes: 24 additions & 8 deletions airbyte-integrations/connectors/source-s3/source_s3/v4/cursor.py
@@ -10,6 +10,8 @@
from airbyte_cdk.sources.file_based.stream.cursor import DefaultFileBasedCursor
from airbyte_cdk.sources.file_based.types import StreamState

+logger = logging.Logger("source-S3")


class Cursor(DefaultFileBasedCursor):
_DATE_FORMAT = "%Y-%m-%d"
@@ -68,15 +70,17 @@ def _is_legacy_state(value: StreamState) -> bool:
             return False
         try:
             # Verify datetime format in history
-            item = list(value.get("history", {}).keys())[0]
-            datetime.strptime(item, Cursor._DATE_FORMAT)
+            history = value.get("history", {}).keys()
+            if history:
+                item = list(value.get("history", {}).keys())[0]
+                datetime.strptime(item, Cursor._DATE_FORMAT)
 
             # verify the format of the last_modified cursor
             last_modified_at_cursor = value.get(DefaultFileBasedCursor.CURSOR_FIELD)
             if not last_modified_at_cursor:
                 return False
             datetime.strptime(last_modified_at_cursor, Cursor._LEGACY_DATE_TIME_FORMAT)
-        except (IndexError, ValueError):
+        except ValueError:
             return False
         return True
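For orientation (not part of the diff), here is a minimal, self-contained sketch of how the relaxed check behaves once history is optional; the constants below mirror the test values in this PR and are assumptions, not the connector's own definitions:

```python
from datetime import datetime

# Assumed formats and field name, copied from the test fixtures in this PR.
DATE_FORMAT = "%Y-%m-%d"
LEGACY_DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
CURSOR_FIELD = "_ab_source_file_last_modified"


def looks_like_legacy_state(value: dict) -> bool:
    """Standalone re-implementation of the relaxed check, for illustration only."""
    if not value:
        return False
    try:
        history = value.get("history", {}).keys()
        if history:
            # Only validate the history date format when a history is actually present.
            datetime.strptime(next(iter(history)), DATE_FORMAT)
        cursor = value.get(CURSOR_FIELD)
        if not cursor:
            return False
        datetime.strptime(cursor, LEGACY_DATE_TIME_FORMAT)
    except ValueError:
        return False
    return True


print(looks_like_legacy_state({"history": {"2023-08-01": ["a.csv"]}, CURSOR_FIELD: "2023-08-01T00:00:00Z"}))  # True
print(looks_like_legacy_state({CURSOR_FIELD: "2023-08-01T00:00:00Z"}))  # True: history is no longer required
print(looks_like_legacy_state({"history": {"2023-08-01": ["a.csv"]}}))  # False: a cursor is still required
```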

@@ -100,14 +104,19 @@ def _convert_legacy_state(legacy_state: StreamState) -> MutableMapping[str, Any]
         }
         """
         converted_history = {}
+        legacy_cursor = legacy_state[DefaultFileBasedCursor.CURSOR_FIELD]
+        cursor_datetime = datetime.strptime(legacy_cursor, Cursor._LEGACY_DATE_TIME_FORMAT)
+        logger.info(f"Converting v3 -> v4 state. v3_cursor={legacy_cursor} v3_history={legacy_state.get('history')}")
+
-        cursor_datetime = datetime.strptime(legacy_state[DefaultFileBasedCursor.CURSOR_FIELD], Cursor._LEGACY_DATE_TIME_FORMAT)
         for date_str, filenames in legacy_state.get("history", {}).items():
             datetime_obj = Cursor._get_adjusted_date_timestamp(cursor_datetime, datetime.strptime(date_str, Cursor._DATE_FORMAT))
 
             for filename in filenames:
                 if filename in converted_history:
-                    if datetime_obj > datetime.strptime(converted_history[filename], DefaultFileBasedCursor.DATE_TIME_FORMAT):
+                    if datetime_obj > datetime.strptime(
+                        converted_history[filename],
+                        DefaultFileBasedCursor.DATE_TIME_FORMAT,
+                    ):
                         converted_history[filename] = datetime_obj.strftime(DefaultFileBasedCursor.DATE_TIME_FORMAT)
                     else:
                         # If the file was already synced with a later timestamp, ignore
@@ -118,10 +127,17 @@ def _convert_legacy_state(legacy_state: StreamState) -> MutableMapping[str, Any]
         if converted_history:
             filename, _ = max(converted_history.items(), key=lambda x: (x[1], x[0]))
             cursor = f"{cursor_datetime}_{filename}"
+            v3_migration_start_datetime = cursor_datetime - Cursor._V4_MIGRATION_BUFFER
         else:
-            # If there is no history, _is_legacy_state should return False, so we should never get here
-            raise ValueError("No history found in state message. Please contact support.")
+            # Having a cursor with empty history is not expected, but we handle it.
+            logger.warning(f"Cursor found without a history object; this is not expected. cursor_value={legacy_cursor}")
+            # Note: we convert to the v4 cursor granularity, but since no items are in the history we simply use the
+            # timestamp as the cursor value instead of the concatenation of timestamp_filename, which is the v4
+            # cursor format.
+            # This is okay because the v4 cursor is kept for posterity but is not actually used in the v4 code. If we
+            # start to use the cursor we may need to revisit this logic.
+            cursor = cursor_datetime
+            converted_history = {}
+            v3_migration_start_datetime = cursor_datetime - Cursor._V4_MIGRATION_BUFFER
         return {
             "history": converted_history,
             DefaultFileBasedCursor.CURSOR_FIELD: cursor,
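To make the new else branch concrete, here is a small illustrative sketch of what a cursor-only v3 state converts to. It is not the real method (which also builds the per-file history when one exists), and the cursor format, v4 timestamp format, and one-hour migration buffer are inferred from the test expectations in this PR:

```python
from datetime import datetime, timedelta

# Assumptions for this sketch only.
LEGACY_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
V4_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
MIGRATION_BUFFER = timedelta(hours=1)  # stand-in for Cursor._V4_MIGRATION_BUFFER

legacy_state = {"_ab_source_file_last_modified": "2023-08-01T00:00:00Z"}  # cursor present, no history

cursor_datetime = datetime.strptime(legacy_state["_ab_source_file_last_modified"], LEGACY_FORMAT)
converted = {
    "history": {},  # nothing to carry over
    "_ab_source_file_last_modified": cursor_datetime,  # kept for posterity, not used by the v4 code
    "v3_min_sync_date": (cursor_datetime - MIGRATION_BUFFER).strftime(V4_FORMAT),
}
print(converted["v3_min_sync_date"])  # 2023-07-31T23:00:00.000000Z
```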
@@ -4,7 +4,7 @@

import json
from datetime import datetime
-from typing import Any, List, Mapping, Optional, Union
+from typing import Any, Dict, List, Mapping, Optional, Union

from source_s3.source import SourceS3Spec
from source_s3.source_files_abstract.formats.avro_spec import AvroFormat
@@ -84,19 +84,31 @@ def _transform_file_format(cls, format_options: Union[CsvFormat, ParquetFormat,
                 "true_values": ["y", "yes", "t", "true", "on", "1"],
                 "false_values": ["n", "no", "f", "false", "off", "0"],
                 "inference_type": "Primitive Types Only" if format_options.infer_datatypes else "None",
-                "strings_can_be_null": additional_reader_options.get("strings_can_be_null", False),
+                "strings_can_be_null": additional_reader_options.pop("strings_can_be_null", False),
             }
 
             if format_options.escape_char:
                 csv_options["escape_char"] = format_options.escape_char
             if format_options.encoding:
                 csv_options["encoding"] = format_options.encoding
-            if "skip_rows" in advanced_options:
-                csv_options["skip_rows_before_header"] = advanced_options["skip_rows"]
-            if "skip_rows_after_names" in advanced_options:
-                csv_options["skip_rows_after_header"] = advanced_options["skip_rows_after_names"]
-            if "autogenerate_column_names" in advanced_options:
-                csv_options["autogenerate_column_names"] = advanced_options["autogenerate_column_names"]
+            if skip_rows := advanced_options.pop("skip_rows", None):
+                csv_options["skip_rows_before_header"] = skip_rows
+            if skip_rows_after_names := advanced_options.pop("skip_rows_after_names", None):
+                csv_options["skip_rows_after_header"] = skip_rows_after_names
+            if autogenerate_column_names := advanced_options.pop("autogenerate_column_names", None):
+                csv_options["autogenerate_column_names"] = autogenerate_column_names
+
+            cls._filter_legacy_noops(advanced_options)
+
+            if advanced_options or additional_reader_options:
+                raise ValueError(
+                    "The config options you selected are no longer supported.\n" + f"advanced_options={advanced_options}"
+                    if advanced_options
+                    else "" + f"additional_reader_options={additional_reader_options}"
+                    if additional_reader_options
+                    else ""
+                )
+
             return csv_options
 
         elif isinstance(format_options, JsonlFormat):
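Summarising the mapping above as data (a sketch; the dictionary name below is ours, for reference only): recognised legacy CSV option keys are consumed and translated into their v4 csv_options equivalents, while anything left over in advanced_options or additional_reader_options now raises a ValueError instead of being silently ignored. Legacy no-ops such as auto_dict_encode, timestamp_parsers, and check_utf8=false are dropped by _filter_legacy_noops further down before that leftover check runs.

```python
# Illustrative summary, not a constant from the connector.
LEGACY_TO_V4_CSV_OPTIONS = {
    "skip_rows": "skip_rows_before_header",
    "skip_rows_after_names": "skip_rows_after_header",
    "autogenerate_column_names": "autogenerate_column_names",
    "strings_can_be_null": "strings_can_be_null",  # consumed from additional_reader_options
}
```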
@@ -108,9 +120,21 @@ def _transform_file_format(cls, format_options: Union[CsvFormat, ParquetFormat,
             raise ValueError(f"Format filetype {format_options} is not a supported file type")
 
     @classmethod
-    def parse_config_options_str(cls, options_field: str, options_value: Optional[str]) -> Mapping[str, Any]:
+    def parse_config_options_str(cls, options_field: str, options_value: Optional[str]) -> Dict[str, Any]:
         options_str = options_value or "{}"
         try:
             return json.loads(options_str)
         except json.JSONDecodeError as error:
             raise ValueError(f"Malformed {options_field} config json: {error}. Please ensure that it is a valid JSON.")

+    @staticmethod
+    def _filter_legacy_noops(advanced_options: Dict[str, Any]):
+        ignore_all = ("auto_dict_encode", "timestamp_parsers")
+        ignore_by_value = (("check_utf8", False),)
+
+        for option in ignore_all:
+            advanced_options.pop(option, None)
+
+        for option, value_to_ignore in ignore_by_value:
+            if advanced_options.get(option) == value_to_ignore:
+                advanced_options.pop(option)
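Taken together with _transform_file_format, the behaviour can be sketched end to end. The function below is a hypothetical stand-in for illustration, not the connector's API; the option names and values come from the tests in this PR:

```python
import json

def simulate_legacy_csv_advanced_options(advanced_options_str: str) -> dict:
    """Hypothetical mini-pipeline: parse, consume known options, drop legacy no-ops, reject leftovers."""
    advanced_options = json.loads(advanced_options_str or "{}")
    csv_options = {}

    if skip_rows := advanced_options.pop("skip_rows", None):
        csv_options["skip_rows_before_header"] = skip_rows

    # Legacy no-ops: always dropped, or dropped only when set to the old default value.
    for option in ("auto_dict_encode", "timestamp_parsers"):
        advanced_options.pop(option, None)
    if advanced_options.get("check_utf8") == False:  # noqa: E712 - mirrors the by-value check
        advanced_options.pop("check_utf8")

    if advanced_options:
        raise ValueError(f"The config options you selected are no longer supported: {advanced_options}")
    return csv_options

print(simulate_legacy_csv_advanced_options('{"skip_rows": 2, "check_utf8": false}'))  # {'skip_rows_before_header': 2}
print(simulate_legacy_csv_advanced_options('{"auto_dict_encode": ""}'))               # {}
# simulate_legacy_csv_advanced_options('{"check_utf8": true}')  # raises ValueError
```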
@@ -134,6 +134,16 @@ def _create_datetime(dt: str) -> datetime:
},
id="v4-migrated-from-v3",
),
pytest.param(
{"history": {}, "_ab_source_file_last_modified": "2023-08-01T00:00:00Z"},
{
"history": {},
"_ab_source_file_last_modified": None,
"v3_min_sync_date": "2023-07-31T23:00:00.000000Z",
},
id="empty-history-with-cursor",
),
],
)
def test_set_initial_state(input_state: MutableMapping[str, Any], expected_state: MutableMapping[str, Any]) -> None:
@@ -406,7 +416,7 @@ def test_list_files_v4_migration(input_state, all_files, expected_files_to_sync,
id="legacy_state_with_invalid_last_modified_datetime_format_is_not_legacy",
),
pytest.param(
{"_ab_source_file_last_modified": "2023-08-01T00:00:00Z"}, False, id="legacy_state_without_history_is_not_legacy_state"
{"_ab_source_file_last_modified": "2023-08-01T00:00:00Z"}, True, id="legacy_state_without_history_is_legacy_state"
),
pytest.param({"history": {"2023-08-01": ["file1.txt"]}}, False, id="legacy_state_without_last_modified_cursor_is_not_legacy_state"),
pytest.param(
@@ -200,6 +200,15 @@ def test_convert_legacy_config(legacy_config, expected_config):
ValueError,
id="test_malformed_additional_reader_options",
),
pytest.param(
"csv",
{
"additional_reader_options": '{"include_columns": ""}',
},
None,
ValueError,
id="test_unsupported_additional_reader_options",
),
pytest.param(
"csv",
{
@@ -209,6 +218,64 @@ def test_convert_legacy_config(legacy_config, expected_config):
ValueError,
id="test_malformed_advanced_options",
),
pytest.param(
"csv",
{
"advanced_options": '{"column_names": ""}',
},
None,
ValueError,
id="test_unsupported_advanced_options",
),
pytest.param(
"csv",
{
"advanced_options": '{"check_utf8": false}',
},
{
"filetype": "csv",
"delimiter": ",",
"quote_char": '"',
"encoding": "utf8",
"double_quote": True,
"null_values": ["", "null", "NULL", "N/A", "NA", "NaN", "None"],
"true_values": ["y", "yes", "t", "true", "on", "1"],
"false_values": ["n", "no", "f", "false", "off", "0"],
"inference_type": "Primitive Types Only",
"strings_can_be_null": False,
},
None,
id="test_unsupported_advanced_options_by_value_succeeds_if_value_matches_ignored_values",
),
pytest.param(
"csv",
{
"advanced_options": '{"check_utf8": true}',
},
None,
ValueError,
id="test_unsupported_advanced_options_by_value_fails_if_value_doesnt_match_ignored_values",
),
pytest.param(
"csv",
{
"advanced_options": '{"auto_dict_encode": ""}',
},
{
"filetype": "csv",
"delimiter": ",",
"quote_char": '"',
"encoding": "utf8",
"double_quote": True,
"null_values": ["", "null", "NULL", "N/A", "NA", "NaN", "None"],
"true_values": ["y", "yes", "t", "true", "on", "1"],
"false_values": ["n", "no", "f", "false", "off", "0"],
"inference_type": "Primitive Types Only",
"strings_can_be_null": False,
},
None,
id="test_ignored_advanced_options",
),
pytest.param(
"jsonl",
{
1 change: 1 addition & 0 deletions docs/integrations/sources/s3.md
@@ -282,6 +282,7 @@ Be cautious when raising this value too high, as it may result in Out Of Memory

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------|
| 3.1.8 | 2023-08-17 | [29520](https://github.com/airbytehq/airbyte/pull/29520) | Update legacy state and error handling |
| 3.1.7 | 2023-08-17 | [29505](https://github.com/airbytehq/airbyte/pull/29505) | v4 StreamReader and Cursor fixes |
| 3.1.6 | 2023-08-16 | [29480](https://github.com/airbytehq/airbyte/pull/29480) | update Pyarrow to version 12.0.1 |
| 3.1.5 | 2023-08-15 | [29418](https://github.com/airbytehq/airbyte/pull/29418) | Avoid duplicate syncs when migrating from v3 to v4 |
