In [None]:
#| default_exp core

# MarkdownMerge

> API details

In [None]:
#| export
import os,mimetypes,smtplib

from fastcore.utils import *
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders

from contextlib import contextmanager
from markdown import markdown
from email.headerregistry import Address
from time import sleep

In [None]:
from fastcore.test import test_eq

In [None]:
#| hide
from nbdev.showdoc import *

## Utility functions

In [None]:
#| export
def get_addr(email, name=None):
    "Build an `Address` for `email`, displayed as `name` (or as `email` itself when `name` is `None`)"
    display = name if name is not None else email
    return Address(display_name=display, addr_spec=email)

Specify `from_addr` and `to_addrs` as either a string or an `Address` object (created with `get_addr`). Note that `to_addrs` can be a single address or a list of addresses.

In [None]:
# Sender and a one-element recipient list, reused by the examples below
from_addr = get_addr('from@example.com', 'Jeremy Howard')
to_addrs = [get_addr('to@example.com', 'Jeremy')]
# Printing an `Address` shows it in `Name <email>` form
print(from_addr)
print(to_addrs[0])

Jeremy Howard <from@example.com>
Jeremy <to@example.com>


In [None]:
#| export
def attach_file(msg, f):
    "Read file `f`, wrap it in a base64-encoded MIME part, and add that part to `msg`"
    guessed = mimetypes.guess_type(f)[0]
    # When no type can be guessed, fall back to the generic binary type
    maintype,subtype = (guessed or 'application/octet-stream').split('/', 1)
    attachment = MIMEBase(maintype, subtype)
    with open(f, 'rb') as fh: data = fh.read()
    attachment.set_payload(data)
    encoders.encode_base64(attachment)
    # Only the final path component is exposed as the attachment's file name
    attachment['Content-Disposition'] = f'attachment; filename={Path(f).name}'
    msg.attach(attachment)

In [None]:
msg = MIMEMultipart()
attach_file(msg, 'settings.ini')
part = msg.get_payload()[0]
# No MIME type is guessed for this file here, so the generic binary fallback is used
test_eq(part.get_content_type(), 'application/octet-stream')
# Only the file name (not the full path) appears in the header
test_eq(part['Content-Disposition'], 'attachment; filename=settings.ini')
# The payload is the base64-encoded file content, so it should not be (nearly) empty
assert len(part.get_payload())>20

In [None]:
#| export
def create_multipart_msg(subj, from_addr, to_addrs, md=None, html=None, attach=None):
    "Build a `multipart/alternative` email with optional plain-text (`md`) and `html` bodies, plus any files in `attach`"
    msg = MIMEMultipart('alternative')
    msg['Subject'] = subj
    msg['From'] = str(from_addr)
    # `to_addrs` may be one address or several; they are joined into a single `To` header
    msg['To'] = ', '.join(map(str, listify(to_addrs)))
    # Part order is fixed: plain text first, then HTML; empty/missing bodies are skipped
    for body,subtype in ((md, 'plain'), (html, 'html')):
        if body: msg.attach(MIMEText(body, subtype))
    for path in listify(attach): attach_file(msg, path)
    return msg

In [None]:
msg = create_multipart_msg('Test Subject', from_addr, to_addrs, md='**Bold text**', html='<b>Bold text</b>')
test_eq(msg['Subject'], 'Test Subject')
test_eq(msg['From'], str(from_addr))
# Both bodies were supplied, so there are two parts: plain text first, then HTML
test_eq(len(msg.get_payload()), 2)
test_eq(msg.get_payload()[0].get_content_type(), 'text/plain')
test_eq(msg.get_payload()[1].get_content_type(), 'text/html')

In [None]:
#| export
def md2email(subj, from_addr, to_addrs, md, attach=None):
    "Build a multipart email whose plain-text part is `md` and whose HTML part is `md` rendered with `markdown`"
    return create_multipart_msg(subj, from_addr, to_addrs, md=md, html=markdown(md), attach=attach)

In [None]:
# Plain strings are passed here for the addresses and for the single attachment
test_msg2 = md2email('Test md2email', 'support@answer.ai', 'j@answer.ai', '**Markdown** test with _attachment_', attach='settings.ini')
test_eq(test_msg2['Subject'], 'Test md2email')
payload = test_msg2.get_payload()
# Parts, in order: plain text, HTML, then the attached file
test_eq(len(payload), 3)
test_eq(payload[2]['Content-Disposition'], 'attachment; filename=settings.ini')

The basic email body is the plain text message, i.e. the markdown source itself (when sending with `MarkdownMerge`, any template variables in `{}` are filled in before the email is built):

In [None]:
print(payload[0].get_payload())

**Markdown** test with _attachment_


Most email software is set up to display the HTML version:

In [None]:
from IPython.display import HTML

In [None]:
html = payload[1].get_payload()
HTML(html)

In [None]:
# Body of the attachment part, still in its base64-encoded form
att = payload[2].get_payload()

In [None]:
import base64

In [None]:
# Undo the base64 encoding applied by `attach_file` and show the start of the original file
decoded = base64.b64decode(payload[2].get_payload())
print(decoded.decode('utf-8')[:35])

[DEFAULT]
lib_name = markdown_merge


