From 3e6b58ee45b42da1e809837bb26f9304cbbf6e91 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Tue, 27 Nov 2012 16:23:16 -0800 Subject: [PATCH 01/18] Document , and especially its threading requirements --- ReactiveCocoaFramework/ReactiveCocoa/RACSubscriber.h | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/ReactiveCocoaFramework/ReactiveCocoa/RACSubscriber.h b/ReactiveCocoaFramework/ReactiveCocoa/RACSubscriber.h index 56ed111f15..fca4ab37b7 100644 --- a/ReactiveCocoaFramework/ReactiveCocoa/RACSubscriber.h +++ b/ReactiveCocoaFramework/ReactiveCocoa/RACSubscriber.h @@ -10,7 +10,17 @@ @class RACDisposable; +// Represents any object which can directly receive values from a . +// +// You generally shouldn't need to implement this protocol. +[RACSignal +// createSignal:], 's subscription methods, or RACSubject should work +// for most uses. +// +// Implementors of this protocol may receive messages and values from multiple +// threads simultaneously, and so should be thread-safe. @protocol RACSubscriber +@required + // Send the next value to subscribers. `value` can be nil. - (void)sendNext:(id)value; @@ -22,6 +32,7 @@ // Sends the subscriber the disposable that represents its subscription. - (void)didSubscribeWithDisposable:(RACDisposable *)disposable; + @end From 9ecef6fe1c1f941376d89f411f93d6bc7a5de766 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Tue, 27 Nov 2012 16:24:08 -0800 Subject: [PATCH 02/18] Document the RACSubscriber class --- ReactiveCocoaFramework/ReactiveCocoa/RACSubscriber.h | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ReactiveCocoaFramework/ReactiveCocoa/RACSubscriber.h b/ReactiveCocoaFramework/ReactiveCocoa/RACSubscriber.h index fca4ab37b7..0026620f38 100644 --- a/ReactiveCocoaFramework/ReactiveCocoa/RACSubscriber.h +++ b/ReactiveCocoaFramework/ReactiveCocoa/RACSubscriber.h @@ -35,7 +35,10 @@ @end - +// A simple block-based subscriber. +// +// You shouldn't need to interact with this class directly. Use +// -subscribeNext:error:completed: from instead. @interface RACSubscriber : NSObject // Creates a new subscriber with the given blocks. From 79a2958d1b7556d5ea4ed336b32033e0c6b8cc8e Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Tue, 27 Nov 2012 16:42:53 -0800 Subject: [PATCH 03/18] Added a multithreading shared example for s --- .../ReactiveCocoa.xcodeproj/project.pbxproj | 10 +++++ .../ReactiveCocoaTests/RACSubjectSpec.m | 44 +++++++++++++++++++ .../RACSubscriberExamples.h | 17 +++++++ .../RACSubscriberExamples.m | 44 +++++++++++++++++++ .../ReactiveCocoaTests/RACSubscriberSpec.m | 44 +++++++++++++++++++ 5 files changed, 159 insertions(+) create mode 100644 ReactiveCocoaFramework/ReactiveCocoaTests/RACSubscriberExamples.h create mode 100644 ReactiveCocoaFramework/ReactiveCocoaTests/RACSubscriberExamples.m create mode 100644 ReactiveCocoaFramework/ReactiveCocoaTests/RACSubscriberSpec.m diff --git a/ReactiveCocoaFramework/ReactiveCocoa.xcodeproj/project.pbxproj b/ReactiveCocoaFramework/ReactiveCocoa.xcodeproj/project.pbxproj index 05ebbd7069..347ac86568 100644 --- a/ReactiveCocoaFramework/ReactiveCocoa.xcodeproj/project.pbxproj +++ b/ReactiveCocoaFramework/ReactiveCocoa.xcodeproj/project.pbxproj @@ -179,6 +179,8 @@ D05C5C9F16490C4900DE6D3E /* NSNotificationCenter+RACSupport.m in Sources */ = {isa = PBXBuildFile; fileRef = 88442C8C16090C1500636B49 /* NSNotificationCenter+RACSupport.m */; }; D05C5CA016490C4D00DE6D3E /* NSString+RACSupport.m in Sources */ = {isa = PBXBuildFile; fileRef = 88442C8E16090C1500636B49 /* NSString+RACSupport.m */; }; D05C5CA116490C5200DE6D3E /* NSTask+RACSupport.m in Sources */ = {isa = PBXBuildFile; fileRef = 88442C9016090C1500636B49 /* NSTask+RACSupport.m */; }; + D0C70EC616659333005AAD03 /* RACSubscriberExamples.m in Sources */ = {isa = PBXBuildFile; fileRef = D0C70EC516659333005AAD03 /* RACSubscriberExamples.m */; }; + D0C70EC8166595AD005AAD03 /* RACSubscriberSpec.m in Sources */ = {isa = PBXBuildFile; fileRef = D0C70EC7166595AD005AAD03 /* RACSubscriberSpec.m */; }; D0C70F90164337A2007027B4 /* RACSequenceAdditionsSpec.m in Sources */ = {isa = PBXBuildFile; fileRef = D0C70F8F164337A2007027B4 /* RACSequenceAdditionsSpec.m */; }; D0C70F93164337E3007027B4 /* RACSequenceExamples.m in Sources */ = {isa = PBXBuildFile; fileRef = D0C70F92164337E3007027B4 /* RACSequenceExamples.m */; }; D0D487011642550100DD7605 /* RACStream.h in Headers */ = {isa = PBXBuildFile; fileRef = D0D486FF1642550100DD7605 /* RACStream.h */; settings = {ATTRIBUTES = (Public, ); }; }; @@ -476,6 +478,9 @@ D095BDCE15CB2E4B00E9BB13 /* Mac-Framework.xcconfig */ = {isa = PBXFileReference; lastKnownFileType = text.xcconfig; path = "Mac-Framework.xcconfig"; sourceTree = ""; }; D095BDCF15CB2E4B00E9BB13 /* Mac-StaticLibrary.xcconfig */ = {isa = PBXFileReference; lastKnownFileType = text.xcconfig; path = "Mac-StaticLibrary.xcconfig"; sourceTree = ""; }; D095BDD015CB2E4B00E9BB13 /* README.md */ = {isa = PBXFileReference; lastKnownFileType = text; path = README.md; sourceTree = ""; }; + D0C70EC416659333005AAD03 /* RACSubscriberExamples.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = RACSubscriberExamples.h; sourceTree = ""; }; + D0C70EC516659333005AAD03 /* RACSubscriberExamples.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = RACSubscriberExamples.m; sourceTree = ""; }; + D0C70EC7166595AD005AAD03 /* RACSubscriberSpec.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = RACSubscriberSpec.m; sourceTree = ""; }; D0C70F8F164337A2007027B4 /* RACSequenceAdditionsSpec.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = RACSequenceAdditionsSpec.m; sourceTree = ""; }; D0C70F91164337E3007027B4 /* RACSequenceExamples.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = RACSequenceExamples.h; sourceTree = ""; }; D0C70F92164337E3007027B4 /* RACSequenceExamples.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = RACSequenceExamples.m; sourceTree = ""; }; @@ -733,6 +738,9 @@ D0487AB1164314430085D890 /* RACStreamExamples.h */, D0487AB2164314430085D890 /* RACStreamExamples.m */, 889D0A7F15974B2A00F833E3 /* RACSubjectSpec.m */, + D0C70EC416659333005AAD03 /* RACSubscriberExamples.h */, + D0C70EC516659333005AAD03 /* RACSubscriberExamples.m */, + D0C70EC7166595AD005AAD03 /* RACSubscriberSpec.m */, 8820937B1501C8A600796685 /* RACSignalSpec.m */, 88FC735A16114FFB00F8A774 /* RACSubscriptingAssignmentTrampolineSpec.m */, 88442A321608A9AD00636B49 /* RACTestObject.h */, @@ -1368,6 +1376,8 @@ D05C5C9F16490C4900DE6D3E /* NSNotificationCenter+RACSupport.m in Sources */, D05C5CA016490C4D00DE6D3E /* NSString+RACSupport.m in Sources */, D05C5CA116490C5200DE6D3E /* NSTask+RACSupport.m in Sources */, + D0C70EC616659333005AAD03 /* RACSubscriberExamples.m in Sources */, + D0C70EC8166595AD005AAD03 /* RACSubscriberSpec.m in Sources */, ); runOnlyForDeploymentPostprocessing = 0; }; diff --git a/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubjectSpec.m b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubjectSpec.m index 46615cf276..d8406af329 100644 --- a/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubjectSpec.m +++ b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubjectSpec.m @@ -7,6 +7,7 @@ // #import "RACSpecs.h" +#import "RACSubscriberExamples.h" #import "RACSubject.h" #import "RACAsyncSubject.h" @@ -16,6 +17,37 @@ SpecBegin(RACSubject) +describe(@"RACSubject", ^{ + __block RACSubject *subject; + __block NSMutableSet *values; + + __block BOOL success; + __block NSError *error; + + beforeEach(^{ + values = [NSMutableSet set]; + + subject = [RACSubject subject]; + success = YES; + error = nil; + + [subject subscribeNext:^(id value) { + [values addObject:value]; + } error:^(NSError *e) { + error = e; + success = NO; + } completed:^{ + success = YES; + }]; + }); + + itShouldBehaveLike(RACSubscriberExamples, ^{ return subject; }, [^(NSSet *expectedValues) { + expect(success).to.beTruthy(); + expect(error).to.beNil(); + expect(values).to.equal(expectedValues); + } copy], nil); +}); + describe(@"RACAsyncSubject", ^{ __block RACAsyncSubject *subject = nil; @@ -113,6 +145,18 @@ beforeEach(^{ subject = [RACReplaySubject subject]; }); + + itShouldBehaveLike(RACSubscriberExamples, ^{ return subject; }, ^(NSSet *expectedValues) { + NSMutableSet *values = [NSMutableSet set]; + + // This subscription should synchronously dump all values already + // received into 'values'. + [subject subscribeNext:^(id value) { + [values addObject:value]; + }]; + + expect(values).to.equal(expectedValues); + }, nil); it(@"should send both values to new subscribers after completion", ^{ id firstValue = @"blah"; diff --git a/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubscriberExamples.h b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubscriberExamples.h new file mode 100644 index 0000000000..e7e05ae914 --- /dev/null +++ b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubscriberExamples.h @@ -0,0 +1,17 @@ +// +// RACSubscriberExamples.h +// ReactiveCocoa +// +// Created by Justin Spahr-Summers on 2012-11-27. +// Copyright (c) 2012 GitHub, Inc. All rights reserved. +// + +// The name of the shared examples for implementors of . This +// example should be passed the following arguments: +// +// getSubscriber - A block of type `id (^)(void)`, which +// should return a . +// verifyNexts - A block of type `void (^)(NSSet *)`, which should verify +// that the subscriber received all of the values in the set +// (regardless of order). +extern NSString * const RACSubscriberExamples; diff --git a/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubscriberExamples.m b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubscriberExamples.m new file mode 100644 index 0000000000..5bd26e2e70 --- /dev/null +++ b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubscriberExamples.m @@ -0,0 +1,44 @@ +// +// RACSubscriberExamples.m +// ReactiveCocoa +// +// Created by Justin Spahr-Summers on 2012-11-27. +// Copyright (c) 2012 GitHub, Inc. All rights reserved. +// + +#import "RACSpecs.h" +#import "RACSubscriberExamples.h" + +#import "RACSubscriber.h" + +NSString * const RACSubscriberExamples = @"RACSubscriberExamples"; + +SharedExampleGroupsBegin(RACSubscriberExamples) + +sharedExamplesFor(RACSubscriberExamples, ^(id (^getSubscriber)(void), void (^verifyNexts)(NSSet *)) { + __block id subscriber; + __block NSSet *values; + + beforeEach(^{ + subscriber = getSubscriber(); + expect(subscriber).notTo.beNil(); + + NSMutableSet *mutableValues = [NSMutableSet set]; + for (NSUInteger i = 0; i < 20; i++) { + [mutableValues addObject:@(i)]; + } + + values = [mutableValues copy]; + }); + + it(@"should send nexts serially, even when delivered from multiple threads", ^{ + NSArray *allValues = values.allObjects; + dispatch_apply(allValues.count, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), [^(size_t index) { + [subscriber sendNext:allValues[index]]; + } copy]); + + verifyNexts(values); + }); +}); + +SharedExampleGroupsEnd diff --git a/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubscriberSpec.m b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubscriberSpec.m new file mode 100644 index 0000000000..3b3c723ed3 --- /dev/null +++ b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubscriberSpec.m @@ -0,0 +1,44 @@ +// +// RACSubscriberSpec.m +// ReactiveCocoa +// +// Created by Justin Spahr-Summers on 2012-11-27. +// Copyright (c) 2012 GitHub, Inc. All rights reserved. +// + +#import "RACSpecs.h" +#import "RACSubscriberExamples.h" + +#import "RACSubscriber.h" + +SpecBegin(RACSubscriber) + +__block RACSubscriber *subscriber; +__block NSMutableSet *values; + +__block BOOL success; +__block NSError *error; + +beforeEach(^{ + values = [NSMutableSet set]; + + success = YES; + error = nil; + + subscriber = [RACSubscriber subscriberWithNext:^(id value) { + [values addObject:value]; + } error:^(NSError *e) { + error = e; + success = NO; + } completed:^{ + success = YES; + }]; +}); + +itShouldBehaveLike(RACSubscriberExamples, ^{ return subscriber; }, [^(NSSet *expectedValues) { + expect(success).to.beTruthy(); + expect(error).to.beNil(); + expect(values).to.equal(expectedValues); +} copy], nil); + +SpecEnd From 3744da7778111c39ebf70b06da63591870a6e6f2 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Tue, 27 Nov 2012 17:27:47 -0800 Subject: [PATCH 04/18] Made RACSubscriber thread-safe --- .../ReactiveCocoa/RACSubscriber.m | 91 +++++++++---------- 1 file changed, 44 insertions(+), 47 deletions(-) diff --git a/ReactiveCocoaFramework/ReactiveCocoa/RACSubscriber.m b/ReactiveCocoaFramework/ReactiveCocoa/RACSubscriber.m index 2e835e13a4..c8221a7500 100644 --- a/ReactiveCocoaFramework/ReactiveCocoa/RACSubscriber.m +++ b/ReactiveCocoaFramework/ReactiveCocoa/RACSubscriber.m @@ -10,79 +10,76 @@ #import "RACDisposable.h" @interface RACSubscriber () -@property (nonatomic, copy) void (^next)(id value); -@property (nonatomic, copy) void (^error)(NSError *error); -@property (nonatomic, copy) void (^completed)(void); + +@property (nonatomic, copy, readonly) void (^next)(id value); +@property (nonatomic, copy, readonly) void (^error)(NSError *error); +@property (nonatomic, copy, readonly) void (^completed)(void); + +// These properties should only be accessed while synchronized on self. @property (nonatomic, strong) RACDisposable *disposable; -@property (assign) BOOL completedOrErrored; -@end +@property (nonatomic, assign) BOOL completedOrErrored; +- (void)stopSubscription; + +@end @implementation RACSubscriber +#pragma mark Lifecycle + ++ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed { + RACSubscriber *subscriber = [[self alloc] init]; + + subscriber->_next = [next copy]; + subscriber->_error = [error copy]; + subscriber->_completed = [completed copy]; + + return subscriber; +} + +- (void)stopSubscription { + @synchronized (self) { + [self.disposable dispose]; + self.disposable = nil; + } +} + - (void)dealloc { [self stopSubscription]; } - #pragma mark RACSubscriber - (void)sendNext:(id)value { - if(self.next != NULL) { - self.next(value); + if (self.next != NULL) { + @synchronized (self) { + self.next(value); + } } } - (void)sendError:(NSError *)e { - self.completedOrErrored = YES; - - [self stopSubscription]; + @synchronized (self) { + self.completedOrErrored = YES; + [self stopSubscription]; - if(self.error != NULL) { - self.error(e); + if (self.error != NULL) self.error(e); } } - (void)sendCompleted { - self.completedOrErrored = YES; - - [self stopSubscription]; - - if(self.completed != NULL) { - self.completed(); + @synchronized (self) { + self.completedOrErrored = YES; + [self stopSubscription]; + + if (self.completed != NULL) self.completed(); } } - (void)didSubscribeWithDisposable:(RACDisposable *)d { - @synchronized(self) { + @synchronized (self) { self.disposable = d; - } - - if (self.completedOrErrored) { - [self stopSubscription]; - } -} - - -#pragma mark API - -@synthesize next; -@synthesize error; -@synthesize completed; -@synthesize disposable; - -+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed { - RACSubscriber *subscriber = [[self alloc] init]; - subscriber.next = next; - subscriber.error = error; - subscriber.completed = completed; - return subscriber; -} - -- (void)stopSubscription { - @synchronized(self) { - [self.disposable dispose]; - self.disposable = nil; + if (self.completedOrErrored) [self stopSubscription]; } } From 560046244001374394ec47d3da88dbb129564407 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Tue, 27 Nov 2012 18:21:22 -0800 Subject: [PATCH 05/18] Copy all the blocks ugh --- ReactiveCocoaFramework/ReactiveCocoaTests/RACSubjectSpec.m | 6 +++--- .../ReactiveCocoaTests/RACSubscriberSpec.m | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubjectSpec.m b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubjectSpec.m index d8406af329..6cffefb36d 100644 --- a/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubjectSpec.m +++ b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubjectSpec.m @@ -41,7 +41,7 @@ }]; }); - itShouldBehaveLike(RACSubscriberExamples, ^{ return subject; }, [^(NSSet *expectedValues) { + itShouldBehaveLike(RACSubscriberExamples, [^{ return subject; } copy], [^(NSSet *expectedValues) { expect(success).to.beTruthy(); expect(error).to.beNil(); expect(values).to.equal(expectedValues); @@ -146,7 +146,7 @@ subject = [RACReplaySubject subject]; }); - itShouldBehaveLike(RACSubscriberExamples, ^{ return subject; }, ^(NSSet *expectedValues) { + itShouldBehaveLike(RACSubscriberExamples, [^{ return subject; } copy], [^(NSSet *expectedValues) { NSMutableSet *values = [NSMutableSet set]; // This subscription should synchronously dump all values already @@ -156,7 +156,7 @@ }]; expect(values).to.equal(expectedValues); - }, nil); + } copy], nil); it(@"should send both values to new subscribers after completion", ^{ id firstValue = @"blah"; diff --git a/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubscriberSpec.m b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubscriberSpec.m index 3b3c723ed3..8409ce0fc8 100644 --- a/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubscriberSpec.m +++ b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubscriberSpec.m @@ -35,7 +35,7 @@ }]; }); -itShouldBehaveLike(RACSubscriberExamples, ^{ return subscriber; }, [^(NSSet *expectedValues) { +itShouldBehaveLike(RACSubscriberExamples, [^{ return subscriber; } copy], [^(NSSet *expectedValues) { expect(success).to.beTruthy(); expect(error).to.beNil(); expect(values).to.equal(expectedValues); From 608e15fe6bafc2e963c6d55502ba772a1911c99e Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Tue, 27 Nov 2012 18:44:09 -0800 Subject: [PATCH 06/18] Test that RACSubscriber doesn't send nexts after completed/error --- .../ReactiveCocoaTests/RACSubscriberSpec.m | 78 +++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubscriberSpec.m b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubscriberSpec.m index 8409ce0fc8..4baa08b9a6 100644 --- a/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubscriberSpec.m +++ b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubscriberSpec.m @@ -10,22 +10,28 @@ #import "RACSubscriberExamples.h" #import "RACSubscriber.h" +#import SpecBegin(RACSubscriber) __block RACSubscriber *subscriber; __block NSMutableSet *values; +__block volatile BOOL finished; + __block BOOL success; __block NSError *error; beforeEach(^{ values = [NSMutableSet set]; + finished = NO; success = YES; error = nil; subscriber = [RACSubscriber subscriberWithNext:^(id value) { + expect(finished).to.beFalsy(); + [values addObject:value]; } error:^(NSError *e) { error = e; @@ -41,4 +47,76 @@ expect(values).to.equal(expectedValues); } copy], nil); +describe(@"finishing", ^{ + __block void (^sendValues)(void); + __block BOOL expectedSuccess; + + __block dispatch_group_t dispatchGroup; + __block dispatch_queue_t concurrentQueue; + + beforeEach(^{ + dispatchGroup = dispatch_group_create(); + expect(dispatchGroup).notTo.beNil(); + + concurrentQueue = dispatch_queue_create("com.github.ReactiveCocoa.RACSubscriberSpec", DISPATCH_QUEUE_CONCURRENT); + expect(concurrentQueue).notTo.beNil(); + + dispatch_suspend(concurrentQueue); + + sendValues = [^{ + for (NSUInteger i = 0; i < 15; i++) { + dispatch_group_async(dispatchGroup, concurrentQueue, ^{ + [subscriber sendNext:@(i)]; + }); + } + } copy]; + + sendValues(); + }); + + afterEach(^{ + sendValues(); + dispatch_resume(concurrentQueue); + + // Time out after one second. + dispatch_time_t time = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1 * NSEC_PER_SEC)); + expect(dispatch_group_wait(dispatchGroup, time)).to.equal(0); + + dispatch_release(dispatchGroup); + dispatchGroup = NULL; + + dispatch_release(concurrentQueue); + concurrentQueue = NULL; + + if (expectedSuccess) { + expect(success).to.beTruthy(); + expect(error).to.beNil(); + } else { + expect(success).to.beFalsy(); + } + }); + + it(@"should never invoke next after sending completed", ^{ + expectedSuccess = YES; + + dispatch_group_async(dispatchGroup, concurrentQueue, ^{ + [subscriber sendCompleted]; + + finished = YES; + OSMemoryBarrier(); + }); + }); + + it(@"should never invoke next after sending error", ^{ + expectedSuccess = NO; + + dispatch_group_async(dispatchGroup, concurrentQueue, ^{ + [subscriber sendError:nil]; + + finished = YES; + OSMemoryBarrier(); + }); + }); +}); + SpecEnd From 7e082d352bcecafc7ff58bea75dae9004ed58ca8 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Tue, 27 Nov 2012 18:53:20 -0800 Subject: [PATCH 07/18] Move expect() to the main thread Otherwise, a matcher failure isn't picked up as build/test failures. :trollface: --- .../ReactiveCocoaTests/RACSubscriberSpec.m | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubscriberSpec.m b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubscriberSpec.m index 4baa08b9a6..3cdcc8e11f 100644 --- a/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubscriberSpec.m +++ b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubscriberSpec.m @@ -18,6 +18,7 @@ __block NSMutableSet *values; __block volatile BOOL finished; +__block volatile int32_t nextsAfterFinished; __block BOOL success; __block NSError *error; @@ -26,11 +27,13 @@ values = [NSMutableSet set]; finished = NO; + nextsAfterFinished = 0; + success = YES; error = nil; subscriber = [RACSubscriber subscriberWithNext:^(id value) { - expect(finished).to.beFalsy(); + if (finished) OSAtomicIncrement32Barrier(&nextsAfterFinished); [values addObject:value]; } error:^(NSError *e) { @@ -88,6 +91,8 @@ dispatch_release(concurrentQueue); concurrentQueue = NULL; + expect(nextsAfterFinished).to.equal(0); + if (expectedSuccess) { expect(success).to.beTruthy(); expect(error).to.beNil(); From c9153d11f7fbfa21018257ffc901386449dcd8b2 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Tue, 27 Nov 2012 19:06:50 -0800 Subject: [PATCH 08/18] BREAKING CHANGE: Always terminate RACSubscribers on completed/error This matches the documentation, but hasn't actually been the case until now. --- ReactiveCocoaFramework/ReactiveCocoa/RACSubscriber.m | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ReactiveCocoaFramework/ReactiveCocoa/RACSubscriber.m b/ReactiveCocoaFramework/ReactiveCocoa/RACSubscriber.m index c8221a7500..b33bcd9cdf 100644 --- a/ReactiveCocoaFramework/ReactiveCocoa/RACSubscriber.m +++ b/ReactiveCocoaFramework/ReactiveCocoa/RACSubscriber.m @@ -53,6 +53,8 @@ - (void)dealloc { - (void)sendNext:(id)value { if (self.next != NULL) { @synchronized (self) { + if (self.completedOrErrored) return; + self.next(value); } } @@ -60,6 +62,8 @@ - (void)sendNext:(id)value { - (void)sendError:(NSError *)e { @synchronized (self) { + if (self.completedOrErrored) return; + self.completedOrErrored = YES; [self stopSubscription]; @@ -69,6 +73,8 @@ - (void)sendError:(NSError *)e { - (void)sendCompleted { @synchronized (self) { + if (self.completedOrErrored) return; + self.completedOrErrored = YES; [self stopSubscription]; From 34717517d736441886654a6e217adb8220b2db58 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Tue, 27 Nov 2012 18:57:02 -0800 Subject: [PATCH 09/18] +combineLatest: should only ever error once This contradicts the unit test that was previously here, but matches the documented semantics of , which explicitly says that -sendCompleted and -sendError: terminate the subscription. --- ReactiveCocoaFramework/ReactiveCocoaTests/RACSignalSpec.m | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ReactiveCocoaFramework/ReactiveCocoaTests/RACSignalSpec.m b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSignalSpec.m index 46d9f50ce1..900c0fa522 100644 --- a/ReactiveCocoaFramework/ReactiveCocoaTests/RACSignalSpec.m +++ b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSignalSpec.m @@ -470,7 +470,7 @@ expect(gotError).to.beTruthy(); }); - it(@"should error multiple times when multiple sources error", ^{ + it(@"should error only once when multiple sources error", ^{ __block int errorCount = 0; [combined subscribeError:^(NSError *error) { @@ -480,7 +480,7 @@ [subscriber1 sendError:[NSError errorWithDomain:@"" code:-1 userInfo:nil]]; [subscriber2 sendError:[NSError errorWithDomain:@"" code:-1 userInfo:nil]]; - expect(errorCount).to.equal(2); + expect(errorCount).to.equal(1); }); }); From fb5f48bde4dc3eefd1a71f09e621a7995d2d699d Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Tue, 27 Nov 2012 19:31:21 -0800 Subject: [PATCH 10/18] Refactor methods for automatic resubscription Instead of re-adding the same RACSubscriber, create a new one (implicitly) any time we resubscribe. --- .../ReactiveCocoa/RACSignalProtocol.m | 164 +++++++++++------- 1 file changed, 99 insertions(+), 65 deletions(-) diff --git a/ReactiveCocoaFramework/ReactiveCocoa/RACSignalProtocol.m b/ReactiveCocoaFramework/ReactiveCocoa/RACSignalProtocol.m index 37eb72c1df..171a1ffaa4 100644 --- a/ReactiveCocoaFramework/ReactiveCocoa/RACSignalProtocol.m +++ b/ReactiveCocoaFramework/ReactiveCocoa/RACSignalProtocol.m @@ -27,6 +27,51 @@ NSString * const RACSignalErrorDomain = @"RACSignalErrorDomain"; +// Subscribes to the given signal with the given blocks. +// +// If the signal errors or completes, the corresponding block is invoked. If the +// disposable passed to the block is _not_ disposed, then the signal is +// subscribed to again. +static RACDisposable *subscribeForever (id signal, void (^next)(id), void (^error)(NSError *, RACDisposable *), void (^completed)(RACDisposable *)) { + next = [next copy]; + error = [error copy]; + completed = [completed copy]; + + NSRecursiveLock *lock = [[NSRecursiveLock alloc] init]; + + // These should only be accessed while 'lock' is held. + __block BOOL disposed = NO; + __block RACDisposable *innerDisposable = nil; + + RACDisposable *shortCircuitingDisposable = [RACDisposable disposableWithBlock:^{ + [lock lock]; + disposed = YES; + [innerDisposable dispose]; + [lock unlock]; + }]; + + RACDisposable *outerDisposable = [signal subscribeNext:next error:^(NSError *e) { + error(e, shortCircuitingDisposable); + + [lock lock]; + if (disposed) return; + innerDisposable = subscribeForever(signal, next, error, completed); + [lock unlock]; + } completed:^{ + completed(shortCircuitingDisposable); + + [lock lock]; + if (disposed) return; + innerDisposable = subscribeForever(signal, next, error, completed); + [lock unlock]; + }]; + + return [RACDisposable disposableWithBlock:^{ + [shortCircuitingDisposable dispose]; + [outerDisposable dispose]; + }]; +} + @concreteprotocol(RACSignal) #pragma mark RACStream @@ -217,42 +262,33 @@ - (RACDisposable *)subscribeError:(void (^)(NSError *))errorBlock completed:(voi - (id)repeat { return [RACSignal createSignal:^(id subscriber) { - __block RACDisposable *currentDisposable = nil; - - __block RACSubscriber *innerObserver = [RACSubscriber subscriberWithNext:^(id x) { - [subscriber sendNext:x]; - } error:^(NSError *error) { - [subscriber sendError:error]; - } completed:^{ - currentDisposable = [self subscribe:innerObserver]; - }]; - - currentDisposable = [self subscribe:innerObserver]; - - return [RACDisposable disposableWithBlock:^{ - [currentDisposable dispose]; - }]; + return subscribeForever(self, + ^(id x) { + [subscriber sendNext:x]; + }, + ^(NSError *error, RACDisposable *disposable) { + [disposable dispose]; + [subscriber sendError:error]; + }, + ^(RACDisposable *disposable) { + // Resubscribe. + }); }]; } - (id)asMaybes { return [RACSignal createSignal:^(id subscriber) { - __block RACDisposable *currentDisposable = nil; - - __block RACSubscriber *innerObserver = [RACSubscriber subscriberWithNext:^(id x) { - [subscriber sendNext:[RACMaybe maybeWithObject:x]]; - } error:^(NSError *error) { - [subscriber sendNext:[RACMaybe maybeWithError:error]]; - currentDisposable = [self subscribe:innerObserver]; - } completed:^{ - [subscriber sendCompleted]; - }]; - - currentDisposable = [self subscribe:innerObserver]; - - return [RACDisposable disposableWithBlock:^{ - [currentDisposable dispose]; - }]; + return subscribeForever(self, + ^(id x) { + [subscriber sendNext:[RACMaybe maybeWithObject:x]]; + }, + ^(NSError *error, RACDisposable *disposable) { + [subscriber sendNext:[RACMaybe maybeWithError:error]]; + }, + ^(RACDisposable *disposable) { + [disposable dispose]; + [subscriber sendCompleted]; + }); }]; } @@ -261,24 +297,25 @@ - (RACDisposable *)subscribeError:(void (^)(NSError *))errorBlock completed:(voi return [RACSignal createSignal:^(id subscriber) { __block RACDisposable *innerDisposable = nil; - RACDisposable *outerDisposable = [self subscribeNext:^(id x) { - [subscriber sendNext:x]; - } error:^(NSError *error) { - id signal = catchBlock(error); - innerDisposable = [signal subscribe:[RACSubscriber subscriberWithNext:^(id x) { + + RACDisposable *outerDisposable = subscribeForever(self, + ^(id x) { [subscriber sendNext:x]; - } error:^(NSError *error) { - [subscriber sendError:error]; - } completed:^{ + }, + ^(NSError *error, RACDisposable *outerDisposable) { + [outerDisposable dispose]; + + id signal = catchBlock(error); + innerDisposable = [signal subscribe:subscriber]; + }, + ^(RACDisposable *outerDisposable) { + [outerDisposable dispose]; [subscriber sendCompleted]; - }]]; - } completed:^{ - [subscriber sendCompleted]; - }]; - + }); + return [RACDisposable disposableWithBlock:^{ - [innerDisposable dispose]; [outerDisposable dispose]; + [innerDisposable dispose]; }]; }]; } @@ -1148,27 +1185,24 @@ - (RACConnectableSignal *)multicast:(RACSubject *)subject { - (id)retry:(NSInteger)retryCount { return [RACSignal createSignal:^(id subscriber) { __block NSInteger currentRetryCount = 0; - - __block RACDisposable *currentDisposable = nil; - __block RACSubscriber *innerSubscriber = [RACSubscriber subscriberWithNext:^(id x) { - [subscriber sendNext:x]; - } error:^(NSError *error) { - if(retryCount == 0 || currentRetryCount < retryCount) { - currentDisposable = [self subscribe:innerSubscriber]; - } else { + return subscribeForever(self, + ^(id x) { + [subscriber sendNext:[RACMaybe maybeWithObject:x]]; + }, + ^(NSError *error, RACDisposable *disposable) { + if (retryCount == 0 || currentRetryCount < retryCount) { + // Resubscribe. + currentRetryCount++; + return; + } + + [disposable dispose]; [subscriber sendError:error]; - } - - currentRetryCount++; - } completed:^{ - [subscriber sendCompleted]; - }]; - - currentDisposable = [self subscribe:innerSubscriber]; - - return [RACDisposable disposableWithBlock:^{ - [currentDisposable dispose]; - }]; + }, + ^(RACDisposable *disposable) { + [disposable dispose]; + [subscriber sendCompleted]; + }); }]; } From eef776dda8680fc1bd3a5658c74a9a68a413d819 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Tue, 27 Nov 2012 19:41:09 -0800 Subject: [PATCH 11/18] Test to verify that RACReplaySubject uses the same order every time --- .../ReactiveCocoaTests/RACSubjectSpec.m | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubjectSpec.m b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubjectSpec.m index 6cffefb36d..ab63fefd28 100644 --- a/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubjectSpec.m +++ b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubjectSpec.m @@ -179,6 +179,26 @@ expect(valuesReceived).to.equal(expected); expect(completed).to.beTruthy(); }); + + it(@"should send values in the same order live as when replaying", ^{ + NSMutableArray *liveValues = [NSMutableArray array]; + [subject subscribeNext:^(id value) { + @synchronized (liveValues) { + [liveValues addObject:value]; + } + }]; + + size_t count = 50; + dispatch_apply(count, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), [^(size_t index) { + [subject sendNext:@(index)]; + } copy]); + + [subject sendCompleted]; + + expect(liveValues.count).to.equal(count); + expect(liveValues).to.equal(subject.toArray); + expect(subject.toArray).to.equal(subject.toArray); + }); }); SpecEnd From 398bd75ba636dd18fba9e2b6e05b0d9756b5c374 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Tue, 27 Nov 2012 19:59:04 -0800 Subject: [PATCH 12/18] Synchronized faster in the RACReplaySubject test, to uncover races --- .../ReactiveCocoaTests/RACSubjectSpec.m | 43 ++++++++++++++----- 1 file changed, 32 insertions(+), 11 deletions(-) diff --git a/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubjectSpec.m b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubjectSpec.m index ab63fefd28..09363f3811 100644 --- a/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubjectSpec.m +++ b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubjectSpec.m @@ -9,12 +9,12 @@ #import "RACSpecs.h" #import "RACSubscriberExamples.h" +#import "EXTScope.h" #import "RACSubject.h" #import "RACAsyncSubject.h" #import "RACBehaviorSubject.h" #import "RACReplaySubject.h" - SpecBegin(RACSubject) describe(@"RACSubject", ^{ @@ -181,21 +181,42 @@ }); it(@"should send values in the same order live as when replaying", ^{ - NSMutableArray *liveValues = [NSMutableArray array]; - [subject subscribeNext:^(id value) { - @synchronized (liveValues) { - [liveValues addObject:value]; - } + NSUInteger count = 50; + + // Just leak it, ain't no thang. + __strong volatile id *values = (__strong id *)calloc(count, sizeof(*values)); + __block volatile int32_t nextIndex = 0; + + [subject subscribeNext:^(NSNumber *value) { + int32_t indexPlusOne = OSAtomicIncrement32(&nextIndex); + NSAssert((NSUInteger)indexPlusOne <= count, @"Index out of bounds: %u", (unsigned)(indexPlusOne - 1)); + + values[indexPlusOne - 1] = value; }]; - size_t count = 50; - dispatch_apply(count, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), [^(size_t index) { - [subject sendNext:@(index)]; - } copy]); + dispatch_queue_t queue = dispatch_queue_create("com.github.ReactiveCocoa.RACSubjectSpec", DISPATCH_QUEUE_CONCURRENT); + @onExit { + dispatch_release(queue); + }; - [subject sendCompleted]; + dispatch_suspend(queue); + + for (NSUInteger i = 0; i < count; i++) { + dispatch_async(queue, ^{ + [subject sendNext:@(i)]; + }); + } + + dispatch_resume(queue); + dispatch_barrier_sync(queue, ^{ + [subject sendCompleted]; + }); + OSMemoryBarrier(); + + NSArray *liveValues = [NSArray arrayWithObjects:(id *)values count:(NSUInteger)nextIndex]; expect(liveValues.count).to.equal(count); + expect(liveValues).to.equal(subject.toArray); expect(subject.toArray).to.equal(subject.toArray); }); From 7299cec8c6291e5b6b9fe84d2da8022a0d013944 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Tue, 27 Nov 2012 20:06:21 -0800 Subject: [PATCH 13/18] Just use huge values, damnit --- .../ReactiveCocoaTests/RACSubjectSpec.m | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubjectSpec.m b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubjectSpec.m index 09363f3811..a0b13327f3 100644 --- a/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubjectSpec.m +++ b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubjectSpec.m @@ -181,16 +181,14 @@ }); it(@"should send values in the same order live as when replaying", ^{ - NSUInteger count = 50; + NSUInteger count = 49317; // Just leak it, ain't no thang. - __strong volatile id *values = (__strong id *)calloc(count, sizeof(*values)); + __unsafe_unretained volatile id *values = (__unsafe_unretained id *)calloc(count, sizeof(*values)); __block volatile int32_t nextIndex = 0; [subject subscribeNext:^(NSNumber *value) { int32_t indexPlusOne = OSAtomicIncrement32(&nextIndex); - NSAssert((NSUInteger)indexPlusOne <= count, @"Index out of bounds: %u", (unsigned)(indexPlusOne - 1)); - values[indexPlusOne - 1] = value; }]; @@ -216,9 +214,16 @@ NSArray *liveValues = [NSArray arrayWithObjects:(id *)values count:(NSUInteger)nextIndex]; expect(liveValues.count).to.equal(count); + + NSArray *replayedValues = subject.toArray; + expect(replayedValues.count).to.equal(count); - expect(liveValues).to.equal(subject.toArray); - expect(subject.toArray).to.equal(subject.toArray); + // It should return the same ordering for multiple invocations too. + expect(replayedValues).to.equal(subject.toArray); + + [replayedValues enumerateObjectsUsingBlock:^(id value, NSUInteger index, BOOL *stop) { + expect(liveValues[index]).to.equal(value); + }]; }); }); From 2874ee6a1aa38d3a6424a478d55101a4c9f5399f Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Tue, 27 Nov 2012 20:13:59 -0800 Subject: [PATCH 14/18] Refactor RACReplaySubject to be consistent when live vs. replaying --- .../ReactiveCocoa/RACReplaySubject.m | 105 +++++++++--------- 1 file changed, 51 insertions(+), 54 deletions(-) diff --git a/ReactiveCocoaFramework/ReactiveCocoa/RACReplaySubject.m b/ReactiveCocoaFramework/ReactiveCocoa/RACReplaySubject.m index c7e0e9c6cd..dac54e499e 100644 --- a/ReactiveCocoaFramework/ReactiveCocoa/RACReplaySubject.m +++ b/ReactiveCocoaFramework/ReactiveCocoa/RACReplaySubject.m @@ -14,90 +14,87 @@ const NSUInteger RACReplaySubjectUnlimitedCapacity = 0; @interface RACReplaySubject () -@property (nonatomic, strong) NSMutableArray *valuesReceived; -@property (nonatomic, assign) NSUInteger capacity; -@property (assign) BOOL hasCompletedAlready; -@property (strong) NSError *error; + +@property (nonatomic, assign, readonly) NSUInteger capacity; + +// These properties should only be modified while synchronized on self. +@property (nonatomic, strong, readonly) NSMutableArray *valuesReceived; +@property (nonatomic, assign) BOOL hasCompleted; +@property (nonatomic, assign) BOOL hasError; +@property (nonatomic, strong) NSError *error; + @end @implementation RACReplaySubject +#pragma mark Lifecycle + ++ (instancetype)replaySubjectWithCapacity:(NSUInteger)capacity { + return [[self alloc] initWithCapacity:capacity]; +} + - (instancetype)init { + return [self initWithCapacity:RACReplaySubjectUnlimitedCapacity]; +} + +- (instancetype)initWithCapacity:(NSUInteger)capacity { self = [super init]; - if(self == nil) return nil; + if (self == nil) return nil; - self.valuesReceived = [NSMutableArray array]; + _capacity = capacity; + _valuesReceived = [NSMutableArray arrayWithCapacity:capacity]; return self; } - #pragma mark RACSignal - (RACDisposable *)subscribe:(id)subscriber { - RACDisposable *disposable = [super subscribe:subscriber]; - NSArray *valuesCopy = nil; - @synchronized(self.valuesReceived) { - valuesCopy = [self.valuesReceived copy]; - } - - for(id value in valuesCopy) { - [subscriber sendNext:[value isKindOfClass:[RACTupleNil class]] ? nil : value]; - } - - if(self.hasCompletedAlready) { - [subscriber sendCompleted]; - [disposable dispose]; - } else if(self.error != nil) { - [subscriber sendError:self.error]; - [disposable dispose]; + @synchronized (self) { + RACDisposable *disposable = nil; + if (!self.hasCompleted && !self.hasError) disposable = [super subscribe:subscriber]; + + for (id value in self.valuesReceived) { + [subscriber sendNext:([value isKindOfClass:RACTupleNil.class] ? nil : value)]; + } + + if (self.hasCompleted) { + [subscriber sendCompleted]; + } else if (self.hasError) { + [subscriber sendError:self.error]; + } + + return disposable; } - - return disposable; } - #pragma mark RACSubscriber - (void)sendNext:(id)value { - [super sendNext:value]; - - @synchronized(self.valuesReceived) { - [self.valuesReceived addObject:value ? : [RACTupleNil tupleNil]]; + @synchronized (self) { + [self.valuesReceived addObject:value ?: RACTupleNil.tupleNil]; + [super sendNext:value]; - if(self.capacity != RACReplaySubjectUnlimitedCapacity) { - while(self.valuesReceived.count > self.capacity) { - [self.valuesReceived removeObjectAtIndex:0]; - } + if (self.capacity != RACReplaySubjectUnlimitedCapacity && self.valuesReceived.count > self.capacity) { + [self.valuesReceived removeObjectsInRange:NSMakeRange(0, self.valuesReceived.count - self.capacity)]; } } } - (void)sendCompleted { - self.hasCompletedAlready = YES; - - [super sendCompleted]; + @synchronized (self) { + self.hasCompleted = YES; + [super sendCompleted]; + } } - (void)sendError:(NSError *)e { - self.error = e; - - [super sendError:e]; -} - - -#pragma mark API - -@synthesize valuesReceived; -@synthesize capacity; -@synthesize hasCompletedAlready; -@synthesize error; - -+ (instancetype)replaySubjectWithCapacity:(NSUInteger)capacity { - RACReplaySubject *subject = [self subject]; - subject.capacity = capacity; - return subject; + @synchronized (self) { + self.hasError = YES; + self.error = e; + [super sendError:e]; + } } @end From 0af2ba53b5c6e8b6682db9c8788f6e2e64b495b6 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Tue, 27 Nov 2012 20:30:16 -0800 Subject: [PATCH 15/18] Removed RACAsyncSubject -- use RACReplaySubject instead --- RACExtensions/NSData+RACSupport.m | 2 +- RACExtensions/NSString+RACSupport.m | 2 +- RACExtensions/NSTask+RACSupport.m | 2 +- .../ReactiveCocoa.xcodeproj/project.pbxproj | 18 +- .../ReactiveCocoa/RACAsyncSubject.h | 21 -- .../ReactiveCocoa/RACAsyncSubject.m | 74 ----- .../ReactiveCocoa/RACSignal.m | 4 +- .../ReactiveCocoa/ReactiveCocoa.h | 1 - .../ReactiveCocoaTests/RACSignalSpec.m | 18 +- .../ReactiveCocoaTests/RACSubjectSpec.m | 299 +++++++++--------- 10 files changed, 162 insertions(+), 279 deletions(-) delete mode 100644 ReactiveCocoaFramework/ReactiveCocoa/RACAsyncSubject.h delete mode 100644 ReactiveCocoaFramework/ReactiveCocoa/RACAsyncSubject.m diff --git a/RACExtensions/NSData+RACSupport.m b/RACExtensions/NSData+RACSupport.m index 8ca4d2bc8b..e2f2b570e2 100644 --- a/RACExtensions/NSData+RACSupport.m +++ b/RACExtensions/NSData+RACSupport.m @@ -13,7 +13,7 @@ @implementation NSData (RACSupport) + (RACSignal *)rac_readContentsOfURL:(NSURL *)URL options:(NSDataReadingOptions)options scheduler:(RACScheduler *)scheduler { NSParameterAssert(scheduler != nil); - RACAsyncSubject *subject = [RACAsyncSubject subject]; + RACReplaySubject *subject = [RACReplaySubject subject]; [scheduler schedule:^{ NSError *error = nil; diff --git a/RACExtensions/NSString+RACSupport.m b/RACExtensions/NSString+RACSupport.m index 3da481a336..9eb3f1799f 100644 --- a/RACExtensions/NSString+RACSupport.m +++ b/RACExtensions/NSString+RACSupport.m @@ -13,7 +13,7 @@ @implementation NSString (RACSupport) + (RACSignal *)rac_readContentsOfURL:(NSURL *)URL usedEncoding:(NSStringEncoding *)encoding scheduler:(RACScheduler *)scheduler { NSParameterAssert(scheduler != nil); - RACAsyncSubject *subject = [RACAsyncSubject subject]; + RACReplaySubject *subject = [RACReplaySubject subject]; [scheduler schedule:^{ NSError *error = nil; diff --git a/RACExtensions/NSTask+RACSupport.m b/RACExtensions/NSTask+RACSupport.m index 24f44875d9..f5c26d10dd 100644 --- a/RACExtensions/NSTask+RACSupport.m +++ b/RACExtensions/NSTask+RACSupport.m @@ -55,7 +55,7 @@ - (RACCancelableSignal *)rac_run { - (RACCancelableSignal *)rac_runWithScheduler:(RACScheduler *)scheduler { NSParameterAssert(scheduler != nil); - RACAsyncSubject *subject = [RACAsyncSubject subject]; + RACReplaySubject *subject = [RACReplaySubject subject]; __block BOOL canceled = NO; [[RACScheduler mainQueueScheduler] schedule:^{ diff --git a/ReactiveCocoaFramework/ReactiveCocoa.xcodeproj/project.pbxproj b/ReactiveCocoaFramework/ReactiveCocoa.xcodeproj/project.pbxproj index 347ac86568..a9bf5b5b5c 100644 --- a/ReactiveCocoaFramework/ReactiveCocoa.xcodeproj/project.pbxproj +++ b/ReactiveCocoaFramework/ReactiveCocoa.xcodeproj/project.pbxproj @@ -84,8 +84,6 @@ 88CDF7E715000FCF00163A9F /* InfoPlist.strings in Resources */ = {isa = PBXBuildFile; fileRef = 88CDF7E515000FCF00163A9F /* InfoPlist.strings */; }; 88D4AB3E1510F6C30011494F /* RACReplaySubject.h in Headers */ = {isa = PBXBuildFile; fileRef = 88D4AB3C1510F6C30011494F /* RACReplaySubject.h */; settings = {ATTRIBUTES = (Public, ); }; }; 88D4AB3F1510F6C30011494F /* RACReplaySubject.m in Sources */ = {isa = PBXBuildFile; fileRef = 88D4AB3D1510F6C30011494F /* RACReplaySubject.m */; }; - 88D4AB491510F8F10011494F /* RACAsyncSubject.h in Headers */ = {isa = PBXBuildFile; fileRef = 88D4AB471510F8F10011494F /* RACAsyncSubject.h */; settings = {ATTRIBUTES = (Public, ); }; }; - 88D4AB4A1510F8F10011494F /* RACAsyncSubject.m in Sources */ = {isa = PBXBuildFile; fileRef = 88D4AB481510F8F10011494F /* RACAsyncSubject.m */; }; 88DA309815071CBA00C19D0F /* RACValueTransformer.m in Sources */ = {isa = PBXBuildFile; fileRef = 88DA309615071CBA00C19D0F /* RACValueTransformer.m */; }; 88E2C6B4153C771C00C7493C /* RACScheduler.h in Headers */ = {isa = PBXBuildFile; fileRef = 88E2C6B2153C771C00C7493C /* RACScheduler.h */; settings = {ATTRIBUTES = (Public, ); }; }; 88E2C6B5153C771C00C7493C /* RACScheduler.m in Sources */ = {isa = PBXBuildFile; fileRef = 88E2C6B3153C771C00C7493C /* RACScheduler.m */; }; @@ -99,7 +97,6 @@ 88F440BF153DAD600097B4C3 /* RACConnectableSignal.m in Sources */ = {isa = PBXBuildFile; fileRef = 88F5870115361BCD0084BD32 /* RACConnectableSignal.m */; }; 88F440C0153DAD630097B4C3 /* RACSubject.m in Sources */ = {isa = PBXBuildFile; fileRef = 880B9175150B09190008488E /* RACSubject.m */; }; 88F440C1153DAD640097B4C3 /* RACReplaySubject.m in Sources */ = {isa = PBXBuildFile; fileRef = 88D4AB3D1510F6C30011494F /* RACReplaySubject.m */; }; - 88F440C2153DAD660097B4C3 /* RACAsyncSubject.m in Sources */ = {isa = PBXBuildFile; fileRef = 88D4AB481510F8F10011494F /* RACAsyncSubject.m */; }; 88F440C3153DAD690097B4C3 /* RACBehaviorSubject.m in Sources */ = {isa = PBXBuildFile; fileRef = 883A84D91513964B006DB4C7 /* RACBehaviorSubject.m */; }; 88F440C4153DAD6A0097B4C3 /* RACMaybe.m in Sources */ = {isa = PBXBuildFile; fileRef = 8820370915096A4F002428D3 /* RACMaybe.m */; }; 88F440C5153DAD6C0097B4C3 /* RACDisposable.m in Sources */ = {isa = PBXBuildFile; fileRef = 883A84DE1513B5EC006DB4C7 /* RACDisposable.m */; }; @@ -137,7 +134,6 @@ 90AF46D71625537D0054F8A0 /* RACCancelableSignal+Private.h in Headers */ = {isa = PBXBuildFile; fileRef = 88A5F4FD156B3FED009E49DC /* RACCancelableSignal+Private.h */; }; 90AF46D81625537D0054F8A0 /* RACSubject.h in Headers */ = {isa = PBXBuildFile; fileRef = 880B9174150B09190008488E /* RACSubject.h */; }; 90AF46D91625537D0054F8A0 /* RACReplaySubject.h in Headers */ = {isa = PBXBuildFile; fileRef = 88D4AB3C1510F6C30011494F /* RACReplaySubject.h */; }; - 90AF46DA1625537D0054F8A0 /* RACAsyncSubject.h in Headers */ = {isa = PBXBuildFile; fileRef = 88D4AB471510F8F10011494F /* RACAsyncSubject.h */; }; 90AF46DB1625537D0054F8A0 /* RACBehaviorSubject.h in Headers */ = {isa = PBXBuildFile; fileRef = 883A84D81513964B006DB4C7 /* RACBehaviorSubject.h */; }; 90AF46DC1625537D0054F8A0 /* RACDisposable.h in Headers */ = {isa = PBXBuildFile; fileRef = 883A84DD1513B5EC006DB4C7 /* RACDisposable.h */; }; 90AF46DD1625537D0054F8A0 /* RACScopedDisposable.h in Headers */ = {isa = PBXBuildFile; fileRef = 884476E2152367D100958F44 /* RACScopedDisposable.h */; }; @@ -422,8 +418,6 @@ 88CDF82C15008C0500163A9F /* NSObject+RACPropertySubscribing.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; lineEnding = 0; path = "NSObject+RACPropertySubscribing.h"; sourceTree = ""; xcLanguageSpecificationIdentifier = xcode.lang.objcpp; }; 88D4AB3C1510F6C30011494F /* RACReplaySubject.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = RACReplaySubject.h; sourceTree = ""; }; 88D4AB3D1510F6C30011494F /* RACReplaySubject.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; lineEnding = 0; path = RACReplaySubject.m; sourceTree = ""; xcLanguageSpecificationIdentifier = xcode.lang.objc; }; - 88D4AB471510F8F10011494F /* RACAsyncSubject.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = RACAsyncSubject.h; sourceTree = ""; }; - 88D4AB481510F8F10011494F /* RACAsyncSubject.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; lineEnding = 0; path = RACAsyncSubject.m; sourceTree = ""; xcLanguageSpecificationIdentifier = xcode.lang.objc; }; 88DA309515071CBA00C19D0F /* RACValueTransformer.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = RACValueTransformer.h; sourceTree = ""; }; 88DA309615071CBA00C19D0F /* RACValueTransformer.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = RACValueTransformer.m; sourceTree = ""; }; 88DA30D31508051F00C19D0F /* RACSpecs.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = RACSpecs.h; sourceTree = ""; }; @@ -787,7 +781,7 @@ 887ACDA6165878A7009190AD /* NSInvocation+RACTypeParsing.m */, D0D486FF1642550100DD7605 /* RACStream.h */, D0D487001642550100DD7605 /* RACStream.m */, - D0D486FB164253B600DD7605 /* Subscribables */, + D0D486FB164253B600DD7605 /* Signals */, D0D486FD164253D500DD7605 /* Disposables */, D0D486FE164253E100DD7605 /* Commands */, D0E967561641EF8200FCFF06 /* Sequences */, @@ -911,7 +905,7 @@ path = "Mac OS X"; sourceTree = ""; }; - D0D486FB164253B600DD7605 /* Subscribables */ = { + D0D486FB164253B600DD7605 /* Signals */ = { isa = PBXGroup; children = ( 88CDF80415001CA800163A9F /* RACSignal.h */, @@ -929,7 +923,7 @@ 88A5F4FA156B3FCB009E49DC /* RACCancelableSignal.m */, D0D486FC164253C400DD7605 /* Subjects */, ); - name = Subscribables; + name = Signals; sourceTree = ""; }; D0D486FC164253C400DD7605 /* Subjects */ = { @@ -939,8 +933,6 @@ 880B9175150B09190008488E /* RACSubject.m */, 88D4AB3C1510F6C30011494F /* RACReplaySubject.h */, 88D4AB3D1510F6C30011494F /* RACReplaySubject.m */, - 88D4AB471510F8F10011494F /* RACAsyncSubject.h */, - 88D4AB481510F8F10011494F /* RACAsyncSubject.m */, 883A84D81513964B006DB4C7 /* RACBehaviorSubject.h */, 883A84D91513964B006DB4C7 /* RACBehaviorSubject.m */, ); @@ -1007,7 +999,6 @@ 88A5F4FB156B3FCB009E49DC /* RACCancelableSignal.h in Headers */, 88037FC41505646C001A5B19 /* RACAsyncCommand.h in Headers */, 88D4AB3E1510F6C30011494F /* RACReplaySubject.h in Headers */, - 88D4AB491510F8F10011494F /* RACAsyncSubject.h in Headers */, 883A84DA1513964B006DB4C7 /* RACBehaviorSubject.h in Headers */, 883A84DF1513B5EC006DB4C7 /* RACDisposable.h in Headers */, 886F702A1551CF920045D68B /* RACGroupedSignal.h in Headers */, @@ -1073,7 +1064,6 @@ 90AF46D71625537D0054F8A0 /* RACCancelableSignal+Private.h in Headers */, 90AF46D81625537D0054F8A0 /* RACSubject.h in Headers */, 90AF46D91625537D0054F8A0 /* RACReplaySubject.h in Headers */, - 90AF46DA1625537D0054F8A0 /* RACAsyncSubject.h in Headers */, 90AF46DB1625537D0054F8A0 /* RACBehaviorSubject.h in Headers */, 90AF46DC1625537D0054F8A0 /* RACDisposable.h in Headers */, 90AF46DD1625537D0054F8A0 /* RACScopedDisposable.h in Headers */, @@ -1306,7 +1296,6 @@ 8820370B15096A4F002428D3 /* RACMaybe.m in Sources */, 880B9177150B09190008488E /* RACSubject.m in Sources */, 88D4AB3F1510F6C30011494F /* RACReplaySubject.m in Sources */, - 88D4AB4A1510F8F10011494F /* RACAsyncSubject.m in Sources */, 88977C3E1512914A00A09EC5 /* RACSignal.m in Sources */, 883A84DB1513964B006DB4C7 /* RACBehaviorSubject.m in Sources */, 883A84E01513B5EC006DB4C7 /* RACDisposable.m in Sources */, @@ -1398,7 +1387,6 @@ 88F440C1153DAD640097B4C3 /* RACReplaySubject.m in Sources */, 88F440C0153DAD630097B4C3 /* RACSubject.m in Sources */, 88F440C6153DAD6E0097B4C3 /* RACScopedDisposable.m in Sources */, - 88F440C2153DAD660097B4C3 /* RACAsyncSubject.m in Sources */, 88A5F4FE156B4301009E49DC /* RACCancelableSignal.m in Sources */, 88F440C3153DAD690097B4C3 /* RACBehaviorSubject.m in Sources */, 88F440CD153DAD820097B4C3 /* NSObject+RACBindings.m in Sources */, diff --git a/ReactiveCocoaFramework/ReactiveCocoa/RACAsyncSubject.h b/ReactiveCocoaFramework/ReactiveCocoa/RACAsyncSubject.h deleted file mode 100644 index bce4523e15..0000000000 --- a/ReactiveCocoaFramework/ReactiveCocoa/RACAsyncSubject.h +++ /dev/null @@ -1,21 +0,0 @@ -// -// RACAsyncSubject.h -// ReactiveCocoa -// -// Created by Josh Abernathy on 3/14/12. -// Copyright (c) 2012 GitHub, Inc. All rights reserved. -// - -#import "RACSubject.h" - - -// An async subject saves the most recent object sent and waits to send it until -// the subject completes. If the subject gets a new subscriber after it has been -// completed, it sends that last value and then completes again. -// -// This lets us avoid race conditions when dealing with asynchronous operations. -// If async operation completes before our subscription occurs, the async -// subject will simply replay that result and completion for us. -@interface RACAsyncSubject : RACSubject - -@end diff --git a/ReactiveCocoaFramework/ReactiveCocoa/RACAsyncSubject.m b/ReactiveCocoaFramework/ReactiveCocoa/RACAsyncSubject.m deleted file mode 100644 index d57e1e6184..0000000000 --- a/ReactiveCocoaFramework/ReactiveCocoa/RACAsyncSubject.m +++ /dev/null @@ -1,74 +0,0 @@ -// -// RACAsyncSubject.m -// ReactiveCocoa -// -// Created by Josh Abernathy on 3/14/12. -// Copyright (c) 2012 GitHub, Inc. All rights reserved. -// - -#import "RACAsyncSubject.h" -#import "RACSubscriber.h" -#import "RACDisposable.h" - -@interface RACAsyncSubject () - -// These should only be read or written while synchronized on self. -@property (nonatomic, strong) id lastValue; -@property (nonatomic, assign) BOOL hasLastValue; -@property (nonatomic, assign) BOOL hasCompletedAlready; -@property (nonatomic, strong) NSError *error; -@property (nonatomic, assign) BOOL hasError; - -@end - - -@implementation RACAsyncSubject - - -#pragma mark RACSignal - -- (RACDisposable *)subscribe:(id)subscriber { - RACDisposable *disposable = [super subscribe:subscriber]; - - @synchronized (self) { - if (self.hasCompletedAlready) { - [self sendCompleted]; - [disposable dispose]; - } else if (self.hasError) { - [self sendError:self.error]; - [disposable dispose]; - } - } - - return disposable; -} - - -#pragma mark RACSubscriber - -- (void)sendNext:(id)value { - @synchronized (self) { - self.lastValue = value; - self.hasLastValue = YES; - } -} - -- (void)sendCompleted { - @synchronized (self) { - self.hasCompletedAlready = YES; - if (self.hasLastValue) [super sendNext:self.lastValue]; - } - - [super sendCompleted]; -} - -- (void)sendError:(NSError *)e { - @synchronized (self) { - self.error = e; - self.hasError = YES; - } - - [super sendError:e]; -} - -@end diff --git a/ReactiveCocoaFramework/ReactiveCocoa/RACSignal.m b/ReactiveCocoaFramework/ReactiveCocoa/RACSignal.m index 5e9a2d9670..5fb812608b 100644 --- a/ReactiveCocoaFramework/ReactiveCocoa/RACSignal.m +++ b/ReactiveCocoaFramework/ReactiveCocoa/RACSignal.m @@ -8,9 +8,9 @@ #import "RACSignal.h" #import "NSObject+RACExtensions.h" -#import "RACAsyncSubject.h" #import "RACBehaviorSubject.h" #import "RACDisposable.h" +#import "RACReplaySubject.h" #import "RACScheduler.h" #import "RACSubject.h" #import "RACSignal+Private.h" @@ -324,7 +324,7 @@ + (RACSignal *)startWithScheduler:(RACScheduler *)scheduler block:(id (^)(BOOL * + (RACSignal *)startWithScheduler:(RACScheduler *)scheduler subjectBlock:(void (^)(RACSubject *subject))block { NSParameterAssert(block != NULL); - RACAsyncSubject *subject = [RACAsyncSubject subject]; + RACReplaySubject *subject = [RACReplaySubject subject]; [scheduler schedule:^{ block(subject); }]; diff --git a/ReactiveCocoaFramework/ReactiveCocoa/ReactiveCocoa.h b/ReactiveCocoaFramework/ReactiveCocoa/ReactiveCocoa.h index 925f418bba..e406d8adb2 100644 --- a/ReactiveCocoaFramework/ReactiveCocoa/ReactiveCocoa.h +++ b/ReactiveCocoaFramework/ReactiveCocoa/ReactiveCocoa.h @@ -15,7 +15,6 @@ #import #import #import -#import #import #import #import diff --git a/ReactiveCocoaFramework/ReactiveCocoaTests/RACSignalSpec.m b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSignalSpec.m index 900c0fa522..ff2763bbff 100644 --- a/ReactiveCocoaFramework/ReactiveCocoaTests/RACSignalSpec.m +++ b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSignalSpec.m @@ -12,17 +12,17 @@ #import "RACStreamExamples.h" #import "EXTKeyPathCoding.h" -#import "RACSignal.h" -#import "RACSubscriber.h" -#import "RACSubject.h" +#import "NSObject+RACPropertySubscribing.h" #import "RACBehaviorSubject.h" #import "RACDisposable.h" -#import "RACUnit.h" -#import "RACTuple.h" +#import "RACReplaySubject.h" #import "RACScheduler.h" +#import "RACSignal.h" +#import "RACSubject.h" +#import "RACSubscriber.h" #import "RACTestObject.h" -#import "NSObject+RACPropertySubscribing.h" -#import "RACAsyncSubject.h" +#import "RACTuple.h" +#import "RACUnit.h" static NSString * const RACSignalMergeConcurrentCompletionExampleGroup = @"RACSignalMergeConcurrentCompletionExampleGroup"; static NSString * const RACSignalMaxConcurrent = @"RACSignalMaxConcurrent"; @@ -740,11 +740,11 @@ expect(deallocd).will.beTruthy(); }); - it(@"should dealloc an async subject if it completes immediately", ^{ + it(@"should dealloc a replay subject if it completes immediately", ^{ __block BOOL completed = NO; __block BOOL deallocd = NO; @autoreleasepool { - RACAsyncSubject *subject __attribute__((objc_precise_lifetime)) = [RACAsyncSubject subject]; + RACReplaySubject *subject __attribute__((objc_precise_lifetime)) = [RACReplaySubject subject]; [subject sendCompleted]; [subject rac_addDeallocDisposable:[RACDisposable disposableWithBlock:^{ diff --git a/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubjectSpec.m b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubjectSpec.m index a0b13327f3..d211de715e 100644 --- a/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubjectSpec.m +++ b/ReactiveCocoaFramework/ReactiveCocoaTests/RACSubjectSpec.m @@ -11,7 +11,6 @@ #import "EXTScope.h" #import "RACSubject.h" -#import "RACAsyncSubject.h" #import "RACBehaviorSubject.h" #import "RACReplaySubject.h" @@ -48,182 +47,174 @@ } copy], nil); }); -describe(@"RACAsyncSubject", ^{ - __block RACAsyncSubject *subject = nil; - - beforeEach(^{ - subject = [RACAsyncSubject subject]; - }); - - it(@"should send the last value only at completion", ^{ - id firstValue = @"blah"; - id secondValue = @"more blah"; - - __block id valueReceived = nil; - __block NSUInteger nextsReceived = 0; - [subject subscribeNext:^(id x) { - valueReceived = x; - nextsReceived++; - }]; - - [subject sendNext:firstValue]; - [subject sendNext:secondValue]; - - expect(nextsReceived).to.equal(0); - expect(valueReceived).to.beNil(); - - [subject sendCompleted]; - - expect(nextsReceived).to.equal(1); - expect(valueReceived).to.equal(secondValue); - }); - - it(@"should send the last value to new subscribers after completion", ^{ - id firstValue = @"blah"; - id secondValue = @"more blah"; - - __block id valueReceived = nil; - __block NSUInteger nextsReceived = 0; - - [subject sendNext:firstValue]; - [subject sendNext:secondValue]; - - expect(nextsReceived).to.equal(0); - expect(valueReceived).to.beNil(); - - [subject sendCompleted]; +describe(@"RACReplaySubject", ^{ + __block RACReplaySubject *subject = nil; + + describe(@"with a capacity of 1", ^{ + beforeEach(^{ + subject = [RACReplaySubject replaySubjectWithCapacity:1]; + }); - [subject subscribeNext:^(id x) { - valueReceived = x; - nextsReceived++; - }]; + it(@"should send the last value", ^{ + id firstValue = @"blah"; + id secondValue = @"more blah"; + + [subject sendNext:firstValue]; + [subject sendNext:secondValue]; + + __block id valueReceived = nil; + [subject subscribeNext:^(id x) { + valueReceived = x; + }]; + + expect(valueReceived).to.equal(secondValue); + }); - expect(nextsReceived).to.equal(1); - expect(valueReceived).to.equal(secondValue); - }); - - it(@"should not send any values to new subscribers if none were sent originally", ^{ - [subject sendCompleted]; - - __block BOOL nextInvoked = NO; - [subject subscribeNext:^(id x) { - nextInvoked = YES; - }]; - - expect(nextInvoked).to.beFalsy(); - }); - - it(@"should resend errors", ^{ - NSError *error = [NSError errorWithDomain:NSCocoaErrorDomain code:0 userInfo:nil]; - [subject sendError:error]; + it(@"should send the last value to new subscribers after completion", ^{ + id firstValue = @"blah"; + id secondValue = @"more blah"; + + __block id valueReceived = nil; + __block NSUInteger nextsReceived = 0; + + [subject sendNext:firstValue]; + [subject sendNext:secondValue]; + + expect(nextsReceived).to.equal(0); + expect(valueReceived).to.beNil(); + + [subject sendCompleted]; + + [subject subscribeNext:^(id x) { + valueReceived = x; + nextsReceived++; + }]; + + expect(nextsReceived).to.equal(1); + expect(valueReceived).to.equal(secondValue); + }); - __block BOOL errorSent = NO; - [subject subscribeError:^(NSError *sentError) { - expect(sentError).to.equal(error); - errorSent = YES; - }]; + it(@"should not send any values to new subscribers if none were sent originally", ^{ + [subject sendCompleted]; - expect(errorSent).to.beTruthy(); - }); + __block BOOL nextInvoked = NO; + [subject subscribeNext:^(id x) { + nextInvoked = YES; + }]; - it(@"should resend nil errors", ^{ - [subject sendError:nil]; + expect(nextInvoked).to.beFalsy(); + }); - __block BOOL errorSent = NO; - [subject subscribeError:^(NSError *sentError) { - expect(sentError).to.beNil(); - errorSent = YES; - }]; + it(@"should resend errors", ^{ + NSError *error = [NSError errorWithDomain:NSCocoaErrorDomain code:0 userInfo:nil]; + [subject sendError:error]; - expect(errorSent).to.beTruthy(); - }); -}); + __block BOOL errorSent = NO; + [subject subscribeError:^(NSError *sentError) { + expect(sentError).to.equal(error); + errorSent = YES; + }]; -describe(@"RACReplaySubject", ^{ - __block RACReplaySubject *subject = nil; - - beforeEach(^{ - subject = [RACReplaySubject subject]; - }); + expect(errorSent).to.beTruthy(); + }); - itShouldBehaveLike(RACSubscriberExamples, [^{ return subject; } copy], [^(NSSet *expectedValues) { - NSMutableSet *values = [NSMutableSet set]; + it(@"should resend nil errors", ^{ + [subject sendError:nil]; - // This subscription should synchronously dump all values already - // received into 'values'. - [subject subscribeNext:^(id value) { - [values addObject:value]; - }]; + __block BOOL errorSent = NO; + [subject subscribeError:^(NSError *sentError) { + expect(sentError).to.beNil(); + errorSent = YES; + }]; - expect(values).to.equal(expectedValues); - } copy], nil); - - it(@"should send both values to new subscribers after completion", ^{ - id firstValue = @"blah"; - id secondValue = @"more blah"; - - [subject sendNext:firstValue]; - [subject sendNext:secondValue]; - [subject sendCompleted]; - - __block BOOL completed = NO; - NSMutableArray *valuesReceived = [NSMutableArray array]; - [subject subscribeNext:^(id x) { - [valuesReceived addObject:x]; - } completed:^{ - completed = YES; - }]; - - expect(valuesReceived.count).to.equal(2); - NSArray *expected = [NSArray arrayWithObjects:firstValue, secondValue, nil]; - expect(valuesReceived).to.equal(expected); - expect(completed).to.beTruthy(); + expect(errorSent).to.beTruthy(); + }); }); - it(@"should send values in the same order live as when replaying", ^{ - NSUInteger count = 49317; - - // Just leak it, ain't no thang. - __unsafe_unretained volatile id *values = (__unsafe_unretained id *)calloc(count, sizeof(*values)); - __block volatile int32_t nextIndex = 0; + describe(@"with an unlimited capacity", ^{ + beforeEach(^{ + subject = [RACReplaySubject subject]; + }); - [subject subscribeNext:^(NSNumber *value) { - int32_t indexPlusOne = OSAtomicIncrement32(&nextIndex); - values[indexPlusOne - 1] = value; - }]; + itShouldBehaveLike(RACSubscriberExamples, [^{ return subject; } copy], [^(NSSet *expectedValues) { + NSMutableSet *values = [NSMutableSet set]; - dispatch_queue_t queue = dispatch_queue_create("com.github.ReactiveCocoa.RACSubjectSpec", DISPATCH_QUEUE_CONCURRENT); - @onExit { - dispatch_release(queue); - }; + // This subscription should synchronously dump all values already + // received into 'values'. + [subject subscribeNext:^(id value) { + [values addObject:value]; + }]; - dispatch_suspend(queue); + expect(values).to.equal(expectedValues); + } copy], nil); - for (NSUInteger i = 0; i < count; i++) { - dispatch_async(queue, ^{ - [subject sendNext:@(i)]; - }); - } - - dispatch_resume(queue); - dispatch_barrier_sync(queue, ^{ + it(@"should send both values to new subscribers after completion", ^{ + id firstValue = @"blah"; + id secondValue = @"more blah"; + + [subject sendNext:firstValue]; + [subject sendNext:secondValue]; [subject sendCompleted]; + + __block BOOL completed = NO; + NSMutableArray *valuesReceived = [NSMutableArray array]; + [subject subscribeNext:^(id x) { + [valuesReceived addObject:x]; + } completed:^{ + completed = YES; + }]; + + expect(valuesReceived.count).to.equal(2); + NSArray *expected = [NSArray arrayWithObjects:firstValue, secondValue, nil]; + expect(valuesReceived).to.equal(expected); + expect(completed).to.beTruthy(); }); - OSMemoryBarrier(); + it(@"should send values in the same order live as when replaying", ^{ + NSUInteger count = 49317; + + // Just leak it, ain't no thang. + __unsafe_unretained volatile id *values = (__unsafe_unretained id *)calloc(count, sizeof(*values)); + __block volatile int32_t nextIndex = 0; + + [subject subscribeNext:^(NSNumber *value) { + int32_t indexPlusOne = OSAtomicIncrement32(&nextIndex); + values[indexPlusOne - 1] = value; + }]; + + dispatch_queue_t queue = dispatch_queue_create("com.github.ReactiveCocoa.RACSubjectSpec", DISPATCH_QUEUE_CONCURRENT); + @onExit { + dispatch_release(queue); + }; + + dispatch_suspend(queue); + + for (NSUInteger i = 0; i < count; i++) { + dispatch_async(queue, ^{ + [subject sendNext:@(i)]; + }); + } + + dispatch_resume(queue); + dispatch_barrier_sync(queue, ^{ + [subject sendCompleted]; + }); - NSArray *liveValues = [NSArray arrayWithObjects:(id *)values count:(NSUInteger)nextIndex]; - expect(liveValues.count).to.equal(count); - - NSArray *replayedValues = subject.toArray; - expect(replayedValues.count).to.equal(count); + OSMemoryBarrier(); - // It should return the same ordering for multiple invocations too. - expect(replayedValues).to.equal(subject.toArray); + NSArray *liveValues = [NSArray arrayWithObjects:(id *)values count:(NSUInteger)nextIndex]; + expect(liveValues.count).to.equal(count); + + NSArray *replayedValues = subject.toArray; + expect(replayedValues.count).to.equal(count); - [replayedValues enumerateObjectsUsingBlock:^(id value, NSUInteger index, BOOL *stop) { - expect(liveValues[index]).to.equal(value); - }]; + // It should return the same ordering for multiple invocations too. + expect(replayedValues).to.equal(subject.toArray); + + [replayedValues enumerateObjectsUsingBlock:^(id value, NSUInteger index, BOOL *stop) { + expect(liveValues[index]).to.equal(value); + }]; + }); }); }); From 9965cd639b0d94d83c3eb799eab01344be0355e8 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Tue, 27 Nov 2012 20:35:01 -0800 Subject: [PATCH 16/18] Fix GHAPIDemo --- GHAPIDemo/GHAPIDemo/GHGitHubClient.m | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/GHAPIDemo/GHAPIDemo/GHGitHubClient.m b/GHAPIDemo/GHAPIDemo/GHGitHubClient.m index 8b05e95f03..d7f75e151f 100644 --- a/GHAPIDemo/GHAPIDemo/GHGitHubClient.m +++ b/GHAPIDemo/GHAPIDemo/GHGitHubClient.m @@ -90,7 +90,7 @@ - (RACSignal *)postPublicKey:(NSString *)key title:(NSString *)title { } - (RACSignal *)enqueueRequestWithMethod:(NSString *)method path:(NSString *)path parameters:(NSDictionary *)parameters { - RACAsyncSubject *subject = [RACAsyncSubject subject]; + RACReplaySubject *subject = [RACReplaySubject subject]; NSURLRequest *request = [self requestWithMethod:method path:path parameters:parameters]; AFHTTPRequestOperation *operation = [self HTTPRequestOperationWithRequest:request success:^(AFHTTPRequestOperation *operation, id responseObject) { [subject sendNext:responseObject]; From 34487f7f447c50720913105e4214f0a429d8ce96 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Wed, 28 Nov 2012 14:54:19 -0800 Subject: [PATCH 17/18] Document -[RACSubscriber stopSubscription] --- ReactiveCocoaFramework/ReactiveCocoa/RACSubscriber.m | 1 + 1 file changed, 1 insertion(+) diff --git a/ReactiveCocoaFramework/ReactiveCocoa/RACSubscriber.m b/ReactiveCocoaFramework/ReactiveCocoa/RACSubscriber.m index b33bcd9cdf..a02524bd35 100644 --- a/ReactiveCocoaFramework/ReactiveCocoa/RACSubscriber.m +++ b/ReactiveCocoaFramework/ReactiveCocoa/RACSubscriber.m @@ -19,6 +19,7 @@ @interface RACSubscriber () @property (nonatomic, strong) RACDisposable *disposable; @property (nonatomic, assign) BOOL completedOrErrored; +// Disposes of and releases the receiver's disposable. - (void)stopSubscription; @end From baadde1f7ae093ad8724b37225a63b094c4042de Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Wed, 28 Nov 2012 15:08:29 -0800 Subject: [PATCH 18/18] Fix subscribeForever not getting unlocked due to short circuiting Also, this is exception-safe, where the previous implementation wasn't. --- .../ReactiveCocoa/RACSignalProtocol.m | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/ReactiveCocoaFramework/ReactiveCocoa/RACSignalProtocol.m b/ReactiveCocoaFramework/ReactiveCocoa/RACSignalProtocol.m index 05601a255d..716358f0aa 100644 --- a/ReactiveCocoaFramework/ReactiveCocoa/RACSignalProtocol.m +++ b/ReactiveCocoaFramework/ReactiveCocoa/RACSignalProtocol.m @@ -7,6 +7,7 @@ // #import "RACSignalProtocol.h" +#import "EXTScope.h" #import "NSArray+RACSequenceAdditions.h" #import "NSObject+RACExtensions.h" #import "NSObject+RACPropertySubscribing.h" @@ -38,6 +39,7 @@ completed = [completed copy]; NSRecursiveLock *lock = [[NSRecursiveLock alloc] init]; + lock.name = @"com.github.ReactiveCocoa.RACSignalProtocol.subscribeForever"; // These should only be accessed while 'lock' is held. __block BOOL disposed = NO; @@ -45,25 +47,34 @@ RACDisposable *shortCircuitingDisposable = [RACDisposable disposableWithBlock:^{ [lock lock]; + @onExit { + [lock unlock]; + }; + disposed = YES; [innerDisposable dispose]; - [lock unlock]; }]; RACDisposable *outerDisposable = [signal subscribeNext:next error:^(NSError *e) { error(e, shortCircuitingDisposable); [lock lock]; + @onExit { + [lock unlock]; + }; + if (disposed) return; innerDisposable = subscribeForever(signal, next, error, completed); - [lock unlock]; } completed:^{ completed(shortCircuitingDisposable); [lock lock]; + @onExit { + [lock unlock]; + }; + if (disposed) return; innerDisposable = subscribeForever(signal, next, error, completed); - [lock unlock]; }]; return [RACDisposable disposableWithBlock:^{