Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Newer
Older
100644 96 lines (80 sloc) 2.731 kB
62c82cc @igrigorik connection pool for sync / async connection cases via fibers
authored
1 module EventMachine
2 module Synchrony
3
4 class ConnectionPool
0d176e9 @igrigorik connection pool should proxy "send" calls to objects, closes #110
authored
5 undef :send
6
62c82cc @igrigorik connection pool for sync / async connection cases via fibers
authored
7 def initialize(opts, &block)
8 @reserved = {} # map of in-progress connections
9 @available = [] # pool of free connections
10 @pending = [] # pending reservations (FIFO)
11
12 opts[:size].times do
13 @available.push(block.call) if block_given?
14 end
15 end
16
17 # Choose first available connection and pass it to the supplied
18 # block. This will block indefinitely until there is an available
19 # connection to service the request.
20 def execute(async)
21 f = Fiber.current
22
23 begin
24 conn = acquire(f)
25 yield conn
26 ensure
27 release(f) if not async
28 end
29 end
055bf3f added 'pool_status' method
Dirk Bolte authored
30
31 # Returns current pool utilization.
32 #
33 # @return [Hash] Current utilization.
34 def pool_status
35 {
36 available: @available.size,
37 reserved: @reserved.size,
38 pending: @pending.size
39 }
40 end
62c82cc @igrigorik connection pool for sync / async connection cases via fibers
authored
41
42 private
43
44 # Acquire a lock on a connection and assign it to executing fiber
45 # - if connection is available, pass it back to the calling block
46 # - if pool is full, yield the current fiber until connection is available
47 def acquire(fiber)
48 if conn = @available.pop
49 @reserved[fiber.object_id] = conn
50 conn
51 else
52 Fiber.yield @pending.push fiber
53 acquire(fiber)
54 end
55 end
56
57 # Release connection assigned to the supplied fiber and
87d9df0 @igrigorik shuffle names & requires
authored
58 # resume any other pending connections (which will
62c82cc @igrigorik connection pool for sync / async connection cases via fibers
authored
59 # immediately try to run acquire on the pool)
60 def release(fiber)
61 @available.push(@reserved.delete(fiber.object_id))
62
2becf13 @ssoroka Pending connection pool claims to be FIFO, but was actually FILO, and…
ssoroka authored
63 if pending = @pending.shift
62c82cc @igrigorik connection pool for sync / async connection cases via fibers
authored
64 pending.resume
65 end
66 end
67
68 # Allow the pool to behave as the underlying connection
69 #
70 # If the requesting method begins with "a" prefix, then
71 # hijack the callbacks and errbacks to fire a connection
72 # pool release whenever the request is complete. Otherwise
73 # yield the connection within execute method and release
74 # once it is complete (assumption: fiber will yield until
75 # data is available, or request is complete)
76 #
ea71a5d @igrigorik pass the block through in connection pool
authored
77 def method_missing(method, *args, &blk)
62c82cc @igrigorik connection pool for sync / async connection cases via fibers
authored
78 async = (method[0,1] == "a")
79
80 execute(async) do |conn|
0d176e9 @igrigorik connection pool should proxy "send" calls to objects, closes #110
authored
81 df = conn.__send__(method, *args, &blk)
62c82cc @igrigorik connection pool for sync / async connection cases via fibers
authored
82
83 if async
84 fiber = Fiber.current
85 df.callback { release(fiber) }
86 df.errback { release(fiber) }
87 end
87d9df0 @igrigorik shuffle names & requires
authored
88
62c82cc @igrigorik connection pool for sync / async connection cases via fibers
authored
89 df
90 end
91 end
92 end
93
94 end
95 end
Something went wrong with that request. Please try again.