Skip to content

Commit

Permalink
Formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
clnoll committed Jan 18, 2024
1 parent 68c0076 commit 291c851
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ def extract_value(self, record: Record) -> Comparable:


class Cursor(ABC):

@property
@abstractmethod
def state(self):
...

@abstractmethod
def observe(self, record: Record) -> None:
"""
Expand All @@ -53,6 +59,10 @@ def close_partition(self, partition: Partition) -> None:


class NoopCursor(Cursor):

def state(self):
return {}

def observe(self, record: Record) -> None:
pass

Expand Down Expand Up @@ -87,7 +97,11 @@ def __init__(
self._start = start
self._most_recent_record: Optional[Record] = None
self._has_closed_at_least_one_slice = False
self.start, self.state = self._get_concurrent_state(stream_state)
self.start, self._concurrent_state = self._get_concurrent_state(stream_state)

@property
def state(self):
return self._concurrent_state

def _get_concurrent_state(self, state: MutableMapping[str, Any]) -> Tuple[datetime, MutableMapping[str, Any]]:
if self._connector_state_converter.is_state_message_compatible(state):
Expand Down Expand Up @@ -120,7 +134,9 @@ def close_partition(self, partition: Partition) -> None:
def _add_slice_to_state(self, partition: Partition) -> None:
if self._slice_boundary_fields:
if "slices" not in self.state:
raise RuntimeError(f"The state should have at least one slice to delineate the sync start time, but no slices are present. This is unexpected. Please contact Support.")
raise RuntimeError(
f"The state for stream {self._stream_name} should have at least one slice to delineate the sync start time, but no slices are present. This is unexpected. Please contact Support."
)
self.state["slices"].append(
{
"start": self._extract_from_slice(partition, self._slice_boundary_fields[self._START_BOUNDARY]),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ def is_state_message_compatible(state: MutableMapping[str, Any]) -> bool:

@abstractmethod
def convert_from_sequential_state(
self, cursor_field: "CursorField", stream_state: MutableMapping[str, Any], start: Any,
self,
cursor_field: "CursorField",
stream_state: MutableMapping[str, Any],
start: Any,
) -> MutableMapping[str, Any]:
"""
Convert the state message to the format required by the ConcurrentCursor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@


class DateTimeStreamStateConverter(AbstractStreamStateConverter):

def get_sync_start(self, cursor_field: CursorField, stream_state: MutableMapping[str, Any], start: Optional[Any]) -> datetime:
sync_start = self.parse_timestamp(start) if start is not None else self.zero_value
prev_sync_low_water_mark = self.parse_timestamp(stream_state[cursor_field.cursor_field_key]) if cursor_field.cursor_field_key in stream_state else None
prev_sync_low_water_mark = (
self.parse_timestamp(stream_state[cursor_field.cursor_field_key]) if cursor_field.cursor_field_key in stream_state else None
)
if prev_sync_low_water_mark and prev_sync_low_water_mark >= sync_start:
return prev_sync_low_water_mark
else:
Expand Down Expand Up @@ -79,7 +80,9 @@ def merge_intervals(self, intervals: List[MutableMapping[str, datetime]]) -> Lis
def _compare_intervals(self, end_time: Any, start_time: Any) -> bool:
return bool(self.increment(end_time) >= start_time)

def convert_from_sequential_state(self, cursor_field: CursorField, stream_state: MutableMapping[str, Any], start: datetime) -> MutableMapping[str, Any]:
def convert_from_sequential_state(
self, cursor_field: CursorField, stream_state: MutableMapping[str, Any], start: datetime
) -> MutableMapping[str, Any]:
"""
Convert the state message to the format required by the ConcurrentCursor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,14 @@
from datetime import datetime, timezone

import pytest
from airbyte_cdk.models import (
AirbyteStateBlob,
AirbyteStateMessage,
AirbyteStateType,
AirbyteStream,
AirbyteStreamState,
StreamDescriptor,
SyncMode,
)
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import ConcurrencyCompatibleStateType
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
EpochValueConcurrentStreamStateConverter,
IsoMillisConcurrentStreamStateConverter,
)


@pytest.mark.parametrize(
"converter, input_state, is_compatible",
[
Expand Down

0 comments on commit 291c851

Please sign in to comment.