Add proxy switching, fail-safe, and queue persistence functions

1 parent 0992105 · commit 227768e49d0419a5383d9bc2962869db3e4ad686 · @bi0nji0ng committed Oct 20, 2013
@@ -11,7 +11,7 @@
from nose.tools import nottest
-import sys, os, json
+import sys, os, json, exceptions
sys.path.append("..")
from tweetf0rm.utils import full_stack
@@ -114,18 +114,53 @@ def test_bootstrap_with_proxies(self):
pass
@nottest
- def test_proxy(self):
+ def test_split(self):
+
+ def split(l, n):
+ for i in xrange(0, len(l), n):
+ yield l[i:i+n]
+
+
+ l = [1,2,3,4,5, 6]
+
+ logger.info({}.values())
+ n = iter(l)
+ logger.info(next(n))
+ logger.info(next(n))
+ logger.info(next(n))
+ logger.info(next(n))
+ logger.info(next(n))
+ logger.info(next(n))
+ try:
+ logger.info(next(n))
+ except Exception as exc:
+ try:
+ logger.info(type(exc))
+ logger.info(isinstance(exc, exceptions.StopIteration))
+ raise
+ except Exception as sss:
+ logger.info("again...%r"%(exc))
+ raise
+ #raise
+ # p = split(l, 2)
+ # pp = next(p) if p else None
+ # logger.info(pp)
+
+
+ #logger.info(next(p))
+ @nottest
+ def test_proxy(self):
proxies = proxy_checker(self.proxies['proxies'])
#logger.info(proxies)
logger.info('%d good proxies left'%len(proxies))
- ps = []
- for d in proxies:
- ps.append(d['proxy'])
+ # ps = []
+ # for d in proxies:
+ # ps.append(d['proxy'])
- with open(os.path.abspath('proxy.json'), 'wb') as proxy_f:
- json.dump({'proxies':ps}, proxy_f)
+ # with open(os.path.abspath('proxy.json'), 'wb') as proxy_f:
+ # json.dump({'proxies':ps}, proxy_f)
if __name__=="__main__":
import nose
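The test_split scratch code above probes the two behaviors this commit builds on: chunking a list with a generator, and detecting iterator exhaustion via StopIteration. A minimal standalone sketch of the same pattern (Python 2, matching the repo's use of xrange):

def split(l, n):
    # yield successive chunks of size n from list l
    for i in xrange(0, len(l), n):
        yield l[i:i+n]

chunks = split([1, 2, 3, 4, 5, 6], 2)
print next(chunks)  # [1, 2]
print next(chunks)  # [3, 4]
print next(chunks)  # [5, 6]
try:
    next(chunks)
except StopIteration:
    # exhaustion is the same signal init_user_api() relies on
    # below, when a crawler runs out of proxies
    print 'no chunks left'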
@@ -1 +1 @@
-{"proxies": ["59.47.43.95:8080", "59.47.43.91:8080", "59.47.43.92:8080", "59.47.43.94:8080", "59.47.43.90:8080", "59.47.43.89:8080", "221.238.28.158:8081", "120.202.249.230:80", "222.87.129.29:80", "59.47.43.93:8080", "210.22.63.72:8080", "114.80.136.112:7780", "213.175.174.94:80", "111.63.14.245:80", "120.198.230.63:81", "61.135.179.167:8080", "210.22.63.91:8080", "119.75.219.70:80", "202.202.0.163:3128", "58.20.127.100:3128", "221.10.102.199:82", "60.28.183.5:8081", "118.26.57.13:82"]}
+{"proxies": [{"180.251.170.91:3128": "HTTPS"}, {"84.241.58.230:8080": "HTTPS"}, {"189.11.215.154:3128": "HTTP"}, {"180.148.142.131:80": "HTTP"}, {"61.185.255.156:9000": "HTTPS"}, {"190.0.16.58:8080": "HTTPS"}, {"187.61.117.11:8080": "HTTPS"}, {"37.77.225.37:8080": "HTTPS"}, {"222.124.178.98:3128": "HTTPS"}, {"110.138.216.157:3128": "HTTPS"}, {"201.76.181.146:8080": "HTTP"}, {"201.62.48.153:8080": "HTTP"}, {"60.190.129.52:3128": "HTTP"}, {"219.239.14.148:18186": "HTTP"}, {"218.108.232.189:82": "HTTP"}, {"109.238.189.252:8080": "HTTPS"}, {"180.250.67.54:8080": "HTTPS"}, {"82.79.66.19:8080": "HTTPS"}, {"84.120.105.202:8080": "HTTPS"}, {"110.77.221.29:3128": "HTTP"}, {"203.176.119.59:8080": "HTTPS"}, {"190.200.0.13:8080": "HTTP"}, {"195.14.167.54:8080": "HTTPS"}, {"197.136.42.3:80": "HTTPS"}, {"202.106.16.36:3128": "HTTP"}, {"218.108.242.124:8080": "HTTP"}, {"122.144.130.11:3128": "HTTP"}, {"60.51.218.180:8080": "HTTPS"}, {"218.204.23.4:80": "HTTPS"}, {"123.183.211.9:80": "HTTP"}, {"177.43.188.67:80": "HTTPS"}, {"60.24.10.126:8080": "HTTP"}, {"84.42.3.3:3128": "HTTP"}, {"91.207.136.218:8080": "HTTPS"}, {"202.91.25.41:3128": "HTTP"}, {"181.48.125.218:3128": "HTTPS"}, {"83.146.70.81:3128": "HTTPS"}, {"125.162.242.124:8080": "HTTPS"}, {"124.119.50.254:80": "HTTPS"}, {"221.10.40.236:843": "HTTP"}, {"201.232.104.7:8080": "HTTP"}, {"209.141.57.195:3128": "HTTPS"}, {"202.29.216.236:3128": "HTTPS"}, {"190.198.42.247:8080": "HTTP"}, {"182.160.120.155:8080": "HTTPS"}, {"118.85.208.193:80": "HTTP"}, {"110.137.48.186:8080": "HTTPS"}, {"187.120.23.14:8080": "HTTPS"}, {"213.181.73.145:80": "HTTP"}, {"201.209.94.154:8080": "HTTP"}, {"202.101.26.182:80": "HTTP"}, {"91.203.140.46:3128": "HTTPS"}, {"178.48.2.237:80": "HTTPS"}, {"122.136.46.151:3128": "HTTP"}, {"221.7.213.216:8080": "HTTPS"}, {"179.190.141.128:8080": "HTTPS"}, {"110.74.210.218:8080": "HTTPS"}, {"113.20.138.238:80": "HTTPS"}, {"94.42.155.68:3128": "HTTP"}, {"178.19.98.173:443": "HTTPS"}, {"190.37.130.177:8080": "HTTP"}, {"120.138.97.225:8080": "HTTPS"}, {"222.88.242.213:9999": "HTTPS"}, {"177.129.214.44:8080": "HTTPS"}, {"91.228.53.28:7808": "HTTP"}, {"213.131.41.98:8080": "HTTPS"}, {"117.59.224.60:80": "HTTP"}, {"180.250.34.30:8080": "HTTPS"}, {"188.95.38.233:8080": "HTTPS"}, {"202.118.236.130:7777": "HTTP"}, {"180.241.113.26:3128": "HTTPS"}, {"72.64.146.136:3128": "HTTP"}, {"93.190.18.146:8080": "HTTP"}, {"118.97.191.203:8080": "HTTPS"}, {"217.219.190.209:8080": "HTTPS"}, {"84.22.2.1:8080": "HTTPS"}, {"213.110.196.195:8080": "HTTPS"}, {"94.205.205.131:80": "HTTP"}, {"125.163.192.106:8080": "HTTPS"}, {"179.49.112.50:8089": "HTTPS"}, {"201.49.209.146:3128": "HTTP"}, {"200.75.3.85:3128": "HTTPS"}, {"119.30.39.1:3128": "HTTPS"}, {"202.114.6.37:9001": "HTTP"}, {"181.177.194.118:3128": "HTTPS"}, {"192.69.200.37:8089": "HTTP"}, {"46.0.202.184:80": "HTTPS"}, {"180.247.112.101:8080": "HTTPS"}, {"41.79.61.26:8080": "HTTP"}, {"199.204.45.90:3127": "HTTP"}, {"58.20.127.100:3128": "HTTP"}, {"115.124.65.90:9876": "HTTPS"}, {"61.19.51.226:3128": "HTTPS"}, {"186.227.160.129:3128": "HTTPS"}, {"117.135.131.86:80": "HTTP"}, {"192.227.139.227:8089": "HTTP"}, {"94.228.205.33:8080": "HTTPS"}, {"197.160.116.70:8080": "HTTPS"}, {"201.242.25.184:3128": "HTTPS"}, {"91.225.78.152:8080": "HTTP"}, {"187.125.147.178:3128": "HTTP"}, {"103.16.45.184:80": "HTTPS"}, {"202.98.123.126:8080": "HTTP"}, {"176.31.233.170:3128": "HTTP"}, {"193.226.94.95:8888": "HTTP"}, {"78.46.250.85:3128": "HTTP"}, {"66.122.95.249:8080": "HTTPS"}, {"200.44.126.114:3131": "HTTP"}, 
{"200.37.53.116:8080": "HTTPS"}, {"187.120.217.82:3128": "HTTPS"}, {"41.129.120.180:8080": "HTTPS"}, {"119.97.146.16:8080": "HTTP"}, {"41.164.23.162:8080": "HTTPS"}, {"88.85.108.16:8080": "HTTPS"}, {"190.201.18.205:8080": "HTTP"}, {"88.135.0.32:3128": "HTTPS"}, {"88.146.243.118:8080": "HTTPS"}, {"86.120.196.242:8080": "HTTPS"}, {"85.234.22.126:3128": "HTTPS"}, {"41.208.150.114:8080": "HTTPS"}, {"203.146.82.253:80": "HTTP"}, {"177.21.227.126:8080": "HTTPS"}, {"187.120.209.153:3128": "HTTPS"}, {"85.135.52.30:8080": "HTTP"}, {"198.204.245.212:8089": "HTTP"}, {"58.215.179.194:80": "HTTP"}, {"94.228.205.9:8080": "HTTPS"}, {"196.46.247.74:8080": "HTTPS"}, {"116.193.170.213:8080": "HTTPS"}, {"54.244.125.115:80": "HTTP"}, {"112.124.15.141:3128": "HTTPS"}, {"189.85.29.110:8080": "HTTPS"}, {"69.73.181.22:8089": "HTTP"}, {"118.97.99.35:8080": "HTTPS"}, {"69.197.132.80:3127": "HTTP"}, {"93.78.6.213:3128": "HTTP"}, {"196.202.116.2:80": "HTTPS"}, {"200.89.99.182:8080": "HTTPS"}, {"54.214.50.160:80": "HTTP"}, {"49.248.113.53:3128": "HTTPS"}, {"115.134.2.177:8080": "HTTPS"}, {"69.73.181.22:3127": "HTTP"}, {"212.144.254.122:3128": "HTTP"}, {"190.248.131.250:8080": "HTTPS"}, {"177.85.156.110:8080": "HTTPS"}, {"61.176.217.224:8080": "HTTP"}, {"87.229.26.141:31280": "HTTP"}, {"85.185.42.5:8080": "HTTPS"}, {"211.167.112.14:82": "HTTP"}, {"113.107.43.75:818": "HTTPS"}]}
@@ -7,13 +7,13 @@
import logging
logger = logging.getLogger(__name__)
-logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(message)s')
+#logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(message)s')
from .base_handler import BaseHandler
import multiprocessing as mp
import futures, json, copy, time
from tweetf0rm.redis_helper import NodeQueue, NodeCoordinator
-from tweetf0rm.utils import full_stack, distribute_to_node
+from tweetf0rm.utils import full_stack, get_keys_by_min_value
import json
def flush_cmd(bulk, data_type, template, redis_config, verbose=False):
@@ -38,7 +38,7 @@ def flush_cmd(bulk, data_type, template, redis_config, verbose=False):
t["user_id"] = user_id
t["depth"] -= 1
- node_id = distribute_to_node(qsizes)[0]
+ node_id = get_keys_by_min_value(qsizes)[0]
if (node_id in node_queues):
node_queue = node_queues[node_id]
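get_keys_by_min_value replaces distribute_to_node here and in the scheduler below; its body isn't shown in this diff, but the call sites (always indexed with [0], with qsizes bumped after each assignment) suggest it returns the node ids carrying the smallest queue backlog. A hypothetical sketch of such a helper — the real tweetf0rm.utils implementation may differ:

def get_keys_by_min_value(qsizes):
    # hypothetical: return every key whose value equals the minimum,
    # so callers can pick a least-loaded node with [0]
    min_size = min(qsizes.values())
    return [node_id for node_id, size in qsizes.items() if size == min_size]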
@@ -25,15 +25,6 @@ def __init__(self, node_id, crawler_id, apikeys, handlers = None, verbose = Fals
self.redis_config = redis_config
self.apikeys = copy.copy(apikeys)
self.node_id = node_id
- self.client_args = {"timeout": 300}
-
- if (proxies):
- self.client_args['proxies'] = proxies
-
- self.user_api = User(apikeys=apikeys, verbose=verbose, client_args=self.client_args)
-
- if (self.verbose):
- logger.info("# of handlers: %d"%(len(self.get_handlers())))
self.tasks = {
"TERMINATE": "TERMINATE",
"CRAWL_FRIENDS" : {
@@ -48,7 +39,20 @@ def __init__(self, node_id, crawler_id, apikeys, handlers = None, verbose = Fals
},
"CRAWL_USER_TIMELINE": "fetch_user_timeline"
}
- self.node_queue = NodeQueue(self.node_id, redis_config=redis_config)
+ self.node_queue = NodeQueue(self.node_id, redis_config=redis_config)
+ self.client_args = {"timeout": 300}
+ self.proxies = iter(proxies) if proxies else None
+ self.user_api = None
+ self.init_user_api()
+
+ def init_user_api(self): # this will throw StopIteration if all proxies have been tried...
+ if (self.proxies):
+ self.client_args['proxies'] = next(self.proxies) # raises StopIteration once every proxy has been tried
+
+ if (self.user_api):
+ del self.user_api
+
+ self.user_api = User(apikeys=self.apikeys, verbose=self.verbose, client_args=self.client_args)
def get_handlers(self):
return self.handlers
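The proxy switch works by wrapping each crawler's proxy list in an iterator: every call to init_user_api() consumes the next proxy and rebuilds the User API client, and StopIteration naturally signals that every proxy has been tried. A condensed sketch of the pattern (User and the client_args shape come from the diff; the rest is illustrative):

class ProxyRotatingClient(object):
    # illustrative: isolates the rotate-on-failure pattern used above
    def __init__(self, apikeys, proxies=None):
        self.apikeys = apikeys
        self.proxies = iter(proxies) if proxies else None
        self.client_args = {'timeout': 300}
        self.user_api = None

    def init_user_api(self):
        if self.proxies:
            # raises StopIteration when the proxy list is exhausted
            self.client_args['proxies'] = next(self.proxies)
        # User: the repo's Twitter API wrapper (import not shown in this diff)
        self.user_api = User(apikeys=self.apikeys, client_args=self.client_args)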
@@ -122,6 +126,24 @@ def run(self):
func(**args)
except Exception as exc:
logger.error("%s"%exc)
+ try:
+ self.init_user_api()
+ except Exception as init_user_api_exc:
+ import exceptions
+ if (isinstance(init_user_api_exc, exceptions.StopIteration)): # no more proxies to try... so kill myself...
+ for handler in self.handlers:
+ handler.flush_all()
+ # flush first
+ self.node_queue.put({
+ 'cmd':'CRAWLER_FAILED',
+ 'crawler_id': self.crawler_id
+ })
+ return False
+ #raise
+ else:
+ #put current task back to queue...
+ self.enqueue(cmd)
+
#logger.error(full_stack())
else:
self.node_queue.put({'cmd':"CMD_FINISHED", "cmd_hash":cmd_hash, "crawler_id":self.crawler_id})
@@ -7,7 +7,7 @@
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(message)s')
requests_log = logging.getLogger("requests")
-requests_log.setLevel(logging.WARNING)
+requests_log.setLevel(logging.DEBUG)
from tweetf0rm.utils import full_stack
import requests, futures
@@ -20,8 +20,10 @@ def check_proxy(proxy, timeout):
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'en-US,en;q=0.5'
}
+ proxy_ip = proxy.keys()[0]
+ proxy_type = proxy.values()[0].lower() # requests matches proxies by lowercase scheme
- p = {'proxy':proxy,'proxy_dict':{"http" : 'http://%s'%proxy}}
+ p = {'proxy':proxy,'proxy_dict':{proxy_type: '%s://%s'%(proxy_type, proxy_ip)}}
r = requests.get(url, headers=headers, proxies=p['proxy_dict'], timeout=timeout)
@@ -34,7 +36,9 @@ def check_proxy(proxy, timeout):
return False, None
def proxy_checker(proxies):
- proxies = set(proxies)
+ '''
+ proxies is a list of single-entry dicts; each key is the proxy's ip (including port), e.g., 192.168.1.1:8080, and the value is the proxy type (HTTP/HTTPS)
+ '''
logger.info('%d proxies to check'%(len(proxies)))
good_proxies = []
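With the typed format, check_proxy keys the requests proxies dict by scheme. The .lower() above matters: requests looks a proxy up by the lowercased URL scheme, so uppercase keys from proxy.json would silently never match and the request would go out unproxied. A sketch of the resulting mapping (values taken from proxy.json above):

def build_proxy_dict(proxy):
    # proxy is a single-entry dict, e.g. {"84.241.58.230:8080": "HTTPS"}
    proxy_ip = proxy.keys()[0]              # Python 2 dict access
    proxy_type = proxy.values()[0].lower()  # 'https', as requests expects
    return {proxy_type: '%s://%s' % (proxy_type, proxy_ip)}

# build_proxy_dict({"84.241.58.230:8080": "HTTPS"})
# -> {'https': 'https://84.241.58.230:8080'}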
@@ -5,9 +5,9 @@
import logging
logger = logging.getLogger(__name__)
-logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(message)s')
-requests_log = logging.getLogger("requests")
-requests_log.setLevel(logging.INFO)
+#logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(message)s')
+# requests_log = logging.getLogger("requests")
+# requests_log.setLevel(logging.INFO)
import redis, json
@@ -83,26 +83,28 @@ class NodeCoordinator(RedisBase):
'''
def __init__(self, redis_config=None):
super(NodeCoordinator, self).__init__("coordinator", namespace="node", redis_config=redis_config)
+ self.active_nodes = '%s:active'%(self.key)
+ self.all_nodes = '%s:all'%(self.key)
def clear(self):
self.conn().delete(self.key)
def add_node(self, node_id):
- self.conn().sadd(self.key, node_id)
+ self.conn().sadd(self.active_nodes, node_id)
+ self.conn().sadd(self.all_nodes, node_id)
def remove_node(self, node_id):
- self.conn().srem(self.key, node_id)
+ ''' Only remove the node from the active list; it stays in the all list for later cleanup '''
+ self.conn().srem(self.active_nodes, node_id)
def node_queue_key(self, node_id):
return 'queue:%s'%(node_id)
def node_qsizes(self):
'''
- List the size of all node queues
+ List the size of all active nodes' queues
'''
- node_ids = self.conn().smembers(self.key)
-
- logger.info(node_ids)
+ node_ids = self.conn().smembers(self.active_nodes)
qsizes = {node_id:self.conn().llen(self.node_queue_key(node_id)) for node_id in node_ids}
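Splitting the coordinator's membership into an active set and an all set means dead nodes stay discoverable for later cleanup while node_qsizes() only polls live queues. A sketch of the intended lifecycle, assuming a redis_config dict of the shape the rest of the codebase passes around:

redis_config = {'host': 'localhost', 'port': 6379}  # assumed shape
coordinator = NodeCoordinator(redis_config=redis_config)

coordinator.add_node('node-1')     # joins both the active and the all set
coordinator.add_node('node-2')

coordinator.remove_node('node-2')  # leaves active only; the all set keeps
                                   # it around for maintenance jobs

print coordinator.node_qsizes()    # e.g. {'node-1': 0} -- active nodes only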
@@ -5,18 +5,19 @@
import logging
logger = logging.getLogger(__name__)
-logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(message)s')
-requests_log = logging.getLogger("requests")
-requests_log.setLevel(logging.WARNING)
+# logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(message)s')
+# requests_log = logging.getLogger("requests")
+# requests_log.setLevel(logging.WARNING)
-import json, copy
-from tweetf0rm.utils import full_stack, hash_cmd, md5
+import json, copy, time
+from tweetf0rm.utils import full_stack, hash_cmd, md5, get_keys_by_min_value
from tweetf0rm.proxies import proxy_checker
from process.user_relationship_crawler import UserRelationshipCrawler
#from handler.inmemory_handler import InMemoryHandler
from handler import create_handler
-from tweetf0rm.redis_helper import NodeCoordinator
+from tweetf0rm.redis_helper import NodeCoordinator, NodeQueue
+
class Scheduler(object):
def __init__(self, node_id, config={}, proxies=[], verbose=False):
@@ -29,8 +30,13 @@ def __init__(self, node_id, config={}, proxies=[], verbose=False):
# each process only gets one apikey... if there are more proxies than apikeys, each process can get more than one proxy, rotated when one fails.
number_of_processes = min(len(self.config['apikeys']), len(self.proxy_list))
+
+ # if there are more proxies than apikeys, each process gets a list of proxies; when a proxy fails, the process restarts itself and tries the next available proxy
+ self.proxy_generator = self.split(self.proxy_list, number_of_processes)
+
else:
self.proxy_list = None
+ self.proxy_generator = None
number_of_processes = 1
if (verbose):
@@ -46,13 +52,15 @@ def __init__(self, node_id, config={}, proxies=[], verbose=False):
apikey_list = self.config['apikeys'].keys()
+
crawlers = {}
for idx in range(number_of_processes):
crawler_id = md5('%s:%s'%(self.node_id, idx))
apikeys = self.config['apikeys'][apikey_list[idx]]
if (verbose):
logger.info('creating a new crawler: %s'%crawler_id)
- crawler = UserRelationshipCrawler(self.node_id, crawler_id, copy.copy(apikeys), handlers=[create_handler(file_handler_config)], verbose=verbose, redis_config=copy.copy(config['redis_config']), proxies=self.proxy_list[idx]['proxy_dict'])
+ crawler_proxies = next(self.proxy_generator) if self.proxy_generator else None
+ crawler = UserRelationshipCrawler(self.node_id, crawler_id, copy.copy(apikeys), handlers=[create_handler(file_handler_config)], verbose=verbose, redis_config=copy.copy(config['redis_config']), proxies=crawler_proxies)
crawlers[crawler_id] = {
'crawler': crawler,
'queue': {}
@@ -86,13 +94,52 @@ def persist_queues(self):
for crawler_id in self.crawlers:
cmds.update(self.crawlers[crawler_id]['queue'])
- with open('__cmds.json', 'wb') as f:
+ with open('%s_queued_cmds.json'%(int(time.time())), 'wb') as f:
json.dump(cmds, f)
+ def remaining_tasks(self):
+ qsizes = [len(self.crawlers[crawler_id]['queue']) for crawler_id in self.crawlers]
+ return sum(qsizes)
+
+ def distribute_to_nodes(self, queue):
+ node_queues = {}
+
+ def get_node_queue(node_id, redis_config):
+ if (node_id in node_queues):
+ node_queue = node_queues[node_id]
+ else:
+ node_queue = NodeQueue(node_id, redis_config=redis_config)
+ node_queues[node_id] = node_queue
+ return node_queue
+
+ qsizes = self.node_coordinator.node_qsizes()
+
+ for cmd in queue.values():
+ node_id = get_keys_by_min_value(qsizes)[0]
+
+ node_queue = get_node_queue(node_id, self.config['redis_config'])
+
+ node_queue.put(cmd)
+ qsizes[node_id] += 1
+
def enqueue(self, cmd):
if (cmd['cmd'] == 'TERMINATE'):
+ # note: save the local queues, and also let other nodes know this one is dead; its redis queue needs clearing out too (another node may still be pushing cmds into it after death -- a maintenance job should pick up the cmds in dead nodes' queues and persist them...)
+ if (self.remaining_tasks() > 0):
+ self.persist_queues()
[self.crawlers[crawler_id]['crawler'].enqueue(cmd) for crawler_id in self.crawlers]
+ elif(cmd['cmd'] == 'CRAWLER_FAILED'):
+ crawler_id = cmd['crawler_id']
+ if (crawler_id in self.crawlers):
+ self.distribute_to_nodes(self.crawlers[crawler_id]['queue'])
+ # wait until it dies (flushed all the data...)
+ while(self.crawlers[crawler_id]['crawler'].is_alive()):
+ time.sleep(60)
+ else:
+ logger.warn("whatever are you trying to do? crawler_id: [%s] is not valid..."%(crawler_id))
elif(cmd['cmd'] == 'CMD_FINISHED'):
#acknowledged finished cmd
try:
@@ -103,8 +150,6 @@ def enqueue(self, cmd):
else:
crawler_id = self.distribute_to()
- logger.info(crawler_id)
-
self.crawlers[crawler_id]['queue'][hash_cmd(cmd)] = cmd
self.crawlers[crawler_id]['crawler'].enqueue(cmd)
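persist_queues() (above) now writes the outstanding cmds to a timestamped file instead of overwriting a single __cmds.json. A sketch of a matching restore step — hypothetical, since this commit only implements the save side:

import glob, json

def restore_queues(scheduler):
    # hypothetical counterpart to persist_queues(): re-enqueue cmds
    # saved by a previous TERMINATE
    for path in sorted(glob.glob('*_queued_cmds.json')):
        with open(path, 'rb') as f:
            for cmd in json.load(f).values():
                scheduler.enqueue(cmd)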