In [13]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import argparse
import json
import sys
import os
import getpass
import ipaddress
import socket
from collections import defaultdict
from datetime import datetime

try:
    import psutil
except Exception:
    print("ERROR: psutil is required. Install with: pip install psutil")
    raise

def normalize_rule_example():
    return None

def gather_listeners(only_user: bool) -> list:
    """Return list of dicts describing listening inet sockets."""
    current_user = getpass.getuser()
    try:
        conns = psutil.net_connections(kind='inet')
    except Exception as e:
        # On some systems calling net_connections may require elevated rights; still try best-effort
        print("Warning: psutil.net_connections raised:", e)
        conns = []

    listeners = []
    for c in conns:
        # Want only LISTEN sockets
        if c.status != psutil.CONN_LISTEN:
            continue

        pid = c.pid

        # only_user filter
        if only_user:
            if pid is None:
                continue
            try:
                proc = psutil.Process(pid)
                owner = proc.username()
            except Exception:
                continue
            if owner != current_user:
                continue

        # Determine protocol type
        proto = None
        if c.type == socket.SOCK_STREAM:
            proto = 'tcp'
        elif c.type == socket.SOCK_DGRAM:
            proto = 'udp'
        else:
            proto = str(c.type)

        # Local address
        laddr = None
        if c.laddr:
            try:
                # On some platforms laddr may be a tuple
                if hasattr(c.laddr, 'ip') and hasattr(c.laddr, 'port'):
                    lip = c.laddr.ip
                    lport = c.laddr.port
                else:
                    lip, lport = c.laddr[0], c.laddr[1]
                laddr = {'ip': lip, 'port': lport}
            except Exception:
                laddr = None

        item = {
            'pid': pid,
            'fd': c.fd,
            'family': c.family,
            'type': proto,
            'local_ip': laddr['ip'] if laddr else None,
            'local_port': laddr['port'] if laddr else None,
            'status': c.status
        }

        # process info
        if pid is not None:
            try:
                p = psutil.Process(pid)
                item['exe'] = p.exe()
                item['cmdline'] = p.cmdline()
                item['username'] = p.username()
                item['name'] = p.name()
            except Exception:
                item['exe'] = None
                item['cmdline'] = None
                item['username'] = None
                item['name'] = None
        else:
            item['exe'] = None
            item['cmdline'] = None
            item['username'] = None
            item['name'] = None

        listeners.append(item)

    return listeners

