From 423bc99401f47738a67be0d5118b7a3cb5369d0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mikael=20G=C3=B6ransson?= Date: Thu, 2 Dec 2021 12:18:27 +0100 Subject: [PATCH] expression support for service bus functionality (#31) * initial commit * Implemented support for expression argument in "frontend" * content_type is needed from variable and user * implementation of handling of expression in AsyncServiceBusHandler * fixes for problems found when testing * do not print output from async-messaged * refinement of expression implementation for service bus --- .github/workflows/code-quality.yaml | 4 +- grizzly/locust.py | 3 +- grizzly/steps/scenario/tasks.py | 3 + grizzly/testdata/utils.py | 10 +- grizzly/testdata/variables/servicebus.py | 92 ++++-- grizzly/users/messagequeue.py | 7 +- grizzly/users/servicebus.py | 117 ++++++-- grizzly_extras/arguments.py | 8 +- grizzly_extras/async_message/__init__.py | 9 +- grizzly_extras/async_message/mq.py | 8 +- grizzly_extras/async_message/sb.py | 222 ++++++++++---- grizzly_extras/transformer.py | 13 +- .../testdata/variables/test_messagequeue.py | 3 +- .../testdata/variables/test_servicebus.py | 84 +++++- tests/test_grizzly/users/test_messagequeue.py | 35 ++- tests/test_grizzly/users/test_servicebus.py | 112 ++++++- .../async_message/test_sb.py | 277 ++++++++++++++---- tests/test_grizzly_extras/test_transformer.py | 15 +- 18 files changed, 831 insertions(+), 191 deletions(-) diff --git a/.github/workflows/code-quality.yaml b/.github/workflows/code-quality.yaml index 9207ae81..ddcc2f5d 100644 --- a/.github/workflows/code-quality.yaml +++ b/.github/workflows/code-quality.yaml @@ -2,8 +2,8 @@ name: code quality on: pull_request: - types: [opened, synchronize] - workflow_dispatch: + branches: + - main jobs: code-quality: diff --git a/grizzly/locust.py b/grizzly/locust.py index 37a9a736..50675a92 100644 --- a/grizzly/locust.py +++ b/grizzly/locust.py @@ -412,8 +412,7 @@ def sig_term_handler() -> None: logger.info(f'stopping {external_dependency}') external_process.terminate() if context.config.verbose: - stdout, _ = external_process.communicate() - logger.debug(stdout.decode()) + external_process.communicate() logger.debug(f'{external_process.returncode=}') external_processes.clear() diff --git a/grizzly/steps/scenario/tasks.py b/grizzly/steps/scenario/tasks.py index 1e20fb04..075ed3bc 100644 --- a/grizzly/steps/scenario/tasks.py +++ b/grizzly/steps/scenario/tasks.py @@ -219,6 +219,9 @@ def step_task_print_message(context: Context, message: str) -> None: grizzly = cast(GrizzlyContext, context.grizzly) grizzly.scenario.add_task(PrintTask(message=message)) + if '{{' in message and '}}' in message: + grizzly.scenario.orphan_templates.append(message) + @then(u'parse "{content}" as "{content_type:ContentType}" and save value of "{expression}" in variable "{variable}"') def step_task_transform(context: Context, content: str, content_type: TransformerContentType, expression: str, variable: str) -> None: diff --git a/grizzly/testdata/utils.py b/grizzly/testdata/utils.py index 40d5ad09..44a136bf 100644 --- a/grizzly/testdata/utils.py +++ b/grizzly/testdata/utils.py @@ -147,6 +147,11 @@ def resolve_variable(grizzly: GrizzlyContext, value: str, guess_datatype: Option if len(value) < 1: return value + quote_char: Optional[str] = None + if value[0] in ['"', "'"] and value[0] == value[-1]: + quote_char = value[0] + value = value[1:-1] + resolved_variable: GrizzlyDictValueType if '{{' in value and '}}' in value: template = Template(value) @@ -158,7 +163,7 @@ def resolve_variable(grizzly: GrizzlyContext, value: str, guess_datatype: Option assert template_variable in grizzly.state.variables, f'value contained variable "{template_variable}" which has not been set' resolved_variable = template.render(**grizzly.state.variables) - elif len(value) > 4 and value[0] == '$': + elif len(value) > 4 and value[0] == '$' and value[1] != '.': # $. is jsonpath expression... if value[0:5] == '$conf': variable = value[7:] assert variable in grizzly.state.configuration, f'configuration variable "{variable}" is not set' @@ -175,5 +180,8 @@ def resolve_variable(grizzly: GrizzlyContext, value: str, guess_datatype: Option if guess_datatype: resolved_variable = GrizzlyDict.guess_datatype(resolved_variable) + elif quote_char is not None and isinstance(resolved_variable, str) and resolved_variable.count(' ') > 0: + resolved_variable = f'{quote_char}{resolved_variable}{quote_char}' + return resolved_variable diff --git a/grizzly/testdata/variables/servicebus.py b/grizzly/testdata/variables/servicebus.py index 00a22810..952d4bd1 100644 --- a/grizzly/testdata/variables/servicebus.py +++ b/grizzly/testdata/variables/servicebus.py @@ -1,25 +1,41 @@ +# pylint: disable=line-too-long '''Listens for messages on Azure Service Bus queue or topic. Use [transformer task](/grizzly/usage/tasks/transformer/) to extract specific parts of the message. ## Format -Initial value is the name of the queue or topic, prefix with the endpoint type. If the endpoint is a topic the additional value subscription -is mandatory. Arguments support templating for their value, but not the complete endpoint value. +Initial value for a variable must have the prefix `queue:` or `topic:` followed by the name of the targeted +type. When receiving messages from a topic, the argument `subscription:` is mandatory. The format of endpoint is: + +```plain +[queue|topic]:[, subscription:][, expression:] +``` + +Where `` can be a XPath or jsonpath expression, depending on the specified content type. This argument is only allowed when +receiving messages. See example below. + +> **Warning**: Do not use `expression` to filter messages unless you do not care about the messages that does not match the expression. If +> you do care about them, you should setup a subscription to do the filtering in Azure. + +Arguments support templating for their value, but not the complete endpoint value. Examples: + ```plain queue:test-queue topic:test-topic, subscription:test-subscription -queue:$conf::sb.endpoint.queue -topic:$conf::sb.endpoint.topic, subscription:$conf::sb.endpoint.subscription +queue:"$conf::sb.endpoint.queue" +topic:"$conf::sb.endpoint.topic", subscription:"$conf::sb.endpoint.subscription" +queue:"{{ queue_name }}", expression="$.document[?(@.name=='TPM report')]" ``` ## Arguments -* `repeat` _bool_ (optional) - if `True`, values read for the queue will be saved in a list and re-used if there are no new messages available +* `repeat` _bool_ (optional) - if `True`, values read from the endpoint will be saved in a list and re-used if there are no new messages available * `url` _str_ - see format of url below. * `wait` _int_ - number of seconds to wait for a message on the queue +* `content_type` _str_ (optional) - specify the MIME type of the message received on the queue, only mandatory when `expression` is specified in endpoint ### URL format @@ -40,7 +56,7 @@ ## Example ```gherkin -And value of variable "AtomicServiceBus.document_id" is "queue:documents-in | wait=120, url=$conf::sb.endpoint, repeat=True, expression='$.document.id', content_type=json" +And value of variable "AtomicServiceBus.document_id" is "queue:documents-in | wait=120, url=$conf::sb.endpoint, repeat=True" ... Given a user of type "RestApi" load testing "http://example.com" ... @@ -51,6 +67,16 @@ If there are no messages within 120 seconds, and it is the first iteration of the scenario, it will fail. If there has been at least one message on the queue since the scenario started, it will use the oldest of those values, and then add it back in the end of the list again. + +### Get message with expression + +When specifying an expression, the messages on the endpoint is first peeked on. If any message matches the expression, it is later consumed from the endpoint. +If no matching messages was found when peeking, it is repeated again up until the specified `wait` seconds has elapsed. To use expression, a content type must +be specified for the endpint, e.g. `application/xml`. + +```gherking +And value of variable "AtomicServiceBus.document_id" is "queue:documents-in | wait=120, url=$conf::sb.endpoint, repeat=True, content_type=json, expression='$.document[?(@.name=='TPM Report')'" +``` ''' from typing import Dict, Any, List, Type, Optional, cast @@ -61,6 +87,7 @@ from gevent import sleep as gsleep from grizzly_extras.async_message import AsyncMessageContext, AsyncMessageRequest, AsyncMessageResponse from grizzly_extras.arguments import split_value, parse_arguments, get_unsupported_arguments +from grizzly_extras.transformer import TransformerContentType from ...types import AtomicVariable, bool_typed from ...context import GrizzlyContext @@ -133,45 +160,57 @@ def atomicservicebus_endpoint(endpoint: str) -> str: raise ValueError(f'AtomicServiceBus: {endpoint} does not specify queue: or topic:') try: - arguments = parse_arguments(endpoint, ':') + arguments = parse_arguments(endpoint, ':', unquote=False) except ValueError as e: raise ValueError(f'AtomicServiceBus: {str(e)}') from e if 'topic' not in arguments and 'queue' not in arguments: - raise ValueError(f'AtomicServiceBus: only support endpoint types queue and topic, not {arguments.keys()}') + raise ValueError(f'AtomicServiceBus: endpoint needs to be prefixed with queue: or topic:') + + if 'topic' in arguments and 'queue' in arguments: + raise ValueError('AtomicServiceBus: cannot specify both topic: and queue: in endpoint') endpoint_type = 'topic' if 'topic' in arguments else 'queue' if len(arguments) > 1: - if endpoint_type != 'topic': - raise ValueError(f'AtomicServiceBus: additional arguments in endpoint is only supported for topic') + if endpoint_type != 'topic' and 'subscription' in arguments: + raise ValueError(f'AtomicServiceBus: argument subscription is only allowed if endpoint is a topic') - unsupported_arguments = get_unsupported_arguments(['topic', 'queue', 'subscription'], arguments) + unsupported_arguments = get_unsupported_arguments(['topic', 'queue', 'subscription', 'expression'], arguments) if len(unsupported_arguments) > 0: raise ValueError(f'AtomicServiceBus: arguments {", ".join(unsupported_arguments)} is not supported') - if endpoint_type == 'topic' and arguments.get('subscription', None) is None: + expression = arguments.get('expression', None) + subscription = arguments.get('subscription', None) + if endpoint_type == 'topic' and subscription is None: raise ValueError(f'AtomicServiceBus: endpoint needs to include subscription when receiving messages from a topic') grizzly = GrizzlyContext() try: - resolved_endpoint_name = resolve_variable(grizzly, arguments[endpoint_type]) + resolved_endpoint_name = cast(str, resolve_variable(grizzly, arguments[endpoint_type], guess_datatype=False)) except Exception as e: raise ValueError(f'AtomicServiceBus: {str(e)}') from e endpoint = f'{endpoint_type}:{resolved_endpoint_name}' - subscription = arguments.get('subscription', None) if subscription is not None: try: - resolved_subscription_name = resolve_variable(grizzly, subscription) + resolved_subscription_name = cast(str, resolve_variable(grizzly, subscription, guess_datatype=False)) except Exception as e: raise ValueError(f'AtomicServiceBus: {str(e)}') from e endpoint = f'{endpoint}, subscription:{resolved_subscription_name}' + if expression is not None: + try: + resolved_expression = cast(str, resolve_variable(grizzly, expression, guess_datatype=False)) + except Exception as e: + raise ValueError(f'AtomicServiceBus: {str(e)}') from e + + endpoint = f'{endpoint}, expression:{resolved_expression}' + return endpoint @@ -194,21 +233,26 @@ class AtomicServiceBus(AtomicVariable[str]): 'url': atomicservicebus_url, 'wait': int, 'endpoint_name': atomicservicebus_endpoint, + 'content_type': TransformerContentType.from_string, } def __init__(self, variable: str, value: str) -> None: safe_value = self.__class__.__base_type__(value) - settings = {'repeat': False, 'wait': None, 'url': None, 'worker': None, 'context': None, 'endpoint_name': None} + settings = {'repeat': False, 'wait': None, 'url': None, 'worker': None, 'context': None, 'endpoint_name': None, 'content_type': None} endpoint_name, endpoint_arguments = split_value(safe_value) arguments = parse_arguments(endpoint_arguments) + endpoint_parameters = parse_arguments(endpoint_name, ':') for argument, caster in self.__class__.arguments.items(): if argument in arguments: settings[argument] = caster(arguments[argument]) + if 'expression' in endpoint_parameters and not 'content_type' in arguments: + raise ValueError(f'{self.__class__.__name__}.{variable}: argument "content_type" is mandatory when "expression" is used in endpoint') + settings['endpoint_name'] = self.arguments['endpoint_name'](endpoint_name) super().__init__(variable, endpoint_name) @@ -246,6 +290,10 @@ def create_context(cls, settings: Dict[str, Any]) -> AsyncMessageContext: 'message_wait': settings.get('wait', None), } + content_type = settings.get('content_type', None) + if content_type is not None: + context.update({'content_type': content_type.name.lower()}) + return context def create_client(self, variable: str, settings: Dict[str, Any]) -> zmq.Socket: @@ -260,7 +308,16 @@ def create_client(self, variable: str, settings: Dict[str, Any]) -> zmq.Socket: def say_hello(self, client: zmq.Socket, variable: str) -> None: settings = self._settings[variable] - context = settings['context'] + context = cast(AsyncMessageContext, dict(settings['context'])) + + endpoint_arguments = parse_arguments(context['endpoint'], ':') + try: + del endpoint_arguments['expression'] + except: + pass + + cache_endpoint = ', '.join([f'{key}:{value}' for key, value in endpoint_arguments.items()]) + context['endpoint'] = cache_endpoint if settings.get('worker', None) is not None: return @@ -382,6 +439,7 @@ def __delitem__(self, variable: str) -> None: try: del self._settings[variable] del self._endpoint_messages[variable] + try: self._endpoint_clients[variable].disconnect(self._zmq_url) except (zmq.ZMQError, AttributeError, ): diff --git a/grizzly/users/messagequeue.py b/grizzly/users/messagequeue.py index d8d96f8c..525e68eb 100644 --- a/grizzly/users/messagequeue.py +++ b/grizzly/users/messagequeue.py @@ -30,7 +30,7 @@ queue:[, expression:] ``` -Where `` can be of XPath or jsonpath type, depending on the specified content type. See example below. +Where `` can be a XPath or jsonpath expression, depending on the specified content type. See example below. ## Examples @@ -58,7 +58,7 @@ When specifying an expression, the messages on the queue are first browsed. If any message matches the expression, it is later consumed from the queue. If no matching message was found during browsing, it is repeated again after a slight delay, up until the specified `message.wait` seconds has elapsed. To use expressions, a content type must be specified for the get -request, e.g. `"application/xml"`: +request, e.g. `application/xml`: ```gherkin Given a user of type "MessageQueue" load testing "mq://mq.example.com/?QueueManager=QM01&Channel=SRVCONN01" @@ -104,6 +104,7 @@ from gevent import sleep as gsleep from locust.exception import StopUser +from grizzly.types import RequestDirection from grizzly_extras.async_message import AsyncMessageContext, AsyncMessageRequest, AsyncMessageResponse, AsyncMessageError from grizzly_extras.arguments import get_unsupported_arguments, parse_arguments @@ -314,7 +315,7 @@ def action(am_request: AsyncMessageRequest, name: str) -> Generator[Dict[str, An metadata['abort'] = True # Parse the endpoint to validate queue name / expression parts try: - arguments = parse_arguments(request.endpoint, ':') + arguments = parse_arguments(endpoint, ':') except ValueError as e: raise RuntimeError(str(e)) from e diff --git a/grizzly/users/servicebus.py b/grizzly/users/servicebus.py index 2660b246..b1492bbc 100644 --- a/grizzly/users/servicebus.py +++ b/grizzly/users/servicebus.py @@ -2,6 +2,9 @@ > **Note**: If `message.wait` is not set, `azure.servicebus` will wait until there is a message available, and hence block the scenario. +> **Warning**: Do not use `expression` to filter messages unless you do not care about the messages that does not match the expression. If +> you do care about them, you should setup a subscription to do the filtering in Azure. + User is based on `azure.servicebus` for communicating with Azure Service Bus. But creating a connection and session towards a queue or a topic is a costly operation, and caching of the session was causing problems with `gevent` due to the sockets blocking and hence locust/grizzly was blocking when finished. To get around this, the user implementation communicates with a stand-alone process via zmq, which in turn communicates @@ -25,7 +28,14 @@ ``` `endpoint` in the request must have the prefix `queue:` or `topic:` followed by the name of the targeted -type. If you are going to receive messages from a topic, and additional `subscription:` som follow the specified `topic:`. +type. When receiving messages from a topic, the argument `subscription:` is mandatory. The format of endpoint is: + +```plain +[queue|topic]:[, subscription:][, expression:] +``` + +Where `` can be a XPath or jsonpath expression, depending on the specified content type. This argument is only allowed when +receiving messages. See example below. ## Examples @@ -39,6 +49,21 @@ Then receive request "queue-recv" from endpoint "queue:shared-queue" Then receive request "topic-recv" from endpoint "topic:shared-topic, subscription:my-subscription" ``` + +### Get message with expression + +When specifying an expression, the messages on the endpoint is first peeked on. If any message matches the expression, it is later consumed from the +endpoint. If no matching messages was found when peeking, it is repeated again after a slight delay, up until the specified `message.wait` seconds has +elapsed. To use expressions, a content type must be specified for the request, e.g. `application/xml`. + +```gherkin +Given a user of type "ServiceBus" load testing "sb://sb.example.com/;SharedAccessKeyName=authorization-key;SharedAccessKey=c2VjcmV0LXN0dWZm" +And set context variable "message.wait" to "5" +Then receive request "queue-recv" from endpoint "queue:shared-queue, expression:$.document[?(@.name=='TPM report')].id" +And set response content type to "application/json" +Then receive request "topic-recv" from endpoint "topic:shared-topic, subscription:my-subscription, expression:/documents/document[@name='TPM Report']/id/text()" +And set response content type to "application/xml" +``` ''' from typing import Generator, Dict, Any, Tuple, Optional, Set, cast from urllib.parse import urlparse, parse_qs @@ -50,6 +75,8 @@ from locust.exception import StopUser from gevent import sleep as gsleep from grizzly_extras.async_message import AsyncMessageContext, AsyncMessageResponse, AsyncMessageRequest, AsyncMessageError +from grizzly_extras.arguments import parse_arguments, get_unsupported_arguments +from grizzly_extras.transformer import TransformerContentType from ..types import RequestMethod, RequestDirection from ..task import RequestTask @@ -133,24 +160,32 @@ def __init__(self, *args: Tuple[Any], **kwargs: Dict[str, Any]) -> None: def say_hello(self, task: RequestTask, endpoint: str) -> None: if ('{{' in endpoint and '}}' in endpoint) or '$conf' in endpoint or '$env' in endpoint: - logger.warning(f'{self.__class__.__name__}: cannot say hello for {task.name} when endpoint ({endpoint}) is a template') + logger.warning(f'{self.__class__.__name__}: cannot say hello for {task.name} when endpoint is a template') return connection = 'sender' if task.method.direction == RequestDirection.TO else 'receiver' - if ',' in endpoint: - name = endpoint.split(',', 1)[0] - else: - name = endpoint - description = f'{connection}={endpoint}' - name = f'{connection}={name.strip()}' + try: + arguments = parse_arguments(endpoint, ':') + except ValueError as e: + raise RuntimeError(str(e)) from e + endpoint_arguments = dict(arguments) + + try: + del endpoint_arguments['expression'] + except: + pass + + cache_endpoint = ', '.join([f'{key}:{value}' for key, value in endpoint_arguments.items()]) + + description = f'{connection}={cache_endpoint}' if description in self.hellos: return context = cast(AsyncMessageContext, dict(self.am_context)) context.update({ - 'endpoint': endpoint, + 'endpoint': cache_endpoint, }) request: AsyncMessageRequest = { @@ -159,16 +194,53 @@ def say_hello(self, task: RequestTask, endpoint: str) -> None: 'context': context, } - with self.async_action(task, request, name, meta=True): - pass + with self.async_action(task, request, description) as metadata: + metadata.update({ + 'meta': True, + 'abort': True, + }) + + if 'queue' not in arguments and 'topic' not in arguments: + raise RuntimeError('endpoint needs to be prefixed with queue: or topic:') + + if 'queue' in arguments and 'topic' in arguments: + raise RuntimeError('cannot specify both topic: and queue: in endpoint') + + endpoint_type = 'topic' if 'topic' in arguments else 'queue' + + if len(arguments) > 1: + if endpoint_type != 'topic' and 'subscription' in arguments: + raise RuntimeError('argument subscription is only allowed if endpoint is a topic') + + unsupported_arguments = get_unsupported_arguments(['topic', 'queue', 'subscription', 'expression'], arguments) + + if len(unsupported_arguments) > 0: + raise RuntimeError(f'arguments {", ".join(unsupported_arguments)} is not supported') + + if endpoint_type == 'topic' and arguments.get('subscription', None) is None and task.method.direction == RequestDirection.FROM: + raise RuntimeError('endpoint needs to include subscription when receiving messages from a topic') + + if task.method.direction == RequestDirection.TO and arguments.get('expression', None) is not None: + raise RuntimeError('argument expression is only allowed when receiving messages') + + metadata['abort'] = task.scenario.stop_on_failure self.hellos.add(description) @contextmanager - def async_action(self, task: RequestTask, request: AsyncMessageRequest, name: str, meta: bool = False) -> Generator[None, None, None]: + def async_action(self, task: RequestTask, request: AsyncMessageRequest, name: str) -> Generator[Dict[str, bool], None, None]: + if len(name) > 65: + name = f'{name[:65]}...' request.update({'worker': self.worker_id}) connection = 'sender' if task.method.direction == RequestDirection.TO else 'receiver' request['context'].update({'connection': connection}) + metadata: Dict[str, bool] = { + 'abort': False, + 'meta': False, + } + + if task.response.content_type != TransformerContentType.GUESS: + request['context']['content_type'] = task.response.content_type.name.lower() response: Optional[AsyncMessageResponse] = None exception: Optional[Exception] = None @@ -176,7 +248,7 @@ def async_action(self, task: RequestTask, request: AsyncMessageRequest, name: st try: start_time = time() - yield + yield metadata self.zmq_client.send_json(request) @@ -192,10 +264,11 @@ def async_action(self, task: RequestTask, request: AsyncMessageRequest, name: st response_time = int((time() - start_time) * 1000) if response is not None: + response_worker = response.get('worker', None) if self.worker_id is None: - self.worker_id = response.get('worker', None) + self.worker_id = response_worker - assert self.worker_id == response.get('worker', '') + assert self.worker_id == response_worker if not response.get('success', False) and exception is None: exception = AsyncMessageError(response['message']) @@ -203,7 +276,7 @@ def async_action(self, task: RequestTask, request: AsyncMessageRequest, name: st response = {} try: - if not meta: + if not metadata.get('meta', False): self.response_event.fire( name=f'{task.scenario.identifier} {task.name}', request=task, @@ -228,7 +301,7 @@ def async_action(self, task: RequestTask, request: AsyncMessageRequest, name: st exception=exception, ) - if exception is not None and not meta and task.scenario.stop_on_failure: + if exception is not None and not metadata.get('meta', False) and task.scenario.stop_on_failure: try: self.zmq_client.disconnect(self.zmq_url) except: @@ -244,9 +317,7 @@ def request(self, request: RequestTask) -> None: self.say_hello(request, endpoint) context = cast(AsyncMessageContext, dict(self.am_context)) - context.update({ - 'endpoint': endpoint, - }) + context['endpoint'] = endpoint am_request: AsyncMessageRequest = { 'action': request.method.name, @@ -254,7 +325,11 @@ def request(self, request: RequestTask) -> None: 'payload': payload, } - with self.async_action(request, am_request, name): + with self.async_action(request, am_request, name) as metadata: + metadata['abort'] = True + if request.method not in [RequestMethod.SEND, RequestMethod.RECEIVE]: raise NotImplementedError(f'{self.__class__.__name__}: no implementation for {request.method.name} requests') + metadata['abort'] = request.scenario.stop_on_failure + diff --git a/grizzly_extras/arguments.py b/grizzly_extras/arguments.py index c7038268..ea2f8bdb 100644 --- a/grizzly_extras/arguments.py +++ b/grizzly_extras/arguments.py @@ -9,7 +9,7 @@ def get_unsupported_arguments(valid_arguments: List[str], arguments: Dict[str, A return [argument for argument in arguments.keys() if argument not in valid_arguments] -def parse_arguments(arguments: str, separator:str = '=') -> Dict[str, Any]: +def parse_arguments(arguments: str, separator: str = '=', unquote: bool = True) -> Dict[str, Any]: if separator not in arguments or (arguments.count(separator) > 1 and (arguments.count('"') < 2 and arguments.count("'") < 2) and ', ' not in arguments): raise ValueError(f'incorrect format in arguments: "{arguments}"') @@ -41,12 +41,14 @@ def parse_arguments(arguments: str, separator:str = '=') -> Dict[str, Any]: if value[-1] != value[0]: raise ValueError(f'value is incorrectly quoted: "{value}"') start_quote = value[0] - value = value[1:] + if unquote: + value = value[1:] if value[-1] in ['"', "'"]: if start_quote is None: raise ValueError(f'value is incorrectly quoted: "{value}"') - value = value[:-1] + if unquote: + value = value[:-1] if start_quote is None and ' ' in value: raise ValueError(f'value needs to be quoted: "{value}"') diff --git a/grizzly_extras/async_message/__init__.py b/grizzly_extras/async_message/__init__.py index acd9a320..40893701 100644 --- a/grizzly_extras/async_message/__init__.py +++ b/grizzly_extras/async_message/__init__.py @@ -75,8 +75,8 @@ def get_handler(self, action: str) -> Optional['AsyncMessageRequestHandler']: def handle(self, request: AsyncMessageRequest) -> AsyncMessageResponse: action = request['action'] request_handler = self.get_handler(action) - logger.debug(f'handling {action}') - logger.debug(jsondumps(request, indent=2, cls=JsonBytesEncoder)) + logger.debug(f'{self.worker}: handling {action}') + logger.debug(f'{self.worker}: {jsondumps(request, indent=2, cls=JsonBytesEncoder)}') response: AsyncMessageResponse @@ -93,6 +93,7 @@ def handle(self, request: AsyncMessageRequest) -> AsyncMessageResponse: 'success': False, 'message': f'{action}: {e.__class__.__name__}="{str(e)}"', } + logger.error(f'{self.worker}: {action}: {e.__class__.__name__}="{str(e)}"', exc_info=True) finally: total_time = int((time() - start_time) * 1000) response.update({ @@ -100,8 +101,8 @@ def handle(self, request: AsyncMessageRequest) -> AsyncMessageResponse: 'response_time': total_time, }) - logger.debug(f'handled {action}') - logger.debug(jsondumps(response, indent=2, cls=JsonBytesEncoder)) + logger.debug(f'{self.worker}: handled {action}') + logger.debug(f'{self.worker}: {jsondumps(response, indent=2, cls=JsonBytesEncoder)}') return response diff --git a/grizzly_extras/async_message/mq.py b/grizzly_extras/async_message/mq.py index 6e5abbf5..66ce5c14 100644 --- a/grizzly_extras/async_message/mq.py +++ b/grizzly_extras/async_message/mq.py @@ -125,7 +125,7 @@ def _create_gmo(self, message_wait: Optional[int] = None, browsing: Optional[boo def _find_message(self, queue_name: str, expression: str, content_type: TransformerContentType, message_wait: Optional[int]) -> Optional[bytearray]: start_time = time() - logger.debug(f'_find_message: searching {queue_name} for messages matching: {expression}, content_type {content_type.name.lower()}') + logger.debug(f'{self.worker}: _find_message: searching {queue_name} for messages matching: {expression}, content_type {content_type.name.lower()}') transform = transformer.available.get(content_type, None) if transform is None: raise AsyncMessageError(f'could not find a transformer for {content_type.name}') @@ -156,7 +156,7 @@ def _find_message(self, queue_name: str, expression: str, content_type: Transfor if len(values) > 0: # Found a matching message, return message id - logger.debug(f'_find_message: found matching message: {md["MsgId"]}') + logger.debug(f'{self.worker}: _find_message: found matching message: {md["MsgId"]}') return cast(bytearray, md['MsgId']) gmo.Options = pymqi.CMQC.MQGMO_BROWSE_NEXT @@ -176,7 +176,7 @@ def _find_message(self, queue_name: str, expression: str, content_type: Transfor elif message_wait is None: return None else: - logger.debug(f'_find_message: no matching message found, trying again after some sleep') + logger.debug(f'{self.worker}: _find_message: no matching message found, trying again after some sleep') sleep(0.5) def _get_content_type(self, request: AsyncMessageRequest) -> TransformerContentType: @@ -225,7 +225,7 @@ def _request(self, request: AsyncMessageRequest) -> AsyncMessageResponse: # Adjust message_wait for getting the message if message_wait is not None: message_wait -= elapsed_time - logger.debug(f'_request: remaining message_wait after finding message: {message_wait}') + logger.debug(f'{self.worker}: _request: remaining message_wait after finding message: {message_wait}') md = pymqi.MD() with self.queue_context(endpoint=queue_name) as queue: diff --git a/grizzly_extras/async_message/sb.py b/grizzly_extras/async_message/sb.py index 3f0b98a8..7d1edc69 100644 --- a/grizzly_extras/async_message/sb.py +++ b/grizzly_extras/async_message/sb.py @@ -1,12 +1,15 @@ import logging from typing import Any, Callable, Dict, Optional, Union, Tuple, Iterable, cast +from time import monotonic as time, sleep from mypy_extensions import VarArg, KwArg from azure.servicebus import ServiceBusClient, ServiceBusMessage, TransportType, ServiceBusSender, ServiceBusReceiver from azure.servicebus.amqp import AmqpMessageBodyType from azure.servicebus.amqp._amqp_message import DictMixin +from grizzly_extras.transformer import TransformerError, transformer, TransformerContentType + from ..arguments import parse_arguments, get_unsupported_arguments from . import ( @@ -16,6 +19,7 @@ AsyncMessageResponse, AsyncMessageError, register, + logger, ) __all__ = [ @@ -33,6 +37,7 @@ class AsyncServiceBusHandler(AsyncMessageHandler): _sender_cache: Dict[str, ServiceBusSender] _receiver_cache: Dict[str, ServiceBusReceiver] + _arguments: Dict[str, Dict[str, str]] client: Optional[ServiceBusClient] = None @@ -41,84 +46,59 @@ def __init__(self, worker: str) -> None: self._sender_cache = {} self._receiver_cache = {} + self._arguments = {} # silence uamqp loggers logging.getLogger('uamqp').setLevel(logging.ERROR) @classmethod - def get_endpoint_details(cls, instance_type: str, endpoint: str) -> Tuple[str, str, Optional[str]]: - if ':' not in endpoint: - raise AsyncMessageError(f'"{endpoint}" is not prefixed with queue: or topic:') - - endpoint_type: str - endpoint_name: str - subscription_name: Optional[str] = None - - arguments = parse_arguments(endpoint, ':') - - if 'queue' not in arguments and 'topic' not in arguments: - raise AsyncMessageError(f'only support for endpoint types queue and topic, not {", ".join(arguments.keys())}') - - endpoint_type = 'topic' if 'topic' in arguments else 'queue' - - if instance_type != 'receiver' and len(arguments) > 1: - raise AsyncMessageError(f'additional arguments in endpoint is not supported for {instance_type}') - - unsupported_arguments = get_unsupported_arguments(['queue', 'topic', 'subscription'], arguments) - if len(unsupported_arguments) > 0: - raise AsyncMessageError(f'arguments {", ".join(unsupported_arguments)} is not supported') - - endpoint_name = arguments.get(endpoint_type, None) - subscription_name = arguments.get('subscription', None) - - if endpoint_type == 'topic' and subscription_name is None and instance_type == 'receiver': - raise AsyncMessageError('endpoint needs to include subscription when receiving messages from a topic') + def get_sender_instance(cls, client: ServiceBusClient, arguments: Dict[str, str]) -> ServiceBusSender: + endpoint_type = arguments['endpoint_type'] + endpoint_name = arguments['endpoint'] - return endpoint_type, endpoint_name, subscription_name - - @classmethod - def get_sender_instance(cls, client: ServiceBusClient, endpoint: str) -> ServiceBusSender: - arguments: Dict[str, Any] = {} - endpoint_type, endpoint_name, _ = cls.get_endpoint_details('sender', endpoint) + sender_arguments: Dict[str, str] = {} sender_type: Callable[[KwArg(Any)], ServiceBusSender] if endpoint_type == 'queue': - arguments.update({'queue_name': endpoint_name}) + sender_arguments.update({'queue_name': endpoint_name}) sender_type = cast( Callable[[KwArg(Any)], ServiceBusSender], client.get_queue_sender, ) else: - arguments.update({'topic_name': endpoint_name}) + sender_arguments.update({'topic_name': endpoint_name}) sender_type = cast( Callable[[KwArg(Any)], ServiceBusSender], client.get_topic_sender, ) - return sender_type(**arguments) + return sender_type(**sender_arguments) @classmethod - def get_receiver_instance(cls, client: ServiceBusClient, endpoint: str, message_wait: Optional[int] = None) -> ServiceBusReceiver: - arguments: Dict[str, Any] = {} - endpoint_type, endpoint_name, subscription_name = cls.get_endpoint_details('receiver', endpoint) + def get_receiver_instance(cls, client: ServiceBusClient, arguments: Dict[str, str]) -> ServiceBusReceiver: + endpoint_type = arguments['endpoint_type'] + endpoint_name = arguments['endpoint'] + subscription_name = arguments.get('subscription', None) + message_wait = arguments.get('wait', None) + receiver_arguments: Dict[str, Any] = {} receiver_type: Callable[[KwArg(Any)], ServiceBusReceiver] if message_wait is not None: - arguments.update({'max_wait_time': message_wait}) + receiver_arguments.update({'max_wait_time': int(message_wait)}) if endpoint_type == 'queue': receiver_type = cast(Callable[[KwArg(Any)], ServiceBusReceiver], client.get_queue_receiver) - arguments.update({'queue_name': endpoint_name}) + receiver_arguments.update({'queue_name': endpoint_name}) else: receiver_type = cast(Callable[[KwArg(Any)], ServiceBusReceiver], client.get_subscription_receiver) - arguments.update({ + receiver_arguments.update({ 'topic_name': endpoint_name, 'subscription_name': subscription_name, }) - return receiver_type(**arguments) + return receiver_type(**receiver_arguments) @classmethod def from_message(cls, message: Optional[ServiceBusMessage]) -> Tuple[Optional[Dict[str, Any]], Optional[str]]: @@ -156,13 +136,46 @@ def to_dict(obj: Optional[DictMixin]) -> Dict[str, Any]: return metadata, payload + @classmethod + def get_endpoint_arguments(cls, instance_type: str, endpoint: str) -> Dict[str, str]: + arguments = parse_arguments(endpoint, ':') + + if 'queue' not in arguments and 'topic' not in arguments: + raise ValueError('endpoint needs to be prefixed with queue: or topic:') + + if 'queue' in arguments and 'topic' in arguments: + raise ValueError('cannot specify both topic: and queue: in endpoint') + + endpoint_type = 'topic' if 'topic' in arguments else 'queue' + + if len(arguments) > 1: + if endpoint_type != 'topic' and 'subscription' in arguments: + raise ValueError('argument subscription is only allowed if endpoint is a topic') + + unsupported_arguments = get_unsupported_arguments(['topic', 'queue', 'subscription', 'expression'], arguments) + + if len(unsupported_arguments) > 0: + raise ValueError(f'arguments {", ".join(unsupported_arguments)} is not supported') + + if endpoint_type == 'topic' and arguments.get('subscription', None) is None and instance_type == 'receiver': + raise ValueError('endpoint needs to include subscription when receiving messages from a topic') + + if instance_type == 'sender' and arguments.get('expression', None) is not None: + raise ValueError('argument expression is only allowed when receiving messages') + + arguments['endpoint_type'] = endpoint_type + arguments['endpoint'] = arguments[endpoint_type] + + del arguments[endpoint_type] + + return arguments + @register(handlers, 'HELLO') def hello(self, request: AsyncMessageRequest) -> AsyncMessageResponse: context = request.get('context', None) if context is None: raise AsyncMessageError('no context in request') - self.message_wait = context.get('message_wait', None) url = context['url'] if self.client is None: @@ -174,11 +187,16 @@ def hello(self, request: AsyncMessageRequest) -> AsyncMessageResponse: transport_type=TransportType.AmqpOverWebsocket, ) + endpoint = context['endpoint'] instance_type = context['connection'] + message_wait = context.get('message_wait', None) + + arguments = self.get_endpoint_arguments(instance_type, endpoint) + if message_wait is not None and instance_type == 'receiver': + arguments['wait'] = str(message_wait) cache: GenericCache - endpoint = context['endpoint'] - arguments: Tuple[Any, ...] = (self.client, endpoint, ) + get_instance: GenericInstance if instance_type == 'sender': @@ -186,13 +204,13 @@ def hello(self, request: AsyncMessageRequest) -> AsyncMessageResponse: get_instance = cast(GenericInstance, self.get_sender_instance) elif instance_type == 'receiver': cache = cast(GenericCache, self._receiver_cache) - arguments += (self.message_wait, ) get_instance = cast(GenericInstance, self.get_receiver_instance) else: raise AsyncMessageError(f'"{instance_type}" is not a valid value for context.connection') if endpoint not in cache: - cache.update({endpoint: get_instance(*arguments).__enter__()}) + self._arguments[f'{instance_type}={endpoint}'] = arguments + cache.update({endpoint: get_instance(self.client, arguments).__enter__()}) return { 'message': 'there general kenobi', @@ -206,39 +224,117 @@ def request(self, request: AsyncMessageRequest) -> AsyncMessageResponse: instance_type = context.get('connection', None) endpoint = context['endpoint'] + endpoint_arguments = parse_arguments(endpoint, ':') + request_arguments = dict(endpoint_arguments) + + try: + del endpoint_arguments['expression'] + except: + pass + + cache_endpoint = ', '.join([f'{key}:{value}' for key, value in endpoint_arguments.items()]) - message: ServiceBusMessage + message: Optional[ServiceBusMessage] = None metadata: Optional[Dict[str, Any]] = None payload = request.get('payload', None) + if instance_type not in ['receiver', 'sender']: + raise AsyncMessageError(f'"{instance_type}" is not a valid value for context.connection') + + arguments = self._arguments.get(f'{instance_type}={cache_endpoint}', None) + + if arguments is None: + raise AsyncMessageError(f'no HELLO received for {cache_endpoint}') + + expression = request_arguments.get('expression', None) + if instance_type == 'sender': if payload is None: raise AsyncMessageError('no payload') - sender = self._sender_cache.get(endpoint, None) - if sender is None: - raise AsyncMessageError(f'no HELLO sent for {endpoint}') - + sender = self._sender_cache[cache_endpoint] message = ServiceBusMessage(payload) + try: sender.send_messages(message) except Exception as e: - raise AsyncMessageError('failed to send message') from e + raise AsyncMessageError(f'failed to send message: {str(e)}') from e elif instance_type == 'receiver': if payload is not None: raise AsyncMessageError('payload not allowed') - receiver = self._receiver_cache.get(endpoint, None) - if receiver is None: - raise AsyncMessageError(f'no HELLO sent for {endpoint}') + receiver = self._receiver_cache[cache_endpoint] + message_wait = int(request_arguments.get('message_wait', str(context.get('message_wait', 0)))) + try: - message = cast(ServiceBusMessage, receiver.next()) - receiver.complete_message(message) + wait_start = time() + if expression is not None: + try: + content_type = TransformerContentType.from_string(cast(str, request.get('context', {})['content_type'])) + transform = transformer.available[content_type] + get_values = transform.parser(request_arguments['expression']) + except Exception as e: + raise AsyncMessageError(str(e)) from e + + for received_message in receiver: + message = cast(ServiceBusMessage, received_message) + + logger.debug(f'{self.worker}: got message id: {message.message_id}') + + if expression is None: + logger.debug(f'{self.worker}: completing message id: {message.message_id}') + receiver.complete_message(message) + break + + had_error = True + try: + metadata, payload = self.from_message(message) + + if payload is None: + raise AsyncMessageError('no payload in message') + + try: + _, transformed_payload = transform.transform(content_type, payload) + except TransformerError as e: + logger.error(f'{self.worker}: {payload}') + raise AsyncMessageError(e.message) + + values = get_values(transformed_payload) + + logger.debug(f'{self.worker}: expression={request_arguments["expression"]}, matches={values}, payload={transformed_payload}') + + if len(values) > 0: + logger.debug(f'{self.worker}: completing message id: {message.message_id}, with expression "{request_arguments["expression"]}"') + receiver.complete_message(message) + had_error = False + break + except: + raise + finally: + if had_error: + if message is not None: + logger.debug(f'{self.worker}: abandoning message id: {message.message_id}, {message._raw_amqp_message.header.delivery_count}') + receiver.abandon_message(message) + message = None + + wait_now = time() + if message_wait > 0 and wait_now - wait_start >= message_wait: + raise StopIteration() + + sleep(0.2) + + if message is None: + raise StopIteration() + except StopIteration: - raise AsyncMessageError(f'no messages on {endpoint}') - else: - raise AsyncMessageError(f'"{instance_type}" is not a valid value for context.connection') + error_message = f'no messages on {endpoint}' + message = None + if message_wait > 0: + error_message = f'{error_message} within {message_wait} seconds' + raise AsyncMessageError(error_message) + + if expression is None: + metadata, payload = self.from_message(message) - metadata, payload = self.from_message(message) response_length = len(payload or '') return { diff --git a/grizzly_extras/transformer.py b/grizzly_extras/transformer.py index c6592d59..6d1a56d5 100644 --- a/grizzly_extras/transformer.py +++ b/grizzly_extras/transformer.py @@ -95,7 +95,16 @@ def transform(cls, content_type: TransformerContentType, raw: str) -> Tuple[Tran @classmethod def validate(cls, expression: str) -> bool: - return expression.startswith('$.') and len(expression) > 2 + valid = expression.startswith('$.') and len(expression) > 2 + if not valid: + return valid + + try: + jsonpath_parse(expression) + except: + valid = False + + return valid @classmethod def parser(cls, expression: str) -> Callable[[Any], List[str]]: @@ -108,7 +117,7 @@ def parser(cls, expression: str) -> Callable[[Any], List[str]]: def get_values(input_payload: Any) -> List[str]: values: List[str] = [] for m in jsonpath.find(input_payload): - if m.value is None: + if m is None or m.value is None: continue if isinstance(m.value, (dict, list, )): diff --git a/tests/test_grizzly/testdata/variables/test_messagequeue.py b/tests/test_grizzly/testdata/variables/test_messagequeue.py index 4513d3d6..04384449 100644 --- a/tests/test_grizzly/testdata/variables/test_messagequeue.py +++ b/tests/test_grizzly/testdata/variables/test_messagequeue.py @@ -15,6 +15,7 @@ from grizzly.testdata.variables.messagequeue import atomicmessagequeue__base_type__ from grizzly.context import GrizzlyContext from grizzly_extras.async_message import AsyncMessageResponse +from grizzly_extras.transformer import TransformerContentType try: import pymqi @@ -459,7 +460,7 @@ def mock_response(response: Optional[AsyncMessageResponse], repeat: int = 1) -> assert v._endpoint_messages['test'][0] == jsondumps({'test': {'result': 'hello world'}}) send_json_spy = mocker.spy(zmq.sugar.socket.Socket, 'send_json') - v._settings['test']['content_type'] = 'xml' + v._settings['test']['content_type'] = TransformerContentType.XML v['test'] send_json_spy.assert_called_once() arg_in = send_json_spy.call_args_list[0][0][1] diff --git a/tests/test_grizzly/testdata/variables/test_servicebus.py b/tests/test_grizzly/testdata/variables/test_servicebus.py index 90f0efd6..67846a26 100644 --- a/tests/test_grizzly/testdata/variables/test_servicebus.py +++ b/tests/test_grizzly/testdata/variables/test_servicebus.py @@ -8,6 +8,7 @@ from grizzly.testdata.variables.servicebus import AtomicServiceBus, atomicservicebus_url, atomicservicebus_endpoint, atomicservicebus__base_type__ from grizzly.context import GrizzlyContext from grizzly_extras.async_message import AsyncMessageResponse +from grizzly_extras.transformer import TransformerContentType from ...fixtures import noop_zmq # pylint: disable=unused-import @@ -104,6 +105,11 @@ def test_atomicservicebus_endpoint() -> None: atomicservicebus_endpoint(endpoint) assert 'AtomicServiceBus: endpoint needs to include subscription when receiving messages from a topic' in str(ve) + endpoint = 'topic:document-in, queue:document-in' + with pytest.raises(ValueError) as ve: + atomicservicebus_endpoint(endpoint) + assert 'AtomicServiceBus: cannot specify both topic: and queue: in endpoint' in str(ve) + endpoint = 'topic:document-in, asdf:subscription' with pytest.raises(ValueError) as ve: atomicservicebus_endpoint(endpoint) @@ -117,7 +123,7 @@ def test_atomicservicebus_endpoint() -> None: endpoint = 'queue:document-in, subscription:application-x' with pytest.raises(ValueError) as ve: atomicservicebus_endpoint(endpoint) - assert 'AtomicServiceBus: additional arguments in endpoint is only supported for topic' in str(ve) + assert 'AtomicServiceBus: argument subscription is only allowed if endpoint is a topic' in str(ve) endpoint = 'queue:"{{ queue_name }}"' with pytest.raises(ValueError) as ve: @@ -129,6 +135,16 @@ def test_atomicservicebus_endpoint() -> None: atomicservicebus_endpoint(endpoint) assert 'AtomicServiceBus: configuration variable "sb.endpoint.queue" is not set' in str(ve) + endpoint = 'topic:documents-in, subscription:"$conf::sb.endpoint.subscription"' + with pytest.raises(ValueError) as ve: + atomicservicebus_endpoint(endpoint) + assert 'AtomicServiceBus: configuration variable "sb.endpoint.subscription" is not set' in str(ve) + + endpoint = 'topic:documents-in, subscription:application-x, expression:"{{ expression }}"' + with pytest.raises(ValueError) as ve: + atomicservicebus_endpoint(endpoint) + assert 'AtomicServiceBus: value contained variable "expression" which has not been set' in str(ve) + try: grizzly = GrizzlyContext() @@ -142,9 +158,13 @@ def test_atomicservicebus_endpoint() -> None: grizzly.state.configuration['sb.endpoint.subscription'] = 'test-subscription' grizzly.state.configuration['sb.endpoint.topic'] = 'test-topic' - endpoint = 'topic:"$conf::sb.endpoint.topic",subscription:"$conf::sb.endpoint.subscription"' + endpoint = 'topic:"$conf::sb.endpoint.topic",subscription:"$conf::sb.endpoint.subscription"' assert atomicservicebus_endpoint(endpoint) == 'topic:test-topic, subscription:test-subscription' + + endpoint = 'topic:"$conf::sb.endpoint.topic",subscription:"$conf::sb.endpoint.subscription",expression:"{{ queue_name }}"' + assert atomicservicebus_endpoint(endpoint) == 'topic:test-topic, subscription:test-subscription, expression:test-queue' + finally: try: GrizzlyContext.destroy() @@ -184,13 +204,14 @@ def test___init__(self, mocker: MockerFixture) -> None: 'url': 'Endpoint=sb://sb.example.org/;SharedAccessKeyName=name;SharedAccessKey=key', 'context': None, 'worker': None, + 'content_type': None, } assert v._endpoint_clients.get('test1', None) is not None assert isinstance(v._zmq_context, zmq.Context) t = AtomicServiceBus( 'test2', - 'topic:documents-in, subscription:application-x | url="sb://sb.example.org/;SharedAccessKeyName=name;SharedAccessKey=key", wait=15', + 'topic:documents-in, subscription:application-x | url="sb://sb.example.org/;SharedAccessKeyName=name;SharedAccessKey=key", wait=15, content_type=json', ) assert v is t @@ -208,8 +229,20 @@ def test___init__(self, mocker: MockerFixture) -> None: 'url': 'sb://sb.example.org/;SharedAccessKeyName=name;SharedAccessKey=key', 'context': None, 'worker': None, + 'content_type': TransformerContentType.JSON, } assert v._endpoint_clients.get('test2', None) is not None + + with pytest.raises(ValueError) as ve: + t = AtomicServiceBus( + 'test3', + ( + 'topic:documents-in, subscription:application-x, expression:"$.document[?(@.id==10)]" | ' + 'url="sb://sb.example.org/;SharedAccessKeyName=name;SharedAccessKey=key", wait=15' + ), + ) + assert 'AtomicServiceBus.test3: argument "content_type" is mandatory when "expression" is used in endpoint' in str(ve) + finally: try: AtomicServiceBus.destroy() @@ -236,6 +269,7 @@ def test_create_context(self) -> None: 'url': 'sb://sb.example.org/;SharedAccessKeyName=name;SharedAccessKey=key', 'endpoint_name': 'topic:documents-in, subscription:application-x', 'wait': 120, + 'content_type': TransformerContentType.JSON, } context = AtomicServiceBus.create_context(settings) @@ -244,6 +278,7 @@ def test_create_context(self) -> None: 'endpoint': 'topic:documents-in, subscription:application-x', 'connection': 'receiver', 'message_wait': 120, + 'content_type': 'json', } finally: try: @@ -258,7 +293,7 @@ def test_create_client(self, mocker: MockerFixture, noop_zmq: Callable[[str], No try: say_hello_spy = mocker.patch( 'grizzly.testdata.variables.servicebus.AtomicServiceBus.say_hello', - side_effect=[None], + side_effect=[None] * 2, ) v = AtomicServiceBus( @@ -275,6 +310,7 @@ def test_create_client(self, mocker: MockerFixture, noop_zmq: Callable[[str], No 'worker': None, 'url': 'sb://sb.example.org/;SharedAccessKeyName=name;SharedAccessKey=key', 'endpoint_name': 'topic:documents-in, subscription:application-x', + 'content_type': None, 'context': { 'url': 'sb://sb.example.org/;SharedAccessKeyName=name;SharedAccessKey=key', 'endpoint': 'topic:documents-in, subscription:application-x', @@ -283,10 +319,40 @@ def test_create_client(self, mocker: MockerFixture, noop_zmq: Callable[[str], No }, } assert say_hello_spy.call_count == 1 - args, _ = say_hello_spy.call_args_list[0] + args, _ = say_hello_spy.call_args_list[-1] assert isinstance(args[0], zmq.Socket) assert args[1] == 'test' assert args[0] is v._endpoint_clients.get('test', None) + + v = AtomicServiceBus( + 'test-variable', + ( + 'queue:documents-in, expression:"$.document[?(@.name=="TPM Report")]" | url="Endpoint=sb://sb.example.org/;SharedAccessKeyName=name;SharedAccessKey=key", ' + 'wait=15, content_type=json' + ), + ) + assert isinstance(v._endpoint_clients.get('test-variable', None), zmq.Socket) + print(v._settings.get('test-variable', None)) + assert v._settings.get('test-variable', None) == { + 'repeat': False, + 'wait': 15, + 'worker': None, + 'url': 'Endpoint=sb://sb.example.org/;SharedAccessKeyName=name;SharedAccessKey=key', + 'content_type': TransformerContentType.JSON, + 'endpoint_name': 'queue:documents-in, expression:"$.document[?(@.name=="TPM Report")]"', + 'context': { + 'url': 'sb://sb.example.org/;SharedAccessKeyName=name;SharedAccessKey=key', + 'endpoint': 'queue:documents-in, expression:"$.document[?(@.name=="TPM Report")]"', + 'connection': 'receiver', + 'message_wait': 15, + 'content_type': 'json', + }, + } + assert say_hello_spy.call_count == 2 + args, _ = say_hello_spy.call_args_list[-1] + assert isinstance(args[0], zmq.Socket) + assert args[1] == 'test-variable' + assert args[0] is v._endpoint_clients.get('test-variable', None) finally: try: AtomicServiceBus.destroy() @@ -321,7 +387,9 @@ def mock_response(client: zmq.Socket, response: Optional[AsyncMessageResponse]) client = context.socket(zmq.REQ) v._settings['test2'] = { - 'context': {}, + 'context': { + 'endpoint': 'topic:test-topic', + }, 'worker': None, } @@ -344,7 +412,9 @@ def mock_response(client: zmq.Socket, response: Optional[AsyncMessageResponse]) assert args[0] == { 'worker': None, 'action': 'HELLO', - 'context': {}, + 'context': { + 'endpoint': 'topic:test-topic', + }, } assert v._settings['test2'].get('worker', '') is None diff --git a/tests/test_grizzly/users/test_messagequeue.py b/tests/test_grizzly/users/test_messagequeue.py index a273cb12..24e5893b 100644 --- a/tests/test_grizzly/users/test_messagequeue.py +++ b/tests/test_grizzly/users/test_messagequeue.py @@ -425,9 +425,15 @@ def test_get(self, mq_user: Tuple[MessageQueueUser, GrizzlyContextScenario, Envi autospec=True, ) + mocker.patch( + 'grizzly.users.messagequeue.zmq.sugar.socket.Socket.disconnect', + side_effect=[zmq.ZMQError] * 10, + ) + mocker.patch( 'grizzly.users.messagequeue.zmq.sugar.socket.Socket.recv_json', side_effect=[ + zmq.Again, { 'success': True, 'worker': '0000-1337', @@ -581,7 +587,7 @@ def test_get(self, mq_user: Tuple[MessageQueueUser, GrizzlyContextScenario, Envi user.request(request) assert send_json_spy.call_count == 1 args, _ = send_json_spy.call_args_list[0] - ctx : Dict[str, str] = args[1]['context'] + ctx: Dict[str, str] = args[1]['context'] assert ctx['endpoint'] == request.endpoint # Test with specifying queue: prefix as endpoint @@ -600,6 +606,7 @@ def test_get(self, mq_user: Tuple[MessageQueueUser, GrizzlyContextScenario, Envi ctx = args[1]['context'] assert ctx['endpoint'] == request.endpoint + # Test specifying queue: prefix with expression, and spacing request.endpoint = 'queue: IFKTEST2 , expression: /class/student[marks>85]' user.request(request) @@ -621,7 +628,33 @@ def test_get(self, mq_user: Tuple[MessageQueueUser, GrizzlyContextScenario, Envi with pytest.raises(StopUser): user.request(request) + # Test with expression argument but wrong method + request.endpoint = 'queue:IFKTEST3, expression:/class/student[marks<55]' + request.method = RequestMethod.PUT + + with pytest.raises(StopUser): + user.request(request) + + assert response_event_spy.call_count == 7 assert send_json_spy.call_count == 5 + _, kwargs = response_event_spy.call_args_list[6] + exception = kwargs.get('exception', None) + assert isinstance(exception, RuntimeError) + assert str(exception) == 'argument "expression" is not allowed when sending to an endpoint' + + # Test with empty queue name + request.endpoint = 'queue:, expression:/class/student[marks<55]' + request.method = RequestMethod.GET + + with pytest.raises(StopUser): + user.request(request) + + assert response_event_spy.call_count == 8 + assert send_json_spy.call_count == 5 + _, kwargs = response_event_spy.call_args_list[7] + exception = kwargs.get('exception', None) + assert isinstance(exception, RuntimeError) + assert str(exception) == f'invalid value for argument "queue"' send_json_spy.reset_mock() request_event_spy.reset_mock() diff --git a/tests/test_grizzly/users/test_servicebus.py b/tests/test_grizzly/users/test_servicebus.py index aa57c06b..9cf18400 100644 --- a/tests/test_grizzly/users/test_servicebus.py +++ b/tests/test_grizzly/users/test_servicebus.py @@ -17,6 +17,7 @@ from grizzly.task import RequestTask, WaitTask from grizzly.context import GrizzlyContextScenario from grizzly_extras.async_message import AsyncMessageResponse, AsyncMessageError +from grizzly_extras.transformer import TransformerContentType from ..fixtures import behave_context, request_task, locust_environment, noop_zmq # pylint: disable=unused-import @@ -109,11 +110,14 @@ def test_say_hello(self, noop_zmq: Callable[[str], None], locust_environment: En user.hellos = set(['sender=queue:test-queue']) - task = RequestTask(RequestMethod.SEND, name='test-send', endpoint='queue:{{ queue_name }}') + task = RequestTask(RequestMethod.SEND, name='test-send', endpoint='queue:"{{ queue_name }}"') + scenario = GrizzlyContextScenario() + scenario.name = 'test' + scenario.add_task(task) with caplog.at_level(logging.WARNING): user.say_hello(task, task.endpoint) - assert 'ServiceBusUser: cannot say hello for test-send when endpoint (queue:{{ queue_name }}) is a template' in caplog.text + assert 'ServiceBusUser: cannot say hello for test-send when endpoint is a template' in caplog.text assert user.hellos == set(['sender=queue:test-queue']) assert async_action_spy.call_count == 0 caplog.clear() @@ -130,8 +134,7 @@ def test_say_hello(self, noop_zmq: Callable[[str], None], locust_environment: En args, kwargs = async_action_spy.call_args_list[0] assert len(args) == 4 - assert len(kwargs) == 1 - assert kwargs.get('meta', False) + assert len(kwargs) == 0 assert args[1] is task assert args[2] == { 'worker': None, @@ -145,6 +148,7 @@ def test_say_hello(self, noop_zmq: Callable[[str], None], locust_environment: En assert args[3] == 'sender=topic:test-topic' task = RequestTask(RequestMethod.RECEIVE, name='test-recv', endpoint='topic:test-topic, subscription:test-subscription') + scenario.add_task(task) user.say_hello(task, task.endpoint) @@ -153,8 +157,7 @@ def test_say_hello(self, noop_zmq: Callable[[str], None], locust_environment: En args, kwargs = async_action_spy.call_args_list[1] assert len(args) == 4 - assert len(kwargs) == 1 - assert kwargs.get('meta', False) + assert len(kwargs) == 0 assert args[1] is task assert args[2] == { 'worker': None, @@ -165,7 +168,45 @@ def test_say_hello(self, noop_zmq: Callable[[str], None], locust_environment: En 'message_wait': None, } } - assert args[3] == 'receiver=topic:test-topic' + assert args[3] == 'receiver=topic:test-topic, subscription:test-subscription' + + # error handling + task.endpoint = 'test-topic' + with pytest.raises(RuntimeError) as re: + user.say_hello(task, task.endpoint) + assert 'incorrect format in arguments: "test-topic"' in str(re) + + task.endpoint = 'subscription:test-subscription' + with pytest.raises(RuntimeError) as re: + user.say_hello(task, task.endpoint) + assert 'endpoint needs to be prefixed with queue: or topic:' in str(re) + + task.endpoint = 'topic:test-topic, queue:test-queue' + with pytest.raises(RuntimeError) as re: + user.say_hello(task, task.endpoint) + assert 'cannot specify both topic: and queue: in endpoint' in str(re) + + task.endpoint = 'queue:test-queue, subscription:test-subscription' + with pytest.raises(RuntimeError) as re: + user.say_hello(task, task.endpoint) + assert 'argument subscription is only allowed if endpoint is a topic' in str(re) + + task.endpoint = 'topic:test-topic, subscription:test-subscription, argument:False' + with pytest.raises(RuntimeError) as re: + user.say_hello(task, task.endpoint) + assert 'arguments argument is not supported' in str(re) + + task.endpoint = 'topic:test-topic' + with pytest.raises(RuntimeError) as re: + user.say_hello(task, task.endpoint) + assert 'endpoint needs to include subscription when receiving messages from a topic' in str(re) + + task.method = RequestMethod.SEND + task.endpoint = 'topic:test-topic2, expression:$.test.result' + with pytest.raises(RuntimeError) as re: + user.say_hello(task, task.endpoint) + assert 'argument expression is only allowed when receiving messages' in str(re) + @pytest.mark.usefixtures('locust_environment', 'noop_zmq') def test_request(self, locust_environment: Environment, noop_zmq: Callable[[str], None], mocker: MockerFixture) -> None: @@ -245,6 +286,7 @@ def mock_recv_json(response: AsyncMessageResponse) -> None: _, kwargs = response_event_fire_spy.call_args_list[1] assert kwargs.get('name', None) == f'{scenario.identifier} {task.name}' assert kwargs.get('request', None) is task + metadata, payload = kwargs.get('context', (None, None,)) assert metadata is None assert payload is None @@ -259,6 +301,7 @@ def mock_recv_json(response: AsyncMessageResponse) -> None: assert kwargs.get('context', None) == user._context exception = kwargs.get('exception', None) assert 'unknown error' in str(exception) + args, _ = send_json_spy.call_args_list[0] assert args[0] == { 'worker': 'asdf-asdf-asdf', @@ -294,6 +337,7 @@ def mock_recv_json(response: AsyncMessageResponse) -> None: _, kwargs = response_event_fire_spy.call_args_list[2] assert kwargs.get('name', None) == f'{scenario.identifier} {task.name}' assert kwargs.get('request', None) is task + metadata, payload = kwargs.get('context', (None, None,)) assert metadata == {'meta': True} assert payload is 'hello' @@ -307,8 +351,8 @@ def mock_recv_json(response: AsyncMessageResponse) -> None: assert kwargs.get('response_length', None) == 133 assert kwargs.get('context', None) == user._context assert kwargs.get('exception', '') is None + args, _ = send_json_spy.call_args_list[1] - print(args[0]) assert args[0] == { 'worker': 'asdf-asdf-asdf', 'action': 'RECEIVE', @@ -320,3 +364,55 @@ def mock_recv_json(response: AsyncMessageResponse) -> None: 'message_wait': None, } } + + task.method = RequestMethod.RECEIVE + task.template = None + task.source = None + task.response.content_type = TransformerContentType.JSON + task.endpoint = f'{task.endpoint}, expression:"$.document[?(@.name=="TPM Report")]' + + mock_recv_json({ + 'worker': 'asdf-asdf-asdf', + 'success': True, + 'payload': 'hello', + 'metadata': {'meta': True}, + 'response_length': 133, + }) + + user.request(task) + assert say_hello_spy.call_count == 4 + assert send_json_spy.call_count == 3 + assert request_fire_spy.call_count == 4 + assert response_event_fire_spy.call_count == 4 + + _, kwargs = response_event_fire_spy.call_args_list[3] + assert kwargs.get('name', None) == f'{scenario.identifier} {task.name}' + assert kwargs.get('request', None) is task + + metadata, payload = kwargs.get('context', (None, None,)) + assert metadata == {'meta': True} + assert payload is 'hello' + assert kwargs.get('user', None) is user + assert kwargs.get('exception', '') is None + + _, kwargs = request_fire_spy.call_args_list[3] + assert kwargs.get('request_type', None) == 'sb:RECV' + assert kwargs.get('name', None) == f'{scenario.identifier} {task.name}' + assert kwargs.get('response_time', None) >= 0.0 + assert kwargs.get('response_length', None) == 133 + assert kwargs.get('context', None) == user._context + assert kwargs.get('exception', '') is None + + args, _ = send_json_spy.call_args_list[2] + assert args[0] == { + 'worker': 'asdf-asdf-asdf', + 'action': 'RECEIVE', + 'payload': None, + 'context': { + 'endpoint': 'queue:test-queue, expression:"$.document[?(@.name=="TPM Report")]', + 'connection': 'receiver', + 'url': 'sb://sb.example.org/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123def456ghi789=', + 'message_wait': None, + 'content_type': 'json', + } + } diff --git a/tests/test_grizzly_extras/async_message/test_sb.py b/tests/test_grizzly_extras/async_message/test_sb.py index 7d3a2ea7..643e2158 100644 --- a/tests/test_grizzly_extras/async_message/test_sb.py +++ b/tests/test_grizzly_extras/async_message/test_sb.py @@ -1,10 +1,14 @@ import logging +from typing import cast +from json import dumps as jsondumps + import pytest from pytest_mock import MockerFixture, mocker # pylint: disable=unused-import from azure.servicebus import ServiceBusMessage, TransportType, ServiceBusClient, ServiceBusSender, ServiceBusReceiver +from grizzly_extras.arguments import parse_arguments from grizzly_extras.async_message import AsyncMessageError, AsyncMessageRequest from grizzly_extras.async_message.sb import AsyncServiceBusHandler @@ -39,34 +43,59 @@ def test_from_message(self) -> None: assert isinstance(metadata, dict) assert len(metadata) > 0 - def test_get_endpoint_details(self) -> None: - with pytest.raises(AsyncMessageError) as ame: - AsyncServiceBusHandler.get_endpoint_details('receiver', 'test') - assert '"test" is not prefixed with queue: or topic:' in str(ame) + def test_get_arguments(self) -> None: + with pytest.raises(ValueError) as ve: + AsyncServiceBusHandler.get_endpoint_arguments('receiver', 'test') + assert 'incorrect format in arguments: "test"' in str(ve) - with pytest.raises(AsyncMessageError) as ame: - AsyncServiceBusHandler.get_endpoint_details('sender', 'asdf:test') - assert 'only support for endpoint types queue and topic, not asdf' in str(ame) + with pytest.raises(ValueError) as ve: + AsyncServiceBusHandler.get_endpoint_arguments('sender', 'asdf:test') + assert 'endpoint needs to be prefixed with queue: or topic:' in str(ve) - with pytest.raises(AsyncMessageError) as ame: - AsyncServiceBusHandler.get_endpoint_details('sender', 'topic:test, dummy:test') - assert 'additional arguments in endpoint is not supported for sender' in str(ame) + with pytest.raises(ValueError) as ve: + AsyncServiceBusHandler.get_endpoint_arguments('sender', 'topic:test, dummy:test') + assert 'arguments dummy is not supported' in str(ve) - with pytest.raises(AsyncMessageError) as ame: - AsyncServiceBusHandler.get_endpoint_details('receiver', 'topic:test, dummy:test') - assert 'arguments dummy is not supported' in str(ame) + with pytest.raises(ValueError) as ve: + AsyncServiceBusHandler.get_endpoint_arguments('receiver', 'topic:test, dummy:test') + assert 'arguments dummy is not supported' in str(ve) - with pytest.raises(AsyncMessageError) as ame: - AsyncServiceBusHandler.get_endpoint_details('receiver', 'topic:test') - assert 'endpoint needs to include subscription when receiving messages from a topic' in str(ame) + with pytest.raises(ValueError) as ve: + AsyncServiceBusHandler.get_endpoint_arguments('receiver', 'topic:test') + assert 'endpoint needs to include subscription when receiving messages from a topic' in str(ve) - assert AsyncServiceBusHandler.get_endpoint_details('sender', 'queue:test') == ('queue', 'test', None, ) + with pytest.raises(ValueError) as ve: + AsyncServiceBusHandler.get_endpoint_arguments('receiver', 'topic:test, queue:test') + assert 'cannot specify both topic: and queue: in endpoint' in str(ve) - assert AsyncServiceBusHandler.get_endpoint_details('sender', 'topic:test') == ('topic', 'test', None, ) + with pytest.raises(ValueError) as ve: + AsyncServiceBusHandler.get_endpoint_arguments('receiver', 'queue:test, subscription:test') + assert 'argument subscription is only allowed if endpoint is a topic' in str(ve) - assert AsyncServiceBusHandler.get_endpoint_details('receiver', 'queue:test') == ('queue', 'test', None, ) + with pytest.raises(ValueError) as ve: + AsyncServiceBusHandler.get_endpoint_arguments('sender', 'queue:test, expression:test') + assert 'argument expression is only allowed when receiving messages' in str(ve) - assert AsyncServiceBusHandler.get_endpoint_details('receiver', 'topic:test, subscription:test') == ('topic', 'test', 'test', ) + assert AsyncServiceBusHandler.get_endpoint_arguments('sender', 'queue:test') == { + 'endpoint': 'test', + 'endpoint_type': 'queue', + } + + assert AsyncServiceBusHandler.get_endpoint_arguments('sender', 'topic:test') == { + 'endpoint': 'test', + 'endpoint_type': 'topic', + } + + assert AsyncServiceBusHandler.get_endpoint_arguments('receiver', 'queue:test') == { + 'endpoint': 'test', + 'endpoint_type': 'queue', + } + + assert AsyncServiceBusHandler.get_endpoint_arguments('receiver', 'topic:test, subscription:test') == { + 'endpoint': 'test', + 'endpoint_type': 'topic', + 'subscription': 'test', + } def test_get_sender_instance(self, mocker: MockerFixture) -> None: url = 'Endpoint=sb://sb.example.org/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123def456ghi789=' @@ -80,7 +109,7 @@ def test_get_sender_instance(self, mocker: MockerFixture) -> None: handler = AsyncServiceBusHandler('asdf-asdf-asdf') - sender = handler.get_sender_instance(client, 'queue:test-queue') + sender = handler.get_sender_instance(client, handler.get_endpoint_arguments('sender', 'queue:test-queue')) assert isinstance(sender, ServiceBusSender) assert topic_spy.call_count == 0 assert queue_spy.call_count == 1 @@ -88,7 +117,7 @@ def test_get_sender_instance(self, mocker: MockerFixture) -> None: assert len(kwargs) == 1 assert kwargs.get('queue_name', None) == 'test-queue' - sender = handler.get_sender_instance(client, 'topic:test-topic') + sender = handler.get_sender_instance(client, handler.get_endpoint_arguments('sender', 'topic:test-topic')) assert isinstance(sender, ServiceBusSender) assert queue_spy.call_count == 1 assert topic_spy.call_count == 1 @@ -108,7 +137,7 @@ def test_get_receiver_instance(self, mocker: MockerFixture) -> None: handler = AsyncServiceBusHandler('asdf-asdf-asdf') - receiver = handler.get_receiver_instance(client, 'queue:test-queue') + receiver = handler.get_receiver_instance(client, handler.get_endpoint_arguments('receiver', 'queue:test-queue')) assert isinstance(receiver, ServiceBusReceiver) assert topic_spy.call_count == 0 assert queue_spy.call_count == 1 @@ -116,7 +145,7 @@ def test_get_receiver_instance(self, mocker: MockerFixture) -> None: assert len(kwargs) == 1 assert kwargs.get('queue_name', None) == 'test-queue' - handler.get_receiver_instance(client, 'queue:test-queue', 100) + handler.get_receiver_instance(client, dict({'wait': '100'}, **handler.get_endpoint_arguments('receiver', 'queue:test-queue'))) assert topic_spy.call_count == 0 assert queue_spy.call_count == 2 _, kwargs = queue_spy.call_args_list[-1] @@ -124,7 +153,7 @@ def test_get_receiver_instance(self, mocker: MockerFixture) -> None: assert kwargs.get('queue_name', None) == 'test-queue' assert kwargs.get('max_wait_time', None) == 100 - receiver = handler.get_receiver_instance(client, 'topic:test-topic, subscription:test-subscription') + receiver = handler.get_receiver_instance(client, handler.get_endpoint_arguments('receiver', 'topic:test-topic, subscription: test-subscription')) assert topic_spy.call_count == 1 assert queue_spy.call_count == 2 _, kwargs = topic_spy.call_args_list[-1] @@ -132,7 +161,7 @@ def test_get_receiver_instance(self, mocker: MockerFixture) -> None: assert kwargs.get('topic_name', None) == 'test-topic' assert kwargs.get('subscription_name', None) == 'test-subscription' - receiver = handler.get_receiver_instance(client, 'topic:test-topic, subscription:test-subscription', 100) + receiver = handler.get_receiver_instance(client, dict({'wait': '100'}, **handler.get_endpoint_arguments('receiver', 'topic:test-topic, subscription:test-subscription'))) assert topic_spy.call_count == 2 assert queue_spy.call_count == 2 _, kwargs = topic_spy.call_args_list[-1] @@ -196,7 +225,7 @@ def test_hello(self, mocker: MockerFixture) -> None: args, _ = sender_instance_spy.call_args_list[0] assert len(args) == 2 assert args[0] is handler.client - assert args[1] == 'queue:test-queue' + assert args[1] == {'endpoint_type': 'queue', 'endpoint': 'test-queue'} assert handler._sender_cache.get('queue:test-queue', None) is not None assert handler._receiver_cache == {} @@ -226,10 +255,9 @@ def test_hello(self, mocker: MockerFixture) -> None: assert receiver_instance_spy.return_value.__enter__.call_count == 1 args, _ = receiver_instance_spy.call_args_list[0] - assert len(args) == 3 + assert len(args) == 2 assert args[0] is handler.client - assert args[1] == 'topic:test-topic, subscription:test-subscription' - assert args[2] == 10 + assert args[1] == {'endpoint_type': 'topic', 'endpoint': 'test-topic', 'subscription': 'test-subscription', 'wait': '10'} assert handler._sender_cache.get('queue:test-queue', None) is not None assert handler._receiver_cache.get('topic:test-topic, subscription:test-subscription', None) is not None @@ -251,13 +279,28 @@ def test_request(self, mocker: MockerFixture) -> None: from grizzly_extras.async_message.sb import handlers handler = AsyncServiceBusHandler(worker='asdf-asdf-asdf') - sender_instance_spy = mocker.patch.object(handler, 'get_sender_instance', autospec=True) - receiver_instance_spy = mocker.patch.object(handler, 'get_receiver_instance', autospec=True) + sender_instance_mock = mocker.patch.object(handler, 'get_sender_instance') + receiver_instance_mock = mocker.patch.object(handler, 'get_receiver_instance') request: AsyncMessageRequest = { 'action': 'SEND', } + def setup_handler(handler: AsyncServiceBusHandler, request: AsyncMessageRequest) -> None: + handler._arguments.update({ + f'{request["context"]["connection"]}={request["context"]["endpoint"]}': handler.get_endpoint_arguments( + request['context']['connection'], + request['context']['endpoint'], + ) + }) + + endpoint = request['context']['endpoint'] + + if request['context']['connection'] == 'sender': + handler._sender_cache[endpoint] = sender_instance_mock.return_value + else: + handler._receiver_cache[endpoint] = receiver_instance_mock.return_value + with pytest.raises(AsyncMessageError) as ame: handlers[request['action']](handler, request) assert 'no context in request' in str(ame) @@ -283,21 +326,24 @@ def test_request(self, mocker: MockerFixture) -> None: with pytest.raises(AsyncMessageError) as ame: handlers[request['action']](handler, request) - assert 'no payload' in str(ame) + assert 'no HELLO received for queue:test-queue' in str(ame) - request['payload'] = 'grizzly <3 service bus' + setup_handler(handler, request) with pytest.raises(AsyncMessageError) as ame: handlers[request['action']](handler, request) - assert 'no HELLO sent for queue:test-queue' in str(ame) + assert 'no payload' in str(ame) + + request['payload'] = 'grizzly <3 service bus' - handler._sender_cache[request['context']['endpoint']] = sender_instance_spy + sender_instance_mock.return_value.send_messages.side_effect = [RuntimeError('unknown error')] with pytest.raises(AsyncMessageError) as ame: handlers[request['action']](handler, request) - assert 'failed to send message' in str(ame) + assert 'failed to send message: unknown error' in str(ame) - handler._sender_cache[request['context']['endpoint']] = sender_instance_spy.return_value + sender_instance_mock.reset_mock(return_value=True, side_effect=True) + setup_handler(handler, request) response = handlers[request['action']](handler, request) @@ -320,32 +366,28 @@ def test_request(self, mocker: MockerFixture) -> None: 'endpoint': 'topic:test-topic, subscription:test-subscription', }) + setup_handler(handler, request) + with pytest.raises(AsyncMessageError) as ame: handlers[request['action']](handler, request) assert 'payload not allowed' in str(ame) del request['payload'] - with pytest.raises(AsyncMessageError) as ame: - handlers[request['action']](handler, request) - assert 'no HELLO sent for topic:test-topic, subscription:test-subscription' in str(ame) - received_message = ServiceBusMessage('grizzly >3 service bus') - - receiver_instance_spy.return_value.next.side_effect = [StopIteration, received_message] - handler._receiver_cache[request['context']['endpoint']] = receiver_instance_spy.return_value + receiver_instance_mock.return_value.__iter__.side_effect = [StopIteration, iter([received_message])] with pytest.raises(AsyncMessageError) as ame: handlers[request['action']](handler, request) assert 'no messages on topic:test-topic, subscription:test-subscription' in str(ame) - assert receiver_instance_spy.return_value.next.call_count == 1 - assert receiver_instance_spy.return_value.complete_message.call_count == 0 + assert receiver_instance_mock.return_value.__iter__.call_count == 1 + assert receiver_instance_mock.return_value.complete_message.call_count == 0 response = handlers[request['action']](handler, request) - assert receiver_instance_spy.return_value.next.call_count == 2 - assert receiver_instance_spy.return_value.complete_message.call_count == 1 - args, _ = receiver_instance_spy.return_value.complete_message.call_args_list[0] + assert receiver_instance_mock.return_value.__iter__.call_count == 2 + assert receiver_instance_mock.return_value.complete_message.call_count == 1 + args, _ = receiver_instance_mock.return_value.complete_message.call_args_list[0] assert args[0] is received_message assert len(response) == 3 @@ -361,6 +403,141 @@ def test_request(self, mocker: MockerFixture) -> None: assert actual_metadata == expected_metadata assert response.get('response_length', 0) == len(expected_payload) + def test_request_expression(self, mocker: MockerFixture) -> None: + from grizzly_extras.async_message.sb import handlers + + handler = AsyncServiceBusHandler(worker='asdf-asdf-asdf') + receiver_instance_mock = mocker.patch.object(handler, 'get_receiver_instance') + + request: AsyncMessageRequest = { + 'action': 'RECEIVE', + 'context': { + 'message_wait': 10, + 'url': 'sb://sb.example.org/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123def456ghi789=', + 'endpoint': 'queue:test-queue, expression:"$.`this`[?(@.name="test")]"', + 'connection': 'receiver', + 'content_type': 'json', + }, + } + + def setup_handler(handler: AsyncServiceBusHandler, request: AsyncMessageRequest) -> None: + endpoint_arguments = parse_arguments(request['context']['endpoint'], ':') + try: + del endpoint_arguments['expression'] + except: + pass + cache_endpoint = ', '.join([f'{key}:{value}' for key, value in endpoint_arguments.items()]) + + key = f'{request["context"]["connection"]}={cache_endpoint}' + handler._arguments.update({ + key: handler.get_endpoint_arguments( + request['context']['connection'], + request['context']['endpoint'], + ) + }) + + handler._arguments[key]['content_type'] = cast(str, request['context']['content_type']) + handler._receiver_cache[cache_endpoint] = receiver_instance_mock.return_value + + setup_handler(handler, request) + message1 = ServiceBusMessage(jsondumps({ + 'document': { + 'name': 'not-test', + 'id': 10, + } + })) + message2 = ServiceBusMessage(jsondumps({ + 'document': { + 'name': 'test', + 'id': 13, + } + })) + receiver_instance_mock.return_value.__iter__.side_effect = [ + iter([message1, message2]), + ] + + response = handlers[request['action']](handler, request) + + assert receiver_instance_mock.return_value.__iter__.call_count == 1 + assert receiver_instance_mock.return_value.complete_message.call_count == 1 + assert receiver_instance_mock.return_value.abandon_message.call_count == 1 + + args, _ = receiver_instance_mock.return_value.complete_message.call_args_list[-1] + assert args[0] is message2 + + args, _ = receiver_instance_mock.return_value.abandon_message.call_args_list[-1] + assert args[0] is message1 + + assert len(response) == 3 + expected_metadata, expected_payload = handler.from_message(message2) + actual_metadata = response.get('metadata', None) + assert actual_metadata is not None + assert expected_metadata is not None + assert expected_payload is not None + actual_metadata['message_id'] = None + expected_metadata['message_id'] = None + + assert response.get('payload', None) == expected_payload + assert actual_metadata == expected_metadata + assert response.get('response_length', 0) == len(expected_payload) + + + message_error = ServiceBusMessage('') + + receiver_instance_mock.return_value.__iter__.side_effect = [ + iter([message_error]), + ] * 2 + + with pytest.raises(AsyncMessageError) as ame: + handlers[request['action']](handler, request) + assert 'failed to transform input as JSON: Expecting value: line 1 column 1 (char 0)' in str(ame) + assert receiver_instance_mock.return_value.abandon_message.call_count == 2 + + endpoint_backup = request['context']['endpoint'] + request['context']['endpoint'] = 'queue:test-queue, expression:"//document[@name="test-document"]"' + with pytest.raises(AsyncMessageError) as ame: + handlers[request['action']](handler, request) + assert 'JsonTransformer: unable to parse "//document[@name="test-document"]": JsonTransformer: not a valid expression' in str(ame) + + request['context']['endpoint'] = endpoint_backup + + from_message = handler.from_message + mocker.patch.object(handler, 'from_message', side_effect=[(None, None,)]) + receiver_instance_mock.return_value.__iter__.side_effect = [ + iter([message2]), + ] + + with pytest.raises(AsyncMessageError) as ame: + handlers[request['action']](handler, request) + assert 'no payload in message' in str(ame) + + assert receiver_instance_mock.return_value.abandon_message.call_count == 3 + + setattr(handler, 'from_message', from_message) + + message3 = ServiceBusMessage(jsondumps({ + 'document': { + 'name': 'not-test', + 'id': 14, + } + })) + + receiver_instance_mock.return_value.__iter__.side_effect = [ + iter([message1, message3]), + ] + + mocker.patch( + 'grizzly_extras.async_message.sb.time', + side_effect=[0.0, 0.1, 11.0], + ) + + with pytest.raises(AsyncMessageError) as ame: + handlers[request['action']](handler, request) + assert 'no messages on queue:test-queue, expression:"$.`this`[?(@.name="test")]"' in str(ame) + + assert receiver_instance_mock.return_value.abandon_message.call_count == 5 + + def test_get_handler(self) -> None: handler = AsyncServiceBusHandler(worker='asdf-asdf-asdf') diff --git a/tests/test_grizzly_extras/test_transformer.py b/tests/test_grizzly_extras/test_transformer.py index 165b6e30..5ed71068 100644 --- a/tests/test_grizzly_extras/test_transformer.py +++ b/tests/test_grizzly_extras/test_transformer.py @@ -1,4 +1,4 @@ -from typing import List, Tuple, Any +from typing import List, Tuple, Any, Dict from json import dumps as jsondumps, loads as jsonloads from json.decoder import JSONDecodeError @@ -150,7 +150,7 @@ def test_parser(self) -> None: with pytest.raises(ValueError): JsonTransformer.parser('$.') - example = { + example: Dict[str, Any] = { 'glossary': { 'title': 'example glossary', 'GlossDiv': { @@ -214,6 +214,17 @@ def test_parser(self) -> None: assert len(actual) == 1 assert actual == ['False'] + example = { + 'document': { + 'name': 'test', + 'id': 13, + }, + } + get_values = JsonTransformer.parser('$.`this`[?(@.name="test")]') + actual = get_values(example) + + assert len(actual) > 0 + class TestXmlTransformer: def test_transform(self) -> None: