-
Notifications
You must be signed in to change notification settings - Fork 5
/
circuit.py
240 lines (204 loc) · 10.1 KB
/
circuit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
from __future__ import annotations
import asyncio
import datetime as dt
import logging
from collections import deque
from typing import *
from hippolyzer.lib.base.message.message import Block
from hippolyzer.lib.base.message.msgtypes import PacketFlags
from hippolyzer.lib.base.message.udpserializer import UDPMessageSerializer
from hippolyzer.lib.proxy.packets import Direction, ProxiedUDPPacket
from hippolyzer.lib.proxy.message import ProxiedMessage
if TYPE_CHECKING:
from hippolyzer.lib.proxy.region import ProxiedRegion
from hippolyzer.lib.proxy.message_logger import BaseMessageLogger
class ProxiedCircuit:
def __init__(self, near_host, far_host, transport, region: Optional[ProxiedRegion] = None,
socks_transport: Optional[bool] = None):
self.near_host = near_host
self.host = far_host
self.is_alive = True
self.socks_transport = socks_transport
self.transport: Optional[asyncio.DatagramTransport] = transport
self.in_injections = InjectionTracker(0)
self.out_injections = InjectionTracker(0)
self.serializer = UDPMessageSerializer()
self.last_packet_at = dt.datetime.now()
self.region: Optional[ProxiedRegion] = region
message_logger = None
if region:
message_logger = region.session().session_manager.message_logger
self.message_logger: Optional[BaseMessageLogger] = message_logger
def _send_prepared_message(self, message: ProxiedMessage, direction, transport=None):
try:
serialized = self.serializer.serialize(message)
except:
logging.exception(f"Failed to serialize: {message.to_dict()!r}")
raise
if self.message_logger and message.injected:
self.message_logger.log_lludp_message(self.region.session(), self.region, message)
return self.send_datagram(serialized, direction, transport=transport)
def send_datagram(self, data: bytes, direction: Direction, transport=None):
self.last_packet_at = dt.datetime.now()
src_addr, dst_addr = self.host, self.near_host
if direction == Direction.OUT:
src_addr, dst_addr = self.near_host, self.host
packet = ProxiedUDPPacket(src_addr, dst_addr, data, direction)
packet_data = packet.serialize(socks_header=self.socks_transport)
(transport or self.transport).sendto(packet_data, dst_addr)
return packet
def _get_injections(self, direction: Direction):
if direction == Direction.OUT:
return self.out_injections, self.in_injections
return self.in_injections, self.out_injections
def prepare_message(self, message: ProxiedMessage, direction=None):
if message.finalized:
raise RuntimeError(f"Trying to re-send finalized {message!r}")
direction = direction or getattr(message, 'direction')
fwd_injections, reverse_injections = self._get_injections(direction)
# Injected, let's gen an ID
if message.packet_id is None:
message.packet_id = fwd_injections.gen_injectable_id()
message.injected = True
else:
# was_dropped needs the unmodified packet ID
if fwd_injections.was_dropped(message.packet_id) and message.name != "PacketAck":
logging.warning("Attempting to re-send previously dropped %s:%s, did we ack?" %
(message.packet_id, message.name))
message.packet_id = fwd_injections.get_effective_id(message.packet_id)
fwd_injections.track_seen(message.packet_id)
message.finalized = True
if not message.injected:
# This message wasn't injected by the proxy so we need to rewrite packet IDs
# to account for IDs the other parties couldn't have known about.
message.acks = tuple(
reverse_injections.get_original_id(x) for x in message.acks
if not reverse_injections.was_injected(x)
)
if message.name == "PacketAck":
if not self._rewrite_packet_ack(message, reverse_injections):
logging.debug(f"Dropping {direction} ack for injected packets!")
# Let caller know this shouldn't be sent at all, it's strictly ACKs for
# injected packets.
return False
elif message.name == "StartPingCheck":
self._rewrite_start_ping_check(message, fwd_injections)
if not message.acks:
message.send_flags &= ~PacketFlags.ACK
return True
def send_message(self, message: ProxiedMessage, direction=None, transport=None):
direction = direction or getattr(message, 'direction')
if self.prepare_message(message, direction):
return self._send_prepared_message(message, direction, transport)
def _rewrite_packet_ack(self, message: ProxiedMessage, reverse_injections):
new_blocks = []
for block in message["Packets"]:
packet_id = block["ID"]
# This is an ACK for one the proxy injected, don't confuse
# the other side by sending through the ACK
if reverse_injections.was_injected(packet_id):
continue
block["ID"] = reverse_injections.get_original_id(packet_id)
new_blocks.append(block)
# Sending a PacketAck with nothing in it would be suspicious
if not new_blocks:
return False
message["Packets"] = new_blocks
return True
def _rewrite_start_ping_check(self, message: ProxiedMessage, fwd_injections):
orig_id = message["PingID"]["OldestUnacked"]
new_id = fwd_injections.get_effective_id(orig_id)
if orig_id != new_id:
logging.debug("Rewrote oldest unacked %s -> %s" % (orig_id, new_id))
message["PingID"]["OldestUnacked"] = new_id
def drop_message(self, message: ProxiedMessage, orig_direction=None):
if message.finalized:
raise RuntimeError(f"Trying to drop finalized {message!r}")
if message.packet_id is None:
return
orig_direction = orig_direction or message.direction
fwd_injections, reverse_injections = self._get_injections(orig_direction)
fwd_injections.mark_dropped(message.packet_id)
if hasattr(message, 'dropped'):
message.dropped = True
message.finalized = True
# Was sent reliably, tell the other end that we saw it and to shut up.
if message.reliable:
self._send_acks([message.packet_id], ~orig_direction)
# This packet had acks for the other end, send them in a separate PacketAck
effective_acks = tuple(
reverse_injections.get_original_id(x) for x in message.acks
if not reverse_injections.was_injected(x)
)
if effective_acks:
self._send_acks(effective_acks, orig_direction, packet_id=message.packet_id)
def _send_acks(self, to_ack, direction, packet_id=None):
logging.debug("%r acking %r" % (direction, to_ack))
# TODO: maybe tack this onto `.acks` for next message?
packet = ProxiedMessage('PacketAck',
*[Block('Packets', ID=x) for x in to_ack])
packet.packet_id = packet_id
packet.injected = True
packet.direction = direction
self.send_message(packet)
def __repr__(self):
return "<%s %r : %r>" % (self.__class__.__name__, self.near_host, self.host)
class InjectionTracker:
# TODO: WARNING! DOESN'T DEAL WITH PACKET ID WRAPAROUND WHATSOEVER!
# Circuits that last for hundreds of hours can be expected to break.
# Maybe just kill circuit when that happens to prevent silent wonkiness.
def __init__(self, last_seen_id=0, maxlen=10000):
self._packet_id_base = last_seen_id
self._injection_base = 0
self._maxlen = maxlen
self.injections: deque[int] = deque(maxlen=maxlen)
self.dropped: deque[int] = deque(maxlen=maxlen)
def gen_injectable_id(self) -> int:
new_id = self._packet_id_base + 1
if len(self.injections) == self.injections.maxlen:
# ID is about to fall off, old enough we can just add
# it to the base for all Packet ID corrections.
self._injection_base += 1
self.injections.append(new_id)
self.track_seen(new_id)
return new_id
def was_injected(self, packet_id: int):
return packet_id in self.injections
def was_dropped(self, packet_id: int):
return packet_id in self.dropped
def get_effective_id(self, orig_id: int):
new_id = orig_id + self._injection_base
for packet_id in self.injections:
if new_id < packet_id and new_id not in self.injections:
break
new_id += 1
if orig_id != new_id:
logging.debug("Effective corrected %d -> %d" % (orig_id, new_id))
return new_id
def get_original_id(self, effective_id: int):
if effective_id in self.injections:
raise ValueError(f"No original ID for injected packet {effective_id}!")
new_id = effective_id
for packet_id in reversed(self.injections):
if packet_id > new_id:
break
new_id -= 1
new_id -= self._injection_base
if effective_id != new_id:
logging.debug("Orig corrected %d -> %d" % (effective_id, new_id))
return new_id
def track_seen(self, orig_id: int):
# Sent a new packet that's legitimately larger than
# What we were using to generate injected packet IDs.
# Use that instead.
oldest_tracked = self._packet_id_base - self._maxlen
if orig_id > self._packet_id_base:
self._packet_id_base = orig_id
elif oldest_tracked > orig_id:
logging.warning(f"Received VERY old packet ID {orig_id}, likely generated invalid ID.")
def mark_dropped(self, packet_id: int):
if packet_id not in self.dropped:
self.dropped.append(packet_id)
def __repr__(self):
return f"{self.__class__.__name__}(inject_base={self._injection_base}," \
f" packet_base={self._packet_id_base})"