From 501fadcb370cc3fa0b74c41005c76ef93ac0f6e1 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Tue, 1 Aug 2023 17:19:56 +0100 Subject: [PATCH 1/2] Fix race in scheduler --- cloudquery/sdk/scheduler/scheduler.py | 62 +++++++++++---------------- 1 file changed, 26 insertions(+), 36 deletions(-) diff --git a/cloudquery/sdk/scheduler/scheduler.py b/cloudquery/sdk/scheduler/scheduler.py index c951305..a4e7554 100644 --- a/cloudquery/sdk/scheduler/scheduler.py +++ b/cloudquery/sdk/scheduler/scheduler.py @@ -22,12 +22,8 @@ def __init__(self, maxsize, *args, **kwargs): class TableResolverStarted: - def __init__(self, count=1) -> None: - self._count = count - - @property - def count(self): - return self._count + def __init__(self) -> None: + pass class TableResolverFinished: @@ -37,7 +33,7 @@ def __init__(self) -> None: class Scheduler: def __init__( - self, concurrency: int, queue_size: int = 0, max_depth: int = 3, logger=None + self, concurrency: int, queue_size: int = 0, max_depth: int = 3, logger=None ): self._queue = queue.Queue() self._max_depth = max_depth @@ -72,7 +68,7 @@ def shutdown(self): pool.shutdown() def resolve_resource( - self, resolver: TableResolver, client, parent: Resource, item: Any + self, resolver: TableResolver, client, parent: Resource, item: Any ) -> Resource: resource = Resource(resolver.table, parent, item) resolver.pre_resource_resolve(client, resource) @@ -82,14 +78,13 @@ def resolve_resource( return resource def resolve_table( - self, - resolver: TableResolver, - depth: int, - client, - parent_item: Resource, - res: queue.Queue, + self, + resolver: TableResolver, + depth: int, + client, + parent_item: Resource, + res: queue.Queue, ): - table_resolvers_started = 0 try: if depth == 0: self._logger.info( @@ -115,6 +110,7 @@ def resolve_table( continue res.put(SyncInsertMessage(resource.to_arrow_record())) for child_resolvers in resolver.child_resolvers: + res.put(TableResolverStarted()) self._pools[depth + 1].submit( self.resolve_table, child_resolvers, @@ -123,7 +119,6 @@ def resolve_table( resource, res, ) - table_resolvers_started += 1 total_resources += 1 if depth == 0: self._logger.info( @@ -145,30 +140,25 @@ def resolve_table( exc_info=True, ) finally: - res.put(TableResolverStarted(count=table_resolvers_started)) res.put(TableResolverFinished()) def _sync( - self, - client, - resolvers: List[TableResolver], - res: queue.Queue, - deterministic_cq_id=False, + self, + client, + resolvers: List[TableResolver], + res: queue.Queue, + deterministic_cq_id=False, ): - total_table_resolvers = 0 - try: - for resolver in resolvers: - clients = resolver.multiplex(client) - for client in clients: - self._pools[0].submit( - self.resolve_table, resolver, 0, client, None, res - ) - total_table_resolvers += 1 - finally: - res.put(TableResolverStarted(total_table_resolvers)) + for resolver in resolvers: + clients = resolver.multiplex(client) + for client in clients: + res.put(TableResolverStarted()) + self._pools[0].submit( + self.resolve_table, resolver, 0, client, None, res + ) def sync( - self, client, resolvers: List[TableResolver], deterministic_cq_id=False + self, client, resolvers: List[TableResolver], deterministic_cq_id=False ) -> Generator[SyncMessage, None, None]: res = queue.Queue() yield from self._send_migrate_table_messages(resolvers) @@ -180,7 +170,7 @@ def sync( while True: message = res.get() if type(message) == TableResolverStarted: - total_table_resolvers += message.count + total_table_resolvers += 1 if total_table_resolvers == finished_table_resolvers: break continue @@ -193,7 +183,7 @@ def sync( thread.shutdown(wait=True) def _send_migrate_table_messages( - self, resolvers: List[TableResolver] + self, resolvers: List[TableResolver] ) -> Generator[SyncMessage, None, None]: for resolver in resolvers: yield SyncMigrateTableMessage(table=resolver.table.to_arrow_schema()) From 46e6446bded95149d76befbdce6e56df3b450f9a Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Tue, 1 Aug 2023 17:20:06 +0100 Subject: [PATCH 2/2] Fmt --- cloudquery/sdk/scheduler/scheduler.py | 30 +++++++++++++-------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/cloudquery/sdk/scheduler/scheduler.py b/cloudquery/sdk/scheduler/scheduler.py index a4e7554..719592b 100644 --- a/cloudquery/sdk/scheduler/scheduler.py +++ b/cloudquery/sdk/scheduler/scheduler.py @@ -33,7 +33,7 @@ def __init__(self) -> None: class Scheduler: def __init__( - self, concurrency: int, queue_size: int = 0, max_depth: int = 3, logger=None + self, concurrency: int, queue_size: int = 0, max_depth: int = 3, logger=None ): self._queue = queue.Queue() self._max_depth = max_depth @@ -68,7 +68,7 @@ def shutdown(self): pool.shutdown() def resolve_resource( - self, resolver: TableResolver, client, parent: Resource, item: Any + self, resolver: TableResolver, client, parent: Resource, item: Any ) -> Resource: resource = Resource(resolver.table, parent, item) resolver.pre_resource_resolve(client, resource) @@ -78,12 +78,12 @@ def resolve_resource( return resource def resolve_table( - self, - resolver: TableResolver, - depth: int, - client, - parent_item: Resource, - res: queue.Queue, + self, + resolver: TableResolver, + depth: int, + client, + parent_item: Resource, + res: queue.Queue, ): try: if depth == 0: @@ -143,11 +143,11 @@ def resolve_table( res.put(TableResolverFinished()) def _sync( - self, - client, - resolvers: List[TableResolver], - res: queue.Queue, - deterministic_cq_id=False, + self, + client, + resolvers: List[TableResolver], + res: queue.Queue, + deterministic_cq_id=False, ): for resolver in resolvers: clients = resolver.multiplex(client) @@ -158,7 +158,7 @@ def _sync( ) def sync( - self, client, resolvers: List[TableResolver], deterministic_cq_id=False + self, client, resolvers: List[TableResolver], deterministic_cq_id=False ) -> Generator[SyncMessage, None, None]: res = queue.Queue() yield from self._send_migrate_table_messages(resolvers) @@ -183,7 +183,7 @@ def sync( thread.shutdown(wait=True) def _send_migrate_table_messages( - self, resolvers: List[TableResolver] + self, resolvers: List[TableResolver] ) -> Generator[SyncMessage, None, None]: for resolver in resolvers: yield SyncMigrateTableMessage(table=resolver.table.to_arrow_schema())