/
FilterableChangeObserver.cs
85 lines (77 loc) · 3.02 KB
/
FilterableChangeObserver.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
using System;
namespace Meadow;
/// <summary>
/// An `IObserver` that handles change notifications and has an optional
/// predicate that automatically filters results so only results that match
/// the predicate will reach the subscriber.
/// </summary>
/// <typeparam name="UNIT">The datatype that contains the notification data.
/// I.e. `Temperature` or `decimal`. Must be a `struct`.</typeparam>
public class FilterableChangeObserver<UNIT> : IObserver<IChangeResult<UNIT>>
where UNIT : struct
{
/// <summary>
/// Than handler that is called in `OnNext` if the filter is satisfied.
/// </summary>
protected Action<IChangeResult<UNIT>> Handler { get; } = default!;
/// <summary>
/// A filter that specifies whether or not the observer should get notified.
/// </summary>
protected Predicate<IChangeResult<UNIT>>? Filter { get; } = null;
/// <summary>
/// The last notified value. Note that this may differ from the `Old`
/// property on the result, because this only gets updated if the filter
/// is satisfied and the result is sent to the observer.
/// </summary>
protected UNIT? lastNotifiedValue;
/// <summary>
/// Creates a new `FilterableChangeObserver` that will execute the handler
/// when a change occurs. If the `filter` parameter is supplied with a
/// `Predicate`, only changes that satisfy that predicate (return `true`)
/// will cause the handler to be invoked.
/// </summary>
/// <param name="handler">An `Action` that will be invoked when a
/// change occurs.</param>
/// <param name="filter">An optional `Predicate` that filters out any
/// notifications that don't satisfy (return `true`) the predicate condition.
/// Note that the first reading will always call the handler.</param>
public FilterableChangeObserver(Action<IChangeResult<UNIT>> handler, Predicate<IChangeResult<UNIT>>? filter = null)
{
this.Handler = handler;
this.Filter = filter;
}
/// <summary>
/// Called by an Observable when a change occurs.
/// </summary>
/// <param name="result"></param>
public void OnNext(IChangeResult<UNIT> result)
{
// if the last notified value isn't null, inject it into the result.
// (each last notified is specific to the observer)
if (lastNotifiedValue is { } last)
{
result.Old = last;
}
// if there is no filter,
// OR
// if the filter satisfies the result,
// OR
// if it's the first time (result.Old == null)
if (Filter == null || Filter(result) || result.Old is null)
{
// save the last notified value as this new value
lastNotifiedValue = result.New;
// invoke (execute) the handler
Handler?.Invoke(result);
}
}
/// <inheritdoc/>
public void OnCompleted()
{
}
/// <inheritdoc/>
public void OnError(Exception error)
{
Console.WriteLine("Filtered Observer error: " + error.Message);
}
}