# Test NIP-66 Relay Monitoring

Comprehensive test notebook for the `Nip66` model covering:
- RTT (Round-Trip Time) tests
- SSL certificate checks
- Geolocation lookups
- Property access
- Error handling
- Edge cases

**GeoIP databases are in `implementations/bigbrotr/`**

In [1]:
import sys
sys.path.insert(0, "../src")

import asyncio
import json
from pathlib import Path

import geoip2.database
from nostr_sdk import EventBuilder

from models.nip66 import (
    Nip66,
    Nip66TestError,
    Nip66RttData,
    Nip66SslData,
    Nip66GeoData,
)
from models.relay import Relay
from models.keys import Keys
from models.metadata import Metadata

## 1. Setup GeoIP Databases and Keys

In [2]:
# Load GeoIP databases
geoip_dir = Path("../implementations/bigbrotr")

city_db_path = geoip_dir / "GeoLite2-City.mmdb"
asn_db_path = geoip_dir / "GeoLite2-ASN.mmdb"

print(f"City DB exists: {city_db_path.exists()}")
print(f"ASN DB exists: {asn_db_path.exists()}")

# Open readers
city_reader = geoip2.database.Reader(str(city_db_path)) if city_db_path.exists() else None
asn_reader = geoip2.database.Reader(str(asn_db_path)) if asn_db_path.exists() else None

print(f"\nCity reader: {city_reader is not None}")
print(f"ASN reader: {asn_reader is not None}")

City DB exists: True
ASN DB exists: True

City reader: True
ASN reader: True


In [3]:
# Generate test keys
keys = Keys.generate()
print(f"Test pubkey: {keys.public_key().to_hex()[:16]}...")

# Create event builder for write tests
event_builder = EventBuilder.text_note("NIP-66 test event")
print(f"Event builder: {type(event_builder)}")

Test pubkey: 099af7ec60c78259...
Event builder: <class 'nostr_sdk.nostr_sdk.EventBuilder'>


## 2. Full Test on a Real Relay

In [4]:
# Test a well-known relay with all tests enabled
relay = Relay("wss://relay.damus.io")
print(f"Testing: {relay.url}")
print(f"Network: {relay.network}")
print(f"Scheme: {relay.scheme}")

Testing: wss://relay.damus.io
Network: clearnet
Scheme: wss


In [5]:
# Run full NIP-66 test
nip66 = await Nip66.test(
    relay,
    keys=keys,
    event_builder=event_builder,
    city_reader=city_reader,
    asn_reader=asn_reader,
)

print(f"Test successful: {nip66 is not None}")
print(f"Type: {type(nip66)}")

Test successful: True
Type: <class 'models.nip66.Nip66'>


## 3. RTT Properties

In [6]:
print("=== RTT Results ===")
print(f"has_rtt: {nip66.has_rtt}")
print()
print(f"rtt_dns: {nip66.rtt_dns} ms")
print(f"rtt_open: {nip66.rtt_open} ms")
print(f"rtt_read: {nip66.rtt_read} ms")
print(f"rtt_write: {nip66.rtt_write} ms")
print()
print(f"is_openable: {nip66.is_openable}")
print(f"is_readable: {nip66.is_readable}")
print(f"is_writable: {nip66.is_writable}")

=== RTT Results ===
has_rtt: True

rtt_dns: 40 ms
rtt_open: 4 ms
rtt_read: None ms
rtt_write: 567 ms

is_openable: True
is_readable: False
is_writable: True


## 4. SSL Properties

In [7]:
print("=== SSL Results ===")
print(f"has_ssl: {nip66.has_ssl}")
print()
print(f"ssl_valid: {nip66.ssl_valid}")
print(f"ssl_issuer: {nip66.ssl_issuer}")
print(f"ssl_expires: {nip66.ssl_expires}")

if nip66.ssl_expires:
    from datetime import datetime
    expiry_date = datetime.fromtimestamp(nip66.ssl_expires)
    days_until = (expiry_date - datetime.now()).days
    print(f"  Expiry date: {expiry_date.isoformat()}")
    print(f"  Days until expiry: {days_until}")

=== SSL Results ===
has_ssl: True

