Skip to content

Commit

Permalink
Merge pull request from GHSA-4r7v-whpg-8rx3
Browse files Browse the repository at this point in the history
* CVE-2024-32651 - Security fix - Server Side Template Injection in Jinja2 allows Remote Command Execution

* use ImmutableSandboxedEnvironment also in validation
  • Loading branch information
dgtlmoon committed Apr 25, 2024
1 parent 1ba2965 commit bd6eda6
Show file tree
Hide file tree
Showing 15 changed files with 147 additions and 44 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test-only.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ jobs:
echo "run test with unittest"
docker run test-changedetectionio bash -c 'python3 -m unittest changedetectionio.tests.unit.test_notification_diff'
docker run test-changedetectionio bash -c 'python3 -m unittest changedetectionio.tests.unit.test_watch_model'
docker run test-changedetectionio bash -c 'python3 -m unittest changedetectionio.tests.unit.test_jinja2_security'
# All tests
echo "run test with pytest"
Expand Down
2 changes: 1 addition & 1 deletion changedetectionio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError
import os
#os.environ['EVENTLET_NO_GREENDNS'] = 'yes'
os.environ['EVENTLET_NO_GREENDNS'] = 'yes'
import eventlet
import eventlet.wsgi
import getopt
Expand Down
7 changes: 3 additions & 4 deletions changedetectionio/blueprint/browser_steps/browser_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from loguru import logger

from changedetectionio.content_fetchers.base import manage_user_agent
from changedetectionio.safe_jinja import render as jinja_render

# Two flags, tell the JS which of the "Selector" or "Value" field should be enabled in the front end
# 0- off, 1- on
Expand Down Expand Up @@ -64,14 +65,12 @@ def call_action(self, action_name, selector=None, optional_value=None):
action_handler = getattr(self, "action_" + call_action_name)

# Support for Jinja2 variables in the value and selector
from jinja2 import Environment
jinja2_env = Environment(extensions=['jinja2_time.TimeExtension'])

if selector and ('{%' in selector or '{{' in selector):
selector = str(jinja2_env.from_string(selector).render())
selector = jinja_render(template_str=selector)

if optional_value and ('{%' in optional_value or '{{' in optional_value):
optional_value = str(jinja2_env.from_string(optional_value).render())
optional_value = jinja_render(template_str=optional_value)

action_handler(selector, optional_value)
self.page.wait_for_timeout(1.5 * 1000)
Expand Down
6 changes: 4 additions & 2 deletions changedetectionio/blueprint/check_proxies/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ def long_task(uuid, preferred_proxy):
import time
from changedetectionio.content_fetchers import exceptions as content_fetcher_exceptions
from changedetectionio.processors import text_json_diff
from changedetectionio.safe_jinja import render as jinja_render

status = {'status': '', 'length': 0, 'text': ''}
from jinja2 import Environment, BaseLoader

contents = ''
now = time.time()
Expand Down Expand Up @@ -64,7 +64,9 @@ def long_task(uuid, preferred_proxy):
status.update({'status': 'OK', 'length': len(contents), 'text': ''})

if status.get('text'):
status['text'] = Environment(loader=BaseLoader()).from_string('{{text|e}}').render({'text': status['text']})
# parse 'text' as text for safety
v = {'text': status['text']}
status['text'] = jinja_render(template_str='{{text|e}}', **v)

status['time'] = "{:.2f}s".format(time.time() - now)

Expand Down
7 changes: 3 additions & 4 deletions changedetectionio/content_fetchers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,7 @@ def browser_steps_get_valid_steps(self):
def iterate_browser_steps(self):
from changedetectionio.blueprint.browser_steps.browser_steps import steppable_browser_interface
from playwright._impl._errors import TimeoutError, Error
from jinja2 import Environment
jinja2_env = Environment(extensions=['jinja2_time.TimeExtension'])
from changedetectionio.safe_jinja import render as jinja_render

step_n = 0

Expand All @@ -143,9 +142,9 @@ def iterate_browser_steps(self):
selector = step['selector']
# Support for jinja2 template in step values, with date module added
if '{%' in step['optional_value'] or '{{' in step['optional_value']:
optional_value = str(jinja2_env.from_string(step['optional_value']).render())
optional_value = jinja_render(template_str=step['optional_value'])
if '{%' in step['selector'] or '{{' in step['selector']:
selector = str(jinja2_env.from_string(step['selector']).render())
selector = jinja_render(template_str=step['selector'])

