Permalink
Browse files

rewrite local crawler queue using redis; using lifo queue so that control commands can be grabbed asap;
  • Loading branch information...
1 parent 84e96d1 commit 4439f2fb4351e11030ad2c238f86764f7db5c860 @bianjiang committed Nov 9, 2013
View
@@ -1,4 +1,3 @@
#!/bin/bash
-ulimit -S n 4096
PYTHONPATH=$PYTHONPATH:./tweetf0rm python ./tweetf0rm/bootstrap.py "$@"
View
@@ -0,0 +1,36 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+
+logger = logging.getLogger(__name__)
+logging.basicConfig(level=logging.INFO, format='%(levelname)s-[%(asctime)s][%(module)s][%(funcName)s][%(lineno)d]: %(message)s')
+requests_log = logging.getLogger("requests")
+requests_log.setLevel(logging.WARNING)
+
+import argparse, pickle, os, json, sys, time
+sys.path.append("..")
+
+
+from tweetf0rm.proxies import proxy_checker
+
if __name__ == "__main__":
	# Re-validate a JSON file of proxies and rewrite it in place, keeping
	# only the proxies that respond (as reported by proxy_checker).

	parser = argparse.ArgumentParser()
	parser.add_argument('-p', '--proxies', help="define the location of the output;", default="proxies.json")
	args = parser.parse_args()

	proxy_file = os.path.abspath(args.proxies)

	# Input format: {"proxies": [...]}. Open in text mode — json.load
	# expects text, and 'rb' would break json round-tripping on Python 3.
	with open(proxy_file, 'r') as proxy_f:
		proxies = json.load(proxy_f)['proxies']

	# proxy_checker returns dicts for live proxies; keep just the proxy entries.
	proxies = [proxy['proxy'] for proxy in proxy_checker(proxies)]

	# Lazy %-style args so formatting is skipped if INFO is disabled.
	logger.info('%d live proxies left', len(proxies))

	# Overwrite the same file with the surviving proxies. Text mode ('w',
	# not 'wb') is required for json.dump under Python 3 and harmless on 2.
	with open(proxy_file, 'w') as proxy_f:
		json.dump({'proxies': proxies}, proxy_f)
+
+
+
+
+
View
@@ -67,7 +67,8 @@ def tarball_results(data_folder, bucket, output_tarball_foldler, timestamp):
return True, gz_file
return False, gz_file
- #tar.add()
+
+
def start_server(config, proxies):
import copy
@@ -104,25 +105,32 @@ def start_server(config, proxies):
logger.info('starting node_id: %s'%this_node_id)
node_coordinator = NodeCoordinator(config['redis_config'])
- #time.sleep(5)
- # the main event loop, actually we don't need one, since we can just join on the crawlers and don't stop until a terminate command to each crawler, but we need one to check on redis command queue ...
+ #node_coordinator.clear()
+
+ #the main event loop, actually we don't need one, since we can just join on the crawlers and don't stop until a terminate command is issued to each crawler;
+	#but we need one to report the status of each crawler and perform the tarball tasks...
	last_archive_ts = time.time() + 3600 # the first archive event starts 1 hr later...
