Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure versionPromise always completes #1155

Merged
merged 4 commits into from May 25, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -34,21 +34,24 @@ final class AppVersionRevision(implicit system: ExtendedActorSystem) extends Ext
def getRevision(): Future[Version] = versionPromise.future

def start(): Unit = {
if (k8sSettings.podName.isEmpty) {
log.error(
"Not able to read the app version from the revision of the current ReplicaSet. Reason:" +
"No configuration found to extract the pod name from. " +
s"Be sure to provide the pod name with `$configPath.pod-name` " +
"or by setting ENV variable `KUBERNETES_POD_NAME`.")
} else {
if (isInitialized.compareAndSet(false, true)) {
if (isInitialized.compareAndSet(false, true)) {
if (k8sSettings.podName.isEmpty) {
val msg = "Not able to read the app version from the revision of the current ReplicaSet. Reason:" +
"No configuration found to extract the pod name from. " +
s"Be sure to provide the pod name with `$configPath.pod-name` " +
"or by setting ENV variable `KUBERNETES_POD_NAME`."
log.error(msg)
versionPromise.failure(new IllegalStateException(msg))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If podname is not set, there is nothing else we can do, but if we try to later use getRevision we get a Future that never completes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, maybe we should not return an IllegalStateException, because it's not illegal. It's just missing info causing a degraded state.

Copy link
Member Author

@octonato octonato May 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, fixed in d7eb68c

} else {
Cluster(system).setAppVersionLater(getRevision())
KubernetesApiImpl(log, k8sSettings).foreach { kubernetesApi =>
versionPromise.completeWith(kubernetesApi.readRevision().map(Version(_)))
}
} else
log.warning("AppVersionRevision extension already initiated, yet start() method was called again. Ignoring.")
}
} else {
log.warning("AppVersionRevision extension already initiated, yet start() method was called again. Ignoring.")
}

}

// autostart if the extension is loaded through the config extension list
Expand Down
Expand Up @@ -5,34 +5,12 @@
package akka.rollingupdate.kubernetes

import akka.actor.ActorSystem
import akka.testkit.EventFilter
import akka.testkit.ImplicitSender
import akka.testkit.TestKit
import com.fasterxml.jackson.databind.ObjectMapper
import com.github.tomakehurst.wiremock.WireMockServer
import com.github.tomakehurst.wiremock.client.WireMock.aResponse
import com.github.tomakehurst.wiremock.client.WireMock.get
import com.github.tomakehurst.wiremock.client.WireMock.stubFor
import com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo
import com.github.tomakehurst.wiremock.client.MappingBuilder
import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder
import com.github.tomakehurst.wiremock.client.WireMock
import com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig
import com.github.tomakehurst.wiremock.matching.EqualToPattern
import com.github.tomakehurst.wiremock.stubbing.Scenario
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.Eventually
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.Millis
import org.scalatest.time.Seconds
import org.scalatest.time.Span
import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.BeforeAndAfterAll
import org.scalatest.BeforeAndAfterEach

import scala.concurrent.duration._

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code here moved to KubernetesApiSpec because it's only testing KubernetesApi.

AppVersionRevisionSpec is not testing the failure case inside AppVersionRevision.start

object AppVersionRevisionSpec {
val config = ConfigFactory.parseString("""
akka.loggers = ["akka.testkit.TestEventListener"]
Expand All @@ -45,188 +23,36 @@ object AppVersionRevisionSpec {
akka.coordinated-shutdown.terminate-actor-system = off
akka.coordinated-shutdown.run-by-actor-system-terminate = off
akka.test.filter-leeway = 10s
akka.rollingupdate.kubernetes {
api-ca-path = ""
api-token-path = ""
api-service-host = ""
api-service-port = 0
namespace = ""
namespace-path = ""
pod-name = ""
secure-api-server = false
api-service-request-timeout = 2s
}
""")
}

class AppVersionRevisionSpec
extends TestKit(
ActorSystem(
"AppVersionRevisionSpec",
AppVersionRevisionSpec.config
KubernetesApiSpec.config
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this wrong?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, that's wrong. Copy-n-pasta wrong.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

It turns out that tests were passing even without using the right config. Which makes sense because podname defaults to empty in the reference.conf.

Anyway, I keep the new config just to make it very explicit, but did some clean-up as well.

))
with ImplicitSender
with AnyWordSpecLike
with Matchers
with BeforeAndAfterAll
with BeforeAndAfterEach
with Eventually
with ScalaFutures {

private val wireMockServer = new WireMockServer(wireMockConfig().port(0))
wireMockServer.start()
WireMock.configureFor(wireMockServer.port())

// for wiremock to provide json
val mapper = new ObjectMapper()

private val namespace = "namespace-test"
private val podName1 = "pod-test-1"

private def settings(podName: String) = {
new KubernetesSettings(
apiCaPath = "",
apiTokenPath = "",
apiServiceHost = "localhost",
apiServicePort = wireMockServer.port(),
namespace = Some(namespace),
namespacePath = "",
podName = podName,
secure = false,
apiServiceRequestTimeout = 2.seconds,
customResourceSettings = new CustomResourceSettings(enabled = false, crName = None, 60.seconds)
)
}

private val kubernetesApi =
new KubernetesApiImpl(
system,
settings(podName1),
namespace,
apiToken = "apiToken",
clientHttpsConnectionContext = None)

override implicit val patienceConfig: PatienceConfig =
PatienceConfig(timeout = Span(5, Seconds), interval = Span(100, Millis))

override protected def afterAll(): Unit = super.shutdown()

override protected def beforeEach(): Unit = {
wireMockServer.resetAll()
WireMock.resetAllScenarios()
}

private def podPath(podName: String) =
urlEqualTo(s"/api/v1/namespaces/$namespace/pods/$podName")

private def replicaPath(replica: String) =
urlEqualTo(s"/apis/apps/v1/namespaces/$namespace/replicasets/$replica")

private def getPod(podName: String): MappingBuilder =
get(podPath(podName)).withHeader("Content-Type", new EqualToPattern("application/json"))

private def getReplicaSet(replica: String): MappingBuilder =
get(replicaPath(replica)).withHeader("Content-Type", new EqualToPattern("application/json"))

private val defaultPodResponseJson =
"""{
| "metadata": {
| "ownerReferences": [
| {"name": "wrong-replicaset-id", "kind": "SomethingElse"},
| {"name": "parent-replicaset-id", "kind": "ReplicaSet"}
| ]
| }
|}""".stripMargin

private val defaultReplicaResponseJson =
"""{
| "metadata": {
| "annotations": {
| "deployment.kubernetes.io/revision": "1"
| }
| }
|}""".stripMargin

private def stubPodResponse(json: String = defaultPodResponseJson, state: String = Scenario.STARTED) =
stubFor(
getPod(podName1)
.willReturn(
ResponseDefinitionBuilder.okForJson("").withJsonBody(mapper.readTree(json))
)
.inScenario("pod")
.whenScenarioStateIs(state))

private def stubReplicaResponse(json: String = defaultReplicaResponseJson) =
stubFor(
getReplicaSet("parent-replicaset-id")
.willReturn(
ResponseDefinitionBuilder.okForJson("").withJsonBody(mapper.readTree(json))
)
.inScenario("replica")
.whenScenarioStateIs(Scenario.STARTED))

"Read revision from Kubernetes" should {

"parse pod and replica response to get the revision" in {
stubPodResponse()
stubReplicaResponse()

EventFilter
.info(pattern = "Reading revision from Kubernetes: akka.cluster.app-version was set to 1", occurrences = 1)
.intercept {
kubernetesApi.readRevision().futureValue should be("1")
}
}

"retry and then fail when pod not found" in {
stubFor(getPod(podName1).willReturn(aResponse().withStatus(404)))
EventFilter
.warning(pattern = ".*Failed to get revision", occurrences = 5)
.intercept({
assert(kubernetesApi.readRevision().failed.futureValue.isInstanceOf[ReadRevisionException])
})
}

"retry and then fail when replicaset not found" in {
stubPodResponse()
stubFor(getReplicaSet("parent-replicaset-id").willReturn(aResponse().withStatus(404)))
EventFilter
.warning(pattern = ".*Failed to get revision", occurrences = 5)
.intercept({
assert(kubernetesApi.readRevision().failed.futureValue.isInstanceOf[ReadRevisionException])
})
}

"log if pod json can not be parsed" in {
stubPodResponse(json = """{ "invalid": "json" }""")
EventFilter
.warning(pattern = ".*Error while parsing Pod*")
.intercept({
assert(kubernetesApi.readRevision().failed.futureValue.isInstanceOf[ReadRevisionException])
})
}

"log if replica json can not be parsed" in {
stubPodResponse()
stubReplicaResponse(json = """{ "invalid": "json" }""")
EventFilter
.warning(pattern = ".*Error while parsing Pod*")
.intercept({
assert(kubernetesApi.readRevision().failed.futureValue.isInstanceOf[ReadRevisionException])
})
}

"break the loop if consecutive request succeeds" in {
stubFor(
getPod(podName1)
.willReturn(aResponse().withStatus(404))
.inScenario("pod")
.whenScenarioStateIs(Scenario.STARTED)
.willSetStateTo("after first fail")
)
stubFor(
getPod(podName1)
.willReturn(aResponse().withStatus(404))
.inScenario("pod")
.whenScenarioStateIs("after first fail")
.willSetStateTo("k8s is happy now")
)
stubPodResponse(state = "k8s is happy now")
stubReplicaResponse()
EventFilter
.warning(pattern = ".*Try again*", occurrences = 2)
.intercept({
kubernetesApi.readRevision().futureValue should be("1")
})
"AppVersionRevision extension" should {
"return failed future if pod-name not configured" in {
val revisionExtension = AppVersionRevision(system)
revisionExtension.start()
val failure = revisionExtension.getRevision().failed.futureValue
failure.getMessage should include("No configuration found to extract the pod name from")
}
}
}