Skip to content
Permalink
Browse files

Add socket_sendbuffer

  • Loading branch information
cloudwu committed Nov 5, 2019
1 parent 58291a2 commit 41c77ba5cd2150265712735cb5457266951ae646
@@ -525,41 +525,59 @@ concat_table(lua_State *L, int index, void *buffer, size_t tlen) {
lua_pop(L,1);
}

static void *
get_buffer(lua_State *L, int index, int *sz) {
static void
get_buffer(lua_State *L, int index, struct socket_sendbuffer *buf) {
void *buffer;
switch(lua_type(L, index)) {
const char * str;
size_t len;
case LUA_TUSERDATA:
case LUA_TLIGHTUSERDATA:
buffer = lua_touserdata(L,index);
*sz = luaL_checkinteger(L,index+1);
// lua full useobject must be a raw pointer, it can't be a socket object or a memory object.
buf->type = SOCKET_BUFFER_RAWPOINTER;
buf->buffer = lua_touserdata(L, index);
if (lua_isinteger(L, index+1)) {
buf->sz = lua_tointeger(L, index+1);
} else {
buf->sz = lua_rawlen(L, index+1);
}
break;
case LUA_TLIGHTUSERDATA: {
int sz = -1;
if (lua_isinteger(L, index+1)) {
sz = lua_tointeger(L,index+1);
}
if (sz < 0) {
buf->type = SOCKET_BUFFER_OBJECT;
} else {
buf->type = SOCKET_BUFFER_MEMORY;
}
buf->buffer = lua_touserdata(L,index);
buf->sz = (size_t)sz;
break;
}
case LUA_TTABLE:
// concat the table as a string
len = count_size(L, index);
buffer = skynet_malloc(len);
concat_table(L, index, buffer, len);
*sz = (int)len;
buf->type = SOCKET_BUFFER_MEMORY;
buf->buffer = buffer;
buf->sz = len;
break;
default:
str = luaL_checklstring(L, index, &len);
buffer = skynet_malloc(len);
memcpy(buffer, str, len);
*sz = (int)len;
buf->type = SOCKET_BUFFER_RAWPOINTER;
buf->buffer = luaL_checklstring(L, index, &buf->sz);
break;
}
return buffer;
}

static int
lsend(lua_State *L) {
struct skynet_context * ctx = lua_touserdata(L, lua_upvalueindex(1));
int id = luaL_checkinteger(L, 1);
int sz = 0;
void *buffer = get_buffer(L, 2, &sz);
int err = skynet_socket_send(ctx, id, buffer, sz);
struct socket_sendbuffer buf;
buf.id = id;
get_buffer(L, 2, &buf);
int err = skynet_socket_sendbuffer(ctx, &buf);
lua_pushboolean(L, !err);
return 1;
}
@@ -568,9 +586,10 @@ static int
lsendlow(lua_State *L) {
struct skynet_context * ctx = lua_touserdata(L, lua_upvalueindex(1));
int id = luaL_checkinteger(L, 1);
int sz = 0;
void *buffer = get_buffer(L, 2, &sz);
int err = skynet_socket_send_lowpriority(ctx, id, buffer, sz);
struct socket_sendbuffer buf;
buf.id = id;
get_buffer(L, 2, &buf);
int err = skynet_socket_sendbuffer_lowpriority(ctx, &buf);
lua_pushboolean(L, !err);
return 1;
}
@@ -645,9 +664,10 @@ ludp_send(lua_State *L) {
struct skynet_context * ctx = lua_touserdata(L, lua_upvalueindex(1));
int id = luaL_checkinteger(L, 1);
const char * address = luaL_checkstring(L, 2);
int sz = 0;
void *buffer = get_buffer(L, 3, &sz);
int err = skynet_socket_udp_send(ctx, id, address, buffer, sz);
struct socket_sendbuffer buf;
buf.id = id;
get_buffer(L, 3, &buf);
int err = skynet_socket_udp_sendbuffer(ctx, address, &buf);

lua_pushboolean(L, !err);

@@ -332,13 +332,19 @@ send_remote(struct skynet_context * ctx, int fd, const char * buffer, size_t sz,
skynet_error(ctx, "remote message from :%08x to :%08x is too large.", cookie->source, cookie->destination);
return;
}
uint8_t * sendbuf = skynet_malloc(sz_header+4);
uint8_t sendbuf[sz_header+4];
to_bigendian(sendbuf, (uint32_t)sz_header);
memcpy(sendbuf+4, buffer, sz);
header_to_message(cookie, sendbuf+4+sz);

struct socket_sendbuffer tmp;
tmp.id = fd;
tmp.type = SOCKET_BUFFER_RAWPOINTER;
tmp.buffer = sendbuf;
tmp.sz = sz_header+4;

// ignore send error, because if the connection is broken, the mainloop will recv a message.
skynet_socket_send(ctx, fd, sendbuf, sz_header+4);
skynet_socket_sendbuffer(ctx, &tmp);
}

static void
@@ -580,9 +586,13 @@ remote_send_name(struct harbor *h, uint32_t source, const char name[GLOBALNAME_L
static void
handshake(struct harbor *h, int id) {
struct slave *s = &h->s[id];
uint8_t * handshake = skynet_malloc(1);
handshake[0] = (uint8_t)h->id;
skynet_socket_send(h->ctx, s->fd, handshake, 1);
uint8_t handshake[1] = { (uint8_t)h->id };
struct socket_sendbuffer tmp;
tmp.id = s->fd;
tmp.type = SOCKET_BUFFER_RAWPOINTER;
tmp.buffer = handshake;
tmp.sz = 1;
skynet_socket_sendbuffer(h->ctx, &tmp);
}

static void
@@ -117,13 +117,13 @@ skynet_socket_poll() {
}

int
skynet_socket_send(struct skynet_context *ctx, int id, void *buffer, int sz) {
return socket_server_send(SOCKET_SERVER, id, buffer, sz);
skynet_socket_sendbuffer(struct skynet_context *ctx, struct socket_sendbuffer *buffer) {
return socket_server_send(SOCKET_SERVER, buffer);
}

int
skynet_socket_send_lowpriority(struct skynet_context *ctx, int id, void *buffer, int sz) {
return socket_server_send_lowpriority(SOCKET_SERVER, id, buffer, sz);
skynet_socket_sendbuffer_lowpriority(struct skynet_context *ctx, struct socket_sendbuffer *buffer) {
return socket_server_send_lowpriority(SOCKET_SERVER, buffer);
}

int
@@ -179,8 +179,8 @@ skynet_socket_udp_connect(struct skynet_context *ctx, int id, const char * addr,
}

int
skynet_socket_udp_send(struct skynet_context *ctx, int id, const char * address, const void *buffer, int sz) {
return socket_server_udp_send(SOCKET_SERVER, id, (const struct socket_udp_address *)address, buffer, sz);
skynet_socket_udp_sendbuffer(struct skynet_context *ctx, const char * address, struct socket_sendbuffer *buffer) {
return socket_server_udp_send(SOCKET_SERVER, (const struct socket_udp_address *)address, buffer);
}

const char *
@@ -2,6 +2,7 @@
#define skynet_socket_h

#include "socket_info.h"
#include "socket_buffer.h"

struct skynet_context;

@@ -26,8 +27,8 @@ void skynet_socket_free();
int skynet_socket_poll();
void skynet_socket_updatetime();

int skynet_socket_send(struct skynet_context *ctx, int id, void *buffer, int sz);
int skynet_socket_send_lowpriority(struct skynet_context *ctx, int id, void *buffer, int sz);
int skynet_socket_sendbuffer(struct skynet_context *ctx, struct socket_sendbuffer *buffer);
int skynet_socket_sendbuffer_lowpriority(struct skynet_context *ctx, struct socket_sendbuffer *buffer);
int skynet_socket_listen(struct skynet_context *ctx, const char *host, int port, int backlog);
int skynet_socket_connect(struct skynet_context *ctx, const char *host, int port);
int skynet_socket_bind(struct skynet_context *ctx, int fd);
@@ -38,9 +39,40 @@ void skynet_socket_nodelay(struct skynet_context *ctx, int id);

int skynet_socket_udp(struct skynet_context *ctx, const char * addr, int port);
int skynet_socket_udp_connect(struct skynet_context *ctx, int id, const char * addr, int port);
int skynet_socket_udp_send(struct skynet_context *ctx, int id, const char * address, const void *buffer, int sz);
int skynet_socket_udp_sendbuffer(struct skynet_context *ctx, const char * address, struct socket_sendbuffer *buffer);
const char * skynet_socket_udp_address(struct skynet_socket_message *, int *addrsz);

struct socket_info * skynet_socket_info();

// legacy APIs

static inline void sendbuffer_init_(struct socket_sendbuffer *buf, int id, const void *buffer, int sz) {
buf->id = id;
buf->buffer = buffer;
if (sz < 0) {
buf->type = SOCKET_BUFFER_OBJECT;
} else {
buf->type = SOCKET_BUFFER_MEMORY;
}
buf->sz = (size_t)sz;
}

static inline int skynet_socket_send(struct skynet_context *ctx, int id, void *buffer, int sz) {
struct socket_sendbuffer tmp;
sendbuffer_init_(&tmp, id, buffer, sz);
return skynet_socket_sendbuffer(ctx, &tmp);
}

static inline int skynet_socket_send_lowpriority(struct skynet_context *ctx, int id, void *buffer, int sz) {
struct socket_sendbuffer tmp;
sendbuffer_init_(&tmp, id, buffer, sz);
return skynet_socket_sendbuffer_lowpriority(ctx, &tmp);
}

static inline int skynet_socket_udp_send(struct skynet_context *ctx, int id, const char * address, const void *buffer, int sz) {
struct socket_sendbuffer tmp;
sendbuffer_init_(&tmp, id, buffer, sz);
return skynet_socket_udp_sendbuffer(ctx, address, &tmp);
}

#endif
@@ -0,0 +1,17 @@
#ifndef socket_buffer_h
#define socket_buffer_h

#include <stdlib.h>

#define SOCKET_BUFFER_MEMORY 0
#define SOCKET_BUFFER_OBJECT 1
#define SOCKET_BUFFER_RAWPOINTER 2

struct socket_sendbuffer {
int id;
int type;
const void *buffer;
size_t sz;
};

#endif

0 comments on commit 41c77ba

Please sign in to comment.
You can’t perform that action at this time.