Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async output #344

Merged
merged 5 commits into from
Jul 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Reactive.Concurrency;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.DotNet.Interactive.Jupyter.Protocol;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
using Microsoft.DotNet.Interactive.Jupyter.Protocol;
using WorkspaceServer.Kernel;
using Xunit;
using System.Reactive.Linq;
using Microsoft.DotNet.Interactive.Events;
using FluentAssertions.Extensions;
using System.Reactive.Concurrency;

namespace Microsoft.DotNet.Interactive.Jupyter.Tests
{
Expand Down
20 changes: 9 additions & 11 deletions Microsoft.DotNet.Interactive.Jupyter/CompleteRequestHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Reactive.Concurrency;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using Microsoft.DotNet.Interactive.Commands;
Expand All @@ -12,14 +13,15 @@

namespace Microsoft.DotNet.Interactive.Jupyter
{
public class CompleteRequestHandler: RequestHandlerBase<CompleteRequest>
public class CompleteRequestHandler : RequestHandlerBase<CompleteRequest>
{
private static readonly Regex _lastToken = new Regex(@"(?<lastToken>\S+)$", RegexOptions.Compiled | RegexOptions.IgnoreCase | RegexOptions.CultureInvariant | RegexOptions.Multiline);


public CompleteRequestHandler(IKernel kernel) : base(kernel)
public CompleteRequestHandler(IKernel kernel, IScheduler scheduler = null)
: base(kernel, scheduler ?? CurrentThreadScheduler.Instance)
{

}

public async Task Handle(JupyterRequestContext context)
Expand All @@ -34,22 +36,18 @@ public async Task Handle(JupyterRequestContext context)

InFlightRequests[command] = openRequest;

var kernelResult = await Kernel.SendAsync(command);
openRequest.AddDisposable(kernelResult.KernelEvents.Subscribe(OnKernelResultEvent));

await Kernel.SendAsync(command);
}

void OnKernelResultEvent(IKernelEvent value)
protected override void OnKernelEvent(IKernelEvent @event)
{
switch (value)
switch (@event)
{
case CompletionRequestCompleted completionRequestCompleted:
OnCompletionRequestCompleted(completionRequestCompleted, InFlightRequests);
break;
case CompletionRequestReceived _:
break;
default:
throw new NotSupportedException();
}
}

Expand All @@ -68,7 +66,7 @@ private static void OnCompletionRequestCompleted(CompletionRequestCompleted comp
openRequest.Context.ServerChannel.Send(completeReply);
openRequest.Context.RequestHandlerStatus.SetAsIdle();
openRequest.Dispose();

}

private static int ComputeReplacementStartPosition(string code, int cursorPosition)
Expand Down
13 changes: 6 additions & 7 deletions Microsoft.DotNet.Interactive.Jupyter/ExecuteRequestHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.DotNet.Interactive.Commands;
Expand All @@ -17,7 +18,8 @@ public class ExecuteRequestHandler : RequestHandlerBase<ExecuteRequest>
{
private int _executionCount;

public ExecuteRequestHandler(IKernel kernel) : base(kernel)
public ExecuteRequestHandler(IKernel kernel, IScheduler scheduler = null)
: base(kernel, scheduler ?? CurrentThreadScheduler.Instance)
{
}

Expand All @@ -36,8 +38,7 @@ public async Task Handle(JupyterRequestContext context)

try
{
var kernelResult = await Kernel.SendAsync(command);
openRequest.AddDisposable(kernelResult.KernelEvents.Subscribe(OnKernelResultEvent));
await Kernel.SendAsync(command);
}
catch (Exception e)
{
Expand Down Expand Up @@ -84,9 +85,9 @@ public async Task Handle(JupyterRequestContext context)
return transient;
}

void OnKernelResultEvent(IKernelEvent value)
protected override void OnKernelEvent(IKernelEvent @event)
{
switch (value)
switch (@event)
{
case ValueProduced valueProduced:
OnValueProduced(valueProduced, InFlightRequests);
Expand All @@ -101,8 +102,6 @@ void OnKernelResultEvent(IKernelEvent value)
case IncompleteCodeSubmissionReceived _:
case CompleteCodeSubmissionReceived _:
break;
default:
throw new NotSupportedException();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Reactive.Concurrency;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Clockwise;
using Microsoft.DotNet.Interactive.Jupyter.Protocol;
Expand All @@ -24,8 +26,16 @@ public class JupyterRequestContextHandler : ICommandHandler<JupyterRequestContex
PackageRegistry packageRegistry,
IKernel kernel)
{
_executeHandler = new ExecuteRequestHandler(kernel);
_completeHandler = new CompleteRequestHandler(kernel);
var scheduler = new EventLoopScheduler(t =>
{
var thread = new Thread(t);
thread.IsBackground = true;
thread.Name = "MessagePump";
return thread;
});

_executeHandler = new ExecuteRequestHandler(kernel, scheduler);
_completeHandler = new CompleteRequestHandler(kernel, scheduler);

if (packageRegistry == null)
{
Expand Down
13 changes: 11 additions & 2 deletions Microsoft.DotNet.Interactive.Jupyter/RequestHandlerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,32 @@

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using Microsoft.DotNet.Interactive.Commands;
using Microsoft.DotNet.Interactive.Events;
using Microsoft.DotNet.Interactive.Jupyter.Protocol;

namespace Microsoft.DotNet.Interactive.Jupyter
{
public abstract class RequestHandlerBase<T> : IDisposable
where T : JupyterMessageContent
{

private readonly CompositeDisposable _disposables = new CompositeDisposable();
protected IObservable<IKernelEvent> _kernelEventSource { get; }

protected RequestHandlerBase(IKernel kernel)
protected RequestHandlerBase(IKernel kernel, IScheduler scheduler)
{
Kernel = kernel ?? throw new ArgumentNullException(nameof(kernel));

_kernelEventSource = Kernel.KernelEvents.ObserveOn(scheduler ?? throw new ArgumentNullException(nameof(scheduler)));
_disposables.Add(_kernelEventSource.Subscribe(OnKernelEvent));
}

protected abstract void OnKernelEvent(IKernelEvent @event);

protected static T GetJupyterRequest(JupyterRequestContext context)
{
var request = context.GetRequestContent<T>() ??
Expand Down
4 changes: 1 addition & 3 deletions Microsoft.DotNet.Interactive.Jupyter/Shell.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public async Task StartAsync(CancellationToken cancellationToken)
activity.Info("Received: {message}", message.ToJson());

var status = new RequestHandlerStatus(message.Header, new MessageSender(_ioPubSocket, _signatureValidator));


switch (message.Header.MessageType)
{
Expand All @@ -107,8 +107,6 @@ public async Task StartAsync(CancellationToken cancellationToken)

break;
}


}
}
}
Expand Down
4 changes: 2 additions & 2 deletions Microsoft.DotNet.Interactive/Events/ValueProduced.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ namespace Microsoft.DotNet.Interactive.Events
public class ValueProduced : KernelEventBase
{
public ValueProduced(object value,
SubmitCode submitCode,
IKernelCommand command,
bool isLastValue = false,
IReadOnlyCollection<FormattedValue> formattedValues = null) : base(submitCode)
IReadOnlyCollection<FormattedValue> formattedValues = null) : base(command)
{
Value = value;
IsLastValue = isLastValue;
Expand Down
6 changes: 4 additions & 2 deletions WorkspaceServer.Tests/Kernel/CSharpKernelTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
using Pocket;
using WorkspaceServer.Kernel;
using Xunit.Abstractions;
using System.Reactive.Linq;
using System.Reactive;

namespace WorkspaceServer.Tests.Kernel
{
Expand All @@ -24,14 +26,14 @@ protected CSharpKernel CreateKernel()
.LogEventsToPocketLogger();

DisposeAfterTest(
kernel.KernelEvents.Subscribe(KernelEvents.Add));
kernel.KernelEvents.Timestamp().Subscribe(KernelEvents.Add));

return kernel;
}

private readonly CompositeDisposable _disposables = new CompositeDisposable();

protected IList<IKernelEvent> KernelEvents { get; } = new List<IKernelEvent>();
protected IList<Timestamped<IKernelEvent>> KernelEvents { get; } = new List<Timestamped<IKernelEvent>>();

protected void DisposeAfterTest(IDisposable disposable)
{
Expand Down
Loading