source-faker: cursor = updated_at & remove records_per_sync #27684

Merged · 9 commits · Jun 24, 2023
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-faker/Dockerfile
@@ -34,5 +34,5 @@ COPY source_faker ./source_faker
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=2.1.0
LABEL io.airbyte.version=3.0.0
LABEL io.airbyte.name=airbyte/source-faker
@@ -21,14 +21,13 @@ But let's assume we don't have 1TB of local hard disk. So, we want to make 10 ch
```json
{
"count": 2039840637,
"seed": 0,
"records_per_sync": 203984064
"seed": 0
}
```
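
For reference, the `203984064` increment described below is just the configured `count` split into the ten chunks mentioned above, rounded up; a quick sketch of the arithmetic:

```python
# Sketch: where the per-sync chunk size in this example comes from.
total_count = 2_039_840_637   # "count" from the config.json above
chunks = 10
chunk_size = -(-total_count // chunks)   # ceiling division
print(chunk_size)  # 203984064
```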

**`state.json`**

At the end of every sync, increment the `id` in the users stream and the `user_id` in the purchases stream by `203984064`, the `records_per_sync` chunk size
At the end of every sync, increment the `id` in the users stream and the `user_id` in the purchases stream by `203984064`

```json
[
  ...
]
```
@@ -1,6 +1,5 @@
{
"count": 10,
"seed": 0,
"records_per_sync": 10,
"parallelism": 1
}
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-faker/metadata.yaml
@@ -4,7 +4,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: dfd88b22-b603-4c3d-aad7-3701784586b1
dockerImageTag: 2.1.0
dockerImageTag: 3.0.0
dockerRepository: airbyte/source-faker
githubIssueLabel: source-faker
icon: faker.svg
@@ -21,12 +21,11 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
count: int = config["count"] if "count" in config else 0
seed: int = config["seed"] if "seed" in config else None
records_per_sync: int = config["records_per_sync"] if "records_per_sync" in config else 500
records_per_slice: int = config["records_per_slice"] if "records_per_slice" in config else 100
parallelism: int = config["parallelism"] if "parallelism" in config else 4

return [
Products(count, seed, parallelism, records_per_sync, records_per_slice),
Users(count, seed, parallelism, records_per_sync, records_per_slice),
Purchases(count, seed, parallelism, records_per_sync, records_per_slice),
Products(count, seed, parallelism, records_per_slice),
Users(count, seed, parallelism, records_per_slice),
Purchases(count, seed, parallelism, records_per_slice),
]
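
Below is a minimal sketch of how the slimmed-down configuration is consumed after this change. The stream constructor arguments mirror the diff above; the `source_faker` import path and the direct `streams()` call are assumptions for illustration:

```python
from source_faker import SourceFaker  # import path assumed

# records_per_sync is gone: only count, seed, records_per_slice and parallelism remain.
config = {"count": 1000, "seed": 0, "records_per_slice": 100, "parallelism": 1}

for stream in SourceFaker().streams(config):   # Products, Users, Purchases
    print(stream.name, stream.records_per_slice)
```
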
@@ -22,29 +22,21 @@
"default": -1,
"order": 1
},
"records_per_sync": {
"title": "Records Per Sync",
"description": "How many fake records will be returned for each sync, for each stream? By default, it will take 2 syncs to create the requested 1000 records.",
"type": "integer",
"minimum": 1,
"default": 500,
"order": 2
},
"records_per_slice": {
"title": "Records Per Stream Slice",
"description": "How many fake records will be in each page (stream slice), before a state message is emitted?",
"type": "integer",
"minimum": 1,
"default": 1000,
"order": 3
"order": 2
},
"parallelism": {
"title": "Parallelism",
"description": "How many parallel workers should we use to generate fake data? Choose a value equal to the number of CPUs you will allocate to this source.",
"type": "integer",
"minimum": 1,
"default": 4,
"order": 4
"order": 3
}
}
}
@@ -15,12 +15,11 @@

class Products(Stream, IncrementalMixin):
primary_key = None
cursor_field = "id"
cursor_field = "updated_at"

def __init__(self, count: int, seed: int, parallelism: int, records_per_sync: int, records_per_slice: int, **kwargs):
def __init__(self, count: int, seed: int, parallelism: int, records_per_slice: int, **kwargs):
super().__init__(**kwargs)
self.seed = seed
self.records_per_sync = records_per_sync
self.records_per_slice = records_per_slice

@property
@@ -32,7 +31,7 @@ def state(self) -> Mapping[str, Any]:
if hasattr(self, "_state"):
return self._state
else:
return {self.cursor_field: 0}
return {}

@state.setter
def state(self, value: Mapping[str, Any]):
@@ -43,7 +42,7 @@ def load_products(self) -> List[Dict]:
return read_json(os.path.join(dirname, "record_data", "products.json"))

def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
total_records = self.state[self.cursor_field] if self.cursor_field in self.state else 0
total_records = self.state["id"] if "id" in self.state else 0
products = self.load_products()

median_record_byte_size = 180
@@ -56,18 +55,17 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
yield product
total_records = product["id"]

self.state = {self.cursor_field: total_records, "seed": self.seed}
self.state = {"id": total_records, "seed": self.seed}


class Users(Stream, IncrementalMixin):
primary_key = None
cursor_field = "id"
cursor_field = "updated_at"

def __init__(self, count: int, seed: int, parallelism: int, records_per_sync: int, records_per_slice: int, **kwargs):
def __init__(self, count: int, seed: int, parallelism: int, records_per_slice: int, **kwargs):
super().__init__(**kwargs)
self.count = count
self.seed = seed
self.records_per_sync = records_per_sync
self.records_per_slice = records_per_slice
self.parallelism = parallelism
self.generator = UserGenerator(self.name, self.seed)
@@ -81,7 +79,7 @@ def state(self) -> Mapping[str, Any]:
if hasattr(self, "_state"):
return self._state
else:
return {self.cursor_field: 0}
return {}

@state.setter
def state(self, value: Mapping[str, Any]):
@@ -93,40 +91,37 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
We make N workers (where N is the number of available CPUs) and spread out the CPU-bound work of generating records and serializing them to JSON
"""

total_records = self.state[self.cursor_field] if self.cursor_field in self.state else 0
total_records = self.state["id"] if "id" in self.state else 0
records_in_sync = 0

median_record_byte_size = 450
yield generate_estimate(self.name, self.count - total_records, median_record_byte_size)

with Pool(initializer=self.generator.prepare, processes=self.parallelism) as pool:
while records_in_sync < self.count and records_in_sync < self.records_per_sync:
while records_in_sync < self.count:
records_remaining_this_loop = min(self.records_per_slice, (self.count - total_records))
if records_remaining_this_loop <= 0:
break
users = pool.map(self.generator.generate, range(total_records, total_records + records_remaining_this_loop))
for user in users:
total_records += 1
records_in_sync += 1
yield user

if records_in_sync >= self.records_per_sync:
break
if records_remaining_this_loop == 0:
break

self.state = {self.cursor_field: total_records, "seed": self.seed}
self.state = {"id": total_records, "seed": self.seed}

self.state = {self.cursor_field: total_records, "seed": self.seed}
self.state = {"id": total_records, "seed": self.seed}


class Purchases(Stream, IncrementalMixin):
primary_key = None
cursor_field = "id"
cursor_field = "updated_at"

def __init__(self, count: int, seed: int, parallelism: int, records_per_sync: int, records_per_slice: int, **kwargs):
def __init__(self, count: int, seed: int, parallelism: int, records_per_slice: int, **kwargs):
super().__init__(**kwargs)
self.count = count
self.seed = seed
self.records_per_sync = records_per_sync
self.records_per_slice = records_per_slice
self.parallelism = parallelism
self.generator = PurchaseGenerator(self.name, self.seed)
@@ -140,7 +135,7 @@ def state(self) -> Mapping[str, Any]:
if hasattr(self, "_state"):
return self._state
else:
return {self.cursor_field: 0}
return {}

@state.setter
def state(self, value: Mapping[str, Any]):
@@ -152,7 +147,7 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
We make N workers (where N is the number of available CPUs) and spread out the CPU-bound work of generating records and serializing them to JSON
"""

total_purchase_records = self.state[self.cursor_field] if self.cursor_field in self.state else 0
total_purchase_records = self.state["id"] if "id" in self.state else 0
total_user_records = self.state["user_id"] if "user_id" in self.state else 0
user_records_in_sync = 0

@@ -161,10 +156,8 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
yield generate_estimate(self.name, (self.count - total_user_records) * 1.3, median_record_byte_size)

with Pool(initializer=self.generator.prepare, processes=self.parallelism) as pool:
while total_user_records < self.count and user_records_in_sync < self.records_per_sync:
while total_user_records < self.count:
records_remaining_this_loop = min(self.records_per_slice, (self.count - user_records_in_sync))
if records_remaining_this_loop <= 0:
break
carts = pool.map(self.generator.generate, range(total_user_records, total_user_records + records_remaining_this_loop))
for purchases in carts:
for purchase in purchases:
@@ -174,9 +167,9 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
total_user_records += 1
user_records_in_sync += 1

if user_records_in_sync >= self.records_per_sync:
break
if records_remaining_this_loop == 0:
break

self.state = {self.cursor_field: total_purchase_records, "user_id": total_user_records, "seed": self.seed}
self.state = {"id": total_purchase_records, "user_id": total_user_records, "seed": self.seed}

self.state = {self.cursor_field: total_purchase_records, "user_id": total_user_records, "seed": self.seed}
self.state = {"id": total_purchase_records, "user_id": total_user_records, "seed": self.seed}
@@ -2,6 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import datetime
from multiprocessing import current_process

from airbyte_cdk.models import AirbyteRecordMessage, Type
@@ -38,17 +39,14 @@ def prepare(self):
dt = Datetime(seed=seed_with_offset)

def generate(self, user_id: int):
time_a = dt.datetime()
time_b = dt.datetime()

# faker doesn't always produce unique email addresses, so to enforce uniqueness, we will append the user_id to the prefix
email_parts = person.email().split("@")
email = f"{email_parts[0]}+{user_id + 1}@{email_parts[1]}"

profile = {
"id": user_id + 1,
"created_at": format_airbyte_time(time_a if time_a <= time_b else time_b),
"updated_at": format_airbyte_time(time_a if time_a > time_b else time_b),
"created_at": format_airbyte_time(dt.datetime()),
"updated_at": format_airbyte_time(datetime.datetime.now()),
"name": person.name(),
"title": person.title(),
"age": person.age(),
@@ -76,8 +74,5 @@ def generate(self, user_id: int):
while not profile["created_at"]:
profile["created_at"] = format_airbyte_time(dt.datetime())

if not profile["updated_at"]:
profile["updated_at"] = profile["created_at"] + 1

record = AirbyteRecordMessage(stream=self.stream_name, data=profile, emitted_at=now_millis())
return AirbyteMessageWithCachedJSON(type=Type.RECORD, record=record)
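
The generator change above is what makes `updated_at` workable as a cursor: each record is now stamped with the time it was generated, so records emitted by a later sync always carry a later cursor value. A minimal sketch of that idea, with the timestamp formatting simplified (the real code wraps `datetime.datetime.now()` in `format_airbyte_time`):

```python
import datetime


def stamp_updated_at(profile: dict) -> dict:
    # Generation time rather than a random faker datetime, so the value only moves forward.
    profile["updated_at"] = datetime.datetime.now().isoformat()
    return profile


print(stamp_updated_at({"id": 1, "created_at": "2020-04-01T00:00:00"}))
```
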
@@ -143,7 +143,7 @@ def test_no_read_limit_hit():

def test_read_big_random_data():
source = SourceFaker()
config = {"count": 1000, "records_per_slice": 100, "records_per_sync": 1000, "parallelism": 1}
config = {"count": 1000, "records_per_slice": 100, "parallelism": 1}
catalog = ConfiguredAirbyteCatalog(
streams=[
{
@@ -178,7 +178,7 @@ def test_read_big_random_data():

def test_with_purchases():
source = SourceFaker()
config = {"count": 1000, "records_per_sync": 1000, "parallelism": 1}
config = {"count": 1000, "parallelism": 1}
catalog = ConfiguredAirbyteCatalog(
streams=[
{
@@ -217,37 +217,6 @@ def test_with_purchases():
assert latest_state.state.data["products"] == {'id': 100, 'seed': None}
assert latest_state.state.data["purchases"]["user_id"] > 0


def test_sync_ends_with_limit():
source = SourceFaker()
config = {"count": 100, "records_per_sync": 5, "parallelism": 1}
catalog = ConfiguredAirbyteCatalog(
streams=[
{
"stream": {"name": "users", "json_schema": {}, "supported_sync_modes": ["incremental"]},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
}
]
)
state = {}
iterator = source.read(logger, config, catalog, state)

record_rows_count = 0
state_rows_count = 0
latest_state = {}
for row in iterator:
if row.type is Type.RECORD:
record_rows_count = record_rows_count + 1
if row.type is Type.STATE:
state_rows_count = state_rows_count + 1
latest_state = row

assert record_rows_count == 5
assert state_rows_count == 1
assert latest_state.state.data == {"users": {"id": 5, "seed": None}}


def test_read_with_seed():
"""
This test asserts that setting a seed always returns the same values
1 change: 1 addition & 0 deletions docs/integrations/sources/faker.md
@@ -96,6 +96,7 @@ None!

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :-------------------------------------------------------------------------------------------------------------------- | :-------------------------------------------------------------------------------------------------------------- |
| 3.0.0 | 2023-06-23 | [27684](https://github.com/airbytehq/airbyte/pull/27684) | Stream cursor is now `updated_at`; the `records_per_sync` option is removed |
| 2.1.0 | 2022-05-08 | [25903](https://github.com/airbytehq/airbyte/pull/25903) | Add user.address (object) |
| 2.0.3 | 2022-02-20 | [23259](https://github.com/airbytehq/airbyte/pull/23259) | bump to test publication |
| 2.0.2 | 2022-02-20 | [23259](https://github.com/airbytehq/airbyte/pull/23259) | bump to test publication |