# autodumpscan.py
"""Dump scanner for ToolForge."""
import codecs
import os
import os.path
import re
import pywikibot
import mwparserfromhell
from pywikibot import xmlreader
from checkwiki import ignore, deignore
# Wiki category (Russian: "Wikipedia: requests for automatic dump scanning")
# whose member pages are inspected for scan requests.
CATEGORY = "Категория:Википедия:Запросы на автоматическое сканирование дампа"
# Directory whose immediate sub-directories are treated as dump dates.
DIRECTORY = "/public/dumps/public/ruwiki/"
# Path template of the dump file; ``{date}`` is a sub-directory name of
# DIRECTORY and appears both in the directory and in the file name.
FILENAME = (
    "/public/dumps/public/ruwiki/{date}/ruwiki-{date}-pages-meta-current.xml.bz2"
)
def add_match_to_dict(dictionary, match, prefix=""):
    """Store every group of a regex match in ``dictionary`` and return it.

    The whole match is stored under ``prefix + "0"``, positional groups
    under ``prefix + "1"``, ``prefix + "2"``, ... and named groups under
    ``prefix + name``.  Groups that did not participate in the match are
    stored as ``None``.  ``dictionary`` is modified in place.
    """
    entries = [("0", match.group(0))]
    entries.extend(
        (str(number), captured)
        for number, captured in enumerate(match.groups(), start=1)
    )
    entries.extend(
        (str(name), captured)
        for name, captured in match.groupdict().items()
    )
    for suffix, captured in entries:
        dictionary[prefix + suffix] = captured
    return dictionary
class Processor(object):
    """Handle one dump-scan request described by a ``scan dump`` template.

    A request is read from the template parameters of a wiki page:

    * ``title`` -- regex matched (``re.match``) against the entry title;
    * ``namespaces`` -- comma-separated list of accepted ``entry.ns`` values;
    * ``ignore`` -- passed to ``checkwiki.ignore`` before searching
      (NOTE(review): presumably masks parts of the text that ``deignore``
      later restores -- confirm against ``checkwiki``);
    * ``contains`` (required) / ``not_contains`` (or ``not contains``) --
      regexes searched in the entry text;
    * ``ignorecase``, ``multiline``, ``dotall``, ``verbose`` -- any non-empty
      value turns on the corresponding ``re`` flag for ``contains`` and
      ``not_contains``;
    * ``prefix``, ``result``, ``postfix`` -- output before the list, per
      matching entry (a ``str.format`` template) and after the list;
    * ``sortkey`` -- ``str.format`` template for the sort key (the numeric
      entry id is used when absent), ``sortreverse`` -- sort descending;
    * ``done`` -- any non-empty value marks the request as already handled.

    ``correct`` tells whether the request is valid and should be run.
    """

    # Maximum combined length of prefix, postfix and all result lines.
    limit = 1000000

    # Template parameters that switch on a regex flag.
    _FLAG_PARAMS = {
        "ignorecase": re.IGNORECASE,
        "multiline": re.MULTILINE,
        "dotall": re.DOTALL,
        "verbose": re.VERBOSE,
    }

    def __init__(self, page, date):
        """Read the request from ``page``; ``date`` is the dump date to report.

        The instance is left with ``correct == False`` when the page is not a
        sub-page in namespace 2 or 104, does not contain exactly one request
        template, is already marked ``done``, lacks ``contains``, has a
        malformed regex or escape sequence, or has prefix and postfix that
        alone exceed ``limit``.
        """
        # Set every attribute before validating, so that a rejected instance
        # can still be passed to process() without raising AttributeError.
        self.correct = False
        self.page = page
        self.date = date
        self.data = []
        self.stopped_at = 0
        self.processed = 0
        self.title = None
        self.namespaces = None
        self.ignore = None
        self.contains = None
        self.not_contains = None
        self.flags = 0
        self.prefix = ""
        self.result = "* [[{title}]]"
        self.postfix = ""
        self.sortkey = None
        self.sortreverse = False
        self.length = 0

        if not (page.namespace().id in [2, 104] and "/" in page.title()):
            return
        code = mwparserfromhell.parse(page.text)
        templates = code.filter_templates(matches=r"^\{\{\s*scan dump")
        if len(templates) != 1:
            return

        try:
            if not self._read_params(templates[0]):
                return
            if self.contains is None:
                return
            self._check_patterns()
        except (ValueError, re.error):
            # Malformed escape sequence or regex supplied by the requester:
            # reject this request instead of aborting the run for everybody.
            return

        self.length = len(self.prefix) + len(self.postfix)
        if self.length > self.limit:
            return
        self.correct = True

    @staticmethod
    def _strip_nowiki(value):
        """Strip whitespace and one enclosing ``<nowiki>`` pair from a value."""
        return re.sub(r"^<nowiki>(.*)</nowiki>$", "\\1", value.strip(),
                      flags=re.I | re.DOTALL)

    @staticmethod
    def _unescape(value):
        """Interpret backslash escapes such as ``\\n``; ValueError if malformed."""
        return codecs.escape_decode(bytes(value, "utf-8"))[0].decode("utf-8")

    def _read_params(self, template):
        """Copy template parameters into attributes.

        Parameters with an empty value are skipped.  Returns False as soon as
        a non-empty ``done`` parameter is seen, True otherwise.
        """
        for param in template.params:
            name = param.name.strip()
            value = self._strip_nowiki(param.value)
            if value == "":
                continue
            if name == "done":
                return False
            if name in self._FLAG_PARAMS:
                self.flags |= self._FLAG_PARAMS[name]
            elif name == "title":
                self.title = value
            elif name == "namespaces":
                self.namespaces = [x.strip() for x in value.split(",")]
            elif name == "ignore":
                self.ignore = value
            elif name == "contains":
                self.contains = value
            elif name in ["not_contains", "not contains"]:
                self.not_contains = value
            elif name == "prefix":
                self.prefix = self._unescape(value) + "\n"
            elif name == "result":
                self.result = self._unescape(value)
            elif name == "postfix":
                self.postfix = "\n" + self._unescape(value)
            elif name == "sortkey":
                self.sortkey = self._unescape(value)
            elif name == "sortreverse":
                self.sortreverse = True
        return True

    def _check_patterns(self):
        """Compile the request's regexes once so syntax errors surface early."""
        re.compile(self.contains, self.flags)
        if self.title is not None:
            # process() matches titles without the request flags.
            re.compile(self.title)
        if self.not_contains is not None:
            re.compile(self.not_contains, self.flags)

    def process(self, entry):
        """Test one dump entry against the request and record a hit.

        ``entry`` must provide ``title``, ``ns``, ``id`` and ``text``; it is
        not modified.  Returns True when a result line was stored, False
        otherwise.  Every call is counted in ``processed``; once the output
        would exceed ``limit`` the counter value is kept in ``stopped_at``
        and all later entries are rejected.
        """
        self.processed += 1
        if not self.correct or self.stopped_at != 0:
            return False
        if self.namespaces is not None and entry.ns not in self.namespaces:
            return False

        # Values available to the ``result`` and ``sortkey`` format templates;
        # title groups are added as ``t_*`` and text groups as ``c_*``.
        groups = {
            "title": entry.title,
            "namespace": entry.ns,
            "id": entry.id,
            "text": entry.text
        }

        if self.title is not None:
            match = re.match(self.title, entry.title)
            if not match:
                return False
            add_match_to_dict(groups, match, "t_")

        # Work on a local copy: the same entry object is handed to every
        # processor, so writing the ignore() result back to entry.text would
        # leak this request's ``ignore`` setting into the other requests.
        text = entry.text
        ignored = None
        if self.ignore is not None:
            text, ignored = ignore(text, self.ignore)

        match = re.search(self.contains, text, flags=self.flags)
        if not match:
            return False
        add_match_to_dict(groups, match, "c_")

        if self.not_contains is not None:
            if re.search(self.not_contains, text, flags=self.flags):
                return False

        result = self.result.format(**groups)
        if self.sortkey is None:
            sortkey = int(entry.id)
        else:
            sortkey = self.sortkey.format(**groups)
        if self.ignore is not None:
            result = deignore(result, ignored)
            if self.sortkey is not None:
                sortkey = deignore(sortkey, ignored)

        # +1 accounts for the newline that joins result lines on output.
        self.length += len(result) + 1
        if self.length > self.limit:
            self.stopped_at = self.processed
            return False
        self.data.append((sortkey, result))
        return True

    def save_result(self):
        """Append the sorted results to the page and mark the request done.

        ``|done=True|date=...|pages=...`` (plus ``|processed=...`` when the
        scan stopped at the size limit) is inserted right after every
        ``{{scan dump`` / ``{{Scan dump`` occurrence in the page text, the
        result block is appended after a blank line and the page is saved.
        """
        self.data = sorted(self.data, reverse=self.sortreverse)
        lines = [line for _, line in self.data]
        result = self.prefix + "\n".join(lines) + self.postfix

        # ``\1`` re-inserts the matched template opening in front of the
        # added parameters.
        params = "\\1|done=True|date={}|pages={}".format(self.date, self.processed)
        if self.stopped_at != 0:
            params += "|processed={}".format(self.stopped_at)

        text = re.sub(r"(\{\{\s*[Ss]can dump)", params, self.page.text)
        self.page.text = text + "\n\n" + result
        self.page.save("Результат сканирования дампа.", minor=False)
