In [None]:
# Guarded Setup
# Notebook-wide switch, assigned before the wildcard import below.
# NOTE(review): none of the visible cells read DRY_RUN directly — confirm
# whether the shared helpers consume it or whether it is vestigial.
DRY_RUN = True
# Wildcard import supplying the helpers used by the cells below (for example
# shell_available and lab_root).
# NOTE(review): if the module itself exports a DRY_RUN name, this import would
# rebind the value assigned above — keep the statement order as is.
from notebooks._utils.common import *
# Gates the optional CLI cell; presumably True when `forensic-cli` can be
# found on this machine — verify against shell_available.
CLI_OK = shell_available('forensic-cli')
LAB_ID = '80_malware_and_iocs'
# Root directory from which the cells below derive every artefact path.
LAB_ROOT = lab_root(LAB_ID)
print(f'CLI available: {CLI_OK}')
print(f'Artifacts root: {LAB_ROOT}')


# Lab 80 · Malware and IoCs

We craft a deterministic indicator set and sweep a synthetic text corpus. The
workflow mirrors the behaviour of the guarded IoC scanner: refanging defanged
values, consolidating hits, and exporting JSON/CSV artefacts.


## Synthetic Indicator Catalogue
Indicators cover domains, IP addresses, cryptographic hashes, and crypto
wallets. Keys and lists are sorted to keep the catalogue deterministic.


In [None]:
# Directory holding the synthetic indicator catalogue artefacts.
IOCS_DIR = LAB_ROOT / 'iocs'
IOCS_DIR.mkdir(parents=True, exist_ok=True)

# Raw indicator values per category. Each set is sorted when the catalogue is
# assembled so that the exported artefacts have a stable ordering.
indicator_sets = {
    'domains': {'evil-c2.onion', 'updates.example.com', 'payload.control-node.example'},
    'ips': {'198.51.100.23', '203.0.113.42'},
    'hashes': {
        'd41d8cd98f00b204e9800998ecf8427e',
        '44d88612fea8a8f36de82e1278abb02f',
    },
    'wallets': {
        'bc1qw508d6qejxtdg4y5r3zarvary0c5xw7kygt080',
        '3J98t1WpEZ73CNmQviecrnyiWrnqRhWNLy',
    },
}

# Timestamp first, then one sorted list per category.
ioc_catalogue = {
    'generated_at': _ts(),
    **{kind: sorted(members) for kind, members in indicator_sets.items()},
}

ioc_json_path = json_dump_sorted(ioc_catalogue, IOCS_DIR / 'synthetic_iocs.json')

# Flat (type, indicator) table for the CSV export; the first entry is the header.
rows = [('type', 'indicator')]
rows.extend(
    (category, value)
    for category in ('domains', 'ips', 'hashes', 'wallets')
    for value in ioc_catalogue[category]
)

ioc_csv_path = IOCS_DIR / 'synthetic_iocs.csv'
ioc_csv_header, *ioc_csv_body = rows
csv_write_rows_sorted(ioc_csv_body, ioc_csv_path, header=ioc_csv_header)
dict(json=str(ioc_json_path), csv=str(ioc_csv_path))


## Refang Helper
Many incident reports defang indicators (`hxxp`, `[.]`). The helper below
reverses these transformations so that the scanner can match the canonical
forms stored in the catalogue.


In [None]:
def refang(text: str) -> str:
    """Reverse common defanging so indicators regain their canonical form.

    The following markers are rewritten (letters are matched regardless of
    case, so ``hXXps://`` and ``[DOT]`` are handled too):

    * ``hxxp://`` / ``hxxps://``  ->  ``http://`` / ``https://``
    * ``[://]``  ->  ``://`` and ``[:]``  ->  ``:``
    * ``[.]``, ``(.)``, ``[dot]``, ``(dot)``  ->  ``.``

    Args:
        text: Free-form text that may contain defanged indicators.

    Returns:
        The text with the markers above replaced; all other characters are
        returned unchanged.
    """
    # Local import so the helper does not depend on what the notebook's
    # wildcard import happens to expose.
    import re

    # Bracketed separators first, so that e.g. ``hxxp[://]host`` exposes a
    # plain ``hxxp://`` for the scheme rule at the end.
    result = text.replace('[://]', '://').replace('[:]', ':')
    result = re.sub(r'\[\.\]|\(\.\)|\[dot\]|\(dot\)', '.', result, flags=re.IGNORECASE)
    # Restore the scheme; an optional "s" is preserved and lower-cased.
    return re.sub(
        r'hxxp(s?)://',
        lambda found: 'http' + found.group(1).lower() + '://',
        result,
        flags=re.IGNORECASE,
    )

