Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

[fix] postgres: Per-socket authentication via socket pool.

  • Loading branch information...
commit 7752ae0ed6fe9c5224ede768b63d814bc7688a1a 1 parent aecb403
@nrs135 nrs135 authored
View
11 lib/stdlib/apis/apigenlib/apigenlib.opa
@@ -790,6 +790,7 @@ type ApigenLib.connection = {
ApigenLib.conf conf,
bool retain, // total mass
option(Socket.t) conn,
+ option(SocketPool.auth) auth,
SocketPool.t pool,
ApigenLib.read_packet read_packet
}
@@ -838,7 +839,7 @@ module ApilibConnection(Socket.host default_host) {
*/
function ApigenLib.connection init(family_name, name, secure, preamble, retain) {
conf = Conf.get_default(family_name, name)
- ~{ name, conf, retain, conn:{none},
+ ~{ name, conf, retain, conn:{none}, auth:{none},
pool:SocketPool.make_secure(conf.default_host,{hint:conf.bufsize, max:conf.poolmax, verbose:conf.verbose},secure,preamble),
read_packet:read_packet_prefixed(default_length)
}
@@ -863,9 +864,9 @@ module ApilibConnection(Socket.host default_host) {
match (conn.conn) {
case {some:_}: {success:conn};
case {none}:
- match (SocketPool.get(conn.pool)) {
+ match (SocketPool.get_auth(conn.pool)) {
case ~{failure}: {failure:{socket:failure}};
- case {success:c}: {success:{conn with conn:{some:c}}};
+ case {success:(c,auth)}: {success:{conn with conn:{some:c}, auth:{some:auth}}};
}
}
}
@@ -876,7 +877,9 @@ module ApilibConnection(Socket.host default_host) {
*/
function ApigenLib.connection release(ApigenLib.connection conn) {
match (conn.conn) {
- case {some:c}: SocketPool.release(conn.pool,c); {conn with conn:none};
+ case {some:c}:
+ SocketPool.release_auth(conn.pool,c,conn.auth?{authenticated:false, i1:-1, i2:-1});
+ {conn with conn:none, auth:none};
case {none}: conn;
}
}
View
189 lib/stdlib/apis/postgres/postgres.opa
@@ -186,8 +186,6 @@ type Postgres.connection = {
minor_version : int /** The minor version for the protocol */
dbase : string /** The name of the database to connected to */
params : stringmap(string) /** Map of parameter values returned during authentication */
- processid : int /** The processid for the connection at the server */
- secret_key : int /** The connection's secret data (used in cancel requests) */
query : string /** A note of the last query command */
status : string /** The last received status code from the server */
suspended : bool /** Whether an execution was suspended or not */
@@ -362,7 +360,7 @@ Postgres = {{
| {success=conn} ->
{success={ ~name ~secure ~conn ~dbase ssl_accepted=false
major_version=default_major_version minor_version=default_minor_version
- params=StringMap.empty processid=-1 secret_key=-1
+ params=StringMap.empty
query="" status="" suspended=false in_transaction=false error=none
empty=true completed=[] paramdescs=[] rows=0 rowdescs=[]
handlers=IntMap.empty backhandlers=StringMap.empty }}
@@ -388,7 +386,7 @@ Postgres = {{
then Pg.terminate({success=conn.conn})
else {success=conn.conn}
match Pg.close(sconn) with
- | {success=c} -> {success={conn with conn=c processid=-1 secret_key=-1}}
+ | {success=c} -> {success={conn with conn=c}}
| {~failure} -> {~failure}
/**
@@ -421,9 +419,12 @@ Postgres = {{
* @returns An optional object with the key data.
*/
keydata(conn:Postgres.connection) : option({processid:int; secret_key:int}) =
- if conn.processid != -1 && conn.secret_key != -1
- then {some={processid=conn.processid; secret_key=conn.secret_key}}
- else {none}
+ match conn.conn.auth with
+ | {some=auth} ->
+ if auth.authenticated && auth.i1 != -1 && auth.i2 != -1
+ then {some={processid=auth.i1; secret_key=auth.i2}}
+ else {none}
+ | {none} -> {none}
/** Return last status value.
*
@@ -518,7 +519,8 @@ Postgres = {{
| {success=(c,{ParameterStatus=(n,v)})} ->
loop({conn with conn=c; params=StringMap.add(n,v,conn.params)}, acc, f)
| {success=(c,{BackendKeyData=(processid,secret_key)})} ->
- loop({conn with conn=c; ~processid ~secret_key}, acc, f)
+ c = {c with auth={some={authenticated=true; i1=processid; i2=secret_key}}}
+ loop({conn with conn=c}, acc, f)
| {success=(c,{~CommandComplete})} ->
loop({conn with conn=c; completed=[CommandComplete|conn.completed]}, acc, f)
| {success=(c,{EmptyQueryResponse})} ->
@@ -557,7 +559,11 @@ Postgres = {{
end
@private init_conn(conn:Postgres.connection, query) : Postgres.connection =
- {conn with error=none; empty=false; suspended=false; completed=[]; rows=0; rowdescs=[]; paramdescs=[]; ~query}
+ // We now verify that the connection has been authenticated upon each command.
+ conn = authenticate(conn)
+ if Option.is_none(conn.error)
+ then {conn with error=none; empty=false; suspended=false; completed=[]; rows=0; rowdescs=[]; paramdescs=[]; ~query}
+ else conn
/* Failed attempt to get SSL connection. We do everything the docs say,
* we send the SSLRequest message, we get back "S" and then we call
@@ -595,6 +601,11 @@ Postgres = {{
| ~{failure} -> ~{failure}
end
+ is_authenticated(conn:Postgres.connection) : bool =
+ match conn.conn.auth with
+ | {some={authenticated={true}; i1=processid; i2=secret_key}} -> processid != -1 && secret_key != -1
+ | _ -> false
+
/** Authenticate with the PostgreSQL server.
*
* This function sends a [StartupMessage] message which includes the user and database names.
@@ -608,11 +619,23 @@ Postgres = {{
* @returns An updated connection with connection data installed.
*/
authenticate(conn:Postgres.connection) : Postgres.connection =
- conn = init_conn(conn, "authentication")
- version = Bitwise.lsl(Bitwise.land(conn.major_version,0xffff),16) + Bitwise.land(conn.minor_version,0xffff)
- match Pg.start({success=conn.conn}, (version, [("user",conn.conn.conf.user),("database",conn.dbase)])) with
- | {success=c} -> loop({conn with conn=c}, void, ignore_listener).f1
- | ~{failure} -> error(conn,{api_failure=failure}, void, ignore_listener).f1
+ // We have to ensure that the connection is pre-allocated here
+ // since Pg.start might be given a socket which is already authenticated.
+ match Pg.Conn.allocate(conn.conn) with
+ | {success=c} ->
+ conn = {conn with conn=c}
+ if is_authenticated(conn)
+ then conn
+ else
+ conn = {conn with error=none; empty=false; suspended=false; completed=[]; rows=0; rowdescs=[]; paramdescs=[];
+ query="authentication"}
+ version = Bitwise.lsl(Bitwise.land(conn.major_version,0xffff),16) + Bitwise.land(conn.minor_version,0xffff)
+ match Pg.start({success=conn.conn}, (version, [("user",conn.conn.conf.user),("database",conn.dbase)])) with
+ | {success=c} -> loop({conn with conn=c}, void, ignore_listener).f1
+ | ~{failure} -> error(conn,{api_failure=failure}, void, ignore_listener).f1
+ end
+ | ~{failure} -> error(conn,{api_failure=failure}, void, ignore_listener).f1
+ end
/** Issue simple query command and read back responses.
@@ -633,10 +656,13 @@ Postgres = {{
*/
query(conn:Postgres.connection, init, query, folder) =
conn = init_conn(conn, query)
- match Pg.query({success=conn.conn}, query) with
- | {success=c} -> loop({conn with conn=c}, init, folder)
- | ~{failure} -> error(conn, {api_failure=failure}, init, folder)
- end
+ if Option.is_none(conn.error)
+ then
+ match Pg.query({success=conn.conn}, query) with
+ | {success=c} -> loop({conn with conn=c}, init, folder)
+ | ~{failure} -> error(conn, {api_failure=failure}, init, folder)
+ end
+ else (conn,init)
/** Send a parse query message.
@@ -654,13 +680,16 @@ Postgres = {{
*/
parse(conn:Postgres.connection, name, query, oids) : Postgres.connection =
conn = init_conn(conn, "Parse({query},{name})")
- match Pg.parse({success=conn.conn}, (name,query,oids)) with
- | {success=c} ->
- conn = {conn with conn=c}
- final(conn, {final={success=conn}}, void, ignore_listener).f1
- | ~{failure} ->
- error(conn, {api_failure=failure}, void, ignore_listener).f1
- end
+ if Option.is_none(conn.error)
+ then
+ match Pg.parse({success=conn.conn}, (name,query,oids)) with
+ | {success=c} ->
+ conn = {conn with conn=c}
+ final(conn, {final={success=conn}}, void, ignore_listener).f1
+ | ~{failure} ->
+ error(conn, {api_failure=failure}, void, ignore_listener).f1
+ end
+ else conn
/** Bind parameters to a prepared statement and a portal.
*
@@ -676,13 +705,16 @@ Postgres = {{
params = List.map(serialize, params)
(codes, params) = List.unzip(params)
conn = init_conn(conn, "Bind({name},{portal})")
- match Pg.bind({success=conn.conn}, (portal, name, codes, params, [0])) with
- | {success=c} ->
- conn = {conn with conn=c}
- final(conn,{final={success=conn}}, void, ignore_listener).f1
- | ~{failure} ->
- error(conn, {api_failure=failure}, void, ignore_listener).f1
- end
+ if Option.is_none(conn.error)
+ then
+ match Pg.bind({success=conn.conn}, (portal, name, codes, params, [0])) with
+ | {success=c} ->
+ conn = {conn with conn=c}
+ final(conn,{final={success=conn}}, void, ignore_listener).f1
+ | ~{failure} ->
+ error(conn, {api_failure=failure}, void, ignore_listener).f1
+ end
+ else conn
/** Execute named portal.
*
@@ -697,13 +729,16 @@ Postgres = {{
execute(conn:Postgres.connection, init, portal, rows_to_return, folder) =
// should we fold on sync instead of execute ?
conn = init_conn(conn, "Execute({portal})")
- match Pg.execute({success=conn.conn},(portal,rows_to_return)) with
- | {success=c} ->
- conn = {conn with conn=c}
- loop(sync(conn), init, folder)
- | ~{failure} ->
- error(conn, {api_failure=failure}, init, folder)
- end
+ if Option.is_none(conn.error)
+ then
+ match Pg.execute({success=conn.conn},(portal,rows_to_return)) with
+ | {success=c} ->
+ conn = {conn with conn=c}
+ loop(sync(conn), init, folder)
+ | ~{failure} ->
+ error(conn, {api_failure=failure}, init, folder)
+ end
+ else (conn, init)
/** Describe portal or statement.
*
@@ -718,13 +753,16 @@ Postgres = {{
*/
describe(conn:Postgres.connection, sp:Postgres.sp, name) : Postgres.connection =
conn = init_conn(conn, "Describe({string_of_sp(sp)},{name})")
- match Pg.describe({success=conn.conn},(string_of_sp(sp),name)) with
- | {success=c} ->
- conn = {conn with conn=c}
- final(conn,{final={success=conn}}, void, ignore_listener).f1
- | ~{failure} ->
- error(conn,{api_failure=failure}, void, ignore_listener).f1
- end
+ if Option.is_none(conn.error)
+ then
+ match Pg.describe({success=conn.conn},(string_of_sp(sp),name)) with
+ | {success=c} ->
+ conn = {conn with conn=c}
+ final(conn,{final={success=conn}}, void, ignore_listener).f1
+ | ~{failure} ->
+ error(conn,{api_failure=failure}, void, ignore_listener).f1
+ end
+ else conn
/** Close a prepared statement or portal.
*
@@ -741,13 +779,16 @@ Postgres = {{
*/
closePS(conn:Postgres.connection, sp:Postgres.sp, name) : Postgres.connection =
conn = init_conn(conn, "Close({string_of_sp(sp)},{name})")
- match Pg.closePS({success=conn.conn},(string_of_sp(sp),name)) with
- | {success=c} ->
- conn = {conn with conn=c}
- final(conn, {final={success=conn}}, void, ignore_listener).f1
- | ~{failure} ->
- error(conn, {api_failure=failure}, void, ignore_listener).f1
- end
+ if Option.is_none(conn.error)
+ then
+ match Pg.closePS({success=conn.conn},(string_of_sp(sp),name)) with
+ | {success=c} ->
+ conn = {conn with conn=c}
+ final(conn, {final={success=conn}}, void, ignore_listener).f1
+ | ~{failure} ->
+ error(conn, {api_failure=failure}, void, ignore_listener).f1
+ end
+ else conn
/** Send [Sync] command and read back response data.
*
@@ -761,13 +802,16 @@ Postgres = {{
*/
sync(conn:Postgres.connection) : Postgres.connection =
conn = init_conn(conn, "Sync")
- match Pg.sync({success=conn.conn}) with
- | {success=c} ->
- conn = {conn with conn=c}
- final(conn, {final={success=conn}}, void, ignore_listener).f1
- | ~{failure} ->
- error(conn,{api_failure=failure}, void, ignore_listener).f1
- end
+ if Option.is_none(conn.error)
+ then
+ match Pg.sync({success=conn.conn}) with
+ | {success=c} ->
+ conn = {conn with conn=c}
+ final(conn, {final={success=conn}}, void, ignore_listener).f1
+ | ~{failure} ->
+ error(conn,{api_failure=failure}, void, ignore_listener).f1
+ end
+ else conn
/** Send a [Flush] command and read back response data.
*
@@ -779,13 +823,16 @@ Postgres = {{
*/
flush(conn:Postgres.connection) : Postgres.connection =
conn = init_conn(conn, "Flush")
- match Pg.flush({success=conn.conn}) with
- | {success=c} ->
- conn = {conn with conn=c}
- loop(conn, void, ignore_listener).f1
- | ~{failure} ->
- error(conn,{api_failure=failure}, void, ignore_listener).f1
- end
+ if Option.is_none(conn.error)
+ then
+ match Pg.flush({success=conn.conn}) with
+ | {success=c} ->
+ conn = {conn with conn=c}
+ loop(conn, void, ignore_listener).f1
+ | ~{failure} ->
+ error(conn,{api_failure=failure}, void, ignore_listener).f1
+ end
+ else conn
/** Send cancel request message on secondary channel.
*
@@ -802,12 +849,16 @@ Postgres = {{
* @returns An outcome of a void or an [Apigen.failure] object.
*/
cancel(conn:Postgres.connection) : Postgres.result =
- if conn.processid == -1 || conn.secret_key == -1
+ (processid, secret_key) =
+ match conn.conn.auth with
+ | {some={authenticated={true}; i1=processid; i2=secret_key}} -> (processid,secret_key)
+ | _ -> (-1,-1)
+ if processid == -1 || secret_key == -1
then {failure=(conn,{no_key})}
else
match connect(conn.name, conn.secure, conn.dbase) with
| {success=conn} ->
- match Pg.cancel({success=conn.conn},(conn.processid,conn.secret_key)) with
+ match Pg.cancel({success=conn.conn},(processid,secret_key)) with
| {success=c} ->
_ = close(conn, false)
{success={conn with conn=c}}
View
207 lib/stdlib/io/socket/socket_pool.opa
@@ -13,12 +13,25 @@
import stdlib.core.queue
/**
- * A SocketPool.result is either a socket or a string which describes the socket error.
+ * Authentication information.
+ * We should really parametrise the socket pool with this
+ * type but that would propagate for miles and miles.
+ * For now we just provide two general-purpose ints and a bool
+ * (see Postgres for the use of the ints).
*/
+type SocketPool.auth = { authenticated:bool; i1:int; i2:int }
+/**
+ * A SocketPool.result is either a socket or a string which describes the socket error.
+ */
type SocketPool.result = outcome(Socket.t, string)
/**
+ * A SocketPool result with an authorisation flag.
+ */
+type SocketPool.auth_result = outcome((Socket.t,SocketPool.auth), string)
+
+/**
* Some protocols have a preamble to trigger SSL authentication (see Postgres).
*/
type SocketPool.preamble = Socket.t -> SocketPool.result
@@ -37,21 +50,28 @@ type SocketPool.conf = {
max : int;
}
+@private type SocketPool.waiter =
+ {waiter:continuation(SocketPool.result)}
+ / {auth_waiter:continuation(SocketPool.auth_result)}
+
@private type SocketPool.state = {
host : Socket.host;
secure_type : option(SSL.secure_type);
preamble : option(SocketPool.preamble);
conf : SocketPool.conf;
sockets: list(Socket.t);
+ auths: list(SocketPool.auth);
allocated: list(Socket.t);
cnt: int;
- queue: Queue.t(continuation(SocketPool.result));
+ queue: Queue.t(SocketPool.waiter);
open_connections: intset;
}
@private type SocketPool.msg =
{get : continuation(SocketPool.result)}
+ / {get_auth : continuation(SocketPool.auth_result)}
/ {release : Socket.t}
+ / {release_auth : (Socket.t,SocketPool.auth)}
/ {reconnect : Socket.host}
/ {getconf : continuation(SocketPool.conf)}
/ {updconf : SocketPool.conf -> SocketPool.conf}
@@ -105,79 +125,104 @@ SocketPool = {{
void
#<End>
+ @private release_socket(state:SocketPool.state, connection:Socket.t, authenticated:SocketPool.auth): Session.instruction(SocketPool.state) =
+ do monitor("release", state)
+ conn_id = Socket.conn_id(connection.conn)
+ if List.exists((c -> Socket.conn_id(c.conn) == conn_id),state.allocated)
+ then
+ match Queue.rem(state.queue) with
+ | ({none}, _) ->
+ do Log.debug(state, "handler","socket back in pool {Socket.conn_id(connection.conn)}")
+ sockets = connection +> state.sockets
+ auths = authenticated +> state.auths
+ conn_id = Socket.conn_id(connection.conn)
+ allocated = List.filter((s -> Socket.conn_id(s.conn) != conn_id),state.allocated)
+ {set={state with ~sockets; ~auths; ~allocated}}
+ | ({some={~waiter}}, queue) ->
+ do Log.debug(state, "handler","reallocate socket {Socket.conn_id(connection.conn)}")
+ do Continuation.return(waiter, {success=connection})
+ {set={state with ~queue}}
+ | ({some={~auth_waiter}}, queue) ->
+ do Log.debug(state, "handler","reallocate socket {Socket.conn_id(connection.conn)}")
+ do Continuation.return(auth_waiter, {success=(connection,authenticated)})
+ {set={state with ~queue}}
+ end
+ else
+ do Log.debug(state, "handler","drop socket {Socket.conn_id(connection.conn)}")
+ state = socket_close(connection, state)
+ {set=state}
+
+ @private socket_return(k:SocketPool.waiter, connection, authenticated:SocketPool.auth) : void =
+ match k with
+ | {waiter=k} -> Continuation.return(k, {success=connection})
+ | {auth_waiter=k} -> Continuation.return(k, {success=(connection,authenticated)})
+ end
+
+ @private socket_error(k:SocketPool.waiter, failure:{failure:string}) : void =
+ match (k,failure) with
+ | ({waiter=k},{~failure}) -> Continuation.return(k, ~{failure})
+ | ({auth_waiter=k},{~failure}) -> Continuation.return(k, ~{failure})
+ end
+
+ @private get_socket(state:SocketPool.state, k:SocketPool.waiter): Session.instruction(SocketPool.state) =
+ do monitor("get", state)
+ match (state.sockets,state.auths) with
+ | ([connection|sockets],[authenticated|auths]) ->
+ do Log.debug(state, "handler","reuse open socket {Socket.conn_id(connection.conn)}")
+ do socket_return(k, connection, authenticated)
+ allocated = connection +> state.allocated
+ {set={state with ~sockets; ~auths; ~allocated}}
+ | (_,_) ->
+ if state.cnt >= state.conf.max
+ then
+ do Log.debug(state, "handler", "queue caller")
+ {set={state with queue=Queue.add(k, state.queue)}}
+ else
+ match
+ match state.secure_type with
+ | {some=secure_type} ->
+ match state.preamble with
+ | {some=preamble} ->
+ //match Socket.binary_secure_connect_with_err_cont(state.host.f1,state.host.f2,false,secure_type) with
+ match Socket.binary_connect_with_err_cont(state.host.f1,state.host.f2) with
+ | {success=conn} ->
+ match preamble({~conn; mbox=Mailbox.create(state.conf.hint)}) with
+ | {success=conn} ->
+ Socket.binary_secure_reconnect_with_err_cont(conn.conn,state.host.f1,state.host.f2,secure_type)
+ | ~{failure} -> ~{failure}
+ end
+ | ~{failure} -> ~{failure}
+ end
+ | {none} -> Socket.binary_secure_connect_with_err_cont(state.host.f1,state.host.f2,true,secure_type)
+ end
+ | {none} -> Socket.binary_connect_with_err_cont(state.host.f1,state.host.f2)
+ end
+ with
+ | {success=conn} ->
+ connection = {~conn; mbox=Mailbox.create(state.conf.hint)}
+ state = {state with open_connections=IntSet.add(Socket.conn_id(connection.conn),state.open_connections)}
+ do Log.debug(state, "handler", "successfully opened socket {Socket.conn_id(connection.conn)}")
+ do socket_return(k, connection, {authenticated=false; i1=-1; i2=-1})
+ allocated = connection +> state.allocated
+ {set={state with cnt=state.cnt+1; ~allocated}}
+ | {failure=str} ->
+ do Log.debug(state, "handler", "open socket failure: {str}")
+ do socket_error(k, {failure=str})
+ {unchanged}
+ end
+
@private pool_handler(state:SocketPool.state, msg:SocketPool.msg): Session.instruction(SocketPool.state) =
match msg with
- | {release=connection} ->
- do monitor("release", state)
- conn_id = Socket.conn_id(connection.conn)
- if List.exists((c -> Socket.conn_id(c.conn) == conn_id),state.allocated)
- then
- (match Queue.rem(state.queue) with
- | ({none}, _) ->
- do Log.debug(state, "handler","socket back in pool {Socket.conn_id(connection.conn)}")
- sockets = connection +> state.sockets
- conn_id = Socket.conn_id(connection.conn)
- allocated = List.filter((s -> Socket.conn_id(s.conn) != conn_id),state.allocated)
- {set={state with ~sockets; ~allocated}}
- | ({some=waiter}, queue) ->
- do Log.debug(state, "handler","reallocate socket {Socket.conn_id(connection.conn)}")
- do Continuation.return(waiter, {success=connection})
- {set={state with ~queue}})
- else
- do Log.debug(state, "handler","drop socket {Socket.conn_id(connection.conn)}")
- state = socket_close(connection, state)
- {set=state}
- | {get=k} ->
- do monitor("get", state)
- (match state.sockets with
- | [connection|sockets] ->
- do Log.debug(state, "handler","reuse open socket {Socket.conn_id(connection.conn)}")
- do Continuation.return(k, {success=connection})
- allocated = connection +> state.allocated
- {set={state with ~sockets; ~allocated}}
- | [] ->
- if state.cnt >= state.conf.max
- then
- do Log.debug(state, "handler", "queue caller")
- {set={state with queue=Queue.add(k, state.queue)}}
- else
- (match
- match state.secure_type with
- | {some=secure_type} ->
- match state.preamble with
- | {some=preamble} ->
- //match Socket.binary_secure_connect_with_err_cont(state.host.f1,state.host.f2,false,secure_type) with
- match Socket.binary_connect_with_err_cont(state.host.f1,state.host.f2) with
- | {success=conn} ->
- match preamble({~conn; mbox=Mailbox.create(state.conf.hint)}) with
- | {success=conn} ->
- Socket.binary_secure_reconnect_with_err_cont(conn.conn,state.host.f1,state.host.f2,secure_type)
- | ~{failure} -> ~{failure}
- end
- | ~{failure} -> ~{failure}
- end
- | {none} -> Socket.binary_secure_connect_with_err_cont(state.host.f1,state.host.f2,true,secure_type)
- end
- | {none} -> Socket.binary_connect_with_err_cont(state.host.f1,state.host.f2)
- end
- with
- | {success=conn} ->
- connection = {~conn; mbox=Mailbox.create(state.conf.hint)}
- state = {state with open_connections=IntSet.add(Socket.conn_id(connection.conn),state.open_connections)}
- do Log.debug(state, "handler", "successfully opened socket {Socket.conn_id(connection.conn)}")
- do Continuation.return(k, {success=connection})
- allocated = connection +> state.allocated
- {set={state with cnt=state.cnt+1; ~allocated}}
- | {failure=str} ->
- do Log.debug(state, "handler", "open socket failure: {str}")
- do Continuation.return(k, {failure=str})
- {unchanged}))
+ | {release=connection} -> release_socket(state, connection, {authenticated=false; i1=-1; i2=-1})
+ | {release_auth=(connection,authenticated)} -> release_socket(state, connection, authenticated)
+ | {get=k} -> get_socket(state, {waiter=k})
+ | {get_auth=k} -> get_socket(state, {auth_waiter=k})
| {reconnect=host} ->
do monitor("reconnect", state)
do Log.debug(state, "handler", "reconnect({host.f1}.{host.f2})")
state = List.fold(socket_close,state.sockets,state)
state = List.fold(socket_close,state.allocated,state)
- {set={state with ~host; cnt=0; sockets=[]; allocated=[]}}
+ {set={state with ~host; cnt=0; sockets=[]; auths=[]; allocated=[]}}
| {getconf=k} ->
do Continuation.return(k, state.conf)
{unchanged}
@@ -190,7 +235,7 @@ SocketPool = {{
do Log.debug(state, "handler","close socket pool")
state = List.fold(socket_close,state.sockets,state)
state = List.fold(socket_close,state.allocated,state)
- {set=state}
+ {set={state with auths=[]}}
| {stop} ->
do monitor("stop", state)
do Log.debug(state, "handler","stop socket pool")
@@ -203,6 +248,7 @@ SocketPool = {{
~host; ~conf; ~secure_type; ~preamble;
cnt=0;
sockets=[];
+ auths=[];
allocated=[];
queue=Queue.empty;
open_connections=IntSet.empty;
@@ -243,6 +289,14 @@ SocketPool = {{
@callcc(k -> Session.send(pool, {get=k}))
/**
+ * Same as get but also returns an authentication value associated with the socket.
+ * This is intended, for example, to attach an "authenticated" status to the socket
+ * for connections requiring external authentication.
+ */
+ get_auth(pool:SocketPool.t) : SocketPool.auth_result =
+ @callcc(k -> Session.send(pool, {get_auth=k}))
+
+ /**
* Same as get but the result is returned to the [callback]
* @param pool The pool of socket
* @param callback The callback which receives the result
@@ -254,6 +308,15 @@ SocketPool = {{
)
/**
+ * Same as get_async but with the authentication flag.
+ */
+ get_auth_async(pool:SocketPool.t, callback) =
+ @callcc(k ->
+ do Continuation.return(k, void)
+ callback(get_auth(pool))
+ )
+
+ /**
* Release [socket] into the pool. If the socket not coming from the [pool]
* then the socket is just close and the pool stay unchanged.
* @param socket The socket to release
@@ -263,6 +326,12 @@ SocketPool = {{
Session.send(pool, {release=socket})
/**
+ * Same as release but returns the authentication flag along with the socket.
+ */
+ release_auth(pool:SocketPool.t, socket:Socket.t, authenticated:SocketPool.auth) : void =
+ Session.send(pool, {release_auth=(socket,authenticated)})
+
+ /**
* Change the [host] where sockets of the pool are connected.
* @param pool The pool of socket
* @param host The new host to set
Please sign in to comment.
Something went wrong with that request. Please try again.