Skip to content

Commit

Permalink
little redesign of logging
Browse files Browse the repository at this point in the history
  • Loading branch information
alexbers committed Jun 6, 2018
1 parent b1431b6 commit 0778060
Showing 1 changed file with 25 additions and 17 deletions.
42 changes: 25 additions & 17 deletions mtprotoproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def create_aes_cbc(key, iv):
return AES.new(key, AES.MODE_CBC, iv)

except ImportError:
print("Failed to find pycrypto, using slow AES version", flush=True)
print("Failed to find pycrypto, using slow AES version", flush=True, file=sys.stderr)
import pyaes

def create_aes_ctr(key, iv):
Expand Down Expand Up @@ -112,6 +112,10 @@ def decrypt(self, data):
my_ip_info = {"ipv4": None, "ipv6": None}


def print_err(*params):
print(*params, file=sys.stderr, flush=True)


def init_stats():
global stats
stats = {user: collections.Counter() for user in USERS}
Expand Down Expand Up @@ -196,8 +200,8 @@ def __init__(self, upstream, encryptor, block_size=1):

def write(self, data):
if len(data) % self.block_size != 0:
print("BUG: writing %d bytes not aligned to block size %d" % (
len(data), self.block_size))
print_err("BUG: writing %d bytes not aligned to block size %d" % (
len(data), self.block_size))
return 0
q = self.encryptor.encrypt(data)
return self.upstream.write(q)
Expand All @@ -218,13 +222,13 @@ async def read(self, buf_size):

len_is_bad = (msg_len % len(PADDING_FILLER) != 0)
if not MIN_MSG_LEN <= msg_len <= MAX_MSG_LEN or len_is_bad:
print("msg_len is bad, closing connection", msg_len)
print_err("msg_len is bad, closing connection", msg_len)
return b""

msg_seq_bytes = await self.upstream.readexactly(4)
msg_seq = int.from_bytes(msg_seq_bytes, "little", signed=True)
if msg_seq != self.seq_no:
print("unexpected seq_no")
print_err("unexpected seq_no")
return b""

self.seq_no += 1
Expand Down Expand Up @@ -287,7 +291,7 @@ def write(self, data):
LARGE_PKT_BORGER = 256 ** 3

if len(data) % 4 != 0:
print("BUG: MTProtoFrameStreamWriter attempted to send msg with len %d" % len(msg))
print_err("BUG: MTProtoFrameStreamWriter attempted to send msg with len %d" % len(msg))
return 0

len_div_four = len(data) // 4
Expand All @@ -296,9 +300,9 @@ def write(self, data):
return self.upstream.write(bytes([len_div_four]) + data)
elif len_div_four < LARGE_PKT_BORGER:
return self.upstream.write(b'\x7f' + bytes(int.to_bytes(len_div_four, 3, 'little')) +
data)
data)
else:
print("Attempted to send too large pkt len =", len(data))
print_err("Attempted to send too large pkt len =", len(data))
return 0


Expand All @@ -317,7 +321,7 @@ async def read(self, msg):
return b""

if ans_type != RPC_PROXY_ANS:
print("ans_type != RPC_PROXY_ANS", ans_type)
print_err("ans_type != RPC_PROXY_ANS", ans_type)
return b""

return conn_data
Expand All @@ -326,7 +330,7 @@ async def read(self, msg):
class ProxyReqStreamWriter(LayeredStreamWriterBase):
def __init__(self, upstream, cl_ip, cl_port, my_ip, my_port):
self.upstream = upstream

if ":" not in cl_ip:
self.remote_ip_port = b"\x00" * 10 + b"\xff\xff"
self.remote_ip_port += socket.inet_pton(socket.AF_INET, cl_ip)
Expand All @@ -350,7 +354,7 @@ def write(self, msg):
FOUR_BYTES_ALIGNER = b"\x00\x00\x00"

if len(msg) % 4 != 0:
print("BUG: attempted to send msg with len %d" % len(msg))
print_err("BUG: attempted to send msg with len %d" % len(msg))
return 0

full_msg = bytearray()
Expand Down Expand Up @@ -412,8 +416,10 @@ async def do_direct_handshake(dc_idx, dec_key_and_iv=None):
try:
reader_tgt, writer_tgt = await asyncio.open_connection(dc, TG_DATACENTER_PORT)
except ConnectionRefusedError as E:
print_err("Got connection refused while trying to connect to", addr, port)
return False
except OSError as E:
print_err("Unable to connect to", addr, port)
return False

while True:
Expand Down Expand Up @@ -507,8 +513,10 @@ async def do_middleproxy_handshake(dc_idx, cl_ip, cl_port):
try:
reader_tgt, writer_tgt = await asyncio.open_connection(addr, port)
except ConnectionRefusedError as E:
print_err("Got connection refused while trying to connect to", addr, port)
return False
except OSError as E:
print_err("Unable to connect to", addr, port)
return False

writer_tgt = MTProtoFrameStreamWriter(writer_tgt, START_SEQ_NO)
Expand Down Expand Up @@ -546,7 +554,7 @@ async def do_middleproxy_handshake(dc_idx, cl_ip, cl_port):
if my_ip_info["ipv4"]:
# prefer global ip settings to work behind NAT
my_ip = my_ip_info["ipv4"]

tg_ip_bytes = socket.inet_pton(socket.AF_INET, tg_ip)[::-1]
my_ip_bytes = socket.inet_pton(socket.AF_INET, my_ip)[::-1]

Expand Down Expand Up @@ -660,10 +668,10 @@ async def connect_reader_to_writer(rd, wr, user):
update_stats(user, octets=len(data))
wr.write(data)
await wr.drain()
except (ConnectionResetError, BrokenPipeError, OSError,
AttributeError, asyncio.streams.IncompleteReadError) as e:
except (ConnectionResetError, BrokenPipeError, OSError, AttributeError,
asyncio.streams.IncompleteReadError, TimeoutError) as e:
wr.close()
# print(e)
# print_err(e)
finally:
update_stats(user, curr_connects_x2=-1)

Expand All @@ -674,7 +682,7 @@ async def connect_reader_to_writer(rd, wr, user):
async def handle_client_wrapper(reader, writer):
try:
await handle_client(reader, writer)
except (asyncio.IncompleteReadError, ConnectionResetError):
except (asyncio.IncompleteReadError, ConnectionResetError, TimeoutError):
writer.close()


Expand Down Expand Up @@ -713,7 +721,7 @@ def init_ip_info():
pass

if USE_MIDDLE_PROXY and not my_ip_info["ipv4"]: # and not my_ip_info["ipv6"]:
print("Failed to determine your ip, advertising disabled", flush=True)
print_err("Failed to determine your ip, advertising disabled")
USE_MIDDLE_PROXY = False


Expand Down

0 comments on commit 0778060

Please sign in to comment.