pre_time = time.time()
+ last_load_balancing_task_ts = time.time()
while True:
# block, the main process...for a command
if(not scheduler.is_alive()):
logger.info("no crawler is alive... i'm done too...")
- break;
+ break
cmd = node_queue.get(block=True, timeout=360)
if cmd:
scheduler.enqueue(cmd)
+
+ if (time.time() - last_load_balancing_task_ts > 1800): # try to balance the local queues every 30 mins
+ last_load_balancing_task_ts = time.time()
+ cmd = {'cmd': 'BALANCING_LOAD'}
+ scheduler.enqueue(cmd)
if (time.time() - pre_time > 120):
logger.info(pprint.pformat(scheduler.crawler_status()))
- #logger.info('local queue_sizes: %s'%scheduler.check_local_qsizes())
pre_time = time.time()
cmd = {'cmd': 'CRAWLER_FLUSH'}
scheduler.enqueue(cmd)
@@ -140,27 +148,6 @@ def start_server(config, proxies):
last_archive_ts = time.time()
- # cmd = {
- # "cmd": "CRAWL_FRIENDS",
- # "user_id": 1948122342,
- # "data_type": "ids",
- # "depth": 2,
- # "bucket":"friend_ids"
- # }
- # cmd = {
- # "cmd": "CRAWL_FRIENDS",
- # "user_id": 1948122342,
- # "data_type": "users",
- # "depth": 2,
- # "bucket":"friends"
- # }
- # # cmd = {
- # # "cmd": "CRAWL_USER_TIMELINE",
- # # "user_id": 1948122342#53039176,
- ## "bucket": "timelines"
- # # }
-
-
if __name__=="__main__":
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--config', help="config.json that contains a) twitter api keys; b) redis connection string;", required = True)
View
@@ -9,11 +9,13 @@
requests_log = logging.getLogger("requests")
requests_log.setLevel(logging.WARNING)
-import sys, time, argparse, random, copy
+import sys, time, argparse, random, copy, pprint
sys.path.append(".")
-from tweetf0rm.redis_helper import NodeQueue
+from tweetf0rm.redis_helper import NodeQueue, NodeCoordinator
from tweetf0rm.utils import node_id, public_ip, hash_cmd
+pp = pprint.PrettyPrinter(indent=4)
+
avaliable_cmds = {
'CRAWL_FRIENDS': {
'user_id' : {
@@ -94,8 +96,12 @@
}, 'GET_USERS_FROM_IDS': {
+ }, 'LIST_NODES':{
+
+ }, 'SHUTDOWN_NODE': {
+
}
- }
+}
from tweetf0rm.twitterapi.users import User
import json, os
@@ -119,10 +125,11 @@ def cmd(config, args):
if (args.command not in avaliable_cmds):
raise Exception("not a valid command...")
- nid = node_id()
- logger.info("sending to %s"%(nid))
+ nid = args.node_id
+
+ logger.info("node_id: %s"%(nid))
node_queue = NodeQueue(nid, redis_config=config['redis_config'])
-
+ node_coordinator = NodeCoordinator(config['redis_config'])
# this can be done locally without sending the command to the servers...
if (args.command == 'GET_UIDS_FROM_SCREEN_NAMES'):
apikeys = config["apikeys"].values()[0]
@@ -153,6 +160,11 @@ def cmd(config, args):
args_dict['user_id'] = user_id
cmd = new_cmd(command, args_dict)
node_queue.put(cmd)
+ elif (args.command == 'LIST_NODES'):
+ pp.pprint(node_coordinator.list_nodes())
+ elif (args.command == 'SHUTDOWN_NODE'):
+ node_coordinator.remove_node(nid)
+ pp.pprint(node_coordinator.list_nodes())
else:
args_dict = copy.copy(args.__dict__)
cmd = new_cmd(args.command, args_dict)
@@ -168,7 +180,8 @@ def print_avaliable_cmd():
'-dt/--data_type': '"ids" or "users" (default to ids) what the results are going to look like (either a list of twitter user ids or a list of user objects)',
'-d/--depth': 'the depth of the network; e.g., if it is 2, it will give you his/her (indicated by the -uid) friends\' friends',
'-j/--json': 'a json file that contains a list of screen_names or user_ids, depending on the command',
- '-o/--output': ' the output json file (for storing user_ids from screen_names)'
+ '-o/--output': ' the output json file (for storing user_ids from screen_names)',
+ '-nid/--node_id':'the node_id that you want to interact with; default to the current machine...'
}
cmds = {'CRAWL_FRIENDS': {
'-uid/--user_id': dictionary['-uid/--user_id'],
@@ -200,6 +213,9 @@ def print_avaliable_cmd():
}, 'GET_USERS_FROM_IDS': {
'-j/--json': dictionary['-j/--json'],
'-o/--output': dictionary['-o/--output']
+ }, 'LIST_NODES': {
+ }, 'SHUTDOWN_NODE': {
+ '-nid/--node_id': dictionary['-nid/--node_id']
}}
@@ -213,6 +229,7 @@ def print_avaliable_cmd():
if __name__=="__main__":
+ nid = node_id()
import json, os
parser = argparse.ArgumentParser()
@@ -223,6 +240,7 @@ def print_avaliable_cmd():
parser.add_argument('-d', '--depth', help="the depth", default=1)
parser.add_argument('-j', '--json', help="the location of the json file that has a list of user_ids or screen_names", required=False)
parser.add_argument('-o', '--output', help="the location of the output json file for storing user_ids", default='user_ids.json')
+ parser.add_argument('-nid', '--node_id', help="the node_id you want to interact with", default=nid)
try:
args = parser.parse_args()
@@ -231,5 +249,6 @@ def print_avaliable_cmd():
config = json.load(config_f)
cmd(config, args)
- except:
+ except Exception as exc:
+ logger.error(exc)
print_avaliable_cmd()
@@ -30,7 +30,7 @@ def flush_file(output_folder, bucket, items):
return True
-FLUSH_SIZE = 10
+FLUSH_SIZE = 100
class FileHandler(BaseHandler):
@@ -8,16 +8,22 @@
import multiprocessing as mp
import tweetf0rm.handler
+from tweetf0rm.redis_helper import CrawlerQueue
-MAX_QUEUE_SIZE = 32767
+#MAX_QUEUE_SIZE = 32767
class CrawlerProcess(mp.Process):
- def __init__(self, crawler_id, handlers = []):
+ def __init__(self, node_id, crawler_id, redis_config, handlers):
super(CrawlerProcess, self).__init__()
+ self.node_id = node_id
self.crawler_id = crawler_id
- self.queue = mp.Queue(maxsize=MAX_QUEUE_SIZE)
- self.lock = mp.Lock()
+ self.redis_config = redis_config
+ #self.queue = mp.Queue(maxsize=MAX_QUEUE_SIZE)
+
+ self.crawler_queue = CrawlerQueue(node_id, crawler_id, redis_config=redis_config)
+ self.crawler_queue.clear()
+ #self.lock = mp.Lock()
self.handlers = handlers
logger.debug("number of handlers attached: %d"%(len(handlers)))
@@ -26,12 +32,16 @@ def get_crawler_id(self):
return self.crawler_id
def enqueue(self, request):
- self.queue.put(request, block=True)
+ #self.queue.put(request, block=True)
+ self.crawler_queue.put(request)
return True
def get_cmd(self):
- return self.queue.get(block=True)
+ #return self.queue.get(block=True)
+ return self.crawler_queue.get(block=True)
+ def get_queue_size(self):
+ self.crawler_queue.qsize()
def run(self):
pass
@@ -17,13 +17,13 @@
class UserRelationshipCrawler(CrawlerProcess):
- def __init__(self, node_id, crawler_id, apikeys, handlers = None, redis_config = None, proxies=None):
+ def __init__(self, node_id, crawler_id, apikeys, handlers, redis_config, proxies=None):
if (handlers == None):
raise MissingArgs("you need a handler to write the data to...")
- super(UserRelationshipCrawler, self).__init__(crawler_id, handlers=handlers)
- self.redis_config = redis_config
+
+ super(UserRelationshipCrawler, self).__init__(node_id, crawler_id, redis_config, handlers)
+
self.apikeys = copy.copy(apikeys)
- self.node_id = node_id
self.tasks = {
"TERMINATE": "TERMINATE",
"CRAWL_FRIENDS" : {
@@ -38,7 +38,7 @@ def __init__(self, node_id, crawler_id, apikeys, handlers = None, redis_config =
},
"CRAWL_USER_TIMELINE": "fetch_user_timeline"
}
- self.node_queue = NodeQueue(self.node_id, redis_config=redis_config)
+ self.node_queue = NodeQueue(self.node_id, redis_config=redis_config)
self.client_args = {"timeout": 300}
self.proxies = iter(proxies) if proxies else None
self.user_api = None
@@ -82,8 +82,7 @@ def run(self):
command = cmd['cmd']
- if ('cmd_hash' in cmd):
- logger.debug("new cmd: %s [%s]"%(cmd, cmd['cmd_hash']))
+ logger.debug("new cmd: %s"%(cmd))
redis_cmd_handler = None
@@ -93,7 +92,7 @@ def run(self):
for handler in self.handlers:
handler.flush_all()
break
- if (command == 'CRAWLER_FLUSH'):
+ elif (command == 'CRAWLER_FLUSH'):
for handler in self.handlers:
handler.flush_all()
else:
@@ -142,8 +141,7 @@ def run(self):
if func:
try:
func(**args)
- del args['cmd_handlers']
- self.node_queue.put({'cmd':"CMD_FINISHED", "cmd_hash":cmd['cmd_hash'], "crawler_id":self.crawler_id})
+ del args['cmd_handlers']
except Exception as exc:
logger.error("%s"%exc)
try:
View
@@ -23,8 +23,8 @@ def check_proxy(proxy, timeout):
p = {'proxy':proxy,'proxy_dict':{proxy_type: '%s://%s'%(proxy_type, proxy_ip)}}
try:
-
- r = requests.get(url, headers=headers, proxies=p['proxy_dict'], timeout=timeout)
+ s = requests.Session()
+ r = s.get(url,headers=headers, proxies=p['proxy_dict'], timeout=timeout, allow_redirects=True)
if (r.status_code == requests.codes.ok):
return True, p
@@ -40,15 +40,18 @@ def proxy_checker(proxies):
'''
logger.info('%d proxies to check'%(len(proxies)))
+ import multiprocessing as mp
+
results = []
- with futures.ProcessPoolExecutor(max_workers=100) as executor:
+ with futures.ProcessPoolExecutor(max_workers=mp.cpu_count()*10) as executor:
- future_to_proxy = {executor.submit(check_proxy, proxy, 15): proxy for proxy in proxies}
+ future_to_proxy = {executor.submit(check_proxy, proxy, 30): proxy for proxy in proxies if proxy.values()[0] == 'http'}
for future in future_to_proxy:
future.add_done_callback(lambda f: results.append(f.result()))
+ logger.info('%d http proxies to check'%(len(future_to_proxy)))
futures.wait(future_to_proxy)
Oops, something went wrong.

0 comments on commit 4439f2f

Please sign in to comment.