-
Notifications
You must be signed in to change notification settings - Fork 0
/
threading2.py
executable file
·118 lines (101 loc) · 2.77 KB
/
threading2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#!/usr/bin/python
#-*- coding:utf-8 -*-
import threading
import Queue
import sys,time
import re
import urllib2
class MyThread(threading.Thread):
    """Worker thread that runs jobs from a work queue and publishes results.

    Every work item is expected to be a ``(callable, args)`` pair. The worker
    calls ``callable(args)`` -- ``args`` is passed as ONE positional
    argument -- and, when the return value is truthy, puts each element of it
    on the result queue.

    The thread is marked as a daemon and starts itself on construction.

    Args:
        workQueue: queue holding ``(callable, args)`` pairs to execute.
        resultQueue: queue receiving the individual result elements.
        timeout: seconds to wait for a new work item before the work queue
            is considered drained and the worker shuts down.
    """

    def __init__(self, workQueue, resultQueue, timeout=30):
        threading.Thread.__init__(self)
        self.timeout = timeout
        self.daemon = True
        self.workQueue = workQueue
        self.resultQueue = resultQueue
        self.start()

    def run(self):
        """Fetch, execute and publish jobs until the work queue stays empty.

        Historical problem: the run could end while the work queue still
        held jobs, because a single bad work item terminated the worker.
        A malformed item or a failing job is therefore reported on stderr
        and skipped; only an empty queue ends the loop.
        """
        while True:
            try:
                item = self.workQueue.get(timeout=self.timeout)
            except Queue.Empty:
                # Nothing arrived within the timeout: linger briefly, then
                # let the worker finish.
                time.sleep(2)
                break

            try:
                job, args = item  # the task to execute and its argument
                res = job(args)  # expected to be an iterable of results
                if res:
                    for temp in res:
                        self.resultQueue.put(temp)
                # Short pause between jobs.
                time.sleep(0.1)
            except Exception as exc:
                # Exception (not a bare except) so SystemExit and
                # KeyboardInterrupt are not swallowed.
                sys.stderr.write(
                    'MyThread: skipping work item %r: %r\n' % (item, exc))
                continue
class MyThread2(threading.Thread):
    """Daemon thread that periodically prints a progress table to stdout.

    The first pass prints a header (time, depth, currently finished,
    pending); every pass prints one row with the current wall-clock time,
    ``deep``, ``result.qsize()`` and ``work.qsize()``.

    The thread starts itself on construction and loops forever.

    Args:
        work: queue of pending jobs; only ``qsize()`` is used.
        result: queue of finished results; only ``qsize()`` is used.
        deep: value shown in the depth column.
        sleeptime: seconds to sleep between two rows.
    """

    def __init__(self, work, result, deep, sleeptime):
        threading.Thread.__init__(self)
        self.daemon = True
        self.time = 0  # 0 until the header has been printed, then 1
        self.work = work
        self.result = result
        self.deep = deep
        self.stime = sleeptime
        self.start()

    def _format_row(self):
        """Return one progress row: time, depth, finished and pending counts."""
        # strftime always yields HH:MM:SS. Splitting time.ctime() on single
        # spaces only did so for days 1-9 (padded with an extra space); for
        # days 10-31 index 4 was the year.
        fields = [
            time.strftime('%H:%M:%S'), ' ',
            self.deep, ' ',
            self.result.qsize(), ' ',
            self.work.qsize(),
        ]
        return ' '.join(str(field) for field in fields)

    def run(self):
        """Print the header once, then a progress row every ``stime`` seconds."""
        while True:
            if self.time == 0:
                print(' 时间 深度 当前完成 待完成')
                self.time = 1
            print(self._format_row())
            time.sleep(self.stime)
class ThreadPool(object):
    """Pool of MyThread workers that share one work queue and one result queue.

    Args:
        workQueue: queue the workers take ``(callable, args)`` jobs from.
        resultQueue: queue handed to every worker for its results.
        num_of_threads: number of workers to create.
    """

    def __init__(self, workQueue, resultQueue, num_of_threads=10):
        super(ThreadPool, self).__init__()
        self.workQueue = workQueue
        self.resultQueue = resultQueue
        self.thread = []  # workers created by createThreadPool
        self.createThreadPool(num_of_threads)

    def createThreadPool(self, num_of_threads):
        """Create ``num_of_threads`` workers and keep them in ``self.thread``."""
        for _ in range(num_of_threads):
            worker = MyThread(self.workQueue, self.resultQueue)
            self.thread.append(worker)

    def wait_for_done(self):
        """Join every worker that is still alive, emptying ``self.thread``."""
        while self.thread:
            worker = self.thread.pop()
            if not worker.isAlive():
                continue
            worker.join()

    def add_jobs(self, callable, args):
        """Queue one job as a ``(callable, args)`` pair on the work queue."""
        job = (callable, args)
        self.workQueue.put(job)
def test_job(url):
    """Fetch *url*, find the charset declared in its HTML and print a preview.

    The page is searched for a ``content="text/html; charset=..."``
    attribute. When one is found, the charset name, the first 1000
    characters of the raw data, the first 1000 characters of the decoded
    data and the decoded type are printed. Nothing is printed when the page
    declares no charset.

    Args:
        url: address of the page to download.

    Raises:
        urllib2.URLError: if the page cannot be fetched.
        LookupError: if the declared charset is not a known codec.
        UnicodeDecodeError: if the data is not valid in the declared charset.
    """
    html = urllib2.urlopen(url)
    try:
        data = html.read()
    finally:
        # Release the connection even when reading fails.
        html.close()

    # Charset names may contain hyphens (utf-8, iso-8859-1), whitespace
    # commonly follows the ';', and attribute case varies between pages.
    rr = re.compile(
        r"""content\s*=\s*["']text/html;\s*charset=([\w-]+)["']""",
        re.IGNORECASE)
    m = rr.search(data)
    if not m:
        return

    code = m.group(1)
    print('%s %s' % ('code=', code))
    print(data[:1000])
    data = data.decode(code)
    print(data[:1000])
    print(type(data))
if __name__ == '__main__':
    # Manual smoke test when run as a script: fetch a single page.
    test_job('http://www.baidu.com')