Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't use KVO in RACCommand #2334

Merged
merged 6 commits into from Sep 12, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
142 changes: 39 additions & 103 deletions ReactiveCocoa/Objective-C/RACCommand.m
Expand Up @@ -25,20 +25,15 @@
const NSInteger RACCommandErrorNotEnabled = 1;

@interface RACCommand () {
// The mutable array backing `activeExecutionSignals`.
//
// This should only be used while synchronized on `self`.
NSMutableArray *_activeExecutionSignals;

// Atomic backing variable for `allowsConcurrentExecution`.
volatile uint32_t _allowsConcurrentExecution;
}

// An array of signals representing in-flight executions, in the order they
// began.
//
// This property is KVO-compliant.
@property (atomic, copy, readonly) NSArray *activeExecutionSignals;
/// A subject that sends added execution signals.
@property (nonatomic, strong, readonly) RACSubject *addedExecutionSignalsSubject;

/// A subject that sends the new value of `allowsConcurrentExecution` whenever it changes.
@property (nonatomic, strong, readonly) RACSubject *allowsConcurrentExecutionSubject;

// `enabled`, but without a hop to the main thread.
//
Expand All @@ -48,13 +43,6 @@ @interface RACCommand () {
// The signal block that the receiver was initialized with.
@property (nonatomic, copy, readonly) RACSignal * (^signalBlock)(id input);

// Adds a signal to `activeExecutionSignals` and generates a KVO notification.
- (void)addActiveExecutionSignal:(RACSignal *)signal;

// Removes a signal from `activeExecutionSignals` and generates a KVO
// notification.
- (void)removeActiveExecutionSignal:(RACSignal *)signal;

@end

@implementation RACCommand
Expand All @@ -66,53 +54,13 @@ - (BOOL)allowsConcurrentExecution {
}

- (void)setAllowsConcurrentExecution:(BOOL)allowed {
[self willChangeValueForKey:@keypath(self.allowsConcurrentExecution)];

if (allowed) {
OSAtomicOr32Barrier(1, &_allowsConcurrentExecution);
} else {
OSAtomicAnd32Barrier(0, &_allowsConcurrentExecution);
}

[self didChangeValueForKey:@keypath(self.allowsConcurrentExecution)];
}

- (NSArray *)activeExecutionSignals {
@synchronized (self) {
return [_activeExecutionSignals copy];
}
}

- (void)addActiveExecutionSignal:(RACSignal *)signal {
NSCParameterAssert([signal isKindOfClass:RACSignal.class]);

@synchronized (self) {
// The KVO notification has to be generated while synchronized, because
// it depends on the index remaining consistent.
NSIndexSet *indexes = [NSIndexSet indexSetWithIndex:_activeExecutionSignals.count];
[self willChange:NSKeyValueChangeInsertion valuesAtIndexes:indexes forKey:@keypath(self.activeExecutionSignals)];
[_activeExecutionSignals addObject:signal];
[self didChange:NSKeyValueChangeInsertion valuesAtIndexes:indexes forKey:@keypath(self.activeExecutionSignals)];
}
}

- (void)removeActiveExecutionSignal:(RACSignal *)signal {
NSCParameterAssert([signal isKindOfClass:RACSignal.class]);

@synchronized (self) {
// The indexes have to be calculated and the notification generated
// while synchronized, because they depend on the indexes remaining
// consistent.
NSIndexSet *indexes = [_activeExecutionSignals indexesOfObjectsPassingTest:^ BOOL (RACSignal *obj, NSUInteger index, BOOL *stop) {
return obj == signal;
}];

if (indexes.count == 0) return;

[self willChange:NSKeyValueChangeRemoval valuesAtIndexes:indexes forKey:@keypath(self.activeExecutionSignals)];
[_activeExecutionSignals removeObjectsAtIndexes:indexes];
[self didChange:NSKeyValueChangeRemoval valuesAtIndexes:indexes forKey:@keypath(self.activeExecutionSignals)];
}
[self.allowsConcurrentExecutionSubject sendNext:@(_allowsConcurrentExecution)];
}

#pragma mark Lifecycle
Expand All @@ -126,29 +74,22 @@ - (id)initWithSignalBlock:(RACSignal * (^)(id input))signalBlock {
return [self initWithEnabled:nil signalBlock:signalBlock];
}

- (void)dealloc {
[_addedExecutionSignalsSubject sendCompleted];
[_allowsConcurrentExecutionSubject sendCompleted];
}

- (id)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal * (^)(id input))signalBlock {
NSCParameterAssert(signalBlock != nil);

self = [super init];
if (self == nil) return nil;

_activeExecutionSignals = [[NSMutableArray alloc] init];
_addedExecutionSignalsSubject = [RACSubject new];
_allowsConcurrentExecutionSubject = [RACSubject new];
_signalBlock = [signalBlock copy];

// A signal of additions to `activeExecutionSignals`.
RACSignal *newActiveExecutionSignals = [[[[[self
rac_valuesAndChangesForKeyPath:@keypath(self.activeExecutionSignals) options:NSKeyValueObservingOptionNew observer:nil]
reduceEach:^(id _, NSDictionary *change) {
NSArray *signals = change[NSKeyValueChangeNewKey];
if (signals == nil) return [RACSignal empty];

return [signals.rac_sequence signalWithScheduler:RACScheduler.immediateScheduler];
}]
concat]
publish]
autoconnect];

_executionSignals = [[[newActiveExecutionSignals
_executionSignals = [[[self.addedExecutionSignalsSubject
map:^(RACSignal *signal) {
return [signal catchTo:[RACSignal empty]];
}]
Expand All @@ -160,7 +101,7 @@ - (id)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal * (^)(id
//
// In other words, if someone subscribes to `errors` _after_ an execution
// has started, it should still receive any error from that execution.
RACMulticastConnection *errorsConnection = [[[newActiveExecutionSignals
RACMulticastConnection *errorsConnection = [[[self.addedExecutionSignalsSubject
flattenMap:^(RACSignal *signal) {
return [[signal
ignoreValues]
Expand All @@ -174,9 +115,22 @@ - (id)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal * (^)(id
_errors = [errorsConnection.signal setNameWithFormat:@"%@ -errors", self];
[errorsConnection connect];

RACSignal *immediateExecuting = [RACObserve(self, activeExecutionSignals) map:^(NSArray *activeSignals) {
return @(activeSignals.count > 0);
}];
RACSignal *immediateExecuting = [[[[self.addedExecutionSignalsSubject
flattenMap:^(RACSignal *signal) {
return [[[signal
catchTo:[RACSignal empty]]
then:^{
return [RACSignal return:@-1];
}]
startWith:@1];
}]
scanWithStart:@0 reduce:^(NSNumber *running, NSNumber *next) {
return @(running.integerValue + next.integerValue);
}]
map:^(NSNumber *count) {
return @(count.integerValue > 0);
}]
startWith:@NO];

_executing = [[[[[immediateExecuting
deliverOn:RACScheduler.mainThreadScheduler]
Expand All @@ -185,24 +139,23 @@ - (id)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal * (^)(id
distinctUntilChanged]
replayLast]
setNameWithFormat:@"%@ -executing", self];

RACSignal *moreExecutionsAllowed = [RACSignal
if:RACObserve(self, allowsConcurrentExecution)
if:[self.allowsConcurrentExecutionSubject startWith:@NO]
then:[RACSignal return:@YES]
else:[immediateExecuting not]];

if (enabledSignal == nil) {
enabledSignal = [RACSignal return:@YES];
} else {
enabledSignal = [[[enabledSignal
startWith:@YES]
takeUntil:self.rac_willDeallocSignal]
replayLast];
enabledSignal = [enabledSignal startWith:@YES];
}

_immediateEnabled = [[RACSignal
_immediateEnabled = [[[[RACSignal
combineLatest:@[ enabledSignal, moreExecutionsAllowed ]]
and];
and]
takeUntil:self.rac_willDeallocSignal]
replayLast];

_enabled = [[[[[self.immediateEnabled
take:1]
Expand Down Expand Up @@ -241,27 +194,10 @@ - (RACSignal *)execute:(id)input {
subscribeOn:RACScheduler.mainThreadScheduler]
multicast:[RACReplaySubject subject]];

@weakify(self);

[self addActiveExecutionSignal:connection.signal];
[connection.signal subscribeError:^(NSError *error) {
@strongify(self);
[self removeActiveExecutionSignal:connection.signal];
} completed:^{
@strongify(self);
[self removeActiveExecutionSignal:connection.signal];
}];
[self.addedExecutionSignalsSubject sendNext:connection.signal];

[connection connect];
return [connection.signal setNameWithFormat:@"%@ -execute: %@", self, RACDescription(input)];
}

#pragma mark NSKeyValueObserving

+ (BOOL)automaticallyNotifiesObserversForKey:(NSString *)key {
// Generate all KVO notifications manually to avoid the performance impact
// of unnecessary swizzling.
return NO;
}

@end
16 changes: 16 additions & 0 deletions ReactiveCocoaTests/Objective-C/RACCommandSpec.m
Expand Up @@ -245,6 +245,22 @@
expect([command.executing first]).toEventually(equal(@NO));
});

qck_it(@"should have allowsConcurrentExecution be observable", ^{
RACCommand *command = [[RACCommand alloc] initWithSignalBlock:^(RACSignal *signal) {
return signal;
}];

RACSubject *completion = [RACSubject subject];
RACSignal *allowsConcurrentExecution = [[RACObserve(command, allowsConcurrentExecution)
takeUntil:completion]
replayLast];

command.allowsConcurrentExecution = YES;

expect([allowsConcurrentExecution first]).to(beTrue());
[completion sendCompleted];
});

qck_it(@"should not deliver errors from executionSignals", ^{
RACSubject *subject = [RACSubject subject];
NSMutableArray *receivedEvents = [NSMutableArray array];
Expand Down