-
Notifications
You must be signed in to change notification settings - Fork 7
/
__init__.py
124 lines (102 loc) · 5.14 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import datetime
from akismet.exceptions import AkismetServerError, MissingParameterError
__version__ = '0.4.3'
PYTHON_AKISMET_USER_AGENT = "Python-Akismet/{0}".format(__version__)
# API URIs
AKISMET_PROTOCOL = 'https'
AKISMET_DOMAIN = 'rest.akismet.com'
AKISMET_VERSION = '1.1'
AKISMET_CHECK_URL = "{protocol}://{api_key}.{domain}/{version}/comment-check"
AKISMET_SUBMIT_SPAM_URL = "{protocol}://{api_key}.{domain}/{version}/submit-spam"
AKISMET_SUBMIT_HAM_URL = "{protocol}://{api_key}.{domain}/{version}/submit-ham"
def remove_self(params):
params = dict(params)
del params['self']
return params
class SpamStatus:
Ham = 0
Unknown = 1
ProbableSpam = 2
DefiniteSpam = 3
class Akismet:
charset = 'utf-8'
protocol = AKISMET_PROTOCOL
domain = AKISMET_DOMAIN
version = AKISMET_VERSION
def __init__(self, api_key, blog=None, application_user_agent=None, timeout=None, is_test=False):
self.api_key = api_key
self.blog = blog
self.timeout = timeout
self.is_test = is_test
self.user_agent = '{0} | {1}'.format(application_user_agent or 'Unknown Application/0.0.0',
PYTHON_AKISMET_USER_AGENT)
def _get_parameters(self, data):
data.pop('self', None)
data = dict((key, value) for key, value in data.items() if value is not None)
data['is_test'] = data.get('is_test') or self.is_test
data['charset'] = self.charset
if 'blog' not in data and not self.blog:
raise MissingParameterError("blog is a required parameter if blog is not set on the akismet object")
elif 'blog' not in data:
data['blog'] = self.blog
for dt_param in ['comment_date', 'comment_post_modified']:
value = data.pop(dt_param, None)
if not value:
continue
assert type(value) is datetime.datetime
data[dt_param + '_gmt'] = value.isoformat()
return data
def _request(self, url, parameters, headers=None):
import requests
headers = headers or self.get_headers()
return requests.post(url, data=parameters, headers=headers, timeout=self.timeout)
def get_url(self, url):
return url.format(protocol=self.protocol, api_key=self.api_key, domain=self.domain, version=self.version)
def get_check_url(self):
return self.get_url(AKISMET_CHECK_URL)
def get_submit_spam_url(self):
return self.get_url(AKISMET_SUBMIT_SPAM_URL)
def get_submit_ham_url(self):
return self.get_url(AKISMET_SUBMIT_HAM_URL)
def get_headers(self):
return {
"User-Agent": self.user_agent,
}
def check(self, user_ip, user_agent, comment_author=None, comment_author_email=None, comment_author_url=None,
comment_content=None, referrer='unknown', blog=None, permalink=None, comment_type=None, blog_lang=None,
comment_date=None, comment_post_modified=None, user_role=None, is_test=False, recheck_reason=None):
parameters = self._get_parameters(locals())
r = self._request(self.get_check_url(), parameters)
try:
result = r.json()
if result is False:
return SpamStatus.Ham
elif result is True:
if r.headers.get('X-Akismet-Pro-Tip') == "discard":
return SpamStatus.DefiniteSpam
else:
return SpamStatus.ProbableSpam
else:
return SpamStatus.Unknown
except ValueError:
raise AkismetServerError("Akismet server returned an error: {0}".format(
r.headers.get('X-Akismet-Debug-Help') or r.text
))
def submit_spam(self, user_ip, user_agent, comment_author=None, comment_author_email=None,
comment_author_url=None, comment_content=None, referrer='unknown', blog=None, permalink=None,
comment_type=None, is_test=False, blog_lang=None, comment_date=None, comment_post_modified=None,
user_role=None):
self.submit(True, **remove_self(locals()))
def submit_ham(self, user_ip, user_agent, comment_author=None, comment_author_email=None,
comment_author_url=None, comment_content=None, referrer='unknown', blog=None, permalink=None,
comment_type=None, is_test=False, blog_lang=None, comment_date=None, comment_post_modified=None,
user_role=None):
self.submit(False, **remove_self(locals()))
def submit(self, is_spam, user_ip, user_agent, comment_author=None, comment_author_email=None,
comment_author_url=None, comment_content=None, referrer='unknown', blog=None, permalink=None,
comment_type=None, is_test=False, blog_lang=None, comment_date=None, comment_post_modified=None,
user_role=None):
parameters = self._get_parameters(locals())
r = self._request(self.get_submit_spam_url() if is_spam else self.get_submit_ham_url(), parameters)
if r.text != "Thanks for making the web a better place.":
raise AkismetServerError("Akismet server returned an error: {0}".format(r.text))