
use app_key as id, and add support for restarting a crawler that has died

1 parent ccf2f36 commit 2c9130ad7269e9bd7c18930ed51bf3cdace36227 Jiang Bian committed Nov 2, 2013
Showing with 73 additions and 44 deletions.
  1. +26 −17 tweetf0rm/process/user_relationship_crawler.py
  2. +47 −27 tweetf0rm/scheduler.py
@@ -42,19 +42,28 @@ def __init__(self, node_id, crawler_id, apikeys, handlers = None, redis_config =
        self.client_args = {"timeout": 300}
        self.proxies = iter(proxies) if proxies else None
        self.user_api = None
+
        self.init_user_api()
+        #self.init_user_api()
+
    def init_user_api(self): # this will throw StopIteration if all proxies have been tried...
-        if (self.proxies):
-            self.client_args['proxies'] = next(self.proxies)['proxy_dict'] # this will throw out
-            #logger.info("client_args: %s"%json.dumps(self.client_args))
+        if (self.proxies):
+            try:
+                self.client_args['proxies'] = next(self.proxies)['proxy_dict'] # this will throw out
+                #logger.info("client_args: %s"%json.dumps(self.client_args))
+            except StopIteration as exc:
+                raise
+            except Exception as exc:
+                self.init_user_api()
        if (self.user_api):
            del self.user_api
        #crawler_id=self.crawler_id,
        self.user_api = User(apikeys=self.apikeys, client_args=self.client_args)
+
    def get_handlers(self):
        return self.handlers
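The rewritten init_user_api() separates the two failure modes of proxy rotation: StopIteration from the proxy iterator is re-raised so the caller knows the pool is exhausted, while any other error (for example a malformed proxy entry) retries with the next proxy. A minimal, self-contained sketch of the same pattern outside the crawler class (next_working_proxy and the sample data are illustrative, not part of tweetf0rm):

def next_working_proxy(proxy_iter):
    # Keep pulling entries until one yields a usable proxy dict.
    while True:
        try:
            return next(proxy_iter)['proxy_dict']
        except StopIteration:
            raise  # pool exhausted: let the caller decide to shut the crawler down
        except Exception:
            continue  # bad entry (e.g. missing 'proxy_dict'): skip to the next one

proxies = iter([{'broken': True}, {'proxy_dict': {'https': 'https://127.0.0.1:8888'}}])
print(next_working_proxy(proxies))  # {'https': 'https://127.0.0.1:8888'}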
@@ -134,22 +143,23 @@ def run(self):
            try:
                func(**args)
                del args['cmd_handlers']
+                self.node_queue.put({'cmd':"CMD_FINISHED", "cmd_hash":cmd['cmd_hash'], "crawler_id":self.crawler_id})
            except Exception as exc:
                logger.error("%s"%exc)
                try:
                    self.init_user_api()
-                except Exception as init_user_api_exc:
-                    import exceptions
-                    if (isinstance(init_user_api_exc, exceptions.StopIteration)): # no more proxy to try... so kill myself...
-                        for handler in self.handlers:
-                            handler.flush_all()
-
-                        logger.warn('not enough proxy servers, kill me... %s'%(self.crawler_id))
-                        # flush first
-                        self.node_queue.put({
-                            'cmd':'CRAWLER_FAILED',
-                            'crawler_id': self.crawler_id
-                            })
+                except StopIteration as init_user_api_exc:
+                    # import exceptions
+                    # if (isinstance(init_user_api_exc, exceptions.StopIteration)): # no more proxy to try... so kill myself...
+                    for handler in self.handlers:
+                        handler.flush_all()
+
+                    logger.warn('not enough proxy servers, kill me... %s'%(self.crawler_id))
+                    # flush first
+                    self.node_queue.put({
+                        'cmd':'CRAWLER_FAILED',
+                        'crawler_id': self.crawler_id
+                    })
                    return False
                #raise
            else:
@@ -158,8 +168,7 @@ def run(self):
                    self.enqueue(cmd)
                    #logger.error(full_stack())
-                else:
-                    self.node_queue.put({'cmd':"CMD_FINISHED", "cmd_hash":cmd['cmd_hash'], "crawler_id":self.crawler_id})
+
        else:
            logger.warn("whatever are you trying to do?")
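With this hunk the crawler acknowledges a command as soon as func(**args) returns, instead of in a trailing else branch, and it reports an unrecoverable proxy shortage before exiting, so the scheduler can tell the two outcomes apart. Roughly, the two control messages it puts on node_queue look like this (the values below are placeholders, not real output):

finished = {'cmd': 'CMD_FINISHED', 'cmd_hash': '<hash of the finished command>', 'crawler_id': '<app_key>'}
failed = {'cmd': 'CRAWLER_FAILED', 'crawler_id': '<app_key>'}

CMD_FINISHED lets the scheduler drop the command from that crawler's pending queue (see the enqueue() hunk below); CRAWLER_FAILED tells it the crawler process is gone for good.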
@@ -41,46 +41,66 @@ def __init__(self, node_id, config={}, proxies=[]):
        logger.info("number of crawlers: %d"%(number_of_processes))
-        file_handler_config = {
-            "name": "FileHandler",
-            "args": {
-                "output_folder" : config["output"]
-            }
-        }
-
        apikey_list = self.config['apikeys'].keys()
-        crawlers = {}
+        self.crawlers = {}
        for idx in range(number_of_processes):
            try:
-                crawler_id = md5('%s:%s'%(self.node_id, idx))
-                apikeys = self.config['apikeys'][apikey_list[idx]]
-
-                logger.debug('creating a new crawler: %s'%crawler_id)
-
-                crawler_proxies = next(self.proxy_generator) if self.proxy_generator else None
-                crawler = UserRelationshipCrawler(self.node_id, crawler_id, copy.copy(apikeys), handlers=[create_handler(file_handler_config)], redis_config=copy.copy(config['redis_config']), proxies=crawler_proxies)
-                crawlers[crawler_id] = {
-                    'crawler': crawler,
-                    'queue': {}
-                }
-                crawler.start()
-            except twython.exceptions.TwythonAuthError as exc:
-                logger.error('%s: %s'%(exc, apikeys))
-
-        self.crawlers = crawlers
+                self.new_crawler(self.config['apikeys'][apikey_list[idx]], config)
+            except:
+                pass
+
        self.node_coordinator = NodeCoordinator(config['redis_config'])
        self.node_coordinator.add_node(node_id)
        logger.info("number of crawlers: %d created"%(number_of_processes))
+    def new_crawler(self, apikeys, config, crawler_proxies = None):
+        file_handler_config = {
+            "name": "FileHandler",
+            "args": {
+                "output_folder" : config["output"]
+            }
+        }
+
+        try:
+            #crawler_id = md5('%s:%s'%(self.node_id, idx))
+            #apikeys = self.config['apikeys'][apikey_list[idx]]
+            crawler_id = apikeys['app_key']
+            logger.debug('creating a new crawler: %s'%crawler_id)
+            if (not crawler_proxies):
+                crawler_proxies = next(self.proxy_generator) if self.proxy_generator else None
+            crawler = UserRelationshipCrawler(self.node_id, crawler_id, copy.copy(apikeys), handlers=[create_handler(file_handler_config)], redis_config=copy.copy(config['redis_config']), proxies=crawler_proxies)
+            self.crawlers[crawler_id] = {
+                'apikeys': apikeys,
+                'crawler': crawler,
+                'queue': {},
+                'crawler_proxies': crawler_proxies
+            }
+            crawler.start()
+        except twython.exceptions.TwythonAuthError as exc:
+            logger.error('%s: %s'%(exc, apikeys))
+        except:
+            raise
+
+
    def is_alive(self):
        a = [1 if self.crawlers[crawler_id]['crawler'].is_alive() else 0 for crawler_id in self.crawlers]
        return sum(a) > 0
    def crawler_status(self):
-        return [{crawler_id: True, 'qsize': len(self.crawlers[crawler_id]['queue'])} if self.crawlers[crawler_id]['crawler'].is_alive() else {crawler_id: False} for crawler_id in self.crawlers]
+        status = []
+        for crawler_id in self.crawlers:
+            cc = self.crawlers[crawler_id]
+            if (not cc['crawler'].is_alive()):
+                self.new_crawler(cc['apikeys'], self.config, cc['crawler_proxies'])
+
+            status.append({crawler_id: cc['crawler'].is_alive(), 'qsize': len(cc['queue'])})
+
+        return status
+
+        #return [{crawler_id: True, 'qsize': len(self.crawlers[crawler_id]['queue'])} else {crawler_id: False} for crawler_id in self.crawlers]
    def distribute_to(self):
        current_qsize = None
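In scheduler.py, crawler construction moves into new_crawler(), crawlers are keyed by the app_key of their API credentials instead of an md5 of node_id and index, and each registry entry keeps the apikeys and proxies it was built with, which is what lets crawler_status() recreate a crawler whose process has died. A compact sketch of that restart pass, written as a free function (the names here are stand-ins, not tweetf0rm API):

def report_and_restart(crawlers, new_crawler_fn, config):
    # crawlers: dict keyed by app_key, values shaped like the registry entries above
    # new_crawler_fn: stand-in for Scheduler.new_crawler
    status = []
    for crawler_id, cc in list(crawlers.items()):
        if not cc['crawler'].is_alive():
            # recreate the process with the same credentials and proxy bundle it had before
            new_crawler_fn(cc['apikeys'], config, cc['crawler_proxies'])
        status.append({crawler_id: cc['crawler'].is_alive(), 'qsize': len(cc['queue'])})
    return status

Note that, as in the diff, cc still points at the old registry entry when is_alive() is checked, so a just-restarted crawler is reported as not alive in this status pass and only shows up as alive on the next call.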
@@ -159,7 +179,7 @@ def enqueue(self, cmd):
            try:
                crawler_id = cmd['crawler_id']
                del self.crawlers[crawler_id]['queue'][cmd['cmd_hash']]
-                logger.debug('removeing cmd: %s from [%s]'%(cmd['cmd_hash'], crawler_id))
+                logger.info('removed cmd: %s from [%s]'%(cmd['cmd_hash'], crawler_id))
            except Exception as exc:
                logger.warn("the cmd doesn't exist? %s: %s"%(cmd['cmd_hash'], exc))
        else:
@@ -170,7 +190,7 @@ def enqueue(self, cmd):
            self.crawlers[crawler_id]['crawler'].enqueue(cmd)
-            logger.debug("pusing %s: [%s] to crawler: %s"%(cmd, cmd_hash, crawler_id))
+            logger.debug("pushed %s: [%s] to crawler: %s"%(cmd, cmd_hash, crawler_id))
    def check_local_qsizes(self):
        #logger.info(self.crawlers)
