Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ mime = "0.3.17"
indexmap = { version = "2.7.0", features = ["serde"] }
cookie = "0.18.0"
arc-swap = "1.7.1"
rquest = { version = "2.1.0", features = ["full"] }
rquest = { version = "2.1.0", features = ["full", "websocket"] }
futures-util = { version = "0.3.0", default-features = false }

[profile.release]
lto = true
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Asynchronous Python HTTP Client with Black Magic, powered by FFI from [rquest](h
- Redirect Policy
- Cookie Store
- HTTP Proxies
- WebSocket Upgrade
- HTTPS via BoringSSL
- Perfectly Chrome, Safari, and Firefox

Expand Down
51 changes: 51 additions & 0 deletions examples/ws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import asyncio
import signal
import rnet
from rnet import Message


async def send_message(ws):
for i in range(20):
print(f"Sending: Message {i + 1}")
await ws.send(Message.from_text(f"Message {i + 1}"))
await asyncio.sleep(1)


async def receive_message(ws):
while True:
try:
message = await ws.recv()
print("Received: ", message)
if message.data == b"Message 20":
print("Closing connection...")
break
except asyncio.CancelledError:
break


async def main():
resp = await rnet.websocket("wss://echo.websocket.org")
print("Status Code: ", resp.status)
print("Version: ", resp.version)
print("Headers: ", resp.headers.to_dict())
print("Remote Address: ", resp.remote_addr)

ws = await resp.into_websocket()

send_task = asyncio.create_task(send_message(ws))
receive_task = asyncio.create_task(receive_message(ws))

async def close_ws():
await ws.close()
send_task.cancel()
receive_task.cancel()

loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, lambda: asyncio.create_task(close_ws()))

await asyncio.gather(send_task, receive_task)


if __name__ == "__main__":
asyncio.run(main())
206 changes: 206 additions & 0 deletions rnet.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,12 @@ class Client:
"""
...

def websocket(self, url:builtins.str, **kwds) -> typing.Any:
r"""
Sends a WebSocket request.
"""
...


class ClientParams:
r"""
Expand Down Expand Up @@ -413,6 +419,109 @@ class ImpersonateOS:
...


class Message:
r"""
A WebSocket message.
"""
text: typing.Optional[builtins.str]
close: typing.Optional[tuple[builtins.int, typing.Optional[builtins.str]]]
def __str__(self) -> builtins.str:
r"""
Returns a string representation of the message.

# Returns

A string representing the message.
"""
...

def __repr__(self) -> builtins.str:
r"""
Returns a string representation of the message.

# Returns

A string representing the message.
"""
...

@staticmethod
def from_text(text:builtins.str) -> Message:
r"""
Creates a new text message.

# Arguments

* `text` - The text content of the message.

# Returns

A new `Message` instance containing the text message.
"""
...

@staticmethod
def from_binary(data:typing.Sequence[builtins.int]) -> Message:
r"""
Creates a new binary message.

# Arguments

* `data` - The binary data of the message.

# Returns

A new `Message` instance containing the binary message.
"""
...

@staticmethod
def from_ping(data:typing.Sequence[builtins.int]) -> Message:
r"""
Creates a new ping message.

# Arguments

* `data` - The ping data of the message.

# Returns

A new `Message` instance containing the ping message.
"""
...

@staticmethod
def from_pong(data:typing.Sequence[builtins.int]) -> Message:
r"""
Creates a new pong message.

# Arguments

* `data` - The pong data of the message.

# Returns

A new `Message` instance containing the pong message.
"""
...

@staticmethod
def from_close(code:builtins.int, reason:typing.Optional[builtins.str]=None) -> Message:
r"""
Creates a new close message.

# Arguments

* `code` - The close code.
* `reason` - An optional reason for closing.

# Returns

A new `Message` instance containing the close message.
"""
...


class Method:
r"""
A HTTP method.
Expand Down Expand Up @@ -822,6 +931,80 @@ class Version:
...


class WebSocket:
r"""
A WebSocket connection.
"""
...

class WebSocketParams:
r"""
The parameters for a WebSocket request.

# Examples

