Skip to content

Commit

Permalink
Remove temporary connections and use a lock again
Browse files Browse the repository at this point in the history
These seem to be the reason for missing some updates (#237)
  • Loading branch information
Lonami committed Jan 6, 2018
1 parent 7745b8e commit d81dd05
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 70 deletions.
11 changes: 6 additions & 5 deletions telethon/network/mtproto_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import gzip
import logging
import struct
from threading import Lock

from .. import helpers as utils
from ..crypto import AES
Expand Down Expand Up @@ -53,6 +54,9 @@ def __init__(self, session, connection):
# Requests (as msg_id: Message) sent waiting to be received
self._pending_receive = {}

# Multithreading
self._send_lock = Lock()

def connect(self):
"""Connects to the server."""
self.connection.connect(self.session.server_address, self.session.port)
Expand All @@ -71,10 +75,6 @@ def disconnect(self):
self._need_confirmation.clear()
self._clear_all_pending()

def clone(self):
"""Creates a copy of this MtProtoSender as a new connection."""
return MtProtoSender(self.session, self.connection.clone())

# region Send and receive

def send(self, *requests):
Expand Down Expand Up @@ -156,7 +156,8 @@ def _send_message(self, message):
:param message: the TLMessage to be sent.
"""
self.connection.send(utils.pack_message(self.session, message))
with self._send_lock:
self.connection.send(utils.pack_message(self.session, message))

def _decode_msg(self, body):
"""
Expand Down
92 changes: 27 additions & 65 deletions telethon/telegram_bare_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,6 @@ def __init__(self, session, api_id, api_hash,
self._spawn_read_thread = spawn_read_thread
self._recv_thread = None

# Identifier of the main thread (the one that called .connect()).
# This will be used to create new connections from any other thread,
# so that requests can be sent in parallel.
self._main_thread_ident = None

# Default PingRequest delay
self._last_ping = datetime.now()
self._ping_delay = timedelta(minutes=1)
Expand Down Expand Up @@ -198,7 +193,6 @@ def connect(self, _sync_updates=True):
__log__.info('Connecting to %s:%d...',
self.session.server_address, self.session.port)

self._main_thread_ident = threading.get_ident()
self._background_error = None # Clear previous errors

try:
Expand Down Expand Up @@ -431,6 +425,9 @@ def __call__(self, *requests, retries=5):
x.content_related for x in requests):
raise TypeError('You can only invoke requests, not types!')

if self._background_error:
raise self._background_error

# For logging purposes
if len(requests) == 1:
which = type(requests[0]).__name__
Expand All @@ -439,66 +436,31 @@ def __call__(self, *requests, retries=5):
len(requests), [type(x).__name__ for x in requests])

# Determine the sender to be used (main or a new connection)
on_main_thread = threading.get_ident() == self._main_thread_ident
if on_main_thread or self._on_read_thread():
__log__.debug('Invoking %s from main thread', which)
sender = self._sender
update_state = self.updates
else:
__log__.debug('Invoking %s from background thread. '
'Creating temporary connection', which)

sender = self._sender.clone()
sender.connect()
# We're on another connection, Telegram will resend all the
# updates that we haven't acknowledged (potentially entering
# an infinite loop if we're calling this in response to an
# update event, as it would be received again and again). So
# to avoid this we will simply not process updates on these
# new temporary connections, as they will be sent and later
# acknowledged over the main connection.
update_state = None

# We should call receive from this thread if there's no background
# thread reading or if the server disconnected us and we're trying
# to reconnect. This is because the read thread may either be
# locked also trying to reconnect or we may be said thread already.
call_receive = not on_main_thread or self._recv_thread is None \
or self._reconnect_lock.locked()
try:
for attempt in range(retries):
if self._background_error and on_main_thread:
raise self._background_error

result = self._invoke(
sender, call_receive, update_state, *requests
)
if result is not None:
return result

__log__.warning('Invoking %s failed %d times, '
'reconnecting and retrying',
[str(x) for x in requests], attempt + 1)
sleep(1)
# The ReadThread has priority when attempting reconnection,
# since this thread is constantly running while __call__ is
# only done sometimes. Here try connecting only once/retry.
if sender == self._sender:
if not self._reconnect_lock.locked():
with self._reconnect_lock:
self._reconnect()
else:
sender.connect()
__log__.debug('Invoking %s', which)

call_receive = self._recv_thread is None or self._reconnect_lock.locked()
for retry in range(retries):
result = self._invoke(call_receive, *requests)
if result is not None:
return result

__log__.warning('Invoking %s failed %d times, '
'reconnecting and retrying',
[str(x) for x in requests], retry + 1)
sleep(1)
# The ReadThread has priority when attempting reconnection,
# since this thread is constantly running while __call__ is
# only done sometimes. Here try connecting only once/retry.
if not self._reconnect_lock.locked():
with self._reconnect_lock:
self._reconnect()

raise RuntimeError('Number of retries reached 0.')
finally:
if sender != self._sender:
sender.disconnect() # Close temporary connections
raise RuntimeError('Number of retries reached 0.')

# Let people use client.invoke(SomeRequest()) instead client(...)
invoke = __call__

def _invoke(self, sender, call_receive, update_state, *requests):
def _invoke(self, call_receive, *requests):
try:
# Ensure that we start with no previous errors (i.e. resending)
for x in requests:
Expand All @@ -523,7 +485,7 @@ def _invoke(self, sender, call_receive, update_state, *requests):
self._wrap_init_connection(GetConfigRequest())
)

sender.send(*requests)
self._sender.send(*requests)

if not call_receive:
# TODO This will be slightly troublesome if we allow
Expand All @@ -532,11 +494,11 @@ def _invoke(self, sender, call_receive, update_state, *requests):
# in which case a Lock would be required for .receive().
for x in requests:
x.confirm_received.wait(
sender.connection.get_timeout()
self._sender.connection.get_timeout()
)
else:
while not all(x.confirm_received.is_set() for x in requests):
sender.receive(update_state=update_state)
self._sender.receive(update_state=self.updates)

except BrokenAuthKeyError:
__log__.error('Authorization key seems broken and was invalid!')
Expand Down Expand Up @@ -578,7 +540,7 @@ def _invoke(self, sender, call_receive, update_state, *requests):
# be on the very first connection (not authorized, not running),
# but may be an issue for people who actually travel?
self._reconnect(new_dc=e.new_dc)
return self._invoke(sender, call_receive, update_state, *requests)
return self._invoke(call_receive, *requests)

except ServerError as e:
# Telegram is having some issues, just retry
Expand Down

0 comments on commit d81dd05

Please sign in to comment.