Skip to content

Commit

Permalink
Merge pull request #2334 from ReactiveCocoa/raccommand,-my-nemesis
Browse files Browse the repository at this point in the history
Don't use KVO in RACCommand
  • Loading branch information
neilpa committed Sep 12, 2015
2 parents eb6085c + b4b65fe commit e16f47c
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 103 deletions.
142 changes: 39 additions & 103 deletions ReactiveCocoa/Objective-C/RACCommand.m
Expand Up @@ -25,20 +25,15 @@
const NSInteger RACCommandErrorNotEnabled = 1;

@interface RACCommand () {
// The mutable array backing `activeExecutionSignals`.
//
// This should only be used while synchronized on `self`.
NSMutableArray *_activeExecutionSignals;

// Atomic backing variable for `allowsConcurrentExecution`.
volatile uint32_t _allowsConcurrentExecution;
}

// An array of signals representing in-flight executions, in the order they
// began.
//
// This property is KVO-compliant.
@property (atomic, copy, readonly) NSArray *activeExecutionSignals;
/// A subject that sends added execution signals.
@property (nonatomic, strong, readonly) RACSubject *addedExecutionSignalsSubject;

/// A subject that sends the new value of `allowsConcurrentExecution` whenever it changes.
@property (nonatomic, strong, readonly) RACSubject *allowsConcurrentExecutionSubject;

