diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index aa4578a2..94a6cce8 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -4,7 +4,6 @@ # pre-commit run --all-files # Update this file: # pre-commit autoupdate -fail_fast: true repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: v4.2.0 diff --git a/censys/cli/args.py b/censys/cli/args.py index e1147708..619883b8 100644 --- a/censys/cli/args.py +++ b/censys/cli/args.py @@ -62,7 +62,7 @@ def print_help(_: argparse.Namespace): } for command in commands.__dict__.values(): try: - include_func = getattr(command, "include") + include_func = command.include except AttributeError: continue diff --git a/censys/cli/commands/__init__.py b/censys/cli/commands/__init__.py index 2371fbbe..de130072 100644 --- a/censys/cli/commands/__init__.py +++ b/censys/cli/commands/__init__.py @@ -1,4 +1,4 @@ """Censys CLI commands.""" -from . import account, asm, config, hnri, search, view +from . import account, asm, config, hnri, search, subdomains, view -__all__ = ["account", "asm", "config", "hnri", "search", "view"] +__all__ = ["account", "asm", "config", "hnri", "search", "subdomains", "view"] diff --git a/censys/cli/commands/subdomains.py b/censys/cli/commands/subdomains.py new file mode 100644 index 00000000..0e3c378c --- /dev/null +++ b/censys/cli/commands/subdomains.py @@ -0,0 +1,97 @@ +"""Censys subdomains CLI.""" +import argparse +import json +import sys +from typing import List, Set + +from censys.cli.utils import console, err_console +from censys.common.exceptions import ( + CensysException, + CensysRateLimitExceededException, + CensysUnauthorizedException, +) +from censys.search import CensysCertificates + + +def print_subdomains(subdomains: Set[str], as_json: bool = False): + """Print subdomains. + + Args: + subdomains (Set[str]): Subdomains. + as_json (bool): Output in JSON format. + """ + if as_json: + console.print_json(json.dumps(list(subdomains))) + else: + for subdomain in subdomains: + console.print(f" - {subdomain}") + + +def cli_subdomains(args: argparse.Namespace): # pragma: no cover + """Subdomain subcommand. + + Args: + args: Argparse Namespace. + """ + subdomains = set() + try: + client = CensysCertificates(api_id=args.api_id, api_secret=args.api_secret) + certificate_query = f"parsed.names: {args.domain}" + + with err_console.status(f"Querying {args.domain} subdomains"): + certificates_search_results = client.search( + certificate_query, fields=["parsed.names"], max_records=args.max_records + ) + + # Flatten the result, and remove duplicates + for search_result in certificates_search_results: + new_subdomains: List[str] = search_result.get("parsed.names", []) + subdomains.update( + [ + subdomain + for subdomain in new_subdomains + if subdomain.endswith(args.domain) + ] + ) + + # Don't make console prints if we're in json mode + if not args.json: + if len(subdomains) == 0: + err_console.print(f"No subdomains found for {args.domain}") + return + console.print( + f"Found {len(subdomains)} unique subdomain(s) of {args.domain}" + ) + print_subdomains(subdomains, args.json) + except CensysRateLimitExceededException: + err_console.print("Censys API rate limit exceeded") + print_subdomains(subdomains, args.json) + except CensysUnauthorizedException: + err_console.print("Invalid Censys API ID or secret") + sys.exit(1) + except CensysException as e: + err_console.print(str(e)) + sys.exit(1) + + +def include(parent_parser: argparse._SubParsersAction, parents: dict): + """Include this subcommand into the parent parser. + + Args: + parent_parser (argparse._SubParsersAction): Parent parser. + parents (dict): Parent arg parsers. + """ + subdomains_parser = parent_parser.add_parser( + "subdomains", + description="enumerate subdomains", + help="enumerate subdomains", + parents=[parents["auth"]], + ) + subdomains_parser.add_argument("domain", help="The domain to scan") + subdomains_parser.add_argument( + "--max-records", type=int, default=100, help="Max records to query" + ) + subdomains_parser.add_argument( + "-j", "--json", action="store_true", help="Output in JSON format" + ) + subdomains_parser.set_defaults(func=cli_subdomains) diff --git a/censys/cli/utils.py b/censys/cli/utils.py index 4d168042..f4b70e80 100644 --- a/censys/cli/utils.py +++ b/censys/cli/utils.py @@ -88,7 +88,7 @@ def write_file( results_list: Results, file_format: Optional[str] = None, file_path: Optional[str] = None, - csv_fields: List[str] = [], + csv_fields: Optional[List[str]] = None, ): """Maps formats and writes results. @@ -107,6 +107,8 @@ def write_file( if file_format == "json": _write_json(file_path, results_list) elif file_format == "csv": + if csv_fields is None: # pragma: no cover + csv_fields = [] _write_csv(file_path, results_list, fields=csv_fields) else: _write_screen(results_list) diff --git a/poetry.lock b/poetry.lock index d671d413..f3b878e3 100644 --- a/poetry.lock +++ b/poetry.lock @@ -496,6 +496,20 @@ pytest = ">=4.6" [package.extras] testing = ["fields", "hunter", "process-tests", "six", "pytest-xdist", "virtualenv"] +[[package]] +name = "pytest-mock" +version = "3.7.0" +description = "Thin-wrapper around the mock package for easier use with pytest" +category = "dev" +optional = false +python-versions = ">=3.7" + +[package.dependencies] +pytest = ">=5.0" + +[package.extras] +dev = ["pre-commit", "tox", "pytest-asyncio"] + [[package]] name = "pyupgrade" version = "2.32.1" @@ -656,7 +670,7 @@ testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest- [metadata] lock-version = "1.1" python-versions = "^3.7" -content-hash = "4aae9cebc10c6c07d2a75013e716bbee89e88659244f3c0235e9e7b9ecdfcf7e" +content-hash = "7914d8dc91e2976f5d4792c99f41537c8b717bc510ae92abe7deb1e35b57d357" [metadata.files] astor = [ @@ -912,6 +926,10 @@ pytest-cov = [ {file = "pytest-cov-3.0.0.tar.gz", hash = "sha256:e7f0f5b1617d2210a2cabc266dfe2f4c75a8d32fb89eafb7ad9d06f6d076d470"}, {file = "pytest_cov-3.0.0-py3-none-any.whl", hash = "sha256:578d5d15ac4a25e5f961c938b85a05b09fdaae9deef3bb6de9a6e766622ca7a6"}, ] +pytest-mock = [ + {file = "pytest-mock-3.7.0.tar.gz", hash = "sha256:5112bd92cc9f186ee96e1a92efc84969ea494939c3aead39c50f421c4cc69534"}, + {file = "pytest_mock-3.7.0-py3-none-any.whl", hash = "sha256:6cff27cec936bf81dc5ee87f07132b807bcda51106b5ec4b90a04331cba76231"}, +] pyupgrade = [ {file = "pyupgrade-2.32.1-py2.py3-none-any.whl", hash = "sha256:d874f34870abadd7536c89678f9811076d5df93c13620f90a125355a2d31fa91"}, {file = "pyupgrade-2.32.1.tar.gz", hash = "sha256:11e2c3e4e2e53a61b2d8852ed154ea5683887b6ac42561622ca8d89c94fd951a"}, diff --git a/pyproject.toml b/pyproject.toml index f7dd6f3d..1a91e9a8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -74,6 +74,7 @@ pyupgrade = "^2.31.0" # Tests pytest = "^7.1.2" pytest-cov = "^3.0.0" +pytest-mock = "^3.7.0" responses = "^0.20.0" parameterized = "^0.8.1" # Types diff --git a/tests/cli/test_subdomains.py b/tests/cli/test_subdomains.py new file mode 100644 index 00000000..e507113a --- /dev/null +++ b/tests/cli/test_subdomains.py @@ -0,0 +1,78 @@ +import contextlib +import json +from io import StringIO +from typing import Set +from unittest.mock import patch + +import responses +from parameterized import parameterized + +from tests.utils import V1_URL, CensysTestCase + +from censys.cli import main as cli_main +from censys.cli.commands import subdomains + +TEST_DOMAINS = { + "help.censys.io", + "lp.censys.io", + "about.censys.io", + "fast.github.com", +} + + +def search_callback(request): + payload = json.loads(request.body) + resp_body = { + "results": [{"parsed.names": list(TEST_DOMAINS)}], + "metadata": {"page": payload["page"], "pages": 100}, + } + return (200, {}, json.dumps(resp_body)) + + +class CensysCliSubdomainsTest(CensysTestCase): + @parameterized.expand( + [ + (True,), + (False,), + ] + ) + def test_print_subdomains( + self, + test_json_bool: bool, + test_subdomains: Set[str] = TEST_DOMAINS, + ): + # Mock + mock_print_json = self.mocker.patch("censys.cli.utils.console.print_json") + mock_print = self.mocker.patch("censys.cli.utils.console.print") + + # Actual call + subdomains.print_subdomains(test_subdomains, test_json_bool) + + # Assertions + if test_json_bool: + mock_print_json.assert_called_once() + else: + for subdomain in test_subdomains: + mock_print.assert_any_call(f" - {subdomain}") + + @patch( + "argparse._sys.argv", + ["censys", "subdomains", "censys.io"] + CensysTestCase.cli_args, + ) + def test_search_subdomains(self): + # Test data + self.responses.add_callback( + responses.POST, + V1_URL + "/search/certificates", + callback=search_callback, + content_type="application/json", + ) + + temp_stdout = StringIO() + with contextlib.redirect_stdout(temp_stdout): + cli_main() + + cli_response = temp_stdout.getvalue().strip() + + for line in cli_response.split("\n"): + assert "censys.io" in line diff --git a/tests/utils.py b/tests/utils.py index 63539978..1d0e4ba9 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,6 +1,8 @@ import unittest +import pytest import responses +from pytest_mock import MockerFixture from censys.common.base import CensysAPIBase @@ -24,6 +26,17 @@ class CensysTestCase(unittest.TestCase): api_key, ] api: CensysAPIBase + mocker: MockerFixture + + @pytest.fixture(autouse=True) + def __inject_fixtures(self, mocker: MockerFixture): + """Injects fixtures into the test case. + + Args: + mocker (MockerFixture): pytest-mock fixture. + """ + # Inject mocker fixture + self.mocker = mocker def setUp(self): self.responses = responses.RequestsMock()