Skip to content

Commit

Permalink
Handle ping messages
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed May 12, 2024
1 parent 2044c8b commit 56fb2c3
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 12 deletions.
3 changes: 1 addition & 2 deletions lib/src/model/command.dart
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,8 @@ final class SpinifyHistoryRequest extends SpinifyCommand {
final class SpinifyPingRequest extends SpinifyCommand {
/// {@macro command}
const SpinifyPingRequest({
required super.id,
required super.timestamp,
});
}) : super(id: 0);

@override
String get type => 'PingRequest';
Expand Down
15 changes: 15 additions & 0 deletions lib/src/model/reply.dart
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,21 @@ sealed class SpinifyReply implements Comparable<SpinifyReply> {
String toString() => '$type{id: $id}';
}

/// Server ping message. Server will send this message to client periodically
/// to check if client is still connected.
///
/// Client must respond with async `SpinifyPingRequest{id: 0}` command message.
/// {@macro reply}
final class SpinifyServerPing extends SpinifyReply {
/// {@macro reply}
const SpinifyServerPing({
required super.timestamp,
}) : super(id: 0);

@override
String get type => 'ServerPing';
}

/// Push can be sent to a client as part of Reply in case of bidirectional
/// transport or without additional wrapping in case of unidirectional
/// transports. ProtocolVersion2 uses channel and one of the possible concrete
Expand Down
16 changes: 13 additions & 3 deletions lib/src/protobuf/protobuf_codec.dart
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ final class ProtobufCommandEncoder
token: subRefresh.token,
);
}
assert(() {
print('Command > ${cmd.toProto3Json()}');
return true;
}());
/* final buffer = pb.CodedBufferWriter();
pb.writeToCodedBufferWriter(buffer);
return buffer.toBuffer(); */
Expand All @@ -131,7 +135,10 @@ final class ProtobufReplyDecoder extends Converter<List<int>, SpinifyReply> {
//while (!reader.isAtEnd()) {
final reply = pb.Reply();
reader.readMessage(reply, pb.ExtensionRegistry.EMPTY);
//}
assert(() {
print('Reply < ${reply.toProto3Json()}');
return true;
}());
if (reply.hasPush()) {
return _decodePush(reply.push);
} else if (reply.hasId() && reply.id > 0) {
Expand All @@ -146,9 +153,12 @@ final class ProtobufReplyDecoder extends Converter<List<int>, SpinifyReply> {
temporary: error.temporary,
);
} else {
// TODO(plugfox): Implement ping reply
throw UnimplementedError('Unsupported reply type');
return SpinifyServerPing(
timestamp: DateTime.now(),
);
}
//}
assert(reader.isAtEnd(), 'Data is not fully consumed');
}

/*
Expand Down
13 changes: 11 additions & 2 deletions lib/src/spinify_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,14 @@ base mixin SpinifyCommandMixin on SpinifyBase {
}

Future<void> _sendCommandAsync(SpinifyCommand command) async {
assert(command.id > 0, 'Command ID is not set');
assert(command.id > -1, 'Command ID should be greater or equal to 0');
assert(_transport != null, 'Transport is not connected');
await _transport?.send(command);
}

@override
Future<void> _onReply(SpinifyReply reply) async {
_replies.remove(reply.id)?.complete(reply);
if (reply.id case int id when id > 0) _replies.remove(id)?.complete(reply);
await super._onReply(reply);
}

Expand Down Expand Up @@ -411,6 +411,15 @@ base mixin SpinifyPingPongMixin
_restartPingTimer();
}

@override
Future<void> _onReply(SpinifyReply reply) async {
if (reply is SpinifyServerPing) {
await _sendCommandAsync(SpinifyPingRequest(timestamp: DateTime.now()));
_restartPingTimer();
}
await super._onReply(reply);
}

@override
Future<void> _onDisconnect(({int? code, String? reason}) arg) async {
_tearDownPingTimer();
Expand Down
2 changes: 1 addition & 1 deletion test/smoke/smoke_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ void main() {
expect(client.state, isA<SpinifyState$Disconnected>());
await client.close();
expect(client.state, isA<SpinifyState$Closed>());
});
}, timeout: const Timeout(Duration(minutes: 7)));
});
}
9 changes: 5 additions & 4 deletions tool/echo/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ func Centrifuge() (*centrifuge.Node, error) {
select {
case <-client.Context().Done():
return
case <-time.After(5 * time.Second):
case <-time.After(60 * time.Second):
// Periodically send message to client.
err := client.Send([]byte(`{"time": "` + strconv.FormatInt(time.Now().Unix(), 10) + `"}`))
if err != nil {
if err == io.EOF {
Expand All @@ -130,7 +131,7 @@ func Centrifuge() (*centrifuge.Node, error) {
client.OnRefresh(func(e centrifuge.RefreshEvent, cb centrifuge.RefreshCallback) {
log.Printf("[user %s] connection is going to expire, refreshing", client.UserID())
cb(centrifuge.RefreshReply{
ExpireAt: time.Now().Unix() + 10,
ExpireAt: time.Now().Unix() + 25,
}, nil)
})

Expand Down Expand Up @@ -235,7 +236,7 @@ func Centrifuge() (*centrifuge.Node, error) {
log.Printf("error publishing to personal channel: %s", err)
}
i++
time.Sleep(5000 * time.Millisecond)
time.Sleep(1 * time.Minute)
}
}()

Expand All @@ -252,7 +253,7 @@ func Centrifuge() (*centrifuge.Node, error) {
log.Printf("error publishing to channel: %s", err)
}
i++
time.Sleep(10000 * time.Millisecond)
time.Sleep(1 * time.Minute)
}
}()

Expand Down

0 comments on commit 56fb2c3

Please sign in to comment.