Skip to content

Commit

Permalink
fix: add keep alive logic for xlinectl lock command
Browse files Browse the repository at this point in the history
Closes: xline-kv#664
Signed-off-by: Phoeniix Zhao <Phoenix500526@163.com>
  • Loading branch information
Phoenix500526 committed Feb 29, 2024
1 parent a44f710 commit 149cae0
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 32 deletions.
19 changes: 3 additions & 16 deletions crates/xline-client/src/clients/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ use xlineapi::{
};

use crate::{
clients::{lease::LeaseClient, watch::WatchClient},
clients::watch::WatchClient,
error::{Result, XlineClientError},
lease_gen::LeaseIdGenerator,
types::{
lease::LeaseGrantRequest,
lock::{LockRequest, UnlockRequest},
watch::WatchRequest,
},
Expand All @@ -35,8 +33,6 @@ use crate::{
pub struct LockClient {
/// The client running the CURP protocol, communicate with all servers.
curp_client: Arc<CurpClient>,
/// The lease client
lease_client: LeaseClient,
/// The watch client
watch_client: WatchClient,
/// Auth token
Expand All @@ -47,7 +43,6 @@ impl Debug for LockClient {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LockClient")
.field("lease_client", &self.lease_client)
.field("watch_client", &self.watch_client)
.field("token", &self.token)
.finish()
Expand All @@ -63,11 +58,9 @@ impl LockClient {
curp_client: Arc<CurpClient>,
channel: Channel,
token: Option<String>,
id_gen: Arc<LeaseIdGenerator>,
) -> Self {
Self {
curp_client: Arc::clone(&curp_client),
lease_client: LeaseClient::new(curp_client, channel.clone(), token.clone(), id_gen),
watch_client: WatchClient::new(channel, token.clone()),
token,
}
Expand Down Expand Up @@ -116,14 +109,8 @@ impl LockClient {
/// ```
#[inline]
pub async fn lock(&self, request: LockRequest) -> Result<LockResponse> {
let mut lease_id = request.inner.lease;
if lease_id == 0 {
let resp = self
.lease_client
.grant(LeaseGrantRequest::new(request.ttl))
.await?;
lease_id = resp.id;
}
assert!(request.inner.lease > 0);
let lease_id = request.inner.lease;
let prefix = format!(
"{}/",
String::from_utf8_lossy(&request.inner.name).into_owned()
Expand Down
1 change: 0 additions & 1 deletion crates/xline-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,6 @@ impl Client {
Arc::clone(&curp_client),
channel.clone(),
token.clone(),
id_gen,
);
let auth = AuthClient::new(curp_client, channel.clone(), token.clone());
let maintenance = MaintenanceClient::new(channel.clone(), token.clone());
Expand Down
9 changes: 6 additions & 3 deletions crates/xlinectl/src/command/lease/keep_alive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub(super) async fn execute(client: &mut Client, matches: &ArgMatches) -> Result
} else {
tokio::select! {
_ = ctrl_c() => {}
result = keep_alive_loop(keeper, stream) => {
result = keep_alive_loop(keeper, stream, true) => {
return result;
}
}
Expand All @@ -53,14 +53,17 @@ pub(super) async fn execute(client: &mut Client, matches: &ArgMatches) -> Result
}

/// keep alive forever unless encounter error
async fn keep_alive_loop(
pub(crate) async fn keep_alive_loop(
mut keeper: LeaseKeeper,
mut stream: Streaming<LeaseKeepAliveResponse>,
verbose: bool,
) -> Result<()> {
loop {
keeper.keep_alive()?;
if let Some(resp) = stream.message().await? {
resp.print();
if verbose {
resp.print();
}
if resp.ttl < 0 {
return Err(XlineClientError::InvalidArgs(String::from(
"lease keepalive response has negative ttl",
Expand Down
2 changes: 1 addition & 1 deletion crates/xlinectl/src/command/lease/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::handle_matches;
/// `grant` command
mod grant;
/// `keep_alive` command
mod keep_alive;
pub(crate) mod keep_alive;
/// `list` command
mod list;
/// `revoke` command
Expand Down
35 changes: 24 additions & 11 deletions crates/xlinectl/src/command/lock.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use clap::{arg, ArgMatches, Command};
use tokio::signal;
use tokio::signal::ctrl_c;
use xline_client::{
error::Result,
types::lock::{LockRequest, UnlockRequest},
types::{lock::{LockRequest, UnlockRequest}, lease::{LeaseGrantRequest, LeaseKeepAliveRequest}},
Client,
};

use crate::utils::printer::Printer;
use crate::lease::keep_alive::keep_alive_loop;

/// Default session ttl
const DEFAULT_SESSION_TTL: i64 = 60;

/// Definition of `lock` command
pub(crate) fn command() -> Command {
Expand All @@ -23,20 +27,29 @@ pub(crate) fn build_request(matches: &ArgMatches) -> LockRequest {

/// Execute the command
pub(crate) async fn execute(client: &mut Client, matches: &ArgMatches) -> Result<()> {
let req = build_request(matches);

let lease_client = client.lease_client();
let resp = lease_client
.grant(LeaseGrantRequest::new(DEFAULT_SESSION_TTL))
.await?;
let lock_lease_id = resp.id;
let req = build_request(matches).with_lease(lock_lease_id);
let resp = client.lock_client().lock(req).await?;

resp.print();

signal::ctrl_c().await.expect("failed to listen for event");

println!("releasing the lock");

let unlock_req = UnlockRequest::new(resp.key);
let _unlock_resp = client.lock_client().unlock(unlock_req).await?;
let (keeper, stream) = client.lease_client().keep_alive(LeaseKeepAliveRequest::new(lock_lease_id)).await?;

Ok(())
tokio::select! {
_ = ctrl_c() => {
println!("releasing the lock");
let unlock_req = UnlockRequest::new(resp.key);
let _unlock_resp = client.lock_client().unlock(unlock_req).await?;
return Ok(())
}
result = keep_alive_loop(keeper, stream, false) => {
return result;
}
}
}

#[cfg(test)]
Expand Down

0 comments on commit 149cae0

Please sign in to comment.