Skip to content

Commit

Permalink
Refactor SpinifyCommandMixin error handling and connection logic
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed May 12, 2024
1 parent 476a70e commit 70353cf
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 24 deletions.
2 changes: 1 addition & 1 deletion lib/src/model/command.dart
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ final class SpinifyRefreshRequest extends SpinifyCommand {
String get type => 'RefreshRequest';

/// Token to refresh.
final String token;
final String? token;
}

/// {@macro command}
Expand Down
2 changes: 1 addition & 1 deletion lib/src/model/reply.dart
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ final class SpinifyConnectResult extends SpinifyReply {
/// Expires
final bool expires;

/// TTL
/// TTL (Time to live)
final DateTime? ttl;

/// Payload
Expand Down
82 changes: 67 additions & 15 deletions lib/src/spinify_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ base mixin SpinifyConnectionMixin
on SpinifyBase, SpinifyCommandMixin, SpinifyStateMixin {
Completer<void>? _readyCompleter;

@protected
@nonVirtual
Timer? _refreshTimer;

@override
Future<void> connect(String url) async {
if (state.url == url) return;
Expand All @@ -178,7 +182,22 @@ base mixin SpinifyConnectionMixin
..onDisconnect = () => _onDisconnect().ignore();

// Prepare connect request.
final request = await _prepareConnectRequest();
final SpinifyConnectRequest request;
{
final token = await config.getToken?.call();
assert(token == null || token.length > 5, 'Spinify JWT is too short');
final payload = await config.getPayload?.call();
request = SpinifyConnectRequest(
id: _getNextCommandId(),
timestamp: DateTime.now(),
token: token,
data: payload,
// TODO(plugfox): Implement subscriptions.
subs: const <String, SpinifySubscribeRequest>{},
name: config.client.name,
version: config.client.version,
);
}

final reply = await _sendCommand<SpinifyConnectResult>(request);
_setState(SpinifyState$Connected(
Expand All @@ -194,6 +213,8 @@ base mixin SpinifyConnectionMixin
data: reply.data,
));

_setUpRefreshConnection();

// Notify ready.
if (!completer.isCompleted) completer.complete();
_readyCompleter = null;
Expand All @@ -204,20 +225,50 @@ base mixin SpinifyConnectionMixin
}
}

Future<SpinifyConnectRequest> _prepareConnectRequest() async {
final token = await config.getToken?.call();
assert(token == null || token.length > 5, 'Spinify JWT is too short');
final payload = await config.getPayload?.call();
return SpinifyConnectRequest(
id: _getNextCommandId(),
timestamp: DateTime.now(),
token: token,
data: payload,
// TODO(plugfox): Implement subscriptions.
subs: const <String, SpinifySubscribeRequest>{},
name: config.client.name,
version: config.client.version,
);
void _setUpRefreshConnection() {
_refreshTimer?.cancel();
if (state
case SpinifyState$Connected(
:String url,
:bool expires,
:DateTime? ttl,
:String? node,
:Duration? pingInterval,
:bool? sendPong,
:String? session,
:List<int>? data,
) when expires && ttl != null) {
final duration = ttl.difference(DateTime.now()) - config.timeout;
if (duration < Duration.zero) {
assert(false, 'Token TTL is too short');
return;
}
_refreshTimer = Timer(duration, () async {
if (!state.isConnected) return;
final token = await config.getToken?.call();
assert(token == null || token.length > 5, 'Spinify JWT is too short');
if (token == null) return;
final request = SpinifyRefreshRequest(
id: _getNextCommandId(),
timestamp: DateTime.now(),
token: token,
);
final reply = await _sendCommand<SpinifyRefreshResult>(request);
_setState(SpinifyState$Connected(
url: url,
client: reply.client,
version: reply.version,
expires: reply.expires,
ttl: reply.ttl,
node: node,
pingInterval: pingInterval,
sendPong: sendPong,
session: session,
data: data,
));
_setUpRefreshConnection();
});
}
}

@override
Expand All @@ -235,6 +286,7 @@ base mixin SpinifyConnectionMixin

@override
Future<void> _onDisconnect() async {
_refreshTimer?.cancel();
_transport = null;
await super._onDisconnect();
}
Expand Down
19 changes: 12 additions & 7 deletions tool/echo/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ func Centrifuge() (*centrifuge.Node, error) {
node.OnConnecting(func(ctx context.Context, e centrifuge.ConnectEvent) (centrifuge.ConnectReply, error) {
cred, _ := centrifuge.GetCredentials(ctx)
return centrifuge.ConnectReply{
Data: []byte(`{}`),
Data: []byte(`{}`),
ClientSideRefresh: false,
// Subscribe to a personal server-side channel.
Subscriptions: map[string]centrifuge.SubscribeOptions{
"#" + cred.UserID: {
Expand Down Expand Up @@ -129,12 +130,16 @@ func Centrifuge() (*centrifuge.Node, error) {
}()

client.OnRefresh(func(e centrifuge.RefreshEvent, cb centrifuge.RefreshCallback) {
log.Printf("[user %s] connection is going to expire, refreshing", client.UserID())
// Prolong connection lifetime with that server side callback.
// Without notification to client.
cb(centrifuge.RefreshReply{
ExpireAt: time.Now().Unix() + 25,
}, nil)
// Prolong connection lifetime from client-side refresh.
/* if e.ClientSideRefresh {
log.Printf("[user %s] refresh connection from client with token '%s'", client.UserID(), e.Token)
cb(centrifuge.RefreshReply{ExpireAt: time.Now().Unix() + 25}, nil)
} else {
log.Printf("[user %s] connection is going to expire", client.UserID())
} */

log.Printf("[user %s] refresh connection", client.UserID())
cb(centrifuge.RefreshReply{ExpireAt: time.Now().Unix() + 25}, nil)
})

client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) {
Expand Down

0 comments on commit 70353cf

Please sign in to comment.