From e04d3e9f02e482b3e5efd042700bc4c941630115 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Tue, 1 Aug 2023 13:18:13 +0100 Subject: [PATCH 1/6] Fix JSON and UUID type checking, add JSON test, consolidate setup.py and requirements.txt --- cloudquery/sdk/scalar/scalar_factory.py | 23 ++++++------ cloudquery/sdk/scheduler/scheduler.py | 47 ++++++++++++++----------- requirements.txt | 22 +----------- setup.py | 19 +++++++++- tests/types/__init__.py | 0 tests/types/json.py | 8 +++++ 6 files changed, 65 insertions(+), 54 deletions(-) create mode 100644 tests/types/__init__.py create mode 100644 tests/types/json.py diff --git a/cloudquery/sdk/scalar/scalar_factory.py b/cloudquery/sdk/scalar/scalar_factory.py index 4b390c1..c08545a 100644 --- a/cloudquery/sdk/scalar/scalar_factory.py +++ b/cloudquery/sdk/scalar/scalar_factory.py @@ -1,5 +1,6 @@ import pyarrow as pa -from .scalar import ScalarInvalidTypeError + +from cloudquery.sdk.types import UUIDType, JSONType from .binary import Binary from .bool import Bool from .date32 import Date32 @@ -7,11 +8,11 @@ from .float import Float from .int import Int from .list import List +from .scalar import ScalarInvalidTypeError from .string import String from .timestamp import Timestamp from .uint import Uint from .uuid import UUID -from cloudquery.sdk.types import UUIDType, JSONType class ScalarFactory: @@ -37,9 +38,9 @@ def new_scalar(self, dt: pa.DataType): elif dt_id == pa.types.lib.Type_UINT8: return Uint(bitwidth=8) elif ( - dt_id == pa.types.lib.Type_BINARY - or dt_id == pa.types.lib.Type_LARGE_BINARY - or dt_id == pa.types.lib.Type_FIXED_SIZE_BINARY + dt_id == pa.types.lib.Type_BINARY + or dt_id == pa.types.lib.Type_LARGE_BINARY + or dt_id == pa.types.lib.Type_FIXED_SIZE_BINARY ): return Binary() elif dt_id == pa.types.lib.Type_BOOL: @@ -65,16 +66,16 @@ def new_scalar(self, dt: pa.DataType): # elif dt_id == pa.types.lib.Type_INTERVAL_MONTH_DAY_NANO: # return () elif ( - dt_id == pa.types.lib.Type_LIST - or dt_id == pa.types.lib.Type_LARGE_LIST - or dt_id == pa.types.lib.Type_FIXED_SIZE_LIST + dt_id == pa.types.lib.Type_LIST + or dt_id == pa.types.lib.Type_LARGE_LIST + or dt_id == pa.types.lib.Type_FIXED_SIZE_LIST ): item = ScalarFactory.new_scalar(dt.field(0).type) return List(type(item)) # elif dt_id == pa.types.lib.Type_MAP: # return () elif ( - dt_id == pa.types.lib.Type_STRING or dt_id == pa.types.lib.Type_LARGE_STRING + dt_id == pa.types.lib.Type_STRING or dt_id == pa.types.lib.Type_LARGE_STRING ): return String() # elif dt_id == pa.types.lib.Type_STRUCT: @@ -85,9 +86,9 @@ def new_scalar(self, dt: pa.DataType): # return () elif dt_id == pa.types.lib.Type_TIMESTAMP: return Timestamp() - elif dt == UUIDType: + elif dt == UUIDType(): return UUID() - elif dt == JSONType: + elif dt == JSONType(): return String() else: raise ScalarInvalidTypeError("Invalid type {} for scalar".format(dt)) diff --git a/cloudquery/sdk/scheduler/scheduler.py b/cloudquery/sdk/scheduler/scheduler.py index fa94506..2cc4098 100644 --- a/cloudquery/sdk/scheduler/scheduler.py +++ b/cloudquery/sdk/scheduler/scheduler.py @@ -1,18 +1,19 @@ -from typing import List, Generator, Any import queue -import time import structlog +import time +import traceback +from concurrent import futures from enum import Enum -from cloudquery.sdk.schema import Table, Resource +from typing import Generator +from typing import List, Generator, Any + from cloudquery.sdk.message import ( SyncMessage, SyncInsertMessage, SyncMigrateTableMessage, ) -from concurrent import futures -from typing import Generator +from cloudquery.sdk.schema import Table, Resource from .table_resolver import TableResolver -import traceback QUEUE_PER_WORKER = 100 @@ -39,7 +40,7 @@ def __init__(self) -> None: class Scheduler: def __init__( - self, concurrency: int, queue_size: int = 0, max_depth: int = 3, logger=None + self, concurrency: int, queue_size: int = 0, max_depth: int = 3, logger=None ): self._queue = queue.Queue() self._max_depth = max_depth @@ -74,7 +75,7 @@ def shutdown(self): pool.shutdown() def resolve_resource( - self, resolver: TableResolver, client, parent: Resource, item: Any + self, resolver: TableResolver, client, parent: Resource, item: Any ) -> Resource: resource = Resource(resolver.table, parent, item) resolver.pre_resource_resolve(client, resource) @@ -84,12 +85,12 @@ def resolve_resource( return resource def resolve_table( - self, - resolver: TableResolver, - depth: int, - client, - parent_item: Resource, - res: queue.Queue, + self, + resolver: TableResolver, + depth: int, + client, + parent_item: Resource, + res: queue.Queue, ): table_resolvers_started = 0 try: @@ -103,11 +104,13 @@ def resolve_table( ) total_resources = 0 for item in resolver.resolve(client, parent_item): + print("item", item) try: resource = self.resolve_resource( resolver, client, parent_item, item ) except Exception as e: + print("exception", e) self._logger.error( "failed to resolve resource", table=resolver.table.name, @@ -151,11 +154,11 @@ def resolve_table( res.put(TableResolverFinished()) def _sync( - self, - client, - resolvers: List[TableResolver], - res: queue.Queue, - deterministic_cq_id=False, + self, + client, + resolvers: List[TableResolver], + res: queue.Queue, + deterministic_cq_id=False, ): total_table_resolvers = 0 try: @@ -170,7 +173,7 @@ def _sync( res.put(TableResolverStarted(total_table_resolvers)) def sync( - self, client, resolvers: List[TableResolver], deterministic_cq_id=False + self, client, resolvers: List[TableResolver], deterministic_cq_id=False ) -> Generator[SyncMessage, None, None]: res = queue.Queue() for resolver in resolvers: @@ -181,6 +184,7 @@ def sync( finished_table_resolvers = 0 while True: message = res.get() + print("got message", message) if type(message) == TableResolverStarted: total_table_resolvers += message.count if total_table_resolvers == finished_table_resolvers: @@ -192,4 +196,5 @@ def sync( break continue yield message - thread.shutdown() + print("shutting down thread") + thread.shutdown(wait=True) diff --git a/requirements.txt b/requirements.txt index b4871e2..945c9b4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,21 +1 @@ -cloudquery-plugin-pb==0.0.14 -exceptiongroup==1.1.2 -black==23.7.0 -grpcio==1.56.2 -grpcio-tools==1.56.2 -iniconfig==2.0.0 -Jinja2==3.1.2 -MarkupSafe==2.1.3 -numpy==1.25.1 -packaging==23.1 -pandas==2.0.3 -pluggy==1.2.0 -protobuf==4.23.4 -pyarrow==12.0.1 -pytest==7.4.0 -python-dateutil==2.8.2 -pytz==2023.3 -six==1.16.0 -structlog==23.1.0 -tomli==2.0.1 -tzdata==2023.3 +. \ No newline at end of file diff --git a/setup.py b/setup.py index 6f1847c..d4d32a6 100644 --- a/setup.py +++ b/setup.py @@ -11,9 +11,26 @@ dependencies = [ "cloudquery-plugin-pb==0.0.14", - "pyarrow==12.0.1", + "exceptiongroup==1.1.2", + "black==23.7.0", + "grpcio==1.56.2", + "grpcio-tools==1.56.2", + "iniconfig==2.0.0", "Jinja2==3.1.2", + "MarkupSafe==2.1.3", + "numpy==1.25.1", + "packaging==23.1", + "pandas==2.0.3", + "pluggy==1.2.0", + "protobuf==4.23.4", + "pyarrow==12.0.1", + "pytest==7.4.0", + "python-dateutil==2.8.2", + "pytz==2023.3", + "six==1.16.0", "structlog==23.1.0", + "tomli==2.0.1", + "tzdata==2023.3", ] url = "https://github.com/cloudquery/plugin-sdk-python" diff --git a/tests/types/__init__.py b/tests/types/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/types/json.py b/tests/types/json.py new file mode 100644 index 0000000..c3888be --- /dev/null +++ b/tests/types/json.py @@ -0,0 +1,8 @@ +from cloudquery.sdk.types import JSONType + + +def test_json_type(): + j = JSONType() + # test equality + assert j == JSONType() + print(j) From f9cded5e80ebc1b8ce15d9403809b5b487ea4b2b Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Tue, 1 Aug 2023 13:18:56 +0100 Subject: [PATCH 2/6] Reformat --- cloudquery/sdk/scalar/scalar_factory.py | 14 ++++++------- cloudquery/sdk/scheduler/scheduler.py | 28 ++++++++++++------------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/cloudquery/sdk/scalar/scalar_factory.py b/cloudquery/sdk/scalar/scalar_factory.py index c08545a..6eabaca 100644 --- a/cloudquery/sdk/scalar/scalar_factory.py +++ b/cloudquery/sdk/scalar/scalar_factory.py @@ -38,9 +38,9 @@ def new_scalar(self, dt: pa.DataType): elif dt_id == pa.types.lib.Type_UINT8: return Uint(bitwidth=8) elif ( - dt_id == pa.types.lib.Type_BINARY - or dt_id == pa.types.lib.Type_LARGE_BINARY - or dt_id == pa.types.lib.Type_FIXED_SIZE_BINARY + dt_id == pa.types.lib.Type_BINARY + or dt_id == pa.types.lib.Type_LARGE_BINARY + or dt_id == pa.types.lib.Type_FIXED_SIZE_BINARY ): return Binary() elif dt_id == pa.types.lib.Type_BOOL: @@ -66,16 +66,16 @@ def new_scalar(self, dt: pa.DataType): # elif dt_id == pa.types.lib.Type_INTERVAL_MONTH_DAY_NANO: # return () elif ( - dt_id == pa.types.lib.Type_LIST - or dt_id == pa.types.lib.Type_LARGE_LIST - or dt_id == pa.types.lib.Type_FIXED_SIZE_LIST + dt_id == pa.types.lib.Type_LIST + or dt_id == pa.types.lib.Type_LARGE_LIST + or dt_id == pa.types.lib.Type_FIXED_SIZE_LIST ): item = ScalarFactory.new_scalar(dt.field(0).type) return List(type(item)) # elif dt_id == pa.types.lib.Type_MAP: # return () elif ( - dt_id == pa.types.lib.Type_STRING or dt_id == pa.types.lib.Type_LARGE_STRING + dt_id == pa.types.lib.Type_STRING or dt_id == pa.types.lib.Type_LARGE_STRING ): return String() # elif dt_id == pa.types.lib.Type_STRUCT: diff --git a/cloudquery/sdk/scheduler/scheduler.py b/cloudquery/sdk/scheduler/scheduler.py index 2cc4098..9486934 100644 --- a/cloudquery/sdk/scheduler/scheduler.py +++ b/cloudquery/sdk/scheduler/scheduler.py @@ -40,7 +40,7 @@ def __init__(self) -> None: class Scheduler: def __init__( - self, concurrency: int, queue_size: int = 0, max_depth: int = 3, logger=None + self, concurrency: int, queue_size: int = 0, max_depth: int = 3, logger=None ): self._queue = queue.Queue() self._max_depth = max_depth @@ -75,7 +75,7 @@ def shutdown(self): pool.shutdown() def resolve_resource( - self, resolver: TableResolver, client, parent: Resource, item: Any + self, resolver: TableResolver, client, parent: Resource, item: Any ) -> Resource: resource = Resource(resolver.table, parent, item) resolver.pre_resource_resolve(client, resource) @@ -85,12 +85,12 @@ def resolve_resource( return resource def resolve_table( - self, - resolver: TableResolver, - depth: int, - client, - parent_item: Resource, - res: queue.Queue, + self, + resolver: TableResolver, + depth: int, + client, + parent_item: Resource, + res: queue.Queue, ): table_resolvers_started = 0 try: @@ -154,11 +154,11 @@ def resolve_table( res.put(TableResolverFinished()) def _sync( - self, - client, - resolvers: List[TableResolver], - res: queue.Queue, - deterministic_cq_id=False, + self, + client, + resolvers: List[TableResolver], + res: queue.Queue, + deterministic_cq_id=False, ): total_table_resolvers = 0 try: @@ -173,7 +173,7 @@ def _sync( res.put(TableResolverStarted(total_table_resolvers)) def sync( - self, client, resolvers: List[TableResolver], deterministic_cq_id=False + self, client, resolvers: List[TableResolver], deterministic_cq_id=False ) -> Generator[SyncMessage, None, None]: res = queue.Queue() for resolver in resolvers: From bb19f0869a0fc511bbfda73d9876526e83775c99 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Tue, 1 Aug 2023 13:21:32 +0100 Subject: [PATCH 3/6] remove print --- tests/types/json.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/types/json.py b/tests/types/json.py index c3888be..f0ba704 100644 --- a/tests/types/json.py +++ b/tests/types/json.py @@ -5,4 +5,3 @@ def test_json_type(): j = JSONType() # test equality assert j == JSONType() - print(j) From 7fe5aae9de361d0d8ef4a121d49ab54502c9fac9 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Tue, 1 Aug 2023 13:25:50 +0100 Subject: [PATCH 4/6] Remove prints --- cloudquery/sdk/scheduler/scheduler.py | 41 +++++++++++---------------- 1 file changed, 17 insertions(+), 24 deletions(-) diff --git a/cloudquery/sdk/scheduler/scheduler.py b/cloudquery/sdk/scheduler/scheduler.py index 7b20a31..5577d15 100644 --- a/cloudquery/sdk/scheduler/scheduler.py +++ b/cloudquery/sdk/scheduler/scheduler.py @@ -1,18 +1,15 @@ import queue -import structlog -import time -import traceback from concurrent import futures -from enum import Enum -from typing import Generator from typing import List, Generator, Any +import structlog + from cloudquery.sdk.message import ( SyncMessage, SyncInsertMessage, SyncMigrateTableMessage, ) -from cloudquery.sdk.schema import Table, Resource +from cloudquery.sdk.schema import Resource from .table_resolver import TableResolver QUEUE_PER_WORKER = 100 @@ -40,7 +37,7 @@ def __init__(self) -> None: class Scheduler: def __init__( - self, concurrency: int, queue_size: int = 0, max_depth: int = 3, logger=None + self, concurrency: int, queue_size: int = 0, max_depth: int = 3, logger=None ): self._queue = queue.Queue() self._max_depth = max_depth @@ -75,7 +72,7 @@ def shutdown(self): pool.shutdown() def resolve_resource( - self, resolver: TableResolver, client, parent: Resource, item: Any + self, resolver: TableResolver, client, parent: Resource, item: Any ) -> Resource: resource = Resource(resolver.table, parent, item) resolver.pre_resource_resolve(client, resource) @@ -85,12 +82,12 @@ def resolve_resource( return resource def resolve_table( - self, - resolver: TableResolver, - depth: int, - client, - parent_item: Resource, - res: queue.Queue, + self, + resolver: TableResolver, + depth: int, + client, + parent_item: Resource, + res: queue.Queue, ): table_resolvers_started = 0 try: @@ -104,13 +101,11 @@ def resolve_table( ) total_resources = 0 for item in resolver.resolve(client, parent_item): - print("item", item) try: resource = self.resolve_resource( resolver, client, parent_item, item ) except Exception as e: - print("exception", e) self._logger.error( "failed to resolve resource", table=resolver.table.name, @@ -154,11 +149,11 @@ def resolve_table( res.put(TableResolverFinished()) def _sync( - self, - client, - resolvers: List[TableResolver], - res: queue.Queue, - deterministic_cq_id=False, + self, + client, + resolvers: List[TableResolver], + res: queue.Queue, + deterministic_cq_id=False, ): total_table_resolvers = 0 try: @@ -173,7 +168,7 @@ def _sync( res.put(TableResolverStarted(total_table_resolvers)) def sync( - self, client, resolvers: List[TableResolver], deterministic_cq_id=False + self, client, resolvers: List[TableResolver], deterministic_cq_id=False ) -> Generator[SyncMessage, None, None]: res = queue.Queue() for resolver in resolvers: @@ -184,7 +179,6 @@ def sync( finished_table_resolvers = 0 while True: message = res.get() - print("got message", message) if type(message) == TableResolverStarted: total_table_resolvers += message.count if total_table_resolvers == finished_table_resolvers: @@ -196,5 +190,4 @@ def sync( break continue yield message - print("shutting down thread") thread.shutdown(wait=True) From a98b57d5f1210a5adf34ec4e3fca9f8484aabc91 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Tue, 1 Aug 2023 13:27:10 +0100 Subject: [PATCH 5/6] Reformat --- cloudquery/sdk/scalar/uint.py | 2 +- cloudquery/sdk/scheduler/scheduler.py | 28 +++++++++++++-------------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/cloudquery/sdk/scalar/uint.py b/cloudquery/sdk/scalar/uint.py index 51aeab9..ad9661c 100644 --- a/cloudquery/sdk/scalar/uint.py +++ b/cloudquery/sdk/scalar/uint.py @@ -4,7 +4,7 @@ class Uint(Scalar): def __init__(self, valid: bool = False, value: any = None, bitwidth: int = 64): self._bitwidth = bitwidth - self._max = 2**bitwidth + self._max = 2 ** bitwidth super().__init__(valid, value) def __eq__(self, scalar: Scalar) -> bool: diff --git a/cloudquery/sdk/scheduler/scheduler.py b/cloudquery/sdk/scheduler/scheduler.py index 5577d15..ae9b7ba 100644 --- a/cloudquery/sdk/scheduler/scheduler.py +++ b/cloudquery/sdk/scheduler/scheduler.py @@ -37,7 +37,7 @@ def __init__(self) -> None: class Scheduler: def __init__( - self, concurrency: int, queue_size: int = 0, max_depth: int = 3, logger=None + self, concurrency: int, queue_size: int = 0, max_depth: int = 3, logger=None ): self._queue = queue.Queue() self._max_depth = max_depth @@ -72,7 +72,7 @@ def shutdown(self): pool.shutdown() def resolve_resource( - self, resolver: TableResolver, client, parent: Resource, item: Any + self, resolver: TableResolver, client, parent: Resource, item: Any ) -> Resource: resource = Resource(resolver.table, parent, item) resolver.pre_resource_resolve(client, resource) @@ -82,12 +82,12 @@ def resolve_resource( return resource def resolve_table( - self, - resolver: TableResolver, - depth: int, - client, - parent_item: Resource, - res: queue.Queue, + self, + resolver: TableResolver, + depth: int, + client, + parent_item: Resource, + res: queue.Queue, ): table_resolvers_started = 0 try: @@ -149,11 +149,11 @@ def resolve_table( res.put(TableResolverFinished()) def _sync( - self, - client, - resolvers: List[TableResolver], - res: queue.Queue, - deterministic_cq_id=False, + self, + client, + resolvers: List[TableResolver], + res: queue.Queue, + deterministic_cq_id=False, ): total_table_resolvers = 0 try: @@ -168,7 +168,7 @@ def _sync( res.put(TableResolverStarted(total_table_resolvers)) def sync( - self, client, resolvers: List[TableResolver], deterministic_cq_id=False + self, client, resolvers: List[TableResolver], deterministic_cq_id=False ) -> Generator[SyncMessage, None, None]: res = queue.Queue() for resolver in resolvers: From d0355e1abe5ca18a544403a113769a23b54c185b Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Tue, 1 Aug 2023 13:56:08 +0100 Subject: [PATCH 6/6] Fix formatting --- cloudquery/sdk/scalar/uint.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloudquery/sdk/scalar/uint.py b/cloudquery/sdk/scalar/uint.py index ad9661c..51aeab9 100644 --- a/cloudquery/sdk/scalar/uint.py +++ b/cloudquery/sdk/scalar/uint.py @@ -4,7 +4,7 @@ class Uint(Scalar): def __init__(self, valid: bool = False, value: any = None, bitwidth: int = 64): self._bitwidth = bitwidth - self._max = 2 ** bitwidth + self._max = 2**bitwidth super().__init__(valid, value) def __eq__(self, scalar: Scalar) -> bool: