Skip to content
This repository has been archived by the owner on Mar 2, 2022. It is now read-only.

Commit

Permalink
Merge 516104b into cd023b2
Browse files Browse the repository at this point in the history
  • Loading branch information
aagbsn committed Mar 20, 2018
2 parents cd023b2 + 516104b commit 1748cde
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 393 deletions.
104 changes: 0 additions & 104 deletions bwscanner/attacher.py
Original file line number Diff line number Diff line change
@@ -1,113 +1,9 @@
import txtorcon
from twisted.internet import defer, reactor, endpoints
from txtorcon.interface import CircuitListenerMixin, IStreamAttacher, StreamListenerMixin
from zope.interface import implementer

from bwscanner.logger import log


@implementer(IStreamAttacher)
class SOCKSClientStreamAttacher(CircuitListenerMixin, StreamListenerMixin):
"""
An attacher that builds a chosen path for a client identified by
its source port and ip address.
"""

def __init__(self, state):
"""
Instantiates a SOCKSClientStreamAttacher with a
txtorcon.torstate.TorState instance.
"""
self.state = state
self.waiting_circuits = {}
self.expected_streams = {}
self.state.add_stream_listener(self)
self.state.add_circuit_listener(self)

def create_circuit(self, host, port, path, using_guards=False):
"""
Specify the path for streams created on a specific client
SOCKS connection.
Returns a deferred that calls back with the constructed circuit
or errs back with a failure instance.
"""
circ_deferred = defer.Deferred()
key = (str(host), int(port))
self.expected_streams[key] = circ_deferred

def add_to_waiting(circ):
self.waiting_circuits[circ.id] = (circ, circ_deferred)
return circ

circuit_build = self.state.build_circuit(
path, using_guards=using_guards)
circuit_build.addCallback(add_to_waiting)
return circ_deferred

def attach_stream(self, stream, _):
"""
Attaches a NEW stream to the circuit created for it by matching the
source address and source port of the SOCKS client connection to the
corresponding circuit in the expected_streams dictionary.
Returns a deferred that calls back with the appropriate circuit,
or None if there is no matching entry.
Note, Tor can be configured to leave streams unattached by setting
the "__LeaveStreamsUnattached" torrc option to "1".
"""
try:
key = (str(stream.source_addr), int(stream.source_port))
return self.expected_streams.pop(key)
except KeyError:
# We didn't expect this stream, so let Tor handle it
return None

def circuit_built(self, circuit):
"""
Calls back the deferred awaiting the circuit build with the
circuit object.
"""
if circuit.purpose != "GENERAL":
return
try:
(_, circ_deferred) = self.waiting_circuits.pop(circuit.id)
circ_deferred.callback(circuit)
except KeyError:
pass

def circuit_failed(self, circuit, **kw):
"""
Calls the errback of the deferred waiting the circuit build if the
circuit build failed. The failure reason is contained in the circuit
object. The corresponding state in waiting_circuits is removed.
If the circuit failure did not correspond to a circuit requested
by create_circuit, it is ignored.
"""
try:
(circ, circ_deferred) = self.waiting_circuits.pop(circuit.id)
circ_deferred.errback(circ)
except KeyError:
pass


class StreamClosedListener(StreamListenerMixin):
"""
Closes the contained circuit if the listened stream closes.
This StreamListener is used to instruct Tor to close circuits
immediately after a stream completes rather than wait for the
circuit to time out.
"""
def __init__(self, circ):
self.circ = circ

def stream_closed(self, *args, **kw):
self.circ.close(ifUnused=True)


def options_need_new_consensus(tor_config, new_options):
"""
Check if we need to wait for a new consensus after updating
Expand Down
88 changes: 10 additions & 78 deletions bwscanner/fetcher.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
import warnings
import hashlib

from twisted.internet import interfaces, reactor, defer, protocol
from twisted.internet import reactor, defer, protocol
from twisted.internet.endpoints import TCP4ClientEndpoint
from twisted.web.client import (SchemeNotSupported, Agent, BrowserLikePolicyForHTTPS,
ResponseDone, PotentialDataLoss, PartialDownloadError)
from txsocksx.client import SOCKS5ClientFactory
from txsocksx.tls import TLSWrapClientEndpoint
from zope.interface import implementer
from twisted.web.client import (ResponseDone, PotentialDataLoss, PartialDownloadError)

from bwscanner.logger import log


def fetch(tor_state, path, url):
d = tor_state.build_circuit(path, False)
sport = get_tor_socks_endpoint(tor_state)
d.addCallback(lambda c: c.when_built())
d.addCallback(lambda c: c.web_agent(reactor, sport))
return d.addCallback(lambda a: a.request("GET", url))


def get_tor_socks_endpoint(tor_state):
proxy_endpoint = tor_state.protocol.get_conf("SocksPort")

Expand All @@ -31,78 +35,6 @@ def extract_port_value(result):
return proxy_endpoint


@implementer(interfaces.IStreamClientEndpoint)
class OnionRoutedTCPClientEndpoint(object):
def __init__(self, host, port, state, path):
"""
@param reactor: An L{IReactorTCP} provider
@param host: A hostname, used when connecting
@type host: str
@param port: The port number, used when connecting
@type port: int
@param path: A list of relay identities.
@type path: list
This endpoint will be routed through Tor over a circuit
defined by path.
"""
self.host = host
self.port = port
self.path = path
self.state = state

self.tor_socks_endpoint = get_tor_socks_endpoint(state)

def connect(self, protocol_factory):
"""
Implements L{IStreamClientEndpoint.connect} to connect via TCP, after
SOCKS5 negotiation and Tor circuit construction is done.
"""
proxy_factory = SOCKS5ClientFactory(self.host, self.port, protocol_factory)
self.tor_socks_endpoint.addCallback(lambda end: end.connect(proxy_factory))

def _create_circ(proto):
hp = proto.transport.getHost()
d = self.state._attacher.create_circuit(hp.host, hp.port, self.path)
d.addErrback(proxy_factory.deferred.errback)
return proxy_factory.deferred

return self.tor_socks_endpoint.addCallback(_create_circ)


class OnionRoutedAgent(Agent):
_tlsWrapper = TLSWrapClientEndpoint
_policyForHTTPS = BrowserLikePolicyForHTTPS

def __init__(self, *args, **kw):
self.path = kw.pop('path')
self.state = kw.pop('state')
super(OnionRoutedAgent, self).__init__(*args, **kw)

def _getEndpoint(self, parsedURI, host=None, port=None):
try:
host, port = parsedURI.host, parsedURI.port
scheme = parsedURI.scheme
except AttributeError:
scheme = parsedURI
if scheme not in ('http', 'https'):
raise SchemeNotSupported('unsupported scheme', scheme)
endpoint = OnionRoutedTCPClientEndpoint(host, port, self.state,
self.path)
if scheme == 'https':
if hasattr(self, '_wrapContextFactory'):
tls_policy = self._wrapContextFactory(host, port)
elif hasattr(self, '_policyForHTTPS'):
tls_policy = self._policyForHTTPS().creatorForNetloc(host, port)
else:
raise NotImplementedError("Cannot create a TLS validation policy.")
endpoint = self._tlsWrapper(tls_policy, endpoint)
return endpoint


class hashingReadBodyProtocol(protocol.Protocol):
"""
Protocol that collects data sent to it and hashes it.
Expand Down
22 changes: 9 additions & 13 deletions bwscanner/measurement.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@
from twisted.web.client import readBody

