Skip to content

Commit

Permalink
add test for processor and improve coverage for fetcher
Browse files Browse the repository at this point in the history
fix cookie-not-working bug in fetcher
  • Loading branch information
binux committed Nov 10, 2014
1 parent c291de3 commit d09cf46
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 20 deletions.
5 changes: 4 additions & 1 deletion fetcher/tornado_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,17 @@ def header_callback(line):
request = tornado.httpclient.HTTPRequest(header_callback=header_callback, **fetch)
if cookie:
session.update(cookie)
request.headers.add('Cookie', self.session.get_cookie_header(request))
if 'Cookie' in request.headers:
del request.headers['Cookie']
request.headers['Cookie'] = session.get_cookie_header(request)
if self.async:
response = self.http_client.fetch(request, handle_response)
else:
return handle_response(self.http_client.fetch(request))
except tornado.httpclient.HTTPError as e:
return handle_response(e.response)
except Exception as e:
raise
result = {
'status_code': 599,
'error': '%r' % e,
Expand Down
8 changes: 4 additions & 4 deletions libs/base_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ def every(minutes=NOTSET, seconds=NOTSET):
def wrapper(func):
@functools.wraps(func)
def on_cronjob(self, response, task):
if response.save and 'tick' in response.save and response.save['tick'] % (minutes * 60 + seconds) == 0:
function = func.__get__(self, self.__class__)
return self._run_func(function, response, task)
return None
if response.save and 'tick' in response.save and response.save['tick'] % (minutes * 60 + seconds) != 0:
return None
function = func.__get__(self, self.__class__)
return self._run_func(function, response, task)
on_cronjob.is_cronjob = True
on_cronjob.tick = minutes * 60 + seconds
return on_cronjob
Expand Down
6 changes: 4 additions & 2 deletions processor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,17 @@ def on_task(self, task, response):
ret.result, len(ret.follows), len(ret.messages), ret.exception))
return True

def quit(self):
    """Signal the run() loop to stop; it exits after its current iteration."""
    self._quit = True

def run(self):
while not self._quit:
try:
task, response = self.inqueue.get()
task, response = self.inqueue.get(timeout=1)
self._check_projects(task)
self.on_task(task, response)
self._exceptions = 0
except Queue.Empty as e:
time.sleep(1)
continue
except KeyboardInterrupt:
break
Expand Down
71 changes: 58 additions & 13 deletions test/test_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,19 @@
import time
import json
import logging
import xmlrpclib
import cPickle as pickle
import unittest2 as unittest
from multiprocessing import Queue

from libs import utils
from fetcher.tornado_fetcher import Fetcher

