Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flatten changes #2976

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 23 additions & 3 deletions ReactiveCocoa/RACSignal+Operations.m
Expand Up @@ -516,15 +516,29 @@ - (RACSignal *)flatten:(NSUInteger)maxConcurrent {
}
};

[compoundDisposable addDisposable:[RACDisposable disposableWithBlock:^{
// A strong reference is held to `subscribeToSignal` until we're
// done, preventing it from deallocating early.
subscribeToSignal = nil;
}]];

// The signals waiting to be started.
//
// This array should only be used while synchronized on `subscriber`.
NSMutableArray *queuedSignals = [NSMutableArray array];

// Whether the next signal is about to be subscribed to (prevent race condition
// with a new signal being added to the queue at the same time as the
// current signal completes)
//
// This should only be used while synchronized on `subscriber`.
__block BOOL pendingSubscription = NO;

recur = subscribeToSignal = ^(RACSignal *signal) {
RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];

@synchronized (subscriber) {
pendingSubscription = NO;
[compoundDisposable addDisposable:serialDisposable];
[activeDisposables addObject:serialDisposable];
}
Expand All @@ -543,14 +557,18 @@ - (RACSignal *)flatten:(NSUInteger)maxConcurrent {

if (queuedSignals.count == 0) {
completeIfAllowed();
pendingSubscription = NO;
return;
}

nextSignal = queuedSignals[0];
[queuedSignals removeObjectAtIndex:0];
pendingSubscription = YES;
}

subscribeToSignal(nextSignal);
if (subscribeToSignal) {
subscribeToSignal(nextSignal);
}
}];
};

Expand All @@ -560,7 +578,7 @@ - (RACSignal *)flatten:(NSUInteger)maxConcurrent {
NSCAssert([signal isKindOfClass:RACSignal.class], @"Expected a RACSignal, got %@", signal);

@synchronized (subscriber) {
if (maxConcurrent > 0 && activeDisposables.count >= maxConcurrent) {
if (maxConcurrent > 0 && (activeDisposables.count >= maxConcurrent || pendingSubscription)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this works generally. It works for -concat because maxConcurrent is 1, but flatten: needs to handle n—including n pending subscriptions.

Think of the case where a signal is flatten:10. If 3 threads race to send a signal through the flatten:, all 3 should be immediately subscribed to—even if one of them is pendingSubscription.

So I think pendingSubscription should be an NSUInteger of pending subscriptions, and this should check maxConcurrent > 0 || (activeDisposables.count + pendingSubscriptions >= maxConcurrent). Or maybe it would be simpler to add a activeCount and use/manipulate that instead.

This is probably relatively uncommon—flatten: didn't seem to be widely used. But it could lead to some bad behavior.

[queuedSignals addObject:signal];

// If we need to wait, skip subscribing to this
Expand All @@ -569,7 +587,9 @@ - (RACSignal *)flatten:(NSUInteger)maxConcurrent {
}
}

subscribeToSignal(signal);
if (subscribeToSignal) {
subscribeToSignal(signal);
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
Expand Down
55 changes: 55 additions & 0 deletions ReactiveCocoaTests/RACSignalSpec.m
Expand Up @@ -1907,6 +1907,61 @@ + (void)configure:(Configuration *)configuration {
expect(@(flattenDisposable.disposed)).toEventually(beTruthy());
}
});

qck_it(@"only subscribes to one signal at a time - even when adding new signals at the same time as one completes", ^{
for (int i = 0; i < 100; i++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not wild about adding a long test like this. 😕 It takes 1.6s on CI.

dispatch_semaphore_t ready1 = dispatch_semaphore_create(0);
dispatch_semaphore_t ready2 = dispatch_semaphore_create(0);
dispatch_semaphore_t barrier1 = dispatch_semaphore_create(0);
dispatch_semaphore_t barrier2 = dispatch_semaphore_create(0);
subscribedTo1 = NO;
subscribedTo2 = NO;
subscribedTo3 = NO;

RACDisposable *flattenDisposable = [[signalsSubject flatten:1] subscribeNext:^(id x) {
}];

[signalsSubject sendNext:sub1];
[signalsSubject sendNext:sub2];

expect(@(subscribedTo1)).to(beTruthy());
expect(@(subscribedTo2)).to(beFalsy());
expect(@(subscribedTo3)).to(beFalsy());

dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0), ^{
dispatch_semaphore_signal(ready1);
dispatch_semaphore_wait(barrier1, DISPATCH_TIME_FOREVER);
[subject1 sendCompleted];
});

dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0), ^{
dispatch_semaphore_signal(ready2);
dispatch_semaphore_wait(barrier2, DISPATCH_TIME_FOREVER);
[signalsSubject sendNext:sub3];
});

dispatch_semaphore_wait(ready1, DISPATCH_TIME_FOREVER);
dispatch_semaphore_wait(ready2, DISPATCH_TIME_FOREVER);

// Try to complete one signal and add a new signal
// very, very shortly after
dispatch_semaphore_signal(barrier1);
dispatch_semaphore_signal(barrier2);

// The race condition allows the just added signal to
// immediately get subscribed to - see that the third signal
// doesn't get that subscription yet.
expect(@(subscribedTo2)).toEventually(beTruthy());
expect(@(subscribedTo3)).to(beFalsy());

[subject2 sendCompleted];
expect(@(subscribedTo3)).toEventually(beTruthy());

[subject3 sendCompleted];

[flattenDisposable dispose];
}
});
});

qck_describe(@"-switchToLatest", ^{
Expand Down