Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add +combineLatest: #44

Closed
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -85,7 +85,14 @@ typedef NSInteger RACSubscribableError;
- (RACSubscribable *)takeLast:(NSUInteger)count;

// Combine the latest values from each of the subscribables once all the
// subscribables have sent a `next`.
// subscribables have sent a `next`. The `next` will be a RACTuple of
// the values from the subscribables.
+ (RACSubscribable *)combineLatest:(NSArray *)subscribables;

// Combine the latest values from each of the subscribables once all the
// subscribables have sent a `next`. The `next` will be the return value
// of the `reduceBlock`. The argument to `reduceBlock` is a RACTuple
// of the values from the subscribables.
+ (RACSubscribable *)combineLatest:(NSArray *)subscribables reduce:(id (^)(RACTuple *xs))reduceBlock;

// Sends a `+[RACUnit defaultUnit]` when all the subscribables have sent a `next`.
Expand Down
Expand Up @@ -395,18 +395,19 @@ + (RACSubscribable *)combineLatest:(NSArray *)subscribables reduce:(id (^)(RACTu
return [RACSubscribable createSubscribable:^(id<RACSubscriber> subscriber) {
NSMutableSet *disposables = [NSMutableSet setWithCapacity:subscribables.count];
NSMutableSet *completedSubscribables = [NSMutableSet setWithCapacity:subscribables.count];
NSMutableDictionary *lastValues = [NSMutableDictionary dictionaryWithCapacity:subscribables.count];
__block NSMutableDictionary *lastValues = [NSMutableDictionary dictionaryWithCapacity:subscribables.count];
NSObject *nextGate = [NSObject new];
for(id<RACSubscribable> subscribable in subscribables) {
RACDisposable *disposable = [subscribable subscribe:[RACSubscriber subscriberWithNext:^(id x) {
@synchronized(lastValues) {
@synchronized(nextGate) {
[lastValues setObject:x ? : [RACTupleNil tupleNil] forKey:[NSString stringWithFormat:@"%p", subscribable]];

if(lastValues.count == subscribables.count) {
NSMutableArray *orderedValues = [NSMutableArray arrayWithCapacity:subscribables.count];
for(id<RACSubscribable> o in subscribables) {
[orderedValues addObject:[lastValues objectForKey:[NSString stringWithFormat:@"%p", o]]];
}

lastValues = [NSMutableDictionary dictionaryWithCapacity:subscribables.count];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could just use [lastValues removeAllObjects], which would remove any need for the nextGate variable used here.

[subscriber sendNext:reduceBlock([RACTuple tupleWithObjectsFromArray:orderedValues])];
}
}
Expand Down Expand Up @@ -434,6 +435,10 @@ + (RACSubscribable *)combineLatest:(NSArray *)subscribables reduce:(id (^)(RACTu
}];
}

+ (RACSubscribable *)combineLatest:(NSArray *)subscribables {
return [self combineLatest:subscribables reduce:^ id (RACTuple *xs) { return xs; }];
}

+ (RACSubscribable *)whenAll:(NSArray *)subscribables {
return [self combineLatest:subscribables reduce:^(RACTuple *xs) { return [RACUnit defaultUnit]; }];
}
Expand Down
150 changes: 150 additions & 0 deletions ReactiveCocoaFramework/ReactiveCocoaTests/RACSubscribableSpec.m
Expand Up @@ -15,6 +15,7 @@
#import "RACBehaviorSubject.h"
#import "RACDisposable.h"
#import "RACUnit.h"
#import "RACTuple.h"


SpecBegin(RACSubscribable)
Expand Down Expand Up @@ -327,4 +328,153 @@
});
});

describe(@"combineLatest", ^{
__block id<RACSubscriber> subscriber1 = nil, subscriber2 = nil;
__block RACSubscribable *subscribable1 = nil, *subscribable2 = nil, *combined = nil;

beforeEach(^{
subscribable1 = [RACSubscribable createSubscribable:^RACDisposable *(id<RACSubscriber> subscriber) {
subscriber1 = subscriber;
return nil;
}],
subscribable2 = [RACSubscribable createSubscribable:^RACDisposable *(id<RACSubscriber> subscriber) {
subscriber2 = subscriber;
return nil;
}],
combined = [RACSubscribable combineLatest:@[ subscribable1, subscribable2 ]];
});

it(@"should yield when all sources yield", ^{
__block id result;

[combined subscribeNext:^(id x) {
result = x;
}];

[subscriber1 sendNext:@"1"];
[subscriber2 sendNext:@"2"];

expect(result).to.beKindOf([RACTuple class]);
RACTuple *tuple = (RACTuple *)result;
expect(tuple.first).to.equal(@"1");
expect(tuple.second).to.equal(@"2");
});

it(@"should not yield when some sources have not yielded", ^{
__block id result;

[combined subscribeNext:^(id x) {
result = x;
}];

[subscriber1 sendNext:@"1"];

expect(result).to.beNil();
});

it(@"should yield multiple times when all sources yield multiple times", ^{
__block int yieldCount = 0;
__block id result1, result2;

[combined subscribeNext:^(id x) {
yieldCount++;
if (yieldCount == 1)
result1 = x;
if (yieldCount == 2)
result2 = x;
}];

[subscriber1 sendNext:@"1"];
[subscriber2 sendNext:@"2"];

[subscriber1 sendNext:@"3"];
[subscriber2 sendNext:@"4"];

expect(result1).to.beKindOf([RACTuple class]);
RACTuple *tuple1 = (RACTuple *)result1;
expect(tuple1.first).to.equal(@"1");
expect(tuple1.second).to.equal(@"2");

expect(result2).to.beKindOf([RACTuple class]);
RACTuple *tuple2 = (RACTuple *)result2;
expect(tuple2.first).to.equal(@"3");
expect(tuple2.second).to.equal(@"4");
});

it(@"should not yield multiple times when all sources do not yield multiple times", ^{
__block int yieldCount = 0;
__block id result1, result2;

[combined subscribeNext:^(id x) {
yieldCount++;
if (yieldCount == 1)
result1 = x;
if (yieldCount == 2)
result2 = x;
}];

[subscriber1 sendNext:@"1"];
[subscriber2 sendNext:@"2"];

[subscriber1 sendNext:@"3"];

expect(result1).to.beKindOf([RACTuple class]);
RACTuple *tuple1 = (RACTuple *)result1;
expect(tuple1.first).to.equal(@"1");
expect(tuple1.second).to.equal(@"2");

expect(result2).to.beNil();
});

it(@"should complete when all sources complete", ^{
__block BOOL completed = NO;

[combined subscribeCompleted:^{
completed = YES;
}];

[subscriber1 sendCompleted];
[subscriber2 sendCompleted];

expect(completed).to.beTruthy();
});

it(@"should not complete when some sources are not complete", ^{
__block BOOL completed = NO;

[combined subscribeCompleted:^{
completed = YES;
}];

[subscriber1 sendCompleted];

expect(completed).to.beFalsy();
});

it(@"should error when a source errors", ^{
__block BOOL gotError = NO;

[combined subscribeError:^(NSError *error) {
gotError = YES;
}];

[subscriber1 sendError:[NSError errorWithDomain:@"" code:-1 userInfo:nil]];

expect(gotError).to.beTruthy();
});

it(@"should error multiple times when multiple sources error", ^{
__block int errorCount = 0;

[combined subscribeError:^(NSError *error) {
errorCount++;
}];

[subscriber1 sendError:[NSError errorWithDomain:@"" code:-1 userInfo:nil]];
[subscriber2 sendError:[NSError errorWithDomain:@"" code:-1 userInfo:nil]];

expect(errorCount).to.equal(2);
});
});

SpecEnd