Skip to content
Browse files

[enhance] protocols: Added draft socketpool code.

  • Loading branch information...
1 parent e9e34a3 commit fb2c9bd7f1efaf918c4019828539fc23c5a9a28c @nrs135 nrs135 committed Oct 26, 2012
Showing with 362 additions and 0 deletions.
  1. +91 −0 lib/stdlib/apis/protocols/common.opa
  2. +77 −0 lib/stdlib/apis/protocols/log.opa
  3. +194 −0 lib/stdlib/apis/protocols/socketpool.opa
View
91 lib/stdlib/apis/protocols/common.opa
@@ -0,0 +1,91 @@
+/*
+ Copyright © 2011, 2012 MLstate
+
+ This file is part of Opa.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+*/
+
+/**
+ * Logging support for Protocols driver
+ *
+ * @destination public
+ * @stabilization work in progress
+ **/
+
+/**
+ * {1 About this module}
+ *
+ * Module [ProtocolsLog] contains support for debugging and logging for the Protocols driver.
+ *
+ * {1 Where should I start?}
+ *
+ * {1 What if I need more?}
+ *
+ **/
+//package stdlib.apis.protocols
+package protocols
+
+type Protocols.params = {
+ pool_max:int;
+ log:bool;
+}
+
+ProtocolsCommon = {{
+
+ @private PL = ProtocolsLog
+
+ @private init_params = ({
+ pool_max=2;
+ log=false;
+ }:Protocols.params)
+
+ @private params = Mutable.make(init_params:Protocols.params)
+ @private params_done = Mutable.make(false)
+
+ @private
+ get_params = ->
+ do if not(params_done.get())
+ then params.set(CommandLine.filter({
+ title = "Protocols parameters";
+ init = params.get() : Protocols.params;
+ anonymous = [];
+ parsers = [
+ {CommandLine.default_parser with
+ names = ["--protocols-socket-pool", "--protocolssocketpool", "--pp", "-pp"]
+ description = "Number of sockets in socket pool (>=2 enables socket pool)"
+ param_doc = "<int>"
+ on_param(p) = parser n={Rule.natural} -> {no_params={p with pool_max=Int.max(n,1)}}
+ },
+ {CommandLine.default_parser with
+ names = ["--protocols-log", "--protocolslog", "--pl", "-pl"]
+ description = "Enable ProtocolsLog logging"
+ param_doc = "<bool>"
+ on_param(p) = parser b={Rule.bool} -> {no_params={p with log=b}}
+ },
+ {CommandLine.default_parser with
+ names = ["--protocols-log-type", "--protocolslogtype", "--pt", "-pt"]
+ description = "Type of logging: stdout, stderr, logger, none"
+ param_doc = "<string>"
+ on_param(p) = parser s={Rule.consume} ->
+ logtype =
+ ((match s with
+ | "stdout" -> {stdout}
+ | "stderr" -> {stderr}
+ | "logger" -> {logger}
+ | "none" | "nolog" -> {nolog}
+ | _ -> PL.fatal("Protocols.get_params","Unknown Protocols log type string {s}",-1)):Protocols.logtype)
+ do PL.logtype.set(logtype)
+ {no_params=p}
+ }
+ ];
+ }))
+ params_done.set(true)
+
+}}
+
+
View
77 lib/stdlib/apis/protocols/log.opa
@@ -0,0 +1,77 @@
+/*
+ Copyright © 2011, 2012 MLstate
+
+ This file is part of Opa.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+*/
+
+/**
+ * Logging support for Protocols driver
+ *
+ * @destination public
+ * @stabilization work in progress
+ **/
+
+/**
+ * {1 About this module}
+ *
+ * Module [ProtocolsLog] contains support for debugging and logging for the Protocols driver.
+ *
+ * {1 Where should I start?}
+ *
+ * {1 What if I need more?}
+ *
+ **/
+
+/**
+ * Log functions for Protocols driver.
+ *
+ * We can choose various logging methods but these apply globally
+ * to all Protocols instances in a running program.
+ **/
+
+/**
+ * Type values for ProtocolsLog, we can log to stdout, stderr or to
+ * the OPA Logger functions, or we can switch logging off altogether.
+ **/
+type Protocols.logtype = {stdout} / {stderr} / {logger} / {nolog}
+
+ProtocolsLog = {{
+
+ /**
+ * This Mutable value defines globally throughout the running
+ * program which logging type we are using.
+ **/
+ logtype = Mutable.make({logger}:Protocols.logtype)
+
+ @private log_(from, what, logfn, str, v) =
+ do match logtype.get() with
+ | {stdout} -> println("{what}{if from=="" then "" else "({from})"}: {str}")
+ | {stderr} -> prerrln("{what}{if from=="" then "" else "({from})"}: {str}")
+ | {logger} -> logfn(from,str)
+ | {nolog} -> void
+ v
+
+ /** The usual logging functions **/
+ info(from, str, v) = log_(from,"Info",Log.info,str,v)
+ debug(from, str, v) = log_(from,"Debug",Log.debug,str,v)
+ warning(from, str, v) = log_(from,"Warning",Log.warning,str,v)
+ error(from, str, v) = log_(from,"Error",Log.error,str,v)
+
+ /** A special fatal log which logs a message and terminates the running program. **/
+ fatal(from, str, v) =
+ do match logtype.get() with
+ | {stdout} -> println("Fatal{if from=="" then "" else "({from})"}: {str}")
+ | {stderr} -> prerrln("Fatal{if from=="" then "" else "({from})"}: {str}")
+ | {logger} -> Log.fatal(from,str)
+ | {nolog} -> void
+ @fail("{v}")
+
+}}
+
+// End of file log.opa
View
194 lib/stdlib/apis/protocols/socketpool.opa
@@ -0,0 +1,194 @@
+/*
+ Copyright © 2011 MLstate
+
+ This file is part of Opa.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+*/
+//package stdlib.apis.protocols
+package protocols
+
+import stdlib.core.queue
+import stdlib.io.socket
+
+type SocketPool.host = {addr:string; port:int}
+
+type SocketPool.socket = {mbox:Mailbox.t; conn:Socket.connection}
+
+type SocketPool.result = outcome((bool,SocketPool.socket),Mongo.failure)
+
+@private type SocketPool.state = {
+ host: SocketPool.host;
+ max: int;
+ hint: int;
+ sockets: list(SocketPool.socket);
+ allocated: list(SocketPool.socket);
+ cnt: int;
+ log: bool;
+ queue: Queue.t(continuation(SocketPool.result));
+ slaveok: bool;
+ open_connections: intset;
+ monitor_connections: bool;
+}
+
+@private type SocketPool.msg =
+ {get : continuation(SocketPool.result)}
+ / {release : SocketPool.socket}
+ / {reconnect : SocketPool.host}
+ / {gethost : continuation(SocketPool.host)}
+ / {setslaveok : bool}
+ / {getslaveok : continuation(bool)}
+ / {sethint : int}
+ / {gethint : continuation(int)}
+ / {close}
+ / {stop}
+
+@abstract type SocketPool.t = channel(SocketPool.msg)
+
+@server @private
+SocketPool = {{
+
+ @private ML = ProtocolsLog
+
+ @private
+ Socket_close(conn, state) =
+ conn_id = Socket.conn_id(conn.conn)
+ do if state.log then ML.debug("SocketPool.handler","close {conn_id}",void)
+ do Socket.close(conn.conn)
+ _ = Mailbox.reset(conn.mbox)
+ {state with open_connections=IntSet.remove(conn_id,state.open_connections)}
+
+ @private monitor(from, state) =
+ if state.monitor_connections
+ then
+ do ML.debug("SocketPool.handler({from})",
+ "open={List.list_to_string(Int.to_string,IntSet.To.list(state.open_connections))}",void)
+ do ML.debug("SocketPool.handler({from})",
+ "sockets={List.list_to_string(Int.to_string,List.map((id -> Socket.conn_id(id.conn)),state.sockets))}",void)
+ do ML.debug("SocketPool.handler({from})",
+ "allocated={List.list_to_string(Int.to_string,List.map((id -> Socket.conn_id(id.conn)),state.allocated))}",void)
+ void
+
+ @private pool_handler(state:SocketPool.state, msg:SocketPool.msg): Session.instruction(SocketPool.state) =
+ match msg with
+ | {release=connection} ->
+ do monitor("release", state)
+ conn_id = Socket.conn_id(connection.conn)
+ if List.exists((c -> Socket.conn_id(c.conn) == conn_id),state.allocated)
+ then
+ (match Queue.rem(state.queue) with
+ | ({none}, _) ->
+ do if state.log then ML.debug("SocketPool.handler","socket back in pool {Socket.conn_id(connection.conn)}",void)
+ sockets = connection +> state.sockets
+ conn_id = Socket.conn_id(connection.conn)
+ allocated = List.filter((s -> Socket.conn_id(s.conn) != conn_id),state.allocated)
+ {set={state with ~sockets; ~allocated}}
+ | ({some=waiter}, queue) ->
+ do if state.log then ML.debug("SocketPool.handler","reallocate socket {Socket.conn_id(connection.conn)}",void)
+ do Continuation.return(waiter, {success=(state.slaveok,connection)})
+ {set={state with ~queue}})
+ else
+ do if state.log then ML.debug("SocketPool.handler","drop socket {Socket.conn_id(connection.conn)}",void)
+ state = Socket_close(connection, state)
+ {set=state}
+ | {get=k} ->
+ do monitor("get", state)
+ (match state.sockets with
+ | [connection|sockets] ->
+ do if state.log then ML.debug("SocketPool.handler","reuse open socket {Socket.conn_id(connection.conn)}",void)
+ do Continuation.return(k, {success=(state.slaveok,connection)})
+ allocated = connection +> state.allocated
+ {set={state with ~sockets; ~allocated}}
+ | [] ->
+ if state.cnt >= state.max
+ then
+ do if state.log then ML.debug("SocketPool.handler","queue caller",void)
+ {set={state with queue=Queue.add(k, state.queue)}}
+ else
+ (match Socket.binary_connect_with_err_cont(state.host.addr,state.host.port) with
+ | {success=conn} ->
+ connection = {~conn; mbox=Mailbox.create(state.hint)}
+ state = {state with open_connections=IntSet.add(Socket.conn_id(connection.conn),state.open_connections)}
+ do if state.log
+ then ML.debug("SocketPool.handler","successfully opened socket {Socket.conn_id(connection.conn)}",void)
+ do Continuation.return(k, {success=(state.slaveok,connection)})
+ allocated = connection +> state.allocated
+ {set={state with cnt=state.cnt+1; ~allocated}}
+ | {failure=str} ->
+ do if state.log then ML.debug("SocketPool.handler","open socket failure",void)
+ do Continuation.return(k, {failure={Error="Got exception {str}"}})
+ {unchanged}))
+ | {reconnect=host} ->
+ do monitor("reconnect", state)
+ do if state.log then ML.debug("SocketPool.handler","reconnect({host.addr}.{host.port})",void)
+ state = List.fold(Socket_close,state.sockets,state)
+ state = List.fold(Socket_close,state.allocated,state)
+ {set={state with ~host; cnt=0; sockets=[]; allocated=[]}}
+ | {gethost=k} ->
+ do Continuation.return(k, state.host)
+ {unchanged}
+ | {setslaveok=tf} ->
+ {set={state with slaveok=tf}}
+ | {getslaveok=k} ->
+ do Continuation.return(k, state.slaveok)
+ {unchanged}
+ | {sethint=hint} ->
+ {set={state with ~hint}}
+ | {gethint=k} ->
+ do Continuation.return(k, state.hint)
+ {unchanged}
+ | {close} ->
+ do if state.log then ML.debug("SocketPool.handler","close socket pool",void)
+ state = List.fold(Socket_close,state.sockets,state)
+ state = List.fold(Socket_close,state.allocated,state)
+ {set=state}
+ | {stop} ->
+ do monitor("stop", state)
+ do if state.log then ML.debug("SocketPool.handler","stop socket pool",void)
+ state = List.fold(Socket_close,state.sockets,state)
+ _state = List.fold(Socket_close,state.allocated,state)
+ {stop}
+
+ make(host:SocketPool.host, max:int, hint:int, log:bool): SocketPool.t =
+ do if log then ML.debug("SocketPool.make","{host}",void)
+ Session.make(({~host; ~max; ~hint; log=log; cnt=0; sockets=[]; allocated=[]; queue=Queue.empty;
+ slaveok=false; open_connections=IntSet.empty; monitor_connections=false}:SocketPool.state),
+ pool_handler)
+
+ get(pool:SocketPool.t) : SocketPool.result =
+ @callcc(k -> Session.send(pool, {get=k}))
+
+ release(pool:SocketPool.t, connection:SocketPool.socket) : void =
+ Session.send(pool, {release=connection})
+
+ reconnect(pool:SocketPool.t, host:SocketPool.host) : void =
+ Session.send(pool, {reconnect=host})
+
+ gethost(pool:SocketPool.t) : SocketPool.host =
+ @callcc(k -> Session.send(pool, {gethost=k}))
+
+ setslaveok(pool:SocketPool.t, setslaveok:bool) : void =
+ Session.send(pool, {~setslaveok})
+
+ getslaveok(pool:SocketPool.t) : bool =
+ @callcc(k -> Session.send(pool, {getslaveok=k}))
+
+ sethint(pool:SocketPool.t, sethint:int) : void =
+ Session.send(pool, {~sethint})
+
+ gethint(pool:SocketPool.t) : int =
+ @callcc(k -> Session.send(pool, {gethint=k}))
+
+ close(pool:SocketPool.t) : void =
+ Session.send(pool, {close})
+
+ stop(pool:SocketPool.t) : void =
+ Session.send(pool, {stop})
+
+}}
+
+// End of file socketpool.opa

0 comments on commit fb2c9bd

Please sign in to comment.
Something went wrong with that request. Please try again.