Skip to content

Commit

Permalink
large test
Browse files Browse the repository at this point in the history
  • Loading branch information
hrl20 committed Apr 2, 2024
1 parent ce74ba5 commit ddda446
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,6 @@ def write(
for stream_name in buffer.keys():
logger.info(f"flushing buffer for state: {message}")
table_name = f"_airbyte_raw_{stream_name}"
query = f"""
INSERT INTO {schema_name}.{table_name}
(_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
VALUES (?,?,?)
"""
pa_table = pa.Table.from_pydict(buffer[stream_name])
con.sql(f"INSERT INTO {schema_name}.{table_name} SELECT * FROM pa_table")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
import random
import string
import tempfile
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict
from typing import Any, Dict, Generator
from unittest.mock import MagicMock

import duckdb
Expand All @@ -30,6 +31,7 @@
)
from destination_duckdb import DestinationDuckdb
from destination_duckdb.destination import CONFIG_MOTHERDUCK_API_KEY
from faker import Faker

CONFIG_PATH = "integration_tests/config.json"
SECRETS_CONFIG_PATH = (
Expand Down Expand Up @@ -96,6 +98,12 @@ def test_table_name() -> str:
return f"airbyte_integration_{rand_string}"


@pytest.fixture
def test_large_table_name() -> str:
    """Fixture: a randomized destination table name for the large-write test.

    Uses a fresh 10-letter lowercase suffix so concurrent/repeated runs do not
    collide on the same raw table.
    """
    suffix = "".join(random.choices(string.ascii_lowercase, k=10))
    return f"airbyte_integration_{suffix}"

@pytest.fixture
def table_schema() -> str:
schema = {"type": "object", "properties": {"column1": {"type": ["null", "string"]}}}
Expand All @@ -104,7 +112,7 @@ def table_schema() -> str:

@pytest.fixture
def configured_catalogue(
test_table_name: str, table_schema: str
test_table_name: str, test_large_table_name: str, table_schema: str,
) -> ConfiguredAirbyteCatalog:
append_stream = ConfiguredAirbyteStream(
stream=AirbyteStream(
Expand All @@ -115,7 +123,16 @@ def configured_catalogue(
sync_mode=SyncMode.incremental,
destination_sync_mode=DestinationSyncMode.append,
)
return ConfiguredAirbyteCatalog(streams=[append_stream])
append_stream_large = ConfiguredAirbyteStream(
stream=AirbyteStream(
name=test_large_table_name,
json_schema=table_schema,
supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
),
sync_mode=SyncMode.incremental,
destination_sync_mode=DestinationSyncMode.append,
)
return ConfiguredAirbyteCatalog(streams=[append_stream, append_stream_large])


@pytest.fixture
Expand Down Expand Up @@ -206,3 +223,60 @@ def test_write(
assert len(result) == 2
assert result[0][2] == json.dumps(airbyte_message1.record.data)
assert result[1][2] == json.dumps(airbyte_message2.record.data)

def _airbyte_messages(n: int, batch_size: int, table_name: str) -> Generator[AirbyteMessage, None, None]:
    """Yield ``n`` Airbyte messages destined for ``table_name``.

    Emits a RECORD message per iteration, except that every ``batch_size``-th
    iteration (after the first) yields a STATE checkpoint message in place of a
    record. Faker is seeded with 0 so the generated data is deterministic.
    """
    fake = Faker()
    Faker.seed(0)

    for index in range(n):
        # A checkpoint replaces (does not accompany) the record at this index.
        if index > 0 and index % batch_size == 0:
            state = AirbyteStateMessage(data={"state": str(index // batch_size)})
            yield AirbyteMessage(type=Type.STATE, state=state)
        else:
            record = AirbyteRecordMessage(
                stream=table_name,
                data={"key1": fake.first_name(), "key2": fake.ssn()},
                # Second-resolution epoch scaled to milliseconds.
                emitted_at=int(datetime.now().timestamp()) * 1000,
            )
            yield AirbyteMessage(type=Type.RECORD, record=record)

# Sizing for the large-write test: total messages pushed through the
# destination, and how often a STATE checkpoint is interleaved (every
# BATCH_WRITE_SIZE-th message is a STATE instead of a RECORD).
TOTAL_RECORDS = 5000
BATCH_WRITE_SIZE = 1000

@pytest.mark.slow
def test_large_number_of_writes(
    config: Dict[str, str],
    request,
    configured_catalogue: ConfiguredAirbyteCatalog,
    test_large_table_name: str,
    test_schema_name: str,
):
    """Stream TOTAL_RECORDS messages through the destination and verify both
    the number of STATE messages echoed back and the raw row count persisted
    in DuckDB (or MotherDuck, when an API key is configured)."""
    destination = DestinationDuckdb()
    messages = _airbyte_messages(TOTAL_RECORDS, BATCH_WRITE_SIZE, test_large_table_name)
    state_messages = list(destination.write(config, configured_catalogue, messages))

    # The destination yields back one message per STATE checkpoint it flushed.
    assert len(state_messages) == TOTAL_RECORDS // (BATCH_WRITE_SIZE + 1)

    motherduck_api_key = str(config.get(CONFIG_MOTHERDUCK_API_KEY, ""))
    duckdb_config = {}
    if motherduck_api_key:
        duckdb_config["motherduck_token"] = motherduck_api_key
        duckdb_config["custom_user_agent"] = "airbyte_intg_test"

    con = duckdb.connect(
        database=config.get("destination_path"), read_only=False, config=duckdb_config
    )
    with con:
        cursor = con.execute(
            f"SELECT count(1) FROM {test_schema_name}._airbyte_raw_{test_large_table_name}"
        )
        rows = cursor.fetchall()
        # Every non-STATE message should have landed as one raw record.
        assert rows[0][0] == TOTAL_RECORDS - TOTAL_RECORDS // (BATCH_WRITE_SIZE + 1)
17 changes: 16 additions & 1 deletion airbyte-integrations/connectors/destination-duckdb/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pytest = "^7.4.0"
ruff = "^0.0.286"
black = "^23.7.0"
mypy = "^1.5.1"
faker = "24.4.0"

[build-system]
requires = ["poetry-core"]
Expand Down

0 comments on commit ddda446

Please sign in to comment.