From 23712e3ce79b9619f9656ad846971459d82c7382 Mon Sep 17 00:00:00 2001 From: Kiran Jonnalagadda Date: Sat, 16 May 2020 05:54:40 +0530 Subject: [PATCH 1/3] Expanded capabilities in ValidUrl * Support a list of allowed schemes and domains * Optionally disable URL tester * Use html5lib instead of lxml for parsing safety * Misc linter fixes --- baseframe/forms/form.py | 2 +- baseframe/forms/validators.py | 155 ++++++++++++++----- setup.py | 2 +- tests/test_validators.py | 270 ++++++++++++++++++++++++++++++++-- 4 files changed, 376 insertions(+), 53 deletions(-) diff --git a/baseframe/forms/form.py b/baseframe/forms/form.py index 5ca67df0..4b2a01aa 100644 --- a/baseframe/forms/form.py +++ b/baseframe/forms/form.py @@ -176,7 +176,7 @@ def send_signals(self): form_validation_success.send(self) def errors_with_data(self): - # Convert lazy_gettext error strings into unicode so they don't cause problems downstream + # Convert lazy_gettext error strings into str so they don't cause problems downstream # (like when pickling) return { name: { diff --git a/baseframe/forms/validators.py b/baseframe/forms/validators.py index 76eaa982..5078b060 100644 --- a/baseframe/forms/validators.py +++ b/baseframe/forms/validators.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- from __future__ import unicode_literals -from six.moves.urllib.parse import urljoin +from six.moves.urllib.parse import urljoin, urlparse import six from collections import namedtuple @@ -23,10 +23,10 @@ ValidationError, ) -from lxml import html from pyisemail import is_email import dns.resolver import emoji +import html5lib import requests from coaster.utils import deobfuscate_email, make_name, md5sum @@ -123,7 +123,8 @@ class AllowedIf(object): Validator that allows a value only if another field also has a value. :param str fieldname: Name of the other field - :param str message: Validation error message. Will be formatted with an optional ``{field}`` label + :param str message: Validation error message. Will be formatted with an optional + ``{field}`` label """ default_message = __("This requires ‘{field}’ to be specified") @@ -147,7 +148,10 @@ class OptionalIf(Optional): :class:`DataRequired`:: field = forms.StringField("Field", - validators=[forms.validators.OptionalIf('other'), forms.validators.DataRequired()]) + validators=[ + forms.validators.OptionalIf('other'), forms.validators.DataRequired() + ] + ) :param str fieldname: Name of the other field :param str message: Validation error message @@ -172,7 +176,10 @@ class RequiredIf(DataRequired): :class:`Optional`:: field = forms.StringField("Field", - validators=[forms.validators.RequiredIf('other'), forms.validators.Optional()]) + validators=[ + forms.validators.RequiredIf('other'), forms.validators.Optional() + ] + ) :param str fieldname: Name of the other field :param str message: Validation error message @@ -400,13 +407,24 @@ def __call__(self, form, field): class ValidUrl(object): """ - Validator to confirm a URL is valid (returns 2xx status code) + Validator to confirm a HTTP URL is valid (returns 2xx status code). - :param unicode message: Error message (None for default error message) - :param unicode message_urltext: Unused parameter, only used in the :class:`AllUrlsValid` validator - :param list invalid_urls: A list of (patterns, message) tuples for URLs that will be rejected, - where ``patterns`` is a list of strings or regular expressions. If ``invalid_urls`` is - a callable, it will be called to retrieve the list. + URIs using other protocol schemes are not validated, but can be explicitly + disallowed by specifying ``allowed_schemes``. + + :param str message: Error message (None for default error message) + :param str message_urltext: Unused parameter, only used in the :class:`AllUrlsValid` + validator + :param str message_schemes: Error message when the URL scheme is invalid + :param str message_domains: Error message when the URL domain is not whitelisted + :param list invalid_urls: A list of (patterns, message) tuples for URLs that will be + rejected, where ``patterns`` is a list of strings or regular expressions. If + ``invalid_urls`` is a callable, it will be called to retrieve the list. + :param set allowed_schemes: Allowed schemes in URLs (`None` implies no constraints) + :param set allowed_domains: Whitelisted domains (`None` implies no constraints) + :param bool visit_url: Visit the URL to confirm availability (default `True`) + + ``allowed_schemes`` and ``allowed_domains`` may be callables that return a set """ user_agent = ( @@ -419,12 +437,47 @@ class ValidUrl(object): "The URL “{url}” linked from “{text}” is not valid or is currently inaccessible" ) - def __init__(self, message=None, message_urltext=None, invalid_urls=[]): + default_message_schemes = __("This URL’s protocol is not allowed") + + default_message_domains = __("This URL’s domain is not allowed") + + def __init__( + self, + message=None, + message_urltext=None, + message_schemes=None, + message_domains=None, + invalid_urls=(), + allowed_schemes=None, + allowed_domains=None, + visit_url=True, + ): self.message = message or self.default_message - self.invalid_urls = invalid_urls self.message_urltext = message_urltext or self.default_message_urltext + self.message_schemes = message_schemes or self.default_message_schemes + self.message_domains = message_domains or self.default_message_domains + self.invalid_urls = invalid_urls + self.allowed_schemes = allowed_schemes + self.allowed_domains = allowed_domains + self.visit_url = visit_url + + def check_url(self, url, allowed_schemes, allowed_domains, invalid_urls, text=None): + urlparts = urlparse(url) + if allowed_schemes: + if urlparts.scheme not in allowed_schemes: + return self.message_schemes.format( + url=url, schemes=_(', ').join(allowed_schemes) + ) + if allowed_domains: + if urlparts.netloc.lower() not in allowed_domains: + return self.message_domains.format( + url=url, domains=_(', ').join(allowed_domains) + ) + + if urlparts.scheme not in ('http', 'https') or not self.visit_url: + # The rest of this function only validates HTTP urls. + return - def check_url(self, invalid_urls, url, text=None): if six.PY2: cache_key = b'linkchecker/' + md5sum( url.encode('utf-8') if isinstance(url, six.text_type) else url @@ -456,10 +509,13 @@ def check_url(self, invalid_urls, url, text=None): code = r.status_code rurl = r.url except ( - requests.exceptions.MissingSchema, # Still a relative URL? Must be broken - requests.exceptions.ConnectionError, # Name resolution or connection failed + # Still a relative URL? Must be broken + requests.exceptions.MissingSchema, + # Name resolution or connection failed + requests.exceptions.ConnectionError, + # Didn't respond in time requests.exceptions.Timeout, - ): # Didn't respond in time + ): code = None except Exception as e: exception_catchall.send(e) @@ -485,16 +541,18 @@ def check_url(self, invalid_urls, url, text=None): # runs _after_ attempting to load the URL as we want to catch redirects. for patterns, message in invalid_urls: for pattern in patterns: - # For text patterns, do a substring search. For regex patterns (assumed so if not text), - # do a regex search. Test with the final URL from the response, after redirects, - # but report errors using the URL the user provided + # For text patterns, do a substring search. For regex patterns + # (assumed so if not text), do a regex search. Test with the final + # URL from the response, after redirects, but report errors using + # the URL the user provided if ( pattern in rurl if isinstance(pattern, six.string_types) else pattern.search(rurl) is not None ): return message.format(url=url, text=text) - # All good. The URL works and isn't invalid, so save to cache and return without an error message + # All good. The URL works and isn't invalid, so save to cache and return + # without an error message asset_cache.set(cache_key, {'url': rurl, 'code': code}, timeout=86400) return else: @@ -503,8 +561,15 @@ def check_url(self, invalid_urls, url, text=None): else: return self.message.format(url=url) - def call_inner(self, field, current_url, invalid_urls): - error = self.check_url(invalid_urls, urljoin(current_url, field.data)) + def call_inner( + self, field, current_url, allowed_schemes, allowed_domains, invalid_urls + ): + error = self.check_url( + urljoin(current_url, field.data), + allowed_schemes, + allowed_domains, + invalid_urls, + ) if error: raise StopValidation(error) @@ -516,29 +581,43 @@ def __call__(self, form, field): if callable(self.invalid_urls) else self.invalid_urls ) + allowed_schemes = ( + self.allowed_schemes() + if callable(self.allowed_schemes) + else self.allowed_schemes + ) + allowed_domains = ( + self.allowed_domains() + if callable(self.allowed_domains) + else self.allowed_domains + ) - return self.call_inner(field, current_url, invalid_urls) + return self.call_inner( + field, current_url, allowed_schemes, allowed_domains, invalid_urls + ) class AllUrlsValid(ValidUrl): """ - Validator to confirm all URLs in a HTML snippet are valid because loading - them returns 2xx status codes. + Validator to confirm all URLs in a HTML snippet. - :param unicode message: Error message (None for default error message) - :param unicode message_urltext: Error message when the URL also has text (None to use default) - :param list invalid_urls: A list of (patterns, message) tuples for URLs that will be rejected, - where ``patterns`` is a list of strings or regular expressions. If ``invalid_urls`` is - a callable, it will be called to retrieve the list. + Subclasses :class:`ValidUrl` and accepts the same parameters. """ - def call_inner(self, field, current_url, invalid_urls): - html_tree = html.fromstring(field.data) - for text, href in [ - (atag.text_content(), atag.attrib.get('href')) - for atag in html_tree.xpath("//a") - ]: - error = self.check_url(invalid_urls, urljoin(current_url, href), text) + def call_inner( + self, field, current_url, allowed_schemes, allowed_domains, invalid_urls + ): + html_tree = html5lib.parse(field.data, namespaceHTMLElements=False) + for text, href in ( + (tag.text, tag.attrib.get('href')) for tag in html_tree.iter('a') + ): + error = self.check_url( + urljoin(current_url, href), + allowed_schemes, + allowed_domains, + invalid_urls, + text, + ) if error: field.errors.append(error) if field.errors: diff --git a/setup.py b/setup.py index f375faf6..4c638ed4 100644 --- a/setup.py +++ b/setup.py @@ -28,7 +28,7 @@ 'Flask-WTF>=0.14', 'Flask>=1.0', 'furl', - 'lxml', + 'html5lib>=1.0.1', 'mxsniff', 'ndg-httpsclient', 'pyasn1', diff --git a/tests/test_validators.py b/tests/test_validators.py index 7a81fc0b..1c1822e6 100644 --- a/tests/test_validators.py +++ b/tests/test_validators.py @@ -26,7 +26,7 @@ class TestValidators(TestCaseBaseframe): def setUp(self): super(TestValidators, self).setUp() with self.app.test_request_context('/'): - self.form = UrlFormTest(meta={'csrf': False}) + self.url_form = UrlFormTest(meta={'csrf': False}) self.emoji_form = EmojiFormTest(meta={'csrf': False}) self.all_urls_form = AllUrlsFormTest(meta={'csrf': False}) self.webmail_form = PublicEmailDomainFormTest(meta={'csrf': False}) @@ -40,14 +40,14 @@ def tearDown(self): def test_valid_url(self): with self.app.test_request_context('/'): url = 'https://hasgeek.com/' - self.form.process(url=url) - assert self.form.validate() + self.url_form.process(url=url) + assert self.url_form.validate() def test_invalid_url(self): with self.app.test_request_context('/'): url = 'https://hasgeek' - self.form.process(url=url) - assert not self.form.validate() + self.url_form.process(url=url) + assert not self.url_form.validate() def test_valid_emoji(self): with self.app.test_request_context('/'): @@ -123,23 +123,23 @@ def test_public_email_domain_helper(self): def test_url_without_protocol(self): with self.app.test_request_context('/'): url = 'hasgeek.com' - self.form.process(url=url) - assert not self.form.validate() + self.url_form.process(url=url) + assert not self.url_form.validate() def test_inaccessible_url(self): with self.app.test_request_context('/'): url = 'http://4dc1f6f0e7bc44f2b5b44f00abea4eae.com/' - self.form.process(url=url) - assert not self.form.validate() + self.url_form.process(url=url) + assert not self.url_form.validate() def test_disallowed_url(self): with self.app.test_request_context('/'): url = 'https://example.com/' - self.form.process(url=url) - assert not self.form.validate() + self.url_form.process(url=url) + assert not self.url_form.validate() url = 'https://example.in/' - self.form.process(url=url) - assert not self.form.validate() + self.url_form.process(url=url) + assert not self.url_form.validate() def test_html_snippet_valid_urls(self): url1 = 'https://hasgeek.com/' @@ -197,6 +197,250 @@ def test_nonce_form_on_failure(self): assert self.emoji_form.errors +class TestValidUrl(TestCaseBaseframe): + """Additional tests for the ValidUrl validator""" + + def setUp(self): + super(TestValidUrl, self).setUp() + self.ctx = self.app.test_request_context() + self.ctx.push() + + def tearDown(self): + self.ctx.pop() + + def test_no_schemes(self): + class UrlForm(forms.Form): + url = forms.StringField("URL", validators=[forms.validators.ValidUrl()]) + + form = UrlForm(meta={'csrf': False}) + form.url.data = 'mailto:example@example.com' + assert form.validate() is True + + def test_static_schemes(self): + class UrlForm(forms.Form): + url = forms.StringField( + "URL", + validators=[ + forms.validators.ValidUrl(allowed_schemes=('http', 'https')) + ], + ) + + form = UrlForm(meta={'csrf': False}) + form.url.data = 'mailto:example@example.com' + assert form.validate() is False + + def test_static_schemes_allowed(self): + class UrlForm(forms.Form): + url = forms.StringField( + "URL", + validators=[ + forms.validators.ValidUrl( + allowed_schemes=('http', 'https', 'mailto') + ) + ], + ) + + form = UrlForm(meta={'csrf': False}) + form.url.data = 'mailto:example@example.com' + assert form.validate() is True + + def test_callable_schemes(self): + class UrlForm(forms.Form): + url = forms.StringField( + "URL", + validators=[ + forms.validators.ValidUrl(allowed_schemes=lambda: ('http', 'https')) + ], + ) + + form = UrlForm(meta={'csrf': False}) + form.url.data = 'mailto:example@example.com' + assert form.validate() is False + + def test_callable_schemes_allowed(self): + class UrlForm(forms.Form): + url = forms.StringField( + "URL", + validators=[ + forms.validators.ValidUrl( + allowed_schemes=lambda: ('http', 'https', 'mailto') + ) + ], + ) + + form = UrlForm(meta={'csrf': False}) + form.url.data = 'mailto:example@example.com' + assert form.validate() is True + + def test_no_domains(self): + class UrlForm(forms.Form): + url = forms.StringField("URL", validators=[forms.validators.ValidUrl()]) + + form = UrlForm(meta={'csrf': False}) + form.url.data = 'http://www.example.com' + assert form.validate() is True + + def test_static_domains(self): + class UrlForm(forms.Form): + url = forms.StringField( + "URL", + validators=[ + forms.validators.ValidUrl(allowed_domains=('example.net',)) + ], + ) + + form = UrlForm(meta={'csrf': False}) + form.url.data = 'http://www.example.com' + assert form.validate() is False + + def test_static_domains_misconfigured(self): + """Domains must be exact matches including subdomains""" + + class UrlForm(forms.Form): + url = forms.StringField( + "URL", + validators=[ + forms.validators.ValidUrl( + allowed_domains=('example.net', 'example.com') + ) + ], + ) + + form = UrlForm(meta={'csrf': False}) + form.url.data = 'http://www.example.com' + assert form.validate() is False + + def test_static_domains_allowed(self): + class UrlForm(forms.Form): + url = forms.StringField( + "URL", + validators=[ + forms.validators.ValidUrl( + allowed_domains=( + 'example.net', + 'example.com', + 'www.example.com', + ) + ) + ], + ) + + form = UrlForm(meta={'csrf': False}) + form.url.data = 'http://www.example.com' + assert form.validate() is True + + def test_static_domains_case(self): + class UrlForm(forms.Form): + url = forms.StringField( + "URL", + validators=[ + forms.validators.ValidUrl( + allowed_domains=( + 'example.net', + 'example.com', + 'www.example.com', + ) + ) + ], + ) + + form = UrlForm(meta={'csrf': False}) + form.url.data = 'http://WWW.EXAMPLE.COM' + assert form.validate() is True + + def test_callable_domains(self): + class UrlForm(forms.Form): + url = forms.StringField( + "URL", + validators=[ + forms.validators.ValidUrl(allowed_domains=lambda: ('example.net',)) + ], + ) + + form = UrlForm(meta={'csrf': False}) + form.url.data = 'http://www.example.com' + assert form.validate() is False + + def test_callable_domains_allowed(self): + class UrlForm(forms.Form): + url = forms.StringField( + "URL", + validators=[ + forms.validators.ValidUrl( + allowed_domains=lambda: ( + 'example.net', + 'example.com', + 'www.example.com', + ) + ) + ], + ) + + form = UrlForm(meta={'csrf': False}) + form.url.data = 'http://www.example.com' + assert form.validate() is True + + def test_visit_url_true(self): + class UrlForm(forms.Form): + url = forms.StringField( + "URL", validators=[forms.validators.ValidUrl(visit_url=True)] + ) + + form = UrlForm(meta={'csrf': False}) + # Invalid URL will fail a load test + form.url.data = 'http://localhosta' + assert form.validate() is False + + def test_visit_url_false(self): + class UrlForm(forms.Form): + url = forms.StringField( + "URL", validators=[forms.validators.ValidUrl(visit_url=False)] + ) + + form = UrlForm(meta={'csrf': False}) + # Invalid URL won't be tested + form.url.data = 'http://localhosta' + assert form.validate() is True + + def test_message_schemes(self): + class UrlForm(forms.Form): + url = forms.StringField( + "URL", + validators=[ + forms.validators.ValidUrl( + allowed_schemes=('https', 'mailto'), + message_schemes="Scheme for '{url}' must be: {schemes}", + ) + ], + ) + + form = UrlForm(meta={'csrf': False}) + form.url.data = 'http://example.com' + assert form.validate() is False + assert form.url.errors == [ + "Scheme for 'http://example.com' must be: https, mailto" + ] + + def test_message_domains(self): + class UrlForm(forms.Form): + url = forms.StringField( + "URL", + validators=[ + forms.validators.ValidUrl( + allowed_domains=('example.net', 'example.org'), + message_domains="Allowed domains for '{url}': {domains}", + ) + ], + ) + + form = UrlForm(meta={'csrf': False}) + form.url.data = 'http://example.com' + assert form.validate() is False + assert form.url.errors == [ + "Allowed domains for 'http://example.com': example.net, example.org" + ] + + class TestFormBase(TestCaseBaseframe): # Subclasses must define a `Form` From b4c34be157d54e4f75a6a0dd2b73f6957875b3cb Mon Sep 17 00:00:00 2001 From: Kiran Jonnalagadda Date: Sat, 16 May 2020 06:10:11 +0530 Subject: [PATCH 2/3] Additional documentation --- baseframe/forms/form.py | 2 +- baseframe/forms/validators.py | 20 +++++++++++++++++--- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/baseframe/forms/form.py b/baseframe/forms/form.py index 4b2a01aa..5ca67df0 100644 --- a/baseframe/forms/form.py +++ b/baseframe/forms/form.py @@ -176,7 +176,7 @@ def send_signals(self): form_validation_success.send(self) def errors_with_data(self): - # Convert lazy_gettext error strings into str so they don't cause problems downstream + # Convert lazy_gettext error strings into unicode so they don't cause problems downstream # (like when pickling) return { name: { diff --git a/baseframe/forms/validators.py b/baseframe/forms/validators.py index 5078b060..9f2044b8 100644 --- a/baseframe/forms/validators.py +++ b/baseframe/forms/validators.py @@ -418,13 +418,14 @@ class ValidUrl(object): :param str message_schemes: Error message when the URL scheme is invalid :param str message_domains: Error message when the URL domain is not whitelisted :param list invalid_urls: A list of (patterns, message) tuples for URLs that will be - rejected, where ``patterns`` is a list of strings or regular expressions. If - ``invalid_urls`` is a callable, it will be called to retrieve the list. + rejected, where ``patterns`` is a list of strings or regular expressions :param set allowed_schemes: Allowed schemes in URLs (`None` implies no constraints) :param set allowed_domains: Whitelisted domains (`None` implies no constraints) :param bool visit_url: Visit the URL to confirm availability (default `True`) - ``allowed_schemes`` and ``allowed_domains`` may be callables that return a set + ``invalid_urls``, ``allowed_schemes`` and ``allowed_domains`` may also be callables + that take no parameters and return the required data. They will be called once per + validation. """ user_agent = ( @@ -462,6 +463,19 @@ def __init__( self.visit_url = visit_url def check_url(self, url, allowed_schemes, allowed_domains, invalid_urls, text=None): + """ + Inner method to actually check the URL. + + This method accepts ``allowed_schemes``, ``allowed_domains`` and + ``invalid_urls`` as direct parameters despite their availability via `self` + because they may be callables, and in :class:`AllUrlsValid` we call + :meth:`check_url` repeatedly. The callables should be called only once. This + optimization has no value in the base class :class:`ValidUrl`. + + As the validator is instantiated once per form field, it cannot mutate itself + at runtime to cache the callables' results, and must instead pass them from one + method to the next. + """ urlparts = urlparse(url) if allowed_schemes: if urlparts.scheme not in allowed_schemes: From 357c6fef7aa37a7988e1d622c21af062f8231dce Mon Sep 17 00:00:00 2001 From: Kiran Jonnalagadda Date: Sun, 17 May 2020 15:48:21 +0530 Subject: [PATCH 3/3] Set test patterns for DeepSource --- .deepsource.toml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/.deepsource.toml b/.deepsource.toml index 12d1f64c..636ba597 100644 --- a/.deepsource.toml +++ b/.deepsource.toml @@ -1,9 +1,13 @@ version = 1 +test_patterns = [ + "tests/**" +] + [[analyzers]] name = "python" enabled = true -runtime_version = "3.x.x" +runtime_version = "2.x.x" [analyzers.meta] max_line_length = 100