Skip to content

Commit

Permalink
fix(auth, web): stream handlers are properly cleaned up and recreated (
Browse files Browse the repository at this point in the history
  • Loading branch information
russellwheatley authored Jun 5, 2024
1 parent 1333f4a commit daaef12
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 50 deletions.
157 changes: 113 additions & 44 deletions packages/firebase_auth/firebase_auth_web/lib/firebase_auth_web.dart
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import 'src/firebase_auth_web_user_credential.dart';
import 'src/interop/auth.dart' as auth_interop;
import 'src/interop/multi_factor.dart' as multi_factor;

enum StateListener { authStateChange, userStateChange, idTokenChange }

/// The web delegate implementation for [FirebaseAuth].
class FirebaseAuthWeb extends FirebaseAuthPlatform {
/// Stub initializer to allow the [registerWith] to create an instance without
Expand All @@ -37,50 +39,9 @@ class FirebaseAuthWeb extends FirebaseAuthPlatform {
/// The entry point for the [FirebaseAuthWeb] class.
FirebaseAuthWeb({required FirebaseApp app}) : super(appInstance: app) {
// Create a app instance broadcast stream for both delegate listener events
_userChangesListeners[app.name] =
StreamController<UserPlatform?>.broadcast();
_authStateChangesListeners[app.name] =
StreamController<UserPlatform?>.broadcast();
_idTokenChangesListeners[app.name] =
StreamController<UserPlatform?>.broadcast();

// TODO(rrousselGit): close StreamSubscription
delegate.onAuthStateChanged.map((auth_interop.User? webUser) {
if (!_initialized.isCompleted) {
_initialized.complete();
}

if (webUser == null) {
return null;
} else {
return UserWeb(
this,
MultiFactorWeb(this, multi_factor.multiFactor(webUser)),
webUser,
_webAuth,
);
}
}).listen((UserWeb? webUser) {
_authStateChangesListeners[app.name]!.add(webUser);
});

// TODO(rrousselGit): close StreamSubscription
// Also triggers `userChanged` events
delegate.onIdTokenChanged.map((auth_interop.User? webUser) {
if (webUser == null) {
return null;
} else {
return UserWeb(
this,
MultiFactorWeb(this, multi_factor.multiFactor(webUser)),
webUser,
_webAuth,
);
}
}).listen((UserWeb? webUser) {
_idTokenChangesListeners[app.name]!.add(webUser);
_userChangesListeners[app.name]!.add(webUser);
});
_createStreamListener(app.name, StateListener.authStateChange);
_createStreamListener(app.name, StateListener.idTokenChange);
_createStreamListener(app.name, StateListener.userStateChange);
}

/// Called by PluginRegistry to register this plugin for Flutter Web
Expand Down Expand Up @@ -138,6 +99,105 @@ class FirebaseAuthWeb extends FirebaseAuthPlatform {
return FirebaseAuthWeb._();
}

bool _cancelUserStream = false;
bool _cancelIdTokenStream = false;

void _createStreamListener(String appName, StateListener stateListener) {
switch (stateListener) {
case StateListener.authStateChange:
_authStateChangesListeners[appName] =
StreamController<UserPlatform?>.broadcast(
onCancel: () {
_authStateChangesListeners[appName]!.close();
_authStateChangesListeners.remove(appName);
delegate.authStateController?.close();
},
);
delegate.onAuthStateChanged.map((auth_interop.User? webUser) {
if (!_initialized.isCompleted) {
_initialized.complete();
}

if (webUser == null) {
return null;
} else {
return UserWeb(
this,
MultiFactorWeb(this, multi_factor.multiFactor(webUser)),
webUser,
_webAuth,
);
}
}).listen((UserWeb? webUser) {
_authStateChangesListeners[app.name]!.add(webUser);
});
break;
case StateListener.idTokenChange:
_cancelIdTokenStream = false;
_idTokenChangesListeners[appName] =
StreamController<UserPlatform?>.broadcast(
onCancel: () {
if (_userChangesListeners[appName] == null) {
// We cannot remove if there is a userChanges listener as we use this stream for it
_idTokenChangesListeners[appName]!.close();
_idTokenChangesListeners.remove(appName);
delegate.idTokenController?.close();
} else {
// We need to do this because if idTokenListener and userChanges are being listened to
// We need to cancel both at the same time otherwise neither will be closed & removed
_cancelIdTokenStream = true;
}

if (_cancelUserStream) {
_userChangesListeners[appName]!.close();
_userChangesListeners.remove(appName);
}
},
);

// Also triggers `userChanged` events
delegate.onIdTokenChanged.map((auth_interop.User? webUser) {
if (webUser == null) {
return null;
} else {
return UserWeb(
this,
MultiFactorWeb(this, multi_factor.multiFactor(webUser)),
webUser,
_webAuth,
);
}
}).listen((UserWeb? webUser) {
_idTokenChangesListeners[app.name]!.add(webUser);
_userChangesListeners[app.name]!.add(webUser);
});
break;
case StateListener.userStateChange:
_cancelUserStream = false;
_userChangesListeners[appName] =
StreamController<UserPlatform?>.broadcast(
onCancel: () {
if (_idTokenChangesListeners[appName] == null) {
_userChangesListeners[appName]!.close();
_userChangesListeners.remove(appName);
// There is no delegate for userChanges as we use idTokenChanges
} else {
_cancelUserStream = true;
}

if (_cancelIdTokenStream) {
// We need to do this because if idTokenListener and userChanges are being listened to
// We need to cancel both at the same time otherwise neither will be closed & removed
_idTokenChangesListeners[appName]!.close();
_idTokenChangesListeners.remove(appName);
delegate.idTokenController?.close();
}
},
);
break;
}
}

/// instance of Auth from the web plugin
auth_interop.Auth? _webAuth;

Expand Down Expand Up @@ -254,20 +314,29 @@ class FirebaseAuthWeb extends FirebaseAuthPlatform {
Stream<UserPlatform?> authStateChanges() async* {
await _initialized.future;
yield currentUser;
if (_authStateChangesListeners[app.name] == null) {
_createStreamListener(app.name, StateListener.authStateChange);
}
yield* _authStateChangesListeners[app.name]!.stream;
}

@override
Stream<UserPlatform?> idTokenChanges() async* {
await _initialized.future;
yield currentUser;
if (_idTokenChangesListeners[app.name] == null) {
_createStreamListener(app.name, StateListener.idTokenChange);
}
yield* _idTokenChangesListeners[app.name]!.stream;
}

@override
Stream<UserPlatform?> userChanges() async* {
await _initialized.future;
yield currentUser;
if (_userChangesListeners[app.name] == null) {
_createStreamListener(app.name, StateListener.userStateChange);
}
yield* _userChangesListeners[app.name]!.stream;
}

Expand Down
18 changes: 12 additions & 6 deletions packages/firebase_auth/firebase_auth_web/lib/src/interop/auth.dart
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,9 @@ class Auth extends JsObjectWrapper<auth_interop.AuthJsImpl> {

JSFunction? _onAuthUnsubscribe;

// TODO(rrousselGit): fix memory leak – the controller isn't closed even in onCancel
StreamController<User?>? get authStateController => _changeController;
StreamController<User?>? get idTokenController => _idTokenChangedController;

// ignore: close_sinks
StreamController<User?>? _changeController;

Expand All @@ -412,9 +414,11 @@ class Auth extends JsObjectWrapper<auth_interop.AuthJsImpl> {
jsObject.onAuthStateChanged(nextWrapper.toJS, errorWrapper.toJS);
}

void stopListen() {
(_onAuthUnsubscribe!.dartify()! as Function)();
Future<void> stopListen() async {
await (_onAuthUnsubscribe!.callAsFunction() as JSPromise<JSAny?>?)
?.toDart;
_onAuthUnsubscribe = null;
_changeController = null;
}

_changeController = StreamController<User?>.broadcast(
Expand All @@ -430,7 +434,6 @@ class Auth extends JsObjectWrapper<auth_interop.AuthJsImpl> {

JSFunction? _onIdTokenChangedUnsubscribe;

// TODO(rrousselGit): fix memory leak – the controller isn't closed even in onCancel
// ignore: close_sinks
StreamController<User?>? _idTokenChangedController;

Expand All @@ -454,9 +457,12 @@ class Auth extends JsObjectWrapper<auth_interop.AuthJsImpl> {
jsObject.onIdTokenChanged(nextWrapper.toJS, errorWrapper.toJS);
}

void stopListen() {
(_onIdTokenChangedUnsubscribe!.dartify()! as Function)();
Future<void> stopListen() async {
await (_onIdTokenChangedUnsubscribe!.callAsFunction()
as JSPromise<JSAny?>?)
?.toDart;
_onIdTokenChangedUnsubscribe = null;
_idTokenChangedController = null;
}

_idTokenChangedController = StreamController<User?>.broadcast(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,42 @@ void main() {
skip: !kIsWeb && Platform.isWindows,
);

group('test all stream listeners', () {
Matcher containsExactlyThreeUsers() => predicate<List>(
(list) => list.whereType<User>().length == 3,
'a list containing exactly 3 User instances',
);
test('create, cancel and reopen all user event stream handlers', () async {
final auth = FirebaseAuth.instance;
final events = [];
final streamHandler = events.add;

StreamSubscription<User?> userChanges =
auth.userChanges().listen(streamHandler);

StreamSubscription<User?> authStateChanges =
auth.authStateChanges().listen(streamHandler);

StreamSubscription<User?> idTokenChanges =
auth.idTokenChanges().listen(streamHandler);

await userChanges.cancel();
await authStateChanges.cancel();
await idTokenChanges.cancel();

userChanges = auth.userChanges().listen(streamHandler);
authStateChanges = auth.authStateChanges().listen(streamHandler);
idTokenChanges = auth.idTokenChanges().listen(streamHandler);

await auth.signInWithEmailAndPassword(
email: testEmail,
password: testPassword,
);

expect(events, containsExactlyThreeUsers());
});
});

group('currentUser', () {
test('should return currentUser', () async {
await ensureSignedIn(testEmail);
Expand Down

0 comments on commit daaef12

Please sign in to comment.