Skip to content

Commit

Permalink
Merge 18953c1 into d8aa6ea
Browse files Browse the repository at this point in the history
  • Loading branch information
tanmaykm committed Jan 15, 2021
2 parents d8aa6ea + 18953c1 commit b59b7ca
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 7 deletions.
4 changes: 2 additions & 2 deletions src/AMQPClient.jl
@@ -1,6 +1,6 @@
module AMQPClient

import Base: write, read, read!, close, convert, show, isopen
import Base: write, read, read!, close, convert, show, isopen, flush

using Sockets

Expand All @@ -26,7 +26,7 @@ export tx_select, tx_commit, tx_rollback
export basic_qos, basic_consume, basic_cancel, basic_publish, basic_get, basic_ack, basic_reject, basic_recover
export confirm_select
export EXCHANGE_TYPE_DIRECT, EXCHANGE_TYPE_FANOUT, EXCHANGE_TYPE_TOPIC, EXCHANGE_TYPE_HEADERS
export read, read!, close, convert, show
export read, read!, close, convert, show, flush
export Message, set_properties, PERSISTENT, NON_PERSISTENT

end # module
21 changes: 17 additions & 4 deletions src/protocol.jl
Expand Up @@ -191,8 +191,8 @@ mutable struct Connection
heartbeat_time_server::Float64
heartbeat_time_client::Float64

function Connection(virtualhost::String="/", host::String="localhost", port::Int=AMQP_DEFAULT_PORT)
sendq = Channel{TAMQPGenericFrame}(CONN_MAX_QUEUED)
function Connection(virtualhost::String="/", host::String="localhost", port::Int=AMQP_DEFAULT_PORT; send_queue_size::Int=CONN_MAX_QUEUED)
sendq = Channel{TAMQPGenericFrame}(send_queue_size)
sendlck = Channel{UInt8}(1)
put!(sendlck, 1)
new(virtualhost, host, port, nothing,
Expand Down Expand Up @@ -248,6 +248,13 @@ mutable struct MessageChannel <: AbstractChannel
end
end

flush(c::MessageChannel) = flush(c.conn)
function flush(c::Connection)
while isready(c.sendq) && (c.sender !== nothing) && !istaskdone(c.sender)
yield()
end
end

sock(c::MessageChannel) = sock(c.conn)
sock(c::Connection) = c.sock

Expand Down Expand Up @@ -492,9 +499,15 @@ function channel(c::Connection, id::Integer, create::Bool; connect_timeout=DEFAU
chan
end

function connection(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS, channelmax=AMQPClient.DEFAULT_CHANNELMAX, framemax=0, heartbeat=0, connect_timeout=AMQPClient.DEFAULT_CONNECT_TIMEOUT)
function connection(; virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT,
framemax=0,
heartbeat=0,
send_queue_size::Integer=CONN_MAX_QUEUED,
auth_params=AMQPClient.DEFAULT_AUTH_PARAMS,
channelmax::Integer=AMQPClient.DEFAULT_CHANNELMAX,
connect_timeout=AMQPClient.DEFAULT_CONNECT_TIMEOUT)
@debug("connecting", host, port, virtualhost)
conn = AMQPClient.Connection(virtualhost, host, port)
conn = AMQPClient.Connection(virtualhost, host, port; send_queue_size=send_queue_size)
chan = channel(conn, AMQPClient.DEFAULT_CHANNEL, true)

# setup handler for Connection.Start
Expand Down
6 changes: 5 additions & 1 deletion test/test_coverage.jl
Expand Up @@ -20,12 +20,14 @@ function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAU

# open a connection
testlog("opening connection...")
conn = connection(;virtualhost=virtualhost, host=host, port=port, auth_params=auth_params)
conn = connection(;virtualhost=virtualhost, host=host, port=port, auth_params=auth_params, send_queue_size=512)
@test conn.conn.sendq.sz_max == 512

# open a channel
testlog("opening channel...")
chan1 = channel(conn, AMQPClient.UNUSED_CHANNEL, true)
@test chan1.id == 1
@test conn.conn.sendq.sz_max == 512

# test default exchange names
@test default_exchange_name() == ""
Expand Down Expand Up @@ -63,6 +65,8 @@ function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAU
# publish 10 messages
for idx in 1:10
basic_publish(chan1, M; exchange=EXCG_DIRECT, routing_key=ROUTE1)
flush(chan1)
@test !isready(chan1.conn.sendq)
end

# basic get 10 messages
Expand Down

0 comments on commit b59b7ca

Please sign in to comment.