diff --git a/ReactiveCocoa/Objective-C/RACCommand.m b/ReactiveCocoa/Objective-C/RACCommand.m index 99baa9f1e0..eccd5a4363 100644 --- a/ReactiveCocoa/Objective-C/RACCommand.m +++ b/ReactiveCocoa/Objective-C/RACCommand.m @@ -25,20 +25,15 @@ const NSInteger RACCommandErrorNotEnabled = 1; @interface RACCommand () { - // The mutable array backing `activeExecutionSignals`. - // - // This should only be used while synchronized on `self`. - NSMutableArray *_activeExecutionSignals; - // Atomic backing variable for `allowsConcurrentExecution`. volatile uint32_t _allowsConcurrentExecution; } -// An array of signals representing in-flight executions, in the order they -// began. -// -// This property is KVO-compliant. -@property (atomic, copy, readonly) NSArray *activeExecutionSignals; +/// A subject that sends added execution signals. +@property (nonatomic, strong, readonly) RACSubject *addedExecutionSignalsSubject; + +/// A subject that sends the new value of `allowsConcurrentExecution` whenever it changes. +@property (nonatomic, strong, readonly) RACSubject *allowsConcurrentExecutionSubject; // `enabled`, but without a hop to the main thread. // @@ -48,13 +43,6 @@ @interface RACCommand () { // The signal block that the receiver was initialized with. @property (nonatomic, copy, readonly) RACSignal * (^signalBlock)(id input); -// Adds a signal to `activeExecutionSignals` and generates a KVO notification. -- (void)addActiveExecutionSignal:(RACSignal *)signal; - -// Removes a signal from `activeExecutionSignals` and generates a KVO -// notification. -- (void)removeActiveExecutionSignal:(RACSignal *)signal; - @end @implementation RACCommand @@ -66,53 +54,13 @@ - (BOOL)allowsConcurrentExecution { } - (void)setAllowsConcurrentExecution:(BOOL)allowed { - [self willChangeValueForKey:@keypath(self.allowsConcurrentExecution)]; - if (allowed) { OSAtomicOr32Barrier(1, &_allowsConcurrentExecution); } else { OSAtomicAnd32Barrier(0, &_allowsConcurrentExecution); } - [self didChangeValueForKey:@keypath(self.allowsConcurrentExecution)]; -} - -- (NSArray *)activeExecutionSignals { - @synchronized (self) { - return [_activeExecutionSignals copy]; - } -} - -- (void)addActiveExecutionSignal:(RACSignal *)signal { - NSCParameterAssert([signal isKindOfClass:RACSignal.class]); - - @synchronized (self) { - // The KVO notification has to be generated while synchronized, because - // it depends on the index remaining consistent. - NSIndexSet *indexes = [NSIndexSet indexSetWithIndex:_activeExecutionSignals.count]; - [self willChange:NSKeyValueChangeInsertion valuesAtIndexes:indexes forKey:@keypath(self.activeExecutionSignals)]; - [_activeExecutionSignals addObject:signal]; - [self didChange:NSKeyValueChangeInsertion valuesAtIndexes:indexes forKey:@keypath(self.activeExecutionSignals)]; - } -} - -- (void)removeActiveExecutionSignal:(RACSignal *)signal { - NSCParameterAssert([signal isKindOfClass:RACSignal.class]); - - @synchronized (self) { - // The indexes have to be calculated and the notification generated - // while synchronized, because they depend on the indexes remaining - // consistent. - NSIndexSet *indexes = [_activeExecutionSignals indexesOfObjectsPassingTest:^ BOOL (RACSignal *obj, NSUInteger index, BOOL *stop) { - return obj == signal; - }]; - - if (indexes.count == 0) return; - - [self willChange:NSKeyValueChangeRemoval valuesAtIndexes:indexes forKey:@keypath(self.activeExecutionSignals)]; - [_activeExecutionSignals removeObjectsAtIndexes:indexes]; - [self didChange:NSKeyValueChangeRemoval valuesAtIndexes:indexes forKey:@keypath(self.activeExecutionSignals)]; - } + [self.allowsConcurrentExecutionSubject sendNext:@(_allowsConcurrentExecution)]; } #pragma mark Lifecycle @@ -126,29 +74,22 @@ - (id)initWithSignalBlock:(RACSignal * (^)(id input))signalBlock { return [self initWithEnabled:nil signalBlock:signalBlock]; } +- (void)dealloc { + [_addedExecutionSignalsSubject sendCompleted]; + [_allowsConcurrentExecutionSubject sendCompleted]; +} + - (id)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal * (^)(id input))signalBlock { NSCParameterAssert(signalBlock != nil); self = [super init]; if (self == nil) return nil; - _activeExecutionSignals = [[NSMutableArray alloc] init]; + _addedExecutionSignalsSubject = [RACSubject new]; + _allowsConcurrentExecutionSubject = [RACSubject new]; _signalBlock = [signalBlock copy]; - // A signal of additions to `activeExecutionSignals`. - RACSignal *newActiveExecutionSignals = [[[[[self - rac_valuesAndChangesForKeyPath:@keypath(self.activeExecutionSignals) options:NSKeyValueObservingOptionNew observer:nil] - reduceEach:^(id _, NSDictionary *change) { - NSArray *signals = change[NSKeyValueChangeNewKey]; - if (signals == nil) return [RACSignal empty]; - - return [signals.rac_sequence signalWithScheduler:RACScheduler.immediateScheduler]; - }] - concat] - publish] - autoconnect]; - - _executionSignals = [[[newActiveExecutionSignals + _executionSignals = [[[self.addedExecutionSignalsSubject map:^(RACSignal *signal) { return [signal catchTo:[RACSignal empty]]; }] @@ -160,7 +101,7 @@ - (id)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal * (^)(id // // In other words, if someone subscribes to `errors` _after_ an execution // has started, it should still receive any error from that execution. - RACMulticastConnection *errorsConnection = [[[newActiveExecutionSignals + RACMulticastConnection *errorsConnection = [[[self.addedExecutionSignalsSubject flattenMap:^(RACSignal *signal) { return [[signal ignoreValues] @@ -174,9 +115,22 @@ - (id)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal * (^)(id _errors = [errorsConnection.signal setNameWithFormat:@"%@ -errors", self]; [errorsConnection connect]; - RACSignal *immediateExecuting = [RACObserve(self, activeExecutionSignals) map:^(NSArray *activeSignals) { - return @(activeSignals.count > 0); - }]; + RACSignal *immediateExecuting = [[[[self.addedExecutionSignalsSubject + flattenMap:^(RACSignal *signal) { + return [[[signal + catchTo:[RACSignal empty]] + then:^{ + return [RACSignal return:@-1]; + }] + startWith:@1]; + }] + scanWithStart:@0 reduce:^(NSNumber *running, NSNumber *next) { + return @(running.integerValue + next.integerValue); + }] + map:^(NSNumber *count) { + return @(count.integerValue > 0); + }] + startWith:@NO]; _executing = [[[[[immediateExecuting deliverOn:RACScheduler.mainThreadScheduler] @@ -185,24 +139,23 @@ - (id)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal * (^)(id distinctUntilChanged] replayLast] setNameWithFormat:@"%@ -executing", self]; - + RACSignal *moreExecutionsAllowed = [RACSignal - if:RACObserve(self, allowsConcurrentExecution) + if:[self.allowsConcurrentExecutionSubject startWith:@NO] then:[RACSignal return:@YES] else:[immediateExecuting not]]; if (enabledSignal == nil) { enabledSignal = [RACSignal return:@YES]; } else { - enabledSignal = [[[enabledSignal - startWith:@YES] - takeUntil:self.rac_willDeallocSignal] - replayLast]; + enabledSignal = [enabledSignal startWith:@YES]; } - _immediateEnabled = [[RACSignal + _immediateEnabled = [[[[RACSignal combineLatest:@[ enabledSignal, moreExecutionsAllowed ]] - and]; + and] + takeUntil:self.rac_willDeallocSignal] + replayLast]; _enabled = [[[[[self.immediateEnabled take:1] @@ -241,27 +194,10 @@ - (RACSignal *)execute:(id)input { subscribeOn:RACScheduler.mainThreadScheduler] multicast:[RACReplaySubject subject]]; - @weakify(self); - - [self addActiveExecutionSignal:connection.signal]; - [connection.signal subscribeError:^(NSError *error) { - @strongify(self); - [self removeActiveExecutionSignal:connection.signal]; - } completed:^{ - @strongify(self); - [self removeActiveExecutionSignal:connection.signal]; - }]; + [self.addedExecutionSignalsSubject sendNext:connection.signal]; [connection connect]; return [connection.signal setNameWithFormat:@"%@ -execute: %@", self, RACDescription(input)]; } -#pragma mark NSKeyValueObserving - -+ (BOOL)automaticallyNotifiesObserversForKey:(NSString *)key { - // Generate all KVO notifications manually to avoid the performance impact - // of unnecessary swizzling. - return NO; -} - @end diff --git a/ReactiveCocoaTests/Objective-C/RACCommandSpec.m b/ReactiveCocoaTests/Objective-C/RACCommandSpec.m index 11f60427f6..490a004a8d 100644 --- a/ReactiveCocoaTests/Objective-C/RACCommandSpec.m +++ b/ReactiveCocoaTests/Objective-C/RACCommandSpec.m @@ -245,6 +245,22 @@ expect([command.executing first]).toEventually(equal(@NO)); }); +qck_it(@"should have allowsConcurrentExecution be observable", ^{ + RACCommand *command = [[RACCommand alloc] initWithSignalBlock:^(RACSignal *signal) { + return signal; + }]; + + RACSubject *completion = [RACSubject subject]; + RACSignal *allowsConcurrentExecution = [[RACObserve(command, allowsConcurrentExecution) + takeUntil:completion] + replayLast]; + + command.allowsConcurrentExecution = YES; + + expect([allowsConcurrentExecution first]).to(beTrue()); + [completion sendCompleted]; +}); + qck_it(@"should not deliver errors from executionSignals", ^{ RACSubject *subject = [RACSubject subject]; NSMutableArray *receivedEvents = [NSMutableArray array];