Permalink
Browse files

Continue migrating the local multiprocessing.Queue to a Redis queue

  • Loading branch information...
1 parent 4439f2f commit 39844f3ce8889a5c3f674d611e9d62d2ba5da05d @bianjiang committed Nov 10, 2013
Showing with 67 additions and 40 deletions.
  1. +22 −16 tweetf0rm/bootstrap.py
  2. +11 −2 tweetf0rm/client.py
  3. +34 −22 tweetf0rm/scheduler.py
View
@@ -39,11 +39,14 @@ def tarball_results(data_folder, bucket, output_tarball_foldler, timestamp):
gz_file = os.path.join(output_tarball_foldler, '%s.tar.gz'%timestamp)
ll = []
+ ignores = ['.DS_Store']
for root, dirs, files in os.walk(data_folder):
if (len(files) > 0):
with tarfile.open(gz_file, "w:gz") as tar:
cnt = 0
for f in files:
+ if (f in ignores):
+ continue
f_abspath = os.path.join(root, f)
(mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime) = os.stat(f_abspath)
@@ -114,26 +117,13 @@ def start_server(config, proxies):
pre_time = time.time()
last_load_balancing_task_ts = time.time()
while True:
- # block, the main process...for a command
- if(not scheduler.is_alive()):
- logger.info("no crawler is alive... i'm done too...")
- break
-
- cmd = node_queue.get(block=True, timeout=360)
-
- if cmd:
- scheduler.enqueue(cmd)
-
- if (time.time() - last_load_balancing_task_ts > 1800): # try to balance the local queues every 30 mins
- last_load_balancing_task_ts = time.time()
- cmd = {'cmd': 'BALANCING_LOAD'}
- scheduler.enqueue(cmd)
if (time.time() - pre_time > 120):
logger.info(pprint.pformat(scheduler.crawler_status()))
pre_time = time.time()
- cmd = {'cmd': 'CRAWLER_FLUSH'}
- scheduler.enqueue(cmd)
+ if (scheduler.is_alive()):
+ cmd = {'cmd': 'CRAWLER_FLUSH'}
+ scheduler.enqueue(cmd)
if (time.time() - last_archive_ts > 3600):
@@ -146,6 +136,22 @@ def start_server(config, proxies):
future.add_done_callback(lambda f: logger.info("archive created? %s: [%s]"%f.result()))
last_archive_ts = time.time()
+
+ # block, the main process...for a command
+ if(not scheduler.is_alive()):
+ logger.info("no crawler is alive... i'm done too...")
+ time.sleep(120) # sleep for a minute and retry
+ continue
+
+ if (time.time() - last_load_balancing_task_ts > 1800): # try to balance the local queues every 30 mins
+ last_load_balancing_task_ts = time.time()
+ cmd = {'cmd': 'BALANCING_LOAD'}
+ scheduler.enqueue(cmd)
+
+ cmd = node_queue.get(block=True, timeout=360)
+
+ if cmd:
+ scheduler.enqueue(cmd)
if __name__=="__main__":
View
@@ -13,6 +13,7 @@
sys.path.append(".")
from tweetf0rm.redis_helper import NodeQueue, NodeCoordinator
from tweetf0rm.utils import node_id, public_ip, hash_cmd
+from tweetf0rm.exceptions import NotImplemented
pp = pprint.PrettyPrinter(indent=4)
@@ -100,6 +101,8 @@
}, 'SHUTDOWN_NODE': {
+ }, 'NODE_QSIZES': {
+
}
}
@@ -162,9 +165,13 @@ def cmd(config, args):
node_queue.put(cmd)
elif (args.command == 'LIST_NODES'):
pp.pprint(node_coordinator.list_nodes())
+ elif (args.command == 'NODE_QSIZES'):
+ raise NotImplemented("NotImplemented yet...")
+ #pp.pprint(node_coordinator.list_nodes())
elif (args.command == 'SHUTDOWN_NODE'):
- node_coordinator.remove_node(nid)
- pp.pprint(node_coordinator.list_nodes())
+ #node_coordinator.remove_node(nid)
+ #pp.pprint(node_coordinator.list_nodes())
+ raise NotImplemented("NotImplemented yet...")
else:
args_dict = copy.copy(args.__dict__)
cmd = new_cmd(args.command, args_dict)
@@ -216,6 +223,8 @@ def print_avaliable_cmd():
}, 'LIST_NODES': {
}, 'SHUTDOWN_NODE': {
'-nid/--node_id': dictionary['-nid/--node_id']
+ }, 'NODE_QSIZES':{
+ '-nid/--node_id': dictionary['-nid/--node_id']
}}
View
@@ -106,10 +106,14 @@ def crawler_status(self):
status = []
for crawler_id in self.crawlers:
cc = self.crawlers[crawler_id]
- if ((not cc['crawler'].is_alive()) and time.time() - cc['retry_timer_start_ts'] > 1800): # retry 30 mins after the crawler dies... mostly the crawler died because "Twitter API returned a 503 (Service Unavailable), Over capacity"
- self.new_crawler(self.node_id, cc['apikeys'], self.config, cc['crawler_proxies'])
+ if ((not cc['crawler'].is_alive())):
+ if ('retry_timer_start_ts' in cc and (time.time() - cc['retry_timer_start_ts'] > 1800)):
+ # retry 30 mins after the crawler dies... mostly the crawler died because "Twitter API returned a 503 (Service Unavailable), Over capacity"
+ self.new_crawler(self.node_id, cc['apikeys'], self.config, cc['crawler_proxies'])
+ else:
+ cc['retry_timer_start_ts'] = int(time.time())
- status.append({crawler_id: cc['crawler'].is_alive(), 'qsize': cc['crawler_queue'].qsize(), 'crawler_queue_key': cc['crawler_queue'].get_key()})
+ status.append({'crawler_id':crawler_id, 'alive?': cc['crawler'].is_alive(), 'qsize': cc['crawler_queue'].qsize(), 'crawler_queue_key': cc['crawler_queue'].get_key()})
return status
@@ -118,10 +122,15 @@ def balancing_load(self):
Find the crawler that has the most load at this moment, and redistribut its item;
Crawler is on a different subprocess, so we have to use redis to coordinate the redistribution...
'''
+
sorted_queues = self.sorted_local_queue(False)
max_crawler_id, max_qsize = sorted_queues[-1]
min_crawler_id, min_qsize = sorted_queues[0]
+ logger.info("crawler with max_qsize: %s (%d)"%(max_crawler_id, max_qsize))
+ logger.info("crawler with min_qsize: %s (%d)"%(min_crawler_id, min_qsize))
+ logger.info("max_qsize - min_qsize > 0.5 * min_qsize ?: %r"%((max_qsize - min_qsize > 0.5 * min_qsize)))
if (max_qsize - min_qsize > 0.5 * min_qsize):
+ logger.info("load balancing process started...")
cmds = []
controls = []
for i in range(int(0.3 * min_qsize)):
@@ -139,6 +148,22 @@ def balancing_load(self):
for cmd in cmds:
self.enqueue(cmd)
+ def redistribute_crawler_queue(self, crawler_id):
+ if (crawler_id in self.crawlers):
+ logger.warn('%s just failed... redistributing its workload'%(crawler_id))
+ try:
+ self.node_coordinator.distribute_to_nodes(self.crawlers[crawler_id]['crawler_queue'])
+ wait_timer = 180
+ # wait until it dies (flushed all the data...)
+ while(self.crawlers[crawler_id]['crawler'].is_alive() and wait_timer > 0):
+ time.sleep(60)
+ wait_timer -= 60
+
+ self.crawlers[crawler_id]['retry_timer_start_ts'] = int(time.time())
+ except Exception as exc:
+ logger.error(full_stack())
+ else:
+ logger.warn("whatever are you trying to do? crawler_id: [%s] is not valid..."%(crawler_id))
def enqueue(self, cmd):
@@ -150,28 +175,15 @@ def enqueue(self, cmd):
self.balancing_load()
elif(cmd['cmd'] == 'CRAWLER_FAILED'):
crawler_id = cmd['crawler_id']
- if (crawler_id in self.crawlers):
- logger.warn('%s just failed... redistributing its workload'%(crawler_id))
- try:
- self.node_coordinator.distribute_to_nodes(self.crawlers[crawler_id]['crawler_queue'])
- wait_timer = 180
- # wait until it dies (flushed all the data...)
- while(self.crawlers[crawler_id]['crawler'].is_alive() and wait_timer > 0):
- time.sleep(60)
- wait_timer -= 60
-
- self.crawlers[crawler_id]['retry_timer_start_ts'] = int(time.time())
- except Exception as exc:
- logger.error(full_stack())
- else:
- logger.warn("whatever are you trying to do? crawler_id: [%s] is not valid..."%(crawler_id))
+ self.redistribute_crawler_queue(crawler_id)
else:
'''distribute item to the local crawler that has the least tasks in queue'''
- crawler_id, qsize = self.sorted_local_queue(False)[0]
-
- self.crawlers[crawler_id]['crawler_queue'].put(cmd)
+ for crawler_id, qsize in self.sorted_local_queue(False):
+ if self.crawlers[crawler_id]['crawler'].is_alive():
+ self.crawlers[crawler_id]['crawler_queue'].put(cmd)
- logger.debug("pushed %s to crawler: %s"%(cmd, crawler_id))
+ logger.debug("pushed %s to crawler: %s"%(cmd, crawler_id))
+ break
def check_crawler_qsizes(self):
return {crawler_id:self.crawlers[crawler_id]['crawler_queue'].qsize() for crawler_id in self.crawlers}

0 comments on commit 39844f3

Please sign in to comment.