Skip to content
This repository has been archived by the owner on Feb 19, 2020. It is now read-only.

callbacks for success/failure on stream sending #182

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 14 additions & 6 deletions sleekxmpp/basexmpp.py
Expand Up @@ -504,7 +504,7 @@ def make_presence(self, pshow=None, pstatus=None, ppriority=None,
return presence return presence


def send_message(self, mto, mbody, msubject=None, mtype=None, def send_message(self, mto, mbody, msubject=None, mtype=None,
mhtml=None, mfrom=None, mnick=None): mhtml=None, mfrom=None, mnick=None, extra=None):
""" """
Create, initialize, and send a new Create, initialize, and send a new
:class:`~sleekxmpp.stanza.message.Message` stanza. :class:`~sleekxmpp.stanza.message.Message` stanza.
Expand All @@ -519,12 +519,15 @@ def send_message(self, mto, mbody, msubject=None, mtype=None,
be aware that some servers require that the full JID be aware that some servers require that the full JID
of the sender be used. of the sender be used.
:param mnick: Optional nickname of the sender. :param mnick: Optional nickname of the sender.
:param object extra: Extra data to be sent to the callback function
for send_success and send_failure messages.
""" """
self.make_message(mto, mbody, msubject, mtype, self.make_message(mto, mbody, msubject, mtype,
mhtml, mfrom, mnick).send() mhtml, mfrom, mnick).send(extra=extra)


def send_presence(self, pshow=None, pstatus=None, ppriority=None, def send_presence(self, pshow=None, pstatus=None, ppriority=None,
pto=None, pfrom=None, ptype=None, pnick=None): pto=None, pfrom=None, ptype=None, pnick=None,
extra=None):
""" """
Create, initialize, and send a new Create, initialize, and send a new
:class:`~sleekxmpp.stanza.presence.Presence` stanza. :class:`~sleekxmpp.stanza.presence.Presence` stanza.
Expand All @@ -536,12 +539,15 @@ def send_presence(self, pshow=None, pstatus=None, ppriority=None,
:param ptype: The type of presence, such as ``'subscribe'``. :param ptype: The type of presence, such as ``'subscribe'``.
:param pfrom: The sender of the presence. :param pfrom: The sender of the presence.
:param pnick: Optional nickname of the presence's sender. :param pnick: Optional nickname of the presence's sender.
:param object extra: Extra data to be sent to the callback function
for send_success and send_failure messages.
""" """
self.make_presence(pshow, pstatus, ppriority, pto, self.make_presence(pshow, pstatus, ppriority, pto,
ptype, pfrom, pnick).send() ptype, pfrom, pnick).send(extra=extra)


def send_presence_subscription(self, pto, pfrom=None, def send_presence_subscription(self, pto, pfrom=None,
ptype='subscribe', pnick=None): ptype='subscribe', pnick=None,
extra=None):
""" """
Create, initialize, and send a new Create, initialize, and send a new
:class:`~sleekxmpp.stanza.presence.Presence` stanza of :class:`~sleekxmpp.stanza.presence.Presence` stanza of
Expand All @@ -551,11 +557,13 @@ def send_presence_subscription(self, pto, pfrom=None,
:param pfrom: The sender of the presence. :param pfrom: The sender of the presence.
:param ptype: The type of presence, such as ``'subscribe'``. :param ptype: The type of presence, such as ``'subscribe'``.
:param pnick: Optional nickname of the presence's sender. :param pnick: Optional nickname of the presence's sender.
:param object extra: Extra data to be sent to the callback function
for send_success and send_failure messages.
""" """
self.make_presence(ptype=ptype, self.make_presence(ptype=ptype,
pfrom=pfrom, pfrom=pfrom,
pto=JID(pto).bare, pto=JID(pto).bare,
pnick=pnick).send() pnick=pnick).send(extra=extra)


@property @property
def jid(self): def jid(self):
Expand Down
4 changes: 2 additions & 2 deletions sleekxmpp/xmlstream/stanzabase.py
Expand Up @@ -1568,14 +1568,14 @@ def exception(self, e):
log.exception('Error handling {%s}%s stanza', self.namespace, log.exception('Error handling {%s}%s stanza', self.namespace,
self.name) self.name)


def send(self, now=False): def send(self, now=False, extra=None):
"""Queue the stanza to be sent on the XML stream. """Queue the stanza to be sent on the XML stream.


:param bool now: Indicates if the queue should be skipped and the :param bool now: Indicates if the queue should be skipped and the
stanza sent immediately. Useful for stream stanza sent immediately. Useful for stream
initialization. Defaults to ``False``. initialization. Defaults to ``False``.
""" """
self.stream.send(self, now=now) self.stream.send(self, now=now, extra=extra)


def __copy__(self): def __copy__(self):
"""Return a copy of the stanza object that does not share the """Return a copy of the stanza object that does not share the
Expand Down
34 changes: 26 additions & 8 deletions sleekxmpp/xmlstream/xmlstream.py
Expand Up @@ -1150,7 +1150,7 @@ def incoming_filter(self, xml):
""" """
return xml return xml


def send(self, data, mask=None, timeout=None, now=False, use_filters=True): def send(self, data, mask=None, timeout=None, now=False, use_filters=True, extra=None):
"""A wrapper for :meth:`send_raw()` for sending stanza objects. """A wrapper for :meth:`send_raw()` for sending stanza objects.


May optionally block until an expected response is received. May optionally block until an expected response is received.
Expand All @@ -1172,6 +1172,8 @@ def send(self, data, mask=None, timeout=None, now=False, use_filters=True):
applied to the given stanza data. Disabling applied to the given stanza data. Disabling
filters is useful when resending stanzas. filters is useful when resending stanzas.
Defaults to ``True``. Defaults to ``True``.
:param object extra: Extra data to be sent to the callback function
for send_success and send_failure messages.
""" """
if timeout is None: if timeout is None:
timeout = self.response_timeout timeout = self.response_timeout
Expand Down Expand Up @@ -1199,13 +1201,13 @@ def send(self, data, mask=None, timeout=None, now=False, use_filters=True):
if data is None: if data is None:
return return
str_data = str(data) str_data = str(data)
self.send_raw(str_data, now) self.send_raw(str_data, now, extra=extra)
else: else:
self.send_raw(data, now) self.send_raw(data, now, extra=extra)
if mask is not None: if mask is not None:
return wait_for.wait(timeout) return wait_for.wait(timeout)


def send_xml(self, data, mask=None, timeout=None, now=False): def send_xml(self, data, mask=None, timeout=None, now=False, extra=None):
"""Send an XML object on the stream, and optionally wait """Send an XML object on the stream, and optionally wait
for a response. for a response.


Expand All @@ -1222,19 +1224,23 @@ def send_xml(self, data, mask=None, timeout=None, now=False):
sending the stanza immediately. Useful mainly sending the stanza immediately. Useful mainly
for stream initialization stanzas. for stream initialization stanzas.
Defaults to ``False``. Defaults to ``False``.
:param object extra: Extra data to be sent to the callback function
for send_success and send_failure messages.
""" """
if timeout is None: if timeout is None:
timeout = self.response_timeout timeout = self.response_timeout
return self.send(tostring(data), mask, timeout, now) return self.send(tostring(data), mask, timeout, now, extra=extra)


def send_raw(self, data, now=False, reconnect=None): def send_raw(self, data, now=False, reconnect=None, extra=None):
"""Send raw data across the stream. """Send raw data across the stream.


:param string data: Any string value. :param string data: Any string value.
:param bool reconnect: Indicates if the stream should be :param bool reconnect: Indicates if the stream should be
restarted if there is an error sending restarted if there is an error sending
the stanza. Used mainly for testing. the stanza. Used mainly for testing.
Defaults to :attr:`auto_reconnect`. Defaults to :attr:`auto_reconnect`.
:param object extra: Extra data to be sent to the callback function
for send_success and send_failure messages.
""" """
if now: if now:
log.debug("SEND (IMMED): %s", data) log.debug("SEND (IMMED): %s", data)
Expand All @@ -1254,6 +1260,8 @@ def send_raw(self, data, now=False, reconnect=None):
log.debug('SSL error: max retries reached') log.debug('SSL error: max retries reached')
self.exception(serr) self.exception(serr)
log.warning("Failed to send %s", data) log.warning("Failed to send %s", data)
self.event(
'send_failure', (data, extra), direct=True)
if reconnect is None: if reconnect is None:
reconnect = self.auto_reconnect reconnect = self.auto_reconnect
if not self.stop.is_set(): if not self.stop.is_set():
Expand All @@ -1268,12 +1276,15 @@ def send_raw(self, data, now=False, reconnect=None):
except (Socket.error, ssl.SSLError) as serr: except (Socket.error, ssl.SSLError) as serr:
self.event('socket_error', serr, direct=True) self.event('socket_error', serr, direct=True)
log.warning("Failed to send %s", data) log.warning("Failed to send %s", data)
self.event('send_failure', (data, extra), direct=True)
if reconnect is None: if reconnect is None:
reconnect = self.auto_reconnect reconnect = self.auto_reconnect
if not self.stop.is_set(): if not self.stop.is_set():
self.disconnect(reconnect, send_close=False) self.disconnect(reconnect, send_close=False)
else:
self.event('send_success', (data, extra), direct=True)
else: else:
self.send_queue.put(data) self.send_queue.put((data, extra))
return True return True


def _start_thread(self, name, target, track=True): def _start_thread(self, name, target, track=True):
Expand Down Expand Up @@ -1647,10 +1658,11 @@ def _send_thread(self):
self.session_started_event.wait(timeout=0.1) self.session_started_event.wait(timeout=0.1)
if self.__failed_send_stanza is not None: if self.__failed_send_stanza is not None:
data = self.__failed_send_stanza data = self.__failed_send_stanza
extra = None
self.__failed_send_stanza = None self.__failed_send_stanza = None
else: else:
try: try:
data = self.send_queue.get(True, 1) data, extra = self.send_queue.get(True, 1)
except queue.Empty: except queue.Empty:
continue continue
log.debug("SEND: %s", data) log.debug("SEND: %s", data)
Expand All @@ -1671,6 +1683,8 @@ def _send_thread(self):
log.debug('SSL error: max retries reached') log.debug('SSL error: max retries reached')
self.exception(serr) self.exception(serr)
log.warning("Failed to send %s", data) log.warning("Failed to send %s", data)
self.event(
'send_failure', (data, extra), direct=True)
if not self.stop.is_set(): if not self.stop.is_set():
self.disconnect(self.auto_reconnect, self.disconnect(self.auto_reconnect,
send_close=False) send_close=False)
Expand All @@ -1684,11 +1698,15 @@ def _send_thread(self):
except (Socket.error, ssl.SSLError) as serr: except (Socket.error, ssl.SSLError) as serr:
self.event('socket_error', serr, direct=True) self.event('socket_error', serr, direct=True)
log.warning("Failed to send %s", data) log.warning("Failed to send %s", data)
self.event('send_failure', (data, extra), direct=True)
if not self.stop.is_set(): if not self.stop.is_set():
self.__failed_send_stanza = data self.__failed_send_stanza = data
self._end_thread('send') self._end_thread('send')
self.disconnect(self.auto_reconnect, send_close=False) self.disconnect(self.auto_reconnect, send_close=False)
return return
else:
log.debug('send_success event about to be called')
self.event('send_success', (data, extra), direct=True)
except Exception as ex: except Exception as ex:
log.exception('Unexpected error in send thread: %s', ex) log.exception('Unexpected error in send thread: %s', ex)
self.exception(ex) self.exception(ex)
Expand Down