Skip to content

Commit

Permalink
expression support for service bus functionality (#31)
Browse files Browse the repository at this point in the history
* initial commit

* Implemented support for expression argument in "frontend"

* content_type is needed from variable and user

* implementation of handling of expression in AsyncServiceBusHandler

* fixes for problems found when testing

* do not print output from async-messaged

* refinement of expression implementation for service bus
  • Loading branch information
mgor committed Dec 2, 2021
1 parent 2e2695d commit 423bc99
Show file tree
Hide file tree
Showing 18 changed files with 831 additions and 191 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/code-quality.yaml
Expand Up @@ -2,8 +2,8 @@ name: code quality

on:
pull_request:
types: [opened, synchronize]
workflow_dispatch:
branches:
- main

jobs:
code-quality:
Expand Down
3 changes: 1 addition & 2 deletions grizzly/locust.py
Expand Up @@ -412,8 +412,7 @@ def sig_term_handler() -> None:
logger.info(f'stopping {external_dependency}')
external_process.terminate()
if context.config.verbose:
stdout, _ = external_process.communicate()
logger.debug(stdout.decode())
external_process.communicate()
logger.debug(f'{external_process.returncode=}')

external_processes.clear()
3 changes: 3 additions & 0 deletions grizzly/steps/scenario/tasks.py
Expand Up @@ -219,6 +219,9 @@ def step_task_print_message(context: Context, message: str) -> None:
grizzly = cast(GrizzlyContext, context.grizzly)
grizzly.scenario.add_task(PrintTask(message=message))

if '{{' in message and '}}' in message:
grizzly.scenario.orphan_templates.append(message)


@then(u'parse "{content}" as "{content_type:ContentType}" and save value of "{expression}" in variable "{variable}"')
def step_task_transform(context: Context, content: str, content_type: TransformerContentType, expression: str, variable: str) -> None:
Expand Down
10 changes: 9 additions & 1 deletion grizzly/testdata/utils.py
Expand Up @@ -147,6 +147,11 @@ def resolve_variable(grizzly: GrizzlyContext, value: str, guess_datatype: Option
if len(value) < 1:
return value

quote_char: Optional[str] = None
if value[0] in ['"', "'"] and value[0] == value[-1]:
quote_char = value[0]
value = value[1:-1]

resolved_variable: GrizzlyDictValueType
if '{{' in value and '}}' in value:
template = Template(value)
Expand All @@ -158,7 +163,7 @@ def resolve_variable(grizzly: GrizzlyContext, value: str, guess_datatype: Option
assert template_variable in grizzly.state.variables, f'value contained variable "{template_variable}" which has not been set'

resolved_variable = template.render(**grizzly.state.variables)
elif len(value) > 4 and value[0] == '$':
elif len(value) > 4 and value[0] == '$' and value[1] != '.': # $. is jsonpath expression...
if value[0:5] == '$conf':
variable = value[7:]
assert variable in grizzly.state.configuration, f'configuration variable "{variable}" is not set'
Expand All @@ -175,5 +180,8 @@ def resolve_variable(grizzly: GrizzlyContext, value: str, guess_datatype: Option

if guess_datatype:
resolved_variable = GrizzlyDict.guess_datatype(resolved_variable)
elif quote_char is not None and isinstance(resolved_variable, str) and resolved_variable.count(' ') > 0:
resolved_variable = f'{quote_char}{resolved_variable}{quote_char}'


return resolved_variable
92 changes: 75 additions & 17 deletions grizzly/testdata/variables/servicebus.py
@@ -1,25 +1,41 @@
# pylint: disable=line-too-long
'''Listens for messages on Azure Service Bus queue or topic.
Use [transformer task](/grizzly/usage/tasks/transformer/) to extract specific parts of the message.
## Format
Initial value is the name of the queue or topic, prefix with the endpoint type. If the endpoint is a topic the additional value subscription
is mandatory. Arguments support templating for their value, but not the complete endpoint value.
Initial value for a variable must have the prefix `queue:` or `topic:` followed by the name of the targeted
type. When receiving messages from a topic, the argument `subscription:` is mandatory. The format of endpoint is:
```plain
[queue|topic]:<endpoint name>[, subscription:<subscription name>][, expression:<expression>]
```
Where `<expression>` can be a XPath or jsonpath expression, depending on the specified content type. This argument is only allowed when
receiving messages. See example below.
> **Warning**: Do not use `expression` to filter messages unless you do not care about the messages that does not match the expression. If
> you do care about them, you should setup a subscription to do the filtering in Azure.
Arguments support templating for their value, but not the complete endpoint value.
Examples:
```plain
queue:test-queue
topic:test-topic, subscription:test-subscription
queue:$conf::sb.endpoint.queue
topic:$conf::sb.endpoint.topic, subscription:$conf::sb.endpoint.subscription
queue:"$conf::sb.endpoint.queue"
topic:"$conf::sb.endpoint.topic", subscription:"$conf::sb.endpoint.subscription"
queue:"{{ queue_name }}", expression="$.document[?(@.name=='TPM report')]"
```
## Arguments
* `repeat` _bool_ (optional) - if `True`, values read for the queue will be saved in a list and re-used if there are no new messages available
* `repeat` _bool_ (optional) - if `True`, values read from the endpoint will be saved in a list and re-used if there are no new messages available
* `url` _str_ - see format of url below.
* `wait` _int_ - number of seconds to wait for a message on the queue
* `content_type` _str_ (optional) - specify the MIME type of the message received on the queue, only mandatory when `expression` is specified in endpoint
### URL format
Expand All @@ -40,7 +56,7 @@
## Example
```gherkin
And value of variable "AtomicServiceBus.document_id" is "queue:documents-in | wait=120, url=$conf::sb.endpoint, repeat=True, expression='$.document.id', content_type=json"
And value of variable "AtomicServiceBus.document_id" is "queue:documents-in | wait=120, url=$conf::sb.endpoint, repeat=True"
...
Given a user of type "RestApi" load testing "http://example.com"
...
Expand All @@ -51,6 +67,16 @@
If there are no messages within 120 seconds, and it is the first iteration of the scenario, it will fail. If there has been at least one message on the queue since
the scenario started, it will use the oldest of those values, and then add it back in the end of the list again.
### Get message with expression
When specifying an expression, the messages on the endpoint is first peeked on. If any message matches the expression, it is later consumed from the endpoint.
If no matching messages was found when peeking, it is repeated again up until the specified `wait` seconds has elapsed. To use expression, a content type must
be specified for the endpint, e.g. `application/xml`.
```gherking
And value of variable "AtomicServiceBus.document_id" is "queue:documents-in | wait=120, url=$conf::sb.endpoint, repeat=True, content_type=json, expression='$.document[?(@.name=='TPM Report')'"
```
'''

from typing import Dict, Any, List, Type, Optional, cast
Expand All @@ -61,6 +87,7 @@
from gevent import sleep as gsleep
from grizzly_extras.async_message import AsyncMessageContext, AsyncMessageRequest, AsyncMessageResponse
from grizzly_extras.arguments import split_value, parse_arguments, get_unsupported_arguments
from grizzly_extras.transformer import TransformerContentType

from ...types import AtomicVariable, bool_typed
from ...context import GrizzlyContext
Expand Down Expand Up @@ -133,45 +160,57 @@ def atomicservicebus_endpoint(endpoint: str) -> str:
raise ValueError(f'AtomicServiceBus: {endpoint} does not specify queue: or topic:')

try:
arguments = parse_arguments(endpoint, ':')
arguments = parse_arguments(endpoint, ':', unquote=False)
except ValueError as e:
raise ValueError(f'AtomicServiceBus: {str(e)}') from e

if 'topic' not in arguments and 'queue' not in arguments:
raise ValueError(f'AtomicServiceBus: only support endpoint types queue and topic, not {arguments.keys()}')
raise ValueError(f'AtomicServiceBus: endpoint needs to be prefixed with queue: or topic:')

if 'topic' in arguments and 'queue' in arguments:
raise ValueError('AtomicServiceBus: cannot specify both topic: and queue: in endpoint')

endpoint_type = 'topic' if 'topic' in arguments else 'queue'

if len(arguments) > 1:
if endpoint_type != 'topic':
raise ValueError(f'AtomicServiceBus: additional arguments in endpoint is only supported for topic')
if endpoint_type != 'topic' and 'subscription' in arguments:
raise ValueError(f'AtomicServiceBus: argument subscription is only allowed if endpoint is a topic')

unsupported_arguments = get_unsupported_arguments(['topic', 'queue', 'subscription'], arguments)
unsupported_arguments = get_unsupported_arguments(['topic', 'queue', 'subscription', 'expression'], arguments)

if len(unsupported_arguments) > 0:
raise ValueError(f'AtomicServiceBus: arguments {", ".join(unsupported_arguments)} is not supported')

if endpoint_type == 'topic' and arguments.get('subscription', None) is None:
expression = arguments.get('expression', None)
subscription = arguments.get('subscription', None)
if endpoint_type == 'topic' and subscription is None:
raise ValueError(f'AtomicServiceBus: endpoint needs to include subscription when receiving messages from a topic')

grizzly = GrizzlyContext()

try:
resolved_endpoint_name = resolve_variable(grizzly, arguments[endpoint_type])
resolved_endpoint_name = cast(str, resolve_variable(grizzly, arguments[endpoint_type], guess_datatype=False))
except Exception as e:
raise ValueError(f'AtomicServiceBus: {str(e)}') from e

endpoint = f'{endpoint_type}:{resolved_endpoint_name}'

subscription = arguments.get('subscription', None)
if subscription is not None:
try:
resolved_subscription_name = resolve_variable(grizzly, subscription)
resolved_subscription_name = cast(str, resolve_variable(grizzly, subscription, guess_datatype=False))
except Exception as e:
raise ValueError(f'AtomicServiceBus: {str(e)}') from e

endpoint = f'{endpoint}, subscription:{resolved_subscription_name}'

if expression is not None:
try:
resolved_expression = cast(str, resolve_variable(grizzly, expression, guess_datatype=False))
except Exception as e:
raise ValueError(f'AtomicServiceBus: {str(e)}') from e

endpoint = f'{endpoint}, expression:{resolved_expression}'

return endpoint


Expand All @@ -194,21 +233,26 @@ class AtomicServiceBus(AtomicVariable[str]):
'url': atomicservicebus_url,
'wait': int,
'endpoint_name': atomicservicebus_endpoint,
'content_type': TransformerContentType.from_string,
}

def __init__(self, variable: str, value: str) -> None:
safe_value = self.__class__.__base_type__(value)

settings = {'repeat': False, 'wait': None, 'url': None, 'worker': None, 'context': None, 'endpoint_name': None}
settings = {'repeat': False, 'wait': None, 'url': None, 'worker': None, 'context': None, 'endpoint_name': None, 'content_type': None}

endpoint_name, endpoint_arguments = split_value(safe_value)

arguments = parse_arguments(endpoint_arguments)
endpoint_parameters = parse_arguments(endpoint_name, ':')

for argument, caster in self.__class__.arguments.items():
if argument in arguments:
settings[argument] = caster(arguments[argument])

if 'expression' in endpoint_parameters and not 'content_type' in arguments:
raise ValueError(f'{self.__class__.__name__}.{variable}: argument "content_type" is mandatory when "expression" is used in endpoint')

settings['endpoint_name'] = self.arguments['endpoint_name'](endpoint_name)

super().__init__(variable, endpoint_name)
Expand Down Expand Up @@ -246,6 +290,10 @@ def create_context(cls, settings: Dict[str, Any]) -> AsyncMessageContext:
'message_wait': settings.get('wait', None),
}

content_type = settings.get('content_type', None)
if content_type is not None:
context.update({'content_type': content_type.name.lower()})

return context

def create_client(self, variable: str, settings: Dict[str, Any]) -> zmq.Socket:
Expand All @@ -260,7 +308,16 @@ def create_client(self, variable: str, settings: Dict[str, Any]) -> zmq.Socket:

def say_hello(self, client: zmq.Socket, variable: str) -> None:
settings = self._settings[variable]
context = settings['context']
context = cast(AsyncMessageContext, dict(settings['context']))

endpoint_arguments = parse_arguments(context['endpoint'], ':')
try:
del endpoint_arguments['expression']
except:
pass

cache_endpoint = ', '.join([f'{key}:{value}' for key, value in endpoint_arguments.items()])
context['endpoint'] = cache_endpoint

if settings.get('worker', None) is not None:
return
Expand Down Expand Up @@ -382,6 +439,7 @@ def __delitem__(self, variable: str) -> None:
try:
del self._settings[variable]
del self._endpoint_messages[variable]

try:
self._endpoint_clients[variable].disconnect(self._zmq_url)
except (zmq.ZMQError, AttributeError, ):
Expand Down
7 changes: 4 additions & 3 deletions grizzly/users/messagequeue.py
Expand Up @@ -30,7 +30,7 @@
queue:<queue_name>[, expression:<expression>]
```
Where `<expression>` can be of XPath or jsonpath type, depending on the specified content type. See example below.
Where `<expression>` can be a XPath or jsonpath expression, depending on the specified content type. See example below.
## Examples
Expand Down Expand Up @@ -58,7 +58,7 @@
When specifying an expression, the messages on the queue are first browsed. If any message matches the expression, it is
later consumed from the queue. If no matching message was found during browsing, it is repeated again after a slight delay,
up until the specified `message.wait` seconds has elapsed. To use expressions, a content type must be specified for the get
request, e.g. `"application/xml"`:
request, e.g. `application/xml`:
```gherkin
Given a user of type "MessageQueue" load testing "mq://mq.example.com/?QueueManager=QM01&Channel=SRVCONN01"
Expand Down Expand Up @@ -104,6 +104,7 @@

from gevent import sleep as gsleep
from locust.exception import StopUser
from grizzly.types import RequestDirection
from grizzly_extras.async_message import AsyncMessageContext, AsyncMessageRequest, AsyncMessageResponse, AsyncMessageError
from grizzly_extras.arguments import get_unsupported_arguments, parse_arguments

Expand Down Expand Up @@ -314,7 +315,7 @@ def action(am_request: AsyncMessageRequest, name: str) -> Generator[Dict[str, An
metadata['abort'] = True
# Parse the endpoint to validate queue name / expression parts
try:
arguments = parse_arguments(request.endpoint, ':')
arguments = parse_arguments(endpoint, ':')
except ValueError as e:
raise RuntimeError(str(e)) from e

Expand Down

0 comments on commit 423bc99

Please sign in to comment.