Permalink
Browse files

Merge pull request #2 from cbuntain/master

Bug fixes when commands fail and need to be redistributed.
  • Loading branch information...
2 parents 6c87fb1 + b8345c2 commit 8f75668028d33c74d92ecc78e47c6a0599e04b62 @bianjiang committed Apr 3, 2014
Showing with 98 additions and 16 deletions.
  1. +31 −6 tweetf0rm/client.py
  2. +16 −7 tweetf0rm/process/user_relationship_crawler.py
  3. +5 −2 tweetf0rm/redis_helper.py
  4. +46 −1 tweetf0rm/twitterapi/users.py
View
@@ -89,6 +89,17 @@
'bucket': {
'value': 'timelines'
}
+ },'CRAWL_TWEET': {
+ 'tweet_id' : {
+ 'value':0
+ },
+ 'bucket': {
+ 'value': 'tweets'
+ }
+ }, 'BATCH_CRAWL_TWEET': {
+ 'bucket': {
+ 'value': 'tweets'
+ }
}, 'BATCH_CRAWL_USER_TIMELINE': {
'bucket': {
'value': 'timelines'
@@ -155,16 +166,24 @@ def cmd(config, args):
users = user_api.get_users(user_ids)
json.dump(list(users), o_f)
elif (args.command.startswith('BATCH_')):
- command = args.command.replace('BATCH_', '')
+ new_command = args.command.replace('BATCH_', '')
args_dict = copy.copy(args.__dict__)
if (not os.path.exists(args.json)):
raise Exception("doesn't exist... ")
with open(os.path.abspath(args.json), 'rb') as f:
- user_ids = json.load(f)
- for user_id in user_ids:
- args_dict['user_id'] = user_id
- cmd = new_cmd(command, args_dict)
- node_queue.put(cmd)
+ if ( args.command == 'BATCH_CRAWL_TWEET' ):
+ tweet_ids = json.load(f)
+ for tweet_id in tweet_ids:
+ print "Loading Tweet ID: ", tweet_id
+ args_dict['tweet_id'] = tweet_id
+ cmd = new_cmd(new_command, args_dict)
+ node_queue.put(cmd)
+ else:
+ user_ids = json.load(f)
+ for user_id in user_ids:
+ args_dict['user_id'] = user_id
+ cmd = new_cmd(new_command, args_dict)
+ node_queue.put(cmd)
elif (args.command == 'LIST_NODES'):
pp.pprint(node_coordinator.list_nodes())
elif (args.command == 'NODE_QSIZES'):
@@ -187,6 +206,7 @@ def cmd(config, args):
def print_avaliable_cmd():
dictionary = {
'-uid/--user_id': 'the user id that you want to crawl his/her friends (who he/she is following) or followers',
+ '-tid/--tweet_id': 'the tweet id that you want to fetch',
#'-nt/--network_type': 'whether you want to crawl his/her friends or followers',
'-dt/--data_type': '"ids" or "users" (default to ids) what the results are going to look like (either a list of twitter user ids or a list of user objects)',
'-d/--depth': 'the depth of the network; e.g., if it is 2, it will give you his/her (indicated by the -uid) friends\' friends',
@@ -216,6 +236,10 @@ def print_avaliable_cmd():
'-d/--depth': dictionary['-d/--depth']
}, 'CRAWL_USER_TIMELINE': {
'-uid/--user_id': dictionary['-uid/--user_id']
+ }, 'CRAWL_TWEET': {
+ '-tid/--tweet_id': dictionary['-tid/--tweet_id']
+ }, 'BATCH_CRAWL_TWEET': {
+ '-j/--json': dictionary['-j/--json']
}, 'BATCH_CRAWL_USER_TIMELINE': {
'-j/--json': dictionary['-j/--json']
}, 'GET_UIDS_FROM_SCREEN_NAMES': {
@@ -251,6 +275,7 @@ def print_avaliable_cmd():
parser.add_argument('-c', '--config', help="config.json that contains a) twitter api keys; b) redis connection string;", required = True)
parser.add_argument('-cmd', '--command', help="the cmd you want to run, e.g., \"CRAWL_FRIENDS\"", required=True)
parser.add_argument('-uid', '--user_id', help="the user_id", default=0)
+ parser.add_argument('-tid', '--tweet_id', help="the tweet_id", default=0)
parser.add_argument('-dt', '--data_type', help="the data_type (e.g., 'ids' or 'users'", default='ids')
parser.add_argument('-d', '--depth', help="the depth", default=1)
parser.add_argument('-j', '--json', help="the location of the json file that has a list of user_ids or screen_names", required=False)
@@ -36,7 +36,8 @@ def __init__(self, node_id, crawler_id, apikeys, handlers, redis_config, proxies
"ids": "find_all_follower_ids",
"network_type": "followers"
},
- "CRAWL_USER_TIMELINE": "fetch_user_timeline"
+ "CRAWL_USER_TIMELINE": "fetch_user_timeline",
+ "CRAWL_TWEET": "fetch_tweet_by_id"
}
self.node_queue = NodeQueue(self.node_id, redis_config=redis_config)
self.client_args = {"timeout": 300}
@@ -97,19 +98,27 @@ def run(self):
handler.flush_all()
else:
- args = {
- "user_id": cmd['user_id'],
- "write_to_handlers": self.handlers,
- "cmd_handlers" : []
- }
+ args = {}
+ if (command == 'CRAWL_TWEET'):
+ args = {
+ "tweet_id": cmd['tweet_id'],
+ "write_to_handlers": self.handlers,
+ "cmd_handlers" : []
+ }
+ else:
+ args = {
+ "user_id": cmd['user_id'],
+ "write_to_handlers": self.handlers,
+ "cmd_handlers" : []
+ }
bucket = cmd["bucket"] if "bucket" in cmd else None
if (bucket):
args["bucket"] = bucket
func = None
- if (command == 'CRAWL_USER_TIMELINE'):
+ if (command in ['CRAWL_USER_TIMELINE', 'CRAWL_TWEET']):
func = getattr(self.user_api, self.tasks[command])
elif (command in ['CRAWL_FRIENDS', 'CRAWL_FOLLOWERS']):
data_type = cmd['data_type']
@@ -124,7 +124,8 @@ def distribute_to_nodes(self, crawler_queue):
qsizes = self.node_qsizes()
- while (crawler_queue.get(timeout=60)):
+ cmd = crawler_queue.get(timeout=60)
+ while (cmd):
node_id = get_keys_by_min_value(qsizes)[0]
@@ -133,6 +134,8 @@ def distribute_to_nodes(self, crawler_queue):
node.put(cmd)
qsizes[node_id] += 1
+ cmd = crawler_queue.get(timeout=60)
+
def clear(self):
    # Remove this helper's Redis state.
    # NOTE(review): Redis DELETE takes literal key names and does not expand
    # glob patterns, so '%s:*' % self.key most likely deletes nothing here --
    # confirm whether a KEYS scan followed by DELETE was intended.
    self.conn().delete('%s:*'%self.key)
@@ -160,7 +163,7 @@ def node_qsizes(self):
for crawler_queue_key in self.conn().keys('queue:%s:*'%node_id):
qsize += self.conn().llen(crawler_queue_key)
- qsizes[node_id] = node_qsize
+ qsizes[node_id] = qsize
return qsizes
@@ -257,7 +257,52 @@ def fetch_user_timeline(self, user_id = None, write_to_handlers=[], cmd_handlers
for handler in write_to_handlers:
handler.append(json.dumps({}), bucket=bucket, key=user_id)
- logger.debug("[%s] total tweets: %d "%(user_id, cnt))
+ logger.debug("[%s] total tweets: %d "%(user_id, cnt))
+
def fetch_tweet_by_id(self, tweet_id = None, write_to_handlers=[], cmd_handlers=[], bucket="tweets"):
    """
    Fetch a single tweet via the statuses/show endpoint and append the
    JSON-serialized result to every handler in write_to_handlers.

    tweet_id          -- id of the tweet to fetch; required (raises if falsy).
    write_to_handlers -- handlers receiving the fetched tweet, or a
                         {"id": tweet_id} stub when the tweet is unavailable
                         (404/403) so downstream consumers know it was tried.
    cmd_handlers      -- unused here; kept for signature parity with the other
                         fetch_* tasks dispatched by the crawler.
    bucket            -- storage bucket name passed through to the handlers.

    Raises MaxRetryReached when transient errors persist for MAX_RETRY_CNT
    attempts. The default lists are never mutated, only iterated.
    """

    if not tweet_id:
        raise Exception("show_status: tweet_id cannot be None")

    tweet = None
    retry_cnt = MAX_RETRY_CNT
    # BUG FIX: loop condition was "retry_cnt > 1", which skipped the final
    # attempt and made the MaxRetryReached branch below unreachable -- the
    # function would silently fall through after MAX_RETRY_CNT - 1 failures.
    while retry_cnt > 0:
        try:
            tweet = self.show_status(id=tweet_id)

            logger.info("Fetched tweet [%s]" % (tweet_id))

            break

        except twython.exceptions.TwythonRateLimitError:
            # Wait out the rate-limit window; does not consume a retry.
            self.rate_limit_error_occured('statuses', '/statuses/show')
        except twython.exceptions.TwythonError as te:
            if (te.error_code == 404 or te.error_code == 403):
                # Deleted or protected tweet -- permanent, so do not retry.
                logger.info("Tweet [%s] unavailable. Error code: %d" % (tweet_id, te.error_code))

                break
            else:
                time.sleep(10)
                logger.error("exception: %s" % (te))
                retry_cnt -= 1
                if (retry_cnt == 0):
                    raise MaxRetryReached("max retry reached due to %s" % (te))
        except Exception as exc:
            time.sleep(10)
            logger.error("exception: %s, %s" % (exc, type(exc)))
            retry_cnt -= 1
            if (retry_cnt == 0):
                raise MaxRetryReached("max retry reached due to %s" % (exc))

    if (tweet != None):
        for handler in write_to_handlers:
            handler.append(json.dumps(tweet), bucket=bucket, key="tweetList")
    else:
        # Record the missing tweet's id so the attempt is still persisted.
        for handler in write_to_handlers:
            handler.append(json.dumps({"id": tweet_id}), bucket=bucket, key="tweetList")

    logger.debug("[%s] tweet fetched..." % tweet_id)
def get_user_ids_by_screen_names(self, seeds):

0 comments on commit 8f75668

Please sign in to comment.