# IP Analysis (BMC Discovery)

This notebook reproduces the DisMAL `ip_analysis` report using the raw CSV exports generated by DisMAL.
It loads the relevant query outputs from `raw_exports/<appliance>/` for preview and optional re-export to the standard output folders.


In [None]:
# TODO: Fix report headers

## Requirements

We rely on `pandas` for tabular wrangling and `PyYAML` for configuration.
If they are missing from your environment, uncomment the `%pip install` line at the top of the next cell to install them.


In [None]:
# %pip install -q pandas pyyaml

import ast
import re
from pathlib import Path

import pandas as pd
import yaml


## Configuration (from config.yaml)

Loads target details, resolves the output folder, and points to the matching raw export directory for each appliance.


In [None]:
from pathlib import Path
import re
import yaml

def load_config_params(
    start: Path,
    appliance_name: str = None,
    appliance_index: int = 0,
) -> dict:
    """Load ``config.yaml`` and resolve the settings for one appliance.

    The search for ``config.yaml`` starts at ``start`` and walks up through its
    parents; when no file is found, ``start.parent`` is used as the repo root
    (and opening the missing file then raises).

    Per-appliance values (``target``, ``token``, ``token_file``,
    ``api_version``, ``verify_ssl``) are read from the selected entry of the
    ``appliances`` list first and fall back to the top-level keys of the
    configuration.

    Args:
        start: Directory where the search for ``config.yaml`` begins.
        appliance_name: Value of the ``name`` key of the ``appliances`` entry
            to select. Only consulted when ``appliances`` is a non-empty list.
        appliance_index: Position in ``appliances`` to use when no name is
            given. An unusable index falls back to the first entry.

    Returns:
        A dict with the keys ``repo_root``, ``config_path``, ``cfg``,
        ``selected``, ``target``, ``token`` (``None`` when no token is
        configured), ``api_version``, ``verify_ssl``, ``output_dir`` and
        ``raw_export_dir``.

    Raises:
        ValueError: If ``appliance_name`` matches no appliance, or no
            ``target`` is configured.
        OSError: If ``config.yaml`` or the token file cannot be opened.

    Side effects:
        Creates ``output_<sanitized target>`` under the repo root if missing.
    """
    def _find_repo_root(start: Path) -> Path:
        # First directory (start itself, then each parent) holding config.yaml.
        for p in [start] + list(start.parents):
            if (p / 'config.yaml').exists():
                return p
        return start.parent

    def _slugify(value: str) -> str:
        # Lower-case, underscore-separated folder name; never empty.
        return re.sub(r'[^A-Za-z0-9]+', '_', value).strip('_').lower() or 'default'

    repo_root = _find_repo_root(start)
    config_path = repo_root / 'config.yaml'

    # Explicit encoding so parsing does not depend on the platform locale.
    with open(config_path, 'r', encoding='utf-8') as fh:
        cfg = yaml.safe_load(fh) or {}

    apps = cfg.get('appliances') or []
    selected = None
    if isinstance(apps, list) and apps:
        if appliance_name:
            # Skip malformed (non-mapping) entries instead of crashing on .get().
            selected = next(
                (a for a in apps if isinstance(a, dict) and a.get('name') == appliance_name),
                None,
            )
            if selected is None:
                raise ValueError(f"No appliance named '{appliance_name}' in config.yaml")
        else:
            try:
                selected = apps[int(appliance_index)]
            except (IndexError, TypeError, ValueError, OverflowError):
                # Best-effort: an unusable index selects the first appliance.
                selected = apps[0]

    target = ((selected or {}).get('target') or cfg.get('target') or '').strip()
    if not target:
        raise ValueError('config.yaml missing "target"')

    # An inline token wins; otherwise read it from the configured token file.
    token = (((selected or {}).get('token') or cfg.get('token') or '').strip())
    token_file = (selected or {}).get('token_file') or cfg.get('token_file') or cfg.get('f_token')
    if not token and token_file:
        tf_path = Path(token_file)
        if not tf_path.is_absolute():
            # Relative token paths are resolved against the repo root.
            tf_path = repo_root / tf_path
        with open(tf_path, 'r', encoding='utf-8') as tf:
            token = tf.read().strip()
    if not token:
        token = None

    api_version = str((selected or {}).get('api_version') or cfg.get('api_version') or 'v1.14')
    verify_ssl = bool((selected or {}).get('verify_ssl', cfg.get('verify_ssl', True)))

    # Make the target safe to embed in a folder name.
    sanitized = target.replace('.', '_').replace(':', '_').replace('/', '_')
    output_dir = repo_root / f'output_{sanitized}'
    output_dir.mkdir(parents=True, exist_ok=True)

    export_name = ((selected or {}).get('name') or appliance_name or sanitized)
    raw_export_dir = repo_root / 'raw_exports' / _slugify(export_name)

    return {
        "repo_root": repo_root,
        "config_path": config_path,
        "cfg": cfg,
        "selected": selected,
        "target": target,
        "token": token,
        "api_version": api_version,
        "verify_ssl": verify_ssl,
        "output_dir": output_dir,
        "raw_export_dir": raw_export_dir,
    }


