diff --git a/src/cryptography/hazmat/primitives/serialization/pkcs7.py b/src/cryptography/hazmat/primitives/serialization/pkcs7.py index 7b8ab300fecb..593c9b159db3 100644 --- a/src/cryptography/hazmat/primitives/serialization/pkcs7.py +++ b/src/cryptography/hazmat/primitives/serialization/pkcs7.py @@ -5,6 +5,7 @@ import email.base64mime import email.generator import email.message +import email.policy import io import typing @@ -176,7 +177,9 @@ def sign( return rust_pkcs7.sign_and_serialize(self, encoding, options) -def _smime_encode(data: bytes, signature: bytes, micalg: str) -> bytes: +def _smime_encode( + data: bytes, signature: bytes, micalg: str, text_mode: bool +) -> bytes: # This function works pretty hard to replicate what OpenSSL does # precisely. For good and for ill. @@ -191,9 +194,10 @@ def _smime_encode(data: bytes, signature: bytes, micalg: str) -> bytes: m.preamble = "This is an S/MIME signed message\n" - msg_part = email.message.MIMEPart() + msg_part = OpenSSLMimePart() msg_part.set_payload(data) - msg_part.add_header("Content-Type", "text/plain") + if text_mode: + msg_part.add_header("Content-Type", "text/plain") m.attach(msg_part) sig_part = email.message.MIMEPart() @@ -212,7 +216,18 @@ def _smime_encode(data: bytes, signature: bytes, micalg: str) -> bytes: fp = io.BytesIO() g = email.generator.BytesGenerator( - fp, maxheaderlen=0, mangle_from_=False, policy=m.policy + fp, + maxheaderlen=0, + mangle_from_=False, + policy=m.policy.clone(linesep="\r\n"), ) g.flatten(m) return fp.getvalue() + + +class OpenSSLMimePart(email.message.MIMEPart): + # A MIMEPart subclass that replicates OpenSSL's behavior of not including + # a newline if there are no headers. + def _write_headers(self, generator) -> None: + if list(self.raw_items()): + generator._write_headers(self) diff --git a/src/rust/src/pkcs7.rs b/src/rust/src/pkcs7.rs index d760776564e3..99488e13dc15 100644 --- a/src/rust/src/pkcs7.rs +++ b/src/rust/src/pkcs7.rs @@ -135,14 +135,15 @@ fn sign_and_serialize<'p>( .getattr(crate::intern!(py, "PKCS7Options"))?; let raw_data = builder.getattr(crate::intern!(py, "_data"))?.extract()?; - let data = if options.contains(pkcs7_options.getattr(crate::intern!(py, "Binary"))?)? { - Cow::Borrowed(raw_data) - } else { - smime_canonicalize( - raw_data, - options.contains(pkcs7_options.getattr(crate::intern!(py, "Text"))?)?, - ) - }; + let (data_with_header, data_without_header) = + if options.contains(pkcs7_options.getattr(crate::intern!(py, "Binary"))?)? { + (Cow::Borrowed(raw_data), Cow::Borrowed(raw_data)) + } else { + smime_canonicalize( + raw_data, + options.contains(pkcs7_options.getattr(crate::intern!(py, "Text"))?)?, + ) + }; let content_type_bytes = asn1::write_single(&PKCS7_DATA_OID)?; let signing_time_bytes = asn1::write_single(&x509::certificate::time_from_chrono( @@ -179,7 +180,7 @@ fn sign_and_serialize<'p>( { ( None, - x509::sign::sign_data(py, py_private_key, py_hash_alg, &data)?, + x509::sign::sign_data(py, py_private_key, py_hash_alg, &data_with_header)?, ) } else { let mut authenticated_attrs = vec![]; @@ -197,7 +198,8 @@ fn sign_and_serialize<'p>( ])), }); - let digest = asn1::write_single(&x509::ocsp::hash_data(py, py_hash_alg, &data)?)?; + let digest = + asn1::write_single(&x509::ocsp::hash_data(py, py_hash_alg, &data_with_header)?)?; // Gross hack: copy to PyBytes to extend the lifetime to 'p let digest_bytes = pyo3::types::PyBytes::new(py, &digest); authenticated_attrs.push(x509::csr::Attribute { @@ -263,7 +265,7 @@ fn sign_and_serialize<'p>( if options.contains(pkcs7_options.getattr(crate::intern!(py, "DetachedSignature"))?)? { None } else { - data_tlv_bytes = asn1::write_single(&data.deref())?; + data_tlv_bytes = asn1::write_single(&data_with_header.deref())?; Some(asn1::parse_single(&data_tlv_bytes).unwrap()) }; @@ -305,9 +307,10 @@ fn sign_and_serialize<'p>( .import("cryptography.hazmat.primitives.serialization.pkcs7")? .getattr(crate::intern!(py, "_smime_encode"))? .call1(( - pyo3::types::PyBytes::new(py, &data), + pyo3::types::PyBytes::new(py, &data_without_header), pyo3::types::PyBytes::new(py, &content_info_bytes), mic_algs, + options.contains(pkcs7_options.getattr(crate::intern!(py, "Text"))?)?, ))? .extract()?) } else { @@ -316,28 +319,37 @@ fn sign_and_serialize<'p>( } } -fn smime_canonicalize(data: &[u8], text_mode: bool) -> Cow<'_, [u8]> { - let mut new_data = vec![]; +fn smime_canonicalize(data: &[u8], text_mode: bool) -> (Cow<'_, [u8]>, Cow<'_, [u8]>) { + let mut new_data_with_header = vec![]; + let mut new_data_without_header = vec![]; if text_mode { - new_data.extend_from_slice(b"Content-Type: text/plain\r\n\r\n"); + new_data_with_header.extend_from_slice(b"Content-Type: text/plain\r\n\r\n"); } let mut last_idx = 0; for (i, c) in data.iter().copied().enumerate() { if c == b'\n' && (i == 0 || data[i - 1] != b'\r') { - new_data.extend_from_slice(&data[last_idx..i]); - new_data.push(b'\r'); - new_data.push(b'\n'); + new_data_with_header.extend_from_slice(&data[last_idx..i]); + new_data_with_header.push(b'\r'); + new_data_with_header.push(b'\n'); + + new_data_without_header.extend_from_slice(&data[last_idx..i]); + new_data_without_header.push(b'\r'); + new_data_without_header.push(b'\n'); last_idx = i + 1; } } // If there's stuff in new_data, that means we need to copy the rest of // data over. - if !new_data.is_empty() { - new_data.extend_from_slice(&data[last_idx..]); - Cow::Owned(new_data) + if !new_data_with_header.is_empty() { + new_data_with_header.extend_from_slice(&data[last_idx..]); + new_data_without_header.extend_from_slice(&data[last_idx..]); + ( + Cow::Owned(new_data_with_header), + Cow::Owned(new_data_without_header), + ) } else { - Cow::Borrowed(data) + (Cow::Borrowed(data), Cow::Borrowed(data)) } } @@ -358,27 +370,60 @@ mod tests { #[test] fn test_smime_canonicalize() { - for (input, text_mode, expected, expected_is_borrowed) in [ + for ( + input, + text_mode, + expected_with_header, + expected_without_header, + expected_is_borrowed, + ) in [ // Values with text_mode=false - (b"" as &[u8], false, b"" as &[u8], true), - (b"\n", false, b"\r\n", false), - (b"abc", false, b"abc", true), - (b"abc\r\ndef\n", false, b"abc\r\ndef\r\n", false), - (b"abc\r\n", false, b"abc\r\n", true), - (b"abc\ndef\n", false, b"abc\r\ndef\r\n", false), + (b"" as &[u8], false, b"" as &[u8], b"" as &[u8], true), + (b"\n", false, b"\r\n", b"\r\n", false), + (b"abc", false, b"abc", b"abc", true), + ( + b"abc\r\ndef\n", + false, + b"abc\r\ndef\r\n", + b"abc\r\ndef\r\n", + false, + ), + (b"abc\r\n", false, b"abc\r\n", b"abc\r\n", true), + ( + b"abc\ndef\n", + false, + b"abc\r\ndef\r\n", + b"abc\r\ndef\r\n", + false, + ), // Values with text_mode=true - (b"", true, b"Content-Type: text/plain\r\n\r\n", false), - (b"abc", true, b"Content-Type: text/plain\r\n\r\nabc", false), + (b"", true, b"Content-Type: text/plain\r\n\r\n", b"", false), + ( + b"abc", + true, + b"Content-Type: text/plain\r\n\r\nabc", + b"abc", + false, + ), ( b"abc\n", true, b"Content-Type: text/plain\r\n\r\nabc\r\n", + b"abc\r\n", false, ), ] { - let result = smime_canonicalize(input, text_mode); - assert_eq!(result.deref(), expected); - assert_eq!(matches!(result, Cow::Borrowed(_)), expected_is_borrowed); + let (result_with_header, result_without_header) = smime_canonicalize(input, text_mode); + assert_eq!(result_with_header.deref(), expected_with_header); + assert_eq!(result_without_header.deref(), expected_without_header); + assert_eq!( + matches!(result_with_header, Cow::Borrowed(_)), + expected_is_borrowed + ); + assert_eq!( + matches!(result_without_header, Cow::Borrowed(_)), + expected_is_borrowed + ); } } } diff --git a/tests/hazmat/primitives/test_pkcs7.py b/tests/hazmat/primitives/test_pkcs7.py index ebb8dc0a9baa..d879563e17d9 100644 --- a/tests/hazmat/primitives/test_pkcs7.py +++ b/tests/hazmat/primitives/test_pkcs7.py @@ -3,6 +3,7 @@ # for complete details. +import email.parser import os import typing @@ -289,6 +290,7 @@ def test_smime_sign_detached(self, backend): sig = builder.sign(serialization.Encoding.SMIME, options) sig_binary = builder.sign(serialization.Encoding.DER, options) + assert b"text/plain" not in sig # We don't have a generic ASN.1 parser available to us so we instead # will assert on specific byte sequences being present based on the # parameters chosen above. @@ -298,8 +300,17 @@ def test_smime_sign_detached(self, backend): # as a separate section before the PKCS7 data. So we should expect to # have data in sig but not in sig_binary assert data in sig + # Parse the message to get the signed data, which is the + # first payload in the message + message = email.parser.BytesParser().parsebytes(sig) + signed_data = message.get_payload()[0].get_payload().encode() _pkcs7_verify( - serialization.Encoding.SMIME, sig, data, [cert], options, backend + serialization.Encoding.SMIME, + sig, + signed_data, + [cert], + options, + backend, ) assert data not in sig_binary _pkcs7_verify( @@ -492,10 +503,14 @@ def test_sign_text(self, backend): # The text option adds text/plain headers to the S/MIME message # These headers are only relevant in SMIME mode, not binary, which is # just the PKCS7 structure itself. - assert b"text/plain" in sig_pem - # When passing the Text option the header is prepended so the actual - # signed data is this. - signed_data = b"Content-Type: text/plain\r\n\r\nhello world" + assert sig_pem.count(b"text/plain") == 1 + assert b"Content-Type: text/plain\r\n\r\nhello world\r\n" in sig_pem + # Parse the message to get the signed data, which is the + # first payload in the message + message = email.parser.BytesParser().parsebytes(sig_pem) + signed_data = message.get_payload()[0].as_bytes( + policy=message.policy.clone(linesep="\r\n") + ) _pkcs7_verify( serialization.Encoding.SMIME, sig_pem,