diff --git a/README.md b/README.md index 85e8ff1..206cd4c 100644 --- a/README.md +++ b/README.md @@ -143,10 +143,12 @@ The following values are returned in `results.csv`: #### Etc. -* `Syntax Errors` - A list of syntax errors that were detected when - scanning DMARC or SPF records, or checking for STARTTLS support. -* `Errors` - A list of any other errors encountered, such as DNS - failures. +* `Syntax Errors` - A list of syntax errors that were encountered when + analyzing SPF records. +* `Debug` - A list of any other warnings or errors encountered, such + as DNS failures. These can be helpful when determining how + `trustymail` reached its conclusions, and are indispensible for bug + reports. ## Public domain diff --git a/trustymail/cli.py b/trustymail/cli.py index 4256352..45a33e1 100755 --- a/trustymail/cli.py +++ b/trustymail/cli.py @@ -22,7 +22,7 @@ --spf Only check spf records --dmarc Only check dmarc records --json Output is in json format (default csv) - --debug Output should include error messages. + --debug Output should include more verbose logging. --dns-hostnames=HOSTNAMES A comma-delimited list of DNS servers to query against. For example, if you want to use Google's DNS then you would use the @@ -35,13 +35,16 @@ Notes: If no scan type options are specified, all are run against a given domain/input. """ -from trustymail import __version__ - +# Built-in imports +import errno import logging -import docopt import os -import errno +# Dependency imports +import docopt + +# Local imports +from trustymail import __version__ from trustymail import trustymail # The default ports to be checked to see if an SMTP server is listening. @@ -51,8 +54,10 @@ def main(): args = docopt.docopt(__doc__, version=__version__) + log_level = logging.WARN if args['--debug']: - logging.basicConfig(format='%(message)s', level=logging.DEBUG) + log_level = logging.DEBUG + logging.basicConfig(format='%(asctime)-15s %(message)s', level=log_level) # Allow for user to input a csv for many domain names. if args['INPUT'][0].endswith('.csv'): @@ -91,10 +96,10 @@ def main(): # User might not want every scan performed. scan_types = { - "mx": args["--mx"], - "starttls": args["--starttls"], - "spf": args["--spf"], - "dmarc": args["--dmarc"] + 'mx': args['--mx'], + 'starttls': args['--starttls'], + 'spf': args['--spf'], + 'dmarc': args['--dmarc'] } domain_scans = [] diff --git a/trustymail/domain.py b/trustymail/domain.py index 0e73049..393fd08 100755 --- a/trustymail/domain.py +++ b/trustymail/domain.py @@ -17,7 +17,7 @@ def __init__(self, domain_name, timeout, smtp_timeout, smtp_localhost, smtp_port if self.base_domain_name != self.domain_name: if self.base_domain_name not in Domain.base_domains: # Populate DMARC for parent. - domain = trustymail.scan(self.base_domain_name, timeout, smtp_timeout, smtp_localhost, smtp_ports, smtp_cache, {"mx": False, "starttls": False, "spf": False, "dmarc": True}, dns_hostnames) + domain = trustymail.scan(self.base_domain_name, timeout, smtp_timeout, smtp_localhost, smtp_ports, smtp_cache, {'mx': False, 'starttls': False, 'spf': False, 'dmarc': True}, dns_hostnames) Domain.base_domains[self.base_domain_name] = domain self.base_domain = Domain.base_domains[self.base_domain_name] else: @@ -47,8 +47,8 @@ def __init__(self, domain_name, timeout, smtp_timeout, smtp_localhost, smtp_port # 3. Whether or not the server supports STARTTLS self.starttls_results = {} - # A list of any errors that occurred while scanning records. - self.errors = [] + # A list of any debugging information collected while scanning records. + self.debug_info = [] # A list of the ports tested for SMTP self.ports_tested = set() @@ -61,7 +61,7 @@ def has_supports_smtp(self): Returns True if any of the mail servers associated with this domain are listening and support SMTP. """ - return len(filter(lambda x: self.starttls_results[x]["supports_smtp"], + return len(filter(lambda x: self.starttls_results[x]['supports_smtp'], self.starttls_results.keys())) > 0 def has_starttls(self): @@ -69,7 +69,7 @@ def has_starttls(self): Returns True if any of the mail servers associated with this domain are listening and support STARTTLS. """ - return len(filter(lambda x: self.starttls_results[x]["starttls"], + return len(filter(lambda x: self.starttls_results[x]['starttls'], self.starttls_results.keys())) > 0 def has_spf(self): @@ -101,49 +101,49 @@ def parent_dmarc_results(self): def get_dmarc_policy(self): # If the policy was never set, or isn't in the list of valid policies, check the parents. - if self.dmarc_policy is None or self.dmarc_policy.lower() not in ["quarantine", "reject", "none"]: + if self.dmarc_policy is None or self.dmarc_policy.lower() not in ['quarantine', 'reject', 'none']: if self.base_domain is None: - return "" + return '' else: return self.base_domain.get_dmarc_policy() return self.dmarc_policy def generate_results(self): - mail_servers_that_support_smtp = [x for x in self.starttls_results.keys() if self.starttls_results[x]["supports_smtp"]] - mail_servers_that_support_starttls = [x for x in self.starttls_results.keys() if self.starttls_results[x]["starttls"]] + mail_servers_that_support_smtp = [x for x in self.starttls_results.keys() if self.starttls_results[x]['supports_smtp']] + mail_servers_that_support_starttls = [x for x in self.starttls_results.keys() if self.starttls_results[x]['starttls']] domain_supports_smtp = bool(mail_servers_that_support_starttls) results = { - "Domain": self.domain_name, - "Base Domain": self.base_domain_name, - "Live": self.is_live, - - "MX Record": self.has_mail(), - "Mail Servers": self.format_list(self.mail_servers), - "Mail Server Ports Tested": self.format_list([str(port) for port in self.ports_tested]), - "Domain Supports SMTP Results": self.format_list(mail_servers_that_support_smtp), + 'Domain': self.domain_name, + 'Base Domain': self.base_domain_name, + 'Live': self.is_live, + + 'MX Record': self.has_mail(), + 'Mail Servers': self.format_list(self.mail_servers), + 'Mail Server Ports Tested': self.format_list([str(port) for port in self.ports_tested]), + 'Domain Supports SMTP Results': self.format_list(mail_servers_that_support_smtp), # True if and only if at least one mail server speaks SMTP - "Domain Supports SMTP": domain_supports_smtp, - "Domain Supports STARTTLS Results": self.format_list(mail_servers_that_support_starttls), + 'Domain Supports SMTP': domain_supports_smtp, + 'Domain Supports STARTTLS Results': self.format_list(mail_servers_that_support_starttls), # True if and only if all mail servers that speak SMTP # also support STARTTLS - "Domain Supports STARTTLS": domain_supports_smtp and all([self.starttls_results[x]["starttls"] for x in mail_servers_that_support_smtp]), + 'Domain Supports STARTTLS': domain_supports_smtp and all([self.starttls_results[x]['starttls'] for x in mail_servers_that_support_smtp]), - "SPF Record": self.has_spf(), - "Valid SPF": self.valid_spf, - "SPF Results": self.format_list(self.spf), + 'SPF Record': self.has_spf(), + 'Valid SPF': self.valid_spf, + 'SPF Results': self.format_list(self.spf), - "DMARC Record": self.has_dmarc(), - "Valid DMARC": self.has_dmarc() and self.valid_dmarc, - "DMARC Results": self.format_list(self.dmarc), + 'DMARC Record': self.has_dmarc(), + 'Valid DMARC': self.has_dmarc() and self.valid_dmarc, + 'DMARC Results': self.format_list(self.dmarc), - "DMARC Record on Base Domain": self.parent_has_dmarc(), - "Valid DMARC Record on Base Domain": self.parent_has_dmarc() and self.parent_valid_dmarc(), - "DMARC Results on Base Domain": self.parent_dmarc_results(), - "DMARC Policy": self.get_dmarc_policy(), + 'DMARC Record on Base Domain': self.parent_has_dmarc(), + 'Valid DMARC Record on Base Domain': self.parent_has_dmarc() and self.parent_valid_dmarc(), + 'DMARC Results on Base Domain': self.parent_dmarc_results(), + 'DMARC Policy': self.get_dmarc_policy(), - "Syntax Errors": self.format_list(self.syntax_errors), - "Errors": self.format_list(self.errors) + 'Syntax Errors': self.format_list(self.syntax_errors), + 'Debug Info': self.format_list(self.debug_info) } return results @@ -157,4 +157,4 @@ def format_list(self, record_list): if not record_list: return None - return ", ".join(record_list) + return ', '.join(record_list) diff --git a/trustymail/trustymail.py b/trustymail/trustymail.py index de67b2e..5243c66 100755 --- a/trustymail/trustymail.py +++ b/trustymail/trustymail.py @@ -1,5 +1,6 @@ import csv import datetime +import inspect import json import logging import re @@ -15,15 +16,15 @@ from trustymail.domain import Domain CSV_HEADERS = [ - "Domain", "Base Domain", "Live", - "MX Record", "Mail Servers", "Mail Server Ports Tested", - "Domain Supports SMTP", "Domain Supports SMTP Results", - "Domain Supports STARTTLS", "Domain Supports STARTTLS Results", - "SPF Record", "Valid SPF", "SPF Results", - "DMARC Record", "Valid DMARC", "DMARC Results", - "DMARC Record on Base Domain", "Valid DMARC Record on Base Domain", - "DMARC Results on Base Domain", "DMARC Policy", - "Syntax Errors", "Errors" + 'Domain', 'Base Domain', 'Live', + 'MX Record', 'Mail Servers', 'Mail Server Ports Tested', + 'Domain Supports SMTP', 'Domain Supports SMTP Results', + 'Domain Supports STARTTLS', 'Domain Supports STARTTLS Results', + 'SPF Record', 'Valid SPF', 'SPF Results', + 'DMARC Record', 'Valid DMARC', 'DMARC Results', + 'DMARC Record on Base Domain', 'Valid DMARC Record on Base Domain', + 'DMARC Results on Base Domain', 'DMARC Policy', + 'Syntax Errors', 'Debug Info' ] # A cache for SMTP scanning results @@ -48,7 +49,7 @@ def domain_list_from_csv(csv_file): for i in range(0, len(domain_list[0])): header = domain_list[0][i] - if "domain" in header.lower(): + if 'domain' in header.lower(): domain_column = i # CSV starts with headers, remove first row. domain_list.pop(0) @@ -68,12 +69,11 @@ def mx_scan(resolver, domain): for record in resolver.query(domain.domain_name, 'MX', tcp=True): domain.add_mx_record(record) except (dns.resolver.NoNameservers, dns.resolver.NoAnswer, dns.exception.Timeout, dns.resolver.NXDOMAIN) as error: - handle_error("[MX]", domain, error) + handle_error('[MX]', domain, error) def starttls_scan(domain, smtp_timeout, smtp_localhost, smtp_ports, smtp_cache): - """ - Scan a domain to see if it supports SMTP and supports STARTTLS. + """Scan a domain to see if it supports SMTP and supports STARTTLS. Scan a domain to see if it supports SMTP. If the domain does support SMTP, a further check will be done to see if it supports STARTTLS. @@ -100,24 +100,24 @@ def starttls_scan(domain, smtp_timeout, smtp_localhost, smtp_ports, smtp_cache): for mail_server in domain.mail_servers: for port in smtp_ports: domain.ports_tested.add(port) - server_and_port = mail_server + ":" + str(port) + server_and_port = mail_server + ':' + str(port) if not smtp_cache or (server_and_port not in _SMTP_CACHE): domain.starttls_results[server_and_port] = {} smtp_connection = smtplib.SMTP(timeout=smtp_timeout, local_hostname=smtp_localhost) - logging.debug("Testing " + server_and_port + " for STARTTLS support") + logging.debug('Testing ' + server_and_port + ' for STARTTLS support') # Try to connect. This will tell us if something is # listening. try: smtp_connection.connect(mail_server, port) - domain.starttls_results[server_and_port]["is_listening"] = True + domain.starttls_results[server_and_port]['is_listening'] = True except (socket.timeout, smtplib.SMTPConnectError, smtplib.SMTPServerDisconnected, ConnectionRefusedError, OSError) as error: - handle_error("[STARTTLS]", domain, error) - domain.starttls_results[server_and_port]["is_listening"] = False - domain.starttls_results[server_and_port]["supports_smtp"] = False - domain.starttls_results[server_and_port]["starttls"] = False + handle_error('[STARTTLS]', domain, error) + domain.starttls_results[server_and_port]['is_listening'] = False + domain.starttls_results[server_and_port]['supports_smtp'] = False + domain.starttls_results[server_and_port]['starttls'] = False if smtp_cache: _SMTP_CACHE[server_and_port] = domain.starttls_results[server_and_port] @@ -128,18 +128,18 @@ def starttls_scan(domain, smtp_timeout, smtp_localhost, smtp_ports, smtp_cache): # thing that is listening is an SMTP server. try: smtp_connection.ehlo_or_helo_if_needed() - domain.starttls_results[server_and_port]["supports_smtp"] = True - logging.debug("\t Supports SMTP") + domain.starttls_results[server_and_port]['supports_smtp'] = True + logging.debug('\t Supports SMTP') except (smtplib.SMTPHeloError, smtplib.SMTPServerDisconnected) as error: - handle_error("[STARTTLS]", domain, error) - domain.starttls_results[server_and_port]["supports_smtp"] = False - domain.starttls_results[server_and_port]["starttls"] = False + handle_error('[STARTTLS]', domain, error) + domain.starttls_results[server_and_port]['supports_smtp'] = False + domain.starttls_results[server_and_port]['starttls'] = False # smtplib freaks out if you call quit on a non-open # connection try: smtp_connection.quit() except smtplib.SMTPServerDisconnected as error2: - handle_error("[STARTTLS]", domain, error2) + handle_error('[STARTTLS]', domain, error2) if smtp_cache: _SMTP_CACHE[server_and_port] = domain.starttls_results[server_and_port] @@ -147,9 +147,9 @@ def starttls_scan(domain, smtp_timeout, smtp_localhost, smtp_ports, smtp_cache): continue # Now check if the server supports STARTTLS. - has_starttls = smtp_connection.has_extn("STARTTLS") - domain.starttls_results[server_and_port]["starttls"] = has_starttls - logging.debug("\t Supports STARTTLS: " + str(has_starttls)) + has_starttls = smtp_connection.has_extn('STARTTLS') + domain.starttls_results[server_and_port]['starttls'] = has_starttls + logging.debug('\t Supports STARTTLS: ' + str(has_starttls)) # Close the connection # smtplib freaks out if you call quit on a non-open @@ -157,20 +157,19 @@ def starttls_scan(domain, smtp_timeout, smtp_localhost, smtp_ports, smtp_cache): try: smtp_connection.quit() except smtplib.SMTPServerDisconnected as error: - handle_error("[STARTTLS]", domain, error) + handle_error('[STARTTLS]', domain, error) # Copy the results into the cache, if necessary if smtp_cache: _SMTP_CACHE[server_and_port] = domain.starttls_results[server_and_port] else: - logging.debug("\tUsing cached results for " + server_and_port) + logging.debug('\tUsing cached results for ' + server_and_port) # Copy the cached results into the domain object domain.starttls_results[server_and_port] = _SMTP_CACHE[server_and_port] def check_spf_record(record_text, expected_result, domain): - """ - Test to see if an SPF record is valid and correct. + """Test to see if an SPF record is valid and correct. The record is tested by checking the response when we query if it allows us to send mail from an IP that is known not to be a mail @@ -196,32 +195,25 @@ def check_spf_record(record_text, expected_result, domain): # I'm actually temporarily using an IP that virginia.edu resolves to # until we resolve why Google DNS does not return the same PTR records # as the CAL DNS does for 64.69.57.18. - query = spf.query("128.143.22.36", "email_wizard@" + domain.domain_name, domain.domain_name, strict=2) + query = spf.query('128.143.22.36', 'email_wizard@' + domain.domain_name, domain.domain_name, strict=2) response = query.check() - if response[0] == 'temperror': - logging.debug(response[2]) - elif response[0] == 'permerror': - logging.debug('\t' + response[2]) - domain.syntax_errors.append(response[2]) - elif response[0] == 'ambiguous': - logging.debug('\t' + response[2]) - domain.syntax_errors.append(response[2]) - elif response[0] == expected_result: - # Everything checks out the SPF syntax seems valid. + response_type = response[0] + if response_type == 'temperror' or response_type == 'permerror' or response_type == 'ambiguous': + handle_error('[SPF]', domain, 'SPF query returned {}: {}'.format(response_type, response[2])) + elif response_type == expected_result: + # Everything checks out. The SPF syntax seems valid domain.valid_spf = True else: domain.valid_spf = False - logging.debug('\tResult Differs: Expected [{0}] - Actual [{1}]'.format(expected_result, response[0])) - domain.errors.append('Result Differs: Expected [{0}] - Actual [{1}]'.format(expected_result, response[0])) + msg = 'Result unexpectedly differs: Expected [{}] - actual [{}]'.format(expected_result, response_type) + handle_error('[SPF]', domain, msg) except spf.AmbiguityWarning as error: - logging.debug('\t' + error.msg) - domain.syntax_errors.append(error.msg) + handle_syntax_error('[SPF]', domain, error) def get_spf_record_text(resolver, domain_name, domain, follow_redirect=False): - """ - Get the SPF record text for the given domain name. + """Get the SPF record text for the given domain name. DNS queries are performed using the dns.resolver.Resolver object. Errors are logged to the trustymail.Domain object. The Boolean @@ -271,8 +263,7 @@ def get_spf_record_text(resolver, domain_name, domain, follow_redirect=False): def spf_scan(resolver, domain): - """ - Scan a domain to see if it supports SPF. If the domain has an SPF + """Scan a domain to see if it supports SPF. If the domain has an SPF record, verify that it properly rejects mail sent from an IP known to be disallowed. @@ -321,7 +312,7 @@ def dmarc_scan(resolver, domain): # Ensure the record is a DMARC record. Some domains that # redirect will cause an SPF record to show. - if record_text.startswith("v=DMARC1"): + if record_text.startswith('v=DMARC1'): domain.dmarc.append(record_text) # Remove excess whitespace @@ -330,28 +321,28 @@ def dmarc_scan(resolver, domain): # DMARC records follow a specific outline as to how they are defined - tag:value # We can split this up into a easily manipulatable tag_dict = {} - for options in record_text.split(";"): + for options in record_text.split(';'): if '=' not in options: continue - tag = options.split("=")[0].strip() - value = options.split("=")[1].strip() + tag = options.split('=')[0].strip() + value = options.split('=')[1].strip() tag_dict[tag] = value for tag in tag_dict: - if tag not in ["v", "mailto", "rf", "p", "sp", "adkim", "aspf", "fo", "pct", "ri", "rua", "ruf"]: - logging.debug("\tWarning: Unknown DMARC mechanism {0}".format(tag)) + if tag not in ['v', 'mailto', 'rf', 'p', 'sp', 'adkim', 'aspf', 'fo', 'pct', 'ri', 'rua', 'ruf']: + handle_error('[DMARC]', domain, 'Warning: Unknown DMARC mechanism {0}'.format(tag)) domain.valid_dmarc = False - elif tag == "p": + elif tag == 'p': domain.dmarc_policy = tag_dict[tag] except (dns.resolver.NoNameservers, dns.resolver.NoAnswer, dns.exception.Timeout, dns.resolver.NXDOMAIN) as error: - handle_error("[DMARC]", domain, error) + handle_error('[DMARC]', domain, error) def find_host_from_ip(resolver, ip_addr): # Use TCP, since we care about the content and correctness of the records # more than whether their records fit in a single UDP packet. - hostname, _ = resolver.query(dns.reversename.from_address(ip_addr), "PTR", tcp=True) + hostname, _ = resolver.query(dns.reversename.from_address(ip_addr), 'PTR', tcp=True) return str(hostname) @@ -391,22 +382,22 @@ def scan(domain_name, timeout, smtp_timeout, smtp_localhost, smtp_ports, smtp_ca # scan in its init domain = Domain(domain_name, timeout, smtp_timeout, smtp_localhost, smtp_ports, smtp_cache, dns_hostnames) - logging.debug("[{0}]".format(domain_name.lower())) + logging.debug('[{0}]'.format(domain_name.lower())) - if scan_types["mx"] and domain.is_live: + if scan_types['mx'] and domain.is_live: mx_scan(resolver, domain) - if scan_types["starttls"] and domain.is_live: + if scan_types['starttls'] and domain.is_live: starttls_scan(domain, smtp_timeout, smtp_localhost, smtp_ports, smtp_cache) - if scan_types["spf"] and domain.is_live: + if scan_types['spf'] and domain.is_live: spf_scan(resolver, domain) - if scan_types["dmarc"] and domain.is_live: + if scan_types['dmarc'] and domain.is_live: dmarc_scan(resolver, domain) # If the user didn't specify any scans then run a full scan. - if domain.is_live and not (scan_types["mx"] or scan_types["starttls"] or scan_types["spf"] or scan_types["dmarc"]): + if domain.is_live and not (scan_types['mx'] or scan_types['starttls'] or scan_types['spf'] or scan_types['dmarc']): mx_scan(resolver, domain) starttls_scan(domain, smtp_timeout, smtp_localhost, smtp_ports, smtp_cache) spf_scan(resolver, domain) @@ -415,15 +406,65 @@ def scan(domain_name, timeout, smtp_timeout, smtp_localhost, smtp_ports, smtp_ca return domain -def handle_error(prefix, domain, error): - if hasattr(error, "message"): - if "NXDOMAIN" in error.message and prefix != "[DMARC]": +def handle_error(prefix, domain, error, syntax_error=False): + """Handle an error by logging via the Python logging library and + recording it in the debug_info or syntax_error members of the + trustymail.Domain object. + + Since the "Debug Info" and "Syntax Error" fields in the CSV output + of trustymail come directly from the debug_info and syntax_error + members of the trustymail.Domain object, and that CSV is likely + all we will have to reconstruct how trustymail reached the + conclusions it did, it is vital to record as much helpful + information as possible. + + Parameters + ---------- + prefix : str + The prefix to use when constructing the log string. This is + usually the type of trustymail test that was being performed + when the error condition occurred. + + domain : trustymail.Domain + The Domain object in which the error or syntax error should be + recorded. + + error : str, BaseException, or Exception + Either a string describing the error, or an exception object + representing the error. + + syntax_error : bool + If True then the error will be recorded in the syntax_error + member of the trustymail.Domain object. Otherwise it is + recorded in the error member of the trustymail.Domain object. + """ + # Get the previous frame in the stack - the one that is calling + # this function + frame = inspect.currentframe().f_back + function = frame.f_code + function_name = function.co_name + filename = function.co_filename + line = frame.f_lineno + + error_template = '{prefix} In {function_name} at {filename}:{line}: {error}' + + if hasattr(error, 'message'): + if syntax_error and 'NXDOMAIN' in error.message and prefix != '[DMARC]': domain.is_live = False - domain.errors.append(error.message) - logging.debug(" {0} {1}".format(prefix, error.message)) + error_string = error_template.format(prefix=prefix, function_name=function_name, line=line, filename=filename, error=error.message) else: - domain.errors.append(str(error)) - logging.debug(" {0} {1}".format(prefix, str(error))) + error_string = error_template.format(prefix=prefix, function_name=function_name, line=line, filename=filename, error=str(error)) + + if syntax_error: + domain.syntax_errors.append(error_string) + else: + domain.debug_info.append(error_string) + logging.debug(error_string) + + +def handle_syntax_error(prefix, domain, error): + """Convenience method for handle_error""" + handle_error(prefix, domain, error, syntax_error=True) def generate_csv(domains, file_name):