Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve and document JSON output #270

Merged
merged 13 commits into from
Dec 21, 2021
105 changes: 50 additions & 55 deletions crates/hyperqueue/src/bin/hq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ struct CancelOpts {

// Commands
async fn command_server_start(
gsettings: GlobalSettings,
gsettings: &GlobalSettings,
opts: ServerStartOpts,
) -> anyhow::Result<()> {
let server_cfg = ServerConfig {
Expand All @@ -292,11 +292,11 @@ async fn command_server_start(
event_store_size: opts.event_store_size,
};

init_hq_server(&gsettings, server_cfg).await
init_hq_server(gsettings, server_cfg).await
}

async fn command_server_stop(
gsettings: GlobalSettings,
gsettings: &GlobalSettings,
_opts: ServerStopOpts,
) -> anyhow::Result<()> {
let mut connection = get_client_connection(gsettings.server_directory()).await?;
Expand All @@ -305,100 +305,95 @@ async fn command_server_stop(
}

async fn command_server_info(
gsettings: GlobalSettings,
gsettings: &GlobalSettings,
opts: ServerInfoOpts,
) -> anyhow::Result<()> {
if opts.stats {
let mut connection = get_client_connection(gsettings.server_directory()).await?;
print_server_stats(&gsettings, &mut connection).await
print_server_stats(gsettings, &mut connection).await
} else {
print_server_info(&gsettings).await
print_server_info(gsettings).await
}
}

async fn command_job_list(gsettings: GlobalSettings, opts: JobListOpts) -> anyhow::Result<()> {
async fn command_job_list(gsettings: &GlobalSettings, opts: JobListOpts) -> anyhow::Result<()> {
let mut connection = get_client_connection(gsettings.server_directory()).await?;
output_job_list(&gsettings, &mut connection, opts.job_filters).await
output_job_list(gsettings, &mut connection, opts.job_filters).await
}

async fn command_job_detail(gsettings: GlobalSettings, opts: JobDetailOpts) -> anyhow::Result<()> {
async fn command_job_detail(gsettings: &GlobalSettings, opts: JobDetailOpts) -> anyhow::Result<()> {
if matches!(opts.selector_arg, SelectorArg::All) {
log::warn!("Job detail doesn't support the `all` selector, did you mean to use `hq jobs`?");
return Ok(());
}

let mut connection = get_client_connection(gsettings.server_directory()).await?;
output_job_detail(
&gsettings,
gsettings,
&mut connection,
opts.selector_arg.into(),
opts.tasks,
)
.await
}

async fn command_submit(gsettings: GlobalSettings, opts: SubmitOpts) -> anyhow::Result<()> {
async fn command_submit(gsettings: &GlobalSettings, opts: SubmitOpts) -> anyhow::Result<()> {
let mut connection = get_client_connection(gsettings.server_directory()).await?;
submit_computation(&gsettings, &mut connection, opts).await
submit_computation(gsettings, &mut connection, opts).await
}

async fn command_cancel(gsettings: GlobalSettings, opts: CancelOpts) -> anyhow::Result<()> {
async fn command_cancel(gsettings: &GlobalSettings, opts: CancelOpts) -> anyhow::Result<()> {
let mut connection = get_client_connection(gsettings.server_directory()).await?;

cancel_job(&gsettings, &mut connection, opts.selector_arg.into()).await
cancel_job(gsettings, &mut connection, opts.selector_arg.into()).await
}

async fn command_worker_start(
gsettings: GlobalSettings,
gsettings: &GlobalSettings,
opts: WorkerStartOpts,
) -> anyhow::Result<()> {
start_hq_worker(&gsettings, opts).await
start_hq_worker(gsettings, opts).await
}

async fn command_worker_stop(
gsettings: GlobalSettings,
gsettings: &GlobalSettings,
opts: WorkerStopOpts,
) -> anyhow::Result<()> {
let mut connection = get_client_connection(gsettings.server_directory()).await?;

stop_worker(&mut connection, opts.selector_arg.into()).await?;
Ok(())
}

async fn command_worker_list(
gsettings: GlobalSettings,
gsettings: &GlobalSettings,
opts: WorkerListOpts,
) -> anyhow::Result<()> {
let mut connection = get_client_connection(gsettings.server_directory()).await?;

let workers = get_worker_list(&mut connection, opts.all).await?;
gsettings.printer().print_worker_list(workers);
Ok(())
}

async fn command_worker_info(
gsettings: GlobalSettings,
gsettings: &GlobalSettings,
opts: WorkerInfoOpts,
) -> anyhow::Result<()> {
let mut connection = get_client_connection(gsettings.server_directory()).await?;
let response = get_worker_info(&mut connection, opts.worker_id).await?;

if let Some(worker) = response {
gsettings
.printer()
.print_worker_info(opts.worker_id, worker.configuration);
gsettings.printer().print_worker_info(worker);
} else {
log::error!("Worker {} not found", opts.worker_id);
}
Ok(())
}

async fn command_resubmit(gsettings: GlobalSettings, opts: ResubmitOpts) -> anyhow::Result<()> {
async fn command_resubmit(gsettings: &GlobalSettings, opts: ResubmitOpts) -> anyhow::Result<()> {
let mut connection = get_client_connection(gsettings.server_directory()).await?;
resubmit_computation(&gsettings, &mut connection, opts).await
resubmit_computation(gsettings, &mut connection, opts).await
}

fn command_worker_hwdetect(gsettings: GlobalSettings, opts: HwDetectOpts) -> anyhow::Result<()> {
fn command_worker_hwdetect(gsettings: &GlobalSettings, opts: HwDetectOpts) -> anyhow::Result<()> {
let cpus = if opts.no_hyperthreading {
detect_cpus_no_ht()?
} else {
Expand All @@ -412,7 +407,7 @@ fn command_worker_hwdetect(gsettings: GlobalSettings, opts: HwDetectOpts) -> any
}

async fn command_worker_address(
gsettings: GlobalSettings,
gsettings: &GlobalSettings,
opts: WorkerAddressOpts,
) -> anyhow::Result<()> {
let mut connection = get_client_connection(gsettings.server_directory()).await?;
Expand All @@ -426,13 +421,12 @@ async fn command_worker_address(
Ok(())
}

async fn command_wait(gsettings: GlobalSettings, opts: WaitOpts) -> anyhow::Result<()> {
async fn command_wait(gsettings: &GlobalSettings, opts: WaitOpts) -> anyhow::Result<()> {
let mut connection = get_client_connection(gsettings.server_directory()).await?;

wait_for_jobs(&gsettings, &mut connection, opts.selector_arg.into()).await
wait_for_jobs(gsettings, &mut connection, opts.selector_arg.into()).await
}

async fn command_progress(gsettings: GlobalSettings, opts: ProgressOpts) -> anyhow::Result<()> {
async fn command_progress(gsettings: &GlobalSettings, opts: ProgressOpts) -> anyhow::Result<()> {
let mut connection = get_client_connection(gsettings.server_directory()).await?;
let selector = opts.selector_arg.into();
let response = hyperqueue::rpc_call!(
Expand All @@ -448,10 +442,10 @@ async fn command_progress(gsettings: GlobalSettings, opts: ProgressOpts) -> anyh

///Starts the hq Dashboard
async fn command_dashboard_start(
gsettings: GlobalSettings,
gsettings: &GlobalSettings,
_opts: DashboardOpts,
) -> anyhow::Result<()> {
start_ui_loop(DashboardState::default(), &gsettings).await?;
start_ui_loop(DashboardState::default(), gsettings).await?;
Ok(())
}

Expand Down Expand Up @@ -529,45 +523,46 @@ async fn main() -> hyperqueue::Result<()> {
let result = match top_opts.subcmd {
SubCommand::Server(ServerOpts {
subcmd: ServerCommand::Start(opts),
}) => command_server_start(gsettings, opts).await,
}) => command_server_start(&gsettings, opts).await,
SubCommand::Server(ServerOpts {
subcmd: ServerCommand::Stop(opts),
}) => command_server_stop(gsettings, opts).await,
}) => command_server_stop(&gsettings, opts).await,
SubCommand::Server(ServerOpts {
subcmd: ServerCommand::Info(opts),
}) => command_server_info(gsettings, opts).await,
}) => command_server_info(&gsettings, opts).await,

SubCommand::Worker(WorkerOpts {
subcmd: WorkerCommand::Start(opts),
}) => command_worker_start(gsettings, opts).await,
}) => command_worker_start(&gsettings, opts).await,
SubCommand::Worker(WorkerOpts {
subcmd: WorkerCommand::Stop(opts),
}) => command_worker_stop(gsettings, opts).await,
}) => command_worker_stop(&gsettings, opts).await,
SubCommand::Worker(WorkerOpts {
subcmd: WorkerCommand::List(opts),
}) => command_worker_list(gsettings, opts).await,
}) => command_worker_list(&gsettings, opts).await,
SubCommand::Worker(WorkerOpts {
subcmd: WorkerCommand::Info(opts),
}) => command_worker_info(gsettings, opts).await,
}) => command_worker_info(&gsettings, opts).await,
SubCommand::Worker(WorkerOpts {
subcmd: WorkerCommand::HwDetect(opts),
}) => command_worker_hwdetect(gsettings, opts),
}) => command_worker_hwdetect(&gsettings, opts),
SubCommand::Worker(WorkerOpts {
subcmd: WorkerCommand::Address(opts),
}) => command_worker_address(gsettings, opts).await,
SubCommand::Jobs(opts) => command_job_list(gsettings, opts).await,
SubCommand::Job(opts) => command_job_detail(gsettings, opts).await,
SubCommand::Submit(opts) => command_submit(gsettings, opts).await,
SubCommand::Cancel(opts) => command_cancel(gsettings, opts).await,
SubCommand::Resubmit(opts) => command_resubmit(gsettings, opts).await,
SubCommand::Dashboard(opts) => command_dashboard_start(gsettings, opts).await,
SubCommand::Wait(opts) => command_wait(gsettings, opts).await,
SubCommand::Progress(opts) => command_progress(gsettings, opts).await,
SubCommand::Log(opts) => command_log(gsettings, opts),
SubCommand::AutoAlloc(opts) => command_autoalloc(gsettings, opts).await,
}) => command_worker_address(&gsettings, opts).await,
SubCommand::Jobs(opts) => command_job_list(&gsettings, opts).await,
SubCommand::Job(opts) => command_job_detail(&gsettings, opts).await,
SubCommand::Submit(opts) => command_submit(&gsettings, opts).await,
SubCommand::Cancel(opts) => command_cancel(&gsettings, opts).await,
SubCommand::Resubmit(opts) => command_resubmit(&gsettings, opts).await,
SubCommand::Dashboard(opts) => command_dashboard_start(&gsettings, opts).await,
SubCommand::Wait(opts) => command_wait(&gsettings, opts).await,
SubCommand::Progress(opts) => command_progress(&gsettings, opts).await,
SubCommand::Log(opts) => command_log(&gsettings, opts),
SubCommand::AutoAlloc(opts) => command_autoalloc(&gsettings, opts).await,
};

