diff --git a/cloudquery/sdk/scheduler/scheduler.py b/cloudquery/sdk/scheduler/scheduler.py index c951305..719592b 100644 --- a/cloudquery/sdk/scheduler/scheduler.py +++ b/cloudquery/sdk/scheduler/scheduler.py @@ -22,12 +22,8 @@ def __init__(self, maxsize, *args, **kwargs): class TableResolverStarted: - def __init__(self, count=1) -> None: - self._count = count - - @property - def count(self): - return self._count + def __init__(self) -> None: + pass class TableResolverFinished: @@ -89,7 +85,6 @@ def resolve_table( parent_item: Resource, res: queue.Queue, ): - table_resolvers_started = 0 try: if depth == 0: self._logger.info( @@ -115,6 +110,7 @@ def resolve_table( continue res.put(SyncInsertMessage(resource.to_arrow_record())) for child_resolvers in resolver.child_resolvers: + res.put(TableResolverStarted()) self._pools[depth + 1].submit( self.resolve_table, child_resolvers, @@ -123,7 +119,6 @@ def resolve_table( resource, res, ) - table_resolvers_started += 1 total_resources += 1 if depth == 0: self._logger.info( @@ -145,7 +140,6 @@ def resolve_table( exc_info=True, ) finally: - res.put(TableResolverStarted(count=table_resolvers_started)) res.put(TableResolverFinished()) def _sync( @@ -155,17 +149,13 @@ def _sync( res: queue.Queue, deterministic_cq_id=False, ): - total_table_resolvers = 0 - try: - for resolver in resolvers: - clients = resolver.multiplex(client) - for client in clients: - self._pools[0].submit( - self.resolve_table, resolver, 0, client, None, res - ) - total_table_resolvers += 1 - finally: - res.put(TableResolverStarted(total_table_resolvers)) + for resolver in resolvers: + clients = resolver.multiplex(client) + for client in clients: + res.put(TableResolverStarted()) + self._pools[0].submit( + self.resolve_table, resolver, 0, client, None, res + ) def sync( self, client, resolvers: List[TableResolver], deterministic_cq_id=False @@ -180,7 +170,7 @@ def sync( while True: message = res.get() if type(message) == TableResolverStarted: - total_table_resolvers += message.count + total_table_resolvers += 1 if total_table_resolvers == finished_table_resolvers: break continue