WebSocket
The Razy\WebSocket namespace provides a full RFC 6455 WebSocket implementation — an event-driven Server, a connecting Client, and the low-level Frame and Connection building blocks. All communication uses native PHP stream sockets with no external dependencies.
use Razy\WebSocket\Server;
use Razy\WebSocket\Connection;
use Razy\WebSocket\Frame;
$server = new Server('0.0.0.0', 8080);
$server
    // Arrow functions need an expression body, hence print rather than echo
    ->onOpen(fn(Connection $conn) => print "Connected: {$conn->getId()}\n")
    ->onMessage(function (Connection $conn, Frame $frame) use ($server) {
        echo "Received: {$frame->getPayload()}\n";
        // Echo back to the sender
        $conn->sendText('echo: ' . $frame->getPayload());
        // Or broadcast to everyone except the sender
        $server->broadcast($frame->getPayload(), exclude: $conn);
    })
    ->onClose(fn(Connection $conn, int $code, string $reason) =>
        print "Disconnected ({$code}): {$reason}\n"
    )
    ->onError(fn(Connection $conn, \Throwable $e) =>
        print "Error: {$e->getMessage()}\n"
    );
$server->start();

use Razy\WebSocket\Client;
$client = Client::connect('ws://localhost:8080/chat');
$client->sendText('Hello, server!');
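$frame = $client->receiveBlocking(10);
// receiveBlocking() returns ?Frame, so guard against null before reading the payload
if ($frame !== null) {
    echo $frame->getPayload(); // "echo: Hello, server!"
}
$client->close();

| Class | Purpose |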
|-------|---------|
| Frame | RFC 6455 frame encode/decode — opcodes, masking, payload length encoding |
| Connection | Stream wrapper — handshake, frame I/O, ping/pong, close protocol |
| Server | Event-driven TCP server with stream_select loop |
| Client | Connects to a remote server, performs handshake, send/receive |
Frame represents a single WebSocket frame per RFC 6455 §5.2.
| Constant | Value | Description |
|----------|-------|-------------|
| OPCODE_CONTINUATION | 0x0 | Continuation frame |
| OPCODE_TEXT | 0x1 | Text frame (UTF-8) |
| OPCODE_BINARY | 0x2 | Binary frame |
| OPCODE_CLOSE | 0x8 | Connection close |
| OPCODE_PING | 0x9 | Ping frame |
| OPCODE_PONG | 0xA | Pong frame |
new Frame(int $opcode, string $payload = '', bool $fin = true, bool $masked = false)

| Method | Description |
|--------|-------------|
| Frame::text(string $payload, bool $mask = false) | Create a text frame |
| Frame::binary(string $payload, bool $mask = false) | Create a binary frame |
| Frame::ping(string $payload = '') | Create a ping frame |
| Frame::pong(string $payload = '') | Create a pong frame |
| Frame::close(int $code = 1000, string $reason = '') | Create a close frame |
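The close factory takes a status code and a reason. On the wire, RFC 6455 §5.5.1 lays the close payload out as a 2-byte big-endian status code followed by an optional UTF-8 reason. The following is a minimal sketch of that layout in plain PHP; `sketchBuildClosePayload` and `sketchParseClosePayload` are hypothetical names and are not part of Razy\WebSocket.

```php
<?php
// Hypothetical helpers illustrating the close-frame payload layout (RFC 6455 §5.5.1).
// They are not part of Razy\WebSocket.

function sketchBuildClosePayload(int $code = 1000, string $reason = ''): string
{
    // 'n' packs an unsigned 16-bit integer in network (big-endian) byte order.
    return pack('n', $code) . $reason;
}

/** @return array{?int, string} [code, reason]; code is null when no status code is present */
function sketchParseClosePayload(string $payload): array
{
    if (strlen($payload) < 2) {
        return [null, ''];
    }
    $code = unpack('n', substr($payload, 0, 2))[1];
    return [$code, substr($payload, 2)];
}

[$code, $reason] = sketchParseClosePayload(sketchBuildClosePayload(1001, 'going away'));
echo $code, ' ', $reason, "\n"; // 1001 going away
```

This matches the nullable ?int return type listed for getCloseCode(): a close frame with an empty payload carries no status code at all.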
| Method | Returns | Description |
|--------|---------|-------------|
| getOpcode() | int | Frame opcode |
| getOpcodeName() | string | Human-readable opcode name |
| getPayload() | string | Raw payload data |
| getPayloadLength() | int | Payload byte length |
| isFin() | bool | True if this is the final fragment |
| isMasked() | bool | True if payload is masked |
| isControl() | bool | True for close, ping, pong |
| isText() | bool | Opcode is TEXT |
| isBinary() | bool | Opcode is BINARY |
| isClose() | bool | Opcode is CLOSE |
| isPing() | bool | Opcode is PING |
| isPong() | bool | Opcode is PONG |
| getCloseCode() | ?int | 2-byte status code from close frame |
| getCloseReason() | string | Reason string from close frame |
| encode(bool $mask = false) | string | Encode to binary wire format |
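To make the "payload length encoding" part of the wire format concrete, here is a sketch of how an unmasked frame header is laid out under RFC 6455 §5.2. The function name `sketchFrameHeader` is hypothetical; this is an illustration of the format, not the library's encode() implementation.

```php
<?php
// Hypothetical helper: builds the header of an unmasked frame (RFC 6455 §5.2).
// Not the library's encode() implementation.

function sketchFrameHeader(int $opcode, int $payloadLength, bool $fin = true): string
{
    // Byte 0: FIN flag in the top bit, opcode in the low 4 bits.
    $header = chr(($fin ? 0x80 : 0x00) | ($opcode & 0x0F));

    if ($payloadLength <= 125) {
        // Short payloads store the length directly in 7 bits.
        $header .= chr($payloadLength);
    } elseif ($payloadLength <= 0xFFFF) {
        // 126 signals that a 16-bit big-endian length follows.
        $header .= chr(126) . pack('n', $payloadLength);
    } else {
        // 127 signals that a 64-bit big-endian length follows.
        $header .= chr(127) . pack('J', $payloadLength);
    }

    return $header;
}

// Text frame (opcode 0x1) carrying "Hello".
echo bin2hex(sketchFrameHeader(0x1, 5) . 'Hello'), "\n"; // 810548656c6c6f
```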
| Method | Returns | Description |
|--------|---------|-------------|
| Frame::decode(string $buffer) | ?array{Frame, int} | Decode one frame from buffer; returns [frame, bytesConsumed] or null if incomplete |
| Frame::applyMask(string $data, string $maskKey) | string | XOR-mask/unmask per RFC 6455 §5.3 |
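The masking step in RFC 6455 §5.3 is a byte-wise XOR against a repeating 4-byte key, so applying it twice restores the original data. A minimal sketch, assuming a hypothetical stand-in named `sketchApplyMask` (the library's own applyMask() may differ in validation and performance details):

```php
<?php
// Hypothetical stand-in for the masking step (RFC 6455 §5.3); not the library's code.

function sketchApplyMask(string $data, string $maskKey): string
{
    if (strlen($maskKey) !== 4) {
        throw new InvalidArgumentException('Mask key must be exactly 4 bytes');
    }
    $out = '';
    for ($i = 0, $n = strlen($data); $i < $n; $i++) {
        // Byte i of the payload is XORed with byte (i mod 4) of the key.
        $out .= $data[$i] ^ $maskKey[$i % 4];
    }
    return $out;
}

$key    = hex2bin('37fa213d');             // key from the RFC 6455 §5.7 example
$masked = sketchApplyMask('Hello', $key);
echo bin2hex($masked), "\n";               // 7f9f4d5158
echo sketchApplyMask($masked, $key), "\n"; // Hello
```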
use Razy\WebSocket\Frame;
// Encode a text frame (unmasked, for server → client)
$frame = Frame::text('Hello');
$binary = $frame->encode(mask: false);
// Decode a frame from raw bytes
$result = Frame::decode($binary);
if ($result !== null) {
    [$decoded, $bytesConsumed] = $result;
    echo $decoded->getPayload(); // "Hello"
}

Connection wraps a PHP stream resource and provides bidirectional WebSocket communication: handshake, frame-level read/write, automatic ping/pong, and the close protocol.

new Connection(resource $stream, bool $maskOutput = false)

- $maskOutput: set to true for client connections (RFC 6455 §5.3 requires clients to mask outgoing frames)
| Method | Returns | Description |
|--------|---------|-------------|
| performServerHandshake() | bool | Read the client's HTTP Upgrade request, send 101 response |
| completeClientHandshake(string $expectedAccept) | bool | Validate the server's 101 response |
| isHandshakeCompleted() | bool | Whether the opening handshake is done |
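The $expectedAccept value that the client validates is defined by RFC 6455 §4.2.2: the server appends a fixed GUID to the client's Sec-WebSocket-Key, takes the SHA-1 hash, and base64-encodes it. A sketch of that derivation, assuming a hypothetical helper `sketchAcceptKey` (how the library computes it internally is not shown in this page):

```php
<?php
// Hypothetical helper showing how the Sec-WebSocket-Accept value is derived
// (RFC 6455 §4.2.2). Not the library's code.

const SKETCH_WS_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';

function sketchAcceptKey(string $secWebSocketKey): string
{
    // SHA-1 of key + GUID as raw binary, then base64.
    return base64_encode(sha1($secWebSocketKey . SKETCH_WS_GUID, true));
}

// Sample nonce from RFC 6455 §1.3.
echo sketchAcceptKey('dGhlIHNhbXBsZSBub25jZQ=='), "\n"; // s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```

A client can run the same computation on the key it sent and compare the result with the header in the 101 response.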
| Method | Description |
|--------|-------------|
| sendText(string $text) | Send a UTF-8 text frame |
| sendBinary(string $data) | Send a binary frame |
| sendPing(string $payload = '') | Send a ping frame |
| sendPong(string $payload = '') | Send a pong frame |
| sendClose(int $code = 1000, string $reason = '') | Send a close frame |
| sendFrame(Frame $frame) | Send any raw Frame |
| Method | Returns | Description |
|--------|---------|-------------|
| readFrame() | ?Frame | Read one frame; auto-replies ping with pong, echoes close |
| readFrames() | Frame[] | Read all available frames |
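Reading "all available frames" comes down to decoding repeatedly from a buffer until only an incomplete tail remains, which is why a decoder that returns [frame, bytesConsumed] or null is convenient. The sketch below illustrates that loop with a hypothetical mini-decoder (`sketchDecode`, `sketchDrain`) that represents frames as plain arrays; it is not Razy's implementation and omits ping/pong and close handling.

```php
<?php
// Hypothetical mini-decoder used only to illustrate the "[frame, bytesConsumed] or null"
// contract. Frames are plain arrays here, not Razy\WebSocket\Frame objects.

function sketchDecode(string $buffer): ?array
{
    if (strlen($buffer) < 2) {
        return null; // header not complete yet
    }
    $opcode = ord($buffer[0]) & 0x0F;
    $masked = (ord($buffer[1]) & 0x80) !== 0;
    $length = ord($buffer[1]) & 0x7F;
    $offset = 2;

    if ($length === 126) {
        if (strlen($buffer) < 4) {
            return null;
        }
        $length = unpack('n', substr($buffer, 2, 2))[1];
        $offset = 4;
    } elseif ($length === 127) {
        if (strlen($buffer) < 10) {
            return null;
        }
        $length = unpack('J', substr($buffer, 2, 8))[1];
        $offset = 10;
    }

    $key = '';
    if ($masked) {
        if (strlen($buffer) < $offset + 4) {
            return null;
        }
        $key = substr($buffer, $offset, 4);
        $offset += 4;
    }
    if (strlen($buffer) < $offset + $length) {
        return null; // payload still arriving
    }

    $payload = substr($buffer, $offset, $length);
    if ($masked) {
        for ($i = 0; $i < $length; $i++) {
            $payload[$i] = $payload[$i] ^ $key[$i % 4];
        }
    }
    return [['opcode' => $opcode, 'payload' => $payload], $offset + $length];
}

/** Decode every complete frame and leave any partial tail in $buffer. */
function sketchDrain(string &$buffer): array
{
    $frames = [];
    while (($result = sketchDecode($buffer)) !== null) {
        [$frame, $consumed] = $result;
        $frames[] = $frame;
        $buffer = substr($buffer, $consumed);
    }
    return $frames;
}

// Two complete text frames followed by the first byte of a third.
$buffer = "\x81\x02hi" . "\x81\x03you" . "\x81";
$frames = sketchDrain($buffer);
echo count($frames), ' frames, ', strlen($buffer), " byte left over\n"; // 2 frames, 1 byte left over
```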
| Method | Returns | Description |
|--------|---------|-------------|
| getId() | string | Unique connection ID |
| getStream() | resource | Underlying stream |
| isOpen() | bool | Stream is open and close not sent |
| isCloseComplete() | bool | Both sides have exchanged close frames |
| getRequestUri() | ?string | URI from the opening GET request |
| getHeaders() | array | HTTP headers from the handshake |
| getHeader(string $name) | ?string | Single header value (case-insensitive) |
| getUserData() | mixed | Retrieve custom user data |
| setUserData(mixed $data) | void | Attach custom user data |
| disconnect() | void | Close the stream |
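getRequestUri(), getHeaders(), and getHeader() expose data taken from the client's opening HTTP request. As an illustration of what that involves, the sketch below extracts the request URI and stores header names in lower case so lookups are case-insensitive. `sketchParseUpgradeRequest` is a hypothetical function, and the library may parse and validate the request differently.

```php
<?php
// Hypothetical parser for the client's opening request. It only illustrates how a
// request URI and case-insensitive headers can be extracted. Not the library's code.

/** @return array{uri: ?string, headers: array<string, string>} */
function sketchParseUpgradeRequest(string $raw): array
{
    $lines = explode("\r\n", rtrim($raw, "\r\n"));
    $requestLine = array_shift($lines);

    // The request line looks like: GET /chat HTTP/1.1
    $uri = preg_match('#^GET\s+(\S+)\s+HTTP/1\.1$#', $requestLine, $m) ? $m[1] : null;

    $headers = [];
    foreach ($lines as $line) {
        if (strpos($line, ':') === false) {
            continue; // skip anything that is not a "Name: value" pair
        }
        [$name, $value] = explode(':', $line, 2);
        // Lower-case the name so lookups are case-insensitive.
        $headers[strtolower(trim($name))] = trim($value);
    }
    return ['uri' => $uri, 'headers' => $headers];
}

$request = "GET /chat HTTP/1.1\r\n"
    . "Host: localhost:8080\r\n"
    . "Upgrade: websocket\r\n"
    . "Connection: Upgrade\r\n"
    . "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"
    . "Sec-WebSocket-Version: 13\r\n\r\n";

$parsed = sketchParseUpgradeRequest($request);
echo $parsed['uri'], "\n";                                      // /chat
echo $parsed['headers'][strtolower('Sec-WebSocket-Key')], "\n"; // dGhlIHNhbXBsZSBub25jZQ==
```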
Server is a single-process, event-driven WebSocket server built on stream_socket_server() and stream_select().
new Server(string $host = '0.0.0.0', int $port = 8080, int $timeout = 200000)

- $timeout: stream_select timeout in microseconds (default 200000, i.e. 200 ms)
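The effect of the stream_select timeout can be seen in isolation with a pair of connected sockets: with nothing to read, the call waits for the timeout and reports 0 ready streams; with data pending, it returns immediately. This sketch uses only built-in PHP functions; the socket pair is a stand-in for real client connections and says nothing about how Server manages its own streams.

```php
<?php
// Illustrates the stream_select() timeout only. The socket pair stands in for
// real client connections and is not how the Server obtains its streams.

$domain = DIRECTORY_SEPARATOR === '\\' ? STREAM_PF_INET : STREAM_PF_UNIX;
[$a, $b] = stream_socket_pair($domain, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

$timeoutMicroseconds = 200000; // 200 ms

// Nothing has been written yet, so select waits for the timeout and reports 0.
$read = [$b];
$write = null;
$except = null;
$idle = stream_select($read, $write, $except, 0, $timeoutMicroseconds);
echo "ready streams: {$idle}\n"; // ready streams: 0

// Once data is pending, select returns right away with 1 readable stream.
fwrite($a, 'ping');
$read = [$b];
$write = null;
$except = null;
$busy = stream_select($read, $write, $except, 0, $timeoutMicroseconds);
$data = fread($b, 4);
echo "ready streams: {$busy}, data: {$data}\n"; // ready streams: 1, data: ping

fclose($a);
fclose($b);
```

A shorter timeout makes tick callbacks fire more often at the cost of more idle wake-ups; a longer one does the opposite.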
All callback registration methods return $this for fluent chaining:
| Method | Callback Signature | When Called |
|--------|-------------------|-------------|
| onOpen(Closure $cb) | fn(Connection): void | After a client completes the handshake |
| onMessage(Closure $cb) | fn(Connection, Frame): void | On each text or binary data frame |
| onClose(Closure $cb) | fn(Connection, int, string): void | When a connection is closed (code + reason) |
| onError(Closure $cb) | fn(Connection, Throwable): void | On exception during frame processing |
| onTick(Closure $cb) | fn(Server): void | Once per event-loop iteration |
| Method | Description |
|--------|-------------|
| start() | Bind the socket and enter the event loop |
| stop() | Signal the loop to exit after the current iteration |
| Method | Returns | Description |
|--------|---------|-------------|
| getConnections() | array<string, Connection> | Active connections keyed by ID |
| broadcast(string $text, ?Connection $exclude = null) | void | Send text to all connected clients |
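The role of the $exclude argument can be pictured with a small in-memory model. `StubConnection` and `sketchBroadcast` below are hypothetical stand-ins written for this illustration; the handshake check reflects only what this page states about broadcast(), not the library's source.

```php
<?php
// Hypothetical stand-ins; they only model the behaviour described in this document.

final class StubConnection
{
    /** @var string[] Messages "sent" to this connection */
    public array $outbox = [];

    public function __construct(
        public string $id,
        public bool $handshakeCompleted = true,
    ) {
    }
}

/** @param array<string, StubConnection> $connections */
function sketchBroadcast(array $connections, string $text, ?StubConnection $exclude = null): int
{
    $sent = 0;
    foreach ($connections as $conn) {
        // Skip the excluded sender and anything that has not finished the handshake.
        if ($conn === $exclude || !$conn->handshakeCompleted) {
            continue;
        }
        $conn->outbox[] = $text;
        $sent++;
    }
    return $sent;
}

$alice   = new StubConnection('alice');
$bob     = new StubConnection('bob');
$pending = new StubConnection('pending', handshakeCompleted: false);
$all     = ['alice' => $alice, 'bob' => $bob, 'pending' => $pending];

echo sketchBroadcast($all, 'hello', exclude: $alice), "\n"; // 1 (only bob receives it)
```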
use Razy\WebSocket\Server;
use Razy\WebSocket\Connection;
use Razy\WebSocket\Frame;
$server = new Server('0.0.0.0', 9000);
$server
    ->onOpen(function (Connection $conn) use ($server) {
        $server->broadcast("User {$conn->getId()} joined", $conn);
    })
    ->onMessage(function (Connection $conn, Frame $frame) use ($server) {
        $msg = $conn->getId() . ': ' . $frame->getPayload();
        $server->broadcast($msg);
    })
    ->onClose(function (Connection $conn, int $code, string $reason) use ($server) {
        $server->broadcast("User {$conn->getId()} left");
    });
$server->start();

// Periodic ping via onTick (register it before calling start(), which enters the event loop)
$server->onTick(function (Server $srv) {
    static $lastPing = 0;
    $now = time();
    if ($now - $lastPing >= 30) {
        foreach ($srv->getConnections() as $conn) {
            $conn->sendPing();
        }
        $lastPing = $now;
    }
});

Client connects to a remote WebSocket server, performs the opening handshake, and provides send/receive methods. It supports both ws:// and wss:// (TLS).
Client::connect(string $url, array $headers = [], int $timeout = 5): Client

- $url: full WebSocket URL (ws://host:port/path or wss://...)
- $headers: extra HTTP headers as ['Header-Name' => 'value']
- $timeout: connection timeout in seconds
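Before a client can connect, a ws:// or wss:// URL has to be turned into something PHP's stream functions understand. One plausible mapping, sketched below, uses the built-in tcp:// and tls:// socket transports with default ports 80 and 443. `sketchWsTarget` is a hypothetical helper; how Client::connect() resolves URLs internally is an assumption here, not something this page documents.

```php
<?php
// Hypothetical helper: maps a WebSocket URL to a PHP stream transport address.
// Not the library's code.

/** @return array{address: string, path: string} */
function sketchWsTarget(string $url): array
{
    $parts = parse_url($url);
    if ($parts === false || !isset($parts['scheme'], $parts['host'])) {
        throw new InvalidArgumentException("Not a WebSocket URL: {$url}");
    }

    $scheme = strtolower($parts['scheme']);
    if (!in_array($scheme, ['ws', 'wss'], true)) {
        throw new InvalidArgumentException("Unsupported scheme: {$scheme}");
    }

    $secure = $scheme === 'wss';
    $port   = $parts['port'] ?? ($secure ? 443 : 80);
    $path   = ($parts['path'] ?? '/')
        . (isset($parts['query']) ? '?' . $parts['query'] : '');

    return [
        // tls:// and tcp:// are PHP's built-in socket transports.
        'address' => ($secure ? 'tls' : 'tcp') . "://{$parts['host']}:{$port}",
        'path'    => $path,
    ];
}

print_r(sketchWsTarget('ws://localhost:8080/chat'));
print_r(sketchWsTarget('wss://secure.example.com/ws'));
```

The resulting address is in the form accepted by stream_socket_client(), and the path is what would go on the GET request line of the opening handshake.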
| Method | Returns | Description |
|--------|---------|-------------|
| sendText(string $text) | void | Send a UTF-8 text message |
| sendBinary(string $data) | void | Send a binary message |
| sendPing(string $payload = '') | void | Send a ping frame |
| receive() | ?Frame | Non-blocking receive (returns null if no data) |
| receiveBlocking(int $timeoutSeconds = 30) | ?Frame | Block until a frame arrives or timeout |
| close(int $code = 1000, string $reason = '') | void | Send close frame, read response, disconnect |
| listen(Closure $onMessage, int $pollMs = 50) | void | Run a receive loop until connection closes |
| isOpen() | bool | Whether the connection is still open |
| getConnection() | Connection | Access the underlying Connection |
use Razy\WebSocket\Client;
use Razy\WebSocket\Frame;
$client = Client::connect('ws://localhost:8080/feed');
$client->listen(function (Frame $frame) {
    echo "Got: {$frame->getPayload()}\n";
    // Return false to stop listening
    if ($frame->getPayload() === 'bye') {
        return false;
    }
});

// Connecting over TLS with extra handshake headers
$client = Client::connect('wss://secure.example.com/ws', [
    'Authorization' => 'Bearer my-token',
    'X-Custom' => 'value',
], timeout: 10);
$client->sendText(json_encode(['action' => 'subscribe', 'channel' => 'updates']));
// receiveBlocking() returns ?Frame, so guard against null before reading the payload
$frame = $client->receiveBlocking(30);
if ($frame !== null) {
    echo $frame->getPayload();
}
$client->close();

WebSocket servers are long-running processes and cannot run inside a normal HTTP request. Use one of the approaches below to start a server from the command line.
Create a PHP file (e.g. ws-server.php) and run it directly:
<?php
// ws-server.php
require __DIR__ . '/vendor/autoload.php';
use Razy\WebSocket\Server;
use Razy\WebSocket\Connection;
use Razy\WebSocket\Frame;
$host = $argv[1] ?? '0.0.0.0';
$port = (int) ($argv[2] ?? 8080);
$server = new Server($host, $port);
$server
    // Arrow functions need an expression body, hence print rather than echo
    ->onOpen(fn(Connection $conn) => print "[open] {$conn->getId()}\n")
    ->onMessage(function (Connection $conn, Frame $frame) use ($server) {
        $server->broadcast($frame->getPayload(), $conn);
    })
    ->onClose(fn(Connection $conn, int $code, string $reason) =>
        print "[close] {$conn->getId()} ({$code})\n"
    );
echo "WebSocket server listening on ws://{$host}:{$port}\n";
$server->start();

# Start on port 8080 (default)
php ws-server.php
# Custom bind address and port
php ws-server.php 127.0.0.1 9000
Register a CLI script inside a Razy module so it can be launched via runapp:
1. Register the script in your controller's __onInit:
public function __onInit(Agent $agent): bool
{
    $agent->addScript('ws:start', 'script/ws_start');
    return true;
}

2. Create the handler (controller/script/{module_code}.ws_start.php):
<?php
use Razy\WebSocket\Server;
use Razy\WebSocket\Connection;
use Razy\WebSocket\Frame;
return function (mixed ...$args): void {
    $port = (int) ($args[0] ?? 8080);
    $server = new Server('0.0.0.0', $port);
    $server
        // Arrow functions need an expression body, hence print rather than echo
        ->onOpen(fn(Connection $conn) => print "Connected: {$conn->getId()}\n")
        ->onMessage(function (Connection $conn, Frame $frame) use ($server) {
            $server->broadcast($frame->getPayload(), $conn);
        })
        ->onClose(fn(Connection $conn) => print "Disconnected: {$conn->getId()}\n");
    echo "WebSocket server listening on port {$port}\n";
    $server->start();
};

3. Launch via runapp:
# Start the interactive shell
php Razy.phar runapp main
# Inside the shell, run the script
> run /ws:start
For production, run the server as a background process or under a process manager:
# Background with nohup (Linux/macOS)
nohup php ws-server.php 0.0.0.0 8080 > ws.log 2>&1 &
# Supervisor (supervisord.conf)
[program:ws-server]
command=php /var/www/myproject/ws-server.php 0.0.0.0 8080
autostart=true
autorestart=true
stderr_logfile=/var/log/ws-server.err.log
stdout_logfile=/var/log/ws-server.out.log
# systemd unit
[Unit]
Description=Razy WebSocket Server
After=network.target
[Service]
ExecStart=/usr/bin/php /var/www/myproject/ws-server.php 0.0.0.0 8080
Restart=always
User=www-data
[Install]
WantedBy=multi-user.target
| Mistake | Why It Fails | Fix |
|---------|-------------|-----|
| Forgetting to mask client frames | RFC 6455 requires client-to-server masking; the server will reject unmasked frames | Use Client::connect() (auto-masks) or new Connection($stream, maskOutput: true) |
| Calling readFrame() on a blocking stream | Blocks indefinitely if no data | Set stream_set_blocking($stream, false) or use receiveBlocking() with a timeout |
| Not handling ping frames | Connection may time out on the remote end | Connection::readFrame() auto-replies ping with pong — just keep reading |
| Ignoring the close handshake | Remote side waits for close echo before dropping TCP | Connection::readFrame() auto-echoes close; or call sendClose() explicitly |
| Using Server::broadcast() before handshake completes | Sends frames to connections with no WebSocket layer | broadcast() already checks isHandshakeCompleted() before sending |
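To show what the masking requirement in the first row means on the wire, the sketch below builds a masked client-to-server text frame by hand: the MASK bit is set in the second header byte, a 4-byte key follows, and the payload is XORed with that key (RFC 6455 §5.2 and §5.3). `sketchMaskedTextFrame` is a hypothetical helper limited to short payloads. In normal use Client::connect() handles this automatically.

```php
<?php
// Hypothetical sketch of a masked client-to-server text frame on the wire
// (RFC 6455 §5.2, §5.3). Short payloads only (<= 125 bytes). Not the library's code.

function sketchMaskedTextFrame(string $payload, ?string $maskKey = null): string
{
    if (strlen($payload) > 125) {
        throw new LengthException('This sketch only handles payloads up to 125 bytes');
    }
    // Clients pick a fresh random 4-byte key per frame; it is injectable here for the demo.
    $maskKey ??= random_bytes(4);

    $masked = '';
    for ($i = 0, $n = strlen($payload); $i < $n; $i++) {
        $masked .= $payload[$i] ^ $maskKey[$i % 4];
    }

    return chr(0x80 | 0x1)             // FIN + text opcode
        . chr(0x80 | strlen($payload)) // MASK bit + 7-bit length
        . $maskKey
        . $masked;
}

// Reproduces the masked "Hello" example from RFC 6455 §5.7.
echo bin2hex(sketchMaskedTextFrame('Hello', hex2bin('37fa213d'))), "\n";
// 818537fa213d7f9f4d5158
```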
| Scenario | Use |
|----------|-----|
| Real-time push from server only (one-way) | SSE (simpler, HTTP-based) |
| Bidirectional real-time messaging | WebSocket Server + Client |
| Sending HTTP requests to an API | HttpClient |
| Inter-process message passing | SimplifiedMessage over pipes or sockets |
| Background job processing | Queue |
Client                                 Server
  │                                      │
  │─── TCP connect ─────────────────────►│  stream_socket_accept()
  │                                      │
  │─── GET / HTTP/1.1 ──────────────────►│  performServerHandshake()
  │    Upgrade: websocket                │
  │    Sec-WebSocket-Key: ...            │
  │                                      │
  │◄── HTTP/1.1 101 ─────────────────────│
  │    Sec-WebSocket-Accept: ...         │
  │                                      │
  │─── WebSocket frames ────────────────►│  readFrame() → onMessage()
  │◄── WebSocket frames ─────────────────│  sendText() / broadcast()
  │                                      │
  │─── Close(1000) ─────────────────────►│  readFrame() auto-echoes close
  │◄── Close(1000) ──────────────────────│
  │                                      │
  │─── TCP FIN ─────────────────────────►│  removeConnection()