/
rssnotify.py
executable file
·161 lines (152 loc) · 5.62 KB
/
rssnotify.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#!/usr/bin/env python
"""
rssnotify.py - Phenny RSSNotify Module
Copyright 2016, sfan5
Licensed under GNU General Public License v2.0
"""
import calendar
import os
import re
import threading
import time

import feedparser  # sudo pip install feedparser
import web
# Timestamp as found in Atom feeds: date/time, optional fractional seconds
# (ignored), then a mandatory numeric UTC offset.
_ISO_TIMESTAMP_RE = re.compile(
    r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(?:\.\d+)?([-+])(\d{2}):(\d{2})"
)


def to_unix_time(tstr):
    """Convert an ISO 8601 timestamp with UTC offset to Unix time.

    Accepts strings such as ``2016-01-01T12:00:00Z`` or
    ``2016-01-01T12:00:00+01:00``. A trailing ``Z`` is treated as ``+00:00``.

    :param tstr: timestamp string
    :return: seconds since the epoch (UTC) as a float
    :raises ValueError: if the string does not start with a supported timestamp
    """
    if tstr.endswith("Z"):
        tstr = tstr[:-1] + "+00:00"
    m = _ISO_TIMESTAMP_RE.match(tstr)
    if m is None:
        raise ValueError("unsupported timestamp format: %r" % (tstr,))
    stamp, sign, hours, minutes = m.groups()
    # calendar.timegm() interprets the struct_time as UTC, so the result does
    # not depend on the host's timezone. Correcting time.mktime() with
    # time.altzone/time.timezone is unreliable: time.daylight only says that
    # the local zone *has* DST rules, not that DST applies to this date.
    ts = calendar.timegm(time.strptime(stamp, "%Y-%m-%dT%H:%M:%S"))
    offset = int(hours) * 60 * 60 + int(minutes) * 60
    # A positive offset means local time is ahead of UTC, so subtract it.
    if sign == "+":
        ts -= offset
    else:  # sign == "-"
        ts += offset
    return float(ts)
def resolve_channels(phenny, l):
    """Turn a channel specification list into a set of channel names.

    Entries are applied in order. A plain entry adds that channel, an entry
    prefixed with ``-`` removes it again, and ``*`` stands for every channel
    listed in ``phenny.bot.channels`` (so ``-*`` removes all of those).

    :param phenny: bot wrapper exposing ``bot.channels``
    :param l: iterable of specification strings
    :return: set of resolved channel names
    """
    resolved = set()
    for spec in l:
        exclude = spec[0] == "-"
        name = spec[1:] if exclude else spec
        if name == "*":
            targets = set(phenny.bot.channels)
        else:
            targets = {name}
        if exclude:
            resolved.difference_update(targets)
        else:
            resolved.update(targets)
    return resolved
