Skip to content

Commit

Permalink
Add possibility for caller proxy to return an observable
Browse files Browse the repository at this point in the history
When subscribed to it will call a progressive RPC

Code-Sharp#238
  • Loading branch information
Johan 't Hart committed Mar 2, 2021
1 parent 82a672e commit 77226d9
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;
using NUnit.Framework;
using WampSharp.Core.Serialization;
Expand Down Expand Up @@ -64,6 +66,42 @@ public async Task ProgressiveCallsCalleeProxyProgress()
Assert.That(result.Result, Is.EqualTo(10));
}

[Test]
public async Task ProgressiveCallsCalleeProxyObservable()
{
WampPlayground playground = new WampPlayground();

CallerCallee dualChannel = await playground.GetCallerCalleeDualChannel();
IWampChannel calleeChannel = dualChannel.CalleeChannel;
IWampChannel callerChannel = dualChannel.CallerChannel;

MyOperation myOperation = new MyOperation();

await calleeChannel.RealmProxy.RpcCatalog.Register(myOperation, new RegisterOptions());
ILongOpObsService proxy = callerChannel.RealmProxy.Services.GetCalleeProxy<ILongOpObsService>();

IEnumerable<int> results = proxy.LongOp(9, false).ToEnumerable(); // it will emit one more than asked

CollectionAssert.AreEquivalent(Enumerable.Range(0, 10), results);
}

[Test]
public async Task ProgressiveCallsCalleeProxyObservableError()
{
WampPlayground playground = new WampPlayground();

CallerCallee dualChannel = await playground.GetCallerCalleeDualChannel();
IWampChannel calleeChannel = dualChannel.CalleeChannel;
IWampChannel callerChannel = dualChannel.CallerChannel;

MyOperation myOperation = new MyOperation();

await calleeChannel.RealmProxy.RpcCatalog.Register(myOperation, new RegisterOptions());
ILongOpObsService proxy = callerChannel.RealmProxy.Services.GetCalleeProxy<ILongOpObsService>();

Assert.Throws(typeof(WampException), () => proxy.LongOp(9, true).ToEnumerable().Count());
}

public class MyOperation : IWampRpcOperation
{
public string Procedure => "com.myapp.longop";
Expand All @@ -80,16 +118,27 @@ public IWampCancellableInvocation Invoke<TMessage>(IWampRawRpcOperationRouterCal
TMessage number = arguments[0];
int n = formatter.Deserialize<int>(number);

bool endWithError = arguments.Length > 0 && formatter.Deserialize<bool>(arguments[1]);

for (int i = 0; i < n; i++)
{
caller.Result(WampObjectFormatter.Value,
new YieldOptions {Progress = true},
new object[] {i});
}

caller.Result(WampObjectFormatter.Value,
new YieldOptions(),
new object[] {n});
if (endWithError)
{
caller.Error(WampObjectFormatter.Value,
new Dictionary<string, string>(),
"Something bad happened");
}
else
{
caller.Result(WampObjectFormatter.Value,
new YieldOptions(),
new object[] { n });
}

return null;
}
Expand Down Expand Up @@ -122,6 +171,31 @@ public async Task<int> LongOp(int n, IProgress<int> progress)
}
}

public interface ILongOpObsService
{
[WampProcedure("com.myapp.longop")]
[WampProgressiveResultProcedure]
IObservable<int> LongOp(int n, bool endWithError);
}

public class LongOpObsService : ILongOpObsService
{
public IObservable<int> LongOp(int n, bool endWithError) => Observable.Create<int>(async obs =>
{
for (int i = 0; i < n; i++)
{
obs.OnNext(i);
await Task.Delay(100);
}
if (endWithError)
obs.OnError(new WampException("wamp.error", "Something bad happened"));
else
obs.OnCompleted();
return Disposable.Empty;
});
}

public class MyCallback : IWampRawRpcOperationClientCallback
{
private readonly TaskCompletionSource<int> mTask = new TaskCompletionSource<int>();
Expand Down Expand Up @@ -187,4 +261,4 @@ public void Report(T value)
mAction(value);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Reflection;
using WampSharp.V2.Core.Contracts;
using WampSharp.V2.Rpc;
Expand Down Expand Up @@ -37,7 +38,7 @@ public virtual CallOptions GetCallOptions(MethodInfo method)

public virtual string GetProcedureUri(MethodInfo method)
{
WampProcedureAttribute attribute =
WampProcedureAttribute attribute =
method.GetCustomAttribute<WampProcedureAttribute>();

if (attribute == null)
Expand All @@ -48,4 +49,4 @@ public virtual string GetProcedureUri(MethodInfo method)
return attribute.Procedure;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Reactive;
using System.Reflection;
using System.Threading.Tasks;
using WampSharp.V2.Rpc;
Expand Down Expand Up @@ -28,7 +29,12 @@ private static Type GetRelevantInterceptorType(MethodInfo method)
Type genericArgument;
Type interceptorType;

if (!typeof(Task).IsAssignableFrom(returnType))
if (returnType.IsGenericType && returnType.GetGenericTypeDefinition() == typeof(IObservable<>))
{
genericArgument = returnType.GetGenericArguments()[0];
interceptorType = typeof(ObservableCalleeProxyInterceptor<>);
}
else if (!typeof(Task).IsAssignableFrom(returnType))
{
MethodInfoValidation.ValidateSyncMethod(method);

Expand All @@ -55,4 +61,4 @@ private static Type GetRelevantInterceptorType(MethodInfo method)
return closedGenericType;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reflection;

namespace WampSharp.V2.CalleeProxy
{
internal class ObservableCalleeProxyInterceptor<T> : CalleeProxyInterceptorBase<T>
{
public ObservableCalleeProxyInterceptor(MethodInfo method, IWampCalleeProxyInvocationHandler handler, ICalleeProxyInterceptor interceptor) : base(method, handler, interceptor)
{
}

public override object Invoke(MethodInfo method, object[] arguments)
{
return Observable.Create<T>(async (obs, cancellationToken) =>
{
try
{
var last = await Handler.InvokeProgressiveAsync
(Interceptor, method, Extractor, arguments, obs.ToProgress(), cancellationToken);
if (last != null)
obs.OnNext(last);
obs.OnCompleted();
}
catch (Exception e)
{
obs.OnError(e);
}
});
}
}
}

0 comments on commit 77226d9

Please sign in to comment.