In [None]:
#| export
@contextmanager
def smtp_connection(host, port, user=None, password=None, use_ssl=True, use_tls=False):
    """Context manager yielding an SMTP connection to `host`:`port` that is always closed on exit.

    With `use_ssl` the connection is made with `smtplib.SMTP_SSL`; otherwise a plain
    `smtplib.SMTP` connection is opened and, if `use_tls` is set, upgraded with STARTTLS.
    `login` is only attempted when both `user` and `password` are given.
    Errors raised by `starttls`, `login`, or the body of the `with` block propagate to the caller.
    """
    conn = smtplib.SMTP_SSL(host, port) if use_ssl else smtplib.SMTP(host, port)
    # The socket is open from here on, so the handshake steps sit inside the `try`:
    # a failed STARTTLS or login must not leak the connection
    try:
        if use_tls and not use_ssl: conn.starttls()
        if user and password: conn.login(user, password)
        yield conn
    finally:
        # `quit` raises if the server has already dropped the connection, which would
        # mask the original error; in that case just release the local resources
        try: conn.quit()
        except smtplib.SMTPServerDisconnected: conn.close()

In [None]:
# SMTP endpoint (presumably Amazon SES, going by the host and variable names)
servernm='email-smtp.us-west-2.amazonaws.com'
# Credentials come from the environment rather than being hard-coded; `None` if unset
username=os.getenv('SES_SMTP_USER')
password=os.getenv('SES_SMTP_PASS')

In [None]:
# Keyword arguments for `smtp_connection`: plain connection on port 587, upgraded with STARTTLS
smtp_cfg = dict(host=servernm, port=587, user=username, password=password, use_ssl=False, use_tls=True)
test_msg = create_multipart_msg('Test from stdlib', 'support@answer.ai', 'j@answer.ai', md='**Test message**', html='<b>Test message</b>')
# Left commented out so that running the notebook does not send real email; uncomment to send
# with smtp_connection(**smtp_cfg) as conn: conn.send_message(test_msg)

## MarkdownMerge -

In [None]:
#| export
class MarkdownMerge:
    "Send templated email merge messages formatted with Markdown"
    def __init__(self, addrs, from_addr, subj, msg, smtp_cfg=None, inserts=None, test=False):
        """Store the merge settings.

        `addrs` is the list of recipients and `inserts` a parallel list of dicts used to fill
        the `{}` fields of the markdown template `msg` (omit it if the template has no fields).
        `smtp_cfg` holds the keyword arguments for `smtp_connection`; it is only needed when
        actually sending. With `test=True` messages are printed instead of sent.
        """
        # `i` is the index of the next unsent address; it persists across `send_msgs` calls
        self.addrs,self.from_addr,self.subj,self.msg,self.i = addrs,from_addr,subj,msg,0
        # The empty dicts are only ever read (unpacked into `format`), so sharing one is safe
        self.inserts = [{}]*len(addrs) if inserts is None else inserts
        self.smtp_cfg,self.test = smtp_cfg,test

    def _process_pending(self, conn, pause):
        "Print (test mode) or send over `conn` every message from index `self.i` onwards"
        while self.i < len(self.addrs):
            addr,insert = self.addrs[self.i],self.inserts[self.i]
            msg = self.msg.format(**insert)
            # Built in test mode too, so a dry run goes through the same rendering step
            eml = md2email(self.subj, self.from_addr, addr, md=msg)
            if self.test: print(f"To: {addr}\n{'-'*40}\n{msg}\n{'='*40}\n")
            else: conn.send_message(eml); sleep(pause)
            # Advance only after success, so a failed send is retried by the next call
            self.i += 1
            if self.i%100==0: print(self.i)

    def send_msgs(self, pause=0.5):
        "Send all unsent messages to `addrs` with `pause` secs between each send"
        # A test run only prints, so it must not require SMTP settings or a live server
        if self.test: return self._process_pending(None, pause)
        with smtp_connection(**self.smtp_cfg) as conn: self._process_pending(conn, pause)

    def reset(self):
        "Set the sent-message counter back to 0 so that `send_msgs` starts from the first address"
        self.i=0

Your message should be in [markdown](https://daringfireball.net/projects/markdown/syntax) format. It will be converted into a two-part email, containing both a plain text and an HTML part, so recipients will see whichever format they have set as their preference for viewing mail. Anything in curly brackets `{}` will be replaced with the contents of the inserts dictionary for that address. If there are no bracketed variables to replace, then you don't need to pass any inserts.

In [None]:
msg = "**Hello {name}!**\n\nYour special number is: *{num}*"

`inserts` is a list of dictionaries. For each dictionary, the keys should match the bracketed names in your email template, and the values will be filled in to those sections.

In [None]:
inserts = [{'name': 'Jeremy', 'num': 42}, {'name': 'Rachel', 'num': 7}]

In [None]:
# `test=True` makes `send_msgs` print each merged message instead of sending it
mm = MarkdownMerge(['aaa@answer.ai', 'bbb@answer.ai'], 'from@answer.ai', 'Test merge',
                   msg, smtp_cfg=smtp_cfg, inserts=inserts, test=True)

In [None]:
#| eval: false
mm.send_msgs()

To: aaa@answer.ai
----------------------------------------
**Hello Jeremy!**

Your special number is: *42*

To: bbb@answer.ai
----------------------------------------
**Hello Rachel!**

Your special number is: *7*



Use `pause` to avoid sending too many messages too quickly; many SMTP servers restrict sending speed to avoid abuse. If you get an error during sending (e.g. "too many messages"), then wait an hour or so, then continue sending, using a larger `pause` value.

**NB**: You can just call `send_msgs` again when resending, since the count of successfully sent messages is saved, and those messages are not re-sent (unless you call `reset`). Test sends advance this count too, so you should call `reset` after a test send.

To reset the counter to `0`, call `reset`:

In [None]:
mm.reset()