Skip to content
Permalink
Browse files

ENH+TST: mail url collector: save file name

  • Loading branch information...
wagner-certat committed Sep 10, 2019
1 parent f4a3328 commit cadc1930e32bb83ecc373d6044081fd57d9ee52d
@@ -49,6 +49,7 @@ CHANGELOG
- `intelmq.bots.collectors.mail.*`: Save email information/metadata in the extra fields of the report. See the bots documentation for a complete list of provided data.
- `intelmq.bots.collectors.mail.collector_mail_attach`: Check for existence/validity of the `attach_regex` parameter.
- Use the lib's `unzip` function for uncompressing attachments.
- `intelmq.bots.collectors.mail.collector_mail_url`: Save the file name of the downloaded file as `extra.file_name`.

#### Parsers
- `intelmq.bots.parsers.html_table.parser`:
@@ -73,7 +74,7 @@ CHANGELOG
- Use UTC timezone.
- Tests for `utils.unzip`.
- Add a new asset: Zip archive with two files, same as with tar.gz archive.
- Added first tests for the Mail Attachment collector.
- Added tests for the Mail Attachment & Mail URL collectors.

### Tools
- intelmqctl:
@@ -278,6 +278,7 @@ The resulting reports contain the following special fields:
* `extra.email_subject`: The subject of the email
* `extra.email_from`: The email's from address
* `extra.email_message_id`: The email's message ID
* `extra.file_name`: The file name of the downloaded file (extracted from the HTTP Response Headers if possible)

* * *

@@ -27,7 +27,7 @@ def process_message(self, uid, message):
report = self.new_report()
report["raw"] = body
report["extra.email_subject"] = message.subject
report["extra.email_from"] = message.sent_from
report["extra.email_from"] = ','.join(x['email'] for x in message.sent_from)
report["extra.email_message_id"] = message.message_id

self.send_message(report)
@@ -6,7 +6,7 @@
import re

from intelmq.lib.splitreports import generate_reports
from intelmq.lib.utils import create_request_session_from_bot
from intelmq.lib.utils import create_request_session_from_bot, file_name_from_response

from .lib import MailCollectorBot

@@ -67,8 +67,9 @@ def process_message(self, uid, message):
template = self.new_report()
template["feed.url"] = url
template["extra.email_subject"] = message.subject
template["extra.email_from"] = message.sent_from
template["extra.email_from"] = ','.join(x['email'] for x in message.sent_from)
template["extra.email_message_id"] = message.message_id
template["extra.file_name"] = file_name_from_response(resp)

