This notebook processes Censys JSON (or JSONL) data for IP addresses and produces a CSV with:

| Column | Description |
|------|------------|
| ip | IP address |
| is_vpn | `true` if the IP address shows any VPN signal, otherwise `false` |
| city | City |
| province | Province / State |
| country | Country |

In [1]:
import csv
import json
from typing import Any, Dict, Iterable, List, Optional, Tuple

In [2]:
# Censys labels that identify a host or service as a VPN device.
VPN_LABELS = {"network.device.vpn"}

# Service ports treated as a VPN signal:
#   500  - IKE
#   4500 - IPsec NAT-T (often VPN-related too)
VPN_PORTS = {500, 4500}

In [3]:
def load_records(path: str) -> List[Dict[str, Any]]:
    """
    Load Censys records from a JSON or JSON Lines file.

    Supports:
      - Single JSON document containing: object OR list of objects
      - JSON Lines: each non-blank line is a JSON object

    The file is decoded as UTF-8. A leading byte-order mark is tolerated, so a
    BOM-prefixed export no longer loses its first record (JSON Lines) or its
    whole content (single JSON document).

    Args:
        path: Path of the file to read.

    Returns:
        A list of dicts (each likely an "envelope" having "result": {...}).
        Non-dict items and malformed JSON Lines entries are skipped. An empty
        or whitespace-only file yields [].

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    # "utf-8-sig" reads plain UTF-8 as well, but drops a leading BOM that
    # json.loads() would otherwise reject.
    with open(path, "r", encoding="utf-8-sig") as f:
        content = f.read().strip()

    if not content:
        return []

    # Try full JSON first (object or list)
    try:
        obj = json.loads(content)
    except json.JSONDecodeError:
        obj = None

    if isinstance(obj, list):
        return [x for x in obj if isinstance(x, dict)]
    if isinstance(obj, dict):
        return [obj]

    # Fallback: JSON Lines, parsed from the text already in memory.
    # Text mode normalised every line ending to "\n", so splitting on "\n"
    # yields the same lines as iterating the file object would. str.splitlines()
    # is avoided because it also splits on characters (e.g. U+2028) that may
    # legally appear inside a JSON string.
    records: List[Dict[str, Any]] = []
    for line in content.split("\n"):
        line = line.strip()
        if not line:
            continue
        try:
            rec = json.loads(line)
        except json.JSONDecodeError:
            # Skip malformed lines (deliberate best-effort parsing)
            continue
        if isinstance(rec, dict):
            records.append(rec)
    return records

# NOTE: get_result is defined again, identically, in the next cell; after a
# top-to-bottom run the later definition is the one in effect.
def get_result(envelope: Dict[str, Any]) -> Dict[str, Any]:
    """
    Unwrap an API envelope and return the host object inside it.

    An envelope looks like:
      {"code":200,"status":"OK","result":{...}}
    When "result" is absent or is not a dict, the input is assumed to be the
    inner object already and is returned unchanged.
    """
    inner = envelope.get("result")
    return inner if isinstance(inner, dict) else envelope

In [4]:
def get_result(envelope: Dict[str, Any]) -> Dict[str, Any]:
    """
    Return the host object carried by a Censys API envelope.

    Envelopes have the shape:
      {"code":200,"status":"OK","result":{...}}
    If there is no dict under "result", the argument is treated as the inner
    object itself and handed back as-is.
    """
    payload = envelope.get("result")
    if not isinstance(payload, dict):
        return envelope
    return payload

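### VPN detection logic
An IP is marked as a VPN if **any** of the following is present:
- `network.device.vpn` label, on the host itself or on any of its services
- A service on port `500` (IKE) or `4500` (IPsec NAT-T); the transport protocol is not checked
- An IKE service: `_decoded`, `service_name`, or `extended_service_name` equal to `ike` (case-insensitive)
- A service that carries an `ike` object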

In [5]:
def any_label_matches(labels: Any, wanted: set) -> bool:
    """
    Report whether any string in `labels` is a member of `wanted`.

    Each label is lower-cased before the membership test, so `wanted` is
    expected to hold lower-case strings. Anything that is not a list (None,
    a bare string, a dict, ...) never matches, and non-string list items are
    ignored.
    """
    if not isinstance(labels, list):
        return False
    for label in labels:
        if isinstance(label, str) and label.lower() in wanted:
            return True
    return False


def is_vpn_record(result: Dict[str, Any]) -> bool:
    """
    Decide whether a Censys host record shows any VPN signal.

    A record counts as a VPN when ANY of these holds:
      1. A top-level label matches VPN_LABELS (case-insensitive).
      2. A service has a port listed in VPN_PORTS.
      3. A service's "_decoded", "service_name" or "extended_service_name"
         equals "ike" (case-insensitive).
      4. A service carries an "ike" object (dict).
      5. A service-level label matches VPN_LABELS (case-insensitive).

    Args:
        result: The inner host object of a record.

    Returns:
        True if at least one signal is found, otherwise False. Unexpected
        shapes (non-list "services", non-dict service entries, non-string
        name fields) are ignored rather than raising.
    """
    # Lower-case the wanted labels once; any_label_matches() lower-cases the
    # record's labels before comparing, so both sides must agree on case.
    wanted_labels = {x.lower() for x in VPN_LABELS}

    # 1) top-level labels
    if any_label_matches(result.get("labels"), wanted_labels):
        return True

    services = result.get("services") or []
    if not isinstance(services, list):
        return False

    for s in services:
        if not isinstance(s, dict):
            continue

        # 2) well-known VPN ports
        port = s.get("port")
        if isinstance(port, int) and port in VPN_PORTS:
            return True

        # 3) Service name / decoded type checks. Only strings are compared:
        # a truthy non-string value (dict, number, ...) has no .lower() and
        # must not abort the whole run.
        name_fields = (
            s.get("_decoded"),
            s.get("service_name"),
            s.get("extended_service_name"),
        )
        if any(isinstance(v, str) and v.lower() == "ike" for v in name_fields):
            return True

        # 4) Direct presence of an "ike" object is a strong VPN signal
        if isinstance(s.get("ike"), dict):
            return True

        # 5) Censys service labels (like "network.device.vpn")
        if any_label_matches(s.get("labels"), wanted_labels):
            return True

    return False

### Location logic
Extracts the `city`, `province`, and `country` values from each record's `location` object. Missing values are written to the CSV as empty strings.

In [6]:
def extract_location(result: Dict[str, Any]) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """
    Pull (city, province, country) out of a host record's "location" object.

    Each element is whatever is stored under the matching key, or None when
    the key is absent. If "location" is missing or is not a dict, all three
    elements are None.
    """
    geo = result.get("location")
    if not isinstance(geo, dict):
        return (None, None, None)
    return (geo.get("city"), geo.get("province"), geo.get("country"))

# Main script logic

In [11]:
# Input (Censys JSON / JSONL export) and output (CSV) locations under the
# /content/drive mount point. Named *_PATH rather than `input` / `output` so
# the built-in input() is not shadowed for the rest of the kernel session.
INPUT_PATH = "/content/drive/MyDrive/VPN Deprecated/Data/com.browsec.vpn.json"
OUTPUT_PATH = "/content/drive/MyDrive/VPN Deprecated/Output/output.csv"

envelopes = load_records(INPUT_PATH)

# Counts rows actually written; records without a usable "ip" are skipped and
# therefore never reach the CSV.
rows_written = 0

with open(OUTPUT_PATH, "w", newline="", encoding="utf-8") as f:
    w = csv.writer(f)
    w.writerow(["ip", "is_vpn", "city", "province", "country"])

    for env in envelopes:
        res = get_result(env)
        ip = res.get("ip")
        # A row is only meaningful with a non-empty string IP address.
        if not isinstance(ip, str) or not ip:
            continue

        vpn = is_vpn_record(res)
        city, province, country = extract_location(res)

        w.writerow([
            ip,
            "true" if vpn else "false",
            city or "",
            province or "",
            country or "",
        ])
        rows_written += 1

print(
    f"Wrote {OUTPUT_PATH} ({len(envelopes)} input record(s) processed, "
    f"{rows_written} row(s) written)"
)

Wrote /content/drive/MyDrive/VPN Deprecated/Output/output.csv (265 input record(s) processed)


In [10]:
# Mount Google Drive at /content/drive (Colab only). The main cell reads its
# input from, and writes its CSV to, paths under /content/drive, so on a fresh
# runtime this cell has to be run before that one.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive
