Skip to content
Closed
5 changes: 4 additions & 1 deletion django/contrib/messages/storage/cookie.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ def default(self, obj):
return message
return super().default(obj)

def encode(self, s):
return signing.compress_b64(super().encode(s))


class MessageDecoder(json.JSONDecoder):
"""
Expand All @@ -47,7 +50,7 @@ def process_messages(self, obj):
return obj

def decode(self, s, **kwargs):
decoded = super().decode(s, **kwargs)
decoded = super().decode(signing.decompress_b64(s), **kwargs)
return self.process_messages(decoded)


Expand Down
48 changes: 30 additions & 18 deletions django/core/signing.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,10 @@ def dumps(obj, key=None, salt='django.core.signing', serializer=JSONSerializer,
"""
data = serializer().dumps(obj)

# Flag for if it's been compressed or not
is_compressed = False

if compress:
# Avoid zlib dependency unless compress is being used
compressed = zlib.compress(data)
if len(compressed) < (len(data) - 1):
data = compressed
is_compressed = True
base64d = b64_encode(data).decode()
if is_compressed:
base64d = '.' + base64d
base64d = compress_b64(data.decode('latin-1'), charset='latin-1')
else:
base64d = b64_encode(data).decode('latin-1')
return TimestampSigner(key, salt=salt).sign(base64d)


