Skip to content
Permalink
Browse files
Debug level should always be used for logging since it's a library
  • Loading branch information
Lonami committed Jul 10, 2017
1 parent eab44af commit 1f7ac7118750ed84e2165dce9c6aca2e6ea0c6a4
Showing with 115 additions and 21 deletions.
  1. +94 −0 telethon/extensions/threaded_tcp_client.py
  2. +10 −10 telethon/network/mtproto_sender.py
  3. +3 −3 telethon/telegram_bare_client.py
  4. +8 −8 telethon/telegram_client.py
@@ -0,0 +1,94 @@
import socket
import time
from datetime import datetime, timedelta
from io import BytesIO, BufferedWriter
from threading import Event, Lock, Thread, Condition

from ..errors import ReadCancelledError


class ThreadedTcpClient:
"""The main difference with the TcpClient class is that this one
will spawn a secondary thread that will be constantly reading
from the network and putting everything on another buffer.
"""
def __init__(self, proxy=None):
self.connected = False
self._proxy = proxy
self._recreate_socket()

# Support for multi-threading advantages and safety
self.cancelled = Event() # Has the read operation been cancelled?
self.delay = 0.1 # Read delay when there was no data available
self._lock = Lock()

self._buffer = []
self._read_thread = Thread(target=self._reading_thread, daemon=True)
self._cv = Condition() # Condition Variable

def _recreate_socket(self):
if self._proxy is None:
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
else:
import socks
self._socket = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM)
if type(self._proxy) is dict:
self._socket.set_proxy(**self._proxy)
else: # tuple, list, etc.
self._socket.set_proxy(*self._proxy)

def connect(self, ip, port, timeout):
"""Connects to the specified IP and port number.
'timeout' must be given in seconds
"""
if not self.connected:
self._socket.settimeout(timeout)
self._socket.connect((ip, port))
self._socket.setblocking(False)
self.connected = True

def close(self):
"""Closes the connection"""
if self.connected:
self._socket.shutdown(socket.SHUT_RDWR)
self._socket.close()
self.connected = False
self._recreate_socket()

def write(self, data):
"""Writes (sends) the specified bytes to the connected peer"""
self._socket.sendall(data)

def read(self, size, timeout=timedelta(seconds=5)):
"""Reads (receives) a whole block of 'size bytes
from the connected peer.
A timeout can be specified, which will cancel the operation if
no data has been read in the specified time. If data was read
and it's waiting for more, the timeout will NOT cancel the
operation. Set to None for no timeout
"""
with self._cv:
print('wait for...')
self._cv.wait_for(lambda: len(self._buffer) >= size, timeout=timeout.seconds)
print('got', size)
result, self._buffer = self._buffer[:size], self._buffer[size:]
return result

def _reading_thread(self):
while True:
partial = self._socket.recv(4096)
if len(partial) == 0:
self.connected = False
raise ConnectionResetError(
'The server has closed the connection.')

with self._cv:
print('extended', len(partial))
self._buffer.extend(partial)
self._cv.notify()

def cancel_read(self):
"""Cancels the read operation IF it hasn't yet
started, raising a ReadCancelledError"""
self.cancelled.set()
@@ -100,7 +100,7 @@ def receive(self, request=None, updates=None, **kwargs):
# or, if there is no request, until we read an update
while (request and not request.confirm_received) or \
(not request and not updates):
self._logger.info('Trying to .receive() the request result...')
self._logger.debug('Trying to .receive() the request result...')
seq, body = self.transport.receive(**kwargs)
message, remote_msg_id, remote_seq = self._decode_msg(body)

@@ -114,7 +114,7 @@ def receive(self, request=None, updates=None, **kwargs):
self._pending_receive.remove(request)
except ValueError: pass

self._logger.info('Request result received')
self._logger.debug('Request result received')
self._logger.debug('receive() released the lock')

def receive_updates(self, **kwargs):
@@ -226,10 +226,10 @@ def _process_msg(self, msg_id, sequence, reader, updates):
ack = reader.tgread_object()
for r in self._pending_receive:
if r.request_msg_id in ack.msg_ids:
self._logger.warning('Ack found for the a request')
self._logger.debug('Ack found for the a request')

if self.logging_out:
self._logger.info('Message ack confirmed a request')
self._logger.debug('Message ack confirmed a request')
r.confirm_received = True

return True
@@ -247,7 +247,7 @@ def _process_msg(self, msg_id, sequence, reader, updates):

return True

self._logger.warning('Unknown message: {}'.format(hex(code)))
self._logger.debug('Unknown message: {}'.format(hex(code)))
return False

