Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #16448: Trigger remote run in node details says it timeeout #2777

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import java.io.InputStream
import java.io.OutputStream
import java.io.PipedInputStream
import java.io.PipedOutputStream
import java.net.ConnectException
import java.nio.charset.StandardCharsets
import java.util.Arrays

Expand Down Expand Up @@ -98,6 +99,7 @@ import com.normation.zio._
import com.normation.errors._
import com.normation.rudder.domain.logger.NodeLogger
import com.normation.rudder.domain.logger.NodeLoggerPure
import zio.duration._
import zio.syntax._

/*
Expand Down Expand Up @@ -672,7 +674,7 @@ class NodeApiService8 (

def remoteRunRequest(nodeId: NodeId, classes : List[String], keepOutput : Boolean, asynchronous : Boolean) = {
val url = s"${relayApiEndpoint}/remote-run/nodes/${nodeId.value}"
// val url = "http://httpbin.org/post"
// val url = s"http://localhost/rudder/relay-api/remote-run/nodes/${nodeId.value}"
val params =
( "classes" , classes.mkString(",") ) ::
( "keep_output" , keepOutput.toString ) ::
Expand All @@ -689,16 +691,22 @@ class NodeApiService8 (
* Execute remote run on given node and pipe response to output stream
*/
def runNode[A](nodeId: NodeId, classes : List[String]): OutputStream => Unit = {
def copyStreamTo(pipeSize: Int, in : InputStream)(out: OutputStream )= {
/*
* read from in and copy to out
*/
def copyStreamTo(pipeSize: Int, in : InputStream)(out: OutputStream ): Unit= {
val bytes : Array[Byte] = new Array(pipeSize)
val zero = 0.toByte
var read = 0
try {
while (read != -1) {
while (read >= 0) { // stop on -1 because end of stream
Arrays.fill(bytes,zero)
read = in.read(bytes)
out.write(bytes)
out.flush()
if(read > 0) {
out.write(bytes)
out.flush()
}
// do not close os here
}
} catch {
case e : IOException =>
Expand All @@ -707,15 +715,17 @@ class NodeApiService8 (
}
}

def msg(s: String) = s"Error occured when contacting internal remote-run API to apply classes on Node '${nodeId.value}': ${s}"
def errorMessageWithHint(s: String) = s"Error occured when contacting internal remote-run API to apply classes on Node '${nodeId.value}': ${s}"

// buffer size for file I/O
val pipeSize = 4096

val request = remoteRunRequest(nodeId,classes,true,false).timeout(connTimeoutMs = 1000, readTimeoutMs = 5*1000)
val readTimeout = 30.seconds

val copy = (os: OutputStream) => {
import zio.duration._
val request = remoteRunRequest(nodeId, classes, true, false).timeout(connTimeoutMs = 1000, readTimeoutMs = readTimeout.toMillis.toInt)

// copy will close `os`
val copy = (os: OutputStream, timeout: Duration) => {
for {
_ <- NodeLoggerPure.debug(s"Executing remote run call: ${request.toString}")
opt <- IOResult.effect {
Expand All @@ -724,68 +734,47 @@ class NodeApiService8 (
if (status >= 200 && status < 300) {
copyStreamTo(pipeSize, is)(os)
} else {
val error = msg(s"(HTTP code ${status}) \n${HttpConstants.readString(is)}\n\n")
val error = errorMessageWithHint(s"(HTTP code ${status}) \n${HttpConstants.readString(is)}\n\n")
NodeLogger.error(error)
os.write(error.getBytes)
os.flush
}
os.close() //os must be closed here, else is never know that the stream is closed and wait forever
}
}.unit.catchAll { case error@SystemError(m, ex) =>
// special case for "Connection refused": it means that remoteRunRequest is not working
val err = ex match {
case _:ConnectException =>
Unexpected(s"Can not connect to local remote run API (${request.method.toUpperCase}:${request.url})")
case _ => error
}
}.catchAll { err =>
NodeLoggerPure.error(msg(err.fullMsg)) *> IOResult.effect {
os.write(msg(err.msg).getBytes)

NodeLoggerPure.error(errorMessageWithHint(err.fullMsg)) *> IOResult.effect {
os.write(errorMessageWithHint(err.msg).getBytes)
os.flush
os.close()
}
}.timeout(5.seconds).provide(ZioRuntime.environment)
_ <- NodeLoggerPure.debug("=== done processing request !")
res <- opt match {
case None =>
val error = msg(s"request timed out after ${(5.seconds.render)}")
/// I don't know what happen but the proccessing seems to stop here
/// log is written, and nothing else - actually, it's like there's a second query which doesn't terminate
NodeLoggerPure.error(error)
case Some(x) => x.succeed
}
_ <- NodeLoggerPure.debug("=== I'm going to close output stream!")
_ <- IOResult.effect(os.close())
_ <- NodeLoggerPure.debug("=== closed!")
} yield res
}.timeout(timeout).provide(ZioRuntime.environment)
_ <- NodeLoggerPure.debug("Done processing remote run request")
_ <- opt.notOptional(errorMessageWithHint(s"request timed out after ${(timeout.render)}"))
} yield ()
}

// all
// we use pipedStream between node answer and our caller answer to decouple a bit the two.
// A simpler solution would be to directly copy from request.exec input stream to caller out stream.

/* logs:
[2019-12-04 23:41:59+0000] DEBUG api-processing - Processing request: POST /rudder/secure/api/nodes/fd8799d1-ca1a-4d41-a75a-6329b13045c5/applyPolicy [JSON request with valid JSON body]
[2019-12-04 23:41:59+0000] DEBUG api-processing - Found a valid endpoint handler: 'applyPolicy' on [POST nodes/{id}/applyPolicy] with version '12'
[2019-12-04 23:41:59+0000] DEBUG api-processing - Account 'admin' has ACL authorizations.
[2019-12-04 23:41:59+0000] DEBUG nodes - Executing remote run call: HttpRequest(https://localhost/rudder/relay-api/remote-run/nodes/fd8799d1-ca1a-4d41-a75a-6329b13045c5,POST,FormBodyConnectFunc,List((classes,), (keep_output,true), (asynchronous,false)),List((User-Agent,scalaj-http/1.0), (User-Agent,rudder remote run query for node fd8799d1-ca1a-4d41-a75a-6329b13045c5), (content-type,application/x-www-form-urlencoded)),List(scalaj.http.HttpOptions$$$Lambda$2193/1042155450@44abf99d, scalaj.http.HttpOptions$$$Lambda$2194/1661822083@69968066, scalaj.http.HttpOptions$$$Lambda$5199/896988804@58442d5f, scalaj.http.HttpOptions$$$Lambda$2193/1042155450@7ac4bd3b, scalaj.http.HttpOptions$$$Lambda$2194/1661822083@61b8d4ef, scalaj.http.HttpOptions$$$Lambda$2195/1821575044@3cf0ed0e),None,UTF-8,4096,QueryStringUrlFunc,true)
[2019-12-04 23:42:00+0000] DEBUG nodes - Processing remore-run on fd8799d1-ca1a-4d41-a75a-6329b13045c5: HTTP status 200

[2019-12-04 23:42:04+0000] DEBUG nodes - Processing remore-run on fd8799d1-ca1a-4d41-a75a-6329b13045c5: HTTP status 200
[2019-12-04 23:42:05+0000] DEBUG nodes - === done processing request !
[2019-12-04 23:42:05+0000] ERROR nodes - Error occured when contacting internal remote-run API to apply classes on Node 'fd8799d1-ca1a-4d41-a75a-6329b13045c5': request timed out after 5 s
[2019-12-04 23:42:05+0000] DEBUG nodes - === I'm going to close output stream!
[2019-12-04 23:42:05+0000] DEBUG nodes - === closed!
[2019-12-04 23:42:05+0000] DEBUG nodes - === fiber joined!
[2019-12-04 23:42:05+0000] DEBUG nodes - === out closed!
[2019-12-04 23:42:05+0000] DEBUG api-processing - Handler for 'POST secure/api/nodes/fd8799d1-ca1a-4d41-a75a-6329b13045c5/applyPolicy' executed in 5154 ms


*/

(for {
in <- IOResult.effect(new PipedInputStream(pipeSize))
out <- IOResult.effect(new PipedOutputStream(in))
f <- copy(out).fork
res <- IOResult.effect(copyStreamTo(pipeSize, in) _)
_ <- f.join
_ <- NodeLoggerPure.debug("=== fiber joined!")
_ <- IOResult.effect(out.close())
_ <- NodeLoggerPure.debug("=== out closed!")
_ <- NodeLoggerPure.trace("remote-run: reading stream from remote API")
_ <- copy(out, readTimeout).fork // read from HTTP request
res <- IOResult.effect(copyStreamTo(pipeSize, in) _) // give the writter function waiting for out
// don't close out here, it was closed inside `copy`
} yield res).catchAll { err =>
NodeLoggerPure.error(msg(err.fullMsg)) *>
IOResult.effect(copyStreamTo(pipeSize, new ByteArrayInputStream(msg(err.msg).getBytes(StandardCharsets.UTF_8))) _)
NodeLoggerPure.error(errorMessageWithHint(err.fullMsg)) *>
IOResult.effect(copyStreamTo(pipeSize, new ByteArrayInputStream(errorMessageWithHint(err.msg).getBytes(StandardCharsets.UTF_8))) _)
}.runNow
}

Expand Down