Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure versionPromise always completes #1155

Merged
merged 4 commits into from May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -34,21 +34,24 @@ final class AppVersionRevision(implicit system: ExtendedActorSystem) extends Ext
def getRevision(): Future[Version] = versionPromise.future

def start(): Unit = {
if (k8sSettings.podName.isEmpty) {
log.error(
"Not able to read the app version from the revision of the current ReplicaSet. Reason:" +
"No configuration found to extract the pod name from. " +
s"Be sure to provide the pod name with `$configPath.pod-name` " +
"or by setting ENV variable `KUBERNETES_POD_NAME`.")
} else {
if (isInitialized.compareAndSet(false, true)) {
if (isInitialized.compareAndSet(false, true)) {
if (k8sSettings.podName.isEmpty) {
val msg = "Not able to read the app version from the revision of the current ReplicaSet. Reason:" +
"No configuration found to extract the pod name from. " +
s"Be sure to provide the pod name with `$configPath.pod-name` " +
"or by setting ENV variable `KUBERNETES_POD_NAME`."
log.error(msg)
versionPromise.failure(new MissingPodNameException(msg))
} else {
Cluster(system).setAppVersionLater(getRevision())
KubernetesApiImpl(log, k8sSettings).foreach { kubernetesApi =>
versionPromise.completeWith(kubernetesApi.readRevision().map(Version(_)))
}
} else
log.warning("AppVersionRevision extension already initiated, yet start() method was called again. Ignoring.")
}
} else {
log.warning("AppVersionRevision extension already initiated, yet start() method was called again. Ignoring.")
}

}

// autostart if the extension is loaded through the config extension list
Expand Down
Expand Up @@ -49,6 +49,11 @@ private[akka] final case class PodCost(podName: String, cost: Int, address: Stri
*/
@InternalApi private[akka] sealed class ReadRevisionException(message: String) extends RuntimeException(message)

/**
* INTERNAL API
*/
@InternalApi private[akka] sealed class MissingPodNameException(message: String) extends RuntimeException(message)

/**
* INTERNAL API
*/
Expand Down
Expand Up @@ -5,228 +5,40 @@
package akka.rollingupdate.kubernetes

import akka.actor.ActorSystem
import akka.testkit.EventFilter
import akka.testkit.ImplicitSender
import akka.testkit.TestKit
import com.fasterxml.jackson.databind.ObjectMapper
import com.github.tomakehurst.wiremock.WireMockServer
import com.github.tomakehurst.wiremock.client.WireMock.aResponse
import com.github.tomakehurst.wiremock.client.WireMock.get
import com.github.tomakehurst.wiremock.client.WireMock.stubFor
import com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo
import com.github.tomakehurst.wiremock.client.MappingBuilder
import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder
import com.github.tomakehurst.wiremock.client.WireMock
import com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig
import com.github.tomakehurst.wiremock.matching.EqualToPattern
import com.github.tomakehurst.wiremock.stubbing.Scenario
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.Eventually
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.Millis
import org.scalatest.time.Seconds
import org.scalatest.time.Span
import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.BeforeAndAfterAll
import org.scalatest.BeforeAndAfterEach

import scala.concurrent.duration._

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code here moved to KubernetesApiSpec because it's only testing KubernetesApi.

AppVersionRevisionSpec is not testing the failure case inside AppVersionRevision.start

object AppVersionRevisionSpec {
val config = ConfigFactory.parseString("""
akka.loggers = ["akka.testkit.TestEventListener"]
akka.actor.provider = cluster

akka.remote.artery.canonical.port = 0
akka.remote.artery.canonical.hostname = 127.0.0.1

akka.cluster.jmx.multi-mbeans-in-same-jvm = on
akka.coordinated-shutdown.terminate-actor-system = off
akka.coordinated-shutdown.run-by-actor-system-terminate = off
akka.test.filter-leeway = 10s
akka.rollingupdate.kubernetes.pod-name = ""
""")
}

class AppVersionRevisionSpec
extends TestKit(
ActorSystem(
"AppVersionRevisionSpec",
AppVersionRevisionSpec.config
))
with ImplicitSender
with AnyWordSpecLike
with Matchers
with BeforeAndAfterAll
with BeforeAndAfterEach
with Eventually
with ScalaFutures {

private val wireMockServer = new WireMockServer(wireMockConfig().port(0))
wireMockServer.start()
WireMock.configureFor(wireMockServer.port())

// for wiremock to provide json
val mapper = new ObjectMapper()

private val namespace = "namespace-test"
private val podName1 = "pod-test-1"

private def settings(podName: String) = {
new KubernetesSettings(
apiCaPath = "",
apiTokenPath = "",
apiServiceHost = "localhost",
apiServicePort = wireMockServer.port(),
namespace = Some(namespace),
namespacePath = "",
podName = podName,
secure = false,
apiServiceRequestTimeout = 2.seconds,
customResourceSettings = new CustomResourceSettings(enabled = false, crName = None, 60.seconds)
)
}

private val kubernetesApi =
new KubernetesApiImpl(
system,
settings(podName1),
namespace,
apiToken = "apiToken",
clientHttpsConnectionContext = None)

override implicit val patienceConfig: PatienceConfig =
PatienceConfig(timeout = Span(5, Seconds), interval = Span(100, Millis))

override protected def afterAll(): Unit = super.shutdown()

override protected def beforeEach(): Unit = {
wireMockServer.resetAll()
WireMock.resetAllScenarios()
}

private def podPath(podName: String) =
urlEqualTo(s"/api/v1/namespaces/$namespace/pods/$podName")

private def replicaPath(replica: String) =
urlEqualTo(s"/apis/apps/v1/namespaces/$namespace/replicasets/$replica")

private def getPod(podName: String): MappingBuilder =
get(podPath(podName)).withHeader("Content-Type", new EqualToPattern("application/json"))

private def getReplicaSet(replica: String): MappingBuilder =
get(replicaPath(replica)).withHeader("Content-Type", new EqualToPattern("application/json"))

private val defaultPodResponseJson =
"""{
| "metadata": {
| "ownerReferences": [
| {"name": "wrong-replicaset-id", "kind": "SomethingElse"},
| {"name": "parent-replicaset-id", "kind": "ReplicaSet"}
| ]
| }
|}""".stripMargin

private val defaultReplicaResponseJson =
"""{
| "metadata": {
| "annotations": {
| "deployment.kubernetes.io/revision": "1"
| }
| }
|}""".stripMargin

private def stubPodResponse(json: String = defaultPodResponseJson, state: String = Scenario.STARTED) =
stubFor(
getPod(podName1)
.willReturn(
ResponseDefinitionBuilder.okForJson("").withJsonBody(mapper.readTree(json))
)
.inScenario("pod")
.whenScenarioStateIs(state))

private def stubReplicaResponse(json: String = defaultReplicaResponseJson) =
stubFor(
getReplicaSet("parent-replicaset-id")
.willReturn(
ResponseDefinitionBuilder.okForJson("").withJsonBody(mapper.readTree(json))
)
.inScenario("replica")
.whenScenarioStateIs(Scenario.STARTED))

"Read revision from Kubernetes" should {

"parse pod and replica response to get the revision" in {
stubPodResponse()
stubReplicaResponse()

EventFilter
.info(pattern = "Reading revision from Kubernetes: akka.cluster.app-version was set to 1", occurrences = 1)
.intercept {
kubernetesApi.readRevision().futureValue should be("1")
}
}

"retry and then fail when pod not found" in {
stubFor(getPod(podName1).willReturn(aResponse().withStatus(404)))
EventFilter
.warning(pattern = ".*Failed to get revision", occurrences = 5)
.intercept({
assert(kubernetesApi.readRevision().failed.futureValue.isInstanceOf[ReadRevisionException])
})
}

"retry and then fail when replicaset not found" in {
stubPodResponse()
stubFor(getReplicaSet("parent-replicaset-id").willReturn(aResponse().withStatus(404)))
EventFilter
.warning(pattern = ".*Failed to get revision", occurrences = 5)
.intercept({
assert(kubernetesApi.readRevision().failed.futureValue.isInstanceOf[ReadRevisionException])
})
}

"log if pod json can not be parsed" in {
stubPodResponse(json = """{ "invalid": "json" }""")
EventFilter
.warning(pattern = ".*Error while parsing Pod*")
.intercept({
assert(kubernetesApi.readRevision().failed.futureValue.isInstanceOf[ReadRevisionException])
})
}

"log if replica json can not be parsed" in {
stubPodResponse()
stubReplicaResponse(json = """{ "invalid": "json" }""")
EventFilter
.warning(pattern = ".*Error while parsing Pod*")
.intercept({
assert(kubernetesApi.readRevision().failed.futureValue.isInstanceOf[ReadRevisionException])
})
}

"break the loop if consecutive request succeeds" in {
stubFor(
getPod(podName1)
.willReturn(aResponse().withStatus(404))
.inScenario("pod")
.whenScenarioStateIs(Scenario.STARTED)
.willSetStateTo("after first fail")
)
stubFor(
getPod(podName1)
.willReturn(aResponse().withStatus(404))
.inScenario("pod")
.whenScenarioStateIs("after first fail")
.willSetStateTo("k8s is happy now")
)
stubPodResponse(state = "k8s is happy now")
stubReplicaResponse()
EventFilter
.warning(pattern = ".*Try again*", occurrences = 2)
.intercept({
kubernetesApi.readRevision().futureValue should be("1")
})
"AppVersionRevision extension" should {
"return failed future if pod-name is not configured" in {
val revisionExtension = AppVersionRevision(system)
revisionExtension.start()
val failure = revisionExtension.getRevision().failed.futureValue
failure.getMessage should include("No configuration found to extract the pod name from")
}
}
}