Navigation Menu

Skip to content

Commit

Permalink
Add ReplayRelay (#2111)
Browse files Browse the repository at this point in the history
  • Loading branch information
lordzsolt authored and freak4pc committed Oct 6, 2020
1 parent e7a3fcb commit 4e969fa
Show file tree
Hide file tree
Showing 16 changed files with 889 additions and 20 deletions.
1 change: 1 addition & 0 deletions .jazzy.yml
Expand Up @@ -127,6 +127,7 @@ custom_categories:
- BehaviorRelay
- Observable+Bind
- PublishRelay
- ReplayRelay
- Utils
- name: RxSwift
children:
Expand Down
4 changes: 2 additions & 2 deletions Documentation/Subjects.md
Expand Up @@ -6,10 +6,10 @@ All of behave exactly the same like described [here](http://reactivex.io/documen
Relays
======

RxRelay provides two kinds of Relays: `PublishRelay` and `BehaviorRelay`.
RxRelay provides three kinds of Relays: `PublishRelay`, `BehaviorRelay` and `ReplayRelay`.
They behave exactly like their parallel `Subject`s, with two changes:

- Relays never complete.
- Relays never emit errors.

In essence, Relays only emit `.next` events, and never terminate.
In essence, Relays only emit `.next` events, and never terminate.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -68,7 +68,7 @@ RxSwift comprises five separate components depending on each other in the follow

* **RxSwift**: The core of RxSwift, providing the Rx standard as (mostly) defined by [ReactiveX](https://reactivex.io). It has no other dependencies.
* **RxCocoa**: Provides Cocoa-specific capabilities for general iOS/macOS/watchOS & tvOS app development, such as Shared Sequences, Traits, and much more. It depends on both `RxSwift` and `RxRelay`.
* **RxRelay**: Provides `PublishRelay` and `BehaviorRelay`, two [simple wrappers around Subjects](https://github.com/ReactiveX/RxSwift/blob/master/Documentation/Subjects.md#relays). It depends on `RxSwift`.
* **RxRelay**: Provides `PublishRelay`, `BehaviorRelay` and `ReplayRelay`, three [simple wrappers around Subjects](https://github.com/ReactiveX/RxSwift/blob/master/Documentation/Subjects.md#relays). It depends on `RxSwift`.
* **RxTest** and **RxBlocking**: Provides testing capabilities for Rx-based systems. It depends on `RxSwift`.

###### ... find compatible
Expand Down
12 changes: 12 additions & 0 deletions Rx.xcodeproj/project.pbxproj
Expand Up @@ -47,6 +47,10 @@
54700CA11CE37E1900EF3A8F /* UINavigationItem+RxTests.swift.swift in Sources */ = {isa = PBXBuildFile; fileRef = 54700C9E1CE37D1000EF3A8F /* UINavigationItem+RxTests.swift.swift */; };
54D2138E1CE0824E0028D5B4 /* UINavigationItem+Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = 54D2138C1CE081890028D5B4 /* UINavigationItem+Rx.swift */; };
601AE3DA1EE24E4F00617386 /* SwiftSupport.swift in Sources */ = {isa = PBXBuildFile; fileRef = 601AE3D91EE24E4F00617386 /* SwiftSupport.swift */; };
6A7D2CD423BBDBDC0038576E /* ReplayRelayTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6A7D2CD323BBDBDC0038576E /* ReplayRelayTests.swift */; };
6A7D2CD523BBDBDC0038576E /* ReplayRelayTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6A7D2CD323BBDBDC0038576E /* ReplayRelayTests.swift */; };
6A7D2CD623BBDBDC0038576E /* ReplayRelayTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6A7D2CD323BBDBDC0038576E /* ReplayRelayTests.swift */; };
6A94254A23AFC2F300B7A24C /* ReplayRelay.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6A94254923AFC2F300B7A24C /* ReplayRelay.swift */; };
78B6157523B69F49009C2AD9 /* Binder.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8E65EFA1F6E91D1004478C3 /* Binder.swift */; };
78B6157723B6A035009C2AD9 /* Binder+Tests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 78B6157623B6A035009C2AD9 /* Binder+Tests.swift */; };
7EDBAEB41C89B1A6006CBE67 /* UITabBarItem+RxTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7EDBAEAB1C89B1A5006CBE67 /* UITabBarItem+RxTests.swift */; };
Expand Down Expand Up @@ -941,6 +945,8 @@
54700C9E1CE37D1000EF3A8F /* UINavigationItem+RxTests.swift.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UINavigationItem+RxTests.swift.swift"; sourceTree = "<group>"; };
54D2138C1CE081890028D5B4 /* UINavigationItem+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UINavigationItem+Rx.swift"; sourceTree = "<group>"; };
601AE3D91EE24E4F00617386 /* SwiftSupport.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SwiftSupport.swift; sourceTree = "<group>"; };
6A7D2CD323BBDBDC0038576E /* ReplayRelayTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ReplayRelayTests.swift; sourceTree = "<group>"; };
6A94254923AFC2F300B7A24C /* ReplayRelay.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ReplayRelay.swift; sourceTree = "<group>"; };
78B6157623B6A035009C2AD9 /* Binder+Tests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "Binder+Tests.swift"; sourceTree = "<group>"; };
7EDBAEAB1C89B1A5006CBE67 /* UITabBarItem+RxTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UITabBarItem+RxTests.swift"; sourceTree = "<group>"; };
7F600F3D1C5D0C0100535B1D /* UIRefreshControl+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UIRefreshControl+Rx.swift"; sourceTree = "<group>"; };
Expand Down Expand Up @@ -1517,6 +1523,7 @@
children = (
C8B0F7101F530CA700548EBE /* PublishRelay.swift */,
C8C8BCCE1F8944B800501D4D /* BehaviorRelay.swift */,
6A94254923AFC2F300B7A24C /* ReplayRelay.swift */,
A2897D61225CA3F3004EA481 /* Observable+Bind.swift */,
A2897D68225D023A004EA481 /* Utils.swift */,
A2FD4EA4225D0A8100288525 /* Info.plist */,
Expand All @@ -1528,6 +1535,7 @@
isa = PBXGroup;
children = (
A2FD4E9B225D04FF00288525 /* Observable+RelayBindTests.swift */,
6A7D2CD323BBDBDC0038576E /* ReplayRelayTests.swift */,
);
path = RxRelayTests;
sourceTree = "<group>";
Expand Down Expand Up @@ -2904,6 +2912,7 @@
buildActionMask = 2147483647;
files = (
A2897D57225CA236004EA481 /* PublishRelay.swift in Sources */,
6A94254A23AFC2F300B7A24C /* ReplayRelay.swift in Sources */,
A2897D58225CA236004EA481 /* BehaviorRelay.swift in Sources */,
A2897D62225CA3F3004EA481 /* Observable+Bind.swift in Sources */,
A2897D69225D023A004EA481 /* Utils.swift in Sources */,
Expand Down Expand Up @@ -3072,6 +3081,7 @@
C83509351C38706E0027C24C /* KVOObservableTests.swift in Sources */,
C89046581DC5F6F70041C7D8 /* UISearchBar+RxTests.swift in Sources */,
C85218011E33FC160015DD38 /* RecursiveLock.swift in Sources */,
6A7D2CD423BBDBDC0038576E /* ReplayRelayTests.swift in Sources */,
C822BACA1DB4058000F98810 /* Event+Test.swift in Sources */,
C83509421C38706E0027C24C /* MainThreadPrimitiveHotObservable.swift in Sources */,
C801DE4A1F6EBB84008DB060 /* Observable+PrimitiveSequenceTest.swift in Sources */,
Expand Down Expand Up @@ -3317,6 +3327,7 @@
C820A9AF1EB5073E00D431BC /* Observable+FilterTests.swift in Sources */,
C820A9DF1EB50CF800D431BC /* Observable+ThrottleTests.swift in Sources */,
C83509F01C3875580027C24C /* PrimitiveMockObserver.swift in Sources */,
6A7D2CD523BBDBDC0038576E /* ReplayRelayTests.swift in Sources */,
C820A9C31EB509FC00D431BC /* Observable+SkipTests.swift in Sources */,
C83509C31C3875220027C24C /* KVOObservableTests.swift in Sources */,
C8A9B6F51DAD752200C9B027 /* Observable+BindTests.swift in Sources */,
Expand Down Expand Up @@ -3416,6 +3427,7 @@
C8D970F11F532FD30058F2FE /* SharedSequence+Extensions.swift in Sources */,
C820A9941EB4FD1400D431BC /* Observable+SwitchIfEmptyTests.swift in Sources */,
C83509CF1C3875260027C24C /* NSView+RxTests.swift in Sources */,
6A7D2CD623BBDBDC0038576E /* ReplayRelayTests.swift in Sources */,
C820A9501EB4EC3C00D431BC /* Observable+ReduceTests.swift in Sources */,
C820A9841EB4FB0400D431BC /* Observable+UsingTests.swift in Sources */,
C820A9881EB4FB5B00D431BC /* Observable+DebugTests.swift in Sources */,
Expand Down
28 changes: 28 additions & 0 deletions RxCocoa/Traits/Driver/Driver+Subscription.swift
Expand Up @@ -78,6 +78,34 @@ extension SharedSequenceConvertibleType where SharingStrategy == DriverSharingSt
})
}

