Skip to content

Commit

Permalink
added multipart specs, finalize to ZMQ::Message, refactored send and …
Browse files Browse the repository at this point in the history
…receive message to use LibZMQ.msg_recv and LibZMQ.msg_send as older are deprecated according 0mq docs
  • Loading branch information
k-solutions committed Jun 15, 2016
1 parent 5124252 commit 97cf6af
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 42 deletions.
9 changes: 8 additions & 1 deletion examples/publish_subscribe.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
require "../src/zeromq"
require "signal"

link = "tcp://127.0.0.1:5555"

begin
Expand All @@ -17,6 +19,11 @@ s2.set_socket_option(ZMQ::SUBSCRIBE, "") # receive all
s3.set_socket_option(ZMQ::SUBSCRIBE, "animals") # receive any starting with this string
s4.set_socket_option(ZMQ::SUBSCRIBE, "animals.dog")

Signal::INT.trap do
[s1, s2, s3, s4].each { |socket| socket.close }
ctx.terminate
end

s1.bind(link)
s2.connect(link)
s3.connect(link)
Expand Down Expand Up @@ -48,9 +55,9 @@ identity = s3.receive_string if s3.more_parts?
puts "s3 received topic [#{topic}], body [#{body}], identity [#{identity}]"

topic, body, identity = s4.receive_strings

puts "s4 received topic [#{topic}], body [#{body}], identity [#{identity}]"


[s1, s2, s3, s4].each do |socket|
socket.close
end
Expand Down
93 changes: 92 additions & 1 deletion spec/multipart_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,98 @@ require "./spec_helper"

describe ZMQ::Socket do
context "#send_strings" do
pending "correctly handles a multipart message array with 1 element" do
it "correctly handles a multipart message with single message send" do
data = { "topic", "payload" } # [ "1" ]
APIHelper.with_push_pull("tcp://127.0.0.1:5555") do |ctx, sender, receiver|
sender.identity = "Test-Sender"
sender.send_string(sender.identity, ZMQ::SNDMORE)
sender.send_string(data[0], ZMQ::SNDMORE)
sender.send_string(data[1])
# sender.send_strings(data)
sleep 0.1
# receiver.receive_strings.should eq(data)
receiver.receive_string.should eq(sender.identity)
receiver.more_parts?.should be_truthy

receiver.receive_string.should eq(data[0])
receiver.more_parts?.should be_truthy

receiver.receive_string.should eq(data[1])
receiver.more_parts?.should be_falsey
end
end

it "correctly handles a multipart message with multiple recieve" do
data = { "topic", "payload" } # [ "1" ]
APIHelper.with_push_pull("tcp://127.0.0.1:5555") do |ctx, sender, receiver|
sender.identity = "Test-Sender"
sender.send_string(sender.identity, ZMQ::SNDMORE)
sender.send_string(data[0], ZMQ::SNDMORE)
sender.send_string(data[1])
sleep 0.1
receiver.receive_strings.should eq [sender.identity, data[0], data[1]]
end
end

it "correctly handles a multipart message with single string array" do
data = [ "1" ]
APIHelper.with_push_pull("tcp://127.0.0.1:5555") do |ctx, sender, receiver|
sender.send_strings(data)
sleep 0.1
receiver.receive_strings.should eq data
end
end

it "delivers between REQ and REP returning an array of messages" do
req_data, rep_data = [ "1", "2" ], [ "2", "3" ]
APIHelper.with_req_rep("tcp://127.0.0.1:5555") do |ctx, req, rep|
req.send_strings(req_data)
rep.receive_messages.each_with_index do |msg, idx|
msg.to_s.should eq(req_data[idx])
end

rep.send_strings(rep_data)
req.receive_messages.each_with_index do |msg, idx|
msg.to_s.should eq(rep_data[idx])
end
end
end

it "delivers between REQ and REP returning an array of strings" do
req_data, rep_data = [ "1", "2" ], [ "2", "3" ]
APIHelper.with_req_rep do |ctx, req, rep|
req.send_strings(req_data)
rep.receive_strings.should eq(req_data)

rep.send_strings(rep_data)
req.receive_strings.should eq(rep_data)
end
end
end

context "with identity" do
it "delivers between REQ and REP returning an array of strings with an empty string as the envelope delimiter" do
APIHelper.with_req_rep(rep_type: ZMQ::XREP) do |ctx, req, rep|
req_data, rep_data = "hello", [ req.identity, "", "ok" ]

req.send_string(req_data)
rep.receive_strings.should eq([ req.identity, "", "hello" ])

rep.send_strings(rep_data)
req.receive_string.should eq(rep_data.last)
end
end

