diff --git a/.coveragerc b/.coveragerc index d315b87..1fe2ed9 100644 --- a/.coveragerc +++ b/.coveragerc @@ -2,7 +2,7 @@ # https://coverage.readthedocs.io/en/latest/config.html [run] -source = src/example +source = src/hash_http_content omit = branch = true diff --git a/.github/lineage.yml b/.github/lineage.yml index 8dfc20b..569fc89 100644 --- a/.github/lineage.yml +++ b/.github/lineage.yml @@ -3,4 +3,4 @@ version: "1" lineage: skeleton: - remote-url: https://github.com/cisagov/skeleton-generic.git + remote-url: https://github.com/cisagov/skeleton-python-library.git diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 0134014..86f45d4 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -64,6 +64,8 @@ jobs: ${{ hashFiles('**/requirements.txt') }}" restore-keys: | ${{ env.BASE_CACHE_KEY }} + - name: Download and extract a serverless-chrome binary + run: ./get_serverless_chrome_binary.sh - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 24d6e98..71e7997 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -15,7 +15,7 @@ all of which should be in this repository. If you want to report a bug or request a new feature, the most direct method is to [create an -issue](https://github.com/cisagov/skeleton-python-library/issues) in +issue](https://github.com/cisagov/hash-http-content/issues) in this repository. We recommend that you first search through existing issues (both open and closed) to check if your particular issue has already been reported. If it has then you might want to add a comment @@ -25,7 +25,7 @@ one. ## Pull requests ## If you choose to [submit a pull -request](https://github.com/cisagov/skeleton-python-library/pulls), +request](https://github.com/cisagov/hash-http-content/pulls), you will notice that our continuous integration (CI) system runs a fairly extensive set of linters, syntax checkers, system, and unit tests. Your pull request may fail these checks, and that's OK. 
If you want @@ -111,9 +111,9 @@ can create and configure the Python virtual environment with these commands: ```console -cd skeleton-python-library -pyenv virtualenv skeleton-python-library -pyenv local skeleton-python-library +cd hash-http-content +pyenv virtualenv hash-http-content +pyenv local hash-http-content pip install --requirement requirements-dev.txt ``` diff --git a/README.md b/README.md index 7f20bda..0a3fdd0 100644 --- a/README.md +++ b/README.md @@ -1,25 +1,39 @@ -# skeleton-python-library # - -[![GitHub Build Status](https://github.com/cisagov/skeleton-python-library/workflows/build/badge.svg)](https://github.com/cisagov/skeleton-python-library/actions) -[![Coverage Status](https://coveralls.io/repos/github/cisagov/skeleton-python-library/badge.svg?branch=develop)](https://coveralls.io/github/cisagov/skeleton-python-library?branch=develop) -[![Total alerts](https://img.shields.io/lgtm/alerts/g/cisagov/skeleton-python-library.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/cisagov/skeleton-python-library/alerts/) -[![Language grade: Python](https://img.shields.io/lgtm/grade/python/g/cisagov/skeleton-python-library.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/cisagov/skeleton-python-library/context:python) -[![Known Vulnerabilities](https://snyk.io/test/github/cisagov/skeleton-python-library/develop/badge.svg)](https://snyk.io/test/github/cisagov/skeleton-python-library) - -This is a generic skeleton project that can be used to quickly get a -new [cisagov](https://github.com/cisagov) Python library GitHub -project started. This skeleton project contains [licensing -information](LICENSE), as well as -[pre-commit hooks](https://pre-commit.com) and -[GitHub Actions](https://github.com/features/actions) configurations -appropriate for a Python library project. - -## New Repositories from a Skeleton ## - -Please see our [Project Setup guide](https://github.com/cisagov/development-guide/tree/develop/project_setup) -for step-by-step instructions on how to start a new repository from -a skeleton. This will save you time and effort when configuring a -new repository! +# hash-http-content # + +[![GitHub Build Status](https://github.com/cisagov/hash-http-content/workflows/build/badge.svg)](https://github.com/cisagov/hash-http-content/actions) +[![Coverage Status](https://coveralls.io/repos/github/cisagov/hash-http-content/badge.svg?branch=develop)](https://coveralls.io/github/cisagov/hash-http-content?branch=develop) +[![Total alerts](https://img.shields.io/lgtm/alerts/g/cisagov/hash-http-content.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/cisagov/hash-http-content/alerts/) +[![Language grade: Python](https://img.shields.io/lgtm/grade/python/g/cisagov/hash-http-content.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/cisagov/hash-http-content/context:python) +[![Known Vulnerabilities](https://snyk.io/test/github/cisagov/hash-http-content/develop/badge.svg)](https://snyk.io/test/github/cisagov/hash-http-content) + +This is a Python library to retrieve the contents of a given URL via HTTP (or +HTTPS) and hash the processed contents. + +## Content processing ## + +If an encoding is detected, this package will convert content into the UTF-8 +encoding before proceeding. + +Additional content processing is currently implemented for the following types +of content: + +* HTML +* JSON + +### HTML ### + +HTML content is processed by leveraging the +[pyppeteer](https://github.com/pyppeteer/pyppeteer) package to execute any +JavaScript on a retrieved page. 
The result is then parsed by
+[Beautiful Soup](https://www.crummy.com/software/BeautifulSoup/) to reduce the
+content to the human-visible portions of a page.
+
+### JSON ###
+
+JSON content is processed by using the
+[`json` library](https://docs.python.org/3/library/json.html) that is part of
+the Python standard library. The content is read in and then re-serialized in
+a deterministic manner (with sorted keys) so that differences in formatting
+between otherwise identical content do not affect the resulting hash.
 
 ## Contributing ##
diff --git a/bump_version.sh b/bump_version.sh
index 861eed0..b5ca161 100755
--- a/bump_version.sh
+++ b/bump_version.sh
@@ -6,7 +6,7 @@ set -o nounset
 set -o errexit
 set -o pipefail
 
-VERSION_FILE=src/example/_version.py
+VERSION_FILE=src/hash_http_content/_version.py
 
 HELP_INFORMATION="bump_version.sh (show|major|minor|patch|prerelease|build|finalize)"
 
diff --git a/get_serverless_chrome_binary.sh b/get_serverless_chrome_binary.sh
new file mode 100755
index 0000000..3255af0
--- /dev/null
+++ b/get_serverless_chrome_binary.sh
@@ -0,0 +1,52 @@
+#!/usr/bin/env bash
+
+set -o nounset
+set -o errexit
+set -o pipefail
+
+function usage {
+  echo "Usage:"
+  echo "  ${0##*/} [options]"
+  echo
+  echo "Options:"
+  echo "  -h, --help    Show the help message."
+  echo "  -l, --latest  Pull down the latest release on GitHub."
+  exit "$1"
+}
+
+# Defaults to a specific version for use in GitHub Actions
+DOWNLOAD_URL="https://github.com/adieuadieu/serverless-chrome/releases/download/v1.0.0-57/stable-headless-chromium-amazonlinux-2.zip"
+LOCAL_FILE="serverless-chrome.zip"
+LOCAL_DIR="tests/files/"
+
+
+# Get the URL of the latest stable release available
+function get_latest_stable_url {
+  releases_url="https://api.github.com/repos/adieuadieu/serverless-chrome/releases"
+  # Get the URL for the latest release's assets
+  latest_assets=$(curl -s "$releases_url" | jq -r '.[0].assets_url')
+  # Download the zip for the stable branch
+  DOWNLOAD_URL=$(curl -s "$latest_assets" | jq -r '.[] | select(.browser_download_url | contains("stable")) | .browser_download_url')
+}
+
+while (( "$#" ))
+do
+  case "$1" in
+    -h|--help)
+      usage 0
+      ;;
+    -l|--latest)
+      get_latest_stable_url
+      shift 1
+      ;;
+    *)
+      usage 1
+      ;;
+  esac
+done
+
+# Follow redirects and output as the specified file name
+curl -L --output "$LOCAL_FILE" "$DOWNLOAD_URL"
+# Extract the specified file to the specified directory and overwrite without
+# prompting
+unzip -o "$LOCAL_FILE" -d "$LOCAL_DIR"
diff --git a/setup.py b/setup.py
index a458722..445ccbd 100644
--- a/setup.py
+++ b/setup.py
@@ -1,5 +1,5 @@
 """
-This is the setup module for the example project.
+This is the setup module for the hash-http-content project.
 
 Based on:
 
@@ -42,16 +42,16 @@ def get_version(version_file):
 
 setup(
-    name="example",
+    name="hash-http-content",
     # Versions should comply with PEP440
-    version=get_version("src/example/_version.py"),
-    description="Example python library",
+    version=get_version("src/hash_http_content/_version.py"),
+    description="HTTP content hasher",
     long_description=readme(),
     long_description_content_type="text/markdown",
     # NCATS "homepage"
     url="https://www.us-cert.gov/resources/ncats",
     # The project's main homepage
-    download_url="https://github.com/cisagov/skeleton-python-library",
+    download_url="https://github.com/cisagov/hash-http-content",
     # Author details
     author="Cyber and Infrastructure Security Agency",
     author_email="ncats@hq.dhs.gov",
@@ -77,13 +77,20 @@ def get_version(version_file):
     ],
     python_requires=">=3.6",
     # What does your project relate to?
- keywords="skeleton", + keywords="hash http requests", packages=find_packages(where="src"), package_dir={"": "src"}, - package_data={"example": ["data/*.txt"]}, py_modules=[splitext(basename(path))[0] for path in glob("src/*.py")], include_package_data=True, - install_requires=["docopt", "schema", "setuptools >= 24.2.0"], + install_requires=[ + "beautifulsoup4", + "docopt", + "lxml", + "pyppeteer", + "requests", + "schema", + "setuptools >= 24.2.0", + ], extras_require={ "test": [ "coverage", @@ -99,6 +106,6 @@ def get_version(version_file): "pytest", ] }, - # Conveniently allows one to run the CLI tool as `example` - entry_points={"console_scripts": ["example = example.example:main"]}, + # Conveniently allows one to run the CLI tool as `hash-url` + entry_points={"console_scripts": ["hash-url = hash_http_content.cli:main"]}, ) diff --git a/src/example/data/secret.txt b/src/example/data/secret.txt deleted file mode 100644 index c40a49b..0000000 --- a/src/example/data/secret.txt +++ /dev/null @@ -1 +0,0 @@ -Three may keep a secret, if two of them are dead. diff --git a/src/example/example.py b/src/example/example.py deleted file mode 100644 index 73faa33..0000000 --- a/src/example/example.py +++ /dev/null @@ -1,108 +0,0 @@ -"""example is an example Python library and tool. - -Divide one integer by another and log the result. Also log some information -from an environment variable and a package resource. - -EXIT STATUS - This utility exits with one of the following values: - 0 Calculation completed successfully. - >0 An error occurred. - -Usage: - example [--log-level=LEVEL] - example (-h | --help) - -Options: - -h --help Show this message. - --log-level=LEVEL If specified, then the log level will be set to - the specified value. Valid values are "debug", "info", - "warning", "error", and "critical". [default: info] -""" - -# Standard Python Libraries -import logging -import os -import sys -from typing import Any, Dict - -# Third-Party Libraries -import docopt -import pkg_resources -from schema import And, Schema, SchemaError, Use - -from ._version import __version__ - -DEFAULT_ECHO_MESSAGE: str = "Hello World from the example default!" 
- - -def example_div(dividend: float, divisor: float) -> float: - """Print some logging messages.""" - logging.debug("This is a debug message") - logging.info("This is an info message") - logging.warning("This is a warning message") - logging.error("This is an error message") - logging.critical("This is a critical message") - return dividend / divisor - - -def main() -> int: - """Set up logging and call the example function.""" - args: Dict[str, str] = docopt.docopt(__doc__, version=__version__) - # Validate and convert arguments as needed - schema: Schema = Schema( - { - "--log-level": And( - str, - Use(str.lower), - lambda n: n in ("debug", "info", "warning", "error", "critical"), - error="Possible values for --log-level are " - + "debug, info, warning, error, and critical.", - ), - "": Use(int, error=" must be an integer."), - "": And( - Use(int), - lambda n: n != 0, - error=" must be an integer that is not 0.", - ), - str: object, # Don't care about other keys, if any - } - ) - - try: - validated_args: Dict[str, Any] = schema.validate(args) - except SchemaError as err: - # Exit because one or more of the arguments were invalid - print(err, file=sys.stderr) - return 1 - - # Assign validated arguments to variables - dividend: int = validated_args[""] - divisor: int = validated_args[""] - log_level: str = validated_args["--log-level"] - - # Set up logging - logging.basicConfig( - format="%(asctime)-15s %(levelname)s %(message)s", level=log_level.upper() - ) - - logging.info(f"{dividend} / {divisor} == {example_div(dividend, divisor)}") - - # Access some data from an environment variable - message: str = os.getenv("ECHO_MESSAGE", DEFAULT_ECHO_MESSAGE) - logging.info(f'ECHO_MESSAGE="{message}"') - - # Access some data from our package data (see the setup.py) - secret_message: str = ( - pkg_resources.resource_string("example", "data/secret.txt") - .decode("utf-8") - .strip() - ) - logging.info(f'Secret="{secret_message}"') - - # Stop logging and clean up - logging.shutdown() - return 0 - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/src/example/__init__.py b/src/hash_http_content/__init__.py similarity index 62% rename from src/example/__init__.py rename to src/hash_http_content/__init__.py index 98b5e04..9cb1be8 100644 --- a/src/example/__init__.py +++ b/src/hash_http_content/__init__.py @@ -1,9 +1,12 @@ -"""The example library.""" +"""The hash-http-content library.""" +# Standard Python Libraries +from typing import List + # We disable a Flake8 check for "Module imported but unused (F401)" here because # although this import is not directly used, it populates the value # package_name.__version__, which is used to get version information about this # Python package. 
from ._version import __version__  # noqa: F401
-from .example import example_div
+from .hasher import UrlHasher, UrlResult
 
-__all__ = ["example_div"]
+__all__: List[str] = ["UrlHasher", "UrlResult"]
diff --git a/src/example/__main__.py b/src/hash_http_content/__main__.py
similarity index 73%
rename from src/example/__main__.py
rename to src/hash_http_content/__main__.py
index 11a3238..c123d36 100644
--- a/src/example/__main__.py
+++ b/src/hash_http_content/__main__.py
@@ -1,5 +1,5 @@
 """Code to run if this package is used as a Python module."""
-from .example import main
+from .cli import main
 
 main()
diff --git a/src/example/_version.py b/src/hash_http_content/_version.py
similarity index 100%
rename from src/example/_version.py
rename to src/hash_http_content/_version.py
diff --git a/src/hash_http_content/cli.py b/src/hash_http_content/cli.py
new file mode 100644
index 0000000..ddaf6d6
--- /dev/null
+++ b/src/hash_http_content/cli.py
@@ -0,0 +1,107 @@
+"""Command line interface to the hash-http-content package.
+
+Usage:
+  hash-url [--hash-algorithm=algorithm] ([--show-content] [--show-redirect] | [--json]) URL ...
+  hash-url --list-algorithms
+  hash-url (-v | --version)
+  hash-url (-h | --help)
+
+Options:
+  -h, --help                      Display this help text.
+  -a, --hash-algorithm=algorithm  Use the provided hash algorithm.
+                                  [default: sha256]
+  -l, --list-algorithms           List available hash algorithms.
+  -j, --json                      Output the results as JSON.
+  -c, --show-content              Output the content after processing.
+  -r, --show-redirect             Output whether the requested URL was redirected.
+  -v, --version                   Show version information.
+"""
+
+# Standard Python Libraries
+import hashlib
+from json import dumps
+import sys
+from typing import Any, Dict
+from urllib.parse import urlparse
+
+# Third-Party Libraries
+import docopt
+from schema import And, Schema, SchemaError, Use
+
+from ._version import __version__
+from .hasher import UrlHasher
+
+
+def main() -> int:
+    """Return the hash(es) and information from the requested URL(s)."""
+    args: Dict[str, str] = docopt.docopt(__doc__, version=__version__)
+    schema: Schema = Schema(
+        {
+            "--hash-algorithm": And(
+                str,
+                Use(str.lower),
+                lambda a: a in hashlib.algorithms_available,
+                error=f"Invalid algorithm provided. Must be one of: {sorted(hashlib.algorithms_available)}",
+            ),
+            str: object,
+        }
+    )
+
+    try:
+        validated_args: Dict[str, Any] = schema.validate(args)
+    except SchemaError as err:
+        # Exit because one or more of the arguments were invalid
+        print(err, file=sys.stderr)
+        return 1
+
+    if validated_args["--list-algorithms"]:
+        print("Algorithms supported for this platform:")
+        for algo in sorted(hashlib.algorithms_available):
+            print(f"- {algo}")
+        return 0
+
+    if validated_args["--json"]:
+        results = []
+
+    for url in validated_args["URL"]:
+        # Prefer an HTTPS URL
+        parsed_url = urlparse(url, "https")
+        if not parsed_url.netloc:
+            parsed_url = parsed_url._replace(netloc=parsed_url.path, path="")
+
+        hasher = UrlHasher(validated_args["--hash-algorithm"])
+        url_results = hasher.hash_url(parsed_url.geturl())
+
+        if validated_args["--json"]:
+            # We cannot guarantee that the contents are serializable, so they are
+            # excluded from JSON results.
+ results.append( + { + "content_type": url_results.content_type, + "contents_hash": url_results.hash, + "is_redirected": url_results.is_redirect, + "requested_url": url, + "retrieved_url": url_results.visited_url, + "status_code": url_results.status, + } + ) + else: + print(f"Results for {url}:") + print(f" Retrieved URL - '{url_results.visited_url}'") + print(f" Status code - '{url_results.status}'") + print(f" Content type - '{url_results.content_type}'") + if validated_args["--show-redirect"]: + print(f" Redirect - {url_results.is_redirect}") + print( + f" Hash ({validated_args['--hash-algorithm']}) of contents - {url_results.hash}" + ) + if validated_args["--show-content"]: + print() + print("Contents:") + print(url_results.contents) + print() + + if validated_args["--json"]: + print(dumps(results, separators=(",", ":"), sort_keys=True)) + + return 0 diff --git a/src/hash_http_content/hasher.py b/src/hash_http_content/hasher.py new file mode 100644 index 0000000..f858b14 --- /dev/null +++ b/src/hash_http_content/hasher.py @@ -0,0 +1,312 @@ +"""Functionality to get a hash of an HTTP URL's visible content.""" + +# Standard Python Libraries +import asyncio +import hashlib +import json +import logging +import tempfile +from typing import Any, Callable, Dict, NamedTuple, Union + +# Third-Party Libraries +from bs4 import BeautifulSoup +from bs4.element import Comment, PageElement +from pyppeteer import launch +from pyppeteer.browser import Browser +from pyppeteer.errors import TimeoutError +from pyppeteer.page import Page +import requests +from requests.exceptions import ConnectionError, Timeout + + +def get_hasher(hash_algorithm: str) -> "hashlib._Hash": + """Get a hashing object.""" + logging.debug("Creating a %s hashing object", hash_algorithm) + # Not all implementations support the "usedforsecurity" keyword argument, + # which is used to indicate that the algorithm is being used for non-security + # related tasks. This is required for some algorithms on FIPS systems. + try: + hasher = getattr(hashlib, hash_algorithm)(usedforsecurity=False) + except AttributeError: + # There is no named constructor for the desired hashing algorithm + try: + # mypy relies on typeshed (https://github.com/python/typeshed) for + # stdlib type hinting, but it does not have the correct type hints for + # hashlib.new(). The PR I submitted to fix them + # (https://github.com/python/typeshed/pull/4973) was approved, but I + # am not sure if mypy will still have issues with the usage of this + # keyword in non Python 3.9 (when the usedforsecurity kwarg was added) + # environments. I believe the earliest I can test this will be in mypy + # v0.900, and I have made + # https://github.com/cisagov/hash-http-content/issues/3 to document + # the status of this workaround. 
+ # hasher = hashlib.new(hash_algorithm, usedforsecurity=False) + hasher = getattr(hashlib, "new")(hash_algorithm, usedforsecurity=False) + except TypeError: + hasher = hashlib.new(hash_algorithm) + except TypeError: + hasher = getattr(hashlib, hash_algorithm)() + return hasher + + +def get_hash_digest(hash_algorithm: str, contents: bytes) -> str: + """Get a hex digest representing a hash of the given contents.""" + logging.debug( + "Generating a %s digest for provided content of length %d", + hash_algorithm, + len(contents), + ) + hasher: "hashlib._Hash" = get_hasher(hash_algorithm) + hasher.update(contents) + return hasher.hexdigest() + + +class HandlerResult(NamedTuple): + """Named tuple to store the result of a handler call.""" + + hash: str + contents: bytes + + +class UrlResult(NamedTuple): + """Named tuple to store the result of a SiteHasher.hash_url() call.""" + + status: int + visited_url: str + is_redirect: bool + content_type: str + hash: str + contents: bytes + + +class UrlHasher: + """Provide functionality to get the hash digest of a given URL.""" + + def __init__( + self, + hash_algorithm: str, + encoding: str = "utf-8", + browser_options: Dict[str, Any] = {}, + ): + """Initialize an instance of this class.""" + logging.debug("Initializing UrlHasher object") + default_browser_options = {"headless": True} + logging.debug("Default browser options: %s", default_browser_options) + + # Number of retries + self._retries: int = 3 + logging.debug("Using retry value of '%d'", self._retries) + + # Timeout in seconds + self._timeout: int = 5 + logging.debug("Using request timeout limit of '%d' seconds", self._timeout) + + self.__browser_options: Dict[str, Any] = { + **default_browser_options, + **browser_options, + } + logging.debug("Using browser options: %s", self.__browser_options) + + self._browser: Browser = None + self._browser_page: Page = None + self._default_encoding: str = encoding + self._hash_algorithm: str = hash_algorithm + + logging.debug("Using default encoding '%s'", self._default_encoding) + logging.debug("Using hashing algorithm '%s'", self._hash_algorithm) + + self._handlers: Dict[str, Callable] = { + "application/json": self._handle_json, + "text/html": self._handle_html, + "text/plain": self._handle_plaintext, + } + + def __init_browser(self): + """Initialize the pyppeteer Browser if it does not exist.""" + if not self._browser: + logging.debug("Initializing Browser object") + self._browser = asyncio.get_event_loop().run_until_complete( + launch(**self.__browser_options) + ) + self._browser_page = asyncio.get_event_loop().run_until_complete( + self._browser.newPage() + ) + + def _is_visible_element(self, element: PageElement) -> bool: + """Return True if the given website element would be visible.""" + discard_tags = ["[document]", "script", "style"] + if isinstance(element, Comment): + logging.debug("Skipping Comment tag") + return False + if element.parent.name in discard_tags: + logging.debug("Skipping element in parent tag '%s'", element.parent.name) + return False + return True + + def _handle_raw_bytes(self, contents: bytes, encoding: str) -> HandlerResult: + """Handle bytes in an unspecified format or encoding.""" + logging.debug("Handling content as raw bytes") + digest: str = get_hash_digest(self._hash_algorithm, contents) + return HandlerResult(digest, contents) + + def _handle_plaintext(self, contents: bytes, encoding: str) -> HandlerResult: + """Handle plaintext contents.""" + logging.debug("Handling content as plaintext") + if encoding: + contents = 
bytes(contents.decode(encoding), self._default_encoding)
+        digest: str = get_hash_digest(self._hash_algorithm, contents)
+        return HandlerResult(digest, contents)
+
+    def _handle_json(self, contents: bytes, encoding: str) -> HandlerResult:
+        """Handle JSON contents."""
+        logging.debug("Handling content as JSON")
+        # Translate the original encoding to utf-8
+        if encoding:
+            json_str = str(contents, encoding)
+        else:
+            json_str = str(contents, self._default_encoding)
+
+        json_data = json.loads(json_str)
+        # Sort the keys to make this deterministic
+        json_bytes = bytes(
+            json.dumps(json_data, separators=(",", ":"), sort_keys=True),
+            self._default_encoding,
+        )
+
+        digest: str = get_hash_digest(self._hash_algorithm, json_bytes)
+
+        return HandlerResult(digest, json_bytes)
+
+    def _handle_html(self, contents: bytes, encoding: str) -> HandlerResult:
+        """Handle an HTML page."""
+        logging.debug("Handling content as HTML")
+        self.__init_browser()
+
+        # Until the Page.setContent() method allows options, writing the HTML
+        # document to a temporary file and navigating to it with Page.goto() is
+        # the only way to leverage the `waitUntil` option to give time for the
+        # page's contents to load. Support for options in Page.setContent() is
+        # expected in pyppeteer when the puppeteer v2.1.1 feature parity rewrite
+        # is completed; see
+        # https://github.com/pyppeteer/pyppeteer/issues/134 for more information.
+        with tempfile.NamedTemporaryFile(suffix=".html") as fp:
+            # Output to a temporary file so it's available to the browser
+            fp.write(contents)
+            fp.flush()
+
+            logging.debug("Navigating to temporary file '%s'", fp.name)
+
+            try:
+                # Wait for everything to load after navigating to the temporary file
+                asyncio.get_event_loop().run_until_complete(
+                    self._browser_page.goto(
+                        f"file://{fp.name}",
+                        {
+                            # Wait for load and networkidle2 events up to the
+                            # value of self._timeout (in milliseconds)
+                            "timeout": self._timeout * 1000,
+                            "waitUntil": ["load", "networkidle2"],
+                        },
+                    )
+                )
+            # Waiting for load and networkidle2 events to occur exceeded the
+            # configured timeout
+            except TimeoutError:
+                pass
+            page_contents: str = asyncio.get_event_loop().run_until_complete(
+                self._browser_page.content()
+            )
+
+        # Try to guarantee our preferred encoding
+        page_contents = bytes(page_contents.encode(self._default_encoding)).decode(
+            self._default_encoding
+        )
+
+        logging.debug("Parsing rendered page contents")
+        soup: BeautifulSoup = BeautifulSoup(page_contents, "lxml")
+        text_elements = soup.find_all(text=True)
+        visible_text_elements = filter(self._is_visible_element, text_elements)
+        visible_text: str = " ".join(
+            t.strip() for t in visible_text_elements if t.strip()
+        )
+        visible_bytes: bytes = bytes(visible_text, self._default_encoding)
+
+        digest: str = get_hash_digest(self._hash_algorithm, visible_bytes)
+
+        return HandlerResult(digest, visible_bytes)
+
+    def hash_url(self, url: str, verify: Union[bool, str] = True) -> UrlResult:
+        """Get a hash of the contents of the provided URL."""
+        logging.debug("Hashing provided URL '%s'", url)
+
+        # These values were chosen to keep in line with the type of redirection
+        # that indicates the desired resource is at a different URI per
+        # https://tools.ietf.org/html/rfc7231#section-6.4
+        # 1. Redirects that indicate the resource might be available at a
+        #    different URI, as provided by the Location field, as in the
+        #    status codes 301 (Moved Permanently), 302 (Found), and 307
+        #    (Temporary Redirect).
+        # This follows the logic in the creation of status code 308 per
+        # https://tools.ietf.org/html/rfc7238#section-1
+        redirect_status_codes = [301, 302, 307, 308]
+
+        # Attempt to retrieve the given URL, retrying self._retries times before
+        # raising an exception
+        get_tries = 0
+        while True:
+            try:
+                resp = requests.get(url, timeout=self._timeout, verify=verify)
+                break
+            except (ConnectionError, Timeout) as err:
+                logging.debug(
+                    "Encountered a(n) %s exception while attempting to GET from '%s'",
+                    type(err).__name__,
+                    url,
+                )
+                get_tries += 1
+                if get_tries <= self._retries:
+                    logging.warning(
+                        "Performing retry %d/%d for '%s'", get_tries, self._retries, url
+                    )
+                else:
+                    raise err
+
+        # https://tools.ietf.org/html/rfc7231#section-3.1.1.5
+        content_type: str = (
+            resp.headers.get("content-type", "application/octet-stream").strip().lower()
+        )
+
+        # Pull off any parameters included
+        if ";" in content_type:
+            content_type = content_type.split(";", 1)[0]
+        logging.debug("Using content type '%s'", content_type)
+
+        logging.debug("Checking for a redirect in the request")
+        is_redirect = False
+        for r in resp.history:
+            if r.status_code in redirect_status_codes:
+                is_redirect = True
+                break
+
+        processed: HandlerResult
+        # If the content appears to be text, we should fall back to processing it
+        # as plaintext instead of raw bytes.
+        if resp.apparent_encoding == "ascii":
+            # Default to processing as plaintext if no appropriate handler is found
+            processed = self._handlers.get(content_type, self._handle_plaintext)(
+                resp.content, resp.encoding
+            )
+        else:
+            # Default to processing as raw bytes if no appropriate handler is found
+            processed = self._handlers.get(content_type, self._handle_raw_bytes)(
+                resp.content, resp.encoding
+            )
+
+        return UrlResult(
+            resp.status_code,
+            resp.url,
+            is_redirect,
+            content_type,
+            processed.hash,
+            processed.contents,
+        )
diff --git a/tests/files/testing.bin b/tests/files/testing.bin
new file mode 100644
index 0000000..3fe7cac
--- /dev/null
+++ b/tests/files/testing.bin
@@ -0,0 +1 @@
+Þ­¾ï
diff --git a/tests/files/testing.json b/tests/files/testing.json
new file mode 100644
index 0000000..7645927
--- /dev/null
+++ b/tests/files/testing.json
@@ -0,0 +1,4 @@
+{
+  "motto": "Commit today, secure tomorrow.",
+  "org": "cisagov"
+}
diff --git a/tests/files/testing.txt b/tests/files/testing.txt
new file mode 100644
index 0000000..22b1aea
--- /dev/null
+++ b/tests/files/testing.txt
@@ -0,0 +1 @@
+Commit today, secure tomorrow.
diff --git a/tests/files/testing_dynamic.html b/tests/files/testing_dynamic.html
new file mode 100644
index 0000000..cde8e65
--- /dev/null
+++ b/tests/files/testing_dynamic.html
@@ -0,0 +1,16 @@
+<html lang="en">
+<head>
+  <title>Example Page</title>
+  <script type="text/javascript">
+    function updateText() {
+      document.getElementById("example").innerHTML =
+        "Dynamic example text!";
+    }
+  </script>
+</head>
+<body onload="updateText()">
+  <p id="example">
+    Example text!
+  </p>
+</body>
+</html>
diff --git a/tests/files/testing_html_dynamic.bin b/tests/files/testing_html_dynamic.bin
new file mode 100644
index 0000000..0b74935
--- /dev/null
+++ b/tests/files/testing_html_dynamic.bin
@@ -0,0 +1 @@
+Example Page Dynamic example text!
diff --git a/tests/files/testing_html_static.bin b/tests/files/testing_html_static.bin
new file mode 100644
index 0000000..02f96f0
--- /dev/null
+++ b/tests/files/testing_html_static.bin
@@ -0,0 +1 @@
+Example Page Example text!
diff --git a/tests/files/testing_static.html b/tests/files/testing_static.html
new file mode 100644
index 0000000..09d18b7
--- /dev/null
+++ b/tests/files/testing_static.html
@@ -0,0 +1,10 @@
+<html lang="en">
+<head>
+  <title>Example Page</title>
+</head>
+<body>
+  <p>
+    Example text!
+  </p>
+</body>
+</html>
diff --git a/tests/test_cli.py b/tests/test_cli.py
new file mode 100644
index 0000000..96b3817
--- /dev/null
+++ b/tests/test_cli.py
@@ -0,0 +1,232 @@
+#!/usr/bin/env pytest -vs
+"""Tests for hash_http_content command line interface."""
+
+# Standard Python Libraries
+import hashlib
+import json
+import os
+import sys
+from unittest.mock import patch
+
+# Third-Party Libraries
+import pytest
+
+# cisagov Libraries
+from hash_http_content import __version__, cli
+
+# define sources of version strings
+RELEASE_TAG = os.getenv("RELEASE_TAG")
+PROJECT_VERSION = __version__
+
+
+def test_stdout_version(capsys):
+    """Verify that version string sent to stdout agrees with the module version."""
+    with pytest.raises(SystemExit):
+        with patch.object(sys, "argv", ["bogus", "--version"]):
+            cli.main()
+    captured = capsys.readouterr()
+    assert (
+        captured.out == f"{PROJECT_VERSION}\n"
+    ), "standard output by '--version' should agree with module.__version__"
+
+
+def test_running_as_module(capsys):
+    """Verify that the __main__.py file loads correctly."""
+    with pytest.raises(SystemExit):
+        with patch.object(sys, "argv", ["bogus", "--version"]):
+            # F401 is a "Module imported but unused" warning. This import
+            # emulates how this project would be run as a module. The only thing
+            # being done by __main__ is importing the main entrypoint of the
+            # package and running it, so there is nothing to use from this
+            # import. As a result, we can safely ignore this warning.
+            # cisagov Libraries
+            import hash_http_content.__main__  # noqa: F401
+    captured = capsys.readouterr()
+    assert (
+        captured.out == f"{PROJECT_VERSION}\n"
+    ), "standard output by '--version' should agree with module.__version__"
+
+
+@pytest.mark.skipif(
+    RELEASE_TAG in [None, ""], reason="this is not a release (RELEASE_TAG not set)"
+)
+def test_release_version():
+    """Verify that release tag version agrees with the module version."""
+    assert (
+        RELEASE_TAG == f"v{PROJECT_VERSION}"
+    ), "RELEASE_TAG does not match the project version"
+
+
+def test_list_algorithms(capsys):
+    """Validate a matching list of algorithms is returned."""
+    expected_output = "Algorithms supported for this platform:\n" + "\n".join(
+        f"- {a}" for a in sorted(hashlib.algorithms_available)
+    )
+    with patch.object(sys, "argv", ["bogus", "--list-algorithms"]):
+        return_code = cli.main()
+    captured = capsys.readouterr()
+    assert return_code == 0
+    assert captured.out.rstrip() == expected_output
+
+
+def test_invalid_hash_type(capsys):
+    """Validate that an unsupported hash type causes an error."""
+    expected_output = f"Invalid algorithm provided. Must be one of: {sorted(hashlib.algorithms_available)}"
+    with patch.object(
+        sys, "argv", ["bogus", "--hash-algorithm", "nonsensical", "localhost"]
+    ):
+        return_code = cli.main()
+    captured = capsys.readouterr()
+    assert return_code == 1
+    assert captured.err.rstrip() == expected_output
+
+
+def test_full_run_no_http_scheme(capsys):
+    """Validate output for a given URL with no scheme."""
+    expected_output = "\n".join(
+        [
+            "Results for example.com:",
+            " Retrieved URL - 'https://example.com/'",
+            " Status code - '200'",
+            " Content type - 'text/html'",
+            " Hash (sha256) of contents - 6fba1a7167467b6dd3da090b5ec437c1b811dd2c2133504a448fb7ca59d390c2",
+        ]
+    )
+    with patch.object(sys, "argv", ["bogus", "example.com"]):
+        return_code = cli.main()
+    captured = capsys.readouterr()
+
+    assert return_code == 0
+    assert captured.out.rstrip() == expected_output
+
+
+def test_full_run_with_http_scheme(capsys):
+    """Validate output for a given URL with a provided scheme."""
+    expected_output = "\n".join(
+        [
+            "Results for https://example.com:",
+            " Retrieved URL - 'https://example.com/'",
+            " Status code - '200'",
+            " Content type - 'text/html'",
+            " Hash (sha256) of contents - 6fba1a7167467b6dd3da090b5ec437c1b811dd2c2133504a448fb7ca59d390c2",
+        ]
+    )
+    with patch.object(sys, "argv", ["bogus", "https://example.com"]):
+        return_code = cli.main()
+    captured = capsys.readouterr()
+
+    assert return_code == 0
+    assert captured.out.rstrip() == expected_output
+
+
+def test_full_run_no_redirect(capsys):
+    """Validate output for a given URL that has no redirect."""
+    expected_output = [
+        "Results for http://example.com:",
+        " Retrieved URL - 'http://example.com/'",
+        " Status code - '200'",
+        " Content type - 'text/html'",
+        " Redirect - False",
+    ]
+    with patch.object(sys, "argv", ["bogus", "--show-redirect", "http://example.com"]):
+        return_code = cli.main()
+    captured = capsys.readouterr()
+    captured_lines = captured.out.split("\n")
+
+    assert return_code == 0
+
+    for i, value in enumerate(expected_output):
+        assert captured_lines[i] == value
+
+
+def test_full_run_with_redirect(capsys):
+    """Validate output for a given URL that has a redirect."""
+    expected_output = [
+        "Results for http://rules.ncats.cyber.dhs.gov:",
+        " Retrieved URL - 'https://rules.ncats.cyber.dhs.gov/'",
+        " Status code - '200'",
+        " Content type - 'text/plain'",
+        " Redirect - True",
+    ]
+    with patch.object(
+        sys, "argv", ["bogus", "--show-redirect", "http://rules.ncats.cyber.dhs.gov"]
+    ):
+        return_code = cli.main()
+    captured = capsys.readouterr()
+    captured_lines = captured.out.split("\n")
+
+    assert return_code == 0
+
+    for i, value in enumerate(expected_output):
+        assert captured_lines[i] == value
+
+
+def test_full_run_with_content(capsys):
+    """Validate output with content for a given URL."""
+    expected_output = "\n".join(
+        [
+            "Results for https://example.com:",
+            " Retrieved URL - 'https://example.com/'",
+            " Status code - '200'",
+            " Content type - 'text/html'",
+            " Hash (sha256) of contents - 6fba1a7167467b6dd3da090b5ec437c1b811dd2c2133504a448fb7ca59d390c2",
+            "",
+            "Contents:",
+            r"b'Example Domain Example Domain This domain is for use in illustrative examples in documents. You may use this\n domain in literature without prior coordination or asking for permission.
More information...'", + ] + ) + with patch.object(sys, "argv", ["bogus", "--show-content", "https://example.com"]): + return_code = cli.main() + captured = capsys.readouterr() + + assert return_code == 0 + assert captured.out.rstrip() == expected_output + + +def test_full_run_check_redirect_with_content(capsys): + """Validate output with content for a given URL with redirect check.""" + expected_output = "\n".join( + [ + "Results for https://example.com:", + " Retrieved URL - 'https://example.com/'", + " Status code - '200'", + " Content type - 'text/html'", + " Redirect - False", + " Hash (sha256) of contents - 6fba1a7167467b6dd3da090b5ec437c1b811dd2c2133504a448fb7ca59d390c2", + "", + "Contents:", + r"b'Example Domain Example Domain This domain is for use in illustrative examples in documents. You may use this\n domain in literature without prior coordination or asking for permission. More information...'", + ] + ) + with patch.object( + sys, + "argv", + ["bogus", "--show-content", "--show-redirect", "https://example.com"], + ): + return_code = cli.main() + captured = capsys.readouterr() + + assert return_code == 0 + assert captured.out.rstrip() == expected_output + + +def test_full_run_json_output(capsys): + """Validate JSON output for a given URL.""" + expected_result = [ + { + "content_type": "text/html", + "contents_hash": "6fba1a7167467b6dd3da090b5ec437c1b811dd2c2133504a448fb7ca59d390c2", + "is_redirected": False, + "requested_url": "https://example.com", + "retrieved_url": "https://example.com/", + "status_code": 200, + } + ] + with patch.object(sys, "argv", ["bogus", "--json", "https://example.com"]): + return_code = cli.main() + captured = capsys.readouterr() + + captured_result = json.loads(captured.out) + + assert return_code == 0 + assert captured_result == expected_result diff --git a/tests/test_example.py b/tests/test_example.py deleted file mode 100644 index 3a22848..0000000 --- a/tests/test_example.py +++ /dev/null @@ -1,128 +0,0 @@ -#!/usr/bin/env pytest -vs -"""Tests for example.""" - -# Standard Python Libraries -import logging -import os -import sys -from unittest.mock import patch - -# Third-Party Libraries -import pytest - -# cisagov Libraries -import example - -div_params = [ - (1, 1, 1), - (2, 2, 1), - (0, 1, 0), - (8, 2, 4), -] - -log_levels = ( - "debug", - "info", - "warning", - "error", - "critical", -) - -# define sources of version strings -RELEASE_TAG = os.getenv("RELEASE_TAG") -PROJECT_VERSION = example.__version__ - - -def test_stdout_version(capsys): - """Verify that version string sent to stdout agrees with the module version.""" - with pytest.raises(SystemExit): - with patch.object(sys, "argv", ["bogus", "--version"]): - example.example.main() - captured = capsys.readouterr() - assert ( - captured.out == f"{PROJECT_VERSION}\n" - ), "standard output by '--version' should agree with module.__version__" - - -def test_running_as_module(capsys): - """Verify that the __main__.py file loads correctly.""" - with pytest.raises(SystemExit): - with patch.object(sys, "argv", ["bogus", "--version"]): - # F401 is a "Module imported but unused" warning. This import - # emulates how this project would be run as a module. The only thing - # being done by __main__ is importing the main entrypoint of the - # package and running it, so there is nothing to use from this - # import. As a result, we can safely ignore this warning. 
- # cisagov Libraries - import example.__main__ # noqa: F401 - captured = capsys.readouterr() - assert ( - captured.out == f"{PROJECT_VERSION}\n" - ), "standard output by '--version' should agree with module.__version__" - - -@pytest.mark.skipif( - RELEASE_TAG in [None, ""], reason="this is not a release (RELEASE_TAG not set)" -) -def test_release_version(): - """Verify that release tag version agrees with the module version.""" - assert ( - RELEASE_TAG == f"v{PROJECT_VERSION}" - ), "RELEASE_TAG does not match the project version" - - -@pytest.mark.parametrize("level", log_levels) -def test_log_levels(level): - """Validate commandline log-level arguments.""" - with patch.object(sys, "argv", ["bogus", f"--log-level={level}", "1", "1"]): - with patch.object(logging.root, "handlers", []): - assert ( - logging.root.hasHandlers() is False - ), "root logger should not have handlers yet" - return_code = example.example.main() - assert ( - logging.root.hasHandlers() is True - ), "root logger should now have a handler" - assert return_code == 0, "main() should return success (0)" - - -def test_bad_log_level(): - """Validate bad log-level argument returns error.""" - with patch.object(sys, "argv", ["bogus", "--log-level=emergency", "1", "1"]): - return_code = example.example.main() - assert return_code == 1, "main() should return failure" - - -@pytest.mark.parametrize("dividend, divisor, quotient", div_params) -def test_division(dividend, divisor, quotient): - """Verify division results.""" - result = example.example_div(dividend, divisor) - assert result == quotient, "result should equal quotient" - - -@pytest.mark.slow -def test_slow_division(): - """Example of using a custom marker. - - This test will only be run if --runslow is passed to pytest. - Look in conftest.py to see how this is implemented. - """ - # Standard Python Libraries - import time - - result = example.example_div(256, 16) - time.sleep(4) - assert result == 16, "result should equal be 16" - - -def test_zero_division(): - """Verify that division by zero throws the correct exception.""" - with pytest.raises(ZeroDivisionError): - example.example_div(1, 0) - - -def test_zero_divisor_argument(): - """Verify that a divisor of zero is handled as expected.""" - with patch.object(sys, "argv", ["bogus", "1", "0"]): - return_code = example.example.main() - assert return_code == 1, "main() should exit with error" diff --git a/tests/test_hasher.py b/tests/test_hasher.py new file mode 100644 index 0000000..c6a728a --- /dev/null +++ b/tests/test_hasher.py @@ -0,0 +1,311 @@ +#!/usr/bin/env pytest -vs +"""Tests for hash_http_content URL hashing functionality.""" + +# Standard Python Libraries +import hashlib +import json +import os.path + +# Third-Party Libraries +from bs4 import Comment, Tag +import pytest + +# cisagov Libraries +import hash_http_content + +# Hashing algorithm to use for testing. 
+HASH_ALGORITHM = "sha256" +# Alternate encoding to verify conversion to utf-8 +ALT_ENCODING = "utf-16" +# Files with test values +TEST_VALUE_SOURCES = { + "html_dynamic": "tests/files/testing_dynamic.html", + "html_dynamic_bytes": "tests/files/testing_html_dynamic.bin", + "html_static": "tests/files/testing_static.html", + "html_static_bytes": "tests/files/testing_html_static.bin", + "json": "tests/files/testing.json", + "plaintext": "tests/files/testing.txt", + "raw_bytes": "tests/files/testing.bin", +} +# Digests expected for each test +EXPECTED_DIGESTS = { + "html_dynamic": "3a6f9739ba635b5bfe57246ebf137f00df890a200b6dca01388f05d81479098a", + "html_static": "206794946ac5783ddbaa03713fe9eba7be069d970731b20a7f6cadb5d845680f", + "json": "25ba3da8ab38c80c8d1e6162caeb1924a777b04b7351ce31176f7bef9cd6584d", + "plaintext": "d09107e7b64bee9d7375d734a7bfc9cc316d7c48695722be3ec2218659d59be5", + "raw_bytes": "5f78c33274e43fa9de5659265c1d917e25c03722dcb0b8d27db8d5feaa813953", +} + + +@pytest.mark.parametrize("algorithm", hashlib.algorithms_available) +def test_get_hasher(algorithm): + """Verify that the desired hashing object is created.""" + assert hash_http_content.hasher.get_hasher(algorithm).name == algorithm + + +def test_hash_hash_digest(): + """Verify that an expected hash digest is generated.""" + expected_digest = "d5f8f30f25636b1f3efc2f52a0a8724c9ffa280875a1fc9a92cfe3f644b7d5c3" + digest = hash_http_content.hasher.get_hash_digest(HASH_ALGORITHM, b"cisagov") + assert digest == expected_digest + + +def test_init_browser(): + """Ensure that a browser object is initialized.""" + hasher = hash_http_content.UrlHasher(HASH_ALGORITHM) + assert hasher._browser is None + # Call through name mangling + hasher._UrlHasher__init_browser() + assert hasher._browser is not None + + +@pytest.mark.parametrize( + "tag,expected", + [ + (Tag(name="html", parent=Tag(name="[document]")), False), + (Tag(name="", parent=Tag(name="script")), False), + (Tag(name="", parent=Tag(name="style")), False), + (Comment("Testing page."), False), + (Tag(name="", parent=Tag(name="title")), True), + (Tag(name="", parent=Tag(name="p")), True), + ], +) +def test__is_visible_element(tag, expected): + """Verify that elements are correctly identified as visible or not.""" + hasher = hash_http_content.UrlHasher(HASH_ALGORITHM) + assert hasher._is_visible_element(tag) == expected + + +def test_handle_raw_bytes(): + """Test the handler for bytes of an unspecified format and encoding.""" + with open(TEST_VALUE_SOURCES["raw_bytes"], "rb") as f: + # Work around the end-of-file-fixer pre-commit hook + test_bytes = f.read().rstrip() + + hasher = hash_http_content.UrlHasher(HASH_ALGORITHM) + result = hasher._handle_raw_bytes(test_bytes, None) + + assert result.hash == EXPECTED_DIGESTS["raw_bytes"] + assert result.contents == test_bytes + + +def test_handle_plaintext(): + """Test the handler with plaintext in utf-8 encoding.""" + with open(TEST_VALUE_SOURCES["plaintext"]) as f: + test_value = f.read() + test_bytes = bytes(test_value, "utf-8") + + hasher = hash_http_content.UrlHasher(HASH_ALGORITHM) + result = hasher._handle_plaintext(test_bytes, None) + + assert result.hash == EXPECTED_DIGESTS["plaintext"] + assert result.contents == test_bytes + + +def test_handle_plaintext_with_encoding(): + """Test the handler converting to utf-8 encoding.""" + with open(TEST_VALUE_SOURCES["plaintext"]) as f: + test_value = f.read() + test_bytes = bytes(test_value, ALT_ENCODING) + expected_bytes = bytes(test_value, "utf-8") + + hasher = 
hash_http_content.UrlHasher(HASH_ALGORITHM)
+    result = hasher._handle_plaintext(test_bytes, ALT_ENCODING)
+
+    assert result.hash == EXPECTED_DIGESTS["plaintext"]
+    assert result.contents == expected_bytes
+
+
+def test_handle_json():
+    """Test the handler with JSON in utf-8 encoding."""
+    with open(TEST_VALUE_SOURCES["json"]) as f:
+        test_value = f.read()
+    test_bytes = bytes(test_value, "utf-8")
+    expected_bytes = bytes(
+        json.dumps(json.loads(test_value), separators=(",", ":"), sort_keys=True),
+        "utf-8",
+    )
+
+    hasher = hash_http_content.UrlHasher(HASH_ALGORITHM)
+    result = hasher._handle_json(test_bytes, None)
+
+    assert result.hash == EXPECTED_DIGESTS["json"]
+    assert result.contents == expected_bytes
+
+
+def test_handle_json_with_encoding():
+    """Test the handler converting JSON to utf-8 encoding."""
+    with open(TEST_VALUE_SOURCES["json"]) as f:
+        test_value = f.read()
+    test_bytes = bytes(test_value, ALT_ENCODING)
+    expected_bytes = bytes(
+        json.dumps(json.loads(test_value), separators=(",", ":"), sort_keys=True),
+        "utf-8",
+    )
+
+    hasher = hash_http_content.UrlHasher(HASH_ALGORITHM)
+    result = hasher._handle_json(test_bytes, ALT_ENCODING)
+
+    assert result.hash == EXPECTED_DIGESTS["json"]
+    assert result.contents == expected_bytes
+
+
+def test_handle_html_static():
+    """Test the handler with static HTML in utf-8 encoding."""
+    with open(TEST_VALUE_SOURCES["html_static"]) as f:
+        test_value = f.read()
+    test_bytes = bytes(test_value, "utf-8")
+    with open(TEST_VALUE_SOURCES["html_static_bytes"], "rb") as f:
+        # Work around the end-of-file-fixer pre-commit hook
+        expected_bytes = f.read().rstrip()
+
+    hasher = hash_http_content.UrlHasher(HASH_ALGORITHM)
+    result = hasher._handle_html(test_bytes, None)
+
+    assert result.hash == EXPECTED_DIGESTS["html_static"]
+    assert result.contents == expected_bytes
+
+
+def test_handle_html_static_with_encoding():
+    """Test the handler converting static HTML to utf-8 encoding."""
+    with open(TEST_VALUE_SOURCES["html_static"]) as f:
+        test_value = f.read()
+    test_bytes = bytes(test_value, ALT_ENCODING)
+    with open(TEST_VALUE_SOURCES["html_static_bytes"], "rb") as f:
+        # Work around the end-of-file-fixer pre-commit hook
+        expected_bytes = f.read().rstrip()
+
+    hasher = hash_http_content.UrlHasher(HASH_ALGORITHM)
+    result = hasher._handle_html(test_bytes, ALT_ENCODING)
+
+    assert result.hash == EXPECTED_DIGESTS["html_static"]
+    assert result.contents == expected_bytes
+
+
+def test_handle_html_dynamic():
+    """Test the handler with dynamic HTML in utf-8 encoding."""
+    with open(TEST_VALUE_SOURCES["html_dynamic"]) as f:
+        test_value = f.read()
+    test_bytes = bytes(test_value, "utf-8")
+    with open(TEST_VALUE_SOURCES["html_dynamic_bytes"], "rb") as f:
+        # Work around the end-of-file-fixer pre-commit hook
+        expected_bytes = f.read().rstrip()
+
+    hasher = hash_http_content.UrlHasher(HASH_ALGORITHM)
+    result = hasher._handle_html(test_bytes, None)
+
+    assert result.hash == EXPECTED_DIGESTS["html_dynamic"]
+    assert result.contents == expected_bytes
+
+
+def test_handle_html_dynamic_with_encoding():
+    """Test the handler converting dynamic HTML to utf-8 encoding."""
+    with open(TEST_VALUE_SOURCES["html_dynamic"]) as f:
+        test_value = f.read()
+    test_bytes = bytes(test_value, ALT_ENCODING)
+    with open(TEST_VALUE_SOURCES["html_dynamic_bytes"], "rb") as f:
+        # Work around the end-of-file-fixer pre-commit hook
+        expected_bytes = f.read().rstrip()
+
+    hasher = hash_http_content.UrlHasher(HASH_ALGORITHM)
+    result = hasher._handle_html(test_bytes, ALT_ENCODING)
+
+    assert result.hash == EXPECTED_DIGESTS["html_dynamic"]
+    assert result.contents == expected_bytes
+
+
+def test_hash_url_html_status_200():
+    """Test against a URL that returns HTML content from an existing location."""
+    test_url = "https://example.com"
+    expected_digest = "6fba1a7167467b6dd3da090b5ec437c1b811dd2c2133504a448fb7ca59d390c2"
+
+    hasher = hash_http_content.UrlHasher(HASH_ALGORITHM)
+    result = hasher.hash_url(test_url)
+
+    assert result.status == 200
+    assert result.is_redirect is False
+    assert result.hash == expected_digest
+
+
+def test_hash_url_html_status_404():
+    """Test against a URL that returns HTML content from a missing location."""
+    test_url = "https://example.com/404"
+    expected_digest = "6fba1a7167467b6dd3da090b5ec437c1b811dd2c2133504a448fb7ca59d390c2"
+
+    hasher = hash_http_content.UrlHasher(HASH_ALGORITHM)
+    result = hasher.hash_url(test_url)
+
+    assert result.status == 404
+    assert result.is_redirect is False
+    assert result.hash == expected_digest
+
+
+def test_hash_url_with_redirect():
+    """Test against a URL that redirects and has no content-type parameters."""
+    test_url = "http://rules.ncats.cyber.dhs.gov"
+
+    hasher = hash_http_content.UrlHasher(HASH_ALGORITHM)
+    result = hasher.hash_url(test_url)
+
+    assert result.status == 200
+    assert result.is_redirect is True
+
+
+def test_browser_additional_options():
+    """Verify that additional options are used in invoking the browser."""
+    # These options are expected for an AWS Lambda style environment
+    options = {
+        "headless": True,
+        "args": [
+            "--no-sandbox",
+            "--single-process",
+            "--disable-dev-shm-usage",
+            "--disable-gpu",
+            "--no-zygote",
+        ],
+        "executablePath": "tests/files/serverless-chrome",
+    }
+    with open(TEST_VALUE_SOURCES["plaintext"]) as f:
+        test_value = f.read()
+    test_bytes = bytes(test_value, ALT_ENCODING)
+    expected_bytes = bytes(test_value, "utf-8")
+
+    hasher = hash_http_content.UrlHasher(HASH_ALGORITHM, browser_options=options)
+    result = hasher._handle_plaintext(test_bytes, ALT_ENCODING)
+
+    assert hasher._UrlHasher__browser_options == options
+    assert result.hash == EXPECTED_DIGESTS["plaintext"]
+    assert result.contents == expected_bytes
+
+
+def test_browser_with_specified_executable():
+    """Test running with the executablePath option."""
+    serverless_chrome_path = "tests/files/headless-chromium"
+    # If this file does not exist, do not perform this test.
+    if not os.path.isfile(serverless_chrome_path):
+        pytest.skip("no serverless-chrome binary found")
+
+    # These options are expected for an AWS Lambda style environment
+    options = {
+        "headless": True,
+        "args": [
+            "--no-sandbox",
+            "--single-process",
+            "--disable-dev-shm-usage",
+            "--disable-gpu",
+            "--no-zygote",
+        ],
+        "executablePath": serverless_chrome_path,
+    }
+
+    with open(TEST_VALUE_SOURCES["plaintext"]) as f:
+        test_value = f.read()
+    test_bytes = bytes(test_value, ALT_ENCODING)
+    expected_bytes = bytes(test_value, "utf-8")
+
+    hasher = hash_http_content.UrlHasher(HASH_ALGORITHM, browser_options=options)
+    result = hasher._handle_plaintext(test_bytes, ALT_ENCODING)
+
+    assert hasher._UrlHasher__browser_options == options
+    assert result.hash == EXPECTED_DIGESTS["plaintext"]
+    assert result.contents == expected_bytes
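
For reviewers, a minimal usage sketch of the public API this diff introduces, based on the `UrlHasher` and `UrlResult` definitions added in `src/hash_http_content/hasher.py`; the algorithm and URL below are arbitrary example values, not part of the diff:

```python
# Sketch only: assumes the package is installed and the URL is reachable.
from hash_http_content import UrlHasher

hasher = UrlHasher("sha256")
# hash_url() returns a UrlResult named tuple:
# (status, visited_url, is_redirect, content_type, hash, contents)
result = hasher.hash_url("https://example.com")

print(result.status)        # HTTP status code, e.g. 200
print(result.visited_url)   # final URL after any redirects
print(result.is_redirect)   # True if a 301/302/307/308 redirect was followed
print(result.content_type)  # e.g. "text/html"
print(result.hash)          # hex digest of the processed contents
```

The equivalent command line invocation after installation would be `hash-url https://example.com`, or `hash-url --json https://example.com` for machine-readable output, per the `console_scripts` entry point added in `setup.py`.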