for report in generate_reports(template, io.BytesIO(resp.content),
self.chunk_size,
@@ -0,0 +1 @@
bar text
@@ -0,0 +1,18 @@
To: cert@example.com
From: Sebastian Wagner <wagner@cert.at>
Subject: foobar txt
Message-ID: <07ce0153-060b-f48d-73d9-d92a20b3b3aa@cert.at>
Date: Tue, 3 Sep 2019 16:57:40 +0200
MIME-Version: 1.0
Content-Type: multipart/mixed;
boundary="------------1D845FBEEAAC1F68B4B45905"
Content-Language: en-US

This is a multi-part message in MIME format.
--------------1D845FBEEAAC1F68B4B45905
Content-Type: text/plain; charset=utf-8
Content-Transfer-Encoding: 8bit
Please look at http://localhost/foobar.txt
--------------1D845FBEEAAC1F68B4B45905--
@@ -0,0 +1,40 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Sep 10 17:10:54 2019
@author: sebastian
"""
import os


from imbox.parser import parse_email
# Directory of this module; the .eml fixtures are stored next to it.
_FIXTURE_DIR = os.path.dirname(__file__)


def _load_email(file_name):
    """Read the fixture *file_name* from this module's directory and parse it."""
    with open(os.path.join(_FIXTURE_DIR, file_name)) as eml_file:
        return parse_email(eml_file.read())


# Parsed once at import time and shared by the mocked mailboxes below.
EMAIL_ZIP_FOOBAR = _load_email('foobarzip.eml')
EMAIL_TXT_FOOBAR = _load_email('foobartxt.eml')


class MockedImbox:
    """Connection-less stand-in used to patch ``imbox.Imbox`` in the tests.

    The constructor accepts the parameters listed in its signature and
    ignores every one of them. ``messages`` is left for subclasses to
    provide; ``mark_seen`` and ``logout`` are no-ops.
    """

    def __init__(self, hostname, username=None, password=None, ssl=True,
                 port=None, ssl_context=None, policy=None, starttls=False):
        """Accept the connection parameters and discard them."""

    def messages(self, *args, **kwargs):
        """Always raise ``NotImplementedError``; subclasses supply the mails."""
        raise NotImplementedError

    def mark_seen(self, uid):
        """Do nothing with *uid*."""
        return None

    def logout(self):
        """Do nothing."""
        return None


class MockedZipImbox(MockedImbox):
    """Mocked mailbox whose only message is ``EMAIL_ZIP_FOOBAR``."""

    def messages(self, *args, **kwargs):
        """Yield exactly one pair: ``0`` and the parsed ``foobarzip.eml`` mail."""
        yield from ((0, EMAIL_ZIP_FOOBAR),)


class MockedTxtImbox(MockedImbox):
    """Mocked mailbox whose only message is ``EMAIL_TXT_FOOBAR``."""

    def messages(self, *args, **kwargs):
        """Yield exactly one pair: ``0`` and the parsed ``foobartxt.eml`` mail."""
        yield from ((0, EMAIL_TXT_FOOBAR),)
@@ -8,30 +8,9 @@

import intelmq.lib.test as test

if os.getenv('INTELMQ_TEST_EXOTIC'):
from imbox.parser import parse_email
with open(os.path.join(os.path.dirname(__file__), 'foobarzip.eml')) as handle:
EMAIL_FOOBAR = parse_email(handle.read())

class MockedImbox():
_connected = False

def __init__(self, hostname, username=None, password=None, ssl=True,
port=None, ssl_context=None, policy=None, starttls=False):
pass

def messages(self, *args, **kwargs):
yield 0, EMAIL_FOOBAR

def mark_seen(self, uid):
pass

def logout(self):
pass


from intelmq.bots.collectors.mail.collector_mail_attach import MailAttachCollectorBot

if os.getenv('INTELMQ_TEST_EXOTIC'):
from .lib import MockedZipImbox

REPORT_FOOBARZIP = {
'__type': 'Report',
@@ -63,7 +42,7 @@ def set_bot(cls):
}

def test_one(self):
with mock.patch('imbox.Imbox', new=MockedImbox):
with mock.patch('imbox.Imbox', new=MockedZipImbox):
self.run_bot()
self.assertMessageEqual(0, REPORT_FOOBARZIP)

@@ -1,13 +1,28 @@
# -*- coding: utf-8 -*-
"""
Testing Mail URL collector
TODO: Use (and generalize) the methods used in the Mail Attach Bot Test
"""
import unittest.mock as mock
import unittest
import os

import intelmq.lib.test as test
from intelmq.bots.collectors.mail.collector_mail_url import MailURLCollectorBot
if os.getenv('INTELMQ_TEST_EXOTIC'):
from .lib import MockedTxtImbox


REPORT_FOOBARTXT = {
'__type': 'Report',
'extra.email_from': 'wagner@cert.at',
'extra.email_message_id': '<07ce0153-060b-f48d-73d9-d92a20b3b3aa@cert.at>',
'extra.email_subject': 'foobar txt',
'feed.accuracy': 100.0,
'feed.name': 'IMAP Feed',
'extra.file_name': 'foobar.txt',
'feed.url': 'http://localhost/foobar.txt',
'raw': 'YmFyIHRleHQK',
}


@test.skip_exotic()
@@ -18,7 +33,17 @@ class TestMailURLCollectorBot(test.BotTestCase, unittest.TestCase):
@classmethod
def set_bot(cls):
cls.bot_reference = MailURLCollectorBot
cls.sysconfig = {'http_url': 'http://localhost/two_files.tar.gz',
'extract_files': True,
'name': 'Example feed',
cls.sysconfig = {'mail_host': None,
'mail_user': None,
'mail_password': None,
'mail_ssl': None,
'folder': None,
'subject_regex': None,
'url_regex': 'http://localhost/.*\.txt',
'name': 'IMAP Feed',
}

def test_localhost(self):
with mock.patch('imbox.Imbox', new=MockedTxtImbox):
self.run_bot()
self.assertMessageEqual(0, REPORT_FOOBARTXT)

0 comments on commit cadc193

Please sign in to comment.
You can’t perform that action at this time.