/
epoll.lua
211 lines (191 loc) · 6.56 KB
/
epoll.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
--- Epoll based event framework
-- Events can originate from file descriptors, signals or timers.
-- The module returns an epoll object constructor
local ffi = require "ffi"
local bit = require "bit"
local new_file = require "fend.file".wrap
local signalfd = require "fend.signalfd"
local timerfd = require "fend.timerfd"
local inotify = require "fend.inotify"
require "fend.common"
include "string"
include "sys/epoll"
-- Method table shared by every epoll object; the functions below are added to it.
local epoll_methods = { }
-- Metatable given to epoll objects so that method lookups fall through to epoll_methods.
local epoll_mt = { __index = epoll_methods }
--- Construct a new epoll object.
-- guesstimate (optional) is a hint for how many file handles will be watched; defaults to 10
-- returns the new object
-- raises the strerror text for errno if epoll_create fails
local function new_epoll ( guesstimate )
	local raw_fd = ffi.C.epoll_create ( guesstimate or 10 )
	if raw_fd == -1 then
		error ( ffi.string ( ffi.C.strerror ( ffi.errno ( ) ) ) )
	end

	local obj = {
		-- The epoll descriptor, wrapped as a file object
		epfile = new_file ( raw_fd ) ;

		-- registered maps file object -> callback table
		registered = { } ;
		-- raw_fd_map maps numeric fd -> file object
		raw_fd_map = { } ;

		-- Buffer handed to epoll_wait by dispatch, and its capacity
		wait_size = 0 ;
		wait_events = nil ;
		-- True while dispatch is running
		locked = false ;
	}
	setmetatable ( obj , epoll_mt )

	-- The signalfd and inotify helpers receive the dispatcher they belong to
	obj.signalfd = signalfd.new ( obj )
	obj.inotify = inotify.new ( obj )

	return obj
end
--- Start watching a file, or change what is watched for a file that is already registered.
-- file is the file object to watch (its descriptor is obtained with file:getfd())
-- cbs is a table of callbacks; it must contain an 'error' callback.
--   The events asked for are chosen from the keys that are present:
--   read -> EPOLLIN, write -> EPOLLOUT, rdclose -> EPOLLRDHUP,
--   oneshot -> EPOLLONESHOT, edge -> EPOLLET
-- raises the strerror text for errno if epoll_ctl fails
function epoll_methods:add_fd ( file , cbs )
	local fd = file:getfd()
	assert ( cbs.error , "No error callback" )

	-- A file we already know about is modified in place rather than added again
	local op = defines.EPOLL_CTL_ADD
	if self.registered [ file ] then
		op = defines.EPOLL_CTL_MOD
	end

	-- Build the event mask one flag at a time
	local mask = 0
	if cbs.read then mask = bit.bor ( mask , ffi.C.EPOLLIN ) end
	if cbs.write then mask = bit.bor ( mask , ffi.C.EPOLLOUT ) end
	if cbs.rdclose then mask = bit.bor ( mask , ffi.C.EPOLLRDHUP ) end
	if cbs.oneshot then mask = bit.bor ( mask , ffi.C.EPOLLONESHOT ) end
	if cbs.edge then mask = bit.bor ( mask , ffi.C.EPOLLET ) end

	local ev = ffi.new ( "struct epoll_event[1]" )
	ev[0].events = mask
	-- dispatch uses data.fd to find the file object again via raw_fd_map
	ev[0].data.fd = fd

	local rc = ffi.C.epoll_ctl ( self.epfile:getfd() , op , fd , ev )
	if rc ~= 0 then
		error ( ffi.string ( ffi.C.strerror ( ffi.errno ( ) ) ) )
	end

	self.registered [ file ] = cbs
	self.raw_fd_map [ fd ] = file
end
--- Stop watching a file.
-- file is the file object to stop watching (its descriptor is obtained with file:getfd())
-- A descriptor that epoll reports as not registered (ENOENT) is not treated as an error.
-- raises the strerror text for errno on any other epoll_ctl failure; in that case the
-- bookkeeping tables are left untouched.
function epoll_methods:del_fd ( file )
	local fd = file:getfd()
	if ffi.C.epoll_ctl ( self.epfile:getfd() , defines.EPOLL_CTL_DEL , fd , nil ) ~= 0 then
		local err = ffi.errno ( )
		if err ~= defines.ENOENT then
			error ( ffi.string ( ffi.C.strerror ( err ) ) )
		end
		-- ENOENT: epoll is not watching this descriptor. Fall through so that any
		-- stale entries are still dropped; otherwise a later add_fd for the same
		-- file would pick EPOLL_CTL_MOD and fail.
	end
	self.registered [ file ] = nil
	self.raw_fd_map [ fd ] = nil
end
--- Clear the dispatch lock.
-- dispatch refuses to run while the lock is set; this resets it by hand.
function epoll_methods.remove_lock ( self )
	self.locked = false
