Skip to content

Commit

Permalink
修复 autorange 处理错误导致无法继续使用 GAE 代理
Browse files Browse the repository at this point in the history
  • Loading branch information
SeaHOH committed Aug 31, 2017
1 parent 216bf6f commit 00ac867
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 30 deletions.
32 changes: 18 additions & 14 deletions local/ProxyHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,20 +518,7 @@ def do_GAE(self):
self.write('HTTP/1.1 504 Gateway Timeout\r\nContent-Type: text/plain\r\nContent-Length: %d\r\n\r\n' % len(c))
self.write(c)
return
with self.nLock:
nappid = self.__class__.nappid
while True:
nappid += 1
if nappid >= len(GC.GAE_APPIDS):
nappid = 0
appid = GC.GAE_APPIDS[nappid]
contains, expired, _ = self.badappids.getstate(appid)
if contains and expired:
for _ in range(GC.GAE_MAXREQUESTS):
qGAE.put(True)
if not contains or expired:
break
self.__class__.nappid = nappid
appid = self.get_appid()
noerror = True
data = None
response = None
Expand Down Expand Up @@ -1147,6 +1134,23 @@ def parse_netloc(self, netloc):
host, _, port = host.partition(':')
return host.lower(), port

def get_appid(self):
with self.nLock:
nappid = self.__class__.nappid
while True:
nappid += 1
if nappid >= len(GC.GAE_APPIDS):
nappid = 0
appid = GC.GAE_APPIDS[nappid]
contains, expired, _ = self.badappids.getstate(appid)
if contains and expired:
for _ in range(GC.GAE_MAXREQUESTS):
qGAE.put(True)
if not contains or expired:
break
self.__class__.nappid = nappid
return appid

def mark_badappid(self, appid):
if appid not in self.badappids:
if len(GC.GAE_APPIDS) - len(self.badappids) <= 1:
Expand Down
34 changes: 18 additions & 16 deletions local/RangeFetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,9 @@ def __init__(self, handler, headers, payload, response):
self._last_app_status = {}
self.lastupdate = testip.lastupdate
self.iplist = GC.IPLIST_MAP['google_gws'].copy()
self.appids = Queue.Queue()
for id in GC.GAE_APPIDS:
self.appids.put(id)

self.handler = handler
self.get_appid = handler.get_appid
self.write = handler.write
self.bufsize = handler.bufsize
self.command = handler.command
Expand Down Expand Up @@ -182,48 +180,51 @@ def __fetchlet(self, range_queue, data_queue, threadorder):
if self._stopped: return
try:
if self.response:
qGAE.get()
response = self.response
self.response = None
start, end = self.firstrange
else:
appid = self.get_appid()
if self._last_app_status.get(appid, 200) >= 500:
sleep(2)
start, end = range_queue.get(timeout=1)
headers['Range'] = 'bytes=%d-%d' % (start, end)
appid = self.appids.get()
if self._last_app_status.get(appid, 200) >= 500:
sleep(2)
while (start - self.expect_begin) / self.delaysize > 4.0 and data_queue.qsize() * self.bufsize / self.delaysize > 8.0:
if self._stopped: return
sleep(0.1)
if self.response is None:
if appid:
response = gae_urlfetch(self.command, self.url, headers, self.payload, appid, getfast=self.timeout)
if response:
if appid:
self._last_app_status[appid] = response.app_status
xip = response.xip[0]
if xip in self.iplist:
self._last_app_status[appid] = response.app_status
realstart = start
starttime = time()
else:
range_queue.put((start, end))
noerror = False
continue
except Queue.Empty:
appid = None
return
except Exception as e:
logging.warning('%s Response %r in __fetchlet', self.address_string(), e)
logging.warning('%s Response %r in __fetchlet', self.address_string(response), e)
range_queue.put((start, end))
continue
if self._stopped: return
if not response:
logging.warning('%s RangeFetch %s 没有响应,重试', self.address_string(), headers['Range'])
logging.warning('%s RangeFetch %s 没有响应,重试', self.address_string(response), headers['Range'])
range_queue.put((start, end))
elif response.app_status == 503:
self.handler.mark_badappid(appid)
if appid:
self.handler.mark_badappid(appid)
range_queue.put((start, end))
noerror = False
elif response.app_status != 200:
logging.warning('%s Range Fetch "%s %s" %s 返回 %s', self.address_string(response), self.command, self.url, headers['Range'], response.app_status)
range_queue.put((start, end))
if response.reason != 'debug error':
noerror = False
noerror = False
elif response.getheader('Location'):
self.url = urlparse.urljoin(self.url, response.getheader('Location'))
logging.info('%s RangeFetch Redirect(%r)', self.address_string(response), self.url)
Expand All @@ -249,6 +250,7 @@ def __fetchlet(self, range_queue, data_queue, threadorder):
if xip in self.iplist and len(self.iplist) > self.minip:
self.iplist.remove(xip)
logging.warning('%s RangeFetch 移除慢速 ip %s', self.address_string(), xip)
noerror = False
break
else:
data = response.read(self.bufsize)
Expand All @@ -264,14 +266,14 @@ def __fetchlet(self, range_queue, data_queue, threadorder):
else:
logging.error('%s RangeFetch %r 返回 %s', self.address_string(response), self.url, response.status)
range_queue.put((start, end))
appid = None
noerror = False
except Exception as e:
logging.exception('%s RangeFetch._fetchlet 错误:%r', self.address_string(), e)
noerror = False
raise
finally:
if appid:
qGAE.put(True)
self.appids.put(appid)
if response:
response.close()
if noerror:
Expand Down

0 comments on commit 00ac867

Please sign in to comment.