/**
Creates new subscription and sends elements to `ReplayRelay`.
This method can be only called from `MainThread`.
- parameter relay: Target relay for sequence elements.
- returns: Disposable object that can be used to unsubscribe the observer from the relay.
*/
public func drive(_ relays: ReplayRelay<Element>...) -> Disposable {
MainScheduler.ensureRunningOnMainThread(errorMessage: errorMessage)
return self.drive(onNext: { e in
relays.forEach { $0.accept(e) }
})
}

/**
Creates new subscription and sends elements to `ReplayRelay`.
This method can be only called from `MainThread`.
- parameter relay: Target relay for sequence elements.
- returns: Disposable object that can be used to unsubscribe the observer from the relay.
*/
public func drive(_ relays: ReplayRelay<Element?>...) -> Disposable {
MainScheduler.ensureRunningOnMainThread(errorMessage: errorMessage)
return self.drive(onNext: { e in
relays.forEach { $0.accept(e) }
})
}

/**
Subscribes to observable sequence using custom binder function.
This method can be only called from `MainThread`.
Expand Down
31 changes: 26 additions & 5 deletions RxCocoa/Traits/Signal/Signal+Subscription.swift
Expand Up @@ -66,7 +66,7 @@ extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingSt
}

/**
Creates new subscription and sends elements to relay.
Creates new subscription and sends elements to `PublishRelay`.
- parameter to: Target relays for sequence elements.
- returns: Disposable object that can be used to unsubscribe the observer from the relay.
Expand All @@ -78,7 +78,7 @@ extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingSt
}

/**
Creates new subscription and sends elements to relay.
Creates new subscription and sends elements to `PublishRelay`.
- parameter to: Target relay for sequence elements.
- returns: Disposable object that can be used to unsubscribe the observer from the relay.
Expand All @@ -89,6 +89,30 @@ extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingSt
})
}

/**
Creates new subscription and sends elements to `ReplayRelay`.
- parameter to: Target relays for sequence elements.
- returns: Disposable object that can be used to unsubscribe the observer from the relay.
*/
public func emit(to relays: ReplayRelay<Element>...) -> Disposable {
return self.emit(onNext: { e in
relays.forEach { $0.accept(e) }
})
}

