Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/stream_core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@ This package is **not designed for direct use by customers**. It acts as the fou
## ⚠️ Versioning Notice

This library does follow semantic versioning. Breaking changes may be introduced at any time without warning. We reserve the right to refactor or remove functionality without deprecation periods. However, as all our products need to depend on the same version of the core packages we want to limit breaking changes as much as possible.

## Detailed docs
* [Websocket](./docs/web_socket.md)
67 changes: 67 additions & 0 deletions packages/stream_core/docs/web_socket.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Stream Core Websocket

## TODO
- [ ] cover with unit tests
- [ ] test implementation
- [ ] reconnect logic
- [ ] improve docs
- [ ] replace print statements with proper logs

## Overall architecture

The `WebSocketEngine` is purely responsible for connecting to the websocket and handling events.

The `WebSocketClient` is the public interface and can be used for apps to connect to a websocket.

The `WebSocketPingController` keeps track of the timings of the ping/pong for health checks.
It uses the `WebSocketPingClient`, implemented by the `WebSocketClient` to send the ping and listen to pongs.
It also will call the `WebSocketPingClient` if it should disconnect because of a bad connection.

The `ConnectionRecoveryHandler` manages the reconnection for all cases.



```mermaid
graph TD;
ConnectionRecoveryHandler-->WebSocketClient;
ConnectionRecoveryHandler-->NetworkMonitor;
WebSocketClient-->WebSocketEngine;
WebSocketPingController-->WebSocketClient;
```

## WebSocketClient
```dart
WebSocketClient({
required String url,
required this.eventDecoder,
this.pingReguestBuilder,
this.onConnectionEstablished,
this.onConnected,
})
```
The `WebSocketClient` always requires an `eventDecoder`. You should use this to map the websocket message to your own event.
It's important to also map to the `HealthCheckPongEvent` for health check events.


When you need to authenticate for the websocket you should sent the authentication event in `onConnectionEstablished`.
The `onConnected` is called when the connection is fully established after (optional) authentication.

## WebSocketPingController

The `WebSocketPingController` will use the `WebSocketClient` to send pings to the backend while the websocket is connected. By default it sends a ping every 25 seconds. It expects a pong from the backend within a certain interval, by default 3 seconds. If it doesn't get the pong it will request the `WebSocketClient` to disconnect using `disconnectNoPongReceived`.

By default the `WebSocketClient` will send a basic health check event for the ping with the connectionId. If you need a different health check event, for example for the SFU, you need to add a `pingReguestBuilder` in the `WebSocketClient`.

## ConnectionRecoveryHandler

The `ConnectionRecoveryHandler` manages the reconnection for all cases. Currently implemented are network related reconnection events
and reconnections for websocket errors. The reason in the disconnected state determines if the recovery will reconnect or not.

When creating a `WebSocketClient` you should also create a `ConnectionRecoveryHandler` yourself like this:

```dart
final client = WebSocketClient(...);
final recoveryHandler = ConnectionRecoveryHandler(client: client);
```

The `WebSocketClient` itself just disconnects when there is an error and the `ConnectionRecoveryHandler` is responsible for reconnecting when needed.
54 changes: 54 additions & 0 deletions packages/stream_core/lib/src/errors/client_exception.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import 'package:equatable/equatable.dart';

import 'http_api_error.dart';

class ClientException extends Equatable {
final String? message;

late final Object? underlyingError;
late final HttpApiError? apiError;

ClientException({
this.message,
Object? error,
}) {
underlyingError = error;
if (error is HttpApiError) {
apiError = error;
}
}

@override
List<Object?> get props => [message, underlyingError, apiError];
}

class WebSocketException extends ClientException {
WebSocketException(this.serverException, {super.error})
: super(
message:
(serverException ?? WebSocketEngineException.unknown()).reason,
);
final WebSocketEngineException? serverException;
}


class WebSocketEngineException extends ClientException {
static const stopErrorCode = 1000;

final String reason;
final int code;
final Object? engineError;

WebSocketEngineException({
required this.reason,
required this.code,
this.engineError,
}) : super(message: reason);

WebSocketEngineException.unknown()
: this(
reason: 'Unknown',
code: 0,
engineError: null,
);
}
31 changes: 31 additions & 0 deletions packages/stream_core/lib/src/errors/http_api_error.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
abstract interface class HttpApiError {
/// Response HTTP status code
int get statusCode;

/// API error code
int get code;

/// Additional error-specific information
List<int> get details;

/// Request duration
String get duration;

/// Additional error info
Map<String, String> get exceptionFields;

/// Message describing an error
String get message;

/// URL with additional information
String get moreInfo;

/// Flag that indicates if the error is unrecoverable, requests that return unrecoverable errors should not be retried, this error only applies to the request that caused it
///
/// Please note: This property should have been non-nullable! Since the specification file
/// does not include a default value (using the "default:" property), however, the generated
/// source code must fall back to having a nullable type.
/// Consider adding a "default:" property in the specification file to hide this note.
///
bool? get unrecoverable;
}
2 changes: 2 additions & 0 deletions packages/stream_core/lib/src/utils.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export 'utils/result.dart';
export 'utils/shared_emitter.dart';
12 changes: 12 additions & 0 deletions packages/stream_core/lib/src/utils/network_monitor.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
abstract class NetworkMonitor {
NetworkStatus get currentStatus;
Stream<NetworkStatus> get onStatusChange;
}

