Skip to content

Loading…

callbacks for success/failure on stream sending #182

Open
wants to merge 1 commit into from

2 participants

@ghost

Currently, when sending any kind of stanza, you submit it to the queue and then never know what happened. It would be nice to be able to get callbacks when the stanza is actually sent over the tcp socket (or if it errors out on send). This patch also gives the ability to send extra data along to the callback so that additional state can be passed along.

Obviously, this doesn't guarantee that the message actually made it to the recipient, but it does let us know if it made it to the wire or not (as far as our program is concerned).

@legastero
Collaborator

Interesting. This sounds like something that would be suited for the XEP-0198 plugin, since that lets you know the stanza made it to at least the server.

@ghost

XEP-0198 support would be awesome, I admit. The primary xmpp servers I deal with, though, are gtalk and facebook, neither of which seem to implement that (correct me if I've missed something). I have dialed back my expectations to only contending with the underlying network and its issues. The current socket_error event does not give enough information to determine what exactly failed. Hence if there is a network problem and a message doesn't get sent, there's currently no good way to know what didn't get sent. Therefore, the failure event. I added the success event for symmetry since I'm tracking successful and failed messages in detail.

BTW, I don't necessarily expect you guys to accept this submission. I just wanted to offer it to the project in case it solves problems other people have encountered. My situation may be somewhat unique.

@bear bear added the Enhancement label
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jul 11, 2012
  1. callbacks for success/failure on stream sending

    Jay Farrimond committed
Showing with 42 additions and 16 deletions.
  1. +14 −6 sleekxmpp/basexmpp.py
  2. +2 −2 sleekxmpp/xmlstream/stanzabase.py
  3. +26 −8 sleekxmpp/xmlstream/xmlstream.py
View
20 sleekxmpp/basexmpp.py
@@ -504,7 +504,7 @@ def make_presence(self, pshow=None, pstatus=None, ppriority=None,
return presence
def send_message(self, mto, mbody, msubject=None, mtype=None,
- mhtml=None, mfrom=None, mnick=None):
+ mhtml=None, mfrom=None, mnick=None, extra=None):
"""
Create, initialize, and send a new
:class:`~sleekxmpp.stanza.message.Message` stanza.
@@ -519,12 +519,15 @@ def send_message(self, mto, mbody, msubject=None, mtype=None,
be aware that some servers require that the full JID
of the sender be used.
:param mnick: Optional nickname of the sender.
+ :param object extra: Extra data to be sent to the callback function
+ for send_success and send_failure messages.
"""
self.make_message(mto, mbody, msubject, mtype,
- mhtml, mfrom, mnick).send()
+ mhtml, mfrom, mnick).send(extra=extra)
def send_presence(self, pshow=None, pstatus=None, ppriority=None,
- pto=None, pfrom=None, ptype=None, pnick=None):
+ pto=None, pfrom=None, ptype=None, pnick=None,
+ extra=None):
"""
Create, initialize, and send a new
:class:`~sleekxmpp.stanza.presence.Presence` stanza.
@@ -536,12 +539,15 @@ def send_presence(self, pshow=None, pstatus=None, ppriority=None,
:param ptype: The type of presence, such as ``'subscribe'``.
:param pfrom: The sender of the presence.
:param pnick: Optional nickname of the presence's sender.
+ :param object extra: Extra data to be sent to the callback function
+ for send_success and send_failure messages.
"""
self.make_presence(pshow, pstatus, ppriority, pto,
- ptype, pfrom, pnick).send()
+ ptype, pfrom, pnick).send(extra=extra)
def send_presence_subscription(self, pto, pfrom=None,
- ptype='subscribe', pnick=None):
+ ptype='subscribe', pnick=None,
+ extra=None):
"""
Create, initialize, and send a new
:class:`~sleekxmpp.stanza.presence.Presence` stanza of
@@ -551,11 +557,13 @@ def send_presence_subscription(self, pto, pfrom=None,
:param pfrom: The sender of the presence.
:param ptype: The type of presence, such as ``'subscribe'``.
:param pnick: Optional nickname of the presence's sender.
+ :param object extra: Extra data to be sent to the callback function
+ for send_success and send_failure messages.
"""
self.make_presence(ptype=ptype,
pfrom=pfrom,
pto=JID(pto).bare,
- pnick=pnick).send()
+ pnick=pnick).send(extra=extra)
@property
def jid(self):
View
4 sleekxmpp/xmlstream/stanzabase.py
@@ -1568,14 +1568,14 @@ def exception(self, e):
log.exception('Error handling {%s}%s stanza', self.namespace,
self.name)
- def send(self, now=False):
+ def send(self, now=False, extra=None):
"""Queue the stanza to be sent on the XML stream.
:param bool now: Indicates if the queue should be skipped and the
stanza sent immediately. Useful for stream
initialization. Defaults to ``False``.
"""
- self.stream.send(self, now=now)
+ self.stream.send(self, now=now, extra=extra)
def __copy__(self):
"""Return a copy of the stanza object that does not share the
View
34 sleekxmpp/xmlstream/xmlstream.py
@@ -1150,7 +1150,7 @@ def incoming_filter(self, xml):
"""
return xml
- def send(self, data, mask=None, timeout=None, now=False, use_filters=True):
+ def send(self, data, mask=None, timeout=None, now=False, use_filters=True, extra=None):
"""A wrapper for :meth:`send_raw()` for sending stanza objects.
May optionally block until an expected response is received.
@@ -1172,6 +1172,8 @@ def send(self, data, mask=None, timeout=None, now=False, use_filters=True):
applied to the given stanza data. Disabling
filters is useful when resending stanzas.
Defaults to ``True``.
+ :param object extra: Extra data to be sent to the callback function
+ for send_success and send_failure messages.
"""
if timeout is None:
timeout = self.response_timeout
@@ -1199,13 +1201,13 @@ def send(self, data, mask=None, timeout=None, now=False, use_filters=True):
if data is None:
return
str_data = str(data)
- self.send_raw(str_data, now)
+ self.send_raw(str_data, now, extra=extra)
else:
- self.send_raw(data, now)
+ self.send_raw(data, now, extra=extra)
if mask is not None:
return wait_for.wait(timeout)
- def send_xml(self, data, mask=None, timeout=None, now=False):
+ def send_xml(self, data, mask=None, timeout=None, now=False, extra=None):
"""Send an XML object on the stream, and optionally wait
for a response.
@@ -1222,12 +1224,14 @@ def send_xml(self, data, mask=None, timeout=None, now=False):
sending the stanza immediately. Useful mainly
for stream initialization stanzas.
Defaults to ``False``.
+ :param object extra: Extra data to be sent to the callback function
+ for send_success and send_failure messages.
"""
if timeout is None:
timeout = self.response_timeout
- return self.send(tostring(data), mask, timeout, now)
+ return self.send(tostring(data), mask, timeout, now, extra=extra)
- def send_raw(self, data, now=False, reconnect=None):
+ def send_raw(self, data, now=False, reconnect=None, extra=None):
"""Send raw data across the stream.
:param string data: Any string value.
@@ -1235,6 +1239,8 @@ def send_raw(self, data, now=False, reconnect=None):
restarted if there is an error sending
the stanza. Used mainly for testing.
Defaults to :attr:`auto_reconnect`.
+ :param object extra: Extra data to be sent to the callback function
+ for send_success and send_failure messages.
"""
if now:
log.debug("SEND (IMMED): %s", data)
@@ -1254,6 +1260,8 @@ def send_raw(self, data, now=False, reconnect=None):
log.debug('SSL error: max retries reached')
self.exception(serr)
log.warning("Failed to send %s", data)
+ self.event(
+ 'send_failure', (data, extra), direct=True)
if reconnect is None:
reconnect = self.auto_reconnect
if not self.stop.is_set():
@@ -1268,12 +1276,15 @@ def send_raw(self, data, now=False, reconnect=None):
except (Socket.error, ssl.SSLError) as serr:
self.event('socket_error', serr, direct=True)
log.warning("Failed to send %s", data)
+ self.event('send_failure', (data, extra), direct=True)
if reconnect is None:
reconnect = self.auto_reconnect
if not self.stop.is_set():
self.disconnect(reconnect, send_close=False)
+ else:
+ self.event('send_success', (data, extra), direct=True)
else:
- self.send_queue.put(data)
+ self.send_queue.put((data, extra))
return True
def _start_thread(self, name, target, track=True):
@@ -1647,10 +1658,11 @@ def _send_thread(self):
self.session_started_event.wait(timeout=0.1)
if self.__failed_send_stanza is not None:
data = self.__failed_send_stanza
+ extra = None
self.__failed_send_stanza = None
else:
try:
- data = self.send_queue.get(True, 1)
+ data, extra = self.send_queue.get(True, 1)
except queue.Empty:
continue
log.debug("SEND: %s", data)
@@ -1671,6 +1683,8 @@ def _send_thread(self):
log.debug('SSL error: max retries reached')
self.exception(serr)
log.warning("Failed to send %s", data)
+ self.event(
+ 'send_failure', (data, extra), direct=True)
if not self.stop.is_set():
self.disconnect(self.auto_reconnect,
send_close=False)
@@ -1684,11 +1698,15 @@ def _send_thread(self):
except (Socket.error, ssl.SSLError) as serr:
self.event('socket_error', serr, direct=True)
log.warning("Failed to send %s", data)
+ self.event('send_failure', (data, extra), direct=True)
if not self.stop.is_set():
self.__failed_send_stanza = data
self._end_thread('send')
self.disconnect(self.auto_reconnect, send_close=False)
return
+ else:
+ log.debug('send_success event about to be called')
+ self.event('send_success', (data, extra), direct=True)
except Exception as ex:
log.exception('Unexpected error in send thread: %s', ex)
self.exception(ex)
Something went wrong with that request. Please try again.