Navigation Menu

Skip to content

Commit

Permalink
first working client sserver exchange of storage using zmq
Browse files Browse the repository at this point in the history
  • Loading branch information
marcoscoffier committed Sep 2, 2011
1 parent d1b902f commit cce981c
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 19 deletions.
37 changes: 37 additions & 0 deletions examples/clientStorage.lua
@@ -0,0 +1,37 @@
-- Copyright (c) 2010 Aleksey Yeschenko <aleksey@yeschenko.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.

require("torch")
require("zmq")

local ctx = zmq.init(1)
local s = ctx:socket(zmq.REQ)


s:connect("tcp://localhost:5555")

local stg = torch.DoubleStorage.new(100)

stg.zmq.send(stg,s)

print(s:recv())

s:close()
ctx:term()
36 changes: 36 additions & 0 deletions examples/serverStorage.lua
@@ -0,0 +1,36 @@
-- Copyright (c) 2010 Aleksey Yeschenko <aleksey@yeschenko.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.

require("torch")
require("zmq")

local ctx = zmq.init(1)
local s = ctx:socket(zmq.REP)

s:bind("tcp://lo:5555")

local stg = torch.DoubleStorage.new(1)

while true do
stg.zmq.recv(stg,s)
print("got it:")
print(stg:size())
s:send("OK")
end
41 changes: 29 additions & 12 deletions generic/zmq.c
Expand Up @@ -2,18 +2,24 @@
#define TH_GENERIC_FILE "generic/zmq.c"
#else

static int Lzmq_(sendStorage)(lua_State *L)
static int Lzmq_(send)(lua_State *L)
{
zmq_ptr *s = luaL_checkudata(L, 1, MT_ZMQ_SOCKET);
size_t msg_size;
const char *data = luaL_checklstring(L, 2, &msg_size);
THStorage *storage = luaT_checkudata(L, 1, torch_(Storage_id));

zmq_ptr *s = luaL_checkudata(L, 2, MT_ZMQ_SOCKET);


size_t msg_size = storage->size * sizeof(real);

int flags = luaL_optint(L, 3, 0);

zmq_msg_t msg;

if(zmq_msg_init_size(&msg, msg_size) != 0) {
return Lzmq_push_error(L);
}
memcpy(zmq_msg_data(&msg), data, msg_size);

memcpy(zmq_msg_data(&msg), storage->data, msg_size);

int rc = zmq_send(s->ptr, &msg, flags);

Expand All @@ -30,10 +36,15 @@ static int Lzmq_(sendStorage)(lua_State *L)
return 1;
}

static int Lzmq_(recvStorage)(lua_State *L)
static int Lzmq_(recv)(lua_State *L)
{
zmq_ptr *s = luaL_checkudata(L, 1, MT_ZMQ_SOCKET);
int flags = luaL_optint(L, 2, 0);

THStorage *storage = luaT_checkudata(L, 1, torch_(Storage_id));

zmq_ptr *s = luaL_checkudata(L, 2, MT_ZMQ_SOCKET);


int flags = luaL_optint(L, 3, 0);

zmq_msg_t msg;
if(zmq_msg_init(&msg) != 0) {
Expand All @@ -46,20 +57,26 @@ static int Lzmq_(recvStorage)(lua_State *L)
return Lzmq_push_error(L);
}

lua_pushlstring(L, zmq_msg_data(&msg), zmq_msg_size(&msg));
size_t msg_size = zmq_msg_size(&msg);

// resize destination storage
THStorage_(resize)(storage, msg_size);

// copy data from buffer
memcpy(storage->data, zmq_msg_data(&msg), msg_size);

if(zmq_msg_close(&msg) != 0) {
// Above string will be poped from the stack by the normalising code
// upon sucessful return.
return Lzmq_push_error(L);
}

return 1;
return 0;
}

static const struct luaL_reg Lzmq_(methods)[] = {
{"sendStorage", Lzmq_(sendStorage)},
{"recvStorage", Lzmq_(recvStorage)},
{"send", Lzmq_(send)},
{"recv", Lzmq_(recv)},
{NULL, NULL}
};

Expand Down
14 changes: 7 additions & 7 deletions zmq.c
Expand Up @@ -597,13 +597,13 @@ DLL_EXPORT int luaopen_libluazmq(lua_State *L)
torch_FloatStorage_id = luaT_checktypename2id(L, "torch.FloatStorage");
torch_DoubleStorage_id = luaT_checktypename2id(L, "torch.DoubleStorage");

zmq_CharInit(L);
zmq_ByteInit(L);
zmq_ShortInit(L);
zmq_IntInit(L);
zmq_LongInit(L);
zmq_FloatInit(L);
zmq_DoubleInit(L);
Lzmq_CharInit(L);
Lzmq_ByteInit(L);
Lzmq_ShortInit(L);
Lzmq_IntInit(L);
Lzmq_LongInit(L);
Lzmq_FloatInit(L);
Lzmq_DoubleInit(L);

return 1;
}
2 changes: 2 additions & 0 deletions zmq.lua
@@ -0,0 +1,2 @@
require 'torch'
require 'libluazmq'

0 comments on commit cce981c

Please sign in to comment.