
Connection was closed by remote server: CONNECTION_FORCED #35

Closed
cp2587 opened this issue Jan 12, 2017 · 10 comments

Comments

@cp2587
Contributor

cp2587 commented Jan 12, 2017

Hello,

We recently updated the library version to 2.1.3 (from 1.1.7) and we now face several errors we did not have previously. One of them is the following:

    self.channel.basic.publish(body=data, exchange=self.exchange, routing_key='')
  File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/basic.py", line 194, in publish
    self._channel.write_frames(frames_out)
  File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/channel.py", line 326, in write_frames
    self.check_for_errors()
  File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/channel.py", line 169, in check_for_errors
    self._connection.check_for_errors()
  File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/connection.py", line 155, in check_for_errors
    raise self.exceptions[0]
AMQPConnectionError: Connection was closed by remote server: CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'

On the rabbitmq server, we have the following logs:
client unexpectedly closed TCP connection

Finally, here is the code I use to create an AMQP socket (this socket is then used by the logging handler to send logs to the queue):

import amqpstorm


class AMQPStormSocket(object):

    def __init__(self, host, port, username, password, virtual_host, exchange, queue, exchange_is_durable,
                 queue_is_durable, exchange_type, fallback_call):

        # create connection & channel
        self.connection = amqpstorm.Connection(host, username, password, port, virtual_host=virtual_host, timeout=1)
        self.channel = self.connection.channel()

        # create an exchange, if needed
        self.channel.exchange.declare(exchange=exchange, exchange_type=exchange_type, durable=exchange_is_durable)
        # create a queue, if needed
        self.channel.queue.declare(queue=queue, durable=queue_is_durable, passive=False, auto_delete=False)
        # bind it
        self.channel.queue.bind(queue=queue, exchange=exchange)

        # needed when publishing
        self.exchange = exchange

        self.fallback_call = fallback_call

    def sendall(self, data):
        try:
            self.channel.basic.publish(body=data, exchange=self.exchange, routing_key='')
        except Exception as e:
            self.fallback_call(e)

    def close(self):
        try:
            self.channel.close()
            self.connection.close()
        except Exception:
            pass

Do you have any idea how to fix these errors?

@eandersson
Owner

Interesting. Have you tried downgrading to see if the errors really go away with 1.1.7?

@eandersson
Owner

Technically, broker forced connection closure with reason 'shutdown' means that the RabbitMQ server was shut down. Is RabbitMQ itself maybe crashing?

@cp2587
Contributor Author

cp2587 commented Jan 12, 2017

No, the instance is completely fine, and this error only happens on some of the servers (actually, the ones running AMQPStorm inside their Celery tasks). I will try to downgrade.

@cp2587
Contributor Author

cp2587 commented Jan 12, 2017

Also, sometimes I get a different error: AMQPConnectionError: Connection was closed by remote server: CHANNEL_ERROR - expected 'channel.open'

@eandersson
Owner

eandersson commented Jan 12, 2017

I'll see if I can reproduce this tonight.

Some general questions that could help to narrow this down.

  • What RabbitMQ version are you running?
  • Is there any latency involved (is the server and client on the same host, network etc)?
  • Any additional RabbitMQ logs that you can provide?
  • Is this application multi-threaded?

@cp2587
Contributor Author

cp2587 commented Jan 12, 2017

Thank you very much for your help.
I am using RabbitMQ 3.6.6. The servers are on the same network but not on the same host (they communicate over private IP addresses).
I am using AMQPStorm because it is thread-safe (my logger runs inside sub-threads, so I need the socket to be thread-safe).

Here is the full code for the logger + socket:

import logging
import socket
import sys
import time
from logging.handlers import SocketHandler

import amqpstorm

# WiremindLogstashFormatter, WiremindMailHandler, safe_str and logger are
# project-specific and defined elsewhere (not shown in this issue).


class AMQPLogstashHandler(SocketHandler):
    """AMQP Log Format handler

    If RabbitMQ is down, this handler will retry every 30 seconds until it is up again.

    :param host: AMQP host (default 'localhost')
    :param port: AMQP port (default 5672)
    :param username: AMQP user name (default 'guest', which is the default for
        RabbitMQ)
    :param password: AMQP password (default 'guest', which is the default for
        RabbitMQ)

    :param exchange: AMQP exchange. Default 'logging.gelf'.
        A queue binding must be defined on the server to prevent
        log messages from being dropped.
    :param exchange_type: AMQP exchange type (default 'fanout').
    :param durable: AMQP exchange is durable (default False)
    :param virtual_host: AMQP virtual host (default '/').

    :param tags: list of tags for a logger (default is None).
    :param message_type: The type of the message (default logstash).
    :param version: version of logstash event schema (default is 0).

    :param extra_fields: Send extra fields on the log record to graylog
        if true (the default)
    :param fqdn: Use fully qualified domain name of localhost as source
        host (socket.getfqdn()).
    :param facility: Replace facility with specified value. If specified,
        record.name will be passed as `logger` parameter.
    """

    def __init__(self, sendgrid_credentials, host='localhost', port=5672, username='guest', password='guest',
                 exchange='logstash', queue='logstashQueue', exchange_type='fanout', virtual_host='/',
                 message_type='logstash', tags=None, durable=True, queue_is_durable=True, extra_fields=True,
                 fqdn=False, facility=None):

        SocketHandler.__init__(self, host, port)

        # AMQP parameters
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.exchange_type = exchange_type
        self.exchange = exchange
        self.exchange_is_durable = durable
        self.queue = queue
        self.queue_is_durable = queue_is_durable
        self.virtual_host = virtual_host

        # retry when rabbitmq gets unavailable
        self.closeOnError = 1

        # Extract Logstash parameters
        self.tags = tags or []
        self.formatter = WiremindLogstashFormatter(message_type, tags, fqdn)

        # Standard logging parameters
        self.extra_fields = extra_fields
        self.fqdn = fqdn
        self.facility = facility

        # fallback
        self.fallback_handler = WiremindMailHandler(
            mailhost='smtp.sendgrid.net', fromaddr='no-reply@wiremind.fr', toaddrs=['dev@wiremind.fr'],
            subject='logstash critical', credentials=sendgrid_credentials)
        self.fallback_timeout = 10 * 60 # 10 min
        self.fallback_called_time = None

    def makeSocket(self, **kwargs):
        try:
            return AMQPStormSocket(self.host, self.port, self.username, self.password, self.virtual_host, self.exchange,
                                   self.queue, self.exchange_is_durable, self.queue_is_durable,
                                   self.exchange_type, self.fallback_call)
        except Exception as e:
            self.fallback_call(e)

    def makePickle(self, record):
        return self.formatter.format(record)

    def fallback_call(self, exception):
        # Send an email to alert that something went wrong with logstash AMQP
        if not self.fallback_called_time or time.time() - self.fallback_called_time > self.fallback_timeout:
            # this condition prevents an infinite loop (for instance when redis is not activated)
            self.fallback_called_time = time.time()
            fn, lno, func = logger.findCaller()
            msg = 'Failed to connect to logstash AMQP on %s:\n %s' % (socket.gethostname(), exception)
            record = logger.makeRecord(self.name, logging.CRITICAL, fn, lno, msg, [], sys.exc_info(), func, {})
            self.fallback_handler.emit(record)
        raise socket.error(safe_str(exception))  # make sure SocketHandler correctly does its thing


class AMQPStormSocket(object):

    def __init__(self, host, port, username, password, virtual_host, exchange, queue, exchange_is_durable,
                 queue_is_durable, exchange_type, fallback_call):

        # create connection & channel
        self.connection = amqpstorm.Connection(host, username, password, port, virtual_host=virtual_host, timeout=1)
        self.channel = self.connection.channel()

        # create an exchange, if needed
        self.channel.exchange.declare(exchange=exchange, exchange_type=exchange_type, durable=exchange_is_durable)
        # create a queue, if needed
        self.channel.queue.declare(queue=queue, durable=queue_is_durable, passive=False, auto_delete=False)
        # bind it
        self.channel.queue.bind(queue=queue, exchange=exchange)

        # needed when publishing
        self.exchange = exchange

        self.fallback_call = fallback_call

    def sendall(self, data):
        try:
            self.channel.basic.publish(body=data, exchange=self.exchange, routing_key='')
        except Exception as e:
            self.fallback_call(e)

    def close(self):
        try:
            self.channel.close()
            self.connection.close()
        except Exception:
            pass

The handler is initialized by the parent thread.

@cp2587
Contributor Author

cp2587 commented Jan 19, 2017

Hello,

So I tested librabbitmq, creating a connection/channel in every sub-thread, but I keep getting similar errors, so I guess your library is not at fault.

Now I think the problem is that after a while RabbitMQ closes the socket if it has not received any message, so I am going to try the heartbeat functionality.

Since I am planning to use AMQPStorm, I have a few questions to better understand how to use it in my use case. Do you mind answering them here?
Here is the list:

  • How does AMQPStorm handle multithreading? Do I create a connection in the parent thread and open a new channel in every sub-thread? Or do I open both the connection and the channel in the parent?
  • In my case, I only use basic.publish to push new messages into a queue that will later be consumed to feed my Logstash. Since I log a lot, I would like to minimize the time spent sending these messages. Do you have any advice on this subject? Does basic.publish wait for a response, for instance? Can I group messages in a batch and send them all together to reduce network round trips?
  • How does the heartbeat functionality work? Do I need to send heartbeats myself? If not, how do you handle them?

Thanks for your help and your work on this library.

@eandersson
Owner

  • Heartbeats are automatically enabled, but you can configure the interval using heartbeat=X. If you are constantly sending you may want to set this to a high value, e.g. 180.

  • I would also recommend trying to bump the timeout from 1 to something higher like maybe 10.

  • For publishing you can use the same channel for all threads. You may want to have a master thread that manages the connection state though. Something similar to this example. https://github.com/eandersson/amqpstorm/blob/master/examples/scalable_consumer.py

  • The only thing not fully thread-safe is opening and closing connections.

  • You can't batch messages, but maybe there is a way to bundle messages before you send them using basic.publish? For performance you could use no_ack=True, but this could lead to message loss if you are not careful.
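The advice above (one shared channel for publishing from all threads, a single owner for opening and closing the connection, a higher `timeout` and a configurable `heartbeat`) could be sketched roughly as follows. This is an untested sketch, not part of AMQPStorm: `SharedPublisher` and `connection_factory` are hypothetical names, and the factory is injected so the locking logic can be exercised without a broker. In real use the factory would build the connection, e.g. with `amqpstorm.Connection(host, username, password, port, heartbeat=180, timeout=10)`.

```python
import threading


class SharedPublisher(object):
    """Hypothetical helper: one connection and channel shared by all threads.

    Opening and closing are serialized with a lock, since those are the
    operations described as not fully thread-safe; publishing itself goes
    through the shared channel.
    """

    def __init__(self, connection_factory, exchange):
        # connection_factory (hypothetical): zero-argument callable returning
        # an open connection, for example
        #   lambda: amqpstorm.Connection(host, username, password, port,
        #                                heartbeat=180, timeout=10)
        self._connection_factory = connection_factory
        self._exchange = exchange
        self._lock = threading.Lock()
        self._connection = None
        self._channel = None

    def _get_channel(self):
        # Only one thread at a time may (re)open the connection and channel.
        with self._lock:
            if self._channel is None:
                connection = self._connection_factory()
                try:
                    channel = connection.channel()
                except Exception:
                    connection.close()
                    raise
                self._connection, self._channel = connection, channel
            return self._channel

    def _reset(self, failed_channel):
        # Forget a broken connection so the next publish reconnects, unless
        # another thread has already replaced it with a fresh one.
        with self._lock:
            if self._channel is not failed_channel:
                return
            connection = self._connection
            self._connection = None
            self._channel = None
        try:
            connection.close()
        except Exception:
            pass  # the connection is already unusable

    def publish(self, body, retries=1):
        # Publish on the shared channel; reconnect and retry on failure.
        for attempt in range(retries + 1):
            channel = self._get_channel()
            try:
                return channel.basic.publish(body=body,
                                             exchange=self._exchange,
                                             routing_key='')
            except Exception:
                self._reset(channel)
                if attempt == retries:
                    raise

    def close(self):
        with self._lock:
            connection = self._connection
            self._connection = None
            self._channel = None
        if connection is not None:
            connection.close()
```

Checking `self._channel is not failed_channel` in `_reset` keeps a thread that failed on a stale channel from tearing down a connection another thread has just reopened.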

I recently moved to a new country so I haven't been able to be as active as I normally would, but I can try to put together a robust example for you in the coming days (or worst case weeks).

@cp2587
Contributor Author

cp2587 commented Jan 19, 2017

Don't worry, I am starting to get my head around your code and managed to retry on connection loss, so all my questions are more out of curiosity than to solve my original problem :p

For no_ack=True, where do I set this? I am not sure it is helpful in my case (I debugged it, and apparently when basic.publish pushes a message on the socket, the RabbitMQ server does not write anything back, so that's fine)...
Can't I use write_frames (note the 's') to send multiple frames in a single batch?

PS: Also, congratulations on your new position :) Blizzard, is it? That's a really nice company (especially when you are a gamer ahah)

@eandersson
Owner

eandersson commented Jan 20, 2017

> For no_ack=True, where do I set this?

Sorry, regarding no_ack=True: I was actually confusing it with confirm_deliveries, which is not enabled by default.

If you call channel.confirm_deliveries() before publishing, RabbitMQ will reply every time you publish a message. It's good for reliability, but comes at the cost of performance.
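A minimal sketch of publishing in confirm mode, assuming AMQPStorm's `Channel.confirm_deliveries()` and that, once it is enabled, `basic.publish` waits for the broker's reply and returns `True` on an ack and `False` on a nack (check this against the AMQPStorm version you run). `publish_confirmed` is a hypothetical helper; the channel is passed in, so any object with the same methods works.

```python
def publish_confirmed(channel, messages, exchange, routing_key=''):
    """Hypothetical helper: publish with confirms, return the nacked bodies.

    Each publish blocks until RabbitMQ replies, which is the reliability
    versus performance trade-off described above.
    """
    # Switch the channel into confirm mode (needed once per channel).
    channel.confirm_deliveries()
    nacked = []
    for body in messages:
        # Assumed: returns True when the broker acks, False when it nacks.
        acked = channel.basic.publish(body=body, exchange=exchange,
                                      routing_key=routing_key)
        if not acked:
            nacked.append(body)
    return nacked
```

Without `confirm_deliveries()`, `basic.publish` only writes the frames and does not wait, which matches the observation that the server writes nothing back.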

> Can't I use write_frames (note the 's') to send multiple frames in a single batch?

You could send multiple frames in a single batch, but it would require some custom code.

Maybe something like this, but I haven't had time to test it yet.

import amqpstorm
from pamqp import specification
from pamqp import header as pamqp_header
from amqpstorm.basic import Basic


messages = ['message1', 'message2']
exchange = 'hello'
routing_key = 'queue1'

frames = []
for body in messages:
    properties = {}
    body = Basic._handle_utf8_payload(body, properties)
    properties = specification.Basic.Properties(**properties)
    method_frame = specification.Basic.Publish(exchange=exchange,
                                               routing_key=routing_key,
                                               mandatory=False,
                                               immediate=False)
    header_frame = pamqp_header.ContentHeader(body_size=len(body),
                                              properties=properties)

    frames.append(method_frame)
    frames.append(header_frame)
    for body_frame in Basic._create_content_body(body):
        frames.append(body_frame)
        
        
connection = amqpstorm.Connection(....)
channel = connection.channel()
channel.write_frames(frames)
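The simpler alternative from the earlier answer, bundling several log lines into one message body and sending it with a single basic.publish call, might look like this. It is a sketch under assumptions: `LogBundler` is a hypothetical name, each log line is assumed to be single-line JSON (as a Logstash-style formatter would produce), lines are joined with newlines, and the consumer has to split the body on newlines again. `publish` is any callable taking one body, e.g. a wrapper around `channel.basic.publish`.

```python
import threading


class LogBundler(object):
    """Hypothetical helper: buffer log lines and publish them in bundles."""

    def __init__(self, publish, max_messages=100):
        # publish: callable receiving one body string, for example
        #   lambda body: channel.basic.publish(body=body, exchange='logstash',
        #                                      routing_key='')
        self._publish = publish
        self._max_messages = max_messages
        self._buffer = []
        self._lock = threading.Lock()

    def add(self, line):
        # Append one log line; publish a bundle once the buffer is full.
        with self._lock:
            self._buffer.append(line)
            if len(self._buffer) < self._max_messages:
                return
            body, self._buffer = '\n'.join(self._buffer), []
        self._publish(body)

    def flush(self):
        # Publish whatever is buffered (call on shutdown or from a timer).
        with self._lock:
            if not self._buffer:
                return
            body, self._buffer = '\n'.join(self._buffer), []
        self._publish(body)
```

This trades fewer publish calls and frames for holding lines in memory until a bundle is full or `flush()` is called, so buffered lines can be lost if the process dies first. Publishing outside the lock also means bundles from different threads may arrive out of order.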

> PS: Also, congratulations on your new position :) Blizzard, is it? That's a really nice company (especially when you are a gamer ahah)

Haha thanks, I have actually been working here for 8 years! I just moved from the French office to the US office.