/**
Creates new subscription and sends elements to `ReplayRelay`.
- parameter to: Target relay for sequence elements.
- returns: Disposable object that can be used to unsubscribe the observer from the relay.
*/
public func emit(to relays: ReplayRelay<Element?>...) -> Disposable {
return self.emit(onNext: { e in
relays.forEach { $0.accept(e) }
})
}

/**
Subscribes an element handler, a completion handler and disposed handler to an observable sequence.
Expand All @@ -105,6 +129,3 @@ extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingSt
self.asObservable().subscribe(onNext: onNext, onCompleted: onCompleted, onDisposed: onDisposed)
}
}



46 changes: 46 additions & 0 deletions RxRelay/Observable+Bind.swift
Expand Up @@ -100,4 +100,50 @@ extension ObservableType {
}
}
}

/**
Creates new subscription and sends elements to replay relay(s).
In case error occurs in debug mode, `fatalError` will be raised.
In case error occurs in release mode, `error` will be logged.
- parameter to: Target replay relay for sequence elements.
- returns: Disposable object that can be used to unsubscribe the observer.
*/
public func bind(to relays: ReplayRelay<Element>...) -> Disposable {
self.bind(to: relays)
}

/**
Creates new subscription and sends elements to replay relay(s).
In case error occurs in debug mode, `fatalError` will be raised.
In case error occurs in release mode, `error` will be logged.
- parameter to: Target replay relay for sequence elements.
- returns: Disposable object that can be used to unsubscribe the observer.
*/
public func bind(to relays: ReplayRelay<Element?>...) -> Disposable {
self.map { $0 as Element? }.bind(to: relays)
}