def ip_matches_rule_obj(rule_obj, ip_str) -> bool:
    """Helper used earlier — kept for possible extensions (CIDR matching)."""
    if rule_obj == 'any':
        return True
    try:
        ip = ipaddress.ip_address(ip_str)
    except Exception:
        return False
    if isinstance(rule_obj, (ipaddress.IPv4Network, ipaddress.IPv6Network)):
        return ip in rule_obj
    if isinstance(rule_obj, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
        return ip == rule_obj
    return False

# ---------------- Analysis / Heuristics ----------------

def analyze_listeners(listeners: list, desktop_mode: bool = True) -> dict:
    """
    Group listeners by pid and add heuristic hints.
    Heuristics:
      - listening on 0.0.0.0 / :: => HIGH risk (exposed)
      - ports > 1024 on desktop => MEDIUM (unexpected dev servers)
      - missing exe or unknown process info => LOW
    """
    groups = defaultdict(list)
    for it in listeners:
        hints = []

        lip = it.get('local_ip')
        port = it.get('local_port')

        # exposed to all interfaces
        if lip in ('0.0.0.0', '::'):
            hints.append('HIGH: listening on all interfaces (0.0.0.0 / ::)')

        # desktop heuristic: high ports might be dev servers
        if desktop_mode and port is not None:
            try:
                if int(port) > 1024:
                    hints.append('MEDIUM: high port (>1024) — check if this is an intended dev server')
            except Exception:
                pass

        # privileged port but missing exe info
        if port is not None:
            try:
                if int(port) < 1024 and not it.get('exe'):
                    hints.append('LOW: privileged port (<1024) but process info missing')
            except Exception:
                pass

        # missing executable
        if not it.get('exe'):
            hints.append('LOW: unknown executable / missing process info')

        # dedupe hints and attach
        it['hints'] = list(dict.fromkeys(hints))
        groups[it['pid']].append(it)

    return groups

# ---------------- Output ----------------

def _family_to_str(family):
    if family == socket.AF_INET:
        return 'IPv4'
    if family == socket.AF_INET6:
        return 'IPv6'
    return str(family)

def print_table(groups: dict):
    print()
    print("Port Exposure Snapshot —", datetime.utcnow().isoformat() + 'Z')
    print('-' * 100)
    hdr = f"{'PID':>6}  {'USER':<18} {'PROC':<24} {'LOCAL_ADDR':<28} {'PORT':>6}  {'PROTO':<5}  HINTS"
    print(hdr)
    print('-' * 100)
    for pid, items in sorted(groups.items(), key=lambda kv: (kv[0] if kv[0] is not None else -1)):
        first = items[0]
        user = first.get('username') or '-'
        proc = first.get('name') or (first.get('exe') and os.path.basename(first.get('exe'))) or '-'
        for it in items:
            local = it.get('local_ip') or '-'
            port = it.get('local_port') or '-'
            proto = it.get('type') or '-'
            hints = '; '.join(it.get('hints')[:2]) if it.get('hints') else ''
            print(f"{str(pid):>6}  {user:<18} {proc:<24} {local:<28} {str(port):>6}  {proto:<5}  {hints}")
    print('-' * 100)
    print()

def save_json(groups: dict, outpath: str):
    out = {
        'generated_at': datetime.utcnow().isoformat() + 'Z',
        'report': []
    }
    for pid, items in groups.items():
        proc_summary = {
            'pid': pid,
            'process_name': items[0].get('name'),
            'username': items[0].get('username'),
            'exe': items[0].get('exe'),
            'sockets': []
        }
        for it in items:
            proc_summary['sockets'].append({
                'local_ip': it.get('local_ip'),
                'local_port': it.get('local_port'),
                'family': _family_to_str(it.get('family')),
                'type': it.get('type'),
                'hints': it.get('hints'),
            })
        out['report'].append(proc_summary)
    try:
        with open(outpath, 'w', encoding='utf-8') as f:
            json.dump(out, f, indent=2, ensure_ascii=False)
        print(f"Wrote JSON report: {outpath}")
    except Exception as e:
        print("Failed to write JSON:", e)

def remediation_suggestions(groups: dict) -> list:
    suggestions = [
        "1) Close unnecessary listeners: stop personal dev servers when not used (e.g., 'python -m http.server').",
        "2) Bind services to localhost (127.0.0.1) if remote access isn't required; restrict exposure with OS firewall.",
        "3) For public-facing services, require authentication, run minimal-privilege user, and keep software updated."
    ]

    # Add specific findings for clarity
    specific = []
    for pid, items in groups.items():
        for it in items:
            if any('HIGH: listening on all interfaces' in h for h in it.get('hints', [])):
                specific.append(f"PID {pid} ({items[0].get('name') or 'unknown'}) listens on all interfaces (port {it.get('local_port')}). Consider binding to 127.0.0.1 or adding firewall rule.")
    if specific:
        suggestions.append("\nSpecific findings:")
        suggestions.extend(specific[:5])
    return suggestions

# ---------------- CLI / Main ----------------

def main(argv=None):
    parser = argparse.ArgumentParser(description='Port Exposure Snapshot (psutil)')
    parser.add_argument('--only-user', action='store_true', help='Show only sockets owned by current user')
    parser.add_argument('--json', metavar='OUT', help='Write JSON report to OUT file')
    parser.add_argument('--no-desktop-heuristic', action='store_true', help='Do not flag high user-port heuristics (useful on servers)')
    # Use parse_known_args to avoid errors from Jupyter-injected args
    args, unknown = parser.parse_known_args(argv)
    if unknown:
        # show ignored args when in interactive environments (helpful for debugging)
        print("Note: ignored unknown args:", unknown)

    listeners = gather_listeners(only_user=args.only_user)
    groups = analyze_listeners(listeners, desktop_mode=not args.no_desktop_heuristic)
    print_table(groups)
    if args.json:
        save_json(groups, args.json)

    print("Remediation suggestions:")
    for s in remediation_suggestions(groups):
        print(" -", s)
    print()

if __name__ == '__main__':
    # To be extra safe in interactive sessions, pass None so parse_known_args reads sys.argv,
    # but unknown args from Jupyter will be ignored (no crash).
    main()


Note: ignored unknown args: ['-f', 'C:\\Users\\E.Gevorgyan\\AppData\\Roaming\\jupyter\\runtime\\kernel-14a52d47-0ace-4f9e-b813-417b52184933.json']

Port Exposure Snapshot — 2025-10-15T10:15:57.955235Z
----------------------------------------------------------------------------------------------------
   PID  USER               PROC                     LOCAL_ADDR                     PORT  PROTO  HINTS
----------------------------------------------------------------------------------------------------
     4  NT AUTHORITY\SYSTEM System                   0.0.0.0                         445  tcp    HIGH: listening on all interfaces (0.0.0.0 / ::); LOW: privileged port (<1024) but process info missing
     4  NT AUTHORITY\SYSTEM System                   192.168.10.4                    139  tcp    LOW: privileged port (<1024) but process info missing; LOW: unknown executable / missing process info
     4  NT AUTHORITY\SYSTEM System                   ::                              445  tcp  

  5180  -                  -                        0.0.0.0                        7680  tcp    HIGH: listening on all interfaces (0.0.0.0 / ::); MEDIUM: high port (>1024) — check if this is an intended dev server
  5180  -                  -                        ::                             7680  tcp    HIGH: listening on all interfaces (0.0.0.0 / ::); MEDIUM: high port (>1024) — check if this is an intended dev server
  5928  APPA\e.gevorgyan   OneDrive.Sync.Service.exe ::1                           42050  tcp    MEDIUM: high port (>1024) — check if this is an intended dev server
  6692  -                  -                        0.0.0.0                        5040  tcp    HIGH: listening on all interfaces (0.0.0.0 / ::); MEDIUM: high port (>1024) — check if this is an intended dev server
  8332  APPA\e.gevorgyan   python.exe               127.0.0.1                     49647  tcp    MEDIUM: high port (>1024) — check if this is an intended dev server
  8332  APPA\e.gevorgyan   py

 13384  APPA\e.gevorgyan   python.exe               127.0.0.1                     56187  tcp    MEDIUM: high port (>1024) — check if this is an intended dev server
 13384  APPA\e.gevorgyan   python.exe               127.0.0.1                     56189  tcp    MEDIUM: high port (>1024) — check if this is an intended dev server
 13384  APPA\e.gevorgyan   python.exe               127.0.0.1                     56186  tcp    MEDIUM: high port (>1024) — check if this is an intended dev server
 13384  APPA\e.gevorgyan   python.exe               127.0.0.1                     56188  tcp    MEDIUM: high port (>1024) — check if this is an intended dev server
 13384  APPA\e.gevorgyan   python.exe               127.0.0.1                     56200  tcp    MEDIUM: high port (>1024) — check if this is an intended dev server
 13660  APPA\e.gevorgyan   python.exe               127.0.0.1                     58619  tcp    MEDIUM: high port (>1024) — check if this is an intended dev server
 13660  APPA\e.g