diff --git a/rudder-web/pom.xml b/rudder-web/pom.xml index 3de92334ef2..a80e7bfff67 100644 --- a/rudder-web/pom.xml +++ b/rudder-web/pom.xml @@ -112,6 +112,12 @@ along with Rudder. If not, see . ${commons-io-version} + + io.monix + monix_${scala-binary-version} + ${monix-version} + + net.liftweb diff --git a/rudder-web/src/main/scala/com/normation/rudder/web/rest/node/NodeAPI8.scala b/rudder-web/src/main/scala/com/normation/rudder/web/rest/node/NodeAPI8.scala index 83948e54e21..5db62016dd1 100644 --- a/rudder-web/src/main/scala/com/normation/rudder/web/rest/node/NodeAPI8.scala +++ b/rudder-web/src/main/scala/com/normation/rudder/web/rest/node/NodeAPI8.scala @@ -52,6 +52,7 @@ import com.normation.rudder.web.rest.ApiVersion import com.normation.rudder.web.rest.RestDataSerializer import com.normation.rudder.domain.nodes.Node import net.liftweb.http.OutputStreamResponse +import net.liftweb.http.StreamingResponse class NodeAPI8 ( apiV6 : NodeAPI6 diff --git a/rudder-web/src/main/scala/com/normation/rudder/web/rest/node/NodeAPIService8.scala b/rudder-web/src/main/scala/com/normation/rudder/web/rest/node/NodeAPIService8.scala index b20ce9e7fb1..f72d8a790fd 100644 --- a/rudder-web/src/main/scala/com/normation/rudder/web/rest/node/NodeAPIService8.scala +++ b/rudder-web/src/main/scala/com/normation/rudder/web/rest/node/NodeAPIService8.scala @@ -70,6 +70,7 @@ import com.normation.rudder.domain.nodes.NodeInfo import scalaj.http.Http import com.normation.rudder.domain.nodes.CompareProperties import scalaj.http.HttpOptions +import monix.eval.Task class NodeApiService8 ( nodeRepository : WoNodeRepository @@ -102,6 +103,7 @@ class NodeApiService8 ( private[this] val pipeSize = 4096 def runResponse(in : InputStream)(out : OutputStream) = { + val bytes : Array[Byte] = new Array(pipeSize) val zero = 0.toByte var read = 0 @@ -115,11 +117,8 @@ class NodeApiService8 ( } catch { case e : IOException => - // should we log ? Should we end ? - // No need to close output or inputs - // Output will be managed by Lift / java servlet response - // Input is a Piped input stream and therefore, use only in memory ressources (no network, no database ...) - // And we already close the other side of the pipe, a case that PipedInputStream handles + out.write(e.getMessage.toByte) + out.flush() } } @@ -138,20 +137,27 @@ class NodeApiService8 ( } def runNode(nodeId: NodeId, classes : List[String]) : Box[OutputStream => Unit] = { - + import monix.execution.Scheduler.Implicits.global val request = remoteRunRequest(nodeId,classes,true,false) + + val in = new PipedInputStream(pipeSize) + val out = new PipedOutputStream(in) import net.liftweb.util.Helpers.tryo - for { - httpResponse <- tryo { request.execute { runResponse } } - response <- if (httpResponse.isSuccess ) { - Full(httpResponse.body) - } else { - Failure(s"An error occured when applying policy on Node '${nodeId.value}'") - } - } yield { - - response - } + + val response = + + Task( + request.exec{ + case (status,headers,input) => + if (status >= 200 && status < 300) { + runResponse(input)(out) + } else { + out.write(s"An error occured when applying policy on Node '${nodeId.value}'".toByte) + out.flush + } + }) + response.runAsync.onComplete { _ => out.close() } + Full(runResponse(in)) } def runAllNodes(classes : List[String]) : Box[JValue] = { @@ -171,10 +177,11 @@ class NodeApiService8 ( val request = remoteRunRequest(node.id, classes, false, true) val commandRun = { - if (request.asString.isSuccess) { + val result = request.asString + if (result.isSuccess) { "Started" } else { - s"An error occured when applying policy on Node '${node.id.value}'" + s"An error occured when applying policy on Node '${node.id.value}', cause is: ${result.body}" } } ( ( "id" -> node.id.value)