Skip to content

Commit

Permalink
Faker emits AirbyteEstimateTraceMessage (#19197)
Browse files Browse the repository at this point in the history
* Faker emits `AirbyteEstimateTraceMessage`

* update pr

* fix SAT

* auto-bump connector version

Co-authored-by: alafanechere <augustin.lafanechere@gmail.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
3 people authored and akashkulk committed Nov 17, 2022
1 parent 02b05f8 commit 10c7108
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@
- name: Faker
sourceDefinitionId: dfd88b22-b603-4c3d-aad7-3701784586b1
dockerRepository: airbyte/source-faker
dockerImageTag: 0.2.0
dockerImageTag: 0.2.1
documentationUrl: https://docs.airbyte.com/integrations/sources/faker
sourceType: api
releaseStage: alpha
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3655,7 +3655,7 @@
oauthFlowInitParameters: []
oauthFlowOutputParameters:
- - "access_token"
- dockerImage: "airbyte/source-faker:0.2.0"
- dockerImage: "airbyte/source-faker:0.2.1"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/faker"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-faker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_faker ./source_faker
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.name=airbyte/source-faker
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ tests:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "exception"
status: "failed"
discovery:
- config_path: "secrets/config.json"
basic_read:
Expand Down
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-faker/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1", "mimesis==6.1.1"]
MAIN_REQUIREMENTS = ["airbyte-cdk~=0.2", "mimesis==6.1.1"]

TEST_REQUIREMENTS = [
"pytest~=6.1",
"pytest~=7.0",
"source-acceptance-test",
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@
from airbyte_cdk.models import (
AirbyteCatalog,
AirbyteConnectionStatus,
AirbyteEstimateTraceMessage,
AirbyteLogMessage,
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStateMessage,
AirbyteStream,
AirbyteTraceMessage,
ConfiguredAirbyteCatalog,
EstimateType,
Status,
TraceType,
Type,
)
from airbyte_cdk.sources import Source
Expand All @@ -42,7 +46,10 @@ def check(self, logger: AirbyteLogger, config: Dict[str, any]) -> AirbyteConnect
"""

# As this is an in-memory source there is nothing to connect to; the check only fails when `count` is not a number
return AirbyteConnectionStatus(status=Status.SUCCEEDED)
if type(config["count"]) == int or type(config["count"]) == float:
return AirbyteConnectionStatus(status=Status.SUCCEEDED)
else:
return AirbyteConnectionStatus(status=Status.FAILED)

def discover(self, logger: AirbyteLogger, config: Dict[str, any]) -> AirbyteCatalog:
"""
Expand Down Expand Up @@ -136,6 +143,10 @@ def read(
records_in_sync = 0
records_in_page = 0

users_estimate = count - cursor
yield generate_estimate(stream.stream.name, users_estimate, 450)
yield generate_estimate("Purchases", users_estimate * 1.5, 230) # a fuzzy guess, some users have purchases, some don't

for i in range(cursor, count):
user = generate_user(person, dt, i)
yield generate_record(stream, user)
Expand All @@ -162,6 +173,7 @@ def read(

elif stream.stream.name == "Products":
products = generate_products()
yield generate_estimate(stream.stream.name, len(products), 180)
for p in products:
yield generate_record(stream, p)
yield generate_state(state, stream, {"product_count": len(products)})
Expand Down Expand Up @@ -204,6 +216,14 @@ def log_stream(stream_name: str):
)


def generate_estimate(stream_name: str, total: float, bytes_per_row: int) -> AirbyteMessage:
    """
    Build a TRACE message carrying a row/byte estimate for a single stream.

    :param stream_name: name of the stream the estimate applies to
    :param total: expected number of rows; may be fractional (callers pass scaled guesses), it is rounded before emitting
    :param bytes_per_row: approximate size of one row, in bytes
    :return: an AirbyteMessage of type TRACE wrapping an ESTIMATE trace message
    """
    # A negative total (e.g. a saved cursor that is already past the configured count)
    # would produce a meaningless negative estimate, so clamp it to zero.
    total = max(total, 0)
    # emitted_at is expressed in epoch milliseconds
    emitted_at = int(datetime.datetime.now().timestamp() * 1000)
    estimate = AirbyteEstimateTraceMessage(
        type=EstimateType.STREAM,
        name=stream_name,
        row_estimate=round(total),
        byte_estimate=round(total * bytes_per_row),
    )
    return AirbyteMessage(type=Type.TRACE, trace=AirbyteTraceMessage(type=TraceType.ESTIMATE, emitted_at=emitted_at, estimate=estimate))


def generate_state(state: Dict[str, any], stream: any, data: any):
state[
stream.stream.name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,13 @@ def test_read_small_random_data():
logger = None
config = {"count": 10}
catalog = ConfiguredAirbyteCatalog(
streams=[{"stream": {"name": "Users", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}]
streams=[
{
"stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
}
]
)
state = {}
iterator = source.read(logger, config, catalog, state)
Expand All @@ -70,8 +76,16 @@ def test_read_big_random_data():
config = {"count": 1000, "records_per_slice": 100, "records_per_sync": 1000}
catalog = ConfiguredAirbyteCatalog(
streams=[
{"stream": {"name": "Users", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"},
{"stream": {"name": "Products", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"},
{
"stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
},
{
"stream": {"name": "Products", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
},
]
)
state = {}
Expand All @@ -98,9 +112,21 @@ def test_with_purchases():
config = {"count": 1000, "records_per_sync": 1000}
catalog = ConfiguredAirbyteCatalog(
streams=[
{"stream": {"name": "Users", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"},
{"stream": {"name": "Products", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"},
{"stream": {"name": "Purchases", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"},
{
"stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
},
{
"stream": {"name": "Products", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
},
{
"stream": {"name": "Purchases", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
},
]
)
state = {}
Expand Down Expand Up @@ -128,7 +154,13 @@ def test_sync_ends_with_limit():
logger = None
config = {"count": 100, "records_per_sync": 5}
catalog = ConfiguredAirbyteCatalog(
streams=[{"stream": {"name": "Users", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}]
streams=[
{
"stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
}
]
)
state = {}
iterator = source.read(logger, config, catalog, state)
Expand Down Expand Up @@ -157,7 +189,13 @@ def test_read_with_seed():
logger = None
config = {"count": 1, "seed": 100}
catalog = ConfiguredAirbyteCatalog(
streams=[{"stream": {"name": "Users", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}]
streams=[
{
"stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
}
]
)
state = {}
iterator = source.read(logger, config, catalog, state)
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/faker.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ N/A

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :-------------------------------------------------------- |
| 0.2.0 | 2022-10-14 | [18021](https://github.com/airbytehq/airbyte/pull/18021) | Move to mimesis for speed! |
| 0.2.1 | 2022-11-17 | [19197](https://github.com/airbytehq/airbyte/pull/19197) | Emit `AirbyteEstimateTraceMessage` |
| 0.2.0 | 2022-10-14 | [18021](https://github.com/airbytehq/airbyte/pull/18021) | Move to mimesis for speed! |
| 0.1.8 | 2022-10-12 | [17889](https://github.com/airbytehq/airbyte/pull/17889) | Bump to test publish command (2) |
| 0.1.7 | 2022-10-11 | [17848](https://github.com/airbytehq/airbyte/pull/17848) | Bump to test publish command |
| 0.1.6 | 2022-09-07 | [16418](https://github.com/airbytehq/airbyte/pull/16418) | Log start of each stream |
Expand Down

0 comments on commit 10c7108

Please sign in to comment.