from bwscanner.logger import log
from bwscanner.attacher import SOCKSClientStreamAttacher
from bwscanner.circuit import TwoHop
from bwscanner.fetcher import OnionRoutedAgent
from bwscanner.fetcher import fetch
from bwscanner.writer import ResultSink

# defer.setDebugging(True)
Expand All @@ -32,6 +31,7 @@ def __init__(self, state, clock, measurement_dir, **kwargs):
this_partition: which partition of circuit we will process
"""
self.state = state
self._socks = None
self.clock = clock
self.measurement_dir = measurement_dir
self.partitions = kwargs.get('partitions', 1)
Expand All @@ -56,9 +56,6 @@ def __init__(self, state, clock, measurement_dir, **kwargs):

self.result_sink = ResultSink(self.measurement_dir, chunk_size=10)

# Add a stream attacher
self.state.set_attacher(SOCKSClientStreamAttacher(self.state), clock)

def now(self):
return time.time()

Expand Down Expand Up @@ -162,14 +159,13 @@ def gotResult(result):
return result
deferred.addBoth(gotResult)

agent = OnionRoutedAgent(self.clock, path=path, state=self.state)
request = agent.request("GET", url)
request.addCallback(readBody)
timeoutDeferred(request, self.request_timeout)
request.addCallbacks(get_circuit_bw)
request.addErrback(circ_failure)
request.addCallback(self.result_sink.send)
return request
d = fetch(self.state, path, url)
d.addCallback(readBody)
timeoutDeferred(d, self.request_timeout)
d.addCallbacks(get_circuit_bw)
d.addErrback(circ_failure)
d.addCallback(self.result_sink.send)
return d

@defer.inlineCallbacks
def get_r_ns_bw(self, router):
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ service-identity==16.0.0
stem>=1.4.0
Twisted==16.2.0
txsocksx==1.15.0.2
txtorcon==0.19.3
txtorcon==0.20.0
14 changes: 6 additions & 8 deletions scripts/exitip.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
import json
import re
import sys
from bwscanner.attacher import SOCKSClientStreamAttacher, start_tor
from bwscanner.circuit import ExitScan
from bwscanner.fetcher import OnionRoutedAgent
from bwscanner.attacher import start_tor
from twisted.internet import defer, reactor, task
from twisted.python import log
from twisted.web.client import readBody
Expand Down Expand Up @@ -57,20 +55,20 @@ def pop(circuits):
def shutdown(ignore):
reactor.stop()

def add_attacher(state):
state.set_attacher(SOCKSClientStreamAttacher(state), reactor)
return state

def setup_failed(failure):
log.err(failure)

def save_results(result, outfile):
outfile.write(json.dumps(dict([r[1] for r in result if r[1] != None])))

@defer.inlineCallbacks
def socks(state):
s = yield get_tor_socks_endpoint(state)
defer.returnValue(s)

def main():
log.startLogging(sys.stdout)
tor = start_tor(TorConfig())
tor.addCallback(add_attacher)
tor.addCallback(run_scan)
tor.addErrback(log.err)
tor.addBoth(shutdown)
Expand Down
5 changes: 1 addition & 4 deletions test/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from twisted.trial import unittest

from bwscanner import circuit
from bwscanner.attacher import SOCKSClientStreamAttacher, connect_to_tor
from bwscanner.attacher import connect_to_tor


class TorTestCase(unittest.TestCase):
Expand All @@ -18,9 +18,6 @@ def setUp(self):
circuit_build_timeout=30,
)

self.attacher = SOCKSClientStreamAttacher(self.tor_state)
yield self.tor_state.set_attacher(self.attacher, reactor)

@property
def routers(self):
return list(set(self.tor_state.routers.values()))
Expand Down
Loading

0 comments on commit 1748cde

Please sign in to comment.