ssl_valid: True
ssl_issuer: Google Trust Services
ssl_expires: 1770753244
  Expiry date: 2026-02-10T20:54:04
  Days until expiry: 36


## 5. Geo Properties

In [8]:
print("=== Geo Results ===")
print(f"has_geo: {nip66.has_geo}")
print()
print(f"geo_ip: {nip66.geo_ip}")
print(f"geo_country: {nip66.geo_country}")
print(f"geo_region: {nip66.geo_region}")
print(f"geo_city: {nip66.geo_city}")
print(f"geo_lat: {nip66.geo_lat}")
print(f"geo_lon: {nip66.geo_lon}")
print(f"geo_tz: {nip66.geo_tz}")
print(f"geohash: {nip66.geohash}")
print(f"geo_asn: {nip66.geo_asn}")
print(f"geo_asn_org: {nip66.geo_asn_org}")

=== Geo Results ===
has_geo: True

geo_ip: 104.21.61.55
geo_country: None
geo_region: None
geo_city: None
geo_lat: None
geo_lon: None
geo_tz: None
geohash: None
geo_asn: 13335
geo_asn_org: CLOUDFLARENET


## 6. Raw Metadata Access

In [9]:
print("=== Raw Metadata ===")
print("\nRTT Metadata:")
if nip66.rtt_metadata:
    print(json.dumps(nip66.rtt_metadata.data, indent=2))
else:
    print("  None")

print("\nSSL Metadata:")
if nip66.ssl_metadata:
    print(json.dumps(nip66.ssl_metadata.data, indent=2))
else:
    print("  None")

print("\nGeo Metadata:")
if nip66.geo_metadata:
    print(json.dumps(nip66.geo_metadata.data, indent=2))
else:
    print("  None")

=== Raw Metadata ===

RTT Metadata:
{
  "rtt_open": 4,
  "rtt_write": 567,
  "rtt_dns": 40
}

SSL Metadata:
{
  "ssl_valid": true,
  "ssl_issuer": "Google Trust Services",
  "ssl_expires": 1770753244
}

Geo Metadata:
{
  "geo_ip": "104.21.61.55",
  "geo_asn": 13335,
  "geo_asn_org": "CLOUDFLARENET"
}


## 7. Conversion to RelayMetadata

In [10]:
# Convert to RelayMetadata for DB storage
rtt_meta, ssl_meta, geo_meta = nip66.to_relay_metadata()

print("=== RelayMetadata Objects ===")

print("\nRTT:")
if rtt_meta:
    print(f"  type: {rtt_meta.metadata_type}")
    print(f"  generated_at: {rtt_meta.generated_at}")
else:
    print("  None")

print("\nSSL:")
if ssl_meta:
    print(f"  type: {ssl_meta.metadata_type}")
    print(f"  generated_at: {ssl_meta.generated_at}")
else:
    print("  None")

print("\nGeo:")
if geo_meta:
    print(f"  type: {geo_meta.metadata_type}")
    print(f"  generated_at: {geo_meta.generated_at}")
else:
    print("  None")

=== RelayMetadata Objects ===

RTT:
  type: nip66_rtt
  generated_at: 1767561786

SSL:
  type: nip66_ssl
  generated_at: 1767561786

Geo:
  type: nip66_geo
  generated_at: 1767561786


## 8. Test Multiple Relays

In [11]:
# Test multiple relays in parallel
relay_urls = [
    "wss://relay.damus.io",
    "wss://nos.lol",
    "wss://relay.nostr.band",
    "wss://nostr.wine",
]

async def test_nip66_safe(url: str) -> tuple[str, Nip66 | None, Exception | None]:
    relay = Relay(url)
    try:
        result = await Nip66.test(
            relay,
            timeout=10.0,
            keys=keys,
            event_builder=EventBuilder.text_note(f"Test {url}"),
            city_reader=city_reader,
            asn_reader=asn_reader,
        )
        return url, result, None
    except Exception as e:
        return url, None, e

results = await asyncio.gather(*[test_nip66_safe(url) for url in relay_urls])

