/
sendmail_client.py
168 lines (148 loc) · 6.55 KB
/
sendmail_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
"""
SMTP client reads configuration and sends message.
Andrew DeOrio <awdeorio@umich.edu>
"""
import collections
import socket
import smtplib
import configparser
import getpass
import datetime
import base64
from . import exceptions
# Type to store info read from config file
MailmergeConfig = collections.namedtuple(
"MailmergeConfig",
["username", "host", "port", "security", "ratelimit"],
)
class SendmailClient:
"""Represent a client connection to an SMTP server."""
def __init__(self, config_path, dry_run=False):
"""Read configuration from server configuration file."""
self.config_path = config_path
self.dry_run = dry_run # Do not send real messages
self.config = None # Config read from config_path by read_config()
self.password = None # Password read from stdin
self.lastsent = None # Timestamp of last successful send
self.read_config()
def read_config(self):
"""Read configuration file and return a MailmergeConfig object."""
try:
parser = configparser.RawConfigParser()
parser.read(str(self.config_path))
host = parser.get("smtp_server", "host")
port = parser.getint("smtp_server", "port")
security = parser.get("smtp_server", "security", fallback=None)
username = parser.get("smtp_server", "username", fallback=None)
ratelimit = parser.getint("smtp_server", "ratelimit", fallback=0)
except (configparser.Error, ValueError) as err:
raise exceptions.MailmergeError(f"{self.config_path}: {err}")
# Coerce legacy option "security = Never"
if security == "Never":
security = None
# Verify security type
if security not in [None, "SSL/TLS", "STARTTLS", "PLAIN", "XOAUTH"]:
raise exceptions.MailmergeError(
f"{self.config_path}: unrecognized security type: '{security}'"
)
# Verify username
if security is not None and username is None:
raise exceptions.MailmergeError(
f"{self.config_path}: username is required for "
f"security type '{security}'"
)
# Save validated configuration
self.config = MailmergeConfig(
username, host, port, security, ratelimit,
)
def sendmail(self, sender, recipients, message):
"""Send email message."""
if self.dry_run:
return
# Check if we've hit the rate limit
now = datetime.datetime.now()
if self.config.ratelimit and self.lastsent:
waittime = datetime.timedelta(minutes=1.0 / self.config.ratelimit)
if now - self.lastsent < waittime:
raise exceptions.MailmergeRateLimitError()
# Ask for password if necessary
if self.config.security is not None and self.password is None:
self.password = getpass.getpass(
f">>> password for {self.config.username} on "
f"{self.config.host}: "
)
# Send
host, port = self.config.host, self.config.port
try:
if self.config.security == "SSL/TLS":
self.sendmail_ssltls(sender, recipients, message)
elif self.config.security == "STARTTLS":
self.sendmail_starttls(sender, recipients, message)
elif self.config.security == "PLAIN":
self.sendmail_plain(sender, recipients, message)
elif self.config.security == "XOAUTH":
self.sendmail_xoauth(sender, recipients, message)
elif self.config.security is None:
self.sendmail_clear(sender, recipients, message)
except smtplib.SMTPAuthenticationError as err:
raise exceptions.MailmergeError(
f"{host}:{port} failed to authenticate "
f"user '{self.config.username}': {err}"
)
except smtplib.SMTPException as err:
raise exceptions.MailmergeError(
f"{host}:{port} failed to send message: {err}"
)
except socket.error as err:
raise exceptions.MailmergeError(
f"{host}:{port} failed to connect to server: {err}"
)
# Update timestamp of last sent message
self.lastsent = now
def sendmail_ssltls(self, sender, recipients, message):
"""Send email message with SSL/TLS security."""
message_flattened = str(message)
with smtplib.SMTP_SSL(self.config.host, self.config.port) as smtp:
smtp.login(self.config.username, self.password)
smtp.sendmail(sender, recipients, message_flattened)
def sendmail_starttls(self, sender, recipients, message):
"""Send email message with STARTTLS security."""
message_flattened = str(message)
with smtplib.SMTP(self.config.host, self.config.port) as smtp:
smtp.ehlo()
smtp.starttls()
smtp.ehlo()
smtp.login(self.config.username, self.password)
smtp.sendmail(sender, recipients, message_flattened)
def sendmail_plain(self, sender, recipients, message):
"""Send email message with plain security."""
message_flattened = str(message)
with smtplib.SMTP(self.config.host, self.config.port) as smtp:
smtp.login(self.config.username, self.password)
smtp.sendmail(sender, recipients, message_flattened)
def sendmail_clear(self, sender, recipients, message):
"""Send email message with no security."""
message_flattened = str(message)
with smtplib.SMTP(self.config.host, self.config.port) as smtp:
smtp.sendmail(sender, recipients, message_flattened)
def sendmail_xoauth(self, sender, recipients, message):
"""Send email message with XOAUTH security."""
xoauth2 = (
f"user={self.config.username}\x01"
f"auth=Bearer {self.password}\x01\x01"
)
try:
xoauth2 = xoauth2.encode("ascii")
except UnicodeEncodeError as err:
raise exceptions.MailmergeError(
f"Username and XOAUTH access token must be ASCII '{xoauth2}'. "
f"{err}, "
)
message_flattened = str(message)
with smtplib.SMTP(self.config.host, self.config.port) as smtp:
smtp.ehlo()
smtp.starttls()
smtp.ehlo()
smtp.docmd('AUTH XOAUTH2')
smtp.docmd(str(base64.b64encode(xoauth2).decode("utf-8")))
smtp.sendmail(sender, recipients, message_flattened)