Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Lots more reactive

  • Loading branch information...
commit 5bfd49f7fd7506931f4fdd68cce2f1a9e3b64419 1 parent 29a11b1
@Strilanc authored
Showing with 119 additions and 7 deletions.
  1. +19 −0 Library/PacketSocket.vb
  2. +91 −6 Library/Reactive.vb
  3. +9 −1 Tinker.vbproj
View
19 Library/PacketSocket.vb
@@ -151,6 +151,25 @@ Public NotInheritable Class PacketSocket
Disconnect(expected:=False, reason:="Connection went idle.")
End Sub
+ Public Function ObservePackets(Optional ct As CancellationToken = Nothing) As IObservable(Of IRist(Of Byte))
+ Dim r = New ManualObservable(Of IRist(Of Byte))
+ Call Async Sub()
+ Do
+ Try
+ Dim packet = Await AsyncReadPacket()
+ If ct.IsCancellationRequested Then Return
+ r.PushNext(packet)
+ Catch ex As Exception
+ If _isConnected Then
+ r.PushError(ex)
+ Else
+ r.PushCompleted()
+ End If
+ End Try
+ Loop
+ End Sub()
+ Return r
+ End Function
Public Async Function AsyncReadPacket() As Task(Of IRist(Of Byte))
'Contract.Ensures(Contract.Result(Of Task(Of IRist(Of Byte)))() IsNot Nothing)
Try
View
97 Library/Reactive.vb
@@ -29,16 +29,90 @@ Public Class Observable(Of T)
Return _subscribe(observer)
End Function
End Class
+Public Class ManualObservable(Of T)
+ Implements IObservable(Of T)
+ Private ReadOnly _observers As New List(Of IObserver(Of T))()
+ Public Function Subscribe(observer As IObserver(Of T)) As IDisposable Implements IObservable(Of T).Subscribe
+ _observers.Add(observer)
+ Return New DelegatedDisposable(Sub() _observers.Remove(observer))
+ End Function
+ Public Sub PushNext(value As T)
+ For Each obs In _observers
+ obs.OnNext(value)
+ Next
+ End Sub
+ Public Sub PushCompleted()
+ For Each obs In _observers
+ obs.OnCompleted()
+ Next
+ End Sub
+ Public Sub PushError([error] As Exception)
+ For Each obs In _observers
+ obs.OnError([error])
+ Next
+ End Sub
+End Class
+Public Class ObservableWalker(Of T)
+ Implements IObserver(Of T)
+
+ Private ReadOnly _ready As New Queue(Of Task(Of Renullable(Of T)))
+ Private ReadOnly _lock As New Object()
+ Private _next As TaskCompletionSource(Of Renullable(Of T))
+
+ Public Function TryNext() As Task(Of Renullable(Of T))
+ SyncLock _lock
+ If _ready.Count > 0 Then
+ Dim r = _ready.Peek()
+ If r.Status = TaskStatus.RanToCompletion AndAlso r.Result IsNot Nothing Then
+ _ready.Dequeue()
+ End If
+ Return r
+ End If
+
+ If _next IsNot Nothing Then Throw New InvalidOperationException("Overlapping TryNext calls.")
+ _next = New TaskCompletionSource(Of Renullable(Of T))
+ Return _next.Task
+ End SyncLock
+ End Function
+
+ Private Sub OnCompleted() Implements IObserver(Of T).OnCompleted
+ SyncLock _lock
+ _next = If(_next, New TaskCompletionSource(Of Renullable(Of T)))
+ _next.SetResult(Nothing)
+ _ready.Enqueue(_next.Task)
+ End SyncLock
+ End Sub
+ Private Sub OnError([error] As Exception) Implements IObserver(Of T).OnError
+ SyncLock _lock
+ _next = If(_next, New TaskCompletionSource(Of Renullable(Of T)))
+ _next.SetException([error])
+ _ready.Enqueue(_next.Task)
+ End SyncLock
+ End Sub
+ Private Sub OnNext(value As T) Implements IObserver(Of T).OnNext
+ SyncLock _lock
+ If _next IsNot Nothing Then
+ _next.SetResult(value)
+ Else
+ _ready.Enqueue(Task.FromResult(New Renullable(Of T)(value)))
+ End If
+ End SyncLock
+ End Sub
+End Class
Public Module ReactiveUtil
<Extension> <Pure>
+ Public Function Observe(Of T)(observable As IObservable(Of T), onNext As Action(Of T), onCompleted As Action, onError As Action(Of Exception)) As IDisposable
+ Return observable.Subscribe(New Observer(Of T)(onNext, onCompleted, onError))
+ End Function
+ <Extension> <Pure>
Public Function CollectListAsync(Of T)(observable As IObservable(Of T), Optional ct As CancellationToken = Nothing) As Task(Of List(Of T))
Dim r = New List(Of T)()
Dim s = New TaskCompletionSource(Of List(Of T))()
- Dim reg = observable.Subscribe(New Observer(Of T)(
+ Dim reg = observable.Observe(
Sub(value) r.Add(value),
Sub() s.TrySetResult(r),
- Sub([error]) s.TrySetException([error])))
+ Sub([error]) s.TrySetException([error]))
ct.Register(Sub()
reg.Dispose()
s.TrySetCanceled()
@@ -47,7 +121,7 @@ Public Module ReactiveUtil
End Function
<Extension> <Pure>
Public Function [Select](Of T, R)(observable As IObservable(Of T), projection As Func(Of T, R)) As IObservable(Of R)
- Return New Observable(Of R)(Function(observer) observable.Subscribe(New Observer(Of T)(
+ Return New Observable(Of R)(Function(observer) observable.Observe(
Sub(value)
Dim v As R
Try
@@ -59,11 +133,11 @@ Public Module ReactiveUtil
observer.OnNext(v)
End Sub,
AddressOf observer.OnCompleted,
- AddressOf observer.OnError)))
+ AddressOf observer.OnError))
End Function
<Extension> <Pure>
Public Function Where(Of T)(observable As IObservable(Of T), filter As Func(Of T, Boolean)) As IObservable(Of T)
- Return New Observable(Of T)(Function(observer) observable.Subscribe(New Observer(Of T)(
+ Return New Observable(Of T)(Function(observer) observable.Observe(
Sub(value)
Dim keep As Boolean
Try
@@ -75,6 +149,17 @@ Public Module ReactiveUtil
If keep Then observer.OnNext(value)
End Sub,
AddressOf observer.OnCompleted,
- AddressOf observer.OnError)))
+ AddressOf observer.OnError))
+ End Function
+ <Extension> <Pure>
+ Public Function InCurrentSyncContext(Of T)(observable As IObservable(Of T)) As IObservable(Of T)
+ Dim context = SynchronizationContext.Current
+ Dim post = If(context Is Nothing,
+ Sub(e As Action) e(),
+ Sub(e As Action) context.Post(Sub() e(), Nothing))
+ Return New Observable(Of T)(Function(observer) observable.Observe(
+ Sub(value) post(Sub() observer.OnNext(value)),
+ Sub() post(AddressOf observer.OnCompleted),
+ Sub(err) post(Sub() observer.OnError(err))))
End Function
End Module
View
10 Tinker.vbproj
@@ -68,7 +68,7 @@
</CodeContractsCustomRewriterClass>
<CodeContractsRuntimeCheckingLevel>Preconditions</CodeContractsRuntimeCheckingLevel>
<CodeContractsRuntimeOnlyPublicSurface>False</CodeContractsRuntimeOnlyPublicSurface>
- <CodeContractsRunCodeAnalysis>True</CodeContractsRunCodeAnalysis>
+ <CodeContractsRunCodeAnalysis>False</CodeContractsRunCodeAnalysis>
<CodeContractsBuildReferenceAssembly>False</CodeContractsBuildReferenceAssembly>
<CodeContractsNonNullObligations>True</CodeContractsNonNullObligations>
<CodeContractsBoundsObligations>False</CodeContractsBoundsObligations>
@@ -99,6 +99,14 @@
<CodeContractsEnumObligations>False</CodeContractsEnumObligations>
<CodeContractsAnalysisWarningLevel>2</CodeContractsAnalysisWarningLevel>
<Prefer32Bit>false</Prefer32Bit>
+ <CodeContractsInferRequires>False</CodeContractsInferRequires>
+ <CodeContractsInferEnsures>False</CodeContractsInferEnsures>
+ <CodeContractsInferObjectInvariants>False</CodeContractsInferObjectInvariants>
+ <CodeContractsSuggestAssumptions>False</CodeContractsSuggestAssumptions>
+ <CodeContractsSuggestRequires>True</CodeContractsSuggestRequires>
+ <CodeContractsSuggestEnsures>False</CodeContractsSuggestEnsures>
+ <CodeContractsSuggestObjectInvariants>False</CodeContractsSuggestObjectInvariants>
+ <CodeContractsDisjunctiveRequires>False</CodeContractsDisjunctiveRequires>
</PropertyGroup>
<ItemGroup>
<Reference Include="MpqLibrary, Version=1.0.0.0, Culture=neutral, PublicKeyToken=08060530187cdd8c, processorArchitecture=x86">
Please sign in to comment.
Something went wrong with that request. Please try again.