// `enabled`, but without a hop to the main thread.
//
Expand All @@ -48,13 +43,6 @@ @interface RACCommand () {
// The signal block that the receiver was initialized with.
@property (nonatomic, copy, readonly) RACSignal * (^signalBlock)(id input);

// Adds a signal to `activeExecutionSignals` and generates a KVO notification.
- (void)addActiveExecutionSignal:(RACSignal *)signal;

// Removes a signal from `activeExecutionSignals` and generates a KVO
// notification.
- (void)removeActiveExecutionSignal:(RACSignal *)signal;

@end

@implementation RACCommand
Expand All @@ -66,53 +54,13 @@ - (BOOL)allowsConcurrentExecution {
}

- (void)setAllowsConcurrentExecution:(BOOL)allowed {
[self willChangeValueForKey:@keypath(self.allowsConcurrentExecution)];

if (allowed) {
OSAtomicOr32Barrier(1, &_allowsConcurrentExecution);
} else {
OSAtomicAnd32Barrier(0, &_allowsConcurrentExecution);
}

[self didChangeValueForKey:@keypath(self.allowsConcurrentExecution)];
}

- (NSArray *)activeExecutionSignals {
@synchronized (self) {
return [_activeExecutionSignals copy];
}
}

- (void)addActiveExecutionSignal:(RACSignal *)signal {
NSCParameterAssert([signal isKindOfClass:RACSignal.class]);

@synchronized (self) {
// The KVO notification has to be generated while synchronized, because
// it depends on the index remaining consistent.
NSIndexSet *indexes = [NSIndexSet indexSetWithIndex:_activeExecutionSignals.count];
[self willChange:NSKeyValueChangeInsertion valuesAtIndexes:indexes forKey:@keypath(self.activeExecutionSignals)];
[_activeExecutionSignals addObject:signal];
[self didChange:NSKeyValueChangeInsertion valuesAtIndexes:indexes forKey:@keypath(self.activeExecutionSignals)];
}
}

- (void)removeActiveExecutionSignal:(RACSignal *)signal {
NSCParameterAssert([signal isKindOfClass:RACSignal.class]);

@synchronized (self) {
// The indexes have to be calculated and the notification generated
// while synchronized, because they depend on the indexes remaining
// consistent.
NSIndexSet *indexes = [_activeExecutionSignals indexesOfObjectsPassingTest:^ BOOL (RACSignal *obj, NSUInteger index, BOOL *stop) {
return obj == signal;
}];

if (indexes.count == 0) return;

[self willChange:NSKeyValueChangeRemoval valuesAtIndexes:indexes forKey:@keypath(self.activeExecutionSignals)];
[_activeExecutionSignals removeObjectsAtIndexes:indexes];
[self didChange:NSKeyValueChangeRemoval valuesAtIndexes:indexes forKey:@keypath(self.activeExecutionSignals)];
}
[self.allowsConcurrentExecutionSubject sendNext:@(_allowsConcurrentExecution)];
}

#pragma mark Lifecycle
Expand All @@ -126,29 +74,22 @@ - (id)initWithSignalBlock:(RACSignal * (^)(id input))signalBlock {
return [self initWithEnabled:nil signalBlock:signalBlock];
}

- (void)dealloc {
[_addedExecutionSignalsSubject sendCompleted];
[_allowsConcurrentExecutionSubject sendCompleted];
}

- (id)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal * (^)(id input))signalBlock {
NSCParameterAssert(signalBlock != nil);

self = [super init];
if (self == nil) return nil;

_activeExecutionSignals = [[NSMutableArray alloc] init];
_addedExecutionSignalsSubject = [RACSubject new];
_allowsConcurrentExecutionSubject = [RACSubject new];
_signalBlock = [signalBlock copy];

// A signal of additions to `activeExecutionSignals`.
RACSignal *newActiveExecutionSignals = [[[[[self
rac_valuesAndChangesForKeyPath:@keypath(self.activeExecutionSignals) options:NSKeyValueObservingOptionNew observer:nil]
reduceEach:^(id _, NSDictionary *change) {
NSArray *signals = change[NSKeyValueChangeNewKey];
if (signals == nil) return [RACSignal empty];

return [signals.rac_sequence signalWithScheduler:RACScheduler.immediateScheduler];
}]
concat]
publish]
autoconnect];

_executionSignals = [[[newActiveExecutionSignals
_executionSignals = [[[self.addedExecutionSignalsSubject
map:^(RACSignal *signal) {
return [signal catchTo:[RACSignal empty]];
}]
Expand All @@ -160,7 +101,7 @@ - (id)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal * (^)(id
//
// In other words, if someone subscribes to `errors` _after_ an execution
// has started, it should still receive any error from that execution.
RACMulticastConnection *errorsConnection = [[[newActiveExecutionSignals
RACMulticastConnection *errorsConnection = [[[self.addedExecutionSignalsSubject
flattenMap:^(RACSignal *signal) {
return [[signal
ignoreValues]
Expand All @@ -174,9 +115,22 @@ - (id)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal * (^)(id
_errors = [errorsConnection.signal setNameWithFormat:@"%@ -errors", self];
[errorsConnection connect];

RACSignal *immediateExecuting = [RACObserve(self, activeExecutionSignals) map:^(NSArray *activeSignals) {
return @(activeSignals.count > 0);
}];
RACSignal *immediateExecuting = [[[[self.addedExecutionSignalsSubject
flattenMap:^(RACSignal *signal) {
return [[[signal
catchTo:[RACSignal empty]]
then:^{
return [RACSignal return:@-1];
}]
startWith:@1];
}]
scanWithStart:@0 reduce:^(NSNumber *running, NSNumber *next) {
return @(running.integerValue + next.integerValue);
}]
map:^(NSNumber *count) {
return @(count.integerValue > 0);
}]
startWith:@NO];

_executing = [[[[[immediateExecuting
deliverOn:RACScheduler.mainThreadScheduler]
Expand All @@ -185,24 +139,23 @@ - (id)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal * (^)(id
distinctUntilChanged]
replayLast]
setNameWithFormat:@"%@ -executing", self];

RACSignal *moreExecutionsAllowed = [RACSignal
if:RACObserve(self, allowsConcurrentExecution)
if:[self.allowsConcurrentExecutionSubject startWith:@NO]
then:[RACSignal return:@YES]
else:[immediateExecuting not]];

if (enabledSignal == nil) {
enabledSignal = [RACSignal return:@YES];
} else {
enabledSignal = [[[enabledSignal
startWith:@YES]
takeUntil:self.rac_willDeallocSignal]
replayLast];
enabledSignal = [enabledSignal startWith:@YES];
}

_immediateEnabled = [[RACSignal
_immediateEnabled = [[[[RACSignal
combineLatest:@[ enabledSignal, moreExecutionsAllowed ]]
and];
and]
takeUntil:self.rac_willDeallocSignal]
replayLast];

_enabled = [[[[[self.immediateEnabled
take:1]
Expand Down Expand Up @@ -241,27 +194,10 @@ - (RACSignal *)execute:(id)input {
subscribeOn:RACScheduler.mainThreadScheduler]
multicast:[RACReplaySubject subject]];

@weakify(self);

[self addActiveExecutionSignal:connection.signal];
[connection.signal subscribeError:^(NSError *error) {
@strongify(self);
[self removeActiveExecutionSignal:connection.signal];
} completed:^{
@strongify(self);
[self removeActiveExecutionSignal:connection.signal];
}];
[self.addedExecutionSignalsSubject sendNext:connection.signal];

[connection connect];
return [connection.signal setNameWithFormat:@"%@ -execute: %@", self, RACDescription(input)];
}

#pragma mark NSKeyValueObserving

+ (BOOL)automaticallyNotifiesObserversForKey:(NSString *)key {
// Generate all KVO notifications manually to avoid the performance impact
// of unnecessary swizzling.
return NO;
}

@end
16 changes: 16 additions & 0 deletions ReactiveCocoaTests/Objective-C/RACCommandSpec.m
Expand Up @@ -245,6 +245,22 @@
expect([command.executing first]).toEventually(equal(@NO));
});

qck_it(@"should have allowsConcurrentExecution be observable", ^{
RACCommand *command = [[RACCommand alloc] initWithSignalBlock:^(RACSignal *signal) {
return signal;
}];

RACSubject *completion = [RACSubject subject];
RACSignal *allowsConcurrentExecution = [[RACObserve(command, allowsConcurrentExecution)
takeUntil:completion]
replayLast];

command.allowsConcurrentExecution = YES;

expect([allowsConcurrentExecution first]).to(beTrue());
[completion sendCompleted];
});

qck_it(@"should not deliver errors from executionSignals", ^{
RACSubject *subject = [RACSubject subject];
NSMutableArray *receivedEvents = [NSMutableArray array];
Expand Down

0 comments on commit e16f47c

Please sign in to comment.