diff --git a/tweetf0rm/process/user_relationship_crawler.py b/tweetf0rm/process/user_relationship_crawler.py
index 7acf27b..6186a05 100644
--- a/tweetf0rm/process/user_relationship_crawler.py
+++ b/tweetf0rm/process/user_relationship_crawler.py
@@ -42,12 +42,20 @@ def __init__(self, node_id, crawler_id, apikeys, handlers = None, redis_config =
 		self.client_args = {"timeout": 300}
 		self.proxies = iter(proxies) if proxies else None
 		self.user_api = None
+		self.init_user_api()
+
 		#self.init_user_api()
+
 	def init_user_api(self): # this will throw StopIteration if all proxies have been tried...
-		if (self.proxies):
-			self.client_args['proxies'] = next(self.proxies)['proxy_dict'] # this will throw out
-			#logger.info("client_args: %s"%json.dumps(self.client_args))
+		if (self.proxies):
+			try:
+				self.client_args['proxies'] = next(self.proxies)['proxy_dict'] # this will throw out
+				#logger.info("client_args: %s"%json.dumps(self.client_args))
+			except StopIteration as exc:
+				raise
+			except Exception as exc:
+				self.init_user_api()
 
 		if (self.user_api):
 			del self.user_api
@@ -55,6 +63,7 @@ def init_user_api(self): # this will throw StopIteration if all proxies have bee
 			#crawler_id=self.crawler_id,
 		self.user_api = User(apikeys=self.apikeys, client_args=self.client_args)
 
+
 	def get_handlers(self):
 		return self.handlers
 
@@ -134,22 +143,23 @@ def run(self):
 				try:
 					func(**args)
 					del args['cmd_handlers']
+					self.node_queue.put({'cmd':"CMD_FINISHED", "cmd_hash":cmd['cmd_hash'], "crawler_id":self.crawler_id})
 				except Exception as exc:
 					logger.error("%s"%exc)
 					try:
 						self.init_user_api()
-					except Exception as init_user_api_exc:
-						import exceptions
-						if (isinstance(init_user_api_exc, exceptions.StopIteration)): # no more proxy to try... so kill myself...
-							for handler in self.handlers:
-								handler.flush_all()
-
-							logger.warn('not enough proxy servers, kill me... %s'%(self.crawler_id))
-							# flush first
-							self.node_queue.put({
-								'cmd':'CRAWLER_FAILED',
-								'crawler_id': self.crawler_id
-							})
+					except StopIteration as init_user_api_exc:
+						# import exceptions
+						# if (isinstance(init_user_api_exc, exceptions.StopIteration)): # no more proxy to try... so kill myself...
+						for handler in self.handlers:
+							handler.flush_all()
+
+						logger.warn('not enough proxy servers, kill me... %s'%(self.crawler_id))
+						# flush first
+						self.node_queue.put({
+							'cmd':'CRAWLER_FAILED',
+							'crawler_id': self.crawler_id
+						})
 						return False
 						#raise
 					else:
@@ -158,8 +168,7 @@ def run(self):
 						self.enqueue(cmd)
 					#logger.error(full_stack())
-			else:
-				self.node_queue.put({'cmd':"CMD_FINISHED", "cmd_hash":cmd['cmd_hash'], "crawler_id":self.crawler_id})
+			else:
 				logger.warn("whatever are you trying to do?")
 
diff --git a/tweetf0rm/scheduler.py b/tweetf0rm/scheduler.py
index 0efa173..c45fbe0 100644
--- a/tweetf0rm/scheduler.py
+++ b/tweetf0rm/scheduler.py
@@ -41,46 +41,66 @@ def __init__(self, node_id, config={}, proxies=[]):
 		logger.info("number of crawlers: %d"%(number_of_processes))
 
-		file_handler_config = {
-			"name": "FileHandler",
-			"args": {
-				"output_folder" : config["output"]
-			}
-		}
-
 		apikey_list = self.config['apikeys'].keys()
-		crawlers = {}
+		self.crawlers = {}
 		for idx in range(number_of_processes):
 			try:
-				crawler_id = md5('%s:%s'%(self.node_id, idx))
-				apikeys = self.config['apikeys'][apikey_list[idx]]
-
-				logger.debug('creating a new crawler: %s'%crawler_id)
-
-				crawler_proxies = next(self.proxy_generator) if self.proxy_generator else None
-				crawler = UserRelationshipCrawler(self.node_id, crawler_id, copy.copy(apikeys), handlers=[create_handler(file_handler_config)], redis_config=copy.copy(config['redis_config']), proxies=crawler_proxies)
-				crawlers[crawler_id] = {
-					'crawler': crawler,
-					'queue': {}
-				}
-				crawler.start()
-			except twython.exceptions.TwythonAuthError as exc:
-				logger.error('%s: %s'%(exc, apikeys))
-
-		self.crawlers = crawlers
+				self.new_crawler(self.config['apikeys'][apikey_list[idx]], config)
+			except:
+				pass
+
 		self.node_coordinator = NodeCoordinator(config['redis_config'])
 		self.node_coordinator.add_node(node_id)
 
 		logger.info("number of crawlers: %d created"%(number_of_processes))
 
+	def new_crawler(self, apikeys, config, crawler_proxies = None):
+		file_handler_config = {
+			"name": "FileHandler",
+			"args": {
+				"output_folder" : config["output"]
+			}
+		}
+
+		try:
+			#crawler_id = md5('%s:%s'%(self.node_id, idx))
+			#apikeys = self.config['apikeys'][apikey_list[idx]]
+			crawler_id = apikeys['app_key']
+			logger.debug('creating a new crawler: %s'%crawler_id)
+			if (not crawler_proxies):
+				crawler_proxies = next(self.proxy_generator) if self.proxy_generator else None
+			crawler = UserRelationshipCrawler(self.node_id, crawler_id, copy.copy(apikeys), handlers=[create_handler(file_handler_config)], redis_config=copy.copy(config['redis_config']), proxies=crawler_proxies)
+			self.crawlers[crawler_id] = {
+				'apikeys': apikeys,
+				'crawler': crawler,
+				'queue': {},
+				'crawler_proxies': crawler_proxies
+			}
+			crawler.start()
+		except twython.exceptions.TwythonAuthError as exc:
+			logger.error('%s: %s'%(exc, apikeys))
+		except:
+			raise
+
+
 	def is_alive(self):
 		a = [1 if self.crawlers[crawler_id]['crawler'].is_alive() else 0 for crawler_id in self.crawlers]
 		return sum(a) > 0
 
 	def crawler_status(self):
-		return [{crawler_id: True, 'qsize': len(self.crawlers[crawler_id]['queue'])} if self.crawlers[crawler_id]['crawler'].is_alive() else {crawler_id: False} for crawler_id in self.crawlers]
+		status = []
+		for crawler_id in self.crawlers:
+			cc = self.crawlers[crawler_id]
+			if (not cc['crawler'].is_alive()):
+				self.new_crawler(cc['apikeys'], self.config, cc['crawler_proxies'])
+
+			status.append({crawler_id: cc['crawler'].is_alive(), 'qsize': len(cc['queue'])})
+
+		return status
+
+		#return [{crawler_id: True, 'qsize': len(self.crawlers[crawler_id]['queue'])} else {crawler_id: False} for crawler_id in self.crawlers]
 
 	def distribute_to(self):
 		current_qsize = None
@@ -159,7 +179,7 @@ def enqueue(self, cmd):
 			try:
 				crawler_id = cmd['crawler_id']
 				del self.crawlers[crawler_id]['queue'][cmd['cmd_hash']]
-				logger.debug('removeing cmd: %s from [%s]'%(cmd['cmd_hash'], crawler_id))
+				logger.info('removed cmd: %s from [%s]'%(cmd['cmd_hash'], crawler_id))
 			except Exception as exc:
 				logger.warn("the cmd doesn't exist? %s: %s"%(cmd['cmd_hash'], exc))
 		else:
@@ -170,7 +190,7 @@ def enqueue(self, cmd):
 
 			self.crawlers[crawler_id]['crawler'].enqueue(cmd)
 
-			logger.debug("pusing %s: [%s] to crawler: %s"%(cmd, cmd_hash, crawler_id))
+			logger.debug("pushed %s: [%s] to crawler: %s"%(cmd, cmd_hash, crawler_id))
 
 	def check_local_qsizes(self):
 		#logger.info(self.crawlers)