paulbatum / Reactive

My lame attempt at mimicking the Rx framework

This URL has Read+Write access

Reactive / WhereObservable.cs
100644 23 lines (19 sloc) 0.657 kb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
using System;
 
namespace Reactive
{
    public class WhereObservable<T> : IObservable<T>
    {
        private readonly IObservable<T> _source;
        private readonly Func<T, bool> _predicate;
 
        public WhereObservable(IObservable<T> source, Func<T, bool> predicate)
        {
            _source = source;
            _predicate = predicate;
        }
 
        public IDisposable Subscribe(IObserver<T> observer)
        {
            IObserver<T> whereObserver = ObserverBuilder.Create(observer, (T t) => { if (_predicate(t)) observer.OnNext(t); });
            return _source.Subscribe(whereObserver);
        }
    }
    
}