# Quick demonstration: a defanged sentence next to its refanged form.
example = 'Contact hxxps://evil-c2[.]onion for updates.'
dict(defanged=example, refanged=refang(example))


## Synthetic Corpus
We create a small directory of text artefacts that reference indicators in a
mix of plain and defanged forms.


In [None]:
# Directory that receives the synthetic text corpus.
TEXT_DIR = LAB_ROOT / 'inputs' / 'logs'
TEXT_DIR.mkdir(parents=True, exist_ok=True)

# File name -> lines written to that file. Indicators appear both in plain
# form (firewall.log, wallets.txt) and defanged form (beacon.txt).
# NOTE(review): the value on the 'Beacon SHA256' line is 32 hex characters,
# i.e. MD5-length rather than SHA-256-length; left untouched because it is
# corpus content that ends up in the scan results.
samples = {
    'firewall.log': [
        '2024-03-01T12:03:12Z DROP TCP 10.0.0.5 -> 198.51.100.23:443 policy=block',
        '2024-03-01T12:05:44Z ALLOW TCP 10.0.0.5 -> 203.0.113.42:22 policy=allow',
    ],
    'beacon.txt': [
        'hxxps://evil-c2[.]onion/api/v1/status responded with 204',
        'Beacon SHA256: 44d88612fea8a8f36de82e1278abb02f',
    ],
    'wallets.txt': [
        'Analyst observed transfer to 3J98t1WpEZ73CNmQviecrnyiWrnqRhWNLy',
        'btc wallet bc1qw508d6qejxtdg4y5r3zarvary0c5xw7kygt080 flagged for review',
    ],
}

for name, lineset in samples.items():
    path = TEXT_DIR / name
    # One entry per line plus a trailing newline. The separators must be the
    # escape sequence '\n'; a raw line break inside the quotes would leave the
    # string literal unterminated.
    path.write_text('\n'.join(lineset) + '\n', encoding='utf-8')

# Show what now exists in the corpus directory, in a stable order.
sorted(str(path) for path in TEXT_DIR.iterdir())


## Deterministic IoC Scan (SDK-style)
A lightweight scanner iterates over the corpus, refangs content, and records
any indicator matches. Output is stable thanks to sorted keys and rows.


In [None]:
from dataclasses import dataclass

@dataclass(frozen=True)
class Match:
    """One indicator hit on a single line of a scanned file.

    Instances are immutable (``frozen=True``).
    """
    # Catalogue category of the indicator: 'domains', 'ips', 'hashes' or 'wallets'.
    indicator_type: str
    # Indicator value as stored in the catalogue.
    indicator: str
    # Path of the file containing the hit, stored as a string.
    file_path: str
    # 1-based line number of the hit within the file.
    line_number: int
    # The matching line with leading/trailing whitespace stripped.
    line_text: str


def scan_corpus(iocs: dict, directory: Path) -> list[Match]:
    """Scan every file below ``directory`` for the indicators in ``iocs``.

    Each line is compared case-insensitively, both as written and after
    ``refang`` has been applied to its lower-cased form, so plain and defanged
    occurrences are both found. Matching is by substring.

    Args:
        iocs: Catalogue mapping the categories 'domains', 'ips', 'hashes' and
            'wallets' to iterables of indicator strings. Other keys are
            ignored; a missing category is treated as empty.
        directory: Root of the corpus; searched recursively.

    Returns:
        One ``Match`` per (indicator, file, line) hit, sorted by indicator
        type, indicator, file path and line number.
    """
    categories = ('domains', 'ips', 'hashes', 'wallets')
    # Lower-case every indicator once up front instead of once per line; the
    # original spelling is kept for reporting. Empty indicators are skipped
    # because an empty string is a substring of every line.
    needles = {
        category: [
            (indicator, indicator.lower())
            for indicator in set(iocs.get(category, ()))
            if indicator
        ]
        for category in categories
    }

    matches: list[Match] = []
    for file_path in sorted(directory.glob('**/*')):
        if not file_path.is_file():
            continue
        # errors='replace' keeps the sweep going when a file is not valid
        # UTF-8 instead of aborting the whole scan on a decode error.
        content = file_path.read_text(encoding='utf-8', errors='replace')
        for idx, raw_line in enumerate(content.splitlines(), start=1):
            lowered = raw_line.lower()
            normalised = refang(lowered)
            for category, pairs in needles.items():
                for indicator, needle in pairs:
                    if needle in lowered or needle in normalised:
                        matches.append(
                            Match(
                                indicator_type=category,
                                indicator=indicator,
                                file_path=str(file_path),
                                line_number=idx,
                                line_text=raw_line.strip(),
                            )
                        )
    # Set iteration order is arbitrary; sorting makes the output deterministic.
    matches.sort(key=lambda item: (item.indicator_type, item.indicator, item.file_path, item.line_number))
    return matches

