From 4864499850e35bef823489f2187f3d8daff9585f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Fri, 17 Dec 2021 16:34:43 +0100 Subject: [PATCH 01/13] Manually format JSON worker and autoalloc output --- crates/hyperqueue/src/bin/hq.rs | 4 +- .../hyperqueue/src/client/commands/worker.rs | 8 +- crates/hyperqueue/src/client/output/cli.rs | 12 +- crates/hyperqueue/src/client/output/json.rs | 226 +++++++++++++++--- .../hyperqueue/src/client/output/outputs.rs | 5 +- crates/hyperqueue/src/client/output/quiet.rs | 4 +- 6 files changed, 215 insertions(+), 44 deletions(-) diff --git a/crates/hyperqueue/src/bin/hq.rs b/crates/hyperqueue/src/bin/hq.rs index 251256c9d..d4bac452b 100644 --- a/crates/hyperqueue/src/bin/hq.rs +++ b/crates/hyperqueue/src/bin/hq.rs @@ -384,9 +384,7 @@ async fn command_worker_info( let response = get_worker_info(&mut connection, opts.worker_id).await?; if let Some(worker) = response { - gsettings - .printer() - .print_worker_info(opts.worker_id, worker.configuration); + gsettings.printer().print_worker_info(worker); } else { log::error!("Worker {} not found", opts.worker_id); } diff --git a/crates/hyperqueue/src/client/commands/worker.rs b/crates/hyperqueue/src/client/commands/worker.rs index 2502487cd..efd582bd4 100644 --- a/crates/hyperqueue/src/client/commands/worker.rs +++ b/crates/hyperqueue/src/client/commands/worker.rs @@ -123,9 +123,11 @@ pub async fn start_hq_worker( ) .await?; - gsettings - .printer() - .print_worker_info(worker_id, configuration); + gsettings.printer().print_worker_info(WorkerInfo { + id: worker_id, + configuration, + ended: None, + }); let local_set = LocalSet::new(); local_set .run_until(async move { diff --git a/crates/hyperqueue/src/client/output/cli.rs b/crates/hyperqueue/src/client/output/cli.rs index 16ac0d56d..7ea155b3a 100644 --- a/crates/hyperqueue/src/client/output/cli.rs +++ b/crates/hyperqueue/src/client/output/cli.rs @@ -29,7 +29,7 @@ use std::path::Path; use std::time::SystemTime; use 
tako::common::resources::{CpuRequest, ResourceDescriptor}; -use tako::messages::common::{StdioDef, WorkerConfiguration}; +use tako::messages::common::StdioDef; use crate::common::strutils::pluralize; use crate::worker::start::WORKER_EXTRA_PROCESS_PID; @@ -122,10 +122,16 @@ impl Output for CliOutput { self.print_table(table); } - fn print_worker_info(&self, worker_id: WorkerId, configuration: WorkerConfiguration) { + fn print_worker_info(&self, worker_info: WorkerInfo) { + let WorkerInfo { + id, + configuration, + ended: _ended, + } = worker_info; + let manager_info = configuration.get_manager_info(); let rows = vec![ - vec!["Worker ID".cell().bold(true), worker_id.cell()], + vec!["Worker ID".cell().bold(true), id.cell()], vec!["Hostname".cell().bold(true), configuration.hostname.cell()], vec![ "Data provider".cell().bold(true), diff --git a/crates/hyperqueue/src/client/output/json.rs b/crates/hyperqueue/src/client/output/json.rs index 92a08e583..97ad937ae 100644 --- a/crates/hyperqueue/src/client/output/json.rs +++ b/crates/hyperqueue/src/client/output/json.rs @@ -2,18 +2,26 @@ use crate::client::job::WorkerMap; use crate::client::output::cli::{format_job_workers, format_task_duration}; use crate::client::output::outputs::Output; use crate::client::status::{task_status, Status}; +use crate::common::manager::info::ManagerType; use crate::common::serverdir::AccessRecord; -use crate::server::autoalloc::{Allocation, AllocationEventHolder}; +use crate::server::autoalloc::{ + Allocation, AllocationEvent, AllocationEventHolder, AllocationStatus, +}; use crate::server::job::{JobTaskCounters, JobTaskInfo}; use crate::stream::reader::logfile::Summary; use crate::transfer::messages::{ - AutoAllocListResponse, JobDetail, JobInfo, StatsResponse, WaitForJobsResponse, WorkerInfo, + AutoAllocListResponse, JobDetail, JobInfo, QueueDescriptorData, StatsResponse, + WaitForJobsResponse, WorkerInfo, }; -use crate::{JobTaskId, WorkerId}; +use crate::JobTaskId; +use chrono::{DateTime, 
Utc}; use serde_json; +use serde_json::json; use std::path::Path; use std::time::Duration; -use tako::common::resources::ResourceDescriptor; +use tako::common::resources::{ + GenericResourceDescriptor, GenericResourceDescriptorKind, ResourceDescriptor, +}; use tako::messages::common::WorkerConfiguration; #[derive(Default)] @@ -21,26 +29,27 @@ pub struct JsonOutput; impl JsonOutput { fn print(&self, data: serde_json::Value) { - println!("{}", data.to_string()); + println!( + "{}", + serde_json::to_string_pretty(&data).expect("Could not format JSON") + ); } } // TODO: output machine-readable data +// hq jobs, hq submit, hq cancel, +// time in UTC impl Output for JsonOutput { fn print_worker_list(&self, workers: Vec) { - self.print(serde_json::json!(workers)); + self.print(workers.into_iter().map(format_worker_info).collect()); } - fn print_worker_info(&self, worker_id: WorkerId, configuration: WorkerConfiguration) { - let json = serde_json::json!({ - "id": worker_id, - "worker_configuration": configuration - }); - self.print(json); + fn print_worker_info(&self, worker_info: WorkerInfo) { + self.print(format_worker_info(worker_info)); } // Server fn print_server_record(&self, server_dir: &Path, record: &AccessRecord) { - let json = serde_json::json!({ + let json = json!({ "server_dir": server_dir, "host": record.host(), "pid": record.pid(), @@ -52,22 +61,22 @@ impl Output for JsonOutput { self.print(json); } fn print_server_stats(&self, stats: StatsResponse) { - self.print(serde_json::json!(stats)); + self.print(json!(stats)); } fn print_job_submitted(&self, job: JobDetail) { - self.print(serde_json::json!({ + self.print(json!({ "id": job.info.id })) } // Jobs fn print_job_list(&self, tasks: Vec) { - self.print(serde_json::json!(tasks)); + self.print(json!(tasks)); } fn print_job_detail(&self, job: JobDetail, show_tasks: bool, worker_map: WorkerMap) { let worker = format_job_workers(&job, &worker_map); - let json = serde_json::json!({ + let json = json!({ 
"job_detail": job, "worker": worker, }); @@ -99,7 +108,7 @@ impl Output for JsonOutput { .collect(); let tasks_id: Vec = tasks.iter().map(|t| t.task_id).collect(); let tasks_state: Vec = tasks.iter().map(|t| task_status(&t.state)).collect(); - let json = serde_json::json!({ + let json = json!({ "tasks_state": tasks_state, "tasks_duration": output_tasks_duration, "tasks_id": tasks_id, @@ -110,29 +119,190 @@ impl Output for JsonOutput { } fn print_job_wait(&self, _duration: Duration, _response: &WaitForJobsResponse) {} - // Log fn print_summary(&self, filename: &Path, summary: Summary) { - let json = serde_json::json!({ - "filename":filename, - "summary":summary, + let json = json!({ + "filename": filename, + "summary": summary, }); self.print(json); } - // Autoalloc fn print_autoalloc_queues(&self, info: AutoAllocListResponse) { - self.print(serde_json::json!(info)); + self.print( + info.descriptors + .iter() + .map(|(key, descriptor)| (key.to_string(), format_queue_descriptor(descriptor))) + .collect(), + ); } fn print_event_log(&self, events: Vec) { - self.print(serde_json::json!(events)); + self.print(events.into_iter().map(format_allocation_event).collect()); } fn print_allocations(&self, allocations: Vec) { - self.print(serde_json::json!(allocations)); + self.print(allocations.into_iter().map(format_allocation).collect()); } - // Hw fn print_hw(&self, descriptor: &ResourceDescriptor) { - self.print(serde_json::json!(descriptor)); + self.print(format_resource_descriptor(descriptor)); } } + +fn format_allocation(allocation: Allocation) -> serde_json::Value { + let Allocation { + id, + worker_count, + queued_at, + status, + working_dir, + } = allocation; + + let status_name = match &status { + AllocationStatus::Queued => "queue", + AllocationStatus::Running { .. } => "running", + AllocationStatus::Finished { .. } => "finished", + AllocationStatus::Failed { .. 
} => "failed", + }; + let started_at = match status { + AllocationStatus::Running { started_at } + | AllocationStatus::Finished { started_at, .. } + | AllocationStatus::Failed { started_at, .. } => Some(started_at), + _ => None, + }; + let ended_at = match status { + AllocationStatus::Finished { finished_at, .. } + | AllocationStatus::Failed { finished_at, .. } => Some(finished_at), + _ => None, + }; + + json!({ + "id": id, + "worker_count": worker_count, + "queued_at": format_datetime(queued_at), + "started_at": started_at.map(format_datetime), + "ended_at": ended_at.map(format_datetime), + "status": status_name, + "workdir": working_dir + }) +} + +fn format_allocation_event(event: AllocationEventHolder) -> serde_json::Value { + let name = match &event.event { + AllocationEvent::AllocationQueued(_) => "allocation-queued", + AllocationEvent::AllocationStarted(_) => "allocation-started", + AllocationEvent::AllocationFinished(_) => "allocation-finished", + AllocationEvent::AllocationFailed(_) => "allocation-failed", + AllocationEvent::AllocationDisappeared(_) => "allocation-disappeared", + AllocationEvent::QueueFail { .. } => "queue-fail", + AllocationEvent::StatusFail { .. 
} => "status-fail", + }; + let params = match event.event { + AllocationEvent::AllocationQueued(id) + | AllocationEvent::AllocationStarted(id) + | AllocationEvent::AllocationFinished(id) + | AllocationEvent::AllocationFailed(id) + | AllocationEvent::AllocationDisappeared(id) => { + json!({ "id": id }) + } + AllocationEvent::QueueFail { error } | AllocationEvent::StatusFail { error } => { + json!({ "error": error }) + } + }; + + json!({ + "date": format_datetime(event.date), + "event": name, + "params": params + }) +} + +fn format_resource_descriptor(descriptor: &ResourceDescriptor) -> serde_json::Value { + let ResourceDescriptor { cpus, generic } = descriptor; + json!({ + "cpus": cpus, + "generic": generic.iter().map(format_generic_resource).collect::>() + }) +} + +fn format_worker_info(worker_info: WorkerInfo) -> serde_json::Value { + let WorkerInfo { + id, + configuration: + WorkerConfiguration { + resources, + listen_address, + hostname, + work_dir, + log_dir, + heartbeat_interval, + hw_state_poll_interval: _, + idle_timeout, + time_limit, + extra: _, + }, + ended, + } = worker_info; + + json!({ + "id": id, + "configuration": json!({ + "heartbeat_interval": format_duration(heartbeat_interval), + "idle_timeout": idle_timeout.map(format_duration), + "time_limit": time_limit.map(format_duration), + "log_dir": log_dir, + "work_dir": work_dir, + "hostname": hostname, + "listen_address": listen_address, + "resources": format_resource_descriptor(&resources) + }), + "ended": ended.map(|info| json!({ + "at": format_datetime(info.ended_at) + })) + }) +} +fn format_generic_resource(resource: &GenericResourceDescriptor) -> serde_json::Value { + json!({ + "name": resource.name, + "kind": match &resource.kind { + GenericResourceDescriptorKind::Indices(_) => "indices", + GenericResourceDescriptorKind::Sum(_) => "sum", + }, + "params": match &resource.kind { + GenericResourceDescriptorKind::Indices(params) => json!({ + "start": params.start, + "end": params.end + }), + 
GenericResourceDescriptorKind::Sum(params) => json!({ + "size": params.size + }), + } + }) +} + +fn format_queue_descriptor(descriptor: &QueueDescriptorData) -> serde_json::Value { + let manager = match descriptor.manager_type { + ManagerType::Pbs => "PBS", + ManagerType::Slurm => "Slurm", + }; + let info = &descriptor.info; + + json!({ + "manager": manager, + "additional_args": info.additional_args(), + "backlog": info.backlog(), + "workers_per_alloc": info.workers_per_alloc(), + "timelimit": format_duration(info.timelimit()), + "max_worker_count": info.max_worker_count(), + "worker_cpu_args": info.worker_cpu_args(), + "worker_resource_args": info.worker_resource_args(), + "name": descriptor.name + }) +} + +fn format_duration(duration: Duration) -> serde_json::Value { + let value = duration.as_secs() as f64 + duration.subsec_nanos() as f64 * 1e-9; + json!(value) +} +fn format_datetime>>(time: T) -> serde_json::Value { + json!(time.into()) +} diff --git a/crates/hyperqueue/src/client/output/outputs.rs b/crates/hyperqueue/src/client/output/outputs.rs index 5f3a961bb..916a5e54f 100644 --- a/crates/hyperqueue/src/client/output/outputs.rs +++ b/crates/hyperqueue/src/client/output/outputs.rs @@ -7,14 +7,11 @@ use crate::client::job::WorkerMap; use crate::server::autoalloc::{Allocation, AllocationEventHolder}; use crate::server::job::{JobTaskCounters, JobTaskInfo}; use crate::stream::reader::logfile::Summary; -use crate::WorkerId; - use std::path::Path; use std::str::FromStr; use core::time::Duration; use tako::common::resources::ResourceDescriptor; -use tako::messages::common::WorkerConfiguration; pub const MAX_DISPLAYED_WORKERS: usize = 2; @@ -40,7 +37,7 @@ impl FromStr for Outputs { pub trait Output { // Workers fn print_worker_list(&self, workers: Vec); - fn print_worker_info(&self, worker_id: WorkerId, configuration: WorkerConfiguration); + fn print_worker_info(&self, worker_info: WorkerInfo); // Server fn print_server_record(&self, server_dir: &Path, record: 
&AccessRecord); diff --git a/crates/hyperqueue/src/client/output/quiet.rs b/crates/hyperqueue/src/client/output/quiet.rs index 7a9f9256d..9ec9a0d33 100644 --- a/crates/hyperqueue/src/client/output/quiet.rs +++ b/crates/hyperqueue/src/client/output/quiet.rs @@ -9,11 +9,9 @@ use crate::transfer::messages::{ AutoAllocListResponse, JobDetail, JobInfo, LostWorkerReasonInfo, StatsResponse, WaitForJobsResponse, WorkerExitInfo, WorkerInfo, }; -use crate::WorkerId; use std::path::Path; use std::time::Duration; use tako::common::resources::ResourceDescriptor; -use tako::messages::common::WorkerConfiguration; fn to_str(stat: &Status) -> anyhow::Result<&str> { Ok(match stat { @@ -54,7 +52,7 @@ impl Output for Quiet { println!("{} {}", worker.id, worker_status) } } - fn print_worker_info(&self, _worker_id: WorkerId, _configuration: WorkerConfiguration) {} + fn print_worker_info(&self, _worker_info: WorkerInfo) {} // Server fn print_server_record(&self, server_dir: &Path, _record: &AccessRecord) { From f94cbc20e100fbcf71f38ca26da007ecab9046a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Fri, 17 Dec 2021 16:48:15 +0100 Subject: [PATCH 02/13] Print errors using output printer --- crates/hyperqueue/src/bin/hq.rs | 101 +++++++++--------- .../src/client/commands/autoalloc.rs | 8 +- crates/hyperqueue/src/client/commands/log.rs | 2 +- crates/hyperqueue/src/client/output/cli.rs | 5 + crates/hyperqueue/src/client/output/json.rs | 8 +- .../hyperqueue/src/client/output/outputs.rs | 2 + crates/hyperqueue/src/client/output/quiet.rs | 5 + 7 files changed, 71 insertions(+), 60 deletions(-) diff --git a/crates/hyperqueue/src/bin/hq.rs b/crates/hyperqueue/src/bin/hq.rs index d4bac452b..d93ce78cc 100644 --- a/crates/hyperqueue/src/bin/hq.rs +++ b/crates/hyperqueue/src/bin/hq.rs @@ -278,7 +278,7 @@ struct CancelOpts { // Commands async fn command_server_start( - gsettings: GlobalSettings, + gsettings: &GlobalSettings, opts: ServerStartOpts, ) -> anyhow::Result<()> { let 
server_cfg = ServerConfig { @@ -292,11 +292,11 @@ async fn command_server_start( event_store_size: opts.event_store_size, }; - init_hq_server(&gsettings, server_cfg).await + init_hq_server(gsettings, server_cfg).await } async fn command_server_stop( - gsettings: GlobalSettings, + gsettings: &GlobalSettings, _opts: ServerStopOpts, ) -> anyhow::Result<()> { let mut connection = get_client_connection(gsettings.server_directory()).await?; @@ -305,23 +305,23 @@ async fn command_server_stop( } async fn command_server_info( - gsettings: GlobalSettings, + gsettings: &GlobalSettings, opts: ServerInfoOpts, ) -> anyhow::Result<()> { if opts.stats { let mut connection = get_client_connection(gsettings.server_directory()).await?; - print_server_stats(&gsettings, &mut connection).await + print_server_stats(gsettings, &mut connection).await } else { - print_server_info(&gsettings).await + print_server_info(gsettings).await } } -async fn command_job_list(gsettings: GlobalSettings, opts: JobListOpts) -> anyhow::Result<()> { +async fn command_job_list(gsettings: &GlobalSettings, opts: JobListOpts) -> anyhow::Result<()> { let mut connection = get_client_connection(gsettings.server_directory()).await?; - output_job_list(&gsettings, &mut connection, opts.job_filters).await + output_job_list(gsettings, &mut connection, opts.job_filters).await } -async fn command_job_detail(gsettings: GlobalSettings, opts: JobDetailOpts) -> anyhow::Result<()> { +async fn command_job_detail(gsettings: &GlobalSettings, opts: JobDetailOpts) -> anyhow::Result<()> { if matches!(opts.selector_arg, SelectorArg::All) { log::warn!("Job detail doesn't support the `all` selector, did you mean to use `hq jobs`?"); return Ok(()); @@ -329,7 +329,7 @@ async fn command_job_detail(gsettings: GlobalSettings, opts: JobDetailOpts) -> a let mut connection = get_client_connection(gsettings.server_directory()).await?; output_job_detail( - &gsettings, + gsettings, &mut connection, opts.selector_arg.into(), opts.tasks, @@ 
-337,47 +337,44 @@ async fn command_job_detail(gsettings: GlobalSettings, opts: JobDetailOpts) -> a .await } -async fn command_submit(gsettings: GlobalSettings, opts: SubmitOpts) -> anyhow::Result<()> { +async fn command_submit(gsettings: &GlobalSettings, opts: SubmitOpts) -> anyhow::Result<()> { let mut connection = get_client_connection(gsettings.server_directory()).await?; - submit_computation(&gsettings, &mut connection, opts).await + submit_computation(gsettings, &mut connection, opts).await } -async fn command_cancel(gsettings: GlobalSettings, opts: CancelOpts) -> anyhow::Result<()> { +async fn command_cancel(gsettings: &GlobalSettings, opts: CancelOpts) -> anyhow::Result<()> { let mut connection = get_client_connection(gsettings.server_directory()).await?; - - cancel_job(&gsettings, &mut connection, opts.selector_arg.into()).await + cancel_job(gsettings, &mut connection, opts.selector_arg.into()).await } async fn command_worker_start( - gsettings: GlobalSettings, + gsettings: &GlobalSettings, opts: WorkerStartOpts, ) -> anyhow::Result<()> { - start_hq_worker(&gsettings, opts).await + start_hq_worker(gsettings, opts).await } async fn command_worker_stop( - gsettings: GlobalSettings, + gsettings: &GlobalSettings, opts: WorkerStopOpts, ) -> anyhow::Result<()> { let mut connection = get_client_connection(gsettings.server_directory()).await?; - stop_worker(&mut connection, opts.selector_arg.into()).await?; Ok(()) } async fn command_worker_list( - gsettings: GlobalSettings, + gsettings: &GlobalSettings, opts: WorkerListOpts, ) -> anyhow::Result<()> { let mut connection = get_client_connection(gsettings.server_directory()).await?; - let workers = get_worker_list(&mut connection, opts.all).await?; gsettings.printer().print_worker_list(workers); Ok(()) } async fn command_worker_info( - gsettings: GlobalSettings, + gsettings: &GlobalSettings, opts: WorkerInfoOpts, ) -> anyhow::Result<()> { let mut connection = 
get_client_connection(gsettings.server_directory()).await?; @@ -391,12 +388,12 @@ async fn command_worker_info( Ok(()) } -async fn command_resubmit(gsettings: GlobalSettings, opts: ResubmitOpts) -> anyhow::Result<()> { +async fn command_resubmit(gsettings: &GlobalSettings, opts: ResubmitOpts) -> anyhow::Result<()> { let mut connection = get_client_connection(gsettings.server_directory()).await?; - resubmit_computation(&gsettings, &mut connection, opts).await + resubmit_computation(gsettings, &mut connection, opts).await } -fn command_worker_hwdetect(gsettings: GlobalSettings, opts: HwDetectOpts) -> anyhow::Result<()> { +fn command_worker_hwdetect(gsettings: &GlobalSettings, opts: HwDetectOpts) -> anyhow::Result<()> { let cpus = if opts.no_hyperthreading { detect_cpus_no_ht()? } else { @@ -410,7 +407,7 @@ fn command_worker_hwdetect(gsettings: GlobalSettings, opts: HwDetectOpts) -> any } async fn command_worker_address( - gsettings: GlobalSettings, + gsettings: &GlobalSettings, opts: WorkerAddressOpts, ) -> anyhow::Result<()> { let mut connection = get_client_connection(gsettings.server_directory()).await?; @@ -424,13 +421,12 @@ async fn command_worker_address( Ok(()) } -async fn command_wait(gsettings: GlobalSettings, opts: WaitOpts) -> anyhow::Result<()> { +async fn command_wait(gsettings: &GlobalSettings, opts: WaitOpts) -> anyhow::Result<()> { let mut connection = get_client_connection(gsettings.server_directory()).await?; - - wait_for_jobs(&gsettings, &mut connection, opts.selector_arg.into()).await + wait_for_jobs(gsettings, &mut connection, opts.selector_arg.into()).await } -async fn command_progress(gsettings: GlobalSettings, opts: ProgressOpts) -> anyhow::Result<()> { +async fn command_progress(gsettings: &GlobalSettings, opts: ProgressOpts) -> anyhow::Result<()> { let mut connection = get_client_connection(gsettings.server_directory()).await?; let selector = opts.selector_arg.into(); let response = hyperqueue::rpc_call!( @@ -446,10 +442,10 @@ async fn 
command_progress(gsettings: GlobalSettings, opts: ProgressOpts) -> anyh ///Starts the hq Dashboard async fn command_dashboard_start( - gsettings: GlobalSettings, + gsettings: &GlobalSettings, _opts: DashboardOpts, ) -> anyhow::Result<()> { - start_ui_loop(DashboardState::default(), &gsettings).await?; + start_ui_loop(DashboardState::default(), gsettings).await?; Ok(()) } @@ -527,45 +523,46 @@ async fn main() -> hyperqueue::Result<()> { let result = match top_opts.subcmd { SubCommand::Server(ServerOpts { subcmd: ServerCommand::Start(opts), - }) => command_server_start(gsettings, opts).await, + }) => command_server_start(&gsettings, opts).await, SubCommand::Server(ServerOpts { subcmd: ServerCommand::Stop(opts), - }) => command_server_stop(gsettings, opts).await, + }) => command_server_stop(&gsettings, opts).await, SubCommand::Server(ServerOpts { subcmd: ServerCommand::Info(opts), - }) => command_server_info(gsettings, opts).await, + }) => command_server_info(&gsettings, opts).await, SubCommand::Worker(WorkerOpts { subcmd: WorkerCommand::Start(opts), - }) => command_worker_start(gsettings, opts).await, + }) => command_worker_start(&gsettings, opts).await, SubCommand::Worker(WorkerOpts { subcmd: WorkerCommand::Stop(opts), - }) => command_worker_stop(gsettings, opts).await, + }) => command_worker_stop(&gsettings, opts).await, SubCommand::Worker(WorkerOpts { subcmd: WorkerCommand::List(opts), - }) => command_worker_list(gsettings, opts).await, + }) => command_worker_list(&gsettings, opts).await, SubCommand::Worker(WorkerOpts { subcmd: WorkerCommand::Info(opts), - }) => command_worker_info(gsettings, opts).await, + }) => command_worker_info(&gsettings, opts).await, SubCommand::Worker(WorkerOpts { subcmd: WorkerCommand::HwDetect(opts), - }) => command_worker_hwdetect(gsettings, opts), + }) => command_worker_hwdetect(&gsettings, opts), SubCommand::Worker(WorkerOpts { subcmd: WorkerCommand::Address(opts), - }) => command_worker_address(gsettings, opts).await, - 
SubCommand::Jobs(opts) => command_job_list(gsettings, opts).await, - SubCommand::Job(opts) => command_job_detail(gsettings, opts).await, - SubCommand::Submit(opts) => command_submit(gsettings, opts).await, - SubCommand::Cancel(opts) => command_cancel(gsettings, opts).await, - SubCommand::Resubmit(opts) => command_resubmit(gsettings, opts).await, - SubCommand::Dashboard(opts) => command_dashboard_start(gsettings, opts).await, - SubCommand::Wait(opts) => command_wait(gsettings, opts).await, - SubCommand::Progress(opts) => command_progress(gsettings, opts).await, - SubCommand::Log(opts) => command_log(gsettings, opts), - SubCommand::AutoAlloc(opts) => command_autoalloc(gsettings, opts).await, + }) => command_worker_address(&gsettings, opts).await, + SubCommand::Jobs(opts) => command_job_list(&gsettings, opts).await, + SubCommand::Job(opts) => command_job_detail(&gsettings, opts).await, + SubCommand::Submit(opts) => command_submit(&gsettings, opts).await, + SubCommand::Cancel(opts) => command_cancel(&gsettings, opts).await, + SubCommand::Resubmit(opts) => command_resubmit(&gsettings, opts).await, + SubCommand::Dashboard(opts) => command_dashboard_start(&gsettings, opts).await, + SubCommand::Wait(opts) => command_wait(&gsettings, opts).await, + SubCommand::Progress(opts) => command_progress(&gsettings, opts).await, + SubCommand::Log(opts) => command_log(&gsettings, opts), + SubCommand::AutoAlloc(opts) => command_autoalloc(&gsettings, opts).await, }; + if let Err(e) = result { - eprintln!("{:?}", e); + gsettings.printer().print_error(e); std::process::exit(1); } diff --git a/crates/hyperqueue/src/client/commands/autoalloc.rs b/crates/hyperqueue/src/client/commands/autoalloc.rs index 59dfe5fea..cb454096a 100644 --- a/crates/hyperqueue/src/client/commands/autoalloc.rs +++ b/crates/hyperqueue/src/client/commands/autoalloc.rs @@ -152,13 +152,13 @@ impl FromStr for AllocationStateFilter { } pub async fn command_autoalloc( - gsettings: GlobalSettings, + gsettings: 
&GlobalSettings, opts: AutoAllocOpts, ) -> anyhow::Result<()> { match opts.subcmd { AutoAllocCommand::List => { let connection = get_client_connection(gsettings.server_directory()).await?; - print_allocation_queues(&gsettings, connection).await?; + print_allocation_queues(gsettings, connection).await?; } AutoAllocCommand::Add(opts) => { let connection = get_client_connection(gsettings.server_directory()).await?; @@ -166,11 +166,11 @@ pub async fn command_autoalloc( } AutoAllocCommand::Events(opts) => { let connection = get_client_connection(gsettings.server_directory()).await?; - print_event_log(&gsettings, connection, opts).await?; + print_event_log(gsettings, connection, opts).await?; } AutoAllocCommand::Info(opts) => { let connection = get_client_connection(gsettings.server_directory()).await?; - print_allocations(&gsettings, connection, opts).await?; + print_allocations(gsettings, connection, opts).await?; } AutoAllocCommand::Remove(opts) => { let connection = get_client_connection(gsettings.server_directory()).await?; diff --git a/crates/hyperqueue/src/client/commands/log.rs b/crates/hyperqueue/src/client/commands/log.rs index 18917d79e..3848dbf7b 100644 --- a/crates/hyperqueue/src/client/commands/log.rs +++ b/crates/hyperqueue/src/client/commands/log.rs @@ -73,7 +73,7 @@ impl FromStr for Channel { } } -pub fn command_log(gsettings: GlobalSettings, opts: LogOpts) -> anyhow::Result<()> { +pub fn command_log(gsettings: &GlobalSettings, opts: LogOpts) -> anyhow::Result<()> { let mut log_file = LogFile::open(&opts.filename)?; match opts.command { LogCommand::Summary(_) => { diff --git a/crates/hyperqueue/src/client/output/cli.rs b/crates/hyperqueue/src/client/output/cli.rs index 7ea155b3a..e71a3f572 100644 --- a/crates/hyperqueue/src/client/output/cli.rs +++ b/crates/hyperqueue/src/client/output/cli.rs @@ -33,6 +33,7 @@ use tako::messages::common::StdioDef; use crate::common::strutils::pluralize; use crate::worker::start::WORKER_EXTRA_PROCESS_PID; +use 
anyhow::Error; use colored::Color as Colorization; use colored::Colorize; use std::collections::BTreeSet; @@ -651,6 +652,10 @@ impl Output for CliOutput { println!("Summary: {}", descriptor.summary(true)); println!("Cpu Ids: {}", descriptor.full_describe()); } + + fn print_error(&self, error: Error) { + eprintln!("{:?}", error); + } } struct AllocationTimes { diff --git a/crates/hyperqueue/src/client/output/json.rs b/crates/hyperqueue/src/client/output/json.rs index 97ad937ae..b564504b4 100644 --- a/crates/hyperqueue/src/client/output/json.rs +++ b/crates/hyperqueue/src/client/output/json.rs @@ -14,6 +14,7 @@ use crate::transfer::messages::{ WaitForJobsResponse, WorkerInfo, }; use crate::JobTaskId; +use anyhow::Error; use chrono::{DateTime, Utc}; use serde_json; use serde_json::json; @@ -36,9 +37,6 @@ impl JsonOutput { } } -// TODO: output machine-readable data -// hq jobs, hq submit, hq cancel, -// time in UTC impl Output for JsonOutput { fn print_worker_list(&self, workers: Vec) { self.print(workers.into_iter().map(format_worker_info).collect()); @@ -146,6 +144,10 @@ impl Output for JsonOutput { fn print_hw(&self, descriptor: &ResourceDescriptor) { self.print(format_resource_descriptor(descriptor)); } + + fn print_error(&self, error: Error) { + self.print(json!({ "error": format!("{:?}", error) })) + } } fn format_allocation(allocation: Allocation) -> serde_json::Value { diff --git a/crates/hyperqueue/src/client/output/outputs.rs b/crates/hyperqueue/src/client/output/outputs.rs index 916a5e54f..00cb0bc4b 100644 --- a/crates/hyperqueue/src/client/output/outputs.rs +++ b/crates/hyperqueue/src/client/output/outputs.rs @@ -67,4 +67,6 @@ pub trait Output { // Hw fn print_hw(&self, descriptor: &ResourceDescriptor); + + fn print_error(&self, error: anyhow::Error); } diff --git a/crates/hyperqueue/src/client/output/quiet.rs b/crates/hyperqueue/src/client/output/quiet.rs index 9ec9a0d33..11a298b86 100644 --- a/crates/hyperqueue/src/client/output/quiet.rs +++ 
b/crates/hyperqueue/src/client/output/quiet.rs @@ -9,6 +9,7 @@ use crate::transfer::messages::{ AutoAllocListResponse, JobDetail, JobInfo, LostWorkerReasonInfo, StatsResponse, WaitForJobsResponse, WorkerExitInfo, WorkerInfo, }; +use anyhow::Error; use std::path::Path; use std::time::Duration; use tako::common::resources::ResourceDescriptor; @@ -99,4 +100,8 @@ impl Output for Quiet { // Hw fn print_hw(&self, _descriptor: &ResourceDescriptor) {} + + fn print_error(&self, error: Error) { + eprintln!("{:?}", error); + } } From 79287185b986e4a366de05f804e6bddc141e64b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Fri, 17 Dec 2021 17:13:18 +0100 Subject: [PATCH 03/13] Improve testing of outputs --- tests/output/__init__.py | 0 tests/output/test_json.py | 115 ++++++++++++++++++++++++++++++ tests/output/test_quiet.py | 30 ++++++++ tests/requirements.txt | 1 + tests/test_output.py | 139 ------------------------------------- 5 files changed, 146 insertions(+), 139 deletions(-) create mode 100644 tests/output/__init__.py create mode 100644 tests/output/test_json.py create mode 100644 tests/output/test_quiet.py delete mode 100644 tests/test_output.py diff --git a/tests/output/__init__.py b/tests/output/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/output/test_json.py b/tests/output/test_json.py new file mode 100644 index 000000000..83b5d93bb --- /dev/null +++ b/tests/output/test_json.py @@ -0,0 +1,115 @@ +import datetime +import json +import socket +from typing import List + +import iso8601 +from schema import Schema + +from ..conftest import HqEnv + + +def parse_json_output(hq_env: HqEnv, command: List[str]): + return json.loads(hq_env.command(command)) + + +def test_print_worker_list(hq_env: HqEnv): + hq_env.start_server() + + for i in range(5): + hq_env.start_worker() + + output = parse_json_output(hq_env, ["--output-type=json", "worker", "list"]) + assert len(output) == 5 + + +def test_print_worker_info(hq_env: 
HqEnv): + hq_env.start_server() + hq_env.start_worker() + output = parse_json_output(hq_env, ["--output-type=json", "worker", "info", "1"]) + + schema = Schema( + { + "configuration": { + "heartbeat_interval": 8.0, + "hostname": "worker1", + "idle_timeout": None, + "listen_address": str, + "log_dir": str, + "resources": {"cpus": [[0]], "generic": []}, + "time_limit": None, + "work_dir": str, + }, + "ended": None, + "id": 1, + } + ) + schema.validate(output) + + +def test_print_server_record(hq_env: HqEnv): + process = hq_env.start_server() + output = parse_json_output(hq_env, ["--output-type=json", "server", "info"]) + + schema = Schema( + { + "host": socket.gethostname(), + "hq_port": int, + "pid": process.pid, + "server_dir": hq_env.server_dir, + "start_date": str, + "version": str, + "worker_port": int + } + ) + schema.validate(output) + + time = iso8601.parse_date(output["start_date"]) + now = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc) + duration = now - time + assert abs(duration).total_seconds() > 0 + + assert 0 < int(output["hq_port"]) < 65536 + assert 0 < int(output["worker_port"]) < 65536 + + +def test_print_job_list(hq_env: HqEnv): + hq_env.start_server() + hq_env.start_worker() + hq_env.command(["submit", "echo", "tt"]) + output = parse_json_output(hq_env, ["--output-type=json", "jobs"]) + assert isinstance(output, list) + assert isinstance(output[0], dict) + + +def test_print_job_detail(hq_env: HqEnv): + hq_env.start_server() + hq_env.command(["submit", "echo", "tt"]) + tasks, job_detail = parse_json_output(hq_env, ["--output-type=json", "job", "1"]) + assert isinstance(tasks, dict) + assert isinstance(job_detail, dict) + assert job_detail["job_detail"]["job_type"] == "Simple" + assert tasks["tasks_id"] == [0] + + +def test_print_job_tasks(hq_env: HqEnv): + hq_env.start_server() + hq_env.command(["submit", "echo", "tt", "--array=1-10"]) + tasks, job_detail = parse_json_output( + hq_env, ["--output-type=json", "job", "1", "--tasks"] 
+ ) + assert isinstance(tasks, dict) + for key in tasks: + if isinstance(tasks[key], list): + assert len(tasks[key]) == 10 + + +def test_print_hw(hq_env: HqEnv): + hq_env.start_server() + output = parse_json_output(hq_env, ["--output-type=json", "worker", "hwdetect"]) + + schema = Schema({ + "cpus": [[int]], + "generic": list + }) + schema.validate(output) diff --git a/tests/output/test_quiet.py b/tests/output/test_quiet.py new file mode 100644 index 000000000..999989571 --- /dev/null +++ b/tests/output/test_quiet.py @@ -0,0 +1,30 @@ +from ..conftest import HqEnv +from ..utils import wait_for_job_state + + +def test_print_worker_list(hq_env: HqEnv): + hq_env.start_server() + for i in range(9): + hq_env.start_worker() + output = hq_env.command(["--output-type=quiet", "worker", "list"]) + output = output.splitlines(keepends=False) + assert output == [f"{id + 1} RUNNING" for id in range(9)] + + +def test_print_job_list(hq_env: HqEnv): + hq_env.start_server() + hq_env.start_worker() + for i in range(9): + hq_env.command(["submit", "echo", "tt"]) + + wait_for_job_state(hq_env, list(range(1, 10)), "FINISHED") + + output = hq_env.command(["--output-type=quiet", "jobs"]) + output = output.splitlines(keepends=False) + assert output == [f"{id + 1} FINISHED" for id in range(9)] + + +def test_submit(hq_env: HqEnv): + hq_env.start_server() + output = hq_env.command(["--output-type=quiet", "submit", "echo", "tt"]) + assert output == "1\n" diff --git a/tests/requirements.txt b/tests/requirements.txt index bf8dde6e6..d0c233cf2 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -4,3 +4,4 @@ flake8==3.9.2 black==21.9b0 isort==5.9.3 iso8601==1.0.0 +schema==0.7.5 diff --git a/tests/test_output.py b/tests/test_output.py deleted file mode 100644 index f04896702..000000000 --- a/tests/test_output.py +++ /dev/null @@ -1,139 +0,0 @@ -import datetime -import json -import os -from typing import List - -import iso8601 - -from .conftest import HqEnv -from .utils import 
wait_for_job_state - -""" -Json output tests -""" - - -def hq_json_wrapper(hq_env: HqEnv, command: List[str]): - output = hq_env.command(command) - output = output.split("\n") - output = [json.loads(i) for i in output if i] - if len(output) == 1: - return output[0] - return output - - -def test_print_worker_list(hq_env: HqEnv): - hq_env.start_server() - hq_env.start_worker() - output = hq_json_wrapper(hq_env, ["--output-type=json", "worker", "list"]) - assert isinstance(output, list) - assert isinstance(output[0], dict) - assert len(output) == 1 - assert output[0]["configuration"]["hostname"] == "worker1" - - for i in range(13): - hq_env.start_worker() - - output = hq_json_wrapper(hq_env, ["--output-type=json", "worker", "list"]) - assert len(output) == 14 - - -def test_print_worker_info(hq_env: HqEnv): - hq_env.start_server() - hq_env.start_worker() - output = hq_json_wrapper(hq_env, ["--output-type=json", "worker", "info", "1"]) - assert isinstance(output, dict) - assert output["id"] == 1 - assert len(output) == 2 - assert output["worker_configuration"]["time_limit"] is None - - -def test_print_server_record(hq_env: HqEnv): - hq_env.start_server() - hq_env.start_worker() - output = hq_json_wrapper(hq_env, ["--output-type=json", "server", "info"]) - assert isinstance(output, dict) - assert output["host"] == os.uname()[1] - assert output["server_dir"] == hq_env.server_dir - time = iso8601.parse_date(output["start_date"]) - now = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc) - duration = now - time - assert abs(duration).total_seconds() > 0 - - assert 0 < int(output["worker_port"]) < 65536 - - -def test_print_job_list(hq_env: HqEnv): - hq_env.start_server() - hq_env.start_worker() - hq_env.command(["submit", "echo", "tt"]) - output = hq_json_wrapper(hq_env, ["--output-type=json", "jobs"]) - assert isinstance(output, list) - assert isinstance(output[0], dict) - - -def test_print_job_detail(hq_env: HqEnv): - hq_env.start_server() - 
hq_env.command(["submit", "echo", "tt"]) - tasks, job_detail = hq_json_wrapper(hq_env, ["--output-type=json", "job", "1"]) - assert isinstance(tasks, dict) - assert isinstance(job_detail, dict) - assert job_detail["job_detail"]["job_type"] == "Simple" - assert tasks["tasks_id"] == [0] - - -def test_print_job_tasks(hq_env: HqEnv): - hq_env.start_server() - hq_env.command(["submit", "echo", "tt", "--array=1-10"]) - tasks, job_detail = hq_json_wrapper( - hq_env, ["--output-type=json", "job", "1", "--tasks"] - ) - assert isinstance(tasks, dict) - for key in tasks: - if isinstance(tasks[key], list): - assert len(tasks[key]) == 10 - - -def test_print_hw(hq_env: HqEnv): - hq_env.start_server() - output = hq_json_wrapper(hq_env, ["--output-type=json", "worker", "hwdetect"]) - assert isinstance(output, dict) - assert "cpus" in output.keys() - output = hq_json_wrapper( - hq_env, ["--output-type=json", "worker", "hwdetect", "--no-hyperthreading"] - ) - assert isinstance(output, dict) - assert "cpus" in output.keys() - - -""" -Quiet flag tests -""" - - -def test_print_worker_list_quiet(hq_env: HqEnv): - hq_env.start_server() - for i in range(9): - hq_env.start_worker() - output = hq_env.command(["--output-type=quiet", "worker", "list"]) - output = output.splitlines(keepends=False) - assert output == [f"{id + 1} RUNNING" for id in range(9)] - - -def test_print_job_list_quiet(hq_env: HqEnv): - hq_env.start_server() - hq_env.start_worker() - for i in range(9): - hq_env.command(["submit", "echo", "tt"]) - - wait_for_job_state(hq_env, list(range(1, 10)), "FINISHED") - - output = hq_env.command(["--output-type=quiet", "jobs"]) - output = output.splitlines(keepends=False) - assert output == [f"{id + 1} FINISHED" for id in range(9)] - - -def test_submit_quiet(hq_env: HqEnv): - hq_env.start_server() - output = hq_env.command(["--output-type=quiet", "submit", "echo", "tt"]) - assert output == "1\n" From 9110aa0bf58d56c817711299fb8570313b43498d Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Fri, 17 Dec 2021 17:24:54 +0100 Subject: [PATCH 04/13] Specify the output of JSON job list --- crates/hyperqueue/src/client/output/json.rs | 113 ++++++++++++++------ tests/output/test_json.py | 18 +++- 2 files changed, 96 insertions(+), 35 deletions(-) diff --git a/crates/hyperqueue/src/client/output/json.rs b/crates/hyperqueue/src/client/output/json.rs index b564504b4..8e0e28f51 100644 --- a/crates/hyperqueue/src/client/output/json.rs +++ b/crates/hyperqueue/src/client/output/json.rs @@ -21,9 +21,10 @@ use serde_json::json; use std::path::Path; use std::time::Duration; use tako::common::resources::{ - GenericResourceDescriptor, GenericResourceDescriptorKind, ResourceDescriptor, + CpuRequest, GenericResourceDescriptor, GenericResourceDescriptorKind, ResourceDescriptor, }; use tako::messages::common::WorkerConfiguration; +use tako::messages::gateway::ResourceRequest; #[derive(Default)] pub struct JsonOutput; @@ -45,7 +46,6 @@ impl Output for JsonOutput { self.print(format_worker_info(worker_info)); } - // Server fn print_server_record(&self, server_dir: &Path, record: &AccessRecord) { let json = json!({ "server_dir": server_dir, @@ -68,9 +68,8 @@ impl Output for JsonOutput { })) } - // Jobs fn print_job_list(&self, tasks: Vec) { - self.print(json!(tasks)); + self.print(tasks.into_iter().map(format_job_info).collect()); } fn print_job_detail(&self, job: JobDetail, show_tasks: bool, worker_map: WorkerMap) { let worker = format_job_workers(&job, &worker_map); @@ -150,6 +149,76 @@ impl Output for JsonOutput { } } +fn format_job_info(info: JobInfo) -> serde_json::Value { + let JobInfo { + id, + name, + n_tasks, + counters, + resources: + ResourceRequest { + cpus, + generic, + min_time, + }, + } = info; + + json!({ + "id": id, + "name": name, + "task_count": n_tasks, + "task_stats": json!({ + "running": counters.n_running_tasks, + "finished": counters.n_finished_tasks, + "failed": counters.n_failed_tasks, + "canceled": 
counters.n_canceled_tasks, + "waiting": counters.n_waiting_tasks(n_tasks) + }), + "resources": json!({ + "cpus": format_cpu_request(cpus), + "generic": generic, + "min_time": format_duration(min_time) + }) + }) +} +fn format_cpu_request(request: CpuRequest) -> serde_json::Value { + let cpus = &match request { + CpuRequest::Compact(count) + | CpuRequest::ForceCompact(count) + | CpuRequest::Scatter(count) => Some(count), + CpuRequest::All => None, + }; + let name = match request { + CpuRequest::Compact(_) => "compact", + CpuRequest::ForceCompact(_) => "force-compact", + CpuRequest::Scatter(_) => "scatter", + CpuRequest::All => "all", + }; + json!({ + "type": name, + "cpus": cpus + }) +} + +fn format_queue_descriptor(descriptor: &QueueDescriptorData) -> serde_json::Value { + let manager = match descriptor.manager_type { + ManagerType::Pbs => "PBS", + ManagerType::Slurm => "Slurm", + }; + let info = &descriptor.info; + + json!({ + "manager": manager, + "additional_args": info.additional_args(), + "backlog": info.backlog(), + "workers_per_alloc": info.workers_per_alloc(), + "timelimit": format_duration(info.timelimit()), + "max_worker_count": info.max_worker_count(), + "worker_cpu_args": info.worker_cpu_args(), + "worker_resource_args": info.worker_resource_args(), + "name": descriptor.name + }) +} fn format_allocation(allocation: Allocation) -> serde_json::Value { let Allocation { id, @@ -187,7 +256,6 @@ fn format_allocation(allocation: Allocation) -> serde_json::Value { "workdir": working_dir }) } - fn format_allocation_event(event: AllocationEventHolder) -> serde_json::Value { let name = match &event.event { AllocationEvent::AllocationQueued(_) => "allocation-queued", @@ -218,14 +286,6 @@ fn format_allocation_event(event: AllocationEventHolder) -> serde_json::Value { }) } -fn format_resource_descriptor(descriptor: &ResourceDescriptor) -> serde_json::Value { - let ResourceDescriptor { cpus, generic } = descriptor; - json!({ - "cpus": cpus, - "generic": 
generic.iter().map(format_generic_resource).collect::>() - }) -} - fn format_worker_info(worker_info: WorkerInfo) -> serde_json::Value { let WorkerInfo { id, @@ -262,6 +322,13 @@ fn format_worker_info(worker_info: WorkerInfo) -> serde_json::Value { })) }) } +fn format_resource_descriptor(descriptor: &ResourceDescriptor) -> serde_json::Value { + let ResourceDescriptor { cpus, generic } = descriptor; + json!({ + "cpus": cpus, + "generic": generic.iter().map(format_generic_resource).collect::>() + }) +} fn format_generic_resource(resource: &GenericResourceDescriptor) -> serde_json::Value { json!({ "name": resource.name, @@ -281,26 +348,6 @@ fn format_generic_resource(resource: &GenericResourceDescriptor) -> serde_json:: }) } -fn format_queue_descriptor(descriptor: &QueueDescriptorData) -> serde_json::Value { - let manager = match descriptor.manager_type { - ManagerType::Pbs => "PBS", - ManagerType::Slurm => "Slurm", - }; - let info = &descriptor.info; - - json!({ - "manager": manager, - "additional_args": info.additional_args(), - "backlog": info.backlog(), - "workers_per_alloc": info.workers_per_alloc(), - "timelimit": format_duration(info.timelimit()), - "max_worker_count": info.max_worker_count(), - "worker_cpu_args": info.worker_cpu_args(), - "worker_resource_args": info.worker_resource_args(), - "name": descriptor.name - }) -} - fn format_duration(duration: Duration) -> serde_json::Value { let value = duration.as_secs() as f64 + duration.subsec_nanos() as f64 * 1e-9; json!(value) diff --git a/tests/output/test_json.py b/tests/output/test_json.py index 83b5d93bb..e77638771 100644 --- a/tests/output/test_json.py +++ b/tests/output/test_json.py @@ -78,8 +78,22 @@ def test_print_job_list(hq_env: HqEnv): hq_env.start_worker() hq_env.command(["submit", "echo", "tt"]) output = parse_json_output(hq_env, ["--output-type=json", "jobs"]) - assert isinstance(output, list) - assert isinstance(output[0], dict) + + schema = Schema([{ + "id": 1, + "name": "echo", + "resources": 
{ + "cpus": { + "cpus": 1, + "type": "compact" + }, + "generic": [], + "min_time": 0.0 + }, + "task_count": 1, + "task_stats": {"canceled": 0, "failed": 0, "finished": 1, "running": 0, "waiting": 0} + }]) + schema.validate(output) def test_print_job_detail(hq_env: HqEnv): From f9d405abf0719247c145dfd5e78bb79441834103 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Fri, 17 Dec 2021 17:47:19 +0100 Subject: [PATCH 05/13] Remove `ResourceRequest` from `JobDetail` It is also contained in `JobInfo`, so the information was duplicated --- crates/hyperqueue/src/client/output/cli.rs | 2 +- crates/hyperqueue/src/server/job.rs | 9 --------- crates/hyperqueue/src/transfer/messages.rs | 1 - 3 files changed, 1 insertion(+), 11 deletions(-) diff --git a/crates/hyperqueue/src/client/output/cli.rs b/crates/hyperqueue/src/client/output/cli.rs index e71a3f572..94bba56e8 100644 --- a/crates/hyperqueue/src/client/output/cli.rs +++ b/crates/hyperqueue/src/client/output/cli.rs @@ -305,7 +305,7 @@ impl Output for CliOutput { format_job_workers(&job, &worker_map).cell(), ]); - let resources = format_resource_request(&job.resources); + let resources = format_resource_request(&job.info.resources); rows.push(vec![ "Resources".cell().bold(true), if job.pin { diff --git a/crates/hyperqueue/src/server/job.rs b/crates/hyperqueue/src/server/job.rs index e1220d7b3..72557777a 100644 --- a/crates/hyperqueue/src/server/job.rs +++ b/crates/hyperqueue/src/server/job.rs @@ -182,7 +182,6 @@ impl Job { info: self.make_job_info(), job_type: self.job_type.clone(), program_def: self.program_def.clone(), - resources: self.resources.clone(), tasks: if include_tasks { match &self.state { JobState::SingleTask(s) => { @@ -206,14 +205,6 @@ impl Job { } pub fn make_job_info(&self) -> JobInfo { - /*let error = match &self.state { - JobState::Waiting => (JobStatus::Waiting, None), - JobState::Finished => (JobStatus::Finished, None), - JobState::Failed(e) => (JobStatus::Failed, Some(e.clone())), - 
JobState::Running => (JobStatus::Running, None), - JobState::Canceled => (JobStatus::Canceled, None), - };*/ - JobInfo { id: self.job_id, name: self.name.clone(), diff --git a/crates/hyperqueue/src/transfer/messages.rs b/crates/hyperqueue/src/transfer/messages.rs index e95d7b132..95fd53f10 100644 --- a/crates/hyperqueue/src/transfer/messages.rs +++ b/crates/hyperqueue/src/transfer/messages.rs @@ -240,7 +240,6 @@ pub struct JobDetail { pub job_type: JobType, pub program_def: ProgramDefinition, pub tasks: Vec, - pub resources: ResourceRequest, pub pin: bool, pub max_fails: Option, pub priority: tako::Priority, From f029518630ab5184520997baa686c6a8ab4e6c32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Fri, 17 Dec 2021 17:56:47 +0100 Subject: [PATCH 06/13] Improve JSON job output --- crates/hyperqueue/src/client/output/cli.rs | 154 ++++++++-------- crates/hyperqueue/src/client/output/json.rs | 169 ++++++++++++------ .../hyperqueue/src/client/output/outputs.rs | 9 - crates/hyperqueue/src/client/output/quiet.rs | 52 ++---- crates/hyperqueue/src/server/job.rs | 6 +- tests/output/test_json.py | 89 +++++---- 6 files changed, 273 insertions(+), 206 deletions(-) diff --git a/crates/hyperqueue/src/client/output/cli.rs b/crates/hyperqueue/src/client/output/cli.rs index 94bba56e8..1e8662664 100644 --- a/crates/hyperqueue/src/client/output/cli.rs +++ b/crates/hyperqueue/src/client/output/cli.rs @@ -66,6 +66,83 @@ impl CliOutput { log::error!("Cannot print table to stdout: {:?}", e); } } + + fn print_job_tasks( + &self, + completion_date_or_now: DateTime, + mut tasks: Vec, + show_tasks: bool, + counters: &JobTaskCounters, + worker_map: &WorkerMap, + ) { + tasks.sort_unstable_by_key(|t| t.task_id); + let make_error_row = |t: &JobTaskInfo| match &t.state { + JobTaskState::Failed { worker, error, .. 
} => Some(vec![ + t.task_id.cell(), + format_worker(*worker, worker_map).cell(), + error.to_owned().cell().foreground_color(Some(Color::Red)), + ]), + _ => None, + }; + + if show_tasks { + let rows: Vec<_> = tasks + .iter() + .map(|t| { + vec![ + t.task_id.cell(), + task_status_to_cell(task_status(&t.state)), + match t.state.get_worker() { + Some(worker) => format_worker(worker, worker_map), + _ => "", + } + .cell(), + format_task_duration(&completion_date_or_now, &t.state).cell(), + match &t.state { + JobTaskState::Failed { error, .. } => { + error.to_owned().cell().foreground_color(Some(Color::Red)) + } + _ => "".cell(), + }, + ] + }) + .collect(); + let table = rows.table().title(vec![ + "Task Id".cell().bold(true), + "State".cell().bold(true), + "Worker".cell().bold(true), + "Time".cell().bold(true), + "Message".cell().bold(true), + ]); + self.print_table(table); + } else { + const SHOWN_TASKS: usize = 5; + let fail_rows: Vec<_> = tasks + .iter() + .filter_map(make_error_row) + .take(SHOWN_TASKS) + .collect(); + + if !fail_rows.is_empty() { + let count = fail_rows.len() as JobTaskCount; + let table = fail_rows.table().title(vec![ + "Task Id".cell().bold(true), + "Worker".cell().bold(true), + "Error".cell().bold(true), + ]); + self.print_table(table); + + if count < counters.n_failed_tasks { + println!( + "{} tasks failed. ({} shown)", + counters.n_failed_tasks, count + ); + } else { + println!("{} tasks failed.", counters.n_failed_tasks); + } + } + } + } } impl Output for CliOutput { @@ -392,83 +469,6 @@ impl Output for CliOutput { } } - fn print_job_tasks( - &self, - completion_date_or_now: DateTime, - mut tasks: Vec, - show_tasks: bool, - counters: &JobTaskCounters, - worker_map: &WorkerMap, - ) { - tasks.sort_unstable_by_key(|t| t.task_id); - let make_error_row = |t: &JobTaskInfo| match &t.state { - JobTaskState::Failed { worker, error, .. 
} => Some(vec![ - t.task_id.cell(), - format_worker(*worker, worker_map).cell(), - error.to_owned().cell().foreground_color(Some(Color::Red)), - ]), - _ => None, - }; - - if show_tasks { - let rows: Vec<_> = tasks - .iter() - .map(|t| { - vec![ - t.task_id.cell(), - task_status_to_cell(task_status(&t.state)), - match t.state.get_worker() { - Some(worker) => format_worker(worker, worker_map), - _ => "", - } - .cell(), - format_task_duration(&completion_date_or_now, &t.state).cell(), - match &t.state { - JobTaskState::Failed { error, .. } => { - error.to_owned().cell().foreground_color(Some(Color::Red)) - } - _ => "".cell(), - }, - ] - }) - .collect(); - let table = rows.table().title(vec![ - "Task Id".cell().bold(true), - "State".cell().bold(true), - "Worker".cell().bold(true), - "Time".cell().bold(true), - "Message".cell().bold(true), - ]); - self.print_table(table); - } else { - const SHOWN_TASKS: usize = 5; - let fail_rows: Vec<_> = tasks - .iter() - .filter_map(make_error_row) - .take(SHOWN_TASKS) - .collect(); - - if !fail_rows.is_empty() { - let count = fail_rows.len() as JobTaskCount; - let table = fail_rows.table().title(vec![ - "Task Id".cell().bold(true), - "Worker".cell().bold(true), - "Error".cell().bold(true), - ]); - self.print_table(table); - - if count < counters.n_failed_tasks { - println!( - "{} tasks failed. 
({} shown)", - counters.n_failed_tasks, count - ); - } else { - println!("{} tasks failed.", counters.n_failed_tasks); - } - } - } - } - fn print_job_wait(&self, duration: Duration, response: &WaitForJobsResponse) { let mut msgs = vec![]; diff --git a/crates/hyperqueue/src/client/output/json.rs b/crates/hyperqueue/src/client/output/json.rs index 8e0e28f51..867bac1cf 100644 --- a/crates/hyperqueue/src/client/output/json.rs +++ b/crates/hyperqueue/src/client/output/json.rs @@ -1,30 +1,30 @@ +use std::path::Path; +use std::time::Duration; + +use anyhow::Error; +use chrono::{DateTime, Utc}; +use serde_json; +use serde_json::json; + +use tako::common::resources::{ + CpuRequest, GenericResourceDescriptor, GenericResourceDescriptorKind, ResourceDescriptor, +}; +use tako::messages::common::{ProgramDefinition, WorkerConfiguration}; +use tako::messages::gateway::ResourceRequest; + use crate::client::job::WorkerMap; -use crate::client::output::cli::{format_job_workers, format_task_duration}; use crate::client::output::outputs::Output; -use crate::client::status::{task_status, Status}; use crate::common::manager::info::ManagerType; use crate::common::serverdir::AccessRecord; use crate::server::autoalloc::{ Allocation, AllocationEvent, AllocationEventHolder, AllocationStatus, }; -use crate::server::job::{JobTaskCounters, JobTaskInfo}; +use crate::server::job::JobTaskState; use crate::stream::reader::logfile::Summary; use crate::transfer::messages::{ AutoAllocListResponse, JobDetail, JobInfo, QueueDescriptorData, StatsResponse, WaitForJobsResponse, WorkerInfo, }; -use crate::JobTaskId; -use anyhow::Error; -use chrono::{DateTime, Utc}; -use serde_json; -use serde_json::json; -use std::path::Path; -use std::time::Duration; -use tako::common::resources::{ - CpuRequest, GenericResourceDescriptor, GenericResourceDescriptorKind, ResourceDescriptor, -}; -use tako::messages::common::WorkerConfiguration; -use tako::messages::gateway::ResourceRequest; #[derive(Default)] pub struct 
JsonOutput; @@ -71,50 +71,113 @@ impl Output for JsonOutput { fn print_job_list(&self, tasks: Vec) { self.print(tasks.into_iter().map(format_job_info).collect()); } - fn print_job_detail(&self, job: JobDetail, show_tasks: bool, worker_map: WorkerMap) { - let worker = format_job_workers(&job, &worker_map); - let json = json!({ - "job_detail": job, - "worker": worker, + fn print_job_detail(&self, job: JobDetail, show_tasks: bool, _worker_map: WorkerMap) { + let JobDetail { + info, + job_type: _, + program_def: + ProgramDefinition { + args, + env, + stdout, + stderr, + cwd, + }, + tasks, + pin, + max_fails, + priority, + time_limit, + submission_date, + completion_date_or_now, + } = job; + + let finished_at = if info.counters.is_terminated(info.n_tasks) { + Some(completion_date_or_now) + } else { + None + }; + + let mut json = json!({ + "info": format_job_info(info), + "program": json!({ + "args": args.into_iter().map(|args| args.to_string()).collect::>(), + "env": env, + "cwd": cwd, + "stderr": stderr, + "stdout": stdout, + }), + "pin": pin, + "max_fails": max_fails, + "priority": priority, + "time_limit": time_limit.map(format_duration), + "started_at": format_datetime(submission_date), + "finished_at": finished_at.map(format_datetime) }); - if !job.tasks.is_empty() { - self.print_job_tasks( - job.completion_date_or_now, - job.tasks, - show_tasks, - &job.info.counters, - &worker_map, - ); + if show_tasks { + json["tasks"] = tasks + .into_iter() + .map(|task| { + let state = &match task.state { + JobTaskState::Waiting => "waiting", + JobTaskState::Running { .. } => "running", + JobTaskState::Finished { .. } => "finished", + JobTaskState::Failed { .. 
} => "failed", + JobTaskState::Canceled => "canceled", + }; + let mut data = json!({ + "id": task.task_id, + "state": state, + }); + match task.state { + JobTaskState::Running { start_date, worker } => { + data["worker"] = worker.as_num().into(); + data["started_at"] = format_datetime(start_date) + } + JobTaskState::Finished { + start_date, + worker, + end_date, + } => { + data["worker"] = worker.as_num().into(); + data["started_at"] = format_datetime(start_date); + data["finished_at"] = format_datetime(end_date); + } + JobTaskState::Failed { + start_date, + end_date, + worker, + error, + } => { + data["worker"] = worker.as_num().into(); + data["started_at"] = format_datetime(start_date); + data["finished_at"] = format_datetime(end_date); + data["error"] = error.into(); + } + _ => {} + }; + data + }) + .collect(); } self.print(json); } - fn print_job_tasks( - &self, - completion_date_or_now: chrono::DateTime, - mut tasks: Vec, - _show_tasks: bool, - counters: &JobTaskCounters, - worker_map: &WorkerMap, - ) { - tasks.sort_unstable_by_key(|t| t.task_id); - - let output_tasks_duration: Vec = tasks - .iter() - .map(|t| format_task_duration(&completion_date_or_now, &t.state)) - .collect(); - let tasks_id: Vec = tasks.iter().map(|t| t.task_id).collect(); - let tasks_state: Vec = tasks.iter().map(|t| task_status(&t.state)).collect(); - let json = json!({ - "tasks_state": tasks_state, - "tasks_duration": output_tasks_duration, - "tasks_id": tasks_id, - "counters": counters, - "worker": worker_map, - }); - self.print(json); + fn print_job_wait(&self, duration: Duration, response: &WaitForJobsResponse) { + let WaitForJobsResponse { + finished, + failed, + canceled, + invalid, + } = response; + self.print(json!({ + "duration": format_duration(duration), + "finished": finished, + "failed": failed, + "canceled": canceled, + "invalid": invalid, + })) } - fn print_job_wait(&self, _duration: Duration, _response: &WaitForJobsResponse) {} fn print_summary(&self, filename: &Path, 
summary: Summary) { let json = json!({ diff --git a/crates/hyperqueue/src/client/output/outputs.rs b/crates/hyperqueue/src/client/output/outputs.rs index 00cb0bc4b..129de5c0f 100644 --- a/crates/hyperqueue/src/client/output/outputs.rs +++ b/crates/hyperqueue/src/client/output/outputs.rs @@ -5,7 +5,6 @@ use crate::transfer::messages::{ use crate::client::job::WorkerMap; use crate::server::autoalloc::{Allocation, AllocationEventHolder}; -use crate::server::job::{JobTaskCounters, JobTaskInfo}; use crate::stream::reader::logfile::Summary; use std::path::Path; use std::str::FromStr; @@ -47,14 +46,6 @@ pub trait Output { fn print_job_submitted(&self, job: JobDetail); fn print_job_list(&self, tasks: Vec); fn print_job_detail(&self, job: JobDetail, show_tasks: bool, worker_map: WorkerMap); - fn print_job_tasks( - &self, - completion_date_or_now: chrono::DateTime, - tasks: Vec, - show_tasks: bool, - counters: &JobTaskCounters, - worker_map: &WorkerMap, - ); fn print_job_wait(&self, duration: Duration, response: &WaitForJobsResponse); // Log diff --git a/crates/hyperqueue/src/client/output/quiet.rs b/crates/hyperqueue/src/client/output/quiet.rs index 11a298b86..52300bc63 100644 --- a/crates/hyperqueue/src/client/output/quiet.rs +++ b/crates/hyperqueue/src/client/output/quiet.rs @@ -1,28 +1,20 @@ +use std::path::Path; +use std::time::Duration; + +use anyhow::Error; + +use tako::common::resources::ResourceDescriptor; + use crate::client::job::WorkerMap; use crate::client::output::outputs::Output; use crate::client::status::{job_status, Status}; use crate::common::serverdir::AccessRecord; use crate::server::autoalloc::{Allocation, AllocationEventHolder}; -use crate::server::job::{JobTaskCounters, JobTaskInfo}; use crate::stream::reader::logfile::Summary; use crate::transfer::messages::{ AutoAllocListResponse, JobDetail, JobInfo, LostWorkerReasonInfo, StatsResponse, WaitForJobsResponse, WorkerExitInfo, WorkerInfo, }; -use anyhow::Error; -use std::path::Path; -use 
std::time::Duration; -use tako::common::resources::ResourceDescriptor; - -fn to_str(stat: &Status) -> anyhow::Result<&str> { - Ok(match stat { - Status::Waiting => "WAITING", - Status::Running => "RUNNING", - Status::Finished => "FINISHED", - Status::Failed => "FAILED", - Status::Canceled => "CANCELED", - }) -} #[derive(Default)] pub struct Quiet; @@ -68,27 +60,11 @@ impl Output for Quiet { fn print_job_list(&self, tasks: Vec) { for task in tasks { let status = job_status(&task); - - println!( - "{} {}", - task.id, - String::from(to_str(&status).unwrap_or("ERROR")) - ) + println!("{} {}", task.id, format_status(&status)) } } fn print_job_detail(&self, _job: JobDetail, _show_tasks: bool, _worker_map: WorkerMap) {} - fn print_job_tasks( - &self, - _completion_date_or_now: chrono::DateTime, - _tasks: Vec, - _show_tasks: bool, - _counters: &JobTaskCounters, - _worker_map: &WorkerMap, - ) { - } - fn print_job_wait(&self, _duration: Duration, _response: &WaitForJobsResponse) { - println!("\n") - } + fn print_job_wait(&self, _duration: Duration, _response: &WaitForJobsResponse) {} // Log fn print_summary(&self, _filename: &Path, _summary: Summary) {} @@ -105,3 +81,13 @@ impl Output for Quiet { eprintln!("{:?}", error); } } + +fn format_status(status: &Status) -> &str { + match status { + Status::Waiting => "WAITING", + Status::Running => "RUNNING", + Status::Finished => "FINISHED", + Status::Failed => "FAILED", + Status::Canceled => "CANCELED", + } +} diff --git a/crates/hyperqueue/src/server/job.rs b/crates/hyperqueue/src/server/job.rs index 72557777a..f9531ffda 100644 --- a/crates/hyperqueue/src/server/job.rs +++ b/crates/hyperqueue/src/server/job.rs @@ -87,6 +87,10 @@ impl JobTaskCounters { pub fn has_unsuccessful_tasks(&self) -> bool { self.n_failed_tasks > 0 || self.n_canceled_tasks > 0 } + + pub fn is_terminated(&self, n_tasks: JobTaskCount) -> bool { + self.n_running_tasks == 0 && self.n_waiting_tasks(n_tasks) == 0 + } } pub struct Job { @@ -223,7 +227,7 @@ impl 
Job { } pub fn is_terminated(&self) -> bool { - self.counters.n_running_tasks == 0 && self.counters.n_waiting_tasks(self.n_tasks()) == 0 + self.counters.is_terminated(self.n_tasks()) } pub fn get_task_state_mut( diff --git a/tests/output/test_json.py b/tests/output/test_json.py index e77638771..40627fc5e 100644 --- a/tests/output/test_json.py +++ b/tests/output/test_json.py @@ -59,7 +59,7 @@ def test_print_server_record(hq_env: HqEnv): "server_dir": hq_env.server_dir, "start_date": str, "version": str, - "worker_port": int + "worker_port": int, } ) schema.validate(output) @@ -79,51 +79,74 @@ def test_print_job_list(hq_env: HqEnv): hq_env.command(["submit", "echo", "tt"]) output = parse_json_output(hq_env, ["--output-type=json", "jobs"]) - schema = Schema([{ - "id": 1, - "name": "echo", - "resources": { - "cpus": { - "cpus": 1, - "type": "compact" - }, - "generic": [], - "min_time": 0.0 - }, - "task_count": 1, - "task_stats": {"canceled": 0, "failed": 0, "finished": 1, "running": 0, "waiting": 0} - }]) + schema = Schema( + [ + { + "id": 1, + "name": "echo", + "resources": { + "cpus": {"cpus": 1, "type": "compact"}, + "generic": [], + "min_time": 0.0, + }, + "task_count": 1, + "task_stats": { + "canceled": 0, + "failed": 0, + "finished": 1, + "running": 0, + "waiting": 0, + }, + } + ] + ) schema.validate(output) def test_print_job_detail(hq_env: HqEnv): hq_env.start_server() hq_env.command(["submit", "echo", "tt"]) - tasks, job_detail = parse_json_output(hq_env, ["--output-type=json", "job", "1"]) - assert isinstance(tasks, dict) - assert isinstance(job_detail, dict) - assert job_detail["job_detail"]["job_type"] == "Simple" - assert tasks["tasks_id"] == [0] + output = parse_json_output(hq_env, ["--output-type=json", "job", "1"]) + schema = Schema( + { + "info": { + "id": 1, + "name": "echo", + "resources": dict, + "task_count": 1, + "task_stats": dict, + }, + "finished_at": None, + "max_fails": None, + "pin": False, + "priority": 0, + "program": { + "args": ["echo", 
"tt"], + "env": {}, + "cwd": str, + "stdout": dict, + "stderr": dict, + }, + "started_at": str, + "time_limit": None, + } + ) + schema.validate(output) -def test_print_job_tasks(hq_env: HqEnv): + +def test_print_job_with_tasks(hq_env: HqEnv): hq_env.start_server() - hq_env.command(["submit", "echo", "tt", "--array=1-10"]) - tasks, job_detail = parse_json_output( - hq_env, ["--output-type=json", "job", "1", "--tasks"] - ) - assert isinstance(tasks, dict) - for key in tasks: - if isinstance(tasks[key], list): - assert len(tasks[key]) == 10 + hq_env.command(["submit", "echo", "tt", "--array=1-4"]) + output = parse_json_output(hq_env, ["--output-type=json", "job", "1", "--tasks"]) + + schema = Schema([{"id": id, "state": "waiting"} for id in range(1, 5)]) + schema.validate(output["tasks"]) def test_print_hw(hq_env: HqEnv): hq_env.start_server() output = parse_json_output(hq_env, ["--output-type=json", "worker", "hwdetect"]) - schema = Schema({ - "cpus": [[int]], - "generic": list - }) + schema = Schema({"cpus": [[int]], "generic": list}) schema.validate(output) From a497cb3172ad7c8e600d7df1193c199cd7ad8f8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Mon, 20 Dec 2021 09:34:26 +0100 Subject: [PATCH 07/13] Enable specifying output type through an environment variable --- crates/hyperqueue/Cargo.toml | 2 +- crates/hyperqueue/src/bin/hq.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/hyperqueue/Cargo.toml b/crates/hyperqueue/Cargo.toml index e0dcfd8c3..1abc66d32 100644 --- a/crates/hyperqueue/Cargo.toml +++ b/crates/hyperqueue/Cargo.toml @@ -13,7 +13,7 @@ env_logger = "0.9" futures = "0.3" tokio = { version = "1.14", features = ["full"] } tokio-util = { version = "0.6", features = ["codec"] } -clap = { version = "=3.0.0-rc.4", features = ["derive"] } +clap = { version = "=3.0.0-rc.4", features = ["derive", "env"] } rmp-serde = "0.15.4" rmpv = { version = "1.0", features = ["with-serde"] } serde = { version = "1.0", 
features = ["derive"] } diff --git a/crates/hyperqueue/src/bin/hq.rs b/crates/hyperqueue/src/bin/hq.rs index d93ce78cc..fa68ebb41 100644 --- a/crates/hyperqueue/src/bin/hq.rs +++ b/crates/hyperqueue/src/bin/hq.rs @@ -54,7 +54,7 @@ struct CommonOpts { colors: ColorPolicy, /// Output selection - #[clap(long, default_value = "cli", possible_values = &["cli", "json", "quiet"])] + #[clap(long, env = "HQ_OUTPUT_TYPE", default_value = "cli", possible_values = &["cli", "json", "quiet"])] output_type: Outputs, } From 8c42204c93b42688368666770684ccb6af4237b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Mon, 20 Dec 2021 10:59:38 +0100 Subject: [PATCH 08/13] Fix outputting program environment variables in JSON --- crates/hyperqueue/src/client/output/json.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/hyperqueue/src/client/output/json.rs b/crates/hyperqueue/src/client/output/json.rs index 867bac1cf..beb084c4b 100644 --- a/crates/hyperqueue/src/client/output/json.rs +++ b/crates/hyperqueue/src/client/output/json.rs @@ -25,6 +25,7 @@ use crate::transfer::messages::{ AutoAllocListResponse, JobDetail, JobInfo, QueueDescriptorData, StatsResponse, WaitForJobsResponse, WorkerInfo, }; +use crate::Map; #[derive(Default)] pub struct JsonOutput; @@ -102,7 +103,7 @@ impl Output for JsonOutput { "info": format_job_info(info), "program": json!({ "args": args.into_iter().map(|args| args.to_string()).collect::>(), - "env": env, + "env": env.into_iter().map(|(key, value)| (key.to_string(), value.to_string())).collect::>(), "cwd": cwd, "stderr": stderr, "stdout": stdout, From 7d4584a7f8931a97dba5c51f066c8cf87f998f04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Mon, 20 Dec 2021 11:00:02 +0100 Subject: [PATCH 09/13] Rename --output-type to --output-mode --- crates/hyperqueue/src/bin/hq.rs | 8 ++++---- tests/output/test_json.py | 16 +++++++++------- tests/output/test_quiet.py | 6 +++--- 3 files changed, 16 
insertions(+), 14 deletions(-) diff --git a/crates/hyperqueue/src/bin/hq.rs b/crates/hyperqueue/src/bin/hq.rs index fa68ebb41..620d441b9 100644 --- a/crates/hyperqueue/src/bin/hq.rs +++ b/crates/hyperqueue/src/bin/hq.rs @@ -53,9 +53,9 @@ struct CommonOpts { #[clap(long, default_value = "auto", possible_values = &["auto", "always", "never"])] colors: ColorPolicy, - /// Output selection - #[clap(long, env = "HQ_OUTPUT_TYPE", default_value = "cli", possible_values = &["cli", "json", "quiet"])] - output_type: Outputs, + /// How should the output of the command be formatted. + #[clap(long, env = "HQ_OUTPUT_MODE", default_value = "cli", possible_values = &["cli", "json", "quiet"])] + output_mode: Outputs, } // Root CLI options @@ -493,7 +493,7 @@ fn make_global_settings(opts: CommonOpts) -> GlobalSettings { }; // Create Printer - let printer: Box = match opts.output_type { + let printer: Box = match opts.output_mode { Outputs::CLI => { // Set colored settings for CLI match color_policy { diff --git a/tests/output/test_json.py b/tests/output/test_json.py index 40627fc5e..5c3ceb99c 100644 --- a/tests/output/test_json.py +++ b/tests/output/test_json.py @@ -6,6 +6,7 @@ import iso8601 from schema import Schema +from utils import wait_for_job_state from ..conftest import HqEnv @@ -19,14 +20,14 @@ def test_print_worker_list(hq_env: HqEnv): for i in range(5): hq_env.start_worker() - output = parse_json_output(hq_env, ["--output-type=json", "worker", "list"]) + output = parse_json_output(hq_env, ["--output-mode=json", "worker", "list"]) assert len(output) == 5 def test_print_worker_info(hq_env: HqEnv): hq_env.start_server() hq_env.start_worker() - output = parse_json_output(hq_env, ["--output-type=json", "worker", "info", "1"]) + output = parse_json_output(hq_env, ["--output-mode=json", "worker", "info", "1"]) schema = Schema( { @@ -49,7 +50,7 @@ def test_print_worker_info(hq_env: HqEnv): def test_print_server_record(hq_env: HqEnv): process = hq_env.start_server() - output = 
parse_json_output(hq_env, ["--output-type=json", "server", "info"]) + output = parse_json_output(hq_env, ["--output-mode=json", "server", "info"]) schema = Schema( { @@ -77,7 +78,8 @@ def test_print_job_list(hq_env: HqEnv): hq_env.start_server() hq_env.start_worker() hq_env.command(["submit", "echo", "tt"]) - output = parse_json_output(hq_env, ["--output-type=json", "jobs"]) + wait_for_job_state(hq_env, 1, "FINISHED") + output = parse_json_output(hq_env, ["--output-mode=json", "jobs"]) schema = Schema( [ @@ -106,7 +108,7 @@ def test_print_job_list(hq_env: HqEnv): def test_print_job_detail(hq_env: HqEnv): hq_env.start_server() hq_env.command(["submit", "echo", "tt"]) - output = parse_json_output(hq_env, ["--output-type=json", "job", "1"]) + output = parse_json_output(hq_env, ["--output-mode=json", "job", "1"]) schema = Schema( { @@ -138,7 +140,7 @@ def test_print_job_detail(hq_env: HqEnv): def test_print_job_with_tasks(hq_env: HqEnv): hq_env.start_server() hq_env.command(["submit", "echo", "tt", "--array=1-4"]) - output = parse_json_output(hq_env, ["--output-type=json", "job", "1", "--tasks"]) + output = parse_json_output(hq_env, ["--output-mode=json", "job", "1", "--tasks"]) schema = Schema([{"id": id, "state": "waiting"} for id in range(1, 5)]) schema.validate(output["tasks"]) @@ -146,7 +148,7 @@ def test_print_job_with_tasks(hq_env: HqEnv): def test_print_hw(hq_env: HqEnv): hq_env.start_server() - output = parse_json_output(hq_env, ["--output-type=json", "worker", "hwdetect"]) + output = parse_json_output(hq_env, ["--output-mode=json", "worker", "hwdetect"]) schema = Schema({"cpus": [[int]], "generic": list}) schema.validate(output) diff --git a/tests/output/test_quiet.py b/tests/output/test_quiet.py index 999989571..b08199415 100644 --- a/tests/output/test_quiet.py +++ b/tests/output/test_quiet.py @@ -6,7 +6,7 @@ def test_print_worker_list(hq_env: HqEnv): hq_env.start_server() for i in range(9): hq_env.start_worker() - output = 
hq_env.command(["--output-type=quiet", "worker", "list"]) + output = hq_env.command(["--output-mode=quiet", "worker", "list"]) output = output.splitlines(keepends=False) assert output == [f"{id + 1} RUNNING" for id in range(9)] @@ -19,12 +19,12 @@ def test_print_job_list(hq_env: HqEnv): wait_for_job_state(hq_env, list(range(1, 10)), "FINISHED") - output = hq_env.command(["--output-type=quiet", "jobs"]) + output = hq_env.command(["--output-mode=quiet", "jobs"]) output = output.splitlines(keepends=False) assert output == [f"{id + 1} FINISHED" for id in range(9)] def test_submit(hq_env: HqEnv): hq_env.start_server() - output = hq_env.command(["--output-type=quiet", "submit", "echo", "tt"]) + output = hq_env.command(["--output-mode=quiet", "submit", "echo", "tt"]) assert output == "1\n" From df3212ccfa377a3381a52ee7d7e5b2479deea3b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Mon, 20 Dec 2021 11:13:12 +0100 Subject: [PATCH 10/13] Print autoalloc queues as a list in JSON output mode --- crates/hyperqueue/src/client/output/json.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/crates/hyperqueue/src/client/output/json.rs b/crates/hyperqueue/src/client/output/json.rs index beb084c4b..fbb7cf79f 100644 --- a/crates/hyperqueue/src/client/output/json.rs +++ b/crates/hyperqueue/src/client/output/json.rs @@ -17,7 +17,7 @@ use crate::client::output::outputs::Output; use crate::common::manager::info::ManagerType; use crate::common::serverdir::AccessRecord; use crate::server::autoalloc::{ - Allocation, AllocationEvent, AllocationEventHolder, AllocationStatus, + Allocation, AllocationEvent, AllocationEventHolder, AllocationStatus, DescriptorId, }; use crate::server::job::JobTaskState; use crate::stream::reader::logfile::Summary; @@ -189,10 +189,13 @@ impl Output for JsonOutput { } fn print_autoalloc_queues(&self, info: AutoAllocListResponse) { + let mut descriptors: Vec<_> = info.descriptors.into_iter().collect(); + 
descriptors.sort_by_key(|descriptor| descriptor.0); + self.print( - info.descriptors + descriptors .iter() - .map(|(key, descriptor)| (key.to_string(), format_queue_descriptor(descriptor))) + .map(|(key, descriptor)| format_queue_descriptor(*key, descriptor)) .collect(), ); } @@ -264,7 +267,10 @@ fn format_cpu_request(request: CpuRequest) -> serde_json::Value { }) } -fn format_queue_descriptor(descriptor: &QueueDescriptorData) -> serde_json::Value { +fn format_queue_descriptor( + id: DescriptorId, + descriptor: &QueueDescriptorData, +) -> serde_json::Value { let manager = match descriptor.manager_type { ManagerType::Pbs => "PBS", ManagerType::Slurm => "Slurm", @@ -272,6 +278,8 @@ fn format_queue_descriptor(descriptor: &QueueDescriptorData) -> serde_json::Valu let info = &descriptor.info; json!({ + "id": id, + "name": descriptor.name, "manager": manager, "additional_args": info.additional_args(), "backlog": info.backlog(), @@ -280,7 +288,6 @@ fn format_queue_descriptor(descriptor: &QueueDescriptorData) -> serde_json::Valu "max_worker_count": info.max_worker_count(), "worker_cpu_args": info.worker_cpu_args(), "worker_resource_args": info.worker_resource_args(), - "name": descriptor.name }) } fn format_allocation(allocation: Allocation) -> serde_json::Value { From f5bd4e982c9a992e31105027d4694d53613be549 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Mon, 20 Dec 2021 11:13:26 +0100 Subject: [PATCH 11/13] Fix typo in allocation event JSON output name --- crates/hyperqueue/src/client/output/json.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/hyperqueue/src/client/output/json.rs b/crates/hyperqueue/src/client/output/json.rs index fbb7cf79f..6fddc98f2 100644 --- a/crates/hyperqueue/src/client/output/json.rs +++ b/crates/hyperqueue/src/client/output/json.rs @@ -300,7 +300,7 @@ fn format_allocation(allocation: Allocation) -> serde_json::Value { } = allocation; let status_name = match &status { - AllocationStatus::Queued => 
"queue", + AllocationStatus::Queued => "queued", AllocationStatus::Running { .. } => "running", AllocationStatus::Finished { .. } => "finished", AllocationStatus::Failed { .. } => "failed", From 2902e0a99b416c9da888861ec89b13802e4e2389 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Mon, 20 Dec 2021 11:16:19 +0100 Subject: [PATCH 12/13] Document output modes --- CHANGELOG.md | 9 + docs/cli/output-mode.md | 278 ++++++++++++++++++ .../cli-shortcuts.md => cli/shortcuts.md} | 0 docs/deployment/allocation.md | 2 +- docs/deployment/worker.md | 4 +- docs/jobs/arrays.md | 2 +- docs/jobs/jobs.md | 4 +- mkdocs.yml | 5 +- 8 files changed, 296 insertions(+), 8 deletions(-) create mode 100644 docs/cli/output-mode.md rename docs/{tips/cli-shortcuts.md => cli/shortcuts.md} (100%) diff --git a/CHANGELOG.md b/CHANGELOG.md index dbd7b5442..41cd11a90 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +# DEV + +## New features + +### CLI +You can now force HyperQueue commands to output machine-readable data using the `--output-mode` flag +available to all HyperQueue commands. Notably, you can output data of the commands as JSON. You can +find more information in the [documentation](https://it4innovations.github.io/hyperqueue/stable/cli/output-mode/). + # v0.7.0 ## Fixes diff --git a/docs/cli/output-mode.md b/docs/cli/output-mode.md new file mode 100644 index 000000000..3352b4c16 --- /dev/null +++ b/docs/cli/output-mode.md @@ -0,0 +1,278 @@ +By default, HyperQueue CLI commands output information in a human-readable way, usually in the form +of a table. If you want to use the CLI commands programmatically, HyperQueue offers two additional +output modes that are designed to be machine-readable. + +You can change the output type of any HyperQueue CLI command either by using the `--output-mode` flag +or by setting the `HQ_OUTPUT_MODE` environment variable. 
+ +=== "Flag" + + ```bash + $ hq --output-mode=json jobs + ``` + +=== "Environment variable" + + ``` bash + $ HQ_OUTPUT_MODE=json hq jobs + ``` + +Currently, there are three output modes available. The default, human-readable `cli` mode, and then +two machine-readable modes, [JSON](#json) and [Quiet](#quiet). + +!!! important + Each machine-readable mode supports a set of commands. You can also use commands that are not + listed here, but their output might be unstable, or they might not output anything for a given + output mode. + +## JSON +The `json` output mode is intended to provide very detailed information in the form of a JSON value. +With this mode, HyperQueue will always output exactly one JSON value, either an array or an object. + +### Error handling +When an error occurs during the execution of a command, the program will exit with exit code `1` +and the program will output a JSON object with a single `error` key containing a human-readable +description of the error. + +### Date formatting +Time-based items are formatted in the following way: + +- **Duration** - formatted as a floating point number of seconds. +- **Datetime (timestamp)** - formatted as an `ISO8601` date in UTC. + +### Supported commands +- Server info: `hq server info` + + ??? Example + ```json + { + "host": "my-machine", + "hq_port": 42189, + "pid": 32586, + "server_dir": "/foo/bar/.hq-server", + "start_date": "2021-12-20T08:45:41.775753188Z", + "version": "0.7.0", + "worker_port": 38627 + } + ``` + +- Worker list: `hq worker list` + + ??? Example + ```json + [{ + "configuration": { + "heartbeat_interval": 8.0, + "hostname": "my-machine", + "idle_timeout": null, + "listen_address": "my-machine:45611", + "log_dir": "...", + "resources": { + "cpus": [[0, 1, 2, 3]], + "generic": [{ + "kind": "sum", + "name": "resource1", + "params": { + "size": 1000 + } + }] + }, + "time_limit": null, + "work_dir": "..." + }, + "ended": null, + "id": 1 + }] + ``` + +- Worker info: `hq worker info ` + + ???
Example + ```json + { + "configuration": { + "heartbeat_interval": 8.0, + "hostname": "my-machine", + "idle_timeout": null, + "listen_address": "my-machine:45611", + "log_dir": "...", + "resources": { + "cpus": [[0, 1, 2, 3]], + "generic": [{ + "kind": "sum", + "name": "resource1", + "params": { + "size": 1000 + } + }] + }, + "time_limit": null, + "work_dir": "..." + }, + "ended": null, + "id": 1 + } + ``` + +- Submit a job: `hq submit ` + + ??? Example + ```json + { + "id": 1 + } + ``` + +- Job list: `hq jobs` + + ??? Example + ```json + [{ + "id": 1, + "name": "ls", + "resources": { + "cpus": { + "cpus": 1, + "type": "compact" + }, + "generic": [], + "min_time": 0.0 + }, + "task_count": 1, + "task_stats": { + "canceled": 0, + "failed": 0, + "finished": 1, + "running": 0, + "waiting": 0 + } + }] + ``` + +- Job info: `hq job --tasks` + + ??? Example + ```json + { + "finished_at": "2021-12-20T08:56:16.438062340Z", + "info": { + "id": 1, + "name": "ls", + "resources": { + "cpus": { + "cpus": 1, + "type": "compact" + }, + "generic": [], + "min_time": 0.0 + }, + "task_count": 1, + "task_stats": { + "canceled": 0, + "failed": 0, + "finished": 1, + "running": 0, + "waiting": 0 + } + }, + "max_fails": null, + "pin": false, + "priority": 0, + "program": { + "args": [ + "ls" + ], + "cwd": "%{SUBMIT_DIR}", + "env": { + "FOO": "BAR" + }, + "stderr": { + "File": "job-%{JOB_ID}/%{TASK_ID}.stderr" + }, + "stdout": { + "File": "job-%{JOB_ID}/%{TASK_ID}.stdout" + } + }, + "started_at": "2021-12-20T08:45:53.458919345Z", + "tasks": [{ + "finished_at": "2021-12-20T08:56:16.438062340Z", + "id": 0, + "started_at": "2021-12-20T08:56:16.437123396Z", + "state": "finished", + "worker": 1 + }], + "time_limit": null + } + ``` + +- Automatic allocation queue list: `hq alloc list` + + ??? 
Example + ```json + [{ + "additional_args": [], + "backlog": 4, + "id": 1, + "manager": "PBS", + "max_worker_count": null, + "name": null, + "timelimit": 1800.0, + "worker_cpu_args": null, + "worker_resource_args": [], + "workers_per_alloc": 1 + }] + ``` + +- Automatic allocation queue info: `hq alloc info ` + + ??? Example + ```json + [{ + "id": "pbs-1", + "worker_count": 4, + "queue_at": "2021-12-20T08:56:16.437123396Z", + "started_at": "2021-12-20T08:58:25.538001256Z", + "ended_at": null, + "status": "running", + "workdir": "/foo/bar" + }] + ``` + +- Automatic allocation queue events: `hq alloc events ` + + ??? Example + ```json + [{ + "date": "2021-12-20T08:56:16.437123396Z", + "event": "allocation-finished", + "params": { + "id": "pbs-1" + } + }, { + "date": "2021-12-20T08:58:16.437123396Z", + "event": "status-fail", + "params": { + "error": "qstat failed" + } + }] + ``` + +## Quiet +The `quiet` output mode will cause HyperQueue to output only the most important information that +should be parseable without any complex parsing logic, e.g. using only Bash scripts. + +### Error handling +When an error occurs during the execution of a command, the program will exit with exit code `1` +and the error will be printed to the standard error output. + +### Supported commands +- Submit a job: `hq submit ` + + ??? Schema + Outputs a single line containing the ID of the created job. + + ??? Example + ```bash + $ hq --output-mode=quiet submit ls + 1 + ``` diff --git a/docs/tips/cli-shortcuts.md b/docs/cli/shortcuts.md similarity index 100% rename from docs/tips/cli-shortcuts.md rename to docs/cli/shortcuts.md diff --git a/docs/deployment/allocation.md b/docs/deployment/allocation.md index 2258a2b07..68ea5b44c 100644 --- a/docs/deployment/allocation.md +++ b/docs/deployment/allocation.md @@ -88,7 +88,7 @@ creating a new allocation queue: - `--name ` Name of the allocation queue. Will be used to name allocations. Serves for debug purposes only. 
-[^1]: You can use various [shortcuts](../tips/cli-shortcuts.md#duration) for the duration value. +[^1]: You can use various [shortcuts](../cli/shortcuts.md#duration) for the duration value. ## Behavior The automatic allocator is a periodic process that runs every few seconds and does the following: diff --git a/docs/deployment/worker.md b/docs/deployment/worker.md index 25ef91d9a..c9282e94d 100644 --- a/docs/deployment/worker.md +++ b/docs/deployment/worker.md @@ -79,7 +79,7 @@ If you have started a worker manually, and you want to stop it, you can use the $ hq worker stop ``` -[^2]: You can use various [shortcuts](../tips/cli-shortcuts.md#id-selector) to select multiple workers at once. +[^2]: You can use various [shortcuts](../cli/shortcuts.md#id-selector) to select multiple workers at once. ## Time limit HyperQueue workers are designed to be volatile, i.e. it is expected that they will be stopped from time to time, because @@ -92,7 +92,7 @@ When a worker is started inside a PBS or Slurm job, it will automatically calcul metadata. If you want to set time limit manually for workers started outside of PBS/Slurm jobs or if you want to override the detected settings, you can use the `--time-limit=` option[^1] when starting the worker. -[^1]: You can use various [shortcuts](../tips/cli-shortcuts.md#duration) for the duration value. +[^1]: You can use various [shortcuts](../cli/shortcuts.md#duration) for the duration value. When the time limit is reached, the worker is automatically terminated. diff --git a/docs/jobs/arrays.md b/docs/jobs/arrays.md index 9df8a16c2..d359ff68e 100644 --- a/docs/jobs/arrays.md +++ b/docs/jobs/arrays.md @@ -36,7 +36,7 @@ that can be accessed through the `HQ_TASK_ID` [environment variable](jobs.md#env You can enter the range as two unsigned numbers separated by a dash[^2], where the first number should be smaller than the second one. The range is inclusive. 
-[^2]: The full syntax can be seen in the second selector of the [ID selector shortcut](../tips/cli-shortcuts.md). +[^2]: The full syntax can be seen in the second selector of the [ID selector shortcut](../cli/shortcuts.md). The range is entered using the `--array` option: diff --git a/docs/jobs/jobs.md b/docs/jobs/jobs.md index 616d4305b..0cdb14fc2 100644 --- a/docs/jobs/jobs.md +++ b/docs/jobs/jobs.md @@ -149,7 +149,7 @@ You can specify two time-related parameters when submitting a job. They will be Workers with an unknown remaining lifetime will be able to execute any task, disregarding its time request. -[^2]: You can use various [shortcuts](../tips/cli-shortcuts.md#duration) for the duration value. +[^2]: You can use various [shortcuts](../cli/shortcuts.md#duration) for the duration value. Here is an example situation where time limit and time request can be used: @@ -207,7 +207,7 @@ can query the state of a job with the following commands[^1]: $ hq job --tasks ``` -[^1]: You can use various [shortcuts](../tips/cli-shortcuts.md#id-selector) to select multiple jobs at once. +[^1]: You can use various [shortcuts](../cli/shortcuts.md#id-selector) to select multiple jobs at once. 
### Task state Each task starts in the `Waiting` state and can end up in one of the terminal states: `Finished`, `Failed` diff --git a/mkdocs.yml b/mkdocs.yml index e4be6cbe4..427d3c78a 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -25,8 +25,9 @@ nav: - Generic Resources: jobs/gresources.md - Handling Failure: jobs/failure.md - Output Streaming: jobs/streaming.md - - Tips: - - CLI Shortcuts: tips/cli-shortcuts.md + - CLI: + - Shortcuts: cli/shortcuts.md + - Output mode: cli/output-mode.md - FAQ: faq.md - Comparison With Other Tools: other-tools.md From 12377206549602c907f9d9296327cbcbf3049a8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Mon, 20 Dec 2021 11:31:38 +0100 Subject: [PATCH 13/13] Fix tests --- tests/output/test_json.py | 2 +- tests/test_job.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/output/test_json.py b/tests/output/test_json.py index 5c3ceb99c..9ed4445c4 100644 --- a/tests/output/test_json.py +++ b/tests/output/test_json.py @@ -6,8 +6,8 @@ import iso8601 from schema import Schema -from utils import wait_for_job_state from ..conftest import HqEnv +from ..utils import wait_for_job_state def parse_json_output(hq_env: HqEnv, command: List[str]): diff --git a/tests/test_job.py b/tests/test_job.py index 59f8ad122..7be31b8a4 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -339,7 +339,6 @@ def test_cancel_last(hq_env: HqEnv): def test_cancel_some(hq_env: HqEnv): hq_env.start_server() - hq_env.start_worker(cpus=1) hq_env.command(["submit", "sleep", "100"]) hq_env.command(["submit", "hostname"]) hq_env.command(["submit", "/invalid"])