#!/usr/bin/python
#coding:utf-8
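
"""
pyTaskQ: a small HTTP job queue built on Tornado.

Jobs are submitted over HTTP, buffered in in-memory queues, and executed
as subprocesses from the ./processor/ directory. Example usage, assuming
the server is running on localhost:8888 ('args' is passed verbatim to the
processor; 'processor' defaults to "php"):

    curl 'http://127.0.0.1:8888/?processor=php&args=hello'  # submit a job
    curl 'http://127.0.0.1:8888/status'                     # queue sizes
    curl 'http://127.0.0.1:8888/stop'                       # graceful stop
"""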
import gc
gc.disable()
import os
import sys
import time
import subprocess
import logging
try:
    import simplejson as json
except ImportError:
    import json
import MySQLdb  # required by torndb
import torndb   # reserved for the saveToMySQL()/setState() TODOs below
from tornado import gen, ioloop, queues, process
from tornado.web import RequestHandler, Application, url
nr_jobSaver = 1     # workers draining RequestQueue into MySQL/ProcessQueue
nr_jobConsumer = 4  # workers draining ProcessQueue into subprocesses
db_conf = {
'host': '127.0.0.1',
'port': 33060,
'user': 'root',
'pass': '123456',
'db' : 'queue',
}
"""
CREATE DATABASE queue;
USE queue;
CREATE TABLE jobs (
id int primary key auto_increment,
processor char(32),
args text, -- json
src_ip char(32),
state int not null default 0, -- 0=new 1=processing 2=finish 3=fail
create_at datetime not null,
update_at timestamp not null default current_timestamp on update current_timestamp
, KEY `I_create_state` (`create_at`, `state`)
);
"""
refuseJob = False   # set by /stop: refuse new jobs while draining
currentJobID = 0    # in-memory id counter (TODO: restore from log/MySQL)
RequestQueue = queues.Queue()
ProcessQueue = queues.Queue()
class Job(object):
JOB_NEW = 0
JOB_PROCESSING = 1
JOB_FINISH = 2
JOB_FAIL = 3
def __init__(self, id, processor, args, src_ip='', state=0):
self.d = {
'id' : id,
'processor' : processor,
'args' : args,
'src_ip' : src_ip,
'state' : state,
'create_at' : time.time(),
}
@gen.coroutine
def saveToQueue(self, q):
Log("[JOB:%d] saved to queue", self.d['id'])
yield q.put(self)
def saveToLog(self):
#TODO: save to log
Log('TODO: save to log ' + json.dumps(self.d))
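        # A minimal sketch of what this TODO might become, assuming an
        # append-only JSON-lines file (the 'jobs.log' name is hypothetical)
        # that main() could replay on startup to rebuild RequestQueue:
        #   with open('jobs.log', 'a') as f:
        #       f.write(json.dumps(self.d) + '\n')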
def saveToMySQL(self):
#TODO: save to mysql
Log("TODO: save to mysql " + json.dumps(self.d))
def setState(self, newState):
self.d['state'] = newState
#TODO: update mysql
Log("TODO: job(%d) set to state %d", self.d['id'], newState)
def process(self):
self.setState(self.JOB_PROCESSING)
Log("[JOB:%d] process", self.d['id'])
cmd = './processor/' + self.d['processor']
argv = [cmd, str(self.d['id']), self.d['args']]
sp = process.Subprocess(argv)
sp.set_exit_callback(self.process_callback)
def process_callback(self, exit_code):
if exit_code == 0:
Log("[JOB:%d] finish", self.d['id'])
self.setState(self.JOB_FINISH)
else:
Log("[JOB:%d] fail", self.d['id'])
self.setState(self.JOB_FAIL)
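# A processor is any executable under ./processor/, invoked by Job.process()
# as `./processor/<name> <job_id> <args>` and reporting success via exit
# code 0. A hypothetical minimal processor, e.g. saved as processor/echo:
#
#   #!/usr/bin/python
#   import sys
#   job_id, args = sys.argv[1], sys.argv[2]
#   print "job %s got args: %s" % (job_id, args)
#   sys.exit(0)   # any nonzero exit code marks the job JOB_FAIL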
def Log(message, *args):
    # only interpolate when args are given, so messages containing a
    # literal '%' (e.g. dumped JSON) do not break the formatting
    if args:
        message = message % args
    print >>sys.stderr, message
class XRequestHandler(RequestHandler):
def json(self, errno, message, data=None):
self.set_header("Content-Type", "application/json")
self.write(json.dumps({'errno': errno, 'message': message, 'data': data}))
class JobHandler(XRequestHandler):
@gen.coroutine
def get(self):
global refuseJob
if refuseJob:
self.json(1, 'refuseJob')
return
args = self.get_argument("args", None)
        if not args:
self.json(2, 'Bad Request')
return
processor = self.get_argument("processor", "php")
global currentJobID
currentJobID += 1
job = Job(currentJobID, processor, args, self.request.remote_ip)
job.saveToLog()
        yield job.saveToQueue(RequestQueue)
self.json(0, '', job.d)
class StatusHandler(XRequestHandler):
def get(self):
self.json(0, '', {
'RequestQueue': RequestQueue.qsize(),
'ProcessQueue': ProcessQueue.qsize()
})
class StopServerHandler(XRequestHandler):
def get(self):
global refuseJob
        if not refuseJob:
refuseJob = True
rqsize = RequestQueue.qsize()
pqsize = ProcessQueue.qsize()
if rqsize != 0 or pqsize != 0:
resp = """
<html>
<head>
<title>Please Wait...</title>
<meta http-equiv="refresh" content="3" />
</head>
<body align="center" style="margin:30px;">
<p>Please wait for jobs to be saved and processed.</p>
<p>This page will refresh until the server stops.</p>
<p>Jobs in RequestQueue: %d not saved</p>
<p>Jobs in ProcessQueue: %d not processed</p>
</body>
</html>
""" % (rqsize, pqsize)
self.set_header("Refresh", "3")
self.write(resp)
return
self.write("Server Stopped.")
ioloop.IOLoop.current().stop()
class JobSaver(object):
    """Drains RequestQueue: persists each job, then feeds ProcessQueue."""
    def __init__(self, num):
        self._id = num
    @gen.coroutine
    def run(self):
        Log("[js:%d] run" % self._id)
        while True:
            #yield gen.sleep(1) #TODO: for test
            job = yield RequestQueue.get()
            job.saveToMySQL()
            yield job.saveToQueue(ProcessQueue)
class JobConsumer(object):
    """Drains ProcessQueue: runs each job's processor as a subprocess."""
    def __init__(self, num):
        self._id = num
    @gen.coroutine
    def run(self):
        Log("[jc:%d] run" % self._id)
        while True:
            yield gen.sleep(0.5) #TODO: for test
            job = yield ProcessQueue.get()
            job.process()
def main():
    arr_workers = []
    #TODO: rebuild RequestQueue from log, and set currentJobID
    #workers that consume RequestQueue and produce ProcessQueue
    for i in range(nr_jobSaver):
        arr_workers.append(JobSaver(i))
    #workers that consume ProcessQueue
    for i in range(nr_jobConsumer):
        arr_workers.append(JobConsumer(i))
    #spawn the workers on the IOLoop: tornado queues are not thread-safe,
    #and a @gen.coroutine run() started on a Thread would just return a
    #Future without ever being driven
    for w in arr_workers:
        ioloop.IOLoop.current().spawn_callback(w.run)
#start server for incoming jobs
app = Application([
url(r"/", JobHandler),
url(r"/status", StatusHandler),
url(r"/stop", StopServerHandler),
])
app.listen(8888)
ioloop.IOLoop.current().start()
if __name__ == "__main__":
logging.basicConfig()
main()