print("=== Multiple Relay Results ===")
for url, nip66_result, error in results:
    if nip66_result:
        open_ms = nip66_result.rtt_open or "N/A"
        location = f"{nip66_result.geo_country}/{nip66_result.geo_city}" if nip66_result.has_geo else "N/A"
        print(f"✓ {url}")
        print(f"    RTT open: {open_ms}ms, Location: {location}")
    else:
        print(f"✗ {url}: {type(error).__name__}")

=== Multiple Relay Results ===
✓ wss://relay.damus.io
    RTT open: 2ms, Location: None/None
✓ wss://nos.lol
    RTT open: 2ms, Location: DE/Falkenstein
✓ wss://relay.nostr.band
    RTT open: 3ms, Location: FI/Helsinki
✓ wss://nostr.wine
    RTT open: N/Ams, Location: None/None


## 9. Test Selective Tests (run_* flags)

In [12]:
# Test RTT only (no SSL, no geo)
relay = Relay("wss://relay.damus.io")

nip66_rtt_only = await Nip66.test(
    relay,
    keys=keys,
    event_builder=event_builder,
    run_ssl=False,
    run_geo=False,
)

print("=== RTT Only Test ===")
print(f"has_rtt: {nip66_rtt_only.has_rtt}")
print(f"has_ssl: {nip66_rtt_only.has_ssl}")
print(f"has_geo: {nip66_rtt_only.has_geo}")
print(f"rtt_open: {nip66_rtt_only.rtt_open} ms")

=== RTT Only Test ===
has_rtt: True
has_ssl: False
has_geo: False
rtt_open: 0 ms


In [13]:
# Test SSL only (no RTT, no geo)
nip66_ssl_only = await Nip66.test(
    relay,
    run_rtt=False,
    run_geo=False,
)

print("=== SSL Only Test ===")
print(f"has_rtt: {nip66_ssl_only.has_rtt}")
print(f"has_ssl: {nip66_ssl_only.has_ssl}")
print(f"has_geo: {nip66_ssl_only.has_geo}")
print(f"ssl_valid: {nip66_ssl_only.ssl_valid}")

=== SSL Only Test ===
has_rtt: False
has_ssl: True
has_geo: False
ssl_valid: True


In [14]:
# Test Geo only (no RTT, no SSL)
nip66_geo_only = await Nip66.test(
    relay,
    city_reader=city_reader,
    asn_reader=asn_reader,
    run_rtt=False,
    run_ssl=False,
)

print("=== Geo Only Test ===")
print(f"has_rtt: {nip66_geo_only.has_rtt}")
print(f"has_ssl: {nip66_geo_only.has_ssl}")
print(f"has_geo: {nip66_geo_only.has_geo}")
print(f"geo_country: {nip66_geo_only.geo_country}")
print(f"geo_city: {nip66_geo_only.geo_city}")

=== Geo Only Test ===
has_rtt: False
has_ssl: False
has_geo: True
geo_country: None
geo_city: None


## 10. Test with Synthetic Data

In [15]:
# Create Nip66 from synthetic data
test_relay = Relay("wss://test.relay.example")

synthetic_rtt = Metadata({
    "rtt_dns": 10,
    "rtt_open": 150,
    "rtt_read": 200,
    "rtt_write": 180,
})

synthetic_ssl = Metadata({
    "ssl_valid": True,
    "ssl_issuer": "Let's Encrypt",
    "ssl_expires": 1735689600,
})

synthetic_geo = Metadata({
    "geo_ip": "1.2.3.4",
    "geo_country": "US",
    "geo_region": "California",
    "geo_city": "San Francisco",
    "geo_lat": 37.7749,
    "geo_lon": -122.4194,
    "geo_tz": "America/Los_Angeles",
    "geohash": "9q8yyk8yu",
    "geo_asn": 13335,
    "geo_asn_org": "Cloudflare, Inc.",
})

nip66_synthetic = Nip66(
    relay=test_relay,
    rtt_metadata=synthetic_rtt,
    ssl_metadata=synthetic_ssl,
    geo_metadata=synthetic_geo,
)

