From 63d4cf6a783f2db2cc420a76d87f17b8742f94d5 Mon Sep 17 00:00:00 2001 From: Fabio Mascarenhas Date: Tue, 5 Apr 2011 21:19:13 -0300 Subject: [PATCH 1/4] autoclose option, set to false to not close socket after handler finishes --- src/copas/copas.lua | 197 ++++++++++++++++++++++---------------------- 1 file changed, 100 insertions(+), 97 deletions(-) diff --git a/src/copas/copas.lua b/src/copas/copas.lua index 5d18ac3..0151271 100644 --- a/src/copas/copas.lua +++ b/src/copas/copas.lua @@ -43,13 +43,13 @@ end function socket.newtry(finalizer) return function (...) - local status = (...) - if not status then - copcall(finalizer, select(2, ...)) - error({ (select(2, ...)) }, 0) - end - return ... - end + local status = (...) + if not status then + copcall(finalizer, select(2, ...)) + error({ (select(2, ...)) }, 0) + end + return ... + end end -- end of LuaSocket redefinitions @@ -62,6 +62,9 @@ _COPYRIGHT = "Copyright (C) 2005-2010 Kepler Project" _DESCRIPTION = "Coroutine Oriented Portable Asynchronous Services" _VERSION = "Copas 1.1.7" +-- Close the socket associated with the current connection after the handler finishes +autoclose = true + ------------------------------------------------------------------------------- -- Simple set implementation based on LuaSocket's tinyirc.lua example -- adds a FIFO queue for each value in the set @@ -71,46 +74,46 @@ local function newset() local set = {} local q = {} setmetatable(set, { __index = { - insert = function(set, value) - if not reverse[value] then - set[#set + 1] = value - reverse[value] = #set - end - end, - - remove = function(set, value) - local index = reverse[value] - if index then - reverse[value] = nil - local top = set[#set] - set[#set] = nil - if top ~= value then - reverse[top] = index - set[index] = top - end - end - end, - - push = function (set, key, itm) - local qKey = q[key] - if qKey == nil then - q[key] = {itm} - else - qKey[#qKey + 1] = itm - end - end, - - pop = function (set, key) - local t = q[key] - if t ~= nil then - local ret = table.remove (t, 1) - if t[1] == nil then - q[key] = nil - end - return ret - end - end - }}) + insert = function(set, value) + if not reverse[value] then + set[#set + 1] = value + reverse[value] = #set + end + end, + + remove = function(set, value) + local index = reverse[value] + if index then + reverse[value] = nil + local top = set[#set] + set[#set] = nil + if top ~= value then + reverse[top] = index + set[index] = top + end + end + end, + + push = function (set, key, itm) + local qKey = q[key] + if qKey == nil then + q[key] = {itm} + else + qKey[#qKey + 1] = itm + end + end, + + pop = function (set, key) + local t = q[key] + if t ~= nil then + local ret = table.remove (t, 1) + if t[1] == nil then + q[key] = nil + end + return ret + end + end + }}) return set end @@ -202,26 +205,26 @@ end -- wraps a socket to use Copas methods (send, receive, flush and settimeout) local _skt_mt = {__index = { - send = function (self, data, from, to) - return send (self.socket, data, from, to) - end, - - receive = function (self, pattern) - if (self.timeout==0) then - return receivePartial(self.socket, pattern) - end - return receive (self.socket, pattern) - end, - - flush = function (self) - return flush (self.socket) - end, - - settimeout = function (self,time) - self.timeout=time - return - end, - }} + send = function (self, data, from, to) + return send (self.socket, data, from, to) + end, + + receive = function (self, pattern) + if (self.timeout==0) then + return receivePartial(self.socket, pattern) + end + return receive (self.socket, pattern) + end, + + flush = function (self) + return flush (self.socket) + end, + + settimeout = function (self,time) + self.timeout=time + return + end, + }} function wrap (skt) return setmetatable ({socket = skt}, _skt_mt) @@ -258,7 +261,7 @@ local function _doTick (co, skt, ...) new_q:push (res, co) else if not ok then copcall (_errhandlers [co] or _deferror, res, co, skt) end - if skt then skt:close() end + if skt and autoclose then skt:close() end _errhandlers [co] = nil end end @@ -334,22 +337,22 @@ end -- a task to check ready to read events local _readable_t = { events = function(self) - local i = 0 - return function () - i = i + 1 - return self._evs [i] - end - end, + local i = 0 + return function () + i = i + 1 + return self._evs [i] + end + end, tick = function (self, input) - local handler = _servers[input] - if handler then - input = _accept(input, handler) - else - _reading:remove (input) - self.def_tick (input) - end - end + local handler = _servers[input] + if handler then + input = _accept(input, handler) + else + _reading:remove (input) + self.def_tick (input) + end + end } addtaskRead (_readable_t) @@ -358,17 +361,17 @@ addtaskRead (_readable_t) -- a task to check ready to write events local _writable_t = { events = function (self) - local i = 0 - return function () - i = i + 1 - return self._evs [i] - end - end, + local i = 0 + return function () + i = i + 1 + return self._evs [i] + end + end, tick = function (self, output) - _writing:remove (output) - self.def_tick (output) - end + _writing:remove (output) + self.def_tick (output) + end } addtaskWrite (_writable_t) @@ -394,17 +397,17 @@ local function _select (timeout) last_cleansing = now for k,v in pairs(_reading_log) do if not r_evs[k] and duration(now, v) > WATCH_DOG_TIMEOUT then - _reading_log[k] = nil - r_evs[#r_evs + 1] = k - r_evs[k] = #r_evs + _reading_log[k] = nil + r_evs[#r_evs + 1] = k + r_evs[k] = #r_evs end end for k,v in pairs(_writing_log) do if not w_evs[k] and duration(now, v) > WATCH_DOG_TIMEOUT then - _writing_log[k] = nil - w_evs[#w_evs + 1] = k - w_evs[k] = #w_evs + _writing_log[k] = nil + w_evs[#w_evs + 1] = k + w_evs[k] = #w_evs end end end From f4493d1fe031004c0161a159cd8b5fe2bd59566a Mon Sep 17 00:00:00 2001 From: Thijs Schreijer Date: Thu, 29 Sep 2011 14:48:12 +0200 Subject: [PATCH 2/4] Added possibility for UDP servers, based upon fix by Cuero Bugot. Added minor code to auto detect socket type (UDP/TCP), so all function signatures remain the same and handling is transparent. Also updated documentation. See: http://lists.luaforge.net/pipermail/kepler-project/2008-October/003003.html --- doc/us/manual.html | 27 +++++++++++++++++++++++++++ src/copas/copas.lua | 16 +++++++++++++++- 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/doc/us/manual.html b/doc/us/manual.html index b780365..40470b8 100644 --- a/doc/us/manual.html +++ b/doc/us/manual.html @@ -281,6 +281,33 @@