# endregion
@@ -263,7 +263,7 @@ def _handle_pong(self, msg_id, sequence, reader):
request = next(r for r in self._pending_receive
if r.request_msg_id == received_msg_id)

self._logger.warning('Pong confirmed a request')
self._logger.debug('Pong confirmed a request')
request.confirm_received = True
except StopIteration: pass

@@ -318,8 +318,8 @@ def _handle_bad_msg_notification(self, msg_id, sequence, reader):
# Use the current msg_id to determine the right time offset.
self.session.update_time_offset(correct_msg_id=msg_id)
self.session.save()
self._logger.warning('Read Bad Message error: ' + str(error))
self._logger.info('Attempting to use the correct time offset.')
self._logger.debug('Read Bad Message error: ' + str(error))
self._logger.debug('Attempting to use the correct time offset.')
return True
else:
raise error
@@ -346,7 +346,7 @@ def _handle_rpc_result(self, msg_id, sequence, reader):
self._need_confirmation.append(request_id)
self._send_acknowledges()

self._logger.warning('Read RPC error: %s', str(error))
self._logger.debug('Read RPC error: %s', str(error))
if isinstance(error, InvalidDCError):
# Must resend this request, if any
if request:
@@ -368,7 +368,7 @@ def _handle_rpc_result(self, msg_id, sequence, reader):
else:
# If it's really a result for RPC from previous connection
# session, it will be skipped by the handle_container()
self._logger.warning('Lost request will be skipped.')
self._logger.debug('Lost request will be skipped.')
return False

def _handle_gzip_packed(self, msg_id, sequence, reader, updates):
@@ -90,7 +90,7 @@ def connect(self, exported_auth=None):
determine the authorization key for the current session.
"""
if self._sender and self._sender.is_connected():
self._logger.warning(
self._logger.debug(
'Attempted to connect when the client was already connected.'
)
return
@@ -143,7 +143,7 @@ def connect(self, exported_auth=None):
except (RPCError, ConnectionError) as error:
# Probably errors from the previous session, ignore them
self.disconnect()
self._logger.warning('Could not stabilise initial connection: {}'
self._logger.debug('Could not stabilise initial connection: {}'
.format(error))
return False

@@ -277,7 +277,7 @@ def invoke(self, request, updates=None):
return request.result

except ConnectionResetError:
self._logger.info('Server disconnected us. Reconnecting and '
self._logger.debug('Server disconnected us. Reconnecting and '
'resending request...')
self.reconnect()
return self.invoke(request)
@@ -214,7 +214,7 @@ def invoke(self, request, *args):
return result

except (PhoneMigrateError, NetworkMigrateError, UserMigrateError) as e:
self._logger.info('DC error when invoking request, '
self._logger.debug('DC error when invoking request, '
'attempting to reconnect at DC {}'
.format(e.new_dc))

@@ -698,7 +698,7 @@ def _set_updates_thread(self, running):
return

# Different state, update the saved value and behave as required
self._logger.info('Changing updates thread running status to %s', running)
self._logger.debug('Changing updates thread running status to %s', running)
if running:
self._updates_thread_running.set()
if not self._updates_thread:
@@ -739,7 +739,7 @@ def _updates_thread_method(self):
updates = self._sender.receive_updates(timeout=timeout)

self._updates_thread_receiving.clear()
self._logger.info(
self._logger.debug(
'Received {} update(s) from the updates thread'
.format(len(updates))
)
@@ -748,25 +748,25 @@ def _updates_thread_method(self):
handler(update)

except ConnectionResetError:
self._logger.info('Server disconnected us. Reconnecting...')
self._logger.debug('Server disconnected us. Reconnecting...')
self.reconnect()

except TimeoutError:
self._logger.debug('Receiving updates timed out')

except ReadCancelledError:
self._logger.info('Receiving updates cancelled')
self._logger.debug('Receiving updates cancelled')

except BrokenPipeError:
self._logger.info('Tcp session is broken. Reconnecting...')
self._logger.debug('Tcp session is broken. Reconnecting...')
self.reconnect()

except InvalidChecksumError:
self._logger.info('MTProto session is broken. Reconnecting...')
self._logger.debug('MTProto session is broken. Reconnecting...')
self.reconnect()

except OSError:
self._logger.warning('OSError on updates thread, %s logging out',
self._logger.debug('OSError on updates thread, %s logging out',
'was' if self._sender.logging_out else 'was not')

if self._sender.logging_out:

1 comment on commit 1f7ac71

@Lonami

This comment has been minimized.

Copy link
Member Author

@Lonami Lonami commented on 1f7ac71 Jul 18, 2017

I didn't mean to introduce ThreadedTcpClient here, I was testing stuff (continuous .receive()) with it…

Please sign in to comment.