Skip to content

Commit

Permalink
fix(Auth): Refactoring state machine logic to fix memory leak (#3613)
Browse files Browse the repository at this point in the history
* fix(Auth): Fixing memory leaks happening because of the state machine

* Update AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/StateMachine/hierarchical-state-machine-swift/StateMachine.swift

Co-authored-by: Di Wu <github@wudi.me>

* worked on review comment

---------

Co-authored-by: Di Wu <github@wudi.me>
  • Loading branch information
harsh62 and 5d committed Apr 30, 2024
1 parent af4a5f7 commit d03f5a6
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 84 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

import Combine

class CancellableAsyncStream<Element>: AsyncSequence {

typealias AsyncIterator = AsyncStream<Element>.AsyncIterator
private let asyncStream: AsyncStream<Element>
private let cancellable: AnyCancellable?

deinit {
cancel()
}

init(asyncStream: AsyncStream<Element>, cancellable: AnyCancellable?) {
self.asyncStream = asyncStream
self.cancellable = cancellable
}

convenience init(with publisher: AnyPublisher<Element, Never>) {
var cancellable: AnyCancellable?
self.init(asyncStream: AsyncStream { continuation in
cancellable = publisher.sink { _ in
continuation.finish()
} receiveValue: {
continuation.yield($0)
}
}, cancellable: cancellable)
}

func makeAsyncIterator() -> AsyncStream<Element>.AsyncIterator {
asyncStream.makeAsyncIterator()
}

func cancel() {
cancellable?.cancel()
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,7 @@
// SPDX-License-Identifier: Apache-2.0
//

import Foundation

/// Captures a weak reference to the value
class WeakWrapper<T> where T: AnyObject {
private(set) weak var value: T?
init(_ value: T) {
self.value = value
}
}
import Combine

/// Models, evolves, and processes effects for a system.
///
Expand All @@ -31,15 +23,18 @@ actor StateMachine<
EnvironmentType: Environment
> where StateType: State {

/// AsyncSequences are invoked a minimum of one time: Each sequence receives the current
/// state as soon as `listen()` is invoked, and will receive each subsequent state change.
typealias StateChangeSequence = StateAsyncSequence<StateType>

private let environment: EnvironmentType
private let resolver: AnyResolver<StateType>

private(set) var currentState: StateType
private var subscribers: [WeakWrapper<StateAsyncSequence<StateType>>]
public var currentState: StateType {
currentStateSubject.value
}

private let currentStateSubject: CurrentValueSubject<StateType, Never>

deinit {
currentStateSubject.send(completion: .finished)
}

init<ResolverType>(
resolver: ResolverType,
Expand All @@ -48,22 +43,16 @@ actor StateMachine<
) where ResolverType: StateMachineResolver, ResolverType.StateType == StateType {
self.resolver = resolver.eraseToAnyResolver()
self.environment = environment
self.currentState = initialState ?? resolver.defaultState

self.subscribers = []
self.currentStateSubject = CurrentValueSubject(initialState ?? resolver.defaultState)
}

/// Start listening to state change updates. The current state and all subsequent state changes will be sent to the sequence.
///
/// - Returns: An async sequence that get states asynchronously
func listen() -> StateChangeSequence {
let sequence = StateAsyncSequence<StateType>()
let currentState = self.currentState
let wrappedSequence = WeakWrapper(sequence)
subscribers.append(wrappedSequence)
sequence.send(currentState)
return sequence
func listen() -> CancellableAsyncStream<StateType> {
CancellableAsyncStream(with: currentStateSubject.eraseToAnyPublisher())
}

}

extension StateMachine: EventDispatcher {
Expand All @@ -88,33 +77,11 @@ extension StateMachine: EventDispatcher {
)

if currentState != resolution.newState {
currentState = resolution.newState
subscribers.removeAll { item in
!notify(subscriberElement: item, about: resolution.newState)
}
currentStateSubject.send(resolution.newState)
}
execute(resolution.actions)
}

/// - Parameters:
/// - subscriberElement: A weak wrapped async sequence
/// - newState: The new state to be sent
/// - Returns: true if the subscriber was notified, false if the wrapper reference was nil or a cancellation was pending
private func notify(
subscriberElement: WeakWrapper<StateChangeSequence>,
about newState: StateType
) -> Bool {

// If weak reference has become nil, do not process, and return false so caller can remove
// the subscription from the subscribers list
guard let sequence = subscriberElement.value else {
return false
}

sequence.send(newState)
return true
}

private func execute(_ actions: [Action]) {
guard !actions.isEmpty else {
return
Expand Down

0 comments on commit d03f5a6

Please sign in to comment.