Skip to content

Commit

Permalink
close fd when write failed
Browse files Browse the repository at this point in the history
  • Loading branch information
cloudwu committed Oct 18, 2012
1 parent 9c6309e commit 491a8cd
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 41 deletions.
78 changes: 41 additions & 37 deletions connection/lua-socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ _close(lua_State *L) {

static int
_write(lua_State *L) {
int fd = luaL_checkinteger(L,1);
int fd = -1;
if (!lua_isnil(L,1)) {
fd = luaL_checkinteger(L,1);
}
int type = lua_type(L,2);
const char * buffer = NULL;
size_t sz;
Expand All @@ -60,37 +63,37 @@ _write(lua_State *L) {
buffer = lua_touserdata(L,2);
sz = luaL_checkinteger(L,3);
}
for (;;) {
int err = send(fd, buffer, sz, 0);
if (err < 0) {
switch (errno) {
case EAGAIN:
case EINTR:
continue;
if (fd >= 0) {
for (;;) {
int err = send(fd, buffer, sz, 0);
if (err < 0) {
switch (errno) {
case EAGAIN:
case EINTR:
continue;
}
break;
}
if (type == LUA_TSTRING) {
lua_settop(L,2);
} else {
lua_pushlstring(L, buffer, sz);
}
return 1;
}
if (err == 0) {
if (type == LUA_TSTRING) {
lua_settop(L,2);
} else {
lua_pushlstring(L, buffer, sz);
if (err != sz) {
break;
}
return 1;
return 0;
}
assert(err == sz);
return 0;
}
if (type == LUA_TSTRING) {
lua_settop(L,2);
} else {
lua_pushlstring(L, buffer, sz);
}
return 1;
}

static int
_writeblock(lua_State *L) {
int fd = luaL_checkinteger(L,1);
int fd = -1;
if (!lua_isnil(L,1)) {
fd = luaL_checkinteger(L,1);
}
int header = luaL_checkinteger(L,2);
int type = lua_type(L,3);
const char * buffer = NULL;
Expand Down Expand Up @@ -127,21 +130,22 @@ _writeblock(lua_State *L) {
buf[1].iov_base = (void *)buffer;
buf[1].iov_len = sz;

for (;;) {
int err = writev(fd, buf, 2);
if (err < 0) {
switch (errno) {
case EAGAIN:
case EINTR:
continue;
if (fd >= 0) {
for (;;) {
int err = writev(fd, buf, 2);
if (err < 0) {
switch (errno) {
case EAGAIN:
case EINTR:
continue;
}
break;
}
break;
}
if (err == 0) {
break;
if (err != sz + header) {
break;
}
return 0;
}
assert(err == sz + header);
return 0;
}
luaL_Buffer b;
luaL_buffinitsize(L,&b, buf[0].iov_len + buf[1].iov_len);
Expand Down
13 changes: 10 additions & 3 deletions connection/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
struct connection {
int fd;
uint32_t address;
int close;
};

struct connection_server {
Expand Down Expand Up @@ -69,6 +70,7 @@ _add(struct connection_server * server, int fd , uint32_t address) {
if (c->address == 0) {
c->fd = fd;
c->address = address;
c->close = 0;
int err = connection_add(server->pool, fd , c);
assert(err == 0);
return;
Expand All @@ -83,8 +85,12 @@ _del(struct connection_server * server, int fd) {
for (i=0;i<server->max_connection;i++) {
struct connection * c = &server->conn[i];
if (c->fd == fd) {
if (c->close == 0) {
skynet_send(server->ctx, 0, c->address, PTYPE_CLIENT | PTYPE_TAG_DONTCOPY, 0, NULL, 0);
}
c->address = 0;
c->fd = 0;
c->close = 0;
connection_del(server->pool, fd);
return;
}
Expand Down Expand Up @@ -114,11 +120,12 @@ _poll(struct connection_server * server) {
continue;
}
if (size == 0) {
connection_del(server->pool, c->fd);
free(buffer);
buffer = NULL;
// todo: support user defined type
skynet_send(server->ctx, 0, c->address, PTYPE_CLIENT | PTYPE_TAG_DONTCOPY, 0, NULL, 0);
if (c->close == 0) {
c->close = 1;
skynet_send(server->ctx, 0, c->address, PTYPE_CLIENT | PTYPE_TAG_DONTCOPY, 0, NULL, 0);
}
} else {
skynet_send(server->ctx, 0, c->address, PTYPE_CLIENT | PTYPE_TAG_DONTCOPY, 0, buffer, size);
buffer = NULL;
Expand Down
3 changes: 3 additions & 0 deletions lualib/socket.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ end
function socket.connect(addr)
local ip, port = string.match(addr,"([^:]+):(.+)")
port = tonumber(port)
socket.close()
fd = c.open(ip,port)
if fd == nil then
return true
Expand Down Expand Up @@ -54,13 +55,15 @@ end
function socket.write(...)
local str = c.write(fd, ...)
if str then
socket.close()
table.insert(data, str)
end
end

function socket.writeblock(...)
local str = c.write(fd, ...)
if str then
socket.close()
table.insert(data, str)
end
end
Expand Down
2 changes: 1 addition & 1 deletion service/redis-cli.lua
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ local function reconnect()
init()
for i = request_queue.head, request_queue.tail-1 do
local request = request_queue[i]
socket.write(request[3])
socket.write(request.cmd)
end
split_co = coroutine.create(split_package)
end
Expand Down

0 comments on commit 491a8cd

Please sign in to comment.