diff --git a/tweetf0rm/client.py b/tweetf0rm/client.py index f2531cc..c61e2e4 100644 --- a/tweetf0rm/client.py +++ b/tweetf0rm/client.py @@ -89,6 +89,17 @@ 'bucket': { 'value': 'timelines' } + },'CRAWL_TWEET': { + 'tweet_id' : { + 'value':0 + }, + 'bucket': { + 'value': 'tweets' + } + }, 'BATCH_CRAWL_TWEET': { + 'bucket': { + 'value': 'tweets' + } }, 'BATCH_CRAWL_USER_TIMELINE': { 'bucket': { 'value': 'timelines' @@ -155,16 +166,24 @@ def cmd(config, args): users = user_api.get_users(user_ids) json.dump(list(users), o_f) elif (args.command.startswith('BATCH_')): - command = args.command.replace('BATCH_', '') + new_command = args.command.replace('BATCH_', '') args_dict = copy.copy(args.__dict__) if (not os.path.exists(args.json)): raise Exception("doesn't exist... ") with open(os.path.abspath(args.json), 'rb') as f: - user_ids = json.load(f) - for user_id in user_ids: - args_dict['user_id'] = user_id - cmd = new_cmd(command, args_dict) - node_queue.put(cmd) + if ( args.command == 'BATCH_CRAWL_TWEET' ): + tweet_ids = json.load(f) + for tweet_id in tweet_ids: + print "Loading Tweet ID: ", tweet_id + args_dict['tweet_id'] = tweet_id + cmd = new_cmd(new_command, args_dict) + node_queue.put(cmd) + else: + user_ids = json.load(f) + for user_id in user_ids: + args_dict['user_id'] = user_id + cmd = new_cmd(new_command, args_dict) + node_queue.put(cmd) elif (args.command == 'LIST_NODES'): pp.pprint(node_coordinator.list_nodes()) elif (args.command == 'NODE_QSIZES'): @@ -187,6 +206,7 @@ def cmd(config, args): def print_avaliable_cmd(): dictionary = { '-uid/--user_id': 'the user id that you want to crawl his/her friends (who he/she is following) or followers', + '-tid/--tweet_id': 'the tweet id that you want to fetch', #'-nt/--network_type': 'whether you want to crawl his/her friends or followers', '-dt/--data_type': '"ids" or "users" (default to ids) what the results are going to look like (either a list of twitter user ids or a list of user objects)', '-d/--depth': 'the 
depth of the network; e.g., if it is 2, it will give you his/her (indicated by the -uid) friends\' friends', @@ -216,6 +236,10 @@ def print_avaliable_cmd(): '-d/--depth': dictionary['-d/--depth'] }, 'CRAWL_USER_TIMELINE': { '-uid/--user_id': dictionary['-uid/--user_id'] + }, 'CRAWL_TWEET': { + '-tid/--tweet_id': dictionary['-tid/--tweet_id'] + }, 'BATCH_CRAWL_TWEET': { + '-j/--json': dictionary['-j/--json'] }, 'BATCH_CRAWL_USER_TIMELINE': { '-j/--json': dictionary['-j/--json'] }, 'GET_UIDS_FROM_SCREEN_NAMES': { @@ -251,6 +275,7 @@ def print_avaliable_cmd(): parser.add_argument('-c', '--config', help="config.json that contains a) twitter api keys; b) redis connection string;", required = True) parser.add_argument('-cmd', '--command', help="the cmd you want to run, e.g., \"CRAWL_FRIENDS\"", required=True) parser.add_argument('-uid', '--user_id', help="the user_id", default=0) + parser.add_argument('-tid', '--tweet_id', help="the tweet_id", default=0) parser.add_argument('-dt', '--data_type', help="the data_type (e.g., 'ids' or 'users'", default='ids') parser.add_argument('-d', '--depth', help="the depth", default=1) parser.add_argument('-j', '--json', help="the location of the json file that has a list of user_ids or screen_names", required=False) diff --git a/tweetf0rm/process/user_relationship_crawler.py b/tweetf0rm/process/user_relationship_crawler.py index 368d378..938bff7 100644 --- a/tweetf0rm/process/user_relationship_crawler.py +++ b/tweetf0rm/process/user_relationship_crawler.py @@ -36,7 +36,8 @@ def __init__(self, node_id, crawler_id, apikeys, handlers, redis_config, proxies "ids": "find_all_follower_ids", "network_type": "followers" }, - "CRAWL_USER_TIMELINE": "fetch_user_timeline" + "CRAWL_USER_TIMELINE": "fetch_user_timeline", + "CRAWL_TWEET": "fetch_tweet_by_id" } self.node_queue = NodeQueue(self.node_id, redis_config=redis_config) self.client_args = {"timeout": 300} @@ -97,11 +98,19 @@ def run(self): handler.flush_all() else: - args = { - "user_id": 
cmd['user_id'], - "write_to_handlers": self.handlers, - "cmd_handlers" : [] - } + args = {} + if (command == 'CRAWL_TWEET'): + args = { + "tweet_id": cmd['tweet_id'], + "write_to_handlers": self.handlers, + "cmd_handlers" : [] + } + else: + args = { + "user_id": cmd['user_id'], + "write_to_handlers": self.handlers, + "cmd_handlers" : [] + } bucket = cmd["bucket"] if "bucket" in cmd else None @@ -109,7 +118,7 @@ def run(self): args["bucket"] = bucket func = None - if (command == 'CRAWL_USER_TIMELINE'): + if (command in ['CRAWL_USER_TIMELINE', 'CRAWL_TWEET']): func = getattr(self.user_api, self.tasks[command]) elif (command in ['CRAWL_FRIENDS', 'CRAWL_FOLLOWERS']): data_type = cmd['data_type'] diff --git a/tweetf0rm/redis_helper.py b/tweetf0rm/redis_helper.py index 71506d3..b10b42a 100644 --- a/tweetf0rm/redis_helper.py +++ b/tweetf0rm/redis_helper.py @@ -124,7 +124,8 @@ def distribute_to_nodes(self, crawler_queue): qsizes = self.node_qsizes() - while (crawler_queue.get(timeout=60)): + cmd = crawler_queue.get(timeout=60) + while (cmd): node_id = get_keys_by_min_value(qsizes)[0] @@ -133,6 +134,8 @@ def distribute_to_nodes(self, crawler_queue): node.put(cmd) qsizes[node_id] += 1 + cmd = crawler_queue.get(timeout=60) + def clear(self): self.conn().delete('%s:*'%self.key) @@ -160,7 +163,7 @@ def node_qsizes(self): for crawler_queue_key in self.conn().keys('queue:%s:*'%node_id): qsize += self.conn().llen(crawler_queue_key) - qsizes[node_id] = node_qsize + qsizes[node_id] = qsize return qsizes diff --git a/tweetf0rm/twitterapi/users.py b/tweetf0rm/twitterapi/users.py index acf408f..2029c55 100644 --- a/tweetf0rm/twitterapi/users.py +++ b/tweetf0rm/twitterapi/users.py @@ -257,7 +257,52 @@ def fetch_user_timeline(self, user_id = None, write_to_handlers=[], cmd_handlers for handler in write_to_handlers: handler.append(json.dumps({}), bucket=bucket, key=user_id) - logger.debug("[%s] total tweets: %d "%(user_id, cnt)) + logger.debug("[%s] total tweets: %d "%(user_id, cnt)) + + 
def fetch_tweet_by_id(self, tweet_id = None, write_to_handlers=[], cmd_handlers=[], bucket="tweets"): + + if not tweet_id: + raise Exception("show_status: tweet_id cannot be None") + + tweet = None + retry_cnt = MAX_RETRY_CNT + while retry_cnt > 0: + try: + tweet = self.show_status(id=tweet_id) + + # logger.debug('%d > %d ? %s'%(prev_max_id, current_max_id, bool(prev_max_id > current_max_id))) + logger.info("Fetched tweet [%s]" % (tweet_id)) + + break + + except twython.exceptions.TwythonRateLimitError: + self.rate_limit_error_occured('statuses', '/statuses/show') + except twython.exceptions.TwythonError as te: + if ( te.error_code == 404 or te.error_code == 403 ): + logger.info("Tweet [%s] unavailable. Error code: %d" % (tweet_id, te.error_code)) + + break + else: + time.sleep(10) + logger.error("exception: %s"%(te)) + retry_cnt -= 1 + if (retry_cnt == 0): + raise MaxRetryReached("max retry reached due to %s"%(te)) + except Exception as exc: + time.sleep(10) + logger.error("exception: %s, %s"%(exc, type(exc))) + retry_cnt -= 1 + if (retry_cnt == 0): + raise MaxRetryReached("max retry reached due to %s"%(exc)) + + if (tweet != None): + for handler in write_to_handlers: + handler.append(json.dumps(tweet), bucket=bucket, key="tweetList") + else: + for handler in write_to_handlers: + handler.append(json.dumps({"id":tweet_id}), bucket=bucket, key="tweetList") + + logger.debug("[%s] tweet fetched..." % tweet_id) def get_user_ids_by_screen_names(self, seeds):