Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .claude/rules/broadcasting.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@ Register via `Echo.addInterceptor()` or `driver.addInterceptor()` in a ServicePr
## ReverbBroadcastDriver

- Implements Pusher-compatible WebSocket protocol (Laravel Reverb, Soketi, etc.)
- `channelFactory` constructor DI parameter overrides WebSocket creation — use for testing without a real server
- Constructor DI: `channelFactory` overrides WebSocket creation, `authFactory` overrides HTTP auth call — both for testing
- Auto-reconnection: exponential backoff `min(500ms × 2^attempt, max_reconnect_delay)` — set `reconnect: false` to disable
- Reconnect resubscription: all channels re-subscribed with `await` after reconnect. Private/presence re-authenticate. `onReconnect` emits only after all resubscriptions complete
- Auth error handling: failures logged via `Log.error()` with channel name, routed through interceptor `onError()` chain. Per-channel try/catch — one failure doesn't block others
- Pusher error codes: 4000–4099 = fatal (no reconnect), 4100–4199 = immediate, 4200–4299 = backoff
- Deduplication: ring buffer of size `dedup_buffer_size` (default 100) fingerprints — suppresses duplicate events on reconnect
- Heartbeat: responds to `pusher:ping` with `pusher:pong` automatically
Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

All notable changes to this project will be documented in this file.

## [Unreleased]

