Skip to content

Commit

Permalink
LazyConnect examples
Browse files Browse the repository at this point in the history
  • Loading branch information
LeeCampbell committed Dec 17, 2016
1 parent 49bb239 commit 8e273d4
Show file tree
Hide file tree
Showing 6 changed files with 446 additions and 34 deletions.
19 changes: 4 additions & 15 deletions Algorithms/GeneratingDeltas.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#Generating Delta value
# Generating Delta value

If you are looking to enrich your data from an observable sequence with delta values then this can be done quite easily with Rx.

Expand All @@ -16,7 +16,7 @@ delta = currentValue - previousValue

There are several ways we can do this. Here we will cover using overlapping windows with `Buffer`, a running accumulator with `Scan` or merging the current sequence with a copy of sequence delayed by 1 using `Publish`, `Zip` and `Skip`.

##Buffer
## Buffer
A very simple option is to use the overload of the `Buffer` that allows you to specify the buffer size _and_ the size of the stride to take.
Normally you would pass just an integer value or a timespan to the `Buffer` operator.
In these overload, the values in each produced buffer do not overlap i.e. for `Buffer(5)` you would get the first values values produced as single `IList<T>` and then once the tenth value was produced from the source, the buffer would give the next 5 values in another `IList<T>`.
Expand All @@ -34,7 +34,7 @@ var deltas = source.Buffer(2, 1)
Note that this will not produce a value until at least two values have been produced.
This can be remedied by using the `StartWith` to provide a seed value.

##Scan
## Scan

An alternate solution is to use the `Scan` operator.
The `Scan` operator is often used for producing running aggregate values e.g. running totals.
Expand All @@ -50,7 +50,7 @@ This is arguably more complex solution than the `Buffer` option.
This however does not suffer the same limitation as the `Buffer` algorithm, and can yield values as soon as the first value from the source is produced (without the need for `StartWith`).


##ZipSkip
## ZipSkip

