diff --git a/tests/bootstrap_test.py b/tests/bootstrap_test.py index f41ed61..cab4e33 100644 --- a/tests/bootstrap_test.py +++ b/tests/bootstrap_test.py @@ -11,7 +11,7 @@ from nose.tools import nottest -import sys, os, json +import sys, os, json, exceptions sys.path.append("..") from tweetf0rm.utils import full_stack @@ -114,18 +114,53 @@ def test_bootstrap_with_proxies(self): pass @nottest - def test_proxy(self): + def test_split(self): + + def split(l, n): + for i in xrange(0, len(l), n): + yield l[i:i+n] + + + l = [1,2,3,4,5, 6] + + logger.info({}.values()) + n = iter(l) + logger.info(next(n)) + logger.info(next(n)) + logger.info(next(n)) + logger.info(next(n)) + logger.info(next(n)) + logger.info(next(n)) + try: + logger.info(next(n)) + except Exception as exc: + try: + logger.info(type(exc)) + logger.info(isinstance(exc, exceptions.StopIteration)) + raise + except Exception as sss: + logger.info("again...%r"%(exc)) + raise + #raise + # p = split(l, 2) + # pp = next(p) if p else None + # logger.info(pp) + + + #logger.info(next(p)) + @nottest + def test_proxy(self): proxies = proxy_checker(self.proxies['proxies']) #logger.info(proxies) logger.info('%d good proxies left'%len(proxies)) - ps = [] - for d in proxies: - ps.append(d['proxy']) + # ps = [] + # for d in proxies: + # ps.append(d['proxy']) - with open(os.path.abspath('proxy.json'), 'wb') as proxy_f: - json.dump({'proxies':ps}, proxy_f) + # with open(os.path.abspath('proxy.json'), 'wb') as proxy_f: + # json.dump({'proxies':ps}, proxy_f) if __name__=="__main__": import nose diff --git a/tests/proxy.json b/tests/proxy.json index 7edcc58..e559705 100644 --- a/tests/proxy.json +++ b/tests/proxy.json @@ -1 +1 @@ -{"proxies": ["59.47.43.95:8080", "59.47.43.91:8080", "59.47.43.92:8080", "59.47.43.94:8080", "59.47.43.90:8080", "59.47.43.89:8080", "221.238.28.158:8081", "120.202.249.230:80", "222.87.129.29:80", "59.47.43.93:8080", "210.22.63.72:8080", "114.80.136.112:7780", "213.175.174.94:80", "111.63.14.245:80", 
"120.198.230.63:81", "61.135.179.167:8080", "210.22.63.91:8080", "119.75.219.70:80", "202.202.0.163:3128", "58.20.127.100:3128", "221.10.102.199:82", "60.28.183.5:8081", "118.26.57.13:82"]} \ No newline at end of file +{"proxies": [{"180.251.170.91:3128": "HTTPS"}, {"84.241.58.230:8080": "HTTPS"}, {"189.11.215.154:3128": "HTTP"}, {"180.148.142.131:80": "HTTP"}, {"61.185.255.156:9000": "HTTPS"}, {"190.0.16.58:8080": "HTTPS"}, {"187.61.117.11:8080": "HTTPS"}, {"37.77.225.37:8080": "HTTPS"}, {"222.124.178.98:3128": "HTTPS"}, {"110.138.216.157:3128": "HTTPS"}, {"201.76.181.146:8080": "HTTP"}, {"201.62.48.153:8080": "HTTP"}, {"60.190.129.52:3128": "HTTP"}, {"219.239.14.148:18186": "HTTP"}, {"218.108.232.189:82": "HTTP"}, {"109.238.189.252:8080": "HTTPS"}, {"180.250.67.54:8080": "HTTPS"}, {"82.79.66.19:8080": "HTTPS"}, {"84.120.105.202:8080": "HTTPS"}, {"110.77.221.29:3128": "HTTP"}, {"203.176.119.59:8080": "HTTPS"}, {"190.200.0.13:8080": "HTTP"}, {"195.14.167.54:8080": "HTTPS"}, {"197.136.42.3:80": "HTTPS"}, {"202.106.16.36:3128": "HTTP"}, {"218.108.242.124:8080": "HTTP"}, {"122.144.130.11:3128": "HTTP"}, {"60.51.218.180:8080": "HTTPS"}, {"218.204.23.4:80": "HTTPS"}, {"123.183.211.9:80": "HTTP"}, {"177.43.188.67:80": "HTTPS"}, {"60.24.10.126:8080": "HTTP"}, {"84.42.3.3:3128": "HTTP"}, {"91.207.136.218:8080": "HTTPS"}, {"202.91.25.41:3128": "HTTP"}, {"181.48.125.218:3128": "HTTPS"}, {"83.146.70.81:3128": "HTTPS"}, {"125.162.242.124:8080": "HTTPS"}, {"124.119.50.254:80": "HTTPS"}, {"221.10.40.236:843": "HTTP"}, {"201.232.104.7:8080": "HTTP"}, {"209.141.57.195:3128": "HTTPS"}, {"202.29.216.236:3128": "HTTPS"}, {"190.198.42.247:8080": "HTTP"}, {"182.160.120.155:8080": "HTTPS"}, {"118.85.208.193:80": "HTTP"}, {"110.137.48.186:8080": "HTTPS"}, {"187.120.23.14:8080": "HTTPS"}, {"213.181.73.145:80": "HTTP"}, {"201.209.94.154:8080": "HTTP"}, {"202.101.26.182:80": "HTTP"}, {"91.203.140.46:3128": "HTTPS"}, {"178.48.2.237:80": "HTTPS"}, {"122.136.46.151:3128": "HTTP"}, 
{"221.7.213.216:8080": "HTTPS"}, {"179.190.141.128:8080": "HTTPS"}, {"110.74.210.218:8080": "HTTPS"}, {"113.20.138.238:80": "HTTPS"}, {"94.42.155.68:3128": "HTTP"}, {"178.19.98.173:443": "HTTPS"}, {"190.37.130.177:8080": "HTTP"}, {"120.138.97.225:8080": "HTTPS"}, {"222.88.242.213:9999": "HTTPS"}, {"177.129.214.44:8080": "HTTPS"}, {"91.228.53.28:7808": "HTTP"}, {"213.131.41.98:8080": "HTTPS"}, {"117.59.224.60:80": "HTTP"}, {"180.250.34.30:8080": "HTTPS"}, {"188.95.38.233:8080": "HTTPS"}, {"202.118.236.130:7777": "HTTP"}, {"180.241.113.26:3128": "HTTPS"}, {"72.64.146.136:3128": "HTTP"}, {"93.190.18.146:8080": "HTTP"}, {"118.97.191.203:8080": "HTTPS"}, {"217.219.190.209:8080": "HTTPS"}, {"84.22.2.1:8080": "HTTPS"}, {"213.110.196.195:8080": "HTTPS"}, {"94.205.205.131:80": "HTTP"}, {"125.163.192.106:8080": "HTTPS"}, {"179.49.112.50:8089": "HTTPS"}, {"201.49.209.146:3128": "HTTP"}, {"200.75.3.85:3128": "HTTPS"}, {"119.30.39.1:3128": "HTTPS"}, {"202.114.6.37:9001": "HTTP"}, {"181.177.194.118:3128": "HTTPS"}, {"192.69.200.37:8089": "HTTP"}, {"46.0.202.184:80": "HTTPS"}, {"180.247.112.101:8080": "HTTPS"}, {"41.79.61.26:8080": "HTTP"}, {"199.204.45.90:3127": "HTTP"}, {"58.20.127.100:3128": "HTTP"}, {"115.124.65.90:9876": "HTTPS"}, {"61.19.51.226:3128": "HTTPS"}, {"186.227.160.129:3128": "HTTPS"}, {"117.135.131.86:80": "HTTP"}, {"192.227.139.227:8089": "HTTP"}, {"94.228.205.33:8080": "HTTPS"}, {"197.160.116.70:8080": "HTTPS"}, {"201.242.25.184:3128": "HTTPS"}, {"91.225.78.152:8080": "HTTP"}, {"187.125.147.178:3128": "HTTP"}, {"103.16.45.184:80": "HTTPS"}, {"202.98.123.126:8080": "HTTP"}, {"176.31.233.170:3128": "HTTP"}, {"193.226.94.95:8888": "HTTP"}, {"78.46.250.85:3128": "HTTP"}, {"66.122.95.249:8080": "HTTPS"}, {"200.44.126.114:3131": "HTTP"}, {"200.37.53.116:8080": "HTTPS"}, {"187.120.217.82:3128": "HTTPS"}, {"41.129.120.180:8080": "HTTPS"}, {"119.97.146.16:8080": "HTTP"}, {"41.164.23.162:8080": "HTTPS"}, {"88.85.108.16:8080": "HTTPS"}, {"190.201.18.205:8080": "HTTP"}, 
{"88.135.0.32:3128": "HTTPS"}, {"88.146.243.118:8080": "HTTPS"}, {"86.120.196.242:8080": "HTTPS"}, {"85.234.22.126:3128": "HTTPS"}, {"41.208.150.114:8080": "HTTPS"}, {"203.146.82.253:80": "HTTP"}, {"177.21.227.126:8080": "HTTPS"}, {"187.120.209.153:3128": "HTTPS"}, {"85.135.52.30:8080": "HTTP"}, {"198.204.245.212:8089": "HTTP"}, {"58.215.179.194:80": "HTTP"}, {"94.228.205.9:8080": "HTTPS"}, {"196.46.247.74:8080": "HTTPS"}, {"116.193.170.213:8080": "HTTPS"}, {"54.244.125.115:80": "HTTP"}, {"112.124.15.141:3128": "HTTPS"}, {"189.85.29.110:8080": "HTTPS"}, {"69.73.181.22:8089": "HTTP"}, {"118.97.99.35:8080": "HTTPS"}, {"69.197.132.80:3127": "HTTP"}, {"93.78.6.213:3128": "HTTP"}, {"196.202.116.2:80": "HTTPS"}, {"200.89.99.182:8080": "HTTPS"}, {"54.214.50.160:80": "HTTP"}, {"49.248.113.53:3128": "HTTPS"}, {"115.134.2.177:8080": "HTTPS"}, {"69.73.181.22:3127": "HTTP"}, {"212.144.254.122:3128": "HTTP"}, {"190.248.131.250:8080": "HTTPS"}, {"177.85.156.110:8080": "HTTPS"}, {"61.176.217.224:8080": "HTTP"}, {"87.229.26.141:31280": "HTTP"}, {"85.185.42.5:8080": "HTTPS"}, {"211.167.112.14:82": "HTTP"}, {"113.107.43.75:818": "HTTPS"}]} \ No newline at end of file diff --git a/tweetf0rm/handler/crawl_user_relationship_command_handler.py b/tweetf0rm/handler/crawl_user_relationship_command_handler.py index b280816..3977dc6 100644 --- a/tweetf0rm/handler/crawl_user_relationship_command_handler.py +++ b/tweetf0rm/handler/crawl_user_relationship_command_handler.py @@ -7,13 +7,13 @@ import logging logger = logging.getLogger(__name__) -logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(message)s') +#logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(message)s') from .base_handler import BaseHandler import multiprocessing as mp import futures, json, copy, time from tweetf0rm.redis_helper import NodeQueue, NodeCoordinator -from tweetf0rm.utils import full_stack, distribute_to_node +from tweetf0rm.utils import full_stack, get_keys_by_min_value import json def 
flush_cmd(bulk, data_type, template, redis_config, verbose=False): @@ -38,7 +38,7 @@ def flush_cmd(bulk, data_type, template, redis_config, verbose=False): t["user_id"] = user_id t["depth"] -= 1 - node_id = distribute_to_node(qsizes)[0] + node_id = get_keys_by_min_value(qsizes)[0] if (node_id in node_queues): node_queue = node_queues[node_id] diff --git a/tweetf0rm/process/user_relationship_crawler.py b/tweetf0rm/process/user_relationship_crawler.py index 6cab281..3b6789f 100644 --- a/tweetf0rm/process/user_relationship_crawler.py +++ b/tweetf0rm/process/user_relationship_crawler.py @@ -25,15 +25,6 @@ def __init__(self, node_id, crawler_id, apikeys, handlers = None, verbose = Fals self.redis_config = redis_config self.apikeys = copy.copy(apikeys) self.node_id = node_id - self.client_args = {"timeout": 300} - - if (proxies): - self.client_args['proxies'] = proxies - - self.user_api = User(apikeys=apikeys, verbose=verbose, client_args=self.client_args) - - if (self.verbose): - logger.info("# of handlers: %d"%(len(self.get_handlers()))) self.tasks = { "TERMINATE": "TERMINATE", "CRAWL_FRIENDS" : { @@ -48,7 +39,20 @@ def __init__(self, node_id, crawler_id, apikeys, handlers = None, verbose = Fals }, "CRAWL_USER_TIMELINE": "fetch_user_timeline" } - self.node_queue = NodeQueue(self.node_id, redis_config=redis_config) + self.node_queue = NodeQueue(self.node_id, redis_config=redis_config) + self.client_args = {"timeout": 300} + self.proxies = iter(proxies) if proxies else None + self.user_api = None + self.init_user_api() + + def init_user_api(self): # this will throw StopIteration if all proxies have been tried... 
+		if (self.proxies): + self.client_args['proxies'] = next(self.proxies) # this will throw out + + if (self.user_api): + del self.user_api + + self.user_api = User(apikeys=self.apikeys, verbose=self.verbose, client_args=self.client_args) def get_handlers(self): return self.handlers @@ -122,6 +126,24 @@ def run(self): func(**args) except Exception as exc: logger.error("%s"%exc) + try: + self.init_user_api() + except Exception as init_user_api_exc: + import exceptions + if (isinstance(init_user_api_exc, exceptions.StopIteration)): # no more proxy to try... so kill myself... + for handler in self.handlers: + handler.flush_all() + # flush first + self.node_queue.put({ + 'cmd':'CRAWLER_FAILED', + 'crawler_id': self.crawler_id + }) + return False + #raise + else: + #put current task back to queue... + self.enqueue(cmd) + #logger.error(full_stack()) else: self.node_queue.put({'cmd':"CMD_FINISHED", "cmd_hash":cmd_hash, "crawler_id":self.crawler_id}) diff --git a/tweetf0rm/proxies.py b/tweetf0rm/proxies.py index 556643e..b62ac6c 100644 --- a/tweetf0rm/proxies.py +++ b/tweetf0rm/proxies.py @@ -7,7 +7,7 @@ logger = logging.getLogger(__name__) logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(message)s') requests_log = logging.getLogger("requests") -requests_log.setLevel(logging.WARNING) +requests_log.setLevel(logging.DEBUG) from tweetf0rm.utils import full_stack import requests, futures @@ -20,8 +20,10 @@ def check_proxy(proxy, timeout): 'Accept-Encoding': 'gzip, deflate', 'Accept-Language': 'en-US,en;q=0.5' } + proxy_ip = proxy.keys()[0] + proxy_type = proxy.values()[0] - p = {'proxy':proxy,'proxy_dict':{"http" : 'http://%s'%proxy}} + p = {'proxy':proxy,'proxy_dict':{proxy_type: '%s://%s'%(proxy_type, proxy_ip)}} + r = requests.get(url, headers=headers, proxies=p['proxy_dict'], timeout=timeout) @@ -34,7 +36,9 @@ def check_proxy(proxy, timeout): return False, None def proxy_checker(proxies): - proxies = set(proxies) + ''' + proxies is a list of {key:value}, where the 
key is the ip of the proxy (including port), e.g., 192.168.1.1:8080, and the value is the type of the proxy (http/https) + ''' logger.info('%d proxies to check'%(len(proxies))) good_proxies = [] diff --git a/tweetf0rm/redis_helper.py b/tweetf0rm/redis_helper.py index f4f4900..d2661c2 100644 --- a/tweetf0rm/redis_helper.py +++ b/tweetf0rm/redis_helper.py @@ -5,9 +5,9 @@ import logging logger = logging.getLogger(__name__) -logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(message)s') -requests_log = logging.getLogger("requests") -requests_log.setLevel(logging.INFO) +#logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(message)s') +# requests_log = logging.getLogger("requests") +# requests_log.setLevel(logging.INFO) import redis, json @@ -83,26 +83,28 @@ class NodeCoordinator(RedisBase): ''' def __init__(self, redis_config=None): super(NodeCoordinator, self).__init__("coordinator", namespace="node", redis_config=redis_config) + self.active_nodes = '%s:active'%(self.key) + self.all_nodes = '%s:all'%(self.key) def clear(self): self.conn().delete(self.key) def add_node(self, node_id): - self.conn().sadd(self.key, node_id) + self.conn().sadd(self.active_nodes, node_id) + self.conn().sadd(self.all_nodes, node_id) def remove_node(self, node_id): - self.conn().srem(self.key, node_id) + ''' Only remove the node from the active list;''' + self.conn().srem(self.active_nodes, node_id) def node_queue_key(self, node_id): return 'queue:%s'%(node_id) def node_qsizes(self): ''' - List the size of all node queues + List the size of all active nodes' queues ''' - node_ids = self.conn().smembers(self.key) - - logger.info(node_ids) + node_ids = self.conn().smembers(self.active_nodes) qsizes = {node_id:self.conn().llen(self.node_queue_key(node_id)) for node_id in node_ids} diff --git a/tweetf0rm/scheduler.py b/tweetf0rm/scheduler.py index 99d5e3a..ece9d15 100644 --- a/tweetf0rm/scheduler.py +++ b/tweetf0rm/scheduler.py @@ -5,18 +5,19 @@ import logging logger 
= logging.getLogger(__name__) -logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(message)s') -requests_log = logging.getLogger("requests") -requests_log.setLevel(logging.WARNING) +# logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(message)s') +# requests_log = logging.getLogger("requests") +# requests_log.setLevel(logging.WARNING) -import json, copy -from tweetf0rm.utils import full_stack, hash_cmd, md5 +import json, copy, time +from tweetf0rm.utils import full_stack, hash_cmd, md5, get_keys_by_min_value from tweetf0rm.proxies import proxy_checker from process.user_relationship_crawler import UserRelationshipCrawler #from handler.inmemory_handler import InMemoryHandler from handler import create_handler from tweetf0rm.redis_helper import NodeCoordinator + class Scheduler(object): def __init__(self, node_id, config={}, proxies=[], verbose=False): @@ -29,8 +30,13 @@ def __init__(self, node_id, config={}, proxies=[], verbose=False): # each process only get one apikey... if there are more proxies than apikeys, each process can get more than one proxy that can be rotated when one fails. 
number_of_processes = min(len(self.config['apikeys']), len(self.proxy_list)) + + # if there are more proxies than apikeys, then each process will get a list of proxies, and the process will restart it self if a proxy failed, and try the next available proxy + self.proxy_generator = self.split(self.proxy_list, number_of_processes) + else: self.proxy_list = None + self.proxy_generator = None number_of_processes = 1 if (verbose): @@ -46,13 +52,15 @@ def __init__(self, node_id, config={}, proxies=[], verbose=False): apikey_list = self.config['apikeys'].keys() + crawlers = {} for idx in range(number_of_processes): crawler_id = md5('%s:%s'%(self.node_id, idx)) apikeys = self.config['apikeys'][apikey_list[idx]] if (verbose): logger.info('creating a new crawler: %s'%crawler_id) - crawler = UserRelationshipCrawler(self.node_id, crawler_id, copy.copy(apikeys), handlers=[create_handler(file_handler_config)], verbose=verbose, redis_config=copy.copy(config['redis_config']), proxies=self.proxy_list[idx]['proxy_dict']) + crawler_proxies = next(self.proxy_generator) if self.proxy_generator else None + crawler = UserRelationshipCrawler(self.node_id, crawler_id, copy.copy(apikeys), handlers=[create_handler(file_handler_config)], verbose=verbose, redis_config=copy.copy(config['redis_config']), proxies=crawler_proxies) crawlers[crawler_id] = { 'crawler': crawler, 'queue': {} @@ -86,13 +94,52 @@ def persist_queues(self): for crawler_id in self.crawlers: cmds.update(self.crawlers[crawler_id]['queue']) - with open('__cmds.json', 'wb') as f: + with open('%s_queued_cmds.json'%(int(time.time())), 'wb') as f: json.dump(cmds, f) + def remaining_tasks(self): + qsizes = [len(self.crawlers[crawler_id]['queue']) for crawler_id in self.crawlers] + return sum(qsizes) + + def distribute_to_nodes(self, queue): + node_queues = {} + + def get_node_queue(node_id, redis_config): + if (node_id in node_queues): + node_queue = node_queues[node_id] + else: + node_queue = NodeQueue(node_id, 
redis_config=redis_config) + node_queues[node_id] = node_queue + + qsizes = self.node_coordinator.node_qsizes() + + for cmd in queue.values(): + node_id = get_keys_by_min_value(qsizes)[0] + + node_queue = get_node_queue(node_id, self.config['redis_config']) + + node_queue.put(cmd) + qsizes[node_id] += 1 + + + + def enqueue(self, cmd): if (cmd['cmd'] == 'TERMINATE'): + # note that we need to save both the queues that are local, but also let others know that i am dead, and I need to have my redis queue cleared out... (it's possible that another node is still trying to send data into my redis queue, after i am dead... this needs to be handled as maintenance job (take the cmds in dead nodes' queue, and persist...)) + if (self.remaining_tasks() > 0): + self.persist_queues() [self.crawlers[crawler_id]['crawler'].enqueue(cmd) for crawler_id in self.crawlers] + elif(cmd['cmd'] == 'CRAWLER_FAILED'): + crawler_id = cmd['crawler_id'] + if (crawler_id in self.crawlers): + self.distribute_to_nodes(self.crawlers[crawler_id]['queue']) + # wait until it dies (flushed all the data...) + while(self.crawlers[crawler_id]['crawler'].is_alive()): + time.sleep(60) + else: + logger.warn("whatever are you trying to do? 
crawler_id: [%s] is not valid..."%(crawler_id)) elif(cmd['cmd'] == 'CMD_FINISHED'): #acknowledged finished cmd try: @@ -103,8 +150,6 @@ def enqueue(self, cmd): else: crawler_id = self.distribute_to() - logger.info(crawler_id) - self.crawlers[crawler_id]['queue'][hash_cmd(cmd)] = cmd self.crawlers[crawler_id]['crawler'].enqueue(cmd) diff --git a/tweetf0rm/utils.py b/tweetf0rm/utils.py index 10818ce..a25ea38 100644 --- a/tweetf0rm/utils.py +++ b/tweetf0rm/utils.py @@ -10,7 +10,7 @@ def __call__(cls, *args, **kwargs): import requests, json, traceback, sys -def distribute_to_node(qsizes): +def get_keys_by_min_value(qsizes): ''' return a list of keys (crawler_ids) that have the minimum number of pending cmds ''' @@ -46,30 +46,3 @@ def hash_cmd(cmd): def node_id(): ip = public_ip() return md5(ip) - -def check_proxies(proxies, proxy_type="http"): - url = "http://google.com" - headers = { - 'User-Agent':'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:23.0) Gecko/20100101 Firefox/23.0', - 'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', - 'Accept-Encoding': 'gzip, deflate', - 'Accept-Language': 'en-US,en;q=0.5' - } - - verified_proxies = [] - for proxy in proxies: - try: - proxy_dict = {proxy_type : '%s://%s'%(proxy_type, proxy)} - - r = requests.get(url, headers=headers, proxies=proxy_dict) - if (r.status_code == requests.codes.ok): - verified_proxies.append(proxy) - - logger.info('GOOD: [%s] - %d'%(proxy, r.elapsed.seconds)) - else: - logger.warn('BROKEN: [%s] - %d'%(proxy, r.elapsed.seconds)) - except: - logger.warn('BROKEN: [%s] - %d'%(proxy, r.elapsed.seconds)) - pass - - return verified_proxies \ No newline at end of file