enum NetworkStatus {
/// Internet is available because at least one of the HEAD requests succeeded.
connected,

/// None of the HEAD requests succeeded. Basically, no internet.
disconnected,
}
170 changes: 170 additions & 0 deletions packages/stream_core/lib/src/utils/result.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import 'package:equatable/equatable.dart';

enum _ResultType { success, failure }

/// A class which encapsulates a successful outcome with a value of type [T]
/// or a failure with [VideoError].
abstract class Result<T> extends Equatable {
const Result._(this._type);

const factory Result.success(T value) = Success._;

const factory Result.failure(Object error, [StackTrace stackTrace]) = Failure._;

final _ResultType _type;

/// Checks if the result is a [Success].
bool get isSuccess => _type == _ResultType.success;

/// Check if the result is a [Failure].
bool get isFailure => _type == _ResultType.failure;
}

/// Represents successful result.
class Success<T> extends Result<T> {
const Success._(this.data) : super._(_ResultType.success);

/// The [T] data associated with the result.
final T data;

@override
List<Object?> get props => [data];

@override
String toString() {
return 'Result.Success{data: $data}';
}
}

/// Represents failed result.
class Failure extends Result<Never> {
const Failure._(this.error, [this.stackTrace]) : super._(_ResultType.failure);

/// The [error] associated with the result.
final Object error;

/// The [stackTrace] associated with the result.
final StackTrace? stackTrace;

@override
List<Object?> get props => [error, stackTrace];

@override
String toString() {
return 'Result.Failure{error: $error, stackTrace: $stackTrace}';
}
}

extension PatternMatching<T> on Result<T> {
/// The [when] method is the equivalent to pattern matching.
/// Its prototype depends on the _Result [_type]s defined.
R when<R extends Object?>({
required R Function(T data) success,
required R Function(Object error) failure,
}) {
switch (_type) {
case _ResultType.success:
return success((this as Success<T>).data);
case _ResultType.failure:
return failure((this as Failure).error);
}
}

/// The [whenOrElse] method is equivalent to [when], but doesn't require
/// all callbacks to be specified.
///
/// On the other hand, it adds an extra orElse required parameter,
/// for fallback behavior.
R whenOrElse<R extends Object>({
R Function(T data)? success,
R Function(Object error)? failure,
required R Function(Result<T>) orElse,
}) {
switch (_type) {
case _ResultType.success:
return success?.call((this as Success<T>).data) ?? orElse(this);
case _ResultType.failure:
return failure?.call((this as Failure).error) ?? orElse(this);
}
}

/// The [whenOrNull] method is equivalent to [whenOrElse],
/// but non-exhaustive.
R? whenOrNull<R extends Object?>({
R Function(T data)? success,
R Function(Object error)? failure,
}) {
switch (_type) {
case _ResultType.success:
return success?.call((this as Success<T>).data);
case _ResultType.failure:
return failure?.call((this as Failure).error);
}
}

/// The [map] method is the equivalent to pattern matching.
/// Its prototype depends on the _Result [_type]s defined.
Result<R> map<R>(R Function(T data) convert) {
switch (_type) {
case _ResultType.success:
final origin = this as Success<T>;
return Result.success(convert(origin.data));
case _ResultType.failure:
return this as Failure;
}
}

/// The [fold] method is the equivalent to pattern matching.
/// Its prototype depends on the _Result [_type]s defined.
R fold<R extends Object?>({
required R Function(Success<T> success) success,
required R Function(Failure failure) failure,
}) {
switch (_type) {
case _ResultType.success:
return success(this as Success<T>);
case _ResultType.failure:
return failure(this as Failure);
}
}

/// The [foldOrElse] method is equivalent to [fold], but doesn't require
/// all callbacks to be specified.
///
/// On the other hand, it adds an extra orElse required parameter,
/// for fallback behavior.
R foldOrElse<R extends Object>({
R Function(Success<T> success)? success,
R Function(Failure failure)? failure,
required R Function(Result<T>) orElse,
}) {
switch (_type) {
case _ResultType.success:
return success?.call(this as Success<T>) ?? orElse(this);
case _ResultType.failure:
return failure?.call(this as Failure) ?? orElse(this);
}
}

/// The [foldOrNull] method is equivalent to [whenOrElse],
/// but non-exhaustive.
R? foldOrNull<R extends Object?>({
R Function(Success<T> success)? success,
R Function(Failure failure)? failure,
}) {
switch (_type) {
case _ResultType.success:
return success?.call(this as Success<T>);
case _ResultType.failure:
return failure?.call(this as Failure);
}
}

/// Returns the encapsulated value if this instance represents success
/// or null of it is failure.
T? getDataOrNull() => whenOrNull(success: _identity);

Object? getErrorOrNull() => whenOrNull(failure: _identity);
}

T _identity<T>(T x) => x;
Loading
Loading