class TestTaskDB(unittest.TestCase):
class TestFetcher(unittest.TestCase):
sample_task_http = {
'taskid': 'taskid',
'project': 'project',
'url': 'http://httpbin.org/get',
'url': 'http://echo.opera.com/',
'fetch': {
'method': 'GET',
'headers': {
Expand All @@ -32,31 +35,73 @@ class TestTaskDB(unittest.TestCase):
'save': [1, 2, 3],
},
}
def setUp(self):
self.fetcher = Fetcher(None, None)
@classmethod
def setUpClass(self):
self.inqueue = Queue(10)
self.outqueue = Queue(10)
self.fetcher = Fetcher(self.inqueue, self.outqueue)
self.rpc = xmlrpclib.ServerProxy('http://localhost:%d' % 24444)
self.xmlrpc_thread = utils.run_in_thread(self.fetcher.xmlrpc_run, port=24444)
self.thread = utils.run_in_thread(self.fetcher.run)

def tearDown(self):
self.fetcher.quit()
@classmethod
def tearDownClass(self):
self.rpc._quit()
self.thread.join()

def test_http_get(self):
def test_10_http_get(self):
result = self.fetcher.sync_fetch(self.sample_task_http)
self.assertEqual(result['status_code'], 200)
self.assertEqual(result['orig_url'], self.sample_task_http['url'])
self.assertEqual(result['save'], self.sample_task_http['fetch']['save'])
self.assertIn('content', result)

content = json.loads(result['content'])
self.assertIn('headers', content)
self.assertIn('A', content['headers'])
self.assertIn('Cookie', content['headers'])
self.assertEqual(content['headers']['Cookie'], 'a=b')
content = result['content']
self.assertIn('..A:', content)
self.assertIn('..Cookie:', content)
self.assertIn('a=b', content)

def test_dataurl_get(self):
def test_10_http_post(self):
    """POST fetch via the live echo service: the method, the request body
    and per-request cookies must all be echoed back in the content.

    Bug fix: the original used ``dict(self.sample_task_http)``, a shallow
    copy, then mutated ``request['fetch']`` in place — which mutated the
    shared class-level fixture and leaked POST/data/cookies into every
    later (ordered) test. A deep copy keeps this test self-contained.
    """
    import copy
    request = copy.deepcopy(self.sample_task_http)
    request['fetch']['method'] = 'POST'
    request['fetch']['data'] = 'binux'
    request['fetch']['cookies'] = {'c': 'd'}
    result = self.fetcher.sync_fetch(request)
    self.assertEqual(result['status_code'], 200)
    self.assertEqual(result['orig_url'], self.sample_task_http['url'])
    self.assertEqual(result['save'], self.sample_task_http['fetch']['save'])
    self.assertIn('content', result)

    content = result['content']
    self.assertIn('<h2>POST', content)
    self.assertIn('..A:', content)
    self.assertIn('..Cookie:', content)
    # FIXME: cookies in headers not supported
    self.assertNotIn('a=b', content)
    self.assertIn('c=d', content)
    self.assertIn('binux', content)

def test_20_dataurl_get(self):
    """A data: URL is resolved locally and its payload returned verbatim."""
    task = dict(self.sample_task_http)
    task['url'] = 'data:,hello'
    fetched = self.fetcher.sync_fetch(task)
    self.assertEqual(200, fetched['status_code'])
    self.assertIn('content', fetched)
    self.assertEqual('hello', fetched['content'])

def test_30_with_queue(self):
    """A task pushed onto the input queue comes back on the output queue."""
    task = dict(self.sample_task_http)
    task['url'] = 'data:,hello'
    self.inqueue.put(task)
    _echoed_task, fetched = self.outqueue.get()
    self.assertEqual(200, fetched['status_code'])
    self.assertIn('content', fetched)
    self.assertEqual('hello', fetched['content'])

def test_40_with_rpc(self):
    """The XML-RPC ``fetch`` endpoint returns a pickled result dict."""
    task = dict(self.sample_task_http)
    task['url'] = 'data:,hello'
    payload = self.rpc.fetch(task)
    fetched = pickle.loads(payload.data)
    self.assertEqual(200, fetched['status_code'])
    self.assertIn('content', fetched)
    self.assertEqual('hello', fetched['content'])
88 changes: 88 additions & 0 deletions test/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import os
import time
import unittest2 as unittest
import logging.config
logging.config.fileConfig("logging.conf")

from processor.processor import build_module
class TestProjectModule(unittest.TestCase):
Expand Down Expand Up @@ -172,3 +174,89 @@ def test_20_get_info(self):
for each in ret.follows:
self.assertEqual(each['url'], 'data:,on_get_info')
self.assertEqual(each['fetch']['save']['min_tick'] , 10)

import shutil
from multiprocessing import Queue
from database.sqlite import projectdb
from processor.processor import Processor
from libs.utils import run_in_subprocess, run_in_thread
class TestProcessor(unittest.TestCase):
projectdb_path = './test/data/project.db'

@classmethod
def setUpClass(self):
    # NOTE: 'self' here is actually the class (classmethod convention
    # would name it 'cls'); attributes set below are class-level.
    # Start from a clean on-disk project database.
    shutil.rmtree('./test/data/', ignore_errors=True)
    os.makedirs('./test/data/')

    # Factory so the test and the processor thread each get their own
    # ProjectDB handle to the same sqlite file.
    def get_projectdb():
        return projectdb.ProjectDB(self.projectdb_path)
    self.projectdb = get_projectdb()
    self.in_queue = Queue(10)
    self.status_queue = Queue(10)
    self.newtask_queue = Queue(10)
    self.result_queue = Queue(10)

    # Run the processor loop in a background thread; a short
    # CHECK_PROJECTS_INTERVAL makes it notice project updates quickly.
    def run_processor():
        self.processor = Processor(get_projectdb(), self.in_queue,
            self.status_queue, self.newtask_queue, self.result_queue)
        self.processor.CHECK_PROJECTS_INTERVAL = 0.1
        self.processor.run()
    self.process = run_in_thread(run_processor)
    # Give the thread time to construct self.processor before tests run.
    time.sleep(1)

@classmethod
def tearDownClass(self):
    # Ask the processor loop to stop, then wait for the worker thread.
    if self.process.is_alive():
        self.processor.quit()
    self.process.join(2)
    assert not self.process.is_alive()
    # Remove the on-disk test database created in setUpClass.
    shutil.rmtree('./test/data/', ignore_errors=True)

def test_10_update_project(self):
    """Inserting a project makes the processor load it on its next
    project-check cycle, even when the trigger is a task whose own
    project does not exist."""
    # Nothing loaded yet.
    self.assertEqual(0, len(self.processor.projects))

    project_record = {
        'name': 'test_project',
        'group': 'group',
        'status': 'TODO',
        'script': open('libs/sample_handler.py', 'r').read(),
        'comments': 'test project',
        'rate': 1.0,
        'burst': 10,
    }
    self.projectdb.insert('test_project', project_record)

    # A task for an unknown project is dropped, but processing it still
    # causes the processor to re-scan the project database.
    unknown_task = {
        "process": {
            "callback": "on_start"
        },
        "project": "not_exists",
        "taskid": "data:,on_start",
        "url": "data:,on_start"
    }
    self.in_queue.put((unknown_task, {}))
    time.sleep(1)

    # The dropped task produced no status output...
    self.assertTrue(self.status_queue.empty())
    # ...but the freshly inserted project was picked up.
    self.assertEqual(1, len(self.processor.projects))

def test_30_new_task(self):
    """A successful fetch for a known project yields both a status
    report and new follow-up tasks."""
    # Both output queues start empty.
    for queue in (self.status_queue, self.newtask_queue):
        self.assertTrue(queue.empty())

    on_start_task = {
        "process": {
            "callback": "on_start"
        },
        "project": "test_project",
        "taskid": "data:,on_start",
        "url": "data:,on_start"
    }
    ok_response = {
        "orig_url": "data:,on_start",
        "content": "on_start",
        "headers": {},
        "status_code": 200,
        "url": "data:,on_start",
        "time": 0,
    }
    self.in_queue.put((on_start_task, ok_response))
    time.sleep(1)

    # Processing on_start must emit a status update and follow-up tasks.
    for queue in (self.status_queue, self.newtask_queue):
        self.assertFalse(queue.empty())

0 comments on commit d09cf46

Please sign in to comment.