# U-01

## root 계정 원격 접속 제한


In [None]:
from __future__ import annotations

import os
import sys
from pathlib import Path


def _resolve_base_dir() -> Path:
    cwd = Path.cwd().resolve()
    if (cwd / "common").is_dir():
        return cwd
    for p in [cwd] + list(cwd.parents):
        candidate = p / "2026 주정통 코드 정리"
        if (candidate / "common").is_dir():
            return candidate
    raise RuntimeError("common 패키지 경로를 찾지 못했습니다.")


BASE_DIR = _resolve_base_dir()
if str(BASE_DIR) not in sys.path:
    sys.path.insert(0, str(BASE_DIR))


In [None]:
from common.credentials import resolve_credential
from common.db import (
    create_db_engine,
    end_run,
    get_or_create_target,
    init_db,
    open_session,
    save_outcome,
    start_run,
)
from common.ssh import open_ssh, run_command
from common.types import InspectionOutcome, TargetSpec, utcnow


In [None]:
DB_URL = os.getenv("DB_URL", "sqlite:///" + (BASE_DIR / "inspection_results.sqlite3").as_posix())
CREDENTIALS_PATH = os.getenv("CREDENTIALS_PATH", str(BASE_DIR / "credentials.json"))
TOOL_VERSION = "2026-jujeongtong-u01-u10"
GUIDE_VERSION = "2026"
RUN_NOTE = os.getenv("RUN_NOTE", "U-01 자동점검")

targets = [
    {
        "ip": "192.0.2.10",
        "port": 22,
        "vendor": "Generic",
        "model": "Linux",
        "device_type": "UNIX_SERVER",
        "os_name": "linux",
        "os_version": None,
        "credential_ref": "example_linux_admin",
        "meta": {},
    }
]

targets = [TargetSpec(**t) for t in targets]


In [None]:
from __future__ import annotations

import re
import shlex


INSPECTION_CODE = "U-01"


def _sh(cmd: str) -> str:
    return "sh -lc " + shlex.quote(cmd)


def _parse_permit_root_login(*texts: str):
    for text in texts:
        for line in text.splitlines():
            s = line.strip()
            if not s:
                continue
            if s.lower().startswith("permitrootlogin"):
                parts = s.split()
                if len(parts) >= 2:
                    return parts[1].strip().lower()

    candidates: list[str] = []
    for text in texts:
        for line in text.splitlines():
            s = line.strip()
            if not s or s.startswith("#"):
                continue
            m = re.match(r"(?i)^permitrootlogin\s+(.+)$", s)
            if m:
                candidates.append(m.group(1).split()[0].strip().lower())
    return candidates[-1] if candidates else None


def inspect_target(target: TargetSpec) -> InspectionOutcome:
    started_at = utcnow()
    commands = []
    raw = []
    parsed = {}
    status = "ERROR"
    summary = "점검 실패"
    error = None

    try:
        cred = resolve_credential(target.credential_ref, credentials_path=CREDENTIALS_PATH)
        with open_ssh(target, cred) as client:
            r_sshd_t = run_command(client, _sh("sshd -T 2>/dev/null | grep -i '^permitrootlogin' || true"))
            r_cfg = run_command(
                client,
                _sh(
                    "grep -iE '^[[:space:]]*PermitRootLogin' /etc/ssh/sshd_config 2>/dev/null "
                    "|| sudo -n grep -iE '^[[:space:]]*PermitRootLogin' /etc/ssh/sshd_config 2>/dev/null || true"
                ),
            )
            r_cfg_d = run_command(
                client,
                _sh(
                    "test -d /etc/ssh/sshd_config.d && "
                    "(grep -RHiE '^[[:space:]]*PermitRootLogin' /etc/ssh/sshd_config.d 2>/dev/null "
                    "|| sudo -n grep -RHiE '^[[:space:]]*PermitRootLogin' /etc/ssh/sshd_config.d 2>/dev/null || true) || true"
                ),
            )

            commands += [r_sshd_t.command, r_cfg.command, r_cfg_d.command]
            raw += [r_sshd_t, r_cfg, r_cfg_d]

            permit = _parse_permit_root_login(r_sshd_t.stdout, r_cfg.stdout, r_cfg_d.stdout)
            parsed["ssh_permitrootlogin"] = permit

            r_ps = run_command(client, _sh("ps -ef 2>/dev/null | egrep '[t]elnetd|[x]inetd|[i]netd' || true"))
            r_inetd = run_command(client, _sh("grep -RHiE 'telnet' /etc/inetd.conf /etc/xinetd.d 2>/dev/null || true"))
            r_securetty_exists = run_command(client, _sh("test -f /etc/securetty && echo 1 || echo 0"))
            r_securetty_pts = run_command(client, _sh("test -f /etc/securetty && (grep -nE '^[[:space:]]*pts/' /etc/securetty || true) || true"))
            r_pam_login = run_command(client, _sh("test -f /etc/pam.d/login && (grep -nE 'pam_securetty\\.so' /etc/pam.d/login || true) || true"))

            commands += [
                r_ps.command,
                r_inetd.command,
                r_securetty_exists.command,
                r_securetty_pts.command,
                r_pam_login.command,
            ]
            raw += [r_ps, r_inetd, r_securetty_exists, r_securetty_pts, r_pam_login]

            ps_lower = r_ps.stdout.lower()
            inetd_active = any(
                ("telnet" in ln.lower()) and (not ln.strip().startswith("#")) for ln in r_inetd.stdout.splitlines()
            )
            telnet_active = ("telnetd" in ps_lower) or inetd_active

            securetty_exists = r_securetty_exists.stdout.strip() == "1"
            securetty_pts_present = bool(r_securetty_pts.stdout.strip())
            pam_securetty_present = bool(r_pam_login.stdout.strip())

            parsed.update(
                {
                    "telnet_active": telnet_active,
                    "securetty_exists": securetty_exists,
                    "securetty_pts_present": securetty_pts_present,
                    "pam_securetty_present": pam_securetty_present,
                }
            )

            if permit is None:
                status = "ERROR"
                summary = "SSH PermitRootLogin 값을 확인할 수 없습니다."
            else:
                ssh_ok = permit == "no"
                telnet_ok = (not telnet_active) or (
                    securetty_exists and (not securetty_pts_present) and pam_securetty_present
                )

                if ssh_ok and telnet_ok:
                    status = "PASS"
                    summary = "원격 root 직접 접속이 차단되어 있습니다."
                else:
                    status = "FAIL"
                    summary = "원격 root 직접 접속 차단 설정이 미흡합니다."

    except Exception as e:
        status = "ERROR"
        summary = "점검 중 예외가 발생했습니다."
        error = str(e)

    ended_at = utcnow()
    return InspectionOutcome(
        inspection_code=INSPECTION_CODE,
        status=status,
        summary=summary,
        commands=commands,
        raw=raw,
        parsed=parsed,
        error=error,
        started_at=started_at,
        ended_at=ended_at,
    )


In [None]:
engine = create_db_engine(DB_URL)
init_db(engine)
session = open_session(engine)

run = start_run(session, tool_version=TOOL_VERSION, guide_version=GUIDE_VERSION, note=RUN_NOTE)
try:
    for t in targets:
        target_row = get_or_create_target(session, t)
        outcome = inspect_target(t)
        save_outcome(session, run=run, target=target_row, outcome=outcome)
finally:
    end_run(session, run)
    session.close()

print("run_id:", run.id)
