diff --git a/packages/stream_core/build.yaml b/packages/stream_core/build.yaml new file mode 100644 index 0000000..d439bdd --- /dev/null +++ b/packages/stream_core/build.yaml @@ -0,0 +1,7 @@ +targets: + $default: + builders: + json_serializable: + options: + explicit_to_json: true + field_rename: snake \ No newline at end of file diff --git a/packages/stream_core/lib/src/models.dart b/packages/stream_core/lib/src/models.dart new file mode 100644 index 0000000..d9391f2 --- /dev/null +++ b/packages/stream_core/lib/src/models.dart @@ -0,0 +1 @@ +export 'models/pagination_result.dart'; diff --git a/packages/stream_core/lib/src/models/pagination_result.dart b/packages/stream_core/lib/src/models/pagination_result.dart new file mode 100644 index 0000000..b644e1c --- /dev/null +++ b/packages/stream_core/lib/src/models/pagination_result.dart @@ -0,0 +1,27 @@ +import 'package:equatable/equatable.dart'; + +class PaginationResult { + const PaginationResult({ + required this.items, + required this.pagination, + }); + + final List items; + final PaginationData pagination; +} + +class PaginationData extends Equatable { + const PaginationData({ + this.next, + this.previous, + }); + + /// Item id of where to start searching from for next [results] + final String? next; + + /// Item id of where to start searching from for previous [results] + final String? previous; + + @override + List get props => [next, previous]; +} diff --git a/packages/stream_core/lib/src/user.dart b/packages/stream_core/lib/src/user.dart new file mode 100644 index 0000000..a1bee71 --- /dev/null +++ b/packages/stream_core/lib/src/user.dart @@ -0,0 +1,3 @@ +export 'user/connect_user_details_request.dart'; +export 'user/user.dart'; +export 'user/ws_auth_message_request.dart'; diff --git a/packages/stream_core/lib/src/user/connect_user_details_request.dart b/packages/stream_core/lib/src/user/connect_user_details_request.dart new file mode 100644 index 0000000..d8d03fe --- /dev/null +++ b/packages/stream_core/lib/src/user/connect_user_details_request.dart @@ -0,0 +1,26 @@ +import 'package:json_annotation/json_annotation.dart'; + +part 'connect_user_details_request.g.dart'; + +@JsonSerializable() +class ConnectUserDetailsRequest { + final String id; + final String? image; + final bool? invisible; + final String? language; + final String? name; + final Map? customData; + + const ConnectUserDetailsRequest({ + required this.id, + this.image, + this.invisible, + this.language, + this.name, + this.customData, + }); + + Map toJson() => _$ConnectUserDetailsRequestToJson(this); + static ConnectUserDetailsRequest fromJson(Map json) => + _$ConnectUserDetailsRequestFromJson(json); +} diff --git a/packages/stream_core/lib/src/user/connect_user_details_request.g.dart b/packages/stream_core/lib/src/user/connect_user_details_request.g.dart new file mode 100644 index 0000000..c9207ac --- /dev/null +++ b/packages/stream_core/lib/src/user/connect_user_details_request.g.dart @@ -0,0 +1,29 @@ +// GENERATED CODE - DO NOT MODIFY BY HAND + +part of 'connect_user_details_request.dart'; + +// ************************************************************************** +// JsonSerializableGenerator +// ************************************************************************** + +ConnectUserDetailsRequest _$ConnectUserDetailsRequestFromJson( + Map json) => + ConnectUserDetailsRequest( + id: json['id'] as String, + image: json['image'] as String?, + invisible: json['invisible'] as bool?, + language: json['language'] as String?, + name: json['name'] as String?, + customData: json['custom_data'] as Map?, + ); + +Map _$ConnectUserDetailsRequestToJson( + ConnectUserDetailsRequest instance) => + { + 'id': instance.id, + 'image': instance.image, + 'invisible': instance.invisible, + 'language': instance.language, + 'name': instance.name, + 'custom_data': instance.customData, + }; diff --git a/packages/stream_core/lib/src/user/user.dart b/packages/stream_core/lib/src/user/user.dart new file mode 100644 index 0000000..91b4d15 --- /dev/null +++ b/packages/stream_core/lib/src/user/user.dart @@ -0,0 +1,105 @@ +// +// Copyright © 2025 Stream.io Inc. All rights reserved. +// + +import 'package:meta/meta.dart'; + +/// Model for the user's info. +@immutable +class User { + /// Creates a user with the provided id. + const User({ + required this.id, + String? name, + this.imageUrl, + this.role = 'user', + this.type = UserAuthType.regular, + Map? customData, + }) : originalName = name, + customData = customData ?? const {}; + + /// Creates a guest user with the provided id. + /// - Parameter userId: the id of the user. + /// - Returns: a guest `User`. + const User.guest(String userId) + : this(id: userId, name: userId, type: UserAuthType.guest); + + /// Creates an anonymous user. + /// - Returns: an anonymous `User`. + const User.anonymous() : this(id: '!anon', type: UserAuthType.anonymous); + + /// The user's id. + final String id; + + /// The user's image URL. + final String? imageUrl; + + /// The user's role. + final String role; + + /// The user authorization type. + final UserAuthType type; + + /// The user's custom data. + final Map customData; + + /// User's name that was provided when the object was created. It will be used when communicating + /// with the API and in cases where it doesn't make sense to override `null` values with the + /// `non-null` id. + final String? originalName; + + /// A computed property that can be used for UI elements where you need to display user's identifier. + /// If a `name` value was provided on initialisation it will return it. Otherwise returns the `id`. + String get name => originalName ?? id; + + @override + bool operator ==(Object other) { + if (identical(this, other)) return true; + return other is User && + other.id == id && + other.imageUrl == imageUrl && + other.role == role && + other.type == type && + other.originalName == originalName && + _mapEquals(other.customData, customData); + } + + bool _mapEquals(Map? a, Map? b) { + if (a == null && b == null) return true; + if (a == null || b == null) return false; + if (a.length != b.length) return false; + for (final key in a.keys) { + if (!b.containsKey(key) || a[key] != b[key]) return false; + } + return true; + } + + @override + int get hashCode { + return Object.hash( + id, + imageUrl, + role, + type, + originalName, + Object.hashAll(customData.entries), + ); + } + + @override + String toString() { + return 'User(id: $id, name: $name, imageURL: $imageUrl, role: $role, type: $type, customData: $customData)'; + } +} + +/// The user authorization type. +enum UserAuthType { + /// A regular user. + regular, + + /// An anonymous user. + anonymous, + + /// A guest user. + guest, +} diff --git a/packages/stream_core/lib/src/user/ws_auth_message_request.dart b/packages/stream_core/lib/src/user/ws_auth_message_request.dart new file mode 100644 index 0000000..0fb925b --- /dev/null +++ b/packages/stream_core/lib/src/user/ws_auth_message_request.dart @@ -0,0 +1,29 @@ +import 'dart:convert'; + +import 'package:json_annotation/json_annotation.dart'; + +import '../user.dart'; +import '../ws/events/sendable_event.dart'; + +part 'ws_auth_message_request.g.dart'; + +@JsonSerializable() +class WsAuthMessageRequest implements SendableEvent { + const WsAuthMessageRequest({ + this.products, + required this.token, + this.userDetails, + }); + + final List? products; + final String token; + final ConnectUserDetailsRequest? userDetails; + + Map toJson() => _$WsAuthMessageRequestToJson(this); + + static WsAuthMessageRequest fromJson(Map json) => + _$WsAuthMessageRequestFromJson(json); + + @override + Object toSerializedData() => json.encode(toJson()); +} diff --git a/packages/stream_core/lib/src/user/ws_auth_message_request.g.dart b/packages/stream_core/lib/src/user/ws_auth_message_request.g.dart new file mode 100644 index 0000000..9c6e57f --- /dev/null +++ b/packages/stream_core/lib/src/user/ws_auth_message_request.g.dart @@ -0,0 +1,28 @@ +// GENERATED CODE - DO NOT MODIFY BY HAND + +part of 'ws_auth_message_request.dart'; + +// ************************************************************************** +// JsonSerializableGenerator +// ************************************************************************** + +WsAuthMessageRequest _$WsAuthMessageRequestFromJson( + Map json) => + WsAuthMessageRequest( + products: (json['products'] as List?) + ?.map((e) => e as String) + .toList(), + token: json['token'] as String, + userDetails: json['user_details'] == null + ? null + : ConnectUserDetailsRequest.fromJson( + json['user_details'] as Map), + ); + +Map _$WsAuthMessageRequestToJson( + WsAuthMessageRequest instance) => + { + 'products': instance.products, + 'token': instance.token, + 'user_details': instance.userDetails?.toJson(), + }; diff --git a/packages/stream_core/lib/src/utils.dart b/packages/stream_core/lib/src/utils.dart index 5bf66ae..c59af26 100644 --- a/packages/stream_core/lib/src/utils.dart +++ b/packages/stream_core/lib/src/utils.dart @@ -1,2 +1,3 @@ +export 'utils/network_monitor.dart'; export 'utils/result.dart'; export 'utils/shared_emitter.dart'; diff --git a/packages/stream_core/lib/src/ws.dart b/packages/stream_core/lib/src/ws.dart index e69de29..bc8ae77 100644 --- a/packages/stream_core/lib/src/ws.dart +++ b/packages/stream_core/lib/src/ws.dart @@ -0,0 +1,5 @@ +export 'ws/client/connection_recovery_handler.dart'; +export 'ws/client/default_connection_recovery_handler.dart'; +export 'ws/client/web_socket_client.dart' show WebSocketClient; +export 'ws/client/web_socket_connection_state.dart'; +export 'ws/events/ws_event.dart'; diff --git a/packages/stream_core/lib/src/ws/client/connection_recovery_handler.dart b/packages/stream_core/lib/src/ws/client/connection_recovery_handler.dart index dfda79a..191dcba 100644 --- a/packages/stream_core/lib/src/ws/client/connection_recovery_handler.dart +++ b/packages/stream_core/lib/src/ws/client/connection_recovery_handler.dart @@ -1,6 +1,8 @@ import 'dart:async'; import 'dart:math' as math; +import 'package:meta/meta.dart'; + import '../../utils/network_monitor.dart'; import 'web_socket_client.dart'; import 'web_socket_connection_state.dart'; @@ -11,44 +13,26 @@ abstract class AutomaticReconnectionPolicy { class ConnectionRecoveryHandler { ConnectionRecoveryHandler({ - RetryStrategy? retryStrategy, + required this.retryStrategy, required this.client, - this.networkMonitor, - }) { - this.retryStrategy = retryStrategy ?? DefaultRetryStrategy(); - - policies = [ - WebSocketAutomaticReconnectionPolicy(client: client), - if (networkMonitor case final networkMonitor?) - InternetAvailableReconnectionPolicy( - networkMonitor: networkMonitor, - ), - ]; - - _subscribe(); - } - - Future dispose() async { - await Future.wait( - subscriptions.map((subscription) => subscription.cancel())); - subscriptions.clear(); - _cancelReconnectionTimer(); - } + required this.policies, + }); final WebSocketClient client; - final NetworkMonitor? networkMonitor; - late final List policies; - List subscriptions = []; - late final RetryStrategy retryStrategy; + final List policies; + List> subscriptions = []; + final RetryStrategy retryStrategy; Timer? _reconnectionTimer; - void _reconnectIfNeeded() { + @protected + void reconnectIfNeeded() { if (!_canReconnectAutomatically()) return; client.connect(); } - void _disconnectIfNeeded() { + @protected + void disconnectIfNeeded() { final canBeDisconnected = switch (client.connectionState) { Connecting() || Connected() || Authenticating() => true, _ => false, @@ -60,15 +44,17 @@ class ConnectionRecoveryHandler { } } - void _scheduleReconnectionTimerIfNeeded() { + @protected + void scheduleReconnectionTimerIfNeeded() { if (!_canReconnectAutomatically()) return; final delay = retryStrategy.getDelayAfterFailure(); print('Scheduling reconnection in ${delay.inSeconds} seconds'); - _reconnectionTimer = Timer(delay, _reconnectIfNeeded); + _reconnectionTimer = Timer(delay, reconnectIfNeeded); } - void _cancelReconnectionTimer() { + @protected + void cancelReconnectionTimer() { if (_reconnectionTimer == null) return; print('Cancelling reconnection timer'); @@ -76,35 +62,11 @@ class ConnectionRecoveryHandler { _reconnectionTimer = null; } - void _subscribe() { - subscriptions.add( - client.connectionStateStream.listen(_websocketConnectionStateChanged)); - if (networkMonitor case final networkMonitor?) { - subscriptions - .add(networkMonitor.onStatusChange.listen(_networkStatusChanged)); - } - } - - void _networkStatusChanged(NetworkStatus status) { - if (status == NetworkStatus.connected) { - _disconnectIfNeeded(); - } else { - _reconnectIfNeeded(); - } - } - - void _websocketConnectionStateChanged(WebSocketConnectionState state) { - switch (state) { - case Connecting(): - _cancelReconnectionTimer(); - case Connected(): - retryStrategy.resetConsecutiveFailures(); - case Disconnected(): - _scheduleReconnectionTimerIfNeeded(); - case Initialized() || Authenticating() || Disconnecting(): - // Don't do anything - break; - } + Future dispose() async { + await Future.wait( + subscriptions.map((subscription) => subscription.cancel())); + subscriptions.clear(); + cancelReconnectionTimer(); } bool _canReconnectAutomatically() => diff --git a/packages/stream_core/lib/src/ws/client/default_connection_recovery_handler.dart b/packages/stream_core/lib/src/ws/client/default_connection_recovery_handler.dart new file mode 100644 index 0000000..d65dd67 --- /dev/null +++ b/packages/stream_core/lib/src/ws/client/default_connection_recovery_handler.dart @@ -0,0 +1,68 @@ +import '../../utils/network_monitor.dart'; +import 'connection_recovery_handler.dart'; +import 'web_socket_connection_state.dart'; + +class DefaultConnectionRecoveryHandler extends ConnectionRecoveryHandler + with + WebSocketAwareConnectionRecoveryHandler, + NetworkAwareConnectionRecoveryHandler { + DefaultConnectionRecoveryHandler({ + RetryStrategy? retryStrategy, + required super.client, + NetworkMonitor? networkMonitor, + }) : super( + retryStrategy: retryStrategy ?? DefaultRetryStrategy(), + policies: [ + WebSocketAutomaticReconnectionPolicy(client: client), + if (networkMonitor case final networkMonitor?) + InternetAvailableReconnectionPolicy( + networkMonitor: networkMonitor, + ), + ], + ) { + _subscribe(networkMonitor: networkMonitor); + } + + void _subscribe({NetworkMonitor? networkMonitor}) { + subscribeToNetworkChanges(networkMonitor); + subscribeToWebSocketConnectionChanges(); + } +} + +mixin NetworkAwareConnectionRecoveryHandler on ConnectionRecoveryHandler { + void _networkStatusChanged(NetworkStatus status) { + if (status == NetworkStatus.connected) { + disconnectIfNeeded(); + } else { + reconnectIfNeeded(); + } + } + + void subscribeToNetworkChanges(NetworkMonitor? networkMonitor) { + if (networkMonitor case final networkMonitor?) { + subscriptions + .add(networkMonitor.onStatusChange.listen(_networkStatusChanged)); + } + } +} + +mixin WebSocketAwareConnectionRecoveryHandler on ConnectionRecoveryHandler { + void _websocketConnectionStateChanged(WebSocketConnectionState state) { + switch (state) { + case Connecting(): + cancelReconnectionTimer(); + case Connected(): + retryStrategy.resetConsecutiveFailures(); + case Disconnected(): + scheduleReconnectionTimerIfNeeded(); + case Initialized() || Authenticating() || Disconnecting(): + // Don't do anything + break; + } + } + + void subscribeToWebSocketConnectionChanges() { + subscriptions.add( + client.connectionStateStream.listen(_websocketConnectionStateChanged)); + } +} diff --git a/packages/stream_core/lib/src/ws/client/web_socket_client.dart b/packages/stream_core/lib/src/ws/client/web_socket_client.dart index eccc23e..486e6ac 100644 --- a/packages/stream_core/lib/src/ws/client/web_socket_client.dart +++ b/packages/stream_core/lib/src/ws/client/web_socket_client.dart @@ -1,5 +1,3 @@ -import 'dart:convert'; - import '../../errors/client_exception.dart'; import '../../utils/shared_emitter.dart'; import '../events/sendable_event.dart'; @@ -37,6 +35,7 @@ class WebSocketClient implements WebSocketEngineListener, WebSocketPingClient { SharedEmitter get events => _events; final _events = MutableSharedEmitterImpl(); + String? get connectionId => _connectionId; String? _connectionId; WebSocketClient({ @@ -55,6 +54,11 @@ class WebSocketClient implements WebSocketEngineListener, WebSocketPingClient { pingController = environment.createPingController(client: this); } + @override + void send(SendableEvent message) { + engine.send(message: message); + } + //#region Connection void connect() { if (connectionState is Connecting || @@ -77,6 +81,12 @@ class WebSocketClient implements WebSocketEngineListener, WebSocketPingClient { engine.disconnect(code.code, source.toString()); } + + void dispose() { + pingController.dispose(); + _connectionStateStreamController.close(); + _events.close(); + } //#endregion //#region WebSocketEngineListener diff --git a/packages/stream_core/lib/src/ws/client/web_socket_connection_state.dart b/packages/stream_core/lib/src/ws/client/web_socket_connection_state.dart index b6e60f3..5163f61 100644 --- a/packages/stream_core/lib/src/ws/client/web_socket_connection_state.dart +++ b/packages/stream_core/lib/src/ws/client/web_socket_connection_state.dart @@ -2,8 +2,6 @@ import 'package:equatable/equatable.dart'; import '../../errors/client_exception.dart'; import '../events/ws_event.dart'; -import 'web_socket_client.dart'; -import 'web_socket_engine.dart'; import 'web_socket_ping_controller.dart'; /// A web socket connection state. @@ -162,4 +160,3 @@ final class NoPongReceived extends DisconnectionSource { /// [WebSocketPingController] didn't get a pong response. const NoPongReceived(); } - diff --git a/packages/stream_core/lib/src/ws/client/web_socket_engine.dart b/packages/stream_core/lib/src/ws/client/web_socket_engine.dart index 3e248c5..3cbaa84 100644 --- a/packages/stream_core/lib/src/ws/client/web_socket_engine.dart +++ b/packages/stream_core/lib/src/ws/client/web_socket_engine.dart @@ -112,7 +112,7 @@ class URLSessionWebSocketEngine implements WebSocketEngine { @override void send({required SendableEvent message}) { print('[send] hasWS: ${_ws != null}'); - _ws?.sink.add(message); + _ws?.sink.add(message.toSerializedData()); } @override diff --git a/packages/stream_core/lib/src/ws/client/web_socket_ping_controller.dart b/packages/stream_core/lib/src/ws/client/web_socket_ping_controller.dart index b3bffd3..fdebc2a 100644 --- a/packages/stream_core/lib/src/ws/client/web_socket_ping_controller.dart +++ b/packages/stream_core/lib/src/ws/client/web_socket_ping_controller.dart @@ -59,6 +59,11 @@ class WebSocketPingController { void _cancelPongTimeoutTimer() { _pongTimeoutTimer?.cancel(); } + + void dispose() { + _pongTimeoutTimer?.cancel(); + _pingTimer?.cancel(); + } } abstract interface class WebSocketPingClient { diff --git a/packages/stream_core/lib/src/ws/events/ws_event.dart b/packages/stream_core/lib/src/ws/events/ws_event.dart index ab2caba..f3ab03b 100644 --- a/packages/stream_core/lib/src/ws/events/ws_event.dart +++ b/packages/stream_core/lib/src/ws/events/ws_event.dart @@ -37,3 +37,18 @@ final class HealthCheckInfo extends Equatable { @override List get props => [connectionId, participantCount]; } + +final class WsErrorEvent extends WsEvent { + const WsErrorEvent({ + required this.error, + required this.message, + }); + + final Object message; + + @override + final Object error; + + @override + List get props => [error, message]; +} diff --git a/packages/stream_core/lib/stream_core.dart b/packages/stream_core/lib/stream_core.dart index b97e124..1e11211 100644 --- a/packages/stream_core/lib/stream_core.dart +++ b/packages/stream_core/lib/stream_core.dart @@ -1,2 +1,4 @@ +export 'src/models.dart'; +export 'src/user.dart'; export 'src/utils.dart'; export 'src/ws.dart'; diff --git a/packages/stream_core/pubspec.yaml b/packages/stream_core/pubspec.yaml index a067e92..1d621fd 100644 --- a/packages/stream_core/pubspec.yaml +++ b/packages/stream_core/pubspec.yaml @@ -9,8 +9,13 @@ environment: dependencies: equatable: ^2.0.7 + jose: ^0.3.4 + json_annotation: ^4.9.0 + meta: ^1.16.0 rxdart: ^0.28.0 web_socket_channel: ^3.0.1 dev_dependencies: + build_runner: ^2.5.4 + json_serializable: ^6.9.5 test: ^1.26.2 diff --git a/packages/stream_core/test/stream_core_test.dart b/packages/stream_core/test/stream_core_test.dart index 1a20365..de6b66e 100644 --- a/packages/stream_core/test/stream_core_test.dart +++ b/packages/stream_core/test/stream_core_test.dart @@ -1,4 +1,4 @@ -import 'package:flutter_test/flutter_test.dart'; +import 'package:test/test.dart'; import 'package:stream_core/stream_core.dart';