From 5d584e6ed4db6f1373757064d42874fcc2299233 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Tue, 1 Aug 2023 19:14:57 +0300 Subject: [PATCH 1/5] fix: Better logging in scheduler --- cloudquery/sdk/scheduler/scheduler.py | 17 ++++++++++++----- cloudquery/sdk/scheduler/table_resolver.py | 14 +++++++++----- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/cloudquery/sdk/scheduler/scheduler.py b/cloudquery/sdk/scheduler/scheduler.py index c951305..1c7bc42 100644 --- a/cloudquery/sdk/scheduler/scheduler.py +++ b/cloudquery/sdk/scheduler/scheduler.py @@ -10,7 +10,7 @@ SyncMigrateTableMessage, ) from cloudquery.sdk.schema import Resource -from .table_resolver import TableResolver +from .table_resolver import TableResolver, Client QUEUE_PER_WORKER = 100 @@ -72,7 +72,7 @@ def shutdown(self): pool.shutdown() def resolve_resource( - self, resolver: TableResolver, client, parent: Resource, item: Any + self, resolver: TableResolver, client: Client, parent: Resource, item: Any ) -> Resource: resource = Resource(resolver.table, parent, item) resolver.pre_resource_resolve(client, resource) @@ -85,7 +85,7 @@ def resolve_table( self, resolver: TableResolver, depth: int, - client, + client: Client, parent_item: Resource, res: queue.Queue, ): @@ -93,11 +93,11 @@ def resolve_table( try: if depth == 0: self._logger.info( - "table resolver started", table=resolver.table.name, depth=depth + "table resolver started", client_id=client.id(), table=resolver.table.name, depth=depth ) else: self._logger.debug( - "table resolver started", table=resolver.table.name, depth=depth + "table resolver started", client_id=client.id(), table=resolver.table.name, depth=depth ) total_resources = 0 for item in resolver.resolve(client, parent_item): @@ -108,6 +108,7 @@ def resolve_table( except Exception as e: self._logger.error( "failed to resolve resource", + client_id=client.id(), table=resolver.table.name, depth=depth, exc_info=True, @@ -128,19 +129,25 @@ def resolve_table( if depth == 0: self._logger.info( "table resolver finished successfully", + client_id=client.id(), table=resolver.table.name, + resources=total_resources, depth=depth, ) else: self._logger.debug( "table resolver finished successfully", + client_id=client.id(), table=resolver.table.name, + resources=total_resources, depth=depth, ) except Exception as e: self._logger.error( "table resolver finished with error", + client_id=client.id(), table=resolver.table.name, + resources=total_resources, depth=depth, exc_info=True, ) diff --git a/cloudquery/sdk/scheduler/table_resolver.py b/cloudquery/sdk/scheduler/table_resolver.py index 84cdc25..ff7a4ac 100644 --- a/cloudquery/sdk/scheduler/table_resolver.py +++ b/cloudquery/sdk/scheduler/table_resolver.py @@ -1,6 +1,10 @@ from cloudquery.sdk.schema.table import Table from cloudquery.sdk.schema import Resource -from typing import Any, Generator +from typing import Any, Generator, List + +class Client: + def id(self): + raise NotImplementedError() class TableResolver: @@ -16,16 +20,16 @@ def table(self) -> Table: def child_resolvers(self): return self._child_resolvers - def multiplex(self, client): + def multiplex(self, client: Client) -> List[Client]: return [client] def resolve(self, client, parent_resource) -> Generator[Any, None, None]: raise NotImplementedError() - def pre_resource_resolve(self, client, resource): + def pre_resource_resolve(self, client: Client, resource): return - def resolve_column(self, client, resource: Resource, column_name: str): + def resolve_column(self, client: Client, resource: Resource, column_name: str): if type(resource.item) is dict: if column_name in resource.item: resource.set(column_name, resource.item[column_name]) @@ -33,5 +37,5 @@ def resolve_column(self, client, resource: Resource, column_name: str): if hasattr(resource.item, column_name): resource.set(column_name, resource.item[column_name]) - def post_resource_resolve(self, client, resource): + def post_resource_resolve(self, client: Client, resource): return From 72ebe24b0c3b9a0ba4f585008ec884669565c626 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Tue, 1 Aug 2023 19:15:12 +0300 Subject: [PATCH 2/5] fmt --- cloudquery/sdk/scheduler/scheduler.py | 10 ++++++++-- cloudquery/sdk/scheduler/table_resolver.py | 1 + 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/cloudquery/sdk/scheduler/scheduler.py b/cloudquery/sdk/scheduler/scheduler.py index 1c7bc42..3d07d79 100644 --- a/cloudquery/sdk/scheduler/scheduler.py +++ b/cloudquery/sdk/scheduler/scheduler.py @@ -93,11 +93,17 @@ def resolve_table( try: if depth == 0: self._logger.info( - "table resolver started", client_id=client.id(), table=resolver.table.name, depth=depth + "table resolver started", + client_id=client.id(), + table=resolver.table.name, + depth=depth, ) else: self._logger.debug( - "table resolver started", client_id=client.id(), table=resolver.table.name, depth=depth + "table resolver started", + client_id=client.id(), + table=resolver.table.name, + depth=depth, ) total_resources = 0 for item in resolver.resolve(client, parent_item): diff --git a/cloudquery/sdk/scheduler/table_resolver.py b/cloudquery/sdk/scheduler/table_resolver.py index ff7a4ac..25c7128 100644 --- a/cloudquery/sdk/scheduler/table_resolver.py +++ b/cloudquery/sdk/scheduler/table_resolver.py @@ -2,6 +2,7 @@ from cloudquery.sdk.schema import Resource from typing import Any, Generator, List + class Client: def id(self): raise NotImplementedError() From ac0d79708b089d81c3283b37ef636f122f07081b Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Tue, 1 Aug 2023 19:18:42 +0300 Subject: [PATCH 3/5] fix tests --- cloudquery/sdk/serve/plugin.py | 2 +- tests/scheduler/scheduler.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/cloudquery/sdk/serve/plugin.py b/cloudquery/sdk/serve/plugin.py index f51503d..7842425 100644 --- a/cloudquery/sdk/serve/plugin.py +++ b/cloudquery/sdk/serve/plugin.py @@ -16,7 +16,7 @@ def get_logger(args): - log = structlog.get_logger() + log = structlog.get_logger(processors=[structlog.processors.JSONRenderer()]) return log diff --git a/tests/scheduler/scheduler.py b/tests/scheduler/scheduler.py index 1a6eb84..3e5458a 100644 --- a/tests/scheduler/scheduler.py +++ b/tests/scheduler/scheduler.py @@ -37,7 +37,8 @@ def resolve(self, client, parent_resource) -> Generator[Any, None, None]: class TestClient: - pass + def id(self): + return "test_client" def test_scheduler(): From 5c576b17b12a8ae2aae77baa77840a7bb4dfe1fe Mon Sep 17 00:00:00 2001 From: Yevgeny Pats Date: Tue, 1 Aug 2023 19:49:21 +0300 Subject: [PATCH 4/5] Update cloudquery/sdk/scheduler/table_resolver.py Co-authored-by: Kemal <223029+disq@users.noreply.github.com> --- cloudquery/sdk/scheduler/table_resolver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloudquery/sdk/scheduler/table_resolver.py b/cloudquery/sdk/scheduler/table_resolver.py index 9800f2e..db416c6 100644 --- a/cloudquery/sdk/scheduler/table_resolver.py +++ b/cloudquery/sdk/scheduler/table_resolver.py @@ -4,7 +4,7 @@ class Client: - def id(self): + def id(self) -> str: raise NotImplementedError() from cloudquery.sdk.schema import Resource From 4a38fb41e4b49a3387207059831aefc395a7c86d Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Tue, 1 Aug 2023 19:51:27 +0300 Subject: [PATCH 5/5] fmt --- cloudquery/sdk/scheduler/table_resolver.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cloudquery/sdk/scheduler/table_resolver.py b/cloudquery/sdk/scheduler/table_resolver.py index db416c6..d1ffd77 100644 --- a/cloudquery/sdk/scheduler/table_resolver.py +++ b/cloudquery/sdk/scheduler/table_resolver.py @@ -7,6 +7,7 @@ class Client: def id(self) -> str: raise NotImplementedError() + from cloudquery.sdk.schema import Resource from cloudquery.sdk.schema.table import Table