From 39844f3ce8889a5c3f674d611e9d62d2ba5da05d Mon Sep 17 00:00:00 2001 From: Jiang Bian Date: Sat, 9 Nov 2013 18:19:10 -0600 Subject: [PATCH] continue migrating local multiprocessing.Queue to redis queue --- tweetf0rm/bootstrap.py | 38 +++++++++++++++++++--------------- tweetf0rm/client.py | 13 ++++++++++-- tweetf0rm/scheduler.py | 56 ++++++++++++++++++++++++++++++-------------------- 3 files changed, 67 insertions(+), 40 deletions(-) diff --git a/tweetf0rm/bootstrap.py b/tweetf0rm/bootstrap.py index 41234d9..ae2ac68 100644 --- a/tweetf0rm/bootstrap.py +++ b/tweetf0rm/bootstrap.py @@ -39,11 +39,14 @@ def tarball_results(data_folder, bucket, output_tarball_foldler, timestamp): gz_file = os.path.join(output_tarball_foldler, '%s.tar.gz'%timestamp) ll = [] + ignores = ['.DS_Store'] for root, dirs, files in os.walk(data_folder): if (len(files) > 0): with tarfile.open(gz_file, "w:gz") as tar: cnt = 0 for f in files: + if (f in ignores): + continue f_abspath = os.path.join(root, f) (mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime) = os.stat(f_abspath) @@ -114,26 +117,13 @@ def start_server(config, proxies): pre_time = time.time() last_load_balancing_task_ts = time.time() while True: - # block, the main process...for a command - if(not scheduler.is_alive()): - logger.info("no crawler is alive... 
i'm done too...") - break - - cmd = node_queue.get(block=True, timeout=360) - - if cmd: - scheduler.enqueue(cmd) - - if (time.time() - last_load_balancing_task_ts > 1800): # try to balance the local queues every 30 mins - last_load_balancing_task_ts = time.time() - cmd = {'cmd': 'BALANCING_LOAD'} - scheduler.enqueue(cmd) if (time.time() - pre_time > 120): logger.info(pprint.pformat(scheduler.crawler_status())) pre_time = time.time() - cmd = {'cmd': 'CRAWLER_FLUSH'} - scheduler.enqueue(cmd) + if (scheduler.is_alive()): + cmd = {'cmd': 'CRAWLER_FLUSH'} + scheduler.enqueue(cmd) if (time.time() - last_archive_ts > 3600): @@ -146,6 +136,22 @@ def start_server(config, proxies): future.add_done_callback(lambda f: logger.info("archive created? %s: [%s]"%f.result())) last_archive_ts = time.time() + + # block, the main process...for a command + if(not scheduler.is_alive()): + logger.info("no crawler is alive... i'm done too...") + time.sleep(120) # sleep for two minutes and retry + continue + + if (time.time() - last_load_balancing_task_ts > 1800): # try to balance the local queues every 30 mins + last_load_balancing_task_ts = time.time() + cmd = {'cmd': 'BALANCING_LOAD'} + scheduler.enqueue(cmd) + + cmd = node_queue.get(block=True, timeout=360) + + if cmd: + scheduler.enqueue(cmd) if __name__=="__main__": diff --git a/tweetf0rm/client.py b/tweetf0rm/client.py index 42284ee..4941759 100644 --- a/tweetf0rm/client.py +++ b/tweetf0rm/client.py @@ -13,6 +13,7 @@ sys.path.append(".") from tweetf0rm.redis_helper import NodeQueue, NodeCoordinator from tweetf0rm.utils import node_id, public_ip, hash_cmd +from tweetf0rm.exceptions import NotImplemented pp = pprint.PrettyPrinter(indent=4) @@ -100,6 +101,8 @@ }, 'SHUTDOWN_NODE': { + }, 'NODE_QSIZES': { + } } @@ -162,9 +165,13 @@ def cmd(config, args): node_queue.put(cmd) elif (args.command == 'LIST_NODES'): pp.pprint(node_coordinator.list_nodes()) + elif (args.command == 'NODE_QSIZES'): + raise NotImplemented("NotImplemented yet...") + 
#pp.pprint(node_coordinator.list_nodes()) elif (args.command == 'SHUTDOWN_NODE'): - node_coordinator.remove_node(nid) - pp.pprint(node_coordinator.list_nodes()) + #node_coordinator.remove_node(nid) + #pp.pprint(node_coordinator.list_nodes()) + raise NotImplemented("NotImplemented yet...") else: args_dict = copy.copy(args.__dict__) cmd = new_cmd(args.command, args_dict) @@ -216,6 +223,8 @@ def print_avaliable_cmd(): }, 'LIST_NODES': { }, 'SHUTDOWN_NODE': { '-nid/--node_id': dictionary['-nid/--node_id'] + }, 'NODE_QSIZES':{ + '-nid/--node_id': dictionary['-nid/--node_id'] }} diff --git a/tweetf0rm/scheduler.py b/tweetf0rm/scheduler.py index 9dc5321..9cf1429 100644 --- a/tweetf0rm/scheduler.py +++ b/tweetf0rm/scheduler.py @@ -106,10 +106,14 @@ def crawler_status(self): status = [] for crawler_id in self.crawlers: cc = self.crawlers[crawler_id] - if ((not cc['crawler'].is_alive()) and time.time() - cc['retry_timer_start_ts'] > 1800): # retry 30 mins after the crawler dies... mostly the crawler died because "Twitter API returned a 503 (Service Unavailable), Over capacity" - self.new_crawler(self.node_id, cc['apikeys'], self.config, cc['crawler_proxies']) + if ((not cc['crawler'].is_alive())): + if ('retry_timer_start_ts' in cc and (time.time() - cc['retry_timer_start_ts'] > 1800)): + # retry 30 mins after the crawler dies... 
mostly the crawler died because "Twitter API returned a 503 (Service Unavailable), Over capacity" + self.new_crawler(self.node_id, cc['apikeys'], self.config, cc['crawler_proxies']) + else: + cc['retry_timer_start_ts'] = int(time.time()) - status.append({crawler_id: cc['crawler'].is_alive(), 'qsize': cc['crawler_queue'].qsize(), 'crawler_queue_key': cc['crawler_queue'].get_key()}) + status.append({'crawler_id':crawler_id, 'alive?': cc['crawler'].is_alive(), 'qsize': cc['crawler_queue'].qsize(), 'crawler_queue_key': cc['crawler_queue'].get_key()}) return status @@ -118,10 +122,15 @@ def balancing_load(self): Find the crawler that has the most load at this moment, and redistribut its item; Crawler is on a different subprocess, so we have to use redis to coordinate the redistribution... ''' + sorted_queues = self.sorted_local_queue(False) max_crawler_id, max_qsize = sorted_queues[-1] min_crawler_id, min_qsize = sorted_queues[0] + logger.info("crawler with max_qsize: %s (%d)"%(max_crawler_id, max_qsize)) + logger.info("crawler with min_qsize: %s (%d)"%(min_crawler_id, min_qsize)) + logger.info("max_qsize - min_qsize > 0.5 * min_qsize ?: %r"%((max_qsize - min_qsize > 0.5 * min_qsize))) if (max_qsize - min_qsize > 0.5 * min_qsize): + logger.info("load balancing process started...") cmds = [] controls = [] for i in range(int(0.3 * min_qsize)): @@ -139,6 +148,22 @@ def balancing_load(self): for cmd in cmds: self.enqueue(cmd) + def redistribute_crawler_queue(self, crawler_id): + if (crawler_id in self.crawlers): + logger.warn('%s just failed... redistributing its workload'%(crawler_id)) + try: + self.node_coordinator.distribute_to_nodes(self.crawlers[crawler_id]['crawler_queue']) + wait_timer = 180 + # wait until it dies (flushed all the data...) 
+ while(self.crawlers[crawler_id]['crawler'].is_alive() and wait_timer > 0): + time.sleep(60) + wait_timer -= 60 + + self.crawlers[crawler_id]['retry_timer_start_ts'] = int(time.time()) + except Exception as exc: + logger.error(full_stack()) + else: + logger.warn("whatever are you trying to do? crawler_id: [%s] is not valid..."%(crawler_id)) def enqueue(self, cmd): @@ -150,28 +175,15 @@ def enqueue(self, cmd): self.balancing_load() elif(cmd['cmd'] == 'CRAWLER_FAILED'): crawler_id = cmd['crawler_id'] - if (crawler_id in self.crawlers): - logger.warn('%s just failed... redistributing its workload'%(crawler_id)) - try: - self.node_coordinator.distribute_to_nodes(self.crawlers[crawler_id]['crawler_queue']) - wait_timer = 180 - # wait until it dies (flushed all the data...) - while(self.crawlers[crawler_id]['crawler'].is_alive() and wait_timer > 0): - time.sleep(60) - wait_timer -= 60 - - self.crawlers[crawler_id]['retry_timer_start_ts'] = int(time.time()) - except Exception as exc: - logger.error(full_stack()) - else: - logger.warn("whatever are you trying to do? crawler_id: [%s] is not valid..."%(crawler_id)) + self.redistribute_crawler_queue(crawler_id) else: '''distribute item to the local crawler that has the least tasks in queue''' - crawler_id, qsize = self.sorted_local_queue(False)[0] - - self.crawlers[crawler_id]['crawler_queue'].put(cmd) + for crawler_id, qsize in self.sorted_local_queue(False): + if self.crawlers[crawler_id]['crawler'].is_alive(): + self.crawlers[crawler_id]['crawler_queue'].put(cmd) - logger.debug("pushed %s to crawler: %s"%(cmd, crawler_id)) + logger.debug("pushed %s to crawler: %s"%(cmd, crawler_id)) + break def check_crawler_qsizes(self): return {crawler_id:self.crawlers[crawler_id]['crawler_queue'].qsize() for crawler_id in self.crawlers}