### 🐛 Bug Fixes
- **Broadcasting**: Auth failures in private/presence channels now surface via `Log.error()` and interceptor `onError()` chain instead of being silently swallowed. Reconnect resubscribes all channels with `await` — `onReconnect` stream emits only after completion. Per-channel error handling ensures one auth failure does not block other channels. (#45)
- **Database**: `sqlite3.wasm` now loads via absolute URI (`/sqlite3.wasm`) instead of relative — fixes 404s on deep routes when using path URL strategy. (#46)

## [1.0.0-alpha.8] - 2026-04-07

### ✨ Features
Expand Down
8 changes: 6 additions & 2 deletions doc/digging-deeper/broadcasting.md
Original file line number Diff line number Diff line change
Expand Up @@ -476,14 +476,18 @@ Echo.connectionState.listen((state) {
});
```

Re-subscribe to channels after a reconnect using `Echo.onReconnect`:
After a successful reconnect, the driver automatically re-subscribes **all** previously subscribed channels (public, private, and presence). Private and presence channels re-authenticate via the `auth_endpoint`. The `onReconnect` stream emits only after all resubscriptions complete, so listeners can safely assume channels are ready:

```dart
Echo.onReconnect.listen((_) {
Echo.channel('orders').listen('OrderShipped', onShipped);
// All channels are already resubscribed at this point.
// Use this for UI updates or additional logic — not for re-subscribing.
print('Reconnected and all channels restored');
});
```

If a private/presence channel auth fails during reconnect, the error is logged via `Log.error()` and routed through the interceptor `onError()` chain. Other channels continue resubscribing — one failure does not block the rest.

<a name="reconnection-and-heartbeat"></a>
### Reconnection and Heartbeat

Expand Down
85 changes: 68 additions & 17 deletions lib/src/broadcasting/drivers/reverb_broadcast_driver.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import 'dart:math';
import 'package:web_socket_channel/web_socket_channel.dart';

import '../../facades/http.dart';
import '../../facades/log.dart';
import '../broadcast_connection_state.dart';
import '../broadcast_event.dart';
import '../contracts/broadcast_channel.dart';
Expand Down Expand Up @@ -60,17 +61,48 @@ class ReverbBroadcastDriver implements BroadcastDriver {
/// [config] contains connection parameters read from
/// `broadcasting.connections.reverb`. [channelFactory] overrides WebSocket
/// creation for testing — defaults to [WebSocketChannel.connect].
/// [authFactory] overrides the HTTP auth call for testing — defaults to
/// calling [Http.post] and returning the response data.
ReverbBroadcastDriver(
this._config, {
WebSocketChannel Function(Uri uri)? channelFactory,
}) : _channelFactory = channelFactory ?? WebSocketChannel.connect;
Future<Map<String, dynamic>> Function(
String endpoint,
Map<String, dynamic> data,
)?
authFactory,
}) : _channelFactory = channelFactory ?? WebSocketChannel.connect,
_authFactory = authFactory ?? _defaultAuthFactory;

// ---------------------------------------------------------------------------
// Dependencies
// ---------------------------------------------------------------------------

final Map<String, dynamic> _config;
final WebSocketChannel Function(Uri uri) _channelFactory;
final Future<Map<String, dynamic>> Function(
String endpoint,
Map<String, dynamic> data,
)
_authFactory;

static Future<Map<String, dynamic>> _defaultAuthFactory(
String endpoint,
Map<String, dynamic> data,
) async {
final response = await Http.post(endpoint, data: data);
final responseData = response.data;

if (responseData is Map<String, dynamic>) {
return responseData;
}

if (responseData is Map) {
return Map<String, dynamic>.from(responseData);
}

return <String, dynamic>{};
}

// ---------------------------------------------------------------------------
// Connection state
Expand Down Expand Up @@ -565,16 +597,17 @@ class ReverbBroadcastDriver implements BroadcastDriver {
final authEndpoint =
_config['auth_endpoint'] as String? ?? '/broadcasting/auth';

final response = await Http.post(
authEndpoint,
data: <String, dynamic>{
'socket_id': _socketId,
'channel_name': channelName,
},
);
final authData = await _authFactory(authEndpoint, <String, dynamic>{
'socket_id': _socketId,
'channel_name': channelName,
});

final authData = response.data;
if (authData is! Map<String, dynamic> || authData['auth'] == null) return;
if (authData['auth'] == null) {
Log.warning(
'Broadcasting auth response malformed for channel: $channelName',
);
return;
}

final subscribeData = <String, dynamic>{
'channel': channelName,
Expand All @@ -587,8 +620,12 @@ class ReverbBroadcastDriver implements BroadcastDriver {
}

_send({'event': 'pusher:subscribe', 'data': subscribeData});
} catch (_) {
// Auth failure — channel will not be subscribed.
} catch (error) {
Log.error('Broadcasting auth failed for channel: $channelName', error);
dynamic processed = error;
for (final interceptor in _interceptors) {
processed = interceptor.onError(processed);
}
}
}

Expand Down Expand Up @@ -653,11 +690,19 @@ class ReverbBroadcastDriver implements BroadcastDriver {

await connect();

// Resubscribe all channels.
for (final entry in _channels.entries) {
final name = entry.key;
// Resubscribe all channels. Snapshot keys to avoid concurrent
// modification if a handler modifies _channels during iteration.
for (final name in _channels.keys.toList()) {
// Skip channels that were left after the snapshot was taken.
if (!_channels.containsKey(name)) continue;

if (name.startsWith('presence-') || name.startsWith('private-')) {
_authenticateAndSubscribe(name);
try {
await _authenticateAndSubscribe(name);
} catch (_) {
// Per-channel failure — continue to next channel.
// Auth errors are already logged in _authenticateAndSubscribe.
}
} else {
_send({
'event': 'pusher:subscribe',
Expand All @@ -667,7 +712,13 @@ class ReverbBroadcastDriver implements BroadcastDriver {
}
Comment thread
anilcancakir marked this conversation as resolved.

_onReconnectController.add(null);
} catch (_) {
} catch (error) {
// Route through interceptor chain before scheduling retry.
dynamic processed = error;
for (final interceptor in _interceptors) {
processed = interceptor.onError(processed);
}
Log.error('Reconnect failed', error);
_scheduleReconnect();
}
});
Expand Down
2 changes: 1 addition & 1 deletion lib/src/database/connectors/connection_factory_web.dart
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class ConnectionFactory implements ConnectionFactoryContract {
final dbName = config['database'] as String? ?? 'magic_app.db';

// Load WASM if not already loaded
_sqlite3 ??= await WasmSqlite3.loadFromUrl(Uri.parse('sqlite3.wasm'));
_sqlite3 ??= await WasmSqlite3.loadFromUrl(Uri.parse('/sqlite3.wasm'));

// Setup IndexedDB file system if not already setup
if (_fileSystem == null) {
Expand Down
6 changes: 5 additions & 1 deletion skills/magic-framework/references/secondary-systems.md
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,11 @@ Config keys under `broadcasting.connections.reverb`:
| `max_reconnect_delay` | `30000` | Max backoff delay in ms |
| `dedup_buffer_size` | `100` | Ring buffer size for deduplication |

The `channelFactory` constructor parameter overrides WebSocket creation for testing (dependency injection).
Constructor DI parameters for testing:
- `channelFactory` — overrides WebSocket creation (inject mock channels)
- `authFactory` — overrides the HTTP auth call for private/presence channels (inject mock auth responses)

Auth failures in `_authenticateAndSubscribe()` are logged via `Log.error()` and routed through the interceptor `onError()` chain. On reconnect, all channels are re-subscribed with `await` — `onReconnect` emits only after completion. Per-channel error handling ensures partial failures don't block other channels.

### NullBroadcastDriver

Expand Down
Loading