In [None]:
def init_appliance(appliance_name: str = "prod"):
    """Resolve one appliance's settings, print a summary and return them.

    Configuration is looked up from the current working directory via
    ``load_config_params``.

    Args:
        appliance_name: Name of the appliance to select from the configuration.

    Returns:
        A dict holding the full parameter dict under ``params`` plus the
        ``target``, ``api_version``, ``verify_ssl``, ``output_dir``,
        ``raw_export_dir`` and ``appliance_name`` values.
    """
    resolved = load_config_params(Path.cwd(), appliance_name=appliance_name)

    instance = {
        "params": resolved,
        "target": resolved["target"],
        "api_version": resolved["api_version"],
        "verify_ssl": resolved["verify_ssl"],
        "output_dir": resolved["output_dir"],
        "raw_export_dir": resolved["raw_export_dir"],
        "appliance_name": appliance_name,
    }

    # Label/value pairs, printed in display order.
    summary_rows = (
        ('Appliance Name :', appliance_name),
        ('Target         :', instance["target"]),
        ('API Version    :', instance["api_version"]),
        ('Verify SSL     :', instance["verify_ssl"]),
        ('Raw CSV folder :', instance["raw_export_dir"]),
        ('Output folder  :', instance["output_dir"]),
    )
    for label, value in summary_rows:
        print(label, value)

    return instance


# Initialise Instances

In [None]:
# Resolve settings for the appliance named "prod" in config.yaml.
print("Initialise Prod:")
twprod = init_appliance(appliance_name="prod")

# Resolve settings for the appliance named "dev" in config.yaml.
print("Initialise Dev:")
twdev = init_appliance(appliance_name="dev")


In [None]:
# Query titles; each is slugified to find its CSV inside the raw export folder.
DEFAULT_SCHEDULE_TITLE = "Scheduled Scan Ranges"
DEFAULT_EXCLUDE_TITLE = "Exclude Ranges"
DEFAULT_SCHEDULE_COUNT_TITLE = "DiscoveryAccess Schedule Counts"
DEFAULT_UNSCANNED_TITLE = "Unscanned Connections"

# Columns that load_export removes from every export before further processing.
BASE_EXPORT_COLUMNS = [
    "Appliance Target",
    "Appliance Name",
    "Query Title",
]

# Any run of non-alphanumeric characters collapses to a single underscore.
_SLUG_PATTERN = re.compile(r"[^A-Za-z0-9]+")

def slugify_title(value: str) -> str:
    """Turn a query title into a lower-case, underscore-separated file stem.

    Runs of non-alphanumeric characters become one underscore and leading or
    trailing underscores are removed. A falsy or fully non-alphanumeric title
    yields ``'unnamed'``.
    """
    text = value if value else ''
    collapsed = _SLUG_PATTERN.sub('_', text)
    trimmed = collapsed.strip('_').lower()
    if not trimmed:
        return 'unnamed'
    return trimmed

def load_export(instance, query_title: str, expected_columns=None, drop_duplicates=False):
    """Read the raw CSV export for one query into a DataFrame.

    Args:
        instance: Mapping with a ``raw_export_dir`` path entry.
        query_title: Query title; its slug is the CSV file stem.
        expected_columns: Column names that must exist in the result. Missing
            ones are added and filled with ``pd.NA``.
        drop_duplicates: When true, duplicate rows are removed and the index
            is rebuilt.

    Returns:
        The export without the ``BASE_EXPORT_COLUMNS`` metadata columns.

    Raises:
        FileNotFoundError: If the CSV for ``query_title`` does not exist.
    """
    source = instance['raw_export_dir'] / f"{slugify_title(query_title)}.csv"
    if not source.exists():
        raise FileNotFoundError(f'Missing export for {query_title}: {source}')

    frame = pd.read_csv(source)

    # Strip the per-appliance metadata columns that are present in this file.
    metadata_cols = [name for name in BASE_EXPORT_COLUMNS if name in frame.columns]
    frame = frame.drop(columns=metadata_cols, errors='ignore')

    # Guarantee the caller's schema even when the export lacks a column.
    for name in (expected_columns or []):
        if name not in frame.columns:
            frame[name] = pd.NA

    if not drop_duplicates:
        return frame
    return frame.drop_duplicates().reset_index(drop=True)

