Skip to content

Commit

Permalink
Concurrent CDK: fix state message ordering
Browse files Browse the repository at this point in the history
Don't clear out `slices` when converting to sequential state
  • Loading branch information
clnoll committed Jan 11, 2024
1 parent 99488dd commit d40a7ba
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def deserialize(self, state: MutableMapping[str, Any]) -> MutableMapping[str, An
for stream_slice in state.get("slices", []):
stream_slice[self.START_KEY] = self.parse_timestamp(stream_slice[self.START_KEY])
stream_slice[self.END_KEY] = self.parse_timestamp(stream_slice[self.END_KEY])
state["start"] = self.parse_timestamp(state["start"]) if "start" in state else self.zero_value
return state

def parse_value(self, value: Any) -> Any:
Expand Down Expand Up @@ -81,26 +82,34 @@ def convert_from_sequential_state(self, cursor_field: CursorField, stream_state:
{
"state_type": ConcurrencyCompatibleStateType.date_range.value,
"metadata": { … },
"start": <timestamp representing the latest date before which all records were synced>
"slices": [
{starts: 0, end: "2021-01-18T21:18:20.000+00:00", finished_processing: true}]
{"start": 0, "end": "2021-01-18T21:18:20.000+00:00"},
]
}
"""
if self.is_state_message_compatible(stream_state):
return stream_state
low_water_mark = (
self.parse_timestamp(stream_state[cursor_field.cursor_field_key])
if cursor_field.cursor_field_key in stream_state
else self.zero_value
)
if cursor_field.cursor_field_key in stream_state:
slices = [
{
# TODO: if we migrate stored state to the concurrent state format, we may want this to be the config start date
# instead of `zero_value`
self.START_KEY: self.zero_value,
self.END_KEY: self.parse_timestamp(stream_state[cursor_field.cursor_field_key]),
self.END_KEY: low_water_mark,
},
]
else:
slices = []
return {
"state_type": ConcurrencyCompatibleStateType.date_range.value,
"slices": slices,
"start": low_water_mark,
"legacy": stream_state,
}

Expand All @@ -113,23 +122,36 @@ def convert_to_sequential_state(self, cursor_field: CursorField, stream_state: M
"""
if self.is_state_message_compatible(stream_state):
legacy_state = stream_state.get("legacy", {})
if slices := stream_state.pop("slices", None):
latest_complete_time = self._get_latest_complete_time(slices)
if latest_complete_time:
legacy_state.update({cursor_field.cursor_field_key: self.output_format(latest_complete_time)})
latest_complete_time = self._get_latest_complete_time(stream_state["start"], stream_state.get("slices", []))
if latest_complete_time is not None:
legacy_state.update({cursor_field.cursor_field_key: self.output_format(latest_complete_time)})
return legacy_state or {}
else:
return stream_state

def _get_latest_complete_time(self, slices: List[MutableMapping[str, Any]]) -> Optional[datetime]:
def _get_latest_complete_time(self, start: datetime, slices: List[MutableMapping[str, Any]]) -> Optional[datetime]:
"""
Get the latest time before which all records have been processed.
"""
if slices:
first_interval = self.merge_intervals(slices)[0][self.END_KEY]
return first_interval
merged_intervals = self.merge_intervals(slices)
first_interval = merged_intervals[0]
if start < first_interval[self.START_KEY]:
# There is a region between `start` and the first interval that hasn't been synced yet, so
# we don't advance the state message timestamp
return start

if first_interval[self.START_KEY] <= start <= first_interval[self.END_KEY]:
# `start` is between the beginning and end of the first interval, so we know we've synced
# everything up to the end of that interval (`first_interval[self.END_KEY]`)
return first_interval[self.END_KEY]

# `start` falls after the end of the first interval; this is unexpected because we shouldn't have tried
# to sync anything before `start`, but we can handle it anyway by checking the remaining intervals.
return self._get_latest_complete_time(start, merged_intervals[1:])
else:
return None
# Nothing has been synced so we don't advance
return start


class EpochValueConcurrentStreamStateConverter(DateTimeStreamStateConverter):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@
"stream": {
"stream_state": {
"slices": [{"start": 0, "end": 0}],
"start": 0,
"state_type": ConcurrencyCompatibleStateType.date_range.value,
},
"stream_descriptor": {"name": "stream1"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,20 @@ def test_given_boundary_fields_when_close_partition_then_emit_state(self) -> Non
)
)

self._message_repository.emit_message.assert_called_once_with(self._state_manager.create_state_message.return_value)
self._state_manager.update_state_for_stream.assert_called_once_with(
_A_STREAM_NAME,
_A_STREAM_NAMESPACE,
{_A_CURSOR_FIELD_KEY: 0}, # State message is updated to the legacy format before being emitted
)

def test_given_boundary_fields_when_close_partition_then_emit_updated_state(self) -> None:
self._cursor_with_slice_boundary_fields().close_partition(
_partition(
{_LOWER_SLICE_BOUNDARY_FIELD: 0, _UPPER_SLICE_BOUNDARY_FIELD: 30},
)
)

self._message_repository.emit_message.assert_called_once_with(self._state_manager.create_state_message.return_value)
self._state_manager.update_state_for_stream.assert_called_once_with(
_A_STREAM_NAME,
Expand Down
Loading

0 comments on commit d40a7ba

Please sign in to comment.