Skip to content

Commit

Permalink
[low-code connectors] Cartesian product stream slicer (#13740)
Browse files Browse the repository at this point in the history
* list slicer

* Add comment

* product stream slicer

* comment

* rename

* format

* Update comment

* split on 2 lines for readability

* Revert "rename"

This reverts commit e801f2d.
  • Loading branch information
girarda committed Jun 16, 2022
1 parent 5f84341 commit 34e2dc3
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import itertools
from collections import ChainMap
from typing import Any, Iterable, List, Mapping

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer


class CartesianProductStreamSlicer(StreamSlicer):
"""
Stream slicers that iterates over the cartesian product of input stream slicers
Given 2 stream slicers with the following slices:
A: [{"i": 0}, {"i": 1}, {"i": 2}]
B: [{"s": "hello"}, {"s": "world"}]
the resulting stream slices are
[
{"i": 0, "s": "hello"},
{"i": 0, "s": "world"},
{"i": 1, "s": "hello"},
{"i": 1, "s": "world"},
{"i": 2, "s": "hello"},
{"i": 2, "s": "world"},
]
"""

def __init__(self, stream_slicers: List[StreamSlicer]):
self._stream_slicers = stream_slicers

def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
sub_slices = (s.stream_slices(sync_mode, stream_state) for s in self._stream_slicers)
return (ChainMap(*a) for a in itertools.product(*sub_slices))
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ def __init__(self, slice_values: Union[str, List[str]], slice_definition: Mappin
self._config = config

def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
return (self._interpolation.eval(self._config, slice_value=slice_value, literal_eval=True) for slice_value in self._slice_values)
return [self._interpolation.eval(self._config, slice_value=slice_value) for slice_value in self._slice_values]
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import pytest as pytest
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.stream_slicers.cartesian_product_stream_slicer import CartesianProductStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer


@pytest.mark.parametrize(
"test_name, stream_slicers, expected_slices",
[
(
"test_single_stream_slicer",
[ListStreamSlicer(["customer", "store", "subscription"], {"owner_resource": "{{ slice_value }}"}, None)],
[{"owner_resource": "customer"}, {"owner_resource": "store"}, {"owner_resource": "subscription"}],
),
(
"test_two_stream_slicers",
[
ListStreamSlicer(["customer", "store", "subscription"], {"owner_resource": "{{ slice_value }}"}, None),
ListStreamSlicer(["A", "B"], {"letter": "{{ slice_value }}"}, None),
],
[
{"owner_resource": "customer", "letter": "A"},
{"owner_resource": "customer", "letter": "B"},
{"owner_resource": "store", "letter": "A"},
{"owner_resource": "store", "letter": "B"},
{"owner_resource": "subscription", "letter": "A"},
{"owner_resource": "subscription", "letter": "B"},
],
),
(
"test_list_and_datetime",
[
ListStreamSlicer(["customer", "store", "subscription"], {"owner_resource": "{{ slice_value }}"}, None),
DatetimeStreamSlicer(
InterpolatedString("2021-01-01"), InterpolatedString("2021-01-03"), "1d", InterpolatedString(""), "%Y-%m-%d", None
),
],
[
{"owner_resource": "customer", "start_date": "2021-01-01", "end_date": "2021-01-01"},
{"owner_resource": "customer", "start_date": "2021-01-02", "end_date": "2021-01-02"},
{"owner_resource": "customer", "start_date": "2021-01-03", "end_date": "2021-01-03"},
{"owner_resource": "store", "start_date": "2021-01-01", "end_date": "2021-01-01"},
{"owner_resource": "store", "start_date": "2021-01-02", "end_date": "2021-01-02"},
{"owner_resource": "store", "start_date": "2021-01-03", "end_date": "2021-01-03"},
{"owner_resource": "subscription", "start_date": "2021-01-01", "end_date": "2021-01-01"},
{"owner_resource": "subscription", "start_date": "2021-01-02", "end_date": "2021-01-02"},
{"owner_resource": "subscription", "start_date": "2021-01-03", "end_date": "2021-01-03"},
],
),
],
)
def test_substream_slicer(test_name, stream_slicers, expected_slices):
slicer = CartesianProductStreamSlicer(stream_slicers)
slices = [s for s in slicer.stream_slices(SyncMode.incremental, stream_state=None)]
assert slices == expected_slices

0 comments on commit 34e2dc3

Please sign in to comment.