def parse_range_value(value):
    """Normalise one ``Scan_Range`` cell into a list of non-empty strings.

    Accepted inputs:

    * a list or tuple - each item is stringified and stripped, blanks dropped;
    * a string that looks like a list literal (``"['a', 'b']"``) - parsed with
      ``ast.literal_eval``; if parsing fails the whole string is kept as a
      single entry;
    * any other non-blank string - returned as a one-item list;
    * a missing value (``None``, ``NaN``, ``pd.NA``) or blank string - ``[]``;
    * any other scalar - its stripped string form as a one-item list.

    Returns:
        list[str]: The range strings contained in ``value``.
    """
    # Sequences must be handled before pd.isna(): on a list, pd.isna() returns
    # an element-wise array whose truth value is ambiguous and raises.
    if isinstance(value, (list, tuple)):
        return [text for text in (str(v).strip() for v in value) if text]

    if isinstance(value, str):
        stripped = value.strip()
        if stripped.startswith('[') and stripped.endswith(']'):
            try:
                parsed = ast.literal_eval(stripped)
            except Exception:
                # Not a valid literal: keep the raw text as a single entry.
                parsed = [stripped]
            else:
                if not isinstance(parsed, (list, tuple)):
                    parsed = [parsed]
            return [text for text in (str(v).strip() for v in parsed) if text]
        return [stripped] if stripped else []

    if pd.isna(value):
        return []
    return [str(value).strip()]

def normalise_scan_ranges(df, column='Scan_Range'):
    """Return a copy of ``df`` with one row per individual scan range.

    Each cell of ``column`` is parsed with ``parse_range_value`` and the
    resulting lists are exploded into separate rows. Rows left without a
    range are dropped, the index is rebuilt and the column is cast to ``str``.
    The input frame is not modified.
    """
    expanded = df.copy()
    expanded[column] = expanded[column].apply(parse_range_value)

    # One row per range; cells that parsed to nothing become NaN and are dropped.
    expanded = expanded.explode(column)
    expanded = expanded.dropna(subset=[column])
    expanded = expanded.reset_index(drop=True)

    expanded[column] = expanded[column].astype(str)
    return expanded

def attach_schedules(schedule_df: pd.DataFrame, range_df: pd.DataFrame) -> pd.DataFrame:
    """Map each endpoint to the labels of the ranges that list it.

    An endpoint is linked to a range only when its ``endpoint`` value equals
    the range's ``Scan_Range`` string exactly.

    Args:
        schedule_df: Frame with an ``endpoint`` column.
        range_df: Frame with ``Label`` and ``Scan_Range`` columns.

    Returns:
        A frame with columns ``endpoint`` and ``scan_schedules``. The latter
        is the sorted, comma-separated set of matching labels, or ``pd.NA``
        when nothing matched. An empty ``schedule_df`` gives an empty frame
        with those two columns.
    """
    if schedule_df.empty:
        return pd.DataFrame(columns=['endpoint', 'scan_schedules'])

    def _join_labels(labels):
        # Unique, non-empty labels in sorted order.
        distinct = {label for label in labels.dropna() if label}
        return ', '.join(sorted(distinct))

    range_lookup = normalise_scan_ranges(range_df)[['Label', 'Scan_Range']]
    joined = schedule_df.merge(
        range_lookup,
        how='left',
        left_on='endpoint',
        right_on='Scan_Range',
    )

    per_endpoint = joined.groupby('endpoint')['Label'].apply(_join_labels).reset_index()

    # Endpoints without any label end up as NA rather than an empty string.
    per_endpoint['scan_schedules'] = per_endpoint['Label'].replace('', pd.NA)
    return per_endpoint.drop(columns=['Label'])