class RssNotify():
    """Poll commit feeds and announce new entries to IRC channels.

    ``config`` is a dict with the keys:

    * ``check_interval`` -- minimum number of seconds between two checks
    * ``show_link`` -- whether the entry link is included in messages
    * ``logfile`` -- path of a file new entries are appended to, or ``None``
    * ``feeds`` -- sequence of ``(url, channel_spec)`` pairs, where
      ``channel_spec`` is a list understood by ``resolve_channels``
    """

    # Upper bound of entries announced per feed and check; the remainder is
    # summarised in a single "not shown" message.
    MAX_MESSAGES = 4

    # Extracts "owner/repo" and the abbreviated commit hash from a commit URL.
    _COMMIT_URL_RE = re.compile(r'/([a-zA-Z0-9_-]+/[a-zA-Z0-9_-]+)/commit/([a-f0-9]{7})')

    def __init__(self, config):
        """Store ``config`` and reset the per-feed bookkeeping."""
        self.config = config
        # feed index -> timestamp of the newest entry seen so far
        self.last_updated = {i: 0 for i in range(len(config["feeds"]))}
        self.last_check = 0
        # True until the first check() has completed; while it is set, feeds
        # are only (re-)seeded and nothing is announced.
        self.firstrun = True
        # Indices of feeds whose baseline timestamp was taken from a usable
        # fetch. Feeds missing here are seeded instead of announced.
        self._seeded = set()

    def needs_check(self):
        """Return True once ``check_interval`` seconds passed since the last check."""
        return time.time() > self.last_check + self.config["check_interval"]

    def check(self, phenny):
        """Fetch every configured feed and announce entries newer than last time.

        Feeds are handled independently: an error while processing one feed is
        reported on stdout and does not prevent the remaining feeds from being
        checked.
        """
        start = self.last_check = time.time()
        print("[RssNotify]: Checking RSS feeds...")
        for fid, feedspec in enumerate(self.config["feeds"]):
            try:
                self._check_feed(phenny, fid, feedspec)
            except Exception as e:
                # Boundary of the worker thread: report and carry on so one
                # broken feed cannot silence all the others.
                print("[RssNotify]: Failed to check '%s': %r" % (feedspec[0], e))
        self.firstrun = False
        print("[RssNotify]: Checked %d RSS feeds in %0.3f seconds" % (len(self.config["feeds"]), time.time()-start))

    def _check_feed(self, phenny, fid, feedspec):
        """Process one feed: seed its baseline, or log and announce new entries."""
        url, chans = feedspec[0], feedspec[1]
        feed = feedparser.parse(url, agent="Mozilla/5.0 (compatible; MinetestBot)")
        # Parse every timestamp exactly once.
        stamped = [(to_unix_time(e.updated), e) for e in feed.entries]

        if self.firstrun or fid not in self._seeded:
            # Only take a baseline from a usable fetch. A failed fetch yields
            # no entries and is flagged by feedparser via `bozo`; seeding with
            # 0 in that case would make every existing entry look new later.
            if stamped or not getattr(feed, "bozo", 0):
                self.last_updated[fid] = max((ts for ts, _ in stamped), default=0)
                self._seeded.add(fid)
            return

        seen = self.last_updated.get(fid, 0)
        new = [e for ts, e in stamped if ts > seen]
        if not new:
            return
        print("[RssNotify]: Found %d update(s) for '%s'" % (len(new), url))
        self.last_updated[fid] = max(ts for ts, _ in stamped)
        # Feed order is kept while filtering; reverse it so the entries are
        # logged and announced in the opposite order.
        new.reverse()

        if self.config["logfile"] is not None:
            with open(self.config["logfile"], "a", encoding="utf-8") as f:
                for entry in new:
                    f.write(self._format_msg(entry, log_format=True) + "\n")

        for entry in new[:RssNotify.MAX_MESSAGES]:
            self._announce(phenny, self._format_msg(entry), chans)
        if len(new) > RssNotify.MAX_MESSAGES:
            message = self._get_cutoff_message(len(new) - RssNotify.MAX_MESSAGES)
            self._announce(phenny, message, chans)

    def _format_msg(self, feed_entry, log_format=False):
        """Render a feed entry as a one-line message.

        :param feed_entry: entry object providing ``updated``, ``title`` and,
            when available, ``authors``, ``links`` and ``link``
        :param log_format: use BBCode-style colour tags instead of IRC
            control codes
        :return: the formatted message string
        """
        if log_format:
            f_cshort = "[color=#c00]%s[/color]"
            f_clong = "[color=#c00]%s[/color] ([color=#c00]%s[/color])"
            f_all = "[color=#3465a4][git][/color] %s -> [color=#73d216]%s[/color]: [b]%s[/b] [color=#a04265]%s[/color] %s ([color=#888a85]%s[/color])"
        else:
            f_cshort = "\x0304%s\x0f"
            f_clong = "\x0304%s\x0f (\x0304%s\x0f)"
            f_all = "\x0302[git]\x0f %s -> \x0303%s\x0f: \x02%s\x0f \x0313%s\x0f %s (\x0315%s\x0f)"

        # Author data may be partially or entirely absent; fall back step by
        # step instead of failing on the whole entry.
        authors = getattr(feed_entry, "authors", None) or [None]
        author = authors[0]
        committer_realname = getattr(author, "name", None) or ""
        if committer_realname == "":
            committer_realname = getattr(author, "email", None) or ""
        profile_url = getattr(author, "href", None)
        if profile_url:
            committer = profile_url.replace('https://github.com/', "")
        else:
            # This will only use the realname if the nickname couldn't be obtained
            committer = committer_realname
        if not committer:
            committer = "?"

        try:
            commit_url = feed_entry.links[0].href
        except (AttributeError, IndexError):
            commit_url = getattr(feed_entry, "link", "")
        m = self._COMMIT_URL_RE.search(commit_url or "")
        repo_name = m.group(1) if m else "?"
        commit_hash = m.group(2) if m else "???????"

        commit_time = feed_entry.updated
        commit_text = feed_entry.title
        commit_link = feed_entry.link if self.config["show_link"] else ""

        # Avoid printing the same name twice when nick and real name agree.
        if committer_realname == "" or committer_realname.lower() == committer.lower():
            committer_final = f_cshort % committer
        else:
            committer_final = f_clong % (committer, committer_realname)
        return f_all % (committer_final, repo_name, commit_text, commit_hash, commit_link, commit_time)

    def _get_cutoff_message(self, left):
        """Return the notice stating that ``left`` further commits were omitted."""
        noun = "commit" if left == 1 else "commits"
        return "\x0302[git]\x0f (%d newer %s not shown)" % (left, noun)

    def _announce(self, phenny, message, chans):
        """Send ``message`` as PRIVMSG to every channel ``chans`` resolves to."""
        for ch in resolve_channels(phenny, chans):
            phenny.write(['PRIVMSG', ch], message)