if let Err(e) = result {
eprintln!("{:?}", e);
gsettings.printer().print_error(e);
std::process::exit(1);
}

Expand Down
8 changes: 4 additions & 4 deletions crates/hyperqueue/src/client/commands/autoalloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,25 +152,25 @@ impl FromStr for AllocationStateFilter {
}

pub async fn command_autoalloc(
gsettings: GlobalSettings,
gsettings: &GlobalSettings,
opts: AutoAllocOpts,
) -> anyhow::Result<()> {
match opts.subcmd {
AutoAllocCommand::List => {
let connection = get_client_connection(gsettings.server_directory()).await?;
print_allocation_queues(&gsettings, connection).await?;
print_allocation_queues(gsettings, connection).await?;
}
AutoAllocCommand::Add(opts) => {
let connection = get_client_connection(gsettings.server_directory()).await?;
add_queue(connection, opts).await?;
}
AutoAllocCommand::Events(opts) => {
let connection = get_client_connection(gsettings.server_directory()).await?;
print_event_log(&gsettings, connection, opts).await?;
print_event_log(gsettings, connection, opts).await?;
}
AutoAllocCommand::Info(opts) => {
let connection = get_client_connection(gsettings.server_directory()).await?;
print_allocations(&gsettings, connection, opts).await?;
print_allocations(gsettings, connection, opts).await?;
}
AutoAllocCommand::Remove(opts) => {
let connection = get_client_connection(gsettings.server_directory()).await?;
Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/client/commands/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl FromStr for Channel {
}
}

pub fn command_log(gsettings: GlobalSettings, opts: LogOpts) -> anyhow::Result<()> {
pub fn command_log(gsettings: &GlobalSettings, opts: LogOpts) -> anyhow::Result<()> {
let mut log_file = LogFile::open(&opts.filename)?;
match opts.command {
LogCommand::Summary(_) => {
Expand Down
8 changes: 5 additions & 3 deletions crates/hyperqueue/src/client/commands/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,11 @@ pub async fn start_hq_worker(
)
.await?;

gsettings
.printer()
.print_worker_info(worker_id, configuration);
gsettings.printer().print_worker_info(WorkerInfo {
id: worker_id,
configuration,
ended: None,
});
let local_set = LocalSet::new();
local_set
.run_until(async move {
Expand Down
Loading