-
Notifications
You must be signed in to change notification settings - Fork 16
/
article_bulk_delete.py
136 lines (103 loc) · 4.57 KB
/
article_bulk_delete.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import json
from copy import deepcopy
from flask_login import current_user
from portality import models
from portality.background import AdminBackgroundTask, BackgroundApi, BackgroundException, BackgroundSummary
from portality.tasks.redis_huey import main_queue
from portality.util import batch_up
def article_bulk_delete_manage(selection_query, dry_run=True):
    """
    Entry point for an article bulk delete: either report how many articles the
    selection matches, or create and queue the background job that deletes them.

    :param selection_query: query dict selecting the articles to delete
    :param dry_run: when true (the default) no job is created; only the match
        count is reported
    :return: a BackgroundSummary whose ``affected`` is ``{"articles": <count>}``;
        its job id is None for a dry run
    """
    task_cls = ArticleBulkDeleteBackgroundTask

    if dry_run:
        # NOTE(review): presumably raises if the current user is not an admin - confirm
        task_cls.check_admin_privilege(current_user.id)
        match_count = task_cls.estimate_delete_counts(selection_query)
        return BackgroundSummary(None, affected={"articles": match_count})

    # Resolve the query to concrete ids now, so the job deletes exactly this list.
    article_ids = task_cls.resolve_selection_query(selection_query)
    job = task_cls.prepare(current_user.id, selection_query=selection_query, ids=article_ids)
    task_cls.submit(job)

    summary_job_id = job.id if job is not None else None
    return BackgroundSummary(summary_job_id, affected={"articles": len(article_ids)})
class ArticleBulkDeleteBackgroundTask(AdminBackgroundTask):
    """
    Background task which deletes a list of articles, identified by id, in batches.
    """

    # batch size handed to batch_up(); one delete query is issued per batch
    BATCH_SIZE = 1000

    __action__ = "article_bulk_delete"

    @classmethod
    def _job_parameter_check(cls, params):
        """
        Report whether the job parameters carry a usable "ids" value.

        :param params: the job's parameter dict
        :return: True if the "ids" parameter is truthy, False otherwise
        """
        article_ids = cls.get_param(params, 'ids')
        return True if article_ids else False

    def run(self):
        """
        Delete every article listed in the job's "ids" parameter, batch by batch,
        writing progress messages to the job's audit trail.

        :raises BackgroundException: if the "ids" parameter is missing or empty
        :return:
        """
        job = self.background_job
        params = job.params
        ids = self.get_param(params, 'ids')

        if not self._job_parameter_check(params):
            raise BackgroundException(
                "{}.run run without sufficient parameters".format(self.__class__.__name__)
            )

        total = len(ids)
        # ceiling division: a partial final batch still counts as a batch
        full_batches, leftover = divmod(total, self.BATCH_SIZE)
        batch_total = (full_batches + 1) if leftover else full_batches
        job.add_audit_message("About to delete {} articles in {} batches".format(total, batch_total))

        batch_number = 0
        for id_batch in batch_up(ids, self.BATCH_SIZE):
            batch_number += 1
            delete_query = models.Article.make_query(
                should_terms={'_id': id_batch},
                consistent_order=False
            )
            models.Article.delete_selected(query=delete_query, snapshot=True)
            job.add_audit_message(
                "Deleted {} articles in batch {} of {}".format(len(id_batch), batch_number, batch_total)
            )

        job.add_audit_message("Deleted {} articles".format(total))

    def cleanup(self):
        """
        Cleanup hook for after the task has run, whether it succeeded or failed.
        This task has nothing to clean up.
        :return:
        """
        return None

    @classmethod
    def estimate_delete_counts(cls, selection_query):
        """
        Count the articles matched by the selection query without deleting anything.

        :param selection_query: query dict selecting articles; it is copied, not modified
        :return: the total hit count reported in the query response
        """
        response = models.Article.query(q=deepcopy(selection_query))
        return response['hits']['total']['value']

    @classmethod
    def resolve_selection_query(cls, selection_query):
        """
        Turn the selection query into the list of ids of the articles it matches.

        :param selection_query: query dict selecting articles; it is copied, not modified
        :return: list of the "_id" values of every matching record
        """
        id_only_query = deepcopy(selection_query)
        # only the ids are needed, so ask for hits without their source documents
        id_only_query["_source"] = False

        article_ids = []
        for hit in models.Article.iterate(q=id_only_query, page_size=5000, wrap=False):
            article_ids.append(hit['_id'])
        return article_ids

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Build the BackgroundJob record for a bulk delete. The job is not saved here.

        :param username: the user the job will be recorded against
        :param kwargs: must contain "selection_query" (stored on the job as a JSON
            reference) and "ids" (the article ids to delete)
        :raises BackgroundException: if "ids" is empty
        :return: a BackgroundJob instance representing this task
        """
        super().prepare(username, **kwargs)

        # start with the bare job record
        background_job = models.BackgroundJob()
        background_job.user = username
        background_job.action = cls.__action__

        # keep the originating query alongside the job for reference
        reference = {}
        cls.set_reference(reference, "selection_query", json.dumps(kwargs['selection_query']))
        background_job.reference = reference

        job_params = {}
        cls.set_param(job_params, 'ids', kwargs['ids'])
        if not cls._job_parameter_check(job_params):
            raise BackgroundException(
                "{}.prepare run without sufficient parameters".format(cls.__name__)
            )
        background_job.params = job_params

        background_job.queue_id = huey_helper.queue_id
        return background_job

    @classmethod
    def submit(cls, background_job):
        """
        Save the job, then schedule article_bulk_delete to run it.

        :param background_job: the BackgroundJob instance to queue
        :return:
        """
        background_job.save(blocking=True)
        # NOTE(review): delay is presumably in seconds, giving the save time to become visible - confirm
        job_id = background_job.id
        article_bulk_delete.schedule(args=(job_id,), delay=10)
# Helper created from main_queue: it supplies the queue_id recorded on jobs in
# prepare() and the register_execute decorator applied to article_bulk_delete.
huey_helper = ArticleBulkDeleteBackgroundTask.create_huey_helper(main_queue)
@huey_helper.register_execute(is_load_config=False)
def article_bulk_delete(job_id):
    """
    Queue-side entry point: load the BackgroundJob with the given id and execute
    it as an ArticleBulkDeleteBackgroundTask.

    :param job_id: id of the BackgroundJob record to run
    :return:
    """
    background_job = models.BackgroundJob.pull(job_id)
    BackgroundApi.execute(ArticleBulkDeleteBackgroundTask(background_job))