Skip to content

Commit

Permalink
Implement updates of deployment when echo changes
Browse files Browse the repository at this point in the history
Implements updating the replicas field for the deployment when the
accociated echo resource gets updated.
  • Loading branch information
AlexanderThaller committed Nov 22, 2023
1 parent 6674e76 commit 1ccd903
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 10 deletions.
2 changes: 1 addition & 1 deletion echo-example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ metadata:
name: test-echo # Name of the "Echo" custom resource instance, may be changed to your liking
namespace: default # Namespace must exist and account in KUBECONFIG must have sufficient permissions
spec:
replicas: 2 # Number of "Echo" pods created.
replicas: 2 # Number of "Echo" pods created.
28 changes: 27 additions & 1 deletion src/echo.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use k8s_openapi::api::apps::v1::{Deployment, DeploymentSpec};
use k8s_openapi::api::core::v1::{Container, ContainerPort, PodSpec, PodTemplateSpec};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
use kube::api::{DeleteParams, ObjectMeta, PostParams};
use kube::api::{DeleteParams, ObjectMeta, Patch, PatchParams, PostParams};
use kube::{Api, Client, Error};
use serde_json::json;
use std::collections::BTreeMap;

/// Creates a new deployment of `n` pods with the `inanimate/echo-server:latest` docker image inside,
Expand Down Expand Up @@ -68,6 +69,31 @@ pub async fn deploy(
.await
}

pub async fn update(
client: Client,
name: &str,
replicas: i32,
namespace: &str,
) -> Result<Deployment, Error> {
// Get the existing deployment
let deployment_api: Api<Deployment> = Api::namespaced(client, namespace);
let patch = json!({
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": {
"name": name,
},
"spec": {
"replicas": replicas,
}
});
let params = PatchParams::apply("echo-operator").force();
let patch = Patch::Apply(&patch);
let deployment = deployment_api.patch(name, &params, &patch).await?;

Ok(deployment)
}

/// Deletes an existing deployment.
///
/// # Arguments:
Expand Down
41 changes: 33 additions & 8 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use futures::stream::StreamExt;
use k8s_openapi::api::apps::v1::Deployment;
use kube::runtime::watcher::Config;
use kube::Resource;
use kube::ResourceExt;
Expand Down Expand Up @@ -69,6 +70,8 @@ enum EchoAction {
Create,
/// Delete all subresources created in the `Create` phase
Delete,
/// Update existing subresources to match the desired state
Update,
/// This `Echo` resource is in desired state and requires no actions to be taken
NoOp,
}
Expand All @@ -94,7 +97,7 @@ async fn reconcile(echo: Arc<Echo>, context: Arc<ContextData>) -> Result<Action,
let name = echo.name_any(); // Name of the Echo resource is used to name the subresources as well.

// Performs action as decided by the `determine_action` function.
return match determine_action(&echo) {
match determine_action(context.client.clone(), &echo).await? {
EchoAction::Create => {
// Creates a deployment with `n` Echo service pods, but applies a finalizer first.
// Finalizer is applied first, as the operator might be shut down and restarted
Expand All @@ -108,6 +111,7 @@ async fn reconcile(echo: Arc<Echo>, context: Arc<ContextData>) -> Result<Action,
echo::deploy(client, &name, echo.spec.replicas, &namespace).await?;
Ok(Action::requeue(Duration::from_secs(10)))
}

EchoAction::Delete => {
// Deletes any subresources related to this `Echo` resources. If and only if all subresources
// are deleted, the finalizer is removed and Kubernetes is free to remove the `Echo` resource.
Expand All @@ -123,9 +127,16 @@ async fn reconcile(echo: Arc<Echo>, context: Arc<ContextData>) -> Result<Action,
finalizer::delete(client, &name, &namespace).await?;
Ok(Action::await_change()) // Makes no sense to delete after a successful delete, as the resource is gone
}

EchoAction::Update => {
// Update the deployment to match the desired state
echo::update(client, &name, echo.spec.replicas, &namespace).await?;
Ok(Action::requeue(Duration::from_secs(10)))
}

// The resource is already in desired state, do nothing and re-check after 10 seconds
EchoAction::NoOp => Ok(Action::requeue(Duration::from_secs(10))),
};
}
}

/// Resources arrives into reconciliation queue in a certain state. This function looks at
Expand All @@ -134,19 +145,33 @@ async fn reconcile(echo: Arc<Echo>, context: Arc<ContextData>) -> Result<Action,
///
/// # Arguments
/// - `echo`: A reference to `Echo` being reconciled to decide next action upon.
fn determine_action(echo: &Echo) -> EchoAction {
return if echo.meta().deletion_timestamp.is_some() {
EchoAction::Delete
async fn determine_action(client: Client, echo: &Echo) -> Result<EchoAction, Error> {
if echo.meta().deletion_timestamp.is_some() {
Ok(EchoAction::Delete)
} else if echo
.meta()
.finalizers
.as_ref()
.map_or(true, |finalizers| finalizers.is_empty())
{
EchoAction::Create
Ok(EchoAction::Create)
} else {
EchoAction::NoOp
};
let deployment_api: Api<Deployment> = Api::namespaced(
client,
echo.meta()
.namespace
.as_ref()
.expect("expected namespace to be set"),
);

let deployment = deployment_api.get(&echo.name_any()).await?;

if deployment.spec.expect("expected spec to be set").replicas != Some(echo.spec.replicas) {
Ok(EchoAction::Update)
} else {
Ok(EchoAction::NoOp)
}
}
}

/// Actions to be taken when a reconciliation fails - for whatever reason.
Expand Down

0 comments on commit 1ccd903

Please sign in to comment.