diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..0f60d3e --- /dev/null +++ b/.flake8 @@ -0,0 +1,25 @@ +[flake8] +max-line-length = 80 +# Select (turn on) +# * Complexity violations reported by mccabe (C) - +# http://flake8.pycqa.org/en/latest/user/error-codes.html#error-violation-codes +# * Documentation conventions compliance reported by pydocstyle (D) - +# http://www.pydocstyle.org/en/stable/error_codes.html +# * Default errors and warnings reported by pycodestyle (E and W) - +# https://pycodestyle.readthedocs.io/en/latest/intro.html#error-codes +# * Default errors reported by pyflakes (F) - +# http://flake8.pycqa.org/en/latest/glossary.html#term-pyflakes +# * Default warnings reported by flake8-bugbear (B) - +# https://github.com/PyCQA/flake8-bugbear#list-of-warnings +# * The B950 flake8-bugbear opinionated warning - +# https://github.com/PyCQA/flake8-bugbear#opinionated-warnings +select = C,D,E,F,W,B,B950 +# Ignore flake8's default warning about maximum line length, which has +# a hard stop at the configured value. Instead we use +# flake8-bugbear's B950, which allows up to 10% overage. +# +# Also ignore flake8's warning about line breaks before binary +# operators. It no longer agrees with PEP8. See, for example, here: +# https://github.com/ambv/black/issues/21. Guido agrees here: +# https://github.com/python/peps/commit/c59c4376ad233a62ca4b3a6060c81368bd21e85b. +ignore = E501,W503 diff --git a/.travis.yml b/.travis.yml index c8f61fd..08510f1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,7 @@ language: python +dist: xenial + services: - docker @@ -9,22 +11,17 @@ env: - DOCKER_USER=jsf9k - secure: "IAbtmubLq2hL71aollQEfoV+t9Zbqn4rNVqi5YNerqxvXr6WiDzwmYUujOCnQiHli8xkIU0J8OSPX0aA4cOTxPGiZeNguGuVWmR2ZCB8SMyjbKJOEIpLZv/jG1Be6dVXiJwKwQM3yX4pqPfLIuYkE6S1GAodudPzcJ8xM/h1qzJijknJEqsCJQw43zSHZ/epYulgWcmnCAmaoehZTCjVcW4l8iyVHSNfgO7gu5iWC9y4AIIX96E9TZkSDeud1yqUcORMwfdOl2pHhDh3KnF8HZSOCetTHP7JacLUMJiiCpKN34Xn9RQgjbVKOrF/rcC8WEDrWKn14SFfgUP0dl6lCr6P9HE4aHKvrH/nCyflFOANbEKMywE8DzNA3zd7MC0HQkyb40LAiBSbTD58myUsc+WlmaZyvpJ7akukbmVjVPQWNvs1laz/bqyLdPh4WMqgOTbP7BrgDeIrCVedwdvhqq9KEaoak8RBs/Wb8LisI6j+vAY/HKlVnuIXRL0RNGTko96kBluEpAsWDjls39Hmu5hl1glbqCWDW9+dE0/Zx4MuSt4OVAywMW+lzxA16SdqDLCZqGv00vnPpxuBAvEEQbVtsmQ6lR+fwTYKFEDZM7axRwBlLzIDEzSoz4K0fSq3EuU4mkoVZKcHGnuHuRqaNhce0zzZx3lHdNTm7oa7b0U=" -# Matrix approach here due to: https://github.com/travis-ci/travis-ci/issues/9815 -matrix: - include: - - python: 3.6 - dist: xenial - sudo: true - - python: 3.7 - dist: xenial - sudo: true +python: + - 3.6 + - 3.7 + - 3.8 before_install: - sudo apt-get install -y shellcheck install: - pip install flake8 pytest-cov pytest coveralls - - pip install -e . + - pip install --editable . script: - shellcheck bump_version.sh tag.sh travis_scripts/*.sh @@ -43,7 +40,7 @@ deploy: distributions: sdist bdist_wheel on: tags: true - python: '3.7' + python: '3.8' # - provider: script # script: bash travis_scripts/deploy_to_docker_hub.sh # on: diff --git a/Dockerfile b/Dockerfile index 3d9c761..bba529c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,11 +2,21 @@ FROM python:3 WORKDIR /app -COPY requirements.txt requirements.txt +RUN pip install --no-cache-dir --upgrade pip setuptools wheel -RUN pip install --no-cache-dir -r requirements.txt +COPY requirements.txt . -COPY . . +RUN pip install --no-cache-dir --requirement requirements.txt + +COPY scripts/ scripts/ + +COPY trustymail/ trustymail/ + +COPY README.md . + +COPY requirements-dev.txt . + +COPY setup.py . RUN pip install --editable . diff --git a/bump_version.sh b/bump_version.sh index bfed350..573be27 100755 --- a/bump_version.sh +++ b/bump_version.sh @@ -10,7 +10,7 @@ VERSION_FILE=trustymail/__init__.py HELP_INFORMATION="bump_version.sh (show|major|minor|patch|prerelease|build|finalize)" -old_version=$(sed -n "s/^__version__ = '\(.*\)'$/\1/p" $VERSION_FILE) +old_version=$(sed -n "s/^__version__ = \"\(.*\)\"$/\1/p" $VERSION_FILE) if [ $# -ne 1 ] then @@ -20,7 +20,11 @@ else major|minor|patch|prerelease|build) new_version=$(python -c "import semver; print(semver.bump_$1('$old_version'))") echo Changing version from "$old_version" to "$new_version" - sed -i "s/$old_version/$new_version/" $VERSION_FILE + # A temp file is used to provide compatability with macOS development + # as a result of macOS using the BSD version of sed + tmp_file=/tmp/version.$$ + sed "s/$old_version/$new_version/" $VERSION_FILE > $tmp_file + mv $tmp_file $VERSION_FILE git add $VERSION_FILE git commit -m"Bump version from $old_version to $new_version" git push @@ -28,9 +32,13 @@ else finalize) new_version=$(python -c "import semver; print(semver.finalize_version('$old_version'))") echo Changing version from "$old_version" to "$new_version" - sed -i "s/$old_version/$new_version/" $VERSION_FILE + # A temp file is used to provide compatability with macOS development + # as a result of macOS using the BSD version of sed + tmp_file=/tmp/version.$$ + sed "s/$old_version/$new_version/" $VERSION_FILE > $tmp_file + mv $tmp_file $VERSION_FILE git add $VERSION_FILE - git commit -m"Finalize version from $old_version to $new_version" + git commit -m"Bump version from $old_version to $new_version" git push ;; show) diff --git a/run b/run index 756ef66..50e2887 100755 --- a/run +++ b/run @@ -7,5 +7,7 @@ docker build -t trustymail/cli . docker run --rm -it \ --name trustymail \ - -v $(pwd):/app \ - trustymail/cli $@ + --volume "$(pwd)":/workspace \ + --workdir="/workspace" \ + --user "$(id -u)" \ + trustymail/cli "$@" diff --git a/scripts/trustymail b/scripts/trustymail index 3281bc6..248a498 100755 --- a/scripts/trustymail +++ b/scripts/trustymail @@ -68,97 +68,105 @@ def main(): args = docopt.docopt(__doc__, version=trustymail.__version__) # Monkey patching trustymail to make it cache the PSL where we want - if args['--psl-filename'] is not None: - trustymail.PublicSuffixListFilename = args['--psl-filename'] + if args["--psl-filename"] is not None: + trustymail.PublicSuffixListFilename = args["--psl-filename"] # Monkey patching trustymail to make the PSL cache read-only - if args['--psl-read-only']: + if args["--psl-read-only"]: trustymail.PublicSuffixListReadOnly = True import trustymail.trustymail as tmail log_level = logging.WARN - if args['--debug']: + if args["--debug"]: log_level = logging.DEBUG - logging.basicConfig(format='%(asctime)-15s %(message)s', level=log_level) + logging.basicConfig(format="%(asctime)-15s %(message)s", level=log_level) # Allow for user to input a csv for many domain names. - if args['INPUT'][0].endswith('.csv'): - domains = tmail.domain_list_from_csv(open(args['INPUT'][0])) + if args["INPUT"][0].endswith(".csv"): + domains = tmail.domain_list_from_csv(open(args["INPUT"][0])) else: - domains = args['INPUT'] + domains = args["INPUT"] - if args['--timeout'] is not None: - timeout = int(args['--timeout']) + if args["--timeout"] is not None: + timeout = int(args["--timeout"]) else: timeout = 5 - if args['--smtp-timeout'] is not None: - smtp_timeout = int(args['--smtp-timeout']) + if args["--smtp-timeout"] is not None: + smtp_timeout = int(args["--smtp-timeout"]) else: smtp_timeout = 5 - if args['--smtp-localhost'] is not None: - smtp_localhost = args['--smtp-localhost'] + if args["--smtp-localhost"] is not None: + smtp_localhost = args["--smtp-localhost"] else: smtp_localhost = None - if args['--smtp-ports'] is not None: - smtp_ports = {int(port) for port in args['--smtp-ports'].split(',')} + if args["--smtp-ports"] is not None: + smtp_ports = {int(port) for port in args["--smtp-ports"].split(",")} else: smtp_ports = _DEFAULT_SMTP_PORTS - if args['--dns'] is not None: - dns_hostnames = args['--dns'].split(',') + if args["--dns"] is not None: + dns_hostnames = args["--dns"].split(",") else: dns_hostnames = None # --starttls implies --mx - if args['--starttls']: - args['--mx'] = True + if args["--starttls"]: + args["--mx"] = True # User might not want every scan performed. scan_types = { - 'mx': args['--mx'], - 'starttls': args['--starttls'], - 'spf': args['--spf'], - 'dmarc': args['--dmarc'] + "mx": args["--mx"], + "starttls": args["--starttls"], + "spf": args["--spf"], + "dmarc": args["--dmarc"], } domain_scans = [] for domain_name in domains: - domain_scans.append(tmail.scan(domain_name, timeout, - smtp_timeout, smtp_localhost, - smtp_ports, not args['--no-smtp-cache'], - scan_types, dns_hostnames)) + domain_scans.append( + tmail.scan( + domain_name, + timeout, + smtp_timeout, + smtp_localhost, + smtp_ports, + not args["--no-smtp-cache"], + scan_types, + dns_hostnames, + ) + ) # Default output file name is results. - if args['--output'] is None: - output_file_name = 'results' + if args["--output"] is None: + output_file_name = "results" else: - output_file_name = args['--output'] + output_file_name = args["--output"] # Ensure file extension is present in filename. - if args['--json'] and '.json' not in output_file_name: - output_file_name += '.json' - elif '.csv' not in output_file_name: - output_file_name += '.csv' + if args["--json"] and ".json" not in output_file_name: + output_file_name += ".json" + elif ".csv" not in output_file_name: + output_file_name += ".csv" - if args['--json']: + if args["--json"]: json_out = tmail.generate_json(domain_scans) - if args['--output'] is None: + if args["--output"] is None: print(json_out) else: write(json_out, output_file_name) - logging.warn('Wrote results to %s.' % output_file_name) + logging.warn("Wrote results to %s." % output_file_name) else: tmail.generate_csv(domain_scans, output_file_name) def write(content, out_file): parent = os.path.dirname(out_file) - if parent is not '': + if parent != "": mkdir_p(parent) - f = open(out_file, 'w') # no utf-8 in python 2 + f = open(out_file, "w") # no utf-8 in python 2 f.write(content) f.close() @@ -175,5 +183,5 @@ def mkdir_p(path): raise -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/setup.py b/setup.py index 0ea4b60..00d2242 100644 --- a/setup.py +++ b/setup.py @@ -11,67 +11,55 @@ def readme(): - with open('README.md') as f: + with open("README.md") as f: return f.read() -with open('requirements.txt') as fp: +with open("requirements.txt") as fp: reqs = [line.strip() for line in fp.readlines() if line] -with open('requirements-dev.txt') as fp: +with open("requirements-dev.txt") as fp: lines = [line.strip() for line in fp.readlines() if line] - dev_reqs = [line for line in lines if line and '-r requirements.txt' not in line] + dev_reqs = [line for line in lines if line and "-r requirements.txt" not in line] setup( - name='trustymail', + name="trustymail", version=__version__, - description='Scan domains and return data based on trustworthy email best practices', + description="Scan domains and return data based on trustworthy email best practices", long_description=readme(), - long_description_content_type='text/markdown', - + long_description_content_type="text/markdown", # NCATS "homepage" url="https://www.us-cert.gov/resources/ncats", # The project's main homepage - download_url='https://github.com/cisagov/trustymail', - + download_url="https://github.com/cisagov/trustymail", # Author details - author='Cyber and Infrastructure Security Agency', - author_email='ncats@hq.dhs.gov', - - license='License :: CC0 1.0 Universal (CC0 1.0) Public Domain Dedication', - + author="Cyber and Infrastructure Security Agency", + author_email="ncats@hq.dhs.gov", + license="License :: CC0 1.0 Universal (CC0 1.0) Public Domain Dedication", # See https://pypi.python.org/pypi?%3Aaction=list_classifiers classifiers=[ # How mature is this project? Common values are # 3 - Alpha # 4 - Beta # 5 - Production/Stable - 'Development Status :: 4 - Beta', - + "Development Status :: 4 - Beta", # Indicate who your project is intended for - 'Intended Audience :: Developers', - + "Intended Audience :: Developers", # Pick your license as you wish (should match "license" above) - 'License :: CC0 1.0 Universal (CC0 1.0) Public Domain Dedication', - + "License :: CC0 1.0 Universal (CC0 1.0) Public Domain Dedication", # Specify the Python versions you support here. In particular, ensure # that you indicate whether you support Python 2, Python 3 or both. - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", ], - + python_requires=">=3.6", # What does your project relate to? - keywords='email authentication, STARTTLS', - - packages=['trustymail'], - + keywords="email authentication, STARTTLS", + packages=["trustymail"], install_requires=reqs, - - extras_require={ - 'dev': dev_reqs, - }, - - scripts=['scripts/trustymail'] + extras_require={"dev": dev_reqs}, + scripts=["scripts/trustymail"], ) diff --git a/tox.ini b/tox.ini index 1a5fdde..a2053d9 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py34,py35,py36,py37,flake8 +envlist = py36,py37,py38,flake8 skip_missing_interpreters = true ; usedevelop = true diff --git a/trustymail/__init__.py b/trustymail/__init__.py index d00ae04..5efd435 100644 --- a/trustymail/__init__.py +++ b/trustymail/__init__.py @@ -1,6 +1,6 @@ from __future__ import unicode_literals, absolute_import, print_function -__version__ = '0.7.5' +__version__ = "0.7.5" -PublicSuffixListFilename = 'public_suffix_list.dat' +PublicSuffixListFilename = "public_suffix_list.dat" PublicSuffixListReadOnly = False diff --git a/trustymail/domain.py b/trustymail/domain.py index 286101e..432de5d 100755 --- a/trustymail/domain.py +++ b/trustymail/domain.py @@ -20,7 +20,7 @@ def get_psl(): def download_psl(): fresh_psl = publicsuffix.fetch() - with open(PublicSuffixListFilename, 'w', encoding='utf-8') as fresh_psl_file: + with open(PublicSuffixListFilename, "w", encoding="utf-8") as fresh_psl_file: fresh_psl_file.write(fresh_psl.read()) # Download the psl if necessary @@ -28,11 +28,13 @@ def download_psl(): if not path.exists(PublicSuffixListFilename): download_psl() else: - psl_age = datetime.now() - datetime.fromtimestamp(stat(PublicSuffixListFilename).st_mtime) + psl_age = datetime.now() - datetime.fromtimestamp( + stat(PublicSuffixListFilename).st_mtime + ) if psl_age > timedelta(hours=24): download_psl() - with open(PublicSuffixListFilename, encoding='utf-8') as psl_file: + with open(PublicSuffixListFilename, encoding="utf-8") as psl_file: psl = publicsuffix.PublicSuffixList(psl_file) return psl @@ -54,13 +56,22 @@ def format_list(record_list): if not record_list: return None - return ', '.join(record_list) + return ", ".join(record_list) class Domain: base_domains = {} - def __init__(self, domain_name, timeout, smtp_timeout, smtp_localhost, smtp_ports, smtp_cache, dns_hostnames): + def __init__( + self, + domain_name, + timeout, + smtp_timeout, + smtp_localhost, + smtp_ports, + smtp_cache, + dns_hostnames, + ): self.domain_name = domain_name.lower() self.base_domain_name = get_public_suffix(self.domain_name) @@ -71,7 +82,16 @@ def __init__(self, domain_name, timeout, smtp_timeout, smtp_localhost, smtp_port self.is_base_domain = False if self.base_domain_name not in Domain.base_domains: # Populate DMARC for parent. - domain = trustymail.scan(self.base_domain_name, timeout, smtp_timeout, smtp_localhost, smtp_ports, smtp_cache, {'mx': False, 'starttls': False, 'spf': False, 'dmarc': True}, dns_hostnames) + domain = trustymail.scan( + self.base_domain_name, + timeout, + smtp_timeout, + smtp_localhost, + smtp_ports, + smtp_cache, + {"mx": False, "starttls": False, "spf": False, "dmarc": True}, + dns_hostnames, + ) Domain.base_domains[self.base_domain_name] = domain self.base_domain = Domain.base_domains[self.base_domain_name] @@ -127,8 +147,15 @@ def has_supports_smtp(self): """ result = None if len(self.starttls_results) > 0: - result = len(filter(lambda x: self.starttls_results[x]['supports_smtp'], - self.starttls_results.keys())) > 0 + result = ( + len( + filter( + lambda x: self.starttls_results[x]["supports_smtp"], + self.starttls_results.keys(), + ) + ) + > 0 + ) return result def has_starttls(self): @@ -138,8 +165,15 @@ def has_starttls(self): """ result = None if len(self.starttls_results) > 0: - result = len(filter(lambda x: self.starttls_results[x]['starttls'], - self.starttls_results.keys())) > 0 + result = ( + len( + filter( + lambda x: self.starttls_results[x]["starttls"], + self.starttls_results.keys(), + ) + ) + > 0 + ) return result def has_spf(self): @@ -160,7 +194,7 @@ def add_mx_record(self, record): # the record will contain a trailing period if it is a FQDN. if self.mail_servers is None: self.mail_servers = [] - self.mail_servers.append(record.exchange.to_text().rstrip('.').lower()) + self.mail_servers.append(record.exchange.to_text().rstrip(".").lower()) def parent_has_dmarc(self): ans = self.has_dmarc() @@ -190,7 +224,7 @@ def get_dmarc_policy(self): ans = self.dmarc_policy # If the policy was never set, or isn't in the list of valid # policies, check the parents. - if ans is None or ans.lower() not in ['quarantine', 'reject', 'none']: + if ans is None or ans.lower() not in ["quarantine", "reject", "none"]: if self.base_domain: # We check the *subdomain* policy in case one was # explicitly set. If one was not explicitly set then @@ -207,7 +241,7 @@ def get_dmarc_subdomain_policy(self): ans = self.dmarc_subdomain_policy # If the policy was never set, or isn't in the list of valid # policies, check the parents. - if ans is None or ans.lower() not in ['quarantine', 'reject', 'none']: + if ans is None or ans.lower() not in ["quarantine", "reject", "none"]: if self.base_domain: ans = self.base_domain.get_dmarc_subdomain_policy() else: @@ -256,57 +290,84 @@ def generate_results(self): mail_servers_that_support_smtp = None mail_servers_that_support_starttls = None else: - mail_servers_that_support_smtp = [x for x in self.starttls_results.keys() if self.starttls_results[x][ - 'supports_smtp']] - mail_servers_that_support_starttls = [x for x in self.starttls_results.keys() if self.starttls_results[x][ - 'starttls']] + mail_servers_that_support_smtp = [ + x + for x in self.starttls_results.keys() + if self.starttls_results[x]["supports_smtp"] + ] + mail_servers_that_support_starttls = [ + x + for x in self.starttls_results.keys() + if self.starttls_results[x]["starttls"] + ] domain_supports_smtp = bool(mail_servers_that_support_smtp) - domain_supports_starttls = domain_supports_smtp and all([self.starttls_results[x]['starttls'] for x in mail_servers_that_support_smtp]) - - results = OrderedDict([ - ('Domain', self.domain_name), - ('Base Domain', self.base_domain_name), - ('Live', self.is_live), - - ('MX Record', self.has_mail()), - ('MX Record DNSSEC', self.mx_records_dnssec), - ('Mail Servers', format_list(self.mail_servers)), - ('Mail Server Ports Tested', format_list([str(port) for port in self.ports_tested])), - ('Domain Supports SMTP Results', format_list(mail_servers_that_support_smtp)), - # True if and only if at least one mail server speaks SMTP - ('Domain Supports SMTP', domain_supports_smtp), - ('Domain Supports STARTTLS Results', format_list(mail_servers_that_support_starttls)), - # True if and only if all mail servers that speak SMTP - # also support STARTTLS - ('Domain Supports STARTTLS', domain_supports_starttls), - - ('SPF Record', self.has_spf()), - ('SPF Record DNSSEC', self.spf_dnssec), - ('Valid SPF', self.valid_spf), - ('SPF Results', format_list(self.spf)), - - ('DMARC Record', self.has_dmarc()), - ('DMARC Record DNSSEC', self.dmarc_dnssec), - ('Valid DMARC', self.has_dmarc() and self.valid_dmarc), - ('DMARC Results', format_list(self.dmarc)), - - ('DMARC Record on Base Domain', self.parent_has_dmarc()), - ('DMARC Record on Base Domain DNSSEC', self.parent_dmarc_dnssec()), - ('Valid DMARC Record on Base Domain', self.parent_has_dmarc() and self.parent_valid_dmarc()), - ('DMARC Results on Base Domain', self.parent_dmarc_results()), - ('DMARC Policy', self.get_dmarc_policy()), - ('DMARC Subdomain Policy', self.get_dmarc_subdomain_policy()), - ('DMARC Policy Percentage', self.get_dmarc_pct()), - - ("DMARC Aggregate Report URIs", format_list(self.get_dmarc_aggregate_uris())), - ("DMARC Forensic Report URIs", format_list(self.get_dmarc_forensic_uris())), - - ('DMARC Has Aggregate Report URI', self.get_dmarc_has_aggregate_uri()), - ('DMARC Has Forensic Report URI', self.get_dmarc_has_forensic_uri()), - ('DMARC Reporting Address Acceptance Error', self.dmarc_reports_address_error), - - ('Syntax Errors', format_list(self.syntax_errors)), - ('Debug Info', format_list(self.debug_info)) - ]) + domain_supports_starttls = domain_supports_smtp and all( + [ + self.starttls_results[x]["starttls"] + for x in mail_servers_that_support_smtp + ] + ) + + results = OrderedDict( + [ + ("Domain", self.domain_name), + ("Base Domain", self.base_domain_name), + ("Live", self.is_live), + ("MX Record", self.has_mail()), + ("MX Record DNSSEC", self.mx_records_dnssec), + ("Mail Servers", format_list(self.mail_servers)), + ( + "Mail Server Ports Tested", + format_list([str(port) for port in self.ports_tested]), + ), + ( + "Domain Supports SMTP Results", + format_list(mail_servers_that_support_smtp), + ), + # True if and only if at least one mail server speaks SMTP + ("Domain Supports SMTP", domain_supports_smtp), + ( + "Domain Supports STARTTLS Results", + format_list(mail_servers_that_support_starttls), + ), + # True if and only if all mail servers that speak SMTP + # also support STARTTLS + ("Domain Supports STARTTLS", domain_supports_starttls), + ("SPF Record", self.has_spf()), + ("SPF Record DNSSEC", self.spf_dnssec), + ("Valid SPF", self.valid_spf), + ("SPF Results", format_list(self.spf)), + ("DMARC Record", self.has_dmarc()), + ("DMARC Record DNSSEC", self.dmarc_dnssec), + ("Valid DMARC", self.has_dmarc() and self.valid_dmarc), + ("DMARC Results", format_list(self.dmarc)), + ("DMARC Record on Base Domain", self.parent_has_dmarc()), + ("DMARC Record on Base Domain DNSSEC", self.parent_dmarc_dnssec()), + ( + "Valid DMARC Record on Base Domain", + self.parent_has_dmarc() and self.parent_valid_dmarc(), + ), + ("DMARC Results on Base Domain", self.parent_dmarc_results()), + ("DMARC Policy", self.get_dmarc_policy()), + ("DMARC Subdomain Policy", self.get_dmarc_subdomain_policy()), + ("DMARC Policy Percentage", self.get_dmarc_pct()), + ( + "DMARC Aggregate Report URIs", + format_list(self.get_dmarc_aggregate_uris()), + ), + ( + "DMARC Forensic Report URIs", + format_list(self.get_dmarc_forensic_uris()), + ), + ("DMARC Has Aggregate Report URI", self.get_dmarc_has_aggregate_uri()), + ("DMARC Has Forensic Report URI", self.get_dmarc_has_forensic_uri()), + ( + "DMARC Reporting Address Acceptance Error", + self.dmarc_reports_address_error, + ), + ("Syntax Errors", format_list(self.syntax_errors)), + ("Debug Info", format_list(self.debug_info)), + ] + ) return results diff --git a/trustymail/trustymail.py b/trustymail/trustymail.py index 79cd68c..9077b25 100755 --- a/trustymail/trustymail.py +++ b/trustymail/trustymail.py @@ -20,7 +20,9 @@ # A cache for SMTP scanning results _SMTP_CACHE = {} -MAILTO_REGEX = re.compile(r"(mailto):([\w\-!#$%&'*+-/=?^_`{|}~][\w\-.!#$%&'*+-/=?^_`{|}~]*@[\w\-.]+)(!\w+)?") +MAILTO_REGEX = re.compile( + r"(mailto):([\w\-!#$%&'*+-/=?^_`{|}~][\w\-.!#$%&'*+-/=?^_`{|}~]*@[\w\-.]+)(!\w+)?" +) def domain_list_from_url(url): @@ -29,11 +31,13 @@ def domain_list_from_url(url): with requests.Session() as session: # Download current list of agencies, then let csv reader handle it. - return domain_list_from_csv(session.get(url).content.decode('utf-8').splitlines()) + return domain_list_from_csv( + session.get(url).content.decode("utf-8").splitlines() + ) def domain_list_from_csv(csv_file): - domain_list = list(csv.reader(csv_file, delimiter=',')) + domain_list = list(csv.reader(csv_file, delimiter=",")) # Check the headers for the word domain - use that column. @@ -41,7 +45,7 @@ def domain_list_from_csv(csv_file): for i in range(0, len(domain_list[0])): header = domain_list[0][i] - if 'domain' in header.lower(): + if "domain" in header.lower(): domain_column = i # CSV starts with headers, remove first row. domain_list.pop(0) @@ -72,7 +76,7 @@ def check_dnssec(domain, domain_name, record_type): else: return False except Exception as error: - handle_error('[MX DNSSEC]', domain, error) + handle_error("[MX DNSSEC]", domain, error) return None @@ -84,20 +88,20 @@ def mx_scan(resolver, domain): domain.mail_servers = [] # Use TCP, since we care about the content and correctness of the # records more than whether their records fit in a single UDP packet. - for record in resolver.query(domain.domain_name, 'MX', tcp=True): + for record in resolver.query(domain.domain_name, "MX", tcp=True): domain.add_mx_record(record) - domain.mx_records_dnssec = check_dnssec(domain, domain.domain_name, 'MX') + domain.mx_records_dnssec = check_dnssec(domain, domain.domain_name, "MX") except (dns.resolver.NoNameservers) as error: # The NoNameServers exception means that we got a SERVFAIL response. # These responses are almost always permanent, not temporary, so let's # treat the domain as not live. domain.is_live = False - handle_error('[MX]', domain, error) + handle_error("[MX]", domain, error) except dns.resolver.NXDOMAIN as error: domain.is_live = False # NXDOMAIN can still have DNSSEC - domain.mx_records_dnssec = check_dnssec(domain, domain.domain_name, 'MX') - handle_error('[MX]', domain, error) + domain.mx_records_dnssec = check_dnssec(domain, domain.domain_name, "MX") + handle_error("[MX]", domain, error) except (dns.resolver.NoAnswer) as error: # The NoAnswer exception means that the domain does exist in # DNS, but it does not have any MX records. It sort of makes @@ -110,11 +114,11 @@ def mx_scan(resolver, domain): # See also https://github.com/cisagov/trustymail/pull/91 # NoAnswer can still have DNSSEC - domain.mx_records_dnssec = check_dnssec(domain, domain.domain_name, 'MX') - handle_error('[MX]', domain, error) + domain.mx_records_dnssec = check_dnssec(domain, domain.domain_name, "MX") + handle_error("[MX]", domain, error) except dns.exception.Timeout as error: - domain.mx_records_dnssec = check_dnssec(domain, domain.domain_name, 'MX') - handle_error('[MX]', domain, error) + domain.mx_records_dnssec = check_dnssec(domain, domain.domain_name, "MX") + handle_error("[MX]", domain, error) def starttls_scan(domain, smtp_timeout, smtp_localhost, smtp_ports, smtp_cache): @@ -145,18 +149,19 @@ def starttls_scan(domain, smtp_timeout, smtp_localhost, smtp_ports, smtp_cache): for mail_server in domain.mail_servers: for port in smtp_ports: domain.ports_tested.add(port) - server_and_port = mail_server + ':' + str(port) + server_and_port = mail_server + ":" + str(port) if not smtp_cache or (server_and_port not in _SMTP_CACHE): domain.starttls_results[server_and_port] = {} - smtp_connection = smtplib.SMTP(timeout=smtp_timeout, - local_hostname=smtp_localhost) + smtp_connection = smtplib.SMTP( + timeout=smtp_timeout, local_hostname=smtp_localhost + ) # The following line is useful when debugging why an # SMTP connection fails. It prints out all the # traffic sent to and from the SMTP server. smtp_connection.set_debuglevel(1) - logging.debug('Testing ' + server_and_port + ' for STARTTLS support') + logging.debug("Testing " + server_and_port + " for STARTTLS support") # Look up the IPv4 address for mail_server. # @@ -186,12 +191,14 @@ def starttls_scan(domain, smtp_timeout, smtp_localhost, smtp_ports, smtp_cache): # Since we can't evaluate such cases we will # simply log this and give them credit. One of # the other mail servers will support IPv4. - error_str = f'The mail server {mail_server} does not have an IPv4 address.' - handle_error('[STARTTLS]', domain, error_str) + error_str = ( + f"The mail server {mail_server} does not have an IPv4 address." + ) + handle_error("[STARTTLS]", domain, error_str) logging.warn(error_str) - domain.starttls_results[server_and_port]['is_listening'] = True - domain.starttls_results[server_and_port]['supports_smtp'] = True - domain.starttls_results[server_and_port]['starttls'] = True + domain.starttls_results[server_and_port]["is_listening"] = True + domain.starttls_results[server_and_port]["supports_smtp"] = True + domain.starttls_results[server_and_port]["starttls"] = True continue # Extract the IP address from the socket addrinfo @@ -202,17 +209,23 @@ def starttls_scan(domain, smtp_timeout, smtp_localhost, smtp_ports, smtp_cache): # listening. try: smtp_connection.connect(mail_server_ip_address, port) - domain.starttls_results[server_and_port]['is_listening'] = True - except (socket.timeout, smtplib.SMTPConnectError, - smtplib.SMTPServerDisconnected, - ConnectionRefusedError, OSError) as error: - handle_error('[STARTTLS]', domain, error) - domain.starttls_results[server_and_port]['is_listening'] = False - domain.starttls_results[server_and_port]['supports_smtp'] = False - domain.starttls_results[server_and_port]['starttls'] = False + domain.starttls_results[server_and_port]["is_listening"] = True + except ( + socket.timeout, + smtplib.SMTPConnectError, + smtplib.SMTPServerDisconnected, + ConnectionRefusedError, + OSError, + ) as error: + handle_error("[STARTTLS]", domain, error) + domain.starttls_results[server_and_port]["is_listening"] = False + domain.starttls_results[server_and_port]["supports_smtp"] = False + domain.starttls_results[server_and_port]["starttls"] = False if smtp_cache: - _SMTP_CACHE[server_and_port] = domain.starttls_results[server_and_port] + _SMTP_CACHE[server_and_port] = domain.starttls_results[ + server_and_port + ] continue @@ -220,28 +233,30 @@ def starttls_scan(domain, smtp_timeout, smtp_localhost, smtp_ports, smtp_cache): # thing that is listening is an SMTP server. try: smtp_connection.ehlo_or_helo_if_needed() - domain.starttls_results[server_and_port]['supports_smtp'] = True - logging.debug('\t Supports SMTP') + domain.starttls_results[server_and_port]["supports_smtp"] = True + logging.debug("\t Supports SMTP") except (smtplib.SMTPHeloError, smtplib.SMTPServerDisconnected) as error: - handle_error('[STARTTLS]', domain, error) - domain.starttls_results[server_and_port]['supports_smtp'] = False - domain.starttls_results[server_and_port]['starttls'] = False + handle_error("[STARTTLS]", domain, error) + domain.starttls_results[server_and_port]["supports_smtp"] = False + domain.starttls_results[server_and_port]["starttls"] = False # smtplib freaks out if you call quit on a non-open # connection try: smtp_connection.quit() except smtplib.SMTPServerDisconnected as error2: - handle_error('[STARTTLS]', domain, error2) + handle_error("[STARTTLS]", domain, error2) if smtp_cache: - _SMTP_CACHE[server_and_port] = domain.starttls_results[server_and_port] + _SMTP_CACHE[server_and_port] = domain.starttls_results[ + server_and_port + ] continue # Now check if the server supports STARTTLS. - has_starttls = smtp_connection.has_extn('STARTTLS') - domain.starttls_results[server_and_port]['starttls'] = has_starttls - logging.debug('\t Supports STARTTLS: ' + str(has_starttls)) + has_starttls = smtp_connection.has_extn("STARTTLS") + domain.starttls_results[server_and_port]["starttls"] = has_starttls + logging.debug("\t Supports STARTTLS: " + str(has_starttls)) # Close the connection # smtplib freaks out if you call quit on a non-open @@ -249,13 +264,15 @@ def starttls_scan(domain, smtp_timeout, smtp_localhost, smtp_ports, smtp_cache): try: smtp_connection.quit() except smtplib.SMTPServerDisconnected as error: - handle_error('[STARTTLS]', domain, error) + handle_error("[STARTTLS]", domain, error) # Copy the results into the cache, if necessary if smtp_cache: - _SMTP_CACHE[server_and_port] = domain.starttls_results[server_and_port] + _SMTP_CACHE[server_and_port] = domain.starttls_results[ + server_and_port + ] else: - logging.debug('\tUsing cached results for ' + server_and_port) + logging.debug("\tUsing cached results for " + server_and_port) # Copy the cached results into the domain object domain.starttls_results[server_and_port] = _SMTP_CACHE[server_and_port] @@ -295,9 +312,13 @@ def check_spf_record(record_text, domain, strict=2): # Passing verbose=True causes the SPF library being used to # print out the SPF records encountered as include and # redirect cause other SPF records to be looked up. - query = spf.query('100.27.42.254', - 'email_wizard@' + domain.domain_name, - domain.domain_name, strict=strict, verbose=True) + query = spf.query( + "100.27.42.254", + "email_wizard@" + domain.domain_name, + domain.domain_name, + strict=strict, + verbose=True, + ) response = query.check(spf=record_text) response_type = response[0] @@ -305,17 +326,24 @@ def check_spf_record(record_text, domain, strict=2): # from DNS. We get this result when we get an ambiguous # result because of an SPF record with incorrect syntax, then # rerun check_spf_record() with strict=True (instead of 2). - if response_type == 'temperror' or response_type == 'permerror' \ - or response_type == 'none': + if ( + response_type == "temperror" + or response_type == "permerror" + or response_type == "none" + ): domain.valid_spf = False - handle_error('[SPF]', domain, - 'SPF query returned {}: {}'.format(response_type, - response[2])) - elif response_type == 'ambiguous': + handle_error( + "[SPF]", + domain, + "SPF query returned {}: {}".format(response_type, response[2]), + ) + elif response_type == "ambiguous": # Log the ambiguity so it appears in the results CSV - handle_error('[SPF]', domain, - 'SPF query returned {}: {}'.format(response_type, - response[2])) + handle_error( + "[SPF]", + domain, + "SPF query returned {}: {}".format(response_type, response[2]), + ) # Now rerun the check with less strictness to get an # actual result. (With strict=2, the SPF library stops @@ -326,7 +354,7 @@ def check_spf_record(record_text, domain, strict=2): domain.valid_spf = True except spf.AmbiguityWarning as error: domain.valid_spf = False - handle_error('[SPF]', domain, error) + handle_error("[SPF]", domain, error) def get_spf_record_text(resolver, domain_name, domain, follow_redirect=False): @@ -360,39 +388,39 @@ def get_spf_record_text(resolver, domain_name, domain, follow_redirect=False): try: # Use TCP, since we care about the content and correctness of the # records more than whether their records fit in a single UDP packet. - for record in resolver.query(domain_name, 'TXT', tcp=True): + for record in resolver.query(domain_name, "TXT", tcp=True): record_text = remove_quotes(record.to_text()) - if not record_text.startswith('v=spf1'): + if not record_text.startswith("v=spf1"): # Not an spf record, ignore it. continue - match = re.search(r'v=spf1\s*redirect=(\S*)', record_text) + match = re.search(r"v=spf1\s*redirect=(\S*)", record_text) if follow_redirect and match: redirect_domain_name = match.group(1) - record_to_return = get_spf_record_text(resolver, - redirect_domain_name, - domain) + record_to_return = get_spf_record_text( + resolver, redirect_domain_name, domain + ) else: record_to_return = record_text - domain.spf_dnssec = check_dnssec(domain, domain.domain_name, 'TXT') + domain.spf_dnssec = check_dnssec(domain, domain.domain_name, "TXT") except (dns.resolver.NoNameservers) as error: # The NoNameservers exception means that we got a SERVFAIL response. # These responses are almost always permanent, not temporary, so let's # treat the domain as not live. domain.is_live = False - handle_error('[SPF]', domain, error) + handle_error("[SPF]", domain, error) except (dns.resolver.NXDOMAIN) as error: domain.is_live = False - domain.spf_dnssec = check_dnssec(domain, domain.domain_name, 'TXT') - handle_error('[SPF]', domain, error) + domain.spf_dnssec = check_dnssec(domain, domain.domain_name, "TXT") + handle_error("[SPF]", domain, error) except (dns.resolver.NoAnswer) as error: - domain.spf_dnssec = check_dnssec(domain, domain.domain_name, 'TXT') - handle_error('[SPF]', domain, error) + domain.spf_dnssec = check_dnssec(domain, domain.domain_name, "TXT") + handle_error("[SPF]", domain, error) except (dns.exception.Timeout) as error: - domain.spf_dnssec = check_dnssec(domain, domain.domain_name, 'TXT') - handle_error('[SPF]', domain, error) + domain.spf_dnssec = check_dnssec(domain, domain.domain_name, "TXT") + handle_error("[SPF]", domain, error) return record_to_return @@ -416,16 +444,15 @@ def spf_scan(resolver, domain): # If an SPF record exists, record the raw SPF record text in the # Domain object - record_text_not_following_redirect = get_spf_record_text(resolver, - domain.domain_name, - domain) + record_text_not_following_redirect = get_spf_record_text( + resolver, domain.domain_name, domain + ) if record_text_not_following_redirect: domain.spf.append(record_text_not_following_redirect) - record_text_following_redirect = get_spf_record_text(resolver, - domain.domain_name, - domain, - True) + record_text_following_redirect = get_spf_record_text( + resolver, domain.domain_name, domain, True + ) if record_text_following_redirect: check_spf_record(record_text_following_redirect, domain) @@ -458,7 +485,9 @@ def parse_dmarc_report_uri(uri): if size_limit == "": size_limit = None - return OrderedDict([("scheme", scheme), ("address", email_address), ("size_limit", size_limit)]) + return OrderedDict( + [("scheme", scheme), ("address", email_address), ("size_limit", size_limit)] + ) def dmarc_scan(resolver, domain): @@ -466,21 +495,25 @@ def dmarc_scan(resolver, domain): try: if domain.dmarc is None: domain.dmarc = [] - dmarc_domain = '_dmarc.%s' % domain.domain_name + dmarc_domain = "_dmarc.%s" % domain.domain_name # Use TCP, since we care about the content and correctness of the # records more than whether their records fit in a single UDP packet. - all_records = resolver.query(dmarc_domain, 'TXT', tcp=True) - domain.dmarc_dnssec = check_dnssec(domain, dmarc_domain, 'TXT') + all_records = resolver.query(dmarc_domain, "TXT", tcp=True) + domain.dmarc_dnssec = check_dnssec(domain, dmarc_domain, "TXT") # According to step 4 in section 6.6.3 of the RFC # (https://tools.ietf.org/html/rfc7489#section-6.6.3), "Records that do # not start with a "v=" tag that identifies the current version of # DMARC are discarded." - records = [record for record in all_records if record.to_text().startswith('"v=DMARC1;')] + records = [ + record + for record in all_records + if record.to_text().startswith('"v=DMARC1;') + ] # Treat multiple DMARC records as an error, in accordance with the RFC # (https://tools.ietf.org/html/rfc7489#section-6.6.3) if len(records) > 1: - handle_error('[DMARC]', domain, 'Warning: Multiple DMARC records present') + handle_error("[DMARC]", domain, "Warning: Multiple DMARC records present") domain.valid_dmarc = False elif records: record = records[0] @@ -489,13 +522,15 @@ def dmarc_scan(resolver, domain): # Ensure the record is a DMARC record. Some domains that # redirect will cause an SPF record to show. - if record_text.startswith('v=DMARC1'): + if record_text.startswith("v=DMARC1"): domain.dmarc.append(record_text) - elif record_text.startswith('v=spf1'): - msg = "Found a SPF record where a DMARC record should be; most likely, the _dmarc " \ - "subdomain record does not actually exist, and the request for TXT records was " \ - "redirected to the base domain" - handle_syntax_error('[DMARC]', domain, '{0}'.format(msg)) + elif record_text.startswith("v=spf1"): + msg = ( + "Found a SPF record where a DMARC record should be; most likely, the _dmarc " + "subdomain record does not actually exist, and the request for TXT records was " + "redirected to the base domain" + ) + handle_syntax_error("[DMARC]", domain, "{0}".format(msg)) domain.valid_dmarc = False # Remove excess whitespace @@ -505,109 +540,144 @@ def dmarc_scan(resolver, domain): # defined - tag:value We can split this up into a easily # manipulatable dictionary tag_dict = {} - for options in record_text.split(';'): - if '=' not in options: + for options in record_text.split(";"): + if "=" not in options: continue - tag = options.split('=')[0].strip() - value = options.split('=')[1].strip() + tag = options.split("=")[0].strip() + value = options.split("=")[1].strip() tag_dict[tag] = value # Before we set sp=p if it is not explicitly contained in # the DMARC record, log a warning if it is explicitly set # for a subdomain of an organizational domain. - if 'sp' in tag_dict and not domain.is_base_domain: - handle_error('[DMARC]', domain, 'Warning: The sp tag will be ignored for DMARC records published on subdomains. See here for details: https://tools.ietf.org/html/rfc7489#section-6.3.', syntax_error=False) - if 'p' not in tag_dict: - msg = 'Record missing required policy (p) tag' - handle_syntax_error('[DMARC]', domain, '{0}'.format(msg)) + if "sp" in tag_dict and not domain.is_base_domain: + handle_error( + "[DMARC]", + domain, + "Warning: The sp tag will be ignored for DMARC records published on subdomains. See here for details: https://tools.ietf.org/html/rfc7489#section-6.3.", + syntax_error=False, + ) + if "p" not in tag_dict: + msg = "Record missing required policy (p) tag" + handle_syntax_error("[DMARC]", domain, "{0}".format(msg)) domain.valid_dmarc = False - elif 'sp' not in tag_dict: - tag_dict['sp'] = tag_dict['p'] - if 'ri' not in tag_dict: - tag_dict['ri'] = 86400 - if 'pct' not in tag_dict: - tag_dict['pct'] = 100 - if 'adkim' not in tag_dict: - tag_dict['adkim'] = 'r' - if 'aspf' not in tag_dict: - tag_dict['aspf'] = 'r' - if 'fo' not in tag_dict: - tag_dict['fo'] = '0' - if 'rf' not in tag_dict: - tag_dict['rf'] = 'afrf' - if 'rua' not in tag_dict: + elif "sp" not in tag_dict: + tag_dict["sp"] = tag_dict["p"] + if "ri" not in tag_dict: + tag_dict["ri"] = 86400 + if "pct" not in tag_dict: + tag_dict["pct"] = 100 + if "adkim" not in tag_dict: + tag_dict["adkim"] = "r" + if "aspf" not in tag_dict: + tag_dict["aspf"] = "r" + if "fo" not in tag_dict: + tag_dict["fo"] = "0" + if "rf" not in tag_dict: + tag_dict["rf"] = "afrf" + if "rua" not in tag_dict: domain.dmarc_has_aggregate_uri = False - if 'ruf' not in tag_dict: + if "ruf" not in tag_dict: domain.dmarc_has_forensic_uri = False for tag in tag_dict: - if tag not in ['v', 'mailto', 'rf', 'p', 'sp', 'adkim', 'aspf', 'fo', 'pct', 'ri', 'rua', 'ruf']: - msg = 'Unknown DMARC tag {0}'.format(tag) - handle_syntax_error('[DMARC]', domain, '{0}'.format(msg)) + if tag not in [ + "v", + "mailto", + "rf", + "p", + "sp", + "adkim", + "aspf", + "fo", + "pct", + "ri", + "rua", + "ruf", + ]: + msg = "Unknown DMARC tag {0}".format(tag) + handle_syntax_error("[DMARC]", domain, "{0}".format(msg)) domain.valid_dmarc = False - elif tag == 'p': - if tag_dict[tag] not in ['none', 'quarantine', 'reject']: - msg = 'Unknown DMARC policy {0}'.format(tag) - handle_syntax_error('[DMARC]', domain, '{0}'.format(msg)) + elif tag == "p": + if tag_dict[tag] not in ["none", "quarantine", "reject"]: + msg = "Unknown DMARC policy {0}".format(tag) + handle_syntax_error("[DMARC]", domain, "{0}".format(msg)) domain.valid_dmarc = False else: domain.dmarc_policy = tag_dict[tag] - elif tag == 'sp': - if tag_dict[tag] not in ['none', 'quarantine', 'reject']: - msg = 'Unknown DMARC subdomain policy {0}'.format(tag_dict[tag]) - handle_syntax_error('[DMARC]', domain, '{0}'.format(msg)) + elif tag == "sp": + if tag_dict[tag] not in ["none", "quarantine", "reject"]: + msg = "Unknown DMARC subdomain policy {0}".format(tag_dict[tag]) + handle_syntax_error("[DMARC]", domain, "{0}".format(msg)) domain.valid_dmarc = False else: domain.dmarc_subdomain_policy = tag_dict[tag] - elif tag == 'fo': - values = tag_dict[tag].split(':') - if '0' in values and '1' in values: - msg = 'fo tag values 0 and 1 are mutually exclusive' - handle_syntax_error('[DMARC]', domain, '{0}'.format(msg)) + elif tag == "fo": + values = tag_dict[tag].split(":") + if "0" in values and "1" in values: + msg = "fo tag values 0 and 1 are mutually exclusive" + handle_syntax_error("[DMARC]", domain, "{0}".format(msg)) for value in values: - if value not in ['0', '1', 'd', 's']: - msg = 'Unknown DMARC fo tag value {0}'.format(value) - handle_syntax_error('[DMARC]', domain, '{0}'.format(msg)) + if value not in ["0", "1", "d", "s"]: + msg = "Unknown DMARC fo tag value {0}".format(value) + handle_syntax_error("[DMARC]", domain, "{0}".format(msg)) domain.valid_dmarc = False - elif tag == 'rf': - values = tag_dict[tag].split(':') + elif tag == "rf": + values = tag_dict[tag].split(":") for value in values: - if value not in ['afrf']: - msg = 'Unknown DMARC rf tag value {0}'.format(value) - handle_syntax_error('[DMARC]', domain, '{0}'.format(msg)) + if value not in ["afrf"]: + msg = "Unknown DMARC rf tag value {0}".format(value) + handle_syntax_error("[DMARC]", domain, "{0}".format(msg)) domain.valid_dmarc = False - elif tag == 'ri': + elif tag == "ri": try: int(tag_dict[tag]) except ValueError: - msg = 'Invalid DMARC ri tag value: {0} - must be an integer'.format(tag_dict[tag]) - handle_syntax_error('[DMARC]', domain, '{0}'.format(msg)) + msg = "Invalid DMARC ri tag value: {0} - must be an integer".format( + tag_dict[tag] + ) + handle_syntax_error("[DMARC]", domain, "{0}".format(msg)) domain.valid_dmarc = False - elif tag == 'pct': + elif tag == "pct": try: pct = int(tag_dict[tag]) if pct < 0 or pct > 100: - msg = 'Error: invalid DMARC pct tag value: {0} - must be an integer between ' \ - '0 and 100'.format(tag_dict[tag]) - handle_syntax_error('[DMARC]', domain, '{0}'.format(msg)) + msg = ( + "Error: invalid DMARC pct tag value: {0} - must be an integer between " + "0 and 100".format(tag_dict[tag]) + ) + handle_syntax_error("[DMARC]", domain, "{0}".format(msg)) domain.valid_dmarc = False domain.dmarc_pct = pct if pct < 100: - handle_syntax_error('[DMARC]', domain, 'Warning: The DMARC pct tag value may be less than 100 (the implicit default) during deployment, but should be removed or set to 100 upon full deployment') + handle_syntax_error( + "[DMARC]", + domain, + "Warning: The DMARC pct tag value may be less than 100 (the implicit default) during deployment, but should be removed or set to 100 upon full deployment", + ) except ValueError: - msg = 'invalid DMARC pct tag value: {0} - must be an integer'.format(tag_dict[tag]) - handle_syntax_error('[DMARC]', domain, '{0}'.format(msg)) + msg = "invalid DMARC pct tag value: {0} - must be an integer".format( + tag_dict[tag] + ) + handle_syntax_error("[DMARC]", domain, "{0}".format(msg)) domain.valid_dmarc = False - elif tag == 'rua' or tag == 'ruf': - uris = tag_dict[tag].split(',') + elif tag == "rua" or tag == "ruf": + uris = tag_dict[tag].split(",") if len(uris) > 2: - handle_error('[DMARC]', domain, 'Warning: The {} tag specifies {} URIs. Receivers are not required to send reports to more than two URIs - https://tools.ietf.org/html/rfc7489#section-6.2.'.format(tag, len(uris)), syntax_error=False) + handle_error( + "[DMARC]", + domain, + "Warning: The {} tag specifies {} URIs. Receivers are not required to send reports to more than two URIs - https://tools.ietf.org/html/rfc7489#section-6.2.".format( + tag, len(uris) + ), + syntax_error=False, + ) for uri in uris: # mailto: is currently the only type of DMARC URI parsed_uri = parse_dmarc_report_uri(uri) if parsed_uri is None: - msg = 'Error: {0} is an invalid DMARC URI'.format(uri) - handle_syntax_error('[DMARC]', domain, '{0}'.format(msg)) + msg = "Error: {0} is an invalid DMARC URI".format(uri) + handle_syntax_error("[DMARC]", domain, "{0}".format(msg)) domain.valid_dmarc = False else: if tag == "rua": @@ -615,60 +685,109 @@ def dmarc_scan(resolver, domain): elif tag == "ruf": domain.dmarc_forensic_uris.append(uri) email_address = parsed_uri["address"] - email_domain = email_address.split('@')[-1] - if get_public_suffix(email_domain).lower() != domain.base_domain_name.lower(): - target = '{0}._report._dmarc.{1}'.format(domain.domain_name, email_domain) - error_message = '{0} does not indicate that it accepts DMARC reports about {1} - ' \ - 'https://tools.ietf.org' \ - '/html/rfc7489#section-7.1'.format(email_domain, - domain.domain_name) + email_domain = email_address.split("@")[-1] + if ( + get_public_suffix(email_domain).lower() + != domain.base_domain_name.lower() + ): + target = "{0}._report._dmarc.{1}".format( + domain.domain_name, email_domain + ) + error_message = ( + "{0} does not indicate that it accepts DMARC reports about {1} - " + "https://tools.ietf.org" + "/html/rfc7489#section-7.1".format( + email_domain, domain.domain_name + ) + ) try: - answer = remove_quotes(resolver.query(target, 'TXT', tcp=True)[0].to_text()) - if not answer.startswith('v=DMARC1'): - handle_error('[DMARC]', domain, '{0}'.format(error_message)) + answer = remove_quotes( + resolver.query(target, "TXT", tcp=True)[ + 0 + ].to_text() + ) + if not answer.startswith("v=DMARC1"): + handle_error( + "[DMARC]", + domain, + "{0}".format(error_message), + ) domain.dmarc_reports_address_error = True domain.valid_dmarc = False - except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.resolver.NoNameservers, dns.exception.Timeout): - handle_syntax_error('[DMARC]', domain, '{0}'.format(error_message)) + except ( + dns.resolver.NXDOMAIN, + dns.resolver.NoAnswer, + dns.resolver.NoNameservers, + dns.exception.Timeout, + ): + handle_syntax_error( + "[DMARC]", domain, "{0}".format(error_message) + ) domain.dmarc_reports_address_error = True domain.valid_dmarc = False try: # Ensure ruf/rua/email domains have MX records - resolver.query(email_domain, 'MX', tcp=True) - except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.resolver.NoNameservers, dns.exception.Timeout): - handle_syntax_error('[DMARC]', domain, 'The domain for reporting ' - 'address {0} does not have any ' - 'MX records'.format(email_address)) + resolver.query(email_domain, "MX", tcp=True) + except ( + dns.resolver.NXDOMAIN, + dns.resolver.NoAnswer, + dns.resolver.NoNameservers, + dns.exception.Timeout, + ): + handle_syntax_error( + "[DMARC]", + domain, + "The domain for reporting " + "address {0} does not have any " + "MX records".format(email_address), + ) domain.valid_dmarc = False # Log a warning if the DMARC record specifies a policy but does not # specify any ruf or rua URIs, since this greatly reduces the # usefulness of DMARC. - if 'p' in tag_dict and 'rua' not in tag_dict and 'ruf' not in tag_dict: - handle_syntax_error('[DMARC]', domain, 'Warning: A DMARC policy is specified but no reporting URIs. This makes the DMARC implementation considerably less useful than it could be. See https://tools.ietf.org/html/rfc7489#section-6.5 for more details.') + if "p" in tag_dict and "rua" not in tag_dict and "ruf" not in tag_dict: + handle_syntax_error( + "[DMARC]", + domain, + "Warning: A DMARC policy is specified but no reporting URIs. This makes the DMARC implementation considerably less useful than it could be. See https://tools.ietf.org/html/rfc7489#section-6.5 for more details.", + ) domain.dmarc_has_aggregate_uri = len(domain.dmarc_aggregate_uris) > 0 domain.dmarc_has_forensic_uri = len(domain.dmarc_forensic_uris) > 0 - except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout) as error: - domain.dmarc_dnssec = check_dnssec(domain, dmarc_domain, 'TXT') - handle_error('[DMARC]', domain, error) + except ( + dns.resolver.NoAnswer, + dns.resolver.NXDOMAIN, + dns.exception.Timeout, + ) as error: + domain.dmarc_dnssec = check_dnssec(domain, dmarc_domain, "TXT") + handle_error("[DMARC]", domain, error) except (dns.resolver.NoNameservers) as error: # Normally we count a NoNameservers exception as indicating # that a domain is "not live". In this case we don't, though, # since the DMARC DNS check doesn't query for the domain name # itself. If the domain name is domain.com, the DMARC DNS # check queries for _dmarc.domain.com. - handle_error('[DMARC]', domain, error) + handle_error("[DMARC]", domain, error) def find_host_from_ip(resolver, ip_addr): # Use TCP, since we care about the content and correctness of the records # more than whether their records fit in a single UDP packet. - hostname, _ = resolver.query(dns.reversename.from_address(ip_addr), 'PTR', tcp=True) + hostname, _ = resolver.query(dns.reversename.from_address(ip_addr), "PTR", tcp=True) return str(hostname) -def scan(domain_name, timeout, smtp_timeout, smtp_localhost, smtp_ports, smtp_cache, scan_types, dns_hostnames): +def scan( + domain_name, + timeout, + smtp_timeout, + smtp_localhost, + smtp_ports, + smtp_cache, + scan_types, + dns_hostnames, +): # # Configure the dnspython library # @@ -709,33 +828,46 @@ def scan(domain_name, timeout, smtp_timeout, smtp_localhost, smtp_ports, smtp_ca # The spf library uses py3dns behind the scenes, so we need to configure # that too # - DNS.defaults['timeout'] = timeout + DNS.defaults["timeout"] = timeout # Use TCP instead of UDP - DNS.defaults['protocol'] = 'tcp' + DNS.defaults["protocol"] = "tcp" # If the user passed in DNS hostnames to query against then use them if dns_hostnames: - DNS.defaults['server'] = dns_hostnames + DNS.defaults["server"] = dns_hostnames # Domain's constructor needs all these parameters because it does a DMARC # scan in its init - domain = Domain(domain_name, timeout, smtp_timeout, smtp_localhost, smtp_ports, smtp_cache, dns_hostnames) - - logging.debug('[{0}]'.format(domain_name.lower())) - - if scan_types['mx'] and domain.is_live: + domain = Domain( + domain_name, + timeout, + smtp_timeout, + smtp_localhost, + smtp_ports, + smtp_cache, + dns_hostnames, + ) + + logging.debug("[{0}]".format(domain_name.lower())) + + if scan_types["mx"] and domain.is_live: mx_scan(resolver, domain) - if scan_types['starttls'] and domain.is_live: + if scan_types["starttls"] and domain.is_live: starttls_scan(domain, smtp_timeout, smtp_localhost, smtp_ports, smtp_cache) - if scan_types['spf'] and domain.is_live: + if scan_types["spf"] and domain.is_live: spf_scan(resolver, domain) - if scan_types['dmarc'] and domain.is_live: + if scan_types["dmarc"] and domain.is_live: dmarc_scan(resolver, domain) # If the user didn't specify any scans then run a full scan. - if domain.is_live and not (scan_types['mx'] or scan_types['starttls'] or scan_types['spf'] or scan_types['dmarc']): + if domain.is_live and not ( + scan_types["mx"] + or scan_types["starttls"] + or scan_types["spf"] + or scan_types["dmarc"] + ): mx_scan(resolver, domain) starttls_scan(domain, smtp_timeout, smtp_localhost, smtp_ports, smtp_cache) spf_scan(resolver, domain) @@ -784,16 +916,26 @@ def handle_error(prefix, domain, error, syntax_error=False): filename = function.co_filename line = frame.f_lineno - error_template = '{prefix} In {function_name} at {filename}:{line}: {error}' + error_template = "{prefix} In {function_name} at {filename}:{line}: {error}" - if hasattr(error, 'message'): - if syntax_error and 'NXDOMAIN' in error.message and prefix != '[DMARC]': + if hasattr(error, "message"): + if syntax_error and "NXDOMAIN" in error.message and prefix != "[DMARC]": domain.is_live = False - error_string = error_template.format(prefix=prefix, function_name=function_name, line=line, filename=filename, - error=error.message) + error_string = error_template.format( + prefix=prefix, + function_name=function_name, + line=line, + filename=filename, + error=error.message, + ) else: - error_string = error_template.format(prefix=prefix, function_name=function_name, line=line, filename=filename, - error=str(error)) + error_string = error_template.format( + prefix=prefix, + function_name=function_name, + line=line, + filename=filename, + error=str(error), + ) if syntax_error: domain.syntax_errors.append(error_string) @@ -808,8 +950,10 @@ def handle_syntax_error(prefix, domain, error): def generate_csv(domains, file_name): - with open(file_name, 'w', encoding='utf-8', newline='\n') as output_file: - writer = csv.DictWriter(output_file, fieldnames=domains[0].generate_results().keys()) + with open(file_name, "w", encoding="utf-8", newline="\n") as output_file: + writer = csv.DictWriter( + output_file, fieldnames=domains[0].generate_results().keys() + ) # First row should always be the headers writer.writeheader() @@ -857,4 +1001,4 @@ def remove_quotes(txt_record): """ # This regular expression removes leading and trailing double quotes and # also removes any pairs of double quotes separated by one or more spaces. - return re.sub('^"|"$|" +"', '', txt_record) + return re.sub('^"|"$|" +"', "", txt_record)