diff --git a/cloudquery/sdk/scheduler/scheduler.py b/cloudquery/sdk/scheduler/scheduler.py index d8e383c..c951305 100644 --- a/cloudquery/sdk/scheduler/scheduler.py +++ b/cloudquery/sdk/scheduler/scheduler.py @@ -171,8 +171,8 @@ def sync( self, client, resolvers: List[TableResolver], deterministic_cq_id=False ) -> Generator[SyncMessage, None, None]: res = queue.Queue() - for resolver in resolvers: - yield SyncMigrateTableMessage(table=resolver.table.to_arrow_schema()) + yield from self._send_migrate_table_messages(resolvers) + thread = futures.ThreadPoolExecutor() thread.submit(self._sync, client, resolvers, res, deterministic_cq_id) total_table_resolvers = 0 @@ -191,3 +191,11 @@ def sync( continue yield message thread.shutdown(wait=True) + + def _send_migrate_table_messages( + self, resolvers: List[TableResolver] + ) -> Generator[SyncMessage, None, None]: + for resolver in resolvers: + yield SyncMigrateTableMessage(table=resolver.table.to_arrow_schema()) + if resolver.child_resolvers: + yield from self._send_migrate_table_messages(resolver.child_resolvers) diff --git a/tests/scheduler/scheduler.py b/tests/scheduler/scheduler.py index a60321c..1a6eb84 100644 --- a/tests/scheduler/scheduler.py +++ b/tests/scheduler/scheduler.py @@ -1,9 +1,10 @@ from typing import Any, List, Generator + import pyarrow as pa -import pytest -from cloudquery.sdk.scheduler import Scheduler, TableResolver -from cloudquery.sdk.schema import Table, Column, Resource + from cloudquery.sdk.message import SyncMessage +from cloudquery.sdk.scheduler import Scheduler, TableResolver +from cloudquery.sdk.schema import Column from cloudquery.sdk.schema.table import Table @@ -46,10 +47,12 @@ def test_scheduler(): expected_record1 = pa.record_batch([[1]], schema=table1.to_arrow_schema()) table2 = Table("test_child_table", [Column("test_child_column", pa.int64())]) expected_record2 = pa.record_batch([[2]], schema=table2.to_arrow_schema()) - resources: List[SyncMessage] = [] - for resource in s.sync(client, [SchedulerTestTableResolver()]): - resources.append(resource) - assert len(resources) == 3 - assert resources[1].record == expected_record1 - assert resources[2].record == expected_record2 + messages: List[SyncMessage] = [] + for message in s.sync(client, [SchedulerTestTableResolver()]): + messages.append(message) + assert len(messages) == 4 + assert Table.from_arrow_schema(messages[0].table).name == "test_table" + assert Table.from_arrow_schema(messages[1].table).name == "test_child_table" + assert messages[2].record == expected_record1 + assert messages[3].record == expected_record2 s.shutdown()