Skip to content

Commit

Permalink
Added consumption of requests and test to verify
Browse files Browse the repository at this point in the history
  • Loading branch information
ytreister committed Dec 12, 2021
1 parent eea553d commit cc5c252
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 15 deletions.
23 changes: 14 additions & 9 deletions stoq/core.py
Expand Up @@ -105,7 +105,7 @@
---------------
Using stoQ with providers allows for the scanning of multiple payloads from
multiple sources. This method will instantiate a `Queue` which payloads
multiple sources. This method will instantiate a `Queue` to which payloads or
requests are published for scanning by `stoQ`. Additionally, payloads may be
retrieved from multiple disparate data sources using `Archiver` plugins.
Expand Down Expand Up @@ -545,20 +545,20 @@ async def run(
f'Starting provider queue: RequestMeta: {request_meta}, '
f'start_dispatches: {add_start_dispatch}'
)
payload_queue: asyncio.Queue = asyncio.Queue(maxsize=self.max_queue)
provider_queue: asyncio.Queue = asyncio.Queue(maxsize=self.max_queue)
providers = [
asyncio.ensure_future(plugin.ingest(payload_queue))
asyncio.ensure_future(plugin.ingest(provider_queue))
for name, plugin in self._loaded_provider_plugins.items()
]
workers = [
asyncio.ensure_future(
self._consume(payload_queue, request_meta, add_start_dispatch)
self._consume(provider_queue, request_meta, add_start_dispatch)
)
for n in range(self.provider_consumers)
]
try:
await asyncio.gather(*providers)
await payload_queue.join()
await provider_queue.join()
except KeyboardInterrupt:
pass
except Exception as e:
Expand Down Expand Up @@ -857,19 +857,24 @@ def _resolve_plugin_dependencies(

async def _consume(
self,
payload_queue: asyncio.Queue,
provider_queue: asyncio.Queue,
request_meta: Optional[RequestMeta] = None,
add_start_dispatch: Optional[List[str]] = None,
) -> None:
while True:
try:
task = await payload_queue.get()
# Determine whether the provider has returned a `Payload`, or a task.
task = await provider_queue.get()
# Determine whether the provider has returned a `Payload`, `Request` or a task.
# If it is a task, load the defined archiver plugin to load the
# `Payload`, otherwise, simply continue on with the scanning.
if isinstance(task, Payload):
request = Request([task], request_meta)
await self.scan_request(request, add_start_dispatch)
elif isinstance(task, Request):
# Only set request_meta if the task does not have request_meta already set
if task.request_meta == RequestMeta():
task.request_meta = request_meta
await self.scan_request(task, add_start_dispatch)
else:
for source_archiver, task_meta in task.items():
self.log.debug(
Expand All @@ -888,7 +893,7 @@ async def _consume(
self.log.warn(
f'"{task_meta}" failed with archiver "{source_archiver}": {str(e)}'
)
payload_queue.task_done()
provider_queue.task_done()
except asyncio.QueueEmpty:
pass

Expand Down
7 changes: 5 additions & 2 deletions stoq/data_classes.py
Expand Up @@ -42,7 +42,7 @@ def __init__(
>>> payload = Payload(b'test bytes')
>>> err = Error(
... error='This is our error message',
... plugin_name='test_plugin',
... plugin_name='test_plugin',
... payload_id=payload.results.payload_id
... )
>>> errors.append(err)
Expand Down Expand Up @@ -166,6 +166,9 @@ def __str__(self) -> str:
def __repr__(self):
return repr(self.__dict__)

def __eq__(self, other):
    """
    Compare two instances by their attribute values.

    :param other: Object to compare against

    :return: ``True`` when ``other`` is an instance of the same class and
             has an identical ``__dict__``, ``False`` when the attributes
             differ, and ``NotImplemented`` for unrelated types so that
             Python falls back to its default comparison
    """
    # Objects such as ``None`` or ``int`` have no ``__dict__``; accessing it
    # would raise ``AttributeError`` instead of simply comparing unequal.
    if not isinstance(other, self.__class__):
        return NotImplemented
    return self.__dict__ == other.__dict__


class PayloadResults:
def __init__(
Expand Down Expand Up @@ -220,7 +223,7 @@ def __init__(
):
"""
Object that contains the state of a ``Stoq`` scan. This object is accessible within
Object that contains the state of a ``Stoq`` scan. This object is accessible within
all archiver, dispatcher, and worker plugins.
:param payloads: All payloads that are being processed, to include extracted payloads
Expand Down
12 changes: 8 additions & 4 deletions stoq/plugins/provider.py
Expand Up @@ -59,16 +59,20 @@
Writing a plugin
================
`Provider plugins` add either ``Payload`` objects to the `stoQ` queue, or a ``str``.
If a ``Payload`` object is added, `stoQ` will begin processing the payload. However,
if a ``str`` is added, `stoQ` will pass it to ``Archiver`` plugins that were
loaded when ``Stoq`` was instantiated with the ``source_archivers`` argument.
`Provider plugins` add ``Payload`` or ``Request`` objects to the `stoQ` queue, or a ``str``.
If a ``Payload`` object is added, `stoQ` will begin processing the payload. If a ``Request`` object
is added, `stoQ` will begin processing the request (which should contain at least one payload).
If a ``str`` is added, `stoQ` will pass it to ``Archiver`` plugins that were loaded when ``Stoq``
was instantiated with the ``source_archivers`` argument.
A `provider` plugin must be a subclass of the ``ProviderPlugin`` class.
As with any plugin, a :ref:`configuration file <pluginconfig>` must also exist
and be properly configured.
If a ``Request`` object is added to the queue and has `request_meta` set, then the
`request_meta` passed to the ``Stoq`` `run()` method is ignored for this request.
Example
-------
Expand Down
@@ -0,0 +1,29 @@
#!/usr/bin/env python3

# Copyright 2014-2018 PUNCH Cyber Analytics Group
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from asyncio import Queue

from stoq import Payload, Request, RequestMeta
from stoq.plugins import ProviderPlugin


class RequestProvider(ProviderPlugin):
    """Provider plugin that publishes a single ``Request`` to the queue."""

    # When true, the published ``Request`` is built with its own
    # ``RequestMeta``; otherwise ``None`` is passed as the request metadata.
    SPECIFY_REQUEST_META = True

    async def ingest(self, queue: Queue) -> None:
        """
        Put one ``Request`` wrapping a dummy ``Payload`` onto ``queue``.

        :param queue: Queue the ``Request`` object is published to
        """
        meta = (
            RequestMeta(extra_data={'dummy': '1'})
            if self.SPECIFY_REQUEST_META
            else None
        )
        dummy_request = Request([Payload(b'Dummy payload')], meta)
        await queue.put(dummy_request)
@@ -0,0 +1,23 @@
# Copyright 2014-2015 PUNCH Cyber Analytics Group
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

[Core]
Name = request_provider
Module = request_provider

[Documentation]
Author = Yair Treister
Version = 0.1
Website = https://github.com/PUNCH-Cyber/stoq-plugins-public
Description = Simple stoQ Provider plugin
25 changes: 25 additions & 0 deletions stoq/tests/test_core.py
Expand Up @@ -667,6 +667,31 @@ async def test_provider_with_start_dispatch(self):
dummy_worker.scan.assert_awaited_once()
dummy_connector.save.assert_awaited_once()

async def test_provider_with_request(self):
    """
    Run the ``request_provider`` plugin twice and check which
    ``extra_data`` shows up in the logged ``RequestMeta``: the one set on
    the provided ``Request`` when the provider specifies it, and the one
    passed to ``run()`` when the provider does not.
    """
    # Logging has to be enabled for ``assertLogs`` to capture records.
    logging.disable(logging.NOTSET)
    try:
        s = Stoq(
            base_dir=utils.get_data_dir(),
            providers=['request_provider'],
            log_level='DEBUG',
        )
        # Run a provider that provides a Request containing request_meta
        run_request_meta = RequestMeta(extra_data={'dummy': '2'})
        with self.assertLogs(level='DEBUG') as cm:
            await s.run(request_meta=run_request_meta)
        self.assertIn('DEBUG:stoq:Request received: RequestMeta:', cm.output[1])
        self.assertIn('"extra_data": {"dummy": "1"}', cm.output[1])
        self.assertNotIn('"extra_data": {"dummy": "2"}', cm.output[1])

        # Now run a provider that provides a Request without request_meta
        provider_plugin = s.load_plugin('request_provider')
        provider_plugin.SPECIFY_REQUEST_META = False
        with self.assertLogs(level='DEBUG') as cm:
            await s.run(request_meta=run_request_meta)
        self.assertIn('DEBUG:stoq:Request received: RequestMeta:', cm.output[1])
        self.assertIn('"extra_data": {"dummy": "2"}', cm.output[1])
        self.assertNotIn('"extra_data": {"dummy": "1"}', cm.output[1])
    finally:
        # Disable logging again even when an assertion above fails, so a
        # failure here does not leave logging enabled for later tests.
        logging.disable(logging.CRITICAL)

def test_stoqresponse_to_str(self):
response = StoqResponse(Request(), [])
response_str = str(response)
Expand Down

0 comments on commit cc5c252

Please sign in to comment.