Skip to content

Commit

Permalink
Merge pull request ReactiveCocoa#24 from Lightricks/feature/switch-to…
Browse files Browse the repository at this point in the history
…-latest-dispose-inner-signal-first

RACSignal+Operations: rewrite -switchToLatest without -takeUntil:.
  • Loading branch information
StatusReport committed Dec 12, 2018
2 parents 5406a55 + 8d6619b commit df81491
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 17 deletions.
74 changes: 62 additions & 12 deletions ReactiveObjC/RACSignal+Operations.m
Expand Up @@ -28,6 +28,7 @@
#import "RACUnit.h"
#import <libkern/OSAtomic.h>
#import <objc/runtime.h>
#import <os/lock.h>

NSString * const RACSignalErrorDomain = @"RACSignalErrorDomain";

Expand Down Expand Up @@ -852,22 +853,71 @@ - (RACSignal *)takeUntilReplacement:(RACSignal *)replacement {

- (RACSignal *)switchToLatest {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACMulticastConnection *connection = [self publish];
__block BOOL outerCompleted = NO;
__block BOOL innerCompleted = NO;
__block NSUInteger currentInnerIndex = 0;

RACSerialDisposable *innerDisposable = [[RACSerialDisposable alloc] init];
__block os_unfair_lock lock = OS_UNFAIR_LOCK_INIT;

RACDisposable *outerDisposable = [self subscribeNext:^(RACSignal *signal) {
NSCAssert(!signal || [signal isKindOfClass:RACSignal.class], @"-switchToLatest expects signals as values, instead we got: %@", signal);
signal = signal ?: [RACSignal empty];

// Ensure no further data is sent from the previous inner signal.
[innerDisposable.disposable dispose];

os_unfair_lock_lock(&lock);
NSUInteger innerIndex = ++currentInnerIndex;
// We got a new inner signal, so reset the inner completion flag only if the outer signal has
// not completed already, otherwise we may miss sending a completion in a different code path.
innerCompleted &= outerCompleted;
BOOL alreadyCompleted = innerCompleted;
os_unfair_lock_unlock(&lock);

// Avoid subscribing to a new inner signal which may send more values if the signal already
// completed. In this case we're already going to send a completion via the inner or the outer
// signals in a different code path.
if (alreadyCompleted) {
return;
}

RACDisposable *subscriptionDisposable = [[connection.signal
flattenMap:^(RACSignal *x) {
NSCAssert(x == nil || [x isKindOfClass:RACSignal.class], @"-switchToLatest requires that the source signal (%@) send signals. Instead we got: %@", self, x);
// Required to guarantee that the previous inner signal will not send any new values from now
// on.
[innerDisposable.disposable dispose];

// -concat:[RACSignal never] prevents completion of the receiver from
// prematurely terminating the inner signal.
return [x takeUntil:[connection.signal concat:[RACSignal never]]];
}]
subscribe:subscriber];
innerDisposable.disposable = [signal subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
os_unfair_lock_lock(&lock);
if (innerIndex == currentInnerIndex) {
innerCompleted = YES;
}
BOOL sendCompleted = innerCompleted && outerCompleted;
os_unfair_lock_unlock(&lock);

if (sendCompleted) {
[subscriber sendCompleted];
}
}];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
os_unfair_lock_lock(&lock);
BOOL localInnerCompleted = innerCompleted;
outerCompleted = YES;
os_unfair_lock_unlock(&lock);

if (localInnerCompleted) {
[subscriber sendCompleted];
}
}];

RACDisposable *connectionDisposable = [connection connect];
return [RACDisposable disposableWithBlock:^{
[subscriptionDisposable dispose];
[connectionDisposable dispose];
[innerDisposable dispose];
[outerDisposable dispose];
}];
}] setNameWithFormat:@"[%@] -switchToLatest", self.name];
}
Expand Down
55 changes: 50 additions & 5 deletions ReactiveObjCTests/RACSignalSpec.m
Expand Up @@ -252,7 +252,7 @@ + (void)configure:(Configuration *)configuration {
expect(@(secondSubscribed)).to(beFalsy());
expect(@(errored)).to(beTruthy());
});

