Merge branch 'master' into state-structure-change-xmin-ctid
subodh1810 committed Jun 21, 2023
2 parents 52eb61d + e742f8d commit 27fb1a2
Showing 32 changed files with 1,106 additions and 118 deletions.
2 changes: 1 addition & 1 deletion .github/actions/run-dagger-pipeline/action.yml
@@ -115,7 +115,7 @@ runs:
CI_REPORT_BUCKET_NAME: ${{ inputs.report_bucket_name }}
GCP_GSM_CREDENTIALS: ${{ inputs.gcp_gsm_credentials }}
GCS_CREDENTIALS: ${{ inputs.gcs_credentials }}
- METADATA_SERVICE_GCS_BUCKET_NAME: ${{ inputs.metadata_service_bucket_name }}
+ METADATA_SERVICE_BUCKET_NAME: ${{ inputs.metadata_service_bucket_name }}
METADATA_SERVICE_GCS_CREDENTIALS: ${{ inputs.metadata_service_gcs_credentials }}
PRODUCTION: ${{ inputs.production }}
PULL_REQUEST_NUMBER: ${{ github.event.pull_request.number }}
2 changes: 1 addition & 1 deletion airbyte-cdk/python/.bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
-current_version = 0.41.0
+current_version = 0.42.0
commit = False

[bumpversion:file:setup.py]
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

+## 0.42.0
+Support state per partition when an incremental sync is combined with a partition router

## 0.41.0
Use x-www-urlencoded for access token refresh requests

4 changes: 2 additions & 2 deletions airbyte-cdk/python/Dockerfile
@@ -10,7 +10,7 @@ RUN apk --no-cache upgrade \
&& apk --no-cache add tzdata build-base

# install airbyte-cdk
-RUN pip install --prefix=/install airbyte-cdk==0.41.0
+RUN pip install --prefix=/install airbyte-cdk==0.42.0

# build a clean environment
FROM base
@@ -32,5 +32,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

# needs to be the same as CDK
-LABEL io.airbyte.version=0.41.0
+LABEL io.airbyte.version=0.42.0
LABEL io.airbyte.name=airbyte/source-declarative-manifest
21 changes: 13 additions & 8 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
@@ -254,10 +254,7 @@ def _read_incremental(
for _slice in slices:
has_slices = True
if self.should_log_slice_message(logger):
-                yield AirbyteMessage(
-                    type=MessageType.LOG,
-                    log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"),
-                )
+                yield self._create_slice_log_message(_slice)
records = stream_instance.read_records(
sync_mode=SyncMode.incremental,
stream_slice=_slice,
@@ -321,10 +318,7 @@ def _read_full_refresh(
total_records_counter = 0
for _slice in slices:
if self.should_log_slice_message(logger):
-                yield AirbyteMessage(
-                    type=MessageType.LOG,
-                    log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"),
-                )
+                yield self._create_slice_log_message(_slice)
record_data_or_messages = stream_instance.read_records(
stream_slice=_slice,
sync_mode=SyncMode.full_refresh,
@@ -338,6 +332,17 @@
if self._limit_reached(internal_config, total_records_counter):
return

+    def _create_slice_log_message(self, _slice: Optional[Mapping[str, Any]]) -> AirbyteMessage:
+        """
+        Mapping is an interface that can be implemented in various ways. However, json.dumps will fall back to `str(<object>)` if
+        the slice is a class implementing Mapping rather than a plain dict. Therefore, we cast the slice to a dict before passing
+        it to json.dumps.
+        """
+        printable_slice = dict(_slice) if _slice else _slice
+        return AirbyteMessage(
+            type=MessageType.LOG,
+            log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(printable_slice, default=str)}"),
+        )

def _checkpoint_state(self, stream: Stream, stream_state, state_manager: ConnectorStateManager):
# First attempt to retrieve the current state using the stream's state property. We receive an AttributeError if the state
# property is not implemented by the stream instance and as a fallback, use the stream_state retrieved from the stream
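The docstring of `_create_slice_log_message` points at a subtle json.dumps behavior: an object that implements Mapping without subclassing dict is not encoded as a JSON object; json.dumps falls back to the `default` callable, which here stringifies the whole object. A minimal sketch of that behavior, using a made-up `LazySlice` stand-in:

import json
from collections.abc import Mapping

class LazySlice(Mapping):
    """Hypothetical slice type implementing Mapping without subclassing dict."""

    def __init__(self, data):
        self._data = data

    def __getitem__(self, key):
        return self._data[key]

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)

slice_ = LazySlice({"start_time": "2021-01-01"})
# Without the cast, json.dumps cannot encode LazySlice and calls default=str on the
# whole object, yielding something like '"<__main__.LazySlice object at 0x...>"'.
print(json.dumps(slice_, default=str))
# Casting to dict first yields the intended JSON object: {"start_time": "2021-01-01"}
print(json.dumps(dict(slice_), default=str))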
airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/__init__.py
@@ -3,5 +3,6 @@
#

from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
+from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import CursorFactory, PerPartitionCursor

__all__ = ["DatetimeBasedCursor"]
__all__ = ["CursorFactory", "DatetimeBasedCursor", "PerPartitionCursor"]
airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py
@@ -0,0 +1,260 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import json
from typing import Any, Callable, Iterable, Mapping, Optional

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState


class PerPartitionKeySerializer:
"""
We are concerned of the performance of looping through the `states` list and evaluating equality on the partition. To reduce this
concern, we wanted to use dictionaries to map `partition -> cursor`. However, partitions are dict and dict can't be used as dict keys
since they are not hashable. By creating json string using the dict, we can have a use the dict as a key to the dict since strings are
hashable.
"""

@staticmethod
def to_partition_key(to_serialize: Any) -> str:
        # The default separators changed in Python 3.4. To avoid being impacted by future changes, we explicitly specify our own values.
return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)

@staticmethod
def to_partition(to_deserialize: Any):
return json.loads(to_deserialize)


class PerPartitionStreamSlice(StreamSlice):
def __init__(self, partition: Mapping[str, Any], cursor_slice: Mapping[str, Any]):
self._partition = partition
self._cursor_slice = cursor_slice
if partition.keys() & cursor_slice.keys():
raise ValueError("Keys for partition and incremental sync cursor should not overlap")
self._stream_slice = dict(partition) | dict(cursor_slice)

@property
def partition(self):
return self._partition

@property
def cursor_slice(self):
return self._cursor_slice

def __repr__(self):
return repr(self._stream_slice)

def __setitem__(self, key: str, value: Any):
raise ValueError("PerPartitionStreamSlice is immutable")

def __getitem__(self, key: str):
return self._stream_slice[key]

def __len__(self):
return len(self._stream_slice)

def __iter__(self):
return iter(self._stream_slice)

def __contains__(self, item: str):
return item in self._stream_slice

def keys(self):
return self._stream_slice.keys()

def items(self):
return self._stream_slice.items()

def values(self):
return self._stream_slice.values()

def get(self, key: str, default: Any) -> Any:
return self._stream_slice.get(key, default)

def __eq__(self, other):
if isinstance(other, dict):
return self._stream_slice == other
if isinstance(other, PerPartitionStreamSlice):
# noinspection PyProtectedMember
return self._partition == other._partition and self._cursor_slice == other._cursor_slice
return False

def __ne__(self, other):
return not self.__eq__(other)


class CursorFactory:
def __init__(self, create_function: Callable[[], StreamSlicer]):
self._create_function = create_function

def create(self) -> StreamSlicer:
return self._create_function()


class PerPartitionCursor(StreamSlicer):
"""
Given a stream has many partitions, it is important to provide a state per partition.
Record | Stream Slice | Last Record | DatetimeCursorBased cursor
-- | -- | -- | --
1 | {"start_time": "2021-01-01","end_time": "2021-01-31","owner_resource": "1"''} | cursor_field: “2021-01-15” | 2021-01-15
2 | {"start_time": "2021-02-01","end_time": "2021-02-28","owner_resource": "1"''} | cursor_field: “2021-02-15” | 2021-02-15
3 | {"start_time": "2021-01-01","end_time": "2021-01-31","owner_resource": "2"''} | cursor_field: “2021-01-03” | 2021-01-03
4 | {"start_time": "2021-02-01","end_time": "2021-02-28","owner_resource": "2"''} | cursor_field: “2021-02-14” | 2021-02-14
Given the following errors, this can lead to some loss or duplication of records:
When | Problem | Affected Record
-- | -- | --
Between record #1 and #2 | Loss | #3
Between record #2 and #3 | Loss | #3, #4
Between record #3 and #4 | Duplication | #1, #2
Therefore, we need to manage state per partition.
"""

_NO_STATE = {}
_NO_CURSOR_STATE = {}
_KEY = 0
_VALUE = 1

def __init__(self, cursor_factory: CursorFactory, partition_router: StreamSlicer):
self._cursor_factory = cursor_factory
self._partition_router = partition_router
self._cursor_per_partition = {}
self._partition_serializer = PerPartitionKeySerializer()

def stream_slices(self, sync_mode: SyncMode, _: StreamState) -> Iterable[PerPartitionStreamSlice]:
"""
We knowingly avoid using stream_state as we want PerPartitionCursor to manage its own state.
"""
slices = self._partition_router.stream_slices(sync_mode, self._NO_STATE)
for partition in slices:
cursor = self._cursor_per_partition.get(self._to_partition_key(partition))
if not cursor:
cursor = self._create_cursor(self._NO_CURSOR_STATE)
self._cursor_per_partition[self._to_partition_key(partition)] = cursor

for cursor_slice in cursor.stream_slices(sync_mode, self._get_state_for_partition(partition)):
yield PerPartitionStreamSlice(partition, cursor_slice)

def update_cursor(self, stream_slice: PerPartitionStreamSlice, last_record: Optional[Record] = None):
if not last_record:
            # The `update_cursor` method is called without `last_record` to set the initial state. In that case, `stream_slice`
            # is not a PerPartitionStreamSlice but a dict representing the state.
self._init_state(stream_slice)
else:
try:
self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].update_cursor(
stream_slice.cursor_slice, last_record
)
except KeyError as exception:
                raise KeyError(
                    f"Partition {str(exception)} could not be found in the current state based on the record. This is unexpected because "
                    f"we should only update the cursor for partitions that were emitted during `stream_slices`"
                )

def _init_state(self, stream_state: dict) -> None:
if not stream_state:
return

for state in stream_state["states"]:
self._cursor_per_partition[self._to_partition_key(state["partition"])] = self._create_cursor(state["cursor"])

    def get_stream_state(self) -> StreamState:
        states = []
        for partition_key, cursor in self._cursor_per_partition.items():
            cursor_state = cursor.get_stream_state()
            if cursor_state:
                states.append(
                    {
                        "partition": self._to_dict(partition_key),
                        "cursor": cursor_state,
                    }
                )
        return {"states": states}

def _get_state_for_partition(self, partition: Mapping[str, Any]) -> Optional[StreamState]:
cursor = self._cursor_per_partition.get(self._to_partition_key(partition))
if cursor:
return cursor.get_stream_state()

return None

@staticmethod
def _is_new_state(stream_state):
return not bool(stream_state)

    def _to_partition_key(self, partition) -> str:
        return self._partition_serializer.to_partition_key(partition)

    def _to_dict(self, partition_key: str) -> StreamSlice:
        return self._partition_serializer.to_partition(partition_key)

    def select_state(self, stream_slice: Optional[PerPartitionStreamSlice] = None) -> Optional[StreamState]:
        if not stream_slice:
            raise ValueError("A partition needs to be provided in order to extract a state")

        return self._get_state_for_partition(stream_slice.partition)

def _create_cursor(self, cursor_state: Any) -> StreamSlicer:
cursor = self._cursor_factory.create()
cursor.update_cursor(cursor_state)
return cursor

def get_request_params(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._partition_router.get_request_params(
stream_state=stream_state, stream_slice=stream_slice.partition, next_page_token=next_page_token
) | self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].get_request_params(
stream_state=stream_state, stream_slice=stream_slice.cursor_slice, next_page_token=next_page_token
)

def get_request_headers(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._partition_router.get_request_headers(
stream_state=stream_state, stream_slice=stream_slice.partition, next_page_token=next_page_token
) | self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].get_request_headers(
stream_state=stream_state, stream_slice=stream_slice.cursor_slice, next_page_token=next_page_token
)

def get_request_body_data(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._partition_router.get_request_body_data(
stream_state=stream_state, stream_slice=stream_slice.partition, next_page_token=next_page_token
) | self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].get_request_body_data(
stream_state=stream_state, stream_slice=stream_slice.cursor_slice, next_page_token=next_page_token
)

def get_request_body_json(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._partition_router.get_request_body_json(
stream_state=stream_state, stream_slice=stream_slice.partition, next_page_token=next_page_token
) | self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].get_request_body_json(
stream_state=stream_state, stream_slice=stream_slice.cursor_slice, next_page_token=next_page_token
)
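To make the mechanics above concrete, here is an illustrative sketch (not part of the commit) that exercises PerPartitionCursor with the classes defined in this file. StubCursor and StubPartitionRouter are hypothetical stand-ins for a real DatetimeBasedCursor and partition router:

from airbyte_cdk.models import SyncMode

class StubCursor:
    def __init__(self):
        self._state = {}

    def stream_slices(self, sync_mode, stream_state):
        yield {"start_time": "2021-01-01", "end_time": "2021-01-31"}

    def update_cursor(self, stream_slice, last_record=None):
        if last_record:
            self._state = {"cursor_field": last_record["cursor_field"]}
        elif stream_slice:
            self._state = dict(stream_slice)  # restoring a checkpointed state

    def get_stream_state(self):
        return self._state

class StubPartitionRouter:
    def stream_slices(self, sync_mode, stream_state):
        yield {"owner_resource": "1"}
        yield {"owner_resource": "2"}

# Serialized partition keys are insensitive to key order thanks to sort_keys=True.
assert PerPartitionKeySerializer.to_partition_key({"b": 2, "a": 1}) == \
    PerPartitionKeySerializer.to_partition_key({"a": 1, "b": 2})

cursor = PerPartitionCursor(
    cursor_factory=CursorFactory(lambda: StubCursor()),
    partition_router=StubPartitionRouter(),
)
slices = list(cursor.stream_slices(SyncMode.incremental, {}))  # one slice per partition
cursor.update_cursor(slices[0], last_record={"cursor_field": "2021-01-15"})

# Only partitions whose cursor holds state appear in the emitted state, so partition
# "2" is absent here:
print(cursor.get_stream_state())
# {'states': [{'partition': {'owner_resource': '1'}, 'cursor': {'cursor_field': '2021-01-15'}}]}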
airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -23,7 +23,7 @@
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.decoders import JsonDecoder
from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordFilter, RecordSelector
-from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor
+from airbyte_cdk.sources.declarative.incremental import CursorFactory, DatetimeBasedCursor, PerPartitionCursor
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
from airbyte_cdk.sources.declarative.models.declarative_component_schema import AddedFieldDefinition as AddedFieldDefinitionModel
@@ -495,10 +495,6 @@ def create_declarative_stream(self, model: DeclarativeStreamModel, config: Confi
)

def _merge_stream_slicers(self, model: DeclarativeStreamModel, config: Config) -> Optional[StreamSlicer]:
-        incremental_sync = (
-            self._create_component_from_model(model=model.incremental_sync, config=config) if model.incremental_sync else None
-        )
-
stream_slicer = None
if hasattr(model.retriever, "partition_router") and model.retriever.partition_router:
stream_slicer_model = model.retriever.partition_router
@@ -510,10 +506,15 @@ def _merge_stream_slicers(self, model: DeclarativeStreamModel, config: Config) -
else self._create_component_from_model(model=stream_slicer_model, config=config)
)

-        if incremental_sync and stream_slicer:
-            return CartesianProductStreamSlicer(stream_slicers=[incremental_sync, stream_slicer], parameters=model.parameters)
-        elif incremental_sync:
-            return incremental_sync
+        if model.incremental_sync and stream_slicer:
+            return PerPartitionCursor(
+                cursor_factory=CursorFactory(
+                    lambda: self._create_component_from_model(model=model.incremental_sync, config=config),
+                ),
+                partition_router=stream_slicer,
+            )
+        elif model.incremental_sync:
+            return self._create_component_from_model(model=model.incremental_sync, config=config) if model.incremental_sync else None
elif stream_slicer:
return stream_slicer
else:
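The net effect of the change above: previously the incremental cursor and the partition router were combined with CartesianProductStreamSlicer, so a single cursor instance was shared across all partitions; now PerPartitionCursor calls CursorFactory.create() once per unseen partition, so each partition tracks its own state. A tiny sketch of the property this relies on, with _Probe as a made-up stand-in for a cursor component:

from airbyte_cdk.sources.declarative.incremental import CursorFactory

class _Probe:
    pass

factory = CursorFactory(lambda: _Probe())
first, second = factory.create(), factory.create()
assert first is not second  # every partition gets a fresh cursor, hence isolated state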