Skip to content

Commit

Permalink
Merge pull request #40 from JuliaComputing/tan/misc
Browse files Browse the repository at this point in the history
fix channelmax and framemax negotiations
  • Loading branch information
tanmaykm committed Feb 15, 2021
2 parents 9cf4f68 + a14c259 commit 9fed71e
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 11 deletions.
23 changes: 13 additions & 10 deletions src/protocol.jl
Expand Up @@ -354,7 +354,7 @@ function connection_processor(c, name, fn)
end

function connection_sender(c::Connection)
@debug("==> sending on conn", host=c.virtualhost)
@debug("==> sending on conn", host=c.host, port=c.port, virtualhost=c.virtualhost)
nbytes = sendq_to_stream(sock(c), c.sendq)
@debug("==> sent", nbytes)
c.heartbeat_time_client = time() # update heartbeat time for client
Expand Down Expand Up @@ -1084,6 +1084,7 @@ function on_connection_tune(chan::MessageChannel, m::TAMQPMethodFrame, ctx)
conn.channelmax = m.payload.fields[1].second
conn.framemax = m.payload.fields[2].second
conn.heartbeat = m.payload.fields[3].second
@debug("got_connection_tune", channelmax=conn.channelmax, framemax=conn.framemax, heartbeat=conn.heartbeat)
handle(chan, FrameHeartbeat, on_connection_heartbeat)
send_connection_tune_ok(chan, ctx[:channelmax], ctx[:framemax], ctx[:heartbeat])
handle(chan, :Connection, :Tune)
Expand All @@ -1095,17 +1096,19 @@ end
function send_connection_tune_ok(chan::MessageChannel, channelmax=0, framemax=0, heartbeat=0)
conn = chan.conn

# set channelmax and framemax
(channelmax > 0) && (conn.channelmax = channelmax)
(framemax > 0) && (conn.framemax = framemax)

# negotiate heartbeat (min of what expected by both parties)
if heartbeat > 0 && conn.heartbeat > 0
conn.heartbeat = min(conn.heartbeat, heartbeat)
else
conn.heartbeat = max(conn.heartbeat, heartbeat)
# negotiate (min of what expected by both parties)
function opt(desired_param, limited_param)
if desired_param > 0 && limited_param > 0
min(desired_param, limited_param)
else
max(desired_param, limited_param)
end
end

conn.channelmax = opt(channelmax, conn.channelmax)
conn.framemax = opt(framemax, conn.framemax)
conn.heartbeat = opt(heartbeat, conn.heartbeat)

@debug("send_connection_tune_ok", channelmax=conn.channelmax, framemax=conn.framemax, heartbeat=conn.heartbeat)
send(chan, TAMQPMethodPayload(:Connection, :TuneOk, (conn.channelmax, conn.framemax, conn.heartbeat)))

Expand Down
9 changes: 8 additions & 1 deletion test/runtests.jl
Expand Up @@ -10,7 +10,14 @@ AMQPTestRPC.runtests()

if length(ARGS) > 0
amqps_host = ARGS[1]
AMQPTestCoverage.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, amqps=amqps_configure())
virtualhost = ARGS[2]
port = AMQPClient.AMQPS_DEFAULT_PORT

login = ENV["AMQPPLAIN_LOGIN"]
password = ENV["AMQPPLAIN_PASSWORD"]
auth_params = Dict{String,Any}("MECHANISM"=>"AMQPLAIN", "LOGIN"=>login, "PASSWORD"=>password)

AMQPTestCoverage.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, virtualhost=virtualhost, amqps=amqps_configure(), auth_params=auth_params)
AMQPTestThroughput.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, tls=true)
AMQPTestRPC.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, amqps=amqps_configure())
end
Expand Down

0 comments on commit 9fed71e

Please sign in to comment.