qck_it(@"should not retain signals that are subscribed", ^{
__weak RACSignal *weakSignal;
@autoreleasepool {
Expand Down Expand Up @@ -1985,6 +1985,20 @@ + (void)configure:(Configuration *)configuration {
expect(@(completed)).to(beTruthy());
});

qck_it(@"should dispose previous inner signal before subscribing to new inner signal", ^{
RACSubject *otherSignal = [RACSubject subject];
RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[otherSignal sendNext:@"foo"];
return nil;
}];

[subject sendNext:otherSignal];
expect(values).to(equal(@[]));

[subject sendNext:signal];
expect(values).to(equal(@[]));
});

qck_it(@"should accept nil signals", ^{
[subject sendNext:nil];
[subject sendNext:[RACSignal createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
Expand All @@ -1997,6 +2011,37 @@ + (void)configure:(Configuration *)configuration {
expect(values).to(equal(expected));
});

qck_it(@"should deliver in right order when inner signals deliver on multiple schedulers", ^{
for (int i = 0; i < 50; ++i) {
@autoreleasepool {
RACSignal *signalA = [[RACSignal return:@1]
subscribeOn:[RACScheduler scheduler]];

RACSignal *signalB = [[RACSignal return:@2]
subscribeOn:[RACScheduler scheduler]];

__block NSMutableArray *values = [NSMutableArray array];
RACSubject *subject = [RACSubject subject];

__block atomic_bool completed = NO;
[[subject
switchToLatest]
subscribeNext:^(id x) {
[values addObject:x];
} completed:^{
expect(values.lastObject).to(equal(@2));
completed = YES;
}];

[subject sendNext:signalA];
[subject sendNext:signalB];
[subject sendCompleted];

expect(completed).toEventually(beTruthy());
}
}
});

qck_it(@"should return a cold signal", ^{
__block NSUInteger subscriptions = 0;
RACSignal *signalOfSignals = [RACSignal createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
Expand Down Expand Up @@ -3264,7 +3309,7 @@ + (void)configure:(Configuration *)configuration {
qck_beforeEach(^{
signal = [[[RACSignal return:@0] concat:[RACSignal return:@1]] concat:[RACSignal return:@2]];
});

qck_it(@"should return true when the predicate is truthy for at least one value", ^{
RACSignal *any = [signal any:^BOOL(NSNumber *value) {
return value.integerValue > 0;
Expand All @@ -3280,7 +3325,7 @@ + (void)configure:(Configuration *)configuration {
expect(values).to(equal(@[@YES]));
expect(@(completed)).to(equal(@YES));
});

qck_it(@"should return false when the predicate is falsy for all values", ^{
RACSignal *any = [signal any:^BOOL(NSNumber *value) {
return value.integerValue == 3;
Expand Down Expand Up @@ -3322,7 +3367,7 @@ + (void)configure:(Configuration *)configuration {
qck_beforeEach(^{
signal = [[[RACSignal return:@0] concat:[RACSignal return:@1]] concat:[RACSignal return:@2]];
});

qck_it(@"should return true when all values pass", ^{
RACSignal *all = [signal all:^BOOL(NSNumber *value) {
return value.integerValue >= 0;
Expand All @@ -3338,7 +3383,7 @@ + (void)configure:(Configuration *)configuration {
expect(values).to(equal(@[@YES]));
expect(@(completed)).to(equal(@YES));
});

qck_it(@"should return false when at least one value fails", ^{
RACSignal *all = [signal all:^BOOL(NSNumber *value) {
return value.integerValue < 2;
Expand Down

0 comments on commit df81491

Please sign in to comment.