Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

refactor backend support. Now backend are just modules that you can pass

to the pool object.
  • Loading branch information...
commit e50023d62ea75fa60a3308bbc7a03a5b44f6e6c8 1 parent 31f03e6
@benoitc authored
View
9 examples/test_eventlet.py
@@ -1,6 +1,7 @@
import eventlet
-from socketpool.epool import EConnectionPool, ESocketConnector
+from socketpool.pool import ConnectionPool
+from socketpool.conn import SocketConnector
# this handler will be run for each incoming connection in a dedicated greenlet
@@ -49,14 +50,12 @@ def stop(self):
import time
options = {'host': 'localhost', 'port': 6000}
- pool = EConnectionPool(factory=ESocketConnector, options=options)
+ pool = ConnectionPool(factory=SocketConnector, options=options,
+ backend="eventlet")
server = EchoServer('localhost', 6000)
server.start()
epool = eventlet.GreenPool()
-
-
-
def runpool(data):
print 'ok'
with pool.connection() as conn:
View
7 examples/test_gevent.py
@@ -1,8 +1,8 @@
import gevent
from gevent.server import StreamServer
-from socketpool.gpool import GConnectionPool, GSocketConnector
-
+from socketpool.pool import ConnectionPool
+from socketpool.conn import SocketConnector
# this handler will be run for each incoming connection in a dedicated greenlet
def echo(sock, address):
@@ -21,7 +21,8 @@ def echo(sock, address):
import time
options = {'host': 'localhost', 'port': 6000}
- pool = GConnectionPool(factory=GSocketConnector, options=options)
+ pool = ConnectionPool(factory=SocketConnector, options=options,
+ backend="gevent")
server = StreamServer(('localhost', 6000), echo)
gevent.spawn(server.serve_forever)
View
BIN  socketpool/.DS_Store
Binary file not shown
View
18 socketpool/epool.py → socketpool/backend_eventlet.py
@@ -6,14 +6,16 @@
import eventlet
from eventlet.green import socket
-from eventlet.queue import PriorityQueue
-
+from eventlet import queue
from socketpool.pool import ConnectionPool
from socketpool.conn import SocketConnector
-class IPriorityQueue(PriorityQueue):
+sleep = eventlet.sleep
+Socket = socket.socket
+
+class PriorityQueue(queue.PriorityQueue):
def __iter__(self):
return self
@@ -25,7 +27,7 @@ def next(self):
raise StopIteration
return result
-class EventletConnectionReaper(object):
+class ConnectionReaper(object):
running = False
@@ -53,11 +55,3 @@ def _exec(self):
def ensure_started(self):
if not self.running:
self.start()
-
-class EConnectionPool(ConnectionPool):
- QUEUE_CLASS = IPriorityQueue
- SLEEP = eventlet.sleep
- REAPER_CLASS = EventletConnectionReaper
-
-class ESocketConnector(SocketConnector):
- SOCKET_CLASS = socket.socket
View
19 socketpool/gpool.py → socketpool/backend_gevent.py
@@ -5,13 +5,18 @@
import gevent
from gevent import socket
-from gevent.queue import PriorityQueue
-
+from gevent import queue
from socketpool.pool import ConnectionPool
from socketpool.conn import SocketConnector
-class GeventConnectionReaper(gevent.Greenlet):
+
+sleep = gevent.sleep
+PriorityQueue = queue.PriorityQueue
+Socket = socket.socket
+
+
+class ConnectionReaper(gevent.Greenlet):
running = False
@@ -29,11 +34,3 @@ def _run(self):
def ensure_started(self):
if not self.running or self.ready():
self.start()
-
-class GConnectionPool(ConnectionPool):
- QUEUE_CLASS = PriorityQueue
- SLEEP = gevent.sleep
- REAPER_CLASS = GeventConnectionReaper
-
-class GSocketConnector(SocketConnector):
- SOCKET_CLASS = socket.socket
View
52 socketpool/backend_thread.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -
+#
+# This file is part of socketpool released under the MIT license.
+# See the NOTICE for more information.
+
+import socket
+import threading
+import time
+
+try:
+ import Queue as queue
+except ImportError: # py3
+ import queue
+
+Socket = socket.socket
+sleep = time.sleep
+
+class PriorityQueue(queue.PriorityQueue):
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ try:
+ result = self.get()
+ except queue.Empty:
+ raise StopIteration
+ return result
+
+class ConnectionReaper(threading.Thread):
+ """ connection reaper thread. Open a thread that will murder iddle
+ connections after a delay """
+
+ running = False
+
+ def __init__(self, pool, delay=600):
+ self.pool = pool
+ self.delay = delay
+ threading.Thread.__init__(self)
+ self.setDaemon(True)
+
+ def run(self):
+ self.running = True
+ while True:
+ time.sleep(self.delay)
+ self.pool.murder_connections()
+
+ def ensure_started(self):
+ if not self.running and not self.isAlive():
+ self.start()
+
+
View
6 socketpool/conn.py
@@ -25,10 +25,8 @@ def invalidate(self):
class SocketConnector(Connector):
- SOCKET_CLASS = socket.socket
-
- def __init__(self, host, port):
- self._s = self.SOCKET_CLASS(socket.AF_INET, socket.SOCK_STREAM)
+ def __init__(self, host, port, backend_mod):
+ self._s = backend_mod.Socket(socket.AF_INET, socket.SOCK_STREAM)
self._s.connect((host, port))
self.host = host
self.port = port
View
62 socketpool/pool.py
@@ -5,65 +5,27 @@
import contextlib
import sys
-import threading
-
-try:
- from Queue import Empty, PriorityQueue
-except ImportError: # py3
- from queue import Empty, PriorityQueue
-
import time
+from socketpool.util import load_backend
+
class MaxTriesError(Exception):
pass
-class IPriorityQueue(PriorityQueue):
-
- def __iter__(self):
- return self
-
- def next(self):
- try:
- result = self.get()
- except Empty:
- raise StopIteration
- return result
-
-class ConnectionReaper(threading.Thread):
- """ connection reaper thread. Open a thread that will murder iddle
- connections after a delay """
-
- running = False
-
- def __init__(self, pool, delay=600):
- self.pool = pool
- self.delay = delay
- threading.Thread.__init__(self)
- self.setDaemon(True)
-
- def run(self):
- self.running = True
- while True:
- time.sleep(self.delay)
- self.pool.murder_connections()
-
- def ensure_started(self):
- if not self.running and not self.isAlive():
- self.start()
-
class ConnectionPool(object):
- QUEUE_CLASS = IPriorityQueue
- SLEEP = time.sleep
- REAPER_CLASS = ConnectionReaper
def __init__(self, factory,
retry_max=3, retry_delay=.1,
timeout=-1, max_lifetime=600.,
max_size=10, options=None,
- reap_connections=True):
+ reap_connections=True,
+ backend="thread"):
+
+ self.backend_mod = load_backend(backend)
+
self.max_size = max_size
- self.pool = self.QUEUE_CLASS()
+ self.pool = self.backend_mod.PriorityQueue()
self.size = 0
self.factory = factory
self.retry_max = retry_max
@@ -71,9 +33,10 @@ def __init__(self, factory,
self.timeout = timeout
self.max_lifetime = max_lifetime
if options is None:
- self.options = {}
+ self.options = {"backend_mod": self.backend_mod}
else:
self.options = options
+ self.options["backend_mod"] = self.backend_mod
self._reaper = None
if reap_connections:
@@ -90,7 +53,8 @@ def murder_connections(self):
pool.put((priority, candidate))
def start_reaper(self):
- self._reaper = ConnectionReaper(self, delay=self.max_lifetime)
+ self._reaper = self.backend_mod.ConnectionReaper(self,
+ delay=self.max_lifetime)
self._reaper.ensure_started()
@@ -145,7 +109,7 @@ def get(self, **options):
return new_item
tries += 1
- self.SLEEP(self.retry_delay)
+ self.backend_mod.sleep(self.retry_delay)
if last_error is None:
raise MaxTriesError()
View
64 socketpool/util.py
@@ -0,0 +1,64 @@
+# -*- coding: utf-8 -
+#
+# This file is part of socketpool released under the MIT license.
+# See the NOTICE for more information.
+
+try:
+ from importlib import import_module
+except ImportError:
+ import sys
+ import imp
+
+ def _resolve_name(name, package, level):
+ """Return the absolute name of the module to be imported."""
+ if not hasattr(package, 'rindex'):
+ raise ValueError("'package' not set to a string")
+ dot = len(package)
+ for x in xrange(level, 1, -1):
+ try:
+ dot = package.rindex('.', 0, dot)
+ except ValueError:
+ raise ValueError("attempted relative import beyond top-level "
+ "package")
+ return "%s.%s" % (package[:dot], name)
+
+
+ def import_module(name, package=None):
+ """Import a module.
+
+ The 'package' argument is required when performing a relative import. It
+ specifies the package to use as the anchor point from which to resolve the
+ relative import to an absolute import.
+
+ """
+ if name.startswith('.'):
+ if not package:
+ raise TypeError("relative imports require the 'package' argument")
+ level = 0
+ for character in name:
+ if character != '.':
+ break
+ level += 1
+ name = _resolve_name(name[level:], package, level)
+ return imp.load_source(name, name)
+
+def load_backend(backend_name):
+ """ load pool backend. If this is an external module it should be
+ passed as "somelib.backend_mod", for socketpool backend you can just
+ pass the name.
+
+ Supported backend are :
+ - thread: connection are maintained in a threadsafe queue.
+ - gevent: support gevent
+ - eventlet: support eventlet
+
+ """
+ try:
+ if len(backend_name.split(".")) > 1:
+ mod = import_module(backend_name)
+ else:
+ mod = import_module("socketpool.backend_%s" % backend_name)
+ return mod
+ except ImportError:
+ error_msg = "%s isn't a socketpool backend" % backend_name
+ raise ImportError(error_msg)
Please sign in to comment.
Something went wrong with that request. Please try again.