recreate http client on resume() #4185

Merged
merged 3 commits into from
Jan 21, 2019
Changes from 2 commits
@@ -62,6 +62,11 @@ object Container {
loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool)
}

/**
* Abstraction for Container operations.
* Container manipulation (specifically suspend/resume/destroy) is NOT thread-safe and MUST be synchronized by caller.
* Container access (specifically run) is thread-safe (e.g. for concurrent activation processing).
*/
trait Container {

implicit protected val as: ActorSystem
@@ -73,7 +78,11 @@ trait Container {
/** HTTP connection to the container, will be lazily established by callContainer */
protected var httpConnection: Option[ContainerClient] = None

/** Stops the container from consuming CPU cycles. */
/** maxConcurrent+timeout are cached during first init, so that resuming connections can reference */
protected var containerHttpMaxConcurrent: Int = 1
protected var containerHttpTimeout: FiniteDuration = 60.seconds
@markusthoemmes (Contributor), Dec 18, 2018

I know this was the case before, but we should add a comment to the trait stating that it's not thread-safe and the caller MUST ensure synchronization (which we do by using an actor)

@markusthoemmes Hello. I'm new to Scala and having a hard time reading the source code. Can you please explain what thread-safe means above, and why resume() (which I'm guessing somehow calls "docker container resume") is not thread-safe?
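To illustrate the question above: in this diff, `suspend()` and `resume()` reassign the plain `var httpConnection` without any locking, so two threads calling them at the same time could interleave and leave the field in an unexpected state. The sketch below is not OpenWhisk code. `ToyContainer` and `runSerialized` are hypothetical names, and a single-threaded executor stands in for the actor that, according to the review comment, serializes these calls in the real system.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for a container whose connection is plain mutable state.
final class ToyContainer {
  private var connection: Option[String] = None // no lock, not volatile

  def resume(): Unit = { connection = Some("client") }
  def suspend(): Unit = { connection = None }
  def isConnected: Boolean = connection.isDefined
}

object SerializedLifecycle {
  // Runs suspend/resume pairs on ONE thread, so the calls can never overlap.
  // This plays the role of the caller-side synchronization the trait requires.
  def runSerialized(cycles: Int): Boolean = {
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    val container = new ToyContainer
    try {
      // Tasks are submitted in order and executed in that same order.
      val ops = (1 to cycles).flatMap { _ =>
        Seq(Future(container.suspend()), Future(container.resume()))
      }
      Await.result(Future.sequence(ops), 5.seconds)
      container.isConnected // the last operation was resume()
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit =
    println(runSerialized(100)) // true
}
```

With a multi-threaded pool instead, the order of `suspend()` and `resume()` would no longer be guaranteed, which is the kind of hazard the "NOT thread-safe" note warns about.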


/** Stops the container from consuming CPU cycles. NOT thread-safe - caller must synchronize. */
def suspend()(implicit transid: TransactionId): Future[Unit] = {
//close connection first, then close connection pool
//(testing pool recreation vs connection closing, time was similar - so using the simpler recreation approach)
@@ -82,8 +91,11 @@ trait Container {
closeConnections(toClose)
}

/** Dual of halt. */
def resume()(implicit transid: TransactionId): Future[Unit]
/** Dual of halt. NOT thread-safe - caller must synchronize.*/
def resume()(implicit transid: TransactionId): Future[Unit] = {
httpConnection = Some(openConnections(containerHttpTimeout, containerHttpMaxConcurrent))
Future.successful({})
}

/** Obtains logs up to a given threshold from the container. Optionally waits for a sentinel to appear. */
def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any]
@@ -101,7 +113,8 @@ trait Container {
LoggingMarkers.INVOKER_ACTIVATION_INIT,
s"sending initialization to $id $addr",
logLevel = InfoLevel)

containerHttpMaxConcurrent = maxConcurrent
containerHttpTimeout = timeout
val body = JsObject("value" -> initializer)
callContainer("/init", body, timeout, maxConcurrent, retry = true)
.andThen { // never fails
@@ -132,7 +145,7 @@ trait Container {
}
}

/** Runs code in the container. */
/** Runs code in the container. Thread-safe - caller may invoke concurrently for concurrent activation processing. */
def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration, maxConcurrent: Int)(
implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
val actionName = environment.fields.get("action_name").map(_.convertTo[String]).getOrElse("")
@@ -185,15 +198,7 @@ trait Container {
retry: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
val started = Instant.now()
val http = httpConnection.getOrElse {
val conn = if (Container.config.akkaClient) {
new AkkaContainerClient(addr.host, addr.port, timeout, ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT, 1024)
} else {
new ApacheBlockingContainerClient(
s"${addr.host}:${addr.port}",
timeout,
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
maxConcurrent)
}
val conn = openConnections(timeout, maxConcurrent)
httpConnection = Some(conn)
conn
}
@@ -204,6 +209,17 @@ trait Container {
RunResult(Interval(started, finished), response)
}
}
private def openConnections(timeout: FiniteDuration, maxConcurrent: Int) = {
if (Container.config.akkaClient) {
@ddragosd (Contributor), Jan 18, 2019

I forget why we are still maintaining both the Akka client and the Apache client, since the latter was found in #3812 to perform poorly.

Member

Are we ready to move to the Akka client exclusively? There were some reservations about making sure we don't get bitten by Akka issues, as happened in the past.

Contributor Author

I'm ready 😄
FWIW, we have been using the Akka client exclusively in prod, with concurrency enabled, since December.
Tests are already using it as well.
I'm not sure whether others are interested in testing it more, but it would be great to retire the Apache client IMHO.

Contributor

Let's confirm that via dev-list.

new AkkaContainerClient(addr.host, addr.port, timeout, ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT, 1024)
} else {
new ApacheBlockingContainerClient(
s"${addr.host}:${addr.port}",
timeout,
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
maxConcurrent)
}
}
private def closeConnections(toClose: Option[ContainerClient]): Future[Unit] = {
toClose.map(_.close()).getOrElse(Future.successful(()))
}
@@ -205,8 +205,8 @@ class MesosTask(override protected val id: ContainerId,

/** Dual of halt. */
override def resume()(implicit transid: TransactionId): Future[Unit] = {
// resume not supported
Future.successful(Unit)
super.resume()
// resume not supported (just return result from super)
}

/** Completely destroys this instance of the container. */
@@ -174,8 +174,9 @@ class DockerContainer(protected val id: ContainerId,
override def suspend()(implicit transid: TransactionId): Future[Unit] = {
super.suspend().flatMap(_ => if (useRunc) runc.pause(id) else docker.pause(id))
}
def resume()(implicit transid: TransactionId): Future[Unit] =
if (useRunc) { runc.resume(id) } else { docker.unpause(id) }
override def resume()(implicit transid: TransactionId): Future[Unit] = {
if (useRunc) { runc.resume(id) } else { docker.unpause(id) }.map(_ => super.resume())
Contributor

flatMap

Member

Does the map() really run on the full if expression or just the else block?

@rabbah (Member), Jan 16, 2019

when in doubt, refactor...

Contributor

Good question, @sven-lange-last. Better safe than sorry: use an interim value or put parentheses around the if expression.

Contributor Author

I don't think it's needed, but I agree it will at least be clearer. Also changed to flatMap.
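The precedence question raised in this thread can be checked in isolation. The sketch below is not the PR's code: it uses `Option` in place of `Future`, and the names `unparenthesized` and `parenthesized` are hypothetical. In Scala, a selection such as `.map(...)` written directly after the closing brace of the `else` block belongs to that block only, so the `if` branch is returned unmapped unless the whole `if` expression is parenthesized or bound to an interim value.

```scala
object IfElseMapPrecedence {
  // Parsed as: if (useFirst) { Some(1) } else ({ Some(2) }.map(_ + 10))
  def unparenthesized(useFirst: Boolean): Option[Int] =
    if (useFirst) { Some(1) } else { Some(2) }.map(_ + 10)

  // Parentheses make map apply to the result of the whole if expression.
  def parenthesized(useFirst: Boolean): Option[Int] =
    (if (useFirst) { Some(1) } else { Some(2) }).map(_ + 10)

  def main(args: Array[String]): Unit = {
    println(unparenthesized(true))  // Some(1): map was NOT applied
    println(unparenthesized(false)) // Some(12)
    println(parenthesized(true))    // Some(11)
    println(parenthesized(false))   // Some(12)
  }
}
```

Applied to the line under review, this suggests that without parentheses the `runc.resume(id)` branch would skip the call to `super.resume()`, which supports the reviewers' request for an interim value or parentheses.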

}
override def destroy()(implicit transid: TransactionId): Future[Unit] = {
super.destroy()
docker.rm(id)
@@ -104,7 +104,8 @@ class KubernetesContainer(protected[core] val id: ContainerId,
super.suspend().flatMap(_ => kubernetes.suspend(this))
}

def resume()(implicit transid: TransactionId): Future[Unit] = kubernetes.resume(this)
override def resume()(implicit transid: TransactionId): Future[Unit] =
kubernetes.resume(this).map(_ => super.resume())
Contributor

This should be a flatMap to also "wait" for the future returned by super.resume.
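The difference the reviewer points at can be shown with a small, self-contained sketch. It is not the PR's code: `MapVsFlatMap` and `observe` are hypothetical names, a `Promise` stands in for the future returned by `super.resume()`, and an already completed future stands in for `kubernetes.resume(this)`. With `map`, the inner future becomes the mapped value and is silently discarded because `Unit` is expected, so the result completes without waiting for it. With `flatMap`, the result completes only once the inner future has completed.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object MapVsFlatMap {
  /** Returns (mapped.isCompleted, chained.isCompleted) while the inner future is still pending. */
  def observe(): (Boolean, Boolean) = {
    val inner = Promise[Unit]()                     // stand-in for super.resume()'s future
    val outer: Future[Unit] = Future.successful(()) // stand-in for the platform resume call

    // map: the inner Future is the mapped value and gets discarded (value discarding to Unit).
    val mapped: Future[Unit] = outer.map(_ => inner.future)
    // flatMap: the resulting Future is tied to the completion of the inner Future.
    val chained: Future[Unit] = outer.flatMap(_ => inner.future)

    Await.ready(mapped, 1.second)
    val snapshot = (mapped.isCompleted, chained.isCompleted)

    inner.success(()) // now let the inner future finish
    Await.ready(chained, 1.second)
    snapshot
  }

  def main(args: Array[String]): Unit =
    println(observe()) // (true,false)
}
```

In the current trait `super.resume()` returns an already completed future, so the two variants behave the same today; the `flatMap` form stays correct if that implementation ever becomes asynchronous.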


override def destroy()(implicit transid: TransactionId): Future[Unit] = {
super.destroy()
@@ -107,7 +107,7 @@ class DockerToActivationLogStoreTests extends FlatSpec with Matchers with WskAct
val logging: Logging)
extends Container {
override def suspend()(implicit transid: TransactionId): Future[Unit] = ???
def resume()(implicit transid: TransactionId): Future[Unit] = ???
override def resume()(implicit transid: TransactionId): Future[Unit] = ???

def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId) = lines

@@ -18,15 +18,13 @@
package org.apache.openwhisk.core.containerpool.test

import java.time.Instant

import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition}
import akka.actor.{ActorRef, ActorSystem, FSM}
import akka.stream.scaladsl.Source
import akka.testkit.{ImplicitSender, TestKit}
import akka.util.ByteString
import common.{LoggedFunction, StreamLogging, SynchronizedLoggedFunction, WhiskProperties}
import java.util.concurrent.atomic.AtomicInteger

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
@@ -41,7 +39,7 @@ import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.http.Messages
import org.apache.openwhisk.core.database.UserContext

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}

@@ -1079,11 +1077,19 @@ class ContainerProxyTests
def runCount = atomicRunCount.get()
override def suspend()(implicit transid: TransactionId): Future[Unit] = {
suspendCount += 1
super.suspend()
val s = super.suspend()
Await.result(s, 5.seconds)
//verify that httpconn is closed
httpConnection should be(None)
s
}
def resume()(implicit transid: TransactionId): Future[Unit] = {
override def resume()(implicit transid: TransactionId): Future[Unit] = {
resumeCount += 1
Future.successful(())
val r = super.resume()
Await.result(r, 5.seconds)
//verify that httpconn is recreated
httpConnection should be('defined)
Contributor

Please await the futures of both resume and suspend before checking that the client is there. Right now this is not an issue, but once the implementation of the trait changes we're prone to heisenbugs.

r
}
override def destroy()(implicit transid: TransactionId): Future[Unit] = {
destroyCount += 1