# api

> Low level functions for interacting with Sherlock's API.

In [None]:
#| default_exp api

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
import httpx
from datetime import datetime
from typing import Literal, List, Optional, TypedDict

from fastcore.test import *
from sherlock_domains.keys import load_key

In [None]:
#| export
# Base URL that the request helpers in this module prefix to their endpoint paths.
API_URL = "https://api.sherlockdomains.com"

# Authentication

The authentication flow has two steps:
 1. The client sends the public key it wants to authenticate with. The server will return a challenge (random bytes) linked to that public key.
 2. The client has a few minutes to send the signature of that challenge. If a valid signature is provided, the server will return a session token that can be used to authenticate the calls (currently a JWT).

In [None]:
#| export
class NewChallengeRequest(TypedDict):
    """Shape of the JSON body posted to the auth challenge endpoint."""
    # Public key the client wants to authenticate with (callers pass it hex-encoded).
    public_key: str

class NewChallengeResponse(TypedDict):
    """Shape of the JSON returned by the auth challenge endpoint."""
    # Challenge the client must sign; the notebook example shows a hex string.
    challenge: str
    # NOTE(review): annotated as datetime, but the JSON payload is stored without
    # conversion — presumably this is a timestamp string at runtime; confirm.
    expires_at: datetime

def get_new_challenge(public_key_hex: str) -> str:
    """Ask the server for a fresh challenge bound to ``public_key_hex``.

    POSTs the public key to the auth challenge endpoint and returns the
    ``challenge`` field of the JSON reply. An error status from the server
    surfaces as the exception raised by ``raise_for_status``.
    """
    endpoint = f"{API_URL}/api/v0/auth/challenge"
    payload = {"public_key": public_key_hex}
    json_headers = {"Content-Type": "application/json"}

    reply = httpx.post(endpoint, json=payload, headers=json_headers)
    reply.raise_for_status()

    parsed = NewChallengeResponse(**reply.json())
    return parsed["challenge"]


In [None]:
# Demo key for the examples in this notebook; it is hard-coded here and therefore shared.
test_key = load_key("3557ed77dd346fcbeefff47115622aa449c0054384347aaa2a6d1284d54caf6d")

# Step 1 of the auth flow: request a challenge for this key's public key.
challenge = get_new_challenge(test_key.public_key())
challenge

'8a388b02bf85728ff968a4bfb383a22c2159cf98333143ae2b4efbc9e18e4a1b'

In [None]:
#| export
class AgentLoginRequest(TypedDict):
    """Shape of the JSON body posted to the auth login endpoint."""
    public_key: str  # same public key the challenge was requested for
    challenge: str  # challenge previously issued by the server
    signature: str  # signature over the challenge (callers pass it hex-encoded)

class AgentLoginResponse(TypedDict):
    """Shape of the JSON returned by the auth login endpoint."""
    access: str  # access token; sent as "Bearer <token>" by the authenticated helpers
    refresh: str  # refresh token returned alongside the access token

def agent_login(public_key_hex: str, challenge: str, signature: str) -> tuple[str, str]:
    """Exchange a signed challenge for session tokens.

    POSTs the public key, the challenge and its signature to the login
    endpoint and returns ``(access, refresh)`` taken from the JSON reply.
    An error status surfaces as the exception raised by ``raise_for_status``.
    """
    credentials = {
        "public_key": public_key_hex,
        "challenge": challenge,
        "signature": signature,
    }
    reply = httpx.post(
        f"{API_URL}/api/v0/auth/login",
        json=credentials,
        headers={"Content-Type": "application/json"},
    )
    reply.raise_for_status()

    tokens = AgentLoginResponse(**reply.json())
    return tokens["access"], tokens["refresh"]

In [None]:
# Step 2 of the auth flow: the challenge is hex-encoded, so sign its raw bytes
# and send the signature back hex-encoded to obtain the tokens.
signature = test_key.sign(bytes.fromhex(challenge))
access_token, refresh_token = agent_login(test_key.public_key(), challenge, signature.hex())
access_token, refresh_token


('eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwdWJsaWNfa2V5IjoiYzY5NTc0OGIxMDA0M2MxMGNlMmFmOTBhMjI3ZWJlNDk1YThlNWE2MDk2ZjVmOTgzMjA4OGI5NjMzMDk1MTdiZSIsImV4cCI6MTczNTU0NzE4OSwiaWF0IjoxNzM1NTQ1Mzg5LCJ0eXBlIjoiYWNjZXNzIn0.u5A1QcetgMy4pBM_JHb-InNdLGT2Pv6KqAEqa7bNLZw',
 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwdWJsaWNfa2V5IjoiYzY5NTc0OGIxMDA0M2MxMGNlMmFmOTBhMjI3ZWJlNDk1YThlNWE2MDk2ZjVmOTgzMjA4OGI5NjMzMDk1MTdiZSIsImV4cCI6MTczNjE1MDE4OSwiaWF0IjoxNzM1NTQ1Mzg5LCJ0eXBlIjoicmVmcmVzaCJ9.ZvBUBoaJGJw0Xo4Fog_se68UAk30TJWtSY4hxxCn_jk')

In [None]:
# TODO: check access token is valid with /me and refresh token works

# Domains

The domains API allows users to search, purchase and manage their domains.

## Search

The search endpoint is a public one so it can be called without being authenticated. However, we can also send the access token if we have one.

In [None]:
#| export

class DomainResult(TypedDict):
    """One domain entry inside a search response."""
    name: str  # full domain name, e.g. 'thisdomainisnotavailable.com' in the sample output
    tld: str  # e.g. 'com'
    tags: Optional[List[str]]  # the sample output shows an empty list
    # NOTE(review): annotated as float, but the sample output shows the integer 1105
    # for a domain offered at 11.05 USD — presumably minor currency units; confirm.
    price: float
    currency: str  # e.g. 'USD'
    available: bool

class DomainSearchResponse(TypedDict):
    """Shape of the JSON returned by the domain search endpoint."""
    id: str  # search id; later passed as `search_id` when requesting a purchase offer
    # NOTE(review): annotated as datetime, but the sample output shows an ISO-8601
    # string ('2024-12-30T07:56:30.909Z'); the payload is not converted here.
    created_at: datetime
    available: List[DomainResult]
    unavailable: List[DomainResult]


def search(query: str, access_token: Optional[str] = None) -> DomainSearchResponse:
    """Search for domains matching ``query``.

    The query is validated locally first: it must be non-empty and, once
    surrounding whitespace is stripped, must not contain whitespace.
    ``access_token`` is optional; when given it is sent as a Bearer token.

    Raises ``ValueError`` for an invalid query (detected locally, or reported
    by the server with a 400 status). Any other error status surfaces as the
    exception raised by ``raise_for_status``.
    """
    cleaned = query.strip() if query else ""
    if not cleaned:
        raise ValueError("Search query cannot be empty")

    # After stripping, anything other than a single token means inner whitespace.
    tokens = cleaned.split()
    if len(tokens) != 1:
        raise ValueError(f"Search query cannot contain spaces: {query}")
    term = tokens[0]

    # Authentication is optional for this endpoint; attach the token only if we have one.
    auth_header = {"Authorization": f"Bearer {access_token}"} if access_token else {}
    request_headers = {"Content-Type": "application/json", **auth_header}

    reply = httpx.get(
        f"{API_URL}/api/v0/domains/search",
        params={"query": term},
        headers=request_headers,
    )

    # A 400 is reported as a ValueError carrying the server's explanation when present.
    if reply.status_code == 400:
        raise ValueError(reply.json().get("detail", "Invalid search query"))

    reply.raise_for_status()
    return DomainSearchResponse(**reply.json())

In [None]:
# Despite its name, this domain IS available (see the sample output below).
domain_name = "thisdomainisnotavailable"

# Run the same search anonymously and with the access token.
unauthorized_results = search(domain_name) 
authorized_results = search(domain_name, access_token)
# Neither call should report unavailable matches, and both should agree.
test_eq(unauthorized_results["unavailable"], [])
test_eq(authorized_results["unavailable"], [])
test_eq(unauthorized_results["unavailable"], authorized_results["unavailable"])

authorized_results

{'id': 'f8f1507a-7c80-4045-aa2e-2682d066f937',
 'created_at': '2024-12-30T07:56:30.909Z',
 'available': [{'name': 'thisdomainisnotavailable.com',
   'tld': 'com',
   'tags': [],
   'price': 1105,
   'currency': 'USD',
   'available': True}],
 'unavailable': []}

In [None]:
# Keep the search id and the first available domain name for the purchase example.
search_id = authorized_results["id"]
domain = authorized_results["available"][0]["name"]


## Purchase

After confirming domain availability through search, we can request a purchase offer for the domains we're interested in. Once we have an offer, we can proceed to get a payment request using our preferred payment method that the server supports.

This payment flow is based on the [L402](https://github.com/l402-protocol/l402) protocol.


**Note**: Purchasing a domain requires contact information as mandated by ICANN:

- First name
- Last name
- Email
- Address
- City
- State
- Country
- Postal Code


In [None]:
#| export

class ContactInformation(TypedDict):
    """Registrant contact details sent with a purchase request."""
    first_name: str
    last_name: str
    email: str
    address: str
    city: str
    state: str  # the notebook example uses 'CA'
    country: str  # the notebook example uses 'US'
    postal_code: str

class DomainPurchaseRequest(TypedDict):
    """Shape of the JSON body posted to the domain purchase endpoint."""
    domain: str  # domain name to purchase
    search_id: str  # id of the search response the domain came from
    contact_information: ContactInformation


# TODO: use the L402 client library when we have it


class L402Offer(TypedDict):
    """One purchase offer inside an L402 response."""
    id: str  # offer id; passed as `offer_id` when requesting payment details
    title: str
    description: str
    type: str  # the sample output shows 'one-time'
    # The sample output shows 1105 for an offer described as 11.05 USD —
    # presumably minor currency units; confirm.
    amount: int
    currency: str
    payment_methods: List[str]  # the sample output shows ['credit_card', 'lightning']

class L402Response(TypedDict):
    """Shape of the JSON body returned with a 402 status by the purchase endpoint."""
    version: str  # the sample output shows '0.2.1'
    payment_request_url: str  # URL to post to in order to obtain payment details
    payment_context_token: str  # token sent along with that payment request
    offers: List[L402Offer]

def request_offer(search_id: str, domain: str, contact_information: ContactInformation, access_token: str) -> L402Response:
    """Ask the server for a purchase offer for ``domain``.

    The purchase endpoint is expected to answer with status 402 and an L402
    payload describing the offers; that payload is returned.

    Any other error status surfaces as the exception raised by
    ``raise_for_status``; a non-error status other than 402 raises ``ValueError``.
    """
    # TODO: validate contact information

    payload: DomainPurchaseRequest = DomainPurchaseRequest(
        domain=domain,
        search_id=search_id,
        contact_information=contact_information,
    )

    reply = httpx.post(
        f"{API_URL}/api/v0/domains/purchase",
        json=payload,
        headers={"Authorization": f"Bearer {access_token}"},
    )

    # 402 is the success case for this endpoint.
    if reply.status_code == 402:
        return L402Response(**reply.json())

    reply.raise_for_status()
    # Reaching this point means a non-error status that is still not the expected 402.
    raise ValueError(f"Unexpected status code: {reply.status_code}: {reply.text}")


In [None]:
# Placeholder contact information, for testing only (not a real registrant).
contact_information = {
    "first_name": "John",
    "last_name": "Doe",
    "email": "john.doe@example.com",
    "address": "123 Test St",
    "city": "Test City",
    "state": "CA",
    "country": "US",
    "postal_code": "12345"
}


In [None]:
# Request a purchase offer for the domain found by the search above.
response = request_offer(search_id, domain, contact_information, access_token)
response


{'version': '0.2.1',
 'payment_request_url': 'https://api.sherlockdomains.com/api/v0/payments/l402/payment_request',
 'payment_context_token': 'c695748b10043c10ce2af90a227ebe495a8e5a6096f5f9832088b963309517be',
 'offers': [{'id': '84274dd9-ccb9-47fd-87eb-7a18496cc629',
   'title': 'thisdomainisnotavailable.com',
   'description': 'Purchase thisdomainisnotavailable.com for 11.05 USD',
   'type': 'one-time',
   'amount': 1105,
   'currency': 'USD',
   'payment_methods': ['credit_card', 'lightning']}]}

In [None]:
# Pull out what the payment request needs: the URL, the context token and the first offer's id.
payment_request_url = response["payment_request_url"]
payment_context_token = response["payment_context_token"]
offer_id = response["offers"][0]["id"]


In [None]:
#| export

# Payment method identifiers accepted by get_payment_request (static typing only; not checked at runtime).
PaymentMethodType = Literal["credit_card", "lightning"]

class PaymentRequest(TypedDict):
    """Shape of the JSON body posted to the payment request URL."""
    offer_id: str  # id of the offer being paid for
    payment_method: PaymentMethodType
    payment_context_token: str  # context token received with the offers

class PaymentMethod(TypedDict, total=False):
    """Payment details for the chosen method; both keys are optional (total=False)."""
    checkout_url: str  # the sample output shows a Stripe checkout URL for 'credit_card'
    # NOTE(review): the sample output shows None here when 'credit_card' was
    # requested, which the `str` annotation does not cover; confirm.
    lightning_invoice: str

class PaymentRequestResponse(TypedDict):
    """Shape of the JSON returned by the payment request URL."""
    payment_method: PaymentMethod
    # NOTE(review): annotated as datetime, but the sample output shows an ISO-8601
    # string ('2024-12-30T08:26:31.538Z'); the payload is returned unconverted.
    expires_at: datetime

def get_payment_request(
    request_url: str,
    context_token: str,
    offer_id: str,
    payment_method: PaymentMethodType
) -> PaymentRequestResponse:
    """Request payment details for one offer.

    POSTs the offer id, the chosen payment method and the context token to
    ``request_url``. The context token is also sent in an ``L402``
    Authorization header. Returns the decoded JSON reply unchanged.

    An error status surfaces as the exception raised by ``raise_for_status``.
    """
    payload: PaymentRequest = PaymentRequest(
        offer_id=offer_id,
        payment_method=payment_method,
        payment_context_token=context_token,
    )
    l402_headers = {"Authorization": f"L402 {context_token}"}

    reply = httpx.post(request_url, json=payload, headers=l402_headers)
    reply.raise_for_status()

    return reply.json()

**NOTE**: Do not purchase a domain using a shared private key such as the one used in these docs.

In [None]:
# Ask for credit-card payment details for the offer obtained above.
payment_method = "credit_card"
response = get_payment_request(payment_request_url, payment_context_token, offer_id, payment_method)
response


{'payment_method': {'checkout_url': 'https://checkout.stripe.com/c/pay/cs_live_a1ksbZ167h9SzlskvDrvNaHtc7MWZpbv1otgGueAjgDfDzDqttSnIoM4ya#fidkdWxOYHwnPyd1blppbHNgWjA0S3VzXDdBbTFNVlJzfDVRQVQ2dVdBTnJTSH1QMGs2dHRsanJMbkY0PTxKbUtRaWowT2NwMGM8RlVBbGRqSWo3UFYwcVdqR3F9N2BtM2ZTPXc1Z3dQXGc2NTVPYVVSQkM8bycpJ2N3amhWYHdzYHcnP3F3cGApJ2lkfGpwcVF8dWAnPyd2bGtiaWBabHFgaCcpJ2BrZGdpYFVpZGZgbWppYWB3dic%2FcXdwYHgl',
  'lightning_invoice': None},
 'expires_at': '2024-12-30T08:26:31.538Z'}

## Domains

Authenticated users can get a list of the domains they own.

In [None]:
#| export

class DomainInfo(TypedDict):
    """One entry of the list returned by the owned-domains endpoint."""
    id: str  # domain id; used as `domain_id` by the DNS helpers
    domain_name: str
    # NOTE(review): the two timestamps are annotated as datetime, but the JSON
    # payload is returned unconverted — presumably strings at runtime; confirm.
    created_at: datetime
    expires_at: datetime
    auto_renew: bool
    locked: bool
    private: bool
    nameservers: List[str]
    status: str

def get_domains(access_token: str) -> List[DomainInfo]:
    """Return the domains owned by the authenticated user.

    Sends a GET with ``access_token`` as a Bearer token and returns the
    decoded JSON reply unchanged. An error status surfaces as the exception
    raised by ``raise_for_status``.
    """
    listing_url = f"{API_URL}/api/v0/domains/domains"
    reply = httpx.get(listing_url, headers={"Authorization": f"Bearer {access_token}"})
    reply.raise_for_status()

    owned = reply.json()
    return owned

In [None]:
# List the domains owned by the demo account (the sample output shows none).
domains = get_domains(access_token)
domains


[]

# DNS API

The DNS API allows users to manage DNS records for their domains. Through this API, users can create, read, update and delete DNS records like A, AAAA, MX, TXT, and other standard record types for any domain they own.

## Get DNS records

Given a domain provide a list of all its DNS records.

In [None]:
# For this section we need an account that owns a domain with DNS records.
# Set `public_key` below to that account's key and run this cell to obtain the
# access token used by the DNS examples. Leaving it empty skips those examples.
# NOTE(review): the value is handed to load_key() and then used for signing, like
# the demo key earlier in this notebook — presumably this should be the private key; confirm.
public_key = ""

def quick_login(public_key: str) -> Optional[str]:
    """Run the whole challenge/sign/login flow and return the access token.

    Returns ``None`` (after printing a notice) when ``public_key`` is empty,
    so that the dependent example cells can be skipped when no key is set.
    The refresh token returned by the login call is discarded.
    """
    if not public_key:
        print("No public key provided. Skipping login.")
        return None

    key = load_key(public_key)
    challenge = get_new_challenge(key.public_key())
    # The challenge is hex-encoded: sign its raw bytes, send the signature as hex.
    signature = key.sign(bytes.fromhex(challenge))
    access_token, _ = agent_login(key.public_key(), challenge, signature.hex())
    return access_token

def get_domain_id(access_token: Optional[str]) -> Optional[str]:
    """Return the id of the first domain owned by the account, if any.

    Returns ``None`` (after printing a notice) when no access token is given
    or when the account owns no domains.
    """
    if not access_token:
        print("No access token provided. Skipping domain ID retrieval.")
        return None

    domains = get_domains(access_token)
    if not domains:
        print("No domains found")
        return None

    return domains[0]["id"]

# Note: this rebinds `access_token` for the rest of the notebook.
# `domain_id` is None when no key is configured or the account owns no domains;
# the DNS example cells check it before calling the API.
access_token = quick_login(public_key)
domain_id = get_domain_id(access_token)


## List DNS Records

In [None]:
#| export

class DNSRecord(TypedDict):
    """One DNS record as returned by the DNS records endpoints."""
    id: str  # record id; used when updating or deleting the record
    type: str  # record type, e.g. 'A' or 'TXT' in the sample output
    name: str  # e.g. 'www.h402.org' in the sample output
    value: str  # e.g. an IP address for the 'A' records in the sample output
    ttl: int  # e.g. 3600 in the sample output

def get_dns_records(domain_id: str, access_token: str) -> List[DNSRecord]:
    """Fetch the DNS records of the domain identified by ``domain_id``.

    Sends an authenticated GET and returns the ``records`` entry of the JSON
    reply. An error status surfaces as the exception raised by
    ``raise_for_status``.
    """
    records_url = f"{API_URL}/api/v0/domains/{domain_id}/dns/records"
    reply = httpx.get(records_url, headers={"Authorization": f"Bearer {access_token}"})
    reply.raise_for_status()

    body = reply.json()
    return body["records"]


In [None]:
# Only runs when a domain id was resolved by the setup cell above.
if domain_id is not None:
    get_dns_records(domain_id, access_token)

[{'id': '7df4c86debbf83b987ee4eb8c9ea57ba',
  'type': 'A',
  'name': 'h402.org',
  'value': '91.195.240.123',
  'ttl': 3603},
 {'id': '195dc76e2d529de79ebce740750302b6',
  'type': 'A',
  'name': 'www.h402.org',
  'value': '91.195.240.123',
  'ttl': 3603},
 {'id': 'ca5fc133797184ab0dbca659f47e5cd5',
  'type': 'TXT',
  'name': '_domainconnect.h402.org',
  'value': 'www.namesilo.com/domainconnect',
  'ttl': 3600}]

## Create DNS record

In [None]:
#| export

class CreateDNSRecord(TypedDict):
    """A DNS record to create; same fields as DNSRecord minus the id."""
    type: str
    name: str
    value: str
    ttl: int

class CreateDNSRecordRequest(TypedDict):
    """Shape of the JSON body posted to the DNS records endpoint to create records."""
    records: List[CreateDNSRecord]

def create_dns_record(
    domain_id: str, 
    record_type: str,
    name: str,
    value: str,
    ttl: int,
    access_token: str
) -> List[DNSRecord]:
    """Create one DNS record on the domain identified by ``domain_id``.

    The record is wrapped in a one-element ``records`` list, posted with
    ``access_token`` as a Bearer token, and the ``records`` entry of the JSON
    reply is returned. An error status surfaces as the exception raised by
    ``raise_for_status``.
    """
    new_record: CreateDNSRecord = {
        "type": record_type,
        "name": name,
        "value": value,
        "ttl": ttl,
    }
    payload: CreateDNSRecordRequest = {"records": [new_record]}

    reply = httpx.post(
        f"{API_URL}/api/v0/domains/{domain_id}/dns/records",
        json=payload,
        headers={"Authorization": f"Bearer {access_token}"},
    )
    reply.raise_for_status()

    body = reply.json()
    return body["records"]

In [None]:
# Only runs when a domain id was resolved by the setup cell above.
if domain_id is not None:
    response = create_dns_record(domain_id, "TXT", "test", "test 1", 3600, access_token)
    # Remember the id of the new record for the update example.
    created_id = response[0]["id"]
    response


[{'id': '293c030385b3a9a79d31e6a76c8fff44',
  'type': 'TXT',
  'name': 'test',
  'value': 'test 1',
  'ttl': 3600}]

## Update DNS record

In [None]:
#| export

class UpdateDNSRecord(TypedDict):
    """A DNS record update: the id of an existing record plus its field values."""
    id: str  # id of the record to update
    type: str
    name: str
    value: str
    ttl: int

class UpdateDNSRecordRequest(TypedDict):
    """Shape of the JSON body sent with PATCH to the DNS records endpoint."""
    records: List[UpdateDNSRecord]

def update_dns_records(
    domain_id: str,
    records: List[UpdateDNSRecord],
    access_token: str
) -> List[DNSRecord]:
    """Update existing DNS records of the domain identified by ``domain_id``.

    Sends ``records`` in a PATCH request with ``access_token`` as a Bearer
    token and returns the ``records`` entry of the JSON reply. An error status
    surfaces as the exception raised by ``raise_for_status``.
    """
    payload: UpdateDNSRecordRequest = UpdateDNSRecordRequest(records=records)

    reply = httpx.patch(
        f"{API_URL}/api/v0/domains/{domain_id}/dns/records",
        json=payload,
        headers={"Authorization": f"Bearer {access_token}"},
    )
    reply.raise_for_status()

    body = reply.json()
    return body["records"]

In [None]:
# Only runs when a domain id was resolved by the setup cell above.
if domain_id is not None:
    update_record = UpdateDNSRecord(id=created_id, type="TXT", name="test", value="test 2", ttl=3600)
    response = update_dns_records(domain_id, [update_record], access_token)
    # The sample outputs show a different record id after the update, so keep
    # the id from this response for the delete example.
    updated_id = response[0]["id"]
    response


[{'id': '3ddc11a1808b692d836ae8985535103e',
  'type': 'TXT',
  'name': 'test',
  'value': 'test 2',
  'ttl': 3600}]

## Delete DNS record

In [None]:
#| export
class DeleteDNSRecordResponse(TypedDict):
    """Shape of the JSON returned when DNS records are deleted."""
    domain: str  # the sample output shows the domain name ('h402.org')
    deleted_records: List[str]  # ids of the records that were deleted

def delete_dns_records(
    domain_id: str,
    record_ids: List[str],
    access_token: str
) -> DeleteDNSRecordResponse:
    """Delete DNS records of the domain identified by ``domain_id``.

    Sends a DELETE request whose JSON body lists ``record_ids``, with
    ``access_token`` as a Bearer token, and returns the decoded JSON reply.
    An error status surfaces as the exception raised by ``raise_for_status``.
    """
    headers = {"Authorization": f"Bearer {access_token}"}

    data = {
        "record_ids": record_ids
    }

    # httpx.delete() does not accept a request body, so use the generic
    # top-level httpx.request(). Unlike a bare httpx.Client(), it closes its
    # connection resources once the call completes.
    response = httpx.request(
        "DELETE",
        f"{API_URL}/api/v0/domains/{domain_id}/dns/records",
        json=data,
        headers=headers
    )
    response.raise_for_status()

    return response.json()

In [None]:
# Only runs when a domain id was resolved by the setup cell above.
if domain_id is not None:
    # Remove the test record again, using the id returned by the update example.
    response = delete_dns_records(domain_id, [updated_id], access_token)
    response


{'domain': 'h402.org', 'deleted_records': ['3ddc11a1808b692d836ae8985535103e']}