22from __future__ import absolute_import , division , print_function
33
44from collections import deque
5- import sys
65import warnings
76
87from tornado .ioloop import IOLoop
98from tornado .gen import coroutine , Return
109from tornado .concurrent import Future
10+
1111from tornado_mysql import connect
12+ from tornado_mysql .connections import Connection
1213
1314
1415DEBUG = False
@@ -32,52 +33,85 @@ def __init__(self,
3233 connect_kwargs ,
3334 max_idle_connections = 1 ,
3435 max_recycle_sec = 3600 ,
36+ max_open_connections = 0 ,
3537 io_loop = None ,
3638 ):
3739 """
3840 :param dict connect_kwargs: kwargs for tornado_mysql.connect()
3941 :param int max_idle_connections: Max number of keeping connections.
4042 :param int max_recycle_sec: How long connections are recycled.
43+ :param int max_open_connections:
44+ Max number of opened connections. 0 means no limit.
4145 """
4246 connect_kwargs ['autocommit' ] = True
4347 self .io_loop = io_loop or IOLoop .current ()
4448 self .connect_kwargs = connect_kwargs
45- self .max_idle_connections = max_idle_connections
49+ self .max_idle = max_idle_connections
50+ self .max_open = max_open_connections
4651 self .max_recycle_sec = max_recycle_sec
4752
4853 self ._opened_conns = 0
4954 self ._free_conn = deque ()
55+ self ._waitings = deque ()
56+
57+ def stat (self ):
58+ """Returns (opened connections, free connections, waiters)"""
59+ return (self ._opened_conns , len (self ._free_conn ), len (self ._waitings ))
5060
5161 def _get_conn (self ):
5262 now = self .io_loop .time ()
63+
64+ # Try to reuse in free pool
5365 while self ._free_conn :
5466 conn = self ._free_conn .popleft ()
5567 if now - conn .connected_time > self .max_recycle_sec :
5668 self ._close_async (conn )
5769 continue
58- _debug ("Reusing connection from pool (opened=%d)" % ( self ._opened_conns , ))
70+ _debug ("Reusing connection from pool:" , self .stat ( ))
5971 fut = Future ()
6072 fut .set_result (conn )
6173 return fut
6274
63- self ._opened_conns += 1
64- _debug ("Creating new connection (opened=%d)" % (self ._opened_conns ,))
65- return connect (** self .connect_kwargs )
75+ # Open new connection
76+ if self .max_open and self ._opened_conns < self .max_open :
77+ self ._opened_conns += 1
78+ _debug ("Creating new connection:" , self .stat ())
79+ return connect (** self .connect_kwargs )
80+
81+ # Wait to other connection is released.
82+ fut = Future ()
83+ self ._waitings .append (fut )
84+ return fut
6685
6786 def _put_conn (self , conn ):
68- if (len (self ._free_conn ) < self .max_idle_connections and
87+ if (len (self ._free_conn ) < self .max_idle and
6988 self .io_loop .time () - conn .connected_time < self .max_recycle_sec ):
70- self ._free_conn .append (conn )
89+ if self ._waitings :
90+ fut = self ._waitings .popleft ()
91+ fut .set_result (conn )
92+ _debug ("Passing returned connection to waiter:" , self .stat ())
93+ else :
94+ self ._free_conn .append (conn )
95+ _debug ("Add conn to free pool:" , self .stat ())
7196 else :
7297 self ._close_async (conn )
7398
7499 def _close_async (self , conn ):
75- self .io_loop .add_future (conn .close_async (), callback = lambda f : None )
76- self ._opened_conns -= 1
100+ self .io_loop .add_future (conn .close_async (), callback = self ._after_close )
77101
78102 def _close_conn (self , conn ):
79103 conn .close ()
80- self ._opened_conns -= 1
104+ self ._after_close ()
105+
106+ def _after_close (self , fut = None ):
107+ if self ._waitings :
108+ fut = self ._waitings .popleft ()
109+ conn = Connection (** self .connect_kwargs )
110+ cf = conn .connect ()
111+ self .io_loop .add_future (cf , callback = lambda f : fut .set_result (conn ))
112+ else :
113+ self ._opened_conns -= 1
114+ _debug ("Connection closed:" , self .stat ())
81115
82116 @coroutine
83117 def execute (self , query , params = None ):
@@ -94,11 +128,11 @@ def execute(self, query, params=None):
94128 cur = conn .cursor ()
95129 yield cur .execute (query , params )
96130 yield cur .close ()
97- self ._put_conn (conn )
98131 except :
99- self ._opened_conns -= 1
100- conn .close ()
132+ self ._close_conn (conn )
101133 raise
134+ else :
135+ self ._put_conn (conn )
102136 raise Return (cur )
103137
104138 @coroutine
@@ -111,7 +145,11 @@ def begin(self):
111145 :rtype: Future
112146 """
113147 conn = yield self ._get_conn ()
114- yield conn .begin ()
148+ try :
149+ yield conn .begin ()
150+ except :
151+ self ._close_conn (conn )
152+ raise
115153 trx = Transaction (self , conn )
116154 raise Return (trx )
117155
0 commit comments