it "delivers between REQ and REP returning an array of of messages with an empty string as the envelope delimiter" do
APIHelper.with_req_rep(rep_type: ZMQ::XREP) do |ctx, req, rep|
req_data, rep_data = "hello", [ req.identity, "", "ok" ]

req.send_string(req_data)
rep.receive_messages.map(&.to_s).should eq([ req.identity, "", "hello" ])

rep.send_strings(rep_data)
req.receive_messages.first.to_s.should eq(rep_data.last)
end
end
end
end
25 changes: 13 additions & 12 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,25 @@ module APIHelper
pong.receive_string
end

def with_req_rep
endpoint = "inproc://reqrep_test"
ctx = ZMQ::Context.new
def with_req_rep(endpoint = "inproc://reqrep_test", req_type = ZMQ::REQ, rep_type = ZMQ::REP)
ctx = ZMQ::Context.new

req = ctx.socket req_type
rep = ctx.socket rep_type

ping = ctx.socket ZMQ::REQ
pong = ctx.socket ZMQ::REP
req.identity = "req"
rep.identity = "rep"

pong.bind(endpoint)
connect_to_inproc(ping, endpoint)
rep.bind(endpoint)
connect_to_inproc(req, endpoint)

yield ctx, ping, pong, endpoint
yield ctx, req, rep, endpoint

[ping, pong].each { |sock| sock.close }
[req, rep].each { |sock| sock.close }
ctx.terminate
end

def with_push_pull
link = "inproc://push_pull_test"
def with_push_pull(link = "inproc://push_pull_test")
string = "boogi-boogi"
msg = ZMQ::Message.new string

Expand All @@ -37,7 +38,7 @@ module APIHelper
pull = ctx.socket ZMQ::PULL

push.bind link
APIHelper.connect_to_inproc pull, link
connect_to_inproc pull, link

yield ctx, push, pull, link

Expand Down
9 changes: 8 additions & 1 deletion src/zeromq/message.cr
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
module ZMQ
class Message
getter? closed

def initialize(message : String? = nil)
@pointer = LibZMQ::Msg.new
@closed, @pointer = false, LibZMQ::Msg.new

if message
LibZMQ.msg_init_data(address, message.to_unsafe.as(Void*), message.size, ->(a, b) { LibC.free(b) }, nil)
Expand All @@ -22,7 +24,12 @@ module ZMQ
LibZMQ.msg_size address
end

def finalize
close
end

def close
@closed = true
LibZMQ.msg_close(address)
end

Expand Down
45 changes: 18 additions & 27 deletions src/zeromq/socket.cr
Original file line number Diff line number Diff line change
Expand Up @@ -30,28 +30,25 @@ class ZMQ::Socket(T)
end

def send_message(message : Message, flags = 0)
rc = LibZMQ.sendmsg(@socket, message.address, flags)
message.close
rc = LibZMQ.msg_send(message.address, @socket, flags) # NOTE: 0mq docs state that msg_send do not require message.close
Util.resultcode_ok?(rc)
end

def send_messages(messages : Array(Message), flags = 0)
return false if !messages || messages.empty?
res = false
return res if !messages || messages.empty?
flags = DONTWAIT if dontwait?(flags)
rc = false

messages[0..-2].each do |message|
rc = send_message(message, (flags | ZMQ::SNDMORE))
break unless rc # Util.resultcode_ok?(rc)
return false unless (res = send_message(message, (flags | ZMQ::SNDMORE)))
end

rc ? send_message(messages[-1], flags) : rc
send_message(messages[-1], flags) # NOTE: according to 0mq docs last call should be the default
end

def receive_message(flags = 0)
message = T.new
LibZMQ.recvmsg(@socket, message.address, flags)

rc = LibZMQ.msg_recv(message.address, @socket, flags)
message
end

Expand All @@ -66,25 +63,19 @@ class ZMQ::Socket(T)
def receive_messages(flags = 0)
messages = [] of Message

message = T.new
rc = LibZMQ.recvmsg(@socket, message.address, flags)

if Util.resultcode_ok?(rc)
messages << message
while Util.resultcode_ok?(rc) && more_parts?
message = receive_message(flags)

if Util.resultcode_ok?(rc)
messages << message
else
message.close
messages.map(&.close)
messages.clear
end
loop do
message = T.new
rc = LibZMQ.msg_recv(message.address, @socket, flags)
if (msg_status = Util.resultcode_ok?(rc))
messages << message
return messages unless more_parts?
else
message.close
messages.map(&.close)
messages.clear
end
else
message.close
end

messages
end

Expand Down Expand Up @@ -184,7 +175,7 @@ class ZMQ::Socket(T)
end

def close
@close = true
@closed = true
LibZMQ.close @socket
end
end

0 comments on commit 97cf6af

Please sign in to comment.