Permalink
Browse files

refactor, add search_for function, need more work to create crawler d…

…ynamically per request
  • Loading branch information...
1 parent c454bd0 commit e5c118326b1ce951a3935ceb760d2e47e9e76d21 @bi0nji0ng bi0nji0ng committed Dec 17, 2014
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
View
@@ -31,13 +31,27 @@ def teardown_class(cls):
def setup(self): def setup(self):
import sys, os, json import sys, os, json
#sys.path.append("..") #sys.path.append("..")
- with open(os.path.abspath('../../config.json'), 'rb') as config_f, open(os.path.abspath('proxies.json'), 'rb') as proxy_f: + with open(os.path.abspath('../config.json'), 'rb') as config_f, open(os.path.abspath('proxies.json'), 'rb') as proxy_f:
self.config = json.load(config_f) self.config = json.load(config_f)
self.proxies = json.load(proxy_f) self.proxies = json.load(proxy_f)
def teardown(self): def teardown(self):
pass pass
+ #@nottest
+ def test_search(self):
+ from tweetf0rm.twitterapi.twitter_api import TwitterAPI
+ from tweetf0rm.handler.inmemory_handler import InMemoryHandler
+
+ apikeys = self.config["apikeys"]["i0mf0rmer03"]
+
+ #inmemoryhandler = InMemoryHandler()
+ twitter_api = TwitterAPI(apikeys=apikeys)
+ tweets = twitter_api.search_by_query(query="transmasculine OR transman OR transmale")
+ #tweets = twitter_api.search(q="twitter", geocode=None, lang=None, count=100)
+ logger.info(tweets)
+
+
@nottest @nottest
def test_distribute_to_local(self): def test_distribute_to_local(self):
def distribute_to(crawlers): def distribute_to(crawlers):
@@ -90,14 +104,14 @@ def distribute_to(qsizes):
@nottest @nottest
def test_get_user_id(self): def test_get_user_id(self):
- from tweetf0rm.twitterapi.users import User + from tweetf0rm.twitterapi.twitter_api import TwitterAPI
from tweetf0rm.handler.inmemory_handler import InMemoryHandler from tweetf0rm.handler.inmemory_handler import InMemoryHandler
apikeys = self.config["apikeys"]["i0mf0rmer03"] apikeys = self.config["apikeys"]["i0mf0rmer03"]
#inmemoryhandler = InMemoryHandler() #inmemoryhandler = InMemoryHandler()
- user_api = User(apikeys=apikeys) + twitter_api = TwitterAPI(apikeys=apikeys)
- userIds = user_api.get_user_ids_by_screen_names(["AmericanCance"]) + userIds = twitter_api.get_user_ids_by_screen_names(["AmericanCance"])
logger.info(userIds) logger.info(userIds)
@nottest @nottest
View
@@ -19,7 +19,7 @@
import multiprocessing as mp import multiprocessing as mp
-from tweetf0rm.twitterapi.users import User +from tweetf0rm.twitterapi.twitter_api import TwitterAPI
class Handler(object): class Handler(object):
@@ -29,8 +29,8 @@ def append(self,data, bucket=None, key=None):
def call_user_api(apikeys, client_args): def call_user_api(apikeys, client_args):
- user_api = User(apikeys=apikeys, client_args=client_args) + twitter_api = TwitterAPI(apikeys=apikeys, client_args=client_args)
- user_api.find_all_friend_ids(53039176, [Handler()]) + twitter_api.find_all_friend_ids(53039176, [Handler()])
class TestTwitterRateLimit: class TestTwitterRateLimit:
View
@@ -158,6 +158,7 @@ def start_server(config, proxies):
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('-c', '--config', help="config.json that contains a) twitter api keys; b) redis connection string;", required = True) parser.add_argument('-c', '--config', help="config.json that contains a) twitter api keys; b) redis connection string;", required = True)
parser.add_argument('-p', '--proxies', help="the proxies.json file") parser.add_argument('-p', '--proxies', help="the proxies.json file")
+ parser.add_argument('-m', '--mode', help="mode of the cralwer (streaming or normal crawler)", default='crawler')
args = parser.parse_args() args = parser.parse_args()
View
@@ -116,10 +116,18 @@
}, 'CLEAR_NODE_QUEUES': { }, 'CLEAR_NODE_QUEUES': {
+ }, 'SEARCH': {
+ 'query' : {
+ 'value': None,
+ 'validation': lambda x: x != None
+ },
+ 'bucket': {
+ 'value': 'tweets'
+ }
} }
} }
-from tweetf0rm.twitterapi.users import User +from tweetf0rm.twitterapi.twitter_api import TwitterAPI
import json, os import json, os
def new_cmd(command, args_dict): def new_cmd(command, args_dict):
@@ -153,17 +161,17 @@ def cmd(config, args):
raise Exception("doesn't exist... ") raise Exception("doesn't exist... ")
with open(os.path.abspath(args.json), 'rb') as f, open(os.path.abspath(args.output), 'wb') as o_f: with open(os.path.abspath(args.json), 'rb') as f, open(os.path.abspath(args.output), 'wb') as o_f:
screen_names = json.load(f) screen_names = json.load(f)
- user_api = User(apikeys=apikeys) + twitter_api = TwitterAPI(apikeys=apikeys)
- user_ids = user_api.get_user_ids_by_screen_names(screen_names) + user_ids = twitter_api.get_user_ids_by_screen_names(screen_names)
json.dump(list(user_ids), o_f) json.dump(list(user_ids), o_f)
elif (args.command == 'GET_USERS_FROM_IDS'): elif (args.command == 'GET_USERS_FROM_IDS'):
apikeys = config["apikeys"].values()[0] apikeys = config["apikeys"].values()[0]
if (not os.path.exists(args.json)): if (not os.path.exists(args.json)):
raise Exception("doesn't exist... ") raise Exception("doesn't exist... ")
with open(os.path.abspath(args.json), 'rb') as f, open(os.path.abspath(args.output), 'wb') as o_f: with open(os.path.abspath(args.json), 'rb') as f, open(os.path.abspath(args.output), 'wb') as o_f:
user_ids = json.load(f) user_ids = json.load(f)
- user_api = User(apikeys=apikeys) + twitter_api = TwitterAPI(apikeys=apikeys)
- users = user_api.get_users(user_ids) + users = twitter_api.get_users(user_ids)
json.dump(list(users), o_f) json.dump(list(users), o_f)
elif (args.command.startswith('BATCH_')): elif (args.command.startswith('BATCH_')):
new_command = args.command.replace('BATCH_', '') new_command = args.command.replace('BATCH_', '')
@@ -212,7 +220,8 @@ def print_avaliable_cmd():
'-d/--depth': 'the depth of the network; e.g., if it is 2, it will give you his/her (indicated by the -uid) friends\' friends', '-d/--depth': 'the depth of the network; e.g., if it is 2, it will give you his/her (indicated by the -uid) friends\' friends',
'-j/--json': 'a json file that contains a list of screen_names or user_ids, depending on the command', '-j/--json': 'a json file that contains a list of screen_names or user_ids, depending on the command',
'-o/--output': ' the output json file (for storing user_ids from screen_names)', '-o/--output': ' the output json file (for storing user_ids from screen_names)',
- '-nid/--node_id':'the node_id that you want to interact with; default to the current machine...' + '-nid/--node_id':'the node_id that you want to interact with; default to the current machine...',
+ '-q/--query': 'the query to search'
} }
cmds = {'CRAWL_FRIENDS': { cmds = {'CRAWL_FRIENDS': {
'-uid/--user_id': dictionary['-uid/--user_id'], '-uid/--user_id': dictionary['-uid/--user_id'],
@@ -255,6 +264,8 @@ def print_avaliable_cmd():
'-nid/--node_id': dictionary['-nid/--node_id'] '-nid/--node_id': dictionary['-nid/--node_id']
}, 'CLEAR_NODE_QUEUES':{ }, 'CLEAR_NODE_QUEUES':{
'-nid/--node_id': dictionary['-nid/--node_id'] '-nid/--node_id': dictionary['-nid/--node_id']
+ }, 'SEARCH':{
+ '-q/--query': dictionary['-q/--query']
}} }}
@@ -271,7 +282,7 @@ def print_avaliable_cmd():
nid = node_id() nid = node_id()
import json, os import json, os
- parser = argparse.ArgumentParser() + parser = argparse.ArgumentParser(add_help=False)
parser.add_argument('-c', '--config', help="config.json that contains a) twitter api keys; b) redis connection string;", required = True) parser.add_argument('-c', '--config', help="config.json that contains a) twitter api keys; b) redis connection string;", required = True)
parser.add_argument('-cmd', '--command', help="the cmd you want to run, e.g., \"CRAWL_FRIENDS\"", required=True) parser.add_argument('-cmd', '--command', help="the cmd you want to run, e.g., \"CRAWL_FRIENDS\"", required=True)
parser.add_argument('-uid', '--user_id', help="the user_id", default=0) parser.add_argument('-uid', '--user_id', help="the user_id", default=0)
@@ -281,10 +292,15 @@ def print_avaliable_cmd():
parser.add_argument('-j', '--json', help="the location of the json file that has a list of user_ids or screen_names", required=False) parser.add_argument('-j', '--json', help="the location of the json file that has a list of user_ids or screen_names", required=False)
parser.add_argument('-o', '--output', help="the location of the output json file for storing user_ids", default='user_ids.json') parser.add_argument('-o', '--output', help="the location of the output json file for storing user_ids", default='user_ids.json')
parser.add_argument('-nid', '--node_id', help="the node_id you want to interact with", default=nid) parser.add_argument('-nid', '--node_id', help="the node_id you want to interact with", default=nid)
+ parser.add_argument('-q', '--query', help="the search query", default=None)
try: try:
args = parser.parse_args() args = parser.parse_args()
+ if args.command == 'HELP':
+ print_avaliable_cmd()
+ quit()
+
with open(os.path.abspath(args.config), 'rb') as config_f: with open(os.path.abspath(args.config), 'rb') as config_f:
config = json.load(config_f) config = json.load(config_f)
@@ -6,7 +6,7 @@
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
from .crawler_process import CrawlerProcess from .crawler_process import CrawlerProcess
-from tweetf0rm.twitterapi.users import User +from tweetf0rm.twitterapi.twitter_api import TwitterAPI
from tweetf0rm.handler import create_handler from tweetf0rm.handler import create_handler
from tweetf0rm.handler.crawl_user_relationship_command_handler import CrawlUserRelationshipCommandHandler from tweetf0rm.handler.crawl_user_relationship_command_handler import CrawlUserRelationshipCommandHandler
from tweetf0rm.utils import full_stack, hash_cmd from tweetf0rm.utils import full_stack, hash_cmd
@@ -15,13 +15,13 @@
import copy, json import copy, json
-class UserRelationshipCrawler(CrawlerProcess): +class TwitterCrawler(CrawlerProcess):
def __init__(self, node_id, crawler_id, apikeys, handlers, redis_config, proxies=None): def __init__(self, node_id, crawler_id, apikeys, handlers, redis_config, proxies=None):
if (handlers == None): if (handlers == None):
raise MissingArgs("you need a handler to write the data to...") raise MissingArgs("you need a handler to write the data to...")
- super(UserRelationshipCrawler, self).__init__(node_id, crawler_id, redis_config, handlers) + super(TwitterCrawler, self).__init__(node_id, crawler_id, redis_config, handlers)
self.apikeys = copy.copy(apikeys) self.apikeys = copy.copy(apikeys)
self.tasks = { self.tasks = {
@@ -37,32 +37,32 @@ def __init__(self, node_id, crawler_id, apikeys, handlers, redis_config, proxies
"network_type": "followers" "network_type": "followers"
}, },
"CRAWL_USER_TIMELINE": "fetch_user_timeline", "CRAWL_USER_TIMELINE": "fetch_user_timeline",
- "CRAWL_TWEET": "fetch_tweet_by_id" + "CRAWL_TWEET": "fetch_tweet_by_id",
+ "SEARCH": "search_by_query"
} }
self.node_queue = NodeQueue(self.node_id, redis_config=redis_config) self.node_queue = NodeQueue(self.node_id, redis_config=redis_config)
self.client_args = {"timeout": 300} self.client_args = {"timeout": 300}
self.proxies = iter(proxies) if proxies else None self.proxies = iter(proxies) if proxies else None
- self.user_api = None + self.twitter_api = None
- self.init_user_api() + self.init_twitter_api()
- #self.init_user_api()
- def init_user_api(self): # this will throw StopIteration if all proxies have been tried... + def init_twitter_api(self): # this will throw StopIteration if all proxies have been tried...
if (self.proxies): if (self.proxies):
try: try:
self.client_args['proxies'] = next(self.proxies)['proxy_dict'] # this will throw out self.client_args['proxies'] = next(self.proxies)['proxy_dict'] # this will throw out
#logger.info("client_args: %s"%json.dumps(self.client_args)) #logger.info("client_args: %s"%json.dumps(self.client_args))
except StopIteration as exc: except StopIteration as exc:
raise raise
except Exception as exc: except Exception as exc:
- self.init_user_api() + self.init_twitter_api()
- if (self.user_api): + if (self.twitter_api):
- del self.user_api + del self.twitter_api
#crawler_id=self.crawler_id, #crawler_id=self.crawler_id,
- self.user_api = User(apikeys=self.apikeys, client_args=self.client_args) + self.twitter_api = TwitterAPI(apikeys=self.apikeys, client_args=self.client_args)
def get_handlers(self): def get_handlers(self):
@@ -98,13 +98,19 @@ def run(self):
handler.flush_all() handler.flush_all()
else: else:
+ # figure out args first...
args = {} args = {}
if (command == 'CRAWL_TWEET'): if (command == 'CRAWL_TWEET'):
args = { args = {
"tweet_id": cmd['tweet_id'], "tweet_id": cmd['tweet_id'],
"write_to_handlers": self.handlers, "write_to_handlers": self.handlers,
"cmd_handlers" : [] "cmd_handlers" : []
} }
+ elif (command == 'SEARCH'):
+ args = {
+ "write_to_handlers": self.handlers,
+ "cmd_handlers" : []
+ }
else: else:
args = { args = {
"user_id": cmd['user_id'], "user_id": cmd['user_id'],
@@ -119,7 +125,25 @@ def run(self):
func = None func = None
if (command in ['CRAWL_USER_TIMELINE', 'CRAWL_TWEET']): if (command in ['CRAWL_USER_TIMELINE', 'CRAWL_TWEET']):
- func = getattr(self.user_api, self.tasks[command]) + func = getattr(self.twitter_api, self.tasks[command])
+ elif (command in ['SEARCH']):
+
+ if "lang" in cmd:
+ args['lang'] = cmd['lang']
+
+ if "geocode" in cmd:
+ args['geocode'] = cmd['geocode']
+
+ if "key" in cmd:
+ args['key'] = cmd['key']
+
+ #logger.info("new cmd: %s"%(cmd))
+ # q is required, otherwise let it fail...
+ if "query" in cmd:
+ args['query'] = cmd['query']
+ func = getattr(self.twitter_api, self.tasks[command])
+
+
elif (command in ['CRAWL_FRIENDS', 'CRAWL_FOLLOWERS']): elif (command in ['CRAWL_FRIENDS', 'CRAWL_FOLLOWERS']):
data_type = cmd['data_type'] data_type = cmd['data_type']
@@ -145,17 +169,20 @@ def run(self):
except Exception as exc: except Exception as exc:
logger.warn(exc) logger.warn(exc)
- func = getattr(self.user_api, self.tasks[command][data_type]) + func = getattr(self.twitter_api, self.tasks[command][data_type])
if func: if func:
try: try:
+ #logger.info(args)
func(**args) func(**args)
- del args['cmd_handlers'] + del args['cmd_handlers']
+ for handler in self.handlers:
+ handler.flush_all()
except Exception as exc: except Exception as exc:
logger.error("%s"%exc) logger.error("%s"%exc)
try: try:
- self.init_user_api() + self.init_twitter_api()
- except StopIteration as init_user_api_exc: + except StopIteration as init_twitter_api_exc:
# import exceptions # import exceptions
# if (isinstance(init_user_api_exc, exceptions.StopIteration)): # no more proxy to try... so kill myself... # if (isinstance(init_user_api_exc, exceptions.StopIteration)): # no more proxy to try... so kill myself...
for handler in self.handlers: for handler in self.handlers:
View
@@ -12,7 +12,7 @@
import json, copy, time import json, copy, time
from tweetf0rm.utils import full_stack, hash_cmd, md5, get_keys_by_min_value from tweetf0rm.utils import full_stack, hash_cmd, md5, get_keys_by_min_value
from tweetf0rm.proxies import proxy_checker from tweetf0rm.proxies import proxy_checker
-from process.user_relationship_crawler import UserRelationshipCrawler +from process.twitter_crawler import TwitterCrawler
#from handler.inmemory_handler import InMemoryHandler #from handler.inmemory_handler import InMemoryHandler
from handler import create_handler from handler import create_handler
from tweetf0rm.redis_helper import NodeCoordinator, NodeQueue, CrawlerQueue from tweetf0rm.redis_helper import NodeCoordinator, NodeQueue, CrawlerQueue
@@ -78,7 +78,7 @@ def new_crawler(self, node_id, apikeys, config, crawler_proxies = None):
if (not crawler_proxies): if (not crawler_proxies):
crawler_proxies = next(self.proxy_generator) if self.proxy_generator else None crawler_proxies = next(self.proxy_generator) if self.proxy_generator else None
- crawler = UserRelationshipCrawler(node_id, crawler_id, copy.copy(apikeys), handlers=[create_handler(file_handler_config)], redis_config=copy.copy(config['redis_config']), proxies=crawler_proxies) + crawler = TwitterCrawler(node_id, crawler_id, copy.copy(apikeys), handlers=[create_handler(file_handler_config)], redis_config=copy.copy(config['redis_config']), proxies=crawler_proxies)
if (crawler_id in self.crawlers): if (crawler_id in self.crawlers):
#self.crawlers[crawler_id].clear() #self.crawlers[crawler_id].clear()
Oops, something went wrong.

0 comments on commit e5c1183

Please sign in to comment.