Permalink
Browse files

pollers deal with spider queues too

  • Loading branch information...
1 parent c687904 commit 98a014a2766e22042f7252172b0fc58e56e91020 @dmclain committed Nov 16, 2012
@@ -1,2 +1,2 @@
-__version__ = (0, 1, 0, 'dev', 0)
+__version__ = (0, 1, 0)
View
@@ -1,5 +1,4 @@
from os import environ
-import urlparse
from twisted.application.service import Application
from twisted.application.internet import TimerService, TCPServer
@@ -10,33 +9,21 @@
IEnvironment)
from scrapyd.launcher import Launcher
from scrapyd.eggstorage import FilesystemEggStorage
-from scrapyd.poller import QueuePoller
from scrapyd.environ import Environment
from scrapyd.website import Root
from .scheduler import Psycopg2SpiderScheduler
+from .poller import Psycopg2QueuePoller
def application(config):
app = Application("Scrapyd")
- http_port = environ.get('PORT', config.getint('http_port', 6800))
- url = urlparse.urlparse(environ.get('DATABASE_URL'))
-
- # Remove query strings.
- path = url.path[1:]
- path = path.split('?', 2)[0]
-
- args = {
- 'dbname': path,
- 'user': url.username,
- 'password': url.password,
- 'host': url.hostname,
- 'port': url.port,
- }
-
- poller = QueuePoller(config)
+ http_port = int(environ.get('PORT', config.getint('http_port', 6800)))
+ config.cp.set('scrapyd', 'database_url', environ.get('DATABASE_URL'))
+
+ poller = Psycopg2QueuePoller(config)
eggstorage = FilesystemEggStorage(config)
- scheduler = Psycopg2SpiderScheduler(config, **args)
+ scheduler = Psycopg2SpiderScheduler(config)
environment = Environment(config)
app.setComponent(IPoller, poller)
View
@@ -0,0 +1,8 @@
+from scrapyd.poller import QueuePoller
+
+from .utils import get_spider_queues
+
+
class Psycopg2QueuePoller(QueuePoller):
    """QueuePoller variant whose per-project spider queues are backed by
    Postgres (via psycopg2) instead of the stock scrapyd storage.

    Only the source of the queues is swapped out; the polling loop itself
    is inherited unchanged from scrapyd's QueuePoller.
    """

    def update_projects(self):
        # Rebuild the project -> queue mapping from the database-backed
        # configuration.
        queues_by_project = get_spider_queues(self.config)
        self.queues = queues_by_project
View
@@ -2,16 +2,14 @@
from scrapyd.interfaces import ISpiderScheduler
-from .spiderqueue import Psycopg2SpiderQueue
-from .utils import get_project_list
+from .utils import get_spider_queues
class Psycopg2SpiderScheduler(object):
implements(ISpiderScheduler)
def __init__(self, config):
    """Keep a reference to the scrapyd config and build the initial
    project -> spider-queue mapping.

    Note: self.config must be assigned before update_projects() runs,
    since update_projects reads it.
    """
    self.config = config
    self.update_projects()
def schedule(self, project, spider_name, **spider_args):
@@ -22,7 +20,4 @@ def list_projects(self):
return self.queues.keys()
def update_projects(self):
    # Refresh the project -> queue mapping; get_spider_queues derives one
    # Postgres-backed queue per project from the shared config.
    self.queues = get_spider_queues(self.config)
@@ -1,14 +1,27 @@
import psycopg2
import cPickle
import json
+import urlparse
from zope.interface import implements
from scrapyd.interfaces import ISpiderQueue
class Psycopg2PriorityQueue(object):
- def __init__(self, table='scrapy_queue', **kwargs):
- conn_string = ' '.join('%s=%s' % item for item in kwargs.items())
+ def __init__(self, config, table='scrapy_queue'):
+ url = urlparse.urlparse(config.get('database_url'))
+ # Remove query strings.
+ path = url.path[1:]
+ path = path.split('?', 2)[0]
+
+ args = {
+ 'dbname': path,
+ 'user': url.username,
+ 'password': url.password,
+ 'host': url.hostname,
+ 'port': url.port,
+ }
+ conn_string = ' '.join('%s=%s' % item for item in args.items())
self.table = table
self.conn = psycopg2.connect(conn_string)
q = "create table if not exists %s " \
@@ -98,8 +111,8 @@ def decode(self, text):
class Psycopg2SpiderQueue(object):
implements(ISpiderQueue)
def __init__(self, config, table='spider_queue'):
    """Wrap a JSON-serializing Postgres priority queue.

    `config` supplies the database connection settings (database_url);
    `table` names the backing database table for this queue.
    """
    self.q = JsonPsycopg2PriorityQueue(config, table)
def add(self, name, **spider_args):
d = spider_args.copy()
View
@@ -1,18 +1,11 @@
-import os
-from ConfigParser import NoSectionError
+from scrapyd.utils import get_project_list
+from spiderqueue import Psycopg2SpiderQueue
-def get_project_list(config):
- """Get list of projects by inspecting the eggs dir and the ones defined in
- the scrapyd.conf [settings] section
- """
- eggs_dir = config.get('eggs_dir', 'eggs')
- if os.path.exists(eggs_dir):
- projects = os.listdir(eggs_dir)
- else:
- projects = []
- try:
- projects += [x[0] for x in config.cp.items('settings')]
- except NoSectionError:
- pass
- return projects
+
def get_spider_queues(config):
    """Return a dict mapping each known project name to its
    Postgres-backed spider queue (one `scrapy_<project>_queue` table
    per project).
    """
    return dict(
        (project, Psycopg2SpiderQueue(config, table='scrapy_%s_queue' % project))
        for project in get_project_list(config)
    )

0 comments on commit 98a014a

Please sign in to comment.