Skip to content

Commit

Permalink
feat: add offline mode (#314)
Browse files Browse the repository at this point in the history
Fixes #317
  • Loading branch information
ocervell committed Apr 23, 2024
1 parent 2a92dc8 commit 6b55e99
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 31 deletions.
26 changes: 17 additions & 9 deletions secator/__init__.py
@@ -1,3 +1,4 @@
import os
from pathlib import Path
from subprocess import call, DEVNULL
from typing import Dict, List
Expand All @@ -6,7 +7,7 @@
import requests
import yaml
from dotmap import DotMap
from pydantic import AfterValidator, BaseModel, model_validator, ValidationError, Extra
from pydantic import AfterValidator, BaseModel, model_validator, ValidationError

from secator.rich import console, console_stdout

Expand All @@ -18,7 +19,7 @@
CONFIGS_FOLDER = LIB_FOLDER / 'configs'


class StrictModel(BaseModel, extra=Extra.forbid):
class StrictModel(BaseModel, extra='forbid'):
pass


Expand Down Expand Up @@ -143,6 +144,7 @@ class SecatorConfig(StrictModel):
payloads: Payloads = Payloads()
wordlists: Wordlists = Wordlists()
addons: Addons = Addons()
offline_mode: bool = False


class Config(DotMap):
Expand Down Expand Up @@ -419,7 +421,6 @@ def apply_env_overrides(self):
prefix = "SECATOR_"

# Loop through environment variables
import os
for var in os.environ:
if var.startswith(prefix):
# Remove prefix and get the path from the key map
Expand All @@ -431,20 +432,21 @@ def apply_env_overrides(self):
# Set the new value recursively
success = self.set(path, value, set_partial=False)
if success:
console.print(f'[bold green4]{var:<50} (override success)[/]')
console.print(f'[bold green4]{var} (override success)[/]')
else:
console.print(f'[bold red]{var:<50} (override failed: cannot update value)[/]')
console.print(f'[bold red]{var} (override failed: cannot update value)[/]')
else:
console.print(f'[bold red]{var:<50} (override failed: key not found in config)[/]')
console.print(f'[bold red]{var} (override failed: key not found in config)[/]')


def download_files(data: dict, target_folder: Path, type: str):
def download_files(data: dict, target_folder: Path, offline_mode: bool, type: str):
"""Download remote files to target folder, clone git repos, or symlink local files.
Args:
data (dict): Dict of name to url or local path prefixed with 'git+' for Git repos.
target_folder (Path): Target folder for storing files or repos.
type (str): Type of files to handle.
offline_mode (bool): Offline mode.
"""
for name, url_or_path in data.items():
if url_or_path.startswith('git+'):
Expand All @@ -456,6 +458,9 @@ def download_files(data: dict, target_folder: Path, type: str):
target_path = target_folder / repo_name
if not target_path.exists():
console.print(f'[bold turquoise4]Cloning git {type} [bold magenta]{repo_name}[/] ...[/] ', end='')
if offline_mode:
console.print('[bold orange1]skipped [dim][offline[/].[/]')
continue
try:
call(['git', 'clone', git_url, str(target_path)], stderr=DEVNULL, stdout=DEVNULL)
console.print('[bold green]ok.[/]')
Expand All @@ -481,6 +486,9 @@ def download_files(data: dict, target_folder: Path, type: str):
if not target_path.exists():
try:
console.print(f'[bold turquoise4]Downloading {type} [bold magenta]{filename}[/] ...[/] ', end='')
if offline_mode:
console.print('[bold orange1]skipped [dim](offline)[/].[/]')
continue
resp = requests.get(url_or_path, timeout=3)
resp.raise_for_status()
with open(target_path, 'wb') as f:
Expand Down Expand Up @@ -516,14 +524,14 @@ def download_files(data: dict, target_folder: Path, type: str):


# Download wordlists and set defaults
download_files(CONFIG.wordlists.templates, CONFIG.dirs.wordlists, 'wordlist')
download_files(CONFIG.wordlists.templates, CONFIG.dirs.wordlists, CONFIG.offline_mode, 'wordlist')
for category, name in CONFIG.wordlists.defaults.items():
if name in CONFIG.wordlists.templates.keys():
CONFIG.wordlists.defaults[category] = str(CONFIG.wordlists.templates[name])


# Download payloads
download_files(CONFIG.payloads.templates, CONFIG.dirs.payloads, 'payload')
download_files(CONFIG.payloads.templates, CONFIG.dirs.payloads, CONFIG.offline_mode, 'payload')

# Print config
if CONFIG.debug.component == 'config':
Expand Down
19 changes: 18 additions & 1 deletion secator/cli.py
Expand Up @@ -163,6 +163,9 @@ def util():
@click.option('--number', '-n', type=int, default=1, help='Number of proxies')
def proxy(timeout, number):
"""Get random proxies from FreeProxy."""
if CONFIG.offline_mode:
console.print('[bold red]Cannot run this command in offline mode.[/]')
return
proxy = FreeProxy(timeout=timeout, rand=True, anonym=True)
for _ in range(number):
url = proxy.get()
Expand Down Expand Up @@ -191,6 +194,9 @@ def revshell(name, host, port, interface, listen, force):
# Download reverse shells JSON from repo
revshells_json = f'{CONFIG.dirs.revshells}/revshells.json'
if not os.path.exists(revshells_json) or force:
if CONFIG.offline_mode:
console.print('[bold red]Cannot run this command in offline mode.[/]')
return
ret = Command.execute(
f'wget https://raw.githubusercontent.com/freelabz/secator/main/scripts/revshells.json && mv revshells.json {CONFIG.dirs.revshells}', # noqa: E501
cls_attributes={'shell': True}
Expand Down Expand Up @@ -443,7 +449,6 @@ def publish_docker(tag, latest):
# CONFIG #
#--------#


@cli.group('config')
def _config():
"""View or edit configuration."""
Expand Down Expand Up @@ -636,6 +641,9 @@ def health(json, debug):


def run_install(cmd, title, next_steps=None):
if CONFIG.offline_mode:
console.print('[bold red]Cannot run this command in offline mode.[/]')
return
with console.status(f'[bold yellow] Installing {title}...'):
ret = Command.execute(cmd, cls_attributes={'shell': True}, print_cmd=True, print_line=True)
if ret.return_code != 0:
Expand Down Expand Up @@ -791,6 +799,9 @@ def install_ruby():
@click.argument('cmds', required=False)
def install_tools(cmds):
"""Install supported tools."""
if CONFIG.offline_mode:
console.print('[bold red]Cannot run this command in offline mode.[/]')
return
if cmds is not None:
cmds = cmds.split(',')
tools = [cls for cls in ALL_TASKS if cls.__name__ in cmds]
Expand All @@ -807,6 +818,9 @@ def install_tools(cmds):
@click.option('--force', is_flag=True)
def install_cves(force):
"""Install CVEs (enables passive vulnerability search)."""
if CONFIG.offline_mode:
console.print('[bold red]Cannot run this command in offline mode.[/]')
return
cve_json_path = f'{CONFIG.dirs.cves}/circl-cve-search-expanded.json'
if not os.path.exists(cve_json_path) or force:
with console.status('[bold yellow]Downloading zipped CVEs from cve.circl.lu ...[/]'):
Expand All @@ -832,6 +846,9 @@ def install_cves(force):
@cli.command('update')
def update():
"""[dim]Update to latest version.[/]"""
if CONFIG.offline_mode:
console.print('[bold red]Cannot run this command in offline mode.[/]')
return
info = get_version_info('secator', github_handle='freelabz/secator', version=VERSION)
latest_version = info['latest_version']
if info['status'] == 'latest':
Expand Down
8 changes: 6 additions & 2 deletions secator/installer.py
Expand Up @@ -281,8 +281,10 @@ def get_version_info(name, version_flag=None, github_handle=None, version=None):
info['version'] = version

# Get latest version
latest_version = GithubInstaller.get_latest_version(github_handle)
info['latest_version'] = latest_version
latest_version = None
if not CONFIG.offline_mode:
latest_version = GithubInstaller.get_latest_version(github_handle)
info['latest_version'] = latest_version

if location:
info['installed'] = True
Expand All @@ -295,6 +297,8 @@ def get_version_info(name, version_flag=None, github_handle=None, version=None):
info['status'] = 'current unknown'
elif not latest_version:
info['status'] = 'latest unknown'
if CONFIG.offline_mode:
info['status'] += ' [dim orange1]\[offline][/]'
else:
info['status'] = 'missing'

Expand Down
45 changes: 26 additions & 19 deletions secator/tasks/_categories.py
@@ -1,5 +1,4 @@
import json
import logging
import os

import requests
Expand All @@ -13,10 +12,9 @@
USERNAME, WORDLIST)
from secator.output_types import Ip, Port, Subdomain, Tag, Url, UserAccount, Vulnerability
from secator import CONFIG
from secator.rich import console
from secator.runners import Command
from secator.utils import debug

logger = logging.getLogger(__name__)

OPTS = {
HEADER: {'type': str, 'help': 'Custom header to add to each request in the form "KEY1:VALUE1; KEY2:VALUE2"'},
Expand Down Expand Up @@ -132,9 +130,11 @@ def lookup_local_cve(cve_id):
# def lookup_exploitdb(exploit_id):
# print('looking up exploit')
# try:
# cve_info = requests.get(f'https://exploit-db.com/exploits/{exploit_id}', timeout=5).content
# print(cve_info)
# except Exception:
# resp = requests.get(f'https://exploit-db.com/exploits/{exploit_id}', timeout=5)
# resp.raise_for_status()
# content = resp.content
# except requests.RequestException as e:
# debug(f'Failed remote query for {exploit_id} ({str(e)}).', sub='cve')
# logger.error(f'Could not fetch exploit info for exploit {exploit_id}. Skipping.')
# return None
# return cve_info
Expand All @@ -151,18 +151,21 @@ def lookup_cve(cve_id, cpes=[]):
dict: vulnerability data.
"""
cve_info = Vuln.lookup_local_cve(cve_id)

# Online CVE lookup
if not cve_info:
if CONFIG.runners.skip_cve_search:
logger.debug(f'{cve_id} not found locally, and config.runners.skip_cve_search is set: ignoring.')
debug(f'Skip remote query for {cve_id} since config.runners.skip_cve_search is set.', sub='cve')
return None
if CONFIG.offline_mode:
debug(f'Skip remote query for {cve_id} since config.offline_mode is set.', sub='cve')
return None
# logger.debug(f'{cve_id} not found locally. Use `secator install cves` to install CVEs locally.')
try:
cve_info = requests.get(f'https://cve.circl.lu/api/cve/{cve_id}', timeout=5).json()
if not cve_info:
console.print(f'Could not fetch CVE info for cve {cve_id}. Skipping.', highlight=False)
return None
except Exception:
console.print(f'Could not fetch CVE info for cve {cve_id}. Skipping.', highlight=False)
resp = requests.get(f'https://cve.circl.lu/api/cve/{cve_id}', timeout=5)
resp.raise_for_status()
cve_info = resp.json()
except requests.RequestException as e:
debug(f'Failed remote query for {cve_id} ({str(e)}).', sub='cve')
return None

# Match the CPE string against the affected products CPE FS strings from the CVE data if a CPE was passed.
Expand All @@ -178,10 +181,10 @@ def lookup_cve(cve_id, cpes=[]):
cpe_fs = cpe_obj.as_fs()
# cpe_version = cpe_obj.get_version()[0]
vulnerable_fs = cve_info['vulnerable_product']
# logger.debug(f'Matching CPE {cpe} against {len(vulnerable_fs)} vulnerable products for {cve_id}')
debug(f'Matching CPE {cpe} against {len(vulnerable_fs)} vulnerable products for {cve_id}', sub='cve')
for fs in vulnerable_fs:
if fs == cpe_fs:
# logger.debug(f'Found matching CPE FS {cpe_fs} ! The CPE is vulnerable to CVE {cve_id}')
debug(f'Found matching CPE FS {cpe_fs} ! The CPE is vulnerable to CVE {cve_id}', sub='cve')
cpe_match = True
tags.append('cpe-match')
if not cpe_match:
Expand Down Expand Up @@ -258,9 +261,13 @@ def lookup_ghsa(ghsa_id):
Returns:
dict: vulnerability data.
"""
reference = f'https://github.com/advisories/{ghsa_id}'
response = requests.get(reference)
soup = BeautifulSoup(response.text, 'lxml')
try:
resp = requests.get(f'https://github.com/advisories/{ghsa_id}', timeout=5)
resp.raise_for_status()
except requests.RequestException as e:
debug(f'Failed remote query for {ghsa_id} ({str(e)}).', sub='cve')
return None
soup = BeautifulSoup(resp.text, 'lxml')
sidebar_items = soup.find_all('div', {'class': 'discussion-sidebar-item'})
cve_id = sidebar_items[2].find('div').text.strip()
data = Vuln.lookup_cve(cve_id)
Expand Down
43 changes: 43 additions & 0 deletions tests/unit/test_offline.py
@@ -0,0 +1,43 @@
import os
import sys
import unittest


class TestOffline(unittest.TestCase):
def setUp(self):
try:
# This allows to drop the secator module loaded from other tests in order to reload the config with modified
# environment variables.
# See https://stackoverflow.com/questions/7460363/re-import-module-under-test-to-lose-context for context.
del sys.modules['secator']
except KeyError:
pass
os.environ['SECATOR_OFFLINE_MODE'] = '1'

def test_offline_cve_lookup(self):
from secator.tasks._categories import Vuln
result = Vuln.lookup_cve('CVE-2022-23491')
self.assertEqual(result, None)

def test_offline_downloads(self):
from secator import download_files, CONFIG
download_files(
{'pyproject.toml': 'https://raw.githubusercontent.com/freelabz/secator/main/pyproject.toml'},
CONFIG.dirs.data,
CONFIG.offline_mode,
'toml file'
)
path = CONFIG.dirs.data / 'pyproject.toml'
self.assertFalse(path.exists())

def test_offline_cli_install(self):
# TODO: https://github.com/ewels/rich-click/issues/188
# from secator.cli import cli
# import click
# from click.testing import CliRunner
# result = CliRunner.invoke(cli, None, None)
pass

def test_offline_cli(self):
# TODO: https://github.com/freelabz/secator/issues/319
pass

0 comments on commit 6b55e99

Please sign in to comment.