[faker] decouple stream state #20492

Merged 16 commits on Jan 3, 2023
Changes from 5 commits
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-faker/Dockerfile
@@ -34,5 +34,5 @@ COPY source_faker ./source_faker
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=1.0.0
+LABEL io.airbyte.version=1.0.1
 LABEL io.airbyte.name=airbyte/source-faker
@@ -14,7 +14,7 @@ acceptance_tests:
     tests:
       - config_path: secrets/config.json
         backward_compatibility_tests_config:
-          disable_for_version: "0.2.1"
+          disable_for_version: "1.0.0"
   basic_read:
     tests:
       - config_path: secrets/config.json
@@ -18,7 +18,7 @@
 {"stream": "purchases", "data": {"id": 8, "product_id": 67, "user_id": 6, "added_to_cart_at": "2008-03-02T18:14:15+00:00", "purchased_at": "2020-06-21T18:14:15+00:00", "returned_at": "2020-09-24T18:14:15+00:00"}, "emitted_at": 1669830193010}
 {"stream": "purchases", "data": {"id": 9, "product_id": 91, "user_id": 7, "added_to_cart_at": "2022-03-12T17:13:51+00:00", "purchased_at": null, "returned_at": null}, "emitted_at": 1669830193010}
 {"stream": "purchases", "data": {"id": 10, "product_id": 79, "user_id": 8, "added_to_cart_at": "2017-12-31T07:18:11+00:00", "purchased_at": "2019-05-14T07:18:11+00:00", "returned_at": null}, "emitted_at": 1669830193010}
-{"stream": "purchases", "data": {"id": 11, "product_id": 91, "user_id": 8, "added_to_cart_at": "2022-03-24T07:18:11+00:00", "purchased_at": "2022-05-11T07:18:11+00:00", "returned_at": null}, "emitted_at": 1669830193010}
+{"stream": "purchases", "data": {"id": 11, "product_id": 91, "user_id": 8, "added_to_cart_at": "2022-03-24T07:18:11+00:00", "purchased_at": "2022-06-29T07:18:11+00:00", "returned_at": null}, "emitted_at": 1669830193010}
 {"stream": "purchases", "data": {"id": 12, "product_id": 19, "user_id": 9, "added_to_cart_at": "2020-11-29T04:56:09+00:00", "purchased_at": "2022-03-02T04:56:09+00:00", "returned_at": "2022-04-12T04:56:09+00:00"}, "emitted_at": 1669830193010}
 {"stream": "purchases", "data": {"id": 13, "product_id": 63, "user_id": 10, "added_to_cart_at": "2003-08-05T17:43:25+00:00", "purchased_at": "2015-12-15T17:43:25+00:00", "returned_at": null}, "emitted_at": 1669830193010}
 {"stream": "products", "data": {"id": 1, "make": "Mazda", "model": "MX-5", "year": 2008, "price": 2869, "created_at": "2022-02-01T17:02:19+00:00"}, "emitted_at": 1669830193011}
@@ -28,5 +28,5 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
         return [
             Products(count, seed, records_per_sync, records_per_slice),
             Users(count, seed, records_per_sync, records_per_slice),
-            Purchases(seed, records_per_sync, records_per_slice),
+            Purchases(count, seed, records_per_sync, records_per_slice),
         ]
@@ -142,15 +142,15 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
                 break
 
         self.state = {self.cursor_field: total_records, "seed": self.seed}
-        set_total_user_records(total_records)
 
 
 class Purchases(Stream, IncrementalMixin):
     primary_key = None
-    cursor_field = "user_id"
+    cursor_field = "id"
 
-    def __init__(self, seed: int, records_per_sync: int, records_per_slice: int, **kwargs):
+    def __init__(self, count:int, seed: int, records_per_sync: int, records_per_slice: int, **kwargs):
         super().__init__(**kwargs)
+        self.count = count
         self.seed = seed
         self.records_per_sync = records_per_sync
         self.records_per_slice = records_per_slice
@@ -222,23 +222,33 @@ def generate_purchases(self, user_id: int, purchases_count: int) -> list[Dict]:
         return purchases
 
     def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
-        purchases_count = self.state[self.cursor_field] if self.cursor_field in self.state else 0
-
-        if total_user_records <= 0:
-            return  # if there are no new users, there should be no new purchases
+        total_purchase_records = self.state[self.cursor_field] if self.cursor_field in self.state else 0
+        total_user_records = self.state['user_id'] if 'user_id' in self.state else 0
+        user_records_in_sync = 0
+        user_records_in_slice = 0
 
         median_record_byte_size = 230
         yield generate_estimate(
-            self.name, total_user_records - purchases_count * 1.3, median_record_byte_size
+            self.name, (self.count - total_user_records) * 1.3, median_record_byte_size
         )  # a fuzzy guess, some users have purchases, some don't
 
-        for i in range(purchases_count, total_user_records):
-            purchases = self.generate_purchases(i + 1, purchases_count)
+        for i in range(total_user_records, self.count):
+            purchases = self.generate_purchases(i + 1, total_purchase_records)
             for purchase in purchases:
+                total_purchase_records += 1
                 yield purchase
-            purchases_count += 1
+            total_user_records += 1
+            user_records_in_sync += 1
+            user_records_in_slice += 1
+
+            if user_records_in_slice >= self.records_per_slice:
+                self.state = {self.cursor_field: total_purchase_records, "user_id": total_user_records, "seed": self.seed}
+                user_records_in_slice = 0
+
+            if user_records_in_sync == self.records_per_sync:
+                break
 
-        self.state = {self.cursor_field: total_user_records, "seed": self.seed}
+        self.state = {self.cursor_field: total_purchase_records, "user_id": total_user_records, "seed": self.seed}
 
 
 def generate_estimate(stream_name: str, total: int, bytes_per_row: int):
@@ -247,11 +257,3 @@ def generate_estimate(stream_name: str, total: int, bytes_per_row: int):
         type=EstimateType.STREAM, name=stream_name, row_estimate=round(total), byte_estimate=round(total * bytes_per_row)
     )
     return AirbyteTraceMessage(type=TraceType.ESTIMATE, emitted_at=emitted_at, estimate=estimate_message)
-
-
-# a globals hack to share data between streams:
-total_user_records = 0
-
-
-def set_total_user_records(total: int):
-    globals()["total_user_records"] = total
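
The net effect of the streams.py changes above is that Purchases no longer reads the `total_user_records` global set by the Users stream: it now receives the configured user `count` and checkpoints both its own purchase cursor (`id`) and the last user it generated purchases for (`user_id`) inside its own state. Below is a minimal, self-contained sketch of that pattern (hypothetical class and record shapes, not the connector's actual code), assuming a stream exposes a mutable `state` dict that the framework persists between syncs:

```python
# Hypothetical sketch: each stream keeps every cursor it needs in its own
# `state` dict, so no module-level globals are shared between streams.
from typing import Any, Dict, Iterable


class PurchasesSketch:
    cursor_field = "id"

    def __init__(self, count: int, records_per_slice: int, seed: int = 0):
        self.count = count  # total users configured for the connector
        self.records_per_slice = records_per_slice
        self.seed = seed
        self.state: Dict[str, Any] = {}  # replaces the old `total_user_records` global

    def read_records(self) -> Iterable[Dict[str, Any]]:
        total_purchases = self.state.get(self.cursor_field, 0)
        users_done = self.state.get("user_id", 0)  # user high-water mark, tracked locally

        for user_id in range(users_done, self.count):
            # pretend each user has exactly one purchase, for brevity
            total_purchases += 1
            yield {"id": total_purchases, "user_id": user_id + 1}
            users_done += 1

            if users_done % self.records_per_slice == 0:
                # checkpoint both cursors in this stream's own state
                self.state = {self.cursor_field: total_purchases, "user_id": users_done, "seed": self.seed}

        self.state = {self.cursor_field: total_purchases, "user_id": users_done, "seed": self.seed}


if __name__ == "__main__":
    stream = PurchasesSketch(count=5, records_per_slice=2)
    for record in stream.read_records():
        print(record)
    print("final state:", stream.state)  # {'id': 5, 'user_id': 5, 'seed': 0}
```

Because the checkpoint lives entirely in the Purchases stream's state, it can resume or run alongside Users without either stream reaching into the other's module globals, which is what the `read_records` rewrite above does with its `records_per_slice` checkpoints and `records_per_sync` break.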
1 change: 1 addition & 0 deletions docs/integrations/sources/faker.md
@@ -83,6 +83,7 @@ None!
 
 | Version | Date | Pull Request | Subject |
 | :------ | :--------- | :------------------------------------------------------- | :-------------------------------------------------------------------------------------------------------------- |
+| 1.0.0 | 2022-12-14 | [20492](https://github.com/airbytehq/airbyte/pull/20492) | Decouple stream states for better parallelism |
 | 1.0.0 | 2022-11-28 | [19490](https://github.com/airbytehq/airbyte/pull/19490) | Faker uses the CDK; rename streams to be lower-case (breaking), add determinism to random purchases, and rename |
 | 0.2.1 | 2022-10-14 | [19197](https://github.com/airbytehq/airbyte/pull/19197) | Emit `AirbyteEstimateTraceMessage` |
 | 0.2.0 | 2022-10-14 | [18021](https://github.com/airbytehq/airbyte/pull/18021) | Move to mimesis for speed! |