Skip to content

Commit

Permalink
add AMQPS support
Browse files Browse the repository at this point in the history
This adds AMQPS (TLS) connections support.

The IANA assigned port number for AMQPS is 5671. It is available as the constant `AMQPClient.AMQPS_DEFAULT_PORT`.

An example of making an AMQPS connection:

```julia
conn = connection(; virtualhost="/",
    host = "amqps.example.com",
    port = AMQPFlient.AMQPS_DEFAULT_PORT
    auth_params = Dict{String,Any}("MECHANISM"=>"AMQPLAIN", "LOGIN"=>"guest", "PASSWORD"=>"guest"),
    amqps = amqps_configure()
)
```

The `amqps_configure` method can be provided additional parameters for TLS connections:
- cacerts: A CA certificate file (or it's contents) to use for certificate verification.
- verify: Whether to verify server certificate. Default is false if cacerts is not provided and true if it is.
- client_cert and client_key: The client certificate and corresponding private key to use. Default is nothing (no client certificate). Values can either be the file name or certificate/key contents.

```julia
amqps_configure(;
    cacerts = nothing,
    verify = MbedTLS.MBEDTLS_SSL_VERIFY_NONE,
    client_cert = nothing,
    client_key = nothing
)
```
  • Loading branch information
tanmaykm committed Jan 19, 2021
1 parent 31d2e8d commit d057d0b
Show file tree
Hide file tree
Showing 11 changed files with 562 additions and 283 deletions.
63 changes: 50 additions & 13 deletions CONNECTIONS.md
Expand Up @@ -3,27 +3,56 @@
More than one connection can be made to a single server, though one is sufficient for most cases.

The IANA assigned port number for AMQP is 5672. It is available as the constant `AMQPClient.AMQP_DEFAULT_PORT`.
The IANA assigned port number for AMQPS is 5671. It is available as the constant `AMQPClient.AMQPS_DEFAULT_PORT`.

The `AMQPPLAIN` authentication mechanism is supported as of now.

````julia
```julia
using AMQPClient

port = AMQPClient.AMQP_DEFAULT_PORT
login = get_userid() # default is usually "guest"
password = get_password() # default is usually "guest"
auth_params = Dict{String,Any}("MECHANISM"=>"AMQPLAIN", "LOGIN"=>login, "PASSWORD"=>password)

conn = connection(;virtualhost="/", host="localhost", port=port, auth_params=auth_params)
````
conn = connection(; virtualhost="/", host="localhost", port=port, auth_params=auth_params)
```

An example of making an AMQPS connection:

```julia
using AMQPClient

port = AMQPFlient.AMQPS_DEFAULT_PORT
login = get_userid() # default is usually "guest"
password = get_password() # default is usually "guest"
auth_params = Dict{String,Any}("MECHANISM"=>"AMQPLAIN", "LOGIN"=>login, "PASSWORD"=>password)
amqps = amqps_configure()

conn = connection(; virtualhost="/", host="amqps.example.com", port=port, auth_params=auth_params, amqps=amqps)
```

The `amqps_configure` method can be provided additional parameters for TLS connections:
- cacerts: A CA certificate file (or it's contents) to use for certificate verification.
- verify: Whether to verify server certificate. Default is false if cacerts is not provided and true if it is.
- client_cert and client_key: The client certificate and corresponding private key to use. Default is nothing (no client certificate). Values can either be the file name or certificate/key contents.

```julia
amqps_configure(;
cacerts = nothing,
verify = MbedTLS.MBEDTLS_SSL_VERIFY_NONE,
client_cert = nothing,
client_key = nothing
)
```

Multiple channels can be multiplexed over a single connection. Channels are identified by their numeric id.

An existing channel can be attached to, or a new one created if it does not exist.

Specifying `AMQPClient.UNUSED_CHANNEL` as channel id during creation will automatically assign an unused id.

````julia
```julia
chan1 = channel(conn, AMQPClient.UNUSED_CHANNEL, true)

# to attach to a channel only if it already exists:
Expand All @@ -33,28 +62,36 @@ chan2 = channel(conn, chanid)
# to specify a channel id and create if it does not exists yet:
chanid = 3
chan3 = channel(conn, chanid, true)
````
```

Channels and connections remain open until they are closed or they run into an error. The server can also initiate a close in some cases.

Channels represent logical multiplexing over a single connection, so closing a connection implicitly closes all its channels.

````julia
```julia
if isopen(conn)
close(conn)
# close is an asynchronous operation. To wait for the negotiation to complete:
AMQPClient.wait_for_state(conn, AMQPClient.CONN_STATE_CLOSED)
end
# an individual channel can be closed similarly too
````
```

The `connection` and `channel` methods can also be used with Julia's do-block syntax, which ensures it's closure when the block exits.

```julia
connection(; virtualhost="/", host="localhost", port=port, auth_params=auth_params) do conn
channel(conn, AMQPClient.UNUSED_CHANNEL, true) do chan
# use channel
end
end
```

If a channel or connection is closed due to an error or by the server, the `closereason` attribute (type `CloseReason`) of the channel or connection object
may contain the error code and diagnostic message.

````julia
if !isnull(conn.closereason)
reason = get(conn.closereason)
println("Error code: ", reason.code)
println("Message: ", reason.msg)
```julia
if conn.closereason !== nothing
@error("connection has errors", code=conn.closereason.code, message=conn.closereason.msg)
end
````
```
4 changes: 3 additions & 1 deletion Project.toml
Expand Up @@ -3,14 +3,16 @@ uuid = "79c8b4cd-a41a-55fa-907c-fab5288e1383"
keywords = ["amqpclient", "rabbitmq", "amqp", "amqp-client", "message-queue"]
license = "MIT"
desc = "A Julia AMQP (Advanced Message Queuing Protocol) / RabbitMQ Client."
version = "0.3.1"
version = "0.4.0"

[deps]
Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
MbedTLS = "739be429-bea8-5141-9913-cc70e7f3736d"
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"

[compat]
julia = "1"
MbedTLS = "0.6.8, 0.7, 1"

[extras]
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
Expand Down
5 changes: 4 additions & 1 deletion src/AMQPClient.jl
Expand Up @@ -3,6 +3,7 @@ module AMQPClient
import Base: write, read, read!, close, convert, show, isopen, flush

using Sockets
using MbedTLS

# Client property info that gets sent to the server on connection startup
const CLIENT_IDENTIFICATION = Dict{String,Any}(
Expand All @@ -15,11 +16,13 @@ include("types.jl")
include("spec.jl")
include("message.jl")
include("auth.jl")
include("buffered_socket.jl")
include("amqps.jl")
include("protocol.jl")
include("convert.jl")
include("show.jl")

export connection, channel, CloseReason
export connection, channel, CloseReason, amqps_configure
export exchange_declare, exchange_delete, exchange_bind, exchange_unbind, default_exchange_name
export queue_declare, queue_bind, queue_unbind, queue_purge, queue_delete
export tx_select, tx_commit, tx_rollback
Expand Down
74 changes: 74 additions & 0 deletions src/amqps.jl
@@ -0,0 +1,74 @@
function default_tls_debug(level, filename, number, msg)
@debug(level, filename, number, msg)
end

function default_tls_rng()
entropy = MbedTLS.Entropy()
rng = MbedTLS.CtrDrbg()
MbedTLS.seed!(rng, entropy)
rng
end

"""
amqps_configure(;
cacerts = nothing,
verify = MbedTLS.MBEDTLS_SSL_VERIFY_NONE,
client_cert = nothing,
client_key = nothing
)
Creates and returns a configuration for making AMQPS connections.
- cacerts: A CA certificate file (or it's contents) to use for certificate verification.
- verify: Whether to verify server certificate. Default is false if cacerts is not provided and true if it is.
- client_cert and client_key: The client certificate and corresponding private key to use. Default is nothing (no client certificate). Values can either be the file name or certificate/key contents.
"""
function amqps_configure(;
rng = default_tls_rng(),
cacerts::Union{String,Nothing} = nothing,
verify::Int64 = (cacerts === nothing) ? MbedTLS.MBEDTLS_SSL_VERIFY_NONE : MbedTLS.MBEDTLS_SSL_VERIFY_REQUIRED,
client_cert::Union{String,Nothing} = nothing,
client_key::Union{String,Nothing} = nothing,
debug::Union{Function,Nothing} = nothing)

conf = MbedTLS.SSLConfig()
MbedTLS.config_defaults!(conf)
MbedTLS.rng!(conf, rng)
(debug === nothing) || MbedTLS.dbg!(conf, debug)

if cacerts !== nothing
if isfile(cacerts)
# if it is a file name instead of certificate contents, read the contents
cacerts = read(cacerts, String)
end
MbedTLS.ca_chain!(conf, MbedTLS.crt_parse(cacerts))
end
MbedTLS.authmode!(conf, verify)

if (client_cert !== nothing) && (client_key !== nothing)
if isfile(client_cert)
# if it is a file name instead of certificate contents, read the contents
client_cert = read(client_cert, String)
end
if isfile(client_key)
client_key = read(client_key, String)
end
key = MbedTLS.PKContext()
MbedTLS.parse_key!(key, client_key)
MbedTLS.own_cert!(conf, MbedTLS.crt_parse(client_cert), key)
end

conf
end

function setup_tls(sock::TCPSocket, hostname::String, ssl_options::MbedTLS.SSLConfig)
@debug("setting up TLS")

ctx = MbedTLS.SSLContext()
MbedTLS.setup!(ctx, ssl_options)
MbedTLS.set_bio!(ctx, sock)
MbedTLS.hostname!(ctx, hostname)
MbedTLS.handshake(ctx)
@debug("TLS setup done")

BufferedTLSSocket(ctx)
end
77 changes: 77 additions & 0 deletions src/buffered_socket.jl
@@ -0,0 +1,77 @@
const TLS_BUSY_READ_SECS = 1
const TLS_BUSY_READ_YIELD_SECS = 0.001
const TLS_READBUFF_SIZE = MbedTLS.MBEDTLS_SSL_MAX_CONTENT_LEN * 5
const TLS_MIN_WRITEBUFF_SIZE = MbedTLS.MBEDTLS_SSL_MAX_CONTENT_LEN
const TCP_MAX_WRITEBUFF_SIZE = 1024*512
const TCP_MIN_WRITEBUFF_SIZE = 1024*64

struct BufferedTLSSocket <: IO
in::IOBuffer # no read lock, single task reads socket and distributes messages to channels
out::IOBuffer
sock::MbedTLS.SSLContext
readbuff::Vector{UInt8}
out_lck::ReentrantLock # protect out::IOBuffer when there are multiple channels on the connection

function BufferedTLSSocket(sock::MbedTLS.SSLContext; readbuff_size::Int=TLS_READBUFF_SIZE)
new(PipeBuffer(), PipeBuffer(), sock, Vector{UInt8}(undef, readbuff_size), ReentrantLock())
end
end

isopen(bio::BufferedTLSSocket) = isopen(bio.sock)
close(bio::BufferedTLSSocket) = close(bio.sock)

function read(bio::BufferedTLSSocket, ::Type{UInt8})
fill_in(bio, 1)
read(bio.in, UInt8)
end

function read(bio::BufferedTLSSocket, T::Union{Type{Int16},Type{UInt16},Type{Int32},Type{UInt32},Type{Int64},Type{UInt64},Type{Int128},Type{UInt128},Type{Float16},Type{Float32},Type{Float64}})
fill_in(bio, sizeof(T))
read(bio.in, T)
end

function read!(bio::BufferedTLSSocket, buff::Vector{UInt8})
fill_in(bio, length(buff))
read!(bio.in, buff)
end

function peek(bio::BufferedTLSSocket, T::Union{Type{Int16},Type{UInt16},Type{Int32},Type{UInt32},Type{Int64},Type{UInt64},Type{Int128},Type{UInt128},Type{Float16},Type{Float32},Type{Float64}})
fill_in(bio, sizeof(T))
peek(bio.in, T)
end

function fill_in(bio::BufferedTLSSocket, atleast::Int)
avail = bytesavailable(bio.in)
if atleast > avail
while (atleast > avail) && isopen(bio.sock)
bytes_read = isreadable(bio.sock) ? readbytes!(bio.sock, bio.readbuff; all=false) : 0
if bytes_read > 0
avail += Base.write_sub(bio.in, bio.readbuff, 1, bytes_read)
else
MbedTLS.wait_for_decrypted_data(bio.sock)
end
end
end
end

function write(bio::BufferedTLSSocket, data::UInt8)
lock(bio.out_lck) do
write(bio.out, data)
end
end
function write(bio::BufferedTLSSocket, data::Union{Int16,UInt16,Int32,UInt32,Int64,UInt64,Int128,UInt128,Float16,Float32,Float64})
lock(bio.out_lck) do
write(bio.out, data)
end
end
function write(bio::BufferedTLSSocket, data::Array)
lock(bio.out_lck) do
write(bio.out, data)
end
end
function flush(bio::BufferedTLSSocket)
lock(bio.out_lck) do
write(bio.sock, take!(bio.out))
end
nothing
end

0 comments on commit d057d0b

Please sign in to comment.