end
-- Debugging helper: renders an epoll event mask as a comma separated list of letters.
-- R = EPOLLIN, W = EPOLLOUT, E = EPOLLERR, C = EPOLLHUP, D = EPOLLRDHUP
local function event_string(events)
	local letters = { }
	local function check ( flag , letter )
		if bit.band ( events , flag ) ~= 0 then
			letters [ #letters + 1 ] = letter
		end
	end
	check ( ffi.C.EPOLLIN , "R" )
	check ( ffi.C.EPOLLOUT , "W" )
	check ( ffi.C.EPOLLERR , "E" )
	check ( ffi.C.EPOLLHUP , "C" )
	check ( ffi.C.EPOLLRDHUP , "D" )
	return table.concat ( letters , "," )
end
-- Error handler used by dispatch when the caller does not supply one.
-- Stops watching the file, then attempts to close it (a failing close is ignored).
-- Always returns false, which dispatch treats as "re-raise the callback's error".
-- cbs, err and eventtype are accepted to match the onerror signature but are not used.
local function default_onerror ( self , file , cbs , err , eventtype )
	self:del_fd ( file )
	local close = file.close
	pcall ( close , file )
	return false
end
-- Calls cbs[name] ( file , cbs , name ) under pcall.
-- A failure is handed to onerror; the error is re-raised only when onerror returns exactly false.
local function run_callback ( self , onerror , file , cbs , name )
	local ok , err = pcall ( cbs [ name ] , file , cbs , name )
	if not ok and onerror ( self , file , cbs , err , name ) == false then
		error ( err )
	end
end

-- Runs the callbacks for a single event reported by epoll_wait.
-- fd is the descriptor stored in the event's data, events is its event mask.
local function handle_event ( self , onerror , fd , events )
	local file = self.raw_fd_map [ fd ]
	local cbs = file and self.registered [ file ]
	if not cbs then
		-- The descriptor was unregistered (e.g. by a callback that ran earlier in
		-- this batch) after epoll_wait had already reported it; nothing to call.
		return
	end
	--print(string.format("EVENT on %s: %s", tostring(file), event_string(events)))

	if cbs.oneshot then
		-- One-shot registrations are removed before their callbacks run
		if ffi.C.epoll_ctl ( self.epfile:getfd() , defines.EPOLL_CTL_DEL , fd , nil ) ~= 0 then
			error ( ffi.string ( ffi.C.strerror ( ffi.errno ( ) ) ) )
		end
		self.registered [ file ] = nil
		self.raw_fd_map [ fd ] = nil
	end

	if bit.band ( events , ffi.C.EPOLLIN ) ~= 0 and cbs.read then
		run_callback ( self , onerror , file , cbs , "read" )
	end

	-- An error condition takes precedence over writability
	if bit.band ( events , ffi.C.EPOLLERR ) ~= 0 then
		run_callback ( self , onerror , file , cbs , "error" )
	elseif bit.band ( events , ffi.C.EPOLLOUT ) ~= 0 and cbs.write then
		run_callback ( self , onerror , file , cbs , "write" )
	end

	-- A full hang-up takes precedence over a read-side hang-up.
	-- Without a suitable callback the file is simply unregistered.
	if bit.band ( events , ffi.C.EPOLLHUP ) ~= 0 then
		if cbs.close then
			run_callback ( self , onerror , file , cbs , "close" )
		else
			self:del_fd ( file )
		end
	elseif bit.band ( events , ffi.C.EPOLLRDHUP ) ~= 0 then
		if cbs.rdclose then
			run_callback ( self , onerror , file , cbs , "rdclose" )
		elseif cbs.close then
			run_callback ( self , onerror , file , cbs , "close" )
		else
			self:del_fd ( file )
		end
	end
end

-- Handles the n events that epoll_wait stored in self.wait_events.
local function process_events ( self , n , onerror )
	for i=0,n-1 do
		local ev = self.wait_events[i]
		handle_event ( self , onerror , ev.data.fd , ev.events )
	end
end

--- Wait for events and call their callbacks.
-- max_events (optional) is the maximum number of events to collect in this call. Defaults to 1.
-- timeout (optional) is the maximum time to wait for an event before returning; it is multiplied
--   by 1000 before being passed to epoll_wait. Default is to wait forever.
-- onerror (optional) is a function to call when a callback raises an error,
--   receives `( self , file , cbs , err , eventtype )`; if it returns false the error is re-raised.
--   The default onerror removes the file from the dispatcher, tries to close the file and returns false.
-- Callbacks are called as `callback ( file , cbs , eventtype )`.
-- raises if dispatch is already running, if epoll_wait or epoll_ctl fails, or when an error is re-raised
-- as described above. The dispatch lock is released whenever this function returns or raises
-- (other than the "already running" error).
function epoll_methods:dispatch ( max_events , timeout , onerror )
	if self.locked then error ( "dispatch already running, call :remove_lock() to recover" ) end

	-- Prepare everything that could raise before taking the lock
	max_events = max_events or 1
	if max_events > self.wait_size then -- Expand the array
		self.wait_events = ffi.new ( "struct epoll_event[?]" , max_events )
		self.wait_size = max_events
	end
	if timeout then
		timeout = timeout * 1000
	else
		timeout = -1
	end
	onerror = onerror or default_onerror
	local epfd = self.epfile:getfd()

	self.locked = true
	local n = ffi.C.epoll_wait ( epfd , self.wait_events , max_events , timeout )
	if n == -1 then
		local errno = ffi.errno ( )
		self.locked = false
		error ( ffi.string ( ffi.C.strerror ( errno ) ) )
	end

	-- Run the callbacks protected so the lock is released on every exit path
	local ok , err = pcall ( process_events , self , n , onerror )
	self.locked = false
	if not ok then
		-- Level 0: err already carries the position added where it was raised
		error ( err , 0 )
	end
end
-- Signal, timer and path watching are implemented by the signalfd, timerfd and
-- inotify modules; their functions are exposed here as methods of epoll objects.
epoll_methods.add_signal = signalfd.add
epoll_methods.del_signal = signalfd.del
epoll_methods.add_timer = timerfd.add
epoll_methods.add_path = inotify.add
-- The value of the module is the constructor function.
return new_epoll