Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Faker emits AirbyteEstimateTraceMessage #19197

Merged
merged 5 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@
- name: Faker
sourceDefinitionId: dfd88b22-b603-4c3d-aad7-3701784586b1
dockerRepository: airbyte/source-faker
dockerImageTag: 0.2.0
dockerImageTag: 0.2.1
documentationUrl: https://docs.airbyte.com/integrations/sources/faker
sourceType: api
releaseStage: alpha
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3632,7 +3632,7 @@
oauthFlowInitParameters: []
oauthFlowOutputParameters:
- - "access_token"
- dockerImage: "airbyte/source-faker:0.2.0"
- dockerImage: "airbyte/source-faker:0.2.1"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/faker"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-faker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_faker ./source_faker
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.name=airbyte/source-faker
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ tests:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "exception"
status: "failed"
discovery:
- config_path: "secrets/config.json"
basic_read:
Expand Down
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-faker/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1", "mimesis==6.1.1"]
MAIN_REQUIREMENTS = ["airbyte-cdk~=0.2", "mimesis==6.1.1"]

TEST_REQUIREMENTS = [
"pytest~=6.1",
"pytest~=7.0",
"source-acceptance-test",
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@
from airbyte_cdk.models import (
AirbyteCatalog,
AirbyteConnectionStatus,
AirbyteEstimateTraceMessage,
AirbyteLogMessage,
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStateMessage,
AirbyteStream,
AirbyteTraceMessage,
ConfiguredAirbyteCatalog,
EstimateType,
Status,
TraceType,
Type,
)
from airbyte_cdk.sources import Source
Expand All @@ -42,7 +46,10 @@ def check(self, logger: AirbyteLogger, config: Dict[str, any]) -> AirbyteConnect
"""

# As this is an in-memory source, it always succeeds
return AirbyteConnectionStatus(status=Status.SUCCEEDED)
if type(config["count"]) == int or type(config["count"]) == float:
return AirbyteConnectionStatus(status=Status.SUCCEEDED)
else:
return AirbyteConnectionStatus(status=Status.FAILED)

def discover(self, logger: AirbyteLogger, config: Dict[str, any]) -> AirbyteCatalog:
"""
Expand Down Expand Up @@ -136,6 +143,10 @@ def read(
records_in_sync = 0
records_in_page = 0

users_estimate = count - cursor
yield generate_estimate(stream.stream.name, users_estimate, 450)
yield generate_estimate("Purchases", users_estimate * 1.5, 230) # a fuzzy guess, some users have purchases, some don't

for i in range(cursor, count):
user = generate_user(person, dt, i)
yield generate_record(stream, user)
Expand All @@ -162,6 +173,7 @@ def read(

elif stream.stream.name == "Products":
products = generate_products()
yield generate_estimate(stream.stream.name, len(products), 180)
for p in products:
yield generate_record(stream, p)
yield generate_state(state, stream, {"product_count": len(products)})
Expand Down Expand Up @@ -204,6 +216,14 @@ def log_stream(stream_name: str):
)


def generate_estimate(stream_name: str, total: int, bytes_per_row: int):
emitted_at = int(datetime.datetime.now().timestamp() * 1000)
estimate = AirbyteEstimateTraceMessage(
type=EstimateType.STREAM, name=stream_name, row_estimate=round(total), byte_estimate=round(total * bytes_per_row)
)
return AirbyteMessage(type=Type.TRACE, trace=AirbyteTraceMessage(type=TraceType.ESTIMATE, emitted_at=emitted_at, estimate=estimate))


def generate_state(state: Dict[str, any], stream: any, data: any):
state[
stream.stream.name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,13 @@ def test_read_small_random_data():
logger = None
config = {"count": 10}
catalog = ConfiguredAirbyteCatalog(
streams=[{"stream": {"name": "Users", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}]
streams=[
{
"stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
}
]
)
state = {}
iterator = source.read(logger, config, catalog, state)
Expand All @@ -70,8 +76,16 @@ def test_read_big_random_data():
config = {"count": 1000, "records_per_slice": 100, "records_per_sync": 1000}
catalog = ConfiguredAirbyteCatalog(
streams=[
{"stream": {"name": "Users", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"},
{"stream": {"name": "Products", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"},
{
"stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
},
{
"stream": {"name": "Products", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
},
]
)
state = {}
Expand All @@ -98,9 +112,21 @@ def test_with_purchases():
config = {"count": 1000, "records_per_sync": 1000}
catalog = ConfiguredAirbyteCatalog(
streams=[
{"stream": {"name": "Users", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"},
{"stream": {"name": "Products", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"},
{"stream": {"name": "Purchases", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"},
{
"stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
},
{
"stream": {"name": "Products", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
},
{
"stream": {"name": "Purchases", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
},
]
)
state = {}
Expand Down Expand Up @@ -128,7 +154,13 @@ def test_sync_ends_with_limit():
logger = None
config = {"count": 100, "records_per_sync": 5}
catalog = ConfiguredAirbyteCatalog(
streams=[{"stream": {"name": "Users", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}]
streams=[
{
"stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
}
]
)
state = {}
iterator = source.read(logger, config, catalog, state)
Expand Down Expand Up @@ -157,7 +189,13 @@ def test_read_with_seed():
logger = None
config = {"count": 1, "seed": 100}
catalog = ConfiguredAirbyteCatalog(
streams=[{"stream": {"name": "Users", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}]
streams=[
{
"stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
}
]
)
state = {}
iterator = source.read(logger, config, catalog, state)
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/faker.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ N/A

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :-------------------------------------------------------- |
| 0.2.0 | 2022-10-14 | [18021](https://github.com/airbytehq/airbyte/pull/18021) | Move to mimesis for speed! |
| 0.2.1 | 2022-10-14 | [19197](https://github.com/airbytehq/airbyte/pull/19197) | Emit `AirbyteEstimateTraceMessage` |
| 0.2.0 | 2022-10-14 | [18021](https://github.com/airbytehq/airbyte/pull/18021) | Move to mimesis for speed! |
| 0.1.8 | 2022-10-12 | [17889](https://github.com/airbytehq/airbyte/pull/17889) | Bump to test publish command (2) |
| 0.1.7 | 2022-10-11 | [17848](https://github.com/airbytehq/airbyte/pull/17848) | Bump to test publish command |
| 0.1.6 | 2022-09-07 | [16418](https://github.com/airbytehq/airbyte/pull/16418) | Log start of each stream |
Expand Down