getattr(interface, "call_action")(action_name=step['operation'],
selector=selector,
Expand Down
8 changes: 3 additions & 5 deletions changedetectionio/flask_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
import queue
import threading
import time
from copy import deepcopy
from .safe_jinja import render as jinja_render
from changedetectionio.strtobool import strtobool
from copy import deepcopy
from functools import wraps
from threading import Event

import flask_login
import pytz
import timeago
Expand Down Expand Up @@ -319,8 +319,6 @@ def before_request_handle_cookie_x_settings():

@app.route("/rss", methods=['GET'])
def rss():
from jinja2 import Environment, BaseLoader
jinja2_env = Environment(loader=BaseLoader)
now = time.time()
# Always requires token set
app_rss_token = datastore.data['settings']['application'].get('rss_access_token')
Expand Down Expand Up @@ -388,7 +386,7 @@ def rss():
# @todo Make this configurable and also consider html-colored markup
# @todo User could decide if <link> goes to the diff page, or to the watch link
rss_template = "<html><body>\n<h4><a href=\"{{watch_url}}\">{{watch_title}}</a></h4>\n<p>{{html_diff}}</p>\n</body></html>\n"
content = jinja2_env.from_string(rss_template).render(watch_title=watch_title, html_diff=html_diff, watch_url=watch.link)
content = jinja_render(template_str=rss_template, watch_title=watch_title, html_diff=html_diff, watch_url=watch.link)

fe.content(content=content, type='CDATA')

Expand Down
23 changes: 13 additions & 10 deletions changedetectionio/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,21 +236,26 @@ def __init__(self, message=None):
def __call__(self, form, field):
from changedetectionio import notification

from jinja2 import Environment, BaseLoader, TemplateSyntaxError, UndefinedError
from jinja2 import BaseLoader, TemplateSyntaxError, UndefinedError
from jinja2.sandbox import ImmutableSandboxedEnvironment
from jinja2.meta import find_undeclared_variables
import jinja2.exceptions

# Might be a list of text, or might be just text (like from the apprise url list)
joined_data = ' '.join(map(str, field.data)) if isinstance(field.data, list) else f"{field.data}"

try:
jinja2_env = Environment(loader=BaseLoader)
jinja2_env = ImmutableSandboxedEnvironment(loader=BaseLoader)
jinja2_env.globals.update(notification.valid_tokens)

rendered = jinja2_env.from_string(field.data).render()
jinja2_env.from_string(joined_data).render()
except TemplateSyntaxError as e:
raise ValidationError(f"This is not a valid Jinja2 template: {e}") from e
except UndefinedError as e:
raise ValidationError(f"A variable or function is not defined: {e}") from e
except jinja2.exceptions.SecurityError as e:
raise ValidationError(f"This is not a valid Jinja2 template: {e}") from e

ast = jinja2_env.parse(field.data)
ast = jinja2_env.parse(joined_data)
undefined = ", ".join(find_undeclared_variables(ast))
if undefined:
raise ValidationError(
Expand Down Expand Up @@ -415,7 +420,7 @@ class quickWatchForm(Form):
# Common to a single watch and the global settings
class commonSettingsForm(Form):

notification_urls = StringListField('Notification URL List', validators=[validators.Optional(), ValidateAppRiseServers()])
notification_urls = StringListField('Notification URL List', validators=[validators.Optional(), ValidateAppRiseServers(), ValidateJinja2Template()])
notification_title = StringField('Notification Title', default='ChangeDetection.io Notification - {{ watch_url }}', validators=[validators.Optional(), ValidateJinja2Template()])
notification_body = TextAreaField('Notification Body', default='{{ watch_url }} had a change.', validators=[validators.Optional(), ValidateJinja2Template()])
notification_format = SelectField('Notification format', choices=valid_notification_formats.keys())
Expand Down Expand Up @@ -499,11 +504,9 @@ def validate(self, **kwargs):
result = False

# Attempt to validate jinja2 templates in the URL
from jinja2 import Environment
# Jinja2 available in URLs along with https://pypi.org/project/jinja2-time/
jinja2_env = Environment(extensions=['jinja2_time.TimeExtension'])
try:
ready_url = str(jinja2_env.from_string(self.url.data).render())
from changedetectionio.safe_jinja import render as jinja_render
jinja_render(template_str=self.url.data)
except Exception as e:
self.url.errors.append('Invalid template syntax')
result = False
Expand Down
7 changes: 4 additions & 3 deletions changedetectionio/model/Watch.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from changedetectionio.strtobool import strtobool
from changedetectionio.safe_jinja import render as jinja_render

import os
import re
import time
Expand Down Expand Up @@ -137,12 +139,11 @@ def link(self):

ready_url = url
if '{%' in url or '{{' in url:
from jinja2 import Environment
# Jinja2 available in URLs along with https://pypi.org/project/jinja2-time/
jinja2_env = Environment(extensions=['jinja2_time.TimeExtension'])
try:
ready_url = str(jinja2_env.from_string(url).render())
ready_url = jinja_render(template_str=url)
except Exception as e:
logger.critical(f"Invalid URL template for: '{url}' - {str(e)}")
from flask import (
flash, Markup, url_for
)
Expand Down
10 changes: 5 additions & 5 deletions changedetectionio/notification.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import apprise
import time
from jinja2 import Environment, BaseLoader
from apprise import NotifyFormat
import json
from loguru import logger
Expand Down Expand Up @@ -116,16 +115,17 @@ def apprise_custom_api_call_wrapper(body, title, notify_type, *args, **kwargs):

def process_notification(n_object, datastore):

from .safe_jinja import render as jinja_render
now = time.time()
if n_object.get('notification_timestamp'):
logger.trace(f"Time since queued {now-n_object['notification_timestamp']:.3f}s")
# Insert variables into the notification content
notification_parameters = create_notification_parameters(n_object, datastore)

# Get the notification body from datastore
jinja2_env = Environment(loader=BaseLoader)
n_body = jinja2_env.from_string(n_object.get('notification_body', '')).render(**notification_parameters)
n_title = jinja2_env.from_string(n_object.get('notification_title', '')).render(**notification_parameters)
n_body = jinja_render(template_str=n_object.get('notification_body', ''), **notification_parameters)
n_title = jinja_render(template_str=n_object.get('notification_title', ''), **notification_parameters)

n_format = valid_notification_formats.get(
n_object.get('notification_format', default_notification_format),
valid_notification_formats[default_notification_format],
Expand Down Expand Up @@ -157,7 +157,7 @@ def process_notification(n_object, datastore):
continue

logger.info(">> Process Notification: AppRise notifying {}".format(url))
url = jinja2_env.from_string(url).render(**notification_parameters)
url = jinja_render(template_str=url, **notification_parameters)

# Re 323 - Limit discord length to their 2000 char limit total or it wont send.
# Because different notifications may require different pre-processing, run each sequentially :(
Expand Down
18 changes: 18 additions & 0 deletions changedetectionio/safe_jinja.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""
Safe Jinja2 render with max payload sizes
See https://jinja.palletsprojects.com/en/3.1.x/sandbox/#security-considerations
"""

import jinja2.sandbox
import typing as t
import os

JINJA2_MAX_RETURN_PAYLOAD_SIZE = 1024 * int(os.getenv("JINJA2_MAX_RETURN_PAYLOAD_SIZE_KB", 1024 * 10))


def render(template_str, **args: t.Any) -> str:
jinja2_env = jinja2.sandbox.ImmutableSandboxedEnvironment(extensions=['jinja2_time.TimeExtension'])
output = jinja2_env.from_string(template_str).render(args)
return output[:JINJA2_MAX_RETURN_PAYLOAD_SIZE]

2 changes: 1 addition & 1 deletion changedetectionio/templates/watch-overview.html
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@
<td>
<a {% if watch.uuid in queued_uuids %}disabled="true"{% endif %} href="{{ url_for('form_watch_checknow', uuid=watch.uuid, tag=request.args.get('tag')) }}"
class="recheck pure-button pure-button-primary">{% if watch.uuid in queued_uuids %}Queued{% else %}Recheck{% endif %}</a>
<a href="{{ url_for('edit_page', uuid=watch.uuid)}}" class="pure-button pure-button-primary">Edit</a>
<a href="{{ url_for('edit_page', uuid=watch.uuid)}}#general" class="pure-button pure-button-primary">Edit</a>
{% if watch.history_n >= 2 %}

{% if is_unviewed %}
Expand Down
4 changes: 2 additions & 2 deletions changedetectionio/tests/test_add_replace_remove_filter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/python3

import os.path
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
Expand Down Expand Up @@ -107,7 +107,6 @@ def test_check_add_line_contains_trigger(client, live_server):
#live_server_setup(live_server)

# Give the endpoint time to spin up
time.sleep(1)
test_notification_url = url_for('test_notification_endpoint', _external=True).replace('http://', 'post://') + "?xxx={{ watch_url }}"

res = client.post(
Expand Down Expand Up @@ -166,6 +165,7 @@ def test_check_add_line_contains_trigger(client, live_server):

# Takes a moment for apprise to fire
time.sleep(3)
assert os.path.isfile("test-datastore/notification.txt"), "Notification fired because I can see the output file"
with open("test-datastore/notification.txt", 'r') as f:
response= f.read()
assert '-Oh yes please-' in response
Expand Down
37 changes: 31 additions & 6 deletions changedetectionio/tests/test_jinja2.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

import time
from flask import url_for
from .util import live_server_setup
from .util import live_server_setup, wait_for_all_checks


# If there was only a change in the whitespacing, then we shouldnt have a change detected
def test_jinja2_in_url_query(client, live_server):
def test_setup(client, live_server):
live_server_setup(live_server)

# Give the endpoint time to spin up
time.sleep(1)
# If there was only a change in the whitespacing, then we shouldnt have a change detected
def test_jinja2_in_url_query(client, live_server):
#live_server_setup(live_server)

# Add our URL to the import page
test_url = url_for('test_return_query', _external=True)
Expand All @@ -24,10 +24,35 @@ def test_jinja2_in_url_query(client, live_server):
follow_redirects=True
)
assert b"Watch added" in res.data
time.sleep(3)
wait_for_all_checks(client)

# It should report nothing found (no new 'unviewed' class)
res = client.get(
url_for("preview_page", uuid="first"),
follow_redirects=True
)
assert b'date=2' in res.data

# https://techtonics.medium.com/secure-templating-with-jinja2-understanding-ssti-and-jinja2-sandbox-environment-b956edd60456
def test_jinja2_security_url_query(client, live_server):
#live_server_setup(live_server)

# Add our URL to the import page
test_url = url_for('test_return_query', _external=True)

# because url_for() will URL-encode the var, but we dont here
full_url = "{}?{}".format(test_url,
"date={{ ''.__class__.__mro__[1].__subclasses__()}}", )
res = client.post(
url_for("form_quick_watch_add"),
data={"url": full_url, "tags": "test"},
follow_redirects=True
)
assert b"Watch added" in res.data
wait_for_all_checks(client)

# It should report nothing found (no new 'unviewed' class)
res = client.get(url_for("index"))
assert b'is invalid and cannot be used' in res.data
# Some of the spewed output from the subclasses
assert b'dict_values' not in res.data
57 changes: 57 additions & 0 deletions changedetectionio/tests/unit/test_jinja2_security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#!/usr/bin/python3

# run from dir above changedetectionio/ dir
# python3 -m unittest changedetectionio.tests.unit.test_jinja2_security

import unittest
from changedetectionio import safe_jinja


# mostly
class TestJinja2SSTI(unittest.TestCase):

def test_exception(self):
import jinja2

# Where sandbox should kick in
attempt_list = [
"My name is {{ self.__init__.__globals__.__builtins__.__import__('os').system('id') }}",
"{{ self._TemplateReference__context.cycler.__init__.__globals__.os }}",
"{{ self.__init__.__globals__.__builtins__.__import__('os').popen('id').read() }}",
"{{cycler.__init__.__globals__.os.popen('id').read()}}",
"{{joiner.__init__.__globals__.os.popen('id').read()}}",
"{{namespace.__init__.__globals__.os.popen('id').read()}}",
"{{ ''.__class__.__mro__[2].__subclasses__()[40]('/tmp/hello.txt', 'w').write('Hello here !') }}",
"My name is {{ self.__init__.__globals__ }}",
"{{ dict.__base__.__subclasses__() }}"
]
for attempt in attempt_list:
with self.assertRaises(jinja2.exceptions.SecurityError):
safe_jinja.render(attempt)

def test_exception_debug_calls(self):
import jinja2
# Where sandbox should kick in - configs and debug calls
attempt_list = [
"{% debug %}",
]
for attempt in attempt_list:
# Usually should be something like 'Encountered unknown tag 'debug'.'
with self.assertRaises(jinja2.exceptions.TemplateSyntaxError):
safe_jinja.render(attempt)

# https://book.hacktricks.xyz/pentesting-web/ssti-server-side-template-injection/jinja2-ssti#accessing-global-objects
def test_exception_empty_calls(self):
import jinja2
attempt_list = [
"{{config}}",
"{{ debug }}"
"{{[].__class__}}",
]
for attempt in attempt_list:
self.assertEqual(len(safe_jinja.render(attempt)), 0, f"string test '{attempt}' is correctly empty")



if __name__ == '__main__':
unittest.main()
Loading

0 comments on commit bd6eda6

Please sign in to comment.