Skip to content

Commit

Permalink
Allow interpreters to process completion requests asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
alexarchambault committed Oct 19, 2018
1 parent 61ee225 commit 483959c
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 11 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Expand Up @@ -68,7 +68,7 @@ lazy val interpreter = project

lazy val kernel = project
.underShared
.dependsOn(interpreter)
.dependsOn(interpreter, interpreter % "test->test")
.settings(
shared,
testSettings,
Expand Down
@@ -0,0 +1,8 @@
package almond.interpreter

import scala.concurrent.Future

final case class FutureCompletion(
future: Future[Completion],
cancel: () => Unit
)
Expand Up @@ -39,6 +39,9 @@ trait IOInterpreter {
def complete(code: String, pos: Int): IO[Completion] =
IO.pure(Completion.empty(pos))

final def complete(code: String): IO[Completion] =
complete(code, code.length)

def inspect(code: String, pos: Int, detailLevel: Int): IO[Option[Inspection]] =
IO.pure(None)

Expand Down
Expand Up @@ -82,6 +82,18 @@ trait Interpreter {
def complete(code: String, pos: Int): Completion =
Completion.empty(pos)

/**
* Asynchronously try to complete code.
*
* This is normally called before [[complete()]]. If this returns a non-empty option,
* it is assumed asynchronous completions are supported. Else, [[complete()]] is called.
*
* @param code: code to complete
* @param pos: cursor position (as a unicode code point index) in code
*/
def asyncComplete(code: String, pos: Int): Option[FutureCompletion] =
None

/**
* Tries to complete code.
*
Expand Down
Expand Up @@ -11,6 +11,7 @@ import fs2.async
import fs2.async.mutable.Signal

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

/**
*
Expand Down Expand Up @@ -96,13 +97,56 @@ final class InterpreterToIOInterpreter(
IO(interpreter.isComplete(code))
}

override def complete(code: String, pos: Int): IO[Completion] =
cancellable {
case true =>
IO.pure(Completion.empty(pos))
case false =>
IO(interpreter.complete(code, pos))
private var runningCompletionOpt = Option.empty[FutureCompletion]
private val runningCompletionLock = new Object

override def complete(code: String, pos: Int): IO[Completion] = {

val ioOrFutureCompletion =
runningCompletionLock.synchronized {

for (c <- runningCompletionOpt) {
log.debug(s"Cancelling completion request $c")
c.cancel()
runningCompletionOpt = None
}

interpreter.asyncComplete(code, pos) match {
case None =>
Left {
cancellable {
case true =>
IO.pure(Completion.empty(pos))
case false =>
IO(interpreter.complete(code, pos))
}
}
case Some(f) =>
runningCompletionOpt = Some(f)
Right(f)
}
}

ioOrFutureCompletion match {
case Left(io) => io
case Right(f) =>
IO.async[Completion] { cb =>
import scala.concurrent.ExecutionContext.Implicits.global // meh
f.future.onComplete { res =>
runningCompletionLock.synchronized {
log.debug(s"Completion request $f done: $res")
runningCompletionOpt = runningCompletionOpt.filter(_ != f)
}
res match {
case Success(c) =>
cb(Right(c))
case Failure(e) =>
cb(Left(e))
}
}
}
}
}

override def inspect(code: String, pos: Int, detailLevel: Int): IO[Option[Inspection]] =
cancellable {
Expand Down
@@ -0,0 +1,53 @@
package almond.interpreter

import java.util.concurrent.Executors

import almond.logger.LoggerContext
import almond.util.ThreadUtil
import cats.implicits._
import utest._

import scala.concurrent.ExecutionContext

object IOInterpreterTests extends TestSuite {

private val pool = Executors.newScheduledThreadPool(4, ThreadUtil.daemonThreadFactory("test"))
private val ec = ExecutionContext.fromExecutorService(pool)

override def utestAfterAll() = {
pool.shutdown()
}

val tests = Tests {

"completion" - {

"cancel previous requests" - {

val interpreter: Interpreter = new TestInterpreter
val ioInterpreter: IOInterpreter = new InterpreterToIOInterpreter(interpreter, ec, LoggerContext.nop)

val ios = Seq(
// the "cancel" completions are only completed if they are cancelled
ioInterpreter.complete("cancel"),
ioInterpreter.complete("cancel"),
ioInterpreter.complete("other")
)

val t = ios.toList.sequence

val res = t.unsafeRunSync()
val expectedRes = Seq(
Completion(0, "cancel".length, Seq("cancelled")),
Completion(0, "cancel".length, Seq("cancelled")),
Completion("other".length, "other".length, Seq("?"))
)

assert(res == expectedRes)
}

}

}

}
@@ -1,12 +1,12 @@
package almond.kernel
package almond.interpreter

import almond.interpreter.{ExecuteResult, Interpreter}
import almond.interpreter.api.{CommHandler, DisplayData, OutputHandler}
import almond.interpreter.comm.CommManager
import almond.interpreter.input.InputManager

import scala.concurrent.Await
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration.Duration
import scala.util.Success

final class TestInterpreter extends Interpreter {
def execute(
Expand Down Expand Up @@ -68,6 +68,21 @@ final class TestInterpreter extends Interpreter {

def kernelInfo() = ???

override def asyncComplete(code: String, pos: Int) = {

val res =
if (code == "cancel") {
val p = Promise[Completion]()
FutureCompletion(p.future, () => p.complete(Success(Completion(0, code.length, Seq("cancelled")))))
} else
FutureCompletion(Future.successful(Completion(pos, pos, Seq("?"))), () => sys.error("should not happen"))

Some(res)
}

override def complete(code: String, pos: Int) =
sys.error("should not be called")

private val commManager = CommManager.create()
private var commHandlerOpt0 = Option.empty[CommHandler]
override def commManagerOpt = Some(commManager)
Expand Down
Expand Up @@ -4,7 +4,7 @@ import java.util.UUID

import almond.channels.Channel
import almond.interpreter.messagehandlers.MessageHandler
import almond.interpreter.Message
import almond.interpreter.{Message, TestInterpreter}
import almond.logger.LoggerContext
import almond.protocol.{Execute, Header, History, Input, Shutdown}
import almond.util.ThreadUtil.{attemptShutdownExecutionContext, singleThreadedExecutionContext}
Expand Down

0 comments on commit 483959c

Please sign in to comment.