From 4e969fa385d444fcd391b53e17d5f909b478efe5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zsolt=20Kov=C3=A1cs?= Date: Thu, 9 Jan 2020 08:15:50 +0100 Subject: [PATCH] Add ReplayRelay (#2111) --- .jazzy.yml | 1 + Documentation/Subjects.md | 4 +- README.md | 2 +- Rx.xcodeproj/project.pbxproj | 12 + .../Traits/Driver/Driver+Subscription.swift | 28 +++ .../Traits/Signal/Signal+Subscription.swift | 31 ++- RxRelay/Observable+Bind.swift | 46 ++++ RxRelay/ReplayRelay.swift | 50 +++++ Sources/AllTestz/ReplayRelayTests.swift | 1 + Sources/AllTestz/main.swift | 53 ++++- Sources/RxRelay/ReplayRelay.swift | 1 + Tests/RxCocoaTests/Driver+Test.swift | 129 ++++++++++- Tests/RxCocoaTests/Signal+Test.swift | 122 +++++++++- .../Observable+RelayBindTests.swift | 105 ++++++++- Tests/RxRelayTests/ReplayRelayTests.swift | 113 ++++++++++ Tests/RxSwiftTests/ReplaySubjectTest.swift | 211 ++++++++++++++++++ 16 files changed, 889 insertions(+), 20 deletions(-) create mode 100644 RxRelay/ReplayRelay.swift create mode 120000 Sources/AllTestz/ReplayRelayTests.swift create mode 120000 Sources/RxRelay/ReplayRelay.swift create mode 100644 Tests/RxRelayTests/ReplayRelayTests.swift diff --git a/.jazzy.yml b/.jazzy.yml index 91ee58d89..dad245d61 100644 --- a/.jazzy.yml +++ b/.jazzy.yml @@ -127,6 +127,7 @@ custom_categories: - BehaviorRelay - Observable+Bind - PublishRelay + - ReplayRelay - Utils - name: RxSwift children: diff --git a/Documentation/Subjects.md b/Documentation/Subjects.md index 2b111708a..2aa4b3747 100644 --- a/Documentation/Subjects.md +++ b/Documentation/Subjects.md @@ -6,10 +6,10 @@ All of behave exactly the same like described [here](http://reactivex.io/documen Relays ====== -RxRelay provides two kinds of Relays: `PublishRelay` and `BehaviorRelay`. +RxRelay provides three kinds of Relays: `PublishRelay`, `BehaviorRelay` and `ReplayRelay`. They behave exactly like their parallel `Subject`s, with two changes: - Relays never complete. - Relays never emit errors. -In essence, Relays only emit `.next` events, and never terminate. \ No newline at end of file +In essence, Relays only emit `.next` events, and never terminate. diff --git a/README.md b/README.md index 25e5871b5..2be9bd48d 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ RxSwift comprises five separate components depending on each other in the follow * **RxSwift**: The core of RxSwift, providing the Rx standard as (mostly) defined by [ReactiveX](https://reactivex.io). It has no other dependencies. * **RxCocoa**: Provides Cocoa-specific capabilities for general iOS/macOS/watchOS & tvOS app development, such as Shared Sequences, Traits, and much more. It depends on both `RxSwift` and `RxRelay`. -* **RxRelay**: Provides `PublishRelay` and `BehaviorRelay`, two [simple wrappers around Subjects](https://github.com/ReactiveX/RxSwift/blob/master/Documentation/Subjects.md#relays). It depends on `RxSwift`. +* **RxRelay**: Provides `PublishRelay`, `BehaviorRelay` and `ReplayRelay`, three [simple wrappers around Subjects](https://github.com/ReactiveX/RxSwift/blob/master/Documentation/Subjects.md#relays). It depends on `RxSwift`. * **RxTest** and **RxBlocking**: Provides testing capabilities for Rx-based systems. It depends on `RxSwift`. ###### ... find compatible diff --git a/Rx.xcodeproj/project.pbxproj b/Rx.xcodeproj/project.pbxproj index 57766c8ec..3d2faec8d 100644 --- a/Rx.xcodeproj/project.pbxproj +++ b/Rx.xcodeproj/project.pbxproj @@ -47,6 +47,10 @@ 54700CA11CE37E1900EF3A8F /* UINavigationItem+RxTests.swift.swift in Sources */ = {isa = PBXBuildFile; fileRef = 54700C9E1CE37D1000EF3A8F /* UINavigationItem+RxTests.swift.swift */; }; 54D2138E1CE0824E0028D5B4 /* UINavigationItem+Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = 54D2138C1CE081890028D5B4 /* UINavigationItem+Rx.swift */; }; 601AE3DA1EE24E4F00617386 /* SwiftSupport.swift in Sources */ = {isa = PBXBuildFile; fileRef = 601AE3D91EE24E4F00617386 /* SwiftSupport.swift */; }; + 6A7D2CD423BBDBDC0038576E /* ReplayRelayTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6A7D2CD323BBDBDC0038576E /* ReplayRelayTests.swift */; }; + 6A7D2CD523BBDBDC0038576E /* ReplayRelayTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6A7D2CD323BBDBDC0038576E /* ReplayRelayTests.swift */; }; + 6A7D2CD623BBDBDC0038576E /* ReplayRelayTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6A7D2CD323BBDBDC0038576E /* ReplayRelayTests.swift */; }; + 6A94254A23AFC2F300B7A24C /* ReplayRelay.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6A94254923AFC2F300B7A24C /* ReplayRelay.swift */; }; 78B6157523B69F49009C2AD9 /* Binder.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8E65EFA1F6E91D1004478C3 /* Binder.swift */; }; 78B6157723B6A035009C2AD9 /* Binder+Tests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 78B6157623B6A035009C2AD9 /* Binder+Tests.swift */; }; 7EDBAEB41C89B1A6006CBE67 /* UITabBarItem+RxTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7EDBAEAB1C89B1A5006CBE67 /* UITabBarItem+RxTests.swift */; }; @@ -941,6 +945,8 @@ 54700C9E1CE37D1000EF3A8F /* UINavigationItem+RxTests.swift.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UINavigationItem+RxTests.swift.swift"; sourceTree = ""; }; 54D2138C1CE081890028D5B4 /* UINavigationItem+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UINavigationItem+Rx.swift"; sourceTree = ""; }; 601AE3D91EE24E4F00617386 /* SwiftSupport.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SwiftSupport.swift; sourceTree = ""; }; + 6A7D2CD323BBDBDC0038576E /* ReplayRelayTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ReplayRelayTests.swift; sourceTree = ""; }; + 6A94254923AFC2F300B7A24C /* ReplayRelay.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ReplayRelay.swift; sourceTree = ""; }; 78B6157623B6A035009C2AD9 /* Binder+Tests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "Binder+Tests.swift"; sourceTree = ""; }; 7EDBAEAB1C89B1A5006CBE67 /* UITabBarItem+RxTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UITabBarItem+RxTests.swift"; sourceTree = ""; }; 7F600F3D1C5D0C0100535B1D /* UIRefreshControl+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UIRefreshControl+Rx.swift"; sourceTree = ""; }; @@ -1517,6 +1523,7 @@ children = ( C8B0F7101F530CA700548EBE /* PublishRelay.swift */, C8C8BCCE1F8944B800501D4D /* BehaviorRelay.swift */, + 6A94254923AFC2F300B7A24C /* ReplayRelay.swift */, A2897D61225CA3F3004EA481 /* Observable+Bind.swift */, A2897D68225D023A004EA481 /* Utils.swift */, A2FD4EA4225D0A8100288525 /* Info.plist */, @@ -1528,6 +1535,7 @@ isa = PBXGroup; children = ( A2FD4E9B225D04FF00288525 /* Observable+RelayBindTests.swift */, + 6A7D2CD323BBDBDC0038576E /* ReplayRelayTests.swift */, ); path = RxRelayTests; sourceTree = ""; @@ -2904,6 +2912,7 @@ buildActionMask = 2147483647; files = ( A2897D57225CA236004EA481 /* PublishRelay.swift in Sources */, + 6A94254A23AFC2F300B7A24C /* ReplayRelay.swift in Sources */, A2897D58225CA236004EA481 /* BehaviorRelay.swift in Sources */, A2897D62225CA3F3004EA481 /* Observable+Bind.swift in Sources */, A2897D69225D023A004EA481 /* Utils.swift in Sources */, @@ -3072,6 +3081,7 @@ C83509351C38706E0027C24C /* KVOObservableTests.swift in Sources */, C89046581DC5F6F70041C7D8 /* UISearchBar+RxTests.swift in Sources */, C85218011E33FC160015DD38 /* RecursiveLock.swift in Sources */, + 6A7D2CD423BBDBDC0038576E /* ReplayRelayTests.swift in Sources */, C822BACA1DB4058000F98810 /* Event+Test.swift in Sources */, C83509421C38706E0027C24C /* MainThreadPrimitiveHotObservable.swift in Sources */, C801DE4A1F6EBB84008DB060 /* Observable+PrimitiveSequenceTest.swift in Sources */, @@ -3317,6 +3327,7 @@ C820A9AF1EB5073E00D431BC /* Observable+FilterTests.swift in Sources */, C820A9DF1EB50CF800D431BC /* Observable+ThrottleTests.swift in Sources */, C83509F01C3875580027C24C /* PrimitiveMockObserver.swift in Sources */, + 6A7D2CD523BBDBDC0038576E /* ReplayRelayTests.swift in Sources */, C820A9C31EB509FC00D431BC /* Observable+SkipTests.swift in Sources */, C83509C31C3875220027C24C /* KVOObservableTests.swift in Sources */, C8A9B6F51DAD752200C9B027 /* Observable+BindTests.swift in Sources */, @@ -3416,6 +3427,7 @@ C8D970F11F532FD30058F2FE /* SharedSequence+Extensions.swift in Sources */, C820A9941EB4FD1400D431BC /* Observable+SwitchIfEmptyTests.swift in Sources */, C83509CF1C3875260027C24C /* NSView+RxTests.swift in Sources */, + 6A7D2CD623BBDBDC0038576E /* ReplayRelayTests.swift in Sources */, C820A9501EB4EC3C00D431BC /* Observable+ReduceTests.swift in Sources */, C820A9841EB4FB0400D431BC /* Observable+UsingTests.swift in Sources */, C820A9881EB4FB5B00D431BC /* Observable+DebugTests.swift in Sources */, diff --git a/RxCocoa/Traits/Driver/Driver+Subscription.swift b/RxCocoa/Traits/Driver/Driver+Subscription.swift index 6ed117983..a00370bb9 100644 --- a/RxCocoa/Traits/Driver/Driver+Subscription.swift +++ b/RxCocoa/Traits/Driver/Driver+Subscription.swift @@ -78,6 +78,34 @@ extension SharedSequenceConvertibleType where SharingStrategy == DriverSharingSt }) } + /** + Creates new subscription and sends elements to `ReplayRelay`. + This method can be only called from `MainThread`. + + - parameter relay: Target relay for sequence elements. + - returns: Disposable object that can be used to unsubscribe the observer from the relay. + */ + public func drive(_ relays: ReplayRelay...) -> Disposable { + MainScheduler.ensureRunningOnMainThread(errorMessage: errorMessage) + return self.drive(onNext: { e in + relays.forEach { $0.accept(e) } + }) + } + + /** + Creates new subscription and sends elements to `ReplayRelay`. + This method can be only called from `MainThread`. + + - parameter relay: Target relay for sequence elements. + - returns: Disposable object that can be used to unsubscribe the observer from the relay. + */ + public func drive(_ relays: ReplayRelay...) -> Disposable { + MainScheduler.ensureRunningOnMainThread(errorMessage: errorMessage) + return self.drive(onNext: { e in + relays.forEach { $0.accept(e) } + }) + } + /** Subscribes to observable sequence using custom binder function. This method can be only called from `MainThread`. diff --git a/RxCocoa/Traits/Signal/Signal+Subscription.swift b/RxCocoa/Traits/Signal/Signal+Subscription.swift index 1daff7c41..cfd780629 100644 --- a/RxCocoa/Traits/Signal/Signal+Subscription.swift +++ b/RxCocoa/Traits/Signal/Signal+Subscription.swift @@ -66,7 +66,7 @@ extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingSt } /** - Creates new subscription and sends elements to relay. + Creates new subscription and sends elements to `PublishRelay`. - parameter to: Target relays for sequence elements. - returns: Disposable object that can be used to unsubscribe the observer from the relay. @@ -78,7 +78,7 @@ extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingSt } /** - Creates new subscription and sends elements to relay. + Creates new subscription and sends elements to `PublishRelay`. - parameter to: Target relay for sequence elements. - returns: Disposable object that can be used to unsubscribe the observer from the relay. @@ -89,6 +89,30 @@ extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingSt }) } + /** + Creates new subscription and sends elements to `ReplayRelay`. + + - parameter to: Target relays for sequence elements. + - returns: Disposable object that can be used to unsubscribe the observer from the relay. + */ + public func emit(to relays: ReplayRelay...) -> Disposable { + return self.emit(onNext: { e in + relays.forEach { $0.accept(e) } + }) + } + + /** + Creates new subscription and sends elements to `ReplayRelay`. + + - parameter to: Target relay for sequence elements. + - returns: Disposable object that can be used to unsubscribe the observer from the relay. + */ + public func emit(to relays: ReplayRelay...) -> Disposable { + return self.emit(onNext: { e in + relays.forEach { $0.accept(e) } + }) + } + /** Subscribes an element handler, a completion handler and disposed handler to an observable sequence. @@ -105,6 +129,3 @@ extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingSt self.asObservable().subscribe(onNext: onNext, onCompleted: onCompleted, onDisposed: onDisposed) } } - - - diff --git a/RxRelay/Observable+Bind.swift b/RxRelay/Observable+Bind.swift index 8b782fc04..7a2efc7b0 100644 --- a/RxRelay/Observable+Bind.swift +++ b/RxRelay/Observable+Bind.swift @@ -100,4 +100,50 @@ extension ObservableType { } } } + + /** + Creates new subscription and sends elements to replay relay(s). + In case error occurs in debug mode, `fatalError` will be raised. + In case error occurs in release mode, `error` will be logged. + - parameter to: Target replay relay for sequence elements. + - returns: Disposable object that can be used to unsubscribe the observer. + */ + public func bind(to relays: ReplayRelay...) -> Disposable { + self.bind(to: relays) + } + + /** + Creates new subscription and sends elements to replay relay(s). + + In case error occurs in debug mode, `fatalError` will be raised. + In case error occurs in release mode, `error` will be logged. + + - parameter to: Target replay relay for sequence elements. + - returns: Disposable object that can be used to unsubscribe the observer. + */ + public func bind(to relays: ReplayRelay...) -> Disposable { + self.map { $0 as Element? }.bind(to: relays) + } + + /** + Creates new subscription and sends elements to replay relay(s). + In case error occurs in debug mode, `fatalError` will be raised. + In case error occurs in release mode, `error` will be logged. + - parameter to: Target replay relay for sequence elements. + - returns: Disposable object that can be used to unsubscribe the observer. + */ + private func bind(to relays: [ReplayRelay]) -> Disposable { + subscribe { e in + switch e { + case let .next(element): + relays.forEach { + $0.accept(element) + } + case let .error(error): + rxFatalErrorInDebug("Binding error to behavior relay: \(error)") + case .completed: + break + } + } + } } diff --git a/RxRelay/ReplayRelay.swift b/RxRelay/ReplayRelay.swift new file mode 100644 index 000000000..3cc87cd09 --- /dev/null +++ b/RxRelay/ReplayRelay.swift @@ -0,0 +1,50 @@ +// +// ReplayRelay.swift +// RxRelay +// +// Created by Zsolt Kovacs on 12/22/19. +// Copyright © 2019 Krunoslav Zaher. All rights reserved. +// + +import RxSwift + +/// ReplayRelay is a wrapper for `ReplaySubject`. +/// +/// Unlike `ReplaySubject` it can't terminate with an error or complete. +public final class ReplayRelay: ObservableType { + private let subject: ReplaySubject + + // Accepts `event` and emits it to subscribers + public func accept(_ event: Element) { + self.subject.onNext(event) + } + + private init(subject: ReplaySubject) { + self.subject = subject + } + + /// Creates new instance of `ReplayRelay` that replays at most `bufferSize` last elements sent to it. + /// + /// - parameter bufferSize: Maximal number of elements to replay to observers after subscription. + /// - returns: New instance of replay relay. + public static func create(bufferSize: Int) -> ReplayRelay { + ReplayRelay(subject: ReplaySubject.create(bufferSize: bufferSize)) + } + + /// Creates a new instance of `ReplayRelay` that buffers all the sent to it. + /// To avoid filling up memory, developer needs to make sure that the use case will only ever store a 'reasonable' + /// number of elements. + public static func createUnbound() -> ReplayRelay { + ReplayRelay(subject: ReplaySubject.createUnbounded()) + } + + /// Subscribes observer + public func subscribe(_ observer: Observer) -> Disposable where Observer.Element == Element { + self.subject.subscribe(observer) + } + + /// - returns: Canonical interface for push style sequence + public func asObservable() -> Observable { + self.subject.asObserver() + } +} diff --git a/Sources/AllTestz/ReplayRelayTests.swift b/Sources/AllTestz/ReplayRelayTests.swift new file mode 120000 index 000000000..4cac618da --- /dev/null +++ b/Sources/AllTestz/ReplayRelayTests.swift @@ -0,0 +1 @@ +../../Tests/RxRelayTests/ReplayRelayTests.swift \ No newline at end of file diff --git a/Sources/AllTestz/main.swift b/Sources/AllTestz/main.swift index f42706163..d4929f115 100644 --- a/Sources/AllTestz/main.swift +++ b/Sources/AllTestz/main.swift @@ -258,13 +258,20 @@ final class DriverTest_ : DriverTest, RxTestCase { ("testDriveOptionalObserver", DriverTest.testDriveOptionalObserver), ("testDriveOptionalObservers", DriverTest.testDriveOptionalObservers), ("testDriveNoAmbiguity", DriverTest.testDriveNoAmbiguity), - ("testDriveRelay", DriverTest.testDriveRelay), - ("testDriveRelays", DriverTest.testDriveRelays), - ("testDriveOptionalRelay1", DriverTest.testDriveOptionalRelay1), + ("testDriveBehaviorRelay", DriverTest.testDriveBehaviorRelay), + ("testDriveBehaviorRelays", DriverTest.testDriveBehaviorRelays), + ("testDriveOptionalBehaviorRelay1", DriverTest.testDriveOptionalBehaviorRelay1), ("testDriveOptionalBehaviorRelays1", DriverTest.testDriveOptionalBehaviorRelays1), - ("testDriveOptionalRelay2", DriverTest.testDriveOptionalRelay2), + ("testDriveOptionalBehaviorRelay2", DriverTest.testDriveOptionalBehaviorRelay2), ("testDriveOptionalBehaviorRelays2", DriverTest.testDriveOptionalBehaviorRelays2), - ("testDriveRelayNoAmbiguity", DriverTest.testDriveRelayNoAmbiguity), + ("testDriveBehaviorRelayNoAmbiguity", DriverTest.testDriveBehaviorRelayNoAmbiguity), + ("testDriveReplayRelay", DriverTest.testDriveReplayRelay), + ("testDriveReplayRelays", DriverTest.testDriveReplayRelays), + ("testDriveOptionalReplayRelay1", DriverTest.testDriveOptionalReplayRelay1), + ("testDriveOptionalReplayRelays", DriverTest.testDriveOptionalReplayRelays), + ("testDriveOptionalReplayRelay2", DriverTest.testDriveOptionalReplayRelay2), + ("testDriveReplayRelays2", DriverTest.testDriveReplayRelays2), + ("testDriveReplayRelayNoAmbiguity", DriverTest.testDriveReplayRelayNoAmbiguity), ] } } @@ -1186,6 +1193,11 @@ final class ObservableRelayBindTest_ : ObservableRelayBindTest, RxTestCase { ("testBindToOptionalBehaviorRelay", ObservableRelayBindTest.testBindToOptionalBehaviorRelay), ("testBindToOptionalBehaviorRelays", ObservableRelayBindTest.testBindToOptionalBehaviorRelays), ("testBindToBehaviorRelayNoAmbiguity", ObservableRelayBindTest.testBindToBehaviorRelayNoAmbiguity), + ("testBindToReplayRelay", ObservableRelayBindTest.testBindToReplayRelay), + ("testBindToReplayRelays", ObservableRelayBindTest.testBindToReplayRelays), + ("testBindToOptionalReplayRelay", ObservableRelayBindTest.testBindToOptionalReplayRelay), + ("testBindToOptionalReplayRelays", ObservableRelayBindTest.testBindToOptionalReplayRelays), + ("testBindToReplayRelayNoAmbiguity", ObservableRelayBindTest.testBindToReplayRelayNoAmbiguity), ] } } @@ -1855,6 +1867,21 @@ final class RecursiveLockTests_ : RecursiveLockTests, RxTestCase { ] } } +final class ReplayRelayTests_ : ReplayRelayTests, RxTestCase { + #if os(macOS) + required override init() { + super.init() + } + #endif + + static var allTests: [(String, (ReplayRelayTests_) -> () -> Void)] { return [ + ("test_noEvents", ReplayRelayTests.test_noEvents), + ("test_fewerEventsThanBufferSize", ReplayRelayTests.test_fewerEventsThanBufferSize), + ("test_moreEventsThanBufferSize", ReplayRelayTests.test_moreEventsThanBufferSize), + ("test_moreEventsThanBufferSizeMultipleObservers", ReplayRelayTests.test_moreEventsThanBufferSizeMultipleObservers), + ] } +} + final class ReplaySubjectTest_ : ReplaySubjectTest, RxTestCase { #if os(macOS) required override init() { @@ -1866,6 +1893,14 @@ final class ReplaySubjectTest_ : ReplaySubjectTest, RxTestCase { ("test_hasObserversNoObservers", ReplaySubjectTest.test_hasObserversNoObservers), ("test_hasObserversOneObserver", ReplaySubjectTest.test_hasObserversOneObserver), ("test_hasObserversManyObserver", ReplaySubjectTest.test_hasObserversManyObserver), + ("test_noEvents", ReplaySubjectTest.test_noEvents), + ("test_fewerEventsThanBufferSize", ReplaySubjectTest.test_fewerEventsThanBufferSize), + ("test_moreEventsThanBufferSize", ReplaySubjectTest.test_moreEventsThanBufferSize), + ("test_moreEventsThanBufferSizeMultipleObservers", ReplaySubjectTest.test_moreEventsThanBufferSizeMultipleObservers), + ("test_subscribingBeforeComplete", ReplaySubjectTest.test_subscribingBeforeComplete), + ("test_subscribingAfterComplete", ReplaySubjectTest.test_subscribingAfterComplete), + ("test_subscribingBeforeError", ReplaySubjectTest.test_subscribingBeforeError), + ("test_subscribingAfterError", ReplaySubjectTest.test_subscribingAfterError), ] } } @@ -1970,6 +2005,13 @@ final class SignalTests_ : SignalTests, RxTestCase { ("testEmitOptionalPublishRelay2", SignalTests.testEmitOptionalPublishRelay2), ("testEmitPublishRelays2", SignalTests.testEmitPublishRelays2), ("testEmitPublishRelayNoAmbiguity", SignalTests.testEmitPublishRelayNoAmbiguity), + ("testEmitReplayRelay", SignalTests.testEmitReplayRelay), + ("testEmitReplayRelays", SignalTests.testEmitReplayRelays), + ("testEmitOptionalReplayRelay1", SignalTests.testEmitOptionalReplayRelay1), + ("testEmitOptionalReplayRelays", SignalTests.testEmitOptionalReplayRelays), + ("testEmitOptionalReplayRelay2", SignalTests.testEmitOptionalReplayRelay2), + ("testEmitReplayRelays2", SignalTests.testEmitReplayRelays2), + ("testEmitReplayRelayNoAmbiguity", SignalTests.testEmitReplayRelayNoAmbiguity), ] } } @@ -2171,6 +2213,7 @@ func XCTMain(_ tests: [() -> Void]) { testCase(PublishSubjectTest_.allTests), testCase(ReactiveTests_.allTests), testCase(RecursiveLockTests_.allTests), + testCase(ReplayRelayTests_.allTests), testCase(ReplaySubjectTest_.allTests), testCase(SharedSequenceOperatorTests_.allTests), testCase(SharingSchedulerTest_.allTests), diff --git a/Sources/RxRelay/ReplayRelay.swift b/Sources/RxRelay/ReplayRelay.swift new file mode 120000 index 000000000..5602c4875 --- /dev/null +++ b/Sources/RxRelay/ReplayRelay.swift @@ -0,0 +1 @@ +../../RxRelay/ReplayRelay.swift \ No newline at end of file diff --git a/Tests/RxCocoaTests/Driver+Test.swift b/Tests/RxCocoaTests/Driver+Test.swift index 00177bf8b..8ae08d5cb 100644 --- a/Tests/RxCocoaTests/Driver+Test.swift +++ b/Tests/RxCocoaTests/Driver+Test.swift @@ -397,7 +397,7 @@ extension DriverTest { // MARK: drive optional behavior relay extension DriverTest { - func testDriveRelay() { + func testDriveBehaviorRelay() { let relay = BehaviorRelay(value: 0) let subscription = (Driver.just(1) as Driver).drive(relay) @@ -406,7 +406,7 @@ extension DriverTest { subscription.dispose() } - func testDriveRelays() { + func testDriveBehaviorRelays() { let relay1 = BehaviorRelay(value: 0) let relay2 = BehaviorRelay(value: 0) @@ -416,7 +416,7 @@ extension DriverTest { XCTAssertEqual(relay2.value, 1) } - func testDriveOptionalRelay1() { + func testDriveOptionalBehaviorRelay1() { let relay = BehaviorRelay(value: 0) _ = (Driver.just(1) as Driver).drive(relay) @@ -434,7 +434,7 @@ extension DriverTest { XCTAssertEqual(relay2.value, 1) } - func testDriveOptionalRelay2() { + func testDriveOptionalBehaviorRelay2() { let relay = BehaviorRelay(value: 0) _ = (Driver.just(1) as Driver).drive(relay) @@ -452,7 +452,7 @@ extension DriverTest { XCTAssertEqual(relay2.value, 1) } - func testDriveRelayNoAmbiguity() { + func testDriveBehaviorRelayNoAmbiguity() { let relay = BehaviorRelay(value: 0) // shouldn't cause compile time error @@ -461,3 +461,122 @@ extension DriverTest { XCTAssertEqual(relay.value, 1) } } + +// MARK: drive optional behavior relay +extension DriverTest { + func testDriveReplayRelay() { + let relay = ReplayRelay.create(bufferSize: 1) + + var latest: Int? + _ = relay.subscribe(onNext: { latestElement in + latest = latestElement + }) + + _ = (Driver.just(1) as Driver).drive(relay) + + XCTAssertEqual(latest, 1) + } + + func testDriveReplayRelays() { + let relay1 = ReplayRelay.create(bufferSize: 1) + let relay2 = ReplayRelay.create(bufferSize: 1) + + var latest1: Int? + var latest2: Int? + + _ = relay1.subscribe(onNext: { latestElement in + latest1 = latestElement + }) + + _ = relay2.subscribe(onNext: { latestElement in + latest2 = latestElement + }) + + _ = (Driver.just(1) as Driver).drive(relay1, relay2) + + XCTAssertEqual(latest1, 1) + XCTAssertEqual(latest2, 1) + } + + func testDriveOptionalReplayRelay1() { + let relay = ReplayRelay.create(bufferSize: 1) + + var latest: Int? = nil + _ = relay.subscribe(onNext: { latestElement in + latest = latestElement + }) + + _ = (Driver.just(1) as Driver).drive(relay) + + XCTAssertEqual(latest, 1) + } + + func testDriveOptionalReplayRelays() { + let relay1 = ReplayRelay.create(bufferSize: 1) + let relay2 = ReplayRelay.create(bufferSize: 1) + + var latest1: Int? + var latest2: Int? + + _ = relay1.subscribe(onNext: { latestElement in + latest1 = latestElement + }) + + _ = relay2.subscribe(onNext: { latestElement in + latest2 = latestElement + }) + + _ = (Driver.just(1) as Driver).drive(relay1, relay2) + + XCTAssertEqual(latest1, 1) + XCTAssertEqual(latest2, 1) + } + + func testDriveOptionalReplayRelay2() { + let relay = ReplayRelay.create(bufferSize: 1) + + var latest: Int? + _ = relay.subscribe(onNext: { latestElement in + latest = latestElement + }) + + _ = (Driver.just(1) as Driver).drive(relay) + + XCTAssertEqual(latest, 1) + } + + func testDriveReplayRelays2() { + let relay1 = ReplayRelay.create(bufferSize: 1) + let relay2 = ReplayRelay.create(bufferSize: 1) + + var latest1: Int? + var latest2: Int? + + _ = relay1.subscribe(onNext: { latestElement in + latest1 = latestElement + }) + + _ = relay2.subscribe(onNext: { latestElement in + latest2 = latestElement + }) + + _ = (Driver.just(1) as Driver).drive(relay1, relay2) + + XCTAssertEqual(latest1, 1) + XCTAssertEqual(latest2, 1) + } + + func testDriveReplayRelayNoAmbiguity() { + let relay = ReplayRelay.create(bufferSize: 1) + + var latest: Int? = nil + _ = relay.subscribe(onNext: { latestElement in + latest = latestElement + }) + + // shouldn't cause compile time error + _ = Driver.just(1).drive(relay) + + XCTAssertEqual(latest, 1) + } +} diff --git a/Tests/RxCocoaTests/Signal+Test.swift b/Tests/RxCocoaTests/Signal+Test.swift index 2b1991c11..d2b452b41 100644 --- a/Tests/RxCocoaTests/Signal+Test.swift +++ b/Tests/RxCocoaTests/Signal+Test.swift @@ -408,7 +408,7 @@ extension SignalTests { } } -// MARK: Emit to relay +// MARK: Emit to publish relay extension SignalTests { func testEmitPublishRelay() { @@ -527,3 +527,123 @@ extension SignalTests { XCTAssertEqual(latest, 1) } } + +// MARK: Emit to replay relay + +extension SignalTests { + func testEmitReplayRelay() { + let relay = ReplayRelay.create(bufferSize: 1) + + var latest: Int? + _ = relay.subscribe(onNext: { latestElement in + latest = latestElement + }) + + _ = (Signal.just(1) as Signal).emit(to: relay) + + XCTAssertEqual(latest, 1) + } + + func testEmitReplayRelays() { + let relay1 = ReplayRelay.create(bufferSize: 1) + let relay2 = ReplayRelay.create(bufferSize: 1) + + var latest1: Int? + var latest2: Int? + + _ = relay1.subscribe(onNext: { latestElement in + latest1 = latestElement + }) + + _ = relay2.subscribe(onNext: { latestElement in + latest2 = latestElement + }) + + _ = (Signal.just(1) as Signal).emit(to: relay1, relay2) + + XCTAssertEqual(latest1, 1) + XCTAssertEqual(latest2, 1) + } + + func testEmitOptionalReplayRelay1() { + let relay = ReplayRelay.create(bufferSize: 1) + + var latest: Int? = nil + _ = relay.subscribe(onNext: { latestElement in + latest = latestElement + }) + + _ = (Signal.just(1) as Signal).emit(to: relay) + + XCTAssertEqual(latest, 1) + } + + func testEmitOptionalReplayRelays() { + let relay1 = ReplayRelay.create(bufferSize: 1) + let relay2 = ReplayRelay.create(bufferSize: 1) + + var latest1: Int? + var latest2: Int? + + _ = relay1.subscribe(onNext: { latestElement in + latest1 = latestElement + }) + + _ = relay2.subscribe(onNext: { latestElement in + latest2 = latestElement + }) + + _ = (Signal.just(1) as Signal).emit(to: relay1, relay2) + + XCTAssertEqual(latest1, 1) + XCTAssertEqual(latest2, 1) + } + + func testEmitOptionalReplayRelay2() { + let relay = ReplayRelay.create(bufferSize: 1) + + var latest: Int? + _ = relay.subscribe(onNext: { latestElement in + latest = latestElement + }) + + _ = (Signal.just(1) as Signal).emit(to: relay) + + XCTAssertEqual(latest, 1) + } + + func testEmitReplayRelays2() { + let relay1 = ReplayRelay.create(bufferSize: 1) + let relay2 = ReplayRelay.create(bufferSize: 1) + + var latest1: Int? + var latest2: Int? + + _ = relay1.subscribe(onNext: { latestElement in + latest1 = latestElement + }) + + _ = relay2.subscribe(onNext: { latestElement in + latest2 = latestElement + }) + + _ = (Signal.just(1) as Signal).emit(to: relay1, relay2) + + XCTAssertEqual(latest1, 1) + XCTAssertEqual(latest2, 1) + } + + func testEmitReplayRelayNoAmbiguity() { + let relay = ReplayRelay.create(bufferSize: 1) + + var latest: Int? = nil + _ = relay.subscribe(onNext: { latestElement in + latest = latestElement + }) + + // shouldn't cause compile time error + _ = Signal.just(1).emit(to: relay) + + XCTAssertEqual(latest, 1) + } +} diff --git a/Tests/RxRelayTests/Observable+RelayBindTests.swift b/Tests/RxRelayTests/Observable+RelayBindTests.swift index 4038d95fd..1423ea7bb 100644 --- a/Tests/RxRelayTests/Observable+RelayBindTests.swift +++ b/Tests/RxRelayTests/Observable+RelayBindTests.swift @@ -22,7 +22,7 @@ extension ObservableRelayBindTest { let relay = PublishRelay() - _ = relay.subscribe{ event in + _ = relay.subscribe { event in events.append(Recorded(time: 0, value: event)) } @@ -164,3 +164,106 @@ extension ObservableRelayBindTest { XCTAssertEqual(relay.value, 1) } } + +// MARK: bind(to:) replay relay +extension ObservableRelayBindTest { + func testBindToReplayRelay() { + var events: [Recorded>] = [] + + let relay = ReplayRelay.create(bufferSize: 1) + + _ = relay.subscribe { event in + events.append(Recorded(time: 0, value: event)) + } + + _ = Observable.just(1).bind(to: relay) + + XCTAssertEqual(events, [ + .next(1), + ]) + } + + func testBindToReplayRelays() { + var events1: [Recorded>] = [] + var events2: [Recorded>] = [] + + let relay1 = ReplayRelay.create(bufferSize: 1) + let relay2 = ReplayRelay.create(bufferSize: 1) + + _ = relay1.subscribe { event in + events1.append(Recorded(time: 0, value: event)) + } + + _ = relay2.subscribe { event in + events2.append(Recorded(time: 0, value: event)) + } + + _ = Observable.just(1).bind(to: relay1, relay2) + + XCTAssertEqual(events1, [ + .next(1), + ]) + + XCTAssertEqual(events2, [ + .next(1), + ]) + } + + func testBindToOptionalReplayRelay() { + var events: [Recorded>] = [] + + let relay = ReplayRelay.create(bufferSize: 1) + + _ = relay.subscribe { event in + events.append(Recorded(time: 0, value: event)) + } + + _ = (Observable.just(1) as Observable).bind(to: relay) + + XCTAssertEqual(events, [ + .next(1), + ]) + } + + func testBindToOptionalReplayRelays() { + var events1: [Recorded>] = [] + var events2: [Recorded>] = [] + + let relay1 = ReplayRelay.create(bufferSize: 1) + let relay2 = ReplayRelay.create(bufferSize: 1) + + _ = relay1.subscribe { event in + events1.append(Recorded(time: 0, value: event)) + } + + _ = relay2.subscribe { event in + events2.append(Recorded(time: 0, value: event)) + } + + _ = (Observable.just(1) as Observable).bind(to: relay1, relay2) + + XCTAssertEqual(events1, [ + .next(1), + ]) + + XCTAssertEqual(events2, [ + .next(1), + ]) + } + + func testBindToReplayRelayNoAmbiguity() { + var events: [Recorded>] = [] + + let relay = ReplayRelay.create(bufferSize: 1) + + _ = relay.subscribe { event in + events.append(Recorded(time: 0, value: event)) + } + + _ = Observable.just(1).bind(to: relay) + + XCTAssertEqual(events, [ + .next(1), + ]) + } +} diff --git a/Tests/RxRelayTests/ReplayRelayTests.swift b/Tests/RxRelayTests/ReplayRelayTests.swift new file mode 100644 index 000000000..b54333a19 --- /dev/null +++ b/Tests/RxRelayTests/ReplayRelayTests.swift @@ -0,0 +1,113 @@ +// +// ReplayRelayTests.swift +// Tests +// +// Created by Zsolt Kovacs on 12/31/19. +// Copyright © 2019 Krunoslav Zaher. All rights reserved. +// + +import XCTest +import RxSwift +import RxRelay +import RxTest + +class ReplayRelayTests: RxTest { + func test_noEvents() { + let scheduler = TestScheduler(initialClock: 0) + + let relay = ReplayRelay.create(bufferSize: 3) + let result = scheduler.createObserver(Int.self) + + _ = relay.subscribe(result) + + XCTAssertTrue(result.events.isEmpty) + } + + func test_fewerEventsThanBufferSize() { + let scheduler = TestScheduler(initialClock: 0) + + var relay: ReplayRelay! = nil + let result = scheduler.createObserver(Int.self) + var subscription: Disposable! = nil + + scheduler.scheduleAt(100) { relay = ReplayRelay.create(bufferSize: 3) } + scheduler.scheduleAt(150) { relay.accept(1) } + scheduler.scheduleAt(200) { relay.accept(2) } + scheduler.scheduleAt(300) { subscription = relay.subscribe(result) } + scheduler.scheduleAt(350) { + XCTAssertEqual(result.events, [ + .next(300, 1), + .next(300, 2), + ]) + } + scheduler.scheduleAt(400) { subscription.dispose() } + + scheduler.start() + } + + func test_moreEventsThanBufferSize() { + let scheduler = TestScheduler(initialClock: 0) + + var relay: ReplayRelay! = nil + let result = scheduler.createObserver(Int.self) + var subscription: Disposable! = nil + + scheduler.scheduleAt(100) { relay = ReplayRelay.create(bufferSize: 3) } + scheduler.scheduleAt(150) { relay.accept(1) } + scheduler.scheduleAt(200) { relay.accept(2) } + scheduler.scheduleAt(250) { relay.accept(3) } + scheduler.scheduleAt(300) { relay.accept(4) } + scheduler.scheduleAt(350) { relay.accept(5) } + scheduler.scheduleAt(400) { subscription = relay.subscribe(result) } + scheduler.scheduleAt(450) { + XCTAssertEqual(result.events, [ + .next(400, 3), + .next(400, 4), + .next(400, 5), + ]) + } + scheduler.scheduleAt(500) { subscription.dispose() } + + scheduler.start() + } + + func test_moreEventsThanBufferSizeMultipleObservers() { + let scheduler = TestScheduler(initialClock: 0) + + var relay: ReplayRelay! = nil + let result1 = scheduler.createObserver(Int.self) + var subscription1: Disposable! = nil + + let result2 = scheduler.createObserver(Int.self) + var subscription2: Disposable! = nil + + scheduler.scheduleAt(100) { relay = ReplayRelay.create(bufferSize: 3) } + scheduler.scheduleAt(150) { subscription1 = relay.subscribe(result1) } + scheduler.scheduleAt(200) { relay.accept(1) } + scheduler.scheduleAt(250) { relay.accept(2) } + scheduler.scheduleAt(300) { relay.accept(3) } + scheduler.scheduleAt(350) { relay.accept(4) } + scheduler.scheduleAt(400) { relay.accept(5) } + scheduler.scheduleAt(450) { subscription2 = relay.subscribe(result2) } + scheduler.scheduleAt(500) { + XCTAssertEqual(result1.events, [ + .next(200, 1), + .next(250, 2), + .next(300, 3), + .next(350, 4), + .next(400, 5), + ]) + XCTAssertEqual(result2.events, [ + .next(450, 3), + .next(450, 4), + .next(450, 5), + ]) + } + scheduler.scheduleAt(550) { + subscription1.dispose() + subscription2.dispose() + } + + scheduler.start() + } +} diff --git a/Tests/RxSwiftTests/ReplaySubjectTest.swift b/Tests/RxSwiftTests/ReplaySubjectTest.swift index b10af3abc..2e26b78e4 100644 --- a/Tests/RxSwiftTests/ReplaySubjectTest.swift +++ b/Tests/RxSwiftTests/ReplaySubjectTest.swift @@ -70,4 +70,215 @@ class ReplaySubjectTest: RxTest { scheduler.start() } + + func test_noEvents() { + let scheduler = TestScheduler(initialClock: 0) + + let subject = ReplaySubject.create(bufferSize: 3) + let result = scheduler.createObserver(Int.self) + + _ = subject.subscribe(result) + + XCTAssertTrue(result.events.isEmpty) + } + + func test_fewerEventsThanBufferSize() { + let scheduler = TestScheduler(initialClock: 0) + + var subject: ReplaySubject! = nil + let result = scheduler.createObserver(Int.self) + var subscription: Disposable! = nil + + scheduler.scheduleAt(100) { subject = ReplaySubject.create(bufferSize: 3) } + scheduler.scheduleAt(150) { subject.onNext(1) } + scheduler.scheduleAt(200) { subject.onNext(2) } + scheduler.scheduleAt(300) { subscription = subject.subscribe(result) } + scheduler.scheduleAt(350) { + XCTAssertEqual(result.events, [ + .next(300, 1), + .next(300, 2), + ]) + } + scheduler.scheduleAt(400) { subscription.dispose() } + + scheduler.start() + } + + func test_moreEventsThanBufferSize() { + let scheduler = TestScheduler(initialClock: 0) + + var subject: ReplaySubject! = nil + let result = scheduler.createObserver(Int.self) + var subscription: Disposable! = nil + + scheduler.scheduleAt(100) { subject = ReplaySubject.create(bufferSize: 3) } + scheduler.scheduleAt(150) { subject.onNext(1) } + scheduler.scheduleAt(200) { subject.onNext(2) } + scheduler.scheduleAt(250) { subject.onNext(3) } + scheduler.scheduleAt(300) { subject.onNext(4) } + scheduler.scheduleAt(350) { subject.onNext(5) } + scheduler.scheduleAt(400) { subscription = subject.subscribe(result) } + scheduler.scheduleAt(450) { + XCTAssertEqual(result.events, [ + .next(400, 3), + .next(400, 4), + .next(400, 5), + ]) + } + scheduler.scheduleAt(500) { subscription.dispose() } + + scheduler.start() + } + + func test_moreEventsThanBufferSizeMultipleObservers() { + let scheduler = TestScheduler(initialClock: 0) + + var subject: ReplaySubject! = nil + let result1 = scheduler.createObserver(Int.self) + var subscription1: Disposable! = nil + + let result2 = scheduler.createObserver(Int.self) + var subscription2: Disposable! = nil + + scheduler.scheduleAt(100) { subject = ReplaySubject.create(bufferSize: 3) } + scheduler.scheduleAt(150) { subscription1 = subject.subscribe(result1) } + scheduler.scheduleAt(200) { subject.onNext(1) } + scheduler.scheduleAt(250) { subject.onNext(2) } + scheduler.scheduleAt(300) { subject.onNext(3) } + scheduler.scheduleAt(350) { subject.onNext(4) } + scheduler.scheduleAt(400) { subject.onNext(5) } + scheduler.scheduleAt(450) { subscription2 = subject.subscribe(result2) } + scheduler.scheduleAt(500) { + XCTAssertEqual(result1.events, [ + .next(200, 1), + .next(250, 2), + .next(300, 3), + .next(350, 4), + .next(400, 5), + ]) + XCTAssertEqual(result2.events, [ + .next(450, 3), + .next(450, 4), + .next(450, 5), + ]) + } + scheduler.scheduleAt(550) { + subscription1.dispose() + subscription2.dispose() + } + + scheduler.start() + } + + func test_subscribingBeforeComplete() { + let scheduler = TestScheduler(initialClock: 0) + + var subject: ReplaySubject! = nil + let result = scheduler.createObserver(Int.self) + var subscription: Disposable! = nil + + scheduler.scheduleAt(100) { subject = ReplaySubject.create(bufferSize: 3) } + scheduler.scheduleAt(150) { subject.onNext(1) } + scheduler.scheduleAt(200) { subject.onNext(2) } + scheduler.scheduleAt(250) { subject.onNext(3) } + scheduler.scheduleAt(300) { subject.onNext(4) } + scheduler.scheduleAt(350) { subject.onNext(5) } + scheduler.scheduleAt(400) { subscription = subject.subscribe(result) } + scheduler.scheduleAt(450) { subject.onCompleted() } + scheduler.scheduleAt(500) { + XCTAssertEqual(result.events, [ + .next(400, 3), + .next(400, 4), + .next(400, 5), + .completed(450), + ]) + } + scheduler.scheduleAt(550) { subscription.dispose() } + + scheduler.start() + } + + func test_subscribingAfterComplete() { + let scheduler = TestScheduler(initialClock: 0) + + var subject: ReplaySubject! = nil + let result = scheduler.createObserver(Int.self) + var subscription: Disposable! = nil + + scheduler.scheduleAt(100) { subject = ReplaySubject.create(bufferSize: 3) } + scheduler.scheduleAt(150) { subject.onNext(1) } + scheduler.scheduleAt(200) { subject.onNext(2) } + scheduler.scheduleAt(250) { subject.onNext(3) } + scheduler.scheduleAt(300) { subject.onNext(4) } + scheduler.scheduleAt(350) { subject.onNext(5) } + scheduler.scheduleAt(400) { subject.onCompleted() } + scheduler.scheduleAt(450) { subscription = subject.subscribe(result) } + scheduler.scheduleAt(500) { + XCTAssertEqual(result.events, [ + .next(450, 3), + .next(450, 4), + .next(450, 5), + .completed(450), + ]) + } + scheduler.scheduleAt(550) { subscription.dispose() } + + scheduler.start() + } + + func test_subscribingBeforeError() { + let scheduler = TestScheduler(initialClock: 0) + + var subject: ReplaySubject! = nil + let result = scheduler.createObserver(Int.self) + var subscription: Disposable! = nil + + scheduler.scheduleAt(100) { subject = ReplaySubject.create(bufferSize: 3) } + scheduler.scheduleAt(150) { subject.onNext(1) } + scheduler.scheduleAt(200) { subject.onNext(2) } + scheduler.scheduleAt(250) { subject.onNext(3) } + scheduler.scheduleAt(300) { subject.onNext(4) } + scheduler.scheduleAt(350) { subject.onNext(5) } + scheduler.scheduleAt(400) { subscription = subject.subscribe(result) } + scheduler.scheduleAt(450) { subject.onError(testError) } + scheduler.scheduleAt(500) { + XCTAssertEqual(result.events, [ + .next(400, 3), + .next(400, 4), + .next(400, 5), + .error(450, testError), + ]) + } + scheduler.scheduleAt(550) { subscription.dispose() } + + scheduler.start() + } + + func test_subscribingAfterError() { + let scheduler = TestScheduler(initialClock: 0) + + var subject: ReplaySubject! = nil + let result = scheduler.createObserver(Int.self) + var subscription: Disposable! = nil + + scheduler.scheduleAt(100) { subject = ReplaySubject.create(bufferSize: 3) } + scheduler.scheduleAt(150) { subject.onNext(1) } + scheduler.scheduleAt(200) { subject.onNext(2) } + scheduler.scheduleAt(250) { subject.onNext(3) } + scheduler.scheduleAt(300) { subject.onNext(4) } + scheduler.scheduleAt(350) { subject.onNext(5) } + scheduler.scheduleAt(400) { subject.onError(testError) } + scheduler.scheduleAt(450) { subscription = subject.subscribe(result) } + scheduler.scheduleAt(500) { + XCTAssertEqual(result.events, [ + .next(450, 3), + .next(450, 4), + .next(450, 5), + .error(450, testError), + ]) + } + scheduler.scheduleAt(550) { subscription.dispose() } + + scheduler.start() + } }