Permalink
Browse files

Fix more missing OnError() dispatch and some test.

  • Loading branch information...
1 parent 27198f1 commit ff66ed1deb5ce5868f1467002da5db476e6c16e1 @atsushieno committed Feb 26, 2012
@@ -338,6 +338,14 @@ IEnumerable<IObservable<long>> IntervalSelectTakeDoEnumerable (HistoricalSchedul
scheduler.AdvanceBy (TimeSpan.FromMilliseconds (50));
}
}
+
+ [Test]
+ [ExpectedException (typeof (MyException))]
+ public void FirstOrDefault ()
+ {
+ var source = Observable.Throw<int> (new MyException ());
+ var ret = source.FirstOrDefault ();
+ }
[Test] // some practical test
public void IntervalSelectTakeDo ()
@@ -41,6 +41,7 @@ static IObservable<T> NonNullableMin<T> (this IObservable<T> source)
} else if (Comparer<T>.Default.Compare (min, s) > 0)
min = s;
},
+ ex => sub.OnError (ex),
() => VerifyCompleted (got, sub, min)
);
// ----
@@ -77,6 +78,7 @@ static IObservable<T> NonNullableMax<T> (this IObservable<T> source)
} else if (Comparer<T>.Default.Compare (max, s) < 0)
max = s;
},
+ ex => sub.OnError (ex),
() => VerifyCompleted (got, sub, max)
);
// ----
@@ -91,7 +93,7 @@ static IObservable<T> NullableMax<T> (this IObservable<T> source)
return new ColdObservableEach<T> (sub => {
// ----
T max = default (T);
- return source.Subscribe ((s) => { if (Comparer<T>.Default.Compare (max, s) < 0) max = s; }, () => VerifyCompleted (true, sub, max));
+ return source.Subscribe ((s) => { if (Comparer<T>.Default.Compare (max, s) < 0) max = s; }, ex => sub.OnError (ex), () => VerifyCompleted (true, sub, max));
// ----
}, DefaultColdScheduler);
}
@@ -104,7 +106,7 @@ static IObservable<T> NonNullableSum<T> (this IObservable<T> source, Func<T,T,T>
return new ColdObservableEach<T> (sub => {
// ----
T sum = default (T);
- return source.Subscribe (s => sum = add (sum, s), () => VerifyCompleted (true, sub, sum));
+ return source.Subscribe (s => sum = add (sum, s), ex => sub.OnError (ex), () => VerifyCompleted (true, sub, sum));
// ----
}, DefaultColdScheduler);
}
@@ -117,7 +119,7 @@ static IObservable<T> NullableSum<T> (this IObservable<T> source, Func<T,T,T> ad
return new ColdObservableEach<T> (sub => {
// ----
T sum = default (T);
- return source.Subscribe (s => sum = sum != null ? s : add (sum, s), () => VerifyCompleted (true, sub, sum));
+ return source.Subscribe (s => sum = sum != null ? s : add (sum, s), ex => sub.OnError (ex), () => VerifyCompleted (true, sub, sum));
// ----
}, DefaultColdScheduler);
}
@@ -131,7 +133,7 @@ static IObservable<T> NonNullableAverage<T> (this IObservable<T> source, Func<T,
// ----
T sum = default (T);
int count = 0;
- return source.Subscribe (s => { count++; sum = add (sum, s); }, () => VerifyCompleted (true, sub, avg (sum, count)));
+ return source.Subscribe (s => { count++; sum = add (sum, s); }, ex => sub.OnError (ex), () => VerifyCompleted (true, sub, avg (sum, count)));
// ----
}, DefaultColdScheduler);
}
@@ -145,7 +147,7 @@ static IObservable<T> NullableAverage<T> (this IObservable<T> source, Func<T,T,T
// ----
T sum = default (T);
int count = 0;
- return source.Subscribe (s => { count++; sum = sum != null ? s : add (sum, s); }, () => VerifyCompleted (true, sub, avg (sum, count)));
+ return source.Subscribe (s => { count++; sum = sum != null ? s : add (sum, s); }, ex => sub.OnError (ex), () => VerifyCompleted (true, sub, avg (sum, count)));
// ----
}, DefaultColdScheduler);
}
@@ -280,6 +282,7 @@ public static IObservable<TSource> Max<TSource> (this IObservable<TSource> sourc
} else if (comparer.Compare (max, s) < 0)
max = s;
},
+ ex => sub.OnError (ex),
() => VerifyCompleted (got, sub, max));
// ----
}, DefaultColdScheduler);
@@ -363,6 +366,7 @@ public static IObservable<TSource> Min<TSource> (this IObservable<TSource> sourc
} else if (comparer.Compare (min, s) > 0)
min = s;
},
+ ex => sub.OnError (ex),
() => VerifyCompleted (got, sub, min));
// ----
}, DefaultColdScheduler);
@@ -81,7 +81,7 @@ public SubjectCountContext (int start, ISubject<TSource> subject)
subjects [x++].Subject.OnNext (v);
}
current++;
- }, () => { foreach (var sc in subjects) sc.Subject.OnCompleted (); sub.OnCompleted (); }));
+ }, ex => sub.OnError (ex), () => { foreach (var sc in subjects) sc.Subject.OnCompleted (); sub.OnCompleted (); }));
return dis;
// ----
}, DefaultColdScheduler);
@@ -198,7 +198,7 @@ public SubjectTimeShiftContext (DateTimeOffset start, ISubject<TSource> sub)
// This check makes sense when the event was published *at the same time* the subject ends its life time by timeSpan.
if (scheduler.Now - subjects [x].Start < timeSpan)
subjects [x].Subject.OnNext (v);
- }, () => { foreach (var sc in subjects) sc.Subject.OnCompleted (); sub.OnCompleted (); })));
+ }, ex => sub.OnError (ex), () => { foreach (var sc in subjects) sc.Subject.OnCompleted (); sub.OnCompleted (); })));
return dis;
// ----
}, DefaultColdScheduler);
@@ -228,14 +228,13 @@ public SubjectTimeShiftContext (DateTimeOffset start, ISubject<TSource> sub)
var l = new Subject<TSource> ();
var dis = new CompositeDisposable ();
var disClosings = new CompositeDisposable ();
- dis.Add (windowOpenings.Subscribe (Observer.Create<TWindowOpening> (
- s => {
+ dis.Add (windowOpenings.Subscribe (s => {
var closing = windowClosingSelector (s);
disClosings.Add (closing.Subscribe (c => {
sub.OnNext (l);
l = new Subject<TSource> ();
}));
- }, () => disClosings.Dispose ())));
+ }, ex => sub.OnError (ex), () => disClosings.Dispose ()));
dis.Add (source.Subscribe (
s => l.OnNext (s), ex => sub.OnError (ex), () => {

0 comments on commit ff66ed1

Please sign in to comment.