diff --git a/cloudquery/sdk/scheduler/scheduler.py b/cloudquery/sdk/scheduler/scheduler.py
index c951305..3d07d79 100644
--- a/cloudquery/sdk/scheduler/scheduler.py
+++ b/cloudquery/sdk/scheduler/scheduler.py
@@ -10,7 +10,7 @@
     SyncMigrateTableMessage,
 )
 from cloudquery.sdk.schema import Resource
-from .table_resolver import TableResolver
+from .table_resolver import TableResolver, Client
 
 QUEUE_PER_WORKER = 100
 
@@ -72,7 +72,7 @@ def shutdown(self):
             pool.shutdown()
 
     def resolve_resource(
-        self, resolver: TableResolver, client, parent: Resource, item: Any
+        self, resolver: TableResolver, client: Client, parent: Resource, item: Any
     ) -> Resource:
         resource = Resource(resolver.table, parent, item)
         resolver.pre_resource_resolve(client, resource)
@@ -85,7 +85,7 @@ def resolve_table(
         self,
         resolver: TableResolver,
         depth: int,
-        client,
+        client: Client,
         parent_item: Resource,
         res: queue.Queue,
     ):
@@ -93,11 +93,17 @@ def resolve_table(
         try:
             if depth == 0:
                 self._logger.info(
-                    "table resolver started", table=resolver.table.name, depth=depth
+                    "table resolver started",
+                    client_id=client.id(),
+                    table=resolver.table.name,
+                    depth=depth,
                 )
             else:
                 self._logger.debug(
-                    "table resolver started", table=resolver.table.name, depth=depth
+                    "table resolver started",
+                    client_id=client.id(),
+                    table=resolver.table.name,
+                    depth=depth,
                 )
             total_resources = 0
             for item in resolver.resolve(client, parent_item):
@@ -108,6 +114,7 @@ def resolve_table(
                 except Exception as e:
                     self._logger.error(
                         "failed to resolve resource",
+                        client_id=client.id(),
                         table=resolver.table.name,
                         depth=depth,
                         exc_info=True,
@@ -128,19 +135,25 @@ def resolve_table(
             if depth == 0:
                 self._logger.info(
                     "table resolver finished successfully",
+                    client_id=client.id(),
                     table=resolver.table.name,
+                    resources=total_resources,
                     depth=depth,
                 )
             else:
                 self._logger.debug(
                     "table resolver finished successfully",
+                    client_id=client.id(),
                     table=resolver.table.name,
+                    resources=total_resources,
                     depth=depth,
                 )
         except Exception as e:
             self._logger.error(
                 "table resolver finished with error",
+                client_id=client.id(),
                 table=resolver.table.name,
+                resources=total_resources,
                 depth=depth,
                 exc_info=True,
             )
diff --git a/cloudquery/sdk/scheduler/table_resolver.py b/cloudquery/sdk/scheduler/table_resolver.py
index 3cde6bd..d1ffd77 100644
--- a/cloudquery/sdk/scheduler/table_resolver.py
+++ b/cloudquery/sdk/scheduler/table_resolver.py
@@ -1,4 +1,13 @@
-from typing import Any, Generator
+from typing import Any, Generator, List
+
+
+class Client:
+    """Base class for the client object passed to table resolvers."""
+
+    def id(self) -> str:
+        """Return this client's identifier; subclasses must override."""
+        raise NotImplementedError()
+
 
 from cloudquery.sdk.schema import Resource
 from cloudquery.sdk.schema.table import Table
@@ -17,16 +26,16 @@ def table(self) -> Table:
     def child_resolvers(self):
         return self._child_resolvers
 
-    def multiplex(self, client):
+    def multiplex(self, client: Client) -> List[Client]:
         return [client]
 
     def resolve(self, client, parent_resource) -> Generator[Any, None, None]:
         raise NotImplementedError()
 
-    def pre_resource_resolve(self, client, resource):
+    def pre_resource_resolve(self, client: Client, resource):
         return
 
-    def resolve_column(self, client, resource: Resource, column_name: str):
+    def resolve_column(self, client: Client, resource: Resource, column_name: str):
         if type(resource.item) is dict:
             if column_name in resource.item:
                 resource.set(column_name, resource.item[column_name])
@@ -34,5 +43,5 @@ def resolve_column(self, client, resource: Resource, column_name: str):
             if hasattr(resource.item, column_name):
                 resource.set(column_name, getattr(resource.item, column_name))
 
-    def post_resource_resolve(self, client, resource):
+    def post_resource_resolve(self, client: Client, resource):
         return
diff --git a/cloudquery/sdk/serve/plugin.py b/cloudquery/sdk/serve/plugin.py
index f51503d..7842425 100644
--- a/cloudquery/sdk/serve/plugin.py
+++ b/cloudquery/sdk/serve/plugin.py
@@ -16,7 +16,7 @@
 
 
 def get_logger(args):
-    log = structlog.get_logger()
+    log = structlog.wrap_logger(None, processors=[structlog.processors.JSONRenderer()])
     return log
 
 
diff --git a/tests/scheduler/scheduler.py b/tests/scheduler/scheduler.py
index 1a6eb84..3e5458a 100644
--- a/tests/scheduler/scheduler.py
+++ b/tests/scheduler/scheduler.py
@@ -37,7 +37,8 @@ def resolve(self, client, parent_resource) -> Generator[Any, None, None]:
 
 
 class TestClient:
-    pass
+    def id(self):
+        return "test_client"
 
 
 def test_scheduler():