Skip to content

Commit

Permalink
Dashboard WIP 2
Browse files Browse the repository at this point in the history
  • Loading branch information
vyomkeshj committed Aug 16, 2021
1 parent 02a85d0 commit 741bbbc
Show file tree
Hide file tree
Showing 17 changed files with 786 additions and 3 deletions.
96 changes: 96 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Cargo.toml
Expand Up @@ -54,6 +54,11 @@ colored = "2"
byteorder = "1.4"
smallvec = "1.0"

#dashboard
tui = { version = "0.14" }
termion = "1.5"
indicatif = "0.16.2"

[features]
# Mode that does not execute tasks, useful for benchmarking HQ overhead
zero-worker = []
30 changes: 30 additions & 0 deletions src/bin/hq.rs
Expand Up @@ -20,6 +20,10 @@ use hyperqueue::common::arraydef::IntArray;
use hyperqueue::common::fsutils::absolute_path;
use hyperqueue::common::setup::setup_logging;
use hyperqueue::common::timeutils::ArgDuration;
use hyperqueue::common::WrappedRcRefCell;
use hyperqueue::dashboard::dashboard_manager::DashboardManager;
use hyperqueue::dashboard::dashboard_state::DashboardState;
use hyperqueue::rpc_call;
use hyperqueue::server::bootstrap::{
get_client_connection, init_hq_server, print_server_info, ServerConfig,
};
Expand Down Expand Up @@ -59,6 +63,10 @@ struct Opts {
}

#[allow(clippy::large_enum_variant)]
#[derive(Clap)]
#[clap(about = "HyperQueue Dashboard")]
struct DashboardOpts {}

#[derive(Clap)]
enum SubCommand {
/// Commands for controlling the HyperQueue server
Expand All @@ -79,6 +87,8 @@ enum SubCommand {
Wait(WaitOpts),
/// Operations with log
Log(LogOpts),
///Commands for the dashboard
Dashboard(DashboardOpts),
}

// Server CLI options
Expand Down Expand Up @@ -391,6 +401,25 @@ async fn command_wait(gsettings: GlobalSettings, opts: WaitOpts) -> anyhow::Resu
wait_for_job_with_selector(&mut connection, opts.selector_arg.into()).await
}

///Starts the hq Dashboard
/// Make an rpc call to the tako server in a loop and update the status data structure
/// this status data structure is used by the dashboard to
async fn command_dashboard_start(
gsettings: GlobalSettings,
opts: DashboardOpts,
) -> anyhow::Result<()> {
//todo: register dashboard handler here!

let mut dashboard_manager = DashboardManager::new(gsettings)?;
dashboard_manager
.dashboard_state
.get_mut()
.data_vec
.push(("B4", 15));
dashboard_manager.start_dashboard().await;
Ok(())
}

pub enum ColorPolicy {
Auto,
Always,
Expand Down Expand Up @@ -487,6 +516,7 @@ async fn main() -> hyperqueue::Result<()> {
SubCommand::Submit(opts) => command_submit(gsettings, opts).await,
SubCommand::Cancel(opts) => command_cancel(gsettings, opts).await,
SubCommand::Resubmit(opts) => command_resubmit(gsettings, opts).await,
SubCommand::Dashboard(opts) => command_dashboard_start(gsettings, opts).await,
SubCommand::Wait(opts) => command_wait(gsettings, opts).await,
SubCommand::Log(opts) => command_log(gsettings, opts),
};
Expand Down
3 changes: 2 additions & 1 deletion src/client/commands/submit.rs
Expand Up @@ -19,7 +19,7 @@ use crate::common::arraydef::IntArray;
use crate::common::timeutils::ArgDuration;
use crate::transfer::connection::ClientConnection;
use crate::transfer::messages::{
FromClientMessage, JobType, ResubmitRequest, SubmitRequest, ToClientMessage,
FromClientMessage, JobSelector, JobType, ResubmitRequest, SubmitRequest, ToClientMessage,
};
use crate::{rpc_call, JobId, JobTaskCount};

Expand Down Expand Up @@ -243,6 +243,7 @@ pub async fn submit_computation(
let response = rpc_call!(connection, message, ToClientMessage::SubmitResponse(r) => r).await?;
let info = response.job.info.clone();

let job_id = response.job.info.id;
print_job_detail(
gsettings,
response.job,
Expand Down
79 changes: 79 additions & 0 deletions src/dashboard/dashboard_events.rs
@@ -0,0 +1,79 @@
use std::io;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

use crate::dashboard::dashboard_state::CurrentActiveUi;
use std::sync::mpsc::Sender;
use termion::event::Key;
use termion::input::TermRead;
use tokio::task::JoinHandle;

pub enum DashboardEvent {
/// The event when a key is pressed
KeyPressEvent(Key),
/// Changes what is being drawn in the terminal
ChangeUIStateEvent(CurrentActiveUi),
/// Updates the dashboard with the latest data
Tick,
}

/// A small event handler that wrap termion input and tick events. Each event
/// type is handled in its own thread and returned to a common `Receiver`
/// Should run an event loop that gets the new data and
pub struct DashboardEventHandler {
receiver: mpsc::Receiver<DashboardEvent>,
sender: mpsc::Sender<DashboardEvent>,
//these insert data into the channel
key_event_sender: thread::JoinHandle<()>,
ui_clock_sender: thread::JoinHandle<()>,
}

impl DashboardEventHandler {
pub fn new() -> DashboardEventHandler {
let (tx, rx) = mpsc::channel();
DashboardEventHandler {
receiver: rx,
sender: tx.clone(),
ui_clock_sender: provide_clock(tx.clone()),
key_event_sender: key_event_listener(tx),
}
}

pub fn receive_event(&self) -> Result<DashboardEvent, mpsc::RecvError> {
self.receiver.recv()
}

pub fn send_ui_state_update_event(&self, event: DashboardEvent) {
self.sender.send(event); //todo: handle send error!
}
}

///Periodic updates to the dashboard
fn provide_clock(tx: Sender<DashboardEvent>) -> thread::JoinHandle<()> {
thread::spawn(move || loop {
if let Err(err) = tx.send(DashboardEvent::Tick) {
eprintln!("{}", err);
break;
}
thread::sleep(Duration::from_millis(250));
})
}

///Handles key press events when the dashboard_ui is active
fn key_event_listener(tx: Sender<DashboardEvent>) -> thread::JoinHandle<()> {
thread::spawn(move || {
let stdin = io::stdin();
for evt in stdin.keys() {
if let Ok(key) = evt {
//todo: instead of sending KeyPressEvent, resolve here to the correct next state
//todo: and send a ChangeUIStateEvent?

if let Err(err) = tx.send(DashboardEvent::KeyPressEvent(key)) {
eprintln!("{}", err);
return;
}
}
}
})
}

0 comments on commit 741bbbc

Please sign in to comment.