Skip to content

Commit

Permalink
Globally improved threadsafety. Also attempted to make threaded code …
Browse files Browse the repository at this point in the history
…syntax look more consistent.
  • Loading branch information
trupin committed Aug 2, 2019
1 parent 319ffc5 commit afb661b
Show file tree
Hide file tree
Showing 14 changed files with 458 additions and 315 deletions.
23 changes: 11 additions & 12 deletions Sources/Connectable.swift
Expand Up @@ -35,7 +35,6 @@ public protocol ConnectableSignalProtocol: SignalProtocol {
public final class ConnectableSignal<Source: SignalProtocol>: ConnectableSignalProtocol {

private let source: Source
private let lock = NSRecursiveLock()
private let subject: Subject<Source.Element, Source.Error>

public init(source: Source, subject: Subject<Source.Element, Source.Error>) {
Expand All @@ -45,7 +44,6 @@ public final class ConnectableSignal<Source: SignalProtocol>: ConnectableSignalP

/// Start the signal.
public func connect() -> Disposable {
lock.lock(); defer { lock.unlock() }
if !subject.isTerminated {
return source.observe(with: subject)
} else {
Expand All @@ -65,22 +63,23 @@ extension ConnectableSignalProtocol {
/// Convert connectable signal into the ordinary signal by calling `connect`
/// on the first observation and calling dispose when number of observers goes down to zero.
public func refCount() -> Signal<Element, Error> {
let lock = NSRecursiveLock()
var count = 0
var connectionDisposable: Disposable? = nil
let lock = NSRecursiveLock(name: "com.reactive_kit.connectable_signal.ref_count")
var _count = 0
var _connectionDisposable: Disposable? = nil
return Signal { observer in
lock.lock(); defer { lock.unlock() }
count = count + 1
_count = _count + 1
let disposable = self.observe(with: observer.on)
if connectionDisposable == nil {
connectionDisposable = self.connect()
if _connectionDisposable == nil {
_connectionDisposable = self.connect()
}
return BlockDisposable {
lock.lock(); defer { lock.unlock() }
disposable.dispose()
count = count - 1
if count == 0 {
connectionDisposable?.dispose()
connectionDisposable = nil
_count = _count - 1
if _count == 0 {
_connectionDisposable?.dispose()
_connectionDisposable = nil
}
}
}
Expand Down
112 changes: 74 additions & 38 deletions Sources/Disposable.swift
Expand Up @@ -59,35 +59,45 @@ public struct NonDisposable: Disposable {

/// A disposable that just encapsulates disposed state.
public final class SimpleDisposable: Disposable {
public private(set) var isDisposed: Bool = false

private let dispatchQueue = DispatchQueue(label: "com.reactive_kit.simple_disposable")

private var _isDisposed = false
public var isDisposed: Bool {
return dispatchQueue.sync { _isDisposed }
}

public func dispose() {
isDisposed = true
dispatchQueue.async(flags: .barrier) {
self._isDisposed = true
}
}

public init(isDisposed: Bool = false) {
self.isDisposed = isDisposed
_isDisposed = isDisposed
}
}

/// A disposable that executes the given block upon disposing.
public final class BlockDisposable: Disposable {

private let dispatchQueue = DispatchQueue(label: "com.reactive_kit.block_disposable")

public var isDisposed: Bool {
return handler == nil
return dispatchQueue.sync { _handler == nil }
}

private var handler: (() -> ())?
private let lock = NSRecursiveLock(name: "com.reactivekit.blockdisposable")
private var _handler: (() -> ())?

public init(_ handler: @escaping () -> ()) {
self.handler = handler
_handler = handler
}

public func dispose() {
lock.lock(); defer { lock.unlock() }
if let handler = handler {
self.handler = nil
if let handler = dispatchQueue.sync(execute: { _handler }) {
dispatchQueue.async(flags: .barrier) {
self._handler = nil
}
handler()
}
}
Expand All @@ -96,14 +106,26 @@ public final class BlockDisposable: Disposable {
/// A disposable that disposes itself upon deallocation.
public final class DeinitDisposable: Disposable {

public var otherDisposable: Disposable? = nil
private let dispatchQueue = DispatchQueue(label: "com.reactive_kit.deinit_disposable")

private var _otherDisposable: Disposable?
public var otherDisposable: Disposable? {
set {
dispatchQueue.async(flags: .barrier) {
self._otherDisposable = newValue
}
}
get {
return dispatchQueue.sync { _otherDisposable }
}
}

public var isDisposed: Bool {
return otherDisposable == nil
}

public init(disposable: Disposable) {
otherDisposable = disposable
_otherDisposable = disposable
}

public func dispose() {
Expand All @@ -118,9 +140,15 @@ public final class DeinitDisposable: Disposable {
/// A disposable that disposes a collection of disposables upon its own disposing.
public final class CompositeDisposable: Disposable {

public private(set) var isDisposed: Bool = false
private let lock = NSRecursiveLock(name: "com.reactive_kit.composite_disposable")

private var _isDisposed = false
public var isDisposed: Bool {
lock.lock(); defer { lock.unlock() }
return _isDisposed
}

private var disposables: [Disposable] = []
private let lock = NSRecursiveLock(name: "com.reactivekit.compositedisposable")

public convenience init() {
self.init([])
Expand All @@ -132,7 +160,7 @@ public final class CompositeDisposable: Disposable {

public func add(disposable: Disposable) {
lock.lock(); defer { lock.unlock() }
if isDisposed {
if _isDisposed {
disposable.dispose()
} else {
disposables.append(disposable)
Expand All @@ -146,23 +174,28 @@ public final class CompositeDisposable: Disposable {

public func dispose() {
lock.lock(); defer { lock.unlock() }
isDisposed = true
_isDisposed = true
disposables.forEach { $0.dispose() }
disposables.removeAll()
}
}

/// A disposable that disposes other disposable upon its own disposing.
public final class SerialDisposable: Disposable {

public private(set) var isDisposed: Bool = false
private let lock = NSRecursiveLock(name: "com.reactivekit.serialdisposable")

private let lock = NSRecursiveLock(name: "com.reactive_kit.serial_disposable")

private var _isDisposed = false
public var isDisposed: Bool {
lock.lock(); defer { lock.unlock() }
return _isDisposed
}

/// Will dispose other disposable immediately if self is already disposed.
public var otherDisposable: Disposable? {
didSet {
lock.lock(); defer { lock.unlock() }
if isDisposed {
if _isDisposed {
otherDisposable?.dispose()
}
}
Expand All @@ -174,8 +207,8 @@ public final class SerialDisposable: Disposable {

public func dispose() {
lock.lock(); defer { lock.unlock() }
if !isDisposed {
isDisposed = true
if !_isDisposed {
_isDisposed = true
otherDisposable?.dispose()
}
}
Expand Down Expand Up @@ -206,16 +239,22 @@ public protocol DisposeBagProtocol: Disposable {
///
/// When bag gets deallocated, it will dispose all disposables it contains.
public final class DisposeBag: DisposeBagProtocol {

private var disposables: [Disposable] = []
private var subject: ReplayOneSubject<Void, Never>?

private let subjectLoadingLock = NSRecursiveLock(name: "com.reactivekit.disposebag.subject")
private let disposablesLock = NSRecursiveLock(name: "com.reactivekit.disposebag.disposables")
private let disposablesLock = NSRecursiveLock(name: "com.reactive_kit.dispose_bag.disposables")
private let subjectLock = NSRecursiveLock(name: "com.reactive_kit.dispose_bag.subject")

private var _disposables: [Disposable] = []

private var _subject: ReplayOneSubject<Void, Never>?
private var subject: ReplayOneSubject<Void, Never>? {
subjectLock.lock(); defer { subjectLock.unlock() }
return _subject
}

/// `true` if bag is empty, `false` otherwise.
public var isDisposed: Bool {
return disposables.count == 0
disposablesLock.lock(); defer { disposablesLock.unlock() }
return _disposables.count == 0
}

public init() {
Expand All @@ -225,14 +264,14 @@ public final class DisposeBag: DisposeBagProtocol {
/// Disposable will be disposed when the bag is deallocated.
public func add(disposable: Disposable) {
disposablesLock.lock(); defer { disposablesLock.unlock() }
disposables.append(disposable)
_disposables.append(disposable)
}

/// Add the given disposables to the bag.
/// Disposables will be disposed when the bag is deallocated.
public func add(disposables: [Disposable]) {
disposablesLock.lock(); defer { disposablesLock.unlock() }
disposables.forEach(add)
_disposables.forEach(add)
}

/// Add a disposable to a dispose bag.
Expand All @@ -248,18 +287,15 @@ public final class DisposeBag: DisposeBagProtocol {
/// Disposes all disposables that are currenty in the bag.
public func dispose() {
disposablesLock.lock(); defer { disposablesLock.unlock() }
disposables.forEach { $0.dispose() }
disposables.removeAll()
_disposables.forEach { $0.dispose() }
_disposables.removeAll()
}

/// A signal that fires `completed` event when the bag gets deallocated.
public var deallocated: SafeSignal<Void> {
subjectLoadingLock.lock()
if subject == nil {
subject = ReplayOneSubject()
}
subjectLoadingLock.unlock()
return subject!.toSignal()
subjectLock.lock(); defer { subjectLock.unlock() }
let subject = _subject ?? ReplaySubject()
return subject.toSignal()
}

deinit {
Expand Down
50 changes: 34 additions & 16 deletions Sources/LoadingProperty.swift
Expand Up @@ -22,25 +22,35 @@
// THE SOFTWARE.
//

import Foundation

/// A property that lazily loads its value using the given signal producer closure.
/// The value will be loaded when the property is observed for the first time.
public class LoadingProperty<LoadingValue, LoadingError: Swift.Error>: PropertyProtocol, SignalProtocol, DisposeBagProvider {

private let lock = NSRecursiveLock(name: "com.reactive_kit.loading_property")

private let signalProducer: () -> LoadingSignal<LoadingValue, LoadingError>
private let subject = PassthroughSubject<LoadingState<LoadingValue, LoadingError>, Never>()
private var loadingDisposable: Disposable? = nil

private var _loadingDisposable: Disposable?

public var bag: DisposeBag {
return subject.disposeBag
}

private var _loadingState: LoadingState<LoadingValue, LoadingError> = .loading {
didSet {
subject.send(_loadingState)
}
}

/// Current state of the property. In `.loading` state until the value is loaded.
/// When the property is observed for the first time, the value will be loaded and
/// the state will be updated to either `.loaded` or `.failed` state.
public private(set) var loadingState: LoadingState<LoadingValue, LoadingError> = .loading {
didSet {
subject.send(loadingState)
}
public var loadingState: LoadingState<LoadingValue, LoadingError> {
lock.lock(); defer { lock.unlock() }
return _loadingState
}

/// Underlying value. `nil` if not yet loaded or if the property is in error state.
Expand All @@ -49,7 +59,8 @@ public class LoadingProperty<LoadingValue, LoadingError: Swift.Error>: PropertyP
return loadingState.value
}
set {
loadingState = newValue.flatMap { .loaded($0) } ?? .loading
lock.lock(); defer { lock.unlock() }
_loadingState = newValue.flatMap { .loaded($0) } ?? .loading
}
}

Expand All @@ -67,22 +78,25 @@ public class LoadingProperty<LoadingValue, LoadingError: Swift.Error>: PropertyP

private func load(silently: Bool) -> LoadingSignal<LoadingValue, LoadingError> {
return LoadingSignal { observer in
self.lock.lock(); defer { self.lock.unlock() }
if !silently {
self.loadingState = .loading
self._loadingState = .loading
}
observer.receive(.loading)
self.loadingDisposable = self.signalProducer().observe { event in
self._loadingDisposable = self.signalProducer().observe { event in
switch event {
case .next(let anyLoadingState):
let loadingSate = anyLoadingState.asLoadingState
switch loadingSate {
case .loading:
break
case .loaded:
self.loadingState = loadingSate
self.lock.lock(); defer { self.lock.unlock() }
self._loadingState = loadingSate
case .failed:
if !silently {
self.loadingState = loadingSate
self.lock.lock(); defer { self.lock.unlock() }
self._loadingState = loadingSate
}
}
observer.receive(loadingSate)
Expand All @@ -94,19 +108,23 @@ public class LoadingProperty<LoadingValue, LoadingError: Swift.Error>: PropertyP
}

return BlockDisposable {
self.loadingDisposable?.dispose()
self.loadingDisposable = nil
self.lock.lock(); defer { self.lock.unlock() }
self._loadingDisposable?.dispose()
self._loadingDisposable = nil
}
}
}

public func observe(with observer: @escaping (Event<LoadingState<LoadingValue, LoadingError>, Never>) -> Void) -> Disposable {
if case .loading = loadingState, loadingDisposable == nil {
loadingDisposable = load(silently: false).observeCompleted { [weak self] in
self?.loadingDisposable = nil
lock.lock(); defer { lock.unlock() }
if case .loading = _loadingState, _loadingDisposable == nil {
_loadingDisposable = load(silently: false).observeCompleted { [weak self] in
guard let self = self else { return }
self.lock.lock(); defer { self.lock.unlock() }
self._loadingDisposable = nil
}
}
return subject.start(with: loadingState).observe(with: observer)
return subject.start(with: _loadingState).observe(with: observer)
}
}

Expand Down

0 comments on commit afb661b

Please sign in to comment.