Simplify faker offsets and limits (#27807)
evantahler committed Jun 28, 2023
1 parent 2081a0c commit 788a0ec
Showing 6 changed files with 29 additions and 65 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-faker/Dockerfile
@@ -34,5 +34,5 @@ COPY source_faker ./source_faker
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=3.0.0
+LABEL io.airbyte.version=3.0.1
 LABEL io.airbyte.name=airbyte/source-faker
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-faker/metadata.yaml
@@ -4,7 +4,7 @@ data:
   connectorSubtype: api
   connectorType: source
   definitionId: dfd88b22-b603-4c3d-aad7-3701784586b1
-  dockerImageTag: 3.0.0
+  dockerImageTag: 3.0.1
   dockerRepository: airbyte/source-faker
   githubIssueLabel: source-faker
   icon: faker.svg
airbyte-integrations/connectors/source-faker/source_faker/spec.json
@@ -32,7 +32,7 @@
     },
     "always_updated": {
       "title": "Always Updated",
-      "description": "Should the updated_at values for every record be new each sync?",
+      "description": "Should the updated_at values for every record be new each sync? Setting this to false will cause the source to stop emitting records after COUNT records have been emitted.",
       "type": "boolean",
       "default": true
     },
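The new wording describes the guard each stream applies at the top of read_records (see streams.py below). A minimal sketch of a connector config that opts into the stop-after-COUNT behavior — the keys mirror the test configs later in this diff, and the values are illustrative:

# Illustrative source-faker config. With always_updated disabled, any sync
# that already has an updated_at cursor in its state emits no new records.
config = {
    "count": 100,          # records per stream; emission stops after COUNT
    "seed": -1,            # -1 selects a random seed
    "parallelism": 1,
    "always_updated": False,
}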
airbyte-integrations/connectors/source-faker/source_faker/streams.py
@@ -20,6 +20,7 @@ class Products(Stream, IncrementalMixin):
 
     def __init__(self, count: int, seed: int, parallelism: int, records_per_slice: int, always_updated: bool, **kwargs):
         super().__init__(**kwargs)
+        self.count = count
         self.seed = seed
         self.records_per_slice = records_per_slice
         self.always_updated = always_updated
@@ -47,23 +48,20 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
         if "updated_at" in self.state and not self.always_updated:
             return iter([])
 
-        total_records = self.state["id"] if "id" in self.state else 0
         products = self.load_products()
         updated_at = ""
 
         median_record_byte_size = 180
-        rows_to_emit = len(products) - total_records
-        if rows_to_emit > 0:
-            yield generate_estimate(self.name, rows_to_emit, median_record_byte_size)
+        rows_to_emit = len(products)
+        yield generate_estimate(self.name, rows_to_emit, median_record_byte_size)
 
         for product in products:
-            if product["id"] > total_records:
-                product["updated_at"] = format_airbyte_time(datetime.datetime.now())
+            if product["id"] <= self.count:
+                updated_at = format_airbyte_time(datetime.datetime.now())
+                product["updated_at"] = updated_at
                 yield product
-                total_records = product["id"]
-                updated_at = product["updated_at"]
 
-        self.state = {"id": total_records, "seed": self.seed, "updated_at": updated_at}
+        self.state = {"seed": self.seed, "updated_at": updated_at}


class Users(Stream, IncrementalMixin):
@@ -103,29 +101,27 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
         if "updated_at" in self.state and not self.always_updated:
             return iter([])
 
-        total_records = self.state["id"] if "id" in self.state else 0
-        records_in_sync = 0
         updated_at = ""
 
         median_record_byte_size = 450
-        yield generate_estimate(self.name, self.count - total_records, median_record_byte_size)
+        yield generate_estimate(self.name, self.count, median_record_byte_size)
 
+        loop_offset = 0
         with Pool(initializer=self.generator.prepare, processes=self.parallelism) as pool:
-            while records_in_sync < self.count:
-                records_remaining_this_loop = min(self.records_per_slice, (self.count - total_records))
-                users = pool.map(self.generator.generate, range(total_records, total_records + records_remaining_this_loop))
+            while loop_offset < self.count:
+                records_remaining_this_loop = min(self.records_per_slice, (self.count - loop_offset))
+                users = pool.map(self.generator.generate, range(loop_offset, loop_offset + records_remaining_this_loop))
                 for user in users:
                     updated_at = user.record.data["updated_at"]
-                    total_records += 1
-                    records_in_sync += 1
+                    loop_offset += 1
                     yield user
 
                 if records_remaining_this_loop == 0:
                     break
 
-                self.state = {"id": total_records, "seed": self.seed, "updated_at": updated_at}
+                self.state = {"seed": self.seed, "updated_at": updated_at}
 
-        self.state = {"id": total_records, "seed": self.seed, "updated_at": updated_at}
+        self.state = {"seed": self.seed, "updated_at": updated_at}


class Purchases(Stream, IncrementalMixin):
@@ -165,31 +161,25 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
         if "updated_at" in self.state and not self.always_updated:
             return iter([])
 
-        total_purchase_records = self.state["id"] if "id" in self.state else 0
-        total_user_records = self.state["user_id"] if "user_id" in self.state else 0
-        user_records_in_sync = 0
         updated_at = ""
 
         # a fuzzy guess, some users have purchases, some don't
         median_record_byte_size = 230
-        yield generate_estimate(self.name, (self.count - total_user_records) * 1.3, median_record_byte_size)
+        yield generate_estimate(self.name, (self.count) * 1.3, median_record_byte_size)
 
+        loop_offset = 0
         with Pool(initializer=self.generator.prepare, processes=self.parallelism) as pool:
-            while total_user_records < self.count:
-                records_remaining_this_loop = min(self.records_per_slice, (self.count - user_records_in_sync))
-                carts = pool.map(self.generator.generate, range(total_user_records, total_user_records + records_remaining_this_loop))
+            while loop_offset < self.count:
+                records_remaining_this_loop = min(self.records_per_slice, (self.count - loop_offset))
+                carts = pool.map(self.generator.generate, range(loop_offset, loop_offset + records_remaining_this_loop))
                 for purchases in carts:
+                    loop_offset += 1
                     for purchase in purchases:
                         updated_at = purchase.record.data["updated_at"]
-                        total_purchase_records += 1
                         yield purchase
 
-                    total_user_records += 1
-                    user_records_in_sync += 1
-
                 if records_remaining_this_loop == 0:
                     break
 
-                self.state = {"id": total_purchase_records, "user_id": total_user_records, "seed": self.seed, "updated_at": updated_at}
+                self.state = {"seed": self.seed, "updated_at": updated_at}
 
-        self.state = {"id": total_purchase_records, "user_id": total_user_records, "seed": self.seed, "updated_at": updated_at}
+        self.state = {"seed": self.seed, "updated_at": updated_at}
airbyte-integrations/connectors/source-faker/unit_tests/unit_test.py
@@ -172,33 +172,6 @@ def test_read_products():
     assert state_rows_count == 2
 
 
-def test_no_read_limit_hit():
-    source = SourceFaker()
-    config = {"count": 10, "parallelism": 1}
-    catalog = ConfiguredAirbyteCatalog(
-        streams=[
-            {
-                "stream": {"name": "users", "json_schema": {}, "supported_sync_modes": ["incremental"]},
-                "sync_mode": "incremental",
-                "destination_sync_mode": "overwrite",
-            }
-        ]
-    )
-    state = {"users": {"id": 10}}
-    iterator = source.read(logger, config, catalog, state)
-
-    record_rows_count = 0
-    state_rows_count = 0
-    for row in iterator:
-        if row.type is Type.RECORD:
-            record_rows_count = record_rows_count + 1
-        if row.type is Type.STATE:
-            state_rows_count = state_rows_count + 1
-
-    assert record_rows_count == 0
-    assert state_rows_count == 1


def test_read_big_random_data():
source = SourceFaker()
config = {"count": 1000, "records_per_slice": 100, "parallelism": 1}
@@ -288,8 +261,8 @@ def test_read_with_seed():
     iterator = source.read(logger, config, catalog, state)
 
     records = [row for row in iterator if row.type is Type.RECORD]
-    assert records[0].record.data["occupation"] == "Cartoonist"
-    assert records[0].record.data["email"] == "reflect1958+1@yahoo.com"
+    assert records[0].record.data["occupation"] == "Sheriff Principal"
+    assert records[0].record.data["email"] == "alleged2069+1@example.com"


def test_ensure_no_purchases_without_users():
1 change: 1 addition & 0 deletions docs/integrations/sources/faker.md
@@ -96,6 +96,7 @@ None!
 
 | Version | Date       | Pull Request                                              | Subject                                                              |
 | :------ | :--------- | :-------------------------------------------------------- | :------------------------------------------------------------------- |
+| 3.0.1   | 2023-06-28 | [27807](https://github.com/airbytehq/airbyte/pull/27807)  | Fix bug with purchase stream updated_at                               |
 | 3.0.0   | 2023-06-23 | [27684](https://github.com/airbytehq/airbyte/pull/27684)  | Stream cursor is now `updated_at` & remove `records_per_sync` option  |
 | 2.1.0   | 2023-05-08 | [25903](https://github.com/airbytehq/airbyte/pull/25903)  | Add user.address (object)                                             |
 | 2.0.3   | 2023-02-20 | [23259](https://github.com/airbytehq/airbyte/pull/23259)  | bump to test publication                                              |
