Skip to content

Commit

Permalink
Upgrade ya-runtime-api to 0.2 (#6)
Browse files Browse the repository at this point in the history
- make `task_package` argument optional
- display `main` errors via `ui`
  • Loading branch information
mfranciszkiewicz committed May 11, 2021
1 parent 227a63f commit 6795003
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 41 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
[package]
name = "ya-runtime-dbg"
version = "0.2.1"
version = "0.2.2"
authors = ["Golem Factory <contact@golem.network>"]
edition = "2018"
repository = "https://github.com/golemfactory/ya-runtime-dbg"

[dependencies]
ya-runtime-api = { version = "0.1", features = ["server"] }
ya-runtime-api = { version = "0.2", features = ["server"] }

actix = { version = "0.9", default-features = false }
actix-rt = "1.0.1"
Expand All @@ -22,4 +22,4 @@ tokio = { version = "0.2", features = ["io-util", "process"], default_features =
tokio-util = { version = "0.2", features = ["codec"] }

[patch.crates-io]
ya-runtime-api = { git = "https://github.com/golemfactory/yagna.git", branch="release/v0.6" }
ya-runtime-api = { git = "https://github.com/golemfactory/yagna.git", rev="9877a31c9c325e884c86f48d88f91d77e8cee94d" }
92 changes: 57 additions & 35 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::ui::*;
use actix::{Arbiter, System};
use anyhow::{Context, Result};
use futures::channel::{mpsc, oneshot};
use futures::future::BoxFuture;
use futures::{FutureExt, SinkExt, StreamExt};
use std::ffi::OsString;
use std::fs::create_dir_all;
Expand Down Expand Up @@ -49,7 +50,7 @@ struct Args {
workdir: PathBuf,
/// Task package to deploy
#[structopt(short, long)]
task_package: PathBuf,
task_package: Option<PathBuf>,
/// Service protocol version
#[structopt(short, long, default_value = "0.1.0")]
protocol: String,
Expand All @@ -73,20 +74,28 @@ impl Args {
.workdir
.canonicalize()
.context(format!("workdir not found: {:?}", self.workdir))?;
self.task_package = self
.task_package
.canonicalize()
.context(format!("task package not found: {:?}", self.task_package))?;

if let Some(ref task_package) = self.task_package {
let task_package = task_package
.canonicalize()
.context(format!("task package not found: {:?}", self.task_package))?;
self.task_package.replace(task_package);
}

Ok(())
}

fn to_runtime_args(&self) -> Vec<OsString> {
let mut args = vec![
OsString::from("--workdir"),
self.workdir.clone().into_os_string(),
OsString::from("--task-package"),
self.task_package.clone().into_os_string(),
];

if let Some(ref task_package) = self.task_package {
args.push(OsString::from("--task-package"));
args.push(task_package.clone().into_os_string());
}

args.extend(self.varargs.iter().map(OsString::from));
args
}
Expand Down Expand Up @@ -125,19 +134,17 @@ impl ToString for ExecMode {

struct EventHandler<T: Terminal> {
tx: mpsc::Sender<()>,
arbiter: actix::Arbiter,
ui: UI<T>,
}

impl<T: Terminal> EventHandler<T> {
pub fn new(tx: mpsc::Sender<()>, ui: UI<T>) -> Self {
let arbiter = Arbiter::current().clone();
EventHandler { tx, ui, arbiter }
EventHandler { tx, ui }
}
}

impl<T: Terminal + 'static> RuntimeEvent for EventHandler<T> {
fn on_process_status(&self, status: ProcessStatus) {
fn on_process_status<'a>(&self, status: ProcessStatus) -> BoxFuture<'a, ()> {
if !status.stdout.is_empty() {
write_output(&self.ui, status.stdout);
}
Expand All @@ -151,12 +158,12 @@ impl<T: Terminal + 'static> RuntimeEvent for EventHandler<T> {
}

let mut tx = self.tx.clone();
self.arbiter.send(
async move {
let _ = tx.send(()).await;
}
.boxed(),
);
async move {
let _ = tx.send(()).await;
}
.boxed()
} else {
async move {}.boxed()
}
}
}
Expand All @@ -168,7 +175,7 @@ where
{
let stream = FramedRead::new(read, BytesCodec::new())
.filter_map(|result| async { result.ok() })
.ready_chunks(16)
.ready_chunks(4)
.map(|v| v.into_iter().map(|b| b.to_vec()).flatten().collect());
Arbiter::spawn(async move {
stream
Expand Down Expand Up @@ -209,7 +216,9 @@ where
forward_output(child.stderr.take().unwrap(), ui.clone());

if !child.await?.success() {
return Err(anyhow::anyhow!("deployment failed"));
return Err(anyhow::anyhow!(
"Deployment failed. Most runtimes require a '--task-package' argument, see '--help' for more info."
));
}

writeln!(ui, "").unwrap();
Expand Down Expand Up @@ -237,7 +246,7 @@ where
let (tx, mut rx) = mpsc::channel(1);
let service = spawn(command, EventHandler::new(tx, ui.clone()))
.await
.context("unable to spawn runtime")?;
.context("Unable to spawn runtime")?;

// FIXME: handle hello result with newer version of runtime api
let _ = service.hello(args.protocol.as_str()).await;
Expand All @@ -261,6 +270,7 @@ where
}

ui.close();

if let Err(e) = service.shutdown().await {
let message = format!("{:?}", e);
if !is_broken_pipe(&message) {
Expand All @@ -286,7 +296,7 @@ async fn run(service: impl RuntimeService, input: String, mode: &ExecMode) -> Re
let bin_path = PathBuf::from_str(args.remove(0).as_str())?;
let bin_name = bin_path
.file_name()
.ok_or_else(|| anyhow::anyhow!("invalid command: {}", bin_path.display()))?
.ok_or_else(|| anyhow::anyhow!("Invalid command: {}", bin_path.display()))?
.to_string_lossy()
.to_string();

Expand Down Expand Up @@ -330,17 +340,10 @@ fn is_broken_pipe(message: &str) -> bool {
message.to_lowercase().find("error 32").is_some()
}

#[actix_rt::main]
async fn main() -> Result<()> {
let mut args = Args::from_args();
let _ = create_dir_all(&args.workdir);

let proj_dir = project_dir()?;
let history_path = proj_dir.join(".ya_dbg_history");
let exec_mode = ExecMode::new(args.exec_mode, args.exec_shell.clone());
let mut ui = ui(history_path, &exec_mode.to_string())?;

args.canonicalize()?;
async fn ui_main<T>(args: Args, exec_mode: ExecMode, mut ui: UI<T>) -> Result<()>
where
T: Terminal + 'static,
{
let rt_args = args
.to_runtime_args()
.into_iter()
Expand All @@ -355,9 +358,7 @@ async fn main() -> Result<()> {
);

if args.deploy {
deploy(&args, ui.clone())
.await
.context("deployment failed")?;
deploy(&args, ui.clone()).await?;
}

let (start_tx, start_rx) = oneshot::channel();
Expand All @@ -377,7 +378,28 @@ async fn main() -> Result<()> {
start_rx.await?;
ui.enter_prompt(input_tx).await;

Ok(())
}

#[actix_rt::main]
async fn main() -> Result<()> {
let mut args = Args::from_args();
let _ = create_dir_all(&args.workdir);

args.canonicalize()?;

let dir = project_dir()?;
let history = dir.join(".ya_dbg_history");
let mode = ExecMode::new(args.exec_mode, args.exec_shell.clone());
let mut ui = ui(history, &mode.to_string())?;

if let Err(err) = ui_main(args, mode, ui.clone()).await {
ui_err!(ui, "{}", err);
std::process::exit(1);
}

ui_info!(ui, "Shutting down");
ui.close();

Ok(())
}

0 comments on commit 6795003

Please sign in to comment.