#################
# Module configuration. The notifier instance is created at import time.

# Channel specification shared by the minetest feeds; per resolve_channels()
# this means every channel in phenny.bot.channels except #minetest-hub.
c = ['*', '-#minetest-hub']
rssn = RssNotify({
    # Minimum number of seconds between two feed checks.
    "check_interval": 120,
    # Include the entry link in announced and logged messages.
    "show_link": True,
    # New entries are appended to this file; None disables logging.
    "logfile": os.getcwd() + "/rssnotify.log",
    # (feed URL, channel specification) pairs.
    "feeds": [
        ('https://github.com/minetest/minetest/commits/master.atom', c),
        ('https://github.com/minetest/minetest_game/commits/master.atom', c),
        ('https://github.com/minetest/minetestmapper/commits/master.atom', c),
        ('https://github.com/minetest/serverlist/commits/master.atom', c),
        ('https://github.com/sfan5/phenny/commits/master.atom', ['##minetestbot']),
        ('https://github.com/sfan5/minetestbot-modules/commits/master.atom', ['##minetestbot']),
    ],
})
def rsscheck(phenny, input):
    """Start a background feed check if the check interval has elapsed.

    :param phenny: bot wrapper handed on to ``RssNotify.check``
    :param input: triggering event; unused
    """
    if not rssn.needs_check():
        return
    # Stamp the check time before the worker starts. check() only sets it
    # once its thread is running, so a second event arriving in between
    # would otherwise pass needs_check() and start a duplicate check.
    rssn.last_check = time.time()
    t = threading.Thread(target=rssn.check, args=(phenny, ))
    t.start()


# Hook attributes read by the bot framework. NOTE(review): the rule/event
# wildcards presumably make the hook fire on every incoming event -- confirm
# against the phenny module loader.
rsscheck.priority = 'low'
rsscheck.rule = r'.*'
rsscheck.event = '*'
rsscheck.thread = False
rsscheck.nohook = True
if __name__ == '__main__':
    # Run directly, the module just prints its own docstring.
    banner = __doc__.strip()
    print(banner)