Skip to content

Commit

Permalink
formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
clnoll committed Aug 18, 2023
1 parent cfdc274 commit 4148832
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 16 deletions.
10 changes: 5 additions & 5 deletions airbyte-integrations/connectors/source-s3/source_s3/v4/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from airbyte_cdk.sources.file_based.stream.cursor import DefaultFileBasedCursor
from airbyte_cdk.sources.file_based.types import StreamState


logger = logging.Logger("source-S3")


Expand Down Expand Up @@ -114,7 +113,10 @@ def _convert_legacy_state(legacy_state: StreamState) -> MutableMapping[str, Any]

for filename in filenames:
if filename in converted_history:
if datetime_obj > datetime.strptime(converted_history[filename], DefaultFileBasedCursor.DATE_TIME_FORMAT):
if datetime_obj > datetime.strptime(
converted_history[filename],
DefaultFileBasedCursor.DATE_TIME_FORMAT,
):
converted_history[filename] = datetime_obj.strftime(DefaultFileBasedCursor.DATE_TIME_FORMAT)
else:
# If the file was already synced with a later timestamp, ignore
Expand All @@ -127,9 +129,7 @@ def _convert_legacy_state(legacy_state: StreamState) -> MutableMapping[str, Any]
cursor = f"{cursor_datetime}_{filename}"
else:
# Having a cursor with empty history is not expected, but we handle it.
logger.warning(
f"Cursor found without a history object; this is not expected. cursor_value={legacy_cursor}"
)
logger.warning(f"Cursor found without a history object; this is not expected. cursor_value={legacy_cursor}")
# Note: we convert to the v4 cursor granularity, but since no items are in the history we simply use the
# timestamp as the cursor value instead of the concatenation of timestamp_filename, which is the v4
# cursor format.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,21 +93,16 @@ def _transform_file_format(cls, format_options: Union[CsvFormat, ParquetFormat,
csv_options["encoding"] = format_options.encoding
if skip_rows := advanced_options.pop("skip_rows", None):
csv_options["skip_rows_before_header"] = skip_rows
if skip_rows_after_names := advanced_options.pop(
"skip_rows_after_names", None
):
if skip_rows_after_names := advanced_options.pop("skip_rows_after_names", None):
csv_options["skip_rows_after_header"] = skip_rows_after_names
if autogenerate_column_names := advanced_options.pop(
"autogenerate_column_names", None
):
if autogenerate_column_names := advanced_options.pop("autogenerate_column_names", None):
csv_options["autogenerate_column_names"] = autogenerate_column_names

cls._filter_legacy_noops(advanced_options)

if advanced_options or additional_reader_options:
raise ValueError(
f"The config options you selected are no longer supported.\n"
+ f"advanced_options={advanced_options}"
"The config options you selected are no longer supported.\n" + f"advanced_options={advanced_options}"
if advanced_options
else "" + f"additional_reader_options={additional_reader_options}"
if additional_reader_options
Expand All @@ -122,9 +117,7 @@ def _transform_file_format(cls, format_options: Union[CsvFormat, ParquetFormat,
return {"filetype": "parquet", "decimal_as_float": True}
else:
# This should never happen because it would fail schema validation
raise ValueError(
f"Format filetype {format_options} is not a supported file type"
)
raise ValueError(f"Format filetype {format_options} is not a supported file type")

@classmethod
def parse_config_options_str(cls, options_field: str, options_value: Optional[str]) -> Dict[str, Any]:
Expand Down

0 comments on commit 4148832

Please sign in to comment.