/**
Creates new subscription and sends elements to replay relay(s).
In case error occurs in debug mode, `fatalError` will be raised.
In case error occurs in release mode, `error` will be logged.
- parameter to: Target replay relay for sequence elements.
- returns: Disposable object that can be used to unsubscribe the observer.
*/
private func bind(to relays: [ReplayRelay<Element>]) -> Disposable {
subscribe { e in
switch e {
case let .next(element):
relays.forEach {
$0.accept(element)
}
case let .error(error):
rxFatalErrorInDebug("Binding error to behavior relay: \(error)")
case .completed:
break
}
}
}
}
50 changes: 50 additions & 0 deletions RxRelay/ReplayRelay.swift
@@ -0,0 +1,50 @@
//
// ReplayRelay.swift
// RxRelay
//
// Created by Zsolt Kovacs on 12/22/19.
// Copyright © 2019 Krunoslav Zaher. All rights reserved.
//

import RxSwift

/// ReplayRelay is a wrapper for `ReplaySubject`.
///
/// Unlike `ReplaySubject` it can't terminate with an error or complete.
public final class ReplayRelay<Element>: ObservableType {
private let subject: ReplaySubject<Element>

// Accepts `event` and emits it to subscribers
public func accept(_ event: Element) {
self.subject.onNext(event)
}

private init(subject: ReplaySubject<Element>) {
self.subject = subject
}

/// Creates new instance of `ReplayRelay` that replays at most `bufferSize` last elements sent to it.
///
/// - parameter bufferSize: Maximal number of elements to replay to observers after subscription.
/// - returns: New instance of replay relay.
public static func create(bufferSize: Int) -> ReplayRelay<Element> {
ReplayRelay(subject: ReplaySubject.create(bufferSize: bufferSize))
}

/// Creates a new instance of `ReplayRelay` that buffers all the sent to it.
/// To avoid filling up memory, developer needs to make sure that the use case will only ever store a 'reasonable'
/// number of elements.
public static func createUnbound() -> ReplayRelay<Element> {
ReplayRelay(subject: ReplaySubject.createUnbounded())
}

/// Subscribes observer
public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
self.subject.subscribe(observer)
}

/// - returns: Canonical interface for push style sequence
public func asObservable() -> Observable<Element> {
self.subject.asObserver()
}
}
1 change: 1 addition & 0 deletions Sources/AllTestz/ReplayRelayTests.swift

0 comments on commit 4e969fa

Please sign in to comment.