def add_unscanned(schedule_df: pd.DataFrame, unscanned_df: pd.DataFrame) -> pd.DataFrame:
    """Append unscanned endpoints to the schedule mapping and fill the gaps.

    Args:
        schedule_df: Frame with ``endpoint`` and ``scan_schedules`` columns.
        unscanned_df: Frame with an ``endpoint`` column; other columns are
            ignored.

    Returns:
        The rows of ``schedule_df`` followed by one ``'Seen but unscanned'``
        row per unscanned endpoint. Missing ``scan_schedules`` values are
        replaced by an explanatory message, and rows that repeat the same
        ``endpoint``/``scan_schedules`` pair are removed.
    """
    not_scheduled_note = 'Endpoint has previous DiscoveryAccess, but not currently scheduled.'

    seen_only = unscanned_df[['endpoint']].assign(scan_schedules='Seen but unscanned')
    stacked = pd.concat([schedule_df, seen_only], ignore_index=True)
    stacked['scan_schedules'] = stacked['scan_schedules'].fillna(not_scheduled_note)

    deduped = stacked.drop_duplicates(subset=['endpoint', 'scan_schedules'])
    return deduped.reset_index(drop=True)

def build_ip_analysis(instance):
    """Build the endpoint-to-schedule report for one appliance.

    Loads the scheduled-range, exclude-range, schedule-count and
    unscanned-connection exports from the instance's raw export folder and
    combines them.

    Args:
        instance: Mapping with ``raw_export_dir`` and ``target`` entries.

    Returns:
        A frame with ``Discovery Instance`` as its first column, followed by
        ``endpoint`` and ``scan_schedules``, sorted by the latter two.
    """
    range_columns = ['ID', 'Label', 'Scan_Range', 'Level', 'Date_Rules']
    exclude_columns = ['ID', 'Label', 'Scan_Range', 'Date_Rules']

    scan_ranges = load_export(instance, DEFAULT_SCHEDULE_TITLE, range_columns)
    exclude_ranges = load_export(instance, DEFAULT_EXCLUDE_TITLE, exclude_columns)
    # Give the exclude export a Level column if it lacks one, to match the scheduled ranges.
    if 'Level' not in exclude_ranges.columns:
        exclude_ranges['Level'] = pd.NA
    all_ranges = pd.concat(
        [scan_ranges, exclude_ranges[range_columns]],
        ignore_index=True,
        sort=False,
    )

    endpoint_counts = load_export(instance, DEFAULT_SCHEDULE_COUNT_TITLE, ['endpoint', 'schedules'])
    if 'schedules' in endpoint_counts.columns:
        endpoint_counts['schedules'] = pd.to_numeric(endpoint_counts['schedules'], errors='coerce')

    endpoint_labels = attach_schedules(endpoint_counts, all_ranges)

    unscanned_endpoints = load_export(instance, DEFAULT_UNSCANNED_TITLE, ['endpoint'])
    report = add_unscanned(endpoint_labels, unscanned_endpoints)

    report.insert(0, 'Discovery Instance', instance['target'])
    ordered = report.sort_values(['endpoint', 'scan_schedules'])
    return ordered.reset_index(drop=True)


## Load and Preview

Build the endpoint schedule mapping from the raw exports and preview the results.


In [None]:
from pathlib import Path
import pandas as pd

def save(df: pd.DataFrame, output_dir: Path, filename: str):
    """Write ``df`` to ``<output_dir>/<filename>.csv`` without the index.

    Args:
        df: Frame to export.
        output_dir: Destination folder, as a ``Path`` or a path string. It is
            created (including parents) when it does not exist yet.
        filename: File stem; ``.csv`` is appended.

    Prints the path of the written file.
    """
    # Accept plain strings too, and do not rely on the folder already existing.
    target_dir = Path(output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    output_csv = str(target_dir / f"{filename}.csv")
    df.to_csv(output_csv, index=False)
    print(f"Saved to {output_csv}")


In [None]:
# Build the report for the prod appliance and show its first five rows.
prod_df = build_ip_analysis(instance=twprod)
print(twprod['target'])
display(prod_df.head(n=5))

# Same for the dev appliance.
dev_df = build_ip_analysis(instance=twdev)
print(twdev['target'])
display(dev_df.head(n=5))


In [None]:
# Write each appliance's report as ip_analysis.csv into its own output folder.
save(df=prod_df, output_dir=twprod['output_dir'], filename='ip_analysis')
save(df=dev_df, output_dir=twdev['output_dir'], filename='ip_analysis')