def get_dump_date():
    """Return the name of the newest dump directory holding the dump file.

    The immediate sub-directories of ``DIRECTORY`` are tried in descending
    (string) order of their names; the first one for which
    ``FILENAME.format(date=name)`` is an existing file is returned.

    Returns:
        The sub-directory name, or ``None`` when ``DIRECTORY`` cannot be
        listed or no sub-directory contains the dump file.
    """
    # os.walk() yields nothing when the top directory is missing or
    # unreadable; the default keeps next() from raising StopIteration.
    _, subdirs, _ = next(os.walk(DIRECTORY), (DIRECTORY, [], []))
    for date in sorted(subdirs, reverse=True):
        if os.path.isfile(FILENAME.format(date=date)):
            return date
    return None
def main():
    """Run the pending scan requests against the newest available dump.

    Collects a ``Processor`` for every member page of ``CATEGORY`` that holds
    a valid request, feeds every entry of the dump to each of them and
    finally lets each processor save its result.  Returns without doing
    anything when no dump file or no valid request is found.
    """
    date = get_dump_date()
    if date is None:
        # No dump file found; formatting FILENAME with None would only
        # produce a path that does not exist.
        return
    dump = xmlreader.XmlDump(FILENAME.format(date=date))
    site = pywikibot.Site()
    category = pywikibot.Category(site, CATEGORY)

    processors = []
    for page in category.members():
        processor = Processor(page, date)
        if processor.correct:
            processors.append(processor)
            # Stop collecting once more than 100 requests are queued.
            if len(processors) > 100:
                break
    if not processors:
        return

    # Single pass over the dump: each entry is offered to every request.
    for entry in dump.parse():
        for processor in processors:
            processor.process(entry)
    for processor in processors:
        processor.save_result()
# Run the scanner only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()