Permalink
Join GitHub today
GitHub is home to over 28 million developers working together to host and review code, manage projects, and build software together.
Sign up| import mimetypes | |
| from email import ( | |
| charset as Charset, encoders as Encoders, generator, message_from_string, | |
| ) | |
| from email.errors import InvalidHeaderDefect, NonASCIILocalPartDefect | |
| from email.header import Header | |
| from email.headerregistry import Address | |
| from email.message import Message | |
| from email.mime.base import MIMEBase | |
| from email.mime.message import MIMEMessage | |
| from email.mime.multipart import MIMEMultipart | |
| from email.mime.text import MIMEText | |
| from email.utils import formatdate, getaddresses, make_msgid, parseaddr | |
| from io import BytesIO, StringIO | |
| from pathlib import Path | |
| from django.conf import settings | |
| from django.core.mail.utils import DNS_NAME | |
| from django.utils.encoding import force_text | |
| # Don't BASE64-encode UTF-8 messages so that we avoid unwanted attention from | |
| # some spam filters. | |
| utf8_charset = Charset.Charset('utf-8') | |
| utf8_charset.body_encoding = None # Python defaults to BASE64 | |
| utf8_charset_qp = Charset.Charset('utf-8') | |
| utf8_charset_qp.body_encoding = Charset.QP | |
| # Default MIME type to use on attachments (if it is not explicitly given | |
| # and cannot be guessed). | |
| DEFAULT_ATTACHMENT_MIME_TYPE = 'application/octet-stream' | |
| RFC5322_EMAIL_LINE_LENGTH_LIMIT = 998 | |
| class BadHeaderError(ValueError): | |
| pass | |
| # Header names that contain structured address data (RFC #5322) | |
| ADDRESS_HEADERS = { | |
| 'from', | |
| 'sender', | |
| 'reply-to', | |
| 'to', | |
| 'cc', | |
| 'bcc', | |
| 'resent-from', | |
| 'resent-sender', | |
| 'resent-to', | |
| 'resent-cc', | |
| 'resent-bcc', | |
| } | |
| def forbid_multi_line_headers(name, val, encoding): | |
| """Forbid multi-line headers to prevent header injection.""" | |
| encoding = encoding or settings.DEFAULT_CHARSET | |
| val = str(val) # val may be lazy | |
| if '\n' in val or '\r' in val: | |
| raise BadHeaderError("Header values can't contain newlines (got %r for header %r)" % (val, name)) | |
| try: | |
| val.encode('ascii') | |
| except UnicodeEncodeError: | |
| if name.lower() in ADDRESS_HEADERS: | |
| val = ', '.join(sanitize_address(addr, encoding) for addr in getaddresses((val,))) | |
| else: | |
| val = Header(val, encoding).encode() | |
| else: | |
| if name.lower() == 'subject': | |
| val = Header(val).encode() | |
| return name, val | |
| def split_addr(addr, encoding): | |
| """ | |
| Split the address into local part and domain and encode them. | |
| When non-ascii characters are present in the local part, it must be | |
| MIME-word encoded. The domain name must be idna-encoded if it contains | |
| non-ascii characters. | |
| """ | |
| if '@' in addr: | |
| localpart, domain = addr.split('@', 1) | |
| # Try to get the simplest encoding - ascii if possible so that | |
| # to@example.com doesn't become =?utf-8?q?to?=@example.com. This | |
| # makes unit testing a bit easier and more readable. | |
| try: | |
| localpart.encode('ascii') | |
| except UnicodeEncodeError: | |
| localpart = Header(localpart, encoding).encode() | |
| domain = domain.encode('idna').decode('ascii') | |
| else: | |
| localpart = Header(addr, encoding).encode() | |
| domain = '' | |
| return (localpart, domain) | |
| def sanitize_address(addr, encoding): | |
| """ | |
| Format a pair of (name, address) or an email address string. | |
| """ | |
| if not isinstance(addr, tuple): | |
| addr = parseaddr(addr) | |
| nm, addr = addr | |
| localpart, domain = None, None | |
| nm = Header(nm, encoding).encode() | |
| try: | |
| addr.encode('ascii') | |
| except UnicodeEncodeError: # IDN or non-ascii in the local part | |
| localpart, domain = split_addr(addr, encoding) | |
| # An `email.headerregistry.Address` object is used since | |
| # email.utils.formataddr() naively encodes the name as ascii (see #25986). | |
| if localpart and domain: | |
| address = Address(nm, username=localpart, domain=domain) | |
| return str(address) | |
| try: | |
| address = Address(nm, addr_spec=addr) | |
| except (InvalidHeaderDefect, NonASCIILocalPartDefect): | |
| localpart, domain = split_addr(addr, encoding) | |
| address = Address(nm, username=localpart, domain=domain) | |
| return str(address) | |
| class MIMEMixin: | |
| def as_string(self, unixfrom=False, linesep='\n'): | |
| """Return the entire formatted message as a string. | |
| Optional `unixfrom' when True, means include the Unix From_ envelope | |
| header. | |
| This overrides the default as_string() implementation to not mangle | |
| lines that begin with 'From '. See bug #13433 for details. | |
| """ | |
| fp = StringIO() | |
| g = generator.Generator(fp, mangle_from_=False) | |
| g.flatten(self, unixfrom=unixfrom, linesep=linesep) | |
| return fp.getvalue() | |
| def as_bytes(self, unixfrom=False, linesep='\n'): | |
| """Return the entire formatted message as bytes. | |
| Optional `unixfrom' when True, means include the Unix From_ envelope | |
| header. | |
| This overrides the default as_bytes() implementation to not mangle | |
| lines that begin with 'From '. See bug #13433 for details. | |
| """ | |
| fp = BytesIO() | |
| g = generator.BytesGenerator(fp, mangle_from_=False) | |
| g.flatten(self, unixfrom=unixfrom, linesep=linesep) | |
| return fp.getvalue() | |
| class SafeMIMEMessage(MIMEMixin, MIMEMessage): | |
| def __setitem__(self, name, val): | |
| # message/rfc822 attachments must be ASCII | |
| name, val = forbid_multi_line_headers(name, val, 'ascii') | |
| MIMEMessage.__setitem__(self, name, val) | |
| class SafeMIMEText(MIMEMixin, MIMEText): | |
| def __init__(self, _text, _subtype='plain', _charset=None): | |
| self.encoding = _charset | |
| MIMEText.__init__(self, _text, _subtype=_subtype, _charset=_charset) | |
| def __setitem__(self, name, val): | |
| name, val = forbid_multi_line_headers(name, val, self.encoding) | |
| MIMEText.__setitem__(self, name, val) | |
| def set_payload(self, payload, charset=None): | |
| if charset == 'utf-8': | |
| has_long_lines = any( | |
| len(l.encode()) > RFC5322_EMAIL_LINE_LENGTH_LIMIT | |
| for l in payload.splitlines() | |
| ) | |
| # Quoted-Printable encoding has the side effect of shortening long | |
| # lines, if any (#22561). | |
| charset = utf8_charset_qp if has_long_lines else utf8_charset | |
| MIMEText.set_payload(self, payload, charset=charset) | |
| class SafeMIMEMultipart(MIMEMixin, MIMEMultipart): | |
| def __init__(self, _subtype='mixed', boundary=None, _subparts=None, encoding=None, **_params): | |
| self.encoding = encoding | |
| MIMEMultipart.__init__(self, _subtype, boundary, _subparts, **_params) | |
| def __setitem__(self, name, val): | |
| name, val = forbid_multi_line_headers(name, val, self.encoding) | |
| MIMEMultipart.__setitem__(self, name, val) | |
| class EmailMessage: | |
| """A container for email information.""" | |
| content_subtype = 'plain' | |
| mixed_subtype = 'mixed' | |
| encoding = None # None => use settings default | |
| def __init__(self, subject='', body='', from_email=None, to=None, bcc=None, | |
| connection=None, attachments=None, headers=None, cc=None, | |
| reply_to=None): | |
| """ | |
| Initialize a single email message (which can be sent to multiple | |
| recipients). | |
| """ | |
| if to: | |
| if isinstance(to, str): | |
| raise TypeError('"to" argument must be a list or tuple') | |
| self.to = list(to) | |
| else: | |
| self.to = [] | |
| if cc: | |
| if isinstance(cc, str): | |
| raise TypeError('"cc" argument must be a list or tuple') | |
| self.cc = list(cc) | |
| else: | |
| self.cc = [] | |
| if bcc: | |
| if isinstance(bcc, str): | |
| raise TypeError('"bcc" argument must be a list or tuple') | |
| self.bcc = list(bcc) | |
| else: | |
| self.bcc = [] | |
| if reply_to: | |
| if isinstance(reply_to, str): | |
| raise TypeError('"reply_to" argument must be a list or tuple') | |
| self.reply_to = list(reply_to) | |
| else: | |
| self.reply_to = [] | |
| self.from_email = from_email or settings.DEFAULT_FROM_EMAIL | |
| self.subject = subject | |
| self.body = body or '' | |
| self.attachments = [] | |
| if attachments: | |
| for attachment in attachments: | |
| if isinstance(attachment, MIMEBase): | |
| self.attach(attachment) | |
| else: | |
| self.attach(*attachment) | |
| self.extra_headers = headers or {} | |
| self.connection = connection | |
| def get_connection(self, fail_silently=False): | |
| from django.core.mail import get_connection | |
| if not self.connection: | |
| self.connection = get_connection(fail_silently=fail_silently) | |
| return self.connection | |
| def message(self): | |
| encoding = self.encoding or settings.DEFAULT_CHARSET | |
| msg = SafeMIMEText(self.body, self.content_subtype, encoding) | |
| msg = self._create_message(msg) | |
| msg['Subject'] = self.subject | |
| msg['From'] = self.extra_headers.get('From', self.from_email) | |
| self._set_list_header_if_not_empty(msg, 'To', self.to) | |
| self._set_list_header_if_not_empty(msg, 'Cc', self.cc) | |
| self._set_list_header_if_not_empty(msg, 'Reply-To', self.reply_to) | |
| # Email header names are case-insensitive (RFC 2045), so we have to | |
| # accommodate that when doing comparisons. | |
| header_names = [key.lower() for key in self.extra_headers] | |
| if 'date' not in header_names: | |
| # formatdate() uses stdlib methods to format the date, which use | |
| # the stdlib/OS concept of a timezone, however, Django sets the | |
| # TZ environment variable based on the TIME_ZONE setting which | |
| # will get picked up by formatdate(). | |
| msg['Date'] = formatdate(localtime=settings.EMAIL_USE_LOCALTIME) | |
| if 'message-id' not in header_names: | |
| # Use cached DNS_NAME for performance | |
| msg['Message-ID'] = make_msgid(domain=DNS_NAME) | |
| for name, value in self.extra_headers.items(): | |
| if name.lower() != 'from': # From is already handled | |
| msg[name] = value | |
| return msg | |
| def recipients(self): | |
| """ | |
| Return a list of all recipients of the email (includes direct | |
| addressees as well as Cc and Bcc entries). | |
| """ | |
| return [email for email in (self.to + self.cc + self.bcc) if email] | |
| def send(self, fail_silently=False): | |
| """Send the email message.""" | |
| if not self.recipients(): | |
| # Don't bother creating the network connection if there's nobody to | |
| # send to. | |
| return 0 | |
| return self.get_connection(fail_silently).send_messages([self]) | |
| def attach(self, filename=None, content=None, mimetype=None): | |
| """ | |
| Attach a file with the given filename and content. The filename can | |
| be omitted and the mimetype is guessed, if not provided. | |
| If the first parameter is a MIMEBase subclass, insert it directly | |
| into the resulting message attachments. | |
| For a text/* mimetype (guessed or specified), when a bytes object is | |
| specified as content, decode it as UTF-8. If that fails, set the | |
| mimetype to DEFAULT_ATTACHMENT_MIME_TYPE and don't decode the content. | |
| """ | |
| if isinstance(filename, MIMEBase): | |
| assert content is None | |
| assert mimetype is None | |
| self.attachments.append(filename) | |
| else: | |
| assert content is not None | |
| mimetype = mimetype or mimetypes.guess_type(filename)[0] or DEFAULT_ATTACHMENT_MIME_TYPE | |
| basetype, subtype = mimetype.split('/', 1) | |
| if basetype == 'text': | |
| if isinstance(content, bytes): | |
| try: | |
| content = content.decode() | |
| except UnicodeDecodeError: | |
| # If mimetype suggests the file is text but it's | |
| # actually binary, read() raises a UnicodeDecodeError. | |
| mimetype = DEFAULT_ATTACHMENT_MIME_TYPE | |
| self.attachments.append((filename, content, mimetype)) | |
| def attach_file(self, path, mimetype=None): | |
| """ | |
| Attach a file from the filesystem. | |
| Set the mimetype to DEFAULT_ATTACHMENT_MIME_TYPE if it isn't specified | |
| and cannot be guessed. | |
| For a text/* mimetype (guessed or specified), decode the file's content | |
| as UTF-8. If that fails, set the mimetype to | |
| DEFAULT_ATTACHMENT_MIME_TYPE and don't decode the content. | |
| """ | |
| path = Path(path) | |
| with path.open('rb') as file: | |
| content = file.read() | |
| self.attach(path.name, content, mimetype) | |
| def _create_message(self, msg): | |
| return self._create_attachments(msg) | |
| def _create_attachments(self, msg): | |
| if self.attachments: | |
| encoding = self.encoding or settings.DEFAULT_CHARSET | |
| body_msg = msg | |
| msg = SafeMIMEMultipart(_subtype=self.mixed_subtype, encoding=encoding) | |
| if self.body or body_msg.is_multipart(): | |
| msg.attach(body_msg) | |
| for attachment in self.attachments: | |
| if isinstance(attachment, MIMEBase): | |
| msg.attach(attachment) | |
| else: | |
| msg.attach(self._create_attachment(*attachment)) | |
| return msg | |
| def _create_mime_attachment(self, content, mimetype): | |
| """ | |
| Convert the content, mimetype pair into a MIME attachment object. | |
| If the mimetype is message/rfc822, content may be an | |
| email.Message or EmailMessage object, as well as a str. | |
| """ | |
| basetype, subtype = mimetype.split('/', 1) | |
| if basetype == 'text': | |
| encoding = self.encoding or settings.DEFAULT_CHARSET | |
| attachment = SafeMIMEText(content, subtype, encoding) | |
| elif basetype == 'message' and subtype == 'rfc822': | |
| # Bug #18967: per RFC2046 s5.2.1, message/rfc822 attachments | |
| # must not be base64 encoded. | |
| if isinstance(content, EmailMessage): | |
| # convert content into an email.Message first | |
| content = content.message() | |
| elif not isinstance(content, Message): | |
| # For compatibility with existing code, parse the message | |
| # into an email.Message object if it is not one already. | |
| content = message_from_string(force_text(content)) | |
| attachment = SafeMIMEMessage(content, subtype) | |
| else: | |
| # Encode non-text attachments with base64. | |
| attachment = MIMEBase(basetype, subtype) | |
| attachment.set_payload(content) | |
| Encoders.encode_base64(attachment) | |
| return attachment | |
| def _create_attachment(self, filename, content, mimetype=None): | |
| """ | |
| Convert the filename, content, mimetype triple into a MIME attachment | |
| object. | |
| """ | |
| attachment = self._create_mime_attachment(content, mimetype) | |
| if filename: | |
| try: | |
| filename.encode('ascii') | |
| except UnicodeEncodeError: | |
| filename = ('utf-8', '', filename) | |
| attachment.add_header('Content-Disposition', 'attachment', filename=filename) | |
| return attachment | |
| def _set_list_header_if_not_empty(self, msg, header, values): | |
| """ | |
| Set msg's header, either from self.extra_headers, if present, or from | |
| the values argument. | |
| """ | |
| if values: | |
| try: | |
| value = self.extra_headers[header] | |
| except KeyError: | |
| value = ', '.join(str(v) for v in values) | |
| msg[header] = value | |
| class EmailMultiAlternatives(EmailMessage): | |
| """ | |
| A version of EmailMessage that makes it easy to send multipart/alternative | |
| messages. For example, including text and HTML versions of the text is | |
| made easier. | |
| """ | |
| alternative_subtype = 'alternative' | |
| def __init__(self, subject='', body='', from_email=None, to=None, bcc=None, | |
| connection=None, attachments=None, headers=None, alternatives=None, | |
| cc=None, reply_to=None): | |
| """ | |
| Initialize a single email message (which can be sent to multiple | |
| recipients). | |
| """ | |
| super().__init__( | |
| subject, body, from_email, to, bcc, connection, attachments, | |
| headers, cc, reply_to, | |
| ) | |
| self.alternatives = alternatives or [] | |
| def attach_alternative(self, content, mimetype): | |
| """Attach an alternative content representation.""" | |
| assert content is not None | |
| assert mimetype is not None | |
| self.alternatives.append((content, mimetype)) | |
| def _create_message(self, msg): | |
| return self._create_attachments(self._create_alternatives(msg)) | |
| def _create_alternatives(self, msg): | |
| encoding = self.encoding or settings.DEFAULT_CHARSET | |
| if self.alternatives: | |
| body_msg = msg | |
| msg = SafeMIMEMultipart(_subtype=self.alternative_subtype, encoding=encoding) | |
| if self.body: | |
| msg.attach(body_msg) | |
| for alternative in self.alternatives: | |
| msg.attach(self._create_mime_attachment(*alternative)) | |
| return msg |