Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the ability to forcefully close a connection. #29

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 24 additions & 8 deletions amqpstorm/channel.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""AMQPStorm Connection.Channel."""

import logging
import multiprocessing
from time import sleep

from pamqp import specification as pamqp_spec
Expand Down Expand Up @@ -29,7 +30,7 @@ class Channel(BaseChannel):
"""Connection.channel"""
__slots__ = [
'confirming_deliveries', 'consumer_callback', 'rpc', '_basic',
'_connection', '_exchange', '_inbound', '_queue', '_tx'
'_connection', '_exchange', '_inbound', '_queue', '_tx', '_die'
]

def __init__(self, channel_id, connection, rpc_timeout):
Expand All @@ -44,6 +45,8 @@ def __init__(self, channel_id, connection, rpc_timeout):
self._tx = Tx(self)
self._queue = Queue(self)

self._die = multiprocessing.Value("b", 0)

def __enter__(self):
return self

Expand Down Expand Up @@ -108,6 +111,8 @@ def build_inbound_messages(self, break_on_empty=False, to_tuple=False):
"""
self.check_for_errors()
while not self.is_closed:
if self._die.value != 0:
return
message = self._build_message()
if not message:
if break_on_empty:
Expand All @@ -120,6 +125,11 @@ def build_inbound_messages(self, break_on_empty=False, to_tuple=False):
continue
yield message


def kill(self):
self._die.value = 1
self.set_state(self.CLOSED)

def close(self, reply_code=200, reply_text=''):
"""Close Channel.

Expand Down Expand Up @@ -247,6 +257,9 @@ def process_data_events(self, to_tuple=False):
if not self.consumer_callback:
raise AMQPChannelError('no consumer_callback defined')
for message in self.build_inbound_messages(break_on_empty=True):
if self._die.value != 0:
return

if not to_tuple:
# noinspection PyCallingNonCallable
self.consumer_callback(message)
Expand Down Expand Up @@ -278,10 +291,12 @@ def start_consuming(self, to_tuple=False):

:return:
"""
while not self.is_closed:
self.process_data_events(to_tuple=to_tuple)
if not self.consumer_tags:
while self.consumer_tags:
if self.is_closed:
break
if self._die.value != 0:
break
self.process_data_events(to_tuple=to_tuple)

def stop_consuming(self):
"""Stop consuming messages.
Expand Down Expand Up @@ -339,8 +354,8 @@ def _basic_return(self, frame_in):
"Message not delivered: %s (%s) to queue '%s' from exchange '%s'" %
(
reply_text,
frame_in.reply_code,
frame_in.routing_key,
frame_in.reply_code,
frame_in.routing_key,
frame_in.exchange
)
)
Expand All @@ -353,6 +368,7 @@ def _build_message(self):

:rtype: Message
"""
# print("_build_message call")
with self.lock:
if len(self._inbound) < 2:
return None
Expand All @@ -377,15 +393,15 @@ def _build_message_headers(self):
if not isinstance(basic_deliver, pamqp_spec.Basic.Deliver):
LOGGER.warning(
'Received an out-of-order frame: %s was '
'expecting a Basic.Deliver frame',
'expecting a Basic.Deliver frame',
type(basic_deliver)
)
return None
content_header = self._inbound.pop(0)
if not isinstance(content_header, ContentHeader):
LOGGER.warning(
'Received an out-of-order frame: %s was '
'expecting a ContentHeader frame',
'expecting a ContentHeader frame',
type(content_header)
)
return None
Expand Down
7 changes: 7 additions & 0 deletions amqpstorm/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ def close(self):
self.set_state(self.CLOSED)
LOGGER.debug('Connection Closed')

def kill(self):
for channel in self._channels.items():
channel.remove_consumer_tag()

self._io.kill()


def open(self):
"""Open Connection.

Expand Down
18 changes: 15 additions & 3 deletions amqpstorm/io.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""AMQPStorm Connection.IO."""

import logging
import traceback
import multiprocessing
import select
import socket
import threading
Expand Down Expand Up @@ -54,14 +56,16 @@ def is_ready(self):
class IO(object):
"""AMQP Connection.io"""

def __init__(self, parameters, exceptions=None, on_read=None):
def __init__(self, parameters, exceptions=None, on_read=None, name=None):
self._exceptions = exceptions
self._lock = threading.Lock()
self._inbound_thread = None
self._on_read = on_read
self._running = threading.Event()
self._running = multiprocessing.Event()
self._die = multiprocessing.Value('b', 0)
self._parameters = parameters
self.data_in = EMPTY_BUFFER
self.name = name
self.poller = None
self.socket = None
self.use_ssl = self._parameters['ssl']
Expand All @@ -84,6 +88,12 @@ def close(self):
finally:
self._lock.release()

def kill(self):
if self._inbound_thread.is_alive():
self._die.value = 1
while self._inbound_thread.is_alive():
self._inbound_thread.join(1)

def open(self):
"""Open Socket and establish a connection.

Expand Down Expand Up @@ -222,6 +232,8 @@ def _process_incoming_data(self):
self.data_in += self._receive()
self.data_in = self._on_read(self.data_in)
sleep(IDLE_WAIT)
if self._die.value == 1:
break

def _receive(self):
"""Receive any incoming socket data.
Expand All @@ -239,7 +251,7 @@ def _receive(self):
except (IOError, OSError) as why:
if why.args[0] not in (EWOULDBLOCK, EAGAIN):
self._exceptions.append(AMQPConnectionError(why))
self._running.clear()
self._running.clear()
return data_in

def _read_from_socket(self):
Expand Down