Skip to content

Commit

Permalink
Merge pull request #2362 from wagner-intevation/sql-output-fail
Browse files Browse the repository at this point in the history
sql output: new parameter `fail_on_error`
  • Loading branch information
sebix committed May 16, 2023
2 parents 69fc2ee + 41cc3b4 commit 64abf29
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 7 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ CHANGELOG
- Avoid extraneous search domain-based queries on NXDOMAIN result (PR#2352)

#### Outputs
- `intelmq.bots.output.cif3.output`: Added (PR#2244 by Michael Davis).
- `intelmq.bots.outputs.cif3.output`: Added (PR#2244 by Michael Davis).
- `intelmq.bots.outputs.sql.output`: New parameter `fail_on_errors` (PR#2362 by Sebastian Wagner).

### Documentation

Expand Down
1 change: 1 addition & 0 deletions docs/user/bots.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4116,6 +4116,7 @@ The parameters marked with 'PostgreSQL' will be sent to libpq via psycopg2. Chec
* `table`: name of the database table into which events are to be inserted
* `fields`: list of fields to read from the event. If None, read all fields
* `reconnect_delay`: number of seconds to wait before reconnecting in case of an error
* `fail_on_errors`: If any error should cause the bot to fail (raise an exception) or otherwise rollback. If false (default), the bot eventually waits and re-try (e.g. re-connect) etc. to solve the issue. If true, the bot raises an exception and - depending on the IntelMQ error handling configuration - stops.
PostgreSQL
~~~~~~~~~~
Expand Down
2 changes: 1 addition & 1 deletion intelmq/bots/outputs/sql/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def process(self):
query = ('INSERT INTO {table} ("{keys}") VALUES ({values})'
''.format(table=self.table, keys=keys, values=fvalues[:-2]))

if self.execute(query, values, rollback=True):
if self.execute(query, values, rollback=not self.fail_on_errors):
self.con.commit()
self.acknowledge_message()

Expand Down
6 changes: 5 additions & 1 deletion intelmq/lib/mixins/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ class SQLMixin:
# overwrite the default value from the OutputBot
message_jsondict_as_string = True
reconnect_delay = 0
# If process()/execute() should raise exceptions (True) or rollback (False)
fail_on_errors = False

def __init__(self, *args, **kwargs):
self._init_sql()
Expand Down Expand Up @@ -116,7 +118,7 @@ def execute(self, query: str, values: tuple, rollback=False):
self.logger.debug('Done.')
except (self._engine.InterfaceError, self._engine.InternalError,
self._engine.OperationalError, AttributeError):
if rollback:
if rollback and not self.fail_on_errors:
try:
self.con.rollback()
self.logger.exception('Executed rollback command '
Expand All @@ -133,6 +135,8 @@ def execute(self, query: str, values: tuple, rollback=False):
if self.reconnect_delay > 0:
sleep(self.reconnect_delay)
self._init_sql()
elif self.fail_on_errors:
raise
else:
self.logger.exception('Database connection problem, connecting again.')
if self.reconnect_delay > 0:
Expand Down
10 changes: 6 additions & 4 deletions intelmq/lib/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,9 @@ def run_bot(self, iterations: int = 1, error_on_pipeline: bool = False,
prepare=True, parameters={},
allowed_error_count=0,
allowed_warning_count=0,
stop_bot: bool = True):
stop_bot: bool = True,
expected_internal_queue_size: int = 0,
):
"""
Call this method for actually doing a test run for the specified bot.
Expand Down Expand Up @@ -339,10 +341,10 @@ def run_bot(self, iterations: int = 1, error_on_pipeline: bool = False,
'iterations of `run_bot`.')

internal_queue_size = len(self.get_input_internal_queue())
self.assertEqual(internal_queue_size, 0,
'The internal input queue is not empty, but has '
self.assertEqual(internal_queue_size, expected_internal_queue_size,
f'The internal input queue does not have expected size {expected_internal_queue_size}, but has '
f'{internal_queue_size} element(s). '
'The bot did not acknowledge all messages.')
'The bot did not acknowledge the expected number of messages.')

""" Test if report has required fields. """
if self.bot_type == 'collector':
Expand Down
38 changes: 38 additions & 0 deletions intelmq/tests/bots/outputs/sql/test_output_postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
import json
import os
import unittest
from unittest import mock

import intelmq.lib.test as test
from intelmq.bots.outputs.sql.output import SQLOutputBot

if os.environ.get('INTELMQ_TEST_DATABASES'):
import psycopg2
import psycopg2.extras
from psycopg2 import InterfaceError


INPUT1 = {"__type": "Event",
Expand Down Expand Up @@ -146,6 +148,42 @@ def test_prepare_null(self):
output = self.bot.prepare_values(values)
self.assertEqual(output, ['{"special": "foo\\\\u0000bar"}'])

def test_execution_error(self):
"""
Raise an exception at statement execution
"""

def execute_raising(*args, **kwargs):
raise InterfaceError

with mock.patch('psycopg2.connect') as mock_connect:
mock_con = mock_connect.return_value # result of psycopg2.connect(**connection_stuff)
mock_cur = mock_con.cursor.return_value # result of con.cursor(cursor_factory=DictCursor)
mock_cur.execute = execute_raising

self.run_bot(allowed_error_count=1, expected_internal_queue_size=1)
self.assertLogMatches('psycopg2.InterfaceError', levelname='ERROR')
self.assertLogMatches('Executed rollback command after failed query execution.', levelname='ERROR')
self.assertNotRegexpMatchesLog('Bot has found a problem.')

def test_execution_error_fail(self):
"""
Raise an exception at statement execution with fail_on_errors = True
"""

def execute_raising(*args, **kwargs):
raise InterfaceError

with mock.patch('psycopg2.connect') as mock_connect:
mock_con = mock_connect.return_value # result of psycopg2.connect(**connection_stuff)
mock_cur = mock_con.cursor.return_value # result of con.cursor(cursor_factory=DictCursor)
mock_cur.execute = execute_raising

self.run_bot(allowed_error_count=1, parameters={'fail_on_errors': True})
self.assertLogMatches('psycopg2.InterfaceError', levelname='ERROR')
self.assertNotRegexpMatchesLog('Executed rollback command after failed query execution.')
self.assertLogMatches('Bot has found a problem.', levelname='ERROR')

@classmethod
def tearDownClass(cls):
if not os.environ.get('INTELMQ_TEST_DATABASES'):
Expand Down

0 comments on commit 64abf29

Please sign in to comment.