Using Copas with an existing server

During the loop Copas' dispatcher accepts connections from clients and automatically calls the corresponding handler functions.

+

Using UDP servers

+

Copas may also be used for UDP servers. Here is an example; +

+local port = 51034
+local server = socket.udp()
+server:setsockname("*",port)
+
+function handler(skt)
+  skt = copas.wrap(skt)
+  print("UDP connection handler")
+
+  while true do
+    local s, err
+    print("receiving...")
+    s, err = skt:receive(2048)
+    if not s then
+      print("Receive error: ", err)
+      return
+    end
+    print("Received data, bytes:" , #s)
+  end
+end
+
+copas.addserver(server, handler, 1)
+copas.loop()
+
+

Controlling Copas

diff --git a/src/copas/copas.lua b/src/copas/copas.lua index 0151271..44af486 100644 --- a/src/copas/copas.lua +++ b/src/copas/copas.lua @@ -290,12 +290,26 @@ end ------------------------------------------------------------------------------- -- Adds a server/handler pair to Copas dispatcher ------------------------------------------------------------------------------- -function addserver(server, handler, timeout) +local function addTCPserver(server, handler, timeout) server:settimeout(timeout or 0.1) _servers[server] = handler _reading:insert(server) end +local function addUDPserver(server, handler, timeout) + server:settimeout(timeout or 0) + local co = coroutine.create(handler) + _reading:insert(server) + _doTick (co, server) +end + +function addserver(server, handler, timeout) + if string.sub(tostring(server),1,3) == "udp" then + addUDPserver(server, handler, timeout) + else + addTCPserver(server, handler, timeout) + end +end ------------------------------------------------------------------------------- -- Adds an new courotine thread to Copas dispatcher ------------------------------------------------------------------------------- From 8c28b49e46fb24cc32db8c4938540a3c99e97779 Mon Sep 17 00:00:00 2001 From: Thijs Schreijer Date: Thu, 20 Oct 2011 00:16:36 +0200 Subject: [PATCH 3/4] Very minor update to step() function. To return a result based upon data handled or timeout. Fully transparent, no breaking of existing code. This tells the caller that it should asap resume IO (eg. next call to step()) because data is coming in, or alternatively it timed out and there is (some) time to do something else temporarily. --- doc/us/reference.html | 3 +++ src/copas/copas.lua | 5 ++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/doc/us/reference.html b/doc/us/reference.html index b52c3a9..01348bb 100644 --- a/doc/us/reference.html +++ b/doc/us/reference.html @@ -96,6 +96,9 @@

Reference

handlers. When a server accepts a connection, Copas calls the associated handler passing the client socket returned by socket.accept(). The timeout parameter is optional. + It returns false when no data was handled (timeout) or + true if there was data handled (or alternatively nil + error + message in case of errors). diff --git a/src/copas/copas.lua b/src/copas/copas.lua index 44af486..188e882 100644 --- a/src/copas/copas.lua +++ b/src/copas/copas.lua @@ -437,10 +437,12 @@ end ------------------------------------------------------------------------------- -- Dispatcher loop step. -- Listen to client requests and handles them +-- Returns false if no data was handled (timeout), or true if there was data +-- handled (or nil + error message) ------------------------------------------------------------------------------- function step(timeout) local err = _select (timeout) - if err == "timeout" then return end + if err == "timeout" then return false end if err then error(err) @@ -451,6 +453,7 @@ function step(timeout) tsk:tick (ev) end end + return true end ------------------------------------------------------------------------------- From 49722e9e53a0a2009e8c119c8f4ca14a15c3c8a1 Mon Sep 17 00:00:00 2001 From: Thijs Schreijer Date: Wed, 16 Nov 2011 22:26:06 +0100 Subject: [PATCH 4/4] Updated UDP functionality and accompanying documentation. Added receivefrom(), sendto() methods and updated the wrap() method to be UDP or TCP specific. Change is fully transparent to existing usage. --- doc/us/manual.html | 9 ++++- src/copas/copas.lua | 80 +++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 86 insertions(+), 3 deletions(-) diff --git a/doc/us/manual.html b/doc/us/manual.html index 40470b8..fae1ccf 100644 --- a/doc/us/manual.html +++ b/doc/us/manual.html @@ -282,7 +282,7 @@

Using Copas with an existing server

automatically calls the corresponding handler functions.

Using UDP servers

-

Copas may also be used for UDP servers. Here is an example; +

Copas may also be used for UDP servers. Here is an example;

 local port = 51034
 local server = socket.udp()
@@ -307,6 +307,13 @@ 

Using UDP servers

copas.addserver(server, handler, 1) copas.loop()
+

For UDP sockets the receivefrom() and sendto() +methods are available, both for copas and when the socket is wrapped. These +methods cannot be used on TCP sockets.

+

NOTE: When using the copas.receive([size]) method +on a UDP socket, the size parameter is NOT optional as with regular +luasocket UDP sockets. This limitation is removed when the socket is wrapped +(it then defaults to 8192, the max UDP datagram size luasocket supports).

Controlling Copas

diff --git a/src/copas/copas.lua b/src/copas/copas.lua index 188e882..48f5ca8 100644 --- a/src/copas/copas.lua +++ b/src/copas/copas.lua @@ -22,6 +22,7 @@ local socket = require "socket" require "coxpcall" local WATCH_DOG_TIMEOUT = 120 +local UDP_DATAGRAM_MAX = 8192 -- Redefines LuaSocket functions with coroutine safe versions -- (this allows the use of socket.http from within copas) @@ -128,6 +129,9 @@ local _writing = newset() -- sockets currently being written -- Coroutine based socket I/O functions. ------------------------------------------------------------------------------- -- reads a pattern from a client and yields to the reading set on timeouts +-- UDP: a UDP socket expects a second argument to be a number, so it MUST +-- be provided as the 'pattern' below defaults to a string. Will throw a +-- 'bad argument' error if omitted. function receive(client, pattern, part) local s, err pattern = pattern or "*l" @@ -142,6 +146,22 @@ function receive(client, pattern, part) until false end +-- receives data from a client over UDP. Not available for TCP. +-- (this is a copy of receive() method, adapted for receivefrom() use) +function receivefrom(client, size) + local s, err, port + size = size or UDP_DATAGRAM_MAX + repeat + s, err, port = client:receivefrom(size) -- upon success err holds ip address + if s or err ~= "timeout" then + _reading_log[client] = nil + return s, err, port + end + _reading_log[client] = os.time() + coroutine.yield(client, _reading) + until false +end + -- same as above but with special treatment when reading chunks, -- unblocks on any data received. function receivePartial(client, pattern) @@ -161,6 +181,7 @@ end -- sends data to a client. The operation is buffered and -- yields to the writing set on timeouts +-- Note: from and to parameters will be ignored by/for UDP sockets function send(client,data, from, to) local s, err,sent from = from or 1 @@ -183,6 +204,28 @@ function send(client,data, from, to) until false end +-- sends data to a client over UDP. Not available for TCP. +-- (this is a copy of send() method, adapted for sendto() use) +function sendto(client,data, ip, port) + local s, err,sent + + repeat + s, err = client:sendto(data, ip, port) + -- adds extra corrotine swap + -- garantees that high throuput dont take other threads to starvation + if (math.random(100) > 90) then + _writing_log[client] = os.time() + coroutine.yield(client, _writing) + end + if s or err ~= "timeout" then + _writing_log[client] = nil + return s, err + end + _writing_log[client] = os.time() + coroutine.yield(client, _writing) + until false +end + -- waits until connection is completed function connect(skt, host, port) skt:settimeout(0) @@ -203,7 +246,7 @@ end function flush(client) end --- wraps a socket to use Copas methods (send, receive, flush and settimeout) +-- wraps a TCP socket to use Copas methods (send, receive, flush and settimeout) local _skt_mt = {__index = { send = function (self, data, from, to) return send (self.socket, data, from, to) @@ -226,8 +269,41 @@ local _skt_mt = {__index = { end, }} +-- wraps a UDP socket, copy of TCP one adapted for UDP. +-- Mainly adds sendto() and receivefrom() +local _skt_mt_udp = {__index = { + send = function (self, data) + return send (self.socket, data) + end, + + sendto = function (self, data, ip, port) + return sendto (self.socket, data, ip, port) + end, + + receive = function (self, size) + return receive (self.socket, (size or UDP_DATAGRAM_MAX)) + end, + + receivefrom = function (self, size) + return receivefrom (self.socket, (size or UDP_DATAGRAM_MAX)) + end, + + flush = function (self) + return flush (self.socket) + end, + + settimeout = function (self,time) + self.timeout=time + return + end, + }} + function wrap (skt) - return setmetatable ({socket = skt}, _skt_mt) + if string.sub(tostring(skt),1,3) == "udp" then + return setmetatable ({socket = skt}, _skt_mt_udp) + else + return setmetatable ({socket = skt}, _skt_mt) + end end --------------------------------------------------