# Sweep the synthetic corpus with the catalogue and show how many hits were found.
matches = scan_corpus(iocs=ioc_catalogue, directory=TEXT_DIR)
len(matches)


### Persist Results
Matches are exported as JSON for downstream automation and CSV for quick
review. Both include the original line context.


In [None]:
# All scan result artefacts are written below this directory.
RESULTS_DIR = LAB_ROOT / 'results'
RESULTS_DIR.mkdir(parents=True, exist_ok=True)

# JSON export: timestamp, hit count, and one record per match.
results_payload = dict(
    generated_at=_ts(),
    match_count=len(matches),
    matches=[vars(match) for match in matches],
)
results_json = json_dump_sorted(results_payload, RESULTS_DIR / 'ioc_scan_results.json')

# CSV export: the same fields, one row per match, keyed in column order.
csv_columns = ('indicator_type', 'indicator', 'file_path', 'line_number', 'line_text')
csv_rows = [
    {column: getattr(match, column) for column in csv_columns}
    for match in matches
]
results_csv = RESULTS_DIR / 'ioc_scan_results.csv'
csv_write_rows_sorted(csv_rows, results_csv, header=list(csv_columns))
dict(json=str(results_json), csv=str(results_csv))


### CLI Mirror (optional)
A CLI invocation can mirror the behaviour. The command is spelled out in the cell
below for documentation and is only executed when `forensic-cli` is available.


In [None]:
# Without the CLI nothing is executed; the command in the other branch then
# serves as documentation only.
if not CLI_OK:
    print('forensic-cli not available; documented command only.')
else:
    cli_command = ['forensic-cli', 'modules', 'run', 'ioc_scan']
    cli_command += ['--param', f"input={TEXT_DIR}"]
    cli_command += ['--param', f"ioc_file={ioc_json_path}"]
    cli_command += ['--dry-run']
    cli_result = run_cli(cli_command)
    # Show the first non-empty of stdout, stderr, or the return code.
    print(cli_result.stdout or cli_result.stderr or cli_result.returncode)


## Inspect Top Matches
We display the first five matches (in sorted order) and aggregate counts per indicator type.


In [None]:
from collections import Counter

# Tally hits per indicator type.
summary = Counter()
summary.update(match.indicator_type for match in matches)

# The first five matches (already sorted by the scanner) serve as a preview.
preview_rows = list(map(vars, matches[:5]))

inspection = dict(counts_by_type=dict(summary), sample_matches=preview_rows)
json_dump_sorted(inspection, RESULTS_DIR / 'ioc_scan_preview.json')
inspection


### Checkpoint
The indicator catalogue and scan artefacts should now exist.


In [None]:
# Artefacts the earlier cells are expected to have produced.
checkpoint_catalogue_json = IOCS_DIR / 'synthetic_iocs.json'
checkpoint_results_json = RESULTS_DIR / 'ioc_scan_results.json'
checkpoint_results_csv = RESULTS_DIR / 'ioc_scan_results.csv'
expected = [
    checkpoint_catalogue_json,
    IOCS_DIR / 'synthetic_iocs.csv',
    checkpoint_results_json,
    checkpoint_results_csv,
]

# Fail on the first artefact that is missing.
for item in expected:
    artefact_present = Path(item).exists()
    assert artefact_present, f'Missing artefact: {item}'

# Index of the key artefacts, persisted next to them for downstream use.
IOC_REPORT = dict(
    ioc_catalogue_json=str(checkpoint_catalogue_json),
    ioc_results_json=str(checkpoint_results_json),
    ioc_results_csv=str(checkpoint_results_csv),
)
json_dump_sorted(IOC_REPORT, LAB_ROOT / 'ioc_report.json')
IOC_REPORT
