Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: SOCKS proxy support #51

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions httpcore/__init__.py
Expand Up @@ -36,5 +36,7 @@
"ReadError",
"WriteError",
"CloseError",
"ProtocolError",
"ProxyError",
]
__version__ = "0.7.0"
101 changes: 91 additions & 10 deletions httpcore/_async/connection.py
@@ -1,7 +1,11 @@
from ssl import SSLContext
from typing import Dict, List, Optional, Tuple, Union
from typing import List, Optional, Tuple, Union

from socksio import socks4

from .._backends.auto import AsyncLock, AutoBackend
from .._exceptions import ProxyError
from .._types import URL, Headers, Origin, TimeoutDict
from .base import (
AsyncByteStream,
AsyncHTTPTransport,
Expand All @@ -14,10 +18,7 @@

class AsyncHTTPConnection(AsyncHTTPTransport):
def __init__(
self,
origin: Tuple[bytes, bytes, int],
http2: bool = False,
ssl_context: SSLContext = None,
self, origin: Origin, http2: bool = False, ssl_context: SSLContext = None,
):
self.origin = origin
self.http2 = http2
Expand All @@ -44,10 +45,10 @@ def request_lock(self) -> AsyncLock:
async def request(
self,
method: bytes,
url: Tuple[bytes, bytes, int, bytes],
headers: List[Tuple[bytes, bytes]] = None,
url: URL,
headers: Optional[Headers] = None,
stream: AsyncByteStream = None,
timeout: Dict[str, Optional[float]] = None,
timeout: Optional[TimeoutDict] = None,
) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]], AsyncByteStream]:
assert url[:3] == self.origin

Expand All @@ -68,7 +69,7 @@ async def request(
assert self.connection is not None
return await self.connection.request(method, url, headers, stream, timeout)

async def _connect(self, timeout: Dict[str, Optional[float]] = None) -> None:
async def _connect(self, timeout: TimeoutDict = None) -> None:
scheme, hostname, port = self.origin
timeout = {} if timeout is None else timeout
ssl_context = self.ssl_context if scheme == b"https" else None
Expand Down Expand Up @@ -99,7 +100,87 @@ def mark_as_ready(self) -> None:
self.connection.mark_as_ready()

async def start_tls(
self, hostname: bytes, timeout: Dict[str, Optional[float]] = None
self, hostname: bytes, timeout: Optional[TimeoutDict] = None
) -> None:
if self.connection is not None:
await self.connection.start_tls(hostname, timeout)


class AsyncSOCKSConnection(AsyncHTTPConnection):
    """An HTTP/1.1 connection with SOCKS proxy negotiation.

    The TCP stream is opened to ``proxy_origin``. A SOCKS4 CONNECT request
    for ``origin`` is then exchanged with the proxy (via ``socksio``) before
    the stream is handed to an HTTP connection object.
    """

    def __init__(
        self,
        origin: Origin,
        proxy_origin: Origin,
        socks_version: str,
        # NOTE(review): a user id is required by SOCKS4. SOCKS5 will require
        # username/password and likely a list of acceptable authentication
        # methods, which suggests separate AsyncSOCKS4Connection and
        # AsyncSOCKS5Connection classes may be preferable.
        user_id: bytes = b"httpcore",
        ssl_context: Optional[SSLContext] = None,
    ) -> None:
        """Store the target/proxy origins and build the socksio connection.

        Raises:
            NotImplementedError: if ``socks_version`` is not ``"SOCKS4"``.
        """
        # NOTE(review): the parent ``__init__`` is not called, so attributes
        # it assigns (e.g. ``self.http2``) are not set on this instance --
        # confirm the inherited ``request()`` does not rely on them.
        self.origin = origin
        self.proxy_origin = proxy_origin
        self.ssl_context = SSLContext() if ssl_context is None else ssl_context
        self.connection: Union[None, AsyncHTTP11Connection, AsyncHTTP2Connection] = None
        self.is_http11 = True
        self.is_http2 = False
        self.connect_failed = False
        self.expires_at: Optional[float] = None
        self.backend = AutoBackend()

        self.user_id = user_id
        self.socks_connection = self._get_socks_connection(socks_version)

    def _get_socks_connection(self, socks_version: str) -> socks4.SOCKS4Connection:
        """Return the socksio connection object for ``socks_version``.

        Raises:
            NotImplementedError: for any version other than ``"SOCKS4"``.
        """
        if socks_version == "SOCKS4":
            return socks4.SOCKS4Connection(user_id=self.user_id)
        raise NotImplementedError(
            "Unsupported SOCKS version: {!r}".format(socks_version)
        )

    async def _connect(self, timeout: Optional[TimeoutDict] = None) -> None:
        """SOCKS4 negotiation prior to creating an HTTP/1.1 connection.

        Raises:
            ProxyError: if the proxy's reply code is not ``REQUEST_GRANTED``.
        """
        # First setup the socket to talk to the proxy server
        _, proxy_hostname, proxy_port = self.proxy_origin
        scheme, target_hostname, target_port = self.origin
        timeout = {} if timeout is None else timeout
        ssl_context = None  # no SSL context is used for the hop to the proxy
        socket = await self.backend.open_tcp_stream(
            proxy_hostname, proxy_port, ssl_context, timeout
        )

        # Use socksio to negotiate the connection with the remote host
        request = socks4.SOCKS4Request.from_address(
            socks4.SOCKS4Command.CONNECT, (target_hostname.decode(), target_port)
        )
        self.socks_connection.send(request)
        bytes_to_send = self.socks_connection.data_to_send()
        await socket.write(bytes_to_send, timeout)

        # Read the response from the proxy
        data = await socket.read(1024, timeout)
        event = self.socks_connection.receive_data(data)

        # Bail if rejected
        # NOTE(review): the socket is not closed on this path -- confirm
        # whether the caller is expected to clean it up.
        if event.reply_code != socks4.SOCKS4ReplyCode.REQUEST_GRANTED:
            raise ProxyError(
                "Proxy server could not connect to remote host: {}".format(
                    event.reply_code
                )
            )

        # Otherwise use the socket as usual
        # TODO: this is never going to be HTTP/2 at this point
        # since we're connected only to the proxy server
        # Keeping it for now in case we need to move it
        http_version = socket.get_http_version()
        if http_version == "HTTP/2":
            self.is_http2 = True
            self.connection = AsyncHTTP2Connection(socket=socket, backend=self.backend)
        else:
            self.is_http11 = True
            self.connection = AsyncHTTP11Connection(socket=socket)

        # Upgrading to TLS needs to happen at request time because SOCKS
        # requires the target host and port, as opposed to tunnelling which
        # is target independent.
        # The TLS handshake must name the *target* host: after a granted
        # CONNECT the stream carries traffic for the target, not the proxy.
        if scheme == b"https" and self.ssl_context:
            await self.connection.start_tls(target_hostname, timeout=timeout)
83 changes: 68 additions & 15 deletions httpcore/_async/http_proxy.py
@@ -1,16 +1,13 @@
from enum import Enum
from ssl import SSLContext
from typing import Dict, List, Optional, Tuple
from typing import Tuple

from .._exceptions import ProxyError
from .base import AsyncByteStream, AsyncHTTPTransport
from .connection import AsyncHTTPConnection
from .._types import URL, Headers, Origin, TimeoutDict
from .base import AsyncByteStream
from .connection import AsyncHTTPConnection, AsyncSOCKSConnection
from .connection_pool import AsyncConnectionPool, ResponseByteStream

Origin = Tuple[bytes, bytes, int]
URL = Tuple[bytes, bytes, int, bytes]
Headers = List[Tuple[bytes, bytes]]
TimeoutDict = Dict[str, Optional[float]]


async def read_body(stream: AsyncByteStream) -> bytes:
try:
Expand All @@ -19,18 +16,33 @@ async def read_body(stream: AsyncByteStream) -> bytes:
await stream.aclose()


class ProxyModes(Enum):
    """Proxy modes; each member's value is the same string as its name."""

    # The three modes originally accepted by ``AsyncHTTPProxy``.
    DEFAULT = "DEFAULT"
    FORWARD_ONLY = "FORWARD_ONLY"
    TUNNEL_ONLY = "TUNNEL_ONLY"
    # SOCKS modes. ``AsyncHTTPProxy.request`` dispatches "SOCKS4" to
    # ``_socks4_request`` and raises NotImplementedError for the other two.
    SOCKS4 = "SOCKS4"
    SOCKS4A = "SOCKS4A"
    SOCKS5 = "SOCKS5"


class AsyncHTTPProxy(AsyncConnectionPool):
"""
A connection pool for making HTTP requests via an HTTP proxy.

**Parameters:**

* **proxy_origin** - `Tuple[bytes, bytes, int]` - The address of the proxy service as a 3-tuple of (scheme, host, port).
* **proxy_headers** - `Optional[List[Tuple[bytes, bytes]]]` - A list of proxy headers to include.
* **proxy_mode** - `str` - A proxy mode to operate in. May be "DEFAULT", "FORWARD_ONLY", or "TUNNEL_ONLY".
* **ssl_context** - `Optional[SSLContext]` - An SSL context to use for verifying connections.
* **max_connections** - `Optional[int]` - The maximum number of concurrent connections to allow.
* **max_keepalive** - `Optional[int]` - The maximum number of connections to allow before closing keep-alive connections.
* **proxy_origin** - `Tuple[bytes, bytes, int]` - The address of the proxy
service as a 3-tuple of (scheme, host, port).
* **proxy_headers** - `Optional[List[Tuple[bytes, bytes]]]` - A list of
proxy headers to include.
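* **proxy_mode** - `str` - A proxy mode to operate in. One of "DEFAULT",
"FORWARD_ONLY", "TUNNEL_ONLY", "SOCKS4", "SOCKS4A", or "SOCKS5" (the
last two are not implemented yet and raise `NotImplementedError`).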
* **ssl_context** - `Optional[SSLContext]` - An SSL context to use for
verifying connections.
* **max_connections** - `Optional[int]` - The maximum number of concurrent
connections to allow.
* **max_keepalive** - `Optional[int]` - The maximum number of connections
to allow before closing keep-alive connections.
* **http2** - `bool` - Enable HTTP/2 support.
"""

Expand All @@ -45,7 +57,7 @@ def __init__(
keepalive_expiry: float = None,
http2: bool = False,
):
assert proxy_mode in ("DEFAULT", "FORWARD_ONLY", "TUNNEL_ONLY")
assert ProxyModes(proxy_mode) # TODO: use ProxyModes type of argument
yeraydiazdiaz marked this conversation as resolved.
Show resolved Hide resolved

self.proxy_origin = proxy_origin
self.proxy_headers = [] if proxy_headers is None else proxy_headers
Expand Down Expand Up @@ -76,6 +88,14 @@ async def request(
return await self._forward_request(
method, url, headers=headers, stream=stream, timeout=timeout
)
elif self.proxy_mode == "SOCKS4":
return await self._socks4_request(
method, url, headers=headers, stream=stream, timeout=timeout
)
elif self.proxy_mode == "SOCKS4A":
raise NotImplementedError
elif self.proxy_mode == "SOCKS5":
raise NotImplementedError
else:
# By default HTTPS should be tunnelled.
return await self._tunnel_request(
Expand Down Expand Up @@ -184,3 +204,36 @@ async def _tunnel_request(
response[4], connection=connection, callback=self._response_closed
)
return response[0], response[1], response[2], response[3], wrapped_stream

async def _socks4_request(
    self,
    method: bytes,
    url: URL,
    headers: Headers = None,
    stream: AsyncByteStream = None,
    timeout: TimeoutDict = None,
) -> Tuple[bytes, int, bytes, Headers, AsyncByteStream]:
    """
    Send a request over a connection that negotiates SOCKS4 with the proxy.

    The pool is asked for a connection to the URL's origin; when it returns
    `None`, a new `AsyncSOCKSConnection` is created and registered in
    `self._connections` under that origin.
    """
    target_origin = url[:3]
    conn = await self._get_connection_from_pool(target_origin)

    if conn is None:
        # NOTE(review): no ssl_context is passed here, so the connection
        # falls back to its own default -- confirm this is intended.
        conn = AsyncSOCKSConnection(target_origin, self.proxy_origin, "SOCKS4")
        async with self._thread_lock:
            self._connections.setdefault(target_origin, set()).add(conn)

    # The request is passed to the connection unchanged: same method, URL
    # and headers as received, with no proxy headers added.
    response = await conn.request(
        method, url, headers=headers, stream=stream, timeout=timeout
    )

    # Wrap the body stream, handing it this connection and the pool's
    # `_response_closed` callback.
    tracked_stream = ResponseByteStream(
        response[4], connection=conn, callback=self._response_closed
    )
    return (*response[:4], tracked_stream)
86 changes: 76 additions & 10 deletions httpcore/_sync/connection.py
@@ -1,7 +1,10 @@
from ssl import SSLContext
from typing import Dict, List, Optional, Tuple, Union
from typing import List, Optional, Tuple, Union

from socksio import socks4

from .._backends.auto import SyncLock, SyncBackend
from .._types import URL, Headers, Origin, TimeoutDict
from .base import (
SyncByteStream,
SyncHTTPTransport,
Expand All @@ -14,10 +17,7 @@

class SyncHTTPConnection(SyncHTTPTransport):
def __init__(
self,
origin: Tuple[bytes, bytes, int],
http2: bool = False,
ssl_context: SSLContext = None,
self, origin: Origin, http2: bool = False, ssl_context: SSLContext = None,
):
self.origin = origin
self.http2 = http2
Expand All @@ -44,10 +44,10 @@ def request_lock(self) -> SyncLock:
def request(
self,
method: bytes,
url: Tuple[bytes, bytes, int, bytes],
headers: List[Tuple[bytes, bytes]] = None,
url: URL,
headers: Optional[Headers] = None,
stream: SyncByteStream = None,
timeout: Dict[str, Optional[float]] = None,
timeout: Optional[TimeoutDict] = None,
) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]], SyncByteStream]:
assert url[:3] == self.origin

Expand All @@ -68,7 +68,7 @@ def request(
assert self.connection is not None
return self.connection.request(method, url, headers, stream, timeout)

def _connect(self, timeout: Dict[str, Optional[float]] = None) -> None:
def _connect(self, timeout: TimeoutDict = None) -> None:
scheme, hostname, port = self.origin
timeout = {} if timeout is None else timeout
ssl_context = self.ssl_context if scheme == b"https" else None
Expand Down Expand Up @@ -99,7 +99,73 @@ def mark_as_ready(self) -> None:
self.connection.mark_as_ready()

def start_tls(
self, hostname: bytes, timeout: Dict[str, Optional[float]] = None
self, hostname: bytes, timeout: Optional[TimeoutDict] = None
) -> None:
if self.connection is not None:
self.connection.start_tls(hostname, timeout)


class SyncSOCKSConnection(SyncHTTPConnection):
    """An HTTP/1.1 connection with SOCKS proxy negotiation.

    The TCP stream is opened to ``proxy_origin``. A SOCKS4 CONNECT request
    for ``origin`` is then exchanged with the proxy (via ``socksio``) before
    the stream is handed to a ``SyncHTTP11Connection``.
    """

    def __init__(
        self,
        origin: Origin,
        proxy_origin: Origin,
        socks_version: str,
        user_id: bytes = b"httpcore",
        ssl_context: Optional[SSLContext] = None,
    ) -> None:
        """Store the target/proxy origins and build the socksio connection.

        Raises:
            NotImplementedError: if ``socks_version`` is not ``"SOCKS4"``.
        """
        # NOTE(review): the parent ``__init__`` is not called, so attributes
        # it assigns (e.g. ``self.http2``) are not set on this instance --
        # confirm the inherited ``request()`` does not rely on them.
        self.origin = origin
        self.proxy_origin = proxy_origin
        self.ssl_context = SSLContext() if ssl_context is None else ssl_context
        self.connection: Union[None, SyncHTTP11Connection] = None
        self.is_http11 = True
        self.is_http2 = False
        self.connect_failed = False
        self.expires_at: Optional[float] = None
        self.backend = SyncBackend()

        self.user_id = user_id
        self.socks_connection = self._get_socks_connection(socks_version)

    def _get_socks_connection(self, socks_version: str) -> socks4.SOCKS4Connection:
        """Return the socksio connection object for ``socks_version``.

        Raises:
            NotImplementedError: for any version other than ``"SOCKS4"``.
        """
        if socks_version == "SOCKS4":
            return socks4.SOCKS4Connection(user_id=self.user_id)
        raise NotImplementedError(
            "Unsupported SOCKS version: {!r}".format(socks_version)
        )

    def _connect(self, timeout: Optional[TimeoutDict] = None) -> None:
        """SOCKS4 negotiation prior to creating an HTTP/1.1 connection.

        Raises:
            ProxyError: if the proxy's reply code is not ``REQUEST_GRANTED``.
        """
        # Imported locally because this module's top-level imports do not
        # bring in ProxyError; the async twin raises the same exception.
        from .._exceptions import ProxyError

        # First setup the socket to talk to the proxy server
        _, proxy_hostname, proxy_port = self.proxy_origin
        _, target_hostname, target_port = self.origin
        timeout = {} if timeout is None else timeout
        ssl_context = None  # no SSL context is used for the hop to the proxy
        socket = self.backend.open_tcp_stream(
            proxy_hostname, proxy_port, ssl_context, timeout
        )

        # Use socksio to negotiate the connection with the remote host
        request = socks4.SOCKS4Request.from_address(
            socks4.SOCKS4Command.CONNECT, (target_hostname.decode(), target_port)
        )
        self.socks_connection.send(request)
        bytes_to_send = self.socks_connection.data_to_send()
        socket.write(bytes_to_send, timeout)

        # Read the response from the proxy
        data = socket.read(1024, timeout)
        event = self.socks_connection.receive_data(data)

        # Bail if rejected
        # NOTE(review): the socket is not closed on this path -- confirm
        # whether the caller is expected to clean it up.
        if event.reply_code != socks4.SOCKS4ReplyCode.REQUEST_GRANTED:
            raise ProxyError(
                "Proxy server could not connect to remote host: {}".format(
                    event.reply_code
                )
            )

        # Otherwise use the socket as usual
        # NOTE(review): unlike the async variant, no explicit TLS upgrade is
        # performed here and the SSL context is passed regardless of the
        # origin's scheme -- confirm how SyncHTTP11Connection uses it.
        self.connection = SyncHTTP11Connection(
            socket=socket, ssl_context=self.ssl_context
        )