Skip to content

Commit

Permalink
Add new bot: cut string from string (#1965)
Browse files Browse the repository at this point in the history
* Add new bot: cut string from string

* Add documentation

* Change int to bool

* change field name, remove init function

* Small fix

* Update documentation

* Upda te bot and documentation

* Add license

* Fix space

* Fix for python 3.6

* Rename bot

* Rename bot
  • Loading branch information
mariuskarotkis committed Sep 24, 2021
1 parent 1b75604 commit 6d7ab08
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 0 deletions.
23 changes: 23 additions & 0 deletions docs/user/bots.rst
Expand Up @@ -1920,6 +1920,29 @@ Public documentation: https://www.team-cymru.com/IP-ASN-mapping.html#dns
* ``: Overwrite existing fields. Default: `True` if not given (for backwards compatibility, will change in version 3.0.0)
.. _intelmq.bots.experts.remove_affix.expert:
RemoveAffix
^^^^^^^^^^^
**Information**
* `name:` `intelmq.bots.experts.remove_affix.expert`
* `lookup:` none
* `public:` yes
* `cache (redis db):` none
* `description:` Cut string from string
**Configuration Parameters**
* `remove_prefix`: True - cut from start, False - cut from end
* `affix`: example 'www.'
* `field`: example field 'source.fqdn'
**Description**
Remove part of string from string, example: `www.` from domains.
.. _intelmq.bots.experts.domain_suffix.expert:
Domain Suffix
Expand Down
Empty file.
41 changes: 41 additions & 0 deletions intelmq/bots/experts/remove_affix/expert.py
@@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
"""
Remove Affix
SPDX-FileCopyrightText: 2021 Marius Karotkis <marius.karotkis@gmail.com>
SPDX-License-Identifier: AGPL-3.0-or-later
"""
from intelmq.lib.bot import Bot


class RemoveAffixExpertBot(Bot):
remove_prefix: bool = True # True - from start, False - from end
affix: str = 'www.'
field: str = 'source.fqdn'

def process(self):
event = self.receive_message()

if self.field in event:
if self.remove_prefix:
event.change(self.field, self.removeprefix(event[self.field], self.affix))
else:
event.change(self.field, self.removesuffix(event[self.field], self.affix))

self.send_message(event)
self.acknowledge_message()

def removeprefix(self, field: str, prefix: str) -> str:
if field.startswith(prefix):
return field[len(prefix):]
else:
return field[:]

def removesuffix(self, field: str, suffix: str) -> str:
if suffix and field.endswith(suffix):
return field[:-len(suffix)]
else:
return field[:]


BOT = RemoveAffixExpertBot
Empty file.
93 changes: 93 additions & 0 deletions intelmq/tests/bots/experts/remove_affix/test_expert.py
@@ -0,0 +1,93 @@
# -*- coding: utf-8 -*-
"""
Remove affix - String cut from string
SPDX-FileCopyrightText: 2021 Marius Karotkis <marius.karotkis@gmail.com>
SPDX-License-Identifier: AGPL-3.0-or-later
"""

import unittest
import intelmq.lib.test as test
from intelmq.bots.experts.remove_affix.expert import RemoveAffixExpertBot

EXAMPLE_INPUT = {
'__type': 'Event',
'feed.accuracy': 100.0,
'feed.name': 'MISP events',
'feed.provider': 'MISP BAE',
'time.observation': '2020-10-20T12:57:33+00:00',
'feed.url': 'https://sig01.threatreveal.com',
'source.fqdn': 'www.google.lt',
'extra.elastic_index': 'cti-2020-10',
'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}

EXAMPLE_OUTPUT = {
'__type': 'Event',
'feed.accuracy': 100.0,
'feed.name': 'MISP events',
'feed.provider': 'MISP BAE',
'time.observation': '2020-10-20T12:57:33+00:00',
'feed.url': 'https://sig01.threatreveal.com',
'source.fqdn': 'google.lt',
'extra.elastic_index': 'cti-2020-10',
'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}

EXAMPLE_OUTPUT1 = {
'__type': 'Event',
'feed.accuracy': 100.0,
'feed.name': 'MISP events',
'feed.provider': 'MISP BAE',
'time.observation': '2020-10-20T12:57:33+00:00',
'feed.url': 'https://sig01.threatreveal.com',
'source.fqdn': 'www.google',
'extra.elastic_index': 'cti-2020-10',
'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}

EXAMPLE_INPUT_2 = {
'__type': 'Event',
'feed.accuracy': 100.0,
'feed.name': 'MISP events',
'feed.provider': 'MISP BAE',
'time.observation': '2020-10-20T12:57:33+00:00',
'feed.url': 'https://sig01.threatreveal.com',
'extra.elastic_index': 'cti-2020-10',
'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}

EXAMPLE_OUTPUT_2 = {
'__type': 'Event',
'feed.accuracy': 100.0,
'feed.name': 'MISP events',
'feed.provider': 'MISP BAE',
'time.observation': '2020-10-20T12:57:33+00:00',
'feed.url': 'https://sig01.threatreveal.com',
'extra.elastic_index': 'cti-2020-10',
'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}


class TestRemoveAffixExpertBot(test.BotTestCase, unittest.TestCase):
"""
A TestCase for TestRemoveAffixExpertBot.
"""

@classmethod
def set_bot(cls):
cls.bot_reference = RemoveAffixExpertBot

def test_event_cut_start(self):
self.input_message = EXAMPLE_INPUT
self.run_bot()
self.assertMessageEqual(0, EXAMPLE_OUTPUT)

def test_event_cut_without_field(self):
self.input_message = EXAMPLE_INPUT_2
self.run_bot()
self.assertMessageEqual(0, EXAMPLE_OUTPUT_2)

def test_event_cut_end(self):
self.input_message = EXAMPLE_INPUT
self.run_bot(parameters={"remove_prefix": False, "affix": ".lt"})
self.assertMessageEqual(0, EXAMPLE_OUTPUT1)


if __name__ == '__main__': # pragma: no cover
unittest.main()

0 comments on commit 6d7ab08

Please sign in to comment.