The _ZipSkip_ algorithm is popular however it is more difficult to learn, and requires more effort to share subscriptions.
Effectively the _ZipSkip_ algorithm will `Zip` the source sequence with the source sequence again but skipping the first value.
Expand Down Expand Up @@ -87,14 +87,3 @@ var previousValues = source;
var currentValues = previousValues.Skip(1);
var deltas = previousValues.Zip(currentvalues, (prev, curr) => curr - prev)
```











16 changes: 11 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,34 @@ RxCookbook

Collection of recipes and snippets helping you create useful Rx code

##Rx in your Model
## Rx in your Model
See how you can use Rx in your regular domain.

[Property Change notifications](Model/PropertyChange.md)
[Collection Change notifications](Model/CollectionChange.md)


##Rx for Instrumentation
## Rx for Instrumentation
Use Rx to help instrument your code, or analyse instrumented data from other systems.

[Logging](Instrumentation/Logging.md)

##Rx with IO
## Rx in your repositories
There are common patterns that I see occurring in the repository "layer" in reactive applications.

[Polling with Rx](Repository/Polling.md)
[Lazy Connect](Repository/LazyConnect.md)

## Rx with IO
Here we look at different ways to use Rx with various forms in Input/Output.

###Disk
### Disk
Rx can help stream data to and from the disk.
I have found that it can greatly reduce complexity while also delivering impressive performance benefits.

[Rx Disk IO](IO/Disk/ReadMe.md)

###Communications
### Communications
Rx can provide a useful abstraction over numerous communications layers.
Here we look at various implementations for different technologies.

Expand Down
159 changes: 159 additions & 0 deletions Repository/LazyConnect.linq
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
<Query Kind="Program">
<NuGetReference>Microsoft.Reactive.Testing</NuGetReference>
<Namespace>Microsoft.Reactive.Testing</Namespace>
<Namespace>System</Namespace>
<Namespace>System.Linq</Namespace>
<Namespace>System.Reactive</Namespace>
<Namespace>System.Reactive.Concurrency</Namespace>
<Namespace>System.Reactive.Disposables</Namespace>
<Namespace>System.Reactive.Joins</Namespace>
<Namespace>System.Reactive.Linq</Namespace>
<Namespace>System.Reactive.PlatformServices</Namespace>
<Namespace>System.Reactive.Subjects</Namespace>
<Namespace>System.Reactive.Threading.Tasks</Namespace>
<Namespace>System.Threading.Tasks</Namespace>
</Query>

void Main()
{
//Tests
FirstSubscriberConnectsSequence();
SecondSubscriberGetsSharedData();
ResurectionKeepsOriginalSubscription();
CanDisposeTheConnection();
}

// Define other methods and classes here


public static class ObservableExtensions
{
/*For first 2 tests
public static IObservable<T> LazyConnect<T>(this IConnectableObservable<T> source)
{
return source.RefCount();
}*/

/// <summary>
/// Returns an observable sequence that connects on first subscription. Connection is terminated by the provided <see cref="SingleAssignmentDisposable"/>.
/// </summary>
/// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
/// <param name="source">The source sequence that will be connected on first subscription.</param>
/// <param name="connection">The resource that the connection will be attached to.</param>
/// <returns>An observable sequence that connects on first subscription.</returns>
public static IObservable<T> LazyConnect<T>(this IConnectableObservable<T> source, SingleAssignmentDisposable connection)
{
int isConnected = 0;
return Observable.Create<T>(
o =>
{
var subscription = source.Subscribe(o);
var isDisconnected = Interlocked.CompareExchange(ref isConnected, 1, 0) == 0;
if (isDisconnected)
connection.Disposable = source.Connect();
return subscription;
});
}
}
#region Tests
public void FirstSubscriberConnectsSequence()
{
var testScheduler = new TestScheduler();
var source = Observable.Timer(TimeSpan.FromSeconds(5), testScheduler)
.Timestamp(testScheduler)
.Replay(1)
.LazyConnect(new SingleAssignmentDisposable());
var observer = testScheduler.CreateObserver<Timestamped<long>>();
testScheduler.AdvanceTo(60.Seconds());
source.Subscribe(observer);
testScheduler.Start();
//If we connect after 60s and the value take 5s to be produced, we should recieve it at 65s
ReactiveAssert.AssertEqual(observer.Messages,
ReactiveTest.OnNext(65.Seconds(), new Timestamped<long>(0L, 65.Seconds().SinceEpoch())),
ReactiveTest.OnCompleted<Timestamped<long>>(65.Seconds())
);
}
public void SecondSubscriberGetsSharedData()
{
var testScheduler = new TestScheduler();
var source = Observable.Timer(TimeSpan.FromSeconds(5), testScheduler)
.Timestamp(testScheduler)
.Replay(1)
.LazyConnect(new SingleAssignmentDisposable());
var observer1 = testScheduler.CreateObserver<Timestamped<long>>();
var observer2 = testScheduler.CreateObserver<Timestamped<long>>();
testScheduler.Schedule(TimeSpan.FromSeconds(60), ()=>source.Subscribe(observer1));
testScheduler.Schedule(TimeSpan.FromSeconds(120), ()=>source.Subscribe(observer2));

testScheduler.Start();

//We subscribe at 120s and should immediately get the cached value.
// The cached value was producd at 65s (and timestamped as such).
// We should recieve it as soon as we subscribe at 120s.
ReactiveAssert.AssertEqual(observer2.Messages,
ReactiveTest.OnNext(120.Seconds(), new Timestamped<long>(0L, 65.Seconds().SinceEpoch())),
ReactiveTest.OnCompleted<Timestamped<long>>(120.Seconds())
);
}
public void ResurectionKeepsOriginalSubscription()
{
var testScheduler = new TestScheduler();
var source = Observable.Timer(TimeSpan.FromSeconds(5), testScheduler)
.Timestamp(testScheduler)
.Replay(1)
.LazyConnect(new SingleAssignmentDisposable());

var observer1 = testScheduler.CreateObserver<Timestamped<long>>();
var observer2 = testScheduler.CreateObserver<Timestamped<long>>();

testScheduler.Schedule(TimeSpan.FromSeconds(60), () => source.Take(1).Subscribe(observer1));
testScheduler.Schedule(TimeSpan.FromSeconds(120), () => source.Subscribe(observer2));

testScheduler.Start();

//We subscribe at 120s and should immediately get the cached value.
// The cached value was producd at 65s (and timestamped as such).
// We should recieve it as soon as we subscribe at 120s.
ReactiveAssert.AssertEqual(observer2.Messages,
ReactiveTest.OnNext(120.Seconds(), new Timestamped<long>(0L, 65.Seconds().SinceEpoch())),
ReactiveTest.OnCompleted<Timestamped<long>>(120.Seconds())
);
}
public void CanDisposeTheConnection()
{
var testScheduler = new TestScheduler();
var connection = new SingleAssignmentDisposable();
var source = testScheduler.CreateColdObservable<int>();

//First subscriber will connect the sequence.
source.Replay(1)
.LazyConnect(connection)
.Subscribe();

connection.Dispose();

ReactiveAssert.AssertEqual(source.Subscriptions,
new Subscription(0,0));
}
#endregion

#region Test helpers
public static class TemporalExtensions
{
public static long Seconds(this int seconds)
{
return TimeSpan.FromSeconds(seconds).Ticks;
}
public static DateTimeOffset SinceEpoch(this long ticks)
{
return DateTimeOffset.MinValue.AddTicks(ticks);
}
}
#endregion
Loading

0 comments on commit 8e273d4

Please sign in to comment.