print("=== Synthetic Nip66 ===")
print(f"rtt_open: {nip66_synthetic.rtt_open}")
print(f"ssl_issuer: {nip66_synthetic.ssl_issuer}")
print(f"geo_city: {nip66_synthetic.geo_city}")

=== Synthetic Nip66 ===
rtt_open: 150
ssl_issuer: Let's Encrypt
geo_city: San Francisco


## 11. Test Parsing Edge Cases

In [16]:
# Test: Invalid types are filtered out
invalid_rtt = Metadata({
    "rtt_open": "fast",  # Should be int -> filtered
    "rtt_read": 100,  # Valid
    "unknown_field": "ignored",  # Unknown -> filtered
})

nip66_invalid = Nip66(
    relay=test_relay,
    rtt_metadata=invalid_rtt,
)

print("=== Invalid Types Filtered ===")
print(f"rtt_open: {nip66_invalid.rtt_open} (should be None, was string)")
print(f"rtt_read: {nip66_invalid.rtt_read} (should be 100)")
print(f"Raw metadata: {nip66_invalid.rtt_metadata.data if nip66_invalid.rtt_metadata else None}")

=== Invalid Types Filtered ===
rtt_open: None (should be None, was string)
rtt_read: 100 (should be 100)
Raw metadata: {'rtt_read': 100}


In [17]:
# Test: Empty metadata becomes None
empty_metadata = Metadata({})

# This should fail - at least one metadata must be present
try:
    nip66_empty = Nip66(
        relay=test_relay,
        rtt_metadata=empty_metadata,
        ssl_metadata=empty_metadata,
        geo_metadata=empty_metadata,
    )
    print("ERROR: Should have raised ValueError")
except ValueError as e:
    print(f"=== Empty Metadata Test ===")
    print(f"Correctly raised ValueError: {e}")

=== Empty Metadata Test ===
Correctly raised ValueError: At least one metadata (rtt, ssl, or geo) must be provided


In [18]:
# Test: Partial metadata (only RTT)
partial_rtt = Metadata({"rtt_open": 100})

nip66_partial = Nip66(
    relay=test_relay,
    rtt_metadata=partial_rtt,
    ssl_metadata=None,
    geo_metadata=None,
)

print("=== Partial Metadata Test ===")
print(f"has_rtt: {nip66_partial.has_rtt}")
print(f"has_ssl: {nip66_partial.has_ssl}")
print(f"has_geo: {nip66_partial.has_geo}")
print(f"rtt_open: {nip66_partial.rtt_open}")

=== Partial Metadata Test ===
has_rtt: True
has_ssl: False
has_geo: False
rtt_open: 100


## 12. Error Handling

In [19]:
# Test: Invalid relay (DNS failure)
invalid_relay = Relay("wss://nonexistent.relay.invalid")

try:
    result = await Nip66.test(
        invalid_relay,
        timeout=3.0,
        keys=keys,
        event_builder=event_builder,
        city_reader=city_reader,
    )
    print(f"Unexpected success: {result}")
except Nip66TestError as e:
    print(f"=== Nip66TestError ===")
    print(f"relay: {e.relay.url}")
    print(f"cause type: {type(e.cause).__name__}")
    print(f"cause: {e.cause}")

Unexpected success: Nip66(relay=Relay(discovered_at=1767561797, url='wss://nonexistent.relay.invalid', url_without_scheme='nonexistent.relay.invalid', network=<NetworkType.CLEARNET: 'clearnet'>, scheme='wss', host='nonexistent.relay.invalid', port=None, path=None), rtt_metadata=Metadata(data={'rtt_open': 2, 'rtt_write': 4}), ssl_metadata=None, geo_metadata=None, generated_at=1767561797)


In [20]:
# Test: Missing required parameters
try:
    result = await Nip66.test(
        relay,
        run_rtt=True,  # Requires keys and event_builder
        # Missing keys and event_builder!
    )
    print(f"Unexpected success: {result}")
except ValueError as e:
    print(f"=== Missing Parameters ===")
    print(f"Correctly raised ValueError: {e}")

=== Missing Parameters ===
Correctly raised ValueError: run_rtt=True requires keys and event_builder


## 13. Test ws:// vs wss:// Relays

