Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Updated version of enforcemx with fake SPF capability #4

Merged
merged 2 commits into from

2 participants

neonknight Oli
neonknight
Collaborator

No description provided.

Oli gryphius merged commit 44e04fc into from
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Mar 12, 2013
  1. updated enforcemx plugin: use netaddr instead of netcidr because neta…

    neonknight authored
    …ddr also supports IPv6.
Commits on Mar 13, 2013
This page is out of date. Refresh to see the latest.
3  conf/conf.d/enforcemx.conf.dist
View
@@ -1,2 +1,3 @@
[EnforceMX]
-datafile=/etc/postomaat/conf.d/enforcemx.txt
+datafile_mx=/etc/postomaat/conf.d/enforcemx.txt
+datavile_spf=/etc/postomaat/conf.d/fakespf.txt
3  conf/conf.d/fakespf.txt.dist
View
@@ -0,0 +1,3 @@
+# set "unknown" to accept from servers without PTR record
+# domain names always accept subdomain entries, e.g. if you set example.com, mail from mx.example.com will be accepted
+example.com 192.168.0.6, mail.example.com
150 src/postomaat/plugins/enforcemx.py
View
@@ -1,30 +1,38 @@
# -*- coding: UTF-8 -*-
"""
-This plugin allows you to configure from which IPs you
+This plugin allows you to configure from which hosts you
are willing to accept mail for a given domain.
-This can be useful if you provide shared hosting (many domains on one mail
+Check by recipient domain (MX Rules):
+This can be useful if you provide shared hosting (= many domains on one mail
server) and some of the domains use a cloud based spam filter (= MX records
not pointing directly to your hosting server). You can reject mail coming
from unexpected hosts trying to bypass the spam filter.
+
+Check by sender domain (SPF Rules):
+Some domains/freemailers do not have an SPF record, although their
+domains are frequently forged and abused as spam sender.
+This plugin allows you to build your own fake SPF database.
"""
+__version__ = "0.0.3"
import logging
import os
+import re
from threading import Lock
try:
- from netcidr import CIDR
- have_netcidr = True
+ from netaddr import IPAddress, IPNetwork
+ have_netaddr = True
except:
- have_netcidr = False
-
+ have_netaddr = False
from postomaat.shared import ScannerPlugin, DUNNO, DEFER_IF_PERMIT, REJECT, strip_address, extract_domain
+
class FuFileCache(object):
__shared_state = {}
@@ -80,14 +88,17 @@ def _loadData(self, filename):
self.lastreload=ctime
self._reallyloadData(filename)
+
-class MXCache(FuFileCache):
+class RulesCache(FuFileCache):
def _initlocal(self, **kw):
- self.mxnets = {}
+ self.addresses = {}
+ self.names = {}
- def _reallyloadData(self, filename):
+ def _reallyloadData(self, filename):
+ regex_ip = '^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}(/\d{1,2})?|[a-f0-9:]{3,39})$'
handle=open(filename)
for line in handle.readlines():
line.strip()
@@ -100,29 +111,55 @@ def _reallyloadData(self, filename):
domain = data[0]
nets = data[1]
- if not domain in self.mxnets:
- self.mxnets[domain] = []
-
for item in nets.split(','):
- item = item.strip()
- item = CIDR(item)
- if not item in self.mxnets[domain]:
- self.mxnets[domain].append(item)
+ item = item.strip().lower()
+ if re.match(regex_ip, item):
+ if not domain in self.addresses:
+ self.addresses[domain] = []
+ item = IPNetwork(item)
+ if not item in self.addresses[domain]:
+ self.addresses[domain].append(item)
+ else:
+ if not domain in self.names:
+ self.names[domain] = []
+ if not item in self.names[domain]:
+ self.names[domain].append(item)
- def permitted(self, domain, ip):
- self.reloadifnecessary(self.filename)
+
+ def _permitted_ip(self, domain, ip):
+ if domain not in self.addresses:
+ return True
- #domain is not listed, we accept mail from everywhere
- if not domain in self.mxnets:
+ perm = False
+ for net in self.addresses[domain]:
+ if IPAddress(ip) in net:
+ perm = True
+ break
+ return perm
+
+ def _permitted_name(self, domain, hostname):
+ if domain not in self.names:
return True
perm = False
- for net in self.mxnets[domain]:
- if ip in net.iterIPs():
+ for name in self.names[domain]:
+ if name.endswith(hostname):
perm = True
break
return perm
+
+ def permitted(self, domain, ip, hostname):
+ self.reloadifnecessary(self.filename)
+
+ #domain is not listed, we accept mail from everywhere
+ if not domain in self.addresses and not domain in self.names:
+ return True
+
+ ip_perm = self._permitted_ip(domain, ip)
+ name_perm = self._permitted_name(domain, hostname)
+
+ return ip_perm and name_perm
@@ -131,12 +168,13 @@ class EnforceMX(ScannerPlugin):
def __init__(self,config,section=None):
ScannerPlugin.__init__(self,config,section)
self.logger=self._logger()
- self.mxcache = None
+ self.mxrules = None
+ self.spfrules = None
def examine(self,suspect):
- if not have_netcidr:
+ if not have_netaddr:
return DUNNO,None
client_address=suspect.get_value('client_address')
@@ -144,26 +182,65 @@ def examine(self,suspect):
self.logger.error('No client address found - skipping')
return DUNNO
+ client_name=suspect.get_value('client_name')
+ if client_name is None:
+ client_name = 'unknown'
+
+ action, message = self._examine_mx(suspect, client_address, client_name)
+ if action == DUNNO:
+ action, message = self._examine_spf(suspect, client_address, client_name)
+
+ return action, message
+
+
+
+ def _examine_mx(self, suspect, client_address, client_name):
to_address=suspect.get_value('recipient')
if to_address==None:
self.logger.warning('No RCPT address found')
- return DEFER_IF_PERMIT,'internal policy error(no from address)'
+ return DEFER_IF_PERMIT,'internal policy error (no rcpt address)'
to_address=strip_address(to_address)
to_domain=extract_domain(to_address)
- if not self.mxcache:
- datafile = self.config.get('EnforceMX','datafile')
+ if not self.mxrules:
+ datafile = self.config.get('EnforceMX','datafile_mx')
if os.path.exists(datafile):
- self.mxcache = MXCache(datafile)
+ self.mxrules = RulesCache(datafile)
else:
return DUNNO,None
-
+
action = DUNNO
message = None
- if not self.mxcache.permitted(to_domain, client_address):
+ if not self.mxrules.permitted(to_domain, client_address, client_name):
action = REJECT
message = 'We do not accept mail for %s from %s. Please send to MX records!' % (to_domain, client_address)
+
+ return action, message
+
+
+
+ def _examine_spf(self, suspect, client_address, client_name):
+ from_address=suspect.get_value('sender')
+ if from_address==None:
+ self.logger.warning('No FROM address found')
+ return DEFER_IF_PERMIT,'internal policy error (no from address)'
+
+ from_address=strip_address(from_address)
+ from_domain=extract_domain(from_address)
+
+ if not self.spfrules:
+ datafile = self.config.get('EnforceMX', 'datafile_spf')
+ if os.path.exists(datafile):
+ self.spfrules = RulesCache(datafile)
+ else:
+ return DUNNO,None
+
+ action = DUNNO
+ message = None
+ if not self.spfrules.permitted(from_domain, client_address, client_name):
+ action = REJECT
+ message = 'We do not accept mail for %s from %s with name %s. Please use the official mail servers!' % (from_domain, client_address, client_name)
return action, message
@@ -172,17 +249,22 @@ def examine(self,suspect):
def lint(self):
lint_ok = True
- if not have_netcidr:
- print 'netcidr python module not available - please install'
+ if not have_netaddr:
+ print 'netaddr python module not available - please install'
lint_ok = False
if not self.checkConfig():
print 'Error checking config'
lint_ok = False
- datafile = self.config.get('EnforceMX','datafile')
+ datafile = self.config.get('EnforceMX', 'datafile_mx')
+ if not os.path.exists(datafile):
+ print 'MX datafile not found - this plugin will not enforce MX usage'
+ lint_ok = False
+
+ datafile = self.config.get('EnforceMX', 'datafile_spf')
if not os.path.exists(datafile):
- print 'datafile not found - this plugin will not do anything'
+ print 'SPF datafile not found - this plugin will not check fake SPF'
lint_ok = False
return lint_ok
Something went wrong with that request. Please try again.