Skip to content
Browse files

Fixing host lookup, changing frompipe\topipe interface

  • Loading branch information...
1 parent 57a9227 commit 49469ab5d3b5f140a9a1521e279eeeaa7715c2e5 @davidhollander committed Sep 28, 2011
Showing with 112 additions and 75 deletions.
  1. +71 −74 ox.lua
  2. +30 −0 tests/dnstcp.lua
  3. +10 −0 tests/hosttcp.lua
  4. +1 −1 tests/ip4tcp.lua
View
145 ox.lua
@@ -71,6 +71,7 @@ int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
int getaddrinfo(const char *node, const char *service, const struct addrinfo *hints,
struct addrinfo **res);
void freeaddrinfo(struct addrinfo *res);
+const char * inet_ntop(int af, const void *src, char *dst, socklen_t cnt);
]]
-- constants
local F_SETFL = 4
@@ -346,76 +347,61 @@ function ox.tcpserv(port, cn)
ti(contexts, c)
return true
end
-
-- blocking DNS lookup
+-- returns list of readable ip addresses
function ox.b_resolv(file, host)
local hints = new 'struct addrinfo'
local results = new 'struct addrinfo * [1]'
hints.ai_family = 0
hints.ai_socktype = SOCK_STREAM
hints.ai_protocol = 0
if C.getaddrinfo(host, port, hints, results)~=0 then return nil, 'could not resolve' end
+ local buffer = vla_char(64)
local r = results[0][0]
repeat
+ local ipaddr
if r.ai_family == AF_INET then
local sa = ffi.cast('struct sockaddr_in *', r.ai_addr)
- file:write(sa.sin_addr.s_addr, '\n')
+ ipaddr = sa.sin_addr
elseif r.ai_family == AF_INET6 then
local sa = ffi.cast('struct sockaddr_in6 *', r.ai_addr)
- file:write(sa.sin_addr.s6_addr, '\n')
+ ipaddr = sa.sin6_addr
+ end
+ if C.inet_ntop(r.ai_family, ipaddr, buffer, 64)~=nil then
+ local str = ffi.string(buffer)
+ file:write(str,'\r\n')
end
r = r.ai_next
until r==nil
+ file:close()
C.freeaddrinfo(results[0])
+ buffer=nil
end
-function ox.resolv(host, port, cb)
- local r, w = ox.pipe 'r'
- ox.split(1, function()
- ox.b_resolv(w, host)
- w:close()
- end)
- ox.readln(r, 128, function(c, ip)
- if not ip then return cb(nil) end
- local x, err = ox.tcpconn(ip, port, cb)
- if err then return ox.readln(ip, port, cb) end
- end)
-end
-
-function ox.fromfork(fn, cb, expire)
- local r, w = ox.pipe 'r'
- ox.split(1, fn)
- return cb(r
-end
+local lib = require 'ox.lib'
-- asynchronous DNS lookup pooler and cache
local host_cache = lib.cache1(function(host, cb)
- call_fork(function()
- local x = nixio.getaddrinfo(host, 'inet', port)
- return x and x[1].address or nil
- end, cb)
-end, 1000)
+ print('cache update', host)
+ local r = ox.fromfork(function(w) return ox.b_resolv(w, host) end, 30)
+ return ox.readln(r, 64, function(c, ip) ox.close(r); return cb(ip) end)
+end, 3600)
function ox.tcpconn(address, port, cb)
-- convert ip6 or ip4 to address
local version, addr, sockaddr, fd
if address:match ':' then
addr = new 'struct in6_addr'
version = AF_INET6
- if C.inet_pton(AF_INET6, ip, addr)~=1 then return nil, 'Could not parse ip6' end
+ if C.inet_pton(AF_INET6, address, addr)~=1 then return nil, 'Could not parse ip6' end
print('ip6conn', version, addr.s6_addr)
elseif address:match '^[%d%.]+$' then
addr = new 'struct in_addr'
version = AF_INET
- if C.inet_pton(AF_INET, ip, addr)~=1 then return nil, 'Could not parse ip4' end
+ if C.inet_pton(AF_INET, address, addr)~=1 then return nil, 'Could not parse ip4' end
print('ip4conn', version, addr.s_addr)
else
- return host_cache(address, function(pipe)
- ox.readln(pipe, 1024, function(pipe, ip)
- local x, err = ox.tcpconn(ip, port, cb)
- if err then return
- end)
- end)
+ return host_cache(address, function(ip) return ox.tcpconn(ip, port, cb) end)
end
-- fill socket address
@@ -435,10 +421,10 @@ function ox.tcpconn(address, port, cb)
local casted = cast('struct sockaddr *', sockaddr)
-- create socket and connect
- fd = C.socket(version, SOCK_STREAM + SOCK_NONBLOCK, 0)
+ fd = C.socket(version, SOCK_STREAM, 0)
if fd==-1 then return nil, 'Could not create socket' end
if C.connect(fd, casted, sizeof(sockaddr))==-1 then print(version, errno())return nil, 'Could not connect' end
- --if C.fcntl(fd, F_SETFL, O_NONBLOCK)==-1 then return nil, 'Could not set nonblock' end
+ if C.fcntl(fd, F_SETFL, O_NONBLOCK)==-1 then return nil, 'Could not set nonblock' end
local c = {fd = fd, events=0, revents = 0}
ti(contexts, c)
return cb(c)
@@ -473,6 +459,55 @@ local function b_read(c, N)
return ffi.string(buff, n)
end
+-- Read asynchronously from a pipe, perform blocking writes in fork
+-- @param fn: function(pipe). Use pipe:write(string, ...) to send information
+-- @param cb: function(context). Use ox.read or ox.readln
+-- @param expire: optional timeout in seconds
+function ox.fromfork(fn, expire)
+ local pfds = ffi.new 'int[2]'
+ if C.pipe(pfds)==-1 then return nil, 'Could not create pipe'..errno()
+ elseif C.fcntl(pfds[0], F_SETFL, O_NONBLOCK)==-1 then
+ return nil, 'Could not set read end nonblocking' end
+
+ local pid = C.fork()
+ if pid==-1 then return nil, 'Could not fork'
+ elseif pid==0 then
+ local w = {fd = tonumber(pfds[1]), write = b_write, close = ox.close}
+ fn(w)
+ os.exit()
+ else
+ local r = {fd = tonumber(pfds[0]), pid=pid, events = 0, revents = 0, expire = expire}
+ ti(contexts, r)
+ return r
+ end
+end
+
+-- Write asynchronously to a pipe, perform blocking reads in fork
+-- @param fn: function(pipe). Use pipe:read(number) to read information
+-- @param cb: function(context). Use ox.write
+-- @param expire: optional timeout in seconds
+function ox.tofork(fn, expire)
+ local pfds = ffi.new 'int[2]'
+ if C.pipe(pfds)==-1 then return nil, 'Could not create pipe'..errno()
+ elseif C.fcntl(pfds[1], F_SETFL, O_NONBLOCK)==-1 then
+ return nil, 'Could not set write end nonblocking' end
+
+ local pid = C.fork()
+ if pid==-1 then return nil, 'Could not fork'
+ elseif pid==0 then
+ local r = {fd = tonumber(pfds[0]), read = b_read, close = ox.close}
+ fn(r)
+ os.exit()
+ else
+ local w = {fd = tonumber(pfds[1]), pid=pid, events = 0, revents = 0, expire = expire}
+ ti(contexts, w)
+ return w
+ end
+end
+
+
+
+
-- PIPE
--
function ox.pipe(flag, expire)
@@ -507,44 +542,6 @@ function ox.pipe(flag, expire)
return r, w
end
-function ox.fromfork(fn, expire, cb)
- local pfds = ffi.new 'int[2]'
- if C.pipe(pfds)==-1 then return nil, 'Could not create pipe'..errno()
- elseif C.fcntl(pfds[0], F_SETFL, O_NONBLOCK)==-1 then return nil, 'Could not set read end nonblocking' end
-
- local pid = C.fork()
- if pid==-1 then return nil, 'Could not fork'
- elseif pid==0 then
- local w = {fd = tonumber(pfds[1]), write = b_write, close = ox.close}
- cb(w)
- os.exit()
- else
- local r = {fd = tonumber(pfds[0]), pid=pid, events = 0, revents = 0, expire = expire, on_read = cb}
- ti(contexts, r)
- return cb(r)
- end
-end
-
-function ox.tofork(fn, expire, cb)
- local pfds = ffi.new 'int[2]'
- if C.pipe(pfds)==-1 then return nil, 'Could not create pipe'..errno()
- elseif C.fcntl(pfds[1], F_SETFL, O_NONBLOCK)==-1 then return nil, 'Could not set write end nonblocking' end
-
- local pid = C.fork()
- if pid==-1 then return nil, 'Could not fork'
- elseif pid==0 then
- local r = {fd = tonumber(pfds[0]), read = b_read, close = ox.close}
- cb(r)
- os.exit()
- else
- local w = {fd = tonumber(pfds[1]), pid=pid, events = 0, revents = 0, expire = expire}
- ti(contexts, w)
- return cb(w)
- end
-end
-
-
-
-- LOOP
--
local pollfds = ffi.typeof 'struct pollfd[?]'
View
30 tests/dnstcp.lua
@@ -0,0 +1,30 @@
+local ox = require 'ox'
+local PORT = 8094 or ...
+print(PORT)
+
+ox.split(1,function()
+ print(ox.tcpserv(PORT, function(c)
+ return ox.readln(c, 2048, function(c, line)
+ print('server readln', line)
+ return ox.write(c, line..'\n', function(c)
+ ox.close(c)
+ end)
+ end)
+ end))
+ ox.at(ox.time+2,ox.stop)
+ ox.start()
+end)
+
+ox.at(ox.time+1, function()
+ print(ox.tcpconn('localhost',PORT,function(c)
+ ox.write(c, 'Hello\n', function(c)
+ ox.readln(c, 2048, function(c, line)
+ print('conn readln', line)
+ assert(line=='Hello')
+ ox.close(c)
+ end)
+ end)
+ end))
+end)
+ox.at(ox.time+2,ox.stop)
+ox.start()
View
10 tests/hosttcp.lua
@@ -0,0 +1,10 @@
+local ox = require 'ox'
+
+function handle(c, err)
+ print('handle', c, err)
+ ox.stop()
+end
+
+ox.tcpconn('google.com', 80, handle)
+
+ox.start()
View
2 tests/ip4tcp.lua
@@ -1,5 +1,5 @@
local ox = require 'ox'
-local PORT = 8092 or ...
+local PORT = 8094 or ...
print(PORT)
ox.split(1,function()

0 comments on commit 49469ab

Please sign in to comment.
Something went wrong with that request. Please try again.