In [21]:
# ws:// relay - SSL test should return empty metadata
ws_relay = Relay("ws://nostr.example.com")  # Hypothetical ws:// relay

print("=== ws:// Relay SSL Test ===")
print(f"Scheme: {ws_relay.scheme}")
print(f"Network: {ws_relay.network}")
print()

# For ws:// relays, SSL test returns empty metadata
# (not applicable, no TLS)
print("Note: ws:// relays don't have SSL certificates to check")

=== ws:// Relay SSL Test ===
Scheme: ws
Network: clearnet

Note: ws:// relays don't have SSL certificates to check


## 14. Test Class Defaults

In [22]:
print("=== Class Defaults ===")
print(f"_DEFAULT_TEST_TIMEOUT: {Nip66._DEFAULT_TEST_TIMEOUT} seconds")

=== Class Defaults ===
_DEFAULT_TEST_TIMEOUT: 10.0 seconds


## 15. Test Frozen Dataclass

In [23]:
# Verify immutability
try:
    nip66.generated_at = 0  # Should fail
    print("ERROR: Mutation allowed!")
except Exception as e:
    print(f"=== Immutability Test ===")
    print(f"Correctly prevented mutation: {type(e).__name__}")

=== Immutability Test ===
Correctly prevented mutation: FrozenInstanceError


## 16. Summary Statistics

In [24]:
# Collect statistics from multiple relays
test_urls = [
    "wss://relay.damus.io",
    "wss://nos.lol",
    "wss://nostr.wine",
    "wss://relay.snort.social",
    "wss://relay.primal.net",
]

successful = []
for url in test_urls:
    try:
        result = await Nip66.test(
            Relay(url),
            timeout=10.0,
            keys=keys,
            event_builder=EventBuilder.text_note(f"Test {url}"),
            city_reader=city_reader,
            asn_reader=asn_reader,
        )
        successful.append((url, result))
    except:
        pass

print(f"=== Statistics from {len(successful)}/{len(test_urls)} relays ===")
print()

# RTT statistics
open_times = [n.rtt_open for _, n in successful if n.rtt_open is not None]
if open_times:
    print(f"RTT Open: min={min(open_times)}ms, max={max(open_times)}ms, avg={sum(open_times)//len(open_times)}ms")

read_times = [n.rtt_read for _, n in successful if n.rtt_read is not None]
if read_times:
    print(f"RTT Read: min={min(read_times)}ms, max={max(read_times)}ms, avg={sum(read_times)//len(read_times)}ms")

# Geographic distribution
print("\nGeographic Distribution:")
for url, n in successful:
    if n.has_geo:
        print(f"  {url.split('/')[-1]}: {n.geo_country}/{n.geo_city} ({n.geo_asn_org or 'Unknown ASN'})")
    else:
        print(f"  {url.split('/')[-1]}: No geo data")

# SSL issuers
print("\nSSL Issuers:")
for url, n in successful:
    if n.has_ssl:
        print(f"  {url.split('/')[-1]}: {n.ssl_issuer or 'Unknown'}")
    else:
        print(f"  {url.split('/')[-1]}: No SSL data")

=== Statistics from 5/5 relays ===

RTT Open: min=0ms, max=0ms, avg=0ms

Geographic Distribution:
  relay.damus.io: None/None (CLOUDFLARENET)
  nos.lol: DE/Falkenstein (Hetzner Online GmbH)
  nostr.wine: None/None (CLOUDFLARENET)
  relay.snort.social: IE/None (Apex Strata Ltd)
  relay.primal.net: None/None (CLOUDFLARENET)

SSL Issuers:
  relay.damus.io: Google Trust Services
  nos.lol: Let's Encrypt
  nostr.wine: Google Trust Services
  relay.snort.social: Let's Encrypt
  relay.primal.net: Google Trust Services


## 17. Cleanup

In [25]:
# Close GeoIP readers
if city_reader:
    city_reader.close()
    print("City reader closed")

if asn_reader:
    asn_reader.close()
    print("ASN reader closed")

print("\nCleanup complete!")

City reader closed
ASN reader closed

Cleanup complete!


## Done!

All Nip66 tests completed successfully.