How to handle broker restart #24

Open
pshashk opened this issue Nov 2, 2019 · 3 comments

Comments

pshashk commented Nov 2, 2019

Communication via fanout exchange works fine.

using AMQPClient

ENV["JULIA_DEBUG"] = "all"

const HOST = "localhost"
const PORT = 5672
const EXCHANGE = "exchange"
const AUTH = Dict{String,Any}(
    "MECHANISM" => "AMQPLAIN",
    "LOGIN" => "guest",
    "PASSWORD" => "guest"
)

function produce()
    conn = connection(;virtualhost="/", host=HOST, port=PORT, auth_params=AUTH)
    chnl = channel(conn, AMQPClient.UNUSED_CHANNEL, true)
    success = exchange_declare(chnl, EXCHANGE, EXCHANGE_TYPE_FANOUT)

    () -> begin
        timestamp = round(Int, time())
        msg = Message(Vector{UInt8}(string(timestamp)), content_type="text/plain")
        basic_publish(chnl, msg; exchange=EXCHANGE, routing_key="")
        @info "sent" data=timestamp
    end
end

function consume()
    conn = connection(;virtualhost="/", host=HOST, port=PORT, auth_params=AUTH)
    chnl = channel(conn, AMQPClient.UNUSED_CHANNEL, true)
    success, queue_name, message_count, consumer_count = queue_declare(chnl, "")
    success = queue_bind(chnl, queue_name, EXCHANGE, "")

    function consumer(msg)
        @info "received" data=parse(Int, String(msg.data))
        basic_ack(chnl, msg.delivery_tag)
    end

    success, consumer_tag = basic_consume(chnl, queue_name, consumer)
end

consumer = consume()
producer = produce()

producer()
┌ Info: sent
└   data = 1572692964
┌ Info: received
└   data = 1572692964

But when I restart RabbitMQ (docker restart ...) and call producer again, the Julia process hangs and consumes all available memory without reporting any error.

The RabbitMQ reliability guide says that in such cases it is necessary to create a new connection and channel. How can I do that with AMQPClient?


pshashk commented Nov 2, 2019

On the producer side, it seems like a simple state check is enough.

create_connection() = connection(;virtualhost="/", host=HOST, port=PORT, auth_params=AUTH)
create_channel(conn) = channel(conn, AMQPClient.UNUSED_CHANNEL, true)   

mutable struct Producer
    conn
    chnl
    function Producer()   
        conn = create_connection()
        chnl = create_channel(conn)    
        success = exchange_declare(chnl, EXCHANGE, EXCHANGE_TYPE_FANOUT)
        return new(conn, chnl)
    end
end

function (p::Producer)()
    if p.conn.state != AMQPClient.CONN_STATE_OPEN  
        p.conn = create_connection()
        p.chnl = create_channel(p.conn)        
    elseif p.chnl.state != AMQPClient.CONN_STATE_OPEN  
        p.chnl = create_channel(p.conn)
    end
    timestamp = round(Int, time())
    msg = Message(Vector{UInt8}(string(timestamp)), content_type="text/plain")
    basic_publish(p.chnl, msg; exchange=EXCHANGE, routing_key="")
    @info "sent" data=timestamp
end

But messages still don't make it to the consumer. So some connection handling is required there as well.
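
One thing the state check above does not do is repeat exchange_declare after reconnecting. Whether that matters depends on whether the exchange survives the broker restart, which this thread does not establish. Below is a minimal sketch of keeping the whole setup (connect, open channel, declare) in one function that is rerun whenever the state check fails. Renewable, current!, setup and isalive are hypothetical names, not part of AMQPClient, and the broker calls are replaced by a stub so the pattern runs on its own:

```julia
# Hypothetical helper, not part of AMQPClient: holds a resource and rebuilds it
# with `setup` whenever `isalive` reports that it is no longer usable.
mutable struct Renewable
    setup::Function     # () -> resource; would connect, open a channel, declare the exchange
    isalive::Function   # resource -> Bool; e.g. a state check like the one above
    resource::Any
end

# Run the setup once at construction time.
Renewable(setup, isalive) = Renewable(setup, isalive, setup())

# Return a usable resource, rerunning the full setup if the current one is dead.
function current!(r::Renewable)
    if !r.isalive(r.resource)
        r.resource = r.setup()
    end
    return r.resource
end

# Demo with a stub instead of a broker: the "resource" is just a counter value.
opened = Ref(0)
healthy = Ref(true)
r = Renewable(() -> (opened[] += 1), _ -> healthy[])
current!(r)            # still healthy, setup is not rerun
healthy[] = false      # simulate a broker restart
current!(r)            # setup runs again
```

With AMQPClient, setup would presumably wrap create_connection, create_channel and exchange_declare from the comment above, and isalive would compare the state fields against AMQPClient.CONN_STATE_OPEN. That wiring is untested here.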

pshashk commented Nov 6, 2019

I've managed to implement a consumer that can handle broker restarts, but I doubt that's the best way to do it.

# EXCHNG and ROUTE are constants that are not shown in this snippet.
function consume()
    while true
        try
            conn = connection()
            chnl = channel(conn, AMQPClient.UNUSED_CHANNEL, true)
            success = exchange_declare(chnl, EXCHNG, EXCHANGE_TYPE_FANOUT, durable=true)
            success, queue_name, _, _ = queue_declare(
                chnl, ""; exclusive=true, durable=false, auto_delete=true
            )
            success = queue_bind(chnl, queue_name, EXCHNG, ROUTE)
            callback = (msg) -> begin
                @info "received" data=String(msg.data)
                basic_ack(chnl, msg.delivery_tag)
            end
            success, consumer_tag = basic_consume(chnl, queue_name, callback)
            fetch(chnl.consumers[consumer_tag].receiver)
        catch e
            @error "consumer failed, reconnecting in 5 seconds" exception=(e, catch_backtrace())
            sleep(5)
        end
    end
end
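
A possible refinement of the loop above is to replace the fixed sleep(5) with a delay that grows while the broker stays down and resets after a clean run. This is only a sketch under stated assumptions: run_once stands in for one connect/declare/consume cycle (the body of the try block), and run_with_backoff, next_delay, max_attempts and sleep_fn are hypothetical names introduced here so the loop can be exercised without a broker:

```julia
# Delay before the next reconnect attempt: grows by `factor`, capped at `max_delay`.
next_delay(delay; factor=2.0, max_delay=60.0) = min(delay * factor, max_delay)

# `run_once` is a placeholder for one connect/declare/consume cycle.
# `max_attempts` and `sleep_fn` exist only so the loop can terminate and be
# observed in a demo; a real consumer would keep the defaults.
function run_with_backoff(run_once; first_delay=1.0, max_attempts=typemax(Int), sleep_fn=sleep)
    delay = first_delay
    for attempt in 1:max_attempts
        try
            run_once()
            delay = first_delay          # a clean return resets the backoff
        catch e
            @error "consumer failed, retrying" attempt delay exception=(e, catch_backtrace())
            sleep_fn(delay)
            delay = next_delay(delay)
        end
    end
end
```

Julia's Base also provides retry together with ExponentialBackOff, which may fit if a bounded number of attempts is acceptable. The hand-written loop is used here because the consumer in this thread is meant to retry indefinitely.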

Contributor

nsslh commented Jun 25, 2021

@pshashk This is excellent, thanks. My consumer service now restarts gracefully on connection error.
