Skip to content

Commit

Permalink
feat: new arisu
Browse files Browse the repository at this point in the history
  • Loading branch information
pxseu committed Sep 28, 2023
1 parent 2fcb26b commit 18f9f14
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 121 deletions.
11 changes: 8 additions & 3 deletions src/commands/containers/logs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::env::temp_dir;

use anyhow::{ensure, Result};
use anyhow::{ensure, Context, Result};
use clap::Parser;
use futures_util::StreamExt;
use tokio::fs;
Expand Down Expand Up @@ -119,19 +119,24 @@ pub async fn handle(options: Options, state: State) -> Result<()> {
format_logs(&logs, true, options.timestamps, options.details).join("\n")
);

let token = state.token().unwrap();
let token = state.token().context("No token found")?;

let mut arisu = ArisuClient::new(&container, &token).await?;

while let Some(message) = arisu.next().await {
match message {
ArisuMessage::Open => arisu.request_logs().await?,

ArisuMessage::ServiceMessage(data) => log::info!("Service: {data}"),
ArisuMessage::Out(log) => {

ArisuMessage::Logs(log) => {
print!(
"{}",
format_logs(&[log], true, options.timestamps, options.details)[0]
);
}

_ => {}
}
}

Expand Down
57 changes: 19 additions & 38 deletions src/commands/containers/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
use std::io::Write;

use anyhow::{ensure, Result};
use anyhow::{ensure, Context, Result};
use clap::Parser;
use console::Term;
use leap_client_rs::leap::types::Event;
use leap_client_rs::{LeapEdge, LeapOptions};
use futures_util::StreamExt;

use super::types::ContainerEvents;
use super::utils::{format_containers, get_all_containers, get_container};
use crate::commands::containers::utils::format_single_metrics;
use crate::commands::ignite::utils::{format_deployments, get_all_deployments, get_deployment};
use crate::config::LEAP_PROJECT;
use crate::state::State;
use crate::utils::arisu::{ArisuClient, ArisuMessage};

#[derive(Debug, Parser)]
#[clap(about = "Get metrics for a container")]
Expand Down Expand Up @@ -70,43 +68,26 @@ pub async fn handle(options: Options, state: State) -> Result<()> {
return Ok(());
}

let mut leap = LeapEdge::new(LeapOptions {
token: Some(&state.ctx.current.clone().unwrap().leap_token),
project: &std::env::var("LEAP_PROJECT").unwrap_or_else(|_| LEAP_PROJECT.to_string()),
ws_url: &std::env::var("LEAP_WS_URL")
.unwrap_or_else(|_| LeapOptions::default().ws_url.to_string()),
})
.await?;

while let Some(msg) = leap.listen().await {
let capsuled = match msg {
Event::Message(message) => message,

_ => continue,
};

let Ok(container_events) = serde_json::from_value(serde_json::to_value(capsuled.data)?) else {
continue;
};

let metrics = match container_events {
ContainerEvents::ContainerMetricsUpdate {
container_id,
metrics,
} => {
if container_id != container.id {
continue;
}
let token = state.token().context("No token found")?;

metrics
}
};
let mut arisu = ArisuClient::new(&container.id, &token).await?;

while let Some(message) = arisu.next().await {
match message {
ArisuMessage::Open => arisu.request_metrics().await?,

let metrics = format_single_metrics(&Some(metrics), &deployment)?;
ArisuMessage::Metrics(metrics) => {
let metrics = format_single_metrics(&Some(metrics), &deployment)?;

term.clear_last_lines(metrics.len())?;
if !state.debug {
term.clear_last_lines(metrics.len())?;
}

writeln!(term, "{}", metrics.join("\n"))?
}

writeln!(term, "{}", metrics.join("\n"))?
_ => {}
}
}

Ok(())
Expand Down
76 changes: 38 additions & 38 deletions src/commands/volumes/backup.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,38 @@
use anyhow::{Context, Result};
use clap::Parser;

use super::copy::fslike::FsLike;
use crate::state::State;

#[derive(Debug, Parser)]
#[clap(about = "Backup files from a deployment to local machine")]
#[group(skip)]
pub struct Options {
#[clap(help = "Deployment name or id")]
pub source: String,
}

pub async fn handle(options: Options, state: State) -> Result<()> {
let source = FsLike::from_str(&state, &format!("{}:/", options.source)).await?;

let backup_file = dirs::download_dir()
.or(dirs::home_dir().map(|home| home.join("Downloads")))
.context("Could not find a download directory")?
.join(format!(
"hop-backup_{}_{}.tar.gz",
options.source,
chrono::Local::now().format("%Y-%m-%d_%H-%M-%S")
))
.to_string_lossy()
.to_string();

let (_, data) = source.read().await?;

tokio::fs::write(&backup_file, data)
.await
.with_context(|| format!("Could not write to {backup_file}"))?;

log::info!("Backup saved to {backup_file}");

Ok(())
}
use anyhow::{Context, Result};
use clap::Parser;

use super::copy::fslike::FsLike;
use crate::state::State;

#[derive(Debug, Parser)]
#[clap(about = "Backup files from a deployment to local machine")]
#[group(skip)]
pub struct Options {
#[clap(help = "Deployment name or id")]
pub source: String,
}

pub async fn handle(options: Options, state: State) -> Result<()> {
let source = FsLike::from_str(&state, &format!("{}:/", options.source)).await?;

let backup_file = dirs::download_dir()
.or(dirs::home_dir().map(|home| home.join("Downloads")))
.context("Could not find a download directory")?
.join(format!(
"hop-backup_{}_{}.tar.gz",
options.source,
chrono::Local::now().format("%Y-%m-%d_%H-%M-%S")
))
.to_string_lossy()
.to_string();

let (_, data) = source.read().await?;

tokio::fs::write(&backup_file, data)
.await
.with_context(|| format!("Could not write to {backup_file}"))?;

log::info!("Backup saved to {backup_file}");

Ok(())
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub async fn run() -> Result<()> {
let state = State::new(StateOptions {
override_project: std::env::var("PROJECT_ID").ok().or(cli.project),
override_token: std::env::var("TOKEN").ok(),
debug: cli.debug,
})
.await?;

Expand Down
3 changes: 3 additions & 0 deletions src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub struct State {
pub auth: Auth,
pub ctx: Context,
pub http: HttpClient,
pub debug: bool,
token: Option<String>,
token_type: Option<TokenType>,
}
Expand All @@ -24,6 +25,7 @@ pub struct State {
pub struct StateOptions {
pub override_project: Option<String>,
pub override_token: Option<String>,
pub debug: bool,
}

impl State {
Expand Down Expand Up @@ -65,6 +67,7 @@ impl State {
http,
auth,
ctx,
debug: options.debug,
})
}

Expand Down
66 changes: 59 additions & 7 deletions src/utils/arisu/mod.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,45 @@
mod shard;
mod types;

use std::sync::{
atomic::{AtomicBool, Ordering::Relaxed},
Arc,
};

use anyhow::Result;
use futures_util::Stream;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver /* , UnboundedSender */};
use serde_json::{json, Value};
use tokio::sync::mpsc::{
unbounded_channel, UnboundedReceiver, UnboundedSender, /* , UnboundedSender */
};

use self::shard::{ArisuShard, ArisuShardInfo};
pub use self::types::ArisuMessage;
use self::{
shard::{ArisuShard, ArisuShardInfo},
types::OpCode,
};

pub struct ArisuClient {
// tx: UnboundedSender<String>,
tx: UnboundedSender<Value>,
rx: UnboundedReceiver<ArisuMessage>,
logs_requested: Arc<AtomicBool>,
metrics_requested: Arc<AtomicBool>,
}

impl ArisuClient {
pub async fn new(container_id: &str, token: &str) -> Result<Self> {
let (_arisu_out_tx, arisu_out_rx) = unbounded_channel::<String>();
let (arisu_in_tx, arisu_in_rx) = unbounded_channel::<ArisuMessage>();
let (tx, arisu_out_rx) = unbounded_channel::<_>();
let (arisu_in_tx, rx) = unbounded_channel::<_>();
let logs_requested = Arc::new(AtomicBool::new(false));
let metrics_requested = Arc::new(AtomicBool::new(false));

let shard_info = ArisuShardInfo {
arisu_in_tx,
arisu_out_rx,
container_id: container_id.to_string(),
token: token.to_string(),
logs_requested: logs_requested.clone(),
metrics_requested: metrics_requested.clone(),
};

let mut shard = ArisuShard::new(shard_info).await?;
Expand All @@ -34,10 +51,45 @@ impl ArisuClient {
});

Ok(Self {
// tx: arisu_out_tx,
rx: arisu_in_rx,
tx,
rx,
logs_requested,
metrics_requested,
})
}

pub async fn request_logs(&self) -> Result<()> {
if self.logs_requested.load(Relaxed) {
return Ok(());
}

self.tx.send(json!({
"op": OpCode::RequestLogs,
}))?;

while !self.logs_requested.load(Relaxed) {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}

Ok(())
}

#[allow(dead_code)]
pub async fn request_metrics(&self) -> Result<()> {
if self.metrics_requested.load(Relaxed) {
return Ok(());
}

self.tx.send(json!({
"op": OpCode::RequestMetrics,
}))?;

while !self.metrics_requested.load(Relaxed) {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}

Ok(())
}
}

impl Stream for ArisuClient {
Expand Down

0 comments on commit 18f9f14

Please sign in to comment.