Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions didcomm_messaging/resolver/web.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
"""did:web resolver.

Resolve did:web style dids to a did document. did:web spec:
https://w3c-ccg.github.io/did-method-web/
"""

from . import DIDResolver, DIDNotFound, DIDResolutionError
from pydid import DID
from urllib.parse import urlparse
from datetime import datetime, timedelta
import urllib.request as url_request
import re
import json
import urllib

domain_regex = (
r"((?!-))(xn--)?[a-z0-9][a-z0-9-_]{0,61}[a-z0-9]{0,1}"
r"\.(xn--)?([a-z0-9\._-]{1,61}|[a-z0-9-]{1,30})"
r"(%3[aA]\d+)?" # Port
r"(:[a-zA-Z]+)*" # Path
)
did_web_pattern = re.compile(rf"^did:web:{domain_regex}$")
cache = {}
TIME_TO_CACHE = 1800 # 30 minutes


class DIDWeb(DIDResolver):
"""Utility functions for building and interacting with did:web."""

async def resolve(self, did: str) -> dict:
"""Resolve a did:web to a did document via http request."""

# Check to see if we've seen the did recently
if did in cache:
if cache[did]["timestamp"] > datetime.now() + timedelta(
seconds=-TIME_TO_CACHE
):
return cache[did]["doc"]
else:
del cache[did]

uri = DIDWeb._did_to_uri(did)
headers = {
"User-Agent": "DIDCommRelay/1.0",
}
request = url_request.Request(url=uri, method="GET", headers=headers)
try:
with url_request.urlopen(request) as response:
doc = json.loads(response.read().decode())
cache[did] = {
"timestamp": datetime.now(),
"doc": doc,
}
return doc
except urllib.error.HTTPError as e:
if e.code == 404:
raise DIDNotFound(
f"The did:web {did} returned a 404 not found while resolving"
)
else:
raise DIDResolutionError(
f"Unknown server error ({e.code}) while resolving did:web: {did}"
)
except json.decoder.JSONDecodeError as e:
msg = str(e)
raise DIDNotFound(f"The did:web {did} returned invalid JSON {msg}")
except Exception as e:
raise DIDResolutionError("Failed to fetch did document") from e

@staticmethod
def _did_to_uri(did: str) -> str:
# Split the did by it's segments
did_segments = did.split(":")

# Get the hostname & port
hostname = did_segments[2].lower()
hostname = hostname.replace("%3a", ":")

# Resolve the path portion of the DID, if there is no path, default to
# a .well-known address
path = ".well-known"
if len(did_segments) > 3:
path = "/".join(did_segments[3:])

# Assemble the URI
did_uri = f"https://{hostname}/{path}/did.json"

return did_uri

async def is_resolvable(self, did: str) -> bool:
"""Determine if the did is a valid did:web did that can be resolved."""
if DID.is_valid(did) and did_web_pattern.match(did):
return True
return False

@staticmethod
def did_from_url(url: str) -> DID:
"""Convert a URL into a did:web did."""

# Make sure that the URL starts with a scheme
if not url.startswith("http"):
url = f"https://{url}"

# Parse it out to we can grab pieces
parsed_url = urlparse(url)

# Assemble the domain portion of the DID
did = "did:web:%s" % parsed_url.netloc.replace(":", "%3A")

# Cleanup the path
path = parsed_url.path.replace(".well-known/did.json", "")
path = path.replace("/did.json", "")

# Add the path portion of the did
if len(path) > 1:
did += path.replace("/", ":")
return did
24 changes: 24 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import pytest


def pytest_addoption(parser):
parser.addoption(
"--runexternal",
action="store_true",
default=False,
help="run tests that make external requests",
)


def pytest_configure(config):
config.addinivalue_line("markers", "external_fetch: mark test as slow to run")


def pytest_collection_modifyitems(config, items):
if config.getoption("--runexternal"):
# --runslow given in cli: do not skip slow tests
return
skip_external = pytest.mark.skip(reason="need --runexternal option to run")
for item in items:
if "external_fetch" in item.keywords:
item.add_marker(skip_external)
106 changes: 106 additions & 0 deletions tests/test_didweb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import pytest


from didcomm_messaging.resolver.web import DIDWeb

DIDWEB = "did:web:example.com"
DIDWEB_URI = "https://example.com/.well-known/did.json"
DIDWEB_COMPLEX = "did:web:example.com%3A4443:DIDs:alice:relay"
DIDWEB_COMPLEX_URI = "https://example.com:4443/DIDs/alice/relay/did.json"


@pytest.mark.asyncio
async def test_didweb_from_didurl_domain():
did = DIDWeb.did_from_url("example.com")
assert did
assert did == DIDWEB


@pytest.mark.asyncio
async def test_didweb_from_didurl_schema_and_domain():
did = DIDWeb.did_from_url("https://example.com")
assert did
assert did == DIDWEB


@pytest.mark.asyncio
async def test_didweb_from_didurl_schema_and_domain_slash():
did = DIDWeb.did_from_url("https://example.com/")
assert did
assert did == DIDWEB


@pytest.mark.asyncio
async def test_didweb_from_didurl_schema_and_domain_path():
did = DIDWeb.did_from_url("https://example.com/did.json")
assert did
assert did == DIDWEB


@pytest.mark.asyncio
async def test_didweb_from_didurl_schema_and_domain_wellknown():
did = DIDWeb.did_from_url("https://example.com/.well-known/did.json")
assert did
assert did == DIDWEB


@pytest.mark.asyncio
async def test_didweb_from_didurl_schema_and_domain_port_wellknown():
did = DIDWeb.did_from_url("https://example.com:443/.well-known/did.json")
assert did
assert did == DIDWEB + "%3A443"


@pytest.mark.asyncio
async def test_didweb_from_didurl_schema_and_complex_domain_path():
did = DIDWeb.did_from_url("https://example.com:4443/DIDs/alice/relay/did.json")
assert did
assert did == DIDWEB_COMPLEX


@pytest.mark.asyncio
async def test_didweb_to_url():
uri = DIDWeb._did_to_uri(DIDWEB)
assert uri
assert uri == DIDWEB_URI


@pytest.mark.asyncio
async def test_didweb_to_url_complex():
uri = DIDWeb._did_to_uri(DIDWEB_COMPLEX)
assert uri
assert uri == DIDWEB_COMPLEX_URI


@pytest.mark.asyncio
async def test_didweb_is_resolvable():
resolver = DIDWeb()
resolvable = await resolver.is_resolvable(DIDWEB)
assert resolvable
resolvable_complex = await resolver.is_resolvable(DIDWEB_COMPLEX)
assert resolvable_complex


@pytest.mark.external_fetch
@pytest.mark.asyncio
async def test_didweb_fetch():
did_web = "did:web:colton.wolkins.net"
resolver = DIDWeb()
uri = await resolver.resolve(did_web)
print(uri)
assert uri
assert isinstance(uri, dict)


@pytest.mark.external_fetch
@pytest.mark.asyncio
async def test_didweb_double_fetch():
did_web = "did:web:colton.wolkins.net"
resolver = DIDWeb()
uri = await resolver.resolve(did_web)
print(uri)
assert uri
assert isinstance(uri, dict)
uri = await resolver.resolve(did_web)
assert uri
assert isinstance(uri, dict)