Expand All @@ -132,15 +124,35 @@ def loads(s, key=None, salt='django.core.signing', serializer=JSONSerializer, ma
"""
# TimestampSigner.unsign() returns str but base64 and zlib compression
# operate on bytes.
base64d = TimestampSigner(key, salt=salt).unsign(s, max_age=max_age).encode()
decompress = base64d[:1] == b'.'
base64d = TimestampSigner(key, salt=salt).unsign(s, max_age=max_age)
data = decompress_b64(base64d, charset='latin-1')
return serializer().loads(data.encode('latin-1'))


def compress_b64(uncompressed, charset='utf-8'):
is_compressed = False
encoded = uncompressed.encode(charset)
compressed = zlib.compress(encoded)
if (len(compressed) < len(uncompressed) - 1):
encoded = compressed
is_compressed = True
b64_encoded = b64_encode(encoded)
b64_encoded = b64_encoded.decode(charset)
if is_compressed:
b64_encoded = '.' + b64_encoded
return b64_encoded


def decompress_b64(compressed, charset='utf-8'):
decompress = compressed[:1] == '.'
if decompress:
# It's compressed; uncompress it first
base64d = base64d[1:]
data = b64_decode(base64d)
compressed = compressed[1:]
compressed = compressed.encode(charset)
b64_decoded = b64_decode(compressed)
if decompress:
data = zlib.decompress(data)
return serializer().loads(data)
b64_decoded = zlib.decompress(b64_decoded)
b64_decoded = b64_decoded.decode(charset)
return b64_decoded


class Signer:
Expand Down
36 changes: 32 additions & 4 deletions tests/messages_tests/test_cookie.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import json
import random
import string

from django.conf import settings
from django.contrib.messages import constants
from django.contrib.messages.storage.base import Message
from django.contrib.messages.storage.cookie import (
CookieStorage, MessageDecoder, MessageEncoder,
)
from django.core.signing import decompress_b64
from django.test import SimpleTestCase, override_settings
from django.test.utils import ignore_warnings
from django.utils.deprecation import RemovedInDjango40Warning
Expand Down Expand Up @@ -61,7 +64,7 @@ def test_get(self):
self.assertEqual(list(storage), example_messages)

@override_settings(SESSION_COOKIE_SAMESITE='Strict')
def test_cookie_setings(self):
def test_cookie_settings(self):
"""
CookieStorage honors SESSION_COOKIE_DOMAIN, SESSION_COOKIE_SECURE, and
SESSION_COOKIE_HTTPONLY (#15618, #20972).
Expand All @@ -71,7 +74,7 @@ def test_cookie_setings(self):
response = self.get_response()
storage.add(constants.INFO, 'test')
storage.update(response)
self.assertIn('test', response.cookies['messages'].value)
self.assertIn('test', decompress_b64(response.cookies['messages'].value, charset='latin-1'))
self.assertEqual(response.cookies['messages']['domain'], '.example.com')
self.assertEqual(response.cookies['messages']['expires'], '')
self.assertIs(response.cookies['messages']['secure'], True)
Expand Down Expand Up @@ -116,15 +119,40 @@ def test_max_cookie_length(self):
# size which will fit 4 messages into the cookie, but not 5.
# See also FallbackTest.test_session_fallback
msg_size = int((CookieStorage.max_cookie_size - 54) / 4.5 - 37)
random.seed(42)
for i in range(5):
storage.add(constants.INFO, str(i) * msg_size)
s = str(i) + ''.join(random.choice(string.ascii_letters) for _ in range(msg_size - 1))
storage.add(constants.INFO, s)

unstored_messages = storage.update(response)

cookie_storing = self.stored_messages_count(storage, response)
self.assertEqual(cookie_storing, 4)

self.assertEqual(len(unstored_messages), 1)
self.assertEqual(unstored_messages[0].message, '0' * msg_size)
self.assertEqual(unstored_messages[0].message[0], '0')

def test_message_rfc6265_compliant(self):
non_compliant_chars = ('\\', ',', ';', '"')
messages = ['\\te,st', ';m"e', '\u2019']
encoder = MessageEncoder()
value = encoder.encode(messages)
for message in list(value):
for illegal in non_compliant_chars:
self.assertEqual(message.find(illegal), -1)

def test_messages_store_any_string(self):
values = [
'a string \u2019',
'test Ä',
'‘’',
b'\x91\x80'.decode('latin-1'),
b'\xe2\x91'.decode('cp1252'),
]
encoder = MessageEncoder()
value = encoder.encode(values)
decoded_messages = json.loads(value, cls=MessageDecoder)
self.assertEqual(values, decoded_messages)

def test_json_encoder_decoder(self):
"""
Expand Down
10 changes: 8 additions & 2 deletions tests/messages_tests/test_fallback.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import random
import string

from django.contrib.messages import constants
from django.contrib.messages.storage.fallback import (
CookieStorage, FallbackStorage,
Expand Down Expand Up @@ -128,8 +131,10 @@ def test_session_fallback(self):
response = self.get_response()
# see comment in CookieTests.test_cookie_max_length()
msg_size = int((CookieStorage.max_cookie_size - 54) / 4.5 - 37)
random.seed(42)
for i in range(5):
storage.add(constants.INFO, str(i) * msg_size)
s = str(i) + ''.join(random.choice(string.ascii_letters) for _ in range(msg_size - 1))
storage.add(constants.INFO, s)
storage.update(response)
cookie_storing = self.stored_cookie_messages_count(storage, response)
self.assertEqual(cookie_storing, 4)
Expand All @@ -143,7 +148,8 @@ def test_session_fallback_only(self):
"""
storage = self.get_storage()
response = self.get_response()
storage.add(constants.INFO, 'x' * 5000)
random.seed(42)
storage.add(constants.INFO, ''.join(random.choice(string.ascii_letters) for _ in range(5000)))
storage.update(response)
cookie_storing = self.stored_cookie_messages_count(storage, response)
self.assertEqual(cookie_storing, 0)
Expand Down
4 changes: 3 additions & 1 deletion tests/messages_tests/test_mixins.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from django.core.signing import decompress_b64
from django.test import SimpleTestCase, override_settings
from django.urls import reverse

Expand All @@ -11,4 +12,5 @@ def test_set_messages_success(self):
author = {'name': 'John Doe', 'slug': 'success-msg'}
add_url = reverse('add_success_msg')
req = self.client.post(add_url, author)
self.assertIn(ContactFormViewWithMsg.success_message % author, req.cookies['messages'].value)
self.assertIn(ContactFormViewWithMsg.success_message % author,
decompress_b64(req.cookies['messages'].value, charset='latin-1'))
26 changes: 26 additions & 0 deletions tests/signing/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,32 @@ def test_dumps_loads_default_hashing_algorithm_sha1(self):
signed = signing.dumps(value)
self.assertEqual(signing.loads(signed), value)

def test_compress64_decompress64(self):
"compress_b64 and decompress_b64 be reversible for any string"
unicode_values = [
'a string \u2019',
'test Ä',
'‘’',
b'\x91\x80'.decode('latin-1'),
b'\xe2\x91'.decode('cp1252'),
]
for value in unicode_values:
compressed = signing.compress_b64(value)
self.assertEqual(signing.decompress_b64(compressed), value)

latin_1_value = b'\x91\x80'.decode('latin-1')
compressed = signing.compress_b64(latin_1_value, charset='latin-1')
self.assertEqual(signing.decompress_b64(compressed, charset='latin-1'), latin_1_value)

with self.assertRaises(UnicodeDecodeError):
compressed = signing.compress_b64(latin_1_value, charset='latin-1')
signing.decompress_b64(compressed)

with self.assertRaises(AssertionError):
# Value will come back garbled if a non-unicode charset is used to decode and it is not used to encode
compressed = signing.compress_b64(latin_1_value)
self.assertEqual(signing.decompress_b64(compressed, charset='latin-1'), latin_1_value)

def test_decode_detects_tampering(self):
"loads should raise exception for tampered objects"
transforms = (
Expand Down