```python
import rnet
from rnet import Impersonate, Version

params = rnet.WebSocketParams(
proxy="http://proxy.example.com",
local_address="192.168.1.1",
interface="eth0",
headers={"Content-Type": "application/json"},
auth="Basic dXNlcjpwYXNzd29yZA==",
bearer_auth="Bearer token",
basic_auth=("user", "password"),
query=[("key1", "value1"), ("key2", "value2")]
)

async with rnet.websocket("wss://echo.websocket.org") as ws:
await ws.send("Hello, World!")
message = await ws.recv()
print(message)

asyncio.run(run())
```
"""
proxy: typing.Optional[builtins.str]
interface: typing.Optional[builtins.str]
auth: typing.Optional[builtins.str]
bearer_auth: typing.Optional[builtins.str]
basic_auth: typing.Optional[tuple[builtins.str, typing.Optional[builtins.str]]]
query: typing.Optional[builtins.list[tuple[builtins.str, builtins.str]]]

class WebSocketResponse:
r"""
A WebSocket response.
"""
ok: builtins.bool
status: builtins.int
version: Version
headers: HeaderMap
remote_addr: typing.Optional[SocketAddr]
def peer_certificate(self) -> typing.Optional[builtins.list[builtins.int]]:
r"""
Returns the TLS peer certificate of the response.

# Returns

A Python object representing the TLS peer certificate of the response.
"""
...

def into_websocket(self) -> typing.Any:
r"""
Returns the WebSocket of the response.
"""
...

def close(self) -> None:
r"""
Closes the response connection.
"""
...


def delete(url:builtins.str, **kwds) -> typing.Any:
r"""
Shortcut method to quickly make a `DELETE` request.
Expand Down Expand Up @@ -1018,3 +1201,26 @@ def trace(url:builtins.str, **kwds) -> typing.Any:
"""
...

def websocket(url:builtins.str, **kwds) -> typing.Any:
r"""
Make a WebSocket connection with the given parameters.

This function allows you to make a WebSocket connection with the specified parameters encapsulated in a `WebSocket` object.

# Examples

```python
import rnet
import asyncio

async def run():
async with rnet.websocket("wss://echo.websocket.org") as ws:
await ws.send("Hello, World!")
message = await ws.recv()
print(message)

asyncio.run(run())
```
"""
...

68 changes: 66 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
error::{wrap_invali_header_name_error, wrap_rquest_error},
param::{ClientParams, RequestParams},
response::Response,
param::{ClientParams, RequestParams, WebSocketParams},
response::{Response, WebSocketResponse},
types::Method,
Result,
};
Expand Down Expand Up @@ -581,6 +581,18 @@ impl Client {
let client = self.0.clone();
pyo3_async_runtimes::tokio::future_into_py(py, execute_request(client, method, url, kwds))
}

/// Sends a WebSocket request.
#[pyo3(signature = (url, **kwds))]
pub fn websocket<'rt>(
&self,
py: Python<'rt>,
url: String,
kwds: Option<WebSocketParams>,
) -> PyResult<Bound<'rt, PyAny>> {
let client = self.0.clone();
pyo3_async_runtimes::tokio::future_into_py(py, execute_websocket_request(client, url, kwds))
}
}

/// Executes an HTTP request.
Expand Down Expand Up @@ -672,3 +684,55 @@ async fn execute_request(
.map(Response::from)
.map_err(wrap_rquest_error)
}

/// Executes a WebSocket request.
async fn execute_websocket_request(
client: rquest::Client,
url: String,
mut params: Option<WebSocketParams>,
) -> Result<WebSocketResponse> {
let params = params.get_or_insert_default();
let mut builder = client.websocket(url);

// The protocols to use for the request.
apply_option!(apply_if_some, builder, params.protocols, protocols);

// The origin to use for the request.
builder = builder.with_builder(|mut builder| {
// Network options.
apply_option!(apply_if_some, builder, params.proxy, proxy);
apply_option!(apply_if_some, builder, params.local_address, local_address);
rquest::cfg_bindable_device!(
apply_option!(apply_if_some, builder, params.interface, interface);
);

// Authentication options.
apply_option!(apply_if_some, builder, params.auth, auth);

// Bearer authentication options.
apply_option!(apply_if_some, builder, params.bearer_auth, bearer_auth);

// Basic authentication options.
if let Some(basic_auth) = params.basic_auth.take() {
builder = builder.basic_auth(basic_auth.0, basic_auth.1);
}

// Headers options.
if let Some(headers) = params.headers.take() {
for (key, value) in headers {
builder = builder.header(key, value);
}
}

// Query options.
apply_option!(apply_if_some_ref, builder, params.query, query);

builder
});

builder
.send()
.await
.map(WebSocketResponse::from)
.map_err(wrap_rquest_error)
}
Loading