diff --git a/onto/attrs/attribute_new.py b/onto/attrs/attribute_new.py index 6de300a..6fd8564 100644 --- a/onto/attrs/attribute_new.py +++ b/onto/attrs/attribute_new.py @@ -294,10 +294,14 @@ def _graphql_field_constructor(self): from functools import partial field_base = graphql.GraphQLField - def resolve_info(obj, context): + from graphql import GraphQLResolveInfo + def resolve_info(obj, context: GraphQLResolveInfo, **kwargs): value = getattr(obj, self.properties.name, None) if isinstance(value, enum.Enum): value = value.value + elif isinstance(value, Callable): + f = value + value = f(obj, **kwargs) return value # def resolver(attributed, resolve_info): diff --git a/onto/attrs/unit.py b/onto/attrs/unit.py index c333df6..a1960c6 100644 --- a/onto/attrs/unit.py +++ b/onto/attrs/unit.py @@ -1,6 +1,5 @@ import contextlib - # Attribute # - Serialization/Deserialization instructions # - Validation @@ -13,6 +12,7 @@ from onto.common import _NA + class _ModelRegistry(type): """ @@ -36,7 +36,7 @@ def __new__(mcs, name, bases, attrs): if new_cls.__name__ in mcs._REGISTRY: raise ValueError( "Class with name {} is declared more than once. " - .format(new_cls.__name__) + .format(new_cls.__name__) ) mcs._REGISTRY[new_cls.__name__] = new_cls @@ -90,11 +90,11 @@ def _get_root(cls): # def __mul__(cls, cls_b): # import types - # cls_r = types.new_class( - # name=cls.__name__ + cls_b.__name__, - # bases=(cls, cls_b), - # ) - # return cls_r + # cls_r = types.new_class( + # name=cls.__name__ + cls_b.__name__, + # bases=(cls, cls_b), + # ) + # return cls_r @property def properties(self): @@ -130,6 +130,7 @@ def __getattr__(self, item): def output_wrapper(decorated): return self_cls(decor=decorated) + try: f = decorator_cls.easy(decor=decor, output_wrapper=output_wrapper) except AttributeError: @@ -143,7 +144,6 @@ def descendant_of(self, cs): class MonadContext(Monad, contextlib.ContextDecorator): - stack = list() @classmethod @@ -151,11 +151,13 @@ def context(cls): return cls(decor=root_decor.get()) def __enter__(self): - self.stack.append( root_decor.set(self.decor) ) + self.stack.append(root_decor.set(self.decor)) def __exit__(self, exc_type, exc_val, exc_tb): root_decor.reset(self.stack.pop()) return False + + # # # class CurSelfContext(contextlib.ContextDecorator): @@ -195,6 +197,7 @@ def _constructor(_self): ) return field_obj + return _constructor @property @@ -210,20 +213,24 @@ def _marshmallow_field_cls(self): def _marshmallow_field_override(self): yield from () + class GraphqlCapableMixin: @property def _graphql_object_type(self): yield from () + class DefaultDecoratorMixin(MarshmallowCapableBaseMixin, GraphqlCapableMixin): is_internal = False import contextvars + root_decor = contextvars.ContextVar('root_decor', default=DefaultDecoratorMixin()) cur_self = contextvars.ContextVar('cur_self', default=None) + def whichever_order(li: list, operation): """ Hack: try operation on all permutative orders of li until the one order does not trigger an error @@ -240,6 +247,7 @@ def whichever_order(li: list, operation): else: raise Exception(f'All possible combinations failed {str(errors)}') + class DecoratorBase(metaclass=_ModelRegistry): """ Can only create new state in self. @@ -326,6 +334,7 @@ def op(typ): return whichever_order(typ, op) + # _vals_ # class BindClass_(DecoratorBase): @@ -360,7 +369,7 @@ def __init__(self, annotation, *args, decorated, **kwargs): if origin is list: if arguments := typing.get_args(annotation): element_type = next(iter(arguments)) - if element_type in (str, bool, float, int, ): + if element_type in (str, bool, float, int,): decorated = List(value=lambda a: a.of_type(element_type), decorated=decorated) else: decorated = OfType(type_cls=origin, decorated=decorated) @@ -381,7 +390,6 @@ def __init__(self): class Nothing(DecoratorBase): - """ Use attr.nothing @@ -393,7 +401,6 @@ class A: b.name == c.name == 'c' """ - @classmethod def easy(cls, *args, **kwargs): return cls.easy_property(*args, **kwargs) @@ -414,13 +421,13 @@ def _marshmallow_field_kwargs(self): yield from self.decorated._marshmallow_field_kwargs yield 'missing', self.default_value + # class DefaultParamsMixin: # import_enabled = True # export_enabled = True class ImportRequired(DecoratorBase): - import_required = True @property @@ -458,8 +465,8 @@ def _graphql_object_type(self): yield from self.decorated._graphql_object_type # pass -class Optional(DecoratorBase): +class Optional(DecoratorBase): import_required = False export_required = False @@ -642,7 +649,6 @@ class Int(Integer): class IntegerTimestamp(Integer): - _long_type = None @classmethod @@ -677,6 +683,7 @@ def easy(cls, *args, **kwargs): def __init__(self, *args, **kwargs): super().__init__(*args, type_cls=bool, **kwargs) + # # import contextvars # current_self = contextvars.ContextVar('current_self', default=list()) # TODO: note mutable default @@ -699,6 +706,7 @@ def _get_default_fget(self, *, name): def fget(_self_obj): inner = getattr(_self_obj, _ATTRIBUTE_STORE_NAME) return getattr(inner, name) + return fget def __init__(self, fget=_NA, *args, **kwargs): @@ -718,6 +726,7 @@ def _get_default_fset(self, *, name): def fset(_self_obj, value): inner = getattr(_self_obj, _ATTRIBUTE_STORE_NAME) return setattr(inner, name, value) + return fset def __init__(self, fset=_NA, *args, **kwargs): @@ -737,6 +746,7 @@ def _get_default_fdel(self, *, name): def fdel(_self_obj): inner = getattr(_self_obj, _ATTRIBUTE_STORE_NAME) return delattr(inner, name) + return fdel def __init__(self, fdel=_NA, *args, **kwargs): @@ -777,6 +787,7 @@ def make_init(self, name=_NA): def _init(_self_obj): inner = getattr(_self_obj, _ATTRIBUTE_STORE_NAME) return setattr(inner, name, self._easy_initializer(_self_obj)) + return _init def __init__(self, easy_initializer, *args, **kwargs): @@ -784,13 +795,13 @@ def __init__(self, easy_initializer, *args, **kwargs): self._easy_initializer = easy_initializer - class Dict(Init): def __init__(self, *args, **kwargs): def _dict_initializer(_self): attr_name = self.name # TODO: fix setattr(_self, attr_name, dict()) + super().__init__( *args, initializer=_dict_initializer, @@ -829,7 +840,6 @@ def _graphql_object_type(self): class Enum(OfType): - from enum import Enum as _Enum def __init__(self, enum_cls: typing.Type[_Enum], *args, **kwargs): @@ -872,6 +882,7 @@ def new(cls, *args, **kwargs): """ Dispatch to subclass when required """ + def fget(self): return self.doc_ref.id @@ -899,7 +910,6 @@ def _marshmallow_field_cls(self): return fields.DocIdField - class NodeId(DecoratorBase): """ For GraphQL ID @@ -964,12 +974,14 @@ class NoneAsMissing(DecoratorBase): @property def _marshmallow_field_override(self): yield from self.decorated._marshmallow_field_override + def _deserialize(_self, value, attr, data, **kwargs): if value is None: from marshmallow.fields import missing_ return missing_ else: return super(_self.__class__, _self)._deserialize(value, attr, data, **kwargs) + yield ('_deserialize', _deserialize) @property @@ -1085,7 +1097,27 @@ def _graphql_field_kwargs(self): yield 'description', self.doc -class AsRoot(DecoratorBase): +class Executable(Getter): + """ + 可以被执行的 attribute 用于 graphql + """ - is_root = True + def __init__(self, f=_NA, *args, **kwargs): + self._f = f + super().__init__(lambda *_: f, *args, **kwargs) + + def _make_graphql_args(self): + from onto.sink.graphql import GraphQLSink + parameters = GraphQLSink._parameters_for(self._f) + for name, annotated_type in parameters: + yield name, GraphQLSink._param_to_graphql_arg(annotated_type=annotated_type) + yield from () + @property + def _graphql_field_kwargs(self): + yield from self.decorated._graphql_field_kwargs + yield 'args', dict(self._make_graphql_args()) + + +class AsRoot(DecoratorBase): + is_root = True diff --git a/onto/database/kafka.py b/onto/database/kafka.py index 132e81c..acffbf1 100644 --- a/onto/database/kafka.py +++ b/onto/database/kafka.py @@ -36,35 +36,38 @@ def _document_path(self): return str(self) -class KafkaDatabase(Database): +class KafkaReadDatabase(Database): + + @classmethod + def listener(cls): + from onto.database.utils import GenericListener + return GenericListener bootstrap_servers = None + d = dict() + @classmethod - @functools.lru_cache(maxsize=None) - def kafka_producer(cls) -> kafka.KafkaProducer: - from kafka import KafkaProducer - producer = KafkaProducer(bootstrap_servers=[cls.bootstrap_servers]) - return producer + def _onto_set(cls, ref: Reference, snapshot: Snapshot, transaction=_NA): + cls.d[str(ref)] = snapshot.to_dict() + cls.listener()._pub(reference=ref, snapshot=snapshot) @classmethod - def set(cls, ref: Reference, snapshot: Snapshot, transaction=_NA, - **kwargs): - d = snapshot.to_dict() - import json - s = json.dumps(d) - b = s.encode(encoding='utf-8') - bk = ref.id.encode(encoding='utf-8') - cls.kafka_producer().send(topic=ref.collection, key=bk, value=b, **kwargs) + def get(cls, ref: Reference, transaction=_NA): + return Snapshot(cls.d[str(ref)]) update = set create = set @classmethod def delete(cls, ref: Reference, transaction=_NA): - cls.kafka_producer().send(topic=ref.collection, key=ref.id, value=None) + """ Note: this only deletes one instance that has _doc_id == ref.last - ref = KafkaReference() + :param ref: + :param transaction: + :return: + """ + del cls.d[str(ref)] class KafkaSnapshot(Snapshot): diff --git a/onto/database/mock.py b/onto/database/mock.py index bf1d6b9..0df3ed0 100644 --- a/onto/database/mock.py +++ b/onto/database/mock.py @@ -1,6 +1,6 @@ from onto.common import _NA -from onto.database import Database, Reference, Snapshot, Listener -from onto.query.query import Query +from onto.database import Database, Reference, Snapshot +from onto.database.utils import GenericListener class MockReference(Reference): @@ -30,7 +30,7 @@ class Comparators(Database.Comparators): @classmethod def listener(cls): - return MockListener + return GenericListener d = dict() @@ -67,36 +67,3 @@ def query(cls, q): yield from () -class MockListener(Listener): - from asyncio.queues import Queue - from collections import defaultdict - # - # def create_queue(): - # from onto.context import Context as CTX - - qs = defaultdict(Queue) - - @classmethod - def _pub(cls, reference: Reference, snapshot: Snapshot): - col = reference.collection - cls.qs[col].put_nowait((reference, snapshot)) - - @classmethod - async def _sub(cls, col): - while True: - item = await cls.qs[col].get() - if item is None: - break - try: - yield item - except Exception as e: - from onto.context import Context as CTX - CTX.logger.exception(f"a task in the queue has failed {item}") - cls.qs[col].task_done() - - @classmethod - async def listen(cls, col, source): - async for ref, snapshot in cls._sub(col): - await source._invoke_mediator( - func_name='on_create', ref=ref, snapshot=snapshot) - diff --git a/onto/database/utils.py b/onto/database/utils.py new file mode 100644 index 0000000..0c7cd5d --- /dev/null +++ b/onto/database/utils.py @@ -0,0 +1,35 @@ +from onto.database import Listener, Reference, Snapshot + + +class GenericListener(Listener): + from asyncio.queues import Queue + from collections import defaultdict + # + # def create_queue(): + # from onto.context import Context as CTX + + qs = defaultdict(Queue) + + @classmethod + def _pub(cls, reference: Reference, snapshot: Snapshot): + col = reference.collection + cls.qs[col].put_nowait((reference, snapshot)) + + @classmethod + async def _sub(cls, col): + while True: + item = await cls.qs[col].get() + if item is None: + break + try: + yield item + except Exception as e: + from onto.context import Context as CTX + CTX.logger.exception(f"a task in the queue has failed {item}") + cls.qs[col].task_done() + + @classmethod + async def listen(cls, col, source): + async for ref, snapshot in cls._sub(col): + await source._invoke_mediator( + func_name='on_create', ref=ref, snapshot=snapshot) \ No newline at end of file diff --git a/onto/sink/graphql.py b/onto/sink/graphql.py index 19e6cee..8b0ae2d 100644 --- a/onto/sink/graphql.py +++ b/onto/sink/graphql.py @@ -118,7 +118,8 @@ def __init__(self, view_model_cls: Type[ViewModel], camelize=True, many=False): self.many = many super().__init__() - def _param_to_graphql_arg(self, annotated_type): + @staticmethod + def _param_to_graphql_arg(annotated_type): from onto.models.utils import _graphql_type_from_py from graphql import GraphQLArgument, GraphQLInputObjectType # TODO: add notes about `typing.*` not supported @@ -245,10 +246,17 @@ def _register_op(self): async def f(parent, info, **kwargs): kwargs = { - 'user': self._get_user(info), 'info': info, **kwargs, } + + try: + user = self._get_user(info) + kwargs['user'] = user + except: + import logging + logging.error('未能解析user,可能是没有装载 AuthMiddleware;程序将继续执行以兼容不需要用户的测试代码') + res = await self._invoke_mediator(func_name='query', **kwargs) return res diff --git a/onto/source/base.py b/onto/source/base.py index 94458f0..7c82f5b 100644 --- a/onto/source/base.py +++ b/onto/source/base.py @@ -21,7 +21,7 @@ class Source(SourceBase): _protocol_cls = Protocol - def __init__(self): + def __init__(self) -> object: """ Initializes a ViewMediator to declare protocols that are called when the results of a query change. Note that mediator.start must be called later. diff --git a/onto/source/kafka.py b/onto/source/kafka.py index 06e01a2..a8f084a 100644 --- a/onto/source/kafka.py +++ b/onto/source/kafka.py @@ -1,11 +1,22 @@ from onto.source.base import Source -async def _kafka_subscribe(topic_name, callback): +async def _kafka_subscribe(topic_name, callback, bootstrap_servers='kafka.default.svc.cluster.local:9092'): from aiokafka import AIOKafkaConsumer + + def key_deserializer(v: bytes): + return v.decode("utf-8") + + def value_deserializer(v: bytes): + import json + s = v.decode('utf-8') + return json.loads(s) + consumer = AIOKafkaConsumer( topic_name, - bootstrap_servers='10.10.8.140:9092', + bootstrap_servers=bootstrap_servers, + key_deserializer=key_deserializer, + value_deserializer=value_deserializer # group_id="my-group" ) # Get cluster layout and join group `my-group` @@ -14,7 +25,7 @@ async def _kafka_subscribe(topic_name, callback): try: # Consume messages async for msg in consumer: - callback(message=msg) + await callback(message=msg) # print("consumed: ", msg.topic, msg.partition, msg.offset, # msg.key, msg.value, msg.timestamp) except Exception as e: @@ -47,5 +58,5 @@ def start(self, loop): async def _register(self): from functools import partial - f = partial(self._invoke_mediator, func_name='on_topic') + f = partial(self._invoke_mediator_async, func_name='on_topic') await _kafka_subscribe(topic_name=self.topic_name, callback=f)