forked from galuano1/twitalkerplus
/
cron.py
77 lines (65 loc) · 2.25 KB
/
cron.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#!/usr/bin/python
from time import time
from google.appengine.api import taskqueue
from google.appengine.api import xmpp
from google.appengine.ext import webapp, db
from google.appengine.ext.webapp.util import run_wsgi_app
from google.appengine.runtime import DeadlineExceededError
CRON_NUM = 10
TASK_QUEUE_NUM = 10
USERS_NUM_IN_TASK = 8
class cron_handler(webapp.RequestHandler):
def get(self, cron_id):
def add_task_by_jid(jid):
self.jids.append(jid)
if len(self.jids) >= USERS_NUM_IN_TASK:
flush_jids()
if len(self.tasks) >= 100:
flush_tasks()
def flush_jids():
if self.jids:
self.tasks.append(taskqueue.Task(url='/worker', params={'jid': self.jids}))
self.jids = list()
def flush_tasks():
def db_op():
while db.WRITE_CAPABILITY:
try:
self.queues[self.queue_pointer].add(self.tasks)
except taskqueue.TransientError:
continue
break
if self.tasks:
db.run_in_transaction(db_op)
self.tasks = list()
self.queue_pointer = (self.queue_pointer + 1) % TASK_QUEUE_NUM
cron_id = int(cron_id)
self.queues = [taskqueue.Queue('fetch' + str(id)) for id in xrange(TASK_QUEUE_NUM)]
self.queue_pointer = cron_id % TASK_QUEUE_NUM
self.tasks = list()
self.jids = list()
from db import GoogleUser, Db
data = GoogleUser.get_all(shard=cron_id)
try:
for u in data:
if u.display_timeline or u.msg_template.strip():
time_delta = int(time()) - u.last_update
if time_delta >= u.interval * 60 - 30:
try:
flag = xmpp.get_presence(u.jid)
except xmpp.Error:
flag = False
if not flag:
continue
Db.set_cache(u)
add_task_by_jid(u.jid)
flush_jids()
flush_tasks()
except DeadlineExceededError:
self.response.clear()
self.response.set_status(500)
self.response.out.write("This operation could not be completed in time...")
def main():
application = webapp.WSGIApplication([('/cron(\d+)', cron_handler)], debug=True